MillWheel: Fault-Tolerant Stream Processing at Internet Scale
Why build MillWheel?
Because existing streaming systems could not satisfy fault tolerance, versatility, and scalability all at once.
Spark Streaming [34] and Sonora [32] do excellent jobs of efficient checkpointing, but limit the space of operators that are available to user code.
S4 [26] does not provide fully fault-tolerant persistent state.
Storm’s [23] exactly-once mechanism for record delivery, Trident [22], requires strict transaction ordering to operate.
Streaming SQL systems [1] [2] [5] [6] [21] [24] provide succinct and simple solutions to many streaming problems, but intuitive state abstractions and complex application logic (e.g. matrix multiplication) are more naturally expressed using the operational flow of an imperative language rather than a declarative language like SQL.
Note: on the distinction between imperative, declarative, and functional languages, see https://stackoverflow.com/questions/1784664/what-is-the-difference-between-declarative-and-imperative-programming
Concrete requirements
First, the motivating application:
Google’s Zeitgeist pipeline is used to track trends in web queries.
This pipeline ingests a continuous input of search queries and performs anomaly detection, outputting queries which are spiking or dipping as quickly as possible.
In short, Zeitgeist tracks trends in web queries: it runs anomaly detection over the continuous stream of search queries and surfaces spiking or dipping queries as fast as possible.
The architecture is as follows:
Our approach is to bucket records into one-second intervals and to compare the actual traffic for each time bucket to the expected traffic that the model predicts.
If these quantities are consistently different over a non-trivial number of buckets, then we have high confidence that a query is spiking or dipping.
In parallel, we update the model with the newly received data and store it for future use.
The key points in this scenario:
Persistent Storage: It is important to note that this implementation requires both short- and long-term storage.
A spike may only last a few seconds, and thus depend on state from a small window of time, whereas model data can correspond to months of continuous updates.
Low Watermarks:
In real deployments the network is messy. When a dip shows up at some time point, there are two possibilities:
a genuine dip: queries at that point really did drop;
the data was delayed by the network or some other problem and simply has not arrived yet.
The natural question, then: how do I know whether all the data for a given time point has arrived?
MillWheel addresses this by providing a low watermark for incoming data for each processing stage (e.g. Window Counter, Model Calculator), which indicates that all data up to a given timestamp has been received.
In other words, MillWheel's low watermark mechanism tells each stage when the data up to a timestamp should be complete.
Of course, the low watermark is usually derived heuristically, so it does not solve the problem perfectly. What it gives us is this: if data still has not arrived once the low watermark has passed, we have high confidence that there is no data, rather than that it was merely delayed.
Duplicate Prevention: For Zeitgeist, duplicate record deliveries could cause spurious spikes.
We want the platform itself to guarantee exactly-once delivery.
The distilled requirements are as follows:
• Data should be available to consumers as soon as it is published (i.e. there are no system-intrinsic barriers to ingesting inputs and providing output data). Micro-batching, for example, is precisely such a system-intrinsic barrier.
• Persistent state abstractions should be available to user code, and should be integrated into the system’s overall consistency model.
• Out-of-order data should be handled gracefully by the system. (It can process data whose timestamps arrive out of order.)
• A monotonically increasing low watermark of data timestamps should be computed by the system. (The system generates the low watermark itself.)
• Latency should stay constant as the system scales to more machines. (Scaling out must not degrade latency.)
• The system should provide exactly-once delivery of records. (Exactly-once semantics.)
SYSTEM OVERVIEW
Abstractly, inputs and outputs in MillWheel are represented by (key, value, timestamp) triples.
Computations (the analogue of a Bolt in Storm)
Application logic lives in computations, which encapsulate arbitrary user code.
Keys
Keys are the primary abstraction for aggregation and comparison between different records in MillWheel.
For every record in the system, the consumer specifies a key extraction function, which assigns a key to the record.
Note that in MillWheel, records with the same key are processed serially; only records with different keys can be processed in parallel.
Streams (the analogue of streams in Storm)
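A minimal sketch of the idea, with invented names (this is not MillWheel's real API): the consumer supplies a key-extraction function, and the framework funnels records sharing a key through one serial queue.

```python
# Illustrative only. Each consuming computation assigns a key to every
# record; records sharing a key queue behind one another and are
# processed serially, while distinct keys can proceed in parallel.
from collections import defaultdict, deque

def extract_key(record):
    # e.g. Zeitgeist could key records by the search query itself
    return record["query"]

per_key_queues = defaultdict(deque)   # key -> FIFO of pending records

def route(record):
    per_key_queues[extract_key(record)].append(record)
```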
Streams are the delivery mechanism between different computations in MillWheel.
Persistent State
In its most basic form, persistent state in MillWheel is an opaque byte string that is managed on a per-key basis.
The user provides serialization and deserialization routines (such as translating a rich data structure in and out of its wire format), for which a variety of convenient mechanisms (e.g. Protocol Buffers [13]) exist.
Persistent state is backed by a replicated, highly available data store (e.g. Bigtable [7] or Spanner [9]), which ensures data integrity in a way that is completely transparent to the end user.
Common uses of state include counters aggregated over windows of records and buffered data for a join.
This persistent state can be thought of as a checkpoint; note that MillWheel's checkpoints are kept on a per-key basis, which turns out to be crucial to how the system works.
The user supplies the serialization and deserialization logic, and the checkpoints are typically written to a distributed store such as Bigtable.
Stateful computations are the ones that need persistent state, e.g. aggregation counters over windows, or buffered records for a stream join.
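As a concrete illustration, here is a hedged sketch of a windowed counter over per-key state. StateStore and the JSON encoding are stand-ins: per the paper, the real state is an opaque byte string backed by Bigtable/Spanner, with user-supplied (de)serialization such as Protocol Buffers.

```python
import json

class StateStore:
    """Stand-in for the replicated, highly available backing store."""
    def __init__(self):
        self._data = {}   # (computation, key) -> opaque byte string
    def get(self, computation, key):
        return self._data.get((computation, key))
    def put(self, computation, key, blob):
        self._data[(computation, key)] = blob

store = StateStore()

def process(key, value, timestamp):
    # user-supplied deserialization: opaque bytes -> rich structure
    blob = store.get("window_counter", key)
    state = json.loads(blob.decode()) if blob else {"counts": {}}

    bucket = int(timestamp)   # one-second buckets, as in Zeitgeist
    state["counts"][str(bucket)] = state["counts"].get(str(bucket), 0) + value

    # user-supplied serialization back to an opaque byte string
    store.put("window_counter", key, json.dumps(state).encode())
```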
Low Watermarks
Once a computation has been given a low watermark, it should no longer receive records earlier than that watermark.
Definition: We provide a recursive definition of low watermarks based on a pipeline’s data flow.
low watermark of A = min(oldest work of A, low watermark of C : C outputs to A)
Here "oldest work of A" is the timestamp of the oldest unfinished record in A,
and C ranges over A's parents (upstream stages). A's low watermark can never be later than C's, because A necessarily receives data after C does; hence A's low watermark must be less than or equal to C's.
Unrolled, the recursion bottoms out at the injectors (the sources). An injector's input comes from an external system, e.g. a Kafka-like queue or a file system, so how does an injector know its own low watermark?
It does not, actually; it can only estimate. For a file system, for example, it can use a file's creation time as the low watermark, since a log file cannot contain records older than its creation time.
So the low watermark mechanism cannot solve the problem perfectly; there will always be data that is too fast or too late.
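The recursion can be written down directly. A sketch, assuming we are handed each stage's oldest pending work and the dataflow edges (in the real system these values are aggregated through a central authority rather than recomputed recursively per query):

```python
def low_watermark(stage, oldest_work, upstream):
    """lw(A) = min(oldest_work(A), min of lw(C) for each C that outputs to A)."""
    candidates = [oldest_work[stage]]
    for parent in upstream.get(stage, []):
        candidates.append(low_watermark(parent, oldest_work, upstream))
    return min(candidates)

# Injectors have no upstream; their "oldest work" is only an estimate,
# e.g. the minimum creation time among the log files still being read.
oldest_work = {"Injector": 1000.0, "WindowCounter": 1003.5, "ModelCalculator": 1007.2}
upstream = {"WindowCounter": ["Injector"], "ModelCalculator": ["WindowCounter"]}

print(low_watermark("ModelCalculator", oldest_work, upstream))   # -> 1000.0
```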
Timers (i.e. triggers; they answer the question of "when")
A simple implementation of dips in Zeitgeist would set a low watermark timer for the end of a given time bucket, and report a dip if the observed traffic falls well below the model’s prediction.
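A hedged sketch of that dip check; set_timer, model_prediction, and report_dip are hypothetical stand-ins for the framework's timer API, the model lookup, and the output production.

```python
DIP_RATIO = 0.5   # illustrative threshold, not taken from the paper

def model_prediction(key, bucket_end):
    return 100.0   # stub: the real model would be read from persistent state

def report_dip(key, bucket_end, observed, expected):
    print(f"dip for {key!r} at {bucket_end}: {observed} vs expected {expected}")

def on_record(key, timestamp, bucket_counts, set_timer):
    bucket_end = int(timestamp) + 1
    # a low watermark timer: fires only once all data up to bucket_end
    # has (with high confidence) been received
    set_timer(when=bucket_end,
              callback=lambda: check_dip(key, bucket_end, bucket_counts))

def check_dip(key, bucket_end, bucket_counts):
    observed = bucket_counts.get(bucket_end, 0)
    expected = model_prediction(key, bucket_end)
    if observed < DIP_RATIO * expected:
        report_dip(key, bucket_end, observed, expected)
```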
FAULT TOLERANCE
At last, the crucial part.
Delivery Guarantees
Exactly-Once Delivery
How does MillWheel guarantee exactly-once semantics?
Upon receipt of an input record for a computation, the MillWheel framework performs the following steps:
• The record is checked against deduplication data from previous deliveries; duplicates are discarded.
• User code is run for the input record, possibly resulting in pending changes to timers, state, and productions.
• Pending changes are committed to the backing store.
• Senders are ACKed.
• Pending downstream productions are sent.
Two things to note here (a sketch of the full receive path follows below):
First, it deduplicates records, which is what upgrades the guarantee to exactly-once; how deduplication works is covered next.
In general, most streaming systems can already achieve at-least-once; to get exactly-once, all that remains is deduplication.
You can rely on an external store for that, or do it inside the system itself.
Second, intermediate state is checkpointed.
How MillWheel deduplicates at the system level:
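Condensed into code, the receive path looks roughly like this; every helper is a stub, and the point is purely the ordering of the five steps.

```python
from dataclasses import dataclass, field

@dataclass
class Pending:
    state: dict = field(default_factory=dict)
    timers: list = field(default_factory=list)
    productions: list = field(default_factory=list)

committed_ids = set()   # stands in for the journaled dedup data

def is_duplicate(rid): return rid in committed_ids
def run_user_code(record): return Pending(productions=[("out", record)])
def commit_atomically(rid, p): committed_ids.add(rid)   # + state/timers, one write
def ack_sender(record): pass
def send_downstream(productions): pass

def handle_delivery(record_id, record):
    if is_duplicate(record_id):            # step 1: dedup check
        ack_sender(record)                 # duplicates are discarded but still ACKed
        return
    pending = run_user_code(record)        # step 2: changes are staged, not applied
    commit_atomically(record_id, pending)  # step 3: one atomic write to the store
    ack_sender(record)                     # step 4: upstream may stop retrying
    send_downstream(pending.productions)   # step 5: productions flow downstream
```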
The system assigns unique IDs to all records at production time.
We identify duplicate records by including this unique ID for the record in the same atomic write as the state modification.
If the same record is later retried, we can compare it to the journaled ID, and discard and ACK the duplicate.
Each record is assigned a unique ID at production time.
To quickly decide whether an ID has been seen before, a Bloom filter is used.
Since we cannot necessarily store all duplication data in-memory, we maintain a Bloom filter of known record fingerprints, to provide a fast path for records that we have provably never seen before.
When the filter misses, we must go to the backing store:
In the event of a filter miss, we must read the backing store to determine whether a record is a duplicate.
How does this work in practice? Note that a Bloom filter has no false negatives: a record the filter reports as absent is provably new. The "filter miss" case is when the filter answers "possibly seen" (perhaps a false positive), and only then does the backing store need to be read; actual duplicates are, after all, rare.
To keep record IDs from accumulating without bound, they must be garbage collected, which raises a question: after collection, does the Bloom filter need to be rebuilt, or does the filter itself support expiry?
Record IDs for past deliveries are garbage collected after MillWheel can guarantee that all internal senders have finished retrying.
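A sketch of the fast path with a toy Bloom filter (a production filter would be carefully sized and rotated): absence from the filter proves a record is new, and only a "maybe seen" answer forces a read of the backing store.

```python
import hashlib

class BloomFilter:
    def __init__(self, bits=1 << 20, hashes=4):
        self.bits, self.hashes = bits, hashes
        self.array = bytearray(bits // 8)
    def _positions(self, item):
        for i in range(self.hashes):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.bits
    def add(self, item):
        for p in self._positions(item):
            self.array[p // 8] |= 1 << (p % 8)
    def maybe_contains(self, item):
        return all(self.array[p // 8] & (1 << (p % 8)) for p in self._positions(item))

seen = BloomFilter()
journaled_ids = set()   # stands in for record IDs journaled in the store

def is_duplicate(record_id):
    if not seen.maybe_contains(record_id):
        return False                    # no false negatives: provably never seen
    return record_id in journaled_ids   # slow path: read the backing store
```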
Strong Productions
We checkpoint produced records before delivery in the same atomic write as state modification.
We call this pattern of checkpointing before record production strong productions.
This step is what provides at-least-once. Storm achieves it by having the spout re-emit on timeout; later systems rarely kept that approach, because the retry cycle is far too long.
MillWheel (like LinkedIn's Samza) retries locally instead: before producing a record, MillWheel checkpoints it in the same atomic write as the state modification, with the checkpoint typically written to Bigtable.
When the downstream node has successfully processed the record, it sends back an ACK, at which point the checkpoint can be deleted.
If we crash before that, then on recovery we can read the record back from the checkpoint and re-produce it.
Without the checkpoint, recovery would produce from the current state (e.g. a counter that newly arrived data may already have updated), which would be inconsistent.
Also note that, unlike persistent state, "checkpoint" here refers specifically to produced records.
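A sketch of the strong-production pattern under those assumptions, with in-memory dicts standing in for the Bigtable-backed tables:

```python
checkpoints = {}   # key -> produced record awaiting a downstream ACK
state_table = {}   # key -> user state

def commit(key, new_state, produced):
    # one atomic write: the state change and the production checkpoint
    # (plus the record ID for dedup) succeed or fail together
    state_table[key] = new_state
    checkpoints[key] = produced

def on_downstream_ack(key):
    checkpoints.pop(key, None)   # safe to discard once the consumer ACKs

def recover(key, send):
    # after a crash, re-send the checkpointed production verbatim rather
    # than re-deriving it from state that newer input may have changed
    if key in checkpoints:
        send(checkpoints[key])
```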
Weak Productions and Idempotency
MillWheel guarantees exactly-once semantics via record IDs and strong productions, but this carries real overhead, and some applications do not need exactly-once; at-least-once is enough, e.g. many stateless pipelines.
So it offers weak productions to meet that need.
If exactly-once is not required, simply skip deduplication: "disabling exactly-once can be accomplished simply by skipping the deduplication pass".
Could the checkpointing of produced records be dropped entirely as well: produce directly, wait for the ACK, and resend on failure or timeout? That would put us back in Storm territory, and with a long pipeline the retry cycle gets very long.
The optimization MillWheel offers instead is weak productions:
Take the pipeline A -> B -> C.
Suppose B's produce to C has gone 1s without a response.
At that point B checkpoints the pending produce and immediately ACKs A, so A does not keep waiting.
B itself keeps waiting, and only deletes the checkpoint once C's ACK arrives.
If B crashes in the meantime, then on restart it replays the pending produce itself, transparently to A, and deletes the checkpoint only on success.
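A sketch of B's side of this weak-production flow; send_to_c, c_acked, and ack_a are hypothetical callables standing in for the messaging layer.

```python
import time

ACK_TIMEOUT = 1.0   # the 1s threshold from the example above

class Checkpoint:
    def __init__(self): self.pending = None
    def save(self, record): self.pending = record
    def delete(self): self.pending = None

def produce_weak(record, send_to_c, c_acked, ack_a, ckpt):
    send_to_c(record)
    deadline = time.monotonic() + ACK_TIMEOUT
    while time.monotonic() < deadline:
        if c_acked():
            ack_a()              # fast path: no checkpoint ever written
            return
        time.sleep(0.01)
    ckpt.save(record)            # slow path: persist the pending send...
    ack_a()                      # ...then ACK A so it stops waiting
    while not c_acked():         # B (or its restarted self) retries locally
        send_to_c(record)
        time.sleep(0.1)
    ckpt.delete()                # C finally ACKed; drop the checkpoint
```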
State Manipulation
In implementing mechanisms to manipulate user state in MillWheel, we discuss both the “hard” state that is persisted to our backing store and the “soft” state which includes any in-memory caches or aggregates.
We must satisfy the following user-visible guarantees:
• The system does not lose data.
• Updates to state must obey exactly-once semantics.
• All persisted data throughout the system must be consistent at any given point in time.
• Low watermarks must reflect all pending state in the system.
• Timers must fire in-order for a given key.
First, to avoid inconsistency, all per-key operations, including persistence, checkpoints, and state updates, are performed in a single atomic write.
To avoid inconsistencies in persisted state (e.g. between timers, user state, and production checkpoints), we wrap all per-key updates in a single atomic operation.
Second, against zombie writers and writes delayed by the network, a sequencer scheme is used: every write carries a sequence token, stale writes are rejected, and newly started workers invalidate all previous sequencers.
As work may shift between machines (due to load balancing, failures, or other reasons) a major threat to our data consistency is the possibility of zombie writers and network remnants issuing stale writes to our backing store.
To address this possibility, we attach a sequencer token to each write, which the mediator of the backing store checks for validity before allowing the write to commit.
New workers invalidate any extant sequencers before starting work, so that no remnant writes can succeed thereafter.
So in MillWheel, for a given key only a single worker has permission to write at any point in time; this is the linchpin of MillWheel's write consistency.
Thus, we can guarantee that, for a given key, only a single worker can write to that key at a particular point in time.
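A sketch of the sequencer idea, with invented names: the backing store's mediator tracks the currently valid token per key and rejects writes that carry a stale one.

```python
class Mediator:
    """Stand-in for the backing store's write mediator."""
    def __init__(self):
        self.valid_token = {}   # key -> currently valid sequencer token
        self.store = {}

    def take_ownership(self, key, token):
        # a new worker invalidates any extant sequencer before working
        self.valid_token[key] = token

    def write(self, key, token, value):
        if self.valid_token.get(key) != token:
            raise PermissionError("stale sequencer: write rejected")
        self.store[key] = value

m = Mediator()
m.take_ownership("query:foo", token=1)    # worker 1 owns the key
m.write("query:foo", token=1, value="v1")
m.take_ownership("query:foo", token=2)    # worker 2 takes over after rebalance
m.write("query:foo", token=2, value="v2")
# worker 1 is now a zombie; its delayed write would be rejected:
# m.write("query:foo", token=1, value="stale")   -> PermissionError
```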
In order to quickly recover from unplanned process failures, each computation worker in MillWheel can checkpoint its state at an arbitrarily fine granularity (in practice, sub-second or per-record granularity is standard, depending on input volume). Our use of always-consistent soft state allows us to limit the number of occasions when we must scan these checkpoints to specific cases – machine failures or load-balancing events.
When we do perform scans, these can often be asynchronous, allowing the computation to continue processing input records while the scan progresses.
Last updated: 2017-04-07 21:05:50