閱讀372 返回首頁    go 阿裏雲 go 技術社區[雲棲]


讓CarbonData使用更簡單

CarbonData 是什麼
引用官方的說法:
Apache CarbonData是一種新的高性能數據存儲格式,針對當前大數據領域分析場景需求各異而導致的存儲冗餘問題,CarbonData提供了一種新的融合數據存儲方案,以一份數據同時支持“任意維度組合的過濾查詢、快速掃描、詳單查詢等”多種應用場景,並通過多級索引、字典編碼、列存等特性提升了IO掃描和計算性能,實現百億數據級秒級響應。

CarbonData的使用

我之前寫過一篇使用的文章。CarbonData集群模式體驗。到0.3.0版本,已經把kettle去掉了,並且我提交的PR已經能夠讓其在Spark Streaming中運行。之後將其集成到StreamingPro中,可以簡單通過配置即可完成數據的流式寫入和作為SQL服務被讀取。


準備工作
CarbonData 使用了Hive的MetaStore。
  • MySQL數據庫
  • hive-site.xml 文件
  • 下載StreamingPro with CarbonData
MySQL

創建一個庫:

create database hive CHARACTER SET latin1;

hdfs-site.xml
新建文件 /tmp/hdfs-site.xml,然後寫入如下內容:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNoExist=true</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>你的賬號</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>你的密碼</value>
</property>

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>file:///tmp/user/hive/warehouse</value>
</property>

<property>
<name>hive.exec.scratchdir</name>
<value>file:///tmp/hive/scratchdir</value>
</property>

<property>
 <name>hive.metastore.uris</name>
 <value></value>
</property>

<property>
  <name>datanucleus.autoCreateSchema</name>
  <value>true</value>
</property>


</configuration>


啟動Spark Streaming寫入數據
新建一個文件,/tmp/streaming-test-carbondata.json,內容如下:

{
  "test": {
    "desc": "測試",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
      "testJoinTable"
    ],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [
          {
            "data1": [
              "1",
              "2",
              "3"
            ],
            "data2": [
              "1",
              "2",
              "3"
            ],
            "data3": [
              "1",
              "2",
              "3"
            ],
            "data4": [
              "1",
              "2",
              "3"
            ]
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [
          {
            "name": "a"
          }
        ]
      },
      {
        "name": "stream.table",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select t2.a,t2.b from test2 t2, testJoinTable t3 where t2.a = t3.a"
          }
        ]
      },
      {
        "name": "stream.output.carbondata",
        "params": [
          {
            "format": "carbondata",
            "mode": "Append",
            "tableName": "carbon4",
            "compress": "true",
            "useKettle": "false",
            "tempCSV":"false"
          }
        ]
      }
    ],
    "configParams": {
    }
  },
  "testJoinTable": {
    "desc": "測試",
    "strategy": "refTable",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.source.MockJsonCompositor",
        "params": [
          {
            "a": "3"
          },
          {
            "a": "4"
          },
          {
            "a": "5"
          }
        ]
      },
      {
        "name": "batch.refTable",
        "params": [
          {
            "tableName": "testJoinTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}


運行即可(spark 1.6 都可以)

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
--files /tmp/hdfs-site.xml \
/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.4.7-SNAPSHOT-online-1.6.1-carbondata-0.3.0.jar    \
-streaming.name test    \
-streaming.platform  spark_streaming  \
-streaming.job.file.path file:///tmp/streaming-test-carbondata.json \
-streaming.enableCarbonDataSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta

如果/tmp/carbondata/store/default/ 目錄生成了文件就代表數據已經寫入。


啟動SQL查詢服務
新建一個/tmp/empty.json文件,內容為:
{}

啟動命令:

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
--files /tmp/hdfs-site.xml \
/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.4.7-SNAPSHOT-online-1.6.1-carbondata-0.3.0.jar    \
-streaming.name test    \
-streaming.rest true \
-streaming.spark.service true \
-streaming.platform  spark  \
-streaming.job.file.path file:///tmp/empty.json \
-streaming.enableCarbonDataSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta
查詢方式:
curl --request POST \
  --url https://127.0.0.1:9003/sql \
  --header 'cache-control: no-cache' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --data 'sql=select%20*%20from%20carbon4%20where%20a%3D%223%22&resultType=json'
如果放在PostMan之類的東西裏,是這樣子的:

69bd64f4bf84fb8560667e4147e854cc4de6cbb2

常見問題
如果出現類似

File does not exist: /tmp/carbondata/store/default/carbon3/Fact/Part0/Segment_0

則是因為在你的環境裏找到了hadoop相關的配置文件,比如hdfs-site.xml之類的。去掉或者自己寫一個,比如新建一個 hdfs-site.xml,然後寫入如下內容:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

    <property>
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>
</configuration>
這樣就會讀本地文件了。

最後更新:2017-04-01 17:13:52

  上一篇:go StreamingPro 支持多輸入,多輸出配置
  下一篇:go 程序員效率的奧義