閱讀696 返回首頁    go 阿裏雲 go 技術社區[雲棲]


如何同步TableStore數據到Elasticsearch

圖書館

圖書館Q是一家大型圖書館,圖書館藏書眾多,紙質圖書600多萬冊,電子圖書7000多萬冊,總數有八千多萬冊,這些圖書之前都是人工檢索維護的,現在需要做一個係統來存儲管理這些圖書信息。
需求如下:

  • 圖書總量目前八千多萬冊,考慮到未來二十年的增長,需要係統能支持一億的存儲量。
  • 圖書信息很重要,不能接受丟失發生。
  • 圖書的名字和作者名字需要支持模煳搜索。
  • 每本書的屬性最多有一百多個,且不固定,不同類型的圖書的屬性列差異較大。且未來可能會新增屬性列。

根據上麵這些需求特點,要完成這個管理係統,需要兩類係統支持:

  • 分布式NoSQL數據庫:解決兩億存儲量的問題,解決屬性列較多且不固定的問題,解決可靠性要求高的問題。
  • 搜索係統:解決固定列模煳搜索的需求。

如果使用阿裏雲產品,那麼對應的產品就是:

  • Table Store:分布式NoSQL數據庫。
  • Elasticsearch:搜索係統,支持模煳搜索。

在管理係統中使用上述兩個係統的時候,目前需要雙寫,當新增一本書的時候,需要將詳細書本信息寫入Table Store,將書本ID和作者,書名寫入Elasticsearch,並且對書名,作者建索引。查詢的時候,如果是根據書本ID,則直接查詢Table Store。如果是根據書名模煳查詢,則先查Elasticsearch,獲取到匹配的書本的ID後,再到Table Store中查詢詳細信息。

如果Table Store到Elasticsearch有自動同步通道,那麼隻需要將新書信息寫入Table Store即可,不再需要寫Elasticsearch。減少了一次寫入操作,且不用再考慮數據一致性問題,係統架構大大簡化。那麼如何才能實現這個自動同步通道呢?

目標

類似於上麵的場景,有很多係統都有這樣的需求:擁有PB級海量數據需要持久化存儲,同時有一兩個字段需要做模煳查詢,比如姓名,手機號碼等,目前很多解決方案需要雙寫分布式數據庫和Elasticsearch,但這樣不僅會帶來開發、運維複雜度,而且還有數據不一致的問題。

針對上述問題,Table Store團隊聯合數據集成(CDP)和Elasticsearch團隊上線了近實時的數據同步方案,用戶隻需要將數據寫入Table Store,Table Store會負責將數據在10分鍾內自動發送給Elasticsearch建索引。

相關產品

Table Store:阿裏雲分布式NoSQL數據庫,專注於海量數據的存儲服務,目前單表可支持10PB級,10萬億行以上的數據量,且數據量增大後性能仍然保持穩定。Table Store Stream功能是一種增量實時通道服務,類似於MySQL的binlog,可以通過Stream接口實時讀取到最新的變化數據(Put/Update/Delete)。

數據集成 :阿裏雲數據管理平台,支持數據同步等眾多數據功能。

Elasticsearch :阿裏雲Elasticsearch是剛推出的一項新服務,提供基於開源Elasticsearch及商業版X-Pack插件,致力於數據分析、數據搜索等場景服務。在開源Elasticsearch基礎上提供企業級權限管控、安全監控告警、自動報表生成等功能。



三種產品在新解決方案中的角色如下:

產品 Table Store 數據集成 Elasticsearch
角色 數據存儲 數據同步通道 查詢增強

限製

由於Table Store和Elasticsearch不是完全對等的產品,所以如果需要將數據導入Elasticsearch,那麼在使用Table Store的時候有一些注意的地方:

  • Table Store主鍵列個數:

    • 目前Table Store最大支持4個主鍵列,而Elasticsearch隻支持一個,所以Table Store的表設計時隻能使用一個主鍵列,如果之前有多個主鍵列,可以將多個主鍵列的值轉換成String,然後拚接成一個主鍵列。
  • Table Store數據變化類型:

    • 僅支持PUT(新增),UPDATE(更新)兩種操作。
    • 不支持DELETE操作。
  • Table Store多版本:

    • 僅支持單版本,不支持多版本
  • Elasticsearch:

    • 版本:支持阿裏雲和開源的5.*.*版本。
  • 延時:

    • 目前使用的是周期調度,每隔5分鍾調度一次,再加上插件中有5分鍾延遲,同步總延遲在5~10分鍾。

開通服務

  • Table Store:
    • 登陸Table Store首頁:https://www.aliyun.com/product/ots/
    • 點擊開通。
    • 新建實例和表,表需要開通Stream,有效時間可以選擇24小時。
    • Table Store支持按預留CU和按量付費兩種收費模式,如果創建表時指定讀寫CU都為0則表示為按量付費,後期如果沒有使用則不收費。且目前每月有10GB,1000萬CU的免費額度。
    • Table Store中表需要開通Stream功能。
  • 數據集成
    • 登陸DataWorks綁定AK。
    • 然後創建項目即可。
    • 注意:子賬號不能創建項目,隻能被主賬號授權。
  • Elasticsearch
    • 登陸Elasticsearch首頁:https://data.aliyun.com/product/elasticsearch。
    • 點擊立即購買,購買時的VPC必須和之前購買的ECS在同一個VPC環境內部。
    • 根據數據量預估,購買相應的實例大小。
    • 目前收費是按實例規格收費。

使用方式

    • Table Store:PutRow/BatchWriteRow接口寫入數據
    • Elasticsearch:無須寫入
    • Elasticsearch:搜索到請求結果後,拿到每個doc的_id字段值。
    • Table Store:Elasticsearch中的_id字段就是Table Store中的主鍵值,獲取到一係列_id值後,使用Table Store的BatchGetRow可以查詢到完整數據。

同步流程

  1. 整個同步流程應該包括下麵兩個步驟:
    1. 導出Table Store的全量數據到Elasticsearch,並且記錄開始時間T1。
    2. 等全量導出結束後,再開始同步增量數據,增量數據開始同步的時間是T1。
  2. 對於全量導出,需要使用otsreader插件,配置中的Range使用INF_MIN到INF_MAX,也就是導出所有數據。
  3. 對於增量同步,需要配置起始時間和結束時間為一個變量,在調度周期配置的時候配置起始時間必須小於等於T1,否則可能會有數據丟失發生。
  4. 我們下麵會以增量同步為例來介紹如何配置增量同步任務。

Table Store配置

無須配置

Elasticsearch配置

無須配置

數據集成配置

1. 創建數據源(可選)

  1. 如果已經創建了Table Store的數據源,則可以跳過這一步。
  2. 如果不希望創建數據源,也可以在配置頁麵配置相應的endpoint,instanceName,AccessKeyID和AccessKeySecret。如果希望創建,則按照下麵步驟操作。
  3. 登錄阿裏雲大數據開發套件:數據源地址
  4. 單擊左側 離線同步 > 數據源
  5. 在數據源配置頁麵,選擇右上角 新增數據源 ,會有一個彈出框。
  6. 按照說明填寫:
    • 數據源名稱:填寫一個數據源標識符,比如車聯網。
    • 數據源描述:填入描述符,比如:車聯網GPS數據存儲。
    • 數據源類型:選擇 ots ,ots是Table Store曾用名。
    • OTS Endpoint:填入TableStore 實例頁麵的實例地址,如果Table Store的實例和目標產品(比如Elasticsearch)在同一個region,則可以填入私網地址,否則需要填入公網地址,不能填入VPC地址。
    • OTS 實例ID:填入Table Store的實例名稱。
    • Access Id:填入阿裏雲網站的AccessKeyID。
    • Access Key:填入阿裏雲網站AccessKeyID對應的AccessKeySecret。
    • 點擊 測試連通性 ,如果成功則會在右上角提示:測試連接成功。 如果失敗,點擊endpoint是否配置正確,如果仍然無法解決,提工單聯係數據集成。
    • 填好後的頁麵類似下麵這樣: 配置數據源
  7. 單擊確定,數據源創建成功,此時在數據源頁麵會出現一個新的數據源信息; 新數據源

2. 創建導出任務

  1. 單擊數據集成地址,進入數據集成的頁麵,會出現模式選擇: 模式選擇
  2. 單擊 腳本模式 ,彈出一個 導入模板 配置。
  3. 在導入模板配置裏麵:
    1. 來源類型:OTS Stream
    2. 目標類型:Elasticsearch
      es1
  4. 單擊確認,則進入配置界麵。

3. 完善配置項

在配置界麵,已經提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,每一項配置後麵都做了解釋。

{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "setting": {
      "errorLimit": {
        "record": "0"  # 允許出錯的個數,當錯誤超過這個數目的時候同步任務會失敗。
      },
      "speed": {
        "mbps": "1",  # 每次同步任務的最大流量。
        "concurrent": "1" # 每次同步任務的並發度。
      }
    },
    "reader": {
      "plugin": "otsstream",  # Reader插件的名稱。
      "parameter": {
        "endpoint": "",  # TableStore中實例的endpoint。
        "accessId": "",  # 阿裏雲的AccessKeyID。
        "accessKey": "",  # 阿裏雲的AccessKeySecret。
        "instanceName": "",  # TableStore的實例名,如果使用DataSource,則需要新增配置項datasource,不再需要配置endpoint,accessId,accessKey和instanceName。
        "dataTable": "",  # TableStore中的表名。
        "statusTable": "TableStoreStreamReaderStatusTable", # 存儲TableStore Stream狀態的表,一般不需要修改。
        "startTimestampMillis": "", # 開始導出的時間點。
        "endTimestampMillis": "",   # 結束導出的時間點。
        "date": "yyyyMMdd",  # 導出哪一天的數據,功能和startTimestampMillis、endTimestampMillis重複,這一項需要刪除。
        "mode": "single_version_and_update_only", # TableStore Stream導出數據的格式,目前ElasticSearch隻能接收這種格式的,這個不需要修改。如果配置模板中沒有則需要增加。
        "column":[  # 需要導出TableStore中的哪些列到ElasticSearch中去,如果配置模板中沒有則需要增加。
            {"name":"uid"},  
            {"name":"name"},           
            {"name":"phone"}
        ],
        "isExportSequenceInfo": false, #  # single_version_and_update_only 模式下隻能是false。
        "maxRetries": 30 # 最大重試次數。
      }
    },
    "writer": {
      "plugin": "elasticsearch",  # Writer插件的名稱:ElasticSearchWriter,不需要修改。
      "parameter": {
        "endpoint": "",# ElasticSearch的endpoint,控製台上有。
        "accessId": "",# 如果使用了X-PACK插件,則這裏需要填寫username,如果沒使用,則這裏填空字符串即可。阿裏雲Elasticsearch使用了X-PACK插件,這裏需要填寫username。
        "accessKey": "", # 如果使用了X-PACK插件,則這裏需要填寫password,如果沒使用,則這裏填空字符串即可。阿裏雲Elasticsearch使用了X-PACK插件,這裏需要填寫password。  
        "index": "",  # ElasticSearch的索引名稱,如果之前沒有,插件會自動創建。
        "indexType": "", # ElasticSearch中相應索引下的類型名稱
        "cleanup": true, # 是否在每次導入數據到ElasticSearch的時候清空原有數據,全量導入/重建索引的時候需要設置為true,同步增量的時候必須為false,這裏因為是同步,則需要設置為false。
        "discovery": false, # 是否自動發現,設置為true
        "batchSize": 1000, # 每批導出的個數
        "splitter": ",",  # 如果插入數據是array,就使用指定分隔符。
        "column": [  # ElasticSearch中的列名,順序和Reader中的Column順序一致
          {
            "name": "uid",  # TableStore中的主鍵列是uid,這裏也有同名uid,用type:id表示這一列是主鍵
            "type": "id"  # id表示這一列是主鍵,id不是ElasticSearch的內置類型,是ElasticSearchWriter提供的虛擬類型
          },
          {
            "name": "name", # 對應於TableStore中的屬性列:name
            "type": "text"  # 文本類型,采用默認分詞
          }
        ]
      }
    }
  }
}

其他配置項可以參考:ElasticsearchWriter配置項

4. 保存任務

  1. 點擊上部的 保存 按鈕,則彈出一個對話框,輸入任務名稱後按確定即可。 es2

5. 設置調度資源

  1. 由於目前數據集成還沒辦法自動訪問VPC環境內的Elasticsearch,所以暫時需要用戶自己購買VPC內的ECS機器作為調度資源。
  2. 購買一台ECS,新購ECS所在的VPC需要和Elasticsearch的VPC是同一個。
  3. 進入 DataWorks > 調度資源列表 ,單擊 新增調度資源
  4. 在彈出的對話框中輸入:資源名稱,選擇:歸屬項目。 es3
  5. 然後點擊 確定 ,會提示 添加調度資源成功cdp4
  6. 點擊 管理服務器 ,會彈出一個新的彈出框。 cdp5
  7. 點擊 增加服務器 ,會彈出一個新的彈出框。
    • 選擇 專有網絡
    • 輸入剛剛購買的ECS的:ECS UUID。登陸ECS,以root身份執行命令 bash dmidecode | grep UUID即可獲取到UUID。
    • 輸入剛剛購買的ECS的內網:機器IP。
    • 單擊 添加
      cdp6
  8. 如果是添加機器,需要初始化服務器,點擊 調度資源列表 頁麵相應資源名稱後麵的 服務器初始化 ,會彈出初始化步驟,按這個步驟執行。
  9. 執行完成後,再點擊 調度資源列表 頁麵相應資源名稱後麵的 服務器管理 ,點擊 刷新 ,當 服務狀態 變為 正常 時,表示調度資源配置成功。 cdp8

6. 提交任務

  1. 回到任務配置頁麵 數據集成 > 離線同步 > 同步任務 > 數據同步
  2. 雙擊剛剛創建的任務:tablestore2es,點擊配置內容上部的 提交
  3. 配置調度參數:
    • 調度類型:周期調度
    • 自動重跑:√
    • 生效日期:默認值
    • 調度周期:分鍾
    • 起始時間:默認值
    • 時間間隔:5分鍾
    • 跨周期依賴:可以選擇:自依賴,等待上一調度結束,才能接續運行。 cdp9
  4. 提交任務後,原有任務處於:直讀狀態。

7. 綁定調度資源

  1. 切換到 運維中心 > 任務列表 > 周期任務 ,可以看到剛剛創建的周期任務: cdp10
  2. 選中剛剛創建的周期任務:tablestore2es。點擊下部的 修改資源組 ,會彈出一個選擇框,選擇剛剛創建的資源組名稱:
    cdp11

  3. 點擊 確認 ,即可綁定成功。

8. 驗證結果

  1. 周期任務是從下一天的00:00點開始執行。
  2. 等執行完一個任務後,可以在ECS上通過下述命令查看Elasticsearch中的數據量:
curl -XGET https://endpoint/index_name/type_name/_count?pretty  -d '
{
    "query": {
        "match_all": {}
    }
}'            

結果類似下麵:

{
  "count" : 1000,  # ElasticSearch中index_name索引的type_name類型中的doc數
  "_shards" : {     # 這個是ElasticSearch返回數據相關的meta值,表示總共有5個shard,全部成功返回了結果
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  }
}               

9. 下一步計劃

  1. 至此,TableStore數據通過數據集成同步到Elasticsearch的配置完成了,延遲在5分鍾到10分鍾之間。
  2. 雖然目前可以運行了,但是仍然存在一些問題。
    1. 需要 設置調度資源 ,比較麻煩。預計十二月中旬的時候這一步可以自動處理,不再需要配置。使用會更加簡單。
    2. 延遲在 5~10 分鍾,對於部分係統而言,可能延遲比較大,預計十二月底的時候可以減少到秒級。時效性會更高。
  3. 在使用中有任何問題,可以加入表格存儲釘釘技術交流群:11789671

最後更新:2017-11-21 07:34:26

  上一篇:go  11月20日雲棲精選夜讀:圍觀阿裏總部:邊喝茶邊搞技術是一種怎樣的體驗?
  下一篇:go  Redis數據過期和淘汰策略詳解