多线程上传示例__SDK示例_批量数据通道_大数据计算服务-阿里云
import java.io.IOException;import java.util.ArrayList;import java.util.Date;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import com.aliyun.odps.Column;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.TableSchema;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordWriter;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TunnelException;import com.aliyun.odps.tunnel.TableTunnel.UploadSession;class UploadThread implements Callable<Boolean> {private long id;private RecordWriter recordWriter;private Record record;private TableSchema tableSchema;public UploadThread(long id, RecordWriter recordWriter, Record record,TableSchema tableSchema) {this.id = id;this.recordWriter = recordWriter;this.record = record;this.tableSchema = tableSchema;}@Overridepublic Boolean call() {for (int i = 0; i < tableSchema.getColumns().size(); i++) {Column column = tableSchema.getColumn(i);switch (column.getType()) {case BIGINT:record.setBigint(i, 1L);break;case BOOLEAN:record.setBoolean(i, true);break;case DATETIME:record.setDatetime(i, new Date());break;case DOUBLE:record.setDouble(i, 0.0);break;case STRING:record.setString(i, "sample");break;default:throw new RuntimeException("Unknown column type: "+ column.getType());}}for (int i = 0; i < 10; i++) {try {recordWriter.write(record);} catch (IOException e) {recordWriter.close();e.printStackTrace();return false;}}recordWriter.close();return true;}}public class UploadThreadSample {private static String accessId = "<your access id>";private static String accessKey = "<your access Key>";private static String odpsUrl = "<https://service.odps.aliyun.com/api>";private static String project = "<your project>";private static String table = "<your table name>";private static String partition = "<your partition spec>";private static int threadNum = 10;public static void main(String args[]) {Account account = new AliyunAccount(accessId, accessKey);Odps odps = new Odps(account);odps.setEndpoint(odpsUrl);odps.setDefaultProject(project);try {TableTunnel tunnel = new TableTunnel(odps);PartitionSpec partitionSpec = new PartitionSpec(partition);UploadSession uploadSession = tunnel.createUploadSession(project,table, partitionSpec);System.out.println("Session Status is : "+ uploadSession.getStatus().toString());ExecutorService pool = Executors.newFixedThreadPool(threadNum);ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();for (int i = 0; i < threadNum; i++) {RecordWriter recordWriter = uploadSession.openRecordWriter(i);Record record = uploadSession.newRecord();callers.add(new UploadThread(i, recordWriter, record,uploadSession.getSchema()));}pool.invokeAll(callers);pool.shutdown();Long[] blockList = new Long[threadNum];for (int i = 0; i < threadNum; i++)blockList[i] = Long.valueOf(i);uploadSession.commit(blockList);System.out.println("upload success!");} catch (TunnelException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}
注意:对于tunnel endpoint,支持指定或者不指定。如果指定,按照指定的endpoint路由。如果不指定,支持自动路由。
最后更新:2016-07-12 15:15:40
上一篇:
简单下载示例__SDK示例_批量数据通道_大数据计算服务-阿里云
下一篇:
多线程下载示例__SDK示例_批量数据通道_大数据计算服务-阿里云
设置解析记录时提示冲突的原因__网站解析_产品使用问题_云解析-阿里云
修改路径缓存策略__配置操作接口_API 手册_CDN-阿里云
ALIYUN::ECS::SNatEntry__资源列表_资源编排-阿里云
1.2 阿里云企业邮箱-管理篇__云邮箱快速开始_阿里云邮箱 体验_体验馆-阿里云
ListUsers__用户管理接口_RAM API文档_访问控制-阿里云
快照 2.0 产品规格升级__快照_产品简介_云服务器 ECS-阿里云
查询RDS可用区信息和数据复制状态__实例管理_API 参考_云数据库 RDS 版-阿里云
闹钟_阿里云帮助中心-阿里云,领先的云计算服务提供商
使用第三方数据卷__数据卷指南_用户指南_容器服务-阿里云
查看当前实例数据库参数运行列表__参数管理_API 参考_云数据库 RDS 版-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云