Flume examples recap
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = master
a1.sources.r1.port = 8888
a1.sinks.s1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=search_replace
a1.sources.r1.interceptors.i1.searchPattern=(\\d{3})\\d{4}(\\d{4})
a1.sources.r1.interceptors.i1.replaceString=$1xxxx$2
This masks out the middle four digits of the phone number.
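A quick way to sanity-check the pattern locally (a sketch: it applies the same regex with Java's String.replaceAll outside of Flume; the class name MaskDemo is just for illustration):
public class MaskDemo {
    public static void main(String[] args) {
        // Same pattern as the interceptor: keep the first 3 and last 4 digits
        String masked = "13812345678".replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1xxxx$2");
        System.out.println(masked); // prints 138xxxx5678
    }
}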
Since the source is Avro, the example also needs a Java class to build Avro-serialized events and send them to it:
package flumeTest;

import java.nio.charset.Charset;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

// Connects to the Avro Flume source and sends events to the Flume agent.
public class FlumeClient {
    private RpcClient flumeClient;
    private String hostName;
    private int port;

    public FlumeClient(String hostname, int port) {
        this.hostName = hostname;
        this.port = port;
        this.flumeClient = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    // Wraps a string message in an event and sends it to the Avro source.
    public void sendEvent(String msg) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(new Date().getTime()));
        // Build the event
        Event event = EventBuilder.withBody(msg, Charset.forName("UTF-8"), headers);
        try {
            flumeClient.append(event);
        } catch (Exception e) {
            e.printStackTrace();
            // Rebuild the client after a failure (note: the failed event is not re-sent)
            flumeClient.close();
            flumeClient = null;
            flumeClient = RpcClientFactory.getDefaultInstance(hostName, port);
        }
    }

    public void close() {
        flumeClient.close();
    }

    // Sends RpcClient Avro-formatted events to the given host and port of the Flume agent.
    public static void main(String[] args) {
        FlumeClient flumeClient = new FlumeClient("master", 8888);
        String bMsg = "fromjava-msg";
        for (int i = 0; i < 100; i++) {
            flumeClient.sendEvent(bMsg + i);
        }
        flumeClient.close();
    }
}
package flumeTest;

import java.util.Random;

public class SendPhoneNo {
    public static void main(String[] args) {
        FlumeClient flumeClient = new FlumeClient("master", 8888);
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            // Build a random 11-digit phone number starting with 1
            // (string concatenation: the left operand is a String)
            String phoneNo = "1" + random.nextInt(10) + random.nextInt(10) + random.nextInt(10) + random.nextInt(10)
                    + random.nextInt(10) + random.nextInt(10) + random.nextInt(10) + random.nextInt(10)
                    + random.nextInt(10) + random.nextInt(10);
            flumeClient.sendEvent(phoneNo);
        }
        flumeClient.close();
    }
}
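To try the example end to end, start the agent first and then run SendPhoneNo (a sketch; the conf file name phone-mask.conf is an assumption):
flume-ng agent --conf conf --conf-file phone-mask.conf --name a1 -Dflume.root.logger=INFO,console
The logger sink should then print 100 events whose bodies look like 138xxxx5678.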
Example 2
a1.sources=r1
a1.channels=c1 c2 c3 c4
a1.sinks=s1 s2 s3 s4
a1.sources.r1.type=avro
a1.sources.r1.bind=master
a1.sources.r1.port=8888
a1.sources.r1.channels=c1 c2 c3 c4
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=province
a1.sources.r1.selector.mapping.henan=c1
a1.sources.r1.selector.mapping.hebei=c2
a1.sources.r1.selector.mapping.shanghai=c3
a1.sources.r1.selector.mapping.shandong=c4
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
a1.channels.c3.transactionCapacity=100
a1.channels.c4.type=memory
a1.channels.c4.capacity=1000
a1.channels.c4.transactionCapacity=100
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flumelog/henan
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.filePrefix=test_log
a1.sinks.s1.hdfs.rollInterval=0
a1.sinks.s1.hdfs.rollSize=0
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flumelog/hebei
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.filePrefix=test_log
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=0
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=/flumelog/shanghai
a1.sinks.s3.hdfs.fileSuffix=.log
a1.sinks.s3.hdfs.filePrefix=test_log
a1.sinks.s3.hdfs.rollInterval=0
a1.sinks.s3.hdfs.rollSize=0
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.writeFormat=Text
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s4.type=hdfs
a1.sinks.s4.hdfs.path=/flumelog/shandong
a1.sinks.s4.hdfs.fileSuffix=.log
a1.sinks.s4.hdfs.filePrefix=test_log
a1.sinks.s4.hdfs.rollInterval=0
a1.sinks.s4.hdfs.rollSize=0
a1.sinks.s4.hdfs.fileType=DataStream
a1.sinks.s4.hdfs.writeFormat=Text
a1.sinks.s4.hdfs.useLocalTimeStamp=true
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
a1.sinks.s4.channel=c4
This conf routes events into different HDFS directories by region, effectively classifying them by province. Because the source is Avro, a Java class is used to send the events to master:
package flumeTest;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

// The agent's selector is multiplexing: it looks up the header key "province"
// on each event and routes the event to the corresponding channel.
public class ForFanoutSelectorClient {
    private RpcClient client;
    private final String[] provinces = {"henan", "hebei", "shanghai", "shandong"};
    private final Random random = new Random();

    public ForFanoutSelectorClient(String hostname, int port) {
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    // Build an event whose "province" header is picked at random
    public Event getRandomEvent(String msg) {
        Map<String, String> headers = new HashMap<String, String>();
        String province = provinces[random.nextInt(4)];
        headers.put("province", province);
        Event result = EventBuilder.withBody(msg, Charset.forName("UTF-8"), headers);
        return result;
    }

    public void sendEvent(Event event) {
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        client.close();
    }

    public static void main(String[] args) {
        ForFanoutSelectorClient fanoutSelectorClient = new ForFanoutSelectorClient("master", 8888);
        String msg = "peopleinfo_";
        for (int i = 0; i < 300; i++) {
            Event event = fanoutSelectorClient.getRandomEvent(msg + i + "_");
            fanoutSelectorClient.sendEvent(event);
        }
        fanoutSelectorClient.close();
    }
}
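After a run, each province's events sit under their own HDFS directory; the layout looks roughly like this (illustrative, actual file names depend on the roll settings and the sink's timestamp-based counter):
/flumelog/henan/test_log.1510070000000.log
/flumelog/hebei/test_log.1510070000001.log
/flumelog/shanghai/test_log.1510070000002.log
/flumelog/shandong/test_log.1510070000003.log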
Example 3
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/opt/spooldirnew
a1.sources.r1.fileHeader=true
a1.sinks.s1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=^[a-z0-9]+([._\\-][a-z0-9])@([a-z0-9]+[-a-z0-9]*[a-z0-9]+.){1,63}[a-z0-9]+$
a1.sources.r1.interceptors.i1.excludeEvents=false
This filters the events read from the files in spooldirnew so that only the ones that look like email addresses are output to the logger (with excludeEvents=false, regex_filter keeps the matching events and drops the rest).
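A quick local check of the filter pattern (a sketch: it applies the same regex, as it reads after properties unescaping, with java.util.regex outside of Flume; the class name EmailRegexCheck is just for illustration):
package flumeTest;

import java.util.regex.Pattern;

public class EmailRegexCheck {
    public static void main(String[] args) {
        // The interceptor's regex (\\- in the properties file becomes \- here)
        Pattern p = Pattern.compile("^[a-z0-9]+([._\\-][a-z0-9])@([a-z0-9]+[-a-z0-9]*[a-z0-9]+.){1,63}[a-z0-9]+$");
        System.out.println(p.matcher("user.a@example.com").matches()); // true: event kept
        System.out.println(p.matcher("not an email").matches());      // false: event dropped
    }
}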
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/opt/spooldir
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/flumelog/%Y%m%d
a1.sinks.k1.hdfs.fileSuffix=.log
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=100
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
This conf distributes the collected data dynamically, i.e. directories are created on the cluster by year/month/day. One thing to note: useLocalTimeStamp=true must be set, otherwise Flume has no timestamp to resolve the time escapes against.
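For example, an event collected on 2017-11-08 would land in a path like /flumelog/20171108/FlumeData.1510099200000.log (illustrative; FlumeData is the HDFS sink's default filePrefix, and the numeric part is the sink's timestamp-based counter).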
a1.sources = r1
a1.sinks = s1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.s1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
Collects netcat data and shows it as log output (the example from the official docs).
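With the agent running, the source can be exercised from another terminal, as in the official docs' walkthrough (assuming telnet is installed):
telnet localhost 44444
Hello world!
The logger sink then prints the received event to the console.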
a1.sources=r1
a1.sinks=s1
a1.channels=c1
a1.sources.r1.type=avro
a1.sources.r1.bind=master
a1.sources.r1.port=8888
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.s1.type=logger
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=host
The host interceptor, summarized before: it adds the host IP to the event header.
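For reference, the host interceptor also has optional properties (these are the documented defaults; useIP=false would store the hostname instead of the IP, and hostHeader names the header key):
a1.sources.r1.interceptors.i1.useIP=true
a1.sources.r1.interceptors.i1.hostHeader=host
a1.sources.r1.interceptors.i1.preserveExisting=false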
a1.sources= r1
a1.sinks=s1 s2
a1.channels=c1 c2
a1.sources.r1.type=avro
a1.sources.r1.bind=master
a1.sources.r1.port=8888
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.s1.type=logger
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flumelog/%Y%m%d
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.filePrefix=test_log
a1.sinks.s2.hdfs.rollInterval=0
a1.sinks.s2.hdfs.rollSize=0
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
Multiple sinks and channels: one outputs to the logger, the other saves the data to HDFS.
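Since no channel selector is configured, the default replicating selector applies, so every event is copied to both c1 and c2. This can be made explicit with:
a1.sources.r1.selector.type=replicating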