RocketMQ DefaultMQProducer#send 方法源码分析(二)

作者: vnjohn / 发表于 2024-01-25 / 分类: RocketMQ

RocketMQ, 源码

前言

RocketMQ 专栏篇:

RocketMQ 单节点、集群节点部署

RocketMQ 分布式事务源码分析

RocketMQ 分布式事务实战

RocketMQ 生产者 DefaultMQProducer 源码分析

RocketMQ MQClientInstance 生产者启动源码分析

RocketMQ MessageQueue、MessageQueueSelector 结构

RocketMQ DefaultMQProducer#send 方法源码分析(一)

在这里插入图片描述

在 send 方法中分为三种方式单向、同步、异步,而在异步方式投递消息,在 RocketMQ DefaultMQProducer#send 方法源码分析(一) 博文中主要介绍的是在处理这三种方式投递之前所要处理的一些消息的压缩、消息属性的赋值以及在处理异步消息和投递不同模式的消息所要作的准备工作,这篇文章主要介绍在更底层上是如何区分这三种消息发送方式时的处理逻辑的.

NettyRemotingClient#invokeOneway

执行单向发送的方法,会先与 Broker Master 节点建立 Socket 连接,执行发送消息的操作,具体源码如下:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
  // 与 Broker 建立 Socket 连接
  final Channel channel = this.getAndCreateChannel(addr);
  if (channel != null && channel.isActive()) {
    try {
      doBeforeRpcHooks(addr, request);
      this.invokeOnewayImpl(channel, request, timeoutMillis);
    } catch (RemotingSendRequestException e) {
      log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
      this.closeChannel(addr, channel);
      throw e;
    }
  } else {
    this.closeChannel(addr, channel);
    throw new RemotingConnectException(addr);
  }
}

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
  throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
  request.markOnewayRPC();
  boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
  if (acquired) {
    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
    try {
      channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
          once.release();
          if (!f.isSuccess()) {
            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
          }
        }
      });
    } catch (Exception e) {
      once.release();
      log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
      throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
    }
  } else {
    if (timeoutMillis <= 0) {
      throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
    } else {
      String info = String.format(
        "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
        timeoutMillis,
        this.semaphoreOneway.getQueueLength(),
        this.semaphoreOneway.availablePermits()
      );
      log.warn(info);
      throw new RemotingTimeoutException(info);
    }
  }
}

在 invokeOneway 方法中,先通过 Netty Bootstrap 建立与 Broker 之间的 Socket 连接,拿到 Channel 连接通道以后,再执行 invokeOnewayImpl 方法,该方法的源码分为以下几步走:

  1. 通过 RemotingCommand#markOnewayRPC 方法将 RequestCommand#flag 属性标志位为 2

  2. 通过 Semaphore 信号量机制获取一个请求凭证,它在初始化 NettyRemotingClient 时会创建出具体数量的一个凭证数,异步也是基于信号量机制获取一个请求凭证的,如下:

    // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"))
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
    // public NettyRemotingClient(final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener)
    // 单向发送信号量数、异步发送信号量数 = 65535
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    
  3. 获取到请求凭证后,向 Broker 发送写事件成功或者出现异常时将请求凭证资源进行释放

MQClientAPIImpl#sendMessageAsync

通过 MQClientAPIImpl#sendMessageAsync 方法发送异步消息,在这里会组装好 InvokeCallback 执行回调的逻辑,未来当 Broker 处理完以后会调用这个回调处理里面的逻辑,该方法源码如下:

private void sendMessageAsync(
  final String addr,
  final String brokerName,
  final Message msg,
  final long timeoutMillis,
  final RemotingCommand request,
  final SendCallback sendCallback,
  final TopicPublishInfo topicPublishInfo,
  final MQClientInstance instance,
  final int retryTimesWhenSendFailed,
  final AtomicInteger times,
  final SendMessageContext context,
  final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
  final long beginStartTime = System.currentTimeMillis();
  try {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {  ... });
    // 异常进行重试
  } catch (Exception ex) {
    long cost = System.currentTimeMillis() - beginStartTime;
    producer.updateFaultItem(brokerName, cost, true);
    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, ex, context, true, producer);
  }
}

该方法具体的是先要处理完 NettyRemotingClient#invokeAsync 方法的逻辑,然后再回头来看这一块的回调处理

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
  throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
  long beginStartTime = System.currentTimeMillis();
  // 通过 Netty 创建客户端与服务端之间的连接
  final Channel channel = this.getAndCreateChannel(addr);
  if (channel != null && channel.isActive()) {
    try {
      doBeforeRpcHooks(addr, request);
      long costTime = System.currentTimeMillis() - beginStartTime;
      if (timeoutMillis < costTime) {
        throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + addr + "] timeout");
      }
      this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
    } catch (RemotingSendRequestException e) {
      log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
      this.closeChannel(addr, channel);
      throw e;
    }
  } else {
    this.closeChannel(addr, channel);
    throw new RemotingConnectException(addr);
  }
}

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                            final InvokeCallback invokeCallback)
  throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
  long beginStartTime = System.currentTimeMillis();
  final int opaque = request.getOpaque();
  boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
  if (acquired) {
    // 采用 CAS 操作对信号量进行释放
    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
    long costTime = System.currentTimeMillis() - beginStartTime;
    if (timeoutMillis < costTime) {
      once.release();
      throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
    }
    final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
    this.responseTable.put(opaque, responseFuture);
    try {
      // Netty 方式发起异步请求,最终会由 NettyRemotingClient.NettyClientHandler 方法来处理 responseFuture
      channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
          if (f.isSuccess()) {
            responseFuture.setSendRequestOK(true);
            return;
          }
          requestFail(opaque);
          log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
        }
      });
    } catch (Exception e) {
      responseFuture.release();
      log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
      throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
    }
  } else {
    if (timeoutMillis <= 0) {
      throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
    } else {
      String info =
        String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
      log.warn(info);
      throw new RemotingTimeoutException(info);
    }
  }
}

通过 NettyRemotingClient#invokeAsync 方法先创建好连接通道:Channel,确保其是活跃的,然后再调用 NettyRemotingClient#invokeAsyncImpl,该方法的逻辑分为以下几步走:

  1. 通过 SemaphoreAsync 异步信号量对象获取一个请求凭证

  2. 获取成功以后,组装 ResponseFuture 对象实例,将之前组装的 InvokeCallback 回调机制作为参数传入,将其存入到 ResponseTable 集合中,当 Broker 将消息存储成功或失败以后,会通过 NettyClientHandler 来调用,进行后续的处理

    在判断响应信息时,会查看 ResponseFuture 是否有 InvokeCallback 要处理,它由 executeInvokeCallback 方法处理,也是采用异步线程的方式去处理该回调的.

    private void executeInvokeCallback(final ResponseFuture responseFuture) {
      boolean runInThisThread = false;
      ExecutorService executor = this.getCallbackExecutor();
      if (executor != null) {
        try {
          executor.submit(new Runnable() {
            @Override
            public void run() {
              try {
                responseFuture.executeInvokeCallback();
              } catch (Throwable e) {
                log.warn("execute callback in executor exception, and callback throw", e);
              } finally {
                responseFuture.release();
              }
            }
          });
        } catch (Exception e) {
          runInThisThread = true;
          log.warn("execute callback in executor exception, maybe executor busy", e);
        }
      } else {
        runInThisThread = true;
      }
    
      if (runInThisThread) {
        try {
          responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
          log.warn("executeInvokeCallback Exception", e);
        } finally {
          responseFuture.release();
        }
      }
    }
    

接着再回到 MQClientAPIImpl#sendMessageAsync 方法查看 InvokeCallback 回调函数里处理的代码逻辑,如下:

new InvokeCallback() {
  @Override
  public void operationComplete(ResponseFuture responseFuture) {
    long cost = System.currentTimeMillis() - beginStartTime;
    RemotingCommand response = responseFuture.getResponseCommand();
    // 当回调函数为空时 & 响应信息不为空
    if (null == sendCallback && response != null) {
      try {
        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
        if (context != null && sendResult != null) {
          context.setSendResult(sendResult);
          context.getProducer().executeSendMessageHookAfter(context);
        }
      } catch (Throwable e) {
      }
      // 调整平衡
      producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
      return;
    }
    // 回调函数不为空
    if (response != null) {
      try {
        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
        assert sendResult != null;
        if (context != null) {
          context.setSendResult(sendResult);
          context.getProducer().executeSendMessageHookAfter(context);
        }
        // 调用自定义回调函数的 onSuccess 方法处理
        try {
          sendCallback.onSuccess(sendResult);
        } catch (Throwable e) {
        }
        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
      } catch (Exception e) {
        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer);
      }
      // send request failed、wait response timeout、其他原因,调用 onExceptionImpl 对异步消息进行重试
    } else {
      producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
      if (!responseFuture.isSendRequestOK()) {
        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
      } else if (responseFuture.isTimeout()) {
        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause());
        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
      } else {
        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
      }
    }
  }
}

该回调方法是属于生产者源码侧的处理,还有一个是在投递消息时自定义的 SendCallback 回调函数

  1. 当自定义回调函数 SendCallback 为空时 & 响应信息不为空时,调用 processSendResponse 方法返回一个 SendResult 对象存入到 SendMessageContext 上下文中
  2. 当自定义回调函数 SendCallback 不为空时 & 响应信息不为空时,调用 processSendResponse 方法返回一个 SendResult 对象存入到 SendMessageContext 上下文中,再调用自定义的 SendCallback#onSuccess 回调函数处理逻辑.
  3. 在处理消息时属于 Send Request Failed、Wait Response Timeout、其他原因,调用 MQClientAPIImpl#onExceptionImpl 对异步消息进行重试,调用 processSendResponse 方法不属于{0、10、11、12} 响应码时,那么就说明投递消息存在问题,则调用自定义回调函数的 SendCallback#onException 方法.

MQClientAPIImpl#onExceptionImpl 方法主要的逻辑就是对异步投递消息失败的情况下进行重试或者调用自定义函数的 onException 方法进行返回,具体的源码如下:

private void onExceptionImpl(final String brokerName,
                             final Message msg,
                             final long timeoutMillis,
                             final RemotingCommand request,
                             final SendCallback sendCallback,
                             final TopicPublishInfo topicPublishInfo,
                             final MQClientInstance instance,
                             final int timesTotal,
                             final AtomicInteger curTimes,
                             final Exception e,
                             final SendMessageContext context,
                             final boolean needRetry,
                             final DefaultMQProducerImpl producer
                            ) {
  int tmp = curTimes.incrementAndGet();
  if (needRetry && tmp <= timesTotal) {
    String retryBrokerName = brokerName;//by default, it will send to the same broker
    if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
      MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
      retryBrokerName = mqChosen.getBrokerName();
    }
    // 重新获取 MASTER Broker
    String addr = instance.findBrokerAddressInPublish(retryBrokerName);
    log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, retryBrokerName), e);
    try {
      // 设置一个新的 opaque
      request.setOpaque(RemotingCommand.createNewRequestId());
      sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                       timesTotal, curTimes, context, producer);
    } catch (InterruptedException e1) {
      onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer);
    } catch (RemotingTooMuchRequestException e1) {
      onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer);
    } catch (RemotingException e1) {
      producer.updateFaultItem(brokerName, 3000, true);
      onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer);
    }
    // 重试次数用完以后或处理 processSendResponse 方法失败时,调用自定义回调函数的 onException 方法处理
  } else {
    if (context != null) {
      context.setException(e);
      context.getProducer().executeSendMessageHookAfter(context);
    }
    try {
      sendCallback.onException(e);
    } catch (Exception ignored) {
    }
  }
}

在进行重试时,会生成一个新的 opaque 请求-响应唯一标识,再调用 MQClientAPIImpl#sendMessageAsync 方法进行重试而不是重试回到 DefaultMQProducerImpl#sendKernelImpl 方法进行重试,所以之前在异步方式投递消息时会将压缩以后的消息传进来,在这里是不会再对消息再次压缩的.

当异步重试次数用完或处理 processSendResponse 方法失败时,调用自定义回调函数的 onException 方法返回.

MQClientAPIImpl#sendMessageSync

通过调用 MQClientAPIImpl#sendMessageSync 方法先向 Broker 发起同步消息投递请求后处理消息返回的结果给到生产者,相关的源码如下:

private SendResult sendMessageSync(
  final String addr,
  final String brokerName,
  final Message msg,
  final long timeoutMillis,
  final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
  RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
  assert response != null;
  return this.processSendResponse(brokerName, msg, response, addr);
}

先通过调用 NettyRemotingClient#invokeSync 完成同步写的逻辑后再调用 MQClientAPIImpl#processSendResponse 方法处理同步发送以后返回的结果.

在 NettyRemotingClient#invokeSync 方法中会创建好与 Broker Master 之间的 channel 通道,再向 Broker Master 发起同步的请求,一直阻塞等待 Broker 返回再作处理,具体的源码如下:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
  throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
  long beginStartTime = System.currentTimeMillis();
  final Channel channel = this.getAndCreateChannel(addr);
  if (channel != null && channel.isActive()) {
    try {
      doBeforeRpcHooks(addr, request);
      long costTime = System.currentTimeMillis() - beginStartTime;
      if (timeoutMillis < costTime) {
        throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
      }
      // 通过获取到的活动连接去向 Broker Master 发出请求
      RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
      doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
      return response;
    } catch (RemotingSendRequestException e) {
      log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
      this.closeChannel(addr, channel);
      throw e;
    } catch (RemotingTimeoutException e) {
      if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
        this.closeChannel(addr, channel);
        log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
      }
      log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
      throw e;
    }
  } else {
    this.closeChannel(addr, channel);
    throw new RemotingConnectException(addr);
  }
}

具体的同步逻辑在 NettyRemotingClient#invokeSyncImpl 可以观察到.

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
                                      final long timeoutMillis)
  throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
  final int opaque = request.getOpaque();
  try {
    final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
    this.responseTable.put(opaque, responseFuture);
    final SocketAddress addr = channel.remoteAddress();
    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
          responseFuture.setSendRequestOK(true);
          return;
        } else {
          responseFuture.setSendRequestOK(false);
        }
        responseTable.remove(opaque);
        responseFuture.setCause(f.cause());
        responseFuture.putResponse(null);
        log.warn("send a request command to channel <" + addr + "> failed.");
      }
    });
    // 一直阻塞等待 Broker 处理完成再返回
    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
    if (null == responseCommand) {
      if (responseFuture.isSendRequestOK()) {
        throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                                           responseFuture.getCause());
      } else {
        throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
      }
    }
    return responseCommand;
  } finally {
    this.responseTable.remove(opaque);
  }
}

NettyRemotingClient#invokeSync 与 NettyRemotingClient#invokeSyncImpl 方法整体的操作步骤分为以下几步:

  1. 先通过 NettyRemotingClient#getAndCreateChannel 方法获取或创建当前与 Broker Master 之间的连接,底层实际是使用的 Netty 核心方法 Bootstrap#connect(java.net.SocketAddress) 建立的 Socket 连接

  2. 当连接处于活跃状态时,再调用 NettyRemotingClient#invokeSyncImpl 方法处理 Socket 连接的写事件动作,将发起的同步请求写给 Broker Master 处理,会生成 ResponseFuture 对象实例

  3. 同步模式下,调用的是 ResponseFuture#waitResponse 方法一直阻塞等待 Broker 处理完成返回结果,当 RemotingCommand 不为空时才代表是网络 I/O 正常处理后返回的结果,不为空时会对同步模式下发送的消息进行重试投递,默认次数为 3

    在同步消息处理时发生 RemotingException、MQClientException 异常时,会对消息进行重试投递.

  4. 拿到 Broker 侧处理返回的结果以后,会再调用 MQClientAPIImpl#processSendResponse 方法处理返回的结果,封装SendResult

    处理 broker 处理请求后返回的响应信息,0、10、11、12 响应码正常返回结果集,其他响应抛出异常

MQClientAPIImpl#processSendResponse 方法源码,在异步模式下回调方法处理消息投递的结果也用到该方法的异常码分支判断逻辑,具体源码如下:

private SendResult processSendResponse(
  final String brokerName,
  final Message msg,
  final RemotingCommand response,
  final String addr
) throws MQBrokerException, RemotingCommandException {
  SendStatus sendStatus;
  switch (response.getCode()) {
    case ResponseCode.FLUSH_DISK_TIMEOUT: {
      sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
      break;
    }
    case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
      sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
      break;
    }
    case ResponseCode.SLAVE_NOT_AVAILABLE: {
      sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
      break;
    }
    case ResponseCode.SUCCESS: {
      sendStatus = SendStatus.SEND_OK;
      break;
    }
    default: {
      throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }
  }
  SendMessageResponseHeader responseHeader =
    (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

  //If namespace not null , reset Topic without namespace.
  String topic = msg.getTopic();
  if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
    topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
  }
  MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
  String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
  if (msg instanceof MessageBatch) {
    StringBuilder sb = new StringBuilder();
    for (Message message : (MessageBatch) msg) {
      sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
    }
    uniqMsgId = sb.toString();
  }
  SendResult sendResult = new SendResult(sendStatus,
                                         uniqMsgId,
                                         responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
  sendResult.setTransactionId(responseHeader.getTransactionId());
  String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
  String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
  if (regionId == null || regionId.isEmpty()) {
    regionId = MixAll.DEFAULT_TRACE_REGION_ID;
  }
  if (traceOn != null && traceOn.equals("false")) {
    sendResult.setTraceOn(false);
  } else {
    sendResult.setTraceOn(true);
  }
  sendResult.setRegionId(regionId);
  return sendResult;
}

总结

该篇博文主要分析几种不同方式的投递消息方式采用的核心方法,SYNC 采用的是 MQClientAPIImpl#sendMessageSync 方法处理、ASYNC 采用的是 MQClientAPIImpl#sendMessageAsync、ONEWAY 采用的是 NettyRemotingClient#invokeOneway 方法处理,而 ONEWAY、ASYNC 采用了信号量的方式去获取令牌凭证去向 Broker 发起请求,同时,由于是异步必须是要有回调存在的,所以会有一个异步调度线程去阻塞等待消息的投递结果,使用的 CountDownLatch 机制的阻塞等待,SYNC 方式也是如此只不过它是同步调用 CountDownLatch 去等待执行完成的,每当创建好一个新的 RequestFuture 都会有一个新的 CountDownLatch 长度为 1 的实例存在,在这里没有过多的介绍客户端与 Broker 之间的网络模型,会在在下篇文章再具体介绍.

在这里插入图片描述

博文放在 RocketMQ 分类里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

vnjohn

作者

vnjohn

后端研发工程师。喜欢探索新技术,空闲时也折腾 AIGC 等效率工具。 可以在 GitHub 关注我了解更多,也可以加我微信(vnjohn) 与我交流。