

Java8 CompletableFuture

fxjwind

https://colobu.com/2016/02/29/Java-CompletableFuture/

https://www.deadcoderising.com/java8-writing-asynchronous-code-with-completablefuture/

 

Java8 lambda

https://ifeve.com/lambda/

A lambda can be thought of as an anonymous function; anyone who has used Scala will find it familiar.

 

Java8 supplier

https://www.byteslounge.com/tutorials/java-8-consumer-and-supplier

Supplier:

A Supplier represents a function that accepts no arguments and produces a result of some arbitrary type.
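To make the definition concrete, here is a minimal sketch of two Suppliers. The class name SupplierSketch and the helper evaluate are made up for illustration; only java.util.function.Supplier and its get method come from the JDK.

```java
import java.util.function.Supplier;

class SupplierSketch {
    // Hypothetical helper: the supplier's code runs only when get() is called.
    static <T> T evaluate(Supplier<T> supplier) {
        return supplier.get();
    }

    public static void main(String[] args) {
        Supplier<String> greeting = () -> "hello"; // no arguments, returns a String
        Supplier<Integer> answer = () -> 6 * 7;    // no arguments, returns an Integer
        System.out.println(evaluate(greeting));
        System.out.println(evaluate(answer));
    }
}
```

A method reference such as this::sendMsg can be used wherever a Supplier is expected, as long as the method takes no arguments and returns a value.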

 

A Future represents the pending result of an asynchronous computation. It offers a method — get — that returns the result of the computation when it's done.

The problem is that a call to get is blocking until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.

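A traditional Future makes asynchronous computation possible.

However, its result has to be fetched with get, which either blocks or forces you to poll, and that in turn limits how asynchronous the code really is.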
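The blocking behaviour can be seen in a small sketch with a plain ExecutorService. The names BlockingFutureSketch, slowTask and runAndWait are invented for this illustration, and the 100 ms sleep simply stands in for any long computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingFutureSketch {
    // Hypothetical slow task standing in for any long computation.
    static String slowTask() throws InterruptedException {
        Thread.sleep(100);
        return "done";
    }

    static String runAndWait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = BlockingFutureSketch::slowTask;
            Future<String> future = executor.submit(task);
            // The calling thread is parked here until slowTask has finished.
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait());
    }
}
```

There is no way to tell this Future "call me when you are done"; the caller either waits in get or keeps checking isDone.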

 

Besides implementing the Future interface, CompletableFuture also implements the CompletionStage interface.

CompletionStage is a promise. It promises that the computation eventually will be done.

The great thing about the CompletionStage is that it offers a vast selection of methods that let you attach callbacks that will be executed on completion.

This way we can build systems in a non-blocking fashion.

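That is why we need CompletableFuture: besides the Future interface, it also implements the CompletionStage interface.

CompletionStage provides many methods for attaching callbacks that run as follow-up work once the future completes.

This makes it easier to build non-blocking systems.

Example: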

Let's start with the absolute basics — creating a simple asynchronous computation.

CompletableFuture.supplyAsync(this::sendMsg);  

It's as easy as that.

supplyAsync takes a Supplier containing the code we want to execute asynchronously, in our case the sendMsg method.

If you've worked a bit with Futures in the past, you may wonder where the Executor went. If you want to, you can still define it as a second argument. However, if you leave it out it will be submitted to the ForkJoinPool.commonPool().

ForkJoinPool.commonPool

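Java 8 added a common thread pool to ForkJoinPool. It handles tasks that are not explicitly submitted to any other thread pool. It is a static member of the ForkJoinPool class, and its default parallelism is one less than the number of available processors (with a minimum of 1).

The commonPool can be thought of as a general-purpose thread pool managed by the JVM that any code can use.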
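The two ways of calling supplyAsync, with and without an Executor, might look like the following sketch. The class SupplyAsyncSketch and its sendMsg body are placeholders; the pool size of 2 is an arbitrary choice for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class SupplyAsyncSketch {
    // Hypothetical stand-in for the sendMsg method used in the text.
    static String sendMsg() {
        return "sent";
    }

    // No executor given: the task is handed to ForkJoinPool.commonPool().
    static String onCommonPool() {
        return CompletableFuture.supplyAsync(SupplyAsyncSketch::sendMsg).join();
    }

    // Executor passed as the second argument: the task runs on our own pool.
    static String onOwnExecutor() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture.supplyAsync(SupplyAsyncSketch::sendMsg, executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(onCommonPool());
        System.out.println(onOwnExecutor());
        // The value depends on the machine this runs on.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

join is used here only so the sketch can print a result; like get it blocks, but it throws an unchecked exception instead of a checked one.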

 

Adding a callback

In the first example, we simply sent a message asynchronously by executing sendMsg in its own thread.

Now let's add a callback where we notify about how the sending of the message went.

CompletableFuture.supplyAsync(this::sendMsg)  
                 .thenAccept(this::notify);

thenAccept is one of many ways to add a callback. It takes a Consumer — in our case notify — which handles the result of the preceding computation when it's done.

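thenAccept is one way to add a callback; there are many others.

thenAccept takes a Consumer, that is, a function that accepts an argument and returns nothing.

Here notify acts as the Consumer: it receives the output of sendMsg as its input and continues processing.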
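A runnable version of this callback chain could look like the sketch below. Everything except the CompletableFuture calls is assumed: the callback is named notifyResult here (to keep it visually distinct from Object.notify()), and the notifications list exists only so the effect of the Consumer can be observed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenAcceptSketch {
    // Records what the callback saw; thread-safe because callbacks may run on pool threads.
    static final List<String> notifications = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for sendMsg: returns a delivery status.
    static String sendMsg() {
        return "delivered";
    }

    // Hypothetical stand-in for notify: a Consumer, so it returns nothing.
    static void notifyResult(String status) {
        notifications.add("status: " + status);
    }

    static CompletableFuture<Void> sendAndNotify() {
        return CompletableFuture.supplyAsync(ThenAcceptSketch::sendMsg)
                                .thenAccept(ThenAcceptSketch::notifyResult);
    }

    public static void main(String[] args) {
        sendAndNotify().join(); // wait so the JVM does not exit before the callback runs
        System.out.println(notifications);
    }
}
```

Note that thenAccept returns a CompletableFuture<Void>: the chain can still be waited on, but it no longer carries a value.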

Adding multiple callbacks

If you want to continue passing values from one callback to another, thenAccept won't cut it since Consumer doesn't return anything.

To keep passing values, you can simply use thenApply instead.

As noted above, thenAccept takes a Consumer, which returns nothing. To chain several callbacks, use thenApply, which both accepts an input argument and returns a value.

thenApply takes a Function, which accepts a value and also returns one.

To see how this works, let's extend our previous example by first finding a receiver.

    CompletableFuture.supplyAsync(this::findReceiver)
                     .thenApply(this::sendMsg)
                     .thenAccept(this::notify);

Now the asynchronous task will first find a receiver, then send a message to the receiver before it passes the result on to the last callback to notify.
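The three-step chain can be sketched as follows. The bodies of findReceiver and sendMsg are invented placeholders, and the final thenAccept simply prints instead of calling a real notification method.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplySketch {
    // Hypothetical lookup of the receiver.
    static String findReceiver() {
        return "alice";
    }

    // Hypothetical blocking send: takes the receiver, returns a status.
    static String sendMsg(String receiver) {
        return "sent to " + receiver;
    }

    // thenApply maps the receiver to a send status and keeps the value flowing.
    static CompletableFuture<String> findAndSend() {
        return CompletableFuture.supplyAsync(ThenApplySketch::findReceiver)
                                .thenApply(ThenApplySketch::sendMsg);
    }

    public static void main(String[] args) {
        findAndSend().thenAccept(System.out::println) // last step consumes the value
                     .join();
    }
}
```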

thenCompose

Until now, sendMsg has been a normal blocking function. Let's now assume that we have a sendMsgAsync method that returns a CompletionStage.

If we kept using thenApply to compose the example above, we would end up with nested CompletionStages.

CompletableFuture.supplyAsync(this::findReceiver)  
                 .thenApply(this::sendMsgAsync);

// Returns type CompletionStage<CompletionStage<String>> 

This example ends up with nested asynchronous stages,

so we use thenCompose here to remove the nesting:

CompletableFuture.supplyAsync(this::findReceiver)  
                 .thenCompose(this::sendMsgAsync);

// Returns type CompletionStage<String>
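The difference in return types is easiest to see side by side. In this sketch sendMsgAsync is a made-up method that returns a CompletableFuture (which is a CompletionStage); the nested and flat method names are also just labels for the two variants.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {
    static String findReceiver() {
        return "alice";
    }

    // Hypothetical asynchronous send: returns a future instead of a plain value.
    static CompletableFuture<String> sendMsgAsync(String receiver) {
        return CompletableFuture.supplyAsync(() -> "sent to " + receiver);
    }

    // thenApply wraps the returned future in another future.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(ThenComposeSketch::findReceiver)
                                .thenApply(ThenComposeSketch::sendMsgAsync);
    }

    // thenCompose flattens the two levels into one.
    static CompletableFuture<String> flat() {
        return CompletableFuture.supplyAsync(ThenComposeSketch::findReceiver)
                                .thenCompose(ThenComposeSketch::sendMsgAsync);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // two joins needed
        System.out.println(flat().join());          // one join is enough
    }
}
```

This is the same relationship as map and flatMap on Optional or Stream.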

Running callbacks asynchronously

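Until now, none of our callbacks asked for a thread of their own: a non-async callback typically runs on the thread that completes its predecessor, or on the calling thread if the predecessor has already completed when the callback is attached.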

If you want to, you can submit the callback to the ForkJoinPool.commonPool() independently instead of using the same thread as the predecessor. This is done by using the async suffix version of the methods CompletionStage offers.

We can also run callbacks asynchronously:

CompletableFuture<String> receiver  
            = CompletableFuture.supplyAsync(this::findReceiver);

receiver.thenApply(this::sendMsg);  
receiver.thenApply(this::sendOtherMsg);  

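In this example, both sends run on whichever thread completes receiver (or on the current thread if it has already completed). We can add the Async suffix to make them run asynchronously: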

CompletableFuture<String> receiver  
            = CompletableFuture.supplyAsync(this::findReceiver);

receiver.thenApplyAsync(this::sendMsg);  
receiver.thenApplyAsync(this::sendOtherMsg);
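A self-contained sketch of two independent async callbacks on the same future is shown below. The method bodies are placeholders, and sendBoth is a helper added here only to collect both results.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncCallbackSketch {
    static String findReceiver() {
        return "alice";
    }

    static String sendMsg(String receiver) {
        return "msg to " + receiver;
    }

    static String sendOtherMsg(String receiver) {
        return "other msg to " + receiver;
    }

    static List<String> sendBoth() {
        CompletableFuture<String> receiver =
                CompletableFuture.supplyAsync(AsyncCallbackSketch::findReceiver);

        // Each callback is submitted as its own task to ForkJoinPool.commonPool().
        CompletableFuture<String> first = receiver.thenApplyAsync(AsyncCallbackSketch::sendMsg);
        CompletableFuture<String> second = receiver.thenApplyAsync(AsyncCallbackSketch::sendOtherMsg);

        return Arrays.asList(first.join(), second.join());
    }

    public static void main(String[] args) {
        System.out.println(sendBoth());
    }
}
```

Each Async method also has an overload that takes an Executor as an extra argument, for cases where the callbacks should not share the common pool.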

 

Exception handling

CompletableFuture.supplyAsync(this::failingMsg)  
                 .exceptionally(ex -> new Result(Status.FAILED))
                 .thenAccept(this::notify);

exceptionally gives us a chance to recover by taking an alternative function that will be executed if the preceding calculation fails with an exception.

This way succeeding callbacks can continue with the alternative result as input.

If you need more flexibility, check out whenComplete and handle for more ways of handling errors.
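The three error-handling methods can be compared in one sketch. The failing task and the "FAILED" string are stand-ins for the Result and Status types in the snippet above, which are not shown in these notes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class ErrorHandlingSketch {
    // Hypothetical task that always fails.
    static String failingMsg() {
        throw new IllegalStateException("network down");
    }

    // exceptionally: runs only on failure and supplies a replacement value.
    static String withExceptionally() {
        return CompletableFuture.supplyAsync(ErrorHandlingSketch::failingMsg)
                                .exceptionally(ex -> "FAILED")
                                .join();
    }

    // handle: always runs, sees both the result and the exception (one of them is null).
    static String withHandle() {
        return CompletableFuture.supplyAsync(ErrorHandlingSketch::failingMsg)
                                .handle((result, ex) -> ex == null ? result : "FAILED")
                                .join();
    }

    // whenComplete: always runs but cannot change the outcome, so the failure still propagates.
    static boolean observedFailure() {
        AtomicBoolean failed = new AtomicBoolean(false);
        CompletableFuture<String> stage =
                CompletableFuture.supplyAsync(ErrorHandlingSketch::failingMsg)
                                 .whenComplete((result, ex) -> failed.set(ex != null));
        try {
            stage.join();
        } catch (CompletionException e) {
            // expected: whenComplete observed the error but did not recover from it
        }
        return failed.get();
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally());
        System.out.println(withHandle());
        System.out.println(observedFailure());
    }
}
```

One detail worth knowing: the Throwable passed to these callbacks may be a CompletionException wrapping the original exception, so code that inspects it usually needs to look at getCause() as well.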

 

thenCombine

thenCombine allows us to register a BiFunction callback depending on the result of two CompletionStages.

CompletableFuture<String> to =  
    CompletableFuture.supplyAsync(this::findReceiver);

CompletableFuture<String> text =  
    CompletableFuture.supplyAsync(this::createContent);

to.thenCombine(text, this::sendMsg);  
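A runnable sketch of the "both" case, with placeholder bodies for findReceiver, createContent and a two-argument sendMsg:

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {
    static String findReceiver() {
        return "alice";
    }

    static String createContent() {
        return "hello";
    }

    // Hypothetical two-argument send, matching the BiFunction shape thenCombine expects.
    static String sendMsg(String to, String text) {
        return "sent '" + text + "' to " + to;
    }

    static CompletableFuture<String> sendWhenBothReady() {
        // The two lookups run independently of each other.
        CompletableFuture<String> to =
                CompletableFuture.supplyAsync(ThenCombineSketch::findReceiver);
        CompletableFuture<String> text =
                CompletableFuture.supplyAsync(ThenCombineSketch::createContent);

        // sendMsg is called only after both futures have completed.
        return to.thenCombine(text, ThenCombineSketch::sendMsg);
    }

    public static void main(String[] args) {
        System.out.println(sendWhenBothReady().join());
    }
}
```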

The example above is the "both" case.

The one below is the "either" case.

Let’s say you have two sources of finding a receiver. You’ll ask both, but will be happy with the first one returning with a result.

CompletableFuture<String> firstSource =  
    CompletableFuture.supplyAsync(this::findByFirstSource);

CompletableFuture<String> secondSource =  
    CompletableFuture.supplyAsync(this::findBySecondSource);

firstSource.acceptEither(secondSource, this::sendMsg); 
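The "either" case can be sketched as below. The helper sendToFirst and the AtomicReference are additions for this illustration so that the value passed to the callback can be inspected; which source wins in main depends on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {
    // Runs the callback with the result of whichever future completes first
    // and returns the value the callback received.
    static String sendToFirst(CompletableFuture<String> firstSource,
                              CompletableFuture<String> secondSource) {
        AtomicReference<String> sentTo = new AtomicReference<>();
        // sentTo::set plays the role of sendMsg: a Consumer of the winning result.
        firstSource.acceptEither(secondSource, sentTo::set).join();
        return sentTo.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> firstSource =
                CompletableFuture.supplyAsync(() -> "alice (first source)");
        CompletableFuture<String> secondSource =
                CompletableFuture.supplyAsync(() -> "alice (second source)");

        // Either line may be printed, depending on which source finishes first.
        System.out.println(sendToFirst(firstSource, secondSource));
    }
}
```

If the callback needs to return a value rather than just consume one, applyToEither is the Function-based counterpart.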

Last updated: 2017-04-07 21:25:10
