RocketMQ 分布式事务实战

作者: vnjohn / 发表于 2023-11-30 / 分类: RocketMQ

RocketMQ, 源码

前言

在上一篇文章:保护数据完整性:探索 RocketMQ 分布式事务消息的力量 详细分析了「事务消息设计方面及源码相关层面」讲解,事务半消息的发送及提交、事务消息的补偿过程

在 RocketMQ 中由于网络故障原因或业务应用程序异常宕机导致事务消息未及时的完成处理,提供了事务消息补偿机制>检查本地事务执行状态的方法,为整个流程二阶段提交完成了不可忽视的异常消息补偿机制。

接下来,会通过以下两个链路中的第一条链路进行实战演练,确保在 RocketMQ 事务消息处理过程中,这两者的事务状态能够确保一致完成.

1、创建订单完成、预扣减库存 2、订单支付完成、实扣减库存

业务设计流程

在这里插入图片描述

1、事务生产者发送半消息到 Broker 服务端的 Half Topic 中,实际发送半消息的 Topic 是真实的 Topic,在这里会被替换为「RMQ_SYS_TRANS_HALF_TOPIC」存储到日志文件中

2、在 Broker 服务端将半消息存储到日志文件以后,若发送半消息的结果是成功的,那么就会执行「订单服务客户端」本地事务方法「executeLocalTransaction」

3、会同步等待本地事务方法执行的结果,再根据执行结果、消息体投递消息类型「EndTransactionRequestHeader」给到 Broker 服务端进行处理,该请求由 Broker 服务端「EndTransactionProcessor」进行处理.

4、在 EndTransactionProcessor 中会根据本地事务处理的结果,进行判别

1、若本地事务执行成功,在 Broker 服务端会将半消息对应的 Topic 调整为真实的 Topic 消息进行存储到日志文件中,随即在库存服务的消费者才能消费到这条消息,从而再对库存进行扣减,同时标记好半消息,确保在==定时检查事务消息==时不会再次被扫描到进行处理 2、若本地事务执行失败,在 Broker 服务端会将半消息标记为「已处理」不会让定时触发的事务消息检查机制进行扫描到 当然,定时任务的数据处理,不能确保它有时间的误差性,所以说执行成功或执行失败的事务消息,会在补偿机制进行再一次的校验

业务设计源码

基础 SQL 脚本

CREATE TABLE `order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_no` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '订单编号',
  `amount` bigint DEFAULT NULL COMMENT '订单金额',
  `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

CREATE TABLE `stock` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
  `lock_stock` int DEFAULT NULL COMMENT '锁定库存',
  `stock` int DEFAULT NULL COMMENT '真实库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

基础依赖

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spring-boot.version>2.6.7</spring-boot.version>
    <jackson.version>2.11.0</jackson.version>
    <mysql.version>8.0.17</mysql.version>
    <alibaba-druid.version>1.2.8</alibaba-druid.version>
    <mybatis-plus.version>3.4.2</mybatis-plus.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.vnjohn</groupId>
        <artifactId>blog-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

基础配置

订单服务、库存服务在实际的生产过程中,会各自有各自的库,在本地环境中演练中采用一个库进行模拟.

订单服务,作为事务消息生产者

server:
  port: 8088

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: 12345678
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      filters: stat
      maxActive: 30
      initialSize: 1
      maxWait: 10000
      # 保持连接活跃
      keep-alive: true
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
  type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径

rocketmq:
  transaction:
    producer: order_transaction
  bootstraps:
  namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876

库存服务,作为事务消息消费者

server:
  port: 8085

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: 12345678
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      filters: stat
      maxActive: 30
      initialSize: 1
      maxWait: 10000
      # 保持连接活跃
      keep-alive: true
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
  type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径

rocketmq:
  bootstraps:
  namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876
  consumer-group: order_transaction_group
  order:
    create:
      topic: order_transaction
      tag: withholding_stock

基础依赖代码库

/**
 * Spring Context 工具类
 *
 * @author vnjohn
 */
@Component
public class SpringContextUtils implements ApplicationContextAware {
    public final static String SPRING_CONTEXT_UTILS_COMPONENT = "springContextUtils";
    public static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    /**
     * 获取HttpServletRequest
     */
    public static HttpServletRequest getHttpServletRequest() {
        return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    }

    public static HttpServletResponse getHttpServletResponse() {
        return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> c) {
        return applicationContext.getBean(c);
    }

    public static <T> Map<String, T> getBeanOfType(Class<T> c) {
        return applicationContext.getBeansOfType(c);
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
        return applicationContext.getBean(name, requiredType);
    }

    public static boolean containsBean(String name) {
        return applicationContext.containsBean(name);
    }

    public static boolean isSingleton(String name) {
        return applicationContext.isSingleton(name);
    }

    public static Class<? extends Object> getType(String name) {
        return applicationContext.getType(name);
    }

    /**
     * 获取当前环境
     */
    public static String getActiveProfile() {
        return applicationContext.getEnvironment().getActiveProfiles()[0];
    }
}

模块代码

订单服务

订单 DO 实体

@Data
@TableName("`order`")
@EqualsAndHashCode(callSuper = false)
public class OrderDO {
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

订单数据库映射层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
public interface OrderMapper extends BaseMapper<OrderDO> {
}

订单仓储层

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Component
public class OrderRepository {
    @Resource
    private OrderMapper orderMapper;

    public Order queryByOrderNo(String orderNo) {
        OrderDO orderDO = orderMapper.selectOne(new QueryWrapper<OrderDO>()
                .lambda().eq(OrderDO::getOrderNo, orderNo));
        return BeanUtils.copy(orderDO, Order.class);
    }
}

库存服务

库存 DO 实体

/**
 * @author vnjohn
 * @since 2023/11/15
 */
@Data
@TableName("`stock`")
@EqualsAndHashCode(callSuper = false)
public class StockDO {
    @TableId(type = IdType.AUTO)
    private Long id;

    private Long skuId;

    private Integer lockStock;

    private Integer stock;

}

库存数据库映射层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
public interface StockMapper extends BaseMapper<StockDO> {
}

库存仓储层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
@Slf4j
@Component
public class StockRepository {
    @Resource
    private StockMapper stockMapper;

    public void preDecreaseStock(String orderNo, Long skuId) {
        // 订单号与 SkuId 可用于做日志记录,这里默认给它数量记为 1
        log.info("订单号:{}", orderNo);
        StockDO stockDO = stockMapper.selectOne(new QueryWrapper<StockDO>()
                .lambda().eq(StockDO::getSkuId, skuId));
        if (null == stockDO) {
            return;
        }
        int currentLockStock = stockDO.getLockStock() + 1;
        stockDO.setLockStock(currentLockStock);
        // 此处最好采用乐观锁+ CAS 方式进行更新
        stockMapper.updateById(stockDO);
    }
}

生成订单

订单领域实体

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class Order {
    /**
     * id
     */
    private Long id;

    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

创建订单实体

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class CreateOrder {
    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

创建订单领域执行器

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Component
public class OrderCreateHandler {
    @Resource
    private OrderMapper orderMapper;

    public Boolean handle(CreateOrder order) {
        // 在这里模拟订单异常或创建系统
        // 1、订单创建逻辑,涉及到表结构及数据会比较多,这里不做多阐述
        String orderNo = order.getOrderNo();
        return orderMapper.insert(BeanUtils.copy(order, OrderDO.class)) > 0;
    }

}

RocketMQ 事务消息

订单服务生产者

首先要在订单服务能够投递事务消息,应该先实例化一个事务生产者

事务生产者实例

/**
 * 订单服务专门用来提供事务消息的生产者
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class OrderTransactionProducer {
    @Value("${rocketmq.transaction.producer}")
    private String transactionProducerName;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    private static TransactionMQProducer PRODUCER;

    /**
     * 获取事务生产者实例对象
     *
     * @return 事务生产者实例
     */
    public static TransactionMQProducer getInstance() {
        return PRODUCER;
    }

    /**
     * 若未定义线程,检测事务半消息的线程默认只有一个,当同时出现多条事务半消息需要检测时,就退化为队列的方式进行入队,要进行排队处理,从而降低了并发、并行数
     *
     * <p>
     *  public TransactionMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
     *          super(namespace, producerGroup, rpcHook);
     *          this.checkThreadPoolMinSize = 1;
     *          this.checkThreadPoolMaxSize = 1;
     *          this.checkRequestHoldMax = 2000;
     *  }
     * </p>
     */
    @PostConstruct
    public void initTransactionProducer() {
        try {
            PRODUCER = new TransactionMQProducer(transactionProducerName);
            PRODUCER.setNamesrvAddr(namesrvAddr);
            PRODUCER.setTransactionListener(new OrderTransactionListener());
            // 自定义线程池处理,用于追踪消息投递时的日志,能够追踪到具体的投放线程,所必要参数,不设置采用的是默认的线程池
//          producer.setExecutorService();
            PRODUCER.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            log.error("创建事务生产者异常,", e);
        }
    }
}

事务生产者监听器

与事务生产者必须绑定好的一个关键>监听器,用这个监听器来判别如何做事务消息的后续处理

/**
 * 订单服务「本地事务消息-半消息」处理
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
public class OrderTransactionListener implements TransactionListener {
    /**
     * unknow 消息最大重试次数设置为 3,当超过 3 次以后进行默认成功且记录到本地日志表中
     */
    private static Integer MAX_RETRY_TIME = 3;

    /**
     * 用于存储本地事务执行的结果:事务id->订单id
     */
    private ConcurrentHashMap<String, String> TRANSACTION_ORDER_MAP = new ConcurrentHashMap<>();

    /**
     * 用于存储本地事务检查次数的结果:事务id->check 次数
     * 源码中会默认检查为 15 次,一次的时间间隔为 6s,由于那个属于全局的配置
     * 在这里可自定义适配次数,时间还是按照默认的配置来进行处理
     */
    private ConcurrentHashMap<String, Integer> UNKNOW_TRANSACTION_CHECK_MAP = new ConcurrentHashMap<>();


    /**
     * 执行本地事务的处理逻辑
     *
     * @param message 待发送的消息内容
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 当前执行的事务 id
        String transactionId = message.getTransactionId();
        CreateOrder createOrder = (CreateOrder) o;
        TRANSACTION_ORDER_MAP.put(transactionId, createOrder.getOrderNo());
        // TODO 捕获创建订单后的执行结果
        try {
            // 伪代码创建订单
            OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
            Order order = orderRepository.queryByOrderNo(createOrder.getOrderNo());
            // 订单已存在,不再做处理
            if (Objects.nonNull(order)) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            OrderCreateHandler orderCreateHandler = SpringContextUtils.getBean(OrderCreateHandler.class);
            Boolean handleResult = orderCreateHandler.handle(createOrder);
            if (handleResult) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception orderException) {
            // 消息进行回滚,在消费者那一侧是无法观察此消息的
            log.error("创建订单出现异常,", orderException);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 由于网络问题,导致非业务异常,可能是订单已经写入数据库成功了没有及时地去处理事务消息状态
        // 在这里需要去进行 check 检查本地事务是否执行有误
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 事务 producer 会从 broker 获取到未处理的事务消息列表,进行依次处理
     * 检查本地事务状态,当出现「事务消息生产者」宕机时,该方法仍然会对未处理的事务消息进行检测
     * 对于 unknow 消息,会 1s 进行一次定期处理,该参数可调整
     *
     * @param messageExt 消息扩展类信息
     * @return 本地事务执行状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 检查本地事务表是否存在订单号
        String transactionId = messageExt.getTransactionId();
        log.info("checkLocalTransaction transactionId:{}", transactionId);
        String orderNo = TRANSACTION_ORDER_MAP.get(transactionId);
        // unknow 消息进行重试三次,超出后不再做处理,不然 unknow 消息会一直在控制台打印进行处理,视为无效的工作
        // 增加补偿机制,用于处理 unknow 重试的消息,当重试的消息是提交或回滚状态,则调用相关的方法进行处理
        Integer checkCount = UNKNOW_TRANSACTION_CHECK_MAP.get(transactionId);
        if (Objects.isNull(checkCount) || checkCount < MAX_RETRY_TIME) {
            checkCount = Objects.isNull(checkCount) ? 1 : ++checkCount;
            log.info("transactionId-{},check 检查次数:{}", transactionId, checkCount);
            UNKNOW_TRANSACTION_CHECK_MAP.put(transactionId, checkCount);
            return LocalTransactionState.UNKNOW;
        }
        // 检查次数超出预定阈值,可记录到日志
        if (checkCount.equals(MAX_RETRY_TIME)) {
            log.info("transactionId-{},check 检查次数超出", transactionId);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 反查订单数据当前订单是否已经创建
        OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
        Order order = orderRepository.queryByOrderNo(orderNo);
        if (Objects.nonNull(order)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

}

事务消息实体

/**
 * 订单库存消息体
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class OrderWithStockMessage {
    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 数量
     */
    private Integer quantity;

}

事务消息投递

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class OrderUnifiedCommandHandler {

    public void handler(String orderNo) {
        // 1、订单数据幂等校验成功
        // 2、调用生产者发送事务半消息
        // 假设接收到订单支付已完成的标识
        OrderWithStockMessage stockMessageBody = new OrderWithStockMessage();
        stockMessageBody.setSkuId(1001L);
        stockMessageBody.setQuantity(2);
        stockMessageBody.setOrderNo(orderNo);
        Message stockMessage = new Message(
                "order_transaction",
                "withholding_stock",
                UUID.randomUUID().toString(),
                JsonUtils.objToJsonStr(stockMessageBody).getBytes()
        );
        CreateOrder order = new CreateOrder();
        order.setSkuId(1001L);
        order.setAmount(100L);
        order.setUserId(888L);
        order.setOrderNo(orderNo);

        // putUserProperty 该方法,通过网络传递给 consumer 一些用户自定义参数,可以用来校验做其他的业务逻辑处理
        // stockMessage.putUserProperty("action", );

        // 发送的是半消息
        // Message msg, Object arg
        // 第一个参数:本地事务处理成功以后,需要进行发送的消息体内容
        // 第二个参数:作为本地事务用于检测或执行本地事务时的对象体
        try {
            TransactionSendResult transactionSendResult = OrderTransactionProducer.getInstance().sendMessageInTransaction(stockMessage, order);
            log.info("事务执行结果:{}", JsonUtils.objToJsonStr(transactionSendResult.getLocalTransactionState()));
        } catch (MQClientException e) {
            log.error("订单预生成,事务半消息 send fail,", e);
        }
    }
}

库存服务消费者

/**
 * 订单消费者,消费来自订单服务投递的消息
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class CreateOrderPreStockConsumer {
    @Value("${rocketmq.consumer-group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    @Value("${rocketmq.order.create.topic}")
    private String orderCreateTopic;

    @Value("${rocketmq.order.create.tag}")
    private String orderCreateTag;

    @PostConstruct
    public void initCreateOrderConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe(orderCreateTopic, orderCreateTag);
            consumer.registerMessageListener(new CreateOrderStockMessageListener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    public static class CreateOrderStockMessageListener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            String msgId = messageExt.getMsgId();
            byte[] body = messageExt.getBody();
            String bodyValue = new String(body);
            log.info("当前订单创建成功,预扣减库存信息:{}", bodyValue);
            String orderNo = JSON.parseObject(bodyValue).getString("orderNo");
            Long skuId = JSON.parseObject(bodyValue).getLong("skuId");
            StockRepository stockRepository = SpringContextUtils.getBean(StockRepository.class);
            stockRepository.preDecreaseStock(orderNo, skuId);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

Tips

采用两阶段提交的事务消息,先提交一个半消息,然后执行本地事务,再发送一个 commit 的半消息;若这个 commit 半消息失败了,MQ 是基于第一个半消息不断的反查本地事务执行状态来进行后续流程的推进的,这样只有当本地事务提交成功,最终 MQ 消息也会发送成功,若本地事务 rollback,那么 MQ 消息不会再进行发送,会标记这条半消息的状态为「已处理」从而保证了两者之间的一致性

执行本地事务方法+本地事务定时检查,结合起来来保证事务消息执行的一致性

总结

该篇博文主要是通过实际的业务代码来进行 RocketMQ 事务消息实战,上一篇博文从 RocketMQ 事务消息的整体设计以及相关的源码的讲解,这篇通过订单生成、库存预扣减的简单例子来对事务消息的这块流程进行细粒化的业务设计,事务消息生产者的本地事务消息与补偿事务消息结合起来保证订单创建成功以后,库存才进行预扣减,希望这篇简单的 RocketMQ 事务消息实战博文能够帮助到您理解事务消息的实际应用,期待三连支持!

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

vnjohn

作者

vnjohn

后端研发工程师。喜欢探索新技术,空闲时也折腾 AIGC 等效率工具。 可以在 GitHub 关注我了解更多,也可以加我微信(vnjohn) 与我交流。