閱讀836 返回首頁    go 阿裏雲


Spark + ODPS__Spark_開發人員指南_E-MapReduce-阿裏雲

Spark + MaxCompute

Spark 接入 MaxCompute

本章節將介紹如何使用E-MapReduce SDK在Spark中完成一次MaxCompute數據的讀寫操作。

  1. 初始化一個OdpsOps對象。在 Spark 中,MaxCompute的數據操作通過OdpsOps類完成,請參照如下步驟創建一個OdpsOps對象:

    1. import com.aliyun.odps.TableSchema
    2. import com.aliyun.odps.data.Record
    3. import org.apache.spark.aliyun.odps.OdpsOps
    4. import org.apache.spark.{SparkContext, SparkConf}
    5. object Sample {
    6. def main(args: Array[String]): Unit = {
    7. // == Step-1 ==
    8. val accessKeyId = "<accessKeyId>"
    9. val accessKeySecret = "<accessKeySecret>"
    10. // 以內網地址為例
    11. val urls = Seq("https://odps-ext.aliyun-inc.com/api", "https://dt-ext.odps.aliyun-inc.com")
    12. val conf = new SparkConf().setAppName("Test Odps")
    13. val sc = new SparkContext(conf)
    14. val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
    15. // 下麵是一些調用代碼
    16. // == Step-2 ==
    17. ...
    18. // == Step-3 ==
    19. ...
    20. }
    21. // == Step-2 ==
    22. // 方法定義1
    23. // == Step-3 ==
    24. // 方法定義2
  2. 從MaxCompute中加載表數據到Spark中。通過OdpsOps對象的readTable方法,可以將MaxCompute中的表加載到Spark中,即生成一個RDD,如下所示:

    1. // == Step-2 ==
    2. val project = <odps-project>
    3. val table = <odps-table>
    4. val numPartitions = 2
    5. val inputData = odpsOps.readTable(project, table, read, numPartitions)
    6. inputData.top(10).foreach(println)
    7. // == Step-3 ==
    8. ...

    在上麵的代碼中,您還需要定義一個read函數,用來解析和預處理MaxCompute表數據,如下所示:

    1. def read(record: Record, schema: TableSchema): String = {
    2. record.getString(0)
    3. }

    這個函數的含義是將MaxCompute表的第一列加載到Spark運行環境中。

  3. 將 Spark 中的結果數據保存到MaxCompute表中。通過OdpsOps對象的saveToTable方法,可以將Spark RDD持久化到MaxCompute中。

    1. val resultData = inputData.map(e => s"$e has been processed.")
    2. odpsOps.saveToTable(project, table, dataRDD, write)

    在上麵的代碼中,您還需要定義一個write函數,用作寫MaxCompute表前數據預處理,如下所示:

    1. def write(s: String, emptyReord: Record, schema: TableSchema): Unit = {
    2. val r = emptyReord
    3. r.set(0, s)
    4. }

    這個函數的含義是將RDD的每一行數據寫到對應MaxCompute表的第一列中。

  4. 分區表參數寫法說明

SDK支持對MaxCompute分區表的讀寫,這裏分區名的寫法標準是:分區列名=分區名,多個分區時以逗號分隔,例如有分區列pt和ps:

  • 讀分區pt為1的表數據:pt=‘1’
  • 讀分區pt為1和分區ps為2的表數據:pt=‘1’,ps=‘2’

附錄

示例代碼請看:

最後更新:2016-12-19 19:19:30

  上一篇:go 簡單操作 OSS 文件__Spark_開發人員指南_E-MapReduce-阿裏雲
  下一篇:go Spark + ONS__Spark_開發人員指南_E-MapReduce-阿裏雲