979
技術社區[雲棲]
論文摘抄 - FlumeJava
本摘抄不保證論文完整性和理解準確性
原始的MapReduce,分Map,Shuffle,Reduce。Map裏包括shards。Shuffle理解為groupByKey的事情。Reduce裏包括Combiner,可以定義Sharder來控製key怎麼和Reducer worker對應起來。
核心抽象和基本原語
PCollection<T>是一個不可變的bag,可以是有序的(Sequence),也可以是無序的(Collection)。PCollection可以來自於內存裏的Java PCollection對象,也可以讀取自文件。
PTable<K, V>,可以看成PCollection<Pair<K, V>>,不可變無序multi-map。
第一個原語是parallelDo(),把PCollection<T>變成新的PCollection<S>,處理方式定義在DoFn<T, S>裏。emitFn是call-back,傳給用戶的process(…),使用emitFn.emit(outElem)發射出去。parallelDo()可以在map或reduce中使用,DoFn不應該使用閉包外全局的變量,(inline function)純操作自己的inputs。
第二個原語是groupByKey(),把PTable<K, V>轉變成PTable<K,Collection<V>>,
第三個原語是combineValues(),接收input為PTable<K,Collection<V>>和一個V的符合結合律的方法,返回PTable<K, V>。
第四個原語是flatten(),接收一個PCollection<T>的list,返回一個PCollection<T>
衍生原語(Derived Operations)
count(),接收PCollection<T>,返回PTable<T, Integer>
實現方式為parallelDo(),groupByKey()和combineValues()
join(),接收PTable<K, V1>,PTable<K, V2>,返回PTable<K,Tuple2<Collection<V1>, Collection<V2>>
實現方式為,第一步,使用parallelDo()把每個input PTable<K, Vi>變成通用的PTable<K, TaggedUnion2<V1,V2>>;第二步使用flattern來combine tables;第三步,使用groupByKey()作用於被扁平過了tables,產生PTable<K,Collection<TaggedUnion2<V1, V2>>>
top(),接收比較函數和N,
實現方式為parallelDo(),groupByKey()和combineValues()
延遲分析(Deffered Evaluation)
PCollection對象有兩種狀態,defferred或materialized。
FlumeJava.run()真正觸發execution plan的物化/執行。
PObjects
PObject<T>用於存儲Java對象,物化過了之後可以使用getValue()方法獲得PObject的值。有點像Future。
operate()方法
優化器
parallelDoFusion(融合)
Producer-Consumer and Sibling Fusion,如下圖
大致是說,ABCD這幾種由同一份input產生的parallelDo,可以融合起來在一個parallelDo,即A+B+C+D,裏處理。一些中間結果也可以不要。
MapShuffleCombineReduce(MSCR) Operation
FlumeJava優化器的核心在於把ParallelDo,GroupByKey,CombineValues和Flattern的組合轉換成一個個單個的MapReduce。
MSCR是一個中間層的操作,有M個input channels(每個可以進行map操作),有R個Reduce channels(每個可以進行shuffle,或combine,或reduce操作)。單個input channal m,接收PCollection<Tm>作為輸入,執行R路output輸出的ParallelDo “map”操作,產生R個PTable<Kr, Vs> outputs。每個output channel r flatterns它的M個inputs,然後
a) 進行一次GroupByKey的“shuffle”,或CombineValues的“combine”,或Or-output的ParallelDo “reduce”,然後把結果寫出到Or-output PCollections
b) 把inputs直接寫出為outputs
前者這樣的output channel稱為”Grouping” channel,後者稱為”pass-through” channel。”pass-through” channel允許map的output成為一個MSCR操作的輸出。