log4j2+flume+hadoop
A Big Data Log Collection Architecture
Our applications produce a large amount of logs, and we often need to extract useful information from them. The architecture below does exactly that; my stack of choice is log4j2 + flume + hadoop. The overall architecture is shown in the figure:
Question 1: Why log4j2?
1. Classic log4j is expensive performance-wise. Apache claims that for concurrent (multi-threaded) logging, log4j2 delivers up to 18x the throughput of log4j.
2. log4j2 ships a dedicated Flume Appender, which makes collection with flume straightforward.
3. log4j2 provides a JsonLayout that writes each event as JSON, which makes the parsing work in the next stage much easier.
Question 2: Why flume?
1. Flume is written in Java, which is my primary language, and it exposes flexible extension points, so writing custom components is convenient.
2. Flume can already do some data cleaning while collecting, filtering out events you do not want.
3. Flume itself is lightweight and runs stably at up to roughly 1,000,000 events per day; beyond that, consider integrating it with Kafka.
Question 3: Why hadoop?
The requirement is to collect and store user data, analyze it, and use the results to improve the user experience. Hadoop's strengths are modest hardware requirements, strong fault tolerance, and good support for offline analysis, which matches these needs well.
IP Allocation
Host / IP | flume | hadoop |
---|---|---|
m1 192.168.1.111 | agent1 | NameNode |
s2 192.168.1.112 | collector2 | DataNode1 |
s3 192.168.1.113 | collector3 | DataNode2 |
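For the host names above to resolve, each machine needs matching entries in /etc/hosts; for example (m1, s2 and s3 are just the names used in this post):
192.168.1.111   m1
192.168.1.112   s2
192.168.1.113   s3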
Part 1: log4j2
1. Create a new Maven project; the directory structure is shown in the figure.
2. Configure the pom file
<properties>
    <log4j.version>2.8.2</log4j.version>
    <flume-log4jappender.version>1.7.0</flume-log4jappender.version>
    <jackson.version>2.7.0</jackson.version>
</properties>
<dependencies>
    <!-- log4j2 core -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- slf4j binding for log4j2 -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- flume appender for log4j2 -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-flume-ng</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>${flume-log4jappender.version}</version>
    </dependency>
    <!-- jackson, required by log4j2's JsonLayout -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.version}</version>
    </dependency>
</dependencies>
3. Configure log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <!-- Custom level used to mark events that should be shipped to flume -->
    <CustomLevels>
        <CustomLevel name="FLUME" intLevel="88" />
    </CustomLevels>
    <!-- Appenders define where log events are written -->
    <Appenders>
        <!-- Console output -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d %-7level %logger{36} - %msg%n"/>
        </Console>
        <!-- Log file output -->
        <File name="MyFile" fileName="logs/app.log">
            <PatternLayout pattern="%d %-7level %logger{36} - %msg%n"/>
        </File>
        <!-- Output to flume (the avro source of agent1) -->
        <Flume name="eventLogger" compress="false">
            <Agent host="192.168.1.111" port="41414"/>
            <!-- Serialize events as JSON -->
            <JSONLayout/>
        </Flume>
    </Appenders>
    <!-- Route different log levels to different destinations -->
    <Loggers>
        <!-- Root is the default logger; its level must be at least "info",
             otherwise the info events below never reach the Console appender -->
        <Root level="info">
            <!-- Only events at the FLUME level (intLevel 88, i.e. more severe than FATAL) go to the flume appender -->
            <AppenderRef ref="eventLogger" level="FLUME" />
            <!-- INFO and above go to the console -->
            <AppenderRef ref="Console" level="info" />
            <!-- ERROR and above go to the log file -->
            <AppenderRef ref="MyFile" level="error" />
        </Root>
    </Loggers>
</Configuration>
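With the JSONLayout above, every event reaches flume (and eventually HDFS) as a JSON object. The exact field set depends on the Log4j 2 version, but for 2.8.x an event looks roughly like this (values here are illustrative):
{
  "timeMillis" : 1501234567890,
  "thread" : "main",
  "level" : "FLUME",
  "loggerName" : "LayoutTest",
  "message" : "another diagnostic message",
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId" : 1,
  "threadPriority" : 5
}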
4. LayoutTest.java
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Date;

/**
 * Created by hadoop on 2017/7/28.
 */
public class LayoutTest {

    static Logger logger = LogManager.getLogger(LayoutTest.class);

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // Log the current system timestamp every 100 ms
            Thread.sleep(100);
            logger.info(String.valueOf(new Date().getTime()));
            // Only this event is shipped to flume, because of the custom FLUME level
            logger.log(Level.getLevel("FLUME"), "another diagnostic message");
            try {
                throw new Exception("exception msg");
            } catch (Exception e) {
                logger.error("error:" + e.getMessage());
            }
        }
    }
}
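In a real system the FLUME-level message would usually carry a structured business event rather than a fixed string. Below is a minimal sketch of how that might look, using the Jackson dependency already declared in the pom; the class name, helper method and event fields are made up for illustration:
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class BusinessEventLogger {

    private static final Logger logger = LogManager.getLogger(BusinessEventLogger.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    // Serialize the event to JSON and log it at the custom FLUME level,
    // so that only these events go through the flume appender.
    public static void send(Map<String, Object> event) {
        try {
            logger.log(Level.getLevel("FLUME"), mapper.writeValueAsString(event));
        } catch (Exception e) {
            logger.error("failed to serialize business event", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("userId", "u-1001");   // hypothetical field
        event.put("action", "login");    // hypothetical field
        event.put("ts", System.currentTimeMillis());
        send(event);
    }
}
Because the event body is itself JSON, the downstream parsing job only has to read the message field of each JSONLayout record.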
Part 2: Flume
I use Flume 1.7; the download page is https://flume.apache.org/download.html.
Install it under /usr on all three machines and rename the directory to flume. All of the configuration files below are created under /usr/flume/conf.
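A minimal install sketch, assuming the Apache archive still serves the 1.7.0 tarball at the path below and that you have write access to /usr:
cd /usr
wget https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
tar -xzf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume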
1. agent1: configuration file avro-mem-hdfs-collector.properties
# Name the components on this agent
agent1.sources = r1
agent1.sinks = k1 k2 k3
agent1.channels = c1 c2 c3
# Bind sources, channels and sinks together
agent1.sources.r1.channels = c1 c2 c3
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
agent1.sinks.k3.channel = c3
agent1.sources.r1.selector.type = replicating
#source
agent1.sources.r1.type = avro
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 41414
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors =i1
agent1.sources.r1.interceptors.i1.type = timestamp
#channel c1
agent1.channels.c1.type = memory
agent1.channels.c1.keep-alive = 30
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 1000
#sink k1
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/all/%Y-%m-%d/%H
agent1.sinks.k1.hdfs.filePrefix = logs
agent1.sinks.k1.hdfs.inUsePrefix = .
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.hdfs.rollInterval = 0
agent1.sinks.k1.hdfs.rollSize = 16777216
agent1.sinks.k1.hdfs.rollCount = 0
agent1.sinks.k1.hdfs.batchSize = 1000
agent1.sinks.k1.hdfs.writeFormat = text
agent1.sinks.k1.callTimeout =10000
#channel c2
agent1.channels.c2.type=memory
agent1.channels.c2.keep-alive = 30
agent1.channels.c2.capacity = 10000
agent1.channels.c2.transactionCapacity = 1000
#sink for k2
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = c2
agent1.sinks.k2.hostname = 192.168.1.112
agent1.sinks.k2.port = 41414
#channel c3
agent1.channels.c3.type=memory
agent1.channels.c3.keep-alive = 30
agent1.channels.c3.capacity = 10000
agent1.channels.c3.transactionCapacity = 1000
#sink for k3
agent1.sinks.k3.type = avro
agent1.sinks.k3.channel = c3
agent1.sinks.k3.hostname = 192.168.1.113
agent1.sinks.k3.port = 41414
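Note that r1 uses a replicating selector, so every event is copied into all three channels and both collectors receive the full stream. If you would rather route different business logs to different collectors, Flume's multiplexing selector can switch on an event header; a rough sketch (the header name businessType and its values are assumptions, and an interceptor or the client has to set that header):
agent1.sources.r1.selector.type = multiplexing
agent1.sources.r1.selector.header = businessType
agent1.sources.r1.selector.mapping.business1 = c2
agent1.sources.r1.selector.mapping.business2 = c3
agent1.sources.r1.selector.default = c1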
2. collector2 (on s2): configuration file avro-mem-hdfs.properties
# Name the components on this agent
collector2.sources = r1
collector2.sinks = k1
collector2.channels = c1
#source
collector2.sources.r1.channels = c1
collector2.sources.r1.type = avro
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.port = 41414
collector2.sources.r1.fileHeader = false
collector2.sources.r1.interceptors =i1
collector2.sources.r1.interceptors.i1.type = timestamp
# channel
collector2.channels.c1.type = memory
collector2.channels.c1.keep-alive = 30
collector2.channels.c1.capacity = 30000
collector2.channels.c1.transactionCapacity = 3000
# sink
collector2.sinks.k1.channel = c1
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/business1/%Y-%m-%d/%H
collector2.sinks.k1.hdfs.filePrefix = logs
collector2.sinks.k1.hdfs.inUsePrefix = .
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.rollInterval = 0
collector2.sinks.k1.hdfs.rollSize = 16777216
collector2.sinks.k1.hdfs.rollCount = 0
collector2.sinks.k1.hdfs.batchSize = 1000
collector2.sinks.k1.hdfs.writeFormat = text
collector2.sinks.k1.callTimeout =10000
3. collector3 (on s3): configuration file avro-mem-hdfs.properties
# Name the components on this agent
collector3.sources = r1
collector3.sinks = k1
collector3.channels = c1
#source
collector3.sources.r1.channels = c1
collector3.sources.r1.type = avro
collector3.sources.r1.bind = 0.0.0.0
collector3.sources.r1.port = 41414
collector3.sources.r1.fileHeader = false
collector3.sources.r1.interceptors =i1
collector3.sources.r1.interceptors.i1.type = timestamp
# channel
collector3.channels.c1.type = memory
collector3.channels.c1.keep-alive = 30
collector3.channels.c1.capacity = 30000
collector3.channels.c1.transactionCapacity = 3000
# sink
collector3.sinks.k1.channel = c1
collector3.sinks.k1.type = hdfs
collector3.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/business2/%Y-%m-%d/%H
collector3.sinks.k1.hdfs.filePrefix = logs
collector3.sinks.k1.hdfs.inUsePrefix = .
collector3.sinks.k1.hdfs.fileType = DataStream
collector3.sinks.k1.hdfs.rollInterval = 0
collector3.sinks.k1.hdfs.rollSize = 16777216
collector3.sinks.k1.hdfs.rollCount = 0
collector3.sinks.k1.hdfs.batchSize = 1000
collector3.sinks.k1.hdfs.writeFormat = text
collector3.sinks.k1.callTimeout =10000
4. From the /usr/flume directory, start the agent and the collectors
1. Start agent1 (on m1):
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs-collector.properties -Dflume.root.logger=INFO,console -n agent1
2. Start collector2 (on s2):
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs.properties -Dflume.root.logger=INFO,console -n collector2
3. Start collector3 (on s3):
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs.properties -Dflume.root.logger=INFO,console -n collector3
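Once everything is running and the LayoutTest program is producing events, you can check that files are landing in HDFS; the paths follow the sink configuration above (the date/hour directories will of course differ):
# run on the NameNode (m1)
hdfs dfs -ls -R /all
hdfs dfs -ls -R /business1
hdfs dfs -ls -R /business2
# peek at one of the rolled files
hdfs dfs -cat /all/2017-08-13/22/logs.*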