如何同步TableStore數據到Elasticsearch
圖書館
圖書館Q是一家大型圖書館,圖書館藏書眾多,紙質圖書600多萬冊,電子圖書7000多萬冊,總數有八千多萬冊,這些圖書之前都是人工檢索維護的,現在需要做一個係統來存儲管理這些圖書信息。
需求如下:
- 圖書總量目前八千多萬冊,考慮到未來二十年的增長,需要係統能支持一億的存儲量。
- 圖書信息很重要,不能接受丟失發生。
- 圖書的名字和作者名字需要支持模煳搜索。
- 每本書的屬性最多有一百多個,且不固定,不同類型的圖書的屬性列差異較大。且未來可能會新增屬性列。
根據上麵這些需求特點,要完成這個管理係統,需要兩類係統支持:
- 分布式NoSQL數據庫:解決兩億存儲量的問題,解決屬性列較多且不固定的問題,解決可靠性要求高的問題。
- 搜索係統:解決固定列模煳搜索的需求。
如果使用阿裏雲產品,那麼對應的產品就是:
- Table Store:分布式NoSQL數據庫。
- Elasticsearch:搜索係統,支持模煳搜索。
在管理係統中使用上述兩個係統的時候,目前需要雙寫,當新增一本書的時候,需要將詳細書本信息寫入Table Store,將書本ID和作者,書名寫入Elasticsearch,並且對書名,作者建索引。查詢的時候,如果是根據書本ID,則直接查詢Table Store。如果是根據書名模煳查詢,則先查Elasticsearch,獲取到匹配的書本的ID後,再到Table Store中查詢詳細信息。
如果Table Store到Elasticsearch有自動同步通道,那麼隻需要將新書信息寫入Table Store即可,不再需要寫Elasticsearch。減少了一次寫入操作,且不用再考慮數據一致性問題,係統架構大大簡化。那麼如何才能實現這個自動同步通道呢?
目標
類似於上麵的場景,有很多係統都有這樣的需求:擁有PB級海量數據需要持久化存儲,同時有一兩個字段需要做模煳查詢,比如姓名,手機號碼等,目前很多解決方案需要雙寫分布式數據庫和Elasticsearch,但這樣不僅會帶來開發、運維複雜度,而且還有數據不一致的問題。
針對上述問題,Table Store團隊聯合數據集成(CDP)和Elasticsearch團隊上線了近實時的數據同步方案,用戶隻需要將數據寫入Table Store,Table Store會負責將數據在10分鍾內自動發送給Elasticsearch建索引。
相關產品
Table Store:阿裏雲分布式NoSQL數據庫,專注於海量數據的存儲服務,目前單表可支持10PB級,10萬億行以上的數據量,且數據量增大後性能仍然保持穩定。Table Store Stream功能是一種增量實時通道服務,類似於MySQL的binlog,可以通過Stream接口實時讀取到最新的變化數據(Put/Update/Delete)。
數據集成 :阿裏雲數據管理平台,支持數據同步等眾多數據功能。
Elasticsearch :阿裏雲Elasticsearch是剛推出的一項新服務,提供基於開源Elasticsearch及商業版X-Pack插件,致力於數據分析、數據搜索等場景服務。在開源Elasticsearch基礎上提供企業級權限管控、安全監控告警、自動報表生成等功能。
三種產品在新解決方案中的角色如下:
產品 | Table Store | 數據集成 | Elasticsearch |
---|---|---|---|
角色 | 數據存儲 | 數據同步通道 | 查詢增強 |
限製
由於Table Store和Elasticsearch不是完全對等的產品,所以如果需要將數據導入Elasticsearch,那麼在使用Table Store的時候有一些注意的地方:
-
Table Store主鍵列個數:
- 目前Table Store最大支持4個主鍵列,而Elasticsearch隻支持一個,所以Table Store的表設計時隻能使用一個主鍵列,如果之前有多個主鍵列,可以將多個主鍵列的值轉換成String,然後拚接成一個主鍵列。
-
Table Store數據變化類型:
- 僅支持PUT(新增),UPDATE(更新)兩種操作。
- 不支持DELETE操作。
-
Table Store多版本:
- 僅支持單版本,不支持多版本
-
Elasticsearch:
- 版本:支持阿裏雲和開源的5.*.*版本。
-
延時:
- 目前使用的是周期調度,每隔5分鍾調度一次,再加上插件中有5分鍾延遲,同步總延遲在5~10分鍾。
開通服務
- Table Store:
- 登陸Table Store首頁:https://www.aliyun.com/product/ots/
- 點擊開通。
- 新建實例和表,表需要開通Stream,有效時間可以選擇24小時。
- Table Store支持按預留CU和按量付費兩種收費模式,如果創建表時指定讀寫CU都為0則表示為按量付費,後期如果沒有使用則不收費。且目前每月有10GB,1000萬CU的免費額度。
- Table Store中表需要開通Stream功能。
- 數據集成
- 登陸DataWorks綁定AK。
- 然後創建項目即可。
- 注意:子賬號不能創建項目,隻能被主賬號授權。
- Elasticsearch
- 登陸Elasticsearch首頁:https://data.aliyun.com/product/elasticsearch。
- 點擊立即購買,購買時的VPC必須和之前購買的ECS在同一個VPC環境內部。
- 根據數據量預估,購買相應的實例大小。
- 目前收費是按實例規格收費。
使用方式
- 寫
- Table Store:PutRow/BatchWriteRow接口寫入數據
- Elasticsearch:無須寫入
- 讀
- Elasticsearch:搜索到請求結果後,拿到每個doc的_id字段值。
- Table Store:Elasticsearch中的_id字段就是Table Store中的主鍵值,獲取到一係列_id值後,使用Table Store的BatchGetRow可以查詢到完整數據。
同步流程
- 整個同步流程應該包括下麵兩個步驟:
- 導出Table Store的全量數據到Elasticsearch,並且記錄開始時間T1。
- 等全量導出結束後,再開始同步增量數據,增量數據開始同步的時間是T1。
- 對於全量導出,需要使用otsreader插件,配置中的Range使用INF_MIN到INF_MAX,也就是導出所有數據。
- 對於增量同步,需要配置起始時間和結束時間為一個變量,在調度周期配置的時候配置起始時間必須小於等於T1,否則可能會有數據丟失發生。
- 我們下麵會以增量同步為例來介紹如何配置增量同步任務。
Table Store配置
無須配置
Elasticsearch配置
無須配置
數據集成配置
1. 創建數據源(可選)
- 如果已經創建了Table Store的數據源,則可以跳過這一步。
- 如果不希望創建數據源,也可以在配置頁麵配置相應的endpoint,instanceName,AccessKeyID和AccessKeySecret。如果希望創建,則按照下麵步驟操作。
- 登錄阿裏雲大數據開發套件:數據源地址。
- 單擊左側 離線同步 > 數據源。
- 在數據源配置頁麵,選擇右上角 新增數據源 ,會有一個彈出框。
- 按照說明填寫:
- 數據源名稱:填寫一個數據源標識符,比如車聯網。
- 數據源描述:填入描述符,比如:車聯網GPS數據存儲。
- 數據源類型:選擇 ots ,ots是Table Store曾用名。
- OTS Endpoint:填入TableStore 實例頁麵的實例地址,如果Table Store的實例和目標產品(比如Elasticsearch)在同一個region,則可以填入私網地址,否則需要填入公網地址,不能填入VPC地址。
- OTS 實例ID:填入Table Store的實例名稱。
- Access Id:填入阿裏雲網站的AccessKeyID。
- Access Key:填入阿裏雲網站AccessKeyID對應的AccessKeySecret。
- 點擊 測試連通性 ,如果成功則會在右上角提示:測試連接成功。 如果失敗,點擊endpoint是否配置正確,如果仍然無法解決,提工單聯係數據集成。
- 填好後的頁麵類似下麵這樣:
- 單擊確定,數據源創建成功,此時在數據源頁麵會出現一個新的數據源信息;
2. 創建導出任務
- 單擊數據集成地址,進入數據集成的頁麵,會出現模式選擇:
- 單擊 腳本模式 ,彈出一個 導入模板 配置。
- 在導入模板配置裏麵:
- 單擊確認,則進入配置界麵。
3. 完善配置項
在配置界麵,已經提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,每一項配置後麵都做了解釋。
{
"type": "job",
"version": "1.0",
"configuration": {
"setting": {
"errorLimit": {
"record": "0" # 允許出錯的個數,當錯誤超過這個數目的時候同步任務會失敗。
},
"speed": {
"mbps": "1", # 每次同步任務的最大流量。
"concurrent": "1" # 每次同步任務的並發度。
}
},
"reader": {
"plugin": "otsstream", # Reader插件的名稱。
"parameter": {
"endpoint": "", # TableStore中實例的endpoint。
"accessId": "", # 阿裏雲的AccessKeyID。
"accessKey": "", # 阿裏雲的AccessKeySecret。
"instanceName": "", # TableStore的實例名,如果使用DataSource,則需要新增配置項datasource,不再需要配置endpoint,accessId,accessKey和instanceName。
"dataTable": "", # TableStore中的表名。
"statusTable": "TableStoreStreamReaderStatusTable", # 存儲TableStore Stream狀態的表,一般不需要修改。
"startTimestampMillis": "", # 開始導出的時間點。
"endTimestampMillis": "", # 結束導出的時間點。
"date": "yyyyMMdd", # 導出哪一天的數據,功能和startTimestampMillis、endTimestampMillis重複,這一項需要刪除。
"mode": "single_version_and_update_only", # TableStore Stream導出數據的格式,目前ElasticSearch隻能接收這種格式的,這個不需要修改。如果配置模板中沒有則需要增加。
"column":[ # 需要導出TableStore中的哪些列到ElasticSearch中去,如果配置模板中沒有則需要增加。
{"name":"uid"},
{"name":"name"},
{"name":"phone"}
],
"isExportSequenceInfo": false, # # single_version_and_update_only 模式下隻能是false。
"maxRetries": 30 # 最大重試次數。
}
},
"writer": {
"plugin": "elasticsearch", # Writer插件的名稱:ElasticSearchWriter,不需要修改。
"parameter": {
"endpoint": "",# ElasticSearch的endpoint,控製台上有。
"accessId": "",# 如果使用了X-PACK插件,則這裏需要填寫username,如果沒使用,則這裏填空字符串即可。阿裏雲Elasticsearch使用了X-PACK插件,這裏需要填寫username。
"accessKey": "", # 如果使用了X-PACK插件,則這裏需要填寫password,如果沒使用,則這裏填空字符串即可。阿裏雲Elasticsearch使用了X-PACK插件,這裏需要填寫password。
"index": "", # ElasticSearch的索引名稱,如果之前沒有,插件會自動創建。
"indexType": "", # ElasticSearch中相應索引下的類型名稱
"cleanup": true, # 是否在每次導入數據到ElasticSearch的時候清空原有數據,全量導入/重建索引的時候需要設置為true,同步增量的時候必須為false,這裏因為是同步,則需要設置為false。
"discovery": false, # 是否自動發現,設置為true
"batchSize": 1000, # 每批導出的個數
"splitter": ",", # 如果插入數據是array,就使用指定分隔符。
"column": [ # ElasticSearch中的列名,順序和Reader中的Column順序一致
{
"name": "uid", # TableStore中的主鍵列是uid,這裏也有同名uid,用type:id表示這一列是主鍵
"type": "id" # id表示這一列是主鍵,id不是ElasticSearch的內置類型,是ElasticSearchWriter提供的虛擬類型
},
{
"name": "name", # 對應於TableStore中的屬性列:name
"type": "text" # 文本類型,采用默認分詞
}
]
}
}
}
}
其他配置項可以參考:ElasticsearchWriter配置項
4. 保存任務
5. 設置調度資源
- 由於目前數據集成還沒辦法自動訪問VPC環境內的Elasticsearch,所以暫時需要用戶自己購買VPC內的ECS機器作為調度資源。
- 購買一台ECS,新購ECS所在的VPC需要和Elasticsearch的VPC是同一個。
- 進入 DataWorks > 調度資源列表 ,單擊 新增調度資源 。
- 在彈出的對話框中輸入:資源名稱,選擇:歸屬項目。
- 然後點擊 確定 ,會提示 添加調度資源成功 。
- 點擊 管理服務器 ,會彈出一個新的彈出框。
- 點擊 增加服務器 ,會彈出一個新的彈出框。
- 如果是添加機器,需要初始化服務器,點擊 調度資源列表 頁麵相應資源名稱後麵的 服務器初始化 ,會彈出初始化步驟,按這個步驟執行。
- 執行完成後,再點擊 調度資源列表 頁麵相應資源名稱後麵的 服務器管理 ,點擊 刷新 ,當 服務狀態 變為 正常 時,表示調度資源配置成功。
6. 提交任務
- 回到任務配置頁麵 數據集成 > 離線同步 > 同步任務 > 數據同步 。
- 雙擊剛剛創建的任務:tablestore2es,點擊配置內容上部的 提交 。
- 配置調度參數:
- 提交任務後,原有任務處於:直讀狀態。
7. 綁定調度資源
- 切換到 運維中心 > 任務列表 > 周期任務 ,可以看到剛剛創建的周期任務:
選中剛剛創建的周期任務:tablestore2es。點擊下部的 修改資源組 ,會彈出一個選擇框,選擇剛剛創建的資源組名稱:
點擊 確認 ,即可綁定成功。
8. 驗證結果
- 周期任務是從下一天的00:00點開始執行。
- 等執行完一個任務後,可以在ECS上通過下述命令查看Elasticsearch中的數據量:
curl -XGET https://endpoint/index_name/type_name/_count?pretty -d '
{
"query": {
"match_all": {}
}
}'
結果類似下麵:
{
"count" : 1000, # ElasticSearch中index_name索引的type_name類型中的doc數
"_shards" : { # 這個是ElasticSearch返回數據相關的meta值,表示總共有5個shard,全部成功返回了結果
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
}
}
9. 下一步計劃
- 至此,TableStore數據通過數據集成同步到Elasticsearch的配置完成了,延遲在5分鍾到10分鍾之間。
- 雖然目前可以運行了,但是仍然存在一些問題。
- 需要 設置調度資源 ,比較麻煩。預計十二月中旬的時候這一步可以自動處理,不再需要配置。使用會更加簡單。
- 延遲在 5~10 分鍾,對於部分係統而言,可能延遲比較大,預計十二月底的時候可以減少到秒級。時效性會更高。
- 在使用中有任何問題,可以加入表格存儲釘釘技術交流群:11789671
最後更新:2017-11-21 07:34:26