閱讀754 返回首頁    go 阿裏雲 go 技術社區[雲棲]


MongoDB Replication NetworkInterface

Replication是MongoDB一套非常複雜的功能,功能包括數據同步,選舉,心跳維護等。涉及到與其他MongoD進程通訊。RPC的封裝相對也比較重要,作為這些功能實現的基礎。

Replication相關的對象角色都被封裝在ReplicationCoordinatorImpl對象中:

repl::ReplicationCoordinatorImpl* replCoord = 
    new repl::ReplicationCoordinatorImpl(
        getGlobalReplSettings(),
        new repl::ReplicationCoordinatorExternalStateImpl,
        new repl::NetworkInterfaceImpl,
        new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)),
        static_cast<int64_t>(curTimeMillis64()));

本篇先從網路部分開始介紹NetworkInterfaceImpl

ReplicationExecutor::NetworkInterface

接口,定義了startCommand cancelCommand runCallbackWithGlobalExclusiveLock等方法,抽象Command的處理邏輯。同時還有start wait等一些線程管理接口,重點關注在command和callback相關的實現。

首先來說明下連接池的管理:

NetworkInterfaceImpl::ConnectionPool::ConnectionInfo

集成DBClientConnection外,還有一個creationDate成員,記錄連接創建的時間,後麵會根據這個時間來消耗過期的Connection。

NetworkInterfaceImpl::ConnectionPool::ConnectionPtr

保存的並不是Connection指針,而是一個list::iterator。構造該對象時,隻需要指定HostAddress即可,而不是需要真正的Connection對象等。ConnectionPtr會根據HostAddress從Pool中獲取(acquireConnection)相應的迭代器。

NetworkInterfaceImpl::ConnectionPool

核心類,連接池管理,不同與一般的連接池隻維護同一個地址的多個連接。此連接池維護的是不同的地址,對應的多個連接,所有的空閑連接都存放在_connections成員中,使用中的連接存放在_inUseConnections,兩個成員都被_mutex保護。調用者隻要使用兩個函數acquireConnectionreleaseConnection,需要連接時調用acquireConnection,使用結束後通過releaseConnection歸還。

acquireConnection

作用是獲取Connection,如果Pool中沒有,則創建一個Connection。首先從剛才說的_connections中根據參數傳入的HostAddress查找到相應的ConnectionList。

獲取到ConnectionList後,先會去嚐試清理掉過期的Connection(參考cleanUpOlderThan_inlock實現)。然後取出ConnectionList中的第一個Connection放入到_inUseConnections中。

如果沒有找到相應HostAddress的連接,或者連接過期被釋放,甚至連接已經不可用。則重新構建連接,並且發送鑒權信息,保證連接的合法性,且可用。

cleanUpOlderThan_inlock

這裏的處理比較暴力,個人認為並不是非常合理。簡單講就是找到從創建開始30秒的鏈接去殺掉銷毀。如果能修改成隻銷毀長時間沒有被使用過的鏈接,效果會更好。明明是在長期使用的連接,也因為30秒的問題而被銷毀。

Sock::isStillConnected

判斷連接是否可用的,采用了socket pool的方式來判斷,用非阻塞的方式查看Socket的POLLIN事件,如果Socket中存在數據,或者任何其他的錯誤事件被觸發,則說明連接狀態不可用,返回False。

Sock::connect

因為connect timeout是受syncookies影響,timeout時間會非常的久,所以要在創建另一個線程中進行connect操作,原線程wait指定的時間,超時則放棄。這裏的實現比較重,需要開辟新的線程,比較好的做法是使用epoll非阻塞的方法,可以參考解決方案Stackoverflow

releaseConnection

_inUseConnections再放回到_connections中,在List中的位置是頭部。

NetworkInterfaceImpl

現在可以回頭來繼續說明NetworkInterfaceImpl

NetworkInterfaceImpl雖然通過ConnectionPool管理了Socket,自身則來管理了一個線程池,線程數量最多51,最少也維護了1個線程。command任務都提交到_pending成員中,工作線程從中獲取到Command任務對象來執行,生產者消費者模式。工作線程的最大空閑時間也是30S,如果30S時間內,線程空閑數量小於_peding數量,則自動銷毀資源。

每個提交上來的任務,都有一個cbHandle,作為Callback的句柄參數。onFinish任務處理完成,或者被Cancel的話,都會調用。對於Cancel需要說明,隻能取消處於pending狀態的任務。

_runCommand

有了前麵的ConnectionPool,這裏就簡單了很多,隻需要從中拿到連接對象,發送Command消息即可。如果異常情況出現,比如連接失敗,則通過捕獲異常來反饋。雖然ConnectionPool沒有對連接數量做出上限控製,但NetworkInterfaceImpl控製了線程數量,所以連接數基本也是可控的。理論來說最多的連接數量等於地址數量*51,但實際情況遠達不到。

最後更新:2017-04-01 13:37:07

  上一篇:go PostgreSQL 為什麼不要濫用unlogged table & hash index
  下一篇:go PostgreSQL 違反唯一約束的插入操作會產品HEAP垃圾嗎?