Java Async: ThreadPoolExecutor Source Code Analysis

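Threads are an important part of any language or framework. Asynchronous execution relies on separate threads, but creating and destroying threads frequently carries a significant performance cost, and thread pools exist to solve exactly this problem. In short, a thread pool offers the following advantages:

  • Lower resource overhead: reusing already-created threads reduces the resource cost and performance loss of frequently creating and destroying threads
  • Fast task startup: a task can start right away on an existing thread instead of waiting for a new one to be created
  • Easier management: the pool lets threads be managed, allocated, tuned, and monitored in one place

Thread pools in Java are built on ThreadPoolExecutor. The thread pool strategies commonly obtained as an ExecutorService (for example the fixed and cached pools) are built on ThreadPoolExecutor, which makes it a very important class. To understand those strategies, you first need to understand ThreadPoolExecutor.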

1 Creating a thread pool

Let's first look at how a thread pool is created:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
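  • corePoolSize: the core pool size, i.e. the number of threads kept in the pool
  • maximumPoolSize: the maximum number of threads the pool may hold
  • keepAliveTime: how long an idle thread beyond corePoolSize stays alive before it is terminated
  • unit (TimeUnit): the time unit of keepAliveTime
  • workQueue (BlockingQueue): the queue that holds tasks waiting to run
  • threadFactory (ThreadFactory): the factory used to create new threads
  • handler (RejectedExecutionHandler): the policy applied when a task is rejected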

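The basic flow of ThreadPoolExecutor is as follows:

  • When a task is submitted through submit or execute and the current number of threads is less than corePoolSize, a new thread is created to run the task
  • If the number of threads is greater than or equal to corePoolSize, the task is added to the BlockingQueue
  • If the BlockingQueue is full and the number of threads is below maximumPoolSize, a new thread is created to run the task
  • If the number of threads has reached maximumPoolSize, the rejection policy (RejectedExecutionHandler) is applied
  • When the pool holds more than corePoolSize threads, a thread that has been idle for keepAliveTime is shut down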
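
The flow above can be observed with a small experiment. The sketch below is only an illustration: the class name PoolFlowDemo and the sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1) are assumptions chosen so that each branch is hit exactly once. Every task blocks on a latch, so no thread becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolFlowDemo {
    // Submits four blocking tasks and records the pool size and queue length
    // observed right after each submission.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize=1, maximumPoolSize=2, queue capacity=1.
        // This constructor uses the default thread factory and AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // task1: threads=1 queued=0   (below corePoolSize: new thread)
        // task2: threads=1 queued=1   (core is full: task is queued)
        // task3: threads=2 queued=1   (queue is full: extra thread)
        // task4: rejected             (maximumPoolSize reached)
    }
}
```

With an unbounded queue such as a default LinkedBlockingQueue, the third and fourth branches would never be reached, because offer never fails.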

2 Thread pool states

ThreadPoolExecutor has several internal states, and understanding them is essential to understanding how the pool works, so let's look at them next:

/*
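     * runState is the lifecycle state of the whole pool and takes these values:
     *  1. RUNNING: accepts new tasks and processes queued tasks.
     *  2. SHUTDOWN: accepts no new tasks but still processes queued tasks.
     *  3. STOP: accepts no new tasks, does not process queued tasks, and interrupts running tasks.
     *  4. TIDYING: all tasks have terminated and workerCount is 0; the pool enters TIDYING and runs terminated().
     *  5. TERMINATED: the terminated() method has completed.
     * The original JDK comment follows: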
     * The runState provides the main lifecyle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty (the queue is already empty at this point)
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */

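How runState is stored is also worth mentioning. It is not kept in a separate int or enum; instead it is packed together with the thread count workerCount into a single atomic variable, ctl: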

// ctl holds both the pool's run state and its current worker count:
    // the low 29 bits are the worker count, the high 3 bits are the run state.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Number of bits used for the worker count (also the shift for the state)
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum worker count: 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // The states, held in the high 3 bits of the int
    // runState is stored in the high-order bits
    // 111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001
    private static final int STOP       =  1 << COUNT_BITS;
    // 010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011
    private static final int TERMINATED =  3 << COUNT_BITS;
    // Packing and unpacking ctl
    // Extract the run state: keep the high 3 bits
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Extract the worker count: keep the low 29 bits
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combine a run state and a worker count into one ctl value
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  • runStateOf() returns the pool's current state
  • workerCountOf() returns the current number of worker threads
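
To make the bit packing concrete, the constants and helpers can be copied into a standalone class and exercised directly. This is a sketch for experimentation only: CtlPackingDemo is a hypothetical class name, and the fields are copies of the private members shown above rather than the JDK's own.

```java
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                       // RUNNING pool with 5 workers
        System.out.println(workerCountOf(c));            // prints 5
        System.out.println(runStateOf(c) == RUNNING);    // prints true
        // RUNNING is negative, so the states compare in lifecycle order.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP); // prints true
        // Incrementing the packed value only touches the count bits.
        System.out.println(workerCountOf(c + 1));        // prints 6
    }
}
```

Because RUNNING is the only negative state, a check such as c < SHUTDOWN is enough to tell whether the pool is still running.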

3 Adding tasks

Tasks are normally added to the pool through execute or submit. The following walks through execute to show how a task is added:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Fewer workers than corePoolSize: start a new worker to run this task.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // The worker count has reached corePoolSize: try to put the task on the queue.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Recheck in case the pool state changed after the offer: if the pool
            // is no longer running, remove the task from the queue and reject it.
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If the worker count dropped to 0 while the task was being queued,
            // nothing would run it, so start a worker with no initial task.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // The queue is full (or the pool is not running). The pool may still be
        // below maximumPoolSize, so try to add a worker for this task.
        else if (!addWorker(command, false))
            reject(command); // no worker could be added, so reject the task
    }
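
The workerCountOf(recheck) == 0 branch is easiest to see with a pool whose corePoolSize is 0. In the sketch below (ZeroCorePoolDemo is a hypothetical class name and the sizes are assumptions for the demo), the first branch is skipped because 0 < 0 is false, the task is queued, and the task still runs because a worker with no initial task is started for it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCorePoolDemo {
    // Returns true if a task submitted to a pool with corePoolSize=0 gets executed.
    static boolean taskRuns() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>()); // unbounded: offer succeeds
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(done::countDown);
            // Wait for the task; a generous timeout avoids hanging if it never runs.
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(taskRuns()); // prints true
    }
}
```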

As you can see, the core of execute is the addWorker method, which adds a worker thread, so let's look at addWorker next:

   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
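            /**
             * Equivalent to: rs >= SHUTDOWN &&
             *   (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
             * If the pool state is greater than SHUTDOWN, refuse to add a worker.
             * If the state is SHUTDOWN, a worker with a non-null task cannot be added,
             * and no worker of any kind can be added when workQueue is empty.
             * If workQueue is not empty, a worker with a null task may be added
             * to help drain the queue.
             */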
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
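                // The worker count has already reached the limit: return false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Below the limit: try to increment workerCount with a CAS;
                // on success, leave both loops
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // The CAS failed: re-read ctl and, if the run state changed,
                // restart the outer loop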
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // workerCount has been incremented; now actually create the thread.
        // Create a new worker
        Worker w = new Worker(firstTask);
        Thread t = w.thread;
        // Acquire the pool's main lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int c = ctl.get();
            int rs = runStateOf(c);
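            /**
             * Equivalent to: t == null ||
             *   (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))
             *
             * Check again: if rs > SHUTDOWN, or rs == SHUTDOWN with a non-null
             * firstTask, back out by decrementing workerCount and trying to terminate.
             */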
            if (t == null ||
                (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null))) {
                decrementWorkerCount();
                tryTerminate();
                return false;
            }
            // Add the new worker to the workers set
            workers.add(w);

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }
        // Start the new thread
        t.start();
        // It is possible (but unlikely) for a thread to have been
        // added to workers, but not yet started, during transition to
        // STOP, which could result in a rare missed interrupt,
        // because Thread.interrupt is not guaranteed to have any effect
        // on a non-yet-started Thread (see Thread#interrupt).
        // If the pool has moved to STOP but this thread has not been interrupted, interrupt it
        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();
        return true;
    }

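The addWorker method falls into two broad phases:

  • workerCount++: no thread is created yet; workerCount is merely incremented with a CAS operation
  • Thread creation: a Worker is created and added to the workers set, and further handling depends on the pool state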

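3.1 workerCount++

The workerCount++ phase corresponds to the code under the retry label above. Its logic has three main parts:

  • Decide from the pool's current state whether a thread may be added, and return false if not:
    • If the pool state is greater than SHUTDOWN, adding a Worker is refused
    • If the state is SHUTDOWN, a Worker with a non-null task cannot be added, and no Worker of any kind can be added when workQueue is empty
    • If workQueue is not empty, a Worker with a null task may be added to help consume the queue
  • Decide from the current worker count whether a thread may be added:
    • If core is true and the worker count has reached corePoolSize, no thread is added
    • If core is false and the worker count has reached maximumPoolSize, no thread is added
  • Run workerCount++ through compareAndIncrementWorkerCount. On success, leave the loop. On failure, double-check the current state: if it changed, go back to the first part; if it is unchanged, go back to the second part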
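
The check-then-CAS pattern of this phase can be tried in isolation. The following is a simplified sketch, not the JDK code: BoundedCasCounter and its methods are hypothetical names, and the run-state check is left out so that only the bounded increment remains.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCasCounter {
    private final AtomicInteger count = new AtomicInteger(0);
    private final int limit;

    BoundedCasCounter(int limit) { this.limit = limit; }

    // Reserves one slot if the limit has not been reached yet.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // limit reached: give up
            if (count.compareAndSet(c, c + 1))
                return true;                   // CAS succeeded: slot reserved
            // CAS failed because another thread changed the count: re-read and retry
        }
    }

    int get() { return count.get(); }

    // Lets several threads compete for the slots and counts the winners.
    static int race(int limit, int threads) {
        BoundedCasCounter counter = new BoundedCasCounter(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (counter.tryIncrement())
                    successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(3, 8)); // prints 3
    }
}
```

Reserving the slot before creating anything expensive means that threads which lose the race return early and never build a Worker they would have to throw away.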

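3.2 Creating the thread

Creating the thread involves the following steps:

  • Create a Worker instance
  • Acquire the pool's main lock so that the following steps are mutually exclusive
  • Check the pool state again. If rs > SHUTDOWN, or rs == SHUTDOWN with a non-null firstTask, decrement workerCount, call tryTerminate, and return false
  • Add the worker to the workers set and release the lock
  • Start the thread
  • If the pool has moved to STOP but the thread has not been interrupted, interrupt it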

4 Worker

Section 3 showed that the threads being added are implemented by Worker, so let's look at that class next:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

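        Worker(Runnable firstTask) {
            // State -1 keeps the worker's lock unavailable until it is first released
            this.setState(-1);
            this.firstTask = firstTask;
            // The worker itself is the Runnable handed to the new thread
            this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
        }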
        ......
        public void run() {
            ThreadPoolExecutor.this.runWorker(this);
        }
}

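Worker implements Runnable and holds a thread internally. From this, the general idea of Worker is clear: it maintains one thread instance and uses it to run the Runnable tasks that are submitted.

4.1 runWorker

In addWorker, once the Worker instance has been created, the start method of its thread field is called, which in turn runs the Worker's run method: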

public void run() {
  ThreadPoolExecutor.this.runWorker(this);
}

So let's look at the runWorker method of ThreadPoolExecutor next:

final void runWorker(Worker w) {
     Runnable task = w.firstTask;
     w.firstTask = null;
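     // Tracks whether the worker exits abnormally
     boolean completedAbruptly = true;
     try {
         // task is non-null only for the first task given at creation;
         // otherwise fetch the next task from the queue with getTask()
         while (task != null || (task = getTask()) != null) {
             // Hold the worker's lock while the task runs, so that the pool
             // does not interrupt this worker as if it were idle
             w.lock();
             clearInterruptsForTaskRun(); // clear the interrupt flag unless the pool is stopping
             try {
                 // Hook called before the task runs; empty in this class.
                 // An override that throws makes the worker exit without running the task.
                 beforeExecute(w.thread, task);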
                 Throwable thrown = null;
                 try {
                     task.run();
                 } catch (RuntimeException x) {
                     thrown = x; throw x;
                 } catch (Error x) {
                     thrown = x; throw x;
                 } catch (Throwable x) {
                     thrown = x; throw new Error(x);
                 } finally {
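                     // Hook called after the task runs; empty in this class.
                     // A subclass can use thrown to detect a task that failed.
                     afterExecute(task, thrown);
                 }
             } finally {
                 task = null; // clear the finished task so it can be garbage collected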
                 w.completedTasks++;
                 w.unlock();
             }
         }
         completedAbruptly = false;
     } finally {
         // Handle the worker's exit
         processWorkerExit(w, completedAbruptly);
     }
 }

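The logic of the method is fairly simple:

  • task is non-null only for the first task supplied when the worker is created; when it is null, the next task is fetched from the queue through getTask()
  • The worker's lock is acquired so that the pool does not interrupt the worker while it runs a task
  • beforeExecute is called before the task runs; it is an empty hook in this class, and an override that throws makes the worker exit without running the task
  • The task is run
  • afterExecute is called after the task runs; it is an empty hook in this class that subclasses can use to detect a task that ended with an exception
  • Finally, the worker's exit is handled by processWorkerExit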
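
Since beforeExecute and afterExecute are empty hooks, they only become visible in a subclass. The sketch below shows one possible use, under the assumption that counting started and failed tasks is enough; HookedPool and its counters are hypothetical names, not part of the JDK.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();      // runs in the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null)
            failed.incrementAndGet();   // the task ended with an exception
    }

    // Runs three tasks, one of which throws, and returns {started, failed}.
    static int[] demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " started, " + r[1] + " failed"); // prints "3 started, 1 failed"
    }
}
```

The failing task kills its worker thread, so a stack trace appears on standard error, and the pool starts a replacement worker for the remaining task. Tasks passed to submit are wrapped in a FutureTask that captures the exception, so for those afterExecute receives a null Throwable.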

4.2 getTask

Next, let's look at the getTask method used by runWorker:

private Runnable getTask() {
     boolean timedOut = false; // Did the last poll() time out?

     retry:
     for (;;) {
         int c = ctl.get();
         int rs = runStateOf(c);

         // Check if queue empty only if necessary.
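         // If the state is >= STOP, queued tasks are no longer processed, so decrement
         // the worker count and return null. If the state is SHUTDOWN and workQueue
         // is already empty, likewise decrement the worker count and return null.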
         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
             decrementWorkerCount();
             return null;
         }

         boolean timed;      // Are workers subject to culling?

         for (;;) {
             int wc = workerCountOf(c);
             timed = allowCoreThreadTimeOut || wc > corePoolSize;

             if (wc <= maximumPoolSize && ! (timedOut && timed))
                 break;
             if (compareAndDecrementWorkerCount(c))
                 return null;
             c = ctl.get();  // Re-read ctl
             if (runStateOf(c) != rs)
                 continue retry;
             // else CAS failed due to workerCount change; retry inner loop
         }

         try {
             Runnable r = timed ?
                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                 workQueue.take();
             if (r != null)
                 return r;
             timedOut = true;
         } catch (InterruptedException retry) {
             timedOut = false;
         }
     }
 }

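This code is critical. First, look at two local variables:

boolean timedOut = false; // whether the previous poll() timed out
boolean timed;            // whether this worker is subject to the idle timeout and may exit

When wc > corePoolSize (or allowCoreThreadTimeOut is set), idle workers need to be culled, so timed is true. When wc <= corePoolSize, core threads must not be removed, so timed is false.
timedOut starts as false. If timed is true, the task is fetched with poll. If poll returns in time, the fetched task is returned. If it times out, the worker has been idle while the pool holds more than corePoolSize workers, so it should be removed: r is null and timedOut is set to true. On the next pass through the loop, the check wc <= maximumPoolSize && ! (timedOut && timed) fails, so the worker count is decremented and null is returned, which makes the worker exit. If the number of threads is <= corePoolSize, workQueue.take() is called instead, which blocks until a task is available or the thread is interrupted.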
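
The effect of timed can be observed from outside the pool. The sketch below is an illustration under assumed values (KeepAliveDemo is a hypothetical class name, with a 50 ms keepAliveTime and a 2 second observation window). It runs two tasks on a pool with corePoolSize 2 and then watches the pool size while the workers sit idle.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns the pool size once the pool is empty, or after 2 idle seconds at most.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // true makes timed true for core threads too, so they use poll(keepAliveTime)
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            CountDownLatch done = new CountDownLatch(2);
            pool.execute(done::countDown); // below corePoolSize: first worker
            pool.execute(done::countDown); // still below corePoolSize: second worker
            done.await();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterIdle(true));  // prints 0: idle workers timed out
        System.out.println(poolSizeAfterIdle(false)); // prints 2: core workers block in take()
    }
}
```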

Last updated: 2017-07-16 23:02:42
