LinkedBlockingQueue源碼解讀
Node LinkedBlockingQueue鏈表節點,單向節點
//Node節點類,單向節點
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
繼任節點
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
next為空意味當前節點為鏈尾
*/
Node<E> next;
Node(E x) { item = x; }
}
構造方法
//默認無界隊列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
LinkedBlockingQueue指定容量,鏈頭和鏈尾都非空對象但item為空
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
全局變量
/** The capacity bound, or Integer.MAX_VALUE if none */
//鏈表容量,默認Integer.MAX_VALUE,即無界隊列
private final int capacity;
/** Current number of elements */
//當前隊列元素總量
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list. 鏈頭
* Invariant: head.item == null //不變形:鏈頭元素永為空
*/
transient Node<E> head;
/**
* Tail of linked list. 鏈尾
* Invariant: last.next == null //不變形:鏈尾元素後再無元素
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
//拿鎖,在 take, poll等方法時會請求
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//隊列非空條件,以便通知隊列進行取元素
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//插入鎖,在 put, offer等方法時會請求
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//隊列非空條件,以便同意隊列進行插入元素
private final Condition notFull = putLock.newCondition();
signalNotEmpty鏈表非空,然後signal(通知) takeLock進行獲取元素
//僅在put/offer後調用
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//強製拿鎖
try {
notEmpty.signal();//觸發signal
} finally {
takeLock.unlock();
}
}
signalNotFull鏈表非滿,然後signal(通知) putLock進行插入元素
//僅在take/poll後調用
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
enqueue插入元素
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
//putLock必須獲取當前鎖
// assert last.next == null;
//隊列尾必須為空
//新node指向原隊列尾元素的next(鏈下個對象),然後再指向last(鏈尾)
last = last.next = node;
}
dequeue取元素
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
//當前線程必須持有takeLock
// assert head.item == null;
//當前鏈頭元素必須為空
//備份鏈頭
Node<E> h = head;
//備份鏈次元素(實際要取出的元素,定義為first)
Node<E> first = h.next;
h.next = h; // help GC //原鏈頭h已經無作用
head = first;//first指向head,first成為新鏈頭
E x = first.item;//取出目標元素
first.item = null;//置空(LinkBlockingQueue規範)
return x;
}
fullyLock&fullyUnlock全局拿鎖和放鎖
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
Collection插入到隊列
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);//默認無界隊列
//獲取pulLock,構造函數中pulLock從無競爭,但需要保證可見性
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
//不支持空對象
if (e == null)
throw new NullPointerException();
//不允許超過限度
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);//原子更新
} finally {
putLock.unlock();
}
}
put放入對象
public void put(E e) throws InterruptedException {
//不支持空對象
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
//預創建操作標記(-1即無操作)
int c = -1;
//創建節點
Node<E> node = new Node<E>(e);
//備份(副本)putLock和當前總量
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//請求putLock鎖(可打斷)
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//容量已滿則等待
while (count.get() == capacity) {
notFull.await();
}
//執行過插入至鏈尾
enqueue(node);
//原子自增一位,返回舊值
c = count.getAndIncrement();
//鏈表還沒有滿,通知其他線程執行插入
if (c + 1 < capacity)
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
if (c == 0) // 由於存在放鎖和拿鎖,這裏可能拿鎖一直在消費數據,count會變化。這裏的if條件表示如果隊列中還有1條數據
signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,可以進行消費
}
offer帶時間限製的插入,返回操作結果
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//嚴禁非空對象
if (e == null) throw new NullPointerException();
//獲取等待時間
long nanos = unit.toNanos(timeout);
//預創建操作標記(-1即無操作)
int c = -1;
//獲取putLock鎖和當前鏈表容量
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可打斷的請求鎖
putLock.lockInterruptibly();
try {
//如果已滿載,並且
while (count.get() == capacity) {
if (nanos <= 0)
return false;
//等待nanos(毫秒)秒,期間收到signal則返回(nanos-等待時間),否則在等待結束後返回0或負數
//可打斷並返回InterruptedException
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));//插入元素
c = count.getAndIncrement();//獲取最新容量高
if (c + 1 < capacity)//未滿載則通知其他線程進行put/offer
notFull.signal();
} finally {
putLock.unlock();
}
//存在takeLock和putlock,takeLock可能在消費,count會變化,c == 0表示隊列有一條數據待消費
if (c == 0)//takeLock的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,可以進行消費
signalNotEmpty();
return true;
}
offer嚐試插入(一旦嚐試插入則一直等待直至成功)
public boolean offer(E e) {
//元素不能為空
if (e == null) throw new NullPointerException();
//當前鏈表容量
final AtomicInteger count = this.count;
//滿的話則返回false
if (count.get() == capacity)
return false;
//預創建操作標記(-1即無操作)
int c = -1;
Node<E> node = new Node<E>(e);
//獲取putlock
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//未滿載
if (count.get() < capacity) {
enqueue(node);//插入元素
c = count.getAndIncrement();
//隊列未滿則繼續通知插入
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)//存在takeLock和putlock,takeLock可能在消費,count會變化,c == 0表示隊列有一條數據待消費
signalNotEmpty();//takeLock的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,可以進行消費
return c >= 0;//鏈表有元素則表示插入成功
}
take取元素(可打斷)
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果當前鏈表空,則等待
while (count.get() == 0) {
notEmpty.await();
}
//取出首元素(first) E
x = dequeue();
c = count.getAndDecrement();//鏈表容量(原子)減一,並返回舊值
//鏈表非空則繼續通知其他線程來取
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//放鎖
}
//如果鏈滿則通知其他putLock等待的線程進行取元素
//意思是,takeLock和putLock同時進行時,putLock一直在放元素,true表示有一條線程在等待插入元素
if (c == capacity)
signalNotFull();
return x;
}
poll取元素,有時間
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;//定義待取得元素
int c = -1;//操作標記
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可打斷請求
try {
//如果鏈表無元素,則執行等待,直至耗時完畢
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();//取元素
c = count.getAndDecrement();//獲取原來鏈表長度然後長度減一
if (c > 1)//如果鏈長度大於1,證明還有鏈還有元素,通知其他等待的takeLock執行取元素
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)//原鏈長度達至最大長度,即在取完元素後還可以放一個元素,所以執行通知putLock進行放元素
signalNotFull();
return x;
}
poll取元素
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)//當前鏈無元素,直接返回
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//獲取鎖,否則一直等待
try {
if (count.get() > 0) {//獲取鎖成功,並鏈有元素
x = dequeue();//取元素
c = count.getAndDecrement();//獲取原鏈表長度然後實際長度減一
if (c > 1)//原鏈表長度還有元素則通知繼續進行通知其他線程取元素
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//原鏈表長度達至滿,則表示剛取完還可以放一個元素,所以執行通知
if (c == capacity)
signalNotFull();
return x;
}
peek取元素,但不拆鏈
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
unlink 拆鏈,將trail的下個元素p從鏈中拆除
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();//必須獲takeLock和putLock,合稱fullyLock();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;//p元素item置null
trail.next = p.next;//trail和(p.next)建立鏈關係
//如果原p就是尾元素,則置trail為尾元素
if (last == p)
last = trail;
//獲取原鏈長度並減一,原鏈長度等於限額則表示鏈未滿,則通知進行插入
if (count.getAndDecrement() == capacity)
notFull.signal();
}
remove拆除item所在的鏈
public boolean remove(Object o) {
//LinkedBlockingQueue允許null的item,除了head和last
if (o == null) return false;
fullyLock();//全局鎖
try {
//迭代鏈表,發現首元素則拆除鏈
//
for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
contains o是否存在鏈表中
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
//p為空則表示到達鏈尾,否則原本就是空鏈
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
toArray迭代元素返回數組
public Object[] toArray() {
fullyLock();//獲取全局鎖
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
toArray迭代元素,然後插入數組a
public <T> T[] toArray(T[] a) {
fullyLock();//獲取全鎖
try {
int size = count.get();//當前鏈表長度
if (a.length < size)//參數數組a長度小於當前鏈表長度,則進行擴容
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;//迭代下標
//迭代鏈表並且將item存入數組a
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
//如果參數數組a長度長於鏈長度,則a下標的元素置空,但(k+1)往後的元素呢?又不置空?
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}
toString
public String toString() {
fullyLock();//全局鎖
try {
Node<E> p = head.next;//獲取首元素,為空則直接返回[]
if (p == null)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;//獲取元素,然後組裝,如果item為當前鏈表則返回this Collection
sb.append(e == this ? "(this Collection)" : e);
p = p.next;//為空則到達鏈尾
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
}
clear清空鏈
public void clear() {
fullyLock();//全局鎖
try {
//1,取出實際頭元素p,備份原head至h 4,將p定於為新head
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;//2,原head的next指向原head,造成循壞鏈,並item為空
p.item = null;//3,實際頭元素item置空,help gc
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
drainTo轉換item至Collection
public int drainTo(Collection<? super E> c, int maxElements) {
//集合不能為空並不能為當前元素
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;//是否未滿,以通知來插入
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//請求tackLock
try {
//maxElements和count,取小的一方
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
//迭代鏈
while (i < n) {
//取出節點head次元素
Node<E> p = h.next;
//添加至collections
c.add(p.item);
//然後置空
p.item = null;
//原head自關聯
h.next = h;
//定義新head
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
//恢複鏈原有狀態,即使c.add()拋異常
if (i > 0) {//如果已經迭代過元素將迭代中的h作為新head
//assert h.item == null;
head = h;
//如果迭代完鏈的原長度的為capacity(鏈容量最大值)則表示此鏈沒有滿,通知putlock進行插入
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
Itr迭代器
private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
//構造函數
Itr() {
fullyLock();
try {//下個元素
current = head.next;
//下個元素item
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
//下個月元素是否為空
public boolean hasNext() {
return current != null;
}
/**
* Returns the next live successor of p, or null if no such.
*
* Unlike other traversal methods, iterators need to handle both:
* - dequeued nodes (p.next == p)
* - (possibly multiple) interior removed nodes (p.item == null)
*/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
//如果自連鏈,則證明鏈頭(clear()和drainTo()回導致此情況)
if (s == p)
return head.next;
//否則返回p.next(s != null && s.item == null僅在鏈頭時發生)
if (s == null || s.item != null)
return s;
p = s;
}
}
public E next() {
fullyLock();
try {
//hasNext已經判斷了不為空
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
//current == null表示已經達到鏈尾
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
public void remove() {
//hasNext已經判斷了不為空
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//迭代鏈,找到p然後拆鏈
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}
最後更新:2017-08-13 22:33:53