05 如何确保消息不会丢失?
你好,我是李玥。这节课我们来聊聊丢消息的事儿。
对于刚刚接触消息队列的同学,最常遇到的问题,也是最头痛的问题就是丢消息了。对于大部分业务系统来说,丢消息意味着数据丢失,是完全无法接受的。
其实,现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。
绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。虽然不同的消息队列提供的API不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。
这节课我们就来讲一下,消息队列是怎么保证消息可靠传递的,这里面的实现原理是怎么样的。当你熟知原理以后,无论你使用任何一种消息队列,再简单看一下它的API和相关配置项,就能很快知道该如何配置消息队列,写出可靠的代码,避免消息丢失。
检测消息丢失的方法
我们说,用消息队列最尴尬的情况不是丢消息,而是消息丢了还不知道。一般而言,一个新的系统刚刚上线,各方面都不太稳定,需要一个磨合期,这个时候,特别需要监控到你的系统中是否有消息丢失的情况。
如果是IT基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。
我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在Producer端,我们给每个发出的消息附加一个连续递增的序号,然后在Consumer端来检查这个序号的连续性。
如果没有消息丢失,Consumer收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号+1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在Producer发送消息之前的拦截器中将序号注入到消息中,在Consumer收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。
首先,像Kafka和RocketMQ这样的消息队列,它是不保证在Topic上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
如果你的系统中Producer是多实例的,由于并不好协调多个Producer之间的发送顺序,所以也需要每个Producer分别生成各自的消息序号,并且需要附加上Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。
Consumer实例的数量最好和分区数量一致,做到Consumer和分区一一对应,这样会比较方便地在Consumer内检测消息序号的连续性。
确保消息可靠传递
讲完了检测消息丢失的方法,接下来我们一起来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。
你可以看下这个图,一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。
- 生产阶段: 在这个阶段,从消息在Producer创建出来,经过网络传输发送到Broker端。
- 存储阶段: 在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- 消费阶段: 在这个阶段,Consumer从Broker上拉取消息,经过网络传输发送到Consumer上。
1. 生产阶段
在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以Kafka为例,我们看一下如何可靠地发送消息:
同步发送时,只要注意捕获异常即可。
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功。");
} catch (Throwable e) {
System.out.println("消息发送失败!");
System.out.println(e);
}
异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功。");
} else {
System.out.println("消息发送失败!");
System.out.println(exception);
}
});
2. 存储阶段
在存储阶段正常情况下,只要Broker在正常运行,就不会出现丢失消息的问题,但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。
对于单个节点的Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在RocketMQ中,需要将刷盘方式flushDiskType配置为SYNC_FLUSH同步刷盘。
如果是Broker是由多个节点组成的集群,需要将Broker集群配置成:至少将消息发送到2个以上的节点,再给客户端回复发送确认响应。这样当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发生消息丢失。后面我会专门安排一节课,来讲解在集群模式下,消息队列是如何通过消息复制来确保消息的可靠性的。
3. 消费阶段
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从Broker拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
同样,我们以用Python语言消费RabbitMQ消息为例,来看一下如何实现一段可靠的消费代码:
def callback(ch, method, properties, body):
print(" [x] 收到消息 %r" % body)
# 在这儿处理收到的消息
database.save(body)
print(" [x] 消费完成")
# 完成消费业务逻辑后发送消费确认响应
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
你可以看到,在消费的回调方法callback中,正确的顺序是,先是把消息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。
小结
这节课我带大家分析了一条消息从发送到消费整个流程中,消息队列是如何确保消息的可靠性,不会丢失的。这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息队列的可靠性机制,确保消息不会丢失。
- 在生产阶段,你需要捕获消息发送的错误,并重发消息。
- 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个Broker宕机或者磁盘损坏而丢失。
- 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。
你在理解了这几个阶段的原理后,如果再出现丢消息的情况,应该可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步深入分析,快速找到问题原因。
思考题
我刚刚讲到,如果消息在网络传输过程中发送错误,由于发送方收不到确认,会通过重发来保证消息不丢失。但是,如果确认响应在网络传输时丢失,也会导致重发消息。也就是说,无论是Broker还是Consumer都是有可能收到重复消息的,那我们在编写消费代码时,就需要考虑这种情况,你可以想一下,在消费消息的代码中,该如何处理这种重复消息,才不会影响业务逻辑的正确性?欢迎在留言区与我分享讨论。
感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给你的朋友。
- 钱 👍(124) 💬(4)
如何确保消息不会丢失 1:WAL 2:分布式WAL 除非地球爆炸,否则问题不大。 猜测各种消息队列或者数据库,确保消息不丢只能这么玩,就连人也一样,脑袋记不住那就写下来,怕本子弄潮啦!那就光盘、U盘、磁盘、布头、木头、石头北京、上海、深圳都各写一份。 consumer接到重复消息,那就业务去重,怎么去? 1:业务处理逻辑本身就是幂等的,那天然就去掉了 2:业务处理逻辑非幂等,那就消息先去重,根据业务ID(标识消息唯一性的就行),去查询是否消费过此消息了,消费了,则抛弃,否则就消费
2019-08-20 - ly 👍(72) 💬(5)
老师,我有几个理解: 当produer发送消息给blocker的时候(send方法),此方法会在blocker收到消息并正常储存后才返回,此期间应该会阻塞,也就是如果blocker配置同步刷盘,可能会增加调用时间(只能出现对消息敏感的场景)。 另外拉消息的时候,消费者A进行pull后,没有返回确认给blocker就挂了(或者因代码问题导致一直阻塞),这时消息应该还在blocker的,消费者B如果此时pull消息,是否会拉取到刚刚那条给消费者A的消息?衍生的疑问就是两个消费者先后去拉消息是否能拉到同一条消息(在前者未给blocker发确认的前提下)。 对于消费者处理重复消息的问题:一般消息中都会存在一个唯一性的东西,不管是消息队列本身的msgId还是业务订单号之类的,可以在db中存在一个消费表,对这个唯一性东西建立唯一索引,每次处理消费者逻辑之前先insert进去,让数据库来帮我们排重我觉得是最保险的。
2019-08-02 - 王立光 👍(62) 💬(5)
假如消费时由于某种原因,一直没发ack。rocketmq是不是会一直发这条消息,这样导致下面消息都无法被消费?
2019-08-15 - 南山 👍(34) 💬(2)
老师,再看一次这个的时候有个疑问: 1,broker给producer发送ack,网络原因或者其他原因producer没收到,这种情况broker这条消息怎么办? 2,broker给producer发送ack,broker怎么知道producer有没有成功收到这条ack信息呢?
2019-10-19 - 陈天柱 👍(27) 💬(5)
老师您好,我最近处理一个分布式事务场景,就是支付成功以后回调处理刷新订单状态和刷新优惠券状态的分布式事务问题。我有一个疑问就是,假如刷新订单状态失败,就没有发消息到队列中,而是在刷新订单状态成功了以后,再发消息到队列中,只要保证消息不丢的前提下,分布式事务能得到保证吗?换言之,就是消息不丢的话,可以保证最终一致性,事务消息的回查也能保证最终一致性,两者的概念感觉有一些交集。所以希望老师能抽空帮忙分析一下,万分感谢🙏
2019-10-19 - QQ怪 👍(16) 💬(1)
建议老师加餐如何做幂等性
2019-08-01 - 皮蛋 👍(15) 💬(4)
rocketmq默认失败重试次数是2,如果2次均失败,或者说重试次数都失败了,这种情况一般在实际生产中是怎么处理的?
2019-08-04 - angel😇txy🤓 👍(14) 💬(7)
思考题,从两个环节展开,生产端到broker,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是: (1)全局唯一 (2)MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽 有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。 broker到consumer,由消费端做好幂等性,比如根据业务流水号去重
2019-08-20 - 游弋云端 👍(11) 💬(1)
1、消费端支持幂等操作,业务上一般有难度; 2、消费端增加去冗余机制,例如缓存最新消费成功的N条消息的SN,收到消息后,先确认是否是消费过的消息,如果是,直接应该ACK,并放弃消费。
2019-08-01 - TH 👍(10) 💬(1)
幂等性是一种办法,如果做不到幂等性,那么在消费端需要存储消费的消息ID,关键这个ID什么时候存?如果是消费前就存,那么消费失败了,下次消费同样的消息,是否会认为上次已经成功了?如果在消费成功后再存,那么消费会不会出现部分成功的情况?除非满足事务ACID特性。 关于消息丢失检查还有一点疑问:如果靠ID连续性来检查,是不是说一个producer只能对应一个consumer?
2019-08-01 - Better me 👍(9) 💬(3)
对于思考题,我认为也可以像老师说的那样查看消息是否丢失的方法,如果Producer的某条消息ack相应因为网络故障丢失,那么Producer此时重发消息的唯一标识应该和之前那条消息是一样的,那么只需要在Consumer接受消息前判断是否有相同标识的消息,如果有则拦截。还可以在消费端业务逻辑接口中做幂等判断,前面那种可以做到不侵入到业务代码中,老师看看有没有什么问题
2019-08-01 - 史双龙 👍(8) 💬(1)
玥哥好,我jio着只要在消费端做好幂等就可以,业务借口最好都要做幂等性校验,
2019-08-01 - 凌空飞起的剪刀腿 👍(6) 💬(1)
李老师您好,我想知道订阅者怎么知道broker有他订阅的消息的,不能一直轮询吧?
2020-03-20 - 又双叒叕是一年啊 👍(6) 💬(1)
老师您好我看了这篇文章有几个疑惑希望您帮忙解答下: 1.kafka 和 rocketmq 都不能保证全局topic的顺序性,只能保证 partition 分区的消息顺序性,那我需要保证全局唯一性的场景要如何处理比较好,都有哪些方式可以保证消息的全局顺序性? 文章里您说 可以在 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。 这个句话的意识是说 在 producer 生产消息时要生成一个 全局有序递增的消息id? 然后再 consumer消费时 通过这个id进行消费控制? 因为一个 consumer group 可能包含 多个 consumer ,这个要怎么控制分布式环境下多个consumer消费的顺序呢? 还有consumer也可能发送 reblance操作导致消费分区发生变化 2.确保消息可靠传递方面 #1.如果 存储阶段 发生极端情况: broker 在收到 消息后还未来得及写入磁盘就发生 broker 掉电宕机情况, 这个消息会丢失? 还是producer端先进行自动重试处理,broker都挂了,重试肯定会超时失败,如果达到producer端重试最大次数还是失败,才会向producer端抛出异常 而 producer端 可以捕获这个异常 进行业务的补偿重试 或 降级 保证消息不丢 (入队列 or redis or 入库) , 再进行 后续处理。是这样? 不知道我理解的是否正确 提个建议: 希望老师把关键参数 列一下 或者 给个参考链接 我们去看下不熟悉现找确实比较费劲 不同版本差异也比较大 #2.对于消费阶段 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认-> 这个是要求在生产阶段都采用 手动 提交方式处理? consumer 每次pull 过程中自动提交会有问题? 比如 文章中例子,在自动提交情况下 保存db出异常,在下次 拉取消息时会自动提交刚才的消息,所以才要求手动提交消费进度?
2019-08-31 - 虚竹 👍(4) 💬(3)
老师好,关于消息丢失问题,不止发mq,也可能是发es、钉钉、短信、邮件等,如果需要每个业务自己保证消息发出,是比较困难的, 在考虑应该以微服务的形式提供这些发消息的接口,微服务内部保证消息发送成功,可以通过时间递减等方式重试,一直不成功则 持久化存储提醒人工操作,这里还可以加一些监控阈值报警异常发送情况,同时这里需要支持web搜索查看某个消息是否发送成功或失败,因为涉及到业务判罚,业务人员比较重视, 想问下目前业界是否有开源的相关的实现? 京东内部是如何处理这个问题的? 另外我的思路是否有可行? 期待老师帮忙解惑~
2019-12-24