Quickly Build Spark SQL on CarbonData with StreamingPro
Preface
CarbonData 1.0 has been released, and the project is still changing quickly. This release removes the kettle dependency, which makes deployment and usage much simpler, and it supports multiple Spark versions such as 1.6+ and 2.0+.
StreamingPro lets you try out CarbonData with a single command, and exposes both HTTP and JDBC access.
Download Spark and StreamingPro
Download Spark first; for example, the version I downloaded is spark-1.6.3-bin-hadoop2.6.
Download StreamingPro here: https://pan.baidu.com/s/1eRO5Wga . You will get a fairly large Jar file.
You also need to fetch carbondata-spark-1.0.0-incubating.jar from Maven; it is only needed for some special reasons.
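If you have Maven installed, one way to pull that jar into your local repository (the same ~/.m2 path that the spark-submit command below points at) is the dependency plugin. This is only a sketch; the coordinates are inferred from the jar name used later:
mvn dependency:get -Dartifact=org.apache.carbondata:carbondata-spark:1.0.0-incubating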
You need a database
Because Hive's metastore is backed by MySQL here, you need a MySQL database that the process can connect to. Any reachable instance will do. If you don't have one and you are on a Mac, for example, run
brew install mysql
then start it with brew services start mysql and create a database:
create database hive CHARACTER SET latin1
// If the database later complains about character-set issues, connect to it after starting StreamingPro and make the following changes:
alter table PARTITIONS convert to character set latin1;
alter table PARTITION_KEYS convert to character set latin1;
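If you do hit those character-set errors, one way to apply the two statements is from the mysql command-line client. This is a sketch that assumes a local root account without a password:
mysql -u root hive -e "alter table PARTITIONS convert to character set latin1; alter table PARTITION_KEYS convert to character set latin1;"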
Write a hive-site.xml file
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>your mysql username</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>your mysql password</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>file:///tmp/user/hive/warehouse</value>
</property>
<property>
<name>hive.exec.scratchdir</name>
<value>file:///tmp/hive/scratchdir</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value></value>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>true</value>
</property>
</configuration>
Now you can start it
# SHome is the directory where the StreamingPro jar lives;
# create a query.json file inside it that contains nothing but a pair of braces
SHome=/Users/allwefantasy/streamingpro
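# a minimal way to create that file (it only needs to hold an empty JSON object):
echo '{}' > $SHome/query.json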
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name sql-interactive \
--jars /Users/allwefantasy/.m2/repository/org/apache/carbondata/carbondata-spark/1.0.0-incubating/carbondata-spark-1.0.0-incubating.jar \
--files $SHome/hive-site.xml \
--conf "spark.sql.hive.thriftServer.singleSession=true" \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar \
-streaming.name sql-interactive \
-streaming.job.file.path file://$SHome/query.json \
-streaming.platform spark \
-streaming.rest true \
-streaming.driver.port 9004 \
-streaming.spark.service true \
-streaming.thrift true \
-streaming.enableCarbonDataSupport true \
-streaming.enableHiveSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta
There are quite a few parameters, but you don't need to worry about them. With this setup the HTTP port is 9004 and the JDBC port is 10000. We can now create a table over HTTP.
// The actual SQL: CREATE TABLE IF NOT EXISTS test_table4(id string, name string, city string, age Int) STORED BY 'carbondata'
curl --request POST \
--url http://127.0.0.1:9004/run/sql \
--header 'cache-control: no-cache' \
--header 'content-type: application/x-www-form-urlencoded' \
--header 'postman-token: 731441ac-c398-9a1b-2f06-8725ddbe84cd' \
--data 'sql=CREATE%20TABLE%20IF%20NOT%20EXISTS%20test_table4(id%20string%2C%20name%20string%2C%20city%20string%2C%20age%20Int)%20STORED%20BY%20'\''carbondata'\'''
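The URL-encoded body above is just the CREATE TABLE statement from the comment. If you would rather not encode it by hand, curl can do the encoding for you; a sketch against the same endpoint:
curl --request POST \
  --url http://127.0.0.1:9004/run/sql \
  --data-urlencode "sql=CREATE TABLE IF NOT EXISTS test_table4(id string, name string, city string, age Int) STORED BY 'carbondata'"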
Before loading any data, create a sample.csv file:
id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
Then import the file.
// The actual SQL: LOAD DATA LOCAL INPATH '/Users/allwefantasy/streamingpro/sample.csv' INTO TABLE test_table4
curl --request POST \
--url http://127.0.0.1:9004/run/sql \
--header 'cache-control: no-cache' \
--header 'content-type: application/x-www-form-urlencoded' \
--header 'postman-token: 5eb19ab4-653c-d05f-29ab-6003d7e83755' \
--data 'sql=LOAD%20DATA%20LOCAL%20INPATH%20%20'\''%2FUsers%2Fallwefantasy%2Fstreamingpro%2Fsample.csv'\''%20%20INTO%20TABLE%20test_table4'
Now we can query the table over HTTP.
// The SQL: SELECT * FROM test_table4
curl --request POST \
--url http://127.0.0.1:9004/run/sql \
--header 'cache-control: no-cache' \
--header 'content-type: application/x-www-form-urlencoded' \
--header 'postman-token: d99349ae-b226-8a4e-4d65-d92b1771c111' \
--data 'sql=SELECT%20*%20FROM%20test_table4'
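Before writing a full JDBC program, you can also sanity-check the thrift server on port 10000 with the beeline client that ships with the Spark distribution (run from the Spark home directory, as above); a sketch:
./bin/beeline -u jdbc:hive2://localhost:10000 -e "SELECT * FROM test_table4"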
You can also write a JDBC program:
import java.sql.{Connection, DriverManager}

object ScalaJdbcConnectSelect {
  def main(args: Array[String]): Unit = {
    // connect to the thrift server that StreamingPro starts on port 10000
    // (requires the Hive JDBC driver on the classpath)
    val driver = "org.apache.hive.jdbc.HiveDriver"
    val url = "jdbc:hive2://localhost:10000/default"
    var connection: Connection = null
    try {
      // load the Hive JDBC driver and open the connection
      Class.forName(driver)
      connection = DriverManager.getConnection(url)
      // create the statement and run the select query
      val statement = connection.createStatement()
      val resultSet = statement.executeQuery("SELECT * FROM test_table4")
      while (resultSet.next()) {
        println(" city = " + resultSet.getString("city"))
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // always release the connection, even if the query fails
      if (connection != null) connection.close()
    }
  }
}
Done.