阅读841 返回首页    go 阿里云 go 技术社区[云栖]


Flume MaxCompute Sink插件__数据入云_数据集成-阿里云

Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统。

ODPS Sink是基于ODPS DataHub Service开发的Flume插件,可以将Flume的Event数据导入到ODPS中。插件兼容Flume的原有功能特性,支持ODPS表自定义分区、且可以自动创建分区。

二、环境要求

1、JDK(1.6以上,推荐1.7)

2、Flume-NG 1.x

三、插件部署

1、下载ODPS Sink插件并解压:aliyun-odps-flume-plugin

2、部署ODPS Sink插件:将文件夹odps_sink移动到Apache Flume安装目录下:

  1. $ mkdir {YOUR_APACHE_FLUME_DIR}/plugins.d
  2. $mv odps_sink/ { YOUR_APACHE_FLUME_DIR }/plugins.d/

移动后,核验ODPS Sink插件是否已经在相应目录:

  1. $ ls { YOUR_APACHE_FLUME_DIR}/plugins.d
  2. odps_sink

部署完成后,只需要在Flume的配置文件中将sink的type字段配置为:

  1. com.aliyun.odps.flume.sink.OdpsSink

即可使用

四、配置示例

例:将日志文件中的结构化数据进行解析,并上传到ODPS表中

需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

  1. #test_basic.log
  2. some,log,line1
  3. some,log,line2
  4. ...

第一步、在ODPS 的 project创建ODPS Datahub表

建表语句如下所示:

  1. CREATE TABLE hub_table_basic (col1 STRING, col2 STRING)
  2. PARTITIONED BY (pt STRING)
  3. INTO 1 SHARDS
  4. HUBLIFECYCLE 1;

第二步、创建Flume作业配置文件:

在Flume安装目录的conf/文件夹下创建名为odps_basic.conf的文件,并输入内容如下:

  1. # odps_basic.conf
  2. # A single-node Flume configuration for ODPS
  3. # Name the components on this agent
  4. a1.sources = r1
  5. a1.sinks = k1
  6. a1.channels = c1
  7. # Describe/configure the source
  8. a1.sources.r1.type = exec
  9. a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log
  10. # Describe the sink
  11. a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
  12. a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
  13. a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
  14. a1.sinks.k1.odps.endPoint = https://service.odps.aliyun.com/api
  15. a1.sinks.k1.odps.datahub.endPoint = https://dh.odps.aliyun.com
  16. a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
  17. a1.sinks.k1.odps.table = hub_table_basic
  18. a1.sinks.k1.odps.partition = 20150814
  19. a1.sinks.k1.batchSize = 100
  20. a1.sinks.k1.serializer = DELIMITED
  21. a1.sinks.k1.serializer.delimiter = ,
  22. a1.sinks.k1.serializer.fieldnames = col1,,col2
  23. a1.sinks.k1.serializer.charset = UTF-8
  24. a1.sinks.k1.shard.number = 1
  25. a1.sinks.k1.shard.maxTimeOut = 60
  26. a1.sinks.k1.autoCreatePartition = true
  27. # Use a channel which buffers events in memory
  28. a1.channels.c1.type = memory
  29. a1.channels.c1.capacity = 1000
  30. a1.channels.c1.transactionCapacity = 1000
  31. # Bind the source and sink to the channel
  32. a1.sources.r1.channels = c1
  33. a1.sinks.k1.channel = c1

第三步:启动Flume

启动Flume并指定agent的名称和配置文件路径,-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。

  1. $ cd { YOUR_APACHE_FLUME_DIR}
  2. $ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console

写入成功,显示日志如下:

  1. ...
  2. Write success. Event count: 2
  3. ...

在ODPS Datahub表中即可查到数据;

五、了解更多

Apache Flume User Guide

ODPS Sink插件地址:aliyun-odps-flume-plugin

ODPS Sink配置参数说明

更多ODPS Sink配置示例

最后更新:2016-11-24 11:23:47

  上一篇:go DataX__数据入云_数据集成-阿里云
  下一篇:go Fluentd MaxCompute插件__数据入云_数据集成-阿里云