RocketMQ 底层通信机制源码分析

作者: vnjohn / 发表于 2024-07-01 / 分类: RocketMQ

RocketMQ, 源码

前言

RocketMQ 专栏篇:

RocketMQ 单节点、集群节点部署

RocketMQ 分布式事务源码分析

RocketMQ 分布式事务实战

RocketMQ 生产者 DefaultMQProducer 源码分析

RocketMQ MQClientInstance 生产者启动源码分析

RocketMQ MessageQueue、MessageQueueSelector 结构

RocketMQ DefaultMQProducer#send 方法源码分析(一)

RocketMQ DefaultMQProducer#send 方法源码分析(二)

在这里插入图片描述 如上来自于官网中的通信结构图,核心接口:RemoteService、RemoteServer、RemoteClient,核心类:NettryRemotingServer、NettyRemotingClient,底层基于 Netty 网络通信框架实现

通信机制

RocketMQ 消息队列集群主要包括:NameServer、Broker(Master/Slave)、Producer、Consumer 四个角色,基本通讯流程如下:

  1. Broker 在调用 start 方法时需要完成一次将自己注册到 NameServer 的操作,随后每隔 30s 定时向 NameServer 上报 Topic 路由信息,如下源码来自于 BrokerController#start 方法

    // 如果没有开启DLeger的相关设置,默认没有启动,启动时首次向 NameServer 上报 Topic 路由信息
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
      startProcessorByHa(messageStoreConfig.getBrokerRole());
      handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
      this.registerBrokerAll(true, false, true);
    }
    // 每隔 30s 时间定时向 NameServer 上报 Topic 路由信息,允许值范围:10000~60000.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
          log.error("registerBrokerAll Exception", e);
        }
      }
      // 1000 * 30
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
  2. 消息生产者 Producer 作为客户端发送消息时,需要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息

    Producer 定时调度拉取 Topic 路由信息,源码如下:

    // MQClientInstance#startScheduledTask
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
          log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
      }
      // 1000 * 30
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
  3. 消息生产者 Producer 根据第二点中获取到的路由信息选择一个 MessageQueue 队列进行消息发送,Broker 作为消息的接收者接收消息并落盘存储

  4. 消息消费者 Consumer 根据第二点中获取到的路由信息,完成客户端的负载均衡后,选择其中某一个或几个消息队列来拉取消息并进行消费

从上面第 1-3 点,可以看出消息生产者、消费者、Broker、NameServer 之间都会发送通信

rocketmq-remoting 模块是 RocketMQ 消息队列中负责网络通信模块,它几乎被其他所有需要网络通信的模块(诸如:rocketmq-client、rocketmq-broker、rocketmq-namesrv)所引用,为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ 消息队列自定义了通信协议并在 Netty 基础之上扩展了通信模块.

Remoting 通信类结构

接下来就开始介绍 remoting 通信类结构下的几个类

接口:RemotingService,实现子接口:RemotingServer、RemotingClient,实现子类:NettyRemotingServer、NettyRemotingClient

public interface RemotingService {
  	// 启动 Netty Boss、Worker 事件循环组
    void start();
		// 释放对应的线程资源,优雅退出等
    void shutdown();
		// 注册类似拦截器的功能,在请求前、后执行对应的操作
    void registerRPCHook(RPCHook rpcHook);
}

接口:RemotingServer,实现子类:NettyRemotingServer,Broker、NameServer 会使用它来进行创建实例

public interface RemotingServer extends RemotingService {
    // 注册请求编码器对应处理器,并交给对应的 ExecutorService 执行
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    // 注册默认请求编码器对应处理器,并交给对应的 ExecutorService 执行
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    // 获取请求编码器对应处理器,并交给对应的 ExecutorService 执行
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    // 执行同步请求
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    // 执行异步请求
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    // 执行单向请求
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}

接口:RemotingClient,实现子类:NettyRemotingClient,Broker 会使用它来创建实例与 NameServer 通信、生产者消费者会用它来创建实例与 Broker 通信

public interface RemotingClient extends RemotingService {
    // 更新 NameServer 地址
    void updateNameServerAddressList(final List<String> addrs);

    // 获取 NameServer 地址
    List<String> getNameServerAddressList();

    // 执行同步请求
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;

    // 执行异步请求
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    // 执行单向请求
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;

    // 注册处理器
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
}

可以得知,为了提高代码的复用性和扩展性,提供了 NettyRemotingAbstract 类,NettyRemotingServer、NettyRemotingClient 都继承于它,一般调用 invokeSync、invokeAsync、invokeOneway 方法时都是由它实现,随即调用对应 REQUEST_CODE 所属的 Processor 进行处理

Processor 与 REQUEST_CODE 是一对多的关系,也就是说多个不同类型的请求可以让一个 Processor 进行处理

协议设计与编码解码

在 Client、Server 之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义 RocketMQ 消息协议,同时,为了高效地在网络中传输消息与对接收到的消息进行读取,就需要对消息进行编码、解码

在 RocketMQ 中,RemotingCommand 这个类在消息传输过程中对所有数据内容进行封装,不但包含了所有的数据结构,还包含了编码解码的操作,编码由 NettyEncoder 实现,解码由 NettyDecoder 实现

Header字段类型Request 说明Response 说明
codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误
languageLanguageCode请求方实现的语言应答方实现的语言
versionint请求方程序的版本应答方程序的版本
opaqueint相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回
flagint区分是普通RPC还是onewayRPC的标志区分是普通RPC还是onewayRPC的标志
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap<String, String>请求自定义扩展信息响应自定义扩展信息

以上解释来自于 RocketMQ 设计文档,其对应的源码如下:

在这里插入图片描述 opaque 字段是在客户端每次发出请求实例化 RemotingCommand 类时,通过 Atomic 原子性自增生成的,服务端能够根据不同客户端识别出来,然后作出响应时将该字段一起组装返回

在 RocketMQ 中提供了两种序列化类型 SerializeType,一种是 ROCKETMQ,另外一种是 JSON,默认采用的是 JSON 格式,该 RocketMQSerializable#rocketMQProtocolEncode 方法是编码时序列化的核心方法,一个请求头占用的总字节大小如下:

// 如下方法也就是官网中介绍的 Header 字段
// remark、ext 字段需要额外计算
private static int calTotalLen(int remark, int ext) {
  // int code(~32767)
  int length = 2
    // LanguageCode language
    + 1
    // int version(~32767)
    + 2
    // int opaque
    + 4
    // int flag
    + 4
    // String remark
    + 4 + remark
    // HashMap<String, String> extFields
    + 4 + ext;
  return length;
}

RocketMQ 在客户端、服务端之间传输内容时,还进行了优化,编码、解码时采用了 ByteBuffer 进行内存分配以及读取,占用最小的内存空间占用来进行网络传输

在这里插入图片描述

接下来再细究一下传输内容有哪些内容,如下来自官网:

在这里插入图片描述

可见传输内容主要分为以下四部分:

  1. 消息长度:总长度,4 个字节存储,占用一个 int 类型
  2. 序列化类型、消息头长度:同样占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度
  3. 消息头数据:经过序列化过后的消息头数据
  4. 消息主体数据:消息主体的二进制字节数据内容

TCP 粘包、拆包

TCP 粘包、拆包问题是指在进行 TCP 通信时,因为 TCP 是面向流的,所以发送方在传输数据时可能会将多个小数据包粘合在一起发送,而接收方则可能将这些数据包拆分成多个小的数据包进行接收,从而导致数据接收出现错误的问题

TCP收发:一个发送可能被多次接收,多个发送可能被一次接收

TCP传输:一个发送可能占用多个传输包,多个发送可能公用一个传输包

image.png 假设客户端分别发送了两个数据包 ABC 和 DEF 给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况

1、服务端分两次读取到了两个独立的数据包,分别是 ABC 和 DDEF,没有粘包和拆包

2、服务端一次接收到了两个数据包,ABC 和 DEF 粘合在一起,被称为TCP粘包

3、服务端分两次读取到了两个数据包,第一次读取到了完整的 ABC 包和 DEF 包的部分内容(ABCD),第二次读取到了 DEF 包的剩余内容(EF),这被称为 TCP 拆包

4、服务端分两次读取到了两个数据包,第一次读取到了 ABC 包的部分内容(AB),第二次读取到了 CD,第三次读到了 EF(这种有粘包和半包问题)

解决方案

对于粘包、拆包问题的解决,就是对包的格式进行统一的约束,常用的几种方法如下:

  1. 固定长度:消息采用定长的方式,例如每个报文的大小固定长度为 200 字节,若不够,则采用空格替代空位

    Netty 采用 FixedLengthFrameDecoder 类进行实现,这种方式在消息传输时不是这个长度的话,就会造成空间的浪费,不推荐

  2. 分割符:在包的尾部增加分割符,比如通过 \n 换行分割或者可自定义通过指定的分割符进行分割

    Netty 采用 LineBasedFrameDecoder 实现换行分割符,采用 DelimiterBasedFrameDecoder 实现自定义分割符

  3. 消息头、消息体:将消息分为消息头、消息体,消息头中包含了表示消息总长度的字段,只有当读取到了足够长度的消息以后才算是读取到了一个完整的消息

    Netty 采用 LengthFieldBasedFrameDecoder 进行实现,而在 RocketMQ 中也是使用这种方式来解决网络传输 TCP 粘包、拆包问题

消息的通信方式及流程

在 RocketMQ 消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway)三种,其中单向模式相对简单,一般用在发送心跳包的场景下,无需关注 response,如下主要介绍 RocketMQ 异步通信流程

在这里插入图片描述

如上流程图,主要涉及的处理过程,如下:

  1. 通过 NettyRemotingClient 向 Broker 服务端发起请求前,先创建 Channel,建立好连接

  2. 分配 Semaphore 信号量许可令牌,在实例化 NettyRemotingClient 时,会为它分配 65535 个令牌

    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
      // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
      this.semaphoreOneway = new Semaphore(permitsOneway, true);
      // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"))
      this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }
    
  3. 在创建 RemotingCommand 实例时,会原子自增生成一个 opaque int32bit 值,绑定它要回调处理的逻辑

  4. 客户端再通过向服务端发起写事件请求,服务端由 NettyServerHandler 接收处理,它是 NettyRemotingServer 内部类

  5. NettyServerHandler 会根据请求的编码值来匹配对应的 Processor 处理器、ExecutorService 线程池

  6. 接着就是让 ExecutorService 异步调用 Processor 要处理的逻辑,处理完结果以后再将要响应给客户端信息写回到客户端

  7. 客户端由 NettyClientHandler 接收处理,通过 opaque 获取到 responseFuture,再调用生产者处异步发送消息时的监听逻辑

Reactor 多线程设计

RocketMQ RPC 通信采用 Netty 组件作为底层通信库,同样也遵循了 Reactor 多线程模型,同时在这之上做了一些扩展和优化

在这里插入图片描述 以上框图可以大致了解 RocketMQ 中 NettyRemotingServer Reactor 多线程模型

一个 Reactor 主线程:EventLoopGroupBoss,即为上面的 NioEventLoopGroup,负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 Selector 上

RocketMQ 源码中会自动根据 OS 类型来选择 NIO 或 Epoll,也可以 NettyRemotingServer#useEpoll 来判别是属于哪个,源码如下:

private boolean useEpoll() {
  // OS 类型:Windows、Linux
  return RemotingUtil.isLinuxPlatform()
    // 通过 NettyServerConfig.setUseEpollNativeSelector 方法设置是否开启 Epoll Selector 模型
    && nettyServerConfig.isUseEpollNativeSelector()
    && Epoll.isAvailable();
}

在实例化 NettyRemotingServer 时,会通过该方法来判别创建 EpollEventLoopGroup 还是 NioEventLoopGroup,在未做参数配置时,默认使用的就是 NioEventLoopGroup,源码如下:

// public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
if (useEpoll()) {
  this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
    }
  });

  this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);
    private int threadTotal = nettyServerConfig.getServerSelectorThreads();

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
    }
  });
} else {
  this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
    }
  });

  this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);
    private int threadTotal = nettyServerConfig.getServerSelectorThreads();

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
    }
  });
}

随即监听真正的网络数据,拿到网络数据以后,再丢给 Worker 线程池(eventLoopGroupSelector)即为上面的 eventLoopGroupSelector,在真正执行业务逻辑之前需要进行 SSL 验证、编码、解码、空闲检查、网络连接管理,这些工作将分配给 DefaultEventExecutorGroup 去做,在 NettyRemotingServer#start 方法中可以看到,源码如下:

/*
 * SslHandler:SSL安全套接字协议
 * ⬇
 * FileRegionEncoder:文件区域采用 Zero-Copy SendFile 编码传输
 * ⬇
 * NettyEncoder:编码器
 * ⬇
 * NettyDecoder:解码器
 * ⬇
 * IdleStateHandler:空闲检查
 * ⬇
 * NettyConnectManageHandler:网络连接管理
 * ⬇
 * NettyServerHandler:服务端请求处理器
 */
ServerBootstrap childHandler =
  this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
  .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
  .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
  .option(ChannelOption.SO_REUSEADDR, true)
  .option(ChannelOption.SO_KEEPALIVE, false)
  .childOption(ChannelOption.TCP_NODELAY, true)
  .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline()
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
        .addLast(defaultEventExecutorGroup,
                 encoder,
                 new NettyDecoder(),
                 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                 connectionManageHandler,
                 serverHandler
                );
    }
  });

在处理业务操作放在业务线程池中执行,根据 RemotingCommand 业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 Processor,然后封装成异步 Task,提交给对应的业务 Processor 处理线程池来执行,从入口到业务逻辑的几个步骤中,线程池一直再增加,这跟每一步逻辑复杂性相关、越复杂需要的并发通道越宽,如下 BrokerController 注册 Processor 源码,以 sendMessageExecutor 发送消息为例:

// Math.min(Runtime.getRuntime().availableProcessors(), 4)
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
  this.brokerConfig.getSendMessageThreadPoolNums(),
  this.brokerConfig.getSendMessageThreadPoolNums(),
  1000 * 60,
  TimeUnit.MILLISECONDS,
  this.sendThreadPoolQueue,
  new ThreadFactoryImpl("SendMessageThread_"));

/*
 * SendMessageProcessor、sendMessageExecutor
 */
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

Netty 服务端网络通信线程主要有以下几类:

线程数线程名线程具体说明
1NettyBoss_%dReactor 主线程
NNettyServerEPOLLSelector_%d_%dReactor 线程池
M1NettyServerCodecThread_%dWorker线程池
M2%sMessageThread__%d业务 processor 处理线程池

总结

该篇文章主要介绍在 RocketMQ remoting 底层通信模块中的数据结构以及相关的一些源码的分析,对其客户端、服务端之间相互调用的协议、编解码进行了阐述,也说明了一下在 RocketMQ 中对于 TCP 流传输时会产生的粘包、拆包问题进行了描述以及 Netty 相关的解决 Case,最后对于 NettyRemotingServer Reactor 多线程模型设计进行了源码追踪以及模型图优化,希望对您有帮助,能够喜欢~

博文放在 RocketMQ 分类里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

vnjohn

作者

vnjohn

后端研发工程师。喜欢探索新技术,空闲时也折腾 AIGC 等效率工具。 可以在 GitHub 关注我了解更多,也可以加我微信(vnjohn) 与我交流。