836
阿里云
Spark + ODPS__Spark_开发人员指南_E-MapReduce-阿里云
Spark + MaxCompute
Spark 接入 MaxCompute
本章节将介绍如何使用E-MapReduce SDK在Spark中完成一次MaxCompute数据的读写操作。
初始化一个OdpsOps对象。在 Spark 中,MaxCompute的数据操作通过OdpsOps类完成,请参照如下步骤创建一个OdpsOps对象:
import com.aliyun.odps.TableSchemaimport com.aliyun.odps.data.Recordimport org.apache.spark.aliyun.odps.OdpsOpsimport org.apache.spark.{SparkContext, SparkConf}object Sample {def main(args: Array[String]): Unit = {// == Step-1 ==val accessKeyId = "<accessKeyId>"val accessKeySecret = "<accessKeySecret>"// 以内网地址为例val urls = Seq("https://odps-ext.aliyun-inc.com/api", "https://dt-ext.odps.aliyun-inc.com")val conf = new SparkConf().setAppName("Test Odps")val sc = new SparkContext(conf)val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))// 下面是一些调用代码// == Step-2 ==...// == Step-3 ==...}// == Step-2 ==// 方法定义1// == Step-3 ==// 方法定义2}
从MaxCompute中加载表数据到Spark中。通过OdpsOps对象的readTable方法,可以将MaxCompute中的表加载到Spark中,即生成一个RDD,如下所示:
// == Step-2 ==val project = <odps-project>val table = <odps-table>val numPartitions = 2val inputData = odpsOps.readTable(project, table, read, numPartitions)inputData.top(10).foreach(println)// == Step-3 ==...
在上面的代码中,您还需要定义一个read函数,用来解析和预处理MaxCompute表数据,如下所示:
def read(record: Record, schema: TableSchema): String = {record.getString(0)}
这个函数的含义是将MaxCompute表的第一列加载到Spark运行环境中。
将 Spark 中的结果数据保存到MaxCompute表中。通过OdpsOps对象的saveToTable方法,可以将Spark RDD持久化到MaxCompute中。
val resultData = inputData.map(e => s"$e has been processed.")odpsOps.saveToTable(project, table, dataRDD, write)
在上面的代码中,您还需要定义一个write函数,用作写MaxCompute表前数据预处理,如下所示:
def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {val r = emptyReordr.set(0, s)}
这个函数的含义是将RDD的每一行数据写到对应MaxCompute表的第一列中。
分区表参数写法说明
SDK支持对MaxCompute分区表的读写,这里分区名的写法标准是:分区列名=分区名,多个分区时以逗号分隔,例如有分区列pt和ps:
- 读分区pt为1的表数据:pt=‘1’
- 读分区pt为1和分区ps为2的表数据:pt=‘1’,ps=‘2’
附录
示例代码请看:
最后更新:2016-12-19 19:19:30
上一篇:
简单操作 OSS 文件__Spark_开发人员指南_E-MapReduce-阿里云
下一篇:
Spark + ONS__Spark_开发人员指南_E-MapReduce-阿里云
Redis客户端连接__连接实例_快速入门_云数据库 Redis 版-阿里云
按量计费预告(2017-1月生效)__计费说明_日志服务-阿里云
ECS 数据源 (3/3):分组管理__准备数据源_用户指南_业务实时监控服务 ARMS-阿里云
维表管理__管理系统配置_用户指南_业务实时监控服务 ARMS-阿里云
提交作业__命令行工具_批量计算-阿里云
通过 Docker 工具连接集群__快速入门_容器服务-阿里云
MQTT 常见问题__MQTT 接入(物联)_消息队列 MQ-阿里云
添加监控服务器不成功?__产品使用常见问题_产品使用问题_性能测试-阿里云
万网域名使用万网企业邮箱,设置解析方法__邮箱解析_产品使用问题_云解析-阿里云
添加监控服务器__测试环境_使用手册_性能测试-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云