836
小米
Spark + ODPS__Spark_开发人员指南_E-MapReduce-阿里云
Spark + MaxCompute
Spark 接入 MaxCompute
本章节将介绍如何使用E-MapReduce SDK在Spark中完成一次MaxCompute数据的读写操作。
初始化一个OdpsOps对象。在 Spark 中,MaxCompute的数据操作通过OdpsOps类完成,请参照如下步骤创建一个OdpsOps对象:
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkContext, SparkConf}
object Sample {
def main(args: Array[String]): Unit = {
// == Step-1 ==
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
// 以内网地址为例
val urls = Seq("https://odps-ext.aliyun-inc.com/api", "https://dt-ext.odps.aliyun-inc.com")
val conf = new SparkConf().setAppName("Test Odps")
val sc = new SparkContext(conf)
val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
// 下面是一些调用代码
// == Step-2 ==
...
// == Step-3 ==
...
}
// == Step-2 ==
// 方法定义1
// == Step-3 ==
// 方法定义2
}
从MaxCompute中加载表数据到Spark中。通过OdpsOps对象的readTable方法,可以将MaxCompute中的表加载到Spark中,即生成一个RDD,如下所示:
// == Step-2 ==
val project = <odps-project>
val table = <odps-table>
val numPartitions = 2
val inputData = odpsOps.readTable(project, table, read, numPartitions)
inputData.top(10).foreach(println)
// == Step-3 ==
...
在上面的代码中,您还需要定义一个read函数,用来解析和预处理MaxCompute表数据,如下所示:
def read(record: Record, schema: TableSchema): String = {
record.getString(0)
}
这个函数的含义是将MaxCompute表的第一列加载到Spark运行环境中。
将 Spark 中的结果数据保存到MaxCompute表中。通过OdpsOps对象的saveToTable方法,可以将Spark RDD持久化到MaxCompute中。
val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, dataRDD, write)
在上面的代码中,您还需要定义一个write函数,用作写MaxCompute表前数据预处理,如下所示:
def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
val r = emptyReord
r.set(0, s)
}
这个函数的含义是将RDD的每一行数据写到对应MaxCompute表的第一列中。
分区表参数写法说明
SDK支持对MaxCompute分区表的读写,这里分区名的写法标准是:分区列名=分区名,多个分区时以逗号分隔,例如有分区列pt和ps:
- 读分区pt为1的表数据:pt=‘1’
- 读分区pt为1和分区ps为2的表数据:pt=‘1’,ps=‘2’
附录
示例代码请看:
最后更新:2016-12-19 19:19:30
上一篇:
简单操作 OSS 文件__Spark_开发人员指南_E-MapReduce-阿里云
下一篇:
Spark + ONS__Spark_开发人员指南_E-MapReduce-阿里云
Redis客户端连接__连接实例_快速入门_云数据库 Redis 版-阿里云
按量计费预告(2017-1月生效)__计费说明_日志服务-阿里云
ECS 数据源 (3/3):分组管理__准备数据源_用户指南_业务实时监控服务 ARMS-阿里云
维表管理__管理系统配置_用户指南_业务实时监控服务 ARMS-阿里云
提交作业__命令行工具_批量计算-阿里云
通过 Docker 工具连接集群__快速入门_容器服务-阿里云
MQTT 常见问题__MQTT 接入(物联)_消息队列 MQ-阿里云
添加监控服务器不成功?__产品使用常见问题_产品使用问题_性能测试-阿里云
万网域名使用万网企业邮箱,设置解析方法__邮箱解析_产品使用问题_云解析-阿里云
添加监控服务器__测试环境_使用手册_性能测试-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云