343
技術社區[雲棲]
cascading基本概念
這是cascading官方userguide的中文翻譯,其中有些概念看過一段時間又忘了,在此做個記錄,一是方便自己複習,二是方便新手。
關於cascading我不想多說了,你如果寫過原生mapreduce程序,然後再接觸cascading,你會發現cascading Great job。它對Map和Reduce進行了高度抽象,用Tap、Pipe、Function、Operation這些概念替代了原有的Map和Reduce,可以很舒服的開發hadoop程序,但是這些概念過了一個來月我又忘的差不多了,所以有了這篇翻譯。
對於一些不好翻譯的單詞我這裏直接給出了原單詞,所以說看資料還是英文原版的好,看翻譯的書籍有很大風險,萬一別人翻譯錯了,那你這個newbie怎麼能懂!
下麵的翻譯來自cascading-userguide/ch03s03.html,下載地址:https://docs.cascading.org/cascading/2.5/userguide/html/userguide.zip
首先是一段關於pipe assembly的代碼例子
// the "left hand side" assembly head Pipe lhs = new Pipe( "lhs" ); lhs = new Each( lhs, new SomeFunction() ); lhs = new Each( lhs, new SomeFilter() ); // the "right hand side" assembly head Pipe rhs = new Pipe( "rhs" ); rhs = new Each( rhs, new SomeFunction() ); // joins the lhs and rhs Pipe join = new CoGroup( lhs, rhs ); join = new Every( join, new SomeAggregator() ); join = new GroupBy( join ); join = new Every( join, new SomeAggregator() ); // the tail of the assembly join = new Each( join, new SomeFunction() );
通用流模式Common Stream Pattern
-
Split 一分多
-
Merge 多合一stream擁有相同fields常見的有:Merge、GroupBy
-
Join 和sqljoin一樣,把擁有不同列的stream按照相同列join起來。HashJoin、CoGroup
除了split、merge、join以外,管道組裝還有examine、filter、organize、transform這些操作,為了方便處理,在元組中的每個值都被賦予一個fieldname,就像數據庫中的列名一樣,這樣他們就能很方便的被引用與選擇。
術語介紹:
-
Operation(cascading.operation.Operation)接受一個參數元組Tuple,輸出零或多個結果元組。Cascading提供了幾個常用的Operation,開發者也可以自己實現。
-
Tuple,在Cascading中,數據被看作元組(cascading.tuple.Tuple)的流,元組由fields構成,元組和數據庫中的記錄或行類似。一個元組是一組值的數組,每個值可以為任何java.lang.Object。
-
Fields(cascading.tuple,Fields)被用於聲明或引用元組中的某一列,fields可以表示為像“firstname”、“birthdate”的字符串,也可以是整數值(0表示第一個,-1表示最後一個),或者還可以是預定義的值(Fields.All、Fields.RESULT、Fields.REPLACE等)
Pipe類是用於實例化與命名一個pipe,Pipe的名字可以被planner用於綁定到tap上,作為source或者sink使用。(第三種選擇是綁定pipebranch到一個tap,作為一個trap,這在高級主題在詳細討論)
SubAssembly子類是一個特殊的pipe類型,他用於嵌套一組可重用的pipeassemblies,這樣可以方便用於更大範圍內的pipeassembly。
其餘六種pipe類型:
-
Each這種pipe基於tuple的內容做處理,包括analyze、transform、filter。還可以用Each類splitor branch一個流,達到這種效果你僅僅需要把Each的輸出定向到一個不同的pipe或sink即可。
-
Merge這個pipe和Each一樣都可以把一個流split成兩個,Merge還可以把多個流合並成一個,前途是這些流具有相同的fields。當不需要grouping(noaggregator or buffer操作會被使用時)時使用Merge,Merge比GroupBy快。
-
GroupBy基於特定field,把一個流中的tuples分組。如果傳入多個stream,它在分組之前先進行merge操作,在進行merge時,GroupBy要求多個stream必須用相同的fieldstructure。分組的目的通常是為Every管道準備一個處理流,Every管道可以針對groups進行aggregator和buffer操作,比如counting,totalling,averaging。我們應該明確,grouping這裏意味著基於某一特地field的值進行分組(byGroupBy或CoGroup),比如按照timestamp或zipcode進行分組,在每一分組中,元組的順序是隨機的,不過你也可以指定一個次sortkey,但是通常情況下,是不必要的,這隻會增加運行時間。
-
Every用於處理分組後的流。隻能用於GroupBy或Cogroup的輸出流,不能處理Each、Merge、HashJoin的輸出流。
-
CoGroup對多個流進行join,和SQLjoin類似。它隻基於某特定field進行group,產出一個結果流。如果在多流中包含有相同的field名,它們必須被重命名來避免結果tuple中field的衝突
-
HashJoin和Cogroup一樣,用於對多個流的join。但是它更適用於不需要grouping的場合下,這時它的性能更優。
Pipe類型對比表
Pipe type |
Purpose |
Input |
Output |
|
instantiate a pipe; create or name a branch |
name |
a (named) pipe |
|
create nested subassemblies |
|
|
|
apply a filter or function, or branch a stream |
tuple stream (grouped or not) |
a tuple stream, optionally filtered or transformed |
|
merge two or more streams with identical fields |
two or more tuple streams |
a tuple stream, unsorted |
|
sort/group on field values; optionally merge two or morestreams with identical fields |
one or more tuple streams with identical fields |
a single tuple stream, grouped on key field(s) with optionalsecondary sort |
|
apply aggregator or buffer operation |
grouped tuple stream |
a tuple stream plus new fields with operation results |
|
join 1 or more streams on matching field values |
one or more tuple streams |
a single tuple stream, joined on key field(s) |
|
join 1 or more streams on matching field values |
one or more tuple streams |
a tuple stream in arbitrary order |
Each、Every的語法如下:
new Each( previousPipe, argumentSelector, operation, outputSelector ) new Every( previousPipe, argumentSelector, operation, outputSelector )
這兩個pipe都有四個參數:
-
一個pipe實例
-
一個參數選擇器
-
一個Operation實例
-
一個輸出選擇器
Each與Every的主要不同在於Each用於處理單個元組,Every用於處理由GroupBy或CoGroup輸出的分組tuple,這限製了它們分別可以實施的操作、結果的輸出。
Each的操作可以是Functions和Filters的子類。比如你可以從日誌文件中解析出特定field;過濾掉除了HTTPGET以外的請求,把timestring替換成datefield。
Every的操作可以是Aggregators和Buffer的子類。比如你可以用Every去統計每天GET請求數,它會為每天輸出一個統計值。
大多數Operation子類都聲明了resultfield(就是上圖中的declaredfields),outputSelector指定輸出Tuple中的field,而輸出Tuple中的field來源於input的field和operation的結果這兩個方麵。如果outputSelector=Fields.ALL那麼輸出的Tuple就是input+result的數據merge後的結果。
對於Each來說argumentSelector默認為Fields.All,outputSelector默認為Fields.RESULT
對於Every來說Aggregator的result默認被追加到inputTuple上。比如,你在department域上做grouping,然後統計這個公寓的names,那麼結果fields會是["department","num_employees"]
當Every與Bufferoperation一起使用時,行為和Aggregator很不一樣,operationresult這次是和當前valuestuple關聯在一起,而不是當前groupingtuple。這就像Each與Function一起使用時一樣。這也許看起來不是很直觀,但這提供了很大的靈活性。換一個說法,bufferoperation的result沒有被追加到thecurrent keys being grouped on,是由buffer來決定是emit它們ifthey are relevant,而且對於Buffer來說,針對每個唯一的grouping有可能emit多個resultTuple。也就是說,一個Buffer可能或者可能不和Aggregator行為一致,但是Aggregator隻是一個特殊的Buffer。
最後更新:2017-04-03 12:54:03