聊聊rocketmq的PullConsumerImpl

作者 : 开心源码 本文共9359个字,预计阅读时间需要24分钟 发布时间: 2022-05-12 共251人阅读

本文主要研究一下rocketmq的PullConsumerImpl

PullConsumerImpl

io/openmessaging/rocketmq/consumer/PullConsumerImpl.java

public class PullConsumerImpl implements PullConsumer {

private final DefaultMQPullConsumer rocketmqPullConsumer;

private final KeyValue properties;

private boolean started = false;

private String targetQueueName;

private final MQPullConsumerScheduleService pullConsumerScheduleService;

private final LocalMessageCache localMessageCache;

private final ClientConfig clientConfig;

final static Logger log = ClientLogger.getLog();

public PullConsumerImpl(final String queueName, final KeyValue properties) {

this.properties = properties;

this.targetQueueName = queueName;

this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);

String consumerGroup = clientConfig.getRmqConsumerGroup();

if (null == consumerGroup || consumerGroup.isEmpty()) {

throw new OMSRuntimeException(“-1”, “Consumer Group is necessary for RocketMQ, please set it.”);

}

pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);

this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

String accessPoints = clientConfig.getOmsAccessPoints();

if (accessPoints == null || accessPoints.isEmpty()) {

throw new OMSRuntimeException(“-1”, “OMS AccessPoints is null or empty.”);

}

this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));

this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);

int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();

this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);

String consumerId = OMSUtil.buildInstanceName();

this.rocketmqPullConsumer.setInstanceName(consumerId);

properties.put(PropertyKeys.CONSUMER_ID, consumerId);

this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);

}

@Override

public KeyValue properties() {

return properties;

}

@Override

public Message poll() {

MessageExt rmqMsg = localMessageCache.poll();

return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);

}

@Override

public Message poll(final KeyValue properties) {

MessageExt rmqMsg = localMessageCache.poll(properties);

return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);

}

@Override

public void ack(final String messageId) {

localMessageCache.ack(messageId);

}

@Override

public void ack(final String messageId, final KeyValue properties) {

localMessageCache.ack(messageId);

}

@Override

public synchronized void startup() {

if (!started) {

try {

registerPullTaskCallback();

this.pullConsumerScheduleService.start();

this.localMessageCache.startup();

} catch (MQClientException e) {

throw new OMSRuntimeException(“-1”, e);

}

}

this.started = true;

}

private void registerPullTaskCallback() {

this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {

@Override

public void doPullTask(final MessageQueue mq, final PullTaskContext context) {

MQPullConsumer consumer = context.getPullConsumer();

try {

long offset = localMessageCache.nextPullOffset(mq);

PullResult pullResult = consumer.pull(mq, “*”,

offset, localMessageCache.nextPullBatchNums());

ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()

.getProcessQueueTable().get(mq);

switch (pullResult.getPullStatus()) {

case FOUND:

if (pq != null) {

pq.putMessage(pullResult.getMsgFoundList());

for (final MessageExt messageExt : pullResult.getMsgFoundList()) {

localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));

}

}

break;

default:

break;

}

localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());

} catch (Exception e) {

log.error(“A error occurred in pull message process.”, e);

}

}

});

}

@Override

public synchronized void shutdown() {

if (this.started) {

this.localMessageCache.shutdown();

this.pullConsumerScheduleService.shutdown();

this.rocketmqPullConsumer.shutdown();

}

this.started = false;

}

}

  • 这里poll方法从LocalMessageCache里头拉取消息
  • LocalMessageCache是从consumeRequestCache这个LinkedBlockingQueue中poll出来ConsumeRequest,该request携带了MessageExt
  • MQPullConsumerScheduleService的registerPullTaskCallback方法注册了一个callback,该callback会往consumeRequestCache这个LinkedBlockingQueue存ConsumeRequest

MQPullConsumerScheduleService

org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java

/**

* Schedule service for pull consumer

*/

public class MQPullConsumerScheduleService {

private final Logger log = ClientLogger.getLog();

private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();

private final ConcurrentMap taskTable =

new ConcurrentHashMap();

private DefaultMQPullConsumer defaultMQPullConsumer;

private int pullThreadNums = 20;

private ConcurrentMap callbackTable =

new ConcurrentHashMap();

private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

public MQPullConsumerScheduleService(final String consumerGroup) {

this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);

this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);

}

public void start() throws MQClientException {

final String group = this.defaultMQPullConsumer.getConsumerGroup();

this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(

this.pullThreadNums,

new ThreadFactoryImpl(“PullMsgThread-” + group)

);

this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);

this.defaultMQPullConsumer.start();

log.info(“MQPullConsumerScheduleService start OK, {} {}”,

this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);

}

//……

}

  • 类成员直接new了一个MessageQueueListenerImpl
  • 这里start的时候将MessageQueueListenerImpl设置到defaultMQPullConsumer,再调用defaultMQPullConsumer的start方法

MessageQueueListenerImpl

org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java

/**
 * Forwards queue-allocation changes of the enclosing service's consumer to
 * {@code putTask}, choosing the queue set by message model.
 */
class MessageQueueListenerImpl implements MessageQueueListener {

    /**
     * @param topic     topic whose queue allocation changed
     * @param mqAll     queue set used when the consumer is in {@code BROADCASTING} model
     * @param mqDivided queue set used when the consumer is in {@code CLUSTERING} model
     */
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel messageModel =
            MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                MQPullConsumerScheduleService.this.putTask(topic, mqAll);
                break;
            case CLUSTERING:
                MQPullConsumerScheduleService.this.putTask(topic, mqDivided);
                break;
            default:
                break;
        }
    }
}

  • MessageQueueListenerImpl在messageQueueChanged的时候会调用putTask
  • RebalancePullImpl的messageQueueChanged会触发messageQueueListener.messageQueueChanged方法
  • DefaultMQPullConsumerImpl的doRebalance方法会触发RebalancePullImpl的messageQueueChanged方法

MQPullConsumerScheduleService.putTask

org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java

/**
 * Reconciles the scheduled pull tasks of {@code topic} with a new queue set.
 *
 * <p>Tasks of this topic whose queue is no longer in {@code mqNewSet} are cancelled and
 * removed; every queue in {@code mqNewSet} without a task gets a new {@link PullTaskImpl}
 * scheduled with zero delay.
 *
 * @param topic    topic whose tasks are reconciled; tasks of other topics are untouched
 * @param mqNewSet queues that should have a pull task after this call
 */
public void putTask(String topic, Set<MessageQueue> mqNewSet) {
    Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, PullTaskImpl> next = it.next();
        if (next.getKey().getTopic().equals(topic)) {
            if (!mqNewSet.contains(next.getKey())) {
                // Flag first so a running task does not re-schedule itself, then drop it.
                next.getValue().setCancelled(true);
                it.remove();
            }
        }
    }

    for (MessageQueue mq : mqNewSet) {
        if (!this.taskTable.containsKey(mq)) {
            PullTaskImpl command = new PullTaskImpl(mq);
            this.taskTable.put(mq, command);
            this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
        }
    }
}

  • 这个putTask方法会创建PullTaskImpl然后进行调度

PullTaskImpl

org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java

class PullTaskImpl implements Runnable {

private final MessageQueue messageQueue;

private volatile boolean cancelled = false;

public PullTaskImpl(final MessageQueue messageQueue) {

this.messageQueue = messageQueue;

}

@Override

public void run() {

String topic = this.messageQueue.getTopic();

if (!this.isCancelled()) {

PullTaskCallback pullTaskCallback =

MQPullConsumerScheduleService.this.callbackTable.get(topic);

if (pullTaskCallback != null) {

final PullTaskContext context = new PullTaskContext();

context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);

try {

pullTaskCallback.doPullTask(this.messageQueue, context);

} catch (Throwable e) {

context.setPullNextDelayTimeMillis(1000);

log.error(“doPullTask Exception”, e);

}

if (!this.isCancelled()) {

MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,

context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);

} else {

log.warn(“The Pull Task is cancelled after doPullTask, {}”, messageQueue);

}

} else {

log.warn(“Pull Task Callback not exist , {}”, topic);

}

} else {

log.warn(“The Pull Task is cancelled, {}”, messageQueue);

}

}

public boolean isCancelled() {

return cancelled;

}

public void setCancelled(boolean cancelled) {

this.cancelled = cancelled;

}

public MessageQueue getMessageQueue() {

return messageQueue;

}

}

  • PullTaskImpl的run方法会获取pullTaskCallback,然后调用pullTaskCallback的doPullTask,回调拉取动作

PullConsumerImpl.registerPullTaskCallback

io/openmessaging/rocketmq/consumer/PullConsumerImpl.java

private void registerPullTaskCallback() {

this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {

@Override

public void doPullTask(final MessageQueue mq, final PullTaskContext context) {

MQPullConsumer consumer = context.getPullConsumer();

try {

long offset = localMessageCache.nextPullOffset(mq);

PullResult pullResult = consumer.pull(mq, “*”,

offset, localMessageCache.nextPullBatchNums());

ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()

.getProcessQueueTable().get(mq);

switch (pullResult.getPullStatus()) {

case FOUND:

if (pq != null) {

pq.putMessage(pullResult.getMsgFoundList());

for (final MessageExt messageExt : pullResult.getMsgFoundList()) {

localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));

}

}

break;

default:

break;

}

localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());

} catch (Exception e) {

log.error(“A error occurred in pull message process.”, e);

}

}

});

}

  • 这个callback做的动作就是调用MQPullConsumer的pull方法获取PullResult
  • 然后将PullResult的MessageExt通过localMessageCache的submitConsumeRequest方法放入localMessageCache

小结

  • rocketmq的PullConsumerImpl的poll方法是从localMessageCache拉取消息,而localMessageCache是通过PullTaskImpl这个定时任务定时执行触发消息拉取
  • PullTaskImpl定时任务触发pullTaskCallback.doPullTask,而在PullConsumerImpl中,这个回调从MQPullConsumer拉取消息,然后放入到localMessageCache

doc

  • PullConsumerImpl
说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 聊聊rocketmq的PullConsumerImpl

发表回复