前言
分布式事务是涉及多个参与者(比如:数据库、消息队列、缓存等)的事务操作;在分布式系统中,多个节点之间协作,共同完成一个复杂的任务或操作过程。分布式事务需要确保在跨多个参与者的操作中,要么所有的操作都成功并永久保存,要么所有的操作都不执行,以确保数据的一致性和完整性
在传统的单机事务中,事务机制(比如:ACID,原子性、一致性、隔离性、持久性)能够确保数据库中的数据操作要么全部成功,要么全部失败;但在分布式环境中,涉及到多个数据库、服务和资源,实现这种事务保证更加复杂
分布式事务必须解决以下问题、挑战:
- 原子性:所有的操作要么都成功,要么都失败,不允许出现部分成功部分失败的情况
- 一致性:在分布式系统中,事务执行结束后,系统状态应该保持一致
- 隔离性:各个事务之间应该相互隔离,不影响
- 持久性:事务完成后,其结果应该被持久化,不会因为系统故障而丢失
常见的分布式事务解决方案包括:两阶段提交(Two-Phase Commit)、补偿事务(Compensating Transaction)、三阶段提交(Three-Phase Commit)、消息队列事务消息等
XA 协议、TCC 事务、最大努力通知等,这些解决方案的实现方式各有不同,但都需要考虑如何确保所有参与者的事务操作能够保持一致性,以及如何处理可能出现的异常情况
这些方法都在不同程度上解决了分布式环境下事务的一致性、可靠性问题;本文会通过消息队列 RocketMQ 事务消息的方式来保证分布式事务的执行
事务消息
RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
Apache RocketMQ 在 4.3.0 版本中已经支持分布式事务消息,这里 RocketMQ 采用了 2PC「两阶段提交」的思想来实现了事务消息,同时增加了一个补偿机制来处理二阶段超时或者失败的消息,如下图所示:
上图中说明了事务消息的大致方案,其中分为了两个流程:正常事务消息的发送及提交、事务消息的补偿流程
RocketMQ 事务消息流程概要
事务消息发送及提交
1、发送消息 Half 半消息
2、服务端 Broker 响应消息写入结果,将原有 Topic 进行替换写入到 Broker 内置的 Topic 中后,进行 Pending 状态
3、根据发送的结果执行本地事务(如果写入失败,此时 Half 半消息对业务不可见,本地逻辑不执行)
4、根据本地事务状态之下 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)
事务消息的补偿流程
1、对没有 Commit/Rollback 的事务消息(pending 状态的消息)从服务端发起一次 “回查”
2、Producer 生产者收到回查的消息,检查回查的消息对应的本地事务状态
3、通过本地事务状态,重新执行 Commit 或者 Rollback
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况
RocketMQ 事务消息设计
事务消息在一阶段对用户不可见
在 RocketMQ 事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?
RocketMQ 事务消息的做法:若消息是 half 半消息,备份原有的消息主题、消息消费队列,然后改变主题为 ==RMQ_SYS_TRANS_HALF_TOPIC==
由于消费组未订阅该主题,故消费端无法消费 half 类型的消息,然后 RocketMQ 会开启一个定时任务,从 Topic:RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发起回查事务状态请求,根据事务状态来决定是提交或者回滚消息.
在 RocketMQ 中,消息在服务端 Broker 的存储结构如下,每条消息都会有对应的索引消息,Consumer 通过 ConsumeQueue 这个二级索引来读取消息实体内容,其流程如下:
RocketMQ 具体实现策略:写入的如果是事务消息,对消息的 Topic、Queue 等属性进行替换,同时将原来的 Topic、Queue 信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知到消息的存在,不会消费。
其实改变消息主题是 RocketMQ 常用 “套路”,RocketMQ 延迟消息的实现机制也是基于此套路来实现的
Commit、Rollback 操作以及 Op 消息的引入
在第一阶段写入一条对用户不可见的消息以后,二阶段需要作 Commit 操作,则必然要让消息能够被用户可见;如果是 Rollback 则需要撤销一阶段的消息,对于 Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息
实际上 RocketMQ 也无法真正的删除一条消息,因为是消息顺序写入到文件中的;但是区别于这条消息没有确定状态「Pending 状态,事务悬而未决」需要一个操作来标识这条消息的最终状态
RocketMQ 事务消息方案中引入了 Op 消息的概念,用 Op 消息标识事务消息已经确定的状态「Commit 或者 Rollback」如果一条事务消息没有对应的 Op 消息,说明这个事务的状态还无法确定(可能是二阶段失败了)引入 Op 消息后,事务消息无论是 Commit 或者 Rollback 都会记录一个 Op 操作
Commit 相对于 Rollback 只是在写入 Op 消息前创建 Half 消息的索引.
在这里,分享一张源码的时序图部分,从投递事务消息—服务端处理事务消息的核心流程
Op 消息的存储和对应关系 RocketMQ 将 Op 消息写入到全局特定的一个 Topic 中,通过源码中的方法:TransactionalMessageUtil#buildOpTopic,这个 Topic「==RMQ_SYS_TRANS_OP_HALF_TOPIC==」是一个内部的 Topic(像 Half 消息中的 Topic 一样)不会被用户消费;Op 消息的内容为对应的 Half 消息存储的 Offset 偏移量,这样通过 Op 消息能索引到 Half 消息进行后续的回查操作
在事务消息 commit、rollback 时会将半消息写入到 Op Topic 中,在 commit 阶段时会先将消息内容存储到 Consumer Topic 中,以便在消费者侧能够监听到该消息进行消费处理.
Half 消息的索引构建
在执行二阶段 Commit 操作时,需要构建出 Half 半消息的索引;一阶段的 Half 消息由于是写入到一个 RocketMQ 内置的 Topic,所以二阶段构建索引时需要读取出 Half 消息,并将 Topic、Queue 替换为真正目标的 Topic、Queue,之后通过一次普通消息的写入操作来生成一条对用户消费者可见的消息
RocketMQ 事务消息二阶段其实是利用了一阶段存储的消息内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息的写入过程
==在 RocketMQ dashboard 控制台或通过提供的命令都无法看到 Op Topic 信息==
@Test
public void adminLocal() throws Exception {
DefaultMQAdminExt admin = new DefaultMQAdminExt();
admin.setNamesrvAddr("172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876");
admin.start();
ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
Set<Map.Entry<String, BrokerData>> entries = brokerAddrTable.entrySet();
for (Map.Entry<String, BrokerData> next : entries) {
System.out.println(next.getKey() + " " + next.getValue());
}
System.out.println("topic list:");
TopicList topicList = admin.fetchAllTopicList();
Set<String> sets = topicList.getTopicList();
sets.forEach(System.out::println);
System.out.println("topic route | info");
TopicRouteData transOpHalfTopic = admin.examineTopicRouteInfo("RMQ_SYS_TRANS_OP_HALF_TOPIC");
System.out.println(transOpHalfTopic);
}
在控制台会提示错误:org.apache.rocketmq.client.exception.MQClientException: CODE: 17 DESC: No topic route info in name server for the topic: RMQ_SYS_TRANS_OP_HALF_TOPIC
一开始不信邪了,它不是说属于 Broker 服务端内置的 Topic 嘛,但是它怎么会查询不到呢,思而索之,网上也没有相关的介绍,随即向 ChatGPT 抛出了一个问题,它给出了理解,如下:
所以,我仔细看了一下源码,发现这个 Op Topic 确实只有在 Broker 服务端这一侧进行交互使用,并没有在 Producer 生产侧这一块看到有使用这块的内容.
如何处理二阶段失败的消息
若在 RocketMQ 事务消息的二阶段过程中失败了,例如:在做 Commit 操作时,出现网络问题会导致 Commit 失败,那么需要通过一定的策略使这条消息最终被 Commit,RocketMQ 采用了一种补偿机制,称为 “回查”。
Broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同一个 Group 中的 Producer)由 Producer 根据消息来检查本地事务的状态,进而执行 Commit 或者 Rollback。
Broker 端通过对比 Half 消息和 Op 消息进行事务消息的回查并且推进 CheckPoint(记录哪些事务消息的状态是确定的)
值得注意的是,RocketMQ 并不会无休止的信息事务状态回查,默认回查:15 次,如果 15 次回查还是无法得知事务状态,RocketMQ 默认回滚该消息.
以下的源码来自于 BrokerConfig 服务端配置文件
/**
* 首先要检查的事务性消息最小时间间隔,只有一个消息超过这个时间间隔才可以检查
*/
@ImportantField
private long transactionTimeOut = 6 * 1000;
/**
* 消息被检查的最大次数,如果超过此值,该消息将会被丢弃
*/
@ImportantField
private int transactionCheckMax = 15;
若要调整该配置只需要将以下的参数在 broker.conf 配置文件进行配置即可.
回查事务消息源码时序图
1、在 Broker 启动阶段会选举好 Controller 角色——createBrokerController
2、在创建 Controller 角色时会调用 initialize 方法进行初始化事务消息处理类—TransactionalMessageService、事务消息检查类—TransactionalMessageCheckService
3、在启动以前将所有的准备工作都做好以后,就会启动 Controller,在启动期间,会执行:startProcessorByHa 方法,启动一些核心的处理类,比如:TransactionalMessageCheckService
事务消息核心检查类:TransactionalMessageCheckService,它实现了 Runnable 接口,重写了 run 方法,它每隔一分钟才会执行一次,调用的是 CountDownLatch#await 方法,阻塞时间为 1 分钟,当 1 分钟过后,会调用 onWaitEnd 方法执行具体的检查逻辑过程
4、在检查时,会获取到当前 Broker 服务端配置的最大间隔次数、最大间隔时间,进行当次任务的检查过程,实际调用的是 TransactionalMessageService#check 方法进行处理
check 方法中会对比 Half Topic、Op Topic 中的消息,两者相互比对偏移量,将已经处理过的消息存入到单独的集合中,同时,在这方法中,会比对每一条检查的消息是否满足(检查最大次数、单次消息检查间隔时间)当满足条件以后,调用 sendCheckMessage 方法给生产者端发出请求。
5、当这条 Half 半消息满足条件,准备执行发送时,会调用 Broker2Client#checkProducerTransactionState 方法执行「请求编码:39」类型的消息采用单向发送的方式给到生产者端进行处理
在生产者侧,ClientRemotingProcessor 用于接收来自 Broker 端发过来的消息,在此类中会对各自不同请求的编码格式进行处理,在这里,我们先只分析事务这一块的消息
1、采用 Netty Channel 接收消息,拿到消息编码为 39 时,会执行检查事务状态的逻辑
2、随即会转交给生产者调用异步线程进行事务消息的回查处理
3、在生产者异步线程处理事务消息的时候,会调用自定义事务监听器的方法:checkLocalTransaction 进行事务消息的提交、回滚、Unknow 状态判定
创建事务生产者时,需要创建 TransactionMQProducer 事务生产者实例,同时需要为这个事务生产者绑定好一个实现 TransactionListener 接口的自定义监听处理,主要做的事就是如何执行本地事务、本地事务状态回查.
后续在 RocketMQ 事务消息实战篇章节,来详细阐述这块的流程
总结
该篇博文主要分析了 RocketMQ 事务消息流程中的事务 Half 消息发送以及本地事务消息的补偿流程,重点的是结合 GitHub 部分对 RocketMQ 这块的设计部分进行整体了阐述,博主通过时序图+源码具体实现的部分对其进行了更深层次的描述,RocketMQ 源码部分在接收到事务消息时的处理过程以及在对事务消息这一块的补偿处理部分,希望能够加深你对事务消息这块的理解,感谢三连支持~
🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!
博文放在 RocketMQ 分类里,欢迎查看,会持续更新!
如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!
大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!