《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南(三)
JSON數據集
Spark SQL在加載JSON數據的時候,可以自動推導其schema並返回DataFrame。用SQLContext.read.json讀取一個包含String的RDD或者JSON文件,即可實現這一轉換。
注意,通常所說的json文件隻是包含一些json數據的文件,而不是我們所需要的JSON格式文件。JSON格式文件必須每一行是一個獨立、完整的的JSON對象。因此,一個常規的多行json文件經常會加載失敗。
// sc是已有的SparkContext對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 數據集是由路徑指定的
// 路徑既可以是單個文件,也可以還是存儲文本文件的目錄
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// 推導出來的schema,可由printSchema打印出來
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// 將DataFrame注冊為table
people.registerTempTable("people")
// 跑SQL語句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 另一種方法是,用一個包含JSON字符串的RDD來創建DataFrame
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive表
Spark SQL支持從Apache Hive讀寫數據。然而,Hive依賴項太多,所以沒有把Hive包含在默認的Spark發布包裏。要支持Hive,需要在編譯spark的時候增加-Phive和-Phive-thriftserver標誌。這樣編譯打包的時候將會把Hive也包含進來。注意,hive的jar包也必須出現在所有的worker節點上,訪問Hive數據時候會用到(如:使用hive的序列化和反序列化SerDes時)。
Hive配置在conf/目錄下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。請注意,如果在YARN cluster(yarn-cluster mode)模式下執行一個查詢的話,lib_mananged/jar/下麵的datanucleus 的jar包,和conf/下的hive-site.xml必須在驅動器(driver)和所有執行器(executor)都可用。一種簡便的方法是,通過spark-submit命令的–jars和–file選項來提交這些文件。
如果使用Hive,則必須構建一個HiveContext,HiveContext是派生於SQLContext的,添加了在Hive Metastore裏查詢表的支持,以及對HiveQL的支持。用戶沒有現有的Hive部署,也可以創建一個HiveContext。如果沒有在hive-site.xml裏配置,那麼HiveContext將會自動在當前目錄下創建一個metastore_db目錄,再根據HiveConf設置創建一個warehouse目錄(默認/user/hive/warehourse)。所以請注意,你必須把/user/hive/warehouse的寫權限賦予啟動spark應用程序的用戶。
// sc是一個已有的SparkContext對象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 這裏用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
和不同版本的Hive Metastore交互
Spark SQL對Hive最重要的支持之一就是和Hive metastore進行交互,這使得Spark SQL可以訪問Hive表的元數據。從Spark-1.4.0開始,Spark SQL有專門單獨的二進製build版本,可以用來訪問不同版本的Hive metastore,其配置表如下。注意,不管所訪問的hive是什麼版本,Spark SQL內部都是以Hive 1.2.1編譯的,而且內部使用的Hive類也是基於這個版本(serdes,UDFs,UDAFs等)
以下選項可用來配置Hive版本以便訪問其元數據:
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hive metastore版本,可選的值為0.12.0 到 1.2.1 |
spark.sql.hive.metastore.jars |
builtin |
初始化HiveMetastoreClient的jar包。這個屬性可以是以下三者之一:
目前內建為使用Hive-1.2.1,編譯的時候啟用-Phive,則會和spark一起打包。如果沒有-Phive,那麼spark.sql.hive.metastore.version要麼是1.2.1,要就是未定義
使用maven倉庫下載的jar包版本。這個選項建議不要再生產環境中使用
|
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一個逗號分隔的類名前綴列表,這些類使用classloader加載,且可以在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就需要這種共享。其他需要共享的類,是與某些已經共享的類有交互的類。例如,自定義的log4j appender |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
一個逗號分隔的類名前綴列表,這些類在每個Spark SQL所訪問的Hive版本中都會被顯式的reload。例如,某些在共享前綴列表(spark.sql.hive.metastore.sharedPrefixes)中聲明為共享的Hive UD函數 |
用JDBC連接其他數據庫
Spark SQL也可以用JDBC訪問其他數據庫。這一功能應該優先於使用JdbcRDD。因為它返回一個DataFrame,而DataFrame在Spark SQL中操作更簡單,且更容易和來自其他數據源的數據進行交互關聯。JDBC數據源在java和python中用起來也很簡單,不需要用戶提供額外的ClassTag。(注意,這與Spark SQL JDBC server不同,Spark SQL JDBC server允許其他應用執行Spark SQL查詢)
首先,你需要在spark classpath中包含對應數據庫的JDBC driver,下麵這行包括了用於訪問postgres的數據庫driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
遠程數據庫的表可以通過Data Sources API,用DataFrame或者SparkSQL 臨時表來裝載。以下是選項列表:
屬性名 | 含義 |
---|---|
url |
需要連接的JDBC URL |
dbtable |
需要讀取的JDBC表。注意,任何可以填在SQL的where子句中的東西,都可以填在這裏。(既可以填完整的表名,也可填括號括起來的子查詢語句) |
driver |
JDBC driver的類名。這個類必須在master和worker節點上都可用,這樣各個節點才能將driver注冊到JDBC的子係統中。 |
partitionColumn, lowerBound, upperBound, numPartitions |
這幾個選項,如果指定其中一個,則必須全部指定。他們描述了多個worker如何並行的讀入數據,並將表分區。partitionColumn必須是所查詢的表中的一個數值字段。注意,lowerBound和upperBound隻是用於決定分區跨度的,而不是過濾表中的行。因此,表中所有的行都會被分區然後返回。 |
fetchSize |
JDBC fetch size,決定每次獲取多少行數據。在JDBC驅動上設成較小的值有利於性能優化(如,Oracle上設為10) |
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql:dbserver",
"dbtable" -> "schema.tablename")).load()
疑難解答
- JDBC driver class必須在所有client session或者executor上,對java的原生classloader可見。這是因為Java的DriverManager在打開一個連接之前,會做安全檢查,並忽略所有對原聲classloader不可見的driver。最簡單的一種方法,就是在所有worker節點上修改compute_classpath.sh,並包含你所需的driver jar包。
- 一些數據庫,如H2,會把所有的名字轉大寫。對於這些數據庫,在Spark SQL中必須也使用大寫。
性能調整
對於有一定計算量的Spark作業來說,可能的性能改進的方式,不是把數據緩存在內存裏,就是調整一些開銷較大的選項參數。
內存緩存
Spark SQL可以通過調用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存儲格式緩存到內存中。隨後,Spark SQL將會掃描必要的列,並自動調整壓縮比例,以減少內存占用和GC壓力。你也可以用SQLContext.uncacheTable(“tableName”)來刪除內存中的table。
你還可以使用SQLContext.setConf 或在SQL語句中運行SET key=value命令,來配置內存中的緩存。
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 如果設置為true,Spark SQL將會根據數據統計信息,自動為每一列選擇單獨的壓縮編碼方式。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控製列式緩存批量的大小。增大批量大小可以提高內存利用率和壓縮率,但同時也會帶來OOM(Out Of Memory)的風險。 |
其他配置選項
以下選項同樣也可以用來給查詢任務調性能。不過這些選項在未來可能被放棄,因為spark將支持越來越多的自動優化。
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置join操作時,能夠作為廣播變量的最大table的大小。設置為-1,表示禁用廣播。注意,目前的元數據統計僅支持Hive metastore中的表,並且需要運行這個命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan |
spark.sql.tungsten.enabled |
true | 設為true,則啟用優化的Tungsten物理執行後端。Tungsten會顯式的管理內存,並動態生成表達式求值的字節碼 |
spark.sql.shuffle.partitions |
200 | 配置數據混洗(shuffle)時(join或者聚合操作),使用的分區數。 |
分布式SQL引擎
Spark SQL可以作為JDBC/ODBC或者命令行工具的分布式查詢引擎。在這種模式下,終端用戶或應用程序,無需寫任何代碼,就可以直接在Spark SQL中運行SQL查詢。
運行Thrift JDBC/ODBC server
這裏實現的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2
是相同的。你可以使用beeline腳本來測試Spark或者Hive-1.2.1的JDBC server。
在Spark目錄下運行下麵這個命令,啟動一個JDBC/ODBC server
./sbin/start-thriftserver.sh
這個腳本能接受所有 bin/spark-submit 命令支持的選項參數,外加一個 –hiveconf 選項,來指定Hive屬性。運行./sbin/start-thriftserver.sh –help可以查看完整的選項列表。默認情況下,啟動的server將會在localhost:10000端口上監聽。要改變監聽主機名或端口,可以用以下環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...
或者Hive係統屬性 來指定
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...
接下來,你就可以開始在beeline中測試這個Thrift JDBC/ODBC server:
./bin/beeline
下麵的指令,可以連接到一個JDBC/ODBC server
beeline> !connect jdbc:hive2://localhost:10000
可能需要輸入用戶名和密碼。在非安全模式下,隻要輸入你本機的用戶名和一個空密碼即可。對於安全模式,請參考beeline documentation.
Hive的配置是在conf/目錄下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。
你也可以在beeline的腳本中指定。
Thrift JDBC server也支持通過HTTP傳輸Thrift RPC消息。以下配置(在conf/hive-site.xml中)將啟用HTTP模式:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
同樣,在beeline中也可以用HTTP模式連接JDBC/ODBC server:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
最後更新:2017-05-19 15:32:33