Java8 CompletableFuture
https://colobu.com/2016/02/29/Java-CompletableFuture/
https://www.deadcoderising.com/java8-writing-asynchronous-code-with-completablefuture/
Java8 lambda
A lambda can be thought of as an anonymous function; anyone who has used Scala will find it familiar.
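To make the "anonymous function" idea concrete, here is a small sketch that writes the same function twice, once as an anonymous class and once as a lambda. The class and method names (LambdaSketch, doubleAnonymous, doubleLambda) are made up for illustration.

```java
import java.util.function.Function;

class LambdaSketch {
    // Pre-Java-8 style: an anonymous class implementing Function.
    static Function<Integer, Integer> doubleAnonymous() {
        return new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer x) {
                return x * 2;
            }
        };
    }

    // Java 8 style: the same behaviour expressed as a lambda.
    static Function<Integer, Integer> doubleLambda() {
        return x -> x * 2;
    }

    public static void main(String[] args) {
        System.out.println(doubleAnonymous().apply(21));
        System.out.println(doubleLambda().apply(21));
    }
}
```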
Java8 supplier
https://www.byteslounge.com/tutorials/java-8-consumer-and-supplier
A Supplier represents a function that accepts no arguments and produces a result of some arbitrary type.
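A minimal sketch of the Supplier / Consumer pair, assuming a made-up greeting method as the value source: the Supplier takes nothing and returns a value, and the Consumer takes that value and returns nothing.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

class SupplierSketch {
    // Hypothetical value source used only for this illustration.
    static String greeting() {
        return "hello";
    }

    static String supplyThenConsume() {
        Supplier<String> supplier = SupplierSketch::greeting; // no arguments, returns a String
        StringBuilder sink = new StringBuilder();
        Consumer<String> consumer = s -> sink.append(s);      // takes a String, returns nothing
        consumer.accept(supplier.get());
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(supplyThenConsume());
    }
}
```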
A Future represents the pending result of an asynchronous computation. It offers a method, get, that returns the result of the computation when it's done.
The problem is that a call to get blocks until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.
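The traditional Future makes asynchronous computation possible.
However, the result of a Future has to be fetched with get, which means either blocking or polling, and that in turn limits how asynchronous the code really is.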
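The blocking behaviour can be seen in a small sketch like the one below. The slowCompute task and the 100 ms sleep are invented for illustration; the point is only that the calling thread cannot read the value without waiting in get.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingFutureSketch {
    // Hypothetical slow task standing in for real work.
    static String slowCompute() throws InterruptedException {
        Thread.sleep(100);
        return "done";
    }

    static String submitAndWait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = BlockingFutureSketch::slowCompute;
            Future<String> future = executor.submit(task);
            // The caller could poll future.isDone(), but to read the value it
            // has to call get(), which waits until slowCompute has finished.
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait());
    }
}
```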
Besides implementing the Future interface, CompletableFuture also implements the CompletionStage interface.
A CompletionStage is a promise. It promises that the computation eventually will be done.
The great thing about the CompletionStage is that it offers a vast selection of methods that let you attach callbacks that will be executed on completion.
This way we can build systems in a non-blocking fashion.
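So this is why we need CompletableFuture: in addition to the Future interface, it also implements the CompletionStage interface.
CompletionStage gives you many methods for attaching callbacks that follow up once the future has completed.
That makes it easier to build non-blocking systems.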
Example:
Let's start with the absolute basics — creating a simple asynchronous computation.
CompletableFuture.supplyAsync(this::sendMsg);
It's as easy as that.
supplyAsync takes a Supplier containing the code we want to execute asynchronously, in our case the sendMsg method.
If you've worked a bit with Futures in the past, you may wonder where the Executor went. If you want to, you can still define it as a second argument. However, if you leave it out, the task will be submitted to ForkJoinPool.commonPool().
ForkJoinPool.commonPool
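Java 8 added a common thread pool to ForkJoinPool. It handles tasks that were not explicitly submitted to any particular pool. It is a static member of ForkJoinPool, and its default parallelism is derived from the number of processors on the machine (by default, one less than the number of available processors).
You can think of commonPool as a general-purpose thread pool managed by the JVM that anyone can use.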
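As a rough sketch of the two options, the snippet below runs one task on the default executor and one on an explicitly supplied executor, then prints the common pool's parallelism. The thread name "my-worker" and the helper names are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class ExecutorChoiceSketch {
    static String threadName() {
        return Thread.currentThread().getName();
    }

    // Passes an Executor as the second argument of supplyAsync.
    static String runOnCustomExecutor() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(ExecutorChoiceSketch::threadName, executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // No executor given: the task goes to the default asynchronous executor.
        System.out.println("default: "
                + CompletableFuture.supplyAsync(ExecutorChoiceSketch::threadName).join());
        System.out.println("custom:  " + runOnCustomExecutor());
        System.out.println("commonPool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
    }
}
```

A dedicated executor is usually worth considering when the tasks block on I/O, since the common pool is shared by everything in the JVM.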
Adding a callback
In the first example, we simply sent a message asynchronously by executing sendMsg in its own thread.
Now let's add a callback where we notify about how the sending of the message went.
CompletableFuture.supplyAsync(this::sendMsg)
    .thenAccept(this::notify);
thenAccept is one of many ways to add a callback. It takes a Consumer, in our case notify, which handles the result of the preceding computation when it's done.
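thenAccept is one way to add a callback, and there are many others.
Here thenAccept takes a Consumer, that is, a function that accepts an argument and returns nothing.
In this case notify acts as the consumer: it receives the output of sendMsg as its input and carries on from there.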
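Here is a runnable sketch of the send-then-notify chain. The bodies of sendMsg and notifyResult are placeholders, and the callback is called notifyResult rather than notify only to keep it clearly apart from Object.notify().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenAcceptSketch {
    // Records what the callback saw, so the effect is observable.
    static final AtomicReference<String> lastNotification = new AtomicReference<>();

    // Placeholder for the real sending logic.
    static String sendMsg() {
        return "sent";
    }

    // Consumer-style callback: takes the result, returns nothing.
    static void notifyResult(String status) {
        lastNotification.set("status=" + status);
    }

    static CompletableFuture<Void> sendAndNotify() {
        return CompletableFuture.supplyAsync(ThenAcceptSketch::sendMsg)
                .thenAccept(ThenAcceptSketch::notifyResult);
    }

    public static void main(String[] args) {
        sendAndNotify().join(); // wait only so the demo can print the outcome
        System.out.println(lastNotification.get());
    }
}
```

Note that thenAccept yields a CompletableFuture<Void>, which is why nothing useful can be passed further down the chain.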
Adding multiple callbacks
If you want to continue passing values from one callback to another, thenAccept won't cut it since Consumer doesn't return anything.
To keep passing values, you can simply use thenApply instead.
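As noted above, thenAccept takes a Consumer, which has no return value. To chain several callbacks you need thenApply, whose function has both an input parameter and a return value.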
thenApply takes a Function, which accepts a value but also returns one.
To see how this works, let's extend our previous example by first finding a receiver.
CompletableFuture.supplyAsync(this::findReceiver)
    .thenApply(this::sendMsg)
    .thenAccept(this::notify);
Now the asynchronous task will first find a receiver, then send a message to the receiver before it passes the result on to the last callback to notify.
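A self-contained version of this kind of chain might look like the sketch below. The receiver name and message format are invented, and a second thenApply is added just to show that values keep flowing from stage to stage.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplySketch {
    // Placeholder lookup.
    static String findReceiver() {
        return "alice";
    }

    // Function-style callback: takes the receiver, returns a status string.
    static String sendMsg(String receiver) {
        return "sent to " + receiver;
    }

    static CompletableFuture<String> pipeline() {
        return CompletableFuture.supplyAsync(ThenApplySketch::findReceiver)
                .thenApply(ThenApplySketch::sendMsg)   // String -> String
                .thenApply(String::toUpperCase);       // the result is passed on again
    }

    public static void main(String[] args) {
        System.out.println(pipeline().join());
    }
}
```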
thenCompose
Until now, sendMsg has been a normal blocking function. Let's now assume that we have a sendMsgAsync method that returns a CompletionStage.
If we kept using thenApply to compose the example above, we would end up with nested CompletionStages.
CompletableFuture.supplyAsync(this::findReceiver)
    .thenApply(this::sendMsgAsync);
// Returns type CompletionStage<CompletionStage<String>>
This example ends up with nested asynchronous stages,
so thenCompose is used here instead to flatten the nesting:
CompletableFuture.supplyAsync(this::findReceiver)
    .thenCompose(this::sendMsgAsync);
// Returns type CompletionStage<String>
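The difference shows up directly in the declared types. In this sketch sendMsgAsync is a stand-in that returns a CompletableFuture (one possible CompletionStage implementation); nested() has to be unwrapped twice, while flat() yields the value after a single join.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {
    static String findReceiver() {
        return "bob";
    }

    // Hypothetical asynchronous send: returns a stage instead of a plain value.
    static CompletableFuture<String> sendMsgAsync(String receiver) {
        return CompletableFuture.supplyAsync(() -> "sent to " + receiver);
    }

    // thenApply wraps the returned stage in another stage.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(ThenComposeSketch::findReceiver)
                .thenApply(ThenComposeSketch::sendMsgAsync);
    }

    // thenCompose flattens the two stages into one.
    static CompletableFuture<String> flat() {
        return CompletableFuture.supplyAsync(ThenComposeSketch::findReceiver)
                .thenCompose(ThenComposeSketch::sendMsgAsync);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join());
        System.out.println(flat().join());
    }
}
```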
Running callbacks asynchronously
Until now, none of our callbacks has asked for a thread of its own. A non-async callback runs on the thread that completes its predecessor, or on the calling thread if the predecessor has already completed when the callback is attached.
If you want to, you can submit the callback to ForkJoinPool.commonPool() independently instead of reusing the predecessor's thread. This is done by using the Async-suffixed versions of the methods CompletionStage offers.
Callbacks can also be executed asynchronously. First, the plain version:
CompletableFuture<String> receiver
    = CompletableFuture.supplyAsync(this::findReceiver);
receiver.thenApply(this::sendMsg);
receiver.thenApply(this::sendOtherMsg);
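In this example, both sends run as plain callbacks, on whichever thread completes receiver (or on the current thread if receiver is already complete). We can add the Async suffix so that each one is submitted to the pool as a separate task: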
CompletableFuture<String> receiver
    = CompletableFuture.supplyAsync(this::findReceiver);
receiver.thenApplyAsync(this::sendMsg);
receiver.thenApplyAsync(this::sendOtherMsg);
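One way to see where a callback runs is to return the thread name from it. The sketch below starts from an already completed future so the non-async case is deterministic, and it passes a named single-thread executor (the name "callback-worker" is made up) to the Async variant rather than relying on the default pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCallbackSketch {
    static String threadName() {
        return Thread.currentThread().getName();
    }

    // Non-async callback attached to an already completed future:
    // it runs immediately on the thread that attaches it.
    static boolean syncRunsOnCaller() {
        String caller = threadName();
        String ranOn = CompletableFuture.completedFuture("receiver")
                .thenApply(r -> threadName())
                .join();
        return caller.equals(ranOn);
    }

    // Async callback: submitted as its own task to the given executor.
    static String asyncRunsOn() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            return CompletableFuture.completedFuture("receiver")
                    .thenApplyAsync(r -> threadName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync ran on caller: " + syncRunsOnCaller());
        System.out.println("async ran on: " + asyncRunsOn());
    }
}
```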
Exception handling
CompletableFuture.supplyAsync(this::failingMsg)
    .exceptionally(ex -> new Result(Status.FAILED))
    .thenAccept(this::notify);
exceptionally gives us a chance to recover by taking an alternative function that will be executed if the preceding calculation fails with an exception.
This way, succeeding callbacks can continue with the alternative result as input.
If you need more flexibility, check out whenComplete and handle for more ways of handling errors.
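As a rough comparison of the three, the sketch below uses a failingMsg that always throws (the exception type and message are invented). handle sees both the value and the exception and produces a new result; whenComplete only observes the outcome and passes it through unchanged. An exception thrown inside supplyAsync reaches these callbacks wrapped in a CompletionException, hence the small rootMessage helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class ErrorHandlingSketch {
    // Hypothetical failing task.
    static String failingMsg() {
        throw new IllegalStateException("network down");
    }

    // Unwraps the CompletionException added by the async machinery, if present.
    static String rootMessage(Throwable ex) {
        return ex.getCause() != null ? ex.getCause().getMessage() : ex.getMessage();
    }

    static String recoverWithExceptionally() {
        return CompletableFuture.supplyAsync(ErrorHandlingSketch::failingMsg)
                .exceptionally(ex -> "FAILED")
                .join();
    }

    // handle is called on success and on failure; exactly one argument is non-null
    // here because failingMsg never returns a value.
    static String recoverWithHandle() {
        return CompletableFuture.supplyAsync(ErrorHandlingSketch::failingMsg)
                .handle((value, ex) -> ex == null ? value : "FAILED: " + rootMessage(ex))
                .join();
    }

    // whenComplete observes the outcome but does not replace it,
    // so a later exceptionally is still needed to recover.
    static boolean observeThenRecover() {
        AtomicBoolean sawError = new AtomicBoolean(false);
        CompletableFuture.supplyAsync(ErrorHandlingSketch::failingMsg)
                .whenComplete((value, ex) -> sawError.set(ex != null))
                .exceptionally(ex -> "FAILED")
                .join();
        return sawError.get();
    }

    public static void main(String[] args) {
        System.out.println(recoverWithExceptionally());
        System.out.println(recoverWithHandle());
        System.out.println(observeThenRecover());
    }
}
```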
thenCombine
thenCombine allows us to register a BiFunction callback depending on the result of two CompletionStages.
CompletableFuture<String> to =
    CompletableFuture.supplyAsync(this::findReceiver);
CompletableFuture<String> text =
    CompletableFuture.supplyAsync(this::createContent);
to.thenCombine(text, this::sendMsg);
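A runnable sketch of the "both" case, with placeholder bodies for findReceiver, createContent and a two-argument sendMsg: the BiFunction only runs once both futures have produced a value.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {
    static String findReceiver() {
        return "carol";
    }

    static String createContent() {
        return "hi";
    }

    // BiFunction-style callback: receives the results of both stages.
    static String sendMsg(String to, String text) {
        return "sent '" + text + "' to " + to;
    }

    static CompletableFuture<String> sendCombined() {
        CompletableFuture<String> to =
                CompletableFuture.supplyAsync(ThenCombineSketch::findReceiver);
        CompletableFuture<String> text =
                CompletableFuture.supplyAsync(ThenCombineSketch::createContent);
        // The two lookups run independently; sendMsg waits for both.
        return to.thenCombine(text, ThenCombineSketch::sendMsg);
    }

    public static void main(String[] args) {
        System.out.println(sendCombined().join());
    }
}
```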
The example above is the "both" case.
The one below is the "either" case.
Let’s say you have two sources of finding a receiver. You’ll ask both, but will be happy with the first one returning with a result.
CompletableFuture<String> firstSource =
    CompletableFuture.supplyAsync(this::findByFirstSource);
CompletableFuture<String> secondSource =
    CompletableFuture.supplyAsync(this::findBySecondSource);
firstSource.acceptEither(secondSource, this::sendMsg);
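A sketch of the "either" case, assuming a fast first source and a second source slowed down by an artificial 300 ms sleep (both are invented). The callback runs once, with whichever result arrives first; the slower lookup is not cancelled, its result is simply not used by this callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {
    // Records who the message was sent to.
    static final AtomicReference<String> sentTo = new AtomicReference<>();

    // Hypothetical fast lookup.
    static String findByFirstSource() {
        return "dave (cache)";
    }

    // Hypothetical slow lookup.
    static String findBySecondSource() {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "dave (database)";
    }

    static void sendMsg(String receiver) {
        sentTo.set(receiver);
    }

    static CompletableFuture<Void> sendToFirstFound() {
        CompletableFuture<String> firstSource =
                CompletableFuture.supplyAsync(AcceptEitherSketch::findByFirstSource);
        CompletableFuture<String> secondSource =
                CompletableFuture.supplyAsync(AcceptEitherSketch::findBySecondSource);
        // sendMsg is invoked with the result of whichever source finishes first.
        return firstSource.acceptEither(secondSource, AcceptEitherSketch::sendMsg);
    }

    public static void main(String[] args) {
        sendToFirstFound().join();
        System.out.println(sentTo.get());
    }
}
```

If the winning value is needed further down the chain, applyToEither is the Function-based counterpart of acceptEither.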
Last updated: 2017-04-07 21:25:10