1004
人物
SDK介绍__使用指南_DataHub实时数据通道_大数据计算服务-阿里云
概要
SDK通过Maven管理,pom信息如下
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.1.1-public</version>
</dependency>
Java SDK Example
准备工作
访问DataHub需要使用阿里云认证账号,需要提供阿里云AccessID及AccessKey。 同时需要提供访问的服务地址,公网服务地址为:
https://dh-cn-hangzhou.aliyuncs.com
初始化AliyunAccount与DatahubConfiguration
String accessId = "Your AccessId";
String accessKey = "Your AccessKey";
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
AliyunAccount account = new AliyunAccount(accessId, accessKey);
DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);
初始化DataHubClient, DataHub服务所有操作均可用该client进行
DatahubClient client = new DatahubClient(conf);
TupleTopic使用样例
创建Tuple Topic
RecordSchema schema = new RecordSchema();
schema.addField(new Field("a", FieldType.STRING));
schema.addField(new Field("b", FieldType.BIGINT));
int shardCount = 5;
int lifeCycle = 3;
String topicName = "topic_example";
String topicDesc = "topic_example_desc";
client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicDesc);
获取Shard列表
ListShardResult listShardResult = client.listShard(projectName, topicName);
写入Tuple数据
准备数据:
List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
String shardId = listShardResult.getShards().get(0).getShardId();
RecordEntry entry = new RecordEntry(schema);
entry.setString(0, "Test");
entry.setBigint(1, 5L);
entry.setShardId(shardId);
recordEntries.add(entry);
PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
List<ErrorEntry> errors = result.getFailedRecordError();
// deal with result.getFailedRecords()
}
更多写入方式请参考多方式数据写入
消费Tuple数据
获取Cursor,可以通过三种方式获取:”OLDEST”, “LATEST”, “SYSTEM_TIME”
GetCursorResult cursorRs =
client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
//GetCursorResult cursorRs = client.getCursor(projectName, topicName, shardId, System.currentTimeMillis() - 24 * 3600 * 1000 /* ms */); 可以获取到24小时内的第一条数据Cursor
读取数据:
int limit = 100;
String cursor = cursorRs.getCursor();
while (true) {
try {
GetRecordsResult recordRs = client.getRecords(projectName, topicName, shardId, cursor, limit, schema);
List<RecordEntry> recordEntries = recordRs.getRecords();
if (cursor.equals(recordRs.getNextCursor())) {
// no more data , please wait
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
cursor = recordRs.getNextCursor();
} catch (InvalidCursorException ex) {
cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorRs.getCursor();
}
}
BlobTopic写入二进制数据使用样例
创建Blob Topic
int shardCount = 5;
int lifeCycle = 3;
String topicName = "topic_example";
String topicDesc = "topic_example_desc";
client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicDesc);
获取Shard列表
同TupleTopic
写入Blob二进制数据
准备数据:
List<BlobRecordEntry> recordEntries = new ArrayList<BlobRecordEntry>();
String shardId = listShardResult.getShards().get(0).getShardId();
String data = String.valueOf(System.currentTimeMillis());
BlobRecordEntry entry = new BlobRecordEntry();
entry.setData(data.getBytes());
entry.setShardId(shardId);
recordEntries.add(entry);
PutBlobRecordsResult result = client.putBlobRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
List<ErrorEntry> errors = result.getFailedRecordError();
// deal with result.getFailedRecords()
}
消费Blob数据
获取Cursor,同TupleTopic.
读取数据:
int limit = 100;
String cursor = cursorRs.getCursor();
while (true) {
try {
GetBlobRecordsResult recordRs = client.getBlobRecords(projectName, topicName, shardId, cursor, limit);
List<BlobRecordEntry> recordEntries = recordRs.getRecords();
Assert.assertEquals(recordRs.getRecords().get(0).getData(), data.getBytes());
if (cursor.equals(recordRs.getNextCursor())) {
// no more data , please wait
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
cursor = recordRs.getNextCursor();
} catch (InvalidCursorException ex) {
cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorRs.getCursor();
}
}
最后更新:2016-11-24 18:03:38
上一篇:
快速指引__DataHub实时数据通道_大数据计算服务-阿里云
下一篇:
Web控制台介绍__使用指南_DataHub实时数据通道_大数据计算服务-阿里云
使用管理VPN__金融云新手上路_金融云-阿里云
创建分区__数据库开发_用户指南(RDBMS)_数据管理-阿里云
阿里云新一代关系型数据库 PolarDB
IoT API 授权映射表__子账号访问IoT_控制台使用手册_阿里云物联网套件-阿里云
DeployedInfo__数据类型_API_API 网关-阿里云
ALIYUN::CS::App__资源列表_资源编排-阿里云
ForwardEntryItemType__数据类型_API 参考_云服务器 ECS-阿里云
使用RAM授权__用户访问权限控制_用户指南_文件存储-阿里云
业务限制__附录_RAM API文档_访问控制-阿里云
GetPolicyVersion__授权策略管理接口_RAM API文档_访问控制-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云