閱讀769 返回首頁    go 小米 go 小米6


Pig源碼分析: 邏輯執行計劃模塊

Whole View

本文分析的是Pig Logical模塊的代碼(newplan package下),具體每種邏輯執行的實現類不會做具體分析。



Architecture

關鍵類/接口關係圖


下麵對關鍵類/接口具體實現做分析


Operator

public abstract class Operator {
    protected SourceLocation location; // The location of the operator in the original pig script.
    
    protected String name;
    protected OperatorPlan plan; // plan that contains this operator
    protected Map<String, Object> annotations;
    protected final int hashPrime = 31;

Operator的變量:

對name和plan提供get函數,構造函數傳入name和plan。

對annotations提供get,annote,remove方法,來得到、添加、移除注釋。

對locaiton提供get和set函數,且構造函數new SourceLocation()。


Operstor抽象方法:

主要方法為accept(PlanVisitor v),在PlanWalker裏常用到。

還提供一個isEqual方法。

 

繼承結構


主要看兩類實現 LogicalExpression和LogicalRelationalOperator


LogicalExpression

public abstract class LogicalExpression extends Operator {

    static long nextUid = 1;
    protected LogicalSchema.LogicalFieldSchema fieldSchema;
    protected LogicalSchema.LogicalFieldSchema uidOnlyFieldSchema;

deepCopy()方法需要子類實現

 

繼承結構:


子類實現略


LogicalRelationalOperator

LogicalRelationalOperator代表關係型操作,關係型操作有Schema。以下是主要變量,LogicalRelationalOperator為他們提供了一些get/set方法。

abstract public class LogicalRelationalOperator extends Operator {
    
    protected LogicalSchema schema;
    protected int requestedParallelism;
    protected String alias;
    protected int lineNum;
    
    /**
     * Name of the customPartitioner if one is used, this is set to null otherwise.
     */
    protected String mCustomPartitioner = null;
    
    /**
     * A HashSet to indicate whether an option (such a Join Type) was pinned
     * by the user or can be chosen at runtime by the optimizer.
     */
    protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();

關於LogicalSchema類:

內部類LogicalFieldSchema具體表示每一個field的結構,可以看到與LogicalSchema是嵌套的。LogicalSchema維護一個List<LogicalFieldSchema>

    public static class LogicalFieldSchema {
        public String alias;
        public byte type;
        public long uid;
        public LogicalSchema schema;

提供基本方法如下:



除了基本方法外,還提供一套merge schema的方法


繼承結構:


子類實現略。


OperatorPlan

OperatorPlan是一個接口,定義了對Operator的圖操作(Graph Operations)。

羅列了所有方法之後發現,Operator類雖然沒有結構,隻是一個普通的VO類。但是OperatorPlan這個接口定義的以下這套圖操作,使OperstorOperator組成了一個Graph



實現結構:


下麵展開分析。


OperatorSubPlan

OperatorSubPlan代表的是一個OperatorPlan的一個子集的視圖,OperatorSubPlan隻有一個實現,使用在Rule的match過程裏。所以OperatorSubPlan的作用就是提供一個子Plan,用於匹配操作。


BaseOperatorPlan

BaseOperatorPlan實現了OperatorPlan接口,具體實現了各個圖操作方法,把Operator之間的關係(包括softLink關係)用PlanEdge表示,圖操作方法都借助PlanEdge類表達和實現。

public abstract class BaseOperatorPlan implements OperatorPlan {

    protected List<Operator> ops;
    protected PlanEdge fromEdges;
    protected PlanEdge toEdges;
    protected PlanEdge softFromEdges;
    protected PlanEdge softToEdges;

    private List<Operator> roots;
    private List<Operator> leaves;

比如:

toEdges.get(op)                  返回op的前輩

toEdges.get(op)==null       的op為root

fromEdges.get(op)             返回op的後輩

fromEdges.get(op)==null  的op為leave


PlanEdge類的實現:

public class PlanEdge extends MultiMap<Operator, Operator>

這裏的MultiMap是Pig自己的工具類,Pig表示不使用Apache common的MultiMap是因為不支持序列化。

public class MultiMap<K, V> implements Serializable {

    // Change this if you modify the class.
    static final long serialVersionUID = 2L;

    protected Map<K, ArrayList<V>> mMap = null;

    public MultiMap() {
        mMap = new HashMap<K, ArrayList<V>>();
    }

因為MultiMap的value部分使用的是ArrayList,所以使得某些圖操作支持position信息,如:

public Pair<Integer, Integer> disconnect(Operator from, Operator to)
public void connect(Operator from, int fromPos, Operator to, int toPos)

除了實現圖操作方法外,BaseOperatorPlan還提供了explain()方法,子類會使用dpumper或printer來打印輸出Operators層次結構。


繼承結構:


主要看下LogicalPlan和LogicalExpressionPlan兩個實現類。


LogicalPlan

LogicalPlan隻包含關係型操作,也就是說涉及到的Operator都是LogicalRelationalOperator。

 

explain()方法既支持LogicalPlanPrinter的visit實現,也支持DotLOPrinter的dpump實現。

LogicalPlanPrinter是PlanVisitor的子類, LogicalPlanPrinter內部有一個PrintStream,在visit()過程中邊遍曆,邊記錄。

DotLOPrinter是DotPlanDumper的子類,DotPlanDumper是PlanDumper的子類,根據graphviz的dot algorithm,輸出符合DOT格式的plan。


LogicalExpressionPlan

LogicalExpressionPlan處理的是LogicalExpressionOperators。

 

explain()方法借助LogicalPlanPrinter實現


PlanVisitor

訪問者機製,用於操作一個plan。

內部有一個PlanWalker雙向隊列,PlanWalker會按照某種順序遍曆訪問傳入的OperatorPlan,讓plan的每個operation accept該Visitor。

PlanVisitor可以進行push和pop walker的操作。visit()方法調用的是walker.walk(this)方法。



繼承結構很可觀


主要看LogicalExpressionVisitor、LogicalRelationalNodesVisitor這兩大體係。前者訪問expression plans,後者訪問logical plans。


LogicalExpressionVisitor

LogicalExpressionVisitor初始化的時候會判斷傳入的OperatorPlan是否是LogicalExpressionPlan的子類。visit()方法們通過多態,接受LogicalExpression的子類。


LogicalRelationalNodesVisitor

LogicalRelationalNodesVisitor接受的OperatorPlan必須每個operator都是LogicalRelationalOperator的子類(初始化的時候會得到operator iterator對每個進行校驗,不滿足就拋異常)。visit()方法們通過多態,接受LogicalRelationalOperator的實現子類。


PlanWalker

PlanWalker提供的是遍曆訪問一個plan的能力。

PlanWalker的子類主要實現兩個方法:

public abstract void walk(PlanVisitor visitor) throws FrontendException;

public abstract PlanWalker spawnChildWalker(OperatorPlan plan);

walk()方法在子類的實現中,會以不同的順序遍曆plan,最後的結果是遍曆到的節點Operator會調op.accept(visitor)接受本Visitor


繼承結構


接下來具體介紹各子類遍曆能力的實現。


DependencyOrderWalker

DependencyOrderWalker按照依賴順序訪問plan,即一個node被訪問的前提是它的前輩們已經被訪問過了。這個訪問順序相當於,按照拓撲順序訪問圖上的節點。


@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new DependencyOrderWalker(plan);
}

walk()方法通過plan.getSinks()方法得到所有的leave節點,即沒有後輩的節點,然後遍曆他們,獲取每個節點的所有前輩,再遞歸前輩的前輩,從而實現把所有的節點都訪問一遍,最後得到結果就是一個FIFO的List。代碼裏的這個Graph依賴遍曆的方式很不高效,但是因為訪問的圖的節點少,所以可接受。

遞歸的過程如下

    protected void doAllPredecessors(Operator node,
                                   Set<Operator> seen,
                                   Collection<Operator> fifo) throws FrontendException {
        if (!seen.contains(node)) {
            // We haven't seen this one before.
            Collection<Operator> preds = Utils.mergeCollection(plan.getPredecessors(node), plan.getSoftLinkPredecessors(node));
            if (preds != null && preds.size() > 0) {
                // Do all our predecessors before ourself
                for (Operator op : preds) {
                    doAllPredecessors(op, seen, fifo);
                }
            }
            // Now do ourself
            seen.add(node);
            fifo.add(node);
        }
    }


DepthFirstWalk

DepthFirstWalker是深度優先遍曆(由上而下的深度優先

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new DepthFirstWalker(plan);
}

walk()方法通過plan.getSources()得到所有的root節點,然後遍曆他們,遍曆的時候獲取他們的所有後輩,遞歸遍曆。

遞歸過程如下:

    private void depthFirst(Operator node,
                            Collection<Operator> successors,
                            Set<Operator> seen,
                            PlanVisitor visitor) throws FrontendException {
        if (successors == null) return;

        for (Operator suc : successors) {
            if (seen.add(suc)) {
                suc.accept(visitor);
                Collection<Operator> newSuccessors = Utils.mergeCollection(plan.getSuccessors(suc), plan.getSoftLinkSuccessors(suc));
                depthFirst(suc, newSuccessors, seen, visitor);
            }
        }
    }

PreOrderDepthFirstWalker

PreOrderDepthFirstWalker即前序深度優先(由下而上的深度優先


子Walker是深度優先

public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new DepthFirstWalker(plan);
}

walk()方法是通過plan.getSinks()得到所有leave節點,然後遍曆每個leave節點,獲得他的前輩,並遞歸進行深度優先(向上)遍曆。

遞歸操作如下:

    private void depthFirst(Operator node, Collection<Operator> predecessors, Set<Operator> seen,
            PlanVisitor visitor) throws FrontendException {
        if (predecessors == null)
            return;

        boolean thisBranchFlag = branchFlag;
        for (Operator pred : predecessors) {
            if (seen.add(pred)) {
                branchFlag = thisBranchFlag;
                pred.accept(visitor);
                Collection<Operator> newPredecessors = Utils.mergeCollection(plan.getPredecessors(pred), plan.getSoftLinkPredecessors(pred));
                depthFirst(pred, newPredecessors, seen, visitor);
            }
        }
    }

ReserveDependencyOrderWalker

ReserveDependencyOrderWalker是逆向的依賴順序遍曆,即一個節點訪問之後才能訪問它依賴的節點,即N節點要想被訪問,需要依賴N節點的節點先被訪問。

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new ReverseDependencyOrderWalker(plan);
}

walk()方法的訪問模式類似DependencyOrderWalker,區別在於先獲得所有的root節點,然後進行遍曆操作,遍曆root節點的所有後輩,遞歸後輩的後輩,使root節點最後訪問。

遞歸如下:

    protected void doAllSuccessors(Operator node,
                                   Set<Operator> seen,
                                   Collection<Operator> fifo) throws FrontendException {
        if (!seen.contains(node)) {
            // We haven't seen this one before.
            Collection<Operator> succs = Utils.mergeCollection(plan.getSuccessors(node), plan.getSoftLinkSuccessors(node));
            if (succs != null && succs.size() > 0) {
                // Do all our successors before ourself
                for (Operator op : succs) {
                    doAllSuccessors(op, seen, fifo);
                }
            }
            // Now do ourself
            seen.add(node);
            fifo.add(node);
        }
    }

ReverseDependencyOrderWalkerWOSeenChk

ReverseDependencyOrderWalkerWOSeenChk也是逆向的依賴順序遍曆,同ReserveDependencyOrderWalker一樣。

 

子Walker是ReserveDependencyOrderWalker

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new ReverseDependencyOrderWalker(plan);
}

walk()方法和ReserveDependencyOrderWalker的區別在於,每次遍曆的時候不記錄一個seen的Set<Operator>集。



全文完 :)


最後更新:2017-04-03 12:56:08

  上一篇:go Android中的基本控件(上)--TextView控件
  下一篇:go 在64位機器上使用plSQL連接Oracle的問題(SQL*Net not properly installed)