好,现在我们把订单状态变化消息要发送给所有关心订单状态的系统上去,实现方式就是用消息队列。
在这种业务下,我们最想要的是什么?
1.消息的顺序:对于同一笔订单来说,状态的变化都是有严格的先后顺序的。
2.吞吐量:像订单的业务,我们自然希望订单越多越好。订单越多,吞吐量就越大。
在这种情况下,我们先看看 RabbitMQ 是怎么做的。
首先,对于发消息,并广播给多个消费者这种情况,RabbitMQ 会为每个消费者建立一个对应的队列。也就是说,如果有 10 个消费者,RabbitMQ 会建立 10 个对应的队列。然后,当一条消息被发出后,RabbitMQ 会把这条消息复制 10 份放到这 10 个队列里。
当 RabbitMQ 把消息放入到对应的队列后,我们紧接着面临的问题就是,我们应该在系统内部启动多少线程去从消息队列中获取消息。
如果只是单线程去获取消息,那自然没有什么好说的。但是多线程情况,可能就会有问题了……
RabbitMQ 有这么个特性,它在官方文档就声明了自己是不保证多线程消费同一个队列的消息,一定保证顺序的。而不保证的原因,是因为多线程时,当一个线程消费消息报错的时候,RabbitMQ 会把消费失败的消息再入队,此时就可能出现乱序的情况。
T0 时刻,队列中有四条消息 A1、B1、B2、A2。其中 A1、A2 表示订单 A 的两个状态:待付款、已付款。B1、B2 也同理,是订单 B 的待付款、已付款。
到了 T1 时刻,消息 A1 被线程 1 收到,消息 B1 被线程 2 收到。此时,一切都还正常。
到了 T3 时刻,B1 消费出错了,同时呢,由于线程 1 处理速度快,又从消息队列中获取到了 B2。此时,问题开始出现。
到了 T4 时刻,由于 RabbitMQ 线程消费出错,可以把消息重新入队的特性,此时 B1 会被重新放到队列头部。所以,如果不凑巧,线程 1 获取到了 B1,就出现了乱序情况,B2 状态明明是 B1 的后续状态,却被提前处理了。
所以,可以看到了,这个场景用 RabbitMQ,出现了三个问题:
1.为了实现发布订阅功能,从而使用的消息复制,会降低性能并耗费更多资源
2.多个消费者无法严格保证消息顺序
3.大量的订单集中在一个队列,吞吐量受到了限制
那么 Kafka 怎么样呢?Kafka 正好在这三个问题上,表现的要比 RabbitMQ 要好得多。
首先,Kafka 的发布订阅并不会复制消息,因为 Kafka 的发布订阅就是消费者直接去获取被 Kafka 保存在日志文件中的消息就好。无论是多少消费者,他们只需要主动去找到消息在文件中的位置即可。
其次,Kafka 不会出现消费者出错后,把消息重新入队的现象。
最后,Kafka 可以对订单进行分区,把不同订单分到多个分区中保存,这样,吞吐量能更好。
所以,对于这个需求 Kafka 更合适。
首先,先看看 RabbitMQ 的,你会发现 RabbitMQ 是允许在消息中添加 routing_key 或者自定义消息头,然后通过一些特殊的 Exchange,很简单的就实现了消息匹配分发。开发几乎不用成本。
而 Kafka 呢?如果你要实现消息匹配,开发成本高多了。
首先,通过简单的配置去自动匹配和分发到合适的消费者端这件事是不可能的。
其次,消费者端必须先把所有消息不管需要不需要,都取出来。然后,再根据业务需求,自己去实现各种精准和模糊匹配。可能因为过度的复杂性,还要引入规则引擎。
这个场景下 RabbitMQ 扳回一分。
RabbitMQ 和 Kafka 消息队列如何选?
先看下 RabbitMQ 的。
RabbitMQ 的消息自带手表,消息中有个 TTL 字段,可以设置消息在 RabbitMQ 中的存放的时间,超时了会被移送到一个叫死信队列的地方。
所以,延迟队列 RabbitMQ 最简单的实现方式就是设置 TTL,然后一个消费者去监听死信队列。当消息超时了,监听死信队列的消费者就收到消息了。
不过,这样做有个大问题:假设,我们先往队列放入一条过期时间是 10 秒的 A 消息,再放入一条过期时间是 5 秒的 B 消息。 那么问题来了,B 消息会先于 A 消息进入死信队列吗?
答案是否定的。B 消息会优先遵守队列的先进先出规则,在 A 消息过期后,和其一起进入死信队列被消费者消费。
在 RabbitMQ 的 3.5.8 版本以后,官方推荐的 rabbitmq delayed message exchange 插件可以解决这个问题。
·用了这个插件,我们在发送消息的时候,把消息发往一个特殊的 Exchange。
·同时,在消息头里指定要延迟的时间。
·收到消息的 Exchange 并不会立即把消息放到队列里,而是在消息延迟时间到达后,才会把消息放入。
再看下 Kafka 的:
Kafka 要实现延迟队列就很麻烦了。
·你先需要把消息先放入一个临时的 topic。
·然后得自己开发一个做中转的消费者。让这个中间的消费者先去把消息从这个临时的 topic 取出来。
·取出来,这消息还不能马上处理啊,因为没到时间呢。也没法保存在自己的内存里,怕崩溃了,消息没了。所以,就得把没有到时间的消息存入到数据库里。
·存入数据库中的消息需要在时间到了之后再放入到 Kafka 里,以便真正的消费者去执行真正的业务逻辑。
想想就已经头大了,这都快搞成调度平台了。再高级点,还要用时间轮算法才能更好更准确。
这次,RabbitMQ 上那一条条戴手表的消息,才是最好的选择。
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack