博客
关于我
RocketMQ 源码解析 —— 调试环境搭建
阅读量:796 次
发布时间:2023-03-22

本文共 4614 字,大约阅读时间需要 15 分钟。

RocketMQ 本地开发环境搭建与调试指南

搭建 RocketMQ 本地开发环境并进行调试,是理解 RocketMQ 内部工作机制的基础步骤。本文将详细介绍如何在本地环境中配置并启动 RocketMQ 的各个组件。


1. 依赖工具

在开始操作之前,确保你的开发环境已经准备好。以下是所需工具的版本要求:

  • JDK:版本 1.8 或更高
  • Maven:最新版本
  • IntelliJ IDEA:推荐使用

2. 源码拉取

首先,我们需要从 RocketMQ 的官方仓库拉取最新的源码。建议在 GitHub 上找到对应的仓库并进行 Fork,原因是我们可能会对源码进行修改和注释。

  • 克隆仓库:

    git clone https://github.com/apache/rocketmq.git
  • 拉取最新代码:

    cd rocketmq
    git pull origin master
  • 使用 IntelliJ IDEA 打开项目。在导入项目时,确保选择正确的 Maven 工作目录。


  • 3. 启动 RocketMQ Namesrv

    Namesrv 是 RocketMQ 的配置中心,负责管理 topics 和 brokers 的信息。我们可以通过单元测试类 org.apache.rocketmq.namesrv.NameServerInstanceTest 来启动 Namesrv。

  • 打开类并添加主函数:

    public static void main(String[] args) throws Exception {
    NamesrvConfig namesrvConfig = new NamesrvConfig();
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876); // 设置 Namesrv 监听端口
    NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    namesrvController.initialize();
    namesrvController.start();
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
    }
  • 运行测试类:

    右键点击主函数,选择 运行,Namesrv 将会启动。

  • 验证启动状态:

    打开终端,输入命令:

    telnet 127.0.0.1 9876

    如果连接成功,说明 Namesrv 已经启动。


  • 4. 启动 RocketMQ Broker

    Broker 是 RocketMQ 的消息存储和处理核心。我们可以通过单元测试类 org.apache.rocketmq.broker.BrokerControllerTest 来启动 Broker。

  • 打开类并添加主函数:

    public static void main(String[] args) throws Exception {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911); // 设置 Broker 监听端口
    BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setNamesrvAddr("127.0.0.1:9876");
    MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);
    BrokerPathConfigHelper.setBrokerConfigPath("/path/to/broker.conf");
    BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, messageStoreConfig);
    brokerController.initialize();
    brokerController.start();
    System.out.println("Broker Started.");
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
    }
  • 运行测试类:

    右键点击主函数,选择 运行,Broker 将会启动。

  • 验证 Broker 启动状态:

    打开终端,输入命令:

    telnet 127.0.0.1 10911

    如果连接成功,说明 Broker 已经启动。


  • 5. 启动 RocketMQ Producer

    Producer 是向 RocketMQ Broker 发送消息的客户端。我们可以通过示例类 org.apache.rocketmq.example.quickstart.Producer 来运行。

  • 打开类并添加主函数:

    public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("producer-group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 1000; i++) {
    try {
    Message msg = new Message("topic-test", "tag-a", "hello-rocketmq" + i);
    SendResult sendResult = producer.send(msg);
    System.out.println(sendResult);
    } catch (Exception e) {
    e.printStackTrace();
    Thread.sleep(1000);
    }
    }
    producer.shutdown();
    }
    }
  • 运行测试类:

    右键点击主函数,选择 运行,Producer 将会启动。

  • 验证消息发送状态:

    查看控制台输出,确认消息是否成功发送到 RocketMQ Broker。


  • 6. 启动 RocketMQ Consumer

    Consumer 是从 RocketMQ Broker 中读取消息的客户端。我们可以通过示例类 org.apache.rocketmq.example.quickstart.Consumer 来运行。

  • 打开类并添加主函数:

    public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("topic-test", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List
    msgs, ConsumeConcurrentlyContext context) {
    System.out.printf("%s Receive New Messages: %s\n", Thread.currentThread().getName(), msgs);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    System.out.println("Consumer Started.");
    }
    }
  • 运行测试类:

    右键点击主函数,选择 运行,Consumer 将会启动。

  • 验证消息消费状态:

    查看控制台输出,确认是否有消息被成功消费。


  • 666. 彩蛋

    看到这里你是不是想说,搭建 RocketMQ 环境还挺有趣的?别忘了阅读 「0. 友情提示」 中提到的文档,先熟悉 RocketMQ 的原理,再来深入源码研究。源码是原理的具象化,原理是代码的抽象化。


    欢迎加入我的知识星球,一起探讨架构和源码。扫描下方二维码,回复 666 关键字获取更多技术内容。

    转载地址:http://wsqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现唯一路径问题的回溯方法的算法(附完整源码)
    查看>>
    Objective-C实现四舍五入(附完整源码)
    查看>>
    Objective-C实现四阶龙格库塔法(附完整源码)
    查看>>
    Objective-C实现四阶龙格库塔法(附完整源码)
    查看>>
    Objective-C实现回调实例(附完整源码)
    查看>>
    Objective-C实现图-弗洛伊德FloydWarshall算法(附完整源码)
    查看>>
    Objective-C实现图书借阅系统(附完整源码)
    查看>>
    Objective-C实现图像二维熵的图像信号丢失检测(附完整源码)
    查看>>
    Objective-C实现图像去雾算法(附完整源码)
    查看>>
    Objective-C实现图像灰度变换(附完整源码)
    查看>>
    Objective-C实现图像移动(附完整源码)
    查看>>
    Objective-C实现图层混合算法(附完整源码)
    查看>>
    Objective-C实现图片erosion operation侵蚀操作算法(附完整源码)
    查看>>
    Objective-C实现图片的放大缩小(附完整源码)
    查看>>
    Objective-C实现图片腐蚀(附完整源码)
    查看>>
    Objective-C实现图片膨胀(附完整源码)
    查看>>
    Objective-C实现图的邻接矩阵(附完整源码)
    查看>>
    Objective-C实现圆球的表面积和体积(附完整源码)
    查看>>
    Objective-C实现在Regex的帮助下检查字谜算法(附完整源码)
    查看>>
    Objective-C实现均值滤波(附完整源码)
    查看>>