CompletableFuture 不能被中斷
我之前寫過一篇關於InterruptedException and interrupting threads的文章。總之,如果你調用Future.cancel(),那麼Future不僅會終止正在等待的get(),還會試圖去中斷底層的線程。這是個很重要的特征,它能夠使線程池變得更加利於使用。我在之前的文章中也說過,相對於標準的Future,盡量使用CompletableFuture。但事實證明,Future的更加強大的兄弟-CompletableFuture並不能優雅地處理cancel()。
請思考下麵的任務代碼,在接下來的測試中會用到:
class InterruptibleTask implements Runnable { private final CountDownLatch started = new CountDownLatch(1) private final CountDownLatch interrupted = new CountDownLatch(1) @Override void run() { started.countDown() try { Thread.sleep(10_000) } catch (InterruptedException ignored) { interrupted.countDown() } } void blockUntilStarted() { started.await() } void blockUntilInterrupted() { assert interrupted.await(1, TimeUnit.SECONDS) } }
客戶端線程可以檢查InterruptibleTask是否已經開始運行或者是被中斷了。首先,我們可以從外部查看InterruptibleTask到底會對cancel()作出怎麼樣的反應:
def "Future is cancelled without exception"() { given: def task = new InterruptibleTask() def future = myThreadPool.submit(task) task.blockUntilStarted() and: future.cancel(true) when: future.get() then: thrown(CancellationException) } def "CompletableFuture is cancelled via CancellationException"() { given: def task = new InterruptibleTask() def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool) task.blockUntilStarted() and: future.cancel(true) when: future.get() then: thrown(CancellationException) }
到目前為止一切順利,Future和CompletableFuture都以幾乎相同的方式工作著-在cancel之後取回結果會拋出CancellationException(這裏要解釋一下,Future.cancel()是不會拋出異常的,而CompletableFuture.cancel()則會以拋出CancellationException強行結束,上麵的代碼作者都手動拋出了CancellationException)。但在myThreadPool中的線程會怎樣呢?我猜會被中斷然後被線程池重新回收,我大錯特錯!
def "should cancel Future"() { given: def task = new InterruptibleTask() def future = myThreadPool.submit(task) task.blockUntilStarted() when: future.cancel(true) then: task.blockUntilInterrupted() } @Ignore("Fails with CompletableFuture") def "should cancel CompletableFuture"() { given: def task = new InterruptibleTask() def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool) task.blockUntilStarted() when: future.cancel(true) then: task.blockUntilInterrupted() }
第一個測試提交普通的Runnable給ExecutorService然後等待直到它開始執行,接著我們取消Future等待直到拋出InterruptedException,當底層的線程被中斷的時候blockUntilInterrupted()會返回。第二個測試失敗了,CompletableFuture.cancel()不會中斷線程,盡管Future看起來被取消了,但後台線程仍然在執行,sleep()不會拋出InterruptionException。這是一個bug還是這就是CompletableFuture的特點?你們可以查看此文檔,不幸地是這就是它的特點:
Parameters: mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.
RTFM(Read The Fucking Manual),但為什麼CompletableFuture會以這樣的方式工作?首先讓我們檢查一下“老的”Future的實現與CompletableFuture的有什麼不同。FutureTask會在執行ExecutorService.submit()之後返回,而且它的cancel()有如下的實現(我移除了Unsafe以及相似的非線程安全的Java代碼,所以僅僅把它當作偽代碼看待):
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state state = INTERRUPTED; } } } finally { finishCompletion(); } return true; }
FutureTask的state變量狀態如下圖:
萬一執行cancel(),我們要麼進入CANCELLED狀態,要麼通過INTERRUPTING進入INTERRUPTED。這裏的核心部分是我們要獲取runner線程(如果存在,例如如果task正在被執行)然後試著去中斷它。這裏要小心對於正在運行的線程的強製中斷。最後在finishCompletion()中我們要通知所有阻塞在Future.get()的線程(這一步在這裏無關痛癢可以忽略)。所以我們可以直觀的看到老的Future是如何取消正在運行的tasks的。那CompletableFuture呢?它的cancel()偽代碼如下:
public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = false; if (result == null) { result = new AltResult(new CancellationException()); cancelled = true; } postComplete(); return cancelled || isCancelled(); }
這相當令人失望,我們很少把result賦值為CancellationException而忽略mayInterruptIfRunning標誌。postComplete()的作用和finishCompletion()的作用相似,通知所有注冊在future下的正在等待的回調操作。這種實現相當讓人不愉快(使用了非阻塞的Treiber stack),但它的確沒有中斷任何底層的線程。
Reasons and implications
CompletableFuture的這種cancel限製並不是bug,而是一種設計決定。CompletableFuture天生就沒有和任何線程綁定在一起,但Future卻幾乎總是代表在後台運行的task。使用new關鍵字創造一個CompletableFuture(new CompletableFuture<>())就很好,這時沒有任何底層的線程去取消。但是仍然有大部分的CompletableFuture和後台的task以及線程有聯係,在這種情況下有問題的cancel()就是一個潛在的問題。我不建議盲目地用CompletableFuture替換Future,因為如果程序裏麵有cancel(),那麼替換可能會改變程序的行為。這就意味著CompletableFuture有意地違背了裏氏替換原則,我們要認真思考這樣做的含義。
最後更新:2017-05-19 14:32:14