消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?


这四个问题是面试中遇到过最多的,今天就来详细解答一下

如何保证消息不丢失

就我们市面上常见的消息队列而言,只要配置得当,我们的消息就不会丢。

先来看看这个图

可以看到一共有三个阶段,分别是生产消息、存储消息和消费消息。我们从这三个阶段分别入手来看看如何确保消息不会丢失。

生产消息

生产者发送消息至Broker,需要处理Broker的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch,妥善的处理响应,如果Broker返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。

这样就能保证在生产消息阶段消息不会丢失。

存储消息

存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。

如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢(假如怕两台都挂了..那就再多些)。

消费消息

这里经常会有同学犯错,有些同学当消费者拿到消息之后直接存入内存队列中就直接返回给Broker消费成功,这是不对的。

你需要考虑拿到消息放在内存之后消费者就宕机了怎么办。所以我们应该在消费者真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。

所以只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。

小结一下

可以看出,保证消息的可靠性需要三方配合。

  • 生产者需要处理好Broker的响应,出错情况下利用重试、报警等手段。
  • Broker需要控制响应的时机,单机情况下是消息刷盘后返回响应,集群多副本情况下,即发送至两个副本及以上的情况下再返回响应。
  • 消费者需要在执行完真正的业务逻辑之后再返回响应给Broker。

但是要注意消息可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。

如何处理重复消息

下面从Producer,Broker,Consumer三个方面分析为什么产生消息重复

Producer

生产者在同步发送消息的时候,Broker收到消息给生产者发送ack的时候可能会因为网络问题而丢失,导致生产者重复发送消息

Broker

Consumer

消费者收到消息在处理过程中可能会宕机导致没有及时上报偏移量,导致所有partition重新分配,就导致消息重复消费了。

消息重复了如何处理呢?根据分析上面消息重复产生的原因可知消息重复是不可避免的,那只能从消费者端来处理消息重复问题了。

  • 幂等性

幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。

这需要开发者在写代码的时候处理。

  • 根据消息id去重

每个消息生成一个唯一id,每次处理成功后将消息id保存到redis中,每次接收到消息,从reids中判断消息是否处理过。

这种方法不能彻底解决消息重复问题,但是能解决大部分消息重复问题。

如何确保消息有序性

全局有序

如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!

不过一般情况下我们都不需要全局有序,即使是同步MySQL Binlog也只需要保证单表消息有序即可。

部分有序

因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

如何处理消息堆积处理

消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。

因此我们需要先定位消费慢的原因,如果是bug则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。

假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。

当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提高的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。

参考

[1]消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?

br>


文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录