254
技術社區[雲棲]
java之JUC係列-外部Tools-Executors|Semaphor|Exchanger|CyclicBarrier|CountDownLatch
前麵寫了兩篇JDBC源碼的文章,自己都覺得有點枯燥,先插一段JUC係列的文章來換換胃口,前麵有文章大概介紹過J U C包含的東西,JUC體係包含的內容也是非常的多,不是一兩句可以說清楚的,我這首先列出將會列舉的JUC相關的內容,然後介紹本文的版本:Tools部分
J.U.C體係的主要大板塊包含內容,如下圖所示:

注意這個裏麵每個部分都包含很多的類和處理器,而且是相互包含,相互引用的,相互實現的。
說到J UC其實就是說java的多線程等和鎖,前麵說過一些狀態轉換,中斷等,我們今天來用它的tools來實現一些有些小意思的東西,講到其他內容的時候,再來想想這寫tools是怎麼實現的。
tools是本文說要講到的重點,而tools主要包含哪些東西呢:

Tools也包含了5個部分的知識:Executors、Semaphor、Exchanger、CyclicBarrier、CountDownLatch,其實也就是五個工具類,這5個工具類有神馬用途呢,就是我們接下來要將的內容了。
Executors:
其實它主要用來創建線程池,代理了線程池的創建,使得你的創建入口參數變得簡單,通過方法名便知道了你要創建的線程池是什麼樣一個線程池,功能大概是什麼樣的,其實線程池內部都是統一的方法來實現,通過構造方法重載,使得實現不同的功能,但是往往這種方式很多時候不知道具體入口參數的改變有什麼意思,除非讀了源碼才知道,此時builder模式的方式來完成,builder什麼樣的東西它告訴你就可以。
常見的方法有(都是靜態方法):
1、創建一個指定大小的線程池,如果超過大小,放入blocken隊列中,默認是LinkedBlockingQueue,默認的ThreadFactory為:Executors.defaultThreadFactory(),是一個Executors的一個內部類。
Executors.newFixedThreadPool(int)
內部實現是:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors.newFixedThreadPool(int,ThreadFactory)
內部實現是:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
3、創建線程池長度為1的,也就是隻有一個長度的線程池,多餘的必須等待,它和調用Executors.newFixedThreadPool(1)得到的結果一樣:
Executors.newSingleThreadExecutor()
內部實現是:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}是不是蠻簡單的,就是在變參數,你自己也可以new的。
4、和方法3類似,可以自定義ThreadFactory,這裏就不多說了!
5、創建可以進行緩存的線程池,默認緩存60s,數據會放在一個SynchronousQueue上,而不會進入blocken隊列中,也就是隻要有線程進來就直接進入調度,這個不推薦使用,因為容易出問題,除非用來模擬一些並發的測試:
Executors.newCachedThreadPool();
內部實現為:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}6、和方法5類似,增加自定義ThreadFactory
7、添加一個Schedule的調度器的線程池,默認隻有一個調度:
Executors.newSingleThreadScheduledExecutor();
內部實現為(這裏可以看到不是用ThreadPoolExector了,schedule換了一個類,內部實現通過ScheduledThreadPoolExecutor類裏麵的內部類ScheduledFutureTask來實現的,這個內部類是private,默認是引用不到的哦):
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}8、和7一樣,增加自己定義的ThreadFactory
9、添加一個schedule的線程池調度器,和newFixedThreadPool有點類似:
Executors.newScheduledThreadPool();
內部代碼為:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
其實內部Exectors裏麵還有一些其他的方法,我們就不多說明了,另外通過這裏,大家先可以了解一個大概,知道Exectors其實是一個工具類,提供一係列的靜態方法,來完成對對應線程池的形象化創建,所以不用覺得很神奇,神奇的是內部是如何實現的,本文我們不闡述文章中各種線程池的實現,隻是大概上有個認識,等到我們專門將Exector係列的時候,我們會詳細描述這些細節。
OK,我們繼續下一個話題:
Semaphor,這個鳥東西是敢毛吃的呢?
答:通過名字就看出來了,是信號量。
信號量可以幹什麼呢?
答:根據一些閥值做訪問控製。
OK,我們這裏模擬一個當多個線程並發一段代碼的時候,如何控製其訪問速度:
import java.util.Random;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private final static Semaphore MAX_SEMA_PHORE = new Semaphore(10);
public static void main(String []args) {
for(int i = 0 ; i < 100 ; i++) {
final int num = i;
final Random radom = new Random();
new Thread() {
public void run() {
boolean acquired = false;
try {
MAX_SEMA_PHORE.acquire();
acquired = true;
System.out.println("我是線程:" + num + " 我獲得了使用權!" + DateTimeUtil.getDateTime());
long time = 1000 * Math.max(1, Math.abs(radom.nextInt() % 10));
Thread.sleep(time);
System.out.println("我是線程:" + num + " 我執行完了!" + DateTimeUtil.getDateTime());
}catch(Exception e) {
e.printStackTrace();
}finally {
if(acquired) {
MAX_SEMA_PHORE.release();
}
}
}
}.start();
}
}
}
接下來:
Exchanger十個神馬鬼東西呢?
答:線程之間交互數據,且在並發時候使用,兩兩交換,交換中不會因為線程多而混亂,發送出去沒接收到會一直等,由交互器完成交互過程。
啥時候用,沒想到案例?
答:的確很少用,而且案例很少,不過的確有這種案例,Exchanger
import java.util.concurrent.Exchanger;
public class ExchangerTest {
public static void main(String []args) {
final Exchanger <Integer>exchanger = new Exchanger<Integer>();
for(int i = 0 ; i < 10 ; i++) {
final Integer num = i;
new Thread() {
public void run() {
System.out.println("我是線程:Thread_" + this.getName() + "我的數據是:" + num);
try {
Integer exchangeNum = exchanger.exchange(num);
Thread.sleep(1000);
System.out.println("我是線程:Thread_" + this.getName() + "我原先的數據為:" + num + " , 交換後的數據為:" + exchangeNum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
這裏運行你可以看到,如果某個線程和另一個線程傳送了數據,它接受到的數據必然是另一個線程傳遞給他的,中間步驟由Exchanger去控製,其實你可以說,我自己隨機取選擇,不過中間的算法邏輯就要複雜一些了。
接下來:
CyclicBarrier,關卡模式,搞啥玩意的呢?
答:當你在很多環節需要卡住,要多個線程同時在這裏都達到後,再向下走,很有用途。
能否舉個例子,有點抽象?
答:團隊出去旅行,大家一起先達到酒店住宿,然後一起達到遊樂的地方遊玩,然後一起坐車回家,每次需要點名後確認相關人員均達到,然後LZ一聲令下,觸發,大夥就瘋子般的出發了。
下麵的例子也是以旅遊的方式來呈現給大家:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierTest {
private static final int THREAD_COUNT = 10;
private final static CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREAD_COUNT ,
new Runnable() {
public void run() {
System.out.println("======>我是導遊,本次點名結束,準備走下一個環節!");
}
}
);
public static void main(String []args)
throws InterruptedException, BrokenBarrierException {
for(int i = 0 ; i < 10 ; i++) {
new Thread(String.valueOf(i)) {
public void run() {
try {
System.out.println("我是線程:" + this.getName() + " 我們達到旅遊地點!");
CYCLIC_BARRIER.await();
System.out.println("我是線程:" + this.getName() + " 我開始騎車!");
CYCLIC_BARRIER.await();
System.out.println("我是線程:" + this.getName() + " 我們開始爬山!");
CYCLIC_BARRIER.await();
System.out.println("我是線程:" + this.getName() + " 我們回賓館休息!");
CYCLIC_BARRIER.await();
System.out.println("我是線程:" + this.getName() + " 我們開始乘車回家!");
CYCLIC_BARRIER.await();
System.out.println("我是線程:" + this.getName() + " 我們到家了!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
測試結果中可以發現,大家一起走到某個步驟後,導遊說:“我是導遊,本次點名結束,準備走下一個環節!”,然後才會進入下一個步驟,OK,這個有點意思吧,其實賽馬也是這個道理,隻是賽馬通常隻有一個步驟,所以我們還有一個方式是:
CountDownLatch的方式來完成賽馬操作,CountDownLatch是用計數器來做的,所以它不可以被複用,如果要多次使用,就要從新new一個出來才可以。我們下麵的代碼中,用兩組賽馬,每組5個參與者來,做一個簡單測試:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
private final static int GROUP_SIZE = 5;
public static void main(String []args) {
processOneGroup("分組1");
processOneGroup("分組2");
}
private static void processOneGroup(final String groupName) {
final CountDownLatch start_count_down = new CountDownLatch(1);
final CountDownLatch end_count_down = new CountDownLatch(GROUP_SIZE);
System.out.println("==========================>\n分組:" + groupName + "比賽開始:");
for(int i = 0 ; i < GROUP_SIZE ; i++) {
new Thread(String.valueOf(i)) {
public void run() {
System.out.println("我是線程組:【" + groupName + "】,第:" + this.getName() + " 號線程,我已經準備就緒!");
try {
start_count_down.await();//等待開始指令發出即:start_count_down.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是線程組:【" + groupName + "】,第:" + this.getName() + " 號線程,我已執行完成!");
end_count_down.countDown();
}
}.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("各就各位,預備!");
start_count_down.countDown();//開始賽跑
try {
end_count_down.await();//等待多個賽跑者逐個結束
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("分組:" + groupName + "比賽結束!");
}
}
public class ThreadJoinTest {
private final static int GROUP_SIZE = 5;
public static void main(String []args) throws InterruptedException {
Thread []threadGroup1 = new Thread[5];
Thread []threadGroup2 = new Thread[5];
for(int i = 0 ; i < GROUP_SIZE ; i++) {
final int num = i;
threadGroup1[i] = new Thread() {
public void run() {
int j = 0;
while(j++ < 10) {
System.out.println("我是1號組線程:" + num + " 這個是我第:" + j + " 次運行!");
}
}
};
threadGroup2[i] = new Thread() {
public void run() {
int j = 0;
while(j++ < 10) {
System.out.println("我是2號組線程:" + num + " 這個是我第:" + j + " 次運行!");
}
}
};
threadGroup1[i].start();
}
for(int i = 0 ; i < GROUP_SIZE ; i++) {
threadGroup1[i].join();
}
System.out.println("-==================>線程組1執行完了,該輪到俺了!");
for(int i = 0 ; i < GROUP_SIZE ; i++) {
threadGroup2[i].start();
}
for(int i = 0 ; i < GROUP_SIZE ; i++) {
threadGroup2[i].join();
}
System.out.println("全部結束啦!哈哈,回家喝稀飯!");
}
}
代碼是不是繁雜了不少,嗬嗬,我們再看看上麵的信號量,如果不用工具,自己寫會咋寫,我們模擬CAS鎖,使用Atomic配合完成咋來做呢。也來玩玩,嗬嗬:
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadWaitNotify {
private final static int THREAD_COUNT = 100;
private final static int QUERY_MAX_LENGTH = 2;
private final static AtomicInteger NOW_CALL_COUNT = new AtomicInteger(0);
public static void main(String []args) throws InterruptedException {
Thread []threads = new Thread[THREAD_COUNT];
for(int i = 0 ; i < THREAD_COUNT ; i++) {
threads[i] = new Thread(String.valueOf(i)) {
synchronized public void run() {
int nowValue = NOW_CALL_COUNT.get();
while(true) {
if(nowValue < QUERY_MAX_LENGTH && NOW_CALL_COUNT.compareAndSet(nowValue, nowValue + 1)) {
break;//獲取到了
}
try {
this.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
nowValue = NOW_CALL_COUNT.get();//獲取一個數據,用於對比
}
System.out.println(this.getName() + "======我開始做操作了!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName() + "======操作結束了!");
NOW_CALL_COUNT.getAndDecrement();
this.notify();
}
};
}
for(int i = 0 ; i < THREAD_COUNT ; i++) {
threads[i].start();
}
}
}
還是有點意思哈,這樣寫就是大部分人對while循環那部分會寫暈掉,主要是要不斷去判定和嚐試,wait()默認是長期等待,但是我們不想讓他長期等待,就等1s然後再嚐試,其實例子還可以改成wait一個隨機的時間範圍,這樣模擬的效果會更加好一些;另外實際的代碼中,如果獲取到鎖後,notify方法應當放在finally中,才能保證他肯定會執行notify這個方法。
OK,本文就是用,玩,希望玩得有點爽,我們後麵會逐步介紹它的實現機製以及一寫線程裏頭很好用,但是大家又不是經常用的東西。
最後更新:2017-04-04 07:03:45