Flume 高可用 負載均衡問題
首先寫一個普通的Java類
這個類主要就是向hostName iP的port節點發送信息
package flumeTest;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
//鏈接avro的flume source 發送event 到flume agent
public class FlumeClient {
private RpcClient flumeClient;
private String hostName;
private int port;
public FlumeClient(String hostname,int port){
this.hostName =hostname;
this.port=port;
this.flumeClient=RpcClientFactory.getDefaultInstance(hostname, port);
}
//把字符串消息發送event到avro source
public void sendEvent(String msg){
Map<String, String> headers =new HashMap<String, String>();
headers.put("timestamp", String.valueOf(new Date().getTime()));
//構建event
Event event =EventBuilder.withBody(msg, Charset.forName("UTF-8"), headers);
try{
flumeClient.append(event);
}catch (Exception e) {
e.printStackTrace();
flumeClient.close();
flumeClient=null;
flumeClient=RpcClientFactory.getDefaultInstance(hostName, port);
}
}
public void close(){
flumeClient.close();
}
//這個類的作用就是向hostName的port端口輸入Flume定義的RpcClient avro格式的內容
public static void main(String[] args) {
FlumeClient flumeClient =new FlumeClient("master", 5555);
String bMsg="fromjava-msg";
for(int i=0;i<100;i++){
flumeClient.sendEvent(bMsg+i);
}
flumeClient.close();
}
}
conf文件
不在展示 很簡單 source為avro sink為logger channel為memory
高可用的方式
package flumeTest;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
//這個類主要是保證了Flume的高可用 通過properties文件定義Client類型是default_failover
//
public class FailoverClient {
private Properties properties;
private RpcClient failoverClient;
//初始化rpcclient
public FailoverClient() throws IOException{
this.properties=new Properties();
InputStream inputStream = new FileInputStream("E:\\new workspace\\flumeTest\\src\\main\\resources\\failover_client.conf");
properties.load(inputStream);
this.failoverClient=RpcClientFactory.getInstance(properties);
}
//發送消息
public void sendEvent(String msg){
Event event =EventBuilder.withBody(msg,Charset.forName("UTF-8"));
try {
failoverClient.append(event);
} catch (Exception e) {
e.printStackTrace();
}
}
public void close(){
failoverClient.close();
}
public static void main(String[] args) throws IOException, InterruptedException{
FailoverClient failoverClient =new FailoverClient();
String msg ="message_";
for(int i=1;i<100;i++){
failoverClient.sendEvent(msg+i);
Thread.sleep(1000);
}
failoverClient.close();
}
}
properties文件
client.type=default_failover
hosts=h1 h2
hosts.h1=master:8888
hosts.h2=slave1:8888
max-attempts=3
負載均衡的Java類和高可用的類幾乎沒有什麼改變 變得隻是properties文件
package flumeTest;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
public class LoadBalanceClient {
private RpcClient lbClient;
private Properties properties;
public LoadBalanceClient() throws FileNotFoundException, IOException{
this.properties =new Properties();
properties.load(new FileInputStream("src/main/resources/load_balance.conf"));
this.lbClient =RpcClientFactory.getInstance(properties);
}
public void sentEvent(String msg){
Event event =EventBuilder.withBody(msg, Charset.forName("UTF-8"));
try {
lbClient.append(event);
} catch (Exception e) {
e.printStackTrace();
}
}
public void close(){
lbClient.close();
}
public static void main(String[] args) throws FileNotFoundException, IOException {
LoadBalanceClient loadBalanceClient =new LoadBalanceClient();
String msg ="msg_";
for (int i = 0; i < 100; i++) {
loadBalanceClient.sentEvent(msg+i);
}
loadBalanceClient.close();
}
}
properties
client.type=default_loadbalance
hosts=h1 h2
hosts.h1=master:8888
hosts.h2=slave1:8888
host-selector-random
最後更新:2017-11-06 23:33:43