閱讀283 返回首頁    go 阿裏雲 go 技術社區[雲棲]


跟著實例學習ZooKeeper的用法: 計數器

這一篇文章我們將學習使用Curator來實現計數器。 顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個集群共享的計數器。 隻要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數,一個用long來計數。

SharedCount

這個類使用int類型來計數。 主要涉及三個類。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字麵值和帶版本信息的值VersionedValue。

例子代碼:

package com.colobu.zkrecipe.counter;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

import com.google.common.collect.Lists;

public class SharedCounterExample implements SharedCountListener{
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterExample example = new SharedCounterExample();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        count.start();
                        Thread.sleep(rand.nextInt(10000));
                        System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                        return null;
                    }
                };
                service.submit(task);
            }



            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }


    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);        
    }

}

在這個例子中,我們使用baseCount來監聽計數值(addListener方法)。 任意的SharedCount, 隻要使用相同的path,都可以得到這個計數值。 然後我們使用5個線程為計數值增加一個10以內的隨機數。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

這裏我們使用trySetCount去設置計數器。 第一個參數提供當前的VersionedValue,如果期間其它client更新了此計數值, 你的更新可能不成功, 但是這時你的client更新了最新的值,所以失敗了你可以嚐試再更新一次。 而setCount是強製更新計數器的值。

注意計數器必須start,使用完之後必須調用close關閉它。

在這裏再重複一遍前麵講到的, 強烈推薦你監控ConnectionStateListener, 盡管我們的有些例子沒有監控它。 在本例中SharedCountListener擴展了ConnectionStateListener。 這一條針對所有的Curator recipes都適用,後麵的文章中就不專門提示了。

DistributedAtomicLong

再看一個Long類型的計數器。 除了計數的範圍比SharedCount大了之外, 它首先嚐試使用樂觀鎖的方式設置計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。 還記得InterProcessMutex是什麼嗎? 它是我們前麵跟著實例學習ZooKeeper的用法: 分布式鎖 講的分布式可重入鎖。 這和上麵的計數器的實現有顯著的不同。

可以從它的內部實現DistributedAtomicValue.trySet中看出端倪。

    AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此計數器有一係列的操作:

  • get(): 獲取當前值
  • increment(): 加一
  • decrement(): 減一
  • add(): 增加特定的值
  • subtract(): 減去特定的值
  • trySet(): 嚐試設置計數值
  • forceSet(): 強製設置計數值

必須檢查返回結果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作後的值。

我們下麵的例子中使用5個線程對計數器進行加一操作,如果成功,將操作前後的值打印出來。

package com.colobu.zkrecipe.counter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;

import com.google.common.collect.Lists;

public class DistributedAtomicLongExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            List<DistributedAtomicLong> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        try {
                            //Thread.sleep(rand.nextInt(1000));
                            AtomicValue<Long> value = count.increment();
                            //AtomicValue<Long> value = count.decrement();
                            //AtomicValue<Long> value = count.add((long)rand.nextInt(20));
                            System.out.println("succeed: " + value.succeeded());
                            if (value.succeeded())
                                System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }

    }

}

最後更新:2017-05-23 10:32:25

  上一篇:go  Java FP: Java中函數式編程的Map和Fold(Reduce)
  下一篇:go  深入理解Java內存模型(四)——volatile