閱讀544 返回首頁    go 阿裏雲 go 技術社區[雲棲]


跟著實例學習ZooKeeper的用法: Barrier

分布式Barrier是這樣一個類: 它會阻塞所有節點上的等待進程,知道某一個被滿足, 然後所有的節點繼續進行。

比如賽馬比賽中, 等賽馬陸續來到起跑線前。 一聲令下,所有的賽馬都飛奔而出。

柵欄Barrier

DistributedBarrier類實現了柵欄的功能。 它的構造函數如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

首先你需要設置柵欄,它將阻塞在它上麵等待的線程:

setBarrier();

然後需要阻塞的線程調用“方法等待放行條件:

public void waitOnBarrier()

當條件滿足時,移除柵欄,所有等待的線程將繼續執行:

removeBarrier();

異常處理 DistributedBarrier 會監控連接狀態,當連接斷掉時waitOnBarrier()方法會拋出異常。

看一個例子:

package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " waits on Barrier");
                        barrier.waitOnBarrier();
                        System.out.println("Client #" + index + " begins");
                        return null;
                    }
                };
                service.submit(task);
            }

            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");


            controlBarrier.removeBarrier();


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}

這個例子創建了controlBarrier來設置柵欄和移除柵欄。 我們創建了5個線程,在此Barrier上等待。 最後移除柵欄後所有的線程才繼續執行。

如果你開始不設置柵欄,所有的線程就不會阻塞住。

雙柵欄Double Barrier

雙柵欄允許客戶端在計算的開始和結束時同步。當足夠的進程加入到雙柵欄時,進程開始計算, 當計算完成時,離開柵欄。 雙柵欄類是DistributedDoubleBarrier。 構造函數為:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成員數量,當enter方法被調用時,成員被阻塞,直到所有的成員都調用了enter。 當leave方法被調用時,它也阻塞調用線程, 知道所有的成員都調用了leave。 就像百米賽跑比賽, 發令槍響, 所有的運動員開始跑,等所有的運動員跑過終點線,比賽才結束。

DistributedBarrier 會監控連接狀態,當連接斷掉時enter()leave方法會拋出異常。

例子代碼:

package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " enters");
                        barrier.enter();
                        System.out.println("Client #" + index + " begins");
                        Thread.sleep((long) (3000 * Math.random()));
                        barrier.leave();
                        System.out.println("Client #" + index + " left");
                        return null;
                    }
                };
                service.submit(task);
            }


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}

最後更新:2017-05-23 10:32:28

  上一篇:go  LMAX Disruptor——一個高性能、低延遲且簡單的框架
  下一篇:go  Java FP: Java中函數式編程的Map和Fold(Reduce)