Rabbit MQ动态创建多个队列和监听多线程处理

Rabbit MQ动态创建多个队列和监听多线程处理

问题背景

最近公司有个订单同步和处理需求,A系统有新建订单,添加产品等一系列操作,需要同步到B系统,我们使用了Rabbit MQ中间件,A系统中在订单创建、修改等操作完成之后发送消息到Rabbit MQ,然后B系统消费消息,调用接口获取数据并把订单信息保存下来。

消息是有顺序的,要保证消息顺序消费,通常情况下一个队列的消费者只能有一个,否则不能保证顺序。

但是单个消费者的消费能力有限,因此需要增加消费者来处理消息。

可选方案

公司业务只要求同一个订单按顺序处理,因此要顺序消费并提升处理速度,必须把订单分到多个队列中,相同的订单要进入同一个订单队列(可以按照订单号取模等方式),这样既保证订单顺序,又保证不同的订单能并发处理。

方案 说明 最终选择
启动多个服务进程 把队列名称配置在服务配置中,每个进程监听一个队列
配置不太方便,增加或减少消费者数量也不太方便
启动单个服务进程 配置多个队列和监听,多线程处理
有单点风险,不过配置很方便

最终我们选择的是启动一个服务进程,然后动态创建多个队列和监听的方式,也就是多个线程处理业务。

实现方式

项目基于Spring Boot,使用Spring来管理MQ队列、Exchange等,因此我们使用Spring来动态创建指定数量的监听和队列,数量可以通过配置文件配置,随着业务增长比较方便增加队列数量。

具体是通过实现BeanDefinitionRegistryPostProcessor来动态创建多个队列和多个监听,另外系统中有多个MQ队列,使用spring-multirabbit-lib开源库来处理(配置名称为:my-sync-mq)。

@Component
public class CustomSimpleMessageBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    /**
     * 指定队列、Exchange等使用哪个Admin来声明
     */
    private static final String ADMINS_THAT_SHOULD_DECLARE = "adminsThatShouldDeclare";
    /**
     * 系统中有多个消息队列服务器,目前用于同步的消息队列名称,参考spring-multirabbit-lib配置
     */
    public static final String MY_SYNC_MQ_NAME = "my-sync-mq";
    /**
     * 配置信息,用于提前获取application.yml配置文件的配置信息
     */
    private Environment environment;
    /**
     * 手动动态注册Exchange、Queue、MessageListener等为Spring管理的Bean
     *
     * @param registry
     */
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
        // 从配置文件获取配置信息,包含队列名称和数量信息等
        Integer queueCount = NumberUtils.toInt(environment.resolvePlaceholders("custom.queue.count"));
        String queueName = environment.resolvePlaceholders("custom.queue.name");
        String exchangeName = environment.resolvePlaceholders("custom.exchange.name");
        String routeKey = environment.resolvePlaceholders("custom.route.key");
        // 创建动态数量的监听
        processMessageListeners(registry, queueCount, queueName);
        // 使用同一个Exchange
        processExchange(registry, exchangeName);
        // 创建动态数量的队列和绑定
        processQueueAndBindings(registry, queueCount, "customDynamic", exchangeName, queueName, routeKey);
    }
    /**
     * 系统中有多个RabbitMQ,只在同步对接的MQ中声明队列<br>
     * spring-multirabbit-lib中RabbitAdmin的Bean名字格式:<code>${connectionFactoryName}-admin</code>
     *
     * @return
     */
    protected RuntimeBeanReference getDeclareAdmin() {
        return new RuntimeBeanReference(MY_SYNC_MQ_NAME + MultiRabbitConstants.RABBIT_ADMIN_SUFFIX);
    }
     /**
     * 队列名称规则:queueName,queueName1,queueName2
     *
     * @param queueName
     * @param index
     * @return
     */
    protected String getIndexQueueName(String queueName, Integer index) {
        return StringUtils.join(queueName, index == null || index == 0 ? null : index);
    }
    /**
     * 处理Exchange Bean
     *
     * @param registry
     * @param exchangeName
     */
    protected void processExchange(BeanDefinitionRegistry registry, String exchangeName) {
        GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
        beanDefinition.setBeanClass(DirectExchange.class);
        beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(0, exchangeName);
        beanDefinition.getPropertyValues().add(ADMINS_THAT_SHOULD_DECLARE, getDeclareAdmin());
        registry.registerBeanDefinition("customDynamicSyncExchange", beanDefinition);
    }
    /**
     * 处理队列和绑定Bean
     *
     * @param registry
     * @param queueCount
     * @param prefix
     * @param exchangeName
     * @param queueName
     * @param routeKey
     */
    protected void processQueueAndBindings(BeanDefinitionRegistry registry, Integer queueCount, String prefix, String exchangeName, String queueName, String routeKey) {
        for (Integer i = 0; i < queueCount; i++) {
            String calcQueueName = getIndexQueueName(queueName, i);
            String calcRouteKey = getIndexQueueName(routeKey, i);
            String queueBeanName = getIndexQueueName(prefix + "Queue", i);
            String bindingBeanName = getIndexQueueName(prefix + "Binding", i);
            // 注册队列Bean
            GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
            beanDefinition.setBeanClass(Queue.class);
            beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(0, calcQueueName);
            beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(1, true);
            beanDefinition.getPropertyValues().add(ADMINS_THAT_SHOULD_DECLARE, getDeclareAdmin());
            registry.registerBeanDefinition(queueBeanName, beanDefinition);
            // 注册Binding Bean
            GenericBeanDefinition bindingBeanDefinition = new GenericBeanDefinition();
            bindingBeanDefinition.setBeanClass(Binding.class);
            bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(0, calcQueueName);
            bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
            bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(2, exchangeName);
            bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(3, calcRouteKey);
            bindingBeanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(4, new HashMap<>());
            bindingBeanDefinition.getPropertyValues().add(ADMINS_THAT_SHOULD_DECLARE, getDeclareAdmin());
            registry.registerBeanDefinition(bindingBeanName, bindingBeanDefinition);
        }
    }
    /**
     * 配置动态监听MessageListener
     *
     * @param registry
     * @param queueCount
     * @param customQueueName
     */
    protected void processMessageListeners(BeanDefinitionRegistry registry, Integer queueCount, String customQueueName) {
        boolean autoStartup = environment.getProperty("spring.multirabbitmq.connections.my-sync-mq.listener.simple.auto-startup", Boolean.class, false);
        for (Integer i = 0; i < queueCount; i++) {
            String queueName = getIndexQueueName(customQueueName, i);
            String beanName = getIndexQueueName(StringUtils.uncapitalize(CustomSimpleMessageListenerContainer.class.getSimpleName()), i);
            GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
            beanDefinition.setBeanClass(CustomSimpleMessageListenerContainer.class);
            beanDefinition.getPropertyValues().addPropertyValue("queueNames", queueName);
            beanDefinition.getPropertyValues().addPropertyValue("connectionFactory", new RuntimeBeanReference(ConnectionFactory.class));
            beanDefinition.getPropertyValues().addPropertyValue("messageListener", new RuntimeBeanReference("customSyncMessagingListener"));
            beanDefinition.getPropertyValues().addPropertyValue("autoStartup", autoStartup);
            registry.registerBeanDefinition(beanName, beanDefinition);
        }
    }
    /**
     * 自定义消息监听,指定connectionFactory名称,有多个MQ服务器需要指定
     */
    private static class CustomSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
        @Override
        protected String getRoutingLookupKey() {
            return MY_SYNC_MQ_NAME;
        }
    }
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {}
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

监听消息处理:

@Component
public class CustomSyncMessagingListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(CustomSyncMessagingListener.class);

    public void doSyncOrder(String messageStr) {
        logger.info("========{}", messageStr);
        // 处理业务逻辑
    }

    @Override
    public void onMessage(Message message) {
        MessageProperties messageProperties = message.getMessageProperties();
        logger.info("处理Queue[{}]...", messageProperties.getConsumerQueue());
        String messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
        doSyncOrder(messageStr);
    }
}

目前已经上线并使用,提升消费端处理能力。

上一篇
下一篇