本文共 4614 字,大约阅读时间需要 15 分钟。
搭建 RocketMQ 本地开发环境并进行调试,是理解 RocketMQ 内部工作机制的基础步骤。本文将详细介绍如何在本地环境中配置并启动 RocketMQ 的各个组件。
在开始操作之前,确保你的开发环境已经准备好。以下是所需工具的版本要求:
首先,我们需要从 RocketMQ 的官方仓库拉取最新的源码。建议在 GitHub 上找到对应的仓库并进行 Fork,原因是我们可能会对源码进行修改和注释。
克隆仓库:
git clone https://github.com/apache/rocketmq.git
拉取最新代码:
cd rocketmqgit pull origin master
使用 IntelliJ IDEA 打开项目。在导入项目时,确保选择正确的 Maven 工作目录。
Namesrv 是 RocketMQ 的配置中心,负责管理 topics 和 brokers 的信息。我们可以通过单元测试类 org.apache.rocketmq.namesrv.NameServerInstanceTest 来启动 Namesrv。
打开类并添加主函数:
public static void main(String[] args) throws Exception { NamesrvConfig namesrvConfig = new NamesrvConfig(); NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); // 设置 Namesrv 监听端口 NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig); namesrvController.initialize(); namesrvController.start(); Thread.sleep(DateUtils.MILLIS_PER_DAY);} 运行测试类:
右键点击主函数,选择运行,Namesrv 将会启动。 验证启动状态:
打开终端,输入命令:telnet 127.0.0.1 9876
如果连接成功,说明 Namesrv 已经启动。
Broker 是 RocketMQ 的消息存储和处理核心。我们可以通过单元测试类 org.apache.rocketmq.broker.BrokerControllerTest 来启动 Broker。
打开类并添加主函数:
public static void main(String[] args) throws Exception { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(10911); // 设置 Broker 监听端口 BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setBrokerName("broker-a"); brokerConfig.setNamesrvAddr("127.0.0.1:9876"); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setDeleteWhen("04"); messageStoreConfig.setFileReservedTime(48); messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); messageStoreConfig.setDuplicationEnable(false); BrokerPathConfigHelper.setBrokerConfigPath("/path/to/broker.conf"); BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, messageStoreConfig); brokerController.initialize(); brokerController.start(); System.out.println("Broker Started."); Thread.sleep(DateUtils.MILLIS_PER_DAY);} 运行测试类:
右键点击主函数,选择运行,Broker 将会启动。 验证 Broker 启动状态:
打开终端,输入命令:telnet 127.0.0.1 10911
如果连接成功,说明 Broker 已经启动。
Producer 是向 RocketMQ Broker 发送消息的客户端。我们可以通过示例类 org.apache.rocketmq.example.quickstart.Producer 来运行。
打开类并添加主函数:
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("topic-test", "tag-a", "hello-rocketmq" + i); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); }} 运行测试类:
右键点击主函数,选择运行,Producer 将会启动。 验证消息发送状态:
查看控制台输出,确认消息是否成功发送到 RocketMQ Broker。Consumer 是从 RocketMQ Broker 中读取消息的客户端。我们可以通过示例类 org.apache.rocketmq.example.quickstart.Consumer 来运行。
打开类并添加主函数:
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topic-test", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s\n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }} 运行测试类:
右键点击主函数,选择运行,Consumer 将会启动。 验证消息消费状态:
查看控制台输出,确认是否有消息被成功消费。看到这里你是不是想说,搭建 RocketMQ 环境还挺有趣的?别忘了阅读 「0. 友情提示」 中提到的文档,先熟悉 RocketMQ 的原理,再来深入源码研究。源码是原理的具象化,原理是代码的抽象化。
欢迎加入我的知识星球,一起探讨架构和源码。扫描下方二维码,回复 666 关键字获取更多技术内容。
转载地址:http://wsqfk.baihongyu.com/