157
技術社區[雲棲]
java.util.concurrent解析——AbstractQueuedSynchronizer隊列管理
上一篇博客中,我們提到AQS
的隊列管理是基於CLH鎖隊列實現的,所以首先我們來看下CLH鎖隊列
。
1 CLH鎖隊列
CLH鎖隊列
本質上是一個基於鏈表的FIFO自旋鎖隊列,隊列中的每一個節點實質上是一個自旋鎖:在阻塞時不斷循環讀取狀態變量,當前驅節點釋放同步對象使用權後,跳出循環,執行同步代碼。其基本結構如下:
隊列中每一個節點有兩個成員:
- 節點狀態變量
- 前驅指針:pred
head,tail並不是實際節點,隻是為了表示隊列的首尾,被稱為dumb node。
在如此結構之下,其enqueue操作邏輯如下:
do { pred = tail;
} while(!tail.compareAndSet(pred, node));
其lock操作如下:
public void lock() {
final Node node = new Node();
node.locked = true;
// 一個CAS操作即可將當前線程對應的節點加入到隊列中,
// 並且同時獲得了前繼節點的引用,然後就是等待前繼釋放鎖
Node pred = this.tail.getAndSet(node);
this.prev.set(pred);
while (pred.locked) {// 進入自旋
}
}
可以看到其自旋邏輯。
而其dequeue操做更加簡單:
head = node;
從麵的操作,可以看到CLH鎖隊列
有如下優勢:
- 隊列的入列、出列操作原子性完成,無需加鎖,高效
- 判斷當前隊列等待是否為空同樣簡單,隻需檢查head是否為tail即可
- 每個節點獨立維護其狀態變量,避免了集中狀態管理的內存競爭
2 AQS
進程隊列
AQS
進程隊列相比於CLH鎖隊列
主要做了兩處修改:
- 每個節點新增一個next指針。由於
AQS
隊列中的進程不僅有自旋等待,還包括阻塞等待的情況。阻塞等待的隊列需要其他隊列主動喚醒。這就要求隊列中某個節點出列時需要顯式告知其後繼節點,因而需要加入next指針 - 節點狀態變量status由一個bit替換成一個int。這主要是由於
AQS
下的狀態更加複雜
首先來看下AQS隊列節點
的基本結構:
static final class Node {
// 表明節點是否以共享模式等待的標記
static final Node SHARED = new Node();
// 表明節點是否以獨占模式等待的標記
static final Node EXCLUSIVE = null;
// 表明線程已被取消
static final int CANCELLED = 1;
// 表明後續節點的線程需要unparking
static final int SIGNAL = -1;
// 表明線程正在等待一個條件
static final int CONDITION = -2;
// 表明下一次acquireShared應該無條件傳播
static final int PROPAGATE = -3;
/*
* 狀態字段,隻能取下麵的值:
* SIGNAL(-1): 這個結點的後繼是(或很快是)阻塞的(通過park),所以當前結點
* 必須unpark它的後繼,當它釋放或取消時。為了避免競爭,acquire方法必須
* 首先表明它們需要一個信號,然後再次嚐試原子性acquire,如果失敗了就阻塞。
*
* CANCELLED(1): 這個結點由於超時或中斷已被取消。結點從不離開這種狀態。尤其是,
* 這種狀態的線程從不再次阻塞。
*
* CONDITION(-2): 這個結點當前在一個條件隊列上。它將不會用於sync隊列的結點,
* 直到被轉移,在那時,結點的狀態將被設為0.
* 這個值在這裏的使用與其他字段的使用沒有關係,僅僅是簡化結構。
*
* PROPAGATE(-3): releaseShared應該傳遞給其他結點。這是在doReleaseShared裏設置
* (僅僅是頭結點)以確保傳遞繼續,即使其他操作有幹涉。
*
* 0: 非以上任何值。
*
* 值是組織為數字的用以簡化使用。非負值表示結點不需要信號。這樣,大部分代碼不需要
* 檢查特定的值,隻需要(檢查)符號。
*
* 對於普通同步結點,字段初始化為0;對於條件結點初始化為CONDITION(-2)。
* 通過CAS操作修改(或者,當允許時,用無條件volatile寫。)
*/
volatile int waitStatus;
/*
* 連接到當前結點/線程依賴的用來檢查等待狀態的前驅結點。
* 在進入隊列時賦值,隻在出隊列時置為空(為了GC考慮)。
* 根據前驅結點的取消,我們使查找一個非取消結點的while循環短路,這個總是會退出,
* 因為頭結點從不會是取消了的:一個結點成為頭隻能是一次成功的acquire操作結果。
*
* 一個取消了的線程從不會在獲取操作成功,線程隻能取消自己,不能是其他結點。
*/
volatile Node prev;
/*
* 連接到當前結點/線程釋放時解除阻塞的後續結點。
* 在入隊列時賦值,在繞過已取消前驅節點時調整,出隊列時置為空(for GC)。
* 入隊操作不會給前驅結點的next字段賦值,直到附件後(把新節點賦值給隊列的tail屬性?),
* 所以看到next字段為空不一定表示它就是隊列的尾結點。然而,如果next字段看起來是空,
* 我們可以從tail向前遍曆進行雙重檢查。
* 被取消了的結點的next字段被設置為指向它自己而不是空,這讓isOnSyncQueue變得容易。
*/
volatile Node next;
/*
* 列隊在這個結點的線程,在構造時初始化,用完後置空。
*/
volatile Thread thread;
/*
* 連接到下一個在條件上等待的結點或是特殊的值SHARED。
* 因為條件隊列隻在獨占模式下持有時訪問,我們隻需要一個簡單的鏈表隊列來持有在條件上等待的結點。
* 他們然後被轉移到隊列去re-acquire。
* 因為條件隻能是獨占的,我們通過用一個特殊的值來表明共享模式 來節省一個字段。
*/
Node nextWaiter;
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
接下來我們就來看下其主要操作的主要邏輯。
3 enqueue
由於AQS
隊列節點包括pred和next兩個指針,無法通過一次原子操作更新兩個指針。所以添加結點到隊列的操作最重要的是要保證:即使添加的CAS操作失敗了,也不能影響隊列結點現有的連接關係。
對於新加結點:
- 在CAS之前指向它的預期前驅
- CAS成功之後再更新預期前驅的後繼指針。
在步驟1成功之後、步驟2完成之前,其他線程通過結點的 “next” 連接可能看到“尾結點”(即代碼裏的 pred)的 “next” 為空,但其實隊列裏已經加入新的結點,這也是為什麼通過 “next” 連接遍曆隊列時碰到後繼為空的,必須從原子地更新的 “tail” 結點向後遍曆。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 嚐試enq的快速路徑;失敗後回退到完整的enq。
Node pred = tail;
if (pred != null) {
// 把新結點的前驅指向pred,必須在下麵的CAS完成之前設置,
// 這樣確保一旦CAS成功後,從tail向後遍曆是ok的。
node.prev = pred;// 步驟 1
if (compareAndSetTail(pred, node)) { //CAS
// 新節點成功進入隊列
// 前驅結點的next字段指向新結點,建立完整的連接。
pred.next = node; // 步驟 2
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 隊列是空,必須初始化。
if (compareAndSetHead(new Node())) // 原子地設置頭結點
tail = head; // 尾結點也指向頭結點
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 步驟 1
t.next = node; // 步驟 2
// 在把新結點設置為tail後才能更新前驅的next字段,這樣,即使CAS失敗了也不會影響原來的連接關係。
return t;
}
}
}
}
4 acquire
acquire方法不提供絕對公平的保證,因為現在在加入隊列之前先進行tryAcquire操作,如果這個線程先於頭結點鎖定,那麼頭結點就隻能繼續等待了。這種情形稱為闖入。
這個acquire之所以先嚐試獲取是為了在無競爭的情況下提高性能,並可以實現非公平的獲取。如果要保證絕對的公平性,則可以在子類實現的tryAcquire方法裏判斷當前線程是否是頭結點,是則嚐試獲取,不是則直接返回false。
// 以獨占模式獲取
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 首先嚐試獲取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 失敗後加入等待隊列,再從隊列裏再次嚐試獲取;成功獲取後才返回,
// 返回的boolean表示線程是否曾經被中斷。
// 在acquireQueued方法裏,線程可能被反複park、unpark,直到獲取鎖。
selfInterrupt(); // 重新設置中斷狀態位,是否執行取決於acquireQueued的返回值
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 線程是否曾被中斷是由這個變量記錄的。
for (;;) { // 死循環,用於acquire失敗後重試
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 前驅是頭結點,繼續嚐試獲取
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 檢測是否需要等待,如果需要,則park當前線程
// 隻有前驅在等待時才進入等待,否則繼續重試
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 線程進入等待,需要其他線程來喚醒這個線程以繼續執行
interrupted = true; // 隻要線程在等待過程中被中斷過一次就會記錄下來
}
} finally {
if (failed)
// acquire失敗,取消acquire
cancelAcquire(node);
}
}
/*
* 這個方法是信號控製的核心。檢查和更新沒有成功獲取的結點的狀態。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驅結點也在等待,說明這是一個穩定的等待狀態。
return true ;
if (ws > 0) {
// 前驅結點已取消,向前遍曆直到找到一個非取消結點。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把找到的結點的後繼指向node,那麼當前pred與node之間的已取消結點就不再被引用了,可以被垃圾回收。
pred.next = node;
} else {
// 前驅的狀態必是 0 或 PROPAGATE之一。表明需要一個信號,但不park先。
// 調用者需要重試來確保它在park之前沒法獲取。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// park當前執行線程, 其他線程unpark這個線程後繼續執行
LockSupport.park( this);
return Thread.interrupted();
}
5 release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* 如果status是負的(比如,可能需要信號)嚐試清除預期的信號。
* 如果這失敗了或status被其他等待線程修改也是沒關係的。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 準備unpark的線程在後繼裏持有,一般就是下一個結點。
* 但如果被取消或是空,從tail向後遍曆來找到實際的非取消後繼。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 沒有直接後繼或直接後繼不需要通知
s = null;
// 從tail向後遍曆,查找需要通知的結點
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到一個後不跳出循環是為了找到最老的需要通知的結點。
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 結點不為null,喚醒後繼的等待線程
LockSupport.unpark(s.thread);
}
最後更新:2017-07-26 09:04:20