閱讀551 返回首頁    go 阿裏雲 go 技術社區[雲棲]


並發工具類(二)同步屏障CyclicBarrier

簡介

CyclicBarrier 的字麵意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。

實例代碼如下:

public class CyclicBarrierTest {

	static CyclicBarrier c = new CyclicBarrier(2);

	public static void main(String[] args) {
		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					c.await();
				} catch (Exception e) {

				}
				System.out.println(1);
			}
		}).start();

		try {
			c.await();
		} catch (Exception e) {

		}
		System.out.println(2);
	}
}

輸出

2
1

或者輸出

1
2

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)則主線程和子線程會永遠等待,因為沒有第三個線程執行await方法,即沒有第三個線程到達屏障,所以之前到達屏障的兩個線程都不會繼續執行。

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更複雜的業務場景。代碼如下:

public class CyclicBarrierTest2 {

	static CyclicBarrier c = new CyclicBarrier(2, new A());

	public static void main(String[] args) {
		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					c.await();
				} catch (Exception e) {

				}
				System.out.println(1);
			}
		}).start();

		try {
			c.await();
		} catch (Exception e) {

		}
		System.out.println(2);
	}

	static class A implements Runnable {

		@Override
		public void run() {
			System.out.println(3);
		}

	}

}

輸出

3
1
2

CyclicBarrier的應用場景

CyclicBarrier可以用於多線程計算數據,最後合並計算結果的應用場景。比如我們用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個帳戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet裏的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

CyclicBarrier和CountDownLatch的區別

  • CountDownLatch的計數器隻能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為複雜的業務場景,比如如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次。
  • CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。比如以下代碼執行完之後會返回true。

isBroken的使用代碼如下:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest3 {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
            }
        });
        thread.start();
        thread.interrupt();
        try {
            c.await();
        } catch (Exception e) {
            System.out.println(c.isBroken());
        }
    }
}

輸出

true

最後更新:2017-05-23 17:02:54

  上一篇:go  智慧城市數據安全防護如何開展?美國聖地亞哥案例探索
  下一篇:go  AKKA文檔(java版)