Rabbit MQ动态创建多个队列和监听多线程处理
问题背景
最近公司有个订单同步和处理需求,A系统有新建订单,添加产品等一系列操作,需要同步到B系统,我们使用了Rabbit MQ中间件,A系统中在订单创建、修改等操作完成之后发送消息到Rabbit MQ,然后B系统消费消息,调用接口获取数据并把订单信息保存下来。
消息是有顺序的,要保证消息顺序消费,通常情况下一个队列的消费者只能有一个,否则不能保证顺序。
但是单个消费者的消费能力有限,因此需要增加消费者来处理消息。
可选方案
公司业务只要求同一个订单按顺序处理,因此要顺序消费并提升处理速度,必须把订单分到多个队列中,相同的订单要进入同一个订单队列(可以按照订单号取模等方式),这样既保证订单顺序,又保证不同的订单能并发处理。
方案 | 说明 | 最终选择 |
---|---|---|
启动多个服务进程 | 把队列名称配置在服务配置中,每个进程监听一个队列 配置不太方便,增加或减少消费者数量也不太方便 |
|
启动单个服务进程 | 配置多个队列和监听,多线程处理 有单点风险,不过配置很方便 |
√ |
最终我们选择的是启动一个服务进程,然后动态创建多个队列和监听的方式,也就是多个线程处理业务。
实现方式
项目基于Spring Boot,使用Spring来管理MQ队列、Exchange等,因此我们使用Spring来动态创建指定数量的监听和队列,数量可以通过配置文件配置,随着业务增长比较方便增加队列数量。
具体是通过实现BeanDefinitionRegistryPostProcessor来动态创建多个队列和多个监听,另外系统中有多个MQ队列,使用spring-multirabbit-lib开源库来处理(配置名称为:my-sync-mq)。
@Component
public class CustomSimpleMessageBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
/**
* 指定队列、Exchange等使用哪个Admin来声明
*/
private static final String ADMINS_THAT_SHOULD_DECLARE = "adminsThatShouldDeclare";
/**
* 系统中有多个消息队列服务器,目前用于同步的消息队列名称,参考spring-multirabbit-lib配置
*/
public static final String MY_SYNC_MQ_NAME = "my-sync-mq";
/**
* 配置信息,用于提前获取application.yml配置文件的配置信息
*/
private Environment environment;
/**
* 手动动态注册Exchange、Queue、MessageListener等为Spring管理的Bean
*
* @param registry
*/
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
// 从配置文件获取配置信息,包含队列名称和数量信息等
Integer queueCount = NumberUtils.toInt(environment.resolvePlaceholders("custom.queue.count"));
String queueName = environment.resolvePlaceholders("custom.queue.name");
String exchangeName = environment.resolvePlaceholders("custom.exchange.name");
String routeKey = environment.resolvePlaceholders("custom.route.key");
// 创建动态数量的监听
processMessageListeners(registry, queueCount, queueName);
// 使用同一个Exchange
processExchange(registry, exchangeName);
// 创建动态数量的队列和绑定
processQueueAndBindings(registry, queueCount, "customDynamic", exchangeName, queueName, routeKey);
}
/**
* 系统中有多个RabbitMQ,只在同步对接的MQ中声明队列<br>
* spring-multirabbit-lib中RabbitAdmin的Bean名字格式:<code>${connectionFactoryName}-admin</code>
*
* @return
*/
protected RuntimeBeanReference getDeclareAdmin() {
return new RuntimeBeanReference(MY_SYNC_MQ_NAME + MultiRabbitConstants.RABBIT_ADMIN_SUFFIX);
}
/**
* 队列名称规则:queueName,queueName1,queueName2
*
* @param queueName
* @param index
* @return
*/
protected String getIndexQueueName(String queueName, Integer index) {
return StringUtils.join(queueName, index == null || index == 0 ? null : index);
}
/**
* 处理Exchange Bean
*
* @param registry
* @param exchangeName
*/
protected void processExchange(BeanDefinitionRegistry registry, String exchangeName) {
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(DirectExchange.class);
beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(0, exchangeName);
beanDefinition.getPropertyValues().add(ADMINS_THAT_SHOULD_DECLARE, getDeclareAdmin());
registry.registerBeanDefinition("customDynamicSyncExchange", beanDefinition);
}
/**
* 处理队列和绑定Bean
*
* @param registry
* @param queueCount
* @param prefix
* @param exchangeName
* @param queueName
* @param routeKey
*/
protected void processQueueAndBindings(BeanDefinitionRegistry registry, Integer queueCount, String prefix, String exchangeName, String queueName, String routeKey) {
for (Integer i = 0; i < queueCount; i++) {
String calcQueueName = getIndexQueueName(queueName, i);
String calcRouteKey = getIndexQueueName(routeKey, i);
String queueBeanName = getIndexQueueName(prefix + "Queue", i);
String bindingBeanName = getIndexQueueName(prefix + "Binding", i);
// 注册队列Bean
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(Queue.class);
beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(0, calcQueueName);
beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(1, true);
beanDefinition.getPropertyValues().add(ADMINS_THAT_SHOULD_DECLARE, getDeclareAdmin());
registry.registerBeanDefinition(queueBeanName, beanDefinition);
// 注册Binding Bean
GenericBeanDefinition bindingBeanDefinition = new GenericBeanDefinition();
bindingBeanDefinition.setBeanClass(Binding.class);
bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(0, calcQueueName);
bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(2, exchangeName);
bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(3, calcRouteKey);
bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(4, new HashMap<>());
bindingBeanDefinition.getPropertyValues().add(ADMINS_THAT_SHOULD_DECLARE, getDeclareAdmin());
registry.registerBeanDefinition(bindingBeanName, bindingBeanDefinition);
}
}
/**
* 配置动态监听MessageListener
*
* @param registry
* @param queueCount
* @param customQueueName
*/
protected void processMessageListeners(BeanDefinitionRegistry registry, Integer queueCount, String customQueueName) {
boolean autoStartup = environment.getProperty("spring.multirabbitmq.connections.my-sync-mq.listener.simple.auto-startup", Boolean.class, false);
for (Integer i = 0; i < queueCount; i++) {
String queueName = getIndexQueueName(customQueueName, i);
String beanName = getIndexQueueName(StringUtils.uncapitalize(CustomSimpleMessageListenerContainer.class.getSimpleName()), i);
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(CustomSimpleMessageListenerContainer.class);
beanDefinition.getPropertyValues().addPropertyValue("queueNames", queueName);
beanDefinition.getPropertyValues().addPropertyValue("connectionFactory", new RuntimeBeanReference(ConnectionFactory.class));
beanDefinition.getPropertyValues().addPropertyValue("messageListener", new RuntimeBeanReference("customSyncMessagingListener"));
beanDefinition.getPropertyValues().addPropertyValue("autoStartup", autoStartup);
registry.registerBeanDefinition(beanName, beanDefinition);
}
}
/**
* 自定义消息监听,指定connectionFactory名称,有多个MQ服务器需要指定
*/
private static class CustomSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
@Override
protected String getRoutingLookupKey() {
return MY_SYNC_MQ_NAME;
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}
监听消息处理:
@Component
public class CustomSyncMessagingListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(CustomSyncMessagingListener.class);
public void doSyncOrder(String messageStr) {
logger.info("========{}", messageStr);
// 处理业务逻辑
}
@Override
public void onMessage(Message message) {
MessageProperties messageProperties = message.getMessageProperties();
logger.info("处理Queue[{}]...", messageProperties.getConsumerQueue());
String messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
doSyncOrder(messageStr);
}
}
目前已经上线并使用,提升消费端处理能力。