閱讀132 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Java8並發教程:Threads和Executors

歡迎閱讀我的Java8並發教程的第一部分。這份指南將會以簡單易懂的代碼示例來教給你如何在Java8中進行並發編程。這是一係列教程中的第一部分。在接下來的15分鍾,你將會學會如何通過線程,任務(tasks)和 exector services來並行執行代碼。

  • 第一部分:Threads和Executors
  • 第二部分:同步和鎖

並發在Java5中首次被引入並在後續的版本中不斷得到增強。在這篇文章中介紹的大部分概念同樣適用於以前的Java版本。不過我的代碼示例聚焦於Java8,大量使用lambda表達式和其他新特性。如果你對lambda表達式不屬性,我推薦你首先閱讀我的Java 8 教程

Threads 和 Runnables

所有的現代操作係統都通過進程和線程來支持並發。進程是通常彼此獨立運行的程序的實例,比如,如果你啟動了一個Java程序,操作係統產生一個新的進程,與其他程序一起並行執行。在這些進程的內部,我們使用線程並發執行代碼,因此,我們可以最大限度的利用CPU可用的核心(core)。

Java從JDK1.0開始執行線程。在開始一個新的線程之前,你必須指定由這個線程執行的代碼,通常稱為task。這可以通過實現Runnable——一個定義了一個無返回值無參數的run()方法的函數接口,如下麵的代碼所示:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

因為Runnable是一個函數接口,所以我們利用lambda表達式將當前的線程名打印到控製台。首先,在開始一個線程前我們在主線程中直接運行runnable。

控製台輸出的結果可能像下麵這樣:

Hello main
Hello Thread-0
Done!

或者這樣:

Hello main
Done!
Hello Thread-0

由於我們不能預測這個runnable是在打印’done’前執行還是在之後執行。順序是不確定的,因此在大的程序中編寫並發程序是一個複雜的任務。

我們可以將線程休眠確定的時間。在這篇文章接下來的代碼示例中我們可以通過這種方法來模擬長時間運行的任務。

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

當你運行上麵的代碼時,你會注意到在第一條打印語句和第二條打印語句之間存在一分鍾的延遲。TimeUnit在處理單位時間時一個有用的枚舉類。你可以通過調用Thread.sleep(1000)來達到同樣的目的。

使用Thread類是很單調的且容易出錯。由於並發API在2004年Java5發布的時候才被引入。這些API位於java.util.concurrent包下,包含很多處理並發編程的有用的類。自從這些並發API引入以來,在隨後的新的Java版本發布過程中得到不斷的增強,甚至Java8提供了新的類和方法來處理並發。

接下來,讓我們走進並發API中最重要的一部——executor services。

Executors

並發API引入了ExecutorService作為一個在程序中直接使用Thread的高層次的替換方案。Executos支持運行異步任務,通常管理一個線程池,這樣一來我們就不需要手動去創建新的線程。在不斷地處理任務的過程中,線程池內部線程將會得到複用,因此,在我們可以使用一個executor service來運行和我們想在我們整個程序中執行的一樣多的並發任務。

下麵是使用executors的第一個代碼示例:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);

});

// => Hello pool-1-thread-1

Executors類提供了便利的工廠方法來創建不同類型的 executor services。在這個示例中我們使用了一個單線程線程池的 executor。

代碼運行的結果類似於上一個示例,但是當運行代碼時,你會注意到一個很大的差別:Java進程從沒有停止!Executors必須顯式的停止-否則它們將持續監聽新的任務。

ExecutorService提供了兩個方法來達到這個目的——shutdwon()會等待正在執行的任務執行完而shutdownNow()會終止所有正在執行的任務並立即關閉execuotr。

這是我喜歡的通常關閉executors的方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    }
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor通過等待指定的時間讓當前執行的任務終止來“溫柔的”關閉executor。在等待最長5分鍾的時間後,execuote最終會通過中斷所有的正在執行的任務關閉。

Callables 和 Futures

除了Runnable,executor還支持另一種類型的任務——Callable。Callables也是類似於runnables的函數接口,不同之處在於,Callable返回一個值。

下麵的lambda表達式定義了一個callable:在休眠一分鍾後返回一個整數。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callbale也可以像runnbales一樣提交給 executor services。但是callables的結果怎麼辦?因為submit()不會等待任務完成,executor service不能直接返回callable的結果。不過,executor 可以返回一個Future類型的結果,它可以用來在稍後某個時間取出實際的結果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在將callable提交給exector之後,我們先通過調用isDone()來檢查這個future是否已經完成執行。我十分確定這會發生什麼,因為在返回那個整數之前callable會休眠一分鍾、

在調用get()方法時,當前線程會阻塞等待,直到callable在返回實際的結果123之前執行完成。現在future執行完畢,我們可以在控製台看到如下的結果:

future done? false
future done? true
result: 123

Future與底層的executor service緊密的結合在一起。記住,如果你關閉executor,所有的未中止的future都會拋出異常。

executor.shutdownNow();
future.get();

你可能注意到我們這次創建executor的方式與上一個例子稍有不同。我們使用newFixedThreadPool(1)來創建一個單線程線程池的 execuot service。 這等同於使用newSingleThreadExecutor不過使用第二種方式我們可以稍後通過簡單的傳入一個比1大的值來增加線程池的大小。

Timeouts

任何future.get()調用都會阻塞,然後等待直到callable中止。在最糟糕的情況下,一個callable持續運行——因此使你的程序將沒有響應。我們可以簡單的傳入一個時長來避免這種情況。

ExecutorService executor = Executors.newFixedThreadPool(1);

    Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

    future.get(1, TimeUnit.SECONDS);

運行上麵的代碼將會產生一個TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已經猜到為什麼會拋出這個異常。我們指定的最長等待時間為1分鍾,而這個callable在返回結果之前實際需要兩分鍾。

invokeAll

Executors支持通過invokeAll()一次批量提交多個callable。這個方法結果一個callable的集合,然後返回一個future的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在這個例子中,我們利用Java8中的函數流(stream)來處理invokeAll()調用返回的所有future。我們首先將每一個future映射到它的返回值,然後將每個值打印到控製台。如果你還不屬性stream,可以閱讀我的Java8 Stream 教程

invokeAny

批量提交callable的另一種方式就是invokeAny(),它的工作方式與invokeAll()稍有不同。在等待future對象的過程中,這個方法將會阻塞直到第一個callable中止然後返回這一個callable的結果。

為了測試這種行為,我們利用這個幫助方法來模擬不同執行時間的callable。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

 

我們利用這個方法創建一組callable,這些callable擁有不同的執行時間,從1分鍾到3分鍾。通過invokeAny()將這些callable提交給一個executor,返回最快的callable的字符串結果-在這個例子中為任務2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上麵這個例子又使用了另一種方式來創建executor——調用newWorkStealingPool()。這個工廠方法是Java8引入的,返回一個ForkJoinPool類型的 executor,它的工作方法與其他常見的execuotr稍有不同。與使用一個固定大小的線程池不同,ForkJoinPools使用一個並行因子數來創建,默認值為主機CPU的可用核心數。

ForkJoinPools 在Java7時引入,將會在這個係列後麵的教程中詳細講解。讓我們深入了解一下 scheduled executors 來結束本次教程。

Scheduled Executors

我們已經學習了如何在一個 executor 中提交和運行一次任務。為了持續的多次執行常見的任務,我們可以利用調度線程池。

ScheduledExecutorService支持任務調度,持續執行或者延遲一段時間後執行。

下麵的實例,調度一個任務在延遲3分鍾後執行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

調度一個任務將會產生一個專門的future類型——ScheduleFuture,它除了提供了Future的所有方法之外,他還提供了getDelay()方法來獲得剩餘的延遲。在延遲消逝後,任務將會並發執行。

為了調度任務持續的執行,executors 提供了兩個方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一個方法用來以固定頻率來執行一個任務,比如,下麵這個示例中,每分鍾一次:

ScheduledExecutorService executor =     Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,這個方法還接收一個初始化延遲,用來指定這個任務首次被執行等待的時長。

請記住:scheduleAtFixedRate()並不考慮任務的實際用時。所以,如果你指定了一個period為1分鍾而任務需要執行2分鍾,那麼線程池為了性能會更快的執行。

在這種情況下,你應該考慮使用scheduleWithFixedDelay()。這個方法的工作方式與上我們上麵描述的類似。不同之處在於等待時間 period 的應用是在一次任務的結束和下一個任務的開始之間。例如:

ScheduledExecutorService executor =         Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

這個例子調度了一個任務,並在一次執行的結束和下一次執行的開始之間設置了一個1分鍾的固定延遲。初始化延遲為0,任務執行時間為0。所以我們分別在0s,3s,6s,9s等間隔處結束一次執行。如你所見,scheduleWithFixedDelay()在你不能預測調度任務的執行時長時是很有用的。

這是並發係列教程的第以部分。我推薦你親手實踐一下上麵的代碼示例。你可以從 Github 上找到這篇文章中所有的代碼示例,所以歡迎你fork這個repo,給我星星

我希望你會喜歡這篇文章。如果你有任何的問題都可以在下麵評論或者通過 Twitter 給我回複。

最後更新:2017-05-22 16:37:39

  上一篇:go  並發編程網站推薦
  下一篇:go  軟件事務內存導論(一)前言