閱讀437 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南(二)

編程方式定義Schema

如果不能事先通過case class定義schema(例如,記錄的字段結構是保存在一個字符串,或者其他文本數據集中,需要先解析,又或者字段對不同用戶有所不同),那麼你可能需要按以下三個步驟,以編程方式的創建一個DataFrame:

  1. 從已有的RDD創建一個包含Row對象的RDD
  2. 用StructType創建一個schema,和步驟1中創建的RDD的結構相匹配
  3. 把得到的schema應用於包含Row對象的RDD,調用這個方法來實現這一步:SQLContext.createDataFrame

For example:

例如:

// sc 是已有的SparkContext對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 創建一個RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 數據的schema被編碼與一個字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各個數據類型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基於前麵的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 將RDD[people]的各個記錄轉換為Rows,即:得到一個包含Row對象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 將schema應用到包含Row對象的RDD上,得到一個DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 將DataFrame注冊為table
peopleDataFrame.registerTempTable("people")

// 執行SQL語句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查詢的結果是DataFrame,且能夠支持所有常見的RDD算子
// 並且其字段可以以索引訪問,也可以用字段名訪問
results.map(t => "Name: " + t(0)).collect().foreach(println)

數據源

Spark SQL支持基於DataFrame操作一係列不同的數據源。DataFrame既可以當成一個普通RDD來操作,也可以將其注冊成一個臨時表來查詢。把DataFrame注冊為table之後,你就可以基於這個table執行SQL語句了。本節將描述加載和保存數據的一些通用方法,包含了不同的Spark數據源,然後深入介紹一下內建數據源可用選項。

通用加載/保存函數

在最簡單的情況下,所有操作都會以默認類型數據源來加載數據(默認是Parquet,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手動指定選項

你也可以手動指定數據源,並設置一些額外的選項參數。數據源可由其全名指定(如,org.apache.spark.sql.parquet),而對於內建支持的數據源,可以使用簡寫名(json, parquet, jdbc)。任意類型數據源創建的DataFrame都可以用下麵這種語法轉成其他類型數據格式。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

直接對文件使用SQL

Spark SQL還支持直接對文件使用SQL查詢,不需要用read方法把文件加載進來。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

Save操作有一個可選參數SaveMode,用這個參數可以指定如何處理數據已經存在的情況。很重要的一點是,這些保存模式都沒有加鎖,所以其操作也不是原子性的。另外,如果使用Overwrite模式,實際操作是,先刪除數據,再寫新數據。

僅Scala/Java 所有支持的語言 含義
SaveMode.ErrorIfExists (default) "error" (default) (默認模式)從DataFrame向數據源保存數據時,如果數據已經存在,則拋異常。
SaveMode.Append "append" 如果數據或表已經存在,則將DataFrame的數據追加到已有數據的尾部。
SaveMode.Overwrite "overwrite" 如果數據或表已經存在,則用DataFrame數據覆蓋之。
SaveMode.Ignore "ignore" 如果數據已經存在,那就放棄保存DataFrame數據。這和SQL裏CREATE TABLE IF NOT EXISTS有點類似。

保存到持久化表

在使用HiveContext的時候,DataFrame可以用saveAsTable方法,將數據保存成持久化的表。與registerTempTable不同,saveAsTable會將DataFrame的實際數據內容保存下來,並且在HiveMetastore中創建一個遊標指針。持久化的表會一直保留,即使Spark程序重啟也沒有影響,隻要你連接到同一個metastore就可以讀取其數據。讀取持久化表時,隻需要用用表名作為參數,調用SQLContext.table方法即可得到對應DataFrame。

默認情況下,saveAsTable會創建一個”managed table“,也就是說這個表數據的位置是由metastore控製的。同樣,如果刪除表,其數據也會同步刪除。

Parquet文件

Parquet 是一種流行的列式存儲格式。Spark SQL提供對Parquet文件的讀寫支持,而且Parquet文件能夠自動保存原始數據的schema。寫Parquet文件的時候,所有的字段都會自動轉成nullable,以便向後兼容。

編程方式加載數據

仍然使用上麵例子中的數據:

// 我們繼續沿用之前例子中的sqlContext對象
// 為了支持RDD隱式轉成DataFrame
import sqlContext.implicits._

val people: RDD[Person] = ... // 和上麵例子中相同,一個包含case class對象的RDD

// 該RDD將隱式轉成DataFrame,然後保存為parquet文件
people.write.parquet("people.parquet")

// 讀取上麵保存的Parquet文件(多個文件 - Parquet保存完其實是很多個文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加載Parquet文件,並返回DataFrame結果
val parquetFile = sqlContext.read.parquet("people.parquet")

// Parquet文件(多個)可以注冊為臨時表,然後在SQL語句中直接查詢
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分區發現

像Hive這樣的係統,一個很常用的優化手段就是表分區。在一個支持分區的表中,數據是保存在不同的目錄中的,並且將分區鍵以編碼方式保存在各個分區目錄路徑中。Parquet數據源現在也支持自動發現和推導分區信息。例如,我們可以把之前用的人口數據存到一個分區表中,其目錄結構如下所示,其中有2個額外的字段,gender和country,作為分區鍵:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

在這個例子中,如果需要讀取Parquet文件數據,我們隻需要把 path/to/table 作為參數傳給 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL能夠自動的從路徑中提取出分區信息,隨後返回的DataFrame的schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分區鍵的數據類型將是自動推導出來的。目前,隻支持數值類型和字符串類型數據作為分區鍵。

有的用戶可能不想要自動推導出來的分區鍵數據類型。這種情況下,你可以通過 spark.sql.sources.partitionColumnTypeInference.enabled (默認是true)來禁用分區鍵類型推導。禁用之後,分區鍵總是被當成字符串類型。

從Spark-1.6.0開始,分區發現默認隻在指定目錄的子目錄中進行。以上麵的例子來說,如果用戶把 path/to/table/gender=male 作為參數傳給 SQLContext.read.parquet 或者 SQLContext.read.load,那麼gender就不會被作為分區鍵。如果用戶想要指定分區發現的基礎目錄,可以通過basePath選項指定。例如,如果把 path/to/table/gender=male作為數據目錄,並且將basePath設為 path/to/table,那麼gender仍然會最為分區鍵。

Schema合並

像ProtoBuffer、Avro和Thrift一樣,Parquet也支持schema演變。用戶從一個簡單的schema開始,逐漸增加所需的新字段。這樣的話,用戶最終會得到多個schema不同但互相兼容的Parquet文件。目前,Parquet數據源已經支持自動檢測這種情況,並合並所有文件的schema。

因為schema合並相對代價比較大,並且在多數情況下不是必要的,所以從Spark-1.5.0之後,默認是被禁用的。你可以這樣啟用這一功能:

  1. 讀取Parquet文件時,將選項mergeSchema設為true(見下麵的示例代碼)
  2. 或者,將全局選項spark.sql.parquet.mergeSchema設為true
// 繼續沿用之前的sqlContext對象
// 為了支持RDD隱式轉換為DataFrame
import sqlContext.implicits._

// 創建一個簡單的DataFrame,存到一個分區目錄中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// 創建另一個DataFrame放到新的分區目錄中,
// 並增加一個新字段,丟棄一個老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// 讀取分區表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// 最終的schema將由3個字段組成(single,double,triple)
// 並且分區鍵出現在目錄路徑中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metastore Parquet table轉換

在讀寫Hive metastore Parquet 表時,Spark SQL用的是內部的Parquet支持庫,而不是Hive SerDe,因為這樣性能更好。這一行為是由spark.sql.hive.convertMetastoreParquet 配置項來控製的,而且默認是啟用的。

Hive/Parquet schema調和

Hive和Parquet在表結構處理上主要有2個不同點:

  1. Hive大小寫敏感,而Parquet不是
  2. Hive所有字段都是nullable的,而Parquet需要顯示設置

由於以上原因,我們必須在Hive metastore Parquet table轉Spark SQL Parquet table的時候,對Hive metastore schema做調整,調整規則如下:

  1. 兩種schema中字段名和字段類型必須一致(不考慮nullable)。調和後的字段類型必須在Parquet格式中有相對應的數據類型,所以nullable是也是需要考慮的。
  2. 調和後Spark SQL Parquet table schema將包含以下字段:
    • 隻出現在Parquet schema中的字段將被丟棄
    • 隻出現在Hive metastore schema中的字段將被添加進來,並顯式地設為nullable。

刷新元數據

Spark SQL會緩存Parquet元數據以提高性能。如果Hive metastore Parquet table轉換被啟用的話,那麼轉換過來的schema也會被緩存。這時候,如果這些表由Hive或其他外部工具更新了,你必須手動刷新元數據。

// 注意,這裏sqlContext是一個HiveContext
sqlContext.refreshTable("my_table")

配置

Parquet配置可以通過 SQLContext.setConf 或者 SQL語句中 SET key=value來指定。

屬性名 默認值 含義
spark.sql.parquet.binaryAsString false 有些老係統,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不區分二進製數據和字符串類型數據。這個標誌的意思是,讓Spark SQL把二進製數據當字符串處理,以兼容老係統。
spark.sql.parquet.int96AsTimestamp true 有些老係統,如:特定版本的Impala,Hive,把時間戳存成INT96。這個配置的作用是,讓Spark SQL把這些INT96解釋為timestamp,以兼容老係統。
spark.sql.parquet.cacheMetadata true 緩存Parquet schema元數據。可以提升查詢靜態數據的速度。
spark.sql.parquet.compression.codec gzip 設置Parquet文件的壓縮編碼格式。可接受的值有:uncompressed, snappy, gzip(默認), lzo
spark.sql.parquet.filterPushdown true 啟用過濾器下推優化,可以講過濾條件盡量推導最下層,已取得性能提升
spark.sql.hive.convertMetastoreParquet true 如果禁用,Spark SQL將使用Hive SerDe,而不是內建的對Parquet tables的支持
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.
ParquetOutputCommitter
Parquet使用的數據輸出類。這個類必須是 org.apache.hadoop.mapreduce.OutputCommitter的子類。一般來說,它也應該是 org.apache.parquet.hadoop.ParquetOutputCommitter的子類。注意:1. 如果啟用spark.speculation, 這個選項將被自動忽略

2. 這個選項必須用hadoop configuration設置,而不是Spark SQLConf

3. 這個選項會覆蓋 spark.sql.sources.outputCommitterClass

Spark SQL有一個內建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 這個類的在輸出到S3的時候比默認的ParquetOutputCommitter類效率高。

spark.sql.parquet.mergeSchema false 如果設為true,那麼Parquet數據源將會merge 所有數據文件的schema,否則,schema是從summary file獲取的(如果summary file沒有設置,則隨機選一個)


轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 15:32:35

  上一篇:go  《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南(四)
  下一篇:go  《麵向機器智能的TensorFlow實踐》TensorFlow與機器學習基礎