標籤:

Mongo 代理程序實現-代碼實戰篇

延續上一篇文章 Mongo 代理程序實現-複製集搭建及抓包篇,接下來,就正式開始我們的代碼實戰。

根據一貫的風格,我們先來梳理下項目目錄結構,結構如下:

.n|__ bin/ # 用於存放編譯後生成的二進位文件n|__ config/ # 用於存放配置文件n|__ connection/ # 存放連接相關的文件n| |__ proxy.go # 代理組件n| |__ pool.go # 連接池組件n| |__ repl_set.go # 複製集組件n| |__ conn.go # 連接對象組件n|__ internal/ # 存放 mongo 內部協議相關文件n| |__ auth.go # 握手鑒權組件n| |__ protocol.go # 協議解析組件n| |__ request.go # 請求重寫組件n| |__ response.go # 響應重寫組件n|__ statistics/ # 存放指標統計上報組件n|__ test/ # 存放各種語言驅動測試代碼的文件夾n|__ utils/ # 工具函數文件夾n|__ glide.yaml # 依賴包配置文件n|__ main.go # 入口文件n

限於篇幅的原因,不可能把上面的細節一一講個遍,我只挑選 proxy、pool 兩個組件來講...想了解更多實現細節的童鞋,可以私信我。

proxy 實現

最簡單的 proxy 實現套路就像下面這樣:

// main.gonfunc main() {n // 傳入配置參數,實例化一個代理對象n p := NewProxy(conf)n // 卡住,循環接受客戶端請求n p.LoopAccept()n}n

接著來實現 NewProxy、LoopAccept 方法:

// connection/proxy.gontype Proxy struct {n sync.RWMutexnn listener net.Listenern writePool, readPool *pooln}nnfunc NewProxy(conf config.UserConf) *Proxy {n // 開始監聽本地埠n listener, err := net.Listen("tcp", ":"+conf.GetString("port"))n if err != nil {n log.Fatalln(err)n }nn p := &Proxy{n listener: listener,n }nn // 實例化連接池n p.readPool, p.writePool, err = newPool(p)n if err != nil {n panic(err)n }nn return p n}nnfunc (p *Proxy) LoopAccept() {n for {n client, err := p.listener.Accept()nn go func(c net.Conn) {n defer c.Close()nn // 一個連接在多次 messageHandler 中共用一個 Reader 對象n cr := bufio.NewReader(c)n // 因為一個連接可能會進行多次讀或寫操作n for {n // 將客戶端請求代理給服務端,服務端響應代理回客戶端n // 同時中間對請求或響應進行重寫操作n err := p.messageHandler(cr, c)nn if err != nil {n // 只要出現錯誤,就執行到上面的 defer c.Close() 來關閉連接n returnn }n }n }(client)n }n}n

接著來實現核心邏輯 messageHandler:

// connection/proxy.gonfunc (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {n // 對請求報文進行解析操作n req, err := internal.Decode(clientReader)n if err != nil {n return errors.New("decode error")n }nn // 將客戶端請求發送給資料庫伺服器n res, err := p.clientToServer(req)n if err != nil {n return errors.New("request error")n }nn // 將資料庫伺服器響應返回給客戶端n return res.WriteTo(c)n}nnfunc (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {n var server net.Connn // 如果是讀操作,就從讀池中取出連接n if req.IsReadOp() {n host := req.GetHost()n // 某些讀操作需要發送到指定的讀庫上,所以需要傳 host,來獲取指定讀庫連接n server = p.readPool.Acquire(host)n // 反之,寫操作從寫池中取出連接n } else {n // 由於寫庫只有一個,所以不用傳 host 參數了n server = p.writePool.Acquire()n }nn // 將客戶端請求發送給資料庫伺服器n err := req.WriteTo(server)n if err != nil {n return nil, errn }nn // 獲取解析資料庫伺服器響應n res, err := internal.Decode(bufio.NewReader(server))n return res, errn}n

大致邏輯就是,客戶端通過代理把請求發給服務端,服務端響應也通過代理響應回客戶端。

------------ request ----------- request ------------n| | --------> | | --------> | |n| client | | proxy | | repl_set |n| | <-------- | | <-------- | |n------------ response ----------- response ------------n

吶~,當然還有非常多的細節,由於篇幅原因不得不省略...

pool 實現

由 proxy 的代碼邏輯來看,我們取讀或寫庫連接是通過讀或寫池的 Acquire 方法來取的:

// connection/pool.gontype pool struct {n sync.RWMutexnn connCh chan net.Connn newConn func(string) (net.Conn, error)n freeConn func(net.Conn) errorn}nnfunc (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {n host := ""n if len(opts) > 0 {n host, _ = (opts[0]).(string)n }nn chLen := len(p.connCh)n // 從 channel 中遍歷剩餘數量的 connn for i := 0; i < chLen; i++ {n select {n case conn, ok := <- ch:n if ok {n if len(host) > 0 {n if conn.RemoteAddr().String() == host {n return conn, niln }n // 沒有找到對應 host 的 conn,則把 conn 重新放回 channeln // 你可以簡單理解為只是執行了 p.connCh <- conn 操作n p.freeConn(conn)n } else {n return conn, niln }n }n // 避免數量不足而導致 channel 阻塞等待n default:n }n }nn // 若還沒有從 channel 中取到 conn,則立馬 new 一個n conn, err := p.newConn(host)n if err != nil {n return nil, errn }nn return conn, niln}n

池的實現大致就是實現了一個循環隊列,連接從池中取,取出的連接在使用完後,可以放回池中。

總結

聰明的童鞋可能已經看出,我在定義各種 struct 的時候,基本沒有添加什麼狀態量,因為在並發場景下,對狀態量的把控不好會導致一些很嚴重的問題,讀者可以自由發揮設計功底,使用 atomic 或 go 1.9 提供的 sync.Map 等無鎖操作來解決這些問題。

結束語

一溜寫下來,看過抓包篇的童鞋可能會說,mmp 你根本就沒講如何實現自動主備切換的邏輯。我表示確實是立了個大 flag (老臉一紅...

但我要真的一字一句寫下來,恐怕很多人看都不想看,文章篇幅就是要簡短明了,才有看下去的勇氣。當然你真想知道細節,可以私信我,我一定知而答(233。

推薦閱讀:

怎樣勸服機關單位使用 MySQL/MongoDB/Redis 取代 Oracle?

TAG:Proxy | mongo |