Rocket MQ consumer 源码分析(绝对干货)

xiaoxiao2021-02-27  271

在使用rocket mq 的时候,接触最多的还是consumer端。在实际使用的时候,踩过不少坑,如订阅关系不一致、广播消息重复消费等,因此,是时候对rocket mq的源码进行一下分析了。

一、消息订阅

当consumer订阅topic时,例如 consumer.subscribe(“TopicTest”, “*”); 实际上,源码里只是将订阅消息放到rebalanceImpl对象的subscriptionInner(map类型)里。该map的key为topic。因此,如果一个comsumer需要订阅多个topic,只需要连续调用subscribe。 例如: consumer.subscribe(“TopicTest1”, “*”); consumer.subscribe(“TopicTest2”, “*”);

二、主要线程分析

MQClientInstance 定期fetchNameServerAddr,通过http请求获取nameserver的最新地址并保存。

MQClientInstance定期updateTopicRouteInfoFromNameServer,从nameserver获取broker信息。更新brokerAddrTable,更新updateTopicPublishInfo,最后更新topicRouteTable。

MQClientInstanc定期 persistConsumerOffset,从rebalanceImpl.getProcessQueueTable() 获取消费进度,然后执行offsetStore.persistAll()。其中集群消费为RemoteBrokerOffsetStore,他会同步消费进度到broker,广播消费为LocalFileOffsetStore,他只会将消费进度保存到隐藏本地文件中。

RebalanceService比较复杂。RebalanceService如果服务不停,周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将信息加入到阻塞队列pullRequestQueue。

PullMessageService 如果服务不停,不断的从阻塞队列pullRequestQueue获取消息,然后基于该信息从broker拉取消息。

三、消息消费流程分析

1、MQClientInstance 定期fetchNameServerAddr,通过http请求获取nameserver的最新地址并保存。

2、MQClientInstance定期updateTopicRouteInfoFromNameServer(关键方法),从通过getTopicRouteInfoFromNameServer 从nameserver获取TopicRouteData信息。然后判断是否有变化,有变化则更新topicRouteTable 里的TopicRouteData

某个topic对应的TopicRouteData内容如下,主要包括queueDatas和brokerDatas:

TopicRouteData [orderTopicConf=null, queueDatas=[QueueData [brokerName=taobaodaily-01, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0], QueueData [brokerName=taobaodaily-03, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]], brokerDatas=[BrokerData [brokerName=taobaodaily-03, brokerAddrs={0=100.81.165.119:10911}], BrokerData [brokerName=taobaodaily-01, brokerAddrs={0=10.218.141.54:10911}]], filterServerTable={}]

3、通过topicRouteData2TopicSubscribeInfo 将topic 和TopicRouteData转换为Set《MessageQueue》,MessageQueue里维护topic、brokerName和queueId的信息,并同步到rebalanceImpl 的 topicSubscribeInfoTable里,如下:

[MessageQueue [topic=TBW102, brokerName=taobaodaily-03, queueId=0]

4、更新brokerAddrTable

{taobaodaily-01={0=10.218.141.54:10911}, taobaodaily-03={0=100.81.165.119:10911}}

5、RebalanceService 依次调用RebalanceService doRebalance ->rebalanceByTopic ->findConsumerIdList ->updateTopicRouteInfoFromNameServer 触发上面流程。另一方面,RebalanceService周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。然后调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将PullRequest信息加入到阻塞队列pullRequestQueue

6、PullMessageService不断的从阻塞队列pullRequestQueue获取消息。然后依次调用mQClientFactory.getMQClientAPIImpl().pullMessage和pullAPIWrapper.pullKernelImp**l,根据message queue和offset拉取消息,成功后通过, 异步回调 **PullCallback.onSuccess,返回pullResult,里面包括获取的msgFoundList。然后依次调用pullAPIWrapper.processPullResult和consumeMessageService.submitConsumeRequest,构造ConsumeRequest对象并提交到consumeExecutor线程池上。

7、在consumeExecutor线程池上,ConsumeRequest类的run方法从processQueue获取待处理的消息 并调用listener.consumeMessage(用户注册的handler)消费消息。整个消费流程完成。

转载请注明原文地址: https://www.6miu.com/read-2755.html

最新回复(0)