1004
魔獸
SDK介紹__使用指南_DataHub實時數據通道_大數據計算服務-阿裏雲
概要
SDK通過Maven管理,pom信息如下
<dependency><groupId>com.aliyun.datahub</groupId><artifactId>aliyun-sdk-datahub</artifactId><version>2.1.1-public</version></dependency>
Java SDK Example
準備工作
訪問DataHub需要使用阿裏雲認證賬號,需要提供阿裏雲AccessID及AccessKey。 同時需要提供訪問的服務地址,公網服務地址為:
https://dh-cn-hangzhou.aliyuncs.com
初始化AliyunAccount與DatahubConfiguration
String accessId = "Your AccessId";String accessKey = "Your AccessKey";String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";AliyunAccount account = new AliyunAccount(accessId, accessKey);DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);
初始化DataHubClient, DataHub服務所有操作均可用該client進行
DatahubClient client = new DatahubClient(conf);
TupleTopic使用樣例
創建Tuple Topic
RecordSchema schema = new RecordSchema();schema.addField(new Field("a", FieldType.STRING));schema.addField(new Field("b", FieldType.BIGINT));int shardCount = 5;int lifeCycle = 3;String topicName = "topic_example";String topicDesc = "topic_example_desc";client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicDesc);
獲取Shard列表
ListShardResult listShardResult = client.listShard(projectName, topicName);
寫入Tuple數據
準備數據:
List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();String shardId = listShardResult.getShards().get(0).getShardId();RecordEntry entry = new RecordEntry(schema);entry.setString(0, "Test");entry.setBigint(1, 5L);entry.setShardId(shardId);recordEntries.add(entry);PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);if (result.getFailedRecordCount() != 0) {List<ErrorEntry> errors = result.getFailedRecordError();// deal with result.getFailedRecords()}
更多寫入方式請參考多方式數據寫入
消費Tuple數據
獲取Cursor,可以通過三種方式獲取:”OLDEST”, “LATEST”, “SYSTEM_TIME”
GetCursorResult cursorRs =client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);//GetCursorResult cursorRs = client.getCursor(projectName, topicName, shardId, System.currentTimeMillis() - 24 * 3600 * 1000 /* ms */); 可以獲取到24小時內的第一條數據Cursor
讀取數據:
int limit = 100;String cursor = cursorRs.getCursor();while (true) {try {GetRecordsResult recordRs = client.getRecords(projectName, topicName, shardId, cursor, limit, schema);List<RecordEntry> recordEntries = recordRs.getRecords();if (cursor.equals(recordRs.getNextCursor())) {// no more data , please waittry {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}cursor = recordRs.getNextCursor();} catch (InvalidCursorException ex) {cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);cursor = cursorRs.getCursor();}}
BlobTopic寫入二進製數據使用樣例
創建Blob Topic
int shardCount = 5;int lifeCycle = 3;String topicName = "topic_example";String topicDesc = "topic_example_desc";client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicDesc);
獲取Shard列表
同TupleTopic
寫入Blob二進製數據
準備數據:
List<BlobRecordEntry> recordEntries = new ArrayList<BlobRecordEntry>();String shardId = listShardResult.getShards().get(0).getShardId();String data = String.valueOf(System.currentTimeMillis());BlobRecordEntry entry = new BlobRecordEntry();entry.setData(data.getBytes());entry.setShardId(shardId);recordEntries.add(entry);PutBlobRecordsResult result = client.putBlobRecords(projectName, topicName, recordEntries);if (result.getFailedRecordCount() != 0) {List<ErrorEntry> errors = result.getFailedRecordError();// deal with result.getFailedRecords()}
消費Blob數據
獲取Cursor,同TupleTopic.
讀取數據:
int limit = 100;String cursor = cursorRs.getCursor();while (true) {try {GetBlobRecordsResult recordRs = client.getBlobRecords(projectName, topicName, shardId, cursor, limit);List<BlobRecordEntry> recordEntries = recordRs.getRecords();Assert.assertEquals(recordRs.getRecords().get(0).getData(), data.getBytes());if (cursor.equals(recordRs.getNextCursor())) {// no more data , please waittry {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}cursor = recordRs.getNextCursor();} catch (InvalidCursorException ex) {cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);cursor = cursorRs.getCursor();}}
最後更新:2016-11-24 18:03:38
上一篇:
快速指引__DataHub實時數據通道_大數據計算服務-阿裏雲
下一篇:
Web控製台介紹__使用指南_DataHub實時數據通道_大數據計算服務-阿裏雲
使用管理VPN__金融雲新手上路_金融雲-阿裏雲
創建分區__數據庫開發_用戶指南(RDBMS)_數據管理-阿裏雲
阿裏雲新一代關係型數據庫 PolarDB
IoT API 授權映射表__子賬號訪問IoT_控製台使用手冊_阿裏雲物聯網套件-阿裏雲
DeployedInfo__數據類型_API_API 網關-阿裏雲
ALIYUN::CS::App__資源列表_資源編排-阿裏雲
ForwardEntryItemType__數據類型_API 參考_雲服務器 ECS-阿裏雲
使用RAM授權__用戶訪問權限控製_用戶指南_文件存儲-阿裏雲
業務限製__附錄_RAM API文檔_訪問控製-阿裏雲
GetPolicyVersion__授權策略管理接口_RAM API文檔_訪問控製-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲