819
阿裏雲
技術社區[雲棲]
Java並發編程【1.2時代】
本文介紹了Java原生的多線程技術(1.2),通過詳細介紹wait和notify相關的機製、基礎的多線程技術以及基於這些技術的等待超時、線程間的通信技術和線程池高階技術,最後通過一個基於線程池的簡單文本web服務器—MollyServer,來闡明多線程帶來好處。通過介紹這些技術,展示了在沒有使用Java並發包的時代(1.5-)是如何完成Java的多線程編程,為理解Java5提供了良好幫助。
線程簡介
Java從誕生開始就明智的選擇內置對多線程的支持,這將Java語言同其他同一時期的語言相比,具有明顯優勢。線程作為操作係統最小的調度單元,多個線程同時執行,將會改善我們的代碼,在多核環境中具有更加明顯的好處,但是過多的創建線程和對線程的不當管理也容易造成問題。
啟動線程
構造線程
Java中啟動線程必須要先行的構造一個Thread對象,然後調用這個對象的start方法。
02 |
this .daemon = parent.isDaemon();
|
03 |
this .priority = parent.getPriority();
|
04 |
this .name = name.toCharArray();
|
05 |
if (security == null || isCCLOverridden(parent.getClass()))
|
06 |
this .contextClassLoader = parent.getContextClassLoader();
|
08 |
this .contextClassLoader = parent.contextClassLoader;
|
09 |
this .inheritedAccessControlContext = AccessController.getContext();
|
11 |
setPriority(priority);
|
12 |
if (parent.inheritableThreadLocals != null )
|
13 |
this .inheritableThreadLocals =
|
14 |
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
|
15 |
/* Stash the specified stack size in case the VM cares */
|
16 |
this.stackSize = stackSize;
|
線程的構造,最主要或者說也就是線程對象的初始化過程,在上述過程中,一個新構造的線程對象是由其parent線程來進行分配空間的,而child線程繼承了parent的是否Daemon,優先級和加載資源的classloader,棧空間的大小並且還會分配一個唯一的ID來標識這個child線程,至此一個能夠運行的線程對象就初始化好了,在堆內存中等待著運行。
啟動線程
調用Thread對象的start方法,就可啟動一個新的線程,parent線程同步告知Java VM,隻要線程規劃器空閑,應立即啟動這個線程。

而啟動線程,也是交給操作係統來完成,這裏就是一個本地方法了。
啟動一個線程時,最好設置名稱,這樣在jstack分析時,就會好很多,自定義的線程最好能夠起個名字。
05 |
public class ThreadName {
|
10 |
public static void main(String[] args) {
|
11 |
Thread t = new Thread( new Job());
|
12 |
t.setName( "ThreadNameJob" );
|
16 |
static class Job implements Runnable {
|
22 |
} catch (InterruptedException e) {
|
上述代碼直接運行,可以通過jstack pid來觀察棧信息,結果如下:
02 |
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02 mixed mode): |
04 |
"Attach Listener" daemon prio=10 tid=0x00007f4c38001000 nid=0x30b5 waiting on condition [0x0000000000000000] |
05 |
java.lang.Thread.State: RUNNABLE
|
07 |
"DestroyJavaVM" prio=10 tid=0x00007f4c60007800 nid=0x3086 waiting on condition [0x0000000000000000] |
08 |
java.lang.Thread.State: RUNNABLE
|
10 |
"ThreadNameJob" prio=10 tid=0x00007f4c600a2800 nid=0x3097 waiting on condition [0x00007f4c37cfb000] |
11 |
java.lang.Thread.State: TIMED_WAITING (sleeping)
|
12 |
at java.lang.Thread.sleep(Native Method)
|
13 |
at com.murdock.books.multithread.example.ThreadName$Job.run(ThreadName.java:26)
|
14 |
at java.lang.Thread.run(Thread.java:662)
|
16 |
"Low Memory Detector" daemon prio=10 tid=0x00007f4c60091800 nid=0x3095 runnable [0x0000000000000000] |
17 |
java.lang.Thread.State: RUNNABLE
|
19 |
"C2 CompilerThread1" daemon prio=10 tid=0x00007f4c6008f000 nid=0x3094 waiting on condition [0x0000000000000000] |
20 |
java.lang.Thread.State: RUNNABLE
|
22 |
"C2 CompilerThread0" daemon prio=10 tid=0x00007f4c6008c000 nid=0x3093 waiting on condition [0x0000000000000000] |
23 |
java.lang.Thread.State: RUNNABLE
|
25 |
"Signal Dispatcher" daemon prio=10 tid=0x00007f4c6008a000 nid=0x3092 runnable [0x0000000000000000] |
26 |
java.lang.Thread.State: RUNNABLE
|
28 |
"Finalizer" daemon prio=10 tid=0x00007f4c6006e000 nid=0x3091 in Object.wait() [0x00007f4c5c860000] |
29 |
java.lang.Thread.State: WAITING (on object monitor)
|
30 |
at java.lang.Object.wait(Native Method)
|
31 |
- waiting on <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock)
|
32 |
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
|
33 |
- locked <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock)
|
34 |
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
|
35 |
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
|
37 |
"Reference Handler" daemon prio=10 tid=0x00007f4c6006c000 nid=0x3090 in Object.wait() [0x00007f4c5c961000] |
38 |
java.lang.Thread.State: WAITING (on object monitor)
|
39 |
at java.lang.Object.wait(Native Method)
|
40 |
- waiting on <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock)
|
41 |
at java.lang.Object.wait(Object.java:485)
|
42 |
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
|
43 |
- locked <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock)
|
45 |
"VM Thread" prio=10 tid=0x00007f4c60065800 nid=0x308f runnable |
47 |
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f4c6001a800 nid=0x3087 runnable |
49 |
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f4c6001c800 nid=0x3088 runnable |
51 |
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f4c6001e800 nid=0x3089 runnable |
53 |
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f4c60020000 nid=0x308a runnable |
55 |
"VM Periodic Task Thread" prio=10 tid=0x00007f4c6009c000 nid=0x3096 waiting on condition |
57 |
JNI global references: 882 |
可以看到一個Java程序在運行時,後台創建了很多的線程,所以一個Java程序,縱使隻有main,它也是多線程的,其中可以看到ThreadNameJob這個線程,也可以看到本地以吞吐量優先的ParallelGC的線程,它的數量默認是和CPU相同的,其中有4個對新生代進行GC的線程。
終止線程

線程從執行Runnalbe開始到結束。
理解中斷
中斷是一種狀態,它使一個運行中的線程能夠感知到其他線程對自身作出了中斷操作,也就是影響到了自己。線程工作檢查自身是否被中斷來作出響應的行為。而該狀態並沒有維護在Thread中,是通過native方法獲得。
可以通過當前線程對象的isInterrupted來判斷是否被中斷了。
05 |
public class Interrupted {
|
10 |
public static void main(String[] args) throws Exception {
|
11 |
InterruptedJob ij = new InterruptedJob();
|
12 |
ij.setName( "InterruptedJobThread " );
|
19 |
System.out.println( "INTERRUPTED IJ" );
|
24 |
static class InterruptedJob extends Thread {
|
31 |
} catch (InterruptedException e) {
|
32 |
System.out.println( "CURRENT INTERRUPT STATUS IS "
|
33 |
+ Thread.currentThread().getName()
|
34 |
+ Thread.currentThread().isInterrupted());
|
36 |
Thread.currentThread().interrupt();
|
38 |
System.out.println( "CURRENT INTERRUPT STATUS IS "
|
39 |
+ Thread.currentThread().getName()
|
40 |
+ Thread.currentThread().isInterrupted());
|
上述程序輸出:
INTERRUPTED IJ
CURRENT INTERRUPT STATUS IS InterruptedJobThread false
CURRENT INTERRUPT STATUS IS InterruptedJobThread true
可以看出一旦拋出InterruptedException,當前線程的中斷狀態就被清除,但是也可以調用Thread.interrupted()來清除當前的中斷狀態。
線程屬性

Java中創建的線程均會映射為操作係統層麵的線程,在Java線程對象中有部分屬性可以提供訪問。線程狀態是理解線程運行的關鍵。
線程優先級
02 |
class Thread implements Runnable {
|
03 |
/* Make sure registerNatives is the first thing <clinit> does. */
|
04 |
private static native void registerNatives();
|
可以看到priority,這個代表著優先級,優先級的範圍從1到10,優先級高的線程占有CPU時間長一些,這當然是在長時間運行時體現出來的,但是不能做為程序執行的依據。
對priority可以通過對線程對象進行設置,使用setPriority來完成對線程優先級的設定。
下麵的例子中,構建了三個不同的線程,它們的優先級不一樣,從1到10,然後運行,優先級高的線程對times++執行的會多一些。
05 |
public class Priority {
|
06 |
private static CountDownLatch countDownLatch = new CountDownLatch( 10000000 );
|
08 |
private static CountDownLatch start = new CountDownLatch( 1 );
|
10 |
public static void main(String[] args) {
|
11 |
CountJob job1 = new CountJob();
|
12 |
Thread lingdao = new Thread(job1);
|
13 |
lingdao.setPriority( 10 );
|
16 |
CountJob job2 = new CountJob();
|
17 |
Thread pming = new Thread(job2);
|
21 |
CountJob job3 = new CountJob();
|
22 |
Thread zhongchan = new Thread(job3);
|
23 |
zhongchan.setPriority( 5 );
|
29 |
countDownLatch.await();
|
30 |
} catch (InterruptedException e) {
|
34 |
System.out.println( "lingdao : have " + job1.getTimes());
|
35 |
System.out.println( "pming : have" + job2.getTimes());
|
36 |
System.out.println( "zhongchan : have" + job3.getTimes());
|
40 |
static class CountJob implements Runnable {
|
42 |
private int times = 0 ;
|
49 |
} catch (InterruptedException e) {
|
53 |
while (countDownLatch.getCount() > 0 ) {
|
54 |
synchronized (CountJob. class ) {
|
55 |
if (countDownLatch.getCount() > 0 ) {
|
56 |
countDownLatch.countDown();
|
63 |
public int getTimes() {
|
執行結果如下:
lingdao : have 4347635
pming : have2661562
zhongchan : have2990803
每次執行的可能都不一樣,但是總的趨勢是高優先級的線程對CPU的占用時間會多一些。
線程狀態
線程在運行的生命周期中可能處於下麵的6種不同的狀態,在一個時刻,線程可能處於CPU上處於運行,或者暫時的沒有分配到CPU資源而處於就緒(準備運行),或者處於阻塞的狀態。具體內容如下麵的表格所示:
狀態名稱
|
阻塞
|
可以中斷
|
說明
|
運行中 |
N |
N |
正在CPU上進行執行
|
準備運行(就緒) |
N |
N |
暫時的失去CPU資源處於就緒隊列中,可能隨時被線程調度器調度執行
|
休眠 |
Y |
Y |
讓出CPU資源的就緒隊列,等待一段時間後再次被放入隊列,可以被中斷提前進入就緒隊列
|
等待 |
Y |
Y |
接受到通知或者等待超時會進入到就緒隊列,可以被中斷 |
阻塞於I/O
|
Y |
N |
I/O條件滿足後,例如讀入了一些字符,準備運行
|
阻塞於同步 |
Y |
N |
當獲得同步鎖後準備運行
|
可以使用如下狀態遷移來描述線程的狀態:

線程在一個時刻將會處於上述的三種狀態之一,這個模型將有效的理解Java線程對象,但是其中處於等待狀態的線程可能會在等待I/O和等待同步時無法被中斷,雖然運行的線程已經被中斷標識,但是不會像休眠和等待一樣通過InterruptedException來直接返回。
10 |
public class ReadInterrupted {
|
15 |
public static void main(String[] args) {
|
16 |
// 使用父線程,也就是main-thread
|
17 |
Thread thread = new Thread( new InterruptedJob(Thread.currentThread()));
|
20 |
InputStream is = System.in;
|
23 |
} catch (IOException e) {
|
27 |
System.out.println( "Main Thread is interrupted ? " + Thread.currentThread().isInterrupted());
|
30 |
static class InterruptedJob implements Runnable {
|
32 |
Thread interruptedThread;
|
34 |
public InterruptedJob(Thread thread) {
|
35 |
this .interruptedThread = thread;
|
42 |
} catch (InterruptedException e) {
|
46 |
interruptedThread.interrupt();
|
運行的結果是:
這時整個線程掛在is.read上,這時隨意從控製台輸入一個字符,主線程退出:
123
Main Thread is interrupted ? true
可以看出對阻塞於同步I/O的線程被中斷後,中斷標識被打上,但是不會拋出異常退出。
線程規劃
對高I/O的線程盡量給予高優先級的設定,對於低I/O以CPU運算為主的線程盡量降低優先級,避免過多的占用CPU。因此,不能依據線程優先級的高低來運行程序,需要保證每個線程都有運行的機會。
並發訪問對象

Java支
持多個線程同時的訪問一個對象,或者對象的變量,由於每個線程可以擁有這個變量的拷貝(這麼做的目的是能夠快速的執行,雖然變量分配的內存在共享內存中,
但是每個執行的線程還是可以擁有一份拷貝,這樣做的目的是加速程序的執行,這是現代多核處理器的一個顯著特性)。因此,程序在執行過程中,可能一個線程看
到的變量並不一定是最新的。
Volatile
Volatile關鍵字,就是告知任何對該變量的訪問均需要從共享內存中獲取,而對它的改變必須同步刷新會共享內存。
比如,表示一個程序是否運行的變量,boolean on = true,那麼可能是另一個線程來對它進行關閉動作,因此將其設置成為volatile boolean on,這樣就會再其他線程對它進行改變時,能夠讓原有的線程立刻感知到。
但是過多的使用volatile是不必要的,相反它會降低程序執行的效率。
Synchronized
同步,在帶來可見性的同時,它主要是對多個線程在同一個時刻,隻能有一個處於方法或者塊中。
可以通過將synchronized關鍵字加在方法前麵或者采用同步快的方式來進行表現:
01 |
static synchronized void m() {
|
02 |
System.out.println( "T" );
|
05 |
public static void main(String[] args) {
|
08 |
synchronized (Synchronized. class ) {
|
}
Java同步是針對普通的Java對象而言的,每個Java對象均有一把“鎖”,這個鎖在一個線程進入時會排斥其他線程進入,是一個排他鎖。通過javap來觀察字節碼,可以看到:
01 |
public static void main(java.lang.String[]); |
03 |
Stack=2, Locals=2, Args_size=1
|
04 |
0: invokestatic #31; //Method m:()V
|
05 |
3: ldc #1; //class com/murdock/books/multithread/example/Synchronized
|
09 |
8: invokestatic #31; //Method m:()V
|
當出現命令monitorenter時代獲得了該對象的鎖,當運行命令monitorexit時代表釋放了該對象的鎖。
同步化集合
同步化訪問
在Java的集合api中有非常多的同步集合,比如:Vector和Hashtable,這些集合的所有方法都是synchronized,也就是說對這些集合的訪問是同步的,但是如果每個接口都有一個專屬的同步集合實現是非常不現實的,因此用過使用Collections.synchronizedXxx方法,可以包裝一個同步的集合對象進行使用。
比如,摘自Collections
1 |
public static <T> List<T> synchronizedList(List<T> list) {
|
2 |
return (list instanceof RandomAccess ?
|
3 |
new SynchronizedRandomAccessList<T>(list) :
|
最後更新:2017-05-22 18:02:01