Flume-ng ThriftSource原理分析
Thrift IDL
Flume Thrift IDL在client包裏麵,定義如下:
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}
注意
:event在C#裏麵是關鍵字,所以利用Thrift編譯器生成客戶端的接口時,要把所有event關鍵字改成其他的、比如events.
Thrift Service
Flume的Source分兩種:
- 實現PollableSource接口
通過SinkRunner管理Source - 實現EventDrivenSource接口
可以自己接受數據、發送到channel。比如ThriftSource
Flume Thrift Service的實現類在core包
public class ThriftSource extends AbstractSource implements Configurable,
EventDrivenSource {
public static final String CONFIG_THREADS = "threads";
public static final String CONFIG_BIND = "bind";
public static final String CONFIG_PORT = "port";
private Integer port;
private String bindAddress;
private int maxThreads = 0;
private SourceCounter sourceCounter;
private TServer server;
private TServerTransport serverTransport;
private ExecutorService servingExecutor;
public void start() {
//創建工作者線程池
...
args.protocolFactory(new TCompactProtocol.Factory());
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
//ThriftSourceProtocol是Flume Thrift Service的真正實現
args.processor(new ThriftSourceProtocol
.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
/**
* Start serving.
*/
servingExecutor.submit(new Runnable() {
@Override
public void run() {
server.serve();
}
});
...
}
Flume Thrift Service真正的實現類是內部類ThriftSourceHandler
private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
@Override
public Status append(ThriftFlumeEvent event) throws TException {
Event flumeEvent = EventBuilder.withBody(event.getBody(),
event.getHeaders());
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
try {
//傳給channel
getChannelProcessor().processEvent(flumeEvent);
} catch (ChannelException ex) {
logger.warn("Thrift source " + getName() + " could not append events " +
"to the channel.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
@Override
public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
List<Event> flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
try {
getChannelProcessor().processEventBatch(flumeEvents);
} catch (ChannelException ex) {
logger.warn("Thrift source %s could not append events to the " +
"channel.", getName());
return Status.FAILED;
}
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
return Status.OK;
}
}
最後更新:2017-04-03 12:55:13