zookeeper基礎知識整理
https://zookeeper.apache.org/doc/trunk/javaExample.html 官網上文檔,API,用例
Zookeeper應用簡單例子
Zookeeper能幫我們作什麼事情呢?簡單的例子:假設我們我們有個20個搜索引擎的服務器(每個負責總索引中的一部分的搜索任務)和一個總服務器(負責向這20個搜索引擎的服務器發出搜索請求並合並結果集),一個備用的總服務器(負責當總服務器宕機時替換總服務器),一個web的cgi(向總服務器發出搜索請求).搜索引擎的服務器中的15個服務器現在提供搜索服務,5個服務器正在生成索引.這20個搜索引擎的服務器經常要讓正在提供搜索服務的服務器停止提供服務開始生成索引,或生成索引的服務器已經把索引生成完成可以搜索提供服務了.使用Zookeeper可以保證總服務器自動感知有多少提供搜索引擎的服務器並向這些服務器發出搜索請求,備用的總服務器宕機時自動啟用備用的總服務器,web的cgi能夠自動地獲知總服務器的網絡地址變化.這些又如何做到呢?
-
提供搜索引擎的服務器都在Zookeeper中創建znode,zk.create("/search/nodes/node1",
"hostname".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL); - 總服務器可以從Zookeeper中獲取一個znode的子節點的列表,zk.getChildren("/search/nodes",true);
- 總服務器遍曆這些子節點,並獲取子節點的數據生成提供搜索引擎的服務器列表.
- 當總服務器接收到子節點改變的事件信息,重新返回第二步.
- 總服務器在Zookeeper中創建節點,zk.create("/search/master","hostname".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL);
- 備用的總服務器監控Zookeeper中的"/search/master"節點.當這個znode的節點數據改變時,把自己啟動變成總服務器,並把自己的網絡地址數據放進這個節點.
- web的cgi從Zookeeper中"/search/master"節點獲取總服務器的網絡地址數據並向其發送搜索請求.
- web的cgi監控Zookeeper中的"/search/master"節點,當這個znode的節點數據改變時,從這個節點獲取總服務器的網絡地址數據,並改變當前的總服務器的網絡地址.
Zookeeper簡單整理(《Hadoop in action》,也可以參考《Hadoop權威指南》裏的zookeeper篇)
當一個ZooKeeper實例被創建之後, 它啟動一個線程連接到ZooKeeper服務器, 對構造函數的響應返回的很快, 因此在使用ZooKeeper對象錢等待建立連接非常重要, 因此借助並發包中的CountDownLatch來阻塞, 直到ZooKeeper實例準備好.
客戶端連接到ZooKeeper之後, Watcher的process()方法被調用, 並收到一個事件, 表明連接已經完成. 在收到連接事件時(由Watcher.Event.KeeperState枚舉型表示, 並帶有值SyncConnected), CountDownLatch的countDown()方法被調用, 計數減一, 即釋放等待線程, 表明連接建立, 可以自行其他操作了. 比如創建znode.
ZooKeeper可以提供一種高可用, 高性能的協作服務.
ZooKeeper是為協作而設計的(通常使用小數據文件), 不是大容量的數據存儲, 因此任何一個znode的數據存儲量的上限是1MB
ZooKeeper上的數據訪問都是原子的. 不可能出現部分數據被客戶端寫入, ZooKeeper不支持追加操作.
znode的路徑必須是絕對, 因此, 他們必須由反斜杠字符開頭. 除此之外, 他們還必須是唯一的.
zookeeper在路徑中是保留字, /zookeeper用來存儲管理信息, 比如一些配額信息.
znode可以分為兩種: 臨時的和永久的. znode類型是在創建時指定的, 並且不能被改變. 一個臨時性znode會在創建它的客戶端的會話結束時由ZooKeeper刪除, 一個永久的znode並不依賴客戶端會話, 而且隻有在客戶端明確刪除它的時候才會被刪除(不一定是創建的客戶端), 一個臨時的znode不應該有子節點, 即臨時性的子節點.
臨時性znode綁定在客戶端會話上, 它們對所有客戶端都是可見的.
臨時性znode對於建立那些需要知道什麼時候某些分布式資源可以使用的應用非常有效.
在znode有改變時, Watch使客戶端了解相應的信息. Watch由ZooKeeper服務的操作來設置, 同時由服務的其他操作來觸發. 比如, 一個客戶端可能調用znode上的exists操作, 同時在這個節點上加了一個Watch, 如果這個節點不存在, 返回false, 如果一段時間之後, 這個znode由另一台客戶端建立了, 那麼Watch將被觸發, 通知第一台客戶端znode被創建的消息.
Watcher隻被觸發一次, 為了獲得多次提醒, 客戶端需要再次注冊Watcher.
更新ZooKeeper的操作是有限製的. delete或setData必須明確需要更新的znode的版本號, 如果版本號不匹配, 更新就會失敗. 更新操作是非阻塞的, 因此客戶端如果失去了一個更新, 它可以在不阻塞其他進行進程執行的情況下, 選擇重新嚐試或進行其他操作.
盡管ZooKeeper可以被看著是一個文件係統, 但是它處於便利性的考慮, 摒棄了一些文件係統的操作原語. 因為文件非常小並且是整體讀寫的, 所以不需要提供打開, 關閉或尋址操作.
ZooKeeper的異步化API使你能並行處理請求, 在某些場景下可以提供更好的吞吐量. 如果你想讀取大批量znode並且獨立的處理他們, 使用同步api的話, 那麼每個讀操作都會被阻塞, 直到它返回的那一刻, 相反使用異步化, 可以非常快的執行所有異步操作, 並且用不同的線程來處理響應.
讀操作exists, getChildren和getData都被設置了watch, 並且這些watch都由寫操作來觸發:create, delete和setData. ACL操作並不參與到watch中.
exists操作上的watch在被監視的znode創建, 刪除或數據更新時被觸發
getData操作上的watch在被監視的znode刪除或數據更新時觸發, 在創建時不能觸發, 因為隻有znode一定存在, getData操作才會成功.
getChildren操作上的watch在被監視的znode的子節點創建或刪除時, 或者當這個znode節點的子節點被刪除時被觸發. 可以通過查看watch事件類型來區分是znode, 還是它的子節點被刪除.
ACL
如果我們想要客戶端在example.com域中對znode進行讀訪問, 那麼可以對這個znode的ACL進行設置, 使用host模式, 帶有example.com的ID和READ許可, 在java中可以這樣創建ACL對象:
new ACL(Perms.READ, new Id("host", "example.com"));
exists並不受ACL許可控製.
ZooKeeper以複合模式運行在一組叫做ensemble的集群上, ZooKeeper通過複製來獲得高可用性, 同時, 隻要ensemble中的大部分機器都運行正常就可以提供服務. 比如說, 在一個5節點ensemble中, 可以在任何兩台機器故障的情況下服務仍在運作, 如果6節點的話, 也隻能承受兩台出現故障, 因此一個ensemble通常選擇奇數台機器.
ZooKeeper的思想非常簡單: 它所需要做的就是保證對znode樹的每一次修改都複製到ensemble中的大部分機器上去.
ZooKeeper采用了Zab協議, 它分為兩個階段, 並且可能被無限製的重複.
階段1:領導者選舉
在ensemble中的機器要參與一個選擇特殊成員的進程, 這個成員叫做領導者, 其他機器則叫做跟隨者, 在大部分的跟隨者與它們的領導者同步了狀態以後, 這個階段才算完成.
階段2:原子廣播
所有的寫操作請求被傳送給領導者, 並通過廣播將更新信息告訴給跟隨者, 當大部分跟隨者執行了修改後, 領導者就會提交更新操作, 客戶端將得到更新成功的回應.
如果領導者出現故障, 剩下的機器將會再次進行領導者選舉, 並在新領導者被選出來之前繼續執行任務. 如果在不久之後, 老的領導者恢複了. 那麼它將以跟隨者的身份繼續運行, 領導者的選舉非常快(200ms左右), 因此不會帶來明顯的延遲.
所有在ensemble中的機器在更新它們的內存中的znode樹之前會先將更新信息寫入磁盤. 讀操作請求可能由任何機器服務, 同時由於它們隻涉及到內存查找, 因此非常快.
一致性
在ensemble中的領導者和跟隨者非常聰明, 跟隨者通過來更新號來滯後領導者, 結果導致隻要大部分而不是所有ensemble確認更新
每一個對znode樹的更新都會給定一個全局標識, 叫zxid(ZooKeeper事務id).
ZooKeeper客戶端與ensemble中的服務器列表配置相一致, 在啟動時, 它嚐試與表中的一個服務器相連接, 如果連接失敗, 它就嚐試列表中的其他服務器, 以此類推, 直到最終連接到其中一個, 或者當ZooKeeper的所有服務器都無法連接時, 連接失敗.
一旦與ZooKeeper服務器連接成功, 服務器會創建與客戶端的一個新的會話. 每個會話都會有超時時間, 這個是在會話創建時設定的, 如果服務器在超時時段內得到請求, 它可能中斷該會話, 一旦會話中斷, 它可能不再打開, 與該會話相關的臨時性節點都將丟失.
在會話空閑的一定時間內, 都會由客戶端發起ping請求來保持活躍(猶如心跳)(ping是由zk客戶端自動發送, 不需要由程序來指定), 超時時段要設置的足夠小, 以便能檢測到服務器故障, 並且能在會話超時時連接到另外一台服務器.
創建複雜的臨時性節點狀態的應用, 應該設置更長的會話超時時間, 因為重建這些內容的代價非常昂貴, 在這種情況下, 應用程序可以有更多的時間來重啟, 從而避免會話過期. 每個會話都由服務器給定一個唯一的身份和密碼, 而且如果在建立連接時傳遞給zk的話, 它就能恢複會話(隻要沒有過期), 所以應用程序可以安全關閉, 同時因為存儲了身份和密碼, 它可以重新獲得這個身份和密碼並恢複會話.
一個zk實例一次隻能處於一種狀態, 一個zk實例在建立與zk服務器的建立時, 處於CONNECTING狀態, 一旦連接建立, 它就變成CONNECTED狀態了.
使用zk的客戶端可以通過注冊watcher的方法來獲取狀態的改變的消息. 一旦進入CONNECTED狀態, watcher將獲得一個KeeperState值為SyncConnected的WatchEvent.
zk的watcher對象有兩個職責, 一個是了解zk實例的狀態變化, 一個了解znode的改變, 初始化或者在在zk實例讀取znode節點信息的讀方法(exists方法和getData方法)上指定是否需要watch znode節點變化, 這裏的watcher可以是專門的, 也可以是zk實例構造函數中指定默認的.
zk可以作為配置的高可用存儲, 使應用的參與者可以恢複或更新配置文件, 使用zk的監測可以創建靈活配置服務, 有需要的客戶端可以以此來了解配置的改變.
public class ConnectionWatcher implements Watcher{ private static final int SESSION_TIMEOUT = 5000; private ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } public void create(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("created" + createPath); } public void close() throws InterruptedException { zk.close(); } } public class ActiveKeyValueStore extends ConnectionWatcher { private final Charset CHARSET = Charset.forName("UTF-8"); private final int MAX_RETRIES = 3; private final int RETRY_PERIOD_SECONDS = 5; public void write(String path, String value) throws KeeperException, InterruptedException { Stat stat = zk.exists(path, false); int retries = 0; while(true) { try { if (stat == null) { zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else { zk.setData(path, value.getBytes(CHARSET), -1); } }catch(KeeperException.SessionExpiredException e) { throw e; }catch(KeeperException e) { if (retries ++ == MAX_RETRIES) { throw e; } TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS); } } } public String read(String path, Watcher watcher) throws KeeperException, InterruptedException { byte[] data = zk.getData(path, watcher, null); return new String(data, CHARSET); } } public class ConfigWatcher implements Watcher { private ActiveKeyValueStore store; private String path; public ConfigWatcher(String hosts) throws IOException, InterruptedException { store = new ActiveKeyValueStore(); store.connect(hosts); } @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeDataChanged) { try { displayConfig(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } public void displayConfig() throws KeeperException, InterruptedException { System.out.println(store.read(path, this)); } public static void main(String[] args) throws Exception { ConfigWatcher watcher = new ConfigWatcher(args[0]); watcher.displayConfig(); Thread.sleep(Long.MAX_VALUE); } }
zk實例方法大多都帶有一個InterruptException異常, 可以通過調用被阻塞線程的interrupt()方法來拋出一個InterruptException異常來取消一個zk操作.
如果zk服務器發送錯誤信息或者服務器發生通信故障時, KeeperException將拋出, KeeperException用不同的子類來對應不同的出錯, 比如KeeperException.NoNoeException在不存在的znode上執行操作將會被拋出
KeeperException包括三種明確的種類:
狀態異常
通常出現在另一個進程在改變znode, 而當前進程沒有感知到該改變. 例如調用setData方法對znode進行更新時, 另一個進程也在更新, 此時將拋出BadVersionException, 對於這種可能發生的情況, 必須通過編碼重試來避免. 還有一些可能是程序錯誤, 比如在創建一個臨時節點時可能發生NoChildrenEphemeralsException.
可恢複異常
比如在一個會話中, 可能連接丟失, 將觸發ConnectionLossException, 此時可以通過重連來恢複會話, 保證會話的完整性.
冪等操作是指相同的結果可以被一次又一次應用的操作, 比如讀請求或者無條件的setData, 它可以簡單的被重新嚐試.
不可恢複異常
比如創建連接時出現驗證失敗, 會拋出AuthFailedException. 另外一種就是會話過期, 會拋出SessionExpireException, 此時狀態為CLOSED, 永遠無法重連. 對於這種情況, 可以通過在watcher中判斷KeeperState的狀態是否為Expired, 如果是則嚐試重新建立連接, 從而保證write方法的重試.
如果zk實例連接到zk服務器失敗, 會嚐試ensemble中的另一台, 如果所有服務器都無法連接, 將拋出IOException.
zookeeper應該隻運行在隻負責zookeeper的機器上, 有其他應用程序競爭資源會顯著影響zookeeper的性能.
每一個在ensemble集群中的zookeeper都有一個在集群中唯一的數字, 這個數字必須在1~255之間, 這個號碼在dataDir路徑下的純文本文件myId中.
在zookeeper配置文件中有這樣一行:
server.n=hostname:port:port
比如:
server.1=zookeeper1:2888:3888
當zookeeper服務器啟動時, 它讀取myid文件來判斷這是哪個服務器, 然後讀取配置文件來確定它需要監聽哪個端口, 以及ensemble中其他服務器的網絡地址
連接到zookeeper服務器的客戶端實例, 應該使用"zookeeper1:2181, zookeeper2:2181,zookeeper3:2181"來作為zookeeper實例的構造參數.
在複製模式中, 還有另外兩個屬性: initLimit, syncLimit
initLimit表示跟隨者連接到領導者可以與其同步的時間, 如果在這個時間內跟隨者無法與領導者進行同步, 那麼領導者將放棄領導地位, 重新選舉, 通過查看日誌, 如果發現這種事情經常發生, 說明這個時間設置短了, 需要加長一些.
syncLimit是允許一個跟隨者與領導者同步的時間, 如果跟隨者在這段時間內不能與領導者同步, 將重啟, 而與該跟隨者的連接將被連接到另外一台機器上.
最後更新:2017-04-02 22:16:26