1004
魔兽
SDK介绍__使用指南_DataHub实时数据通道_大数据计算服务-阿里云
概要
SDK通过Maven管理,pom信息如下
<dependency><groupId>com.aliyun.datahub</groupId><artifactId>aliyun-sdk-datahub</artifactId><version>2.1.1-public</version></dependency>
Java SDK Example
准备工作
访问DataHub需要使用阿里云认证账号,需要提供阿里云AccessID及AccessKey。 同时需要提供访问的服务地址,公网服务地址为:
https://dh-cn-hangzhou.aliyuncs.com
初始化AliyunAccount与DatahubConfiguration
String accessId = "Your AccessId";String accessKey = "Your AccessKey";String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";AliyunAccount account = new AliyunAccount(accessId, accessKey);DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);
初始化DataHubClient, DataHub服务所有操作均可用该client进行
DatahubClient client = new DatahubClient(conf);
TupleTopic使用样例
创建Tuple Topic
RecordSchema schema = new RecordSchema();schema.addField(new Field("a", FieldType.STRING));schema.addField(new Field("b", FieldType.BIGINT));int shardCount = 5;int lifeCycle = 3;String topicName = "topic_example";String topicDesc = "topic_example_desc";client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicDesc);
获取Shard列表
ListShardResult listShardResult = client.listShard(projectName, topicName);
写入Tuple数据
准备数据:
List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();String shardId = listShardResult.getShards().get(0).getShardId();RecordEntry entry = new RecordEntry(schema);entry.setString(0, "Test");entry.setBigint(1, 5L);entry.setShardId(shardId);recordEntries.add(entry);PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);if (result.getFailedRecordCount() != 0) {List<ErrorEntry> errors = result.getFailedRecordError();// deal with result.getFailedRecords()}
更多写入方式请参考多方式数据写入
消费Tuple数据
获取Cursor,可以通过三种方式获取:”OLDEST”, “LATEST”, “SYSTEM_TIME”
GetCursorResult cursorRs =client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);//GetCursorResult cursorRs = client.getCursor(projectName, topicName, shardId, System.currentTimeMillis() - 24 * 3600 * 1000 /* ms */); 可以获取到24小时内的第一条数据Cursor
读取数据:
int limit = 100;String cursor = cursorRs.getCursor();while (true) {try {GetRecordsResult recordRs = client.getRecords(projectName, topicName, shardId, cursor, limit, schema);List<RecordEntry> recordEntries = recordRs.getRecords();if (cursor.equals(recordRs.getNextCursor())) {// no more data , please waittry {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}cursor = recordRs.getNextCursor();} catch (InvalidCursorException ex) {cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);cursor = cursorRs.getCursor();}}
BlobTopic写入二进制数据使用样例
创建Blob Topic
int shardCount = 5;int lifeCycle = 3;String topicName = "topic_example";String topicDesc = "topic_example_desc";client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicDesc);
获取Shard列表
同TupleTopic
写入Blob二进制数据
准备数据:
List<BlobRecordEntry> recordEntries = new ArrayList<BlobRecordEntry>();String shardId = listShardResult.getShards().get(0).getShardId();String data = String.valueOf(System.currentTimeMillis());BlobRecordEntry entry = new BlobRecordEntry();entry.setData(data.getBytes());entry.setShardId(shardId);recordEntries.add(entry);PutBlobRecordsResult result = client.putBlobRecords(projectName, topicName, recordEntries);if (result.getFailedRecordCount() != 0) {List<ErrorEntry> errors = result.getFailedRecordError();// deal with result.getFailedRecords()}
消费Blob数据
获取Cursor,同TupleTopic.
读取数据:
int limit = 100;String cursor = cursorRs.getCursor();while (true) {try {GetBlobRecordsResult recordRs = client.getBlobRecords(projectName, topicName, shardId, cursor, limit);List<BlobRecordEntry> recordEntries = recordRs.getRecords();Assert.assertEquals(recordRs.getRecords().get(0).getData(), data.getBytes());if (cursor.equals(recordRs.getNextCursor())) {// no more data , please waittry {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}cursor = recordRs.getNextCursor();} catch (InvalidCursorException ex) {cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);cursor = cursorRs.getCursor();}}
最后更新:2016-11-24 18:03:38
上一篇:
快速指引__DataHub实时数据通道_大数据计算服务-阿里云
下一篇:
Web控制台介绍__使用指南_DataHub实时数据通道_大数据计算服务-阿里云
使用管理VPN__金融云新手上路_金融云-阿里云
创建分区__数据库开发_用户指南(RDBMS)_数据管理-阿里云
阿里云新一代关系型数据库 PolarDB
IoT API 授权映射表__子账号访问IoT_控制台使用手册_阿里云物联网套件-阿里云
DeployedInfo__数据类型_API_API 网关-阿里云
ALIYUN::CS::App__资源列表_资源编排-阿里云
ForwardEntryItemType__数据类型_API 参考_云服务器 ECS-阿里云
使用RAM授权__用户访问权限控制_用户指南_文件存储-阿里云
业务限制__附录_RAM API文档_访问控制-阿里云
GetPolicyVersion__授权策略管理接口_RAM API文档_访问控制-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云