Storm之Collector-p1
IBasicOutputCollector.java
List<Integer> emit(String streamId, List<Object> tuple);
提交一係列的tuple,返回接收到這些tuple的taskId
void emitDirect(int taskId, String streamId, List<Object> tuple);
直接向某個task提交一係列的tuple
BasicOutputCollector.java
這個類裏封裝了一個OutputCollector的代理和一個inputTuple
OutputCollector是在構造函數裏傳入的,在Bolt處理完tuple之後調用此類的emit方法時,方法內部會調用封裝的OutputCollector來進行emit,
最終的emit是OutputCollector的emit
此類還提供了兩個emit方法的重載,目的是在沒有指定streamId的時候提供一個默認名為“default”的streamId.
此類的emit方法沒有提供anchors參數,每次bolt執行完之後進行emit時會自動將輸入tuples和輸出tuples關聯,如果不需要關聯,則可不用此類。
IOutputCollector.java
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
提交一係列的tuple,返回接收到這些tuple的taskId,anchors參數是指接收到的tuples
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
直接向某個task提交一係列tuple,同樣的也會附帶上輸入的tuple
void ack(Tuple input);
ack某個tuple
void fail(Tuple input);
fail某個tuple
OutputCollector.java
封裝了一個IOutputCollector的代理,該代理在構造函數時傳遞進來被初始化。
提供了多個emit方法的重載,基本上包括有(單個anchor,無anchor,無指定streamId,隻有輸出的tuple)這些
emitDirect重載的方式也基本上一樣,都是為了使用方便來做的。
當然,最終的emit,ack和fail都是通過代理來實現的。
CoordinatedOutputCollector.java
這個類比較奇葩,是定義在CoordinatedBolt的內部類,隻有CoordinatedBolt這個類使用。
封裝了一個IOutputCollector代理,該代理在構造函數時被初始化。
此類沒有重載emit和emitDirect方法,但是在emit和emitDirect方法內部會調用一個名為updateTaskCounts的方法
private void updateTaskCounts(Object id, List<Integer> tasks) {
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null) {
Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
for(Integer task: tasks) {
int newCount = get(taskEmittedTuples, task, 0) + 1;
taskEmittedTuples.put(task, newCount);
}
}
}
}
這個方法主要是更新目標task和向其發送的tuple數量關係,其關係維護在_tracked變量裏,關係鏈為
tuple_id —> task_id —> num
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null)
track.receivedTuples++;
}
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
if(failed) {
_delegate.fail(tuple);
} else {
_delegate.ack(tuple);
}
}
將收到的tupleID對應的跟蹤信息中receivedTuples(已接收數量)+1 ,然後檢查是否已經處理完該tupleID對應的任務,如果檢查失敗就fail回上一個bolt
TupleType.REGULAR是為了保證不是傳遞ID的也不是傳遞數量的流。
public void fail(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null)
track.failed = true;
}
checkFinishId(tuple, TupleType.REGULAR);
_delegate.fail(tuple);
}
設置跟蹤信息failed,然後checkFinishId方法中會fail所有應該ack的tuples,然後刪除這個tupleID對應的跟蹤信息
最後更新:2017-04-01 13:38:49