聊聊rocketmq的PushConsumerImpl

作者 : 开心源码 本文共12837个字,预计阅读时间需要33分钟 发布时间: 2022-05-12 共221人阅读

本文主要研究一下rocketmq的PushConsumerImpl

PushConsumerImpl

io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

public class PushConsumerImpl implements PushConsumer {

private final DefaultMQPushConsumer rocketmqPushConsumer;

private final KeyValue properties;

private boolean started = false;

private final Map subscribeTable = new ConcurrentHashMap();

private final ClientConfig clientConfig;

public PushConsumerImpl(final KeyValue properties) {

this.rocketmqPushConsumer = new DefaultMQPushConsumer();

this.properties = properties;

this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);

String accessPoints = clientConfig.getOmsAccessPoints();

if (accessPoints == null || accessPoints.isEmpty()) {

throw new OMSRuntimeException(“-1”, “OMS AccessPoints is null or empty.”);

}

this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));

String consumerGroup = clientConfig.getRmqConsumerGroup();

if (null == consumerGroup || consumerGroup.isEmpty()) {

throw new OMSRuntimeException(“-1”, “Consumer Group is necessary for RocketMQ, please set it.”);

}

this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);

this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());

this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());

this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());

this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());

String consumerId = OMSUtil.buildInstanceName();

this.rocketmqPushConsumer.setInstanceName(consumerId);

properties.put(PropertyKeys.CONSUMER_ID, consumerId);

this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());

}

@Override

public KeyValue properties() {

return properties;

}

@Override

public void resume() {

this.rocketmqPushConsumer.resume();

}

@Override

public void suspend() {

this.rocketmqPushConsumer.suspend();

}

@Override

public boolean isSuspended() {

return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();

}

@Override

public PushConsumer attachQueue(final String queueName, final MessageListener listener) {

this.subscribeTable.put(queueName, listener);

try {

this.rocketmqPushConsumer.subscribe(queueName, “*”);

} catch (MQClientException e) {

throw new OMSRuntimeException(“-1”, String.format(“RocketMQ push consumer can't attach to %s.”, queueName));

}

return this;

}

@Override

public synchronized void startup() {

if (!started) {

try {

this.rocketmqPushConsumer.start();

} catch (MQClientException e) {

throw new OMSRuntimeException(“-1”, e);

}

}

this.started = true;

}

@Override

public synchronized void shutdown() {

if (this.started) {

this.rocketmqPushConsumer.shutdown();

}

this.started = false;

}

//……

}

  • 这里创建的是DefaultMQPushConsumer,同时设置的messageListener为MessageListenerImpl
  • attachQueue的时候,用的是rocketmqPushConsumer.subscribe
  • 启动时调用rocketmqPushConsumer.start(),关闭时调用rocketmqPushConsumer.shutdown()

MessageListenerImpl

io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

class MessageListenerImpl implements MessageListenerConcurrently {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList,

ConsumeConcurrentlyContext contextRMQ) {

MessageExt rmqMsg = rmqMsgList.get(0);

BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);

MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());

if (listener == null) {

throw new OMSRuntimeException(“-1”,

String.format(“The topic/queue %s isn't attached to this consumer”, rmqMsg.getTopic()));

}

final KeyValue contextProperties = OMS.newKeyValue();

final CountDownLatch sync = new CountDownLatch(1);

contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());

ReceivedMessageContext context = new ReceivedMessageContext() {

@Override

public KeyValue properties() {

return contextProperties;

}

@Override

public void ack() {

sync.countDown();

contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,

ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());

}

@Override

public void ack(final KeyValue properties) {

sync.countDown();

contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,

properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));

}

};

long begin = System.currentTimeMillis();

listener.onMessage(omsMsg, context);

long costs = System.currentTimeMillis() – begin;

long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;

try {

sync.await(Math.max(0, timeoutMills – costs), TimeUnit.MILLISECONDS);

} catch (InterruptedException ignore) {

}

return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));

}

}

  • 实现了MessageListenerConcurrently接口的consumeMessage方法
  • 这个方法接收到消息后先将其转换为omsMsg,再调用该topic对应的listener的onMessage方法,然后同步阻塞等待ack(最多等到消费超时时间)再返回

DefaultMQPushConsumerImpl.start

org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public synchronized void start() throws MQClientException {

switch (this.serviceState) {

case CREATE_JUST:

log.info(“the consumer [{}] start beginning. messageModel={}, isUnitMode={}”, this.defaultMQPushConsumer.getConsumerGroup(),

this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

this.copySubscription();

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

this.defaultMQPushConsumer.changeInstanceNameToPID();

}

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(

mQClientFactory,

this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

if (this.defaultMQPushConsumer.getOffsetStore() != null) {

this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

} else {

switch (this.defaultMQPushConsumer.getMessageModel()) {

case BROADCASTING:

this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

case CLUSTERING:

this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

default:

break;

}

this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

}

this.offsetStore.load();

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

this.consumeOrderly = true;

this.consumeMessageService =

new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

this.consumeOrderly = false;

this.consumeMessageService =

new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

}

this.consumeMessageService.start();

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

if (!registerOK) {

this.serviceState = ServiceState.CREATE_JUST;

this.consumeMessageService.shutdown();

throw new MQClientException(“The consumer group[” + this.defaultMQPushConsumer.getConsumerGroup()

+ “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

mQClientFactory.start();

log.info(“the consumer [{}] start OK.”, this.defaultMQPushConsumer.getConsumerGroup());

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

case START_FAILED:

case SHUTDOWN_ALREADY:

throw new MQClientException(“The PushConsumer service state not OK, maybe started once, “

+ this.serviceState

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

null);

default:

break;

}

this.updateTopicSubscribeInfoWhenSubscriptionChanged();

this.mQClientFactory.checkClientInBroker();

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

this.mQClientFactory.rebalanceImmediately();

}

  • start方法根据serviceState的状态值来执行不同的逻辑
  • CREATE_JUST的时候,如果messageListener是MessageListenerOrderly,则创建ConsumeMessageOrderlyService;如果是MessageListenerConcurrently,则创建ConsumeMessageConcurrentlyService
  • 之后调用consumeMessageService.start()

ConsumeMessageConcurrentlyService.consumeMessageDirectly

org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

@Override

public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {

ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();

result.setOrder(false);

result.setAutoCommit(true);

List msgs = new ArrayList();

msgs.add(msg);

MessageQueue mq = new MessageQueue();

mq.setBrokerName(brokerName);

mq.setTopic(msg.getTopic());

mq.setQueueId(msg.getQueueId());

ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);

this.resetRetryTopic(msgs);

final long beginTime = System.currentTimeMillis();

log.info(“consumeMessageDirectly receive new message: {}”, msg);

try {

ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);

if (status != null) {

switch (status) {

case CONSUME_SUCCESS:

result.setConsumeResult(CMResult.CR_SUCCESS);

break;

case RECONSUME_LATER:

result.setConsumeResult(CMResult.CR_LATER);

break;

default:

break;

}

} else {

result.setConsumeResult(CMResult.CR_RETURN_NULL);

}

} catch (Throwable e) {

result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);

result.setRemark(RemotingHelper.exceptionSimpleDesc(e));

log.warn(String.format(“consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s”,

RemotingHelper.exceptionSimpleDesc(e),

ConsumeMessageConcurrentlyService.this.consumerGroup,

msgs,

mq), e);

}

result.setSpentTimeMills(System.currentTimeMillis() – beginTime);

log.info(“consumeMessageDirectly Result: {}”, result);

return result;

}

  • 这里调用messageListener.consumeMessage方法
  • ConsumeMessageOrderlyService的consumeMessageDirectly方法也是调用messageListener.consumeMessage方法

ClientRemotingProcessor

org/apache/rocketmq/client/impl/ClientRemotingProcessor.java

public class ClientRemotingProcessor implements NettyRequestProcessor {

private final Logger log = ClientLogger.getLog();

private final MQClientInstance mqClientFactory;

public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {

this.mqClientFactory = mqClientFactory;

}

@Override

public RemotingCommand processRequest(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

switch (request.getCode()) {

case RequestCode.CHECK_TRANSACTION_STATE:

return this.checkTransactionState(ctx, request);

case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:

return this.notifyConsumerIdsChanged(ctx, request);

case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:

return this.resetOffset(ctx, request);

case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:

return this.getConsumeStatus(ctx, request);

case RequestCode.GET_CONSUMER_RUNNING_INFO:

return this.getConsumerRunningInfo(ctx, request);

case RequestCode.CONSUME_MESSAGE_DIRECTLY:

return this.consumeMessageDirectly(ctx, request);

default:

break;

}

return null;

}

private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final ConsumeMessageDirectlyResultRequestHeader requestHeader =

(ConsumeMessageDirectlyResultRequestHeader) request

.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);

final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));

ConsumeMessageDirectlyResult result =

this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());

if (null != result) {

response.setCode(ResponseCode.SUCCESS);

response.setBody(result.encode());

} else {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(String.format(“The Consumer Group <%s> not exist in this consumer”, requestHeader.getConsumerGroup()));

}

return response;

}

//……

}

  • client在接收到server的请求的时候,会根据请求的code进行分发,如果是RequestCode.CONSUME_MESSAGE_DIRECTLY,则会调用consumeMessageDirectly
  • consumeMessageDirectly则调用this.mqClientFactory.consumeMessageDirectly

MQClientInstance.consumeMessageDirectly

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

/**
 * Looks up the consumer registered for {@code consumerGroup} and, when it is a push consumer,
 * has its consume service consume {@code msg} directly.
 *
 * @param msg           message to consume
 * @param consumerGroup group used as the key into {@code consumerTable}
 * @param brokerName    broker name forwarded to the consume service
 * @return the consume result, or {@code null} when no consumer is registered for the group
 *         or the registered consumer is not a {@code DefaultMQPushConsumerImpl}
 */
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
    final String consumerGroup,
    final String brokerName) {
    MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
    // instanceof is false for null, so this covers both "not registered" and
    // "registered but not a push consumer" without risking a ClassCastException.
    if (mqConsumerInner instanceof DefaultMQPushConsumerImpl) {
        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
        return consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
    }
    return null;
}

  • 这里是直接调用DefaultMQPushConsumerImpl的getConsumeMessageService()的consumeMessageDirectly方法
  • 这个方法则触发messageListener.consumeMessage,进行消息推送

小结

  • rocketmq的PushConsumerImpl主要是注册MessageListenerImpl,实现consumeMessage方法
  • consumeMessage方法又会触发topic的MessageListener的onMessage方法实现推送
  • 而mq的client端在接收到RequestCode.CONSUME_MESSAGE_DIRECTLY请求的时候,则会触发consumeMessageDirectly方法
  • consumeMessageDirectly方法最后是调用了ConsumeMessageService的consumeMessageDirectly方法
  • 而ConsumeMessageConcurrentlyService则会回调到MessageListenerImpl(实现MessageListenerConcurrently接口)的consumeMessage方法
  • ConsumeMessageOrderlyService的consumeMessageDirectly,则回调用户自己设置的MessageListener(实现MessageListenerOrderly接口)的consumeMessage方法

doc

  • PushConsumerImpl
说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 聊聊rocketmq的PushConsumerImpl

发表回复