阅读476 返回首页    go 微信


导入导出数据__快速开始_大数据计算服务-阿里云

MaxCompute提供多种数据导入导出方式:直接在客户端使用Tunnel命令 或者通过 TUNNEL 提供的SDK自行编写Java工具,以及通过Flume及Fluentd插件方式导入。

Tunnel命令导入数据

准备数据

假设我们准备本地文件wc_example.txt,内容如下:

  1. I LOVE CHINA!
  2. MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!

这里我们把该数据文件保存在D:odpsodpsbin目录下。

创建ODPS表

我们需要把上面的数据导入到MaxCompute的一张表中,所以需要创建一张表:

  1. CREATE TABLE wc_in (word string);

执行tunnel命令

输入表创建成功后,可以在MaxCompute客户端输入tunnel命令进行数据的导入,如下:

  1. tunnel upload D:odpsodpsbinwc_example.txt wc_in;

执行成功后,查看表wc_in的记录,如下:

  1. odps@ $odps_project>select * from wc_in;
  2. ID = 20150918110501864g5z9c6
  3. Log view:
  4. https://webconsole.odps.aliyun-inc.com:8080/logview/?h=https://service-corp.odps.aliyun-inc.com/api&p=odps_public_dev&i=20150918
  5. QWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL29kcHNfcHVibGljX2Rldi9pbnN0YW5jZXMvMjAxNTA5MTgxMTA1MDE4NjRnNXo5YzYiXX1dLC
  6. +------+
  7. | word |
  8. +------+
  9. | I LOVE CHINA! |
  10. | MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL! |
  11. +------+

注意:

  • 有关Tunnel命令的更多详细介绍,例如:如何将数据导入分区表,请参考Tunnel操作
  • 当表中含有多个列时,可以通过“-fd”参数指定列分隔符;

Tunnel SDK

关于如何利用tunnel SDK进行上传数据,下面也将通过场景介绍。场景描述:上传数据到ODPS,其中,项目空间为”odps_public_dev”,表名为”tunnel_sample_test”,分区为”pt=20150801,dt=hangzhou”。

  1. 创建表,添加分区:
    1. CREATE TABLE IF NOT EXISTS tunnel_sample_test(
    2. id STRING,
    3. name STRING)
    4. PARTITIONED BY (pt STRING, dt STRING); --创建表
    5. ALTER TABLE tunnel_sample_test
    6. ADD IF NOT EXISTS PARTITION (pt='20150801',dt='hangzhou'); --添加分区
  2. 创建UploadSample的工程目录结构,如下:
    1. |---pom.xml
    2. |---src
    3. |---main
    4. |---java
    5. |---com
    6. |---aliyun
    7. |---odps
    8. |---tunnel
    9. |---example
    10. |---UploadSample.java
    UploadSample : tunnel源文件pom.xml : maven工程文件
  3. 编写UploadSample程序,程序如下:

    1. package com.aliyun.odps.tunnel.example;
    2. import java.io.IOException;
    3. import java.util.Date;
    4. import com.aliyun.odps.Column;
    5. import com.aliyun.odps.Odps;
    6. import com.aliyun.odps.PartitionSpec;
    7. import com.aliyun.odps.TableSchema;
    8. import com.aliyun.odps.account.Account;
    9. import com.aliyun.odps.account.AliyunAccount;
    10. import com.aliyun.odps.data.Record;
    11. import com.aliyun.odps.data.RecordWriter;
    12. import com.aliyun.odps.tunnel.TableTunnel;
    13. import com.aliyun.odps.tunnel.TunnelException;
    14. import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
    15. public class UploadSample {
    16. private static String accessId = "####";
    17. private static String accessKey = "####";
    18. private static String tunnelUrl = "https://dt-corp.odps.aliyun-inc.com";
    19. private static String odpsUrl = "https://service-corp.odps.aliyun-inc.com/api";
    20. private static String project = "odps_public_dev";
    21. private static String table = "tunnel_sample_test";
    22. private static String partition = "pt=20150801,dt=hangzhou";
    23. public static void main(String args[]) {
    24. Account account = new AliyunAccount(accessId, accessKey);
    25. Odps odps = new Odps(account);
    26. odps.setEndpoint(odpsUrl);
    27. odps.setDefaultProject(project);
    28. try {
    29. TableTunnel tunnel = new TableTunnel(odps);
    30. tunnel.setEndpoint(tunnelUrl);
    31. PartitionSpec partitionSpec = new PartitionSpec(partition);
    32. UploadSession uploadSession = tunnel.createUploadSession(project,
    33. table, partitionSpec);
    34. System.out.println("Session Status is : "
    35. + uploadSession.getStatus().toString());
    36. TableSchema schema = uploadSession.getSchema();
    37. RecordWriter recordWriter = uploadSession.openRecordWriter(0);
    38. Record record = uploadSession.newRecord();
    39. for (int i = 0; i < schema.getColumns().size(); i++) {
    40. Column column = schema.getColumn(i);
    41. switch (column.getType()) {
    42. case BIGINT:
    43. record.setBigint(i, 1L);
    44. break;
    45. case BOOLEAN:
    46. record.setBoolean(i, true);
    47. break;
    48. case DATETIME:
    49. record.setDatetime(i, new Date());
    50. break;
    51. case DOUBLE:
    52. record.setDouble(i, 0.0);
    53. break;
    54. case STRING:
    55. record.setString(i, "sample");
    56. break;
    57. default:
    58. throw new RuntimeException("Unknown column type: "
    59. + column.getType());
    60. }
    61. }
    62. for (int i = 0; i < 10; i++) {
    63. recordWriter.write(record);
    64. }
    65. recordWriter.close();
    66. uploadSession.commit(new Long[]{0L});
    67. System.out.println("upload success!");
    68. } catch (TunnelException e) {
    69. e.printStackTrace();
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. }
    73. }
    74. }

    注解:这里我们省略了accessId和accesskey的配置,实际运行时请换上您自己的accessId以及accessKey。

  4. pom.xml文件配置如下:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="https://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0</modelVersion>
    6. <groupId>com.aliyun.odps.tunnel.example</groupId>
    7. <artifactId>UploadSample</artifactId>
    8. <version>1.0-SNAPSHOT</version>
    9. <dependencies>
    10. <dependency>
    11. <groupId>com.aliyun.odps</groupId>
    12. <artifactId>odps-sdk-core</artifactId>
    13. <version>0.20.7-public</version>
    14. </dependency>
    15. </dependencies>
    16. <repositories>
    17. <repository>
    18. <id>alibaba</id>
    19. <name>alibaba Repository</name>
    20. <url>https://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
    21. </repository>
    22. </repositories>
    23. </project>
  5. 编译与运行:编译UploadSample工程:
    1. mvn package
    运行UploadSample程序,这里我们利用eclipse导入maven project:右击java工程并点击< Import->Maven->Existing Maven Projects >设置如下:

右击UploadSample.java并点击< Run As->Run Configurations >,如下:

点击< Run >运行成功:控制台显示:

  1. Session Status is : NORMAL
  2. upload success!
  1. 查看运行结果:在客户端输入:
    1. select * from tunnel_sample_test;
    显示结果如下:
    1. +----+------+----+----+
    2. | id | name | pt | dt |
    3. +----+------+----+----+
    4. | sample | sample | 20150801 | hangzhou |
    5. | sample | sample | 20150801 | hangzhou |
    6. | sample | sample | 20150801 | hangzhou |
    7. | sample | sample | 20150801 | hangzhou |
    8. | sample | sample | 20150801 | hangzhou |
    9. | sample | sample | 20150801 | hangzhou |
    10. | sample | sample | 20150801 | hangzhou |
    11. | sample | sample | 20150801 | hangzhou |
    12. | sample | sample | 20150801 | hangzhou |
    13. | sample | sample | 20150801 | hangzhou |
    14. +----+------+----+----+

    备注:

    • Tunnel作为MaxCompute中一个独立的服务,有专属的访问端口提供给大家。当用户在阿里云内网环境中,使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。此外内网地址仅对上海域的云产品有效。

Fluentd导入方案

除了通过客户端及Tunnel Java SDK导入数据外,MaxCompute还提供了导入数据的工具-Fluentd。

Fluentd是一个开源的软件,用来收集各种源头日志(包括Application Log、Sys Log及Access Log),允许用户选择插件对日志数据进行过滤、并存储到不同的数据处理端(包括MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services以及ODPS等)。Fluentd以小巧灵活而着称,允许用户自定义数据源、过滤处理及目标端等插件。目前在这款软件中已经有300+个插件运行Fluentd的架构上,而且这些插件全部是开源的。 MaxCompute也在这款软件上开源了数据导入插件。

环境准备

使用这款软件,向MaxCompute导入数据,需要具备如下环境:

  • Ruby 2.1.0 或更新
  • Gem 2.4.5 或更新
  • Fluentd-0.10.49 或从 Fluentd 官网 查找最新,Fluentd为不同的OS提供了不同的版本,详见 Fluentd 官方文档
  • Protobuf-3.5.1 或更新(Ruby protobuf)

安装导入插件

接下来可以通过以下两种方式中的任意一种来安装MaxCompute Fluentd 导入插件。

方式一:通过ruby gem安装

  1. $ gem install fluent-plugin-aliyun-odps

MaxCompute已经将这个插件发布到GEM库中, 名称为 fluent-plugin-aliyun-odps,只需要通过gem install 命令来安装即可(大家在使用gem 时在国内可能会遇到gem库无法访问,可以在网上搜一下更改gem 库源来解决)。

方式二:通过插件源码安装

  1. $ gem install protobuf
  2. $ gem install fluentd --no-ri --no-rdoc
  3. $ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git
  4. $ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r

其中第二条命令是安装fluentd,如果已经安装可以省略。 MaxCompute Fluentd插件源码在github上,clone下来之后直接放到Fluentd的plugin目录中即可。

插件的使用

使用Fluentd导入数据时,最主要的是配置Fluentd的conf文件,更多conf文件 的介绍请参考 Fluentd 配置文件介绍

示例一:导入Nginx日志 。Conf中source的配置如下。

  1. <source>
  2. type tail
  3. path /opt/log/in/in.log
  4. pos_file /opt/log/in/in.log.pos
  5. refresh_interval 5s
  6. tag in.log
  7. format /^(?<remote>[^ ]*) - - [(?<datetime>[^]]*)] "(?<method>S+)(?: +(?<path>[^"]*?)(?: +S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^"]*)"$/
  8. time_format %Y%b%d %H:%M:%S %z
  9. </source>

fluentd 以tail方式监控指定的文件内容是否有变化,更多的tail配置参见:Fluentd 官方文档

match 配置如下:

  1. <match in.**>
  2. type aliyun_odps
  3. aliyun_access_id ************
  4. aliyun_access_key *********
  5. aliyun_odps_endpoint https://service.odps.aliyun.com/api
  6. aliyun_odps_hub_endpoint https://dh.odps.aliyun.com
  7. buffer_chunk_limit 2m
  8. buffer_queue_limit 128
  9. flush_interval 5s
  10. project projectforlog
  11. <table in.log>
  12. table nginx_log
  13. fields remote,method,path,code,size,agent
  14. partition ctime=${datetime.strftime('%Y%m%d')}
  15. time_format %d/%b/%Y:%H:%M:%S %z
  16. </table>
  17. </match>

数据会导入到projectforlog project的nginx_log表中,其中会以源中的datetime字段作为分区,插件遇到不同的值时会自动创建分区。

示例二:导入MySQL中的数据。导入MySQL中数据时,需要安装fluent-plugin-sql插件作为数据源:

  1. $ gem install fluent-plugin-sql

配置conf中的source:

  1. <source>
  2. type sql
  3. host 127.0.0.1
  4. database test
  5. adapter mysql
  6. username xxxx
  7. password xxxx
  8. select_interval 10s
  9. select_limit 100
  10. state_file /path/sql_state
  11. <table>
  12. table test_table
  13. tag in.sql
  14. update_column id
  15. </table>
  16. </source>

这个例子是从test_table中SELECT数据,每间隔10s去读取100条数据出来,SELECT 时将ID列作为主键(id字段是自增型)。关于fluent-plugin-sql的更多说明参见 Fluentd SQL插件官方说明

match 配置如下:

  1. <match in.**>
  2. type aliyun_odps
  3. aliyun_access_id ************
  4. aliyun_access_key *********
  5. aliyun_odps_endpoint https://service.odps.aliyun.com/api
  6. aliyun_odps_hub_endpoint https://dh.odps.aliyun.com
  7. buffer_chunk_limit 2m
  8. buffer_queue_limit 128
  9. flush_interval 5s
  10. project your_projectforlog
  11. <table in.log>
  12. table mysql_data
  13. fields id,field1,field2,fields3
  14. </table>
  15. </match>

数据会导出到MaxCompute projectforlog project的mysql_data表中,导入的字段包括id, field1, field2, field3。

插件参数说明

向MaxCompute导入数据,需要将插件配置在conf文件中match项中。插件支持的参数说明如下:

  • type(Fixed): 固定值 aliyun_odps.
  • aliyun_access_id(Required):云账号access_id.
  • aliyun_access_key(Required):云账号access key.
  • aliyun_odps_hub_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 https://dh-ext.odps.aliyun-inc.com, 否则设置为https://dh.odps.aliyun.com.
  • aliyunodps_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 https://odps-ext.aiyun-inc.com/api, 否则设置为https://service.odps.aliyun.com/api.
  • buffer_chunk_limit(Optional): 块大小,支持“k”(KB),“m”(MB),“g”(GB)单位,默认 8MB,建议值2MB.
  • buffer_queue_limit(Optional): 块队列大小,此值与buffer_chunk_limit共同决定整个缓冲区大小.
  • flush_interval(Optional): 强制发送间隔,达到时间后块数据未满则强制发送, 默认 60s.
  • project(Required): project名称.
  • table(Required): table名称.
  • fields(Required): 与source对应,字段名必须存在于source之中.
  • partition(Optional):若为分区表,则设置此项.
  • 分区名支持的设置模式:
  • 固定值: partition ctime=20150804
  • 关键字: partition ctime=${remote} (其中remote为source中某字段)
  • 时间格式关键字: partition ctime=${datetime.strftime(‘%Y%m%d’)} (其中datetime为source中某时间格式字段,输出为%Y%m%d格式作为分区名称)
  • time_format(Optional):如果使用时间格式关键字为< partition >, 请设置本参数. 例如: source[datetime]=”29/Aug/2015:11:10:16 +0800”,则设置< time_format > 为”%d/%b/%Y:%H:%M:%S %z”

Flume

除了使用Fluentd可以导入数据外,MaxCompute还支持通过Flume导入数据。Flume是Apache的一款开源软件,MaxCompute基于Flume开源了导入插件源代码,感兴趣的朋友可以参见 Flume MaxCompute 插件了解更多细节。

最后更新:2016-12-14 16:25:51

  上一篇:go 创建删除表__快速开始_大数据计算服务-阿里云
  下一篇:go 运行SQL__快速开始_大数据计算服务-阿里云