閱讀164 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Storm之Collector-p1

64697af5a7c0fbe4a0cc1b62200d1ac9c24af2df


IBasicOutputCollector.java   


List<Integer> emit(String streamId, List<Object> tuple);

提交一係列的tuple,返回接收到這些tuple的taskId


void emitDirect(int taskId, String streamId, List<Object> tuple);

直接向某個task提交一係列的tuple


BasicOutputCollector.java 

這個類裏封裝了一個OutputCollector的代理和一個inputTuple

OutputCollector是在構造函數裏傳入的,在Bolt處理完tuple之後調用此類的emit方法時,方法內部會調用封裝的OutputCollector來進行emit,

最終的emit是OutputCollector的emit

此類還提供了兩個emit方法的重載,目的是在沒有指定streamId的時候提供一個默認名為“default”的streamId.

此類的emit方法沒有提供anchors參數,每次bolt執行完之後進行emit時會自動將輸入tuples和輸出tuples關聯,如果不需要關聯,則可不用此類。


IOutputCollector.java


List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

提交一係列的tuple,返回接收到這些tuple的taskId,anchors參數是指接收到的tuples


void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

直接向某個task提交一係列tuple,同樣的也會附帶上輸入的tuple


void ack(Tuple input);

ack某個tuple


void fail(Tuple input);

fail某個tuple


OutputCollector.java

封裝了一個IOutputCollector的代理,該代理在構造函數時傳遞進來被初始化。

提供了多個emit方法的重載,基本上包括有(單個anchor,無anchor,無指定streamId,隻有輸出的tuple)這些

emitDirect重載的方式也基本上一樣,都是為了使用方便來做的。

當然,最終的emit,ack和fail都是通過代理來實現的。


CoordinatedOutputCollector.java

這個類比較奇葩,是定義在CoordinatedBolt的內部類,隻有CoordinatedBolt這個類使用。

封裝了一個IOutputCollector代理,該代理在構造函數時被初始化。

此類沒有重載emit和emitDirect方法,但是在emit和emitDirect方法內部會調用一個名為updateTaskCounts的方法


private void updateTaskCounts(Object id, List<Integer> tasks) {
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null) {
            Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
            for(Integer task: tasks) {
                int newCount = get(taskEmittedTuples, task, 0) + 1;
                taskEmittedTuples.put(task, newCount);
            }
        }
    }
}

這個方法主要是更新目標task和向其發送的tuple數量關係,其關係維護在_tracked變量裏,關係鏈為 

tuple_id —> task_id —> num


public void ack(Tuple tuple) {
    Object id = tuple.getValue(0);
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null)
            track.receivedTuples++;
    }
    boolean failed = checkFinishId(tuple, TupleType.REGULAR);
    if(failed) {
        _delegate.fail(tuple);
    } else {
        _delegate.ack(tuple);
    }
}


將收到的tupleID對應的跟蹤信息中receivedTuples(已接收數量)+1 ,然後檢查是否已經處理完該tupleID對應的任務,如果檢查失敗就fail回上一個bolt

TupleType.REGULAR是為了保證不是傳遞ID的也不是傳遞數量的流。


public void fail(Tuple tuple) {
    Object id = tuple.getValue(0);
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null)
            track.failed = true;
    }
    checkFinishId(tuple, TupleType.REGULAR);
    _delegate.fail(tuple);
}


設置跟蹤信息failed,然後checkFinishId方法中會fail所有應該ack的tuples,然後刪除這個tupleID對應的跟蹤信息

最後更新:2017-04-01 13:38:49

  上一篇:go Greenplum 添加segment mirror節點
  下一篇:go spanner 的前世今生