閱讀598 返回首頁    go iPhone_iPad_Mac_apple


《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南(一)

Spark SQL, DataFrames 以及 Datasets 編程指南

概要

Spark SQL是Spark中處理結構化數據的模塊。與基礎的Spark RDD API不同,Spark SQL的接口提供了更多關於數據的結構信息和計算任務的運行時信息。在Spark內部,Spark SQL會能夠用於做優化的信息比RDD API更多一些。Spark SQL如今有了三種不同的API:SQL語句、DataFrame API和最新的Dataset API。不過真正運行計算的時候,無論你使用哪種API或語言,Spark SQL使用的執行引擎都是同一個。這種底層的統一,使開發者可以在不同的API之間來回切換,你可以選擇一種最自然的方式,來表達你的需求。

 

本文中所有的示例都使用Spark發布版本中自帶的示例數據,並且可以在spark-shell、pyspark shell以及sparkR shell中運行。

SQL

Spark SQL的一種用法是直接執行SQL查詢語句,你可使用最基本的SQL語法,也可以選擇HiveQL語法。Spark SQL可以從已有的Hive中讀取數據。更詳細的請參考Hive Tables 這一節。如果用其他編程語言運行SQL,Spark SQL將以DataFrame返回結果。你還可以通過命令行command-line 或者 JDBC/ODBC 使用Spark SQL。

DataFrames

DataFrame是一種分布式數據集合,每一條數據都由幾個命名字段組成。概念上來說,她和關係型數據庫的表 或者 R和Python中的data frame等價,隻不過在底層,DataFrame采用了更多優化。DataFrame可以從很多數據源(sources)加載數據並構造得到,如:結構化數據文件,Hive中的表,外部數據庫,或者已有的RDD。

DataFrame API支持ScalaJavaPython, and R

Datasets

Dataset是Spark-1.6新增的一種API,目前還是實驗性的。Dataset想要把RDD的優勢(強類型,可以使用lambda表達式函數)和Spark SQL的優化執行引擎的優勢結合到一起。Dataset可以由JVM對象構建(constructed )得到,而後Dataset上可以使用各種transformation算子(map,flatMap,filter 等)。

Dataset API 對 Scala 和 Java的支持接口是一致的,但目前還不支持Python,不過Python自身就有語言動態特性優勢(例如,你可以使用字段名來訪問數據,row.columnName)。對Python的完整支持在未來的版本會增加進來。

入門

入口:SQLContext

Spark SQL所有的功能入口都是SQLContext 類,及其子類。不過要創建一個SQLContext對象,首先需要有一個SparkContext對象。

val sc: SparkContext // 假設已經有一個 SparkContext 對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用於包含RDD到DataFrame隱式轉換操作
import sqlContext.implicits._

除了SQLContext之外,你也可以創建HiveContext,HiveContext是SQLContext 的超集。

除了SQLContext的功能之外,HiveContext還提供了完整的HiveQL語法,UDF使用,以及對Hive表中數據的訪問。要使用HiveContext,你並不需要安裝Hive,而且SQLContext能用的數據源,HiveContext也一樣能用。HiveContext是單獨打包的,從而避免了在默認的Spark發布版本中包含所有的Hive依賴。如果這些依賴對你來說不是問題(不會造成依賴衝突等),建議你在Spark-1.3之前使用HiveContext。而後續的Spark版本,將會逐漸把SQLContext升級到和HiveContext功能差不多的狀態。

spark.sql.dialect選項可以指定不同的SQL變種(或者叫SQL方言)。這個參數可以在SparkContext.setConf裏指定,也可以通過 SQL語句的SET key=value命令指定。對於SQLContext,該配置目前唯一的可選值就是”sql”,這個變種使用一個Spark SQL自帶的簡易SQL解析器。而對於HiveContext,spark.sql.dialect 默認值為”hiveql”,當然你也可以將其值設回”sql”。僅就目前而言,HiveSQL解析器支持更加完整的SQL語法,所以大部分情況下,推薦使用HiveContext。

創建DataFrame

Spark應用可以用SparkContext創建DataFrame,所需的數據來源可以是已有的RDD(existing RDD),或者Hive表,或者其他數據源(data sources.)

以下是一個從JSON文件創建DataFrame的小栗子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 將DataFrame內容打印到stdout
df.show()

DataFrame操作

DataFrame提供了結構化數據的領域專用語言支持,包括ScalaJavaPython and R.

這裏我們給出一個結構化數據處理的基本示例:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 創建一個 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展示 DataFrame 的內容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印數據樹形結構
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 篩選出年齡大於21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 計算各個年齡的人數
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

DataFrame的完整API列表請參考這裏:API Documentation

除了簡單的字段引用和表達式支持之外,DataFrame還提供了豐富的工具函數庫,包括字符串組裝,日期處理,常見的數學函數等。完整列表見這裏:DataFrame Function Reference.

編程方式執行SQL查詢

SQLContext.sql可以執行一個SQL查詢,並返回DataFrame結果。

val sqlContext = ... // 已有一個 SQLContext 對象
val df = sqlContext.sql("SELECT * FROM table")

創建Dataset

Dataset API和RDD類似,不過Dataset不使用Java序列化或者Kryo,而是使用專用的編碼器(Encoder )來序列化對象和跨網絡傳輸通信。如果這個編碼器和標準序列化都能把對象轉字節,那麼編碼器就可以根據代碼動態生成,並使用一種特殊數據格式,這種格式下的對象不需要反序列化回來,就能允許Spark進行操作,如過濾、排序、哈希等。

// 對普通類型數據的Encoder是由 importing sqlContext.implicits._ 自動提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 以下這行不僅定義了case class,同時也自動為其創建了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 隻需提供一個和數據schema對應的class即可轉換為 Dataset。Spark會根據字段名進行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

和RDD互操作

Spark SQL有兩種方法將RDD轉為DataFrame。

1. 使用反射機製,推導包含指定類型對象RDD的schema。這種基於反射機製的方法使代碼更簡潔,而且如果你事先知道數據schema,推薦使用這種方式;

2. 編程方式構建一個schema,然後應用到指定RDD上。這種方式更囉嗦,但如果你事先不知道數據有哪些字段,或者數據schema是運行時讀取進來的,那麼你很可能需要用這種方式。

利用反射推導schema

Spark SQL的Scala接口支持自動將包含case class對象的RDD轉為DataFrame。對應的case class定義了表的schema。case class的參數名通過反射,映射為表的字段名。case class還可以嵌套一些複雜類型,如Seq和Array。RDD隱式轉換成DataFrame後,可以進一步注冊成表。隨後,你就可以對表中數據使用SQL語句查詢了。

// sc 是已有的 SparkContext 對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 為了支持RDD到DataFrame的隱式轉換
import sqlContext.implicits._

// 定義一個case class.
// 注意:Scala 2.10的case class最多支持22個字段,要繞過這一限製,
// 你可以使用自定義class,並實現Product接口。當然,你也可以改用編程方式定義schema
case class Person(name: String, age: Int)

// 創建一個包含Person對象的RDD,並將其注冊成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接執行SQL語句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查詢的返回結果是一個DataFrame,且能夠支持所有常見的RDD算子
// 查詢結果中每行的字段可以按字段索引訪問:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名訪問:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 會一次性返回多列,並以Map[String, T]為返回結果類型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回結果: Map("name" -> "Justin", "age" -> 19)


轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 15:32:30

  上一篇:go  《麵向機器智能的TensorFlow實踐》安裝TensorFlow10
  下一篇:go  《麵向機器智能的TensorFlow實踐》引言