架構師必會-基于消息的分布式事務實現方案
當前位置:點晴教程→知識管理交流
→『 技術文檔交流 』
1. 前言業務系統之間通過MQ進行交互時,怎么保證發送的消息對方一定能收到,可能有人說RocketMQ就能做到,如果貴公司用到的消息隊列是kafka、rabbitmq、activemq實現 這里分享一下基于消息的分布式事務解決方案,此種方案是最終一致性的解決方案,不挑MQ,但是前提MQ本身要支持接收到的消息不能丟失。 2. MQ的配置建議如果要保證MQ接收到的消息不丟,就要配置相關的同步策略或者刷盤策略 主從同步策略 建議主從同步建議設置為主從同步策略為主從同步完再響應,這樣單個節點如果掛了,另一個節點的數據還會存在 刷盤策略 消息中間件為了提高效率,默認接收到消息不會立即刷盤,如果要主從同步策略是主節點接收到消息以后立即響應,這會正好主節點宕機,就會導致消息丟失,所以要特別注意下,雖然可以設置成同步刷盤,但是效率就會降低,所以還是建議設置主從同步策略 3. 生產方設計生產者的職責是必須要保證本地事務提交成功消息一定要發送出去,或者業務處理失敗就不發送。 3.1 消息持久化生產方方案如下,首先需要在業務庫中創建一張表,字段大致為:
與本地業務表使用同一個事務,提交則一起提交,回滾則一起回滾,因為使用的同一個事務所以是強一致的,再事務提交以后進行消息數據的發送,發送成功以后則更改消息狀態為已發送,具體流程請查看圖1 這里可能還有一點還要考慮,就是在圖1的第2步、第3步、第4步會出現失敗,具體描述如下:
持久化相關代碼 圖4,集成Spring的事務管理器,重寫事務提交后發送消息 3.2 消息補償設計以上這三個問題就需要引入補償任務來處理了,具體查看圖5,補償任務會根據發送狀態查詢對應的數據,然后進行發送,這里有一點特別注意,消費方要必須做冪等處理,因為圖1的第3步、第4步消息都已經發送到MQ了,只是發送方不清楚,所以還會重復發送,另外99.9%的場景是能立即發送成功的,只有很小部分需要做補償: 補償代碼 查詢待發送的數據,這里為1分鐘之前的,定時任務用的是elastic-job,用其他定時任務也可以 至此整個發送方設計就完成了,下面看看部分 4. 消費方設計消費方相對比較簡單,主要有兩點要求
以下是消費表的設計
此表也要與業務表處于同一個事務,如果不是一個事務,會出現業務表操作成功、消息表插入失敗,如果出現消息重復發送就會出現重復消費的問題,具體查看圖6 消費方代碼 這里是kafka的消費代碼,通過動態代理,封裝KafkaListener類,在處理前進行消息重復判斷,在處理后進行消費表的插入,這里需要特別注意一點,業務處理不能把異常自己吃掉,否則上層捕獲不到,會認為業務處理成功,從而插入臟數據 圖7 消費方部分核心代碼 5. 歷史數據清理通過前面介紹,我們創建了2張表,分別為消息發送表、消息消費表,這兩張表要特別注意下,如果業務量比較大,數據量會快速增長,所以需要刪除已經處理成功的數據,通過配置兩個定時任務,保留一定的時間數據,其他時間的數據就可以刪除了,代碼如下: 圖9 消費方清理數據代碼 該文章在 2023/5/25 9:33:02 編輯過 |
關鍵字查詢
相關文章
正在查詢... |