在使用rocket mq 的时候,接触最多的还是consumer端。在实际使用的时候,踩过不少坑,如订阅关系不一致、广播消息重复消费等,因此,是时候对rocket mq的源码进行一下分析了。
当consumer订阅topic时,例如 consumer.subscribe(“TopicTest”, “*”); 实际上,源码里只是将订阅消息放到rebalanceImpl对象的subscriptionInner(map类型)里。该map的key为topic。因此,如果一个comsumer需要订阅多个topic,只需要连续调用subscribe。 例如: consumer.subscribe(“TopicTest1”, “*”); consumer.subscribe(“TopicTest2”, “*”);
MQClientInstance 定期fetchNameServerAddr,通过http请求获取nameserver的最新地址并保存。
MQClientInstance定期updateTopicRouteInfoFromNameServer,从nameserver获取broker信息。更新brokerAddrTable,更新updateTopicPublishInfo,最后更新topicRouteTable。
MQClientInstanc定期 persistConsumerOffset,从rebalanceImpl.getProcessQueueTable() 获取消费进度,然后执行offsetStore.persistAll()。其中集群消费为RemoteBrokerOffsetStore,他会同步消费进度到broker,广播消费为LocalFileOffsetStore,他只会将消费进度保存到隐藏本地文件中。
RebalanceService比较复杂。RebalanceService如果服务不停,周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将信息加入到阻塞队列pullRequestQueue。
PullMessageService 如果服务不停,不断的从阻塞队列pullRequestQueue获取消息,然后基于该信息从broker拉取消息。
1、MQClientInstance 定期fetchNameServerAddr,通过http请求获取nameserver的最新地址并保存。
2、MQClientInstance定期updateTopicRouteInfoFromNameServer(关键方法),从通过getTopicRouteInfoFromNameServer 从nameserver获取TopicRouteData信息。然后判断是否有变化,有变化则更新topicRouteTable 里的TopicRouteData
某个topic对应的TopicRouteData内容如下,主要包括queueDatas和brokerDatas:
TopicRouteData [orderTopicConf=null, queueDatas=[QueueData [brokerName=taobaodaily-01, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0], QueueData [brokerName=taobaodaily-03, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]], brokerDatas=[BrokerData [brokerName=taobaodaily-03, brokerAddrs={0=100.81.165.119:10911}], BrokerData [brokerName=taobaodaily-01, brokerAddrs={0=10.218.141.54:10911}]], filterServerTable={}]3、通过topicRouteData2TopicSubscribeInfo 将topic 和TopicRouteData转换为Set《MessageQueue》,MessageQueue里维护topic、brokerName和queueId的信息,并同步到rebalanceImpl 的 topicSubscribeInfoTable里,如下:
[MessageQueue [topic=TBW102, brokerName=taobaodaily-03, queueId=0]4、更新brokerAddrTable
{taobaodaily-01={0=10.218.141.54:10911}, taobaodaily-03={0=100.81.165.119:10911}}5、RebalanceService 依次调用RebalanceService doRebalance ->rebalanceByTopic ->findConsumerIdList ->updateTopicRouteInfoFromNameServer 触发上面流程。另一方面,RebalanceService周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。然后调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将PullRequest信息加入到阻塞队列pullRequestQueue
6、PullMessageService不断的从阻塞队列pullRequestQueue获取消息。然后依次调用mQClientFactory.getMQClientAPIImpl().pullMessage和pullAPIWrapper.pullKernelImp**l,根据message queue和offset拉取消息,成功后通过, 异步回调 **PullCallback.onSuccess,返回pullResult,里面包括获取的msgFoundList。然后依次调用pullAPIWrapper.processPullResult和consumeMessageService.submitConsumeRequest,构造ConsumeRequest对象并提交到consumeExecutor线程池上。
7、在consumeExecutor线程池上,ConsumeRequest类的run方法从processQueue获取待处理的消息 并调用listener.consumeMessage(用户注册的handler)消费消息。整个消费流程完成。