閱讀343 返回首頁    go 技術社區[雲棲]


cascading基本概念

這是cascading官方userguide的中文翻譯,其中有些概念看過一段時間又忘了,在此做個記錄,一是方便自己複習,二是方便新手。

關於cascading我不想多說了,你如果寫過原生mapreduce程序,然後再接觸cascading,你會發現cascading Great job。它對Map和Reduce進行了高度抽象,用Tap、Pipe、Function、Operation這些概念替代了原有的Map和Reduce,可以很舒服的開發hadoop程序,但是這些概念過了一個來月我又忘的差不多了,所以有了這篇翻譯。

對於一些不好翻譯的單詞我這裏直接給出了原單詞,所以說看資料還是英文原版的好,看翻譯的書籍有很大風險,萬一別人翻譯錯了,那你這個newbie怎麼能懂!

下麵的翻譯來自cascading-userguide/ch03s03.html,下載地址:https://docs.cascading.org/cascading/2.5/userguide/html/userguide.zip

首先是一段關於pipe assembly的代碼例子

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

join = new GroupBy( join );

join = new Every( join, new SomeAggregator() );

// the tail of the assembly
join = new Each( join, new SomeFunction() );


通用流模式Common Stream Pattern

  • Split 一分多

  • Merge 多合一stream擁有相同fields常見的有:MergeGroupBy

  • Join sqljoin一樣,把擁有不同列的stream按照相同列join起來。HashJoinCoGroup

除了splitmergejoin以外,管道組裝還有examinefilterorganizetransform這些操作,為了方便處理,在元組中的每個值都被賦予一個fieldname,就像數據庫中的列名一樣,這樣他們就能很方便的被引用與選擇。

術語介紹:

  1. Operationcascading.operation.Operation)接受一個參數元組Tuple,輸出零或多個結果元組。Cascading提供了幾個常用的Operation,開發者也可以自己實現。

  2. Tuple,在Cascading中,數據被看作元組(cascading.tuple.Tuple)的流,元組由fields構成,元組和數據庫中的記錄或行類似。一個元組是一組值的數組,每個值可以為任何java.lang.Object

  3. Fieldscascading.tuple,Fields)被用於聲明或引用元組中的某一列,fields可以表示為像“firstname”、“birthdate”的字符串,也可以是整數值(0表示第一個,-1表示最後一個),或者還可以是預定義的值(Fields.AllFields.RESULTFields.REPLACE等)


Pipe類是用於實例化與命名一個pipePipe的名字可以被planner用於綁定到tap上,作為source或者sink使用。(第三種選擇是綁定pipebranch到一個tap,作為一個trap,這在高級主題在詳細討論)

SubAssembly子類是一個特殊的pipe類型,他用於嵌套一組可重用的pipeassemblies,這樣可以方便用於更大範圍內的pipeassembly

其餘六種pipe類型:

  1. Each這種pipe基於tuple的內容做處理,包括analyzetransformfilter。還可以用Eachsplitor branch一個流,達到這種效果你僅僅需要把Each的輸出定向到一個不同的pipesink即可。

  2. Merge這個pipeEach一樣都可以把一個流split成兩個,Merge還可以把多個流合並成一個,前途是這些流具有相同的fields。當不需要groupingnoaggregator or buffer操作會被使用時)時使用MergeMergeGroupBy快。

  3. GroupBy基於特定field,把一個流中的tuples分組。如果傳入多個stream,它在分組之前先進行merge操作,在進行merge時,GroupBy要求多個stream必須用相同的fieldstructure。分組的目的通常是為Every管道準備一個處理流,Every管道可以針對groups進行aggregatorbuffer操作,比如countingtotallingaveraging。我們應該明確,grouping這裏意味著基於某一特地field的值進行分組(byGroupByCoGroup,比如按照timestampzipcode進行分組,在每一分組中,元組的順序是隨機的,不過你也可以指定一個次sortkey,但是通常情況下,是不必要的,這隻會增加運行時間。

  4. Every用於處理分組後的流。隻能用於GroupByCogroup的輸出流,不能處理EachMergeHashJoin的輸出流。

  5. CoGroup對多個流進行join,和SQLjoin類似。它隻基於某特定field進行group,產出一個結果流。如果在多流中包含有相同的field名,它們必須被重命名來避免結果tuplefield的衝突

  6. HashJoinCogroup一樣,用於對多個流的join。但是它更適用於不需要grouping的場合下,這時它的性能更優。

Pipe類型對比表


Pipe type

Purpose

Input

Output

Pipe

instantiate a pipe; create or name a branch

name

a (named) pipe

SubAssembly

create nested subassemblies

 

 

Each

apply a filter or function, or branch a stream

tuple stream (grouped or not)

a tuple stream, optionally filtered or transformed

Merge

merge two or more streams with identical fields

two or more tuple streams

a tuple stream, unsorted

GroupBy

sort/group on field values; optionally merge two or morestreams with identical fields

one or more tuple streams with identical fields

a single tuple stream, grouped on key field(s) with optionalsecondary sort

Every

apply aggregator or buffer operation

grouped tuple stream

a tuple stream plus new fields with operation results

CoGroup

join 1 or more streams on matching field values

one or more tuple streams

a single tuple stream, joined on key field(s)

HashJoin

join 1 or more streams on matching field values

one or more tuple streams

a tuple stream in arbitrary order



EachEvery的語法如下:

new Each( previousPipe, argumentSelector, operation, outputSelector )
new Every( previousPipe, argumentSelector, operation, outputSelector )

這兩個pipe都有四個參數:

  • 一個pipe實例

  • 一個參數選擇器

  • 一個Operation實例

  • 一個輸出選擇器

EachEvery的主要不同在於Each用於處理單個元組,Every用於處理由GroupByCoGroup輸出的分組tuple,這限製了它們分別可以實施的操作、結果的輸出。

Each的操作可以是FunctionsFilters的子類。比如你可以從日誌文件中解析出特定field;過濾掉除了HTTPGET以外的請求,把timestring替換成datefield

Every的操作可以是AggregatorsBuffer的子類。比如你可以用Every去統計每天GET請求數,它會為每天輸出一個統計值。



大多數Operation子類都聲明了resultfield(就是上圖中的declaredfields),outputSelector指定輸出Tuple中的field,而輸出Tuple中的field來源於inputfieldoperation的結果這兩個方麵。如果outputSelector=Fields.ALL那麼輸出的Tuple就是input+result的數據merge後的結果。

對於Each來說argumentSelector默認為Fields.AlloutputSelector默認為Fields.RESULT

對於Every來說Aggregatorresult默認被追加到inputTuple上。比如,你在department域上做grouping,然後統計這個公寓的names,那麼結果fields會是["department","num_employees"]

EveryBufferoperation一起使用時,行為和Aggregator很不一樣,operationresult這次是和當前valuestuple關聯在一起,而不是當前groupingtuple。這就像EachFunction一起使用時一樣。這也許看起來不是很直觀,但這提供了很大的靈活性。換一個說法,bufferoperationresult沒有被追加到thecurrent keys being grouped on,是由buffer來決定是emit它們ifthey are relevant,而且對於Buffer來說,針對每個唯一的grouping有可能emit多個resultTuple。也就是說,一個Buffer可能或者可能不和Aggregator行為一致,但是Aggregator隻是一個特殊的Buffer



最後更新:2017-04-03 12:54:03

  上一篇:go Ext4.0中window窗體使用詳解(常用屬性)
  下一篇:go Unable to resolve target 'android-i'