406
魔獸
利用Doug Lea的並發包實現帶超時機製的線程池
jdk5引入的concurrent包來自於Doug Lea的卓越貢獻。最近我在查找服務器OOM的原因之後,決定采用這個包重寫應用中一個servlet,這個servlet調用了一個阻塞方法,當被阻塞之後,服務器中的線程數(因為阻塞了,後續請求不斷地新增線程)突然增加導致了服務器當機,因此決定采用一個線程池,並且設置超時,如果阻塞方法超過一定時間就取消線程。因為我們的項目仍然跑在jdk 1.4.2上麵,短期內不可能升級到jdk5,還是要利用這個並發包。去這裏下載源碼並自己打包成jar,加入項目的lib,然後利用PooledExecutor和TimedCallable來實現我們的需求。首先是線程池,相當簡單:
import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* <p>類說明:線程池</p>
* <p>注意事項:</p>
* <pre></pre>
* <p>創建日期:Sep 7, 2007 1:25:33 PM</p>
* @author:dennis zane
* @version $Id:$
*/
public class MyThreadPool{
private static PooledExecutor exec = new PooledExecutor(new BoundedBuffer(
20), 30);
static {
exec.setKeepAliveTime(1000 * 60 * 5);
exec.createThreads(5);
exec.setMinimumPoolSize(4);
}
public static void execute(final Runnable r) throws InterruptedException{
exec.execute(r);
}
public static void shutdown() {
exec.shutdownAfterProcessingCurrentlyQueuedTasks();
exec = null;
}
}
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* <p>類說明:線程池</p>
* <p>注意事項:</p>
* <pre></pre>
* <p>創建日期:Sep 7, 2007 1:25:33 PM</p>
* @author:dennis zane
* @version $Id:$
*/
public class MyThreadPool{
private static PooledExecutor exec = new PooledExecutor(new BoundedBuffer(
20), 30);
static {
exec.setKeepAliveTime(1000 * 60 * 5);
exec.createThreads(5);
exec.setMinimumPoolSize(4);
}
public static void execute(final Runnable r) throws InterruptedException{
exec.execute(r);
}
public static void shutdown() {
exec.shutdownAfterProcessingCurrentlyQueuedTasks();
exec = null;
}
}
靜態初始化並設置一個PoolExecutor,設置空閑線程的存活時間為5分鍾,設置最小線程數為4,最大線程數為30,一開始創建5個線程以待使用(根據各自的應用調整這些參數),另外提供了shutdown方法以供ServeltContextListener的contextDestroyed方法調用以關閉線程池。那麼,結合TimedCallable來實現提交線程的超時機製,調用類似:
//設置超時時間
private static final long timeout = 1000;
......
......
try{
Callable callable = new Callable() {
public Object call() {
return new YourProgram().run();
}
};
TimedCallable timedCallable = new TimedCallable(callable, timeout);
FutureResult future = new FutureResult();
Runnable cmd = future.setter(timedCallable);
//提交給線程池來執行
MyThreadPool.execute(cmd);
//獲取任務結果
YourObject obj= (YourObject) future.get();
......
......
} catch (InterruptedException e) {
if (e instanceof TimeoutException) {
log.error("任務超時");
...
}
}catch(InvocationTargetException e)
{
//清理任務..
}
......
private static final long timeout = 1000;
......
......
try{
Callable callable = new Callable() {
public Object call() {
return new YourProgram().run();
}
};
TimedCallable timedCallable = new TimedCallable(callable, timeout);
FutureResult future = new FutureResult();
Runnable cmd = future.setter(timedCallable);
//提交給線程池來執行
MyThreadPool.execute(cmd);
//獲取任務結果
YourObject obj= (YourObject) future.get();
......
......
} catch (InterruptedException e) {
if (e instanceof TimeoutException) {
log.error("任務超時");
...
}
}catch(InvocationTargetException e)
{
//清理任務..
}
......
如果不是很理解這段代碼,那麼也許你應該先看看jdk5引入的Future、FutureTask等類,或者看看這裏的文檔。對於超時時間的大小估算,你應當在生產環境中計算該阻塞方法的調用時間,正常運行一段時間,利用腳本語言(比如ruby、python)分析日誌以得到一個調用花費時間的最大值作為timeout,這裏的單位是毫秒。而線程池大小的估算,要看你提交給線程執行的任務的類型:如果是計算密集型的任務,那麼線程池的大小一般是(cpu個數+1);如果是IO密集型的任務(一般的web應用皆是此類),那麼估算有一個公式,
假設N-cpu是cpu個數,U是目標CPU使用率,W/C是任務的等待時間與計算時間的比率,那麼最優的池大小等於:
N-threads=N-cpu*U*(1+W/C)
文章轉自莊周夢蝶 ,原文發布時間2007-09-09
最後更新:2017-05-18 10:32:49