閱讀973 返回首頁    go 技術社區[雲棲]


跟著實例學習ZooKeeper的用法: Curator框架應用

前麵的幾篇文章介紹了一些ZooKeeper的應用方法, 本文將介紹Curator訪問ZooKeeper的一些基本方法, 而不僅僅限於指定的Recipes, 你可以使用Curator API任意的訪問ZooKeeper。

CuratorFramework

Curator框架提供了一套高級的API, 簡化了ZooKeeper的操作。 它增加了很多使用ZooKeeper開發的特性,可以處理ZooKeeper集群複雜的連接管理和重試機製。 這些特性包括:

  • 自動化的連接管理: 重新建立到ZooKeeper的連接和重試機製存在一些潛在的錯誤case。 Curator幫助你處理這些事情,對你來說是透明的。
  • 清理API:
    • 簡化了原生的ZooKeeper的方法,事件等
    • 提供了一個現代的流式接口
  • 提供了Recipes實現: 如前麵的文章介紹的那樣,基於這些Recipes可以創建很多複雜的分布式應用

Curator框架通過CuratorFrameworkFactory以工廠模式和builder模式創建CuratorFramework實 例。 CuratorFramework實例都是線程安全的,你應該在你的應用中共享同一個CuratorFramework實例.

工廠方法newClient()提供了一個簡單方式創建實例。 而Builder提供了更多的參數控製。一旦你創建了一個CuratorFramework實例,你必須調用它的start()啟動,在應用退出時調用close()方法關閉.

下麵的例子演示了兩種創建Curator的方法:

package com.colobu.zkrecipe.framework;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class CreateClientExample {
    private static final String PATH = "/example/basic";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        try {
            client = createSimple(server.getConnectString());
            client.start();
            client.create().creatingParentsIfNeeded().forPath(PATH, "test".getBytes());
            CloseableUtils.closeQuietly(client);

            client = createWithOptions(server.getConnectString(), new ExponentialBackoffRetry(1000, 3), 1000, 1000);
            client.start();
            System.out.println(new String(client.getData().forPath(PATH)));
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }

    }

    public static CuratorFramework createSimple(String connectionString) {
        // these are reasonable arguments for the ExponentialBackoffRetry. 
        // The first retry will wait 1 second - the second will wait up to 2 seconds - the
        // third will wait up to 4 seconds.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // The simplest way to get a CuratorFramework instance. This will use default values.
        // The only required arguments are the connection string and the retry policy
        return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    }

    public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
        // using the CuratorFrameworkFactory.builder() gives fine grained control
        // over creation options. See the CuratorFrameworkFactory.Builder javadoc details
        return CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc. etc.
                .build();
    }
}

Curator框架提供了一種流式接口。 操作通過builder串聯起來, 這樣方法調用類似語句一樣。

client.create().forPath("/head", new byte[0]);
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");

CuratorFramework提供的方法:

方法名 描述
create() 開始創建操作, 可以調用額外的方法(比如方式mode 或者後台執行background) 並在最後調用forPath()指定要操作的ZNode
delete() 開始刪除操作. 可以調用額外的方法(版本或者後台處理version or background)並在最後調用forPath()指定要操作的ZNode
checkExists() 開始檢查ZNode是否存在的操作. 可以調用額外的方法(監控或者後台處理)並在最後調用forPath()指定要操作的ZNode
getData() 開始獲得ZNode節點數據的操作. 可以調用額外的方法(監控、後台處理或者獲取狀態watch, background or get stat) 並在最後調用forPath()指定要操作的ZNode
setData() 開始設置ZNode節點數據的操作. 可以調用額外的方法(版本或者後台處理) 並在最後調用forPath()指定要操作的ZNode
getChildren() 開始獲得ZNode的子節點列表。 以調用額外的方法(監控、後台處理或者獲取狀態watch, background or get stat) 並在最後調用forPath()指定要操作的ZNode
inTransaction() 開始是原子ZooKeeper事務. 可以複合create, setData, check, and/or delete 等操作然後調用commit()作為一個原子操作提交

後台操作的通知和監控可以通過ClientListener接口發布. 你可以在CuratorFramework實例上通過addListener()注冊listener, Listener實現了下麵的方法:

  • eventReceived() 一個後台操作完成或者一個監控被觸發

事件類型以及事件的方法如下:

Event Type Event Methods
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GETDATA getResultCode(), getPath(), getStat() and getData()
SETDATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
WATCHED getWatchedEvent()

還可以通過ConnectionStateListener接口監控連接的狀態。 強烈推薦你增加這個監控器。

你可以使用命名空間Namespace避免多個應用的節點的名稱衝突。 CuratorFramework提供了命名空間的概念,這樣CuratorFramework會為它的API調用的path加上命名空間:

CuratorFramework    client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
 ...
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"

Curator還提供了臨時的CuratorFramework: CuratorTempFramework, 一定時間不活動後連接會被關閉。這hi基於Camille Fournier的一篇文章: https://whilefalse.blogspot.com/2012/12/building-global-highly-available.html.

創建builder時不是調用build()而是調用buildTemp()。 3分鍾不活動連接就被關閉,你也可以指定不活動的時間。 它隻提供了下麵幾個方法:

    public void     close();
    public CuratorTransaction inTransaction() throws Exception;
    public TempGetDataBuilder getData() throws Exception;

操作方法

上麵的表格列出了CuratorFramework可以用的操作。 下麵就是一個例子:

package com.colobu.zkrecipe.framework;

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

public class CrudExample {

    public static void main(String[] args) {

    }

    public static void create(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this will create the given ZNode with the given data
        client.create().forPath(path, payload);
    }

    public static void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this will create the given EPHEMERAL ZNode with the given data
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    }

    public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this will create the given EPHEMERAL-SEQUENTIAL ZNode with the given
        // data using Curator protection.
        return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
    }

    public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception {
        // set data for the given node
        client.setData().forPath(path, payload);
    }

    public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this is one method of getting event/async notifications
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                // examine event for details
            }
        };
        client.getCuratorListenable().addListener(listener);
        // set data for the given node asynchronously. The completion
        // notification
        // is done via the CuratorListener.
        client.setData().inBackground().forPath(path, payload);
    }

    public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

    public static void delete(CuratorFramework client, String path) throws Exception {
        // delete the given node
        client.delete().forPath(path);
    }

    public static void guaranteedDelete(CuratorFramework client, String path) throws Exception {
        // delete the given node and guarantee that it completes
        client.delete().guaranteed().forPath(path);
    }

    public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
        /**
         * Get children and set a watcher on the node. The watcher notification
         * will come through the CuratorListener (see setDataAsync() above).
         */
        return client.getChildren().watched().forPath(path);
    }

    public static List<String> watchedGetChildren(CuratorFramework client, String path, Watcher watcher) throws Exception {
        /**
         * Get children and set the given watcher on the node.
         */
        return client.getChildren().usingWatcher(watcher).forPath(path);
    }
}

事務

上麵也提到, CuratorFramework提供了事務的概念,可以將一組操作放在一個原子事務中。 什麼叫事務? 事務是原子的, 一組操作要麼都成功,要麼都失敗。

下麵的例子演示了事務的操作:

package com.colobu.zkrecipe.framework;

import java.util.Collection;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

public class TransactionExample {

    public static void main(String[] args) {

    }

    public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception {
        // this example shows how to use ZooKeeper's new transactions
        Collection<CuratorTransactionResult> results = client.inTransaction().create().forPath("/a/path", "some data".getBytes())
                .and().setData().forPath("/another/path", "other data".getBytes())
                .and().delete().forPath("/yet/another/path")
                .and().commit(); // IMPORTANT!
                                                                                                                                // called
        for (CuratorTransactionResult result : results) {
            System.out.println(result.getForPath() + " - " + result.getType());
        }
        return results;
    }

    /*
     * These next four methods show how to use Curator's transaction APIs in a
     * more traditional - one-at-a-time - manner
     */
    public static CuratorTransaction startTransaction(CuratorFramework client) {
        // start the transaction builder
        return client.inTransaction();
    }

    public static CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction) throws Exception {
        // add a create operation
        return transaction.create().forPath("/a/path", "some data".getBytes()).and();
    }

    public static CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction) throws Exception {
        // add a delete operation
        return transaction.delete().forPath("/another/path").and();
    }

    public static void commitTransaction(CuratorTransactionFinal transaction) throws Exception {
        // commit the transaction
        transaction.commit();
    }
}

最後更新:2017-05-23 10:32:08

  上一篇:go  跟著實例學習ZooKeeper的用法: 臨時節點
  下一篇:go  Disruptor Wizard已死,Disruptor Wizard永存!