閱讀505 返回首頁    go 阿裏雲 go 技術社區[雲棲]


ExecutorService-10個要訣和技巧

ExecutorService抽象概念自Java5就已經提出來了,現在是2014年。順便提醒一下:Java5和Java6都已不被支持,Java7在半年內也將會這樣。我提出這個的原因是許多Java程序員仍然不能完全明白ExecutorService到底是怎樣工作的。還有很多地方要去學習,今天我會分享一些很少人知道的特性和實踐。然而這篇文章仍然是麵向中等程序員的,沒什麼特別高級的地方。

1. Name pool threads

我想強調一點的是,當在運行JVM或調試期間創建線程時,默認的線程池命名規則是pool-N-thread-M,這裏N代表線程池的序列數(每一次你創建一個線程池的時候,全局計數N就加1),而M則是某一個線程池的線程序列數。例如,pool-2-thread-3就意味著JVM生命周期中第2線程池的第3線程。具體可以查看:Executors.defaultThreadFactory()。這樣不具備描述性,JDK使得線程命名的過程有些微的複雜,因為命名的方法隱藏在ThreadFactory內部。幸運地是Guava有一個很有用的類:

1 import com.google.common.util.concurrent.ThreadFactoryBuilder;
2  
3 final ThreadFactory threadFactory = new ThreadFactoryBuilder()
4         .setNameFormat("Orders-%d")
5         .setDaemon(true)
6         .build();
7 final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

線程池默認創造的是非守護線程,由你來決定是否合適。

2. Switch names according to context

有一個我從 Supercharged jstack: How to Debug Your Servers at 100mph學到的小技巧。一旦我們記住了線程的名字,那麼在任何時刻我們都能夠改變它們!這是有道理的,因為線程轉儲顯示了類名和方法名,沒有參數和局部變量。通過調整線程名保留一些必要的事務標識符,我們可以很容易追蹤某一條運行緩慢或者造成死鎖的信息/記錄/查詢等。例如:

01 private void process(String messageId) {
02     executorService.submit(() -> {
03         final Thread currentThread = Thread.currentThread();
04         final String oldName = currentThread.getName();
05         currentThread.setName("Processing-" + messageId);
06         try {
07             //real logic here...
08         finally {
09             currentThread.setName(oldName);
10         }
11     });
12 }

在try-finally塊內部,當前線程被命名為Processing-WHATEVER-MESSAGE-ID-IS,當通過係統追蹤信息流時這可能會派上用場。

3. Explicit and safe shutdown

在客戶端線程和線程池之間有一個任務隊列,當你的應用關閉時,你必須關心兩件事:任務隊列會發生什麼;正在運行的任務會怎樣(這個時候將詳細介紹)。令人感到吃驚的是許多程序員並不會適當地或有意識地關閉線程池。這有兩個方法:要麼讓所有的任務隊列全都執行完(shutdown()),要麼舍棄它們(shutdownNow()),這依賴你使用的具體情況。例如如果我們提交一連串的任務並且想要它們在完成後盡可能快的返回,可以使用shutdown():

1 private void sendAllEmails(List<String> emails) throws InterruptedException {
2     emails.forEach(email ->
3             executorService.submit(() ->
4                     sendEmail(email)));
5     executorService.shutdown();
6     final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
7     log.debug("All e-mails were sent so far? {}", done);
8 }

在這個例子中我們發送了一堆e-mail,每一個都作為一個獨立的任務交給線程池。在提交了所有的任務之後我們執行shutdown使線程池不再接收新的任務。然後最多等待1minute直到所有的任務都完成。然而如果有些任務仍然處於掛起狀態,awaitTermination()將返回false,而那些在等待的任務會繼續執行。我知道一些人會使用新潮的用法:

1 emails.parallelStream().forEach(this::sendEmail);

你可能會覺得我太保守,但我喜歡去控製並行線程的數量。不用介意,還有一種優雅的shutdown()方法shutdownNow():

1 final List<Runnable> rejected = executorService.shutdownNow();
2 log.debug("Rejected tasks: {}", rejected.size());

這樣一來隊列中還在等待的任務將會被舍棄並被返回,但已經在運行的任務將會繼續。

4. Handle interruption with care

很少人知道Future接口的cancel,這裏我不想重複說明,你可以去看我以前的文章:

InterruptedException and interrupting threads explained

5. Monitor queue length and keep it bounded

不合適的線程池大小可能會造成運行緩慢、不穩定以及內存泄漏。如果你配置太少的線程,那麼任務隊列就會變大,消耗太多內存。另一方麵太多的線程又會由於過度頻繁的上下文切換而造成整個係統運行緩慢。所以觀察隊列的長度並將其限定在一定範圍內是很重要的,這樣的話過載的線程池會短暫拒絕新任務的提交:

1 final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
2 executorService = new ThreadPoolExecutor(n, n,
3         0L, TimeUnit.MILLISECONDS,
4         queue);

上麵的代碼和Executors.newFixedThreadPool(n)是等價的,然而不同的是默認情況下固定線程池使用的是無限製的LinkedBlockingQueue ,我們使用的是固定容量100的ArrayBlockingQueue。這就意味著如果已經有100個任務在排隊(其中有n個任務正在執行),那麼新的任務就將被駁回並拋出RejectedExecutionException。一旦在外部可以訪問queue ,那麼我們就可以周期性地調用size(),並把它提交到logs/JMX或其他任何你使用的監視器中。

6. Remember about exception handling

下麵代碼段的結果是什麼?

1 executorService.submit(() -> {
2     System.out.println(1 0);
3 });

我深受其苦:它不會打印任何東西。不會拋出java.lang.ArithmeticException: / by zero,什麼也沒有。線程池將忽略這個異常,就像它從來沒發生過。如果上麵的代碼是用java.lang.Thread偶然創造的,那麼UncaughtExceptionHandler可能會起作用。但在線程池裏你就要多加小心了。如果你正在提交Runnable (沒有返回結果,就像上麵),那麼你必須將整個代碼塊用try-catch包起來,至少要log一下。如果你提交的是Callable,確保你總是使用阻塞的get()方法來重拋異常:

1 final Future<Integer> division = executorService.submit(() -> 1 0);
2 //below will throw ExecutionException caused by ArithmeticException
3 division.get();

有趣的是就算是Spring框架在處理這個bug的時候會使用@Async,詳細: SPR-8995SPR-12090

7. Monitor waiting time in a queue

監控工作隊列深度又是一個層麵,在排除單個事務或任務的故障時,有必要了解從任務的提交到實際執行耗時多長。這種等待時間最好趨近於零(當線程池中有空閑的線程時),但任務又不得不在隊列中排隊導致等待時間變長。而且如果池內沒有一定數量的線程,在運行新任務時可能需要創造新的線程,而這個過程也是要消耗少量時間的。為了能夠清楚地監測這個時間,我們使用類似下麵的代碼包裝原始的ExecutorService :

01 public class WaitTimeMonitoringExecutorService implements ExecutorService {
02  
03     private final ExecutorService target;
04  
05     public WaitTimeMonitoringExecutorService(ExecutorService target) {
06         this.target = target;
07     }
08  
09     @Override
10     public <T> Future<T> submit(Callable<T> task) {
11         final long startTime = System.currentTimeMillis();
12         return target.submit(() -> {
13                     final long queueDuration = System.currentTimeMillis() - startTime;
14                     log.debug("Task {} spent {}ms in queue", task, queueDuration);
15                     return task.call();
16                 }
17         );
18     }
19  
20     @Override
21     public <T> Future<T> submit(Runnable task, T result) {
22         return submit(() -> {
23             task.run();
24             return result;
25         });
26     }
27  
28     @Override
29     public Future<?> submit(Runnable task) {
30         return submit(new Callable<Void>() {
31             @Override
32             public Void call() throws Exception {
33                 task.run();
34                 return null;
35             }
36         });
37     }
38  
39     //...
40  
41 }

這並不是完整的實現,但你得知道這個基本概念。當我們向線程池提交任務的那一刻,就立馬開始測量時間,而任務一開始被執行就停止測量。不要被上麵源碼中很接近的startTime 和queueDuration 所迷惑了,事實上這兩行是在不同的線程中執行的,可能有數毫秒甚至數秒的差別,例如:

1 Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue

8. Preserve client stack trace

響應式編程這段日子似乎比較火,Reactive manifesto,reactive streams,RxJava(剛剛發布1.0),Clojure agents,scala.rx…,這些東西都挺好的,但它們的堆棧跟蹤將不再友好,大多數堆棧跟蹤沒有什麼卵用。舉個例子,當線程池中的任務拋出了一個異常:

1 java.lang.NullPointerException: null
2     at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
3     at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
4     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
5     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
6     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
7     at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

我們很容易就發現MyTask在76行拋出了空指針異常,但我們並不知道是誰提交了這個任務,因為堆棧跟蹤僅僅隻是告訴你Thread和 ThreadPoolExecutor的信息。我們能通過源碼從技術上定位MyTask被創造的位置,不需要線程(更不必說事件驅動、響應式編程)我們就能夠馬上看到全麵信息。如果我們保留客戶端代碼(提交任務的代碼)的堆棧跟蹤並在出現故障的時候將其打印出來會怎麼樣?這不是什麼新想法,例如Hazelcast會將當前點發生的異常傳送回客戶端代碼,下麵就看看保持客戶端堆棧跟蹤是怎樣實現的:

01 public class ExecutorServiceWithClientTrace implements ExecutorService {
02  
03     protected final ExecutorService target;
04  
05     public ExecutorServiceWithClientTrace(ExecutorService target) {
06         this.target = target;
07     }
08  
09     @Override
10     public <T> Future<T> submit(Callable<T> task) {
11         return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
12     }
13  
14     private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
15         return () -> {
16             try {
17                 return task.call();
18             catch (Exception e) {
19                 log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
20                 throw e;
21             }
22         };
23     }
24  
25     private Exception clientTrace() {
26         return new Exception("Client stack trace");
27     }
28  
29     @Override
30     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throwsInterruptedException {
31         return tasks.stream().map(this::submit).collect(toList());
32     }
33  
34     //...
35  
36 }

這次一旦出現異常我們將檢索任務被提交地方的所有堆棧跟蹤和線程名,和標準異常相比下麵的異常信息更有價值:

01 Exception java.lang.NullPointerException in task submitted from thrad main here:
02 java.lang.Exception: Client stack trace
03     at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
04     at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
05     at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
06     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
07     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
08     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
09     at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
10     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

9. Prefer CompletableFuture

Java 8提出了強大的CompletableFuture,請盡可能的使用它。ExecutorService並沒有擴展支持這個強大的抽象,所以你要小心使用它。用:

1 final CompletableFuture<BigDecimal> future =
2     CompletableFuture.supplyAsync(this::calculate, executorService);

代替:

1 final Future<BigDecimal> future =
2     executorService.submit(this::calculate);

CompletableFuture繼承了Future及其所有功能,而且CompletableFuture所提供的擴展功能極大地豐富了我們的API。

10. Synchronous queue

SynchronousQueue是一種有趣的BlockingQueue但真正意義上並不是queue,事實上它連數據結構都算不上。要解釋的話它算是0容量的隊列,引用JavaDoc:

each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]

Synchronous queues are similar to rendezvous channels used in CSP and Ada.

這和線程池有什麼關係呢?試著在ThreadPoolExecutor中使用SynchronousQueue:

1 BlockingQueue<Runnable> queue = new SynchronousQueue<>();
2 ExecutorService executorService = new ThreadPoolExecutor(22,
3         0L, TimeUnit.MILLISECONDS,
4         queue);

我們創造了有兩個線程的線程池和一個SynchronousQueue,因為SynchronousQueue本質上是零容量的隊列,因此如果有空閑線程,ExecutorService隻會執行新的任務。如果所有的線程都被占用,新任務會被立刻拒絕不會等待。當進程背景要求立刻啟動或者被丟棄時,這種機製是可取的。
以上,希望你們能夠找到至少一個有用的!

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 15:02:57

  上一篇:go  《循序漸進學Spark 》Spark架構與集群環境
  下一篇:go  《React官方文檔》之Getting Started