819
阿里云
技术社区[云栖]
Java并发编程【1.2时代】
本文介绍了Java原生的多线程技术(1.2),通过详细介绍wait和notify相关的机制、基础的多线程技术以及基于这些技术的等待超时、线程间的通信技术和线程池高阶技术,最后通过一个基于线程池的简单文本web服务器—MollyServer,来阐明多线程带来好处。通过介绍这些技术,展示了在没有使用Java并发包的时代(1.5-)是如何完成Java的多线程编程,为理解Java5提供了良好帮助。
线程简介
Java从诞生开始就明智的选择内置对多线程的支持,这将Java语言同其他同一时期的语言相比,具有明显优势。线程作为操作系统最小的调度单元,多个线程同时执行,将会改善我们的代码,在多核环境中具有更加明显的好处,但是过多的创建线程和对线程的不当管理也容易造成问题。
启动线程
构造线程
Java中启动线程必须要先行的构造一个Thread对象,然后调用这个对象的start方法。
02 |
this .daemon = parent.isDaemon();
|
03 |
this .priority = parent.getPriority();
|
04 |
this .name = name.toCharArray();
|
05 |
if (security == null || isCCLOverridden(parent.getClass()))
|
06 |
this .contextClassLoader = parent.getContextClassLoader();
|
08 |
this .contextClassLoader = parent.contextClassLoader;
|
09 |
this .inheritedAccessControlContext = AccessController.getContext();
|
11 |
setPriority(priority);
|
12 |
if (parent.inheritableThreadLocals != null )
|
13 |
this .inheritableThreadLocals =
|
14 |
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
|
15 |
/* Stash the specified stack size in case the VM cares */
|
16 |
this.stackSize = stackSize;
|
线程的构造,最主要或者说也就是线程对象的初始化过程,在上述过程中,一个新构造的线程对象是由其parent线程来进行分配空间的,而child线程继承了parent的是否Daemon,优先级和加载资源的classloader,栈空间的大小并且还会分配一个唯一的ID来标识这个child线程,至此一个能够运行的线程对象就初始化好了,在堆内存中等待着运行。
启动线程
调用Thread对象的start方法,就可启动一个新的线程,parent线程同步告知Java VM,只要线程规划器空闲,应立即启动这个线程。

而启动线程,也是交给操作系统来完成,这里就是一个本地方法了。
启动一个线程时,最好设置名称,这样在jstack分析时,就会好很多,自定义的线程最好能够起个名字。
05 |
public class ThreadName {
|
10 |
public static void main(String[] args) {
|
11 |
Thread t = new Thread( new Job());
|
12 |
t.setName( "ThreadNameJob" );
|
16 |
static class Job implements Runnable {
|
22 |
} catch (InterruptedException e) {
|
上述代码直接运行,可以通过jstack pid来观察栈信息,结果如下:
02 |
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02 mixed mode): |
04 |
"Attach Listener" daemon prio=10 tid=0x00007f4c38001000 nid=0x30b5 waiting on condition [0x0000000000000000] |
05 |
java.lang.Thread.State: RUNNABLE
|
07 |
"DestroyJavaVM" prio=10 tid=0x00007f4c60007800 nid=0x3086 waiting on condition [0x0000000000000000] |
08 |
java.lang.Thread.State: RUNNABLE
|
10 |
"ThreadNameJob" prio=10 tid=0x00007f4c600a2800 nid=0x3097 waiting on condition [0x00007f4c37cfb000] |
11 |
java.lang.Thread.State: TIMED_WAITING (sleeping)
|
12 |
at java.lang.Thread.sleep(Native Method)
|
13 |
at com.murdock.books.multithread.example.ThreadName$Job.run(ThreadName.java:26)
|
14 |
at java.lang.Thread.run(Thread.java:662)
|
16 |
"Low Memory Detector" daemon prio=10 tid=0x00007f4c60091800 nid=0x3095 runnable [0x0000000000000000] |
17 |
java.lang.Thread.State: RUNNABLE
|
19 |
"C2 CompilerThread1" daemon prio=10 tid=0x00007f4c6008f000 nid=0x3094 waiting on condition [0x0000000000000000] |
20 |
java.lang.Thread.State: RUNNABLE
|
22 |
"C2 CompilerThread0" daemon prio=10 tid=0x00007f4c6008c000 nid=0x3093 waiting on condition [0x0000000000000000] |
23 |
java.lang.Thread.State: RUNNABLE
|
25 |
"Signal Dispatcher" daemon prio=10 tid=0x00007f4c6008a000 nid=0x3092 runnable [0x0000000000000000] |
26 |
java.lang.Thread.State: RUNNABLE
|
28 |
"Finalizer" daemon prio=10 tid=0x00007f4c6006e000 nid=0x3091 in Object.wait() [0x00007f4c5c860000] |
29 |
java.lang.Thread.State: WAITING (on object monitor)
|
30 |
at java.lang.Object.wait(Native Method)
|
31 |
- waiting on <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock)
|
32 |
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
|
33 |
- locked <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock)
|
34 |
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
|
35 |
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
|
37 |
"Reference Handler" daemon prio=10 tid=0x00007f4c6006c000 nid=0x3090 in Object.wait() [0x00007f4c5c961000] |
38 |
java.lang.Thread.State: WAITING (on object monitor)
|
39 |
at java.lang.Object.wait(Native Method)
|
40 |
- waiting on <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock)
|
41 |
at java.lang.Object.wait(Object.java:485)
|
42 |
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
|
43 |
- locked <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock)
|
45 |
"VM Thread" prio=10 tid=0x00007f4c60065800 nid=0x308f runnable |
47 |
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f4c6001a800 nid=0x3087 runnable |
49 |
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f4c6001c800 nid=0x3088 runnable |
51 |
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f4c6001e800 nid=0x3089 runnable |
53 |
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f4c60020000 nid=0x308a runnable |
55 |
"VM Periodic Task Thread" prio=10 tid=0x00007f4c6009c000 nid=0x3096 waiting on condition |
57 |
JNI global references: 882 |
可以看到一个Java程序在运行时,后台创建了很多的线程,所以一个Java程序,纵使只有main,它也是多线程的,其中可以看到ThreadNameJob这个线程,也可以看到本地以吞吐量优先的ParallelGC的线程,它的数量默认是和CPU相同的,其中有4个对新生代进行GC的线程。
终止线程

线程从执行Runnalbe开始到结束。
理解中断
中断是一种状态,它使一个运行中的线程能够感知到其他线程对自身作出了中断操作,也就是影响到了自己。线程工作检查自身是否被中断来作出响应的行为。而该状态并没有维护在Thread中,是通过native方法获得。
可以通过当前线程对象的isInterrupted来判断是否被中断了。
05 |
public class Interrupted {
|
10 |
public static void main(String[] args) throws Exception {
|
11 |
InterruptedJob ij = new InterruptedJob();
|
12 |
ij.setName( "InterruptedJobThread " );
|
19 |
System.out.println( "INTERRUPTED IJ" );
|
24 |
static class InterruptedJob extends Thread {
|
31 |
} catch (InterruptedException e) {
|
32 |
System.out.println( "CURRENT INTERRUPT STATUS IS "
|
33 |
+ Thread.currentThread().getName()
|
34 |
+ Thread.currentThread().isInterrupted());
|
36 |
Thread.currentThread().interrupt();
|
38 |
System.out.println( "CURRENT INTERRUPT STATUS IS "
|
39 |
+ Thread.currentThread().getName()
|
40 |
+ Thread.currentThread().isInterrupted());
|
上述程序输出:
INTERRUPTED IJ
CURRENT INTERRUPT STATUS IS InterruptedJobThread false
CURRENT INTERRUPT STATUS IS InterruptedJobThread true
可以看出一旦抛出InterruptedException,当前线程的中断状态就被清除,但是也可以调用Thread.interrupted()来清除当前的中断状态。
线程属性

Java中创建的线程均会映射为操作系统层面的线程,在Java线程对象中有部分属性可以提供访问。线程状态是理解线程运行的关键。
线程优先级
02 |
class Thread implements Runnable {
|
03 |
/* Make sure registerNatives is the first thing <clinit> does. */
|
04 |
private static native void registerNatives();
|
可以看到priority,这个代表着优先级,优先级的范围从1到10,优先级高的线程占有CPU时间长一些,这当然是在长时间运行时体现出来的,但是不能做为程序执行的依据。
对priority可以通过对线程对象进行设置,使用setPriority来完成对线程优先级的设定。
下面的例子中,构建了三个不同的线程,它们的优先级不一样,从1到10,然后运行,优先级高的线程对times++执行的会多一些。
05 |
public class Priority {
|
06 |
private static CountDownLatch countDownLatch = new CountDownLatch( 10000000 );
|
08 |
private static CountDownLatch start = new CountDownLatch( 1 );
|
10 |
public static void main(String[] args) {
|
11 |
CountJob job1 = new CountJob();
|
12 |
Thread lingdao = new Thread(job1);
|
13 |
lingdao.setPriority( 10 );
|
16 |
CountJob job2 = new CountJob();
|
17 |
Thread pming = new Thread(job2);
|
21 |
CountJob job3 = new CountJob();
|
22 |
Thread zhongchan = new Thread(job3);
|
23 |
zhongchan.setPriority( 5 );
|
29 |
countDownLatch.await();
|
30 |
} catch (InterruptedException e) {
|
34 |
System.out.println( "lingdao : have " + job1.getTimes());
|
35 |
System.out.println( "pming : have" + job2.getTimes());
|
36 |
System.out.println( "zhongchan : have" + job3.getTimes());
|
40 |
static class CountJob implements Runnable {
|
42 |
private int times = 0 ;
|
49 |
} catch (InterruptedException e) {
|
53 |
while (countDownLatch.getCount() > 0 ) {
|
54 |
synchronized (CountJob. class ) {
|
55 |
if (countDownLatch.getCount() > 0 ) {
|
56 |
countDownLatch.countDown();
|
63 |
public int getTimes() {
|
执行结果如下:
lingdao : have 4347635
pming : have2661562
zhongchan : have2990803
每次执行的可能都不一样,但是总的趋势是高优先级的线程对CPU的占用时间会多一些。
线程状态
线程在运行的生命周期中可能处于下面的6种不同的状态,在一个时刻,线程可能处于CPU上处于运行,或者暂时的没有分配到CPU资源而处于就绪(准备运行),或者处于阻塞的状态。具体内容如下面的表格所示:
状态名称
|
阻塞
|
可以中断
|
说明
|
运行中 |
N |
N |
正在CPU上进行执行
|
准备运行(就绪) |
N |
N |
暂时的失去CPU资源处于就绪队列中,可能随时被线程调度器调度执行
|
休眠 |
Y |
Y |
让出CPU资源的就绪队列,等待一段时间后再次被放入队列,可以被中断提前进入就绪队列
|
等待 |
Y |
Y |
接受到通知或者等待超时会进入到就绪队列,可以被中断 |
阻塞于I/O
|
Y |
N |
I/O条件满足后,例如读入了一些字符,准备运行
|
阻塞于同步 |
Y |
N |
当获得同步锁后准备运行
|
可以使用如下状态迁移来描述线程的状态:

线程在一个时刻将会处于上述的三种状态之一,这个模型将有效的理解Java线程对象,但是其中处于等待状态的线程可能会在等待I/O和等待同步时无法被中断,虽然运行的线程已经被中断标识,但是不会像休眠和等待一样通过InterruptedException来直接返回。
10 |
public class ReadInterrupted {
|
15 |
public static void main(String[] args) {
|
16 |
// 使用父线程,也就是main-thread
|
17 |
Thread thread = new Thread( new InterruptedJob(Thread.currentThread()));
|
20 |
InputStream is = System.in;
|
23 |
} catch (IOException e) {
|
27 |
System.out.println( "Main Thread is interrupted ? " + Thread.currentThread().isInterrupted());
|
30 |
static class InterruptedJob implements Runnable {
|
32 |
Thread interruptedThread;
|
34 |
public InterruptedJob(Thread thread) {
|
35 |
this .interruptedThread = thread;
|
42 |
} catch (InterruptedException e) {
|
46 |
interruptedThread.interrupt();
|
运行的结果是:
这时整个线程挂在is.read上,这时随意从控制台输入一个字符,主线程退出:
123
Main Thread is interrupted ? true
可以看出对阻塞于同步I/O的线程被中断后,中断标识被打上,但是不会抛出异常退出。
线程规划
对高I/O的线程尽量给予高优先级的设定,对于低I/O以CPU运算为主的线程尽量降低优先级,避免过多的占用CPU。因此,不能依据线程优先级的高低来运行程序,需要保证每个线程都有运行的机会。
并发访问对象

Java支
持多个线程同时的访问一个对象,或者对象的变量,由于每个线程可以拥有这个变量的拷贝(这么做的目的是能够快速的执行,虽然变量分配的内存在共享内存中,
但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显着特性)。因此,程序在执行过程中,可能一个线程看
到的变量并不一定是最新的。
Volatile
Volatile关键字,就是告知任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新会共享内存。
比如,表示一个程序是否运行的变量,boolean on = true,那么可能是另一个线程来对它进行关闭动作,因此将其设置成为volatile boolean on,这样就会再其他线程对它进行改变时,能够让原有的线程立刻感知到。
但是过多的使用volatile是不必要的,相反它会降低程序执行的效率。
Synchronized
同步,在带来可见性的同时,它主要是对多个线程在同一个时刻,只能有一个处于方法或者块中。
可以通过将synchronized关键字加在方法前面或者采用同步快的方式来进行表现:
01 |
static synchronized void m() {
|
02 |
System.out.println( "T" );
|
05 |
public static void main(String[] args) {
|
08 |
synchronized (Synchronized. class ) {
|
}
Java同步是针对普通的Java对象而言的,每个Java对象均有一把“锁”,这个锁在一个线程进入时会排斥其他线程进入,是一个排他锁。通过javap来观察字节码,可以看到:
01 |
public static void main(java.lang.String[]); |
03 |
Stack=2, Locals=2, Args_size=1
|
04 |
0: invokestatic #31; //Method m:()V
|
05 |
3: ldc #1; //class com/murdock/books/multithread/example/Synchronized
|
09 |
8: invokestatic #31; //Method m:()V
|
当出现命令monitorenter时代获得了该对象的锁,当运行命令monitorexit时代表释放了该对象的锁。
同步化集合
同步化访问
在Java的集合api中有非常多的同步集合,比如:Vector和Hashtable,这些集合的所有方法都是synchronized,也就是说对这些集合的访问是同步的,但是如果每个接口都有一个专属的同步集合实现是非常不现实的,因此用过使用Collections.synchronizedXxx方法,可以包装一个同步的集合对象进行使用。
比如,摘自Collections
1 |
public static <T> List<T> synchronizedList(List<T> list) {
|
2 |
return (list instanceof RandomAccess ?
|
3 |
new SynchronizedRandomAccessList<T>(list) :
|
最后更新:2017-05-22 18:02:01