閱讀672 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南(四)

使用Spark SQL命令行工具

Spark SQL CLI是一個很方便的工具,它可以用local mode運行hive metastore service,並且在命令行中執行輸入的查詢。注意Spark SQL CLI目前還不支持和Thrift JDBC server通信。

用如下命令,在spark目錄下啟動一個Spark SQL CLI

./bin/spark-sql

Hive配置在conf目錄下hive-site.xml,core-site.xml,hdfs-site.xml中設置。你可以用這個命令查看完整的選項列表:./bin/spark-sql –help

升級指南

1.5升級到1.6

  • 從Spark-1.6.0起,默認Thrift server 將運行於多會話並存模式下(multi-session)。這意味著,每個JDBC/ODBC連接有其獨立的SQL配置和臨時函數注冊表。table的緩存仍然是公用的。如果你更喜歡老的單會話模式,隻需設置spark.sql.hive.thriftServer.singleSession為true即可。當然,你也可在spark-defaults.conf中設置,或者將其值傳給start-thriftserver.sh –conf(如下):
./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

1.4升級到1.5

  • Tungsten引擎現在默認是啟用的,Tungsten是通過手動管理內存優化執行計劃,同時也優化了表達式求值的代碼生成。這兩個特性都可以通過把spark.sql.tungsten.enabled設為false來禁用。
  • Parquet schema merging默認不啟用。需要啟用的話,設置spark.sql.parquet.mergeSchema為true即可
  • Python接口支持用點(.)來訪問字段內嵌值,例如df[‘table.column.nestedField’]。但這也意味著,如果你的字段名包含點號(.)的話,你就必須用重音符來轉義,如:table.`column.with.dots`.nested。
  • 列式存儲內存分區剪枝默認是啟用的。要禁用,設置spark.sql.inMemoryColumarStorage.partitionPruning為false即可
  • 不再支持無精度限製的decimal。Spark SQL現在強製最大精度為38位。對於BigDecimal對象,類型推導將會使用(38,18)精度的decimal類型。如果DDL中沒有指明精度,默認使用的精度是(10,0)
  • 時間戳精確到1us(微秒),而不是1ns(納秒)
  • 在“sql”這個SQL變種設置中,浮點數將被解析為decimal。HiveQL解析保持不變。
  • 標準SQL/DataFrame函數均為小寫,例如:sum vs SUM。
  • 當推測任務被啟用是,使用DirectOutputCommitter是不安全的,因此,DirectOutputCommitter在推測任務啟用時,將被自動禁用,且忽略相關配置。
  • JSON數據源不再自動加載其他程序產生的新文件(例如,不是Spark SQL插入到dataset中的文件)。對於一個JSON的持久化表(如:Hive metastore中保存的表),用戶可以使用REFRESH TABLE這個SQL命令或者HiveContext.refreshTable來把新文件包括進來。

1.3升級到1.4

DataFrame數據讀寫接口

根據用戶的反饋,我們提供了一個新的,更加流暢的API,用於數據讀(SQLContext.read)寫(DataFrame.write),同時老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)將被廢棄。

有關SQLContext.read和DataFrame.write的更詳細信息,請參考API文檔。

DataFrame.groupBy保留分組字段

根據用戶的反饋,我們改變了DataFrame.groupBy().agg()的默認行為,在返回的DataFrame結果中保留了分組字段。如果你想保持1.3中的行為,設置spark.sql.retainGroupColumns為false即可。

// 在1.3.x中,如果要保留分組字段"department", 你必須顯式的在agg聚合時包含這個字段
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 而在1.4+,分組字段"department"默認就會包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))

// 要回滾到1.3的行為(不包含分組字段),按如下設置即可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

1.2升級到1.3

在Spark 1.3中,我們去掉了Spark SQL的”Alpha“標簽,並清理了可用的API。從Spark 1.3起,Spark SQL將對1.x係列二進製兼容。這個兼容性保證不包括顯式的標注為”unstable(如:DeveloperAPI或Experimental)“的API。

SchemaRDD重命名為DataFrame

對於用戶來說,Spark SQL 1.3最大的改動就是SchemaRDD改名為DataFrame。主要原因是,DataFrame不再直接由RDD派生,而是通過自己的實現提供RDD的功能。DataFrame隻需要調用其rdd方法就能轉成RDD。

在Scala中仍然有SchemaRDD,隻不過這是DataFrame的一個別名,以便兼容一些現有代碼。但仍然建議用戶改用DataFrame。Java和Python用戶就沒這個福利了,他們必須改代碼。

統一Java和Scala API

在Spark 1.3之前,有單獨的java兼容類(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的鏡像。Spark 1.3中將Java API和Scala API統一。兩種語言的用戶都應該使用SQLContext和DataFrame。一般這些類中都會使用兩種語言中都有的類型(如:Array取代各語言獨有的集合)。有些情況下,沒有通用的類型(例如:閉包或者maps),將會使用函數重載來解決這個問題。

另外,java特有的類型API被刪除了。Scala和java用戶都應該用org.apache.spark.sql.types來編程描述一個schema。

隱式轉換隔離,DSL包移除 – 僅針對scala

Spark 1.3之前的很多示例代碼,都在開頭用 import sqlContext._,這行將會導致所有的sqlContext的函數都被引入進來。因此,在Spark 1.3我們把RDDs到DataFrames的隱式轉換隔離出來,單獨放到SQLContext.implicits對象中。用戶現在應該這樣寫:import sqlContext.implicits._

另外,隱式轉換也支持由Product(如:case classes或tuples)組成的RDD,但需要調用一個toDF方法,而不是自動轉換。

如果需要使用DSL(被DataFrame取代的API)中的方法,用戶之前需要導入DSL(import org.apache.spark.sql.catalyst.dsl), 而現在應該要導入 DataFrame API(import org.apache.spark.sql.functions._)

移除org.apache.spark.sql中DataType別名 – 僅針對scala

Spark 1.3刪除了sql包中的DataType類型別名。現在,用戶應該使用 org.apache.spark.sql.types中的類。

UDF注冊挪到sqlContext.udf中 – 針對java和scala

注冊UDF的函數,不管是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF注冊保持不變。

Python DataTypes不再是單例

在python中使用DataTypes,你需要先構造一個對象(如:StringType()),而不是引用一個單例。

Shark用戶遷移指南

調度

用戶可以通過如下命令,為JDBC客戶端session設定一個Fair Scheduler pool。

SET spark.sql.thriftserver.scheduler.pool=accounting;

Reducer個數

在Shark中,默認的reducer個數是1,並且由mapred.reduce.tasks設定。Spark SQL廢棄了這個屬性,改為 spark.sql.shuffle.partitions, 並且默認200,用戶可通過如下SET命令來自定義:

SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

你也可以把這個屬性放到hive-site.xml中來覆蓋默認值。

目前,mapred.reduce.tasks屬性仍然能被識別,並且自動轉成spark.sql.shuffle.partitions

緩存

shark.cache表屬性已經不存在了,並且以”_cached”結尾命名的表也不再會自動緩存。取而代之的是,CACHE TABLE和UNCACHE TABLE語句,用以顯式的控製表的緩存:

CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

注意:CACHE TABLE tbl 現在默認是饑餓模式,而非懶惰模式。再也不需要手動調用其他action來觸發cache了!

從Spark-1.2.0開始,Spark SQL新提供了一個語句,讓用戶自己控製表緩存是否是懶惰模式

CACHE [LAZY] TABLE [AS SELECT] ...

以下幾個緩存相關的特性不再支持:

  • 用戶定義分區級別的緩存逐出策略
  • RDD 重加載
  • 內存緩存直接寫入策略

兼容Apache Hive

Spark SQL設計時考慮了和Hive metastore,SerDes以及UDF的兼容性。目前這些兼容性鬥是基於Hive-1.2.1版本,並且Spark SQL可以連到不同版本的Hive metastore(從0.12.0到1.2.1,參考:https://spark.apache.org/docs/latest/sql-programming-guide.html

部署在已有的Hive倉庫之上

Spark SQL Thrift JDBC server采用了”out of the box”(開箱即用)的設計,使用很方便,並兼容已有的Hive安裝版本。你不需要修改已有的Hive metastore或者改變數據的位置,或者表分區。

支持的Hive功能

Spark SQL 支持絕大部分Hive功能,如:

  • Hive查詢語句:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有的Hive操作符:
    • Relational operators (===<><>>=<=, etc)
    • Arithmetic operators (+-*/%, etc)
    • Logical operators (AND&&OR||, etc)
    • Complex type constructors
    • Mathematical functions (signlncos, etc)
    • String functions (instrlengthprintf, etc)
  • 用戶定義函數(UDF)
  • 用戶定義聚合函數(UDAF)
  • 用戶定義序列化、反序列化(SerDes)
  • 窗口函數(Window functions)
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 查詢子句
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • 采樣
  • 執行計劃詳細(Explain)
  • 分區表,包括動態分區插入
  • 視圖
  • 所有Hive DDL(data definition language):
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 絕大部分Hive數據類型:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

不支持的Hive功能

以下是目前不支持的Hive特性的列表。多數是不常用的。

不支持的Hive常見功能

  • bucket表:butcket是Hive表的一個哈希分區

不支持的Hive高級功能

  • UNION類操作
  • 去重join
  • 字段統計信息收集:Spark SQL不支持同步的字段統計收集

Hive輸入、輸出格式

  • CLI文件格式:對於需要回顯到CLI中的結果,Spark SQL僅支持TextOutputFormat。
  • Hadoop archive — Hadoop歸檔

Hive優化

一些比較棘手的Hive優化目前還沒有在Spark中提供。有一些(如索引)對應Spark SQL這種內存計算模型來說並不重要。另外一些,在Spark SQL未來的版本中會支持。

  • 塊級別位圖索引和虛擬字段(用來建索引)
  • 自動計算reducer個數(join和groupBy算子):目前在Spark SQL中你需要這樣控製混洗後(post-shuffle)並發程度:”SET spark.sql.shuffle.partitions=[num_tasks];”
  • 元數據查詢:隻查詢元數據的請求,Spark SQL仍需要啟動任務來計算結果
  • 數據傾斜標誌:Spark SQL不會理會Hive中的數據傾斜標誌
  • STREAMTABLE join提示:Spark SQL裏沒有這玩藝兒
  • 返回結果時合並小文件:如果返回的結果有很多小文件,Hive有個選項設置,來合並小文件,以避免超過HDFS的文件數額度限製。Spark SQL不支持這個。

參考

數據類型

Spark SQL和DataFrames支持如下數據類型:

  • Numeric types(數值類型)
    • ByteType: 1字節長的有符號整型,範圍:-128 到 127.
    • ShortType: 2字節長有符號整型,範圍:-32768 到 32767.
    • IntegerType: 4字節有符號整型,範圍:-2147483648 到 2147483647.
    • LongType: 8字節有符號整型,範圍: -9223372036854775808 to 9223372036854775807.
    • FloatType: 4字節單精度浮點數。
    • DoubleType: 8字節雙精度浮點數
    • DecimalType: 任意精度有符號帶小數的數值。內部使用java.math.BigDecimal, BigDecimal包含任意精度的不縮放整型,和一個32位的縮放整型
  • String type(字符串類型)
    • StringType: 字符串
  • Binary type(二進製類型)
    • BinaryType: 字節序列
  • Boolean type(布爾類型)
    • BooleanType: 布爾類型
  • Datetime type(日期類型)
    • TimestampType: 表示包含年月日、時分秒等字段的日期
    • DateType: 表示包含年月日字段的日期
  • Complex types(複雜類型)
    • ArrayType(elementType, containsNull):數組類型,表達一係列的elementType類型的元素組成的序列,containsNull表示數組能否包含null值
    • MapType(keyType, valueType, valueContainsNull):映射集合類型,表示一個鍵值對的集合。鍵的類型是keyType,值的類型則由valueType指定。對應MapType來說,鍵是不能為null的,而值能否為null則取決於valueContainsNull。
    • StructType(fields):表示包含StructField序列的結構體。
      • StructField(name, datatype, nullable): 表示StructType中的一個字段,name是字段名,datatype是數據類型,nullable表示該字段是否可以為空

所有Spark SQL支持的數據類型都在這個包裏:org.apache.spark.sql.types,你可以這樣導入之:

import  org.apache.spark.sql.types._
Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])注意:默認containsNull為true
MapType scala.collection.Map MapType(keyTypevalueType, [valueContainsNull])注意:默認valueContainsNull為true
StructType org.apache.spark.sql.Row StructType(fields)注意:fields是一個StructFields的序列,並且同名的字段是不允許的。
StructField 定義字段的數據對應的Scala類型(例如,如果StructField的dataType為IntegerType,則其數據對應的scala類型為Int) StructField(namedataTypenullable)

NaN語義

這是Not-a-Number的縮寫,某些float或double類型不符合標準浮點數語義,需要對其特殊處理:

  • NaN == NaN,即:NaN和NaN總是相等
  • 在聚合函數中,所有NaN分到同一組
  • NaN在join操作中可以當做一個普通的join key
  • NaN在升序排序中排到最後,比任何其他數值都大
  • 轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 15:32:36

  上一篇:go  《麵向機器智能的TensorFlow實踐》導讀
  下一篇:go  《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南(二)