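Pig Source Code Analysis: A Brief Look at How Execution Plans Are Generated
Abstract
This article traces the code to analyze the overall flow from a batch of Pig Latin input to the physical execution plan that comes out. The plan depends on the launcher engine: it is usually an MR execution plan, but it can also be a set of Spark RDD operators.
It does not go into how the AST is parsed, how ANTLR is used, how the logical plan is mapped, how the logical plan is optimized, or how the MR plan is split into MR jobs. Instead, it follows the key transformation steps (methods and classes) between a batch of Pig DSL input and the plan that is actually executed.
Full walkthrough of execution plan generation
The entry point is the main function of the Main class.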
/**
 * The Main-Class for the Pig Jar that will provide a shell and setup a classpath appropriate
 * for executing Jar files. Warning, this method calls System.exit().
 *
 * @param args
 *            -jar can be used to add additional jar files (colon separated). - will start a
 *            shell. -e will execute the rest of the command line as if it was input to the
 *            shell.
 * @throws IOException
 */
public static void main(String args[]) {
    // add win HADOOP_HOME
    // make sure you have "winutils.exe" under /bin
    // if not, download one from https://github.com/srccodes/hadoop-common-2.2.0-bin/tree/master/bin
    // entrance for local debug
    // better try: -x spark -e cmds
    System.exit(run(args, null));
}
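Main -> GruntParser: this is the first step.
Main first initializes a number of parameters (launch mode, input type detection, class initialization, and so on), then uses GruntParser to parse the input Pig Latin script.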

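After grunt.exec(), the next key step is entering GruntParser's parsing.
GruntParser's parse() relies on the PigScriptParser.jj file. The generated code cannot be stepped through in detail; what it ultimately produces is a syntax tree.
Once a Dump operation appears, execution enters GruntParser's processDump method.
When this step finishes, syntax-level parsing is complete.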
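To make the role of Dump concrete, here is a minimal sketch of the idea that definitions only accumulate and a dump statement is what hands the script on for planning. MiniGrunt and its methods are hypothetical names invented for this illustration; this is not Pig's GruntParser, and the real processDump does far more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the grunt-level flow; not Pig's actual GruntParser.
class MiniGrunt {
    private final List<String> pending = new ArrayList<>();  // statements parsed but not yet run
    private final List<String> runs = new ArrayList<>();     // one entry per triggered execution

    /** Handles one statement and returns true if it triggered an execution. */
    boolean process(String statement) {
        String s = statement.trim();
        if (s.toLowerCase().startsWith("dump ")) {
            // Plays the role of processDump(): hand the accumulated script on for planning.
            runs.add(pending.size() + " statement(s) planned for " + s);
            return true;
        }
        pending.add(s);  // definitions only extend the pending script
        return false;
    }

    List<String> runs() {
        return runs;
    }

    public static void main(String[] args) {
        MiniGrunt grunt = new MiniGrunt();
        grunt.process("raw = LOAD 'excite-small.log' AS (user, time, query);");
        grunt.process("clean1 = FILTER raw BY query IS NOT NULL;");
        grunt.process("dump clean1;");
        System.out.println(grunt.runs());
    }
}
```

The point of the sketch is only the trigger: nothing is planned until the dump arrives, which matches the order of steps traced above.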
PigServer -> QueryParserDriver: PigServer's parseQuery method calls QueryParserDriver's parse(query) method, which returns the logical execution plan.

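QueryParserDriver -> LogicalPlanGenerator: the actual generation of the logical plan is done by LogicalPlanGenerator.
At this point, the logical execution plan has been generated.
PigServer -> HExecutionEngine: next come optimizing the logical plan and generating the physical plan.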

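HExecutionEngine.compile(LogicalPlan, Properties) first optimizes the logical plan.
When HExecutionEngine is initialized, it assembles a different combination of optimization strategies depending on the situation, by disabling certain rules.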
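The idea of composing an optimizer by switching rules off can be sketched as follows. This is a toy model under stated assumptions: the rule names (ToyMergeForEach, ToyDropTrueFilter), the string-based plan, and the class RuleToggleSketch are all invented for illustration and are not Pig's PlanOptimizer or its real rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Toy model: a plan is a list of operator names, a rule rewrites that list.
class RuleToggleSketch {
    /** Hypothetical rules, kept in registration order. */
    static Map<String, UnaryOperator<List<String>>> toyRules() {
        Map<String, UnaryOperator<List<String>>> rules = new LinkedHashMap<>();
        // Collapse runs of adjacent FOREACH operators into one.
        rules.put("ToyMergeForEach", plan -> {
            List<String> out = new ArrayList<>();
            for (String op : plan) {
                boolean repeat = op.equals("FOREACH") && !out.isEmpty()
                        && out.get(out.size() - 1).equals("FOREACH");
                if (!repeat) {
                    out.add(op);
                }
            }
            return out;
        });
        // Remove filters that are marked as always true.
        rules.put("ToyDropTrueFilter", plan -> {
            List<String> out = new ArrayList<>(plan);
            out.removeIf(op -> op.equals("FILTER_TRUE"));
            return out;
        });
        return rules;
    }

    /** Applies every rule whose name is not in the disabled set. */
    static List<String> optimize(List<String> plan, Set<String> disabled) {
        List<String> current = new ArrayList<>(plan);
        for (Map.Entry<String, UnaryOperator<List<String>>> rule : toyRules().entrySet()) {
            if (!disabled.contains(rule.getKey())) {
                current = rule.getValue().apply(current);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> plan = Arrays.asList("LOAD", "FOREACH", "FOREACH", "FILTER_TRUE", "STORE");
        System.out.println(optimize(plan, Collections.emptySet()));
        System.out.println(optimize(plan, Collections.singleton("ToyMergeForEach")));
    }
}
```

With nothing disabled both rewrites apply; disabling ToyMergeForEach leaves the two FOREACH steps in place while the trivial filter is still dropped.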

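The PlanOptimizer process itself is covered in detail in the earlier write-up on logical plan optimization.
The physical plan is generated next.
This is done mainly by LogToPhyTranslationVisitor as it walks the nodes of the logical plan:
each Op's accept() triggers the matching overloaded visit(Op) method on LogToPhyTranslationVisitor, which maps each logical plan step to a physical plan step.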
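The accept()/visit() dispatch described here is the classic visitor pattern. The sketch below shows the mechanism in isolation. Everything in it is a simplified assumption: the operator classes, the ToPhysical visitor, and the use of plain strings such as "POLoad" as stand-ins for physical operators are illustrative and do not reproduce Pig's class hierarchy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified visitor sketch; not Pig's LogToPhyTranslationVisitor.
class VisitorTranslationSketch {
    interface LogicalOp {
        void accept(LogicalVisitor visitor);
    }

    /** One overloaded visit method per logical operator type. */
    interface LogicalVisitor {
        void visit(LoadOp op);
        void visit(FilterOp op);
        void visit(SortOp op);
    }

    // Each operator calls back with its own static type, which selects the overload.
    static final class LoadOp implements LogicalOp {
        public void accept(LogicalVisitor visitor) { visitor.visit(this); }
    }

    static final class FilterOp implements LogicalOp {
        public void accept(LogicalVisitor visitor) { visitor.visit(this); }
    }

    static final class SortOp implements LogicalOp {
        public void accept(LogicalVisitor visitor) { visitor.visit(this); }
    }

    /** Collects one physical step (here just a label) per visited logical operator. */
    static final class ToPhysical implements LogicalVisitor {
        final List<String> physical = new ArrayList<>();
        public void visit(LoadOp op) { physical.add("POLoad"); }
        public void visit(FilterOp op) { physical.add("POFilter"); }
        public void visit(SortOp op) { physical.add("POSort"); }
    }

    /** Walks the plan in order; a real walker would follow the plan graph instead. */
    static List<String> translate(List<LogicalOp> logicalPlan) {
        ToPhysical visitor = new ToPhysical();
        for (LogicalOp op : logicalPlan) {
            op.accept(visitor);
        }
        return visitor.physical;
    }

    public static void main(String[] args) {
        List<LogicalOp> plan = Arrays.asList(new LoadOp(), new FilterOp(), new SortOp());
        System.out.println(translate(plan));
    }
}
```

The walker never inspects operator types itself. The type-specific mapping lives entirely in the visitor, which is why adding a new translation target means writing a new visitor rather than touching the operators.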
Next comes launchPlan, which starts a Launcher according to the configuration; in the run traced here, that was SparkLauncher.

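SparkLauncher translates each step of the physical plan into RDD operations (mapping directly onto operators). After execution it returns SparkStats, which contains OutputInfo (including the result file location and similar details).
For how Pig on Spark implements SparkLauncher and translates the physical plan operators, see my GitHub and this blog post when reading the code.
The other path starts MapReduceLauncher, where MRCompiler translates the physical plan into an MR plan. The main translation logic is in the compile(PO) method.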

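Translating to the MR plan takes two main steps:
First, MRCompiler.compile() translates the physical plan into a MapReduce plan.
Second, JobControlCompiler.compile() takes an MROperPlan and returns a JobControl; this step controls the MR job queue.
That is the overall flow from input script to output execution plan. It is fairly rough, but the key steps and stages are all there.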
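As a compact recap of the MapReduce path, the hand-offs can be written down as data. This is only a summary sketch: PipelineSketch and Stage are hypothetical names, the component labels are taken from the walkthrough above, and the output labels are informal descriptions rather than Pig types.

```java
import java.util.ArrayList;
import java.util.List;

// Recap of the flow as data; a reading aid, not an executable model of Pig.
class PipelineSketch {
    /** One hand-off in the flow: which component does the work and what it produces. */
    static final class Stage {
        final String component;
        final String output;

        Stage(String component, String output) {
            this.component = component;
            this.output = output;
        }
    }

    /** Stages on the MapReduce path, in the order traced in this article. */
    static List<Stage> mapReducePath() {
        List<Stage> stages = new ArrayList<>();
        stages.add(new Stage("Main + GruntParser", "parsed statements"));
        stages.add(new Stage("QueryParserDriver + LogicalPlanGenerator", "logical plan"));
        stages.add(new Stage("HExecutionEngine (optimizer)", "optimized logical plan"));
        stages.add(new Stage("LogToPhyTranslationVisitor", "physical plan"));
        stages.add(new Stage("MRCompiler", "MROperPlan"));
        stages.add(new Stage("JobControlCompiler", "JobControl"));
        return stages;
    }

    /** Renders the chain as a single arrow-separated line. */
    static String describe(List<Stage> stages) {
        StringBuilder sb = new StringBuilder("Pig Latin script");
        for (Stage stage : stages) {
            sb.append(" -> [").append(stage.component).append("] ").append(stage.output);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(mapReducePath()));
    }
}
```

On the Spark path, the last two stages would be replaced by SparkLauncher translating the physical plan into RDD operations.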
Example
To make this concrete, here is an example I ran myself, including the Pig Latin script, the logical plan, the physical plan, and the MR plan.
Pig Latin
REGISTER D:/tutorial.jar;
raw = LOAD 'D:/github/flare-spork/tutorial/data/excite-small.log' USING PigStorage('\t') AS (user, time, query);
clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;
houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;
ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;
ngramed2 = DISTINCT ngramed1;
hour_frequency1 = GROUP ngramed2 BY (ngram, hour);
hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;
uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;
uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));
uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;
filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;
ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;
dump ordered_uniq_frequency;
Logical execution plan
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
ordered_uniq_frequency: (Name: LOSort Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| | | hour:(Name: Project Type: chararray Uid: 22 Input: 0 Column: hour)
| | | score:(Name: Project Type: double Uid: 23 Input: 0 Column: score)
| |---filtered_uniq_frequency: (Name: LOFilter Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| | | (Name: GreaterThan Type: boolean Uid: 29)
| | | |---score:(Name: Project Type: double Uid: 23 Input: 0 Column: score)
| | | |---(Name: Constant Type: double Uid: 28)
| |---uniq_frequency3: (Name: LOForEach Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| | | (Name: LOGenerate[false,false,false,false,false] Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour:(Name: Project Type: chararray Uid: 22 Input: 0 Column: (*))
| | | | | group:(Name: Project Type: chararray Uid: 9 Input: 1 Column: (*))
| | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score:(Name: Project Type: double Uid: 23 Input: 2 Column: (*))
| | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count:(Name: Project Type: long Uid: 24 Input: 3 Column: (*))
| | | | | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean:(Name: Project Type: double Uid: 25 Input: 4 Column: (*))
| | | |---(Name: LOInnerLoad[1] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray)
| | | |---(Name: LOInnerLoad[0] Schema: group#9:chararray)
| | | |---(Name: LOInnerLoad[2] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double)
| | | |---(Name: LOInnerLoad[3] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long)
| | | |---(Name: LOInnerLoad[4] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double)
| |---uniq_frequency2: (Name: LOForEach Schema: group#9:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double)
| | | (Name: LOGenerate[true,true] Schema: group#9:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double)
| | | | | group:(Name: Project Type: chararray Uid: 9 Input: 0 Column: (*))
| | | | | (Name: UserFunc(org.apache.pig.tutorial.ScoreGenerator) Type: bag Uid: 20)
| | | | | |---hour_frequency2:(Name: Project Type: bag Uid: 16 Input: 1 Column: (*))
| | | |---(Name: LOInnerLoad[0] Schema: group#9:chararray)
| | | |---hour_frequency2: (Name: LOInnerLoad[1] Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long)
| |---uniq_frequency1: (Name: LOCogroup Schema: group#9:chararray,hour_frequency2#16:bag{#31:tuple(group::ngram#9:chararray,group::hour#6:chararray,count#14:long)})
| | | group::ngram:(Name: Project Type: chararray Uid: 9 Input: 0 Column: group::ngram)
| |---hour_frequency2: (Name: LOForEach Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long)
| | | (Name: LOGenerate[true,false] Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long)
| | | | | group:(Name: Project Type: tuple Uid: 10 Input: 0 Column: (*))
| | | | | (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 14)
| | | | | |---ngramed2:(Name: Project Type: bag Uid: 11 Input: 1 Column: (*))
| | | |---(Name: LOInnerLoad[0] Schema: group#10:tuple(ngram#9:chararray,hour#6:chararray))
| | | |---ngramed2: (Name: LOInnerLoad[1] Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
| |---hour_frequency1: (Name: LOCogroup Schema: group#10:tuple(ngram#9:chararray,hour#6:chararray),ngramed2#11:bag{#30:tuple(user#1:bytearray,hour#6:chararray,ngram#9:chararray)})
| | | ngram:(Name: Project Type: chararray Uid: 9 Input: 0 Column: ngram)
| | | hour:(Name: Project Type: chararray Uid: 6 Input: 0 Column: hour)
| |---ngramed2: (Name: LODistinct Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
| |---ngramed1: (Name: LOForEach Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
| | | (Name: LOGenerate[false,false,true] Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
| | | | | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*))
| | | | | hour:(Name: Project Type: chararray Uid: 6 Input: 1 Column: (*))
| | | | | (Name: UserFunc(org.apache.pig.tutorial.NGramGenerator) Type: bag Uid: 7)
| | | | | |---query:(Name: Project Type: chararray Uid: 5 Input: 2 Column: (*))
| | | |---(Name: LOInnerLoad[user] Schema: user#1:bytearray)
| | | |---(Name: LOInnerLoad[hour] Schema: hour#6:chararray)
| | | |---(Name: LOInnerLoad[query] Schema: query#5:chararray)
| |---houred: (Name: LOForEach Schema: user#1:bytearray,hour#6:chararray,query#5:chararray)
| | | (Name: LOGenerate[false,false,false] Schema: user#1:bytearray,hour#6:chararray,query#5:chararray)
| | | | | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*))
| | | | | (Name: UserFunc(org.apache.pig.tutorial.ExtractHour) Type: chararray Uid: 6)
| | | | | |---time:(Name: Project Type: bytearray Uid: 2 Input: 1 Column: (*))
| | | | | query:(Name: Project Type: chararray Uid: 5 Input: 2 Column: (*))
| | | |---(Name: LOInnerLoad[user] Schema: user#1:bytearray)
| | | |---(Name: LOInnerLoad[time] Schema: time#2:bytearray)
| | | |---(Name: LOInnerLoad[query] Schema: query#5:chararray)
| |---clean2: (Name: LOForEach Schema: user#1:bytearray,time#2:bytearray,query#5:chararray)
| | | (Name: LOGenerate[false,false,false] Schema: user#1:bytearray,time#2:bytearray,query#5:chararray)
| | | | | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*))
| | | | | time:(Name: Project Type: bytearray Uid: 2 Input: 1 Column: (*))
| | | | | (Name: UserFunc(org.apache.pig.tutorial.ToLower) Type: chararray Uid: 5)
| | | | | |---query:(Name: Project Type: bytearray Uid: 3 Input: 2 Column: (*))
| | | |---(Name: LOInnerLoad[user] Schema: user#1:bytearray)
| | | |---(Name: LOInnerLoad[time] Schema: time#2:bytearray)
| | | |---(Name: LOInnerLoad[query] Schema: query#3:bytearray)
| |---clean1: (Name: LOFilter Schema: user#1:bytearray,time#2:bytearray,query#3:bytearray)
| | ...
Physical execution plan
| |---ordered_uniq_frequency: POSort[bag]() - scope-70
| | | Project[chararray][0] - scope-68
| | | Project[double][2] - scope-69
| |---uniq_frequency3: New For Each(false,false,false,false,false)[bag] - scope-67
| | | Project[chararray][1] - scope-57
| | | Project[chararray][0] - scope-59
| | | Project[double][2] - scope-61
| | | Project[long][3] - scope-63
| | | Project[double][4] - scope-65
| |---filtered_uniq_frequency: Filter[bag] - scope-53
| | | Greater Than[boolean] - scope-56
| | | |---Project[double][2] - scope-54
| | | |---Constant(2.0) - scope-55
| |---uniq_frequency2: New For Each(true,true)[bag] - scope-52
| | | Project[chararray][0] - scope-47
| | | POUserFunc(org.apache.pig.tutorial.ScoreGenerator)[bag] - scope-50
| | | |---Project[bag][1] - scope-49
| |---uniq_frequency1: Package[tuple]{chararray} - scope-44
| |---uniq_frequency1: Global Rearrange[tuple] - scope-43
| |---uniq_frequency1: Local Rearrange[tuple]{chararray}(false) - scope-45
| | | Project[chararray][0] - scope-46
| |---hour_frequency2: New For Each(true,false)[bag] - scope-42
| | | Project[tuple][0] - scope-37
| | | POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-40
| | | |---Project[bag][1] - scope-39
| |---hour_frequency1: Package[tuple]{tuple} - scope-33
| |---hour_frequency1: Global Rearrange[tuple] - scope-32
| |---hour_frequency1: Local Rearrange[tuple]{tuple}(false) - scope-34
| | | Project[chararray][2] - scope-35
| | | Project[chararray][1] - scope-36
| |---ngramed2: PODistinct[bag] - scope-31
| |---ngramed1: New For Each(false,false,true)[bag] - scope-30
| | | Project[bytearray][0] - scope-23
| | | Project[chararray][1] - scope-25
| | | POUserFunc(org.apache.pig.tutorial.NGramGenerator)[bag] - scope-28
| | | |---Project[chararray][2] - scope-27
| |---houred: New For Each(false,false,false)[bag] - scope-22
| | | Project[bytearray][0] - scope-14
| | | POUserFunc(org.apache.pig.tutorial.ExtractHour)[chararray] - scope-18
| | | |---Cast[chararray] - scope-17
| | | |---Project[bytearray][1] - scope-16
| | | Project[chararray][2] - scope-20
| |---clean2: New For Each(false,false,false)[bag] - scope-13
| | | Project[bytearray][0] - scope-5
| | | Project[bytearray][1] - scope-7
| | | POUserFunc(org.apache.pig.tutorial.ToLower)[chararray] - scope-11
| | | |---Cast[chararray] - scope-10
| | | |---Project[bytearray][2] - scope-9
| |---clean1: Filter[bag] - scope-1
| | | POUserFunc(org.apache.pig.tutorial.NonURLDetector)[boolean] - scope-4
| | | |---Cast[chararray] - scope-3
| | | |---Project[bytearray][2] - scope-2
| |---raw: Load(D:/github/flare-spork/tutorial/data/excite-small.log:PigStorage(' ')) - scope-0
MR execution plan
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-72
Map Plan
Local Rearrange[tuple]{tuple}(true) - scope-74
| | | Project[tuple][*] - scope-73
| |---ngramed1: New For Each(false,false,true)[bag] - scope-30
| | | Project[bytearray][0] - scope-23
| | | Project[chararray][1] - scope-25
| | | POUserFunc(org.apache.pig.tutorial.NGramGenerator)[bag] - scope-28
| | | |---Project[chararray][2] - scope-27
| |---houred: New For Each(false,false,false)[bag] - scope-22
| | | Project[bytearray][0] - scope-14
| | | POUserFunc(org.apache.pig.tutorial.ExtractHour)[chararray] - scope-18
| | | |---Cast[chararray] - scope-17
| | | |---Project[bytearray][1] - scope-16
| | | Project[chararray][2] - scope-20
| |---clean2: New For Each(false,false,false)[bag] - scope-13
| | | Project[bytearray][0] - scope-5
| | | Project[bytearray][1] - scope-7
| | | POUserFunc(org.apache.pig.tutorial.ToLower)[chararray] - scope-11
| | | |---Cast[chararray] - scope-10
| | | |---Project[bytearray][2] - scope-9
| |---clean1: Filter[bag] - scope-1
| | | POUserFunc(org.apache.pig.tutorial.NonURLDetector)[boolean] - scope-4
| | | |---Cast[chararray] - scope-3
| | | |---Project[bytearray][2] - scope-2
| |---raw: Load(D:/github/flare-spork/tutorial/data/excite-small.log:PigStorage(' ')) - scope-0--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp-1456742965:org.apache.pig.impl.io.InterStorage) - scope-78
| |---New For Each(true)[bag] - scope-77
| | | Project[tuple][0] - scope-76
| |---Package[tuple]{tuple} - scope-75--------
Global sort: false
----------------
MapReduce node scope-80
Map Plan
hour_frequency1: Local Rearrange[tuple]{tuple}(false) - scope-34
| | | Project[chararray][2] - scope-35
| | | Project[chararray][1] - scope-36
| |---Load(file:/tmp/temp1620254926/tmp-1456742965:org.apache.pig.impl.io.InterStorage) - scope-79--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp2077335416:org.apache.pig.impl.io.InterStorage) - scope-81
| |---hour_frequency2: New For Each(true,false)[bag] - scope-42
| | | Project[tuple][0] - scope-37
| | | POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-40
| | | |---Project[bag][1] - scope-39
| |---hour_frequency1: Package[tuple]{tuple} - scope-33--------
Global sort: false
----------------
MapReduce node scope-83
Map Plan
uniq_frequency1: Local Rearrange[tuple]{chararray}(false) - scope-45
| | | Project[chararray][0] - scope-46
| |---Load(file:/tmp/temp1620254926/tmp2077335416:org.apache.pig.impl.io.InterStorage) - scope-82--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.io.InterStorage) - scope-84
| |---uniq_frequency3: New For Each(false,false,false,false,false)[bag] - scope-67
| | | Project[chararray][1] - scope-57
| | | Project[chararray][0] - scope-59
| | | Project[double][2] - scope-61
| | | Project[long][3] - scope-63
| | | Project[double][4] - scope-65
| |---filtered_uniq_frequency: Filter[bag] - scope-53
| | | Greater Than[boolean] - scope-56
| | | |---Project[double][2] - scope-54
| | | |---Constant(2.0) - scope-55
| |---uniq_frequency2: New For Each(true,true)[bag] - scope-52
| | | Project[chararray][0] - scope-47
| | | POUserFunc(org.apache.pig.tutorial.ScoreGenerator)[bag] - scope-50
| | | |---Project[bag][1] - scope-49
| |---uniq_frequency1: Package[tuple]{chararray} - scope-44--------
Global sort: false
----------------
MapReduce node scope-86
Map Plan
ordered_uniq_frequency: Local Rearrange[tuple]{chararray}(false) - scope-91
| | | Constant(all) - scope-90
| |---New For Each(false,false)[tuple] - scope-89
| | | Project[chararray][0] - scope-87
| | | Project[double][2] - scope-88
| |---Load(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-85--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp-586682361:org.apache.pig.impl.io.InterStorage) - scope-101
| |---New For Each(false)[tuple] - scope-100
| | | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-99
| | | |---Project[tuple][*] - scope-98
| |---New For Each(false,false)[tuple] - scope-97
| | | Constant(-1) - scope-96
| | | ordered_uniq_frequency: POSort[bag]() - scope-70
| | | | | Project[chararray][0] - scope-94
| | | | | Project[double][1] - scope-95
| | | |---Project[bag][1] - scope-93
| |---Package[tuple]{chararray} - scope-92--------
Global sort: false
----------------
MapReduce node scope-103
Map Plan
ordered_uniq_frequency: Local Rearrange[tuple]{tuple}(false) - scope-104
| | | Project[chararray][0] - scope-68
| | | Project[double][2] - scope-69
| |---Load(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.io.InterStorage) - scope-102--------
Reduce Plan
ordered_uniq_frequency: Store(file:/tmp/temp1620254926/tmp-225116343:org.apache.pig.impl.io.InterStorage) - scope-71
| |---New For Each(true)[tuple] - scope-107
| | | Project[bag][1] - scope-106
| |---PackageLite[tuple]{tuple} - scope-105--------
Global sort: true
Quantile file: file:/tmp/temp1620254926/tmp-586682361
----------------
End of article :)
Last updated: 2017-04-03 12:56:35