前言
上一篇對RabbitMQ的流程和相關的理論進行初步的概述,如果小伙伴之前對消息隊列不是很了解,那么在看理論時會有些困惑,這里以消息模式為切入點,結合理論細節和代碼實踐的方式一起來學習。
正文
常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通過設置交換機類型和配置參數來實現各個模式;接下來就分別進行實操演示吧。
以下演示都是通過管理員的賬號進行。其實每種模式其實很大一部分操作都是一樣的,所以公共部分不會重復截圖說明,不過會針對不同的配置進行說明。
1. 簡單模式(Simple)
簡單模式顧名思義就是簡單,不用配置太多的東西,如下圖所示:
上圖解析:
P:表示生產者,負責推送消息;
C:表示消費者,負責接收消息;
中間紅色部分:代表的是隊列(Queue);
小伙伴可能會奇怪,這里沒有交換機嗎?
其實是有的,上一篇說流程的時候,消息肯定是要通過交換機轉發到隊列中的,這里沒有指定,那是因為用到了默認的交換機,具體看以下演示。
1.1 Web管理界面進行演示
對于Web界面演示來說,只需要將消息能生產、投遞、消費即可,我們不用去弄一個生產者和消費者,生產者和消費者都是業務處理邏輯用的,所以通常都是根據業務需求就行實現的;話不多說開始演示吧。
根據上圖所示,我們只需要創建一個隊列即可,然后就可以進行消息模擬發送和消費了。
此時并沒有指定交換機綁定,點擊隊列名看詳情中的Bindings,有一個默認的交換機已經和隊列進行綁定:
隊列詳情頁面的說明,在上篇文章中就已經標注了,這里就不再贅述。
有了綁定關系之后,就可以在默認的交換機頁面開始模擬轉發消息;首先進入Exchanges管理頁面,點擊默認交換機(AMQP default)進入詳情開始發布消息:
消息發送成功之后就會在隊列界面看到消息情況:
隊列里面有了消息之后,就可以模擬消費者進行消息消費,點擊隊列名進入詳情,可在詳情也模擬消費:
如上所示,簡單模式整個消費流程就通過Web頁面模擬完了。但在消費消息時,提供了Ack Mode模式(消息確認模式)選擇來進行消費,可選擇的模式如下:
Nack message requeue true:獲取消息,但是不會向Server做ack應答確認(即不告訴服務器消息被消費了),消息重新入隊。即隊列中的消息不會被刪除掉;
Automatic Ack:獲取消息,向Server做應答確認(即會告訴服務器消息被消費了),消息不重新入隊,將會從隊列中刪除;
Reject requeue true:拒絕獲取消息(即拒絕處理消息),消息重新入隊;
Reject requeue false:拒絕獲取消息(即拒絕處理消息),消息不重新入隊,將會被刪除;
到這關于簡單模式下的界面演示就結束了,其中描述的細節內容是共用的,在其他模式下的操作也類似,后續不做重復說明。
1.2 代碼進行演示
這里就用控制臺的方式,一步一步的實現。這里需要引入Nuget包:RabbitMQ.Client。生產者的整體代碼如下:
接下來就一步一步來調試,看看消息是怎么一步一步發出去的;
創建連接
剛開始沒有任何連接,如下:
代碼繼續下一步,連接就有了:
此時就可以理解為網絡連接上了,但通道還沒有創建出來,如下:
根據連接創建通道
通道根據連接進行創建,目的是為了提高傳輸效率,共用一個連接,不然頻繁的創建和銷毀連接會占資源,影響性能。
定義隊列
有連接和通道之后理論就可以直接發消息了,但直接通信會相互依賴比較強,達不到解耦合的效果。所以需要定義一個隊列將消息存放到里面,客戶端想用了自己來消費就行,另外隊列還可以達到一定的削峰作用。
創建隊列的時候需要傳幾個參數,分別意思如下:
參數1:queue, 隊列的名稱;
參數2:durable, 隊列是否持久化;如果為true,服務器重啟之后隊列還在,不會被清除;否則就被清掉。
參數3:exclusive ,是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉;
參數4:autodelete, 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息;
參數5:arguments, 用來設置隊列附加參數,如設置隊列的有效期、隊列的消息生命周期、消息的最大長度等;
發送消息
經過以上步驟,就可以發送消息了,如上沒有定義交換機,那就是綁定了默認交換機。
發送時的幾個參數意思如下:
參數1:exchange,交換機,這里沒有指定交換機。
參數2:routingKey,路由key,即指定隊列,簡單模式下,路由key默認就是隊列名
參數3:basicProperties, 配置其他相關屬性
參數4:body ,需要發送的消息內容
以上的生產者完成了,現在再來一個消費者演示一下消息消費,消費者整體代碼如下:
效果如下:
在整個過程中,還是會先建立連接,創建通道,指定隊列;不同的是增加對接收數據的處理。
是不是用起來比較簡單方便,主要是在復雜項目中,消息隊列的作用真的很大。來,接著往下說說其他模式。
2. 工作模式(Work)
工作模式是考慮到多個消費者情況下,消息如何被消費的,主要有兩種方案,輪詢分發和公平分發;
上圖解析:
這兩種方式需要多個消費者,在界面不太好模擬,所以就直接上代碼演示了。
2.1 輪詢分發
在簡單模式基礎上稍微改動一下代碼即可。在生產者中發多條消息出來,然后啟用多個消費者就可以進行模擬如下:
生產者代碼整體如下:
消費者代碼整體如下:
兩個消費者的代碼都是一樣,沒有變動;兩個消費者的都是在消費同一個隊列的消息,可以先啟動兩個消費者,然后在啟動生產者,效果如下:
由上可見,默認情況下其實采用的是輪詢方式。
2.2 公平分發
在實際業務中,有些業務處理比較耗時,有些處理耗時不長,如上案例,假如奇數消息需要處理比較耗時,那么對應的消費者就壓力比較大。這種情況可以通過公平分發的方式進行業務處理,處理快點的就多處理點。
如何才能知道消費者處理業務完成呢?消費者處理完成之后主動上報是最好不過的,所以只需要在消費者端將自動確認機制改為手動確認即可,即:業務處理完成之后,手動上報確認狀態。
生產者的代碼不需要變動,只需要稍微改改消費者代碼即可,這里模擬兩個不同處理能力的消費者:
消費者1處理一條消息業務需要500毫秒;
消費者2處理一條消息業務需要2秒;
消費者1整體代碼如下:
消費者2整體代碼如下,主要是模擬時間不一樣:
先啟動兩個消費者,再啟動生產者,看看消費情況,如下:
以上演示就是公平分發模式的演示,其中有兩個關鍵的步驟:
3. 發布訂閱模式(Fanout)
Fanout模式是一種發布訂閱模式,是一種廣播機制,不需要指定路由Key。這種模式的交換機就會將消息廣播到綁定的所有隊列上去,只要有消費者訂閱對應的隊列,就會收到消息。如下圖:
上圖解析:
在這種模式下,消息會一次性被多個消費者消費。
3.1 Web管理界面進行演示
先創建一個Fanout模式的交換機;
創建兩個隊列(根據實際需要創建多個),并將隊列綁定到上一步創建的交換機上;
這里演示就創建兩個隊列,分別是FanoutQ1和FanoutQ2
在隊列詳情頁或交換機詳情頁都可以進行交換機和隊列的綁定,這里分別在FanoutQ1和FanoutQ2隊列詳情頁進行綁定,如下:
同樣的方式綁定FanoutQ2, 綁定完成之后,可以在交換機詳情頁看到對應的綁定隊列:
通過交換機上投遞消息,看效果;
此時通過FanoutExchange交換機投遞消息,綁定到此交換機上的隊列都能收到:
查看隊列消息情況,可以看到兩個隊列都接收到消息了。
3.2 代碼進行演示
其實代碼和操作Web一樣,只是用代碼實現生產者和消費者而已。
生產者的代碼
因為Fanout交換機不用關注RoutingKey,所以在發布消息時,第二個參數不需要傳遞RoutingKey。
消費者1的代碼
消費者比較關注的是交換機需要和生產者指定的是同一個,隊列和交換機有綁定。
消費者2的代碼,其實和消費者1基本一樣,只是定義的隊列不一樣
先啟動消費者,然后啟動生產者,看效果
這里是控制臺程序,為了顯示方便就先啟動消費者,后期的生產者,實際應用場景先啟動誰都行。
4. 路由模式(Direct)
Direct模式是在Fanout基礎增加RoutingKey條件, 即交換機不會將消息現全部投遞到所有隊列,而是只投遞到對應RoutingKey下的隊列。如圖:
上圖解析:
P:表示生產者,負責推送消息;
X:表示交換機,指定類型為direct,圖中表示一個交換機綁定了多個隊列;
中間箭頭上的error、info、warning代表具體的RoutingKey;
C1、C2:表示多個消費者,都可以從同一個隊列接收消息;
中間紅色部分:代表的是隊列(Queue);
Direct模式其實在實際應用場景中用的比較多的,默認的Exhanges也是Direct模式, 很多關于消息隊列的框架,默認也是采用這種模式,主要原因是根據RoutingKey精確的處理對應的業務,不會由于考慮不周到,導致消息處理有不確定性,性能相對也不錯。
4.1 Web管理界面進行演示
先創建一個Direct模式的交換機;
創建兩個隊列,然后將隊列綁定到上一步創建的交換機上,并指定對應的RoutingKey;
創建隊列完成之后,還需要綁定到交換機上,上一種模式演示的是從隊列詳情中維護綁定關系,這次從交換機詳情中進行演示,如下:
綁定第1個隊列,指定RoutingKey是order,模擬處理訂單的:
綁定第2個隊列,指定RoutingKey是msg,模擬處理消息的:
有了綁定關系,就可以測試驗證了。
注:這里的RoutingKey可以根據實際情況隨意指定的。
向交換機上投遞消息,看效果;
模擬發布一個RoutingKey為order的消息:
再發布一個消息,指定RoutingKey為msg,如下:
兩個消息都發布成功,而是指定對應的RoutingKey進行投遞,所以現在綁定到此交換機上兩個隊列中分別有一條數據,查看隊列的消息概況:
可以進入隊列詳情,通過GetMessage獲取到具體的消息內容,這里就不截圖了,上面已經演示過。
4.2 代碼進行演示
在Fanout的代碼基礎上稍微改動即可,主要改動點就是改變交換機的類型,并在隊列和交換機綁定時設置對應的RoutingKey,發布消息的時候指定交換機和RoutingKey。
生產者代碼
消費者1代碼,主要是綁定隊列時指定的RoutingKey為order
消費者2代碼,主要是綁定隊列時指定的RoutingKey為msg
運行起來看效果:
如上演示效果,和Web演示一樣,只有精確匹配到RoutingKey才能消費到對應的消息數據。
5. 主題模式(Topic)
Topic模式是在Direct模式基礎增加模糊匹配RoutingKey,Direct精確匹配RoutingKey,Topic可以通*或#進行模糊匹配,從而把消息投遞到對應的隊列中,如圖:
上圖解析:
P:表示生產者,負責推送消息;
X:表示交換機,指定類型為topic,圖中表示一個交換機綁定了多個隊列;
中間箭頭上的文字代表模糊匹配的RoutingKey;其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞,匹配符需要與點號(.)搭配使用。*.orange.test.# 示例中orange算一個詞,test算一個詞,即通過點號(.)分開的就稱為一個詞。
C1、C2:表示多個消費者,都可以從同一個隊列接收消息;
中間紅色部分:代表的是隊列(Queue);
5.1 Web管理界面進行演示
先創建一個Topic模式的交換機;
創建兩個隊列,然后將隊列綁定到上一步創建的交換機上,并指定對應的RoutingKey;
將隊列綁定到交換機上,這里還是在交換機詳情中進行演示,如下:
同樣的步驟分別對TopicQ1和TopicQ2進行規則綁定,如下:
現在的TopicExchange的綁定關系如下:
有了關系之后就可以進行驗證效果了。
向交換機上投遞消息,會根據RoutingKey的模糊匹配規則將消息投遞到對應的隊列中,看效果;
先指定order.create.test發布消息,看看會匹配哪些隊列:
TopicQ2接收到消息,匹配到路由規則order.#
再指定RoutingKey 為order.update 發布一個消息,如下:
TopicQ1和TopicQ2都收到消息了,匹配到路由規則order.#和order.*,如下:
再指定RoutingKey 為order發布一個消息,就會匹配到order.#,這里就不截圖了。
以上測試說明:在Topic類型交換機和隊列綁定關系時,可以指定RoutingKey的匹配規則,星號、#號、點號搭配使用,其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞。
更多情況,小伙伴們自己動手試試。
5.2 代碼進行演示
在Direct的代碼基礎上稍微改動即可,主要改動點就是改變交換機的類型,并在隊列和交換機綁定時設置對應的RoutingKey,這里的RoutingKey是一個規則,是星號、#號、點號和每個詞的組合,發布消息的時候指定交換機和RoutingKey,RoutingKey會去匹配綁定的規則。
6. 參數模式(Headers)
Headers模式不是通過RoutingKey進行匹配投遞消息,而是匹配請求頭中所帶的鍵值進行消息投遞,所以創建隊列是需要設置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。
5.1 Web管理界面進行演示
先創建一個Headers模式的交換機;
創建兩個隊列,然后將隊列綁定到上一步創建的交換機上,可以指定Headers的參數;
將隊列綁定到交換機上,這里還是在交換機詳情中進行演示,如下:
這里不使用RoutingKey的方式,而是通過設置參數的形式進行綁定,后續投遞消息的時候就匹配參數,如果能匹配上,就將消息投遞到對應的隊列。
綁定HeaderQ1隊列:
同樣的方式綁定HeaderQ2隊列,只是只添加了一個鍵值對,order:111,最后HeaderExchange交換機的綁定關系如下:
關系綁定好之后就可以進行測試效果了。
向交換機上投遞消息,會根據檢查Headers參數的條件是否符合,若符合將消息投遞到對應的隊列中,看效果;
設置兩個參數進行發布,如下:
可以看到兩個隊列都匹配到了,因為order和msg鍵值對匹配到HeaderQ1,order的鍵值對匹配到HeaderQ2,如果只設置一個order簡直對呢:
此時只有HeaderQ2才能精確匹配,HeaderQ1沒有全部匹配,所以對應隊列沒有收到消息,如下:
由此可見,在界面上沒有指定x-match綁定的話,默認是all,就是要全部匹配才投遞消息到對應隊列。
這里繼續新增一個HeaderQ3的隊列,創建方式和上面不一樣,只是在綁定交換機的時候增加x-match 為 any,如下:
綁定成功之后,現在關系如下,其中order的鍵值對是在每個綁定中都有,如下:
測試發消息之前,把之前消息都清空了,也就是隊列中的消息都是空的,這次我們再指定order為111的參數進行發布消息,看看有哪些隊列收到消息呢:
消息發出后,之后HeaderQ2和HeaderQ3收到消息,HeaderQ2只有一個order參數,精確匹配上了,HeaderQ3有多個參數,但設置了x-match為any,所以只要匹配其中一個即可。HeaderQ1多個參數需要全部匹配才行,所以沒有接收到消息:
5.2 代碼進行演示
Headers模式是根據參數進行匹配,不是通過RoutingKey,所以只需要在綁定隊列時設置好參數,在發送消息的時候也設置好參數,這樣就會根據匹配原則去匹配參數,如果匹配上,消息就投遞到對應的隊列,供消費者進行消費。這里為了演示比較清晰一點,使用一個生產者,三個消費者的形式進行演示。
生產者
代碼中關鍵的部分是指定交換機的類型為Headers,然后模擬了兩類參數的形式投遞消息,方便用于測試參數匹配模式的測試。
消費者1,綁定參數為order:111和msg:222
消費者2,和消費者1代表基本一樣,只是綁定參數不一樣;
消費者3,代表基本和消費者1一樣,只是在參數指定的時候增加了x-match來指定匹配模式,這里指定為any,也就是只要其中有部分匹配上也可以消費到消息。
演示效果,啟動生產者和消費者:
如上圖,消費者3指定了匹配模式為部分匹配,所以可以接收到一個參數的消息,而消費者1需要精確匹配,所以不能接收到。
關于常用的消息模式就聊到這吧,小伙伴們可以根據自己的業務場景進行使用,相關演示代碼的地址如下:
碼云:https://gitee.com/CodeZoe/dot-net-core-study-demo/tree/main/RabbitMQDemo
總結
RabbitMQ提供了多種模式應對各種業務,但匹配條件越是模糊或者參數化,那性能相對比較弱。再回顧一些常用的消息隊列組件,是不是很多都是默認使用Direct模式,精確匹配的RoutingKey可以針對具體不同業務處理,匹配性能也相對比較高;當然其他模式小伙伴也可以針對業務情況進行使用。
后續的文章將繼續分享RabbitMQ消息確認機制、死信隊列、磁盤監控等相關知識點的應用,關注“Code綜藝圈”,和我一起學習吧。
----------------------------------
原文:https://mp.weixin.qq.com/s/LP_aAf_fODUPOtiikVwvTw
該文章在 2023/5/20 8:18:33 編輯過