阅读128 返回首页    go 微信


多方式写入__高级特性_DataHub实时数据通道_大数据计算服务-阿里云

多方式数据写入(Hash/PartitionKey

DataHub服务支持三种写入方式:

按ShardID写入

指定写入某个Shard,该场景主要用于用户需要保证每个通道中数据有序,因此需要将部分数据指定写入到某个Shard中。样例代码:

  1. // 新建client
  2. Account account = new AliyunAccount("your access id", "your access key");
  3. DatahubConfiguration conf = new DatahubConfiguration(account, "datahub endpoint");
  4. DatahubClient client = new DatahubClient(conf);
  5. // 构造需要上传的records
  6. RecordSchema schema = client.getTopic("projectName", "topicName").getRecordSchema();
  7. List<RecordEntry> recordEntries = new ArrayList<~>();
  8. RecordEntry entry = new RecordEntry(schema);
  9. for (int i=0; i<entry.getFieldCount(); i++) {
  10. entry.setBigint(i, 1);
  11. }
  12. entry.setShardId("shardId");
  13. recordEntries.add(entry);
  14. // 数据写入
  15. client.putRecords("projectName", "topicName", recordEntries);

按HashKey写入

指定一个128 bit的MD5值。 按照HashKey写入,根据Shard的beginHashKey与endHashKey决定数据写入的Shard。

该种方式的写入场景主要用于用户不关心数据的写入顺序,根据某个字段值或用户维护的key来进行写入。

  1. // 新建client
  2. Account account = new AliyunAccount("your access id", "your access key");
  3. DatahubConfiguration conf = new DatahubConfiguration(account, "datahub endpoint");
  4. DatahubClient client = new DatahubClient(conf);
  5. // 构造需要上传的records
  6. RecordSchema schema = client.getTopic("projectName", "topicName").getRecordSchema();
  7. List<RecordEntry> recordEntries = new ArrayList<~>();
  8. RecordEntry entry = new RecordEntry(schema);
  9. for (int i=0; i<entry.getFieldCount(); i++) {
  10. entry.setBigint(i, 1);
  11. }
  12. entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");
  13. recordEntries.add(entry);
  14. // 数据写入
  15. client.putRecords("projectName", "topicName", recordEntries);

按PartitionKey写入

指定一个String类型参数作为PartitionKey,系统根据该String的MD5值以及Shard的beginHashKey与endHashKey决定写入的Shard。

该种方式的应用场景与按HashKey写入方式类似,区别在于用户不需要提供固定范围的HashKey,而是通过一个字符串Key,系统会计算出其对应的HashKey进行写入。

  1. // 新建client
  2. Account account = new AliyunAccount("your access id", "your access key");
  3. DatahubConfiguration conf = new DatahubConfiguration(account, "datahub endpoint");
  4. DatahubClient client = new DatahubClient(conf);
  5. // 构造需要上传的records
  6. RecordSchema schema = client.getTopic("projectName", "topicName").getRecordSchema();
  7. List<RecordEntry> recordEntries = new ArrayList<~>();
  8. RecordEntry entry = new RecordEntry(schema);
  9. for (int i=0; i<entry.getFieldCount(); i++) {
  10. entry.setBigint(i, 1);
  11. }
  12. entry.setPartitionKey("TestPartitionKey");
  13. recordEntries.add(entry);
  14. // 数据写入
  15. client.putRecords("projectName", "topicName", recordEntries);

最后更新:2016-11-23 18:01:58

  上一篇:go 扩容缩容Merge/Split__高级特性_DataHub实时数据通道_大数据计算服务-阿里云
  下一篇:go 批量数据通道概要__SDK介绍_批量数据通道_大数据计算服务-阿里云