閱讀476 返回首頁    go windows


導入導出數據__快速開始_大數據計算服務-阿裏雲

MaxCompute提供多種數據導入導出方式:直接在客戶端使用Tunnel命令 或者通過 TUNNEL 提供的SDK自行編寫Java工具,以及通過Flume及Fluentd插件方式導入。

Tunnel命令導入數據

準備數據

假設我們準備本地文件wc_example.txt,內容如下:

  1. I LOVE CHINA!
  2. MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!

這裏我們把該數據文件保存在D:odpsodpsbin目錄下。

創建ODPS表

我們需要把上麵的數據導入到MaxCompute的一張表中,所以需要創建一張表:

  1. CREATE TABLE wc_in (word string);

執行tunnel命令

輸入表創建成功後,可以在MaxCompute客戶端輸入tunnel命令進行數據的導入,如下:

  1. tunnel upload D:odpsodpsbinwc_example.txt wc_in;

執行成功後,查看表wc_in的記錄,如下:

  1. odps@ $odps_project>select * from wc_in;
  2. ID = 20150918110501864g5z9c6
  3. Log view:
  4. https://webconsole.odps.aliyun-inc.com:8080/logview/?h=https://service-corp.odps.aliyun-inc.com/api&p=odps_public_dev&i=20150918
  5. QWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL29kcHNfcHVibGljX2Rldi9pbnN0YW5jZXMvMjAxNTA5MTgxMTA1MDE4NjRnNXo5YzYiXX1dLC
  6. +------+
  7. | word |
  8. +------+
  9. | I LOVE CHINA! |
  10. | MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL! |
  11. +------+

注意:

  • 有關Tunnel命令的更多詳細介紹,例如:如何將數據導入分區表,請參考Tunnel操作
  • 當表中含有多個列時,可以通過“-fd”參數指定列分隔符;

Tunnel SDK

關於如何利用tunnel SDK進行上傳數據,下麵也將通過場景介紹。場景描述:上傳數據到ODPS,其中,項目空間為”odps_public_dev”,表名為”tunnel_sample_test”,分區為”pt=20150801,dt=hangzhou”。

  1. 創建表,添加分區:
    1. CREATE TABLE IF NOT EXISTS tunnel_sample_test(
    2. id STRING,
    3. name STRING)
    4. PARTITIONED BY (pt STRING, dt STRING); --創建表
    5. ALTER TABLE tunnel_sample_test
    6. ADD IF NOT EXISTS PARTITION (pt='20150801',dt='hangzhou'); --添加分區
  2. 創建UploadSample的工程目錄結構,如下:
    1. |---pom.xml
    2. |---src
    3. |---main
    4. |---java
    5. |---com
    6. |---aliyun
    7. |---odps
    8. |---tunnel
    9. |---example
    10. |---UploadSample.java
    UploadSample : tunnel源文件pom.xml : maven工程文件
  3. 編寫UploadSample程序,程序如下:

    1. package com.aliyun.odps.tunnel.example;
    2. import java.io.IOException;
    3. import java.util.Date;
    4. import com.aliyun.odps.Column;
    5. import com.aliyun.odps.Odps;
    6. import com.aliyun.odps.PartitionSpec;
    7. import com.aliyun.odps.TableSchema;
    8. import com.aliyun.odps.account.Account;
    9. import com.aliyun.odps.account.AliyunAccount;
    10. import com.aliyun.odps.data.Record;
    11. import com.aliyun.odps.data.RecordWriter;
    12. import com.aliyun.odps.tunnel.TableTunnel;
    13. import com.aliyun.odps.tunnel.TunnelException;
    14. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
    15. public class UploadSample {
    16. private static String accessId = "####";
    17. private static String accessKey = "####";
    18. private static String tunnelUrl = "https://dt-corp.odps.aliyun-inc.com";
    19. private static String odpsUrl = "https://service-corp.odps.aliyun-inc.com/api";
    20. private static String project = "odps_public_dev";
    21. private static String table = "tunnel_sample_test";
    22. private static String partition = "pt=20150801,dt=hangzhou";
    23. public static void main(String args[]) {
    24. Account account = new AliyunAccount(accessId, accessKey);
    25. Odps odps = new Odps(account);
    26. odps.setEndpoint(odpsUrl);
    27. odps.setDefaultProject(project);
    28. try {
    29. TableTunnel tunnel = new TableTunnel(odps);
    30. tunnel.setEndpoint(tunnelUrl);
    31. PartitionSpec partitionSpec = new PartitionSpec(partition);
    32. UploadSession uploadSession = tunnel.createUploadSession(project,
    33. table, partitionSpec);
    34. System.out.println("Session Status is : "
    35. + uploadSession.getStatus().toString());
    36. TableSchema schema = uploadSession.getSchema();
    37. RecordWriter recordWriter = uploadSession.openRecordWriter(0);
    38. Record record = uploadSession.newRecord();
    39. for (int i = 0; i < schema.getColumns().size(); i++) {
    40. Column column = schema.getColumn(i);
    41. switch (column.getType()) {
    42. case BIGINT:
    43. record.setBigint(i, 1L);
    44. break;
    45. case BOOLEAN:
    46. record.setBoolean(i, true);
    47. break;
    48. case DATETIME:
    49. record.setDatetime(i, new Date());
    50. break;
    51. case DOUBLE:
    52. record.setDouble(i, 0.0);
    53. break;
    54. case STRING:
    55. record.setString(i, "sample");
    56. break;
    57. default:
    58. throw new RuntimeException("Unknown column type: "
    59. + column.getType());
    60. }
    61. }
    62. for (int i = 0; i < 10; i++) {
    63. recordWriter.write(record);
    64. }
    65. recordWriter.close();
    66. uploadSession.commit(new Long[]{0L});
    67. System.out.println("upload success!");
    68. } catch (TunnelException e) {
    69. e.printStackTrace();
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. }
    73. }
    74. }

    注解:這裏我們省略了accessId和accesskey的配置,實際運行時請換上您自己的accessId以及accessKey。

  4. pom.xml文件配置如下:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="https://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0</modelVersion>
    6. <groupId>com.aliyun.odps.tunnel.example</groupId>
    7. <artifactId>UploadSample</artifactId>
    8. <version>1.0-SNAPSHOT</version>
    9. <dependencies>
    10. <dependency>
    11. <groupId>com.aliyun.odps</groupId>
    12. <artifactId>odps-sdk-core</artifactId>
    13. <version>0.20.7-public</version>
    14. </dependency>
    15. </dependencies>
    16. <repositories>
    17. <repository>
    18. <id>alibaba</id>
    19. <name>alibaba Repository</name>
    20. <url>https://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
    21. </repository>
    22. </repositories>
    23. </project>
  5. 編譯與運行:編譯UploadSample工程:
    1. mvn package
    運行UploadSample程序,這裏我們利用eclipse導入maven project:右擊java工程並點擊< Import->Maven->Existing Maven Projects >設置如下:

右擊UploadSample.java並點擊< Run As->Run Configurations >,如下:

點擊< Run >運行成功:控製台顯示:

  1. Session Status is : NORMAL
  2. upload success!
  1. 查看運行結果:在客戶端輸入:
    1. select * from tunnel_sample_test;
    顯示結果如下:
    1. +----+------+----+----+
    2. | id | name | pt | dt |
    3. +----+------+----+----+
    4. | sample | sample | 20150801 | hangzhou |
    5. | sample | sample | 20150801 | hangzhou |
    6. | sample | sample | 20150801 | hangzhou |
    7. | sample | sample | 20150801 | hangzhou |
    8. | sample | sample | 20150801 | hangzhou |
    9. | sample | sample | 20150801 | hangzhou |
    10. | sample | sample | 20150801 | hangzhou |
    11. | sample | sample | 20150801 | hangzhou |
    12. | sample | sample | 20150801 | hangzhou |
    13. | sample | sample | 20150801 | hangzhou |
    14. +----+------+----+----+

    備注:

    • Tunnel作為MaxCompute中一個獨立的服務,有專屬的訪問端口提供給大家。當用戶在阿裏雲內網環境中,使用Tunnel內網連接下載數據時,MaxCompute不會將該操作產生的流量計入計費。此外內網地址僅對上海域的雲產品有效。

Fluentd導入方案

除了通過客戶端及Tunnel Java SDK導入數據外,MaxCompute還提供了導入數據的工具-Fluentd。

Fluentd是一個開源的軟件,用來收集各種源頭日誌(包括Application Log、Sys Log及Access Log),允許用戶選擇插件對日誌數據進行過濾、並存儲到不同的數據處理端(包括MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services以及ODPS等)。Fluentd以小巧靈活而著稱,允許用戶自定義數據源、過濾處理及目標端等插件。目前在這款軟件中已經有300+個插件運行Fluentd的架構上,而且這些插件全部是開源的。 MaxCompute也在這款軟件上開源了數據導入插件。

環境準備

使用這款軟件,向MaxCompute導入數據,需要具備如下環境:

  • Ruby 2.1.0 或更新
  • Gem 2.4.5 或更新
  • Fluentd-0.10.49 或從 Fluentd 官網 查找最新,Fluentd為不同的OS提供了不同的版本,詳見 Fluentd 官方文檔
  • Protobuf-3.5.1 或更新(Ruby protobuf)

安裝導入插件

接下來可以通過以下兩種方式中的任意一種來安裝MaxCompute Fluentd 導入插件。

方式一:通過ruby gem安裝

  1. $ gem install fluent-plugin-aliyun-odps

MaxCompute已經將這個插件發布到GEM庫中, 名稱為 fluent-plugin-aliyun-odps,隻需要通過gem install 命令來安裝即可(大家在使用gem 時在國內可能會遇到gem庫無法訪問,可以在網上搜一下更改gem 庫源來解決)。

方式二:通過插件源碼安裝

  1. $ gem install protobuf
  2. $ gem install fluentd --no-ri --no-rdoc
  3. $ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git
  4. $ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r

其中第二條命令是安裝fluentd,如果已經安裝可以省略。 MaxCompute Fluentd插件源碼在github上,clone下來之後直接放到Fluentd的plugin目錄中即可。

插件的使用

使用Fluentd導入數據時,最主要的是配置Fluentd的conf文件,更多conf文件 的介紹請參考 Fluentd 配置文件介紹

示例一:導入Nginx日誌 。Conf中source的配置如下。

  1. <source>
  2. type tail
  3. path /opt/log/in/in.log
  4. pos_file /opt/log/in/in.log.pos
  5. refresh_interval 5s
  6. tag in.log
  7. format /^(?<remote>[^ ]*) - - [(?<datetime>[^]]*)] "(?<method>S+)(?: +(?<path>[^"]*?)(?: +S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^"]*)"$/
  8. time_format %Y%b%d %H:%M:%S %z
  9. </source>

fluentd 以tail方式監控指定的文件內容是否有變化,更多的tail配置參見:Fluentd 官方文檔

match 配置如下:

  1. <match in.**>
  2. type aliyun_odps
  3. aliyun_access_id ************
  4. aliyun_access_key *********
  5. aliyun_odps_endpoint https://service.odps.aliyun.com/api
  6. aliyun_odps_hub_endpoint https://dh.odps.aliyun.com
  7. buffer_chunk_limit 2m
  8. buffer_queue_limit 128
  9. flush_interval 5s
  10. project projectforlog
  11. <table in.log>
  12. table nginx_log
  13. fields remote,method,path,code,size,agent
  14. partition ctime=${datetime.strftime('%Y%m%d')}
  15. time_format %d/%b/%Y:%H:%M:%S %z
  16. </table>
  17. </match>

數據會導入到projectforlog project的nginx_log表中,其中會以源中的datetime字段作為分區,插件遇到不同的值時會自動創建分區。

示例二:導入MySQL中的數據。導入MySQL中數據時,需要安裝fluent-plugin-sql插件作為數據源:

  1. $ gem install fluent-plugin-sql

配置conf中的source:

  1. <source>
  2. type sql
  3. host 127.0.0.1
  4. database test
  5. adapter mysql
  6. username xxxx
  7. password xxxx
  8. select_interval 10s
  9. select_limit 100
  10. state_file /path/sql_state
  11. <table>
  12. table test_table
  13. tag in.sql
  14. update_column id
  15. </table>
  16. </source>

這個例子是從test_table中SELECT數據,每間隔10s去讀取100條數據出來,SELECT 時將ID列作為主鍵(id字段是自增型)。關於fluent-plugin-sql的更多說明參見 Fluentd SQL插件官方說明

match 配置如下:

  1. <match in.**>
  2. type aliyun_odps
  3. aliyun_access_id ************
  4. aliyun_access_key *********
  5. aliyun_odps_endpoint https://service.odps.aliyun.com/api
  6. aliyun_odps_hub_endpoint https://dh.odps.aliyun.com
  7. buffer_chunk_limit 2m
  8. buffer_queue_limit 128
  9. flush_interval 5s
  10. project your_projectforlog
  11. <table in.log>
  12. table mysql_data
  13. fields id,field1,field2,fields3
  14. </table>
  15. </match>

數據會導出到MaxCompute projectforlog project的mysql_data表中,導入的字段包括id, field1, field2, field3。

插件參數說明

向MaxCompute導入數據,需要將插件配置在conf文件中match項中。插件支持的參數說明如下:

  • type(Fixed): 固定值 aliyun_odps.
  • aliyun_access_id(Required):雲賬號access_id.
  • aliyun_access_key(Required):雲賬號access key.
  • aliyun_odps_hub_endpoint(Required):如果你的服務部署在ESC上,請把本值設定為 https://dh-ext.odps.aliyun-inc.com, 否則設置為https://dh.odps.aliyun.com.
  • aliyunodps_endpoint(Required):如果你的服務部署在ESC上,請把本值設定為 https://odps-ext.aiyun-inc.com/api, 否則設置為https://service.odps.aliyun.com/api.
  • buffer_chunk_limit(Optional): 塊大小,支持“k”(KB),“m”(MB),“g”(GB)單位,默認 8MB,建議值2MB.
  • buffer_queue_limit(Optional): 塊隊列大小,此值與buffer_chunk_limit共同決定整個緩衝區大小.
  • flush_interval(Optional): 強製發送間隔,達到時間後塊數據未滿則強製發送, 默認 60s.
  • project(Required): project名稱.
  • table(Required): table名稱.
  • fields(Required): 與source對應,字段名必須存在於source之中.
  • partition(Optional):若為分區表,則設置此項.
  • 分區名支持的設置模式:
  • 固定值: partition ctime=20150804
  • 關鍵字: partition ctime=${remote} (其中remote為source中某字段)
  • 時間格式關鍵字: partition ctime=${datetime.strftime(‘%Y%m%d’)} (其中datetime為source中某時間格式字段,輸出為%Y%m%d格式作為分區名稱)
  • time_format(Optional):如果使用時間格式關鍵字為< partition >, 請設置本參數. 例如: source[datetime]=”29/Aug/2015:11:10:16 +0800”,則設置< time_format > 為”%d/%b/%Y:%H:%M:%S %z”

Flume

除了使用Fluentd可以導入數據外,MaxCompute還支持通過Flume導入數據。Flume是Apache的一款開源軟件,MaxCompute基於Flume開源了導入插件源代碼,感興趣的朋友可以參見 Flume MaxCompute 插件了解更多細節。

最後更新:2016-12-14 16:25:51

  上一篇:go 創建刪除表__快速開始_大數據計算服務-阿裏雲
  下一篇:go 運行SQL__快速開始_大數據計算服務-阿裏雲