476
微信
导入导出数据__快速开始_大数据计算服务-阿里云
MaxCompute提供多种数据导入导出方式:直接在客户端使用Tunnel命令 或者通过 TUNNEL 提供的SDK自行编写Java工具,以及通过Flume及Fluentd插件方式导入。
Tunnel命令导入数据
准备数据
假设我们准备本地文件wc_example.txt,内容如下:
I LOVE CHINA!
MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL!
这里我们把该数据文件保存在D:odpsodpsbin目录下。
创建ODPS表
我们需要把上面的数据导入到MaxCompute的一张表中,所以需要创建一张表:
CREATE TABLE wc_in (word string);
执行tunnel命令
输入表创建成功后,可以在MaxCompute客户端输入tunnel命令进行数据的导入,如下:
tunnel upload D:odpsodpsbinwc_example.txt wc_in;
执行成功后,查看表wc_in的记录,如下:
odps@ $odps_project>select * from wc_in;
ID = 20150918110501864g5z9c6
Log view:
https://webconsole.odps.aliyun-inc.com:8080/logview/?h=https://service-corp.odps.aliyun-inc.com/api&p=odps_public_dev&i=20150918
QWxsb3ciLCJSZXNvdXJjZSI6WyJhY3M6b2RwczoqOnByb2plY3RzL29kcHNfcHVibGljX2Rldi9pbnN0YW5jZXMvMjAxNTA5MTgxMTA1MDE4NjRnNXo5YzYiXX1dLC
+------+
| word |
+------+
| I LOVE CHINA! |
| MY NAME IS MAGGIE.I LIVE IN HANGZHOU!I LIKE PLAYING BASKETBALL! |
+------+
注意:
- 有关Tunnel命令的更多详细介绍,例如:如何将数据导入分区表,请参考Tunnel操作。
- 当表中含有多个列时,可以通过“-fd”参数指定列分隔符;
Tunnel SDK
关于如何利用tunnel SDK进行上传数据,下面也将通过场景介绍。场景描述:上传数据到ODPS,其中,项目空间为”odps_public_dev”,表名为”tunnel_sample_test”,分区为”pt=20150801,dt=hangzhou”。
- 创建表,添加分区:
CREATE TABLE IF NOT EXISTS tunnel_sample_test(
id STRING,
name STRING)
PARTITIONED BY (pt STRING, dt STRING); --创建表
ALTER TABLE tunnel_sample_test
ADD IF NOT EXISTS PARTITION (pt='20150801',dt='hangzhou'); --添加分区
- 创建UploadSample的工程目录结构,如下:
UploadSample : tunnel源文件pom.xml : maven工程文件|---pom.xml
|---src
|---main
|---java
|---com
|---aliyun
|---odps
|---tunnel
|---example
|---UploadSample.java
编写UploadSample程序,程序如下:
package com.aliyun.odps.tunnel.example;
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
public class UploadSample {
private static String accessId = "####";
private static String accessKey = "####";
private static String tunnelUrl = "https://dt-corp.odps.aliyun-inc.com";
private static String odpsUrl = "https://service-corp.odps.aliyun-inc.com/api";
private static String project = "odps_public_dev";
private static String table = "tunnel_sample_test";
private static String partition = "pt=20150801,dt=hangzhou";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
try {
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);
PartitionSpec partitionSpec = new PartitionSpec(partition);
UploadSession uploadSession = tunnel.createUploadSession(project,
table, partitionSpec);
System.out.println("Session Status is : "
+ uploadSession.getStatus().toString());
TableSchema schema = uploadSession.getSchema();
RecordWriter recordWriter = uploadSession.openRecordWriter(0);
Record record = uploadSession.newRecord();
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, 1L);
break;
case BOOLEAN:
record.setBoolean(i, true);
break;
case DATETIME:
record.setDatetime(i, new Date());
break;
case DOUBLE:
record.setDouble(i, 0.0);
break;
case STRING:
record.setString(i, "sample");
break;
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
}
for (int i = 0; i < 10; i++) {
recordWriter.write(record);
}
recordWriter.close();
uploadSession.commit(new Long[]{0L});
System.out.println("upload success!");
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
注解:这里我们省略了accessId和accesskey的配置,实际运行时请换上您自己的accessId以及accessKey。
pom.xml文件配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="https://maven.apache.org/POM/4.0.0"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun.odps.tunnel.example</groupId>
<artifactId>UploadSample</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.20.7-public</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>alibaba</id>
<name>alibaba Repository</name>
<url>https://mvnrepo.alibaba-inc.com/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>
- 编译与运行:编译UploadSample工程:
运行UploadSample程序,这里我们利用eclipse导入maven project:右击java工程并点击< Import->Maven->Existing Maven Projects >设置如下:mvn package
右击UploadSample.java并点击< Run As->Run Configurations >,如下:
点击< Run >运行成功:控制台显示:
Session Status is : NORMAL
upload success!
- 查看运行结果:在客户端输入:
显示结果如下:select * from tunnel_sample_test;
+----+------+----+----+
| id | name | pt | dt |
+----+------+----+----+
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
| sample | sample | 20150801 | hangzhou |
+----+------+----+----+
备注:
- Tunnel作为MaxCompute中一个独立的服务,有专属的访问端口提供给大家。当用户在阿里云内网环境中,使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。此外内网地址仅对上海域的云产品有效。
- MaxCompute阿里云内网地址:https://odps-ext.aliyun-inc.com/api
- MaxCompute公网地址:https://service.odps.aliyun.com/api
Fluentd导入方案
除了通过客户端及Tunnel Java SDK导入数据外,MaxCompute还提供了导入数据的工具-Fluentd。
Fluentd是一个开源的软件,用来收集各种源头日志(包括Application Log、Sys Log及Access Log),允许用户选择插件对日志数据进行过滤、并存储到不同的数据处理端(包括MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services以及ODPS等)。Fluentd以小巧灵活而着称,允许用户自定义数据源、过滤处理及目标端等插件。目前在这款软件中已经有300+个插件运行Fluentd的架构上,而且这些插件全部是开源的。 MaxCompute也在这款软件上开源了数据导入插件。
环境准备
使用这款软件,向MaxCompute导入数据,需要具备如下环境:
- Ruby 2.1.0 或更新
- Gem 2.4.5 或更新
- Fluentd-0.10.49 或从 Fluentd 官网 查找最新,Fluentd为不同的OS提供了不同的版本,详见 Fluentd 官方文档
- Protobuf-3.5.1 或更新(Ruby protobuf)
安装导入插件
接下来可以通过以下两种方式中的任意一种来安装MaxCompute Fluentd 导入插件。
方式一:通过ruby gem安装
$ gem install fluent-plugin-aliyun-odps
MaxCompute已经将这个插件发布到GEM库中, 名称为 fluent-plugin-aliyun-odps,只需要通过gem install 命令来安装即可(大家在使用gem 时在国内可能会遇到gem库无法访问,可以在网上搜一下更改gem 库源来解决)。
方式二:通过插件源码安装
$ gem install protobuf
$ gem install fluentd --no-ri --no-rdoc
$ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git
$ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r
其中第二条命令是安装fluentd,如果已经安装可以省略。 MaxCompute Fluentd插件源码在github上,clone下来之后直接放到Fluentd的plugin目录中即可。
插件的使用
使用Fluentd导入数据时,最主要的是配置Fluentd的conf文件,更多conf文件 的介绍请参考 Fluentd 配置文件介绍 。
示例一:导入Nginx日志 。Conf中source的配置如下。
<source>
type tail
path /opt/log/in/in.log
pos_file /opt/log/in/in.log.pos
refresh_interval 5s
tag in.log
format /^(?<remote>[^ ]*) - - [(?<datetime>[^]]*)] "(?<method>S+)(?: +(?<path>[^"]*?)(?: +S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^"]*)"$/
time_format %Y%b%d %H:%M:%S %z
</source>
fluentd 以tail方式监控指定的文件内容是否有变化,更多的tail配置参见:Fluentd 官方文档 。
match 配置如下:
<match in.**>
type aliyun_odps
aliyun_access_id ************
aliyun_access_key *********
aliyun_odps_endpoint https://service.odps.aliyun.com/api
aliyun_odps_hub_endpoint https://dh.odps.aliyun.com
buffer_chunk_limit 2m
buffer_queue_limit 128
flush_interval 5s
project projectforlog
<table in.log>
table nginx_log
fields remote,method,path,code,size,agent
partition ctime=${datetime.strftime('%Y%m%d')}
time_format %d/%b/%Y:%H:%M:%S %z
</table>
</match>
数据会导入到projectforlog project的nginx_log表中,其中会以源中的datetime字段作为分区,插件遇到不同的值时会自动创建分区。
示例二:导入MySQL中的数据。导入MySQL中数据时,需要安装fluent-plugin-sql插件作为数据源:
$ gem install fluent-plugin-sql
配置conf中的source:
<source>
type sql
host 127.0.0.1
database test
adapter mysql
username xxxx
password xxxx
select_interval 10s
select_limit 100
state_file /path/sql_state
<table>
table test_table
tag in.sql
update_column id
</table>
</source>
这个例子是从test_table中SELECT数据,每间隔10s去读取100条数据出来,SELECT 时将ID列作为主键(id字段是自增型)。关于fluent-plugin-sql的更多说明参见 Fluentd SQL插件官方说明
match 配置如下:
<match in.**>
type aliyun_odps
aliyun_access_id ************
aliyun_access_key *********
aliyun_odps_endpoint https://service.odps.aliyun.com/api
aliyun_odps_hub_endpoint https://dh.odps.aliyun.com
buffer_chunk_limit 2m
buffer_queue_limit 128
flush_interval 5s
project your_projectforlog
<table in.log>
table mysql_data
fields id,field1,field2,fields3
</table>
</match>
数据会导出到MaxCompute projectforlog project的mysql_data表中,导入的字段包括id, field1, field2, field3。
插件参数说明
向MaxCompute导入数据,需要将插件配置在conf文件中match项中。插件支持的参数说明如下:
- type(Fixed): 固定值 aliyun_odps.
- aliyun_access_id(Required):云账号access_id.
- aliyun_access_key(Required):云账号access key.
- aliyun_odps_hub_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 https://dh-ext.odps.aliyun-inc.com, 否则设置为https://dh.odps.aliyun.com.
- aliyunodps_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 https://odps-ext.aiyun-inc.com/api, 否则设置为https://service.odps.aliyun.com/api.
- buffer_chunk_limit(Optional): 块大小,支持“k”(KB),“m”(MB),“g”(GB)单位,默认 8MB,建议值2MB.
- buffer_queue_limit(Optional): 块队列大小,此值与buffer_chunk_limit共同决定整个缓冲区大小.
- flush_interval(Optional): 强制发送间隔,达到时间后块数据未满则强制发送, 默认 60s.
- project(Required): project名称.
- table(Required): table名称.
- fields(Required): 与source对应,字段名必须存在于source之中.
- partition(Optional):若为分区表,则设置此项.
- 分区名支持的设置模式:
- 固定值: partition ctime=20150804
- 关键字: partition ctime=${remote} (其中remote为source中某字段)
- 时间格式关键字: partition ctime=${datetime.strftime(‘%Y%m%d’)} (其中datetime为source中某时间格式字段,输出为%Y%m%d格式作为分区名称)
- time_format(Optional):如果使用时间格式关键字为< partition >, 请设置本参数. 例如: source[datetime]=”29/Aug/2015:11:10:16 +0800”,则设置< time_format > 为”%d/%b/%Y:%H:%M:%S %z”
Flume
除了使用Fluentd可以导入数据外,MaxCompute还支持通过Flume导入数据。Flume是Apache的一款开源软件,MaxCompute基于Flume开源了导入插件源代码,感兴趣的朋友可以参见 Flume MaxCompute 插件了解更多细节。
最后更新:2016-12-14 16:25:51
上一篇:
创建删除表__快速开始_大数据计算服务-阿里云
下一篇:
运行SQL__快速开始_大数据计算服务-阿里云
启用伸缩组__API快速入门_快速入门_弹性伸缩-阿里云
阿里云安全白皮书都有哪些重要内容?
云虚拟主机开源discuz安装指南__程序安装_使用指南_云虚机主机-阿里云
参数说明__Spark_开发人员指南_E-MapReduce-阿里云
PasswordPolicy__数据类型_RAM API文档_访问控制-阿里云
MapReduce开发插件介绍__Eclipse开发插件_工具_大数据计算服务-阿里云
步骤 1:配置选型__快速入门(Linux)_云服务器 ECS-阿里云
Decrypt__API 参考_密钥管理服务-阿里云
删除应用__应用管理_用户指南_容器服务-阿里云
消费者状态显示"是否在线" 为"否" 问题排查__技术分享_技术运维问题_消息队列 MQ-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云