JDK 1.8 ArrayBlockingQueue源碼解讀(不含迭代器)
全局變量
/** The queued items */
//存放元素的數組
final Object[] items;
/** items index for next take, poll, peek or remove */
//下次拿元素的下標 take, poll, peek or remove中使用
int takeIndex;
/** items index for next put, offer, or add */
//下次放元素的下標 put, offer, or add中使用
int putIndex;
/** Number of elements in the queue */
//隊列元素總數
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
//隊列鎖
final ReentrantLock lock;
/** Condition for waiting takes */
//非空條件
private final Condition notEmpty;
/** Condition for waiting puts */
//非滿條件
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
//迭代器
transient Itrs itrs = null
構造函數
//初始化容量限額和默認非公平鎖
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
構造函數
//初始化容量限額和默認非公平鎖,和將Collection中插入ArrayBlockingQueue中
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);//元素非空,否則NullPointerException
items[i++] = e;
}
//如果Collection.size() > capacity會報下標越界
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
dec循環減一
//如果i是0則當前數組長度減1,否則i減1
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
itemAt返回指定下標元素
final E itemAt(int i) {
return (E) items[i];
}
enqueue插入元素
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
//當前線程隻獲取一次鎖
// assert items[putIndex] == null;
//下標takeIndex所在的元素為空
final Object[] items = this.items;
//插入元素
items[putIndex] = x;
//如果數組已經滿了,則下次插入到數組首位
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
dequeue取出元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
//當前線程隻獲取一次鎖
// assert items[takeIndex] != null;
//下標takeIndex所在的元素不為空
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//取出元素
E x = (E) items[takeIndex];
//置空
items[takeIndex] = null;
//當前數組取元素已經至數組尾,則下次從頭再取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//迭代器相關,後麵補
if (itrs != null)
itrs.elementDequeued();
//通知notFull await的線程進行取元素
notFull.signal();
return x;
}
removeAt移除指定下標元素
/**
* Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
//當前線程僅持鎖1次,即並無重入
// assert items[removeIndex] != null;
//下標removeIndex的元素不能為空
// assert removeIndex >= 0 && removeIndex < items.length;
//下標removeIndex不能越界
//獲取當前列表
final Object[] items = this.items;
//待移除下標等於待取元素下標
if (removeIndex == takeIndex) {
// removing front item; just advance
//直接置空
items[takeIndex] = null;
//當前數組取元素已經至數組尾,則下次從頭再取
if (++takeIndex == items.length)
takeIndex = 0;
count--;//數組長度減一
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
//否則,這是個內部移除
// slide over all others up through putIndex.
//通過putIndex偏移所有元素
final int putIndex = this.putIndex;
//開始迭代
for (int i = removeIndex;;) {
int next = i + 1;
//如果迭代至尾,則從頭再來
if (next == items.length)
next = 0;
//下個操作下標不等於待插入下標
if (next != putIndex) {
//則next元素向左移一位
items[i] = items[next];
//備份最新removeIndex為i
i = next;
} else {
////下個操作下標等於待插入下標,即i(removeIndex)置空
items[i] = null;
this.putIndex = i;//下次插入元素下標為i(removeIndex),跳出循環
break;
}
}
count--;//數組長度減一
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
add直接添加元素,失敗則拋異常高
//
public boolean add(E e) {
return super.add(e);
//super.add(e);如下,如果插入失敗則拋異常提示數組已滿
/*
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
*/
}
offer添加元素(直接返回結果)
//
public boolean offer(E e) {
checkNotNull(e);//元素必須非空
final ReentrantLock lock = this.lock;
lock.lock();
try {
//已滿則直接返回false
if (count == items.length)
return false;
else {
//否則執行插入
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
put添加元素(可中斷)
/**
public void put(E e) throws InterruptedException {
checkNotNull(e);//插入元素非空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//已滿則等待並釋放鎖
while (count == items.length)
notFull.await();
enqueue(e);//插入元素
} finally {
lock.unlock();
}
}
offer添加元素(帶時間限製,可中斷)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);//必須非空
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果容量已滿則等待
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
offer添加元素(帶時間限製,可中斷)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);//必須非空
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//等待獲取鎖,可中斷
try {
//如果容量已滿則等待
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);//插入
return true;
} finally {
lock.unlock();
}
}
poll取元素(會一直等待)
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
take取元素(會一直等待,可中斷,無元素則一直等待)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
poll取元素(取鎖會一直等待(可中斷),取鎖後無元素會限時等待)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
peek取元素(取鎖會一直等待,如果隊列為空則返回null)
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
獲取size(取鎖會一直等待)
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
remainingCapacity獲取剩餘容量(取鎖會一直等待)
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
remove移除元素(鎖一直等待)
public boolean remove(Object o) {
//null元素直接返回
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//鎖一直等待
try {
if (count > 0) {
//獲取放下標putIndex
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//如果獲取下標是此元素則直接移除並返回結果
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//否則i加1,然後判斷是否達至隊列尾,是則重頭再來
if (++i == items.length)
i = 0;
} while (i != putIndex);//迭代條件為取下標takeIndex不等於putIndex
}
return false;
} finally {
lock.unlock();
}
}
contains是否包含此元素
public boolean contains(Object o) {
//非空約束
if (o == null) return false;
//備份數組(為什麼不是先等到lock,在再備份?)
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//從取下標開始迭代
if (o.equals(items[i]))
return true;
//達到數組尾則從頭再來
if (++i == items.length)
i = 0;
//取下標等於放下標時,表示數組已經迭代完成
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
toArray,ArrayBlockingQueue轉數組
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex;
//ArrayBlockingQueue表示取下標在數組中間
/*null null null 3 4 5 null null null
*null 1 2 3 4
*null null 2 3 4 5
*/
if (count <= n)
//從takeIndex拷貝count位數然後複製到a數組中(a是從頭開始複製至count)
System.arraycopy(items, takeIndex, a, 0, count);
else {
//表示取下標為0
//0 null null null 4 5
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
} finally {
lock.unlock();
}
return a;
}
toArray,ArrayBlockingQueue轉數組
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
//參數數組a長度比ArrayBlockingQueue斷則擴容
if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
//ArrayBlockingQueue表示取下標在數組中間
/*null null null 3 4 5 null null null
*null 1 2 3 4
*null null 2 3 4 5
*/
if (count <= n)
//從takeIndex拷貝count位數然後複製到a數組中(a是從頭開始複製至count)
System.arraycopy(items, takeIndex, a, 0, count);
else {
//表示取下標為0
//0 null null null 4 5
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
if (len > count)
//參數數組a下標null置空,然後count+1以後的就不管了?
a[count] = null;
} finally {
lock.unlock();
}
return a;
}
toString
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//空數組返回[]
int k = count;
if (k == 0)
return "[]";
final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
//迭代結束,直接返回
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
//到數組尾則從頭再來
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}
clear清空ArrayBlockingQueue
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
//ArrayBlockingQueue非空
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//從取下標開始迭代
do {
items[i] = null;
//達到數組尾則從來再來
if (++i == items.length)
i = 0;
//putIndex == takeIndex 表示數組已經遍曆完
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
//ArrayBlockingQueue非空並且有待插入的線程等待中,則通知
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
ArrayBlockingQueue元素遷移至Collection
public int drainTo(Collection<? super E> c, int maxElements){
//Collection非空,且不是本身
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
//maxElements再多遷移對象不能小於0
if (maxElements <= 0)
return 0;
//
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//maxElements,count兩者取小
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
//從takeIndex開始取元素
E x = (E) items[take];
c.add(x);
//添加完以後置空
items[take] = null;
//取下標自增一位,若果已達至數組尾則從頭再取
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
//i>0證明有取過元素
if (i > 0) {
//從新統計ArrayBlockingQueue總數
count -= i;
//更新取下標
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
//ArrayBlockingQueue非空並且有待插入的線程等待中,則通知
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
最後更新:2017-08-13 22:33:51