閱讀607 返回首頁    go 阿裏雲 go 技術社區[雲棲]


哪個線程執行 CompletableFuture’s tasks 和 callbacks?

CompletableFuture盡管在2014年的三月隨著Java8被提出來,但它現在仍然是一種相對較新潮的概念。但也許這個類不為人所熟知是好事,因為它很容易被濫用,特別是涉及到使用線程和線程池的時候。而這篇文章的目的就是要描述線程是怎樣使用CompletableFuture的。

Running tasks

這是API的基礎部分,它有一個很實用的supplyAsync()方法,這個方法和ExecutorService.submit()很像,但不同的是返回CompletableFuture:

1 CompletableFuture.supplyAsync(() -> {
2             try (InputStream is = new URL("https://www.nurkiewicz.com").openStream()) {
3                 log.info("Downloading");
4                 return IOUtils.toString(is, StandardCharsets.UTF_8);
5             catch (IOException e) {
6                 throw new RuntimeException(e);
7             }
8         });

問題是supplyAsync()默認使用 ForkJoinPool.commonPool(),線程池由所有的CompletableFutures分享,所有的並行流和所有的應用都部署在同一個虛擬機上(如果你很不幸的仍在使用有很多人工部署的應用服務器)。這種硬編碼的,不可配置的線程池完全超出了我們的控製,很難去監測和度量。因此你應該指定你自己的Executor,就像這裏(也可以看看這裏幾種創造這樣Exetutor的方法):

1 ExecutorService pool = Executors.newFixedThreadPool(10);
2  
3 final CompletableFuture future =
4         CompletableFuture.supplyAsync(() -> {
5             //...
6         }, pool);

這僅僅是開始…

Callbacks and transformations

假如你想轉換給定的CompletableFuture,例如提取String的長度:

1 CompletableFuture intFuture =
2     future.thenApply(s -> s.length());

那麼是誰調用了s.length()?坦白點,我一點也不在乎。隻要涉及到lambda表達式,那麼所有的執行者像thenApply這樣的就是廉價的,我們並不關心是誰調用了lambda表達式。但如果這樣的表達式會占用一點點的CPU來完成阻塞的網絡通信那又會如何呢?

首先默認情況下會發生什麼?試想一下:我們有一個返回String類型的後台任務,當結果完成時我們想要異步地去執行特定的變換。最容易的實現方法是通過包裝一個原始的任務(返回String),任務完成時截獲它。當內部的task結束後,回調就開始執行,執行變換和返回改進的值。就像有一個麵介於我們的代碼和初始的計算結果之間(個人看法:這裏指的是下麵的future裏麵包含的task執行完畢返回結果s,然後立馬執行callback也就是thenApply裏麵的lambda表達式,這也就是為什麼作者說有一個麵位於初始計算結果和回調執行代碼之間)。那就是說這應該相當明顯了,s.length()的變換會在和執行原始任務相同的線程裏完成,哈?並不完全是這樣!(這裏指的是有時候變換的線程和執行原始任務的線程不是同一個線程,看下麵就知道)

01 CompletableFuture future =
02         CompletableFuture.supplyAsync(() -> {
03             sleepSeconds(2);
04             return "ABC";
05         }, pool);
06  
07 future.thenApply(s -> {
08     log.info("First transformation");
09     return s.length();
10 });
11  
12 future.get();
13 pool.shutdownNow();
14 pool.awaitTermination(1, TimeUnit.MINUTES);
15  
16 future.thenApply(s -> {
17     log.info("Second transformation");
18     return s.length();
19 });

如果future裏麵的task還在運行,那麼包含first transformation的 thenApply()就會一直處於掛起狀態。而這個task完成後thenApply()會立即執行,執行的線程和執行task的線程是同一個。然而在注冊第二次變換之前(也就是執行第二個thenApply()),我們將一直等待直到task完成(和第一個變換是一樣的,都需要等待)。更壞的情況是,我們完全地關閉了線程池,保證其他的代碼將不會執行。那麼哪個線程將要執行二次變換呢?我們都知道當注冊了callback的future完成時,二次變換必定會立刻執行。這就是說它是使用默認的主線程(來完成callback),上麵的代碼輸出如下:

pool-1-thread-1 | First transformation      main | Second transformation

二次變換在注冊的時候就意識到CompletableFuture已經完成了(指的是future裏麵的task已經返回結果,其實在第一次調用thenApply()之前就已經返回了,所以這一次不用等待task),因此它立刻執行了變換。由於此時已經沒有其他的線程,所以thenApply()就隻能在當前的main線程環境中被調用。最主要的原因還是因為這種行為機製在實際的變換成本很高時(如很耗時)很容易出錯。想象一下thenApply()內部的lambda表達式在進行一些繁重的計算或者阻塞的網絡調用,突然我們的異步 CompletableFuture阻塞了調用者線程!

Controlling callback’s thread pool

有兩種技術去控製執行回調和變換的線程,需要注意的是這些方法僅僅適用你的變換需要很高成本的時候,其他情況下可以忽略。那麼第一個方法可以選擇使用操作者的 *Async方法,例如:

1 future.thenApplyAsync(s -> {
2     log.info("Second transformation");
3     return s.length();
4 });

這一次second transformation被自動地卸載到了我們的老朋友線程ForkJoinPool.commonPool()中去了:

1 pool-1-thread-1                  | First transformation
2 ForkJoinPool.commonPool-worker-1 | Second transformation

但我們並不喜歡commonPool,所以我們提供自己的:

1 future.thenApplyAsync(s -> {
2     log.info("Second transformation");
3     return s.length();
4 }, pool2);

注意到這裏使用的是不同的線程池(pool-1 vs. pool-2):

1 pool-1-thread-1 | First transformation
2 pool-2-thread-1 | Second transformation

Treating callback like another computation step

我相信如果你在處理一些長時間運行的callbacks和transformations上有些麻煩(記住這篇文章同樣也適用於CompletableFuture的其他大部分方法),你應該簡單地使用其他表意明確的CompletableFuture,就像這樣:

01 //Imagine this is slow and costly
02 CompletableFuture<Integer> strLen(String s) {
03     return CompletableFuture.supplyAsync(
04             () -> s.length(),
05             pool2);
06 }
07  
08 //...
09  
10 CompletableFuture<Integer> intFuture =
11         future.thenCompose(s -> strLen(s));

這種方法更加明確,知道我們的變換有很大的開銷,我們不會將它運行在一些隨意的不可控的線程上。取而代之的是我們會將String到CompletableFuture<Integer>的變換封裝為一個異步操作。然而,我們必須用thenCompose()取代thenApply(),否則的話我們會得到CompletableFuture<CompletableFuture<Integer>>.

但如果我們的transformation 沒有一個能夠很好地處理嵌套CompletableFuture的形式怎麼辦,如applyToEither()會等待第一個Future完成然後執行transformation.

1 CompletableFuture<CompletableFuture<Integer>> poor =
2         future1.applyToEither(future2, s -> strLen(s));

這裏有個很實用的技巧,用來“展開”這類難以理解的數據結構,這種技巧叫flatten,通過使用flatMap(identity) (or flatMap(x -> x))。在我們的例子中flatMap()就叫做thenCompose:

1 CompletableFuture<Integer> good =
2         poor.thenCompose(x -> x);

我把它留給你,去弄懂它是怎樣和為什麼這樣工作的。我想這篇文章已經盡量清楚地闡述了線程是如何參與到CompletableFuture中去的。

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 15:02:50

  上一篇:go  我們的垃圾收集器
  下一篇:go  《React官方文檔》之教程Tutorial