An Implementation of a Double Queue
Introduction
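A double queue is an efficient in-memory data structure. In multithreaded programming, it keeps producer writes and consumer reads from interfering with each other as much as possible. This avoids the lock contention of a single queue shared by both sides. This article describes one double-queue design, gives the implementation code, and then shows an example use case. The double queue has been used in a real project, where its performance was validated.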
Design
This section describes the design of the double queue in detail, quoting a few of its methods along the way.
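The double queue described here is essentially two arrays that hold the written objects. One array receives writes, while the other is read by the consumer. There are two ReentrantLocks: one guards the write side and one guards the read side. The arrays swap roles, but the locks do not. The number of elements written into each array is tracked by a counter.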
public class DoubleCachedQueue<T> extends AbstractQueue<T> implements
        BlockingQueue<T>, java.io.Serializable {
    private static final long serialVersionUID = 1L;
    private static final int default_line_limit = 1000;
    private static final long max_cache_size = 67108864L;
    private int lineLimit;
    private long cacheSize;
    private T[] itemsA;
    private T[] itemsB;
    private ReentrantLock readLock, writeLock;
    private Condition notFull;   // signalled when the write array has room again
    private Condition awake;     // signalled when data arrives or the queue is closed
    /**
     * writeArray: the array producers insert into (data read from the data
     * source). readArray: the array consumers extract from (data bound for
     * the data destination).
     *
     * Because this is a double-queue mechanism, the two arrays exchange roles
     * when the time is suitable (see queueSwitch).
     */
    private T[] writeArray, readArray;
    private volatile int writeCount, readCount;
    private int writeArrayTP, readArrayHP; // next free slot / next element to read
    private volatile boolean closed = false;
    private int spillSize = 0;
    private long lineRx = 0;
    private long lineTx = 0;
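The queue implements the BlockingQueue interface. offer therefore blocks (up to its timeout) while the write array is full. Likewise, poll blocks (up to its timeout) while there is nothing to read.

The two arrays are exchanged at the right moment by queueSwitch. The switch condition is that the reader has drained the read array and the write array is not empty. At that point the two arrays swap roles. During the switch the write lock is held, so producers cannot write into the queue.

The switch itself is just an exchange of the two array references plus a reset of the corresponding counters and positions. The read count takes over the old write count, and the write counter is reset to zero. This is safe because the switch only happens once the read array has been read empty, so the array that becomes the new write array holds nothing.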
private long queueSwitch(long timeout, boolean isInfinite)
        throws InterruptedException {
    writeLock.lock();
    try {
        if (writeCount <= 0) {
            // Nothing to swap in. If the producer has closed the queue,
            // tell the caller that the data is exhausted.
            if (closed) {
                return -2;
            }
            try {
                if (isInfinite && timeout <= 0) {
                    awake.await();
                    return -1;
                } else {
                    // Wait for a producer to signal new data; returns the
                    // remaining wait time in nanoseconds (<= 0 on timeout).
                    return awake.awaitNanos(timeout);
                }
            } catch (InterruptedException ie) {
                // Pass the signal on in case this thread consumed it.
                awake.signal();
                throw ie;
            }
        } else {
            // Swap the array references and reset the counters/positions.
            T[] tmpArray = readArray;
            readArray = writeArray;
            writeArray = tmpArray;
            readCount = writeCount;
            readArrayHP = 0;
            writeCount = 0;
            writeArrayTP = 0;
            notFull.signal();
            return -1;
        }
    } finally {
        writeLock.unlock();
    }
}
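The switch rule can be made concrete with a stripped-down sketch of the same idea. The class `SwapBuffer` and its methods are hypothetical and are not part of DoubleCachedQueue. It keeps the two arrays, the two locks and the rule "swap only when the read side is empty and the write side is not". It leaves out the Condition-based waiting, the closed flag and the batch methods. As a result, `offer` returns false on a full write array and `poll` returns null instead of blocking.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal double buffer: producers append to writeBuf, the consumer
// drains readBuf, and the references are swapped only when readBuf is empty.
class SwapBuffer {
    private Object[] writeBuf;
    private Object[] readBuf;
    private int writeCount;  // elements in writeBuf
    private int readHead;    // next index to read in readBuf
    private int readCount;   // elements left in readBuf
    private final ReentrantLock writeLock = new ReentrantLock();
    private final ReentrantLock readLock = new ReentrantLock();

    SwapBuffer(int capacity) {
        writeBuf = new Object[capacity];
        readBuf = new Object[capacity];
    }

    /** Returns false if the write buffer is full (no waiting in this sketch). */
    boolean offer(Object e) {
        writeLock.lock();
        try {
            if (writeCount == writeBuf.length) {
                return false;
            }
            writeBuf[writeCount++] = e;
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    /** Returns null if both buffers are empty (no waiting in this sketch). */
    Object poll() {
        readLock.lock();
        try {
            if (readCount == 0 && !trySwitch()) {
                return null;
            }
            Object e = readBuf[readHead];
            readBuf[readHead++] = null;  // drop the reference for GC
            readCount--;
            return e;
        } finally {
            readLock.unlock();
        }
    }

    // Called with readLock held; takes writeLock only for the swap, like queueSwitch.
    private boolean trySwitch() {
        writeLock.lock();
        try {
            if (writeCount == 0) {
                return false;
            }
            Object[] tmp = readBuf;
            readBuf = writeBuf;
            writeBuf = tmp;
            readCount = writeCount;
            readHead = 0;
            writeCount = 0;
            return true;
        } finally {
            writeLock.unlock();
        }
    }
}
```

Consider a capacity of 2. After offering "a" and "b", a third offer fails. The first poll triggers a switch and returns "a", and from then on the emptied array is available to the producer again.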
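In queueSwitch above, one extra check is made when the write array that would be swapped in is also empty. If the queue has been explicitly closed, queueSwitch returns -2 and poll returns null (the batch poll returns -1). The reader should then check whether the queue is closed; if it is, the reader has consumed all of the data.

The explicit close is a state we added to the double queue. Its purpose is to tell readers that the producer has stopped writing new data. The queue may still hold data that has not been taken yet: it sits in the write array, waiting for one more queue switch. So when a poll returns empty, the reader should check the state. If the queue has been closed, the reader knows that this read task is completely finished.

Conversely, the closed state does not actually prevent writes. A producer that still wants to write can do so, but I do not recommend it.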
public void close() {
    writeLock.lock();
    try {
        closed = true;
        // Wake the reader if it is waiting in queueSwitch for new data.
        awake.signalAll();
    } finally {
        writeLock.unlock();
    }
}
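DoubleCachedQueue.poll(timeout, unit) returns null both on a timeout and once the queue is closed and empty, so the reader has to consult isClosed(). One race-free way to do that is to read the flag before polling. This is a suggestion of mine, not the original code, and it assumes nothing is written after close(). To keep the sketch runnable on its own, a standard LinkedBlockingQueue plus an AtomicBoolean stand in for the double queue and its closed flag. The class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseFlagDemo {
    // Drains q until the producer has closed it and no data is left.
    static int drain(BlockingQueue<String> q, AtomicBoolean closed) throws InterruptedException {
        int n = 0;
        for (;;) {
            // Read the flag BEFORE polling: if it was already set and the poll
            // still comes back empty, nothing more can arrive (assuming no
            // writes after close). Checking it only after a timed-out poll
            // could miss data written just before close().
            boolean wasClosed = closed.get();
            String s = q.poll(10, TimeUnit.MILLISECONDS);
            if (s != null) {
                n++;
            } else if (wasClosed) {
                return n; // closed and empty: the read task is finished
            }
            // Otherwise the poll just timed out while the producer is still active.
        }
    }

    static int runDemo(int items) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        AtomicBoolean closed = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < items; i++) {
                q.add("line-" + i);
            }
            closed.set(true); // stands in for DoubleCachedQueue.close()
        });
        producer.start();
        try {
            int consumed = drain(q, closed);
            producer.join();
            return consumed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The same ordering applies to DoubleCachedQueue. If isClosed() was already true before the poll, a null result can only come from queueSwitch finding both arrays empty, not from a timeout that raced with close().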
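Without this close flag, the producer would probably have to put an EOF marker into the queue to tell the reader that the data has ended. This works when there is exactly one producer and one consumer, but what about one-to-many, many-to-one, or even many-to-many?

The one-to-one case is the simplest, and it is the scenario the double queue is best suited for. The double queue completely separates the lock contention between a single producer and a single consumer: each side only needs the lock for its own read or write array. In the double queue described here, the only source of overhead is the queue switch. If the arrays are swapped frequently, that still costs some performance.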
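For contrast, here is the EOF approach in the one-to-one case. The producer appends a sentinel after its last item, and the single consumer stops when it takes it. This is a minimal sketch using ArrayBlockingQueue; `EofSentinelDemo` and the `EOF` constant are hypothetical names. With several consumers, only one of them would ever receive the sentinel, which is the limitation described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class EofSentinelDemo {
    // A unique object used as the end-of-data marker; compared by identity.
    private static final String EOF = new String("EOF");

    static int runDemo(int items) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(16);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    q.put("line-" + i);
                }
                q.put(EOF); // tell the single consumer that no more data follows
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int consumed = 0;
        try {
            for (;;) {
                String s = q.take();
                if (s == EOF) { // identity check: only the sentinel object matches
                    break;
                }
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }
}
```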

One-to-many
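Reading from and writing to the double queue have been covered above. In a real project, the one-to-many case requires attention to two points:
- The single producer must close the queue when it finishes.
- The consumers need to know when the task is over, that is, when the other threads have finished.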

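Either the consumers themselves or some outside party needs to know whether each consumer thread is still alive; only then can it tell that the task is complete.

If there is an outside coordinator (a "god"), it can use a CountDownLatch. Each consumer calls countDown() once when it finishes, and the coordinator calls await() until all consumers have exited, at which point the whole task is over.

If there is no coordinator and the threads must learn each other's state, my approach is to have the producer put an EOF into the queue:
- The thread that takes the EOF knows it is the first to reach the end, and sets a boolean flag.
- The other threads check that flag whenever they get an empty result. If it is set, they know a peer has already taken the EOF, so they count down and exit.
- The thread that took the EOF counts down, calls await(), and exits last.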
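The coordinator ("god") variant can be sketched with a CountDownLatch. The sketch makes one simplifying assumption: all data is already in a ConcurrentLinkedQueue before the consumers start, so an empty queue means the work is done. With a live producer, the consumers would instead stop on the close flag or on the EOF scheme described above. The class name is hypothetical. Counting down in a `finally` block also ensures that a consumer that dies with an exception still releases the coordinator.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchCoordinatorDemo {
    static int runDemo(int items, int consumers) {
        ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < items; i++) {
            q.add(i); // all data is ready up front in this sketch
        }
        CountDownLatch done = new CountDownLatch(consumers);
        AtomicInteger processed = new AtomicInteger();
        for (int c = 0; c < consumers; c++) {
            new Thread(() -> {
                try {
                    while (q.poll() != null) {
                        processed.incrementAndGet();
                    }
                } finally {
                    done.countDown(); // count down even if processing throws
                }
            }).start();
        }
        try {
            done.await(); // the outside coordinator waits for every consumer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }
}
```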
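Below is the code I wrote for this scenario using the double queue. fromQueue is a ConcurrentLinkedQueue and can be ignored; toQueue is the double queue, so pay attention to how it is used. In particular, writes to it must be retried in a while loop until they succeed, because offer(T) gives up after waiting 20 ms.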
@Override
public void run() {
    long start = System.currentTimeMillis();
    log.debug(Thread.currentThread() + " Unpacker started at " + start);
    Random r = new Random(start);
    Bundle bundle = null;
    boolean shouldShutdown = false;
    boolean countedDown = false; // guards against counting the latch down twice
    try {
        while (!shouldShutdown) {
            bundle = (Bundle) fromQueue.poll();
            if (bundle == null) {
                if (seeEOF.get()) {
                    // Got nothing and another thread has already taken the EOF:
                    // count the latch down and leave the loop.
                    latch.countDown();
                    countedDown = true;
                    shouldShutdown = true;
                } else {
                    // The EOF has not been taken yet: nap briefly, then poll again.
                    try {
                        sleep(r.nextInt(10));
                    } catch (InterruptedException e) {
                        log.error("Interrupted when taking a nap", e);
                    }
                }
            } else if (!bundle.isEof()) {
                // A regular bundle: decompress it and write it into the double queue.
                byte[] lineBytes = BundleUtil.getDecompressedData(bundle);
                UnCompressedBundle unpacked = new UnCompressedBundle(bundle.getId(),
                        ByteUtil.bytes2Lines(lineBytes, lineDelim), bundle.getIndex(), bundle.getJobId());
                // offer() gives up after a short timeout, so retry until it succeeds.
                while (!toQueue.offer(unpacked)) {
                    log.info("Unpacker put failed, will retry");
                }
                log.info("After enqueue, queue size is " + toQueue.size());
            } else {
                // This Unpacker took the EOF.
                seeEOF.set(true);
                // Count the latch down for itself and wait for the other threads to exit.
                latch.countDown();
                countedDown = true;
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    log.error("Interrupted when waiting for the latch", e);
                }
                // All other threads have exited: put the EOF into the double queue.
                while (!toQueue.offer(new UnCompressedBundle(-1L, new Line[0], -1L, -1L))) {
                    log.info("Unpacker put EOF failed, will retry");
                }
                // Close the queue.
                toQueue.close();
                // Leave the loop.
                shouldShutdown = true;
            }
        }
        log.debug(Thread.currentThread() + " Unpacker finished in " + (System.currentTimeMillis() - start) + " ms");
    } catch (Exception e) {
        log.error("Exception when unpacker is running ", e);
        // Count the latch down to signal that this thread exited abnormally and
        // stopped working; otherwise the EOF holder would wait on it forever.
        if (!countedDown) {
            latch.countDown();
        }
        log.debug(Thread.currentThread() + " Unpacker occurred exception and stopped.");
    }
}
Many-to-one
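With multiple producers, contention on the write lock is unavoidable, but the consumer still reads steadily. Nothing here needs special handling, so I won't belabor it.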
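The article gives no code for this case. One point seems worth making explicit; it is my own reading of the close semantics rather than something the author states. With several producers, the queue should be closed only after every producer has finished. Otherwise the reader may treat a still-active stream as finished.

A minimal sketch lets the last producer to finish set the flag. Again, LinkedBlockingQueue and an AtomicBoolean stand in for the double queue and its closed flag, and `ManyToOneDemo` is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ManyToOneDemo {
    static int runDemo(int producers, int itemsEach) {
        if (producers <= 0) {
            throw new IllegalArgumentException("need at least one producer");
        }
        BlockingQueue<Integer> q = new LinkedBlockingQueue<>();
        AtomicBoolean closed = new AtomicBoolean(false);
        AtomicInteger running = new AtomicInteger(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < itemsEach; i++) {
                    q.add(i); // producers contend with each other on the write side
                }
                // Only the last producer to finish may close the queue;
                // an earlier close would let the reader stop too soon.
                if (running.decrementAndGet() == 0) {
                    closed.set(true);
                }
            }).start();
        }
        int consumed = 0;
        try {
            for (;;) {
                boolean wasClosed = closed.get(); // read the flag before polling
                Integer v = q.poll(10, TimeUnit.MILLISECONDS);
                if (v != null) {
                    consumed++;
                } else if (wasClosed) {
                    return consumed; // all producers done and nothing left
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```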
Summary and Analysis
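This article presented the design and implementation of a classic double queue, along with some code demonstrations. The complete double-queue code follows at the end of the article; if you need the .java file, leave a comment and I will send it to you. If you run into problems when using it, please let me know. This implementation is not perfect yet, and there are things to watch out for when using it.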
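The point of the double queue is to keep writing and reading from affecting each other, with particular attention to write speed. Writes are usually fast, while readers do additional processing on what they read. With a double queue, the writer can keep writing. If the reading side is slow, you can start several consumer threads and use the double queue as described in the "One-to-many" section.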
Below is the complete implementation. Please read it carefully, and be sure to let me know if you find any problems :)
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import lombok.ToString;
import lombok.extern.log4j.Log4j;

/**
 * Represents a region with two swap spaces: one stores data coming from the
 * data source, the other stores data that will be transferred to the data
 * destination.
 * <br>
 * A classical DoubleCachedQueue: in the beginning, space A and space B are
 * both empty. Producers load data from the data source into the write space,
 * while consumers dump data from the read space to the data destination. When
 * the read space is empty and the write space is not, the two spaces are
 * switched. Repeat the above operation.
 */
@Log4j
@ToString
public class DoubleCachedQueue<T> extends AbstractQueue<T> implements
        BlockingQueue<T>, java.io.Serializable {
    private static final long serialVersionUID = 1L;
    private static final int default_line_limit = 1000;
    // Upper bound for cacheSize, in the units returned by computeSize().
    private static final long max_cache_size = 67108864L;
    private int lineLimit;
    private long cacheSize;
    private T[] itemsA;
    private T[] itemsB;
    private ReentrantLock readLock, writeLock;
    private Condition notFull;   // signalled when the write array has room again
    private Condition awake;     // signalled when data arrives or the queue is closed
    /**
     * writeArray: the array producers insert into (data read from the data
     * source). readArray: the array consumers extract from (data bound for
     * the data destination).
     *
     * Because this is a double-queue mechanism, the two arrays exchange roles
     * when the time is suitable (see queueSwitch).
     */
    private T[] writeArray, readArray;
    private volatile int writeCount, readCount;
    private int writeArrayTP, readArrayHP; // next free slot / next element to read
    private volatile boolean closed = false;
    private int spillSize = 0;  // batch offers wake the reader once writeCount reaches this
    private long lineRx = 0;    // total elements written, for statistics
    private long lineTx = 0;    // total elements read, for statistics
    /**
     * Get info of line numbers in the {@link DoubleCachedQueue} spaces.
     *
     * @return Information of line numbers.
     */
    public String info() {
        // The arrays stay null until the first offer when the long constructor is used.
        int writeLen = writeArray == null ? 0 : writeArray.length;
        int readLen = readArray == null ? 0 : readArray.length;
        return String.format("Write Array: %s/%s; Read Array: %s/%s", writeCount, writeLen, readCount, readLen);
    }

    /**
     * Construct a {@link DoubleCachedQueue} whose two swap arrays each hold
     * up to lineLimit elements.
     *
     * @param lineLimit
     *            Limit of the line number each array can hold; a non-positive
     *            value falls back to the default of 1000.
     */
    @SuppressWarnings("unchecked")
    public DoubleCachedQueue(int lineLimit) {
        // Size the arrays from the validated field, not the raw parameter,
        // so that a non-positive argument really falls back to the default.
        this.lineLimit = lineLimit <= 0 ? default_line_limit : lineLimit;
        itemsA = (T[]) new Object[this.lineLimit];
        itemsB = (T[]) new Object[this.lineLimit];
        readLock = new ReentrantLock();
        writeLock = new ReentrantLock();
        notFull = writeLock.newCondition();
        awake = writeLock.newCondition();
        readArray = itemsA;
        writeArray = itemsB;
        spillSize = (int) (this.lineLimit * 8L / 10);
    }

    /**
     * Construct a {@link DoubleCachedQueue} whose arrays are allocated lazily
     * on the first offer, with cacheSize / computeSize(firstElement) slots
     * each. Note: an int argument selects the other constructor; pass a long
     * (e.g. 1000L) to select this one.
     *
     * @param cacheSize
     *            Capacity budget, capped at max_cache_size.
     */
    public DoubleCachedQueue(long cacheSize) {
        if (cacheSize <= 0) {
            throw new IllegalArgumentException(
                    "Queue initial capacity must be greater than 0!");
        }
        this.cacheSize = cacheSize > max_cache_size ? max_cache_size : cacheSize;
        readLock = new ReentrantLock();
        writeLock = new ReentrantLock();
        notFull = writeLock.newCondition();
        awake = writeLock.newCondition();
        // itemsA/itemsB, readArray/writeArray, lineLimit and spillSize are
        // filled in by initArray() on the first offer.
    }
    /**
     * Get the line limit of the {@link DoubleCachedQueue}.
     *
     * @return lineLimit Limit of the line number the {@link DoubleCachedQueue}
     *         can hold.
     */
    public int getLineLimit() {
        return lineLimit;
    }

    /**
     * Set the line limit of the {@link DoubleCachedQueue}. This only updates
     * the recorded value; arrays that have already been allocated are not
     * resized.
     *
     * @param capacity
     *            Limit of the line number the {@link DoubleCachedQueue} can hold.
     */
    public void setLineLimit(int capacity) {
        this.lineLimit = capacity;
    }
    /**
     * Insert one line of record into the space that buffers the swap data.
     * The caller must hold writeLock and have checked that there is room.
     *
     * @param line
     *            The inserted line.
     */
    private void insert(T line) {
        writeArray[writeArrayTP] = line;
        ++writeArrayTP;
        ++writeCount;
        ++lineRx;
    }

    /**
     * Insert the first size elements of a line array into the space that
     * buffers the swap data. The caller must hold writeLock and have checked
     * that there is room.
     *
     * @param lines
     *            Inserted line array.
     * @param size
     *            Number of elements of lines to insert.
     */
    private void insert(T[] lines, int size) {
        if (size > 0) {
            System.arraycopy(lines, 0, writeArray, writeArrayTP, size);
            writeArrayTP = writeArrayTP + size;
            writeCount = writeCount + size;
            lineRx = lineRx + size;
        }
    }

    /**
     * Extract one line of record from the space that contains the current
     * data. The caller must hold readLock and have checked that readCount is
     * positive.
     *
     * @return A line of data.
     */
    private T extract() {
        T e = readArray[readArrayHP];
        readArray[readArrayHP] = null; // drop the reference so it can be GC'd
        ++readArrayHP;
        --readCount;
        ++lineTx;
        return e;
    }

    /**
     * Extract up to ea.length lines from the space that contains the current
     * data. The caller must hold readLock.
     *
     * @param ea
     *            Destination buffer.
     * @return Number of lines extracted.
     */
    private int extract(T[] ea) {
        int readsize = Math.min(ea.length, readCount);
        if (readsize > 0) {
            System.arraycopy(readArray, readArrayHP, ea, 0, readsize);
            // Clear the copied slots, as the single-element extract() does,
            // so the read array does not keep references alive.
            java.util.Arrays.fill(readArray, readArrayHP, readArrayHP + readsize, null);
            readArrayHP = readArrayHP + readsize;
            readCount = readCount - readsize;
            lineTx = lineTx + readsize;
        }
        return readsize;
    }
    /**
     * Switch condition: read queue is empty && write queue is not empty.
     * Notice: this method must only be invoked while holding readLock, because
     * it modifies the reader's fields (readArray, readCount, readArrayHP); it
     * acquires writeLock itself.
     *
     * @param timeout
     *            maximum time to wait for data, in nanoseconds.
     * @param isInfinite
     *            whether to wait forever (when timeout is non-positive) until
     *            some other thread wakes it up.
     * @return -1 after a successful switch or an untimed wait, -2 if the
     *         queue is closed and both arrays are empty, otherwise the
     *         remaining wait time in nanoseconds as returned by awaitNanos.
     * @throws InterruptedException
     *             if interrupted while waiting.
     */
    private long queueSwitch(long timeout, boolean isInfinite)
            throws InterruptedException {
        writeLock.lock();
        try {
            if (writeCount <= 0) {
                if (closed) {
                    return -2;
                }
                try {
                    if (isInfinite && timeout <= 0) {
                        awake.await();
                        return -1;
                    } else {
                        return awake.awaitNanos(timeout);
                    }
                } catch (InterruptedException ie) {
                    // Pass the signal on in case this thread consumed it.
                    awake.signal();
                    throw ie;
                }
            } else {
                // Swap the array references and reset the counters/positions.
                T[] tmpArray = readArray;
                readArray = writeArray;
                writeArray = tmpArray;
                readCount = writeCount;
                readArrayHP = 0;
                writeCount = 0;
                writeArrayTP = 0;
                notFull.signal();
                return -1;
            }
        } finally {
            writeLock.unlock();
        }
    }
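    /**
     * If there is room in the write space, write one line to it and return
     * true. Otherwise wait up to the given time for room; if there is still
     * no room when the time is up, return false.
     *
     * @param line
     *            a Line.
     * @param timeout
     *            appointed limit time
     * @param unit
     *            time unit
     * @return True if success, False if failed.
     * @throws InterruptedException
     *             if interrupted while waiting.
     */
    public boolean offer(T line, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (line == null) {
            throw new NullPointerException();
        }
        long nanoTime = unit.toNanos(timeout);
        writeLock.lockInterruptibly();
        try {
            // Lazy allocation for the cacheSize constructor; done inside the
            // try block so the lock is released even if it throws.
            if (itemsA == null || itemsB == null) {
                initArray(line);
            }
            for (;;) {
                if (writeCount < writeArray.length) {
                    insert(line);
                    if (writeCount == 1) {
                        // The write array was empty: wake a reader waiting in queueSwitch.
                        awake.signal();
                    }
                    return true;
                }
                // Time out
                if (nanoTime <= 0) {
                    return false;
                }
                // keep waiting
                try {
                    nanoTime = notFull.awaitNanos(nanoTime);
                } catch (InterruptedException ie) {
                    notFull.signal();
                    throw ie;
                }
            }
        } finally {
            writeLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    private void initArray(T line) {
        long recordLength = computeSize(line);
        // Guard against a computeSize() override returning 0 or a negative value.
        long size = recordLength > 0 ? cacheSize / recordLength : 0;
        if (size <= 0) {
            size = default_line_limit;
        }
        // Fits in an int because cacheSize is capped at max_cache_size.
        lineLimit = (int) size;
        itemsA = (T[]) new Object[lineLimit];
        itemsB = (T[]) new Object[lineLimit];
        readArray = itemsA;
        writeArray = itemsB;
        // The long constructor could not compute this before the size was known.
        spillSize = (int) (lineLimit * 8L / 10);
    }

    /**
     * Size of one record, in the same units as cacheSize. The default counts
     * every record as 1, so cacheSize is then simply a number of records;
     * subclasses may override this to size the arrays differently.
     */
    public long computeSize(T line) {
        return 1;
    }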
    /**
     * If there is room in the write space, write the first size lines of the
     * array to it and return true.<br>
     * Otherwise wait up to the given time for room; if there is still no room
     * when the time is up, return false.
     *
     * @param lines
     *            line array containing lines of data
     * @param size
     *            number of lines to write to the space.
     * @param timeout
     *            appointed limit time
     * @param unit
     *            time unit
     * @return status of this operation, true or false.
     * @throws InterruptedException
     *             if interrupted while waiting.
     */
    public boolean offer(T[] lines, int size, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (lines == null || lines.length == 0) {
            throw new NullPointerException();
        }
        if (size < 0 || size > lines.length) {
            throw new IllegalArgumentException("size out of range: " + size);
        }
        long nanoTime = unit.toNanos(timeout);
        writeLock.lockInterruptibly();
        try {
            if (itemsA == null || itemsB == null) {
                initArray(lines[0]);
            }
            if (size > writeArray.length) {
                // Could never fit, even into an empty array: fail fast instead
                // of letting callers retry forever.
                throw new IllegalArgumentException("size exceeds array capacity: " + size);
            }
            for (;;) {
                if (writeCount + size <= writeArray.length) {
                    insert(lines, size);
                    // Below spillSize the reader is not woken; it picks the
                    // data up on a later poll after its own wait times out.
                    if (writeCount >= spillSize) {
                        awake.signalAll();
                    }
                    return true;
                }
                // Time out
                if (nanoTime <= 0) {
                    return false;
                }
                // keep waiting
                try {
                    nanoTime = notFull.awaitNanos(nanoTime);
                } catch (InterruptedException ie) {
                    notFull.signal();
                    throw ie;
                }
            }
        } finally {
            writeLock.unlock();
        }
    }
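    /**
     * Mark the queue as closed and wake up a reader waiting for data. After
     * close, a poll that finds both arrays empty returns immediately (null,
     * or -1 for the batch poll). Writes are still accepted, but are not
     * recommended.
     */
    public void close() {
        writeLock.lock();
        try {
            closed = true;
            awake.signalAll();
        } finally {
            writeLock.unlock();
        }
    }

    public boolean isClosed() {
        return closed;
    }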
    /**
     * Retrieve and remove one line, waiting up to the given time for data.
     * A non-positive timeout returns without attempting a queue switch.
     *
     * @param timeout
     *            appointed limit time
     * @param unit
     *            time unit
     * @return a line, or null if the time ran out or the queue is closed and
     *         empty; call isClosed() to tell the two apart.
     * @throws InterruptedException
     *             if interrupted while waiting.
     */
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanoTime = unit.toNanos(timeout);
        readLock.lockInterruptibly();
        try {
            for (;;) {
                if (readCount > 0) {
                    return extract();
                }
                if (nanoTime <= 0) {
                    return null;
                }
                nanoTime = queueSwitch(nanoTime, true);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Retrieve and remove up to ea.length lines, waiting up to the given time
     * for data. A non-positive timeout returns without attempting a queue
     * switch.
     *
     * @param ea
     *            line buffer
     * @param timeout
     *            appointed limit time
     * @param unit
     *            time unit
     * @return number of lines read; 0 if the time ran out, -1 if the queue is
     *         closed and empty.
     * @throws InterruptedException
     *             if being interrupted during the try limit time.
     */
    public int poll(T[] ea, long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanoTime = unit.toNanos(timeout);
        readLock.lockInterruptibly();
        try {
            for (;;) {
                if (readCount > 0) {
                    return extract(ea);
                }
                if (nanoTime == -2) {
                    return -1;
                }
                if (nanoTime <= 0) {
                    return 0;
                }
                nanoTime = queueSwitch(nanoTime, false);
            }
        } finally {
            readLock.unlock();
        }
    }
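    /** Not supported by this implementation. */
    @Override
    public Iterator<T> iterator() {
        throw new UnsupportedOperationException("iterator() is not supported");
    }

    /**
     * Get the number of lines currently buffered in both arrays. The two
     * counters are read without locking, so this is only a snapshot.
     *
     * @return number of buffered lines.
     */
    @Override
    public int size() {
        return (writeCount + readCount);
    }

    /** Not supported by this implementation. */
    @Override
    public int drainTo(Collection<? super T> c) {
        throw new UnsupportedOperationException("drainTo() is not supported");
    }

    /** Not supported by this implementation. */
    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        throw new UnsupportedOperationException("drainTo() is not supported");
    }

    /**
     * If there is room in the write space, write one line to it and return
     * true.<br>
     * Otherwise wait up to 20 milliseconds for room; if there is still no
     * room, return false.
     *
     * @param line
     *            a Line.
     * @see DoubleCachedQueue#offer(Object, long, TimeUnit)
     */
    @Override
    public boolean offer(T line) {
        try {
            return offer(line, 20, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e1) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            log.debug(e1.getMessage(), e1);
        }
        return false;
    }

    /** Block until there is room for the line. */
    @Override
    public void put(T e) throws InterruptedException {
        // Retry in short timed slices so a waiting producer re-checks for room
        // even if the notFull signal went to another producer.
        while (!offer(e, 100, TimeUnit.MILLISECONDS)) {
            // keep retrying
        }
    }

    /** Free slots left in the current write array. */
    @Override
    public int remainingCapacity() {
        writeLock.lock();
        try {
            // Before the lazy allocation (cacheSize constructor) the capacity is unknown.
            return writeArray == null ? 0 : writeArray.length - writeCount;
        } finally {
            writeLock.unlock();
        }
    }

    /** Not supported by this implementation; use poll(timeout, unit). */
    @Override
    public T take() throws InterruptedException {
        throw new UnsupportedOperationException("take() is not supported; use poll(timeout, unit)");
    }

    /** Not supported by this implementation. */
    @Override
    public T peek() {
        throw new UnsupportedOperationException("peek() is not supported");
    }

    /** Poll with a one-second timeout. */
    @Override
    public T poll() {
        try {
            return poll(1 * 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.debug(e.getMessage(), e);
        }
        return null;
    }
}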
Last updated: 2017-04-03 12:55:21