基于Spring的RabbitTemplate实现消息事务
分布式系统中常见一种情况,就是数据库操作成功之后发送MQ消息。
分布式消息常见问题
数据库操作之后发送MQ消息通常会遇到一些问题,理论上消息和事务要同时成功才算一个完整的事务,那到底该把发送MQ放到数据库事务之外还是数据库事务之内?
下面分析下可能存在的问题:
- 消息放到数据库事务之内
- 事务处理异常,回滚事务——ok
- 消息发送异常,回滚事务——ok
- 消息发送成功提交事务——ok
- 消息发送成功提交失败——不ok,不好处理,一般MQ也不能撤销消息,而且消费端可能已经在处理了
- 消息放到数据库事务之外
- 事务处理异常,回滚事务——ok,不用发消息
- 事务处理成功,发送消息成功——ok
- 事务处理成功,消息发送失败——不ok,消息丢失
- 在2的基础上增加本地消息表,放到同一个数据库,业务操作完成之后把需要发送的MQ消息插入本地消息表中
- 事务处理异常,回滚事务——ok,不用发消息,消息表也回滚
- 事务处理成功,发送消息成功
- 更新消息表状态成功——ok
- 更新消息表状态失败——ok(定时任务补偿)
- 事务处理成功,消息发送失败——ok(定时任务补偿)
- 通过定时扫描失败消息重新发送MQ
- 重发消息需保证幂等性——ok
分布式消息事务处理
常见的处理逻辑是本地消息表+消息重试补偿
RabbitTemplate配置和使用
我们使用RabbitMQ作为消息队列,因此我们可以使用spring-rabbit帮助实现mq发送(前提是已经安装了RabbitMQ了)。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-amqp.version}</version>
</dependency>
application.yml配置:
spring:
rabbitmq:
virtual-host: /
username: appws
password: xxxxxx
addresses: 10.181.57.239:5672
publisherConfirms: true
然后就可以注入RabbitTemplate
了,代码片段如下:
// 消息队列配置
public static final String TEST_EXCHANGE = "test.exchange";
public static final String TEST_QUEUE = "test.queue";
public static final String TEST_ROUTEKEY = "test.routekey";
@Bean
public Exchange testExchange() {
return new TopicExchange(TEST_EXCHANGE);
}
@Bean
public Queue testQueue() {
return new Queue(TEST_QUEUE, true);
}
@Bean
public Binding testBinding(@Qualifier("testQueue") Queue queue, @Qualifier("testExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(TEST_ROUTEKEY).noargs();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void test(){
// 一些DB操作
rabbitTemplate.convertAndSend(TEST_EXCHANGE, TEST_ROUTEKEY, param);
// 其他DB操作等
}
拦截器处理RabbitTemplate事务
从上面的代码片段可以看出,业务方法使用了@Transactional
注解使用了事务之后,rabbitTemplate.convertAndSend
方法并没有专门放到事务之外,这个时候一旦有异常,可能造成消息发送成功,但是事务异常回滚的问题。要解决这个问题,需要把rabbitTemplate.convertAndSend
移动到事务之外,但是通常都配置的声明式事务,不能简单的把代码移动到外面,这个需要利用Spring事务的一个特性TransactionSynchronization
,注册一个同步钩子,自动把相关代码放到事务完成之后执行,我们使用拦截器拦截rabbitTemplate.convertAndSend
方法,实现不用修改现有代码自动把发送MQ消息逻辑移到事务之外:
RabbitTemplateTransactionInterceptor.java
代码详情:
@Aspect
@Order(50)
@Component
public class RabbitTemplateTransactionInterceptor {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(RabbitTemplateTransactionInterceptor.class);
/**
* 代理convertAndSend方法够用
*/
@Pointcut("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object))")
public void convertAndSend() {
// noop
}
@Around("convertAndSend()")
public void aroundMethod(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
if (TransactionSynchronizationManager.isSynchronizationActive()
&& TransactionSynchronizationManager.isActualTransactionActive() // 事务开启判断
&& args.length == 3) {
logger.info("拦截RabbitTemplate发送:{}", args);
// 注册同步器
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() { // 事务提交之后执行
try {
joinPoint.proceed();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
});
} else { // 没有开启事务或者参数不正确就直接执行,不处理
joinPoint.proceed();
}
}
}
注:定时扫描和消息重试在另外的逻辑中。