0%

[從零開始做水平擴展] 4.提案驗證

經過上次討論結果,最後決定使用 delayed queue 的方式來去實作,所以我們要先驗證假設的條件成立

Worker 可以透過搶訊息方式取得不同設備上傳的資料

目前設備上傳資料的方式可以透過兩個渠道

  1. rabbitmq via amqp
  2. azure iot hub

在 rabbitmq 的部分,多個 consumer 訂閱相同 topic 時,已經確定他是可以保證兩個 consumer 取得的資料不同了 (透過競爭消費的方式)

Untitled (6)

但 azure iot hub 的部分,實測如果不手動設定 partition,不同 worker 是有可能收到一樣的訊息的,需要設定 azure iot hub 的 partition 數量與 worker instance 數量一致才能避免這個問題

考慮到想讓 worker 動態擴展越簡單越好,這邊決定新增一個微服務,訂閱 iot hub 的訊息後 forwarding 到 rabbitmq,讓兩個渠道最後的行為一致

Untitled (7)

Delayed queue 能正確處理資料順序

首先,將 delayed queue 獨立出來在 archiver 前面只負責處理排序然後在傳道 mqtt 給 archiver 好像有點浪費,所以我們決定將這個 delayed queue 放在 archiver 裡面當作一個 buffer

當資料傳給 archiver 時,會先存放在 buffer 一段時間,當時間到了之後,會將 buffer 內的資料做排序,再按照之前的方式進行計算

假設 worker 今天處理某筆資料花費的時間很久,archiver 那邊就有可能把這個時間區段的結果計算錯誤

例如一分鐘內每 10 秒傳過來一筆資料,資料如下:

1
1 -> 2 -> 3 -> 4 -> 5 -> 6

假設第三筆資料在某個 worker 處理了很久,到最後 archiver 就只收到

1
1 -> 2 -> 4 -> 5 -> 6

所以算出來的結果就變成

1
2
3
4
5
Max: 6,
Min: 1,
Last: 6,
Avg: 1 * 10 sec + 2 * 20 sec + 4 * 10 sec + 5 * 10 sec + 6 * 10 sec / 60 sec
= 3.3333333333

但正確的結果應該是

1
2
3
4
5
Max: 6,
Min: 1,
Last: 6,
Avg: 1 * 10 sec + 2 * 10 sec + 3 * 10 + 4 * 10 sec + 5 * 10 sec + 6 * 10 sec / 60 sec
= 3.5

平均值算錯了

這個時候 worker 就會將這個處理很久的資料時間區段傳給另外一支微服務叫 Restore

他會用跟 archiver 一樣的計算方式去把 worker 通知他要重算的區段重新計算已達到修正的目的

詳細內容這邊就不說太多