Preface
Earlier posts in this RocketMQ column:
RocketMQ Producer: DefaultMQProducer Source Code Analysis
RocketMQ MQClientInstance: Producer Start-up Source Code Analysis
RocketMQ MessageQueue and MessageQueueSelector Structures
RocketMQ DefaultMQProducer#send Source Code Analysis (Part 1)
RocketMQ DefaultMQProducer#send Source Code Analysis (Part 2)
The previous article, "RocketMQ Communication Mechanism: Underlying Data Structures and Source Code Analysis", mainly introduced the data structures and the threading model behind RocketMQ's low-level network communication without digging deep into the code; this article focuses on reading the source behind that part.
new
On the Broker side, when the BrokerController is created, a NettyServerConfig and a NettyClientConfig are constructed and passed into it, as shown below:
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
The port the Netty server boss binds to by default is 10911.
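These defaults can be overridden from the broker configuration file: BrokerStartup loads the file into a Properties object and copies matching keys onto the config objects via MixAll.properties2Object. Below is a minimal sketch of that idea; the file path and the printed output are illustrative only.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;

public class BrokerConfigSketch {
    public static void main(String[] args) throws Exception {
        BrokerConfig brokerConfig = new BrokerConfig();
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(10911); // same default as BrokerStartup

        // Keys in broker.conf map onto the setters of these config objects,
        // e.g. "listenPort=10915" ends up in NettyServerConfig#setListenPort.
        Properties properties = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get("broker.conf"))) { // path is illustrative
            properties.load(in);
        }
        MixAll.properties2Object(properties, brokerConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        System.out.println("broker listen port = " + nettyServerConfig.getListenPort());
    }
}
```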
BrokerOuterAPI
While BrokerController is being instantiated, the NettyRemotingClient is created first (wrapped in BrokerOuterAPI). It is the client side the Broker itself uses to talk to the other nodes in the cluster, such as the NameServer and other Brokers. For example, when a topic is created on a Broker through an admin command, the Broker assembles the topic information and reports the updated topic route information outward so that the rest of the cluster learns about the topic and messages for it can be accepted.
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
// ....
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    // ....
}
The single-argument BrokerOuterAPI constructor delegates to the two-argument one below, which creates the NettyRemotingClient:
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
}
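The rpcHook registered on the remoting client is invoked before every request is sent and after every response is received; this is the extension point that things like the ACL client hook plug into. As a feel for the mechanism, here is a minimal, hypothetical hook (the logging is purely illustrative; RPCHook itself is the real rocketmq-remoting interface):

```java
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Hypothetical hook that logs every outgoing request and the code of its response.
public class LoggingRpcHook implements RPCHook {
    @Override
    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        System.out.printf("-> %s, request code=%d%n", remoteAddr, request.getCode());
    }

    @Override
    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
        System.out.printf("<- %s, request code=%d, response code=%d%n",
            remoteAddr, request.getCode(), response == null ? -1 : response.getCode());
    }
}
```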
MQClientInstance
When a producer or consumer starts, MQClientManager#getOrCreateMQClientInstance creates the MQClientInstance, which wires the NettyClientConfig in:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
// Processor for remote calls on the client side: accepts requests coming from the Broker and responds to them
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// The class through which the MQ client API issues requests
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
// The core class of RocketMQ's network model
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerRPCHook(rpcHook);
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
// The consumers within a consumer group changed; trigger a rebalance
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
// Reset the consume offset on the consumer client
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
// Get the consumer's consume status
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
// Get the consumer's runtime information
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
// Consume a specific message directly (e.g. triggered from the console)
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
// Push a reply message to the client (request-reply messaging)
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
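All of the request codes registered above are handled by the same ClientRemotingProcessor, whose processRequest method simply switches on the request code. Below is a condensed sketch of that dispatch; the handler method names are abridged from the real class and may differ slightly between versions.

```java
// Condensed from org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest.
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);     // transactional message check-back
        case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
            return this.notifyConsumerIdsChanged(ctx, request);  // consumer list changed -> rebalance
        case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
            return this.resetOffset(ctx, request);               // reset consume offset
        case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
            return this.getConsumeStatus(ctx, request);          // report consume status
        case RequestCode.GET_CONSUMER_RUNNING_INFO:
            return this.getConsumerRunningInfo(ctx, request);    // report runtime information
        case RequestCode.CONSUME_MESSAGE_DIRECTLY:
            return this.consumeMessageDirectly(ctx, request);    // consume one message on demand
        case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
            return this.receiveReplyMessage(ctx, request);       // request-reply messaging
        default:
            break;
    }
    return null;
}
```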
NettyRemotingClient
NettyRemotingClient acts as the client side of RocketMQ's network communication model; the producer, the consumer and the Broker all hold a reference to it. Its instantiation looks like this:
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
// Permit counts for oneway sends and async sends; both default to 65535 on the client
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// Public thread pool that runs the client-side processors when no dedicated executor is specified;
// its size is clientCallbackExecutorThreads (the number of CPU cores by default), falling back to 4 if configured <= 0
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
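The two semaphore values passed to the super constructor cap how many oneway and async requests may be in flight at the same time. Below is a simplified, self-contained sketch of the pattern used by NettyRemotingAbstract#invokeOnewayImpl; the real code releases the permit in a ChannelFutureListener and throws RocketMQ-specific exceptions.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simplified flow-control sketch: a permit must be acquired before the request is written,
// and it is released exactly once after the write has been handed off.
public class OnewayFlowControlSketch {
    private final Semaphore semaphoreOneway = new Semaphore(65535, true);

    public void invokeOneway(Runnable writeAndFlush, long timeoutMillis) throws InterruptedException {
        boolean acquired = semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (!acquired) {
            // the real code surfaces this as RemotingTooMuchRequestException / RemotingTimeoutException
            throw new IllegalStateException("too many oneway requests in flight, tryAcquire timed out");
        }
        try {
            writeAndFlush.run(); // stands in for channel.writeAndFlush(request) plus its listener
        } finally {
            semaphoreOneway.release();
        }
    }
}
```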
The class also declares a Bootstrap member field, which will be configured later in start():
private final Bootstrap bootstrap = new Bootstrap();
initialize
During BrokerController instantiation the Netty server side is only configured (the NettyServerConfig is held onto); the server object itself is not created until the initialization phase.
NettyRemotingServer
When the BrokerController#initialize method is called, the NettyRemotingServer is instantiated:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// Public thread pool used for processors that were registered without a dedicated Executor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// On the Broker (Linux with the native epoll selector enabled) an EpollEventLoopGroup is created by default
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
- Create the oneway semaphore (256 permits) and the async semaphore (64 permits)
- Create the public processor thread pool: when a Processor is registered without its own Executor, its work is handed to this shared business pool
- Create an EpollEventLoopGroup with 1 thread, the Reactor main (boss) thread
- Create an EpollEventLoopGroup with 3 threads (serverSelectorThreads), the Reactor selector pool
The useEpoll method decides whether EpollEventLoopGroup or NioEventLoopGroup is used:
private boolean useEpoll() {
// The operating system must be Linux
return RemotingUtil.isLinuxPlatform()
// Whether the epoll selector model is enabled, set via NettyServerConfig.setUseEpollNativeSelector
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
The useEpollNativeSelector flag has already been set to true when the BrokerController is instantiated.
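Even with that flag set, epoll is only used when Netty's native transport can actually be loaded on the machine (the third condition, Epoll.isAvailable()). When debugging why a Broker fell back to NIO, the same Netty API can be queried directly:

```java
import io.netty.channel.epoll.Epoll;

// Quick check of whether Netty's native epoll transport is usable on this machine.
public class EpollCheck {
    public static void main(String[] args) {
        if (Epoll.isAvailable()) {
            System.out.println("epoll transport is available");
        } else {
            System.out.println("falling back to NIO: " + Epoll.unavailabilityCause());
        }
    }
}
```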
start
NettyRemotingServer
BrokerController#start then calls NettyRemotingServer#start to bring up the Netty server:
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
/*
 * Pipeline order:
 *   SslHandler                - TLS/SSL
 *   FileRegionEncoder         - encodes file regions (zero-copy sendfile transfers)
 *   NettyEncoder              - protocol encoder
 *   NettyDecoder              - protocol decoder
 *   IdleStateHandler          - idle-connection detection
 *   NettyConnectManageHandler - connection management
 *   NettyServerHandler        - server-side request handler
 */
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// In TCP, connection requests that the server has not yet accepted are held in a pending-connection queue and processed one by one;
// ChannelOption.SO_BACKLOG sets the size of that queue
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
// By default a local address held by a connection in TIME_WAIT cannot be reused immediately;
// SO_REUSEADDR tells the kernel the address may be rebound even while such connections linger
.option(ChannelOption.SO_REUSEADDR, true)
// Enables or disables TCP keep-alive probing; it is off by default.
// With SO_KEEPALIVE on, the connection can detect a crashed peer instead of blocking on its input forever
.option(ChannelOption.SO_KEEPALIVE, false)
// TCP_NODELAY disables the Nagle algorithm, i.e. packets are sent out immediately.
// Set it to true when low latency matters (send as soon as there is data);
// leave it false to batch small writes and reduce network round trips. The OS default is false
.childOption(ChannelOption.TCP_NODELAY, true)
// Bind the local port: 10911 for the Broker, 9876 for the NameServer
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
// Set the socket send buffer size
if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
}
// Set the socket receive buffer size
if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
}
// Set the channel write-buffer low/high watermarks
if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
}
// If enabled, use the pooled allocator PooledByteBufAllocator.DEFAULT
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
The NettyRemotingServer start-up flow is:
- Create the DefaultEventExecutorGroup worker pool, 8 threads by default, thread name prefix NettyServerCodecThread_
- Bind the two groups on the ServerBootstrap: the Reactor boss thread and the Reactor selector pool
- Use EpollServerSocketChannel (or NioServerSocketChannel when epoll is unavailable) as the ServerSocketChannel implementation
- Set the server-side channel options, see the table below
- Call io.netty.bootstrap.AbstractBootstrap#bind to create the server channel and bind it to the configured address and port
- Start the NettyEventExecutor, a dedicated thread that receives idle / close / connect / exception channel events from Netty and dispatches them to the listener callbacks
- Create a Timer that, after an initial delay of 3 seconds and then every second, scans the table of pending requests this side is still waiting responses for and fails the ones that have timed out, so the waiting callbacks are invoked instead of hanging
Option | Value | Description |
---|---|---|
ChannelOption.SO_BACKLOG | 1024 | Size of the pending-connection queue in which the server holds incoming connection requests before accepting them |
ChannelOption.SO_REUSEADDR | true | Marks the socket address as reusable, telling the kernel the local address may be rebound even while connections in TIME_WAIT still hold it |
ChannelOption.SO_KEEPALIVE | false | Enables or disables TCP keep-alive probing; disabled here |
ChannelOption.TCP_NODELAY | true | Disables the Nagle algorithm: true sends data immediately for low latency, false batches small writes to reduce network round trips (the OS default is false) |
ChannelOption.SO_SNDBUF | 0 | Socket send buffer size (only applied when configured > 0) |
ChannelOption.SO_RCVBUF | 0 | Socket receive buffer size (only applied when configured > 0) |
ChannelOption.WRITE_BUFFER_WATER_MARK | 0 | Channel write-buffer low/high watermarks (only applied when configured > 0) |
ChannelOption.ALLOCATOR | PooledByteBufAllocator.DEFAULT | Pooled allocator; prefers direct memory |
During its initialization phase, the Broker registers the mapping from request code to Processor by calling BrokerController#registerProcessor, writing the entries into the NettyRemotingAbstract#processorTable map. When a request arrives from a client, the inbound data is ultimately handled by the last handler in the pipeline, NettyRemotingServer.NettyServerHandler: its channelRead0 method takes the received RemotingCommand, uses the code it carries to look up the Pair of <Processor, Executor> in processorTable, and hands the request to that executor. Once the Broker has finished processing, the response is written back to the client, where the caller's callback is invoked with the concrete result.
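Below is a condensed sketch of that lookup-and-dispatch step, as it happens in NettyRemotingAbstract#processRequestCommand; exception handling, flow control and the callback plumbing are omitted, and the snippet is simplified rather than copied verbatim.

```java
// Simplified: find the processor registered for this request code and run it on its executor.
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair =
    matched == null ? this.defaultRequestProcessor : matched;

Runnable run = () -> {
    try {
        // Execute the business processor, e.g. SendMessageProcessor for message sends.
        RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
        if (response != null) {
            response.setOpaque(cmd.getOpaque()); // echo the request id so the client can match the response
            ctx.writeAndFlush(response);
        }
    } catch (Exception e) {
        // the real code builds a SYSTEM_ERROR response here
    }
};

// The task runs on the executor bound to the processor (or the public executor).
pair.getObject2().submit(new RequestTask(run, ctx.channel(), cmd));
```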
NettyRemotingClient
When BrokerController#start runs, it also starts BrokerOuterAPI, which in turn starts its NettyRemotingClient.
When DefaultMQProducer#start or DefaultMQPushConsumerImpl#start runs, MQClientAPIImpl is started as well, which likewise starts its NettyRemotingClient.
So whether it is the Broker, a producer or a consumer acting as a client, they all go through the same NettyRemotingClient logic as the Netty client. Its start-up source code is as follows:
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
// 4 threads by default
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// When a request is later issued, eventLoopGroupWorker establishes the socket connection to the server and handles the read/write traffic; NioSocketChannel is the non-blocking SocketChannel
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
// Disable the Nagle algorithm: send packets immediately instead of batching them into larger frames
.option(ChannelOption.TCP_NODELAY, true)
// TCP keep-alive would periodically probe whether the peer is still alive; it is disabled here
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
// The following five ChannelHandlers are executed on the defaultEventExecutorGroup
pipeline.addLast(
defaultEventExecutorGroup,
// Encoder: serializes outgoing requests
new NettyEncoder(),
// Decoder: deserializes incoming responses (and requests)
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
// Remote-call handler: processes incoming requests and matches responses to pending calls
new NettyClientHandler());
}
});
// OS-level send buffer size of the client socket
if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
}
// OS-level receive buffer size of the client socket
if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
}
if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
}
// Timer that expires timed-out requests: first run after 3 seconds, then every second
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
// Usually null for producer/consumer clients; when set (e.g. for NameServer/Broker interaction) it handles connect / close / exception / idle channel events
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
The NettyRemotingClient start-up flow is:
- Create the DefaultEventExecutorGroup worker pool that runs the codec and handler work for outgoing writes and incoming reads
- Bind eventLoopGroupWorker on the Bootstrap via group and use NioSocketChannel, the non-blocking SocketChannel
- Set the client-side channel options, see the table below
- Register the handlers executed for requests and responses, mainly NettyEncoder (encode), NettyDecoder (decode) and NettyClientHandler (request/response handler)
- Create a Timer that, after an initial delay of 3 seconds and then every second, scans for requests whose responses have timed out and completes them, so the waiting business callers and callbacks get a timely result (a sketch of this scan follows the table below)
Option | Value | Description |
---|---|---|
ChannelOption.TCP_NODELAY | true | Disables the Nagle algorithm: true sends data immediately for low latency, false batches small writes to reduce network round trips (the OS default is false) |
ChannelOption.SO_KEEPALIVE | false | Enables or disables TCP keep-alive probing; disabled here |
ChannelOption.CONNECT_TIMEOUT_MILLIS | 3000 | Connect timeout of 3 seconds; if the connection cannot be established in time a timeout exception is raised |
ChannelOption.SO_SNDBUF | 0 | Client socket send buffer size (only applied when configured > 0) |
ChannelOption.SO_RCVBUF | 0 | Client socket receive buffer size (only applied when configured > 0) |
ChannelOption.WRITE_BUFFER_WATER_MARK | 0 | Channel write-buffer low/high watermarks (only applied when configured > 0) |
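Both the server and the client schedule the same scanResponseTable task (inherited from NettyRemotingAbstract). Conceptually it walks the table of pending requests, evicts every entry whose timeout (plus a small grace period) has passed, and runs its callback so the caller fails fast instead of waiting forever. Here is a simplified, self-contained sketch of the idea, using plain collections rather than the real ResponseFuture types:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of what scanResponseTable does: expire pending requests whose
// response never arrived and notify their callbacks. Field and class names are illustrative.
public class ResponseTableSketch {
    static class PendingRequest {
        final long beginTimestamp = System.currentTimeMillis();
        final long timeoutMillis;
        final Runnable callback;
        PendingRequest(long timeoutMillis, Runnable callback) {
            this.timeoutMillis = timeoutMillis;
            this.callback = callback;
        }
    }

    private final Map<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    public void scanResponseTable() {
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest pending = it.next().getValue();
            // the real code adds roughly 1000 ms of grace before treating the request as expired
            if (pending.beginTimestamp + pending.timeoutMillis + 1000 <= System.currentTimeMillis()) {
                it.remove();
                pending.callback.run(); // stands in for executeInvokeCallback(responseFuture)
            }
        }
    }
}
```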
From the client's point of view, a Channel to the server is only created when a request is actually issued (sending a message, pulling messages, and so on). The core method NettyRemotingClient#createChannel internally calls Bootstrap#connect(java.net.SocketAddress) to establish the connection to the server, and only then is the request sent; the request content and the wire protocol were covered in the previous article of this column.
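Below is a condensed sketch of that lazy connect, written as a fragment of the client class. The real createChannel also caches channels in channelTables, guards everything with a lock, and re-checks an existing channel's liveness; those parts are omitted here and the address parsing is simplified.

```java
// Simplified sketch of NettyRemotingClient#createChannel: connect lazily and wait up to
// connectTimeoutMillis for the connection to become active before using it.
private Channel createChannel(final String addr) {
    String[] hostAndPort = addr.split(":");
    ChannelFuture channelFuture = this.bootstrap.connect(
        new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));

    if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())
            && channelFuture.channel() != null
            && channelFuture.channel().isActive()) {
        return channelFuture.channel();
    }
    // the real code logs the failure and lets the caller raise a connect exception
    return null;
}
```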
Summary
This article walked through the instantiation, initialization and start-up source code of NettyRemotingServer and NettyRemotingClient in RocketMQ's remoting module. When BrokerController is instantiated it first builds the Netty client instance, and during its initialization phase it builds the Netty server instance; on the producer and consumer side, the Netty client instance is built when MQClientInstance is instantiated. When the Broker, a producer or a consumer is started, the corresponding Netty server and client are started along with it. Writing these articles takes real effort; I hope this one helps you and that you enjoy it!
This post lives in the RocketMQ category of the blog; feel free to subscribe, it will keep being updated!
If you find the post useful, follow me, vnjohn, for more hands-on practice, source-code and architecture content!
Your follows ❤️, likes 👍 and bookmarks ⭐ are my biggest motivation. Thanks for the support, and see you in the next post!