252
京東網上商城
小規模的流處理框架.Part 1: thread pools
(譯者:強力推薦這篇文章,作者設計了一個用於小流量的流式數據處理框架,並詳細給出了每一個需要注意的設計細節,對比了不同設計方案的優缺點,能夠讓你對流處理過程,某些設計模式和設計原則以及指標度量工具有一個更深刻的認識!)
在GeeCON 2016上我為我的公司準備了一個編程競賽,這次的任務是設計並實現一個能夠滿足以下要求的係統:
係統能夠每秒處理1000個任務,每一個Event至少有2個屬性:
- clientId-我們希望每一秒有多個任務是在同一個客戶端下處理的(譯者:不同的clientId對應不同的ClientProjection,即對應不同的一係列操作)
- UUID-全局唯一的
消費一個任務要花費10毫秒,為這樣的流設計一個消費者:
- 能夠實時的處理任務
- 和同一個客戶端有關的任務應該被有序地處理,例如你不能對擁有同一個clientId的任務序列使用並行處理
- 如果10秒內出現了重複的UUID,丟棄它。假設10秒後不會重複
有幾個關於以上要求的重要細節:
- 1000events/s的任務量,消耗一個event要10ms,1s內能消耗100個event,那麼為了保證實時性,就需要10個並發的消費者。
- events擁有聚集的ID(clientId),在1s內我們希望多個event能夠被指定到同一個給定的client上,並且我們不能夠並發地或無序地處理這些event。
- 我們必須以某種方式忽略重複的信息,最可能的方法就是記住最近10s內所有的ID,這就需要暫時保存一萬個UUID。
在這篇文章中,我會引導你們使用一些成功的方案並做一些小小的突破,你將要學習如何使用精確地有針對性的度量器來解決問題。
Naive sequential processing
我們可以在迭代器中處理這個問題,首先我們可以對API做一些假設,想象一下它會是這個樣子:
01 |
interface EventStream {
|
03 |
void consume(EventConsumer consumer);
|
08 |
interface EventConsumer {
|
09 |
Event consume(Event event);
|
15 |
private final Instant created = Instant.now();
|
16 |
private final int clientId;
|
17 |
private final UUID uuid;
|
一個典型的推送式API,和JMS很像。需要注意的是EventConsumer是阻塞的,這就意味著它不會返回新的Event,除非前一個已經被處理完畢了。這僅僅是我做出的一個假設,而且它沒有太大的違反之前的要求,這也是JMS中消息偵聽者的工作機製。下麵是一個簡單的實現,這個實現隻是簡單的添加了一個工作間隔為10ms的偵聽器:
1 |
class ClientProjection implements EventConsumer {
|
4 |
public Event consume(Event event) {
|
5 |
Sleeper.randSleep( 10 , 1 ); //譯者:這裏隻是用睡眠來代替實際編程中一些耗時的操作
|
當然在現實生活中這個consumer可能會在數據庫中做一些存儲操作,或者進行遠程調用等等。我在睡眠時間的分布上添加了一些隨機性,目的是使得手動測試更加貼近實際情況(譯者:實際情況中耗時操作的用時不盡相同,所以要隨機化):
03 |
private static final Random RANDOM = new Random();
|
05 |
static void randSleep( double mean, double stdDev) {
|
06 |
final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
|
08 |
TimeUnit.MICROSECONDS.sleep(( long ) micros);
|
09 |
} catch (InterruptedException e) {
|
10 |
throw new RuntimeException(e);
|
18 |
EventStream es = new EventStream(); //some real implementation here
|
19 |
es.consume( new ClientProjection());
|
以上的代碼能夠編譯並運行,但為了滿足設計要求我們必須要插入一些度量器。最重要的度量器就是有關於信息消費的潛伏期,這個潛伏期指的是從信息的產生到開始處理的這段時間。我們使用 Dropwizard Metrics來實現這個潛伏期的度量:
01 |
class ClientProjection implements EventConsumer {
|
03 |
private final ProjectionMetrics metrics;
|
05 |
ClientProjection(ProjectionMetrics metrics) {
|
06 |
this .metrics = metrics;
|
10 |
public Event consume(Event event) {
|
11 |
metrics.latency(Duration.between(event.getCreated(), Instant.now()));
|
12 |
Sleeper.randSleep( 10 , 1 );
|
ProjectionMetrics類的功能如下(主要就是將event的潛伏期用柱狀圖的形式表現出來):
01 |
import com.codahale.metrics.Histogram;
|
02 |
import com.codahale.metrics.MetricRegistry;
|
03 |
import com.codahale.metrics.Slf4jReporter;
|
04 |
import lombok.extern.slf4j.Slf4j;
|
06 |
import java.time.Duration;
|
07 |
import java.util.concurrent.TimeUnit;
|
10 |
class ProjectionMetrics {
|
12 |
private final Histogram latencyHist;
|
14 |
ProjectionMetrics(MetricRegistry metricRegistry) {
|
15 |
final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
|
17 |
.convertRatesTo(TimeUnit.SECONDS)
|
18 |
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
20 |
reporter.start( 1 , TimeUnit.SECONDS);
|
21 |
latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics. class , "latency" ));
|
24 |
void latency(Duration duration) {
|
25 |
latencyHist.update(duration.toMillis());
|
現在當你運行這個解決方案時,你很快就會發現潛伏期的中值和第99.9%的值(分別指的是第count/2個值和第99.9%*count個值)都在無限增長:
1 |
type=HISTOGRAM, [...] count= 84 , min= 0 , max= 795 , mean= 404.88540608274104 , [...]
|
2 |
median= 414.0 , p75= 602.0 , p95= 753.0 , p98= 783.0 , p99= 795.0 , p999= 795.0
|
3 |
type=HISTOGRAM, [...] count= 182 , min= 0 , max= 1688 , mean= 861.1706371990878 , [...]
|
4 |
median= 869.0 , p75= 1285.0 , p95= 1614.0 , p98= 1659.0 , p99= 1678.0 , p999= 1688.0
|
6 |
[... 30 seconds later...]
|
8 |
type=HISTOGRAM, [...] count= 2947 , min= 14 , max= 26945 , mean= 15308.138585757424 , [...]
|
9 |
median= 16150.0 , p75= 21915.0 , p95= 25978.0 , p98= 26556.0 , p99= 26670.0 , p999= 26945.0
|
在運行了30s之後我們的應用程序處理event會出現平均15s的延遲,因此它並不具備完整的實時性,顯然缺少並發才是原因所在。我們的ClientProjection事件消費者會花費10ms去完成事件處理,所以它每秒最多可以處理100個event,然而我們需要更多的處理量。我們必須要增強ClientProjection同時不違反其他的設計要求!
Naive thread pool
最顯而易見的解決方法是對EventConsumer使用多線程技術,最簡單的實現途徑就是利用ExecutorService:
01 |
import java.util.concurrent.ExecutorService;
|
02 |
import java.util.concurrent.Executors;
|
04 |
class NaivePool implements EventConsumer, Closeable {
|
06 |
private final EventConsumer downstream;
|
07 |
private final ExecutorService executorService;
|
09 |
NaivePool( int size, EventConsumer downstream) {
|
10 |
this .executorService = Executors.newFixedThreadPool(size);
|
11 |
this .downstream = downstream;
|
15 |
public Event consume(Event event) {
|
16 |
executorService.submit(() -> downstream.consume(event));
|
21 |
public void close() throws IOException {
|
22 |
executorService.shutdown();
|
這裏我們使用了裝飾者模式。最初的ClientProjection實現EventConsumer是可行的,但我們利用加入了並發的另一個EventConsumer實現對ClientProjection進行包裝。這就允許我們能夠將更複雜的行為組合起來而不用更改ClientProjection本身,這種設計可以:
- 解耦:不同的EventConsumer互不影響,但它們卻可以自由地組合在一起,在同一個線程池中工作
- 單一職責:每個EventConsumer隻做一項工作,並將自己委托給下一個組件即線程池
-
開放/關閉原則:我們可以改變係統的行為卻不用修改現有實現
開放/關閉原則通常可以通過注入策略模式和模板方法模式來實現,這很簡單。整體的代碼如下:
01 |
MetricRegistry metricRegistry = |
03 |
ProjectionMetrics metrics = |
04 |
new ProjectionMetrics(metricRegistry);
|
05 |
ClientProjection clientProjection = |
06 |
new ClientProjection(metrics);
|
08 |
new NaivePool( 10 , clientProjection);
|
09 |
EventStream es = new EventStream();
|
10 |
es.consume(naivePool); |
我們寫的度量器顯示這種改良的方案確實表現的更好:
1 |
type=HISToOGRAM, count= 838 , min= 1 , max= 422 , mean= 38.80768197277468 , [...]
|
2 |
median= 37.0 , p75= 45.0 , p95= 51.0 , p98= 52.0 , p99= 52.0 , p999= 422.0
|
3 |
type=HISTOGRAM, count= 1814 , min= 1 , max= 281 , mean= 47.82642776789085 , [...]
|
4 |
median= 51.0 , p75= 57.0 , p95= 61.0 , p98= 62.0 , p99= 63.0 , p999= 65.0
|
6 |
[... 30 seconds later...]
|
8 |
type=HISTOGRAM, count= 30564 , min= 5 , max= 3838 , mean= 364.2904915942238 , [...]
|
9 |
median= 352.0 , p75= 496.0 , p95= 568.0 , p98= 574.0 , p99= 1251.0 , p999= 3531.0
|
我們可以看到延遲雖然也在增長但規模卻小得多,30s後潛伏期達到了364ms。這種潛伏期增長是係統問題,我們需要更多的度量器。注意到NaivePool(你會明白為什麼這裏是naive-初級的)會開啟10條線程,這應該足以處理1000個event,每個要花費10ms。在實際情況下,我們需要一點額外的處理容量來避免因垃圾回收或小規模峰值負荷所帶來的問題。為了證明線程池才是我們的瓶頸,我們要監控它內部的隊列,這需要一點小小的工作量:
01 |
class NaivePool implements EventConsumer, Closeable {
|
03 |
private final EventConsumer downstream;
|
04 |
private final ExecutorService executorService;
|
06 |
NaivePool( int size, EventConsumer downstream, MetricRegistry metricRegistry) {
|
07 |
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
|
08 |
String name = MetricRegistry.name(ProjectionMetrics. class , "queue" );
|
09 |
Gauge<Integer> gauge = queue::size;
|
10 |
metricRegistry.register(name, gauge);
|
11 |
this .executorService =
|
12 |
new ThreadPoolExecutor(
|
13 |
size, size, 0L, TimeUnit.MILLISECONDS, queue);
|
14 |
this .downstream = downstream;
|
18 |
public Event consume(Event event) {
|
19 |
executorService.submit(() -> downstream.consume(event));
|
24 |
public void close() throws IOException {
|
25 |
executorService.shutdown();
|
這裏使用ThreadPoolExecutor的目的是為了能夠提供自定義的LinkedBlockingQueue實例,接下來就可以監控隊列的長度(see:ExecutorService – 10 tips and tricks)。Gauge會周期性地調用queue::size,你需要的時候就會提供隊列的長度。度量器顯示線程池的大小確實是一個問題:
1 |
type=GAUGE, name=[...].queue, value= 35
|
2 |
type=GAUGE, name=[...].queue, value= 52
|
4 |
[... 30 seconds later...]
|
6 |
type=GAUGE, name=[...].queue, value= 601
|
不斷增長的隊列長度進一步加劇了隊列內正在等待著的task的潛伏期,將線程池的大小增加到10到20之間,最終隊列的長度顯示合理並且沒有失控。然而我們仍然沒有解決重複ID問題,並且也沒有解決同一個clientId可能會對它的events進行並發處理的問題。
Obscure locking
讓我們從避免對擁有相同clientId的events使用並行處理開始。如果兩個有相同clientId的event一個接一個地來,相繼進入線程池隊列,那麼NaivePool會幾乎同時將它們取出隊列實現並行處理。開始的時候我們可能會想到對每一個clientId加一個Lock:
02 |
class FailOnConcurrentModification implements EventConsumer {
|
04 |
private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
|
05 |
private final EventConsumer downstream;
|
07 |
FailOnConcurrentModification(EventConsumer downstream) {
|
08 |
this .downstream = downstream;
|
12 |
public Event consume(Event event) {
|
13 |
Lock lock = findClientLock(event);
|
16 |
downstream.consume(event);
|
21 |
log.error( "Client {} already being modified by another thread" , event.getClientId());
|
26 |
private Lock findClientLock(Event event) {
|
27 |
return clientLocks.computeIfAbsent(
|
29 |
clientId -> new ReentrantLock());
|
以上的代碼完全搞錯方向了,這種設計太過於複雜,但運行代碼至少會發現一個問題。events的處理過程就像下麵這樣,由一個裝飾者包裹著另一個:
1 |
ClientProjection clientProjection = |
2 |
new ClientProjection( new ProjectionMetrics(metricRegistry));
|
3 |
FailOnConcurrentModification failOnConcurrentModification = |
4 |
new FailOnConcurrentModification(clientProjection);
|
6 |
new NaivePool( 10 , failOnConcurrentModification, metricRegistry);
|
7 |
EventStream es = new EventStream();
|
一旦運行過一會兒錯誤信息就會彈出來,告訴我們在其他線程中已經在處理擁有相同clientId的event。我們為每一個clientId都綁定了一個Lock,這樣做的目的是為了弄清楚如果其他的線程沒有處理的時候client的狀態。這種醜陋的方法讓我們的方案變得慘不忍睹,與其因獲取不到Lock而拋出錯誤信息,還不如等待一下,等待Lock被釋放:
02 |
class WaitOnConcurrentModification implements EventConsumer {
|
04 |
private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
|
05 |
private final EventConsumer downstream;
|
06 |
private final Timer lockWait;
|
08 |
WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {
|
09 |
this .downstream = downstream;
|
10 |
lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification. class , "lockWait" ));
|
14 |
public Event consume(Event event) {
|
16 |
final Lock lock = findClientLock(event);
|
17 |
final Timer.Context time = lockWait.time();
|
19 |
final boolean locked = lock.tryLock( 1 , TimeUnit.SECONDS);
|
22 |
downstream.consume(event);
|
27 |
} catch (InterruptedException e) {
|
28 |
log.warn( "Interrupted" , e);
|
33 |
private Lock findClientLock(Event event) {
|
34 |
return clientLocks.computeIfAbsent(
|
36 |
clientId -> new ReentrantLock());
|
這次的設計和之前的很像,但不同的是tryLock()會持續1s的時間以等待指定client的Lock被釋放。如果兩個有相同clientId的event相繼出現,其中一個會獲取到Lock進行處理,而另一個會一直阻塞直到unlock()被調用。
這段代碼不僅複雜,而且在某些微妙的情況下可能會發生不可預知的錯誤。例如,如果兩個有相同clientId的event幾乎在同一時刻出現,那麼誰將會是第一個?兩個event會在同一時刻請求Lock,這時我們並不能保證哪一個event會第一個得到非公平鎖,處理event的順序可能就會發生混亂。肯定會有更好的方法…
Dedicated threads
讓我們退一步,深吸一口氣。你會怎樣確保事情不會並行發生?僅僅使用一個線程就行了!事實上這是我們最開始的做法,但它的處理流量並不理想。我們不用關心不同clientIds的並發情況,我們隻需要確保有相同clientId的events由一個專有線程處理就行。
你可能會想到使用一個map將clientId映射到Thread,當然這太簡單了。我們可能會創造上千個線程,而它們大多數的時候可能都處於空閑狀態(對於給定的clientId每秒可能隻處理少數幾個event)。一個很好的折中是使用固定大小的線程池,每個線程負責指定的一些clientId。在這種方法中,兩個不同的clientId可能會在同一個線程中完成處理,但相同的clientId總是在同一個線程中處理。如果兩個有相同clientId的event出現了,它們都會被送去同一個線程,因此為了避免並發處理,以下實現相當簡單:
01 |
class SmartPool implements EventConsumer, Closeable {
|
03 |
private final List<ExecutorService> threadPools;
|
04 |
private final EventConsumer downstream;
|
06 |
SmartPool( int size, EventConsumer downstream, MetricRegistry metricRegistry) {
|
07 |
this .downstream = downstream;
|
08 |
List<ExecutorService> list = IntStream
|
10 |
.mapToObj(i -> Executors.newSingleThreadExecutor())
|
11 |
.collect(Collectors.toList());
|
12 |
//譯者:這裏使用CopyOnWriteArrayList是為了保證訪問threadPools裏麵元素時是線程安全的
|
13 |
this .threadPools = new CopyOnWriteArrayList<>(list);
|
17 |
public void close() throws IOException {
|
18 |
threadPools.forEach(ExecutorService::shutdown);
|
22 |
public Event consume(Event event) {
|
23 |
final int threadIdx = event.getClientId() % threadPools.size();
|
24 |
final ExecutorService executor = threadPools.get(threadIdx);
|
25 |
executor.submit(() -> downstream.consume(event));
|
關鍵點是最後的那部分:
1 |
int threadIdx = event.getClientId() % threadPools.size();
|
2 |
ExecutorService executor = threadPools.get(threadIdx); |
這個簡單的算法總是為相同的clientId使用同一個ExecutorService單線程,不同的ID可能會在同一個線程內處理,例如當threadPools的大小為20時,Id為7, 27, 47的client都會在索引為7的線程內處理。雖然一個線程會對應多個clientId,但隻要一個clientId在同一個線程內處理就行了。基於這點,鎖就不需要了,順序調用也就得到了保障。邊注:一個clientId對應一個線程可能產生無法預估的後果,但一個actor對應一個clientId(例如在Akka裏麵就是如此)就簡單許多。
順便為了保證安全,我為每一個線程池都插入了度量器以監控它們的隊列長度,實現如下:
01 |
class SmartPool implements EventConsumer, Closeable {
|
03 |
private final List<LinkedBlockingQueue<Runnable>> queues;
|
04 |
private final List<ExecutorService> threadPools;
|
05 |
private final EventConsumer downstream;
|
07 |
SmartPool( int size, EventConsumer downstream, MetricRegistry metricRegistry) {
|
08 |
this .downstream = downstream;
|
09 |
this .queues = IntStream
|
11 |
.mapToObj(i -> new LinkedBlockingQueue<Runnable>())
|
12 |
.collect(Collectors.toList());
|
13 |
List<ThreadPoolExecutor> list = queues
|
15 |
.map(q -> new ThreadPoolExecutor( 1 , 1 , 0L, TimeUnit.MILLISECONDS, q))
|
16 |
.collect(Collectors.toList());
|
17 |
this .threadPools = new CopyOnWriteArrayList<>(list);
|
18 |
metricRegistry.register(MetricRegistry.name(ProjectionMetrics. class , "queue" ), (Gauge<Double>) this ::averageQueueLength);
|
最後更新:2017-05-19 11:01:38