前言
正當我以為目前的架構可以滿足需求時,登登~需求變更~~~
上面的長官們希望 Worker 可以當做一個共用服務,讓其他團隊可以訂閱 Worker 解析後的設備訊息來做其他應用
這就代表我們不能將資料的順序在 Archiver 那邊處理,而是 Worker 發送出來的設備訊息就需要按照順序了
所以 Worker 不能採用競爭消費的方式爭搶訊息來處理,因為這會導致同一設備的訊息順序錯亂,如下圖

為了解決這個問題,我們需要做到兩件事情
- 讓同一個 Worker 處理同一個 Device 的訊息
 - 需要一個 Load Balancer 的角色,在 Worker 增加或減少時重新分配設備道不同 Worker 上,且分配的過程中保證順序性
 
Dispatcher 處理 Worker 增加
我們新開發了一個微服務,用來分配設備到不同 Worker 以及處理 Worker 增加/減少時的重新分配,整個流程大致如下
起始狀態,Dispatcher 會從 db 中讀取所有註冊過的設備來得知目前有哪些設備,且在沒有 Worker 啟動的情況下,Dispatcher 不會去接收設備的訊息,會讓訊息堆積在 RabbitMQ 中

第一個 Worker 啟動時

每個 Worker 會有自己的 id,並用自己的 id 當作前綴,去訂閱設備的訊息,前綴的目的是方便 Dispatcher 知道要將設備的訊息傳到哪個 Worker 上

之後 Worker 會到 ETCD 註冊自己,在 ETCD 中會有一個 key 叫做 Worker List,裡面會有目前起來的 Worker 的 id 清單

Dispatcher 會去訂閱 ETCD,當 Worker List 這個 key 有異動時,Dispatcher 就知道 Worker 是變多還變少,以目前的例子 Dispatcher 會知道有一個 id 是 A 的 Worker 啟動了

之後 Dispatcher 會將設備根據目前有幾個 Worker 來做分配,因為原本沒有 Worker,所以現在會將全部的設備分配到 Worker A 上

之後 Dispatcher 就會開始訂閱消息,並根據設備分配到的 Worker id 將 topic 加上 Worker id 的前綴,分配到該 Worker 上

假設現在系統附載變大,K8s 自動產生了一個新的 Worker B

Worker B 一樣會先用自己的 id 當作前綴,去訂閱 RabbitMQ

然後會到 ETCD 註冊

Dispatcher 會收到 Worker B 註冊的通知

由於現在有兩個 Worker,所以 Dispatcher 會對設備進行重新分配,原本
1,2,3,4,5都是分配給 A,Dispatcher 會將4,5分配給 B,這邊都只是先分配而已,還沒有完成
之後 Dispatcher 會一個一個設備去做轉移,首先先轉移設備
4,Dispatcher 會建立一個 queue 給設備 4,在轉移過程中設備 4 的訊息都會先暫存到這個 queue 中
之後 Dispatcher 會發送一個 device 4 terminate 的訊息給 Worker A,當 Worker A 接收到這個訊息時代表從這個訊息之後就不會有 device 4 的訊息再傳過來了

為了確保同一個設備只會被同一個 Worker 處理,所以我們會讓 Worker A 收到 terminate 訊息時回傳一個 ack 的訊息給 Dispatcher,這樣就能保證 Worker A 已經沒有處理 device 4 的訊息,在等待 ack 的過程中,device 4 的訊息還是會繼續上傳,所以會保存在 Dispatcher 的 queue 中

當 Worker A 收到 terminate,代表 device 4 的訊息都處理完了,Worker A 會發送一個 ack 的 notify 給 Dispatcher

此時,Dispatcher 就能將暫存的 device 4 訊息交給 Worker B 處理,且不用擔心同一個設備的訊息被兩個 Worker 同時處理導致順序不正確的問題了

當然,device 5 的處理方式也是一樣的

Disaptcher 處理 Worker 減少
延續上面的例子,當負載變小,K8s 想要將 Worker B 關閉,此時 Worker B 會做 Graceful shutdown,而不是馬上關閉

Graceful Shutdown 的第一步就是先去 ETCD 取消註冊

Dispatcher 就會收到通知

Dispatcher 會重新分配設備,將 device 4 與 device 5 分配給 Worker A,一樣,這只是分配而已,還沒轉移完成

之後的作法幾乎跟 Worker 增加時轉移設備的流程一樣,先轉移 device 4

通知 W orker B device 4 要被轉移了

等待 Worker B 回應時先將 device 4 訊息暫存

Worker B 收到後回傳 Ack

Dispatcher 收到 Ack,將暫存的訊息轉交由 Worker A 處理

device 5 的處理流程也是一樣的

當 Dispatcher 轉移完所有 Worker B 的設備後,Dispatcher 會發送一個消息通知 Worker B 可以將自己關掉了

Worker B 收到訊息後就會把自己關掉

總結
透過新開發的這個微服務,我們可以讓設備的訊息保持一致的順序,讓其他團隊可以更方便的使用我們的 Worker
當然,目前看起來 Dispatcher 會是一個 single point of failure,我也會繼續想一下這個部分有沒有改進的方式,之後如果有其他更新會再寫上來