《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(二)
编程方式定义Schema
如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:
- 从已有的RDD创建一个包含Row对象的RDD
- 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配
- 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame
For example:
例如:
// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// 数据的schema被编码与一个字符串中
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// 基于前面的字符串生成schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")
// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")
// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)
数据源
Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的Spark数据源,然后深入介绍一下内建数据源可用选项。
通用加载/保存函数
在最简单的情况下,所有操作都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手动指定选项
你也可以手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接对文件使用SQL
Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存模式
Save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。很重要的一点是,这些保存模式都没有加锁,所以其操作也不是原子性的。另外,如果使用Overwrite模式,实际操作是,先删除数据,再写新数据。
仅Scala/Java | 所有支持的语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" (default) |
(默认模式)从DataFrame向数据源保存数据时,如果数据已经存在,则抛异常。 |
SaveMode.Append |
"append" |
如果数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。 |
SaveMode.Overwrite |
"overwrite" |
如果数据或表已经存在,则用DataFrame数据覆盖之。 |
SaveMode.Ignore |
"ignore" |
如果数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点类似。 |
保存到持久化表
在使用HiveContext的时候,DataFrame可以用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不同,saveAsTable会将DataFrame的实际数据内容保存下来,并且在HiveMetastore中创建一个游标指针。持久化的表会一直保留,即使Spark程序重启也没有影响,只要你连接到同一个metastore就可以读取其数据。读取持久化表时,只需要用用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame。
默认情况下,saveAsTable会创建一个”managed table“,也就是说这个表数据的位置是由metastore控制的。同样,如果删除表,其数据也会同步删除。
Parquet文件
Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,而且Parquet文件能够自动保存原始数据的schema。写Parquet文件的时候,所有的字段都会自动转成nullable,以便向后兼容。
编程方式加载数据
仍然使用上面例子中的数据:
// 我们继续沿用之前例子中的sqlContext对象
// 为了支持RDD隐式转成DataFrame
import sqlContext.implicits._
val people: RDD[Person] = ... // 和上面例子中相同,一个包含case class对象的RDD
// 该RDD将隐式转成DataFrame,然后保存为parquet文件
people.write.parquet("people.parquet")
// 读取上面保存的Parquet文件(多个文件 - Parquet保存完其实是很多个文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加载Parquet文件,并返回DataFrame结果
val parquetFile = sqlContext.read.parquet("people.parquet")
// Parquet文件(多个)可以注册为临时表,然后在SQL语句中直接查询
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
分区发现
像Hive这样的系统,一个很常用的优化手段就是表分区。在一个支持分区的表中,数据是保存在不同的目录中的,并且将分区键以编码方式保存在各个分区目录路径中。Parquet数据源现在也支持自动发现和推导分区信息。例如,我们可以把之前用的人口数据存到一个分区表中,其目录结构如下所示,其中有2个额外的字段,gender和country,作为分区键:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
在这个例子中,如果需要读取Parquet文件数据,我们只需要把 path/to/table 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL能够自动的从路径中提取出分区信息,随后返回的DataFrame的schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
注意,分区键的数据类型将是自动推导出来的。目前,只支持数值类型和字符串类型数据作为分区键。
有的用户可能不想要自动推导出来的分区键数据类型。这种情况下,你可以通过 spark.sql.sources.partitionColumnTypeInference.enabled (默认是true)来禁用分区键类型推导。禁用之后,分区键总是被当成字符串类型。
从Spark-1.6.0开始,分区发现默认只在指定目录的子目录中进行。以上面的例子来说,如果用户把 path/to/table/gender=male 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load,那么gender就不会被作为分区键。如果用户想要指定分区发现的基础目录,可以通过basePath选项指定。例如,如果把 path/to/table/gender=male作为数据目录,并且将basePath设为 path/to/table,那么gender仍然会最为分区键。
Schema合并
像ProtoBuffer、Avro和Thrift一样,Parquet也支持schema演变。用户从一个简单的schema开始,逐渐增加所需的新字段。这样的话,用户最终会得到多个schema不同但互相兼容的Parquet文件。目前,Parquet数据源已经支持自动检测这种情况,并合并所有文件的schema。
因为schema合并相对代价比较大,并且在多数情况下不是必要的,所以从Spark-1.5.0之后,默认是被禁用的。你可以这样启用这一功能:
- 读取Parquet文件时,将选项mergeSchema设为true(见下面的示例代码)
- 或者,将全局选项spark.sql.parquet.mergeSchema设为true
// 继续沿用之前的sqlContext对象
// 为了支持RDD隐式转换为DataFrame
import sqlContext.implicits._
// 创建一个简单的DataFrame,存到一个分区目录中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 创建另一个DataFrame放到新的分区目录中,
// 并增加一个新字段,丢弃一个老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// 最终的schema将由3个字段组成(single,double,triple)
// 并且分区键出现在目录路径中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
Hive metastore Parquet table转换
在读写Hive metastore Parquet 表时,Spark SQL用的是内部的Parquet支持库,而不是Hive SerDe,因为这样性能更好。这一行为是由spark.sql.hive.convertMetastoreParquet 配置项来控制的,而且默认是启用的。
Hive/Parquet schema调和
Hive和Parquet在表结构处理上主要有2个不同点:
- Hive大小写敏感,而Parquet不是
- Hive所有字段都是nullable的,而Parquet需要显示设置
由于以上原因,我们必须在Hive metastore Parquet table转Spark SQL Parquet table的时候,对Hive metastore schema做调整,调整规则如下:
- 两种schema中字段名和字段类型必须一致(不考虑nullable)。调和后的字段类型必须在Parquet格式中有相对应的数据类型,所以nullable是也是需要考虑的。
- 调和后Spark SQL Parquet table schema将包含以下字段:
- 只出现在Parquet schema中的字段将被丢弃
- 只出现在Hive metastore schema中的字段将被添加进来,并显式地设为nullable。
刷新元数据
Spark SQL会缓存Parquet元数据以提高性能。如果Hive metastore Parquet table转换被启用的话,那么转换过来的schema也会被缓存。这时候,如果这些表由Hive或其他外部工具更新了,你必须手动刷新元数据。
// 注意,这里sqlContext是一个HiveContext
sqlContext.refreshTable("my_table")
配置
Parquet配置可以通过 SQLContext.setConf 或者 SQL语句中 SET key=value来指定。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 有些老系统,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不区分二进制数据和字符串类型数据。这个标志的意思是,让Spark SQL把二进制数据当字符串处理,以兼容老系统。 |
spark.sql.parquet.int96AsTimestamp |
true | 有些老系统,如:特定版本的Impala,Hive,把时间戳存成INT96。这个配置的作用是,让Spark SQL把这些INT96解释为timestamp,以兼容老系统。 |
spark.sql.parquet.cacheMetadata |
true | 缓存Parquet schema元数据。可以提升查询静态数据的速度。 |
spark.sql.parquet.compression.codec |
gzip | 设置Parquet文件的压缩编码格式。可接受的值有:uncompressed, snappy, gzip(默认), lzo |
spark.sql.parquet.filterPushdown |
true | 启用过滤器下推优化,可以讲过滤条件尽量推导最下层,已取得性能提升 |
spark.sql.hive.convertMetastoreParquet |
true | 如果禁用,Spark SQL将使用Hive SerDe,而不是内建的对Parquet tables的支持 |
spark.sql.parquet.output.committer.class |
org.apache.parquet.hadoop. |
Parquet使用的数据输出类。这个类必须是 org.apache.hadoop.mapreduce.OutputCommitter的子类。一般来说,它也应该是 org.apache.parquet.hadoop.ParquetOutputCommitter的子类。注意:1. 如果启用spark.speculation, 这个选项将被自动忽略
2. 这个选项必须用hadoop configuration设置,而不是Spark SQLConf 3. 这个选项会覆盖 spark.sql.sources.outputCommitterClass Spark SQL有一个内建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 这个类的在输出到S3的时候比默认的ParquetOutputCommitter类效率高。 |
spark.sql.parquet.mergeSchema |
false |
如果设为true,那么Parquet数据源将会merge 所有数据文件的schema,否则,schema是从summary file获取的(如果summary file没有设置,则随机选一个) |
最后更新:2017-05-19 15:32:35