当前位置:首页 > 行业动态 > 正文

如何确保消息队列中收发消息的顺序一致性?

消息队列(Message Queue)是一种用于在分布式系统中传递消息的机制,它能够保证消息的收发顺序。

在消息队列(Message Queue,简称MQ)中,顺序消息的收发是确保业务操作按预期顺序执行的重要机制,以下是关于MQ顺序消息收发的详细解答:

如何确保消息队列中收发消息的顺序一致性?  第1张

基本概念

1、顺序消息:顺序消息是指消息队列中的消息按照发送的顺序被消费,这种机制对于需要严格保证操作顺序的业务场景非常重要,例如电商系统中的订单处理、支付流程等。

2、全局顺序和分区顺序

全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费。

实现机制

1. RocketMQ

Producer

RocketMQ通过Producer发送顺序消息时,会使用MessageQueueSelector指定消息发送到固定的MessageQueue,这样可以确保相同业务操作的消息被发送到同一个队列中,从而实现局部有序。

对于全局有序,需要将所有消息都发送到同一个MessageQueue中。

Consumer

RocketMQ的消费者会注册一个监听器,进行消息的拉取和消费处理,对于顺序消息,消费者会通过加锁机制来保证消息消费的顺序性。

Broker端通过对MessageQueue进行加锁,确保同一个MessageQueue只能被同一个Consumer进行消费。

2. Kafka

Kafka在同一个Partition内保障消息顺序,如果Topic存在多个Partition则无法确保全局顺序,为了保障全局顺序,需要控制Partition数量为1个。

3. RabbitMQ

RabbitMQ中的queue是有序的消息集合,消息以FIFO方式进行排队和出队列,要实现RabbitMQ的顺序消息,需要配置一个Queue对应一个Consumer,并关闭autoack,prefetchCount=1,每次只消费一条信息,处理过后进行手工ack。

示例代码

1. RocketMQ示例代码

// Producer示例
public class Producer {
    private final String TOPIC = "OrderTopic";
    private DefaultMQProducer producer;
    public Producer() throws MQClientException {
        producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
    }
    public void sendMessage(Order order) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
        Message message = new Message(TOPIC, "create_order", JSON.toJSONBytes(order));
        producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, order.getId());
    }
    public void shutdown() {
        producer.shutdown();
    }
}
// Consumer示例
public class Consumer {
    private final String TOPIC = "OrderTopic";
    private DefaultMQPushConsumer consumer;
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) > {
            for (MessageExt msg : msgs) {
                System.out.println("Received message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

2. HTTP协议下的Java SDK示例代码(RocketMQ)

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class OrderProducer {
    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                "${HTTP_ENDPOINT}",
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );
        final String topic = "${TOPIC}";
        final String instanceId = "${INSTANCE_ID}";
        MQProducer producer;
        if (instanceId != null && !instanceId.isEmpty()) {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }
        try {
            for (int i = 0; i < 8; i++) {
                TopicMessage pubMsg = new TopicMessage(
                        "hello mq!".getBytes(),
                        "A"
                );
                pubMsg.setShardingKey(String.valueOf(i % 2));
                pubMsg.getProperties().put("a", String.valueOf(i));
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId() + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        } finally {
            mqClient.shutdown();
        }
    }
}

注意事项

性能影响:顺序消息由于需要保证顺序性,可能会对性能产生影响,单个MessageQueue的消息量可能很大,而Consumer端消费时只能单线程消费,可能导致消息积压。

Broker故障:如果顺序消息所在的Broker发生故障,可能会导致消息顺序错乱。

配置要求:对于不同的MQ实现,顺序消息的配置要求也不同,Kafka需要控制Partition数量为1个,RabbitMQ需要配置普通queue并关闭autoack等。

希望以上信息能帮助您更好地理解MQ顺序消息的收发机制及其实现方法,如有更多问题,请随时提问。

0