355
阿里雲
多線程下載示例__SDK示例_批量數據通道_大數據計算服務-阿里雲
import java.io.IOException;import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import com.aliyun.odps.Column;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.TableSchema;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordReader;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;import com.aliyun.odps.tunnel.TunnelException;class DownloadThread implements Callable<Long> {private long id;private RecordReader recordReader;private TableSchema tableSchema;public DownloadThread(int id,RecordReader recordReader, TableSchema tableSchema) {this.id = id;this.recordReader = recordReader;this.tableSchema = tableSchema;}@Overridepublic Long call() {Long recordNum = 0L;try {Record record;while ((record = recordReader.read()) != null) {recordNum++;System.out.print("Thread " + id + "t");consumeRecord(record, tableSchema);}recordReader.close();} catch (IOException e) {e.printStackTrace();}return recordNum;}private static void consumeRecord(Record record, TableSchema schema) {for (int i = 0; i < schema.getColumns().size(); i++) {Column column = schema.getColumn(i);String colValue = null;switch (column.getType()) {case BIGINT: {Long v = record.getBigint(i);colValue = v == null ? null : v.toString();break;}case BOOLEAN: {Boolean v = record.getBoolean(i);colValue = v == null ? null : v.toString();break;}case DATETIME: {Date v = record.getDatetime(i);colValue = v == null ? null : v.toString();break;}case DOUBLE: {Double v = record.getDouble(i);colValue = v == null ? null : v.toString();break;}case STRING: {String v = record.getString(i);colValue = v == null ? 
null : v.toString();break;}default:throw new RuntimeException("Unknown column type: "+ column.getType());}System.out.print(colValue == null ? "null" : colValue);if (i != schema.getColumns().size())System.out.print("t");}System.out.println();}}public class DownloadThreadSample {private static String accessId = "<your access id>";private static String accessKey = "<your access Key>";private static String odpsUrl = "https://service.odps.aliyun.com/api";private static String project = "<your project>";private static String table = "<your table name>";private static String partition = "<your partition spec>";private static int threadNum = 10;public static void main(String args[]) {Account account = new AliyunAccount(accessId, accessKey);Odps odps = new Odps(account);odps.setEndpoint(odpsUrl);odps.setDefaultProject(project);TableTunnel tunnel = new TableTunnel(odps);PartitionSpec partitionSpec = new PartitionSpec(partition);DownloadSession downloadSession;try {downloadSession = tunnel.createDownloadSession(project, table,partitionSpec);System.out.println("Session Status is : "+ downloadSession.getStatus().toString());long count = downloadSession.getRecordCount();System.out.println("RecordCount is: " + count);ExecutorService pool = Executors.newFixedThreadPool(threadNum);ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();long start = 0;long step = count / threadNum;for (int i = 0; i < threadNum - 1; i++) {RecordReader recordReader = downloadSession.openRecordReader(step * i, step);callers.add(new DownloadThread( i, recordReader, downloadSession.getSchema()));}RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count- ((threadNum - 1) * step));callers.add(new DownloadThread( threadNum - 1, recordReader, downloadSession.getSchema()));Long downloadNum = 0L;List<Future<Long>> recordNum = pool.invokeAll(callers);for (Future<Long> num : recordNum)downloadNum += num.get();System.out.println("Record Count is: " + 
downloadNum);pool.shutdown();} catch (TunnelException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
注意:Tunnel Endpoint 可以指定,也可以不指定。如果指定,則通過指定的 Endpoint 下載;如果不指定,則按照自動路由選擇 Endpoint 下載。
最後更新:2016-07-12 15:17:54
上一篇:
多線程上傳示例__SDK示例_批量數據通道_大數據計算服務-阿里雲
下一篇:
SQL 概要__SQL_大數據計算服務-阿里雲
CreateVirtualMFADevice__用戶管理接口_RAM API文檔_訪問控制-阿里雲
防盜鏈__存儲空間管理_最佳實踐_對象存儲 OSS-阿里雲
SetListenerAccessControlStatus__Listener相關API_API 參考_負載均衡-阿里雲
查詢伸縮配置__伸縮配置_用戶指南_彈性伸縮-阿里雲
設置參數__實例管理_用戶指南_雲數據庫 RDS 版-阿里雲
為什麼我的作業一直失敗?__技術分享_技術運維問題_媒體轉碼-阿里雲
郵箱設置___郵箱常見問題_企業郵箱-阿里雲
GetCallerIdentity__操作接口_STS API文檔_訪問控制-阿里雲
配置轉發策略__實例管理_用戶指南_負載均衡-阿里雲
GroupDesc__數據類型_API文檔_批量計算-阿里雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿里雲
發送短信接口__API使用手冊_短信服務-阿里雲
接口文檔__Android_安全組件教程_移動安全-阿里雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿里雲
設置短信模板__使用手冊_短信服務-阿里雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿里雲
消息通知__操作指南_批量計算-阿里雲
設備端快速接入(MQTT)__快速開始_阿里雲物聯網套件-阿里雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿里雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿里雲