閱讀157 返回首頁    go 技術社區[雲棲]


java.util.concurrent解析——AbstractQueuedSynchronizer隊列管理

上一篇博客中,我們提到AQS的隊列管理是基於CLH鎖隊列實現的,所以首先我們來看下CLH鎖隊列

1 CLH鎖隊列

CLH鎖隊列本質上是一個基於鏈表的FIFO自旋鎖隊列,隊列中的每一個節點實質上是一個自旋鎖:在阻塞時不斷循環讀取狀態變量,當前驅節點釋放同步對象使用權後,跳出循環,執行同步代碼。其基本結構如下:
這裏寫圖片描述

隊列中每一個節點有兩個成員:

  • 節點狀態變量
  • 前驅指針:pred

head,tail並不是實際節點,隻是為了表示隊列的首尾,被稱為dumb node。

在如此結構之下,其enqueue操作邏輯如下:

do { pred = tail;
} while(!tail.compareAndSet(pred, node));

其lock操作如下:

public void lock() {
  final Node node = new Node();
  node.locked = true;
  // 一個CAS操作即可將當前線程對應的節點加入到隊列中,
  // 並且同時獲得了前繼節點的引用,然後就是等待前繼釋放鎖
  Node pred = this.tail.getAndSet(node);
  this.prev.set(pred);
  while (pred.locked) {// 進入自旋
  }
}

可以看到其自旋邏輯。

而其dequeue操做更加簡單:

head = node;

從麵的操作,可以看到CLH鎖隊列有如下優勢:

  • 隊列的入列、出列操作原子性完成,無需加鎖,高效
  • 判斷當前隊列等待是否為空同樣簡單,隻需檢查head是否為tail即可
  • 每個節點獨立維護其狀態變量,避免了集中狀態管理的內存競爭

2 AQS進程隊列

AQS進程隊列相比於CLH鎖隊列主要做了兩處修改:

  • 每個節點新增一個next指針。由於AQS隊列中的進程不僅有自旋等待,還包括阻塞等待的情況。阻塞等待的隊列需要其他隊列主動喚醒。這就要求隊列中某個節點出列時需要顯式告知其後繼節點,因而需要加入next指針
  • 節點狀態變量status由一個bit替換成一個int。這主要是由於AQS下的狀態更加複雜

首先來看下AQS隊列節點的基本結構:

static final class Node {
     // 表明節點是否以共享模式等待的標記
    static final Node SHARED = new Node();

    // 表明節點是否以獨占模式等待的標記
    static final Node EXCLUSIVE = null;

    // 表明線程已被取消
    static final int CANCELLED =  1;

    // 表明後續節點的線程需要unparking
    static final int SIGNAL    = -1;

    // 表明線程正在等待一個條件
    static final int CONDITION = -2;

    // 表明下一次acquireShared應該無條件傳播
    static final int PROPAGATE = -3;

    /*
     * 狀態字段,隻能取下麵的值:
     * SIGNAL(-1):    這個結點的後繼是(或很快是)阻塞的(通過park),所以當前結點
     *              必須unpark它的後繼,當它釋放或取消時。為了避免競爭,acquire方法必須
     *              首先表明它們需要一個信號,然後再次嚐試原子性acquire,如果失敗了就阻塞。
     *               
     * CANCELLED(1):  這個結點由於超時或中斷已被取消。結點從不離開這種狀態。尤其是,
     *                 這種狀態的線程從不再次阻塞。
     *
     * CONDITION(-2): 這個結點當前在一個條件隊列上。它將不會用於sync隊列的結點,
     *               直到被轉移,在那時,結點的狀態將被設為0.
     *              這個值在這裏的使用與其他字段的使用沒有關係,僅僅是簡化結構。
     *               
     * PROPAGATE(-3): releaseShared應該傳遞給其他結點。這是在doReleaseShared裏設置
     *                 (僅僅是頭結點)以確保傳遞繼續,即使其他操作有幹涉。
     *
     * 0:             非以上任何值。
     *
     * 值是組織為數字的用以簡化使用。非負值表示結點不需要信號。這樣,大部分代碼不需要
     * 檢查特定的值,隻需要(檢查)符號。
     *
     * 對於普通同步結點,字段初始化為0;對於條件結點初始化為CONDITION(-2)。
     * 通過CAS操作修改(或者,當允許時,用無條件volatile寫。)
     */
    volatile int waitStatus;

    /*
     * 連接到當前結點/線程依賴的用來檢查等待狀態的前驅結點。
     * 在進入隊列時賦值,隻在出隊列時置為空(為了GC考慮)。
     * 根據前驅結點的取消,我們使查找一個非取消結點的while循環短路,這個總是會退出,
     * 因為頭結點從不會是取消了的:一個結點成為頭隻能是一次成功的acquire操作結果。
     *
     * 一個取消了的線程從不會在獲取操作成功,線程隻能取消自己,不能是其他結點。
     */
    volatile Node prev;

    /*
     * 連接到當前結點/線程釋放時解除阻塞的後續結點。
     * 在入隊列時賦值,在繞過已取消前驅節點時調整,出隊列時置為空(for GC)。
     * 入隊操作不會給前驅結點的next字段賦值,直到附件後(把新節點賦值給隊列的tail屬性?),
     * 所以看到next字段為空不一定表示它就是隊列的尾結點。然而,如果next字段看起來是空,
     * 我們可以從tail向前遍曆進行雙重檢查。
     * 被取消了的結點的next字段被設置為指向它自己而不是空,這讓isOnSyncQueue變得容易。
     */
    volatile Node next;

    /*
     * 列隊在這個結點的線程,在構造時初始化,用完後置空。
     */
    volatile Thread thread;

    /*
     * 連接到下一個在條件上等待的結點或是特殊的值SHARED。
     * 因為條件隊列隻在獨占模式下持有時訪問,我們隻需要一個簡單的鏈表隊列來持有在條件上等待的結點。
     * 他們然後被轉移到隊列去re-acquire。
     * 因為條件隻能是獨占的,我們通過用一個特殊的值來表明共享模式 來節省一個字段。
     */
    Node nextWaiter;

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

接下來我們就來看下其主要操作的主要邏輯。

3 enqueue

由於AQS隊列節點包括pred和next兩個指針,無法通過一次原子操作更新兩個指針。所以添加結點到隊列的操作最重要的是要保證:即使添加的CAS操作失敗了,也不能影響隊列結點現有的連接關係。

對於新加結點:

  • 在CAS之前指向它的預期前驅
  • CAS成功之後再更新預期前驅的後繼指針。

在步驟1成功之後、步驟2完成之前,其他線程通過結點的 “next” 連接可能看到“尾結點”(即代碼裏的 pred)的 “next” 為空,但其實隊列裏已經加入新的結點,這也是為什麼通過 “next” 連接遍曆隊列時碰到後繼為空的,必須從原子地更新的 “tail” 結點向後遍曆。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
   // 嚐試enq的快速路徑;失敗後回退到完整的enq。
    Node pred = tail;
    if (pred != null) {
      // 把新結點的前驅指向pred,必須在下麵的CAS完成之前設置,
      // 這樣確保一旦CAS成功後,從tail向後遍曆是ok的。
        node.prev = pred;// 步驟 1
        if (compareAndSetTail(pred, node)) {  //CAS
             // 新節點成功進入隊列
             // 前驅結點的next字段指向新結點,建立完整的連接。
            pred.next = node; // 步驟 2
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 隊列是空,必須初始化。
            if (compareAndSetHead(new Node())) // 原子地設置頭結點
                tail = head; // 尾結點也指向頭結點
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {  // 步驟 1
                t.next = node; // 步驟 2
 // 在把新結點設置為tail後才能更新前驅的next字段,這樣,即使CAS失敗了也不會影響原來的連接關係。
                return t;
            }
        }
    }
}

4 acquire

acquire方法不提供絕對公平的保證,因為現在在加入隊列之前先進行tryAcquire操作,如果這個線程先於頭結點鎖定,那麼頭結點就隻能繼續等待了。這種情形稱為闖入。

這個acquire之所以先嚐試獲取是為了在無競爭的情況下提高性能,並可以實現非公平的獲取。如果要保證絕對的公平性,則可以在子類實現的tryAcquire方法裏判斷當前線程是否是頭結點,是則嚐試獲取,不是則直接返回false。

// 以獨占模式獲取
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 首先嚐試獲取
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      // 失敗後加入等待隊列,再從隊列裏再次嚐試獲取;成功獲取後才返回,
      // 返回的boolean表示線程是否曾經被中斷。

      // 在acquireQueued方法裏,線程可能被反複park、unpark,直到獲取鎖。
      selfInterrupt(); // 重新設置中斷狀態位,是否執行取決於acquireQueued的返回值
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 線程是否曾被中斷是由這個變量記錄的。
        for (;;) { // 死循環,用於acquire失敗後重試
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {// 前驅是頭結點,繼續嚐試獲取
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 檢測是否需要等待,如果需要,則park當前線程
            // 隻有前驅在等待時才進入等待,否則繼續重試
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 線程進入等待,需要其他線程來喚醒這個線程以繼續執行
                interrupted = true;   // 隻要線程在等待過程中被中斷過一次就會記錄下來
        }
    } finally {
        if (failed)
             // acquire失敗,取消acquire
            cancelAcquire(node);
    }
}

/*
 * 這個方法是信號控製的核心。檢查和更新沒有成功獲取的結點的狀態。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
      // 前驅結點也在等待,說明這是一個穩定的等待狀態。
        return true ;
    if (ws > 0) {
      // 前驅結點已取消,向前遍曆直到找到一個非取消結點。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 把找到的結點的後繼指向node,那麼當前pred與node之間的已取消結點就不再被引用了,可以被垃圾回收。
        pred.next = node;
    } else {
      // 前驅的狀態必是 0 或 PROPAGATE之一。表明需要一個信號,但不park先。
      // 調用者需要重試來確保它在park之前沒法獲取。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
       // park當前執行線程, 其他線程unpark這個線程後繼續執行
    LockSupport.park( this);
    return Thread.interrupted();
}

5 release

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

private void unparkSuccessor(Node node) {
    /*
     * 如果status是負的(比如,可能需要信號)嚐試清除預期的信號。
     * 如果這失敗了或status被其他等待線程修改也是沒關係的。
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 準備unpark的線程在後繼裏持有,一般就是下一個結點。
     * 但如果被取消或是空,從tail向後遍曆來找到實際的非取消後繼。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
      // 沒有直接後繼或直接後繼不需要通知
        s = null;

        // 從tail向後遍曆,查找需要通知的結點
        for (Node t = tail; t != null && t != node; t = t.prev)
             // 找到一個後不跳出循環是為了找到最老的需要通知的結點。
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 結點不為null,喚醒後繼的等待線程
        LockSupport.unpark(s.thread);
}

最後更新:2017-07-26 09:04:20

  上一篇:go  Java並發——核心理論
  下一篇:go  java.util.concurrent解析——AbstractQueuedSynchronizer綜述