Cassandra使用pycassa批量導入數據
本周接手了一個Cassandra係統的維護工作,有一項是需要將應用方的數據導入我們維護的Cassandra集群,並且為應用方提供HTTP的方式訪問服務。這是我第一次接觸KV係統,原來隻是走馬觀花似的看過KV啊,NoSQL啊。但是實際上沒有實際的使用經驗。經過兩天的學習和接手,終於搞明白了在生產環境中的使用方式。在此簡要的筆記一下。本文主要包括的內容有:
Cassandra的簡介,
Cassandra的相關CLI
Cassandra的Python API,並且給出一個批量導入數據的例子。
1. Cassandra簡介
Cassandra的主要特點就是它不是一個數據庫,而是由一堆數據庫節點共同構成的一個分布式網絡服務,對Cassandra 的一個寫操作,會被複製到其他節點上去,對Cassandra的讀操作,也會被路由到某個節點上麵去讀取。對於一個Cassandra群集來說,擴展性能 是比較簡單的事情,隻管在群集裏麵添加節點就可以了。
Cassandra是一個混合型的非關係的數據庫,類似於Google的BigTable。其主要功能比 Dynomite(分布式的Key-Value存 儲係統)更豐富,但支持度卻不如文檔存儲MongoDB(介於關係數據庫和非關係數據庫之間的開源產品,是非關係數據庫當中功能最豐富,最像關係數據庫 的。支持的數據結構非常鬆散,是類似json的bjson格式,因此可以存儲比較複雜的數據類型。)Cassandra最初由Facebook開發,後轉變成了開源項目。它是一個網絡社交雲計算方麵理想的數據庫。以Amazon專有的完全分布式的Dynamo為基礎,結合了Google BigTable基於列族(Column Family)的數據模型。P2P去中心化的存儲。很多方麵都可以稱之為Dynamo 2.0。
和其他數據庫比較,有幾個突出特點:
- 模式靈活 :使用Cassandra,像文檔存儲,你不必提前解決記錄中的字段。你可以在係統運行時隨意的添加或移除字段。這是一個驚人的效率提升,特別是在大型部 署上。
- 真正的可擴展性 :Cassandra是純粹意義上的水平擴展。為給集群添加更多容量,可以指向另一台電腦。你不必重啟任何進程,改變應用查詢,或手動遷移任何數據。
- 多數據中心識別 :你可以調整你的節點布局來避免某一個數據中心起火,一個備用的數據中心將至少有每條記錄的完全複製。
一些使Cassandra提高競爭力的其他功能:
- 範圍查詢 :如果你不喜歡全部的鍵值查詢,則可以設置鍵的範圍來查詢。
- 列表數據結構 :在混合模式可以將超級列添加到5維。對於每個用戶的索引,這是非常方便的。
- 分布式寫操作 :可以在任何地方任何時間集中讀或寫任何數據。並且不會有任何單點失敗。
2. 基礎命令
連接
./cassandra-cli-h 10.224.52.73 -port 9160
集群式自動負載的,因此連接任意一個節點即可。
Check schema
show schema;
在創建了schema或者列族後,可以使用時命令確認是否成功
在運行改命令前,需要使用命令use keyspace_name; 否則會遇到以下錯誤:
Not authorized to a working keyspace
list
list column_family_name;
可以顯示列族的前100列。
3. 批量導入
實驗數據來自搜狗實驗室的中文詞語搭配庫,https://www.sogou.com/labs/dl/r.html。
數據格式如下:
詞語1_詞語2 \t 兩個詞共同出現的次數
在這裏並不討論該數據的具體意義,隻是以這個數據為起點來說明如何向應用方提供服務。
部分實際數據:
都要_打牌>--4
等候_一次>--26
本刊_重要>--3 關係_全方位>14 加熱_迅速>--107
設計列族名為 test_only, cli 如下:
create column family test_only
with column_type = 'Standard'
andcomparator = 'UTF8Type'
anddefault_validation_class = 'BytesType'
andkey_validation_class = 'UTF8Type'
andread_repair_chance = 0.1
anddclocal_read_repair_chance = 0.0
andgc_grace = 864000
andmin_compaction_threshold = 4
andmax_compaction_threshold = 32
andreplicate_on_write = true
andcompaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
andcaching = 'KEYS_ONLY'
and column_metadata = [
{column_name : 'count',
validation_class : UTF8Type}]
andcompression_options = {'sstable_compression' :'org.apache.cassandra.io.compress.SnappyCompressor'};
連接到Cassandra:pycassa.ConnectionPool(‘keyspace_name’, server_list)
具體到我們的例子就是:
con = pycassa.ConnectionPool('History',server_list=["server1:9160", "server2:9160","server3:9160"])
獲取列族:
cf = pycassa.ColumnFamily(con, cfName)
插入一條數據:
cf.insert('row_key', {'col_name': 'col_val'})
批量插入:
cf.batch_insert({'row1': {'name1': 'val1', 'name2': 'val2'}, 'row2': {'foo': 'bar'}})
獲取一條數據:
cf.get(‘row_key’)
獲取某一列的值:
cf.get(‘row_key’)[‘column_name’]
下麵是具體的代碼實現:
import pycassa import time batch_size = 100 def pycassa_connect(): #start = time.time() return pycassa.ConnectionPool('History', server_list=["192.168.1.20:9160"]) #end = time.time() #print "Mola init time: ", (end - start) def batch_insert(file_path, cf): global batch_size f = open(file_path, "r") count=0 error_count = 0 kvmap = {} for line in f:-- list = line.split("\t") if len(list) != 2 : print "skip error data" continue column = {} column['count'] = list[1].replace('\n', '') try: kvmap[list[0].decode('gb2312').encode('utf-8')] = column- if len(kvmap) % batch_size == 0: cf.batch_insert(kvmap) kvmap.clear() count = count + 1 except Exception, ex: print "found execption" print ex error_count = error_count + 1 f.close() if len(kvmap) > 0 : cf.batch_insert(kvmap) ---- for key in kvmap: print "key is %s, value is %s"%(key, kvmap[key])- print "total insert data is %d, error is %d"%(count, error_count)
如何測試數據是正確的?
def test_after_insert(file_path, cf): f = open(file_path, "r") error_count=0 print "Test started" for line in f:-- list = line.split("\t") if len(list) != 2 : print "skip error data" continue count = list[1].replace('\n', '') if cf.get(list[0].decode('gb2312').encode('utf-8'))['count'] != count: print "Key %s doesn't match value %s"%(list[0].decode('gb2312').encode('utf-8'), count) error_count = error_count + 1 print "Test completed, found %d error(s)."%error_count f.close()
最後更新:2017-04-03 05:40:09