Spark + Log Service__Spark_开发人员指南_E-MapReduce-阿里云
Spark + Log Service
Spark 接入 Log Service
下面这个例子演示了 Spark Streaming 如何消费 Log Service 中的日志数据,统计日志条数。
if (args.length < 6) {
System.err.println(
"""Usage: TestLoghub <sls project> <sls logstore> <loghub group name>
| <sls endpoint> <receiver number> <batch interval seconds>
""".stripMargin)
System.exit(1)
}
val logserviceProject = args(0) // Log Service 中 project 名
val logStoreName = args(1) // Log Service 中 logstore 名
val loghubGroupName = args(2) // loghubGroupName 相同的作业将共同消费 logstore 的数据
val loghubEndpoint = args(3) // 阿里云日志服务数据类 API Endpoint
val accessKeyId = "<accessKeyId>" // 访问日志服务的 AccessKeyId
val accessKeySecret = "<accessKeySecret>" // 访问日志服务的 AccessKeySecret
val numReceivers = args(4).toInt // 启动多少个 Receiver 来读取 logstore 中的数据
val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming 中每次处理批次时间间隔
val conf = new SparkConf().setAppName("Test Loghub Streaming")
val ssc = new StreamingContext(conf, batchInterval)
val loghubStream = LoghubUtils.createStream(
ssc,
loghubProject,
logStream,
loghubGroupName,
endpoint,
numReceivers,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK)
loghubStream.foreachRDD(rdd => println(rdd.count()))
ssc.start()
ssc.awaitTermination()
说明
- E-MapReduce SDK支持LogService的三种消费模式,即“BEGIN_CURSOR”,“END_CURSOR”和“SPECIAL_TIMER_CURSOR”,默认是“END_CURSOR”。
- BEGIN_CURSOR:从日志头开始消费,如果有checkpoint记录,则从checkpoint处开始消费。
- END_CURSOR:从日志尾开始消费,如果有checkpoint记录,则从checkpoint处开始消费。
- SPECIAL_TIMER_CURSOR:从指定时间点开始消费,如果有checkpoint记录,则从checkpoint处开始消费。单位为秒。
- 以上三种消费模式都收到checkpoint记录的影响,如果存在checkpoint记录,则从checkpoint处开始消费,不管指定的是什么消费模式。E-MapReduce SDK基于“SPECIAL_TIMER_CURSOR”模式支持用户强制在指定时间点开始消费:在LoghubUtils#createStream接口中,以下参数需要组合使用:
- cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR
- forceSpecial:true
- E-MapReduce 的机器(除了 Master 节点)无法连接公网。配置 Log Service endpoint 时,请注意使用 Log Service 提供的内网 endpoint,否则无法请求到 Log Service。
- 了解更多关于 Log Service,请查看相关文档。
附录
完整示例代码请看:
最后更新:2016-12-19 19:29:39
上一篇:
Spark + ONS__Spark_开发人员指南_E-MapReduce-阿里云
下一篇:
Spark + MNS__Spark_开发人员指南_E-MapReduce-阿里云
DnsServerType__数据类型_API文档_云解析-阿里云
全局唯一数字序列使用__开发手册_分布式关系型数据库 DRDS-阿里云
步骤四:配置推荐引擎__快速入门_推荐引擎-阿里云
总体说明__服务器端API_阿里云物联网套件-阿里云
登录 Windows 实例__远程连接_实例_用户指南_云服务器 ECS-阿里云
Lite脚本编写SDK__手工脚本编写_Lite用户使用手册_性能测试-阿里云
金融云推荐架构__使用金融云产品_金融云-阿里云
概述__实例_产品简介_云服务器 ECS-阿里云
新网接入__CNAME绑定教程_用户指南_CDN-阿里云
OTSWriter__Writer插件_使用手册_数据集成-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云