阅读1004 返回首页    go 人物


SDK介绍__使用指南_DataHub实时数据通道_大数据计算服务-阿里云

概要

SDK通过Maven管理,pom信息如下

  1. <dependency>
  2. <groupId>com.aliyun.datahub</groupId>
  3. <artifactId>aliyun-sdk-datahub</artifactId>
  4. <version>2.1.1-public</version>
  5. </dependency>

Java SDK Example

准备工作

访问DataHub需要使用阿里云认证账号,需要提供阿里云AccessID及AccessKey。 同时需要提供访问的服务地址,公网服务地址为:

  1. https://dh-cn-hangzhou.aliyuncs.com

初始化AliyunAccount与DatahubConfiguration

  1. String accessId = "Your AccessId";
  2. String accessKey = "Your AccessKey";
  3. String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
  4. AliyunAccount account = new AliyunAccount(accessId, accessKey);
  5. DatahubConfiguration conf = new DatahubConfiguration(account, endpoint);

初始化DataHubClient, DataHub服务所有操作均可用该client进行

  1. DatahubClient client = new DatahubClient(conf);

TupleTopic使用样例

创建Tuple Topic
  1. RecordSchema schema = new RecordSchema();
  2. schema.addField(new Field("a", FieldType.STRING));
  3. schema.addField(new Field("b", FieldType.BIGINT));
  4. int shardCount = 5;
  5. int lifeCycle = 3;
  6. String topicName = "topic_example";
  7. String topicDesc = "topic_example_desc";
  8. client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicDesc);
获取Shard列表
  1. ListShardResult listShardResult = client.listShard(projectName, topicName);
写入Tuple数据

准备数据:

  1. List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
  2. String shardId = listShardResult.getShards().get(0).getShardId();
  3. RecordEntry entry = new RecordEntry(schema);
  4. entry.setString(0, "Test");
  5. entry.setBigint(1, 5L);
  6. entry.setShardId(shardId);
  7. recordEntries.add(entry);
  8. PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
  9. if (result.getFailedRecordCount() != 0) {
  10. List<ErrorEntry> errors = result.getFailedRecordError();
  11. // deal with result.getFailedRecords()
  12. }

更多写入方式请参考多方式数据写入

消费Tuple数据

获取Cursor,可以通过三种方式获取:”OLDEST”, “LATEST”, “SYSTEM_TIME”

  1. GetCursorResult cursorRs =
  2. client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
  3. //GetCursorResult cursorRs = client.getCursor(projectName, topicName, shardId, System.currentTimeMillis() - 24 * 3600 * 1000 /* ms */); 可以获取到24小时内的第一条数据Cursor

读取数据:

  1. int limit = 100;
  2. String cursor = cursorRs.getCursor();
  3. while (true) {
  4. try {
  5. GetRecordsResult recordRs = client.getRecords(projectName, topicName, shardId, cursor, limit, schema);
  6. List<RecordEntry> recordEntries = recordRs.getRecords();
  7. if (cursor.equals(recordRs.getNextCursor())) {
  8. // no more data , please wait
  9. try {
  10. Thread.sleep(1000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. cursor = recordRs.getNextCursor();
  16. } catch (InvalidCursorException ex) {
  17. cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
  18. cursor = cursorRs.getCursor();
  19. }
  20. }

BlobTopic写入二进制数据使用样例

创建Blob Topic
  1. int shardCount = 5;
  2. int lifeCycle = 3;
  3. String topicName = "topic_example";
  4. String topicDesc = "topic_example_desc";
  5. client.createTopic(projectName, topicName, shardCount, lifeCycle, RecordType.BLOB, topicDesc);
获取Shard列表

同TupleTopic

写入Blob二进制数据

准备数据:

  1. List<BlobRecordEntry> recordEntries = new ArrayList<BlobRecordEntry>();
  2. String shardId = listShardResult.getShards().get(0).getShardId();
  3. String data = String.valueOf(System.currentTimeMillis());
  4. BlobRecordEntry entry = new BlobRecordEntry();
  5. entry.setData(data.getBytes());
  6. entry.setShardId(shardId);
  7. recordEntries.add(entry);
  8. PutBlobRecordsResult result = client.putBlobRecords(projectName, topicName, recordEntries);
  9. if (result.getFailedRecordCount() != 0) {
  10. List<ErrorEntry> errors = result.getFailedRecordError();
  11. // deal with result.getFailedRecords()
  12. }
消费Blob数据

获取Cursor,同TupleTopic.

读取数据:

  1. int limit = 100;
  2. String cursor = cursorRs.getCursor();
  3. while (true) {
  4. try {
  5. GetBlobRecordsResult recordRs = client.getBlobRecords(projectName, topicName, shardId, cursor, limit);
  6. List<BlobRecordEntry> recordEntries = recordRs.getRecords();
  7. Assert.assertEquals(recordRs.getRecords().get(0).getData(), data.getBytes());
  8. if (cursor.equals(recordRs.getNextCursor())) {
  9. // no more data , please wait
  10. try {
  11. Thread.sleep(1000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. cursor = recordRs.getNextCursor();
  17. } catch (InvalidCursorException ex) {
  18. cursorRs = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
  19. cursor = cursorRs.getCursor();
  20. }
  21. }

最后更新:2016-11-24 18:03:38

  上一篇:go 快速指引__DataHub实时数据通道_大数据计算服务-阿里云
  下一篇:go Web控制台介绍__使用指南_DataHub实时数据通道_大数据计算服务-阿里云