836
技術社區[雲棲]
Spark + ODPS__Spark_開發人員指南_E-MapReduce-阿裏雲
Spark + MaxCompute
Spark 接入 MaxCompute
本章節將介紹如何使用E-MapReduce SDK在Spark中完成一次MaxCompute數據的讀寫操作。
初始化一個OdpsOps對象。在 Spark 中,MaxCompute的數據操作通過OdpsOps類完成,請參照如下步驟創建一個OdpsOps對象:
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkContext, SparkConf}
object Sample {
def main(args: Array[String]): Unit = {
// == Step-1 ==
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
// 以內網地址為例
val urls = Seq("https://odps-ext.aliyun-inc.com/api", "https://dt-ext.odps.aliyun-inc.com")
val conf = new SparkConf().setAppName("Test Odps")
val sc = new SparkContext(conf)
val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
// 下麵是一些調用代碼
// == Step-2 ==
...
// == Step-3 ==
...
}
// == Step-2 ==
// 方法定義1
// == Step-3 ==
// 方法定義2
}
從MaxCompute中加載表數據到Spark中。通過OdpsOps對象的readTable方法,可以將MaxCompute中的表加載到Spark中,即生成一個RDD,如下所示:
// == Step-2 ==
val project = <odps-project>
val table = <odps-table>
val numPartitions = 2
val inputData = odpsOps.readTable(project, table, read, numPartitions)
inputData.top(10).foreach(println)
// == Step-3 ==
...
在上麵的代碼中,您還需要定義一個read函數,用來解析和預處理MaxCompute表數據,如下所示:
def read(record: Record, schema: TableSchema): String = {
record.getString(0)
}
這個函數的含義是將MaxCompute表的第一列加載到Spark運行環境中。
將 Spark 中的結果數據保存到MaxCompute表中。通過OdpsOps對象的saveToTable方法,可以將Spark RDD持久化到MaxCompute中。
val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, dataRDD, write)
在上麵的代碼中,您還需要定義一個write函數,用作寫MaxCompute表前數據預處理,如下所示:
def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
val r = emptyReord
r.set(0, s)
}
這個函數的含義是將RDD的每一行數據寫到對應MaxCompute表的第一列中。
分區表參數寫法說明
SDK支持對MaxCompute分區表的讀寫,這裏分區名的寫法標準是:分區列名=分區名,多個分區時以逗號分隔,例如有分區列pt和ps:
- 讀分區pt為1的表數據:pt=‘1’
- 讀分區pt為1和分區ps為2的表數據:pt=‘1’,ps=‘2’
附錄
示例代碼請看:
最後更新:2016-12-19 19:19:30
上一篇:
簡單操作 OSS 文件__Spark_開發人員指南_E-MapReduce-阿裏雲
下一篇:
Spark + ONS__Spark_開發人員指南_E-MapReduce-阿裏雲
Redis客戶端連接__連接實例_快速入門_雲數據庫 Redis 版-阿裏雲
按量計費預告(2017-1月生效)__計費說明_日誌服務-阿裏雲
ECS 數據源 (3/3):分組管理__準備數據源_用戶指南_業務實時監控服務 ARMS-阿裏雲
維表管理__管理係統配置_用戶指南_業務實時監控服務 ARMS-阿裏雲
提交作業__命令行工具_批量計算-阿裏雲
通過 Docker 工具連接集群__快速入門_容器服務-阿裏雲
MQTT 常見問題__MQTT 接入(物聯)_消息隊列 MQ-阿裏雲
添加監控服務器不成功?__產品使用常見問題_產品使用問題_性能測試-阿裏雲
萬網域名使用萬網企業郵箱,設置解析方法__郵箱解析_產品使用問題_雲解析-阿裏雲
添加監控服務器__測試環境_使用手冊_性能測試-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲