etcd Watch Implementation Notes


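I recently needed to implement an etcd-style Watch feature for a project, so I first studied how etcd implements Watch and am recording what I found here as study notes. These notes are based on v3.3.0.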

API

Like the other etcd v3 APIs, Watch is defined in protobuf:

```protobuf
message WatchRequest {
  // request_union is a request to either create a new watcher or cancel an existing watcher.
  oneof request_union {
    WatchCreateRequest create_request = 1;
    WatchCancelRequest cancel_request = 2;
  }
}

message WatchResponse {
  ResponseHeader header = 1;
  // watch_id is the ID of the watcher that corresponds to the response.
  int64 watch_id = 2;
  // created is set to true if the response is for a create watch request.
  // The client should record the watch_id and expect to receive events for
  // the created watcher from the same stream.
  // All events sent to the created watcher will attach with the same watch_id.
  bool created = 3;
  // canceled is set to true if the response is for a cancel watch request.
  // No further events will be sent to the canceled watcher.
  bool canceled = 4;
  // compact_revision is set to the minimum index if a watcher tries to watch
  // at a compacted index.
  //
  // This happens when creating a watcher at a compacted revision or the watcher cannot
  // catch up with the progress of the key-value store.
  //
  // The client should treat the watcher as canceled and should not try to create any
  // watcher with the same start_revision again.
  int64 compact_revision = 5;
  // cancel_reason indicates the reason for canceling the watcher.
  string cancel_reason = 6;
  repeated mvccpb.Event events = 11;
}

service Watch {
  // Watch watches for events happening or that have happened. Both input and output
  // are streams; the input stream is for creating and canceling watchers and the output
  // stream sends events. One watch RPC can watch on multiple key ranges, streaming events
  // for several watches at once. The entire event history can be watched starting from the
  // last compaction revision.
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
    option (google.api.http) = {
      post: "/v3beta/watch"
      body: "*"
    };
  }
}
```
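To make the create/cancel contract concrete, here is a minimal sketch of how a server could dispatch on the `request_union` oneof and fill in `created` / `canceled`. The Go types below are hypothetical, hand-written stand-ins for the protobuf messages above (they are not the generated `etcdserverpb` types), and the sequential ID allocation is an assumption made for illustration.

```go
package main

import "errors"

// Hypothetical stand-ins for the protobuf messages; NOT the generated etcdserverpb types.
type WatchCreateRequest struct {
	Key, RangeEnd string
	StartRevision int64
}

type WatchCancelRequest struct {
	WatchID int64
}

// WatchRequest models the request_union oneof: exactly one field should be non-nil.
type WatchRequest struct {
	Create *WatchCreateRequest
	Cancel *WatchCancelRequest
}

type WatchResponse struct {
	WatchID  int64
	Created  bool
	Canceled bool
}

// dispatcher is a hypothetical per-stream handler that tracks live watch IDs.
type dispatcher struct {
	nextID  int64
	watches map[int64]*WatchCreateRequest
}

func newDispatcher() *dispatcher {
	return &dispatcher{watches: make(map[int64]*WatchCreateRequest)}
}

// handle switches on whichever branch of the oneof is set.
func (d *dispatcher) handle(req WatchRequest) (WatchResponse, error) {
	switch {
	case req.Create != nil:
		id := d.nextID // assumption: IDs are handed out sequentially per stream
		d.nextID++
		d.watches[id] = req.Create
		// The client records this watch_id; later events carry the same ID.
		return WatchResponse{WatchID: id, Created: true}, nil
	case req.Cancel != nil:
		id := req.Cancel.WatchID
		if _, ok := d.watches[id]; !ok {
			return WatchResponse{}, errors.New("unknown watch_id")
		}
		delete(d.watches, id)
		// No further events should be sent for a canceled watcher.
		return WatchResponse{WatchID: id, Canceled: true}, nil
	default:
		return WatchResponse{}, errors.New("empty request_union")
	}
}
```

Because both kinds of request travel on the same stream, the response has to echo the `watch_id` so the client can tell which watcher a `created`, `canceled`, or event message belongs to.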

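The corresponding HTTP API is implemented by code that grpc-gateway generates automatically. In essence, it translates an HTTP request into a gRPC request, which a gRPC client then sends directly to the local gRPC server. Note that because grpc-gateway does not support true bidirectional streaming, a WebSocket proxy layer (#8237) is needed: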

```go
httpmux.Handle(
	"/v3beta/",
	wsproxy.WebsocketProxy( // grpc-websocket-proxy
		gwmux,
		wsproxy.WithRequestMutator(
			// Default to the POST method for streams
			func(incoming *http.Request, outgoing *http.Request) *http.Request {
				outgoing.Method = "POST"
				return outgoing
			},
		),
	),
)
```

WebSocket lets the client and the server send messages to each other in both directions; see this article for its concepts and usage.

Implementation

The API is implemented by watchServer (etcdserver/api/v3rpc/watch.go), and its core is WatchableKV:

```go
// WatchableKV is a KV that can be watched.
type WatchableKV interface {
	KV
	Watchable
}

// Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface {
	// NewWatchStream returns a WatchStream that can be used to
	// watch events happened or happening on the KV.
	NewWatchStream() WatchStream
}
```
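WatchableKV is assembled through Go interface embedding: any type that implements both KV and Watchable automatically satisfies WatchableKV. The toy sketch below illustrates only that mechanism. The trimmed-down `KV` and `WatchStream` method sets and the `memStore` type are hypothetical and much smaller than etcd's real interfaces in mvcc/.

```go
package main

// Hypothetical, heavily trimmed interfaces; etcd's real KV and
// WatchStream have many more methods.
type KV interface {
	Put(key, value string)
	Get(key string) (string, bool)
}

type WatchStream interface {
	Chan() <-chan string // emits the key of every Put
}

type Watchable interface {
	NewWatchStream() WatchStream
}

// WatchableKV embeds both interfaces, like the etcd definition above.
type WatchableKV interface {
	KV
	Watchable
}

type chanStream struct{ ch chan string }

func (s *chanStream) Chan() <-chan string { return s.ch }

// memStore is a toy in-memory store; every Put notifies all open streams.
type memStore struct {
	data    map[string]string
	streams []*chanStream
}

func newMemStore() *memStore { return &memStore{data: make(map[string]string)} }

func (m *memStore) Put(key, value string) {
	m.data[key] = value
	for _, s := range m.streams {
		s.ch <- key // toy behavior: blocks if a stream's buffer is full
	}
}

func (m *memStore) Get(key string) (string, bool) {
	v, ok := m.data[key]
	return v, ok
}

func (m *memStore) NewWatchStream() WatchStream {
	s := &chanStream{ch: make(chan string, 16)} // small buffer for the demo
	m.streams = append(m.streams, s)
	return s
}

// Compile-time check that *memStore satisfies the composed interface.
var _ WatchableKV = (*memStore)(nil)
```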

The relationships among these components are roughly as shown in the figure:

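watchServer interacts with clients through the gRPC-generated etcdserverpb package, and each client corresponds to one gRPC stream. The Recv routine in watchServer continuously reads client requests from the gRPC stream and, based on each request, issues Create/Cancel Watch commands to the WatchStream. It also sends control messages to the Send routine over a Go channel; the Send routine handles each message (for example, cleaning up memory when a watch is canceled) and then returns the result to the client. The Send routine also listens on a channel for events coming from the WatchStream and forwards them to the client.

A WatchStream layer sits between watchServer and WatchableKV, instead of watchServer calling WatchableKV directly, because WatchableKV returns a watcher for each watch request. A watcher is quite flexible (for example, different watchers can send events to different channels), but the caller then has to manage the lifecycle of all those watchers. WatchStream takes on that lifecycle management. It also hides flexibility that watchServer does not need: for watchServer, having every watcher send events to a single channel is enough. This keeps WatchableKV just as extensible while making the Watch functionality that watchServer provides easier for upper layers to use.

WatchableKV embeds KV. For reasons of space these two types are not discussed further here; I may cover them another time.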
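The division of labor described above can be sketched as follows: the store hands out a watcher bound to whatever channel the caller chooses, and a WatchStream-like wrapper binds all of its watchers to one shared channel and keeps their cancel functions so it can manage their lifecycle. This is a simplified, single-goroutine model; `store`, `watcher`, `watchStream`, and their methods are hypothetical names loosely modeled on the roles in the text, not etcd's mvcc code.

```go
package main

import (
	"errors"
	"strings"
)

// Event is a hypothetical, minimal change notification.
type Event struct {
	WatchID int64
	Key     string
}

type watcher struct {
	id     int64
	prefix string
	ch     chan<- Event // each watcher may target a different channel
}

// store plays the WatchableKV role: it notifies matching watchers on Put.
type store struct{ watchers map[*watcher]struct{} }

func newStore() *store { return &store{watchers: make(map[*watcher]struct{})} }

// watch returns a watcher bound to ch plus a func that unregisters it.
func (s *store) watch(id int64, prefix string, ch chan<- Event) (*watcher, func()) {
	w := &watcher{id: id, prefix: prefix, ch: ch}
	s.watchers[w] = struct{}{}
	return w, func() { delete(s.watchers, w) }
}

func (s *store) Put(key string) {
	for w := range s.watchers {
		if strings.HasPrefix(key, w.prefix) {
			w.ch <- Event{WatchID: w.id, Key: key}
		}
	}
}

// watchStream plays the WatchStream role: one shared channel for all of
// its watchers, plus the bookkeeping needed to cancel them.
type watchStream struct {
	s       *store
	ch      chan Event
	nextID  int64
	cancels map[int64]func()
}

func newWatchStream(s *store) *watchStream {
	return &watchStream{s: s, ch: make(chan Event, 64), cancels: make(map[int64]func())}
}

func (ws *watchStream) Watch(prefix string) int64 {
	id := ws.nextID
	ws.nextID++
	_, cancel := ws.s.watch(id, prefix, ws.ch) // always the shared channel
	ws.cancels[id] = cancel
	return id
}

func (ws *watchStream) Cancel(id int64) error {
	cancel, ok := ws.cancels[id]
	if !ok {
		return errors.New("watcher not found")
	}
	cancel()
	delete(ws.cancels, id)
	return nil
}

func (ws *watchStream) Chan() <-chan Event { return ws.ch }
```

The caller of `watchStream` only ever sees watch IDs and one channel; the `*watcher` values and their cancel functions never leave the wrapper.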

This design nicely illustrates two design principles: the single-responsibility principle and the open-closed principle.

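From the single-responsibility perspective, every type and routine here focuses on one job. KV implements the key-value store (mvcc/kvstore.go). WatchableKV focuses on watching changes to the KV (mvcc/watchable_store.go). WatchStream manages the watchers that WatchableKV creates. watchServer delivers the changes passed along by WatchStream to clients through etcdserverpb (etcdserver/api/v3rpc/watch.go). Even the Send and Recv goroutines in watchServer are responsible only for sending and receiving, respectively; when they need to coordinate, they do so through control messages.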
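The Recv/Send split can be sketched with two goroutines: Recv turns requests into commands and posts a control message, while Send is the only goroutine that writes responses, multiplexing control messages and watch events with `select`. This is a hypothetical simplification: plain Go channels stand in for the gRPC stream, the names `request`, `ctrlMsg`, `event`, and `serve` are made up for illustration, and dropping events for inactive IDs is this sketch's own policy rather than a description of etcd's watch.go.

```go
package main

import "fmt"

// event is a hypothetical watch event tagged with its watch ID.
type event struct {
	WatchID int64
	Key     string
}

// request is a hypothetical client request: create or cancel a watch.
type request struct {
	WatchID int64
	Cancel  bool
}

// ctrlMsg is what Recv tells Send after acting on a request.
type ctrlMsg struct {
	WatchID  int64
	Canceled bool // false means the watch was created
}

// serve starts a Recv and a Send goroutine. in and out stand in for the two
// directions of the gRPC stream; events stands in for the WatchStream
// channel. out is closed once in has been closed and drained.
func serve(in <-chan request, events <-chan event, out chan<- string) {
	ctrl := make(chan ctrlMsg)

	// Recv: only reads requests; coordination goes through ctrl.
	go func() {
		defer close(ctrl)
		for req := range in {
			// A real server would call WatchStream.Watch / Cancel here.
			ctrl <- ctrlMsg{WatchID: req.WatchID, Canceled: req.Cancel}
		}
	}()

	// Send: the only goroutine that writes to out.
	go func() {
		defer close(out)
		active := make(map[int64]bool)
		for {
			select {
			case c, ok := <-ctrl:
				if !ok {
					return // Recv is done, so stop sending
				}
				if c.Canceled {
					delete(active, c.WatchID) // per-watch cleanup on cancel
					out <- fmt.Sprintf("canceled %d", c.WatchID)
				} else {
					active[c.WatchID] = true
					out <- fmt.Sprintf("created %d", c.WatchID)
				}
			case ev := <-events:
				if active[ev.WatchID] { // sketch policy: drop events for inactive IDs
					out <- fmt.Sprintf("event %d %s", ev.WatchID, ev.Key)
				}
			}
		}
	}()
}
```

Keeping a single writer means the outbound stream never needs a lock, and the per-watch state (`active`) is owned by one goroutine.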

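From the open-closed perspective, these components can easily be extended through composition, embedding, and similar techniques without modifying their code. Suppose we want a new WatchStream feature: instead of returning events over a single channel, events are distributed across multiple channels while the order of events for the same key is preserved. We would not need to modify the WatchStream code; we could create a new type, MultiWatchStream, to implement this feature (as mentioned above, the watchers returned by WatchableKV are already flexible enough to distribute events). If there were no WatchStream type and this logic were written directly into WatchableKV, implementing the same feature would likely require modifying WatchableKV's code. Modifying existing code is riskier than adding new code, because it can affect existing behavior.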
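One way such a MultiWatchStream could preserve per-key ordering is to route every event for a given key to the same channel, chosen by hashing the key. The sketch below shows only that routing rule. `multiStream`, its method names, and the FNV-1a sharding are assumptions for illustration; in the design described above, the events would come from watchers that WatchableKV binds to the channels.

```go
package main

import "hash/fnv"

// Event is a hypothetical change notification.
type Event struct {
	Key   string
	Value string
}

// multiStream fans events out over several channels. All events for the
// same key go to the same channel, so their relative order is preserved;
// events for different keys may be consumed in parallel.
type multiStream struct {
	shards []chan Event
}

func newMultiStream(n, buf int) *multiStream {
	m := &multiStream{shards: make([]chan Event, n)}
	for i := range m.shards {
		m.shards[i] = make(chan Event, buf)
	}
	return m
}

// shardFor picks a channel index from an FNV-1a hash of the key.
func (m *multiStream) shardFor(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // writes to a hash.Hash never return an error
	return int(h.Sum32() % uint32(len(m.shards)))
}

// dispatch must be called from a single goroutine, in event order;
// otherwise per-key ordering is no longer guaranteed.
func (m *multiStream) dispatch(ev Event) {
	m.shards[m.shardFor(ev.Key)] <- ev
}

// Chan returns the i-th output channel.
func (m *multiStream) Chan(i int) <-chan Event { return m.shards[i] }
```

The trade-off is head-of-line blocking: a slow consumer on one shard eventually blocks `dispatch` for every key once that shard's buffer fills.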

Scalability

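Each client's Watch request establishes a gRPC stream, and every watched key creates an additional watcher in WatchableKV. As the number of watched keys grows, this affects the scalability of Watch. In etcd 3.2, etcd improved its scalability with the gRPC proxy (link). Instead of connecting directly to the etcd server, clients first connect to the gRPC proxy. The proxy coalesces multiple watches on the same key into a single Watch on the etcd server, reducing the load on etcd. The etcd repo provides an architecture diagram, reproduced below for convenience (original link):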

```
            +-------------+
            | etcd server |
            +------+------+
                   ^ watch key A (s-watcher)
                   |
           +-------+-----+
           | gRPC proxy  | <-------+
           |             |         |
           ++-----+------+         |watch key A (c-watcher)
watch key A ^     ^ watch key A    |
(c-watcher) |     | (c-watcher)    |
    +-------+-+  ++--------+  +----+----+
    |  client |  |  client |  |  client |
    |         |  |         |  |         |
    +---------+  +---------+  +---------+
```
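The coalescing in the diagram can be modeled as a map from key to one server-side watcher (s-watcher) that fans out to the client-side watchers (c-watchers), with the server watch opened for the first client and canceled after the last one leaves. This is a toy, single-goroutine model; `coalescer`, `cWatcher`, `sWatcher`, and the counters are hypothetical, and the real gRPC proxy also deals with revisions, key ranges, and concurrency, all omitted here.

```go
package main

// cWatcher is a hypothetical client-side watcher that records its events.
type cWatcher struct {
	events []string
}

// sWatcher is a hypothetical shared server-side watch on one key.
type sWatcher struct {
	clients map[*cWatcher]struct{}
}

// coalescer keeps at most one server-side watch per key.
type coalescer struct {
	byKey    map[string]*sWatcher
	opened   int // server watches opened (for illustration)
	canceled int // server watches canceled (for illustration)
}

func newCoalescer() *coalescer { return &coalescer{byKey: make(map[string]*sWatcher)} }

// Watch registers c on key, opening a server watch only for the first client.
func (p *coalescer) Watch(key string, c *cWatcher) {
	s, ok := p.byKey[key]
	if !ok {
		s = &sWatcher{clients: make(map[*cWatcher]struct{})}
		p.byKey[key] = s
		p.opened++ // a real proxy would issue a Watch to the etcd server here
	}
	s.clients[c] = struct{}{}
}

// Unwatch removes c; the server watch is canceled once no clients remain.
func (p *coalescer) Unwatch(key string, c *cWatcher) {
	s, ok := p.byKey[key]
	if !ok {
		return
	}
	delete(s.clients, c)
	if len(s.clients) == 0 {
		delete(p.byKey, key)
		p.canceled++
	}
}

// deliver fans one server-side event out to every client watching key.
func (p *coalescer) deliver(key, ev string) {
	if s, ok := p.byKey[key]; ok {
		for c := range s.clients {
			c.events = append(c.events, ev)
		}
	}
}
```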

This approach works especially well for leader election (many clients watching a few keys), but helps little when a large number of distinct keys need to be watched.
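A quick way to see why the benefit depends on the workload is to count server-side watchers with and without coalescing: without the proxy, every distinct (client, key) pair costs one server watcher; with a coalescing proxy, the cost is the number of distinct keys. The helper below is a hypothetical back-of-the-envelope calculation, not part of etcd.

```go
package main

// watchReq is a hypothetical (client, key) watch registration.
type watchReq struct {
	Client int
	Key    string
}

// serverWatchers returns how many watchers the etcd server would hold
// without a proxy (one per distinct client/key pair) and with a
// coalescing proxy (one per distinct key).
func serverWatchers(reqs []watchReq) (direct, coalesced int) {
	pairs := make(map[watchReq]struct{})
	keys := make(map[string]struct{})
	for _, r := range reqs {
		pairs[r] = struct{}{}
		keys[r.Key] = struct{}{}
	}
	return len(pairs), len(keys)
}
```

With 100 clients all watching one election key, this gives 100 direct watchers versus 1 coalesced; with 100 clients each watching its own key, it gives 100 versus 100, so the proxy saves nothing.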

