OkHttp 3.7源碼分析(三)——任務隊列
OkHttp3.7源碼分析文章列表如下:
前麵的博客已經提到過,OkHttp的一個高效之處在於在內部維護了一個線程池,方便高效地執行異步請求。本篇博客將詳細介紹OkHttp的任務隊列機製。
1. 線程池的優點
OkHttp的任務隊列在內部維護了一個線程池用於執行具體的網絡請求。而線程池最大的好處在於通過線程複用減少非核心任務的損耗。
多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。但如果對多線程應用不當,會增加對單個任務的處理時間。可以舉一個簡單的例子:
假設在一台服務器完成一項任務的時間為T
T1 創建線程的時間 T2 在線程中執行任務的時間,包括線程間同步所需時間 T3 線程銷毀的時間
顯然T = T1+T2+T3。注意這是一個極度簡化的假設。
可以看出T1,T3是多線程本身的帶來的開銷(在Java中,通過映射pThead,並進一步通過>SystemCall實現native線程),我們渴望減少T1,T3所用的時間,從而減少T的時間。但一些線>程的使用者並沒有注意到這一點,所以在程序中頻繁的創建或銷毀線程,這導致T1和T3在T中占有>相當比例。顯然這是突出了線程的弱點(T1,T3),而不是優點(並發性)。
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。
- 通過對線程進行緩存,減少了創建銷毀的時間損失
- 通過控製線程數量閥值,減少了當線程過少時帶來的CPU閑置(比如說長時間卡在I/O上了)與線程過多時對JVM的內存與線程切換時係統調用的壓力
類似的還有Socket連接池、DB連接池、CommonPool(比如Jedis)等技術。
2. OkHttp的任務隊列
OkHttp的任務隊列主要由兩部分組成:
- 任務分發器dispatcher:負責為任務找到合適的執行線程
- 網絡請求任務線程池
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private Runnable idleCallback;
/** Executes calls. Created lazily. */
private ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
...
}
參數說明如下:
- readyAsyncCalls:待執行異步任務隊列
- runningAsyncCalls:運行中異步任務隊列
- runningSyncCalls:運行中同步任務隊列
- executorService:任務隊列線程池:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
- int corePoolSize: 最小並發線程數,這裏並發同時包括空閑與活動的線程,如果是0的話,空閑一段時間後所有線程將全部被銷毀
- int maximumPoolSize: 最大線程數,當任務進來時可以擴充的線程最大值,當大於了這個值就會根據丟棄處理機製來處理
- long keepAliveTime: 當線程數大於
corePoolSize
時,多餘的空閑線程的最大存活時間,類似於HTTP中的Keep-alive- TimeUnit unit: 時間單位,一般用秒
- BlockingQueue workQueue: 工作隊列,先進先出,可以看出並不像Picasso那樣設置優先隊列
- ThreadFactory threadFactory: 單個線程的工廠,可以打Log,設置
Daemon
(即當JVM退出時,線程自動結束)等可以看出,在Okhttp中,構建了一個閥值為[0, Integer.MAX_VALUE]的線程池,它不保留任何最小線程數,隨時創建更多的線程數,當線程空閑時隻能活60秒,它使用了一個不存儲元素的阻塞工作隊列,一個叫做"OkHttp Dispatcher"的線程工廠。
也就是說,在實際運行中,當收到10個並發請求時,線程池會創建十個線程,當工作完成後,線程池會在60s後相繼關閉所有線程。
3. Dispatcher分發器
dispatcher分發器類似於Ngnix中的反向代理,通過Dispatcher將任務分發到合適的空閑線程,實現非阻塞,高可用,高並發連接
1.同步請求
當我們使用OkHttp進行同步請求時,一般構造如下:
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://publicobject.com/helloworld.txt")
.build();
Response response = client.newCall(request).execute();
接下來看看RealCall.execute
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
同步調用的執行邏輯是:
- 將對應任務加入分發器
- 執行任務
- 執行完成後通知dispatcher對應任務已完成,對應任務出隊
2.異步請求
異步請求一般構造如下:
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://publicobject.com/helloworld.txt")
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d("OkHttp", "Call Failed:" + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d("OkHttp", "Call succeeded:" + response.message());
}
});
當HttpClient的請求入隊時,根據代碼,我們可以發現實際上是Dispatcher進行了入隊操作。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加正在運行的請求
runningAsyncCalls.add(call);
//線程池執行請求
executorService().execute(call);
} else {
//添加到緩存隊列排隊等待
readyAsyncCalls.add(call);
}
}
如果滿足條件:
- 當前請求數小於最大請求數(64)
- 對單一host的請求小於閾值(5)
將該任務插入正在執行任務隊列,並執行對應任務。如果不滿足則將其放入待執行隊列。
接下來看看AsyncCall.execute
@Override protected void execute() {
boolean signalledCallback = false;
try {
//執行耗時IO任務
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
//回調,注意這裏回調是在線程池中,而不是想當然的主線程回調
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//回調,同上
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最關鍵的代碼
client.dispatcher().finished(this);
}
}
當任務執行完成後,無論成功與否都會調用dispatcher.finished方法,通知分發器相關任務已結束:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
- 空閑出多餘線程,調用promoteCalls調用待執行的任務
- 如果當前整個線程池都空閑下來,執行空閑通知回調線程(idleCallback)
接下來看看promoteCalls:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
promoteCalls的邏輯也很簡單:掃描待執行任務隊列,將任務放入正在執行任務隊列,並執行該任務。
4. 總結
以上就是整個任務隊列的實現細節,總結起來有以下幾個特點:
- OkHttp采用Dispatcher技術,類似於Nginx,與線程池配合實現了高並發,低阻塞的運行
- Okhttp采用Deque作為緩存,按照入隊的順序先進先出
- OkHttp最出彩的地方就是在try/finally中調用了
finished
函數,可以主動控製等待隊列的移動,而不是采用鎖或者wait/notify,極大減少了編碼複雜性
最後更新:2017-05-05 10:31:20