写在前面
上篇文章介绍了 分布式一致性那些事儿,本文将详细讲解一下本地事务表的实现原理
原理
本地消息表方案最初是ebay提出的,其实也是BASE理论的应用,属于可靠消息最终一致性的范畴。这里以支付服务和会计服务为例展开介绍本地消息表方案,大概流程是这样子:用户在支付服务完成了支付订单支付成功后,此时会调用会计服务的接口生成一条原始的会计凭证到数据库中,如图1所示。这里必须明确:支付服务处理完订单支付等逻辑后,此时若直接调用会计服务生成会计凭证数据的接口肯定会遇到分布式事务的问题。
因为用户完成支付后,此时得立马给用户一个支付的反馈,要做的就是提醒用户支付成功。因为会计服务生成的会计凭证保存到数据库的过程中可以对用户透明,用户也无需知道有这么一个流程,为了提高响应速度和解耦,因此可以引入mq来做到异步生成会计凭证,即用户完成支付订单支付后,此时可以将消息投递到mq中,然后会计服务再去监听mq消息去处理消费逻辑。
在支付服务和会计服务之间引入mq后,此时又引入了新的问题。大概列举如下:
- 1,若支付服务完成支付逻辑后,在投递消息到mq中间件的过程中由于网络抖动等原因,没有投递到mq中导致消息丢失了怎么办?
- 2,mq接收到消息后,由于内部原因导致消息丢失了怎么办?
- 3,会计服务在监听消息的过程中,由于网络原因没有接收到消息或消费过程中遇到异常,此时也会导致消息丢失,测试怎么办?
经过以上分析,mq可能会丢失消息,传统的mq没有实现分布式事务(注意rocketmq的某些版本有实现分布式事务功能),因此这里可以引入本地消息表结合mq的方式来解决分布式事务的问题,保证消息的可靠投递。
正向流程步骤大概如下:
- 1)在支付库中引入一张消息表来记录支付消息,即用户支付成功后同时往这张消息表插入一条支付成功的消息,状态为“发送中”。注意支付逻辑和插入消息表的代码要包裹在一个事务里面,这里保证了本地事务的强一致性。即支付逻辑和插入消息表的消息组成了一个强一致性的事务,要么同时成功,要么同时失败。
- 2)完成 1)步的逻辑后,此时再向mq的PAY_QUEUE队列中投递一条支付消息,这条支付消息的内容跟保存在支付库消息表的消息内容一致。
- 3)mq接收到消息后,此时会计服务也监听到这条消息了,此时会计服务处理消费逻辑即开始生成会计凭证。
- 4)会计凭证生成后,再反向向mq投递一条消费成功的消息到ACC_QUEUE队列
- 5)同时支付服务又来监听这个会计服务消费成功的消息,当支付服务监听到这个消费成功的消息后,此时再将本地消息表的消息状态改为“已发送”。
- 6)经过前面5步后,整个业务就已经完成了。
本地事务表是如何工作的
此时可能你会有个疑问:用户支付成功后,若消息在投递过程中丢失了就丢失了,会计服务那边也消费不到了,此时同样也会造成支付服务(生产者)和会计服务(消费者)之间的数据不一致。
之前增加的本地消息表好像也没起作用啊?
那此时怎么办呢?如何来解决消息丢失的问题,做到消息的可靠投递呢?
其实解决方案就是消息重复投递,但消费者的消费接口要实现幂等性。
怎么来让消息重复投递呢?此时本地消息表就派上用场了,刚才我们在支付库中新增加了一张本地消息表,即支付等逻辑处理成功,这张本地消息表也会记录一条记录,此时的消息状态是“发送中”。若第一次生产者投递的消息丢失后,此时我们只要将这张本地消息表状态为“发送中”的消息重新投递即可,直到消费者消费成功为止,消费者消费成功后将这条消息的状态改为“已发送”即可。
因此为了能将丢失后的消息重发,此时我们引入一个定时任务好了,暂且叫它“消息恢复系统”吧,如下图所示。这个消息恢复系统就是每隔一段时间去本地消息表中捞取状态为“发送中”的消息,然后重新投递到mq中间件中,然后消费者就会重新消费了。若消费者已经消费过了,此时就不再处理消费业务逻辑,直接反向投递一条消费成功的消息到mq中,此时原来的生产者此时也会监听这条消费成功的消息,将本地消息表的消息状态改为“已发送”,此时消息恢复系统就不会再去捞取这条状态为“已发送”的消息,然后进行重新投递了。
此时若消息丢失后且消息恢复系统在重新投递过程中,也可能会再次投递失败。此时我们一般会指定最大重试次数,重试间隔时间根据重试次数而线性增长。若达到最大重试次数后,同时记录日志,我们可以根据记录的日志来通过邮件或短信来发送告警通知,接收到告警通知后及时介入人工处理即可。
消息重复怎么办
如果消息重复发送了,或者消费者在消费消息后发送的ack消息失败了,导致消息重复发送,会不会有问题?
是有问题的,这个时候就要求消费者那边需要保证消息的幂等性。我们为每一条消息生成一个唯一id,即使在消息重复发送的时候也确保id是相同的。
消费者消费某条消息后记录下唯一id,每次接收到消息,比对一下唯一id,如何已经处理过,则直接ack。
缺点
本地事务表也有缺点
- 1)本地消息表与业务耦合在一起,难于做成通用性,不可独立伸缩。
- 2)本地消息表是基于数据库来做的,而数据库是要读写磁盘IO的,因此在高并发下是有性能瓶颈的
参考
[1] 分布式一致性那些事儿
[2] 基于本地消息表的分布式事务解决方案