当前位置:首页 > 生活百科

rocketmq源码解析(rocketmq源码部署)

栏目:生活百科日期:2025-03-19浏览:0

本文主要分析RocketMQ中如何保证消息有序的。

RocketMQ的版本为:4.2.0 release。

一.时序图

还是老规矩,先把分析过程的时序图摆出来:

1.Producer发送顺序消息

2.Consumer接收顺序消息(一)

3.Consumer接收顺序消息(二)

二.源码分析 &– Producer发送顺序消息

1 DefaultMQProducer#send:发送消息,入参中有自定义的消息队列选择器。

 // DefaultMQProducer#send public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); }

1.1 DefaultMQProducerImpl#makeSureStateOK:确保Producer的状态是运行状态-ServiceState.RUNNING。

 // DefaultMQProducerImpl#makeSureStateOK private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "+ this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }

1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根据Topic获取发布Topic用到的路由信息。

 // DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 为空则从 NameServer更新获取,false,不传入 defaultMQProducer topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 有了路由信息而且状态OK,则返回 return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }

1.3 调用自定义消息队列选择器的select方法。

 // DefaultMQProducerImpl#sendSelectImpl MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } // Producer#main SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List&<MessageQueue&> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);

1.4 DefaultMQProducerImpl#sendKernelImpl:发送消息的核心实现方法。

 // DefaultMQProducerImpl#sendKernelImpl ...... switch (communicationMode) { case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout &< costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; ......

1.4.1 MQClientAPIImpl#sendMessage:发送消息。

 // MQClientAPIImpl#sendMessage ...... switch (communicationMode) {// 根据发送消息的模式(同步/异步)选择不同的方式,默认是同步 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis &< costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); ......

1.4.1.1 MQClientAPIImpl#sendMessageSync:发送同步消息。

 // MQClientAPIImpl#sendMessageSync private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }

1.4.1.1.1 NettyRemotingClient#invokeSync:构造RemotingCommand,调用的方式是同步。

 // NettyRemotingClient#invokeSync  RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response;

三.源码分析 &– Consumer接收顺序消息(一)

1 DefaultMQPushConsumer#registerMessageListener:把Consumer传入的消息监听器加入到messageListener中。

 // DefaultMQPushConsumer#registerMessageListener public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }

1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的消息监听器加入到messageListenerInner中。

 // DefaultMQPushConsumerImpl#registerMessageListener public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }

2 DefaultMQPushConsumer#start:启动Consumer。

 // DefaultMQPushConsumer#start public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); }

2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl。

 // DefaultMQPushConsumerImpl#start switch (this.serviceState) { case CREATE_JUST:// 刚刚创建 ...... if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 有序消息服务 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 并发无序消息服务 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } ...... this.consumeMessageService.start();// 启动消息服务 ...... mQClientFactory.start();// 启动MQClientInstance ......

2.1.1 new
ConsumeMessageOrderlyService():构造顺序消息服务。

 // ConsumeMessageOrderlyService#ConsumeMessageOrderlyService public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue&<Runnable&>(); this.consumeExecutor = new ThreadPoolExecutor(// 主消息消费线程池,正常执行收到的ConsumeRequest。多线程 this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); }

2.1.2
ConsumeMessageOrderlyService#start:启动消息队列客户端实例。

 // DefaultMQPushConsumerImpl#start this.consumeMessageService.start(); // ConsumeMessageOrderlyService#start public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically();// 定时向broker发送批量锁住当前正在消费的队列集合的消息 } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }

2.1.2.1
ConsumeMessageOrderlyService#lockMQPeriodically:定时向broker发送批量锁住当前正在消费的队列集合的消息。

2.1.2.1.1 RebalanceImpl#lockAll:锁住所有正在消息的队列。

 // ConsumeMessageOrderlyService#lockMQPeriodically if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } // RebalanceImpl#lockAll HashMap&<String, Set&<MessageQueue&>&> brokerMqs = this.buildProcessQueueTableByBrokerName();// 根据brokerName从processQueueTable获取正在消费的队列集合 ...... Set&<MessageQueue&> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker发送锁住消息队列的指令 for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } ......

2.1.3 MQClientInstance#start:启动MQClientInstance。过程较复杂,放到大标题四中分析。

 // DefaultMQPushConsumerImpl#start mQClientFactory.start();

四.源码分析 &– Consumer接收顺序消息(二)

1 MQClientInstance#start:启动客户端实例MQClientInstance。

 // MQClientInstance#start synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ...... // Start pull service 启动拉取消息服务 this.pullMessageService.start(); // Start rebalance service 启动消费端负载均衡服务 this.rebalanceService.start(); ......

1.1 PullMessageService#run:启动拉取消息服务。实际调用的是DefaultMQPushConsumerImpl的pullMessage方法。

 // PullMessageService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); } // PullMessageService#pullMessage private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest);// 调用DefaultMQPushConsumerImpl的pullMessage } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }

1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。提交到
ConsumeMessageOrderlyService的线程池consumeExecutor中执行。

 // DefaultMQPushConsumerImpl#pullMessage ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; ...... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ......

1.1.1.1.1.1.1 ConsumeRequest#run:处理消息消费的线程。

 // ConsumeMessageOrderlyService.ConsumeRequest#run List&<MessageExt&> msgs = this.processQueue.takeMessags(consumeBatchSize); ...... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 实际消费消息的地方,回调消息监听器的consumeMessage方法 } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs,messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } ......

1.2 RebalanceService#run:启动消息端负载均衡服务。

 // RebalanceService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } // MQClientInstance#doRebalance public void doRebalance() { for (Map.Entry&<String, MQConsumerInner&> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } } // DefaultMQPushConsumerImpl#doRebalance public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }

1.2.1.1.1 RebalanceImpl#doRebalance:负载均衡服务类处理。

 // RebalanceImpl#doRebalance public void doRebalance(final boolean isOrder) { Map&<String, SubscriptionData&> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry&<String, SubscriptionData&> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); } // RebalanceImpl#rebalanceByTopic switch (messageModel) { case BROADCASTING: { Set&<MessageQueue&> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 根据Toipc去除queue if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { ...... // RebalanceImpl#updateProcessQueueTableInRebalance this.dispatchPullRequest(pullRequestList);// RebalancePushImpl分发消息

1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分发。

 // RebalancePushImpl#dispatchPullRequest public void dispatchPullRequest(List&<PullRequest&> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }

五.总结

相比Producer的发送流程,Consumer的接收流程稍微复杂一点。通过上面的源码分析,可以知道RocketMQ是怎样保证消息的有序的:

1.通过ReblanceImp的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列。设置本地队列ProcessQueue的locked属性为true。保证broker中的每个消息队列只对应一个消费端;

2.另外,消费端也是通过锁,保证每个ProcessQueue只有一个线程消费。

“rocketmq源码解析(rocketmq源码部署)” 的相关文章

修手机一个月能挣多少,带你了解手机维修的利润

随着技术的提高,人手一台智能手机已成为现实。原料价格的下降也使得手机售价趋于平价化,一旦手机出现故障问题,很多人第一想法就是——换。加上排队、维修等方面的时间耗...

利用SEO快速打造高权重网站的方法,这5点缺一不可!

大家好,我是谢盼龙,做了几年的SEO下来,我积累的经验也很多,尤其是对高权重网站打造的一些技巧,熟记于心。也是因为这些技巧才让我第一次打造出了权重4-6的网站,...

全国联保是什么意思,全国联保和店铺保修的区别介绍

2019年11月30日手机报价已出,大家可以看一下,手机报价基本没什么变化,涨跌也不是很多,大家了解一下,让你买手机不再花冤枉钱!好多人在我这里拿手机都会问,手...

ipad闹钟声音小怎么办(苹果12闹钟铃声大小设置)

苹果今天为开发人员和公测用户推送了iOS14和iPadOS14的第六个Beta版本,目的是更新和完善该软件中包含的某些功能。随着Beta测试期的推进,更新变得越...

mysql操作语句大全(mysql的使用教程)

前言学习一个新知识最好的方式就是上官网,所以我先把官网贴出来MySQL官网(点击查阅),如果大家有想了解我没有说到的东西可以直接上官网看哈~目前MySQL最新大...

什么地摊生意好做,18种最赚钱的地摊生意

做地摊生意卖什么最赚钱?这是地摊商贩们经常焦头烂额的事,不过选择卖什么并不是最关键的,关键在于在什么位置卖,不同选址消费的人群是不一样的,只要两者完美契合,自己...

大学都有什么专业,最有前景的11个专业推荐

普通高校招生专业主要分为11个学科门类,有些专业大类备受考生喜爱,有专业大类无人问津。其中在热门的专业大类中也有备受“冷落”的专业,具体情况如何呢?本文结合近年...

国产手机排名前几的是哪些(最新国产手机排名)

以出货量来排名的话,国内四大手机厂商分别是小米、OPPO、ViVO以及华为,而如今它们已经相继发布了旗下最顶级的旗舰手机,分别是华为Mate40Pro、小米11...

excel数据有效性序列怎么设置(数据有效性设置下拉选项)

数据有效性是对单元格或单元格区域输入的数据从内容到数量上的限制。该功能可以用来确保输入数据的正确性。设置方法也很简单,按照下面的设置方法即可。1、打开excel...

excel怎么批量向下复制相同内容(表格太长快速复制的方

在日常办公中,最常用的办公软件之一便是Excel表格,它统计数据的公式非常的方便实用,在一个表格中经常会碰到应用同个公式的情况,那如何快速复制公式?接下来,小编...