支持生產阻塞的線程池
在各種並發編程模型中,生產者-消費者模式大概是最常用的了。在實際工作中,對於生產消費的速度,通常需要做一下權衡。通常來說,生產任務的速度要大於消費的速度。一個細節問題是,隊列長度,以及如何匹配生產和消費的速度。
一個典型的生產者-消費者模型如下:
在並發環境下利用J.U.C提供的Queue實現可以很方便地保證生產和消費過程中的線程安全。這裏需要注意的是,Queue必須設置初始容量,防止生產者生產過快導致隊列長度暴漲,最終觸發OutOfMemory。
對於一般的生產快於消費的情況。當隊列已滿時,我們並不希望有任何任務被忽略或得不到執行,此時生產者可以等待片刻再提交任務,更好的做法是,把生 產者阻塞在提交任務的方法上,待隊列未滿時繼續提交任務,這樣就沒有浪費的空轉時間了。阻塞這一點也很容易,BlockingQueue就是為此打造 的,ArrayBlockingQueue和LinkedBlockingQueue在構造時都可以提供容量做限製,其中 LinkedBlockingQueue是在實際操作隊列時在每次拿到鎖以後判斷容量。
更進一步,當隊列為空時,消費者拿不到任務,可以等一會兒再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,當有任務時 便可以立即獲得執行,建議調用take的帶超時參數的重載方法,超時後線程退出。這樣當生產者事實上已經停止生產時,不至於讓消費者無限等待。
於是一個高效的支持阻塞的生產消費模型就實現了。
等一下,既然J.U.C已經幫我們實現了線程池,為什麼還要采用這一套東西?直接用ExecutorService不是更方便?
我們來看一下ThreadPoolExecutor的基本結構:
可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已經幫我們實現好了,並且直接采用線程池的實現還有很多優勢,例如線程數的動態調整等。
但問題在於,即便你在構造ThreadPoolExecutor時手動指定了一個BlockingQueue作為隊列實現,事實上當隊列滿 時,execute方法並不會阻塞,原因在於ThreadPoolExecutor調用的是BlockingQueue非阻塞的offer方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
這時候就需要做一些事情來達成一個結果:當生產者提交任務,而隊列已滿時,能夠讓生產者阻塞住,等待任務被消費。
關鍵在於,在並發環境下,隊列滿不能由生產者去判斷,不能調用ThreadPoolExecutor.getQueue().size()來判斷隊列是否滿。
線程池的實現中,當隊列滿時會調用構造時傳入的RejectedExecutionHandler去拒絕任務的處理。默認的實現是AbortPolicy,直接拋出一個RejectedExecutionException。
幾種拒絕策略在這裏就不贅述了,這裏和我們的需求比較接近的是CallerRunsPolicy,這種策略會在隊列滿時,讓提交任務的線程去執行任務,相當於讓生產者臨時去幹了消費者幹的活兒,這樣生產者雖然沒有被阻塞,但提交任務也會被暫停。
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a <tt>CallerRunsPolicy</tt>. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
但這種策略也有隱患,當生產者較少時,生產者消費任務的時間裏,消費者可能已經把任務都消費完了,隊列處於空狀態,當生產者執行完任務後才能再繼續生產任務,這個過程中可能導致消費者線程的饑餓。
參考類似的思路,最簡單的做法,我們可以直接定義一個RejectedExecutionHandler,當隊列滿時改為調用BlockingQueue.put來實現生產者的阻塞:
new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } };
這樣,我們就無需再關心Queue和Consumer的邏輯,隻要把精力集中在生產者和消費者線程的實現邏輯上,隻管往線程池提交任務就行了。
相比最初的設計,這種方式的代碼量能減少不少,而且能避免並發環境的很多問題。當然,你也可以采用另外的手段,例如在提交時采用信號量做入口限製等,但是如果僅僅是要讓生產者阻塞,那就顯得複雜了。
文章轉自 並發編程網-ifeve.com
最後更新:2017-05-22 17:31:56
上一篇:
穀歌發布基於機器學習的Android APP安全檢測係統:Google Play Protect
下一篇:
Inter Thread Latency
六城新能源汽車調查報告發布,各級城市消費者都關注什麼?
kettle數據同步的五種方案
java.util.concurrent包(6)——CyclicBarrier使用
開發中的版本問題(1)—查看JDK、TOMCAT版本
錯誤整理:No plugin found for prefix 'jetty' in the current project and in the plugin groups
【轉】數據流圖懂不懂?
Object-C中的類-類的聲明
九度題目1087:約數的個數
initWithFormat 和stringWithFormat的區別
5分鍾學習基於Go,go-microservice-template,Minke的微服務