1004
小米淨水器
SDK介紹__使用指南_DataHub實時數據通道_大數據計算服務-阿裏雲
概要
SDK通過Maven管理,pom信息如下
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.1.1-public</version>
</dependency>
Java SDK Example
準備工作
訪問DataHub需要使用阿裏雲認證賬號,需要提供阿裏雲AccessID及AccessKey。 同時需要提供訪問的服務地址,公網服務地址為:
https://dh-cn-hangzhou.aliyuncs.com
初始化AliyunAccount與DatahubConfiguration
String accessId = "Your AccessId";
String accessKey = "Your AccessKey";
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
AliyunAccount account = new AliyunAccount(accessId, accessKey);
DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);
初始化DataHubClient, DataHub服務所有操作均可用該client進行
DatahubClient client = new DatahubClient(conf);
TupleTopic使用樣例
創建Tuple Topic
RecordSchema schema = new RecordSchema();
schema.addField(new Field("a", FieldType.STRING));
schema.addField(new Field("b", FieldType.BIGINT));
int shardCount = 5;
int lifeCycle = 3;
String topicName = "topic_example";
String topicDesc = "topic_example_desc";
client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicDesc);
獲取Shard列表
ListShardResult listShardResult = client.listShard(projectName, topicName);
寫入Tuple數據
準備數據:
List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
String shardId = listShardResult.getShards().get(0).getShardId();
RecordEntry entry = new RecordEntry(schema);
entry.setString(0, "Test");
entry.setBigint(1, 5L);
entry.setShardId(shardId);
recordEntries.add(entry);
PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
List<ErrorEntry> errors = result.getFailedRecordError();
// deal with result.getFailedRecords()
}
更多寫入方式請參考多方式數據寫入
消費Tuple數據
獲取Cursor,可以通過三種方式獲取:”OLDEST”, “LATEST”, “SYSTEM_TIME”
GetCursorResult cursorRs =
client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
//GetCursorResult cursorRs = client.getCursor(projectName, topicName, shardId, System.currentTimeMillis() - 24 * 3600 * 1000 /* ms */); 可以獲取到24小時內的第一條數據Cursor
讀取數據:
int limit = 100;
String cursor = cursorRs.getCursor();
while (true) {
try {
GetRecordsResult recordRs = client.getRecords(projectName, topicName, shardId, cursor, limit, schema);
List<RecordEntry> recordEntries = recordRs.getRecords();
if (cursor.equals(recordRs.getNextCursor())) {
// no more data , please wait
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
cursor = recordRs.getNextCursor();
} catch (InvalidCursorException ex) {
cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorRs.getCursor();
}
}
BlobTopic寫入二進製數據使用樣例
創建Blob Topic
int shardCount = 5;
int lifeCycle = 3;
String topicName = "topic_example";
String topicDesc = "topic_example_desc";
client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicDesc);
獲取Shard列表
同TupleTopic
寫入Blob二進製數據
準備數據:
List<BlobRecordEntry> recordEntries = new ArrayList<BlobRecordEntry>();
String shardId = listShardResult.getShards().get(0).getShardId();
String data = String.valueOf(System.currentTimeMillis());
BlobRecordEntry entry = new BlobRecordEntry();
entry.setData(data.getBytes());
entry.setShardId(shardId);
recordEntries.add(entry);
PutBlobRecordsResult result = client.putBlobRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
List<ErrorEntry> errors = result.getFailedRecordError();
// deal with result.getFailedRecords()
}
消費Blob數據
獲取Cursor,同TupleTopic.
讀取數據:
int limit = 100;
String cursor = cursorRs.getCursor();
while (true) {
try {
GetBlobRecordsResult recordRs = client.getBlobRecords(projectName, topicName, shardId, cursor, limit);
List<BlobRecordEntry> recordEntries = recordRs.getRecords();
Assert.assertEquals(recordRs.getRecords().get(0).getData(), data.getBytes());
if (cursor.equals(recordRs.getNextCursor())) {
// no more data , please wait
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
cursor = recordRs.getNextCursor();
} catch (InvalidCursorException ex) {
cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorRs.getCursor();
}
}
最後更新:2016-11-24 18:03:38
上一篇:
快速指引__DataHub實時數據通道_大數據計算服務-阿裏雲
下一篇:
Web控製台介紹__使用指南_DataHub實時數據通道_大數據計算服務-阿裏雲
使用管理VPN__金融雲新手上路_金融雲-阿裏雲
創建分區__數據庫開發_用戶指南(RDBMS)_數據管理-阿裏雲
阿裏雲新一代關係型數據庫 PolarDB
IoT API 授權映射表__子賬號訪問IoT_控製台使用手冊_阿裏雲物聯網套件-阿裏雲
DeployedInfo__數據類型_API_API 網關-阿裏雲
ALIYUN::CS::App__資源列表_資源編排-阿裏雲
ForwardEntryItemType__數據類型_API 參考_雲服務器 ECS-阿裏雲
使用RAM授權__用戶訪問權限控製_用戶指南_文件存儲-阿裏雲
業務限製__附錄_RAM API文檔_訪問控製-阿裏雲
GetPolicyVersion__授權策略管理接口_RAM API文檔_訪問控製-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲