java之JUC係列-外部Tools-Executors|Semaphor|Exchanger|CyclicBarrier|CountDownLatch
前麵寫了兩篇JDBC源碼的文章,自己都覺得有點枯燥,先插一段JUC係列的文章來換換胃口,前麵有文章大概介紹過J U C包含的東西,JUC體係包含的內容也是非常的多,不是一兩句可以說清楚的,我這首先列出將會列舉的JUC相關的內容,然後介紹本文的版本:Tools部分
J.U.C體係的主要大板塊包含內容,如下圖所示:
注意這個裏麵每個部分都包含很多的類和處理器,而且是相互包含,相互引用的,相互實現的。
說到J UC其實就是說java的多線程等和鎖,前麵說過一些狀態轉換,中斷等,我們今天來用它的tools來實現一些有些小意思的東西,講到其他內容的時候,再來想想這寫tools是怎麼實現的。
tools是本文說要講到的重點,而tools主要包含哪些東西呢:
Tools也包含了5個部分的知識:Executors、Semaphor、Exchanger、CyclicBarrier、CountDownLatch,其實也就是五個工具類,這5個工具類有神馬用途呢,就是我們接下來要將的內容了。
Executors:
其實它主要用來創建線程池,代理了線程池的創建,使得你的創建入口參數變得簡單,通過方法名便知道了你要創建的線程池是什麼樣一個線程池,功能大概是什麼樣的,其實線程池內部都是統一的方法來實現,通過構造方法重載,使得實現不同的功能,但是往往這種方式很多時候不知道具體入口參數的改變有什麼意思,除非讀了源碼才知道,此時builder模式的方式來完成,builder什麼樣的東西它告訴你就可以。
常見的方法有(都是靜態方法):
1、創建一個指定大小的線程池,如果超過大小,放入blocken隊列中,默認是LinkedBlockingQueue,默認的ThreadFactory為:Executors.defaultThreadFactory(),是一個Executors的一個內部類。
Executors.newFixedThreadPool(int)
內部實現是:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors.newFixedThreadPool(int,ThreadFactory)
內部實現是:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
3、創建線程池長度為1的,也就是隻有一個長度的線程池,多餘的必須等待,它和調用Executors.newFixedThreadPool(1)得到的結果一樣:
Executors.newSingleThreadExecutor()
內部實現是:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }是不是蠻簡單的,就是在變參數,你自己也可以new的。
4、和方法3類似,可以自定義ThreadFactory,這裏就不多說了!
5、創建可以進行緩存的線程池,默認緩存60s,數據會放在一個SynchronousQueue上,而不會進入blocken隊列中,也就是隻要有線程進來就直接進入調度,這個不推薦使用,因為容易出問題,除非用來模擬一些並發的測試:
Executors.newCachedThreadPool();
內部實現為:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }6、和方法5類似,增加自定義ThreadFactory
7、添加一個Schedule的調度器的線程池,默認隻有一個調度:
Executors.newSingleThreadScheduledExecutor();
內部實現為(這裏可以看到不是用ThreadPoolExector了,schedule換了一個類,內部實現通過ScheduledThreadPoolExecutor類裏麵的內部類ScheduledFutureTask來實現的,這個內部類是private,默認是引用不到的哦):
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }8、和7一樣,增加自己定義的ThreadFactory
9、添加一個schedule的線程池調度器,和newFixedThreadPool有點類似:
Executors.newScheduledThreadPool();
內部代碼為:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
其實內部Exectors裏麵還有一些其他的方法,我們就不多說明了,另外通過這裏,大家先可以了解一個大概,知道Exectors其實是一個工具類,提供一係列的靜態方法,來完成對對應線程池的形象化創建,所以不用覺得很神奇,神奇的是內部是如何實現的,本文我們不闡述文章中各種線程池的實現,隻是大概上有個認識,等到我們專門將Exector係列的時候,我們會詳細描述這些細節。
OK,我們繼續下一個話題:
Semaphor,這個鳥東西是敢毛吃的呢?
答:通過名字就看出來了,是信號量。
信號量可以幹什麼呢?
答:根據一些閥值做訪問控製。
OK,我們這裏模擬一個當多個線程並發一段代碼的時候,如何控製其訪問速度:
import java.util.Random; import java.util.concurrent.Semaphore; public class SemaphoreTest { private final static Semaphore MAX_SEMA_PHORE = new Semaphore(10); public static void main(String []args) { for(int i = 0 ; i < 100 ; i++) { final int num = i; final Random radom = new Random(); new Thread() { public void run() { boolean acquired = false; try { MAX_SEMA_PHORE.acquire(); acquired = true; System.out.println("我是線程:" + num + " 我獲得了使用權!" + DateTimeUtil.getDateTime()); long time = 1000 * Math.max(1, Math.abs(radom.nextInt() % 10)); Thread.sleep(time); System.out.println("我是線程:" + num + " 我執行完了!" + DateTimeUtil.getDateTime()); }catch(Exception e) { e.printStackTrace(); }finally { if(acquired) { MAX_SEMA_PHORE.release(); } } } }.start(); } } }
接下來:
Exchanger十個神馬鬼東西呢?
答:線程之間交互數據,且在並發時候使用,兩兩交換,交換中不會因為線程多而混亂,發送出去沒接收到會一直等,由交互器完成交互過程。
啥時候用,沒想到案例?
答:的確很少用,而且案例很少,不過的確有這種案例,Exchanger
import java.util.concurrent.Exchanger; public class ExchangerTest { public static void main(String []args) { final Exchanger <Integer>exchanger = new Exchanger<Integer>(); for(int i = 0 ; i < 10 ; i++) { final Integer num = i; new Thread() { public void run() { System.out.println("我是線程:Thread_" + this.getName() + "我的數據是:" + num); try { Integer exchangeNum = exchanger.exchange(num); Thread.sleep(1000); System.out.println("我是線程:Thread_" + this.getName() + "我原先的數據為:" + num + " , 交換後的數據為:" + exchangeNum); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } } }
這裏運行你可以看到,如果某個線程和另一個線程傳送了數據,它接受到的數據必然是另一個線程傳遞給他的,中間步驟由Exchanger去控製,其實你可以說,我自己隨機取選擇,不過中間的算法邏輯就要複雜一些了。
接下來:
CyclicBarrier,關卡模式,搞啥玩意的呢?
答:當你在很多環節需要卡住,要多個線程同時在這裏都達到後,再向下走,很有用途。
能否舉個例子,有點抽象?
答:團隊出去旅行,大家一起先達到酒店住宿,然後一起達到遊樂的地方遊玩,然後一起坐車回家,每次需要點名後確認相關人員均達到,然後LZ一聲令下,觸發,大夥就瘋子般的出發了。
下麵的例子也是以旅遊的方式來呈現給大家:
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class BarrierTest { private static final int THREAD_COUNT = 10; private final static CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREAD_COUNT , new Runnable() { public void run() { System.out.println("======>我是導遊,本次點名結束,準備走下一個環節!"); } } ); public static void main(String []args) throws InterruptedException, BrokenBarrierException { for(int i = 0 ; i < 10 ; i++) { new Thread(String.valueOf(i)) { public void run() { try { System.out.println("我是線程:" + this.getName() + " 我們達到旅遊地點!"); CYCLIC_BARRIER.await(); System.out.println("我是線程:" + this.getName() + " 我開始騎車!"); CYCLIC_BARRIER.await(); System.out.println("我是線程:" + this.getName() + " 我們開始爬山!"); CYCLIC_BARRIER.await(); System.out.println("我是線程:" + this.getName() + " 我們回賓館休息!"); CYCLIC_BARRIER.await(); System.out.println("我是線程:" + this.getName() + " 我們開始乘車回家!"); CYCLIC_BARRIER.await(); System.out.println("我是線程:" + this.getName() + " 我們到家了!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }.start(); } } }
測試結果中可以發現,大家一起走到某個步驟後,導遊說:“我是導遊,本次點名結束,準備走下一個環節!”,然後才會進入下一個步驟,OK,這個有點意思吧,其實賽馬也是這個道理,隻是賽馬通常隻有一個步驟,所以我們還有一個方式是:
CountDownLatch的方式來完成賽馬操作,CountDownLatch是用計數器來做的,所以它不可以被複用,如果要多次使用,就要從新new一個出來才可以。我們下麵的代碼中,用兩組賽馬,每組5個參與者來,做一個簡單測試:
import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { private final static int GROUP_SIZE = 5; public static void main(String []args) { processOneGroup("分組1"); processOneGroup("分組2"); } private static void processOneGroup(final String groupName) { final CountDownLatch start_count_down = new CountDownLatch(1); final CountDownLatch end_count_down = new CountDownLatch(GROUP_SIZE); System.out.println("==========================>\n分組:" + groupName + "比賽開始:"); for(int i = 0 ; i < GROUP_SIZE ; i++) { new Thread(String.valueOf(i)) { public void run() { System.out.println("我是線程組:【" + groupName + "】,第:" + this.getName() + " 號線程,我已經準備就緒!"); try { start_count_down.await();//等待開始指令發出即:start_count_down.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我是線程組:【" + groupName + "】,第:" + this.getName() + " 號線程,我已執行完成!"); end_count_down.countDown(); } }.start(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("各就各位,預備!"); start_count_down.countDown();//開始賽跑 try { end_count_down.await();//等待多個賽跑者逐個結束 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("分組:" + groupName + "比賽結束!"); } }
public class ThreadJoinTest { private final static int GROUP_SIZE = 5; public static void main(String []args) throws InterruptedException { Thread []threadGroup1 = new Thread[5]; Thread []threadGroup2 = new Thread[5]; for(int i = 0 ; i < GROUP_SIZE ; i++) { final int num = i; threadGroup1[i] = new Thread() { public void run() { int j = 0; while(j++ < 10) { System.out.println("我是1號組線程:" + num + " 這個是我第:" + j + " 次運行!"); } } }; threadGroup2[i] = new Thread() { public void run() { int j = 0; while(j++ < 10) { System.out.println("我是2號組線程:" + num + " 這個是我第:" + j + " 次運行!"); } } }; threadGroup1[i].start(); } for(int i = 0 ; i < GROUP_SIZE ; i++) { threadGroup1[i].join(); } System.out.println("-==================>線程組1執行完了,該輪到俺了!"); for(int i = 0 ; i < GROUP_SIZE ; i++) { threadGroup2[i].start(); } for(int i = 0 ; i < GROUP_SIZE ; i++) { threadGroup2[i].join(); } System.out.println("全部結束啦!哈哈,回家喝稀飯!"); } }
代碼是不是繁雜了不少,嗬嗬,我們再看看上麵的信號量,如果不用工具,自己寫會咋寫,我們模擬CAS鎖,使用Atomic配合完成咋來做呢。也來玩玩,嗬嗬:
import java.util.concurrent.atomic.AtomicInteger; public class ThreadWaitNotify { private final static int THREAD_COUNT = 100; private final static int QUERY_MAX_LENGTH = 2; private final static AtomicInteger NOW_CALL_COUNT = new AtomicInteger(0); public static void main(String []args) throws InterruptedException { Thread []threads = new Thread[THREAD_COUNT]; for(int i = 0 ; i < THREAD_COUNT ; i++) { threads[i] = new Thread(String.valueOf(i)) { synchronized public void run() { int nowValue = NOW_CALL_COUNT.get(); while(true) { if(nowValue < QUERY_MAX_LENGTH && NOW_CALL_COUNT.compareAndSet(nowValue, nowValue + 1)) { break;//獲取到了 } try { this.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } nowValue = NOW_CALL_COUNT.get();//獲取一個數據,用於對比 } System.out.println(this.getName() + "======我開始做操作了!"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.getName() + "======操作結束了!"); NOW_CALL_COUNT.getAndDecrement(); this.notify(); } }; } for(int i = 0 ; i < THREAD_COUNT ; i++) { threads[i].start(); } } }
還是有點意思哈,這樣寫就是大部分人對while循環那部分會寫暈掉,主要是要不斷去判定和嚐試,wait()默認是長期等待,但是我們不想讓他長期等待,就等1s然後再嚐試,其實例子還可以改成wait一個隨機的時間範圍,這樣模擬的效果會更加好一些;另外實際的代碼中,如果獲取到鎖後,notify方法應當放在finally中,才能保證他肯定會執行notify這個方法。
OK,本文就是用,玩,希望玩得有點爽,我們後麵會逐步介紹它的實現機製以及一寫線程裏頭很好用,但是大家又不是經常用的東西。
最後更新:2017-04-04 07:03:45