RocketMQ NettyRemotingServer、NettyRemotingClient 源码分析

作者: vnjohn / 发表于 2024-07-08 / 分类: RocketMQ

RocketMQ, 源码

前言

RocketMQ 专栏篇:

RocketMQ 单节点、集群节点部署

RocketMQ 分布式事务源码分析

RocketMQ 分布式事务实战

RocketMQ 生产者 DefaultMQProducer 源码分析

RocketMQ MQClientInstance 生产者启动源码分析

RocketMQ MessageQueue、MessageQueueSelector 结构

RocketMQ DefaultMQProducer#send 方法源码分析(一)

RocketMQ DefaultMQProducer#send 方法源码分析(二)

RocketMQ 通信机制底层数据结构及源码解析

上篇文章【RocketMQ 通信机制底层数据结构及源码解析 】主要介绍了 RocketMQ 中底层的网络通信机制涉及到的数据结构以及线程模型通信,未做过多源码的介绍,这篇文章主要围绕着一块的源码解读.

new

在 Broker 服务端创建 BrokerController 时,会实例化 BrokerController,在里面会传递 NettyServerConfig、NettyClientConfig,如下:

final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);

Netty Server Boss 默认绑定的端口:10911

BrokerOuterAPI

在实例化 BrokerController 时,会先将 NettyRemotingClient 先创建好,它主要用来与其他 Broker 之间进行相互通信的,比如:当通过命令在某台 Broker 创建一个 Topic,会通过当前 Broker 组装好信息,发送给其他 Broker 进行 Topic 路由信息进行传递,以便于其他 Broker 都得知该 Topic 信息,进行消息的接收.

public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
 	this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    // ....
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
 	this.remotingClient = new NettyRemotingClient(nettyClientConfig);
	this.remotingClient.registerRPCHook(rpcHook);
}

MQClientInstance

在生产者、消费者启动时,通过 MQClientManager#getOrCreateMQClientInstance会创建 MQClientInstance 实例,会将 NettyClientConfig 绑定好

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
	this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    // 客户端远程调用的处理器,接受来自 Broker 请求并做出响应
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // MQ 客户端 API 发起请求的类
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
        final ClientRemotingProcessor clientRemotingProcessor,
        RPCHook rpcHook, final ClientConfig clientConfig) {
	this.clientConfig = clientConfig;
    // RocketMQ 网络模型的核心类
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
	this.clientRemotingProcessor = clientRemotingProcessor;
	this.remotingClient.registerRPCHook(rpcHook); 		
	this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
   	// 消费组数量发生变化,触发重平衡
    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
    // 消费者客户端重置偏移量
    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
    // 获取消费者状态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
    // 获取消费者运行的信息
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
    // 消费消息
    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
    // 回复消息
    this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

NettyRemotingClient

NettyRemotingClient 充当 RocketMQ 网络通信模型下的客户端,生产者、消费者、Broker 都持有对它的引用进行使用,它整体的实例化过程源码如下:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
   final ChannelEventListener channelEventListener) {
   // 单向发送信号量数、异步发送信号量数 = 65535
   super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
   this.nettyClientConfig = nettyClientConfig;
   this.channelEventListener = channelEventListener;
   int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
   if (publicThreadNums <= 0) {
       publicThreadNums = 4;
   }
   // 使用公共线程池处理来自客户端的各种 Processor,最低线程数为 4、最大线程数为 CPU 核数
   this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
       private AtomicInteger threadIndex = new AtomicInteger(0);

       @Override
       public Thread newThread(Runnable r) {
           return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
       }
   });
   this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
       private AtomicInteger threadIndex = new AtomicInteger(0);

       @Override
       public Thread newThread(Runnable r) {
           return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
       }
   });
   if (nettyClientConfig.isUseTLS()) {
       try {
           sslContext = TlsHelper.buildSslContext(true);
           log.info("SSL enabled for client");
       } catch (IOException e) {
           log.error("Failed to create SSLContext", e);
       } catch (CertificateException e) {
           log.error("Failed to create SSLContext", e);
           throw new RuntimeException("Failed to create SSLContext", e);
       }
   }
}

在其实例化时,提供了一个内部局部变量为 Bootstrap

 private final Bootstrap bootstrap = new Bootstrap();

initialize

在实例化 BrokerController 期间,只是会将 Netty 服务端,给设置好,不做任何处理

NettyRemotingServer

调用 BrokerController#initialize 初始化方法时,会实例化 NettyRemotingServer

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
                           final ChannelEventListener channelEventListener) {
  super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
  this.serverBootstrap = new ServerBootstrap();
  this.nettyServerConfig = nettyServerConfig;
  this.channelEventListener = channelEventListener;
  int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
  if (publicThreadNums <= 0) {
    publicThreadNums = 4;
  }
  // Processor 公共处理的线程池,当未指定 Executor 时
  this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
    }
  });
  // 默认都是创建 EpollEventLoopGroup
  if (useEpoll()) {
    this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);
      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
      }
    });
    this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);
      private int threadTotal = nettyServerConfig.getServerSelectorThreads();

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
      }
    });
  } else {
    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
      }
    });

    this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);
      private int threadTotal = nettyServerConfig.getServerSelectorThreads();

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
      }
    });
  }

  loadSslContext();
}
  1. 创建 Semaphore Oneway 信号量:256,Semaphore Async 信号量:64
  2. 创建 Processor 公共处理的线程池,当 Processor 未指定 Executor 时,分配给这个 Executor 进行处理,公共的业务线程池
  3. 创建 1 个线程数的 EpollEventLoopGroup,Reactor 主线程
  4. 创建 3 个线程数的 EpollEventLoopGroup,Reactor 线程池

通过 useEpoll 方法来判别 EpollEventLoopGroup 还是 NioEventLoopGroup

private boolean useEpoll() {
  // OS 类型:Windows、Linux
  return RemotingUtil.isLinuxPlatform()
    // 通过 NettyServerConfig.setUseEpollNativeSelector 方法设置是否开启 Epoll Selector 模型
    && nettyServerConfig.isUseEpollNativeSelector()
    && Epoll.isAvailable();
}

在实例化 BrokerController 时已经设置 useEpollNativeSelector 变量为 true.

start

NettyRemotingServer

通过 BrokerController#start 方法再调用 NettyRemotingServer#start 方法启动 Netty Server 服务端

public void start() {
  this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    nettyServerConfig.getServerWorkerThreads(),
    new ThreadFactory() {

      private AtomicInteger threadIndex = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
      }
    });

  prepareSharableHandlers();

  /*
 1. SslHandler:SSL安全套接字协议
 2. ⬇
 3. FileRegionEncoder:文件区域采用 Zero-Copy SendFile 编码传输
 4. ⬇
 5. NettyEncoder:编码器
 6. ⬇
 7. NettyDecoder:解码器
 8. ⬇
 9. IdleStateHandler:空闲检查
 10. ⬇
 11. NettyConnectManageHandler:网络连接管理
 12. ⬇
 13. NettyServerHandler:服务端请求处理器
   */
  ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
    // 在 TCP 协议中,当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求,然后依次处理
    // ChannelOption.SO_BACKLOG 参数就是用来设置这个连接队列的大小
    .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
    // 默认情况下,TCP 连接在 TIME_WAIT 状态时,不能立即被重用,必须等待一段时间才能重用
    // 通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接
    .option(ChannelOption.SO_REUSEADDR, true)
    // 用于开启或者关闭保活探测,默认情况下是关闭的
    // 当 SO_KEEPALIVE 开启时,可以保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于 TCP 连接的输入
    .option(ChannelOption.SO_KEEPALIVE, false)
    // TCP_NODELAY 是禁用Nagle算法,即数据包立即发送出去
    // 如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法
    // 如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送。默认为 false
    .childOption(ChannelOption.TCP_NODELAY, true)
    // 绑定本地端口 Broker:10911、NameSrv:9876
    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
    .childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
          .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
          .addLast(defaultEventExecutorGroup,
                   encoder,
                   new NettyDecoder(),
                   new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                   connectionManageHandler,
                   serverHandler
                  );
      }
    });
  // 设置发送缓冲区大小
  if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
    log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
    childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
  }
  // 设置接收缓冲区大小
  if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
    log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
    childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
  }
  // 设置写缓冲区大小
  if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
    log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
             nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
    childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
      nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
  }
  // 设置是否开启池化 ByteBufAllocator,采用默认的 PooledByteBufAllocator
  if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  }

  try {
    ChannelFuture sync = this.serverBootstrap.bind().sync();
    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
    this.port = addr.getPort();
  } catch (InterruptedException e1) {
    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
  }

  if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();
  }

  this.timer.scheduleAtFixedRate(new TimerTask() {

    @Override
    public void run() {
      try {
        NettyRemotingServer.this.scanResponseTable();
      } catch (Throwable e) {
        log.error("scanResponseTable exception", e);
      }
    }
  }, 1000 * 3, 1000);
}

启动 NettyRemotingServer 流程如下:

  1. 创建 DefaultEventExecutorGroup Worker 线程池,默认线程数量:8,线程名 prefix:NettyServerCodecThread_
  2. 通过 ServerBootstrap 指定好分组:Reactor 主线程、Reactor 线程池
  3. 创建 EpollServerSocketChannel,ServerSocketChannel 实现类
  4. 设置服务端参数,如下表
  5. 调用 io.netty.bootstrap.AbstractBootstrap#bind 方法创建一个 EpollServerSocketChannel,并且绑定好地址、端口
  6. 启动 NettyEventExecutor,它是一个单独的线程,用来接收来自 Netty 客户端空闲、关闭、连接、异常事件并进行监听回调处理.
  7. 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的客户端请求,并对它们进行处理,响应超时等待回调请求返回给客户端
参数名参数值参数描述
ChannelOption.SO_BACKLOG1024当服务器端接收到客户端的连接请求时,会创建一个连接队列来存储这些请求
ChannelOption.SO_REUSEADDRtrue通过给套接字配置可重用属性,告诉操作系统内核,这样的 TCP 连接可以复用 TIME_WAIT 状态的连接
ChannelOption.SO_KEEPALIVEfalse用于开启或者关闭保活探测,默认情况下是关闭的
ChannelOption.TCP_NODELAYtrue如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false
ChannelOption.SO_SNDBUF0设置发送缓冲区大小
ChannelOption.SO_RCVBUF0设置接收缓冲区大小
ChannelOption.WRITE_BUFFER_WATER_MARK0设置写缓冲区大小
ChannelOption.ALLOCATORPooledByteBufAllocator.DEFAULT优先分配直接内存

Broker 服务端会在初始化阶段,通过调用 BrokerController#registerProcessor 方法注册,请求 -> Processor 处理器之间的映射关系,将其写入到 NettyRemotingAbstract#processorTable 集合中,当接收来自客户端请求时,代表输入由 Netty 最后一个处理器:NettyRemotingServer.NettyServerHandler 接收处理,执行其内部的 channelRead0 方法处理消息收到的请求,根据请求体 RequestCommand 携带的 code,从 processorTable 集合中找到 Pair 组合「Processor,Executor」等待 Broker 处理完成之后,再执行客户端的回调方法,返回给客户端具体的请求结果.

NettyRemotingClient

在执行 BrokerController#start 时,同时会将 BrokerOuterAPI 启动,也就是启动 NettyRemotingClient

在执行 DefaultMQProducer#start、DefaultMQPushConsumerImpl#start 方法时,同时会将 MQClientAPIImpl 也启动,也就是启动 NettyRemotingClient

所以,从 Broker、生产者、消费者角度作为客户端,它们使用的都是同一个类 NettyRemotingClient 逻辑作为 Netty 客户端使用,以下是其启动时具体的源码:

public void start() {
  this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    // 默认线程数为 4
    nettyClientConfig.getClientWorkerThreads(),
    new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);
      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
      }
    });
  // 后续会发起请求时会通过 eventLoopGroupWorker 去建立 Socket 连接与服务端之间进行读、写交互,NioSocketChannel 代表的就是非阻塞的 SocketChannel
  Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
    // 数据包组装为更大的帧然后进行发送
    .option(ChannelOption.TCP_NODELAY, true)
    // 定时发送探测包来探测连接的对端是否存活
    .option(ChannelOption.SO_KEEPALIVE, false)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (nettyClientConfig.isUseTLS()) {
          if (null != sslContext) {
            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
            log.info("Prepend SSL handler");
          } else {
            log.warn("Connections are insecure as SSLContext is null!");
          }
        }
        // DefaultEventExecutorGroup 用来执行以下五个 ChannelHandler
        pipeline.addLast(
          defaultEventExecutorGroup,
          // 编码 -> 处理请求
          new NettyEncoder(),
          // 解码 -> 处理响应
          new NettyDecoder(),
          new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
          new NettyConnectManageHandler(),
          // 远程调用->请求、响应处理器
          new NettyClientHandler());
      }
    });
  // 操作系统客户端发送缓冲区的大小
  if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
    log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
    handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
  }
  // 操作系统客户端接收缓冲区的大小
  if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
    log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
    handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
  }
  if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
    log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
             nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
    handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
      nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
  }
  // Timer 定时执行哪些请求过期的事件,每隔 3 秒
  this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
      try {
        NettyRemotingClient.this.scanResponseTable();
      } catch (Throwable e) {
        log.error("scanResponseTable exception", e);
      }
    }
  }, 1000 * 3, 1000);
  // 生产者、消费者客户端一般为空,在 nameserver 与 Broker 交互时会使用到,做一些连接、关闭、异常、死亡状态的回调处理
  if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();
  }
}

启动 NettyRemotingClient 流程如下:

  1. 创建 DefaultEventExecutorGroup Worker 线程池,用于向客户端发起写事件、接收读事件的处理
  2. 通过 group 绑定 Worker 主线程,创建 NioSocketChannel 非阻塞 SocketChannel
  3. 设置相关的客户端参数,如下表
  4. 设置客户端请求、响应时要执行的处理器逻辑,主要是:编码-NettyEncoder、解码-NettyDecoder、请求_响应处理器-NettyClientHandler
  5. 创建一个 Timer 定时器,每隔 3 秒扫描哪些超时等待的请求,并对它们进行处理,响应超时等待回调请求返回给业务调用方
参数名参数值参数描述
ChannelOption.TCP_NODELAYtrue如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true 关闭 Nagle 算法如果要减少发送次数减少网络交互,就设置为 false 等累积一定大小后再发送,默认为 false
ChannelOption.SO_KEEPALIVEfalse用于开启或者关闭保活探测,默认情况下是关闭的
CONNECT_TIMEOUT_MILLIS3000连接超时时长 3 秒,在规定时间内未处理完成返回 Timeout 异常
ChannelOption.SO_SNDBUF0客户端发送缓冲区的大小
ChannelOption.SO_RCVBUF0客户端接收缓冲区的大小
ChannelOption.WRITE_BUFFER_WATER_MARK0设置写缓冲区大小

在作为客户端角度,只有当每次发起投递消息、消费消息请求时,才会创建与服务端之间的 Channel 通道,核心方法 NettyRemotingClient#createChannel 内部调用 Bootstrap#connect(java.net.SocketAddress) 建立与服务端之间的连接,然后再发起请求,请求的内容以及协议已经在本节专栏的上一篇博文讲到过了.

总结

该篇文章主要介绍在 RocketMQ remoting 底层通信模块中的 NettyRemotingServer、NettyRemotingClient 实例化、初始化、启动时源码的分析,在 BrokerController 实例化会优先构建好 Netty 客户端实例,在其初始化阶段会构建好 Netty 服务端实例,而在生产者、消费者侧,是在实例化 MQClientInstance 实例时会将 Netty 客户端实例也构建好,同时在 Broker、生产者、消费者启动时,会将对应的 Netty 服务端、客户端都一并启动,比编写文章不易,希望对您有帮助,能够喜欢~

博文放在 RocketMQ 分类里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

vnjohn

作者

vnjohn

后端研发工程师。喜欢探索新技术,空闲时也折腾 AIGC 等效率工具。 可以在 GitHub 关注我了解更多,也可以加我微信(vnjohn) 与我交流。