閱讀565 返回首頁    go 魔獸


《 Java並發編程從入門到精通》第5章 多線程之間交互:線程閥

5.1 線程安全的阻塞隊列BlockingQueue

(1)先理解一下Queue、Deque、BlockingQueue的概念:

Queue(隊列) :用於保存一組元素,不過在存取元素的時候必須遵循先進先出原則。隊列是一種特殊的線性表,它隻允許在表的前端(front)進行刪除操作,而在表的後端(rear)進行插入操作。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。隊列中沒有元素時,稱為空隊列。在隊列這種數據結構中,最先插入的元素將是最先被刪除的元素;反之最後插入的元素將是最後被刪除的元素,因此隊列又稱為“先進先出”(FIFO—first in first out)的線性表。

Deque(雙端隊列): 兩端都可以進出的隊列。當我們約束從隊列的一端進出隊時,就形成了另外一種存取模式,它遵循先進後出原則,這就是棧結構。雙端隊列主要是用於棧操作。使用站結構讓操作有可追溯性(如windows窗口地址欄內的路徑前進棧、後退棧)。

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也隻從容器裏拿元素。

阻塞隊列提供了四種處理方法:

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

(2)Java裏的阻塞隊列最新JDK中提供了7個阻塞隊列。分別是:

 

BlockingQueue常用的方法有,更多方法請查詢API:

1)add(anObject):把anObject加到BlockingQueue裏,即如果BlockingQueue可以容納,則返回true,否則招聘異常

2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裏,即如果BlockingQueue可以容納,則返回true,否則返回false.

3)put(anObject):把anObject加到BlockingQueue裏,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏麵有空間再繼續.

4)poll(time):取走BlockingQueue裏排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null

5)take():取走BlockingQueue裏排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止

其中:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會拋出NullPointerException。null 被用作指示poll 操作失敗的警戒值。


5.2 ArrayBlockingQueue


    ArrayBlockingQueue一個由數組支持的有界的阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。

這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。

此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

先看一下ArrayBlockingQueue的部分源碼:理解一下ArrayBlockingQueue的實現原理和機製

public class ArrayBlockingQueue <E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
 
    //數組的儲存結構
    final Object[] items;    
   //鎖采用的機製
    final ReentrantLock lock;
    public ArrayBlockingQueue( int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //通過將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程
        lock = new ReentrantLock(fair);
        notEmpty = lock .newCondition();
        notFull  lock .newCondition();
    }
    public boolean offer(E e) {
        checkNotNull(e);
        //使用ReentrantLock 鎖機製
        final ReentrantLock lock = this.lock;
        lock.lock();//加鎖
        try {
            if (count == items.length)
                return false ;
            else {
                enqueue(e);
                return true ;
            }
        } finally {
            lock.unlock();//釋放鎖
        }
    }
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[ putIndex] = x;//通過數組進行儲存
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
…….
}
使用實例是:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/*
* 現有的程序代碼模擬產生了16個日誌對象,並且需要運行16秒才能打印完這些日誌,
* 請在程序中增加4個線程去調用parseLog()方法來分頭打印這16個日誌對象,
* 程序隻需要運行4秒即可打印完這些日誌對象。
*/
public class BlockingQueueTest {
       public static void main(String[] args) throws Exception {
             // 新建一個等待隊列
             final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(16);
             // 四個線程
             for (int i = 0; i < 4; i++) {
                   new Thread(new Runnable() {
                         @Override
                         public void run() {
                               while (true ) {
                                     try {
                                          String log = (String) bq.take();
                                           parseLog(log);
                                    } catch (Exception e) {
                                    }
                              }
                        }
                  }).start();
            }
             for (int i = 0; i < 16; i++) {
                  String log = (i + 1) + ” –>  “;
                  bq.put(log); // 將數據存到隊列裏!
            }
      }
       // parseLog方法內部的代碼不能改動
       public static void parseLog(String log) {
            System. out.println(log + System.currentTimeMillis());
             try {
                  Thread. sleep(1000);
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
      }
}

5.3 LinkedBlockingQueue


      LinkedBlockingQueue : 基於鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列 中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;隻有當隊列緩衝區達到最大值緩存容量時 (LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反 之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理並發數據,還因為其對於生產者端和消費者端分別 采用了獨立的鎖來控製數據同步,這也意味著在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。

作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大 小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於 消費者的速度,也許還沒有等到隊列滿阻塞產生,係統內存就有可能已被消耗殆盡了。

先看一下LinkedBlockingDeque的部分源碼:理解一下ArrayBlockingQueue的實現原理和機製
public class LinkedBlockingDeque <E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    final ReentrantLock lock = new ReentrantLock();//線程安全
    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);//每次插入後都將動態地創建鏈接節點
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkLast(node);
        } finally {
            lock.unlock();
        }
    }
    public boolean offer(E e) {
        return offerLast(e);
    }
    public boolean add(E e) {
        addLast(e);
        return true ;
    }
    public void addLast(E e) {
        if (!offerLast(e))
            throw new IllegalStateException(“Deque full”);
    }
    public E removeFirst() {
        E x = pollFirst();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }
 
……
}
使用實例是:
將ArrayBlockingQueue的例子換成LinkedBlockingQueue即可:
 // 新建一個等待隊列
 final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(16);
換成:
final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(16);

5.4 PriorityBlockingQueue 


PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而隻會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控製線程同步的鎖采用的是公平鎖。

先看一下PriorityBlockingQueue的部分源碼,理解一下PriorityBlockingQueue的實現原理和機製:

public class PriorityBlockingQueue <E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private final ReentrantLock lock ;//說明本類使用一個lock來同步讀寫等操作
    private transient Comparator<? super E> comparator;
     // 使用指定的初始容量創建一個 PriorityBlockingQueue,並根據指定的比較器對其元素進行排序。
    public PriorityBlockingQueue( int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
     public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
……
}
 

5.5 DelayQueue


     DelayQueue:是一個支持延時獲取元素的使用優先級隊列的實現的無界阻塞隊列。隊列中的元素必須實現Delayed接口和Comparable接口,也就是說DelayQueue裏麵的元素必須有public int compareTo( T o)和long getDelay(TimeUnit unit)方法存在,在創建元素時可以指定多久才能從隊列中獲取當前元素。隻有在延遲期滿時才能從隊列中提取元素。我們可以將DelayQueue運用在以下應用場景:
  • 緩存係統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
我們來看一下DelayQueue的源碼來理解一下:
//可以看出來E元素必須繼承Delayed和而Delayed又繼承Comparable;
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
 
    private final transient ReentrantLock lock = new ReentrantLock();//安全鎖機製
    private final PriorityQueue<E> q = new PriorityQueue<E>();//PriorityQueue來存取元素
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    //根據元素的Delay進行判斷
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q .poll();
                    first = null; // don’t retain ref while waiting
                    if (leader != null)
                       //沒到時間阻塞等待
                        available.await();
                    else {
                        Thread thisThread = Thread. currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null ;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
……
}
我們來看一下DelayQueue的使用實例:
(1)實現一個Student對象作為DelayQueue的元素必須實現Delayed 接口的兩個方法;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Student implements Delayed { //必須實現Delayed接口
       private String name ;
       private long submitTime ;// 交卷時間
       private long workTime ;// 考試時間
       public String getName() {
             return this .name + ” 交卷,用時” + workTime;
      }
       public Student(String name, long submitTime) {
             this.name = name;
             this.workTime = submitTime;
             this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS ) + System.nanoTime ();
            System. out.println(this.name + ” 交卷,用時” + workTime);
      }
       //必須實現getDelay方法
       public long getDelay(TimeUnit unit) {
//          返回一個延遲時間
             return unit.convert(submitTime – System.nanoTime (), unit.NANOSECONDS );
      }
       //必須實現compareTo方法
       public int compareTo(Delayed o) {
//          比較的方法
            Student that = (Student) o;
             return submitTime > that.submitTime ? 1 : ( submitTime < that.submitTime ? -1 : 0);
      }
}
(2)執行運行類如下:
package demo.thread;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest {
       public static void main(String[] args) throws Exception {
             // 新建一個等待隊列
             final DelayQueue<Student> bq = new DelayQueue<Student>();
             for (int i = 0; i < 5; i++) {
                  Student student = new Student(“學生” +i,Math.round((Math. random()*10+i)));
                  bq.put(student); // 將數據存到隊列裏!
            }
             //獲取但不移除此隊列的頭部;如果此隊列為空,則返回 null。
            System. out.println(“bq.peek()”+bq.peek().getName());
             //獲取並移除此隊列的頭部,在可從此隊列獲得到期延遲的元素,或者到達指定的等待時間之前一直等待(如有必要)。
             //poll(long timeout, TimeUnit unit) 大家可以試一試這個方法
      }
}

運行結果如下:每次運行結果都不一樣,一問,我們獲得永遠是隊列裏麵的第一個元素;

學生0 交卷,用時8
學生1 交卷,用時6
學生2 交卷,用時10
學生3 交卷,用時10
學生4 交卷,用時9
bq.peek()學生1 交卷,用時6

可以慢慢的在以後的工作當中體會DelayQueue的用法;

最後更新:2017-05-22 14:02:20

  上一篇:go  Java集合-Collection
  下一篇:go  Java 9中將移除 Sun.misc.Unsafe