閱讀876 返回首頁    go iPhone_iPad_Mac_apple


小規模的流處理框架.Part 2: RxJava 1.x/2.x

part 1: thread pools中,我們設計並實現了一個相對簡單的實時處理events的係統。在閱讀本文之前你應該確保已經讀懂了Part1的那篇文章,這裏重新闡述一遍係統的設計要求:

係統能夠每秒處理1000個任務,每一個Event至少有2個屬性:

  • clientId-我們希望每一秒有多個任務是在同一個客戶端下處理的(譯者:不同的clientId對應不同的ClientProjection,即對應不同的一係列操作)
  • UUID-全局唯一的

消費一個任務要花費10毫秒,為這樣的流設計一個消費者:

  1. 能夠實時的處理任務
  2. 和同一個客戶端有關的任務應該被有序地處理,例如你不能對擁有同一個clientId的任務序列使用並行處理
  3. 如果10秒內出現了重複的UUID,丟棄它。假設10秒後不會重複

到目前為止我們提出了線程池和共享緩存結合的設計,而在這篇文章中我們會使用RxJava進行實現。開始之前,我從沒有提到EventStream是如何實現的,僅僅是給出了API:

1 interface EventStream {
2  
3     void consume(EventConsumer consumer);
4  
5 }

事實上為了能夠進行測試,我建立了一個RxJava流,它所有的行為都符合設計要求:

01 @Slf4j
02 class EventStream {
03  
04     void consume(EventConsumer consumer) {
05         observe()
06             .subscribe(
07                 consumer::consume,
08                 e -> log.error("Error emitting event", e)
09         );
10     }
11  
12     Observable<Event> observe() {
13         return Observable
14                 .interval(1, TimeUnit.MILLISECONDS)
15                 .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
16                 .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
17                 .flatMap(this::occasionallyDuplicate, 100)
18                 .observeOn(Schedulers.io());
19     }
20  
21     private Observable<Event> occasionallyDuplicate(Event x) {
22         final Observable<Event> event = Observable.just(x);
23         if (Math.random() >= 0.01) {
24             return event;
25         }
26         final Observable<Event> duplicated =
27                 event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
28         return event.concatWith(duplicated);
29     }
30  
31 }

雖然你們沒必要明白這個流模擬器是怎麼工作的,但它的工作過程相當有趣。首先我們使用interval()產生一個每毫秒輸出一個Long型值(0,1,2)的穩定流(這是因為設計要求係統每秒能處理1000個event)。然後我們使用delay()對每個event進行0到1000微秒內的隨機延遲,在這之後events出現的時機就變得不可預測,就更符合真實情況。最終我們使用map()將每個Long型值映射到一個隨機的Event上,每個Event都包含一個1000到1100(inclusive-exclusive)之間的clientId。
最後一點就有趣了,我們想模擬隨機的重複事件。為了做到這點,我們使用flatMap()將每個event映射到自身(99%情況下)。然而在剩下的1%情況中,我們將event返回兩次,第二次出現的時間延遲了10ms到5s。實際應用時,重複的event與第一次出現的event之間會相隔幾百個其他的event,這就使得流的行為更加符合真實情況。
有兩種方法可以與EventStream進行交互-基於回調的consume()和基於流的observer()。我們可以利用Observable快速地建立處理管道,這種方法的功能和part1中的非常的像但更加簡單。

Missing backpressure

首先利用RxJava實現最初的方案非常簡短:

01 EventStream es = new EventStream();
02 EventConsumer clientProjection = new ClientProjection(
03         new ProjectionMetrics(
04                 new MetricRegistry()));
05  
06 es.observe()
07         .subscribe(
08                 clientProjection::consume,
09                 e -> log.error("Fatal error", e)
10         );

(ClientProjection,ProjectionMetrics等來自於part1).使用以上的代碼幾乎會立刻拋出MissingBackpressureException,這也是預料之中的。你們記得嗎,我們在part1中最初的方案會運行的越來越慢是因為處理event的潛伏期越來越長。RxJava會盡量避免這種情況,而且也會避免隊列溢出。之所以會拋出MissingBackpressureException是因為消費者(ClientProjection)沒有能力實時地處理event。這是一個fail-fast機製。聰明的做法就是將處理的過程移到一個獨立的線程池,就像之前那樣,但這次要使用RxJava來實現:

01 EventStream es = new EventStream();
02 EventConsumer clientProjection = new FailOnConcurrentModification(
03         new ClientProjection(
04                 new ProjectionMetrics(
05                         new MetricRegistry())));
06  
07 es.observe()
08         .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
09         .window(1, TimeUnit.SECONDS)
10         .flatMap(Observable::count)
11         .subscribe(
12                 c -> log.info("Processed {} events/s", c),
13                 e -> log.error("Fatal error", e)
14         );

EventConsumer中添加了一個輔助方法,它能夠利用提供的Scheduler異步地處理event:

01 @FunctionalInterface
02 interface EventConsumer {
03     Event consume(Event event);
04  
05     default Observable<Event> consume(Event event, Scheduler scheduler) {
06         return Observable
07                 .fromCallable(() -> this.consume(event))
08                 .subscribeOn(scheduler);
09     }
10  
11 }

使用flatMap()在一個獨立的Scheduler.io()中處理event,這樣每一個消費過程都是異步調用的。這次event的處理已經符合實時性的要求了,但還有一個更大的問題。我使用FailOnConcurrentModification對ClientProjection進行包裝是有原因的。events的處理都是彼此獨立的,所以對於同一個clientId有可能會並發地處理兩個event,這樣並不好。幸運的是比起使用線程來說,用RxJava解決這個問題要更加簡單:

01 es.observe()
02         .groupBy(Event::getClientId)
03         .flatMap(byClient -> byClient
04                 .observeOn(Schedulers.io())
05                 .map(clientProjection::consume))
06         .window(1, TimeUnit.SECONDS)
07         .flatMap(Observable::count)
08         .subscribe(
09                 c -> log.info("Processed {} events/s", c),
10                 e -> log.error("Fatal error", e)
11         );

上麵的代碼改動的地方隻有一點點。首先我們依據clientId對event進行分組,將單一的Observable流分割成多個流,每個名為byClient的子流都代表著擁有相同clientId的event。現在如果我們對子流進行映射,我們能夠確定有相同clientId的event是絕不會並發地被處理的。輸出流是惰性的,所以我們必須對流調用subscribe。與其對每一個event單獨地調用subscribe,我們選擇將每一秒內處理的event收集起來並對其計數。這樣一來每秒我們接收到的就是一個Integer類型的event,它代表著每秒內我們處理的event數量。

Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state

現在我們必須除去重複的UUID,最簡單也是最笨的做法就是利用全局狀態。我們能夠簡單地利用filter()在cache中查找重複的event:

01 final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
02         .expireAfterWrite(10, TimeUnit.SECONDS)
03         .build();
04  
05 es.observe()
06         .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
07         .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
08         .subscribe(
09                 clientProjection::consume,
10                 e -> log.error("Fatal error", e)
11         );

如果你想要監控上麵代碼的效果可以簡單的加入一個度量器:

01 Meter duplicates = metricRegistry.meter("duplicates");
02  
03 es.observe()
04         .filter(e -> {
05             if (seenUuids.getIfPresent(e.getUuid()) != null) {
06                 duplicates.mark();
07                 return false;
08             else {
09                 return true;
10             }
11         })

在操作符內部訪問全局的、尤其是可變的狀態時是非常危險的,並且這樣會破壞RxJava唯一的目的-簡單並發。雖然我們使用的是Guava中線程安全的Cache,但在很多情況下你很容易會忘記這個全局共享的可變狀態是可以被多個線程訪問的,如果你發現你在操作符鏈中修改外部的一些變量的話,那就要非常小心了。

Custom distinct() operator in RxJava 1.x

RxJava 1.x有一個distinct()運算函數,它大概可以做如下的工作:

1 es.observe()
2         .distinct(Event::getUuid)
3         .groupBy(Event::getClientId)

不幸的是distinct()會在內部將所有的UUID都存儲在一個不斷增長的HashSet裏麵,但我們隻關心10s內的重複事件。通過複製粘貼DistinctOperator的實現,我創造了DistinctEvent操作符,它利用了Guava的cache僅僅隻存儲10s內的UUID。我故意將Event硬編碼在這個操作符內而不是將它寫成一般性的就是為了讓代碼更易懂:

01 class DistinctEvent implements Observable.Operator<Event, Event> {
02     private final Duration duration;
03  
04     DistinctEvent(Duration duration) {
05         this.duration = duration;
06     }
07  
08     @Override
09     public Subscriber<? super Event> call(Subscriber<? super Event> child) {
10         return new Subscriber<Event>(child) {
11             final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
12                     .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
13                     .<UUID, Boolean>build().asMap();
14  
15             @Override
16             public void onNext(Event event) {
17                 if (keyMemory.put(event.getUuid(), true) == null) {
18                     child.onNext(event);
19                 else {
20                     request(1);
21                 }
22             }
23  
24             @Override
25             public void onError(Throwable e) {
26                 child.onError(e);
27             }
28  
29             @Override
30             public void onCompleted() {
31                 child.onCompleted();
32             }
33  
34         };
35     }
36 }

自定義的操作符使用起來非常簡單,實現如下:

01 es.observe()
02         .lift(new DistinctEvent(Duration.ofSeconds(10)))
03         .groupBy(Event::getClientId)
04         .flatMap(byClient -> byClient
05                 .observeOn(Schedulers.io())
06                 .map(clientProjection::consume)
07         )
08         .window(1, TimeUnit.SECONDS)
09         .flatMap(Observable::count)
10         .subscribe(
11                 c -> log.info("Processed {} events/s", c),
12                 e -> log.error("Fatal error", e)
13         );

事實上如果我們跳過每秒的logging實現可以變得更加簡單:

01 es.observe()
02         .lift(new DistinctEvent(Duration.ofSeconds(10)))
03         .groupBy(Event::getClientId)
04         .flatMap(byClient -> byClient
05                 .observeOn(Schedulers.io())
06                 .map(clientProjection::consume)
07         )
08         .subscribe(
09                 e -> {},
10                 e -> log.error("Fatal error", e)
11         );

這個方案比之前的基於線程池和裝飾者模式的要更加簡短,其中唯一麻煩的部分就是在自定義的操作符中當存儲了太多的UUID之後會造成內存泄漏,幸好RxJava 2能解決這個問題。

RxJava 2.x and more powerful built-in distinct()

distinct()允許使用自定義的Collection而不必使用內置的HashSet(感覺2.x中可以使用自定義的數據結構後,1.x中的DistinctEvent就完全沒必要了)。不管你是否相信,依賴倒置不僅僅隻出現在Spring框架或者Java EE中。當一個庫允許你提供它內部數據結構的自定義實現時,這就已經是依賴反轉。首先我創造了一個輔助方法,它能夠建立Set,Set由Map提供依賴,而Map則由Cache提供依賴。這就像委托一樣!

1 private Set<UUID> recentUuids() {
2     return Collections.newSetFromMap(
3             CacheBuilder.newBuilder()
4                     .expireAfterWrite(10, TimeUnit.SECONDS)
5                     .<UUID, Boolean>build()
6                     .asMap()
7     );
8 }

有了這個方法之後,我們就能利用以下的代碼實現整個任務:

01 es.observe()
02         .distinct(Event::getUuid, this::recentUuids)
03         .groupBy(Event::getClientId)
04         .flatMap(byClient -> byClient
05                 .observeOn(Schedulers.io())
06                 .map(clientProjection::consume)
07         )
08         .subscribe(
09                 e -> {},
10                 e -> log.error("Fatal error", e)
11         );

這段代碼是如此的優雅、簡單、清晰!它的大致流程如下:

  • observe一個event流
  • 消除重複的UUID
  • 依據clientId對event分組
  • 對每一個client有序地處理event

希望你能喜歡這些方案,並能將之運用到你的日常生活中去。

See also:

最後更新:2017-05-19 10:31:13

  上一篇:go  《LOG4J2官方文檔》Chainsaw 可以自動處理你的日誌文件(通知appender的配置)
  下一篇:go  從單例模式到Happens-Before