ceph monitor paxos的實現(一)
ceph monitor的一個主要功能是使用paxos分布式式協議維護一個key/value數據庫的一致性。所使用的數據庫引擎一般是leveldb。
數據庫接口抽象
為了適應不同的數據庫引擎, ceph定義一個MonitorDBStore類來抽象對k/v數據庫的操作。對後端數據庫要求是支持事務或者原子性的key/value批量更新。它定義個一 Transaction類來說明一個事務包含的所有操作,並且這個類是可以序列化和反序列化的,以便在服務器之間傳送:
struct Op {
uint8_t type;
string prefix;
string key, endkey;
bufferlist bl;
}
struct Transaction {
list<Op> ops;
uint64_t bytes, keys;
Transaction() : bytes(0), keys(0) {}
enum {
OP_PUT = 1,
OP_ERASE = 2,
OP_COMPACT = 3,
};
例如它定義個put、erase的成員函數操作:
// 設置一個key的value
void put(string prefix, string key, bufferlist& bl) {
ops.push_back(Op(OP_PUT, prefix, key, bl));
++keys;
bytes += prefix.length() + key.length() + bl.length();
}
// 刪除一個key
void erase(string prefix, string key) {
ops.push_back(Op(OP_ERASE, prefix, key));
++keys;
bytes += prefix.length() + key.length();
}
而序列化、反序列化函數:
void encode(bufferlist& bl) const {
ENCODE_START(2, 1,bl);
::encode(ops,bl);
::encode(bytes, bl);
::encode(keys, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
DECODE_START(2, bl);
::decode(ops, bl);
if (struct_v >= 2) {
::decode(bytes,bl);
::decode(keys, bl);
}
DECODE_FINISH(bl);
}
ceph 主要用這個MonitorDBStore來為各個需要使用paxos的模塊提供存儲,為了各個模塊不相互幹擾,每個模塊會選擇一個前綴, 所有屬於這個模塊的數據都使用這個prefix再加上 一個key,才構成後端數據庫真正的key, 具體結構時這樣的:
prefix + '\0' + key
MonitorDBStore的API 主要是
int apply_transaction(MonitorDBStore::TransactionRef t)
負責把Transaction的每一條操作以原子方式在後端數據庫執行,是一個同步操作,而
queue_transaction(MonitorDBStore::TransactionRef t,Context *oncommit)
是一個異步操作,事務完成後會回調一個從Context導出的類對象,類似於C語言中的回調函數。
除此以外,MonitorDBStore還有get操作
int get(const string& prefix, const string& key, bufferlist& bl);
int get(const string& prefix, const version_t ver, bufferlist& bl);
定義迭代器用來批量獲取數據,它可以指定幾個prefix, 並批量把數據追加到一個Transaction裏麵,以便在服務器見批量傳數據, 可以預見加進去的數據操作是put操作
class WholeStoreIteratorImpl : public StoreIteratorImpl {
KeyValueDB::WholeSpaceIterator iter;
set<string> sync_prefixes;
public:
WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator(
set<string> &prefixes) : StoreIteratorImpl(), iter(iter),
sync_prefixes(prefixes)
{ }
bool add_chunk_entry(TransactionRef tx
string &prefix,
string &key,
bufferlist &value,
uint64_t max);
}
paxos數據在MonitorDB上的存放格式
ceph內部使用了log來記錄最近一段時間的操作,log存放在leveldb中,key的前綴‘paxos’被paxos核心模塊保留。每一條log一個key, key的組成是paxos前綴+
index, index是用整數來表示的,順序增加。為了加快log的查詢, 還用"first_committed" "last_committed", 兩個key來表示這段log, 前者是第一條log,後者是最後一條log。
monitor啟動時的數據同步
每次monitor server啟動時都會按照monmap中的服務器地址去連接其他monitor服務器,並同步數據。這個過程叫做bootstrap(). bootstrap的第一個目的是補全數據,從其他服務拉缺失的paxos log或者全量複製數據庫,其次是在必要時形成多數派建立一個paxos集群或者加入到已有的多數派中。
啟動時將自己加入到一個外部法人集合,因為剛開始自己肯定不是在多數派中:
// i'm outside the quorum
if (monmap->contains(name))
outside_quorum.insert(name);
然後給其它所有它知道的服務器發送探測包:
// probe monitors
dout(10) << "probing other monitors" << dendl;
for (unsigned i = 0; i < monmap->size(); i++) {
if ((int)i != rank)
messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
monmap->get_inst(i));
}
for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
p != extra_probe_peers.end();
++p) {
if (*p != messenger->get_myaddr()) {
entity_inst_t i;
i.name = entity_name_t::MON(-1);
i.addr = *p;
messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
}
}
任何一個服務器收到探測包都會比較自己的最後一次修改數據的版本是否落後於正在探測的服務器的數據:
if (!is_probing() && !is_synchronizing()) {
// If the probing mon is way ahead of us, we need to re-bootstrap.
// Normally we capture this case when we initially bootstrap, but
// it is possible we pass those checks (we overlap with
// quorum-to-be) but fail to join a quorum before it moves past
// us. We need to be kicked back to bootstrap so we can
// synchonize, not keep calling elections.
if (paxos->get_version() + 1 < m->paxos_first_version) {
dout(1) << " peer " << m->get_source_addr() << " has first_committed " << "ahead of us, re-bootstrapping" << dendl;
bootstrap();
goto out;
}
}
對於被探測的服務器,如果最後一條log的index number都跟不上對方的第一條記錄的index number,意味著已經落後太多了,中間log記錄已經缺失,不可能讓paxos核心部分通過log來傳播數據到本進程以獲得數據的最終版本,本進程需要重啟bootstrap從對方主動拉數據。此時不會帶對方的探測包返回應答。正常情況,我們會報告本服務器的paxos狀態:
r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
r->name = name;
r->quorum = quorum;
monmap->encode(r->monmap_bl, m->get_connection()->get_features());
r->paxos_first_version = paxos->get_first_committed();
r->paxos_last_version = paxos->get_version();
m->get_connection()->send_message(r);
// did we discover a peer here?
if (!monmap->contains(m->get_source_addr())) {
dout(1) << " adding peer " << m->get_source_addr()
<< " to list of hints" << dendl;
extra_probe_peers.insert(m->get_source_addr());
}
主要內容包括我們是否是多數派的一員(通過返回多數派成員列表),以及我的paxos log的第一條記錄號和最後一條記錄號。
一旦一個發出探測包的服務器收到一個應答也會檢查paxos log是否過時:
if (paxos->get_version() < m->paxos_first_version &&
m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
dout(10) << " peer paxos versions [" << m->paxos_first_version
<< "," << m->paxos_last_version << "]"
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
cancel_probe_timeout();
sync_start(other, true);
m->put();
return;
}
if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
dout(10) << " peer paxos version " << m->paxos_last_version
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
cancel_probe_timeout();
sync_start(other, false);
m->put();
return;
}
一種情況是我的最後一條log記錄和對方的第一條log記錄之間有空隙,中間有缺失,隻能主動從對方拉數據,道理與上麵相同。還有一種是根據配置變量paxos_max_join_drift,數據並沒有缺失,但是要傳的log超過一個閥值,不如全量從對方複製數據。
輸入探測方發現不需要在這個階段複製數據,並且對方就是多數派的一員,那麼可以肯定它的數據是和其他服務器同步的,至少應該樂觀的認為,:-) ,所以直接加入到多數派去:
if (m->quorum.size()) { // 多數派列表非空
if (monmap->contains(name) &&
!monmap->get_addr(name).is_blank_ip()) {
// i'm part of the cluster; just initiate a new election
// 我的地址他們都知道了, 通過start_election選舉後可以加入多數派
start_election();
} else {
// 需要通知leader把我的地址修改了,然後會probe time會超時後重啟bootstrap
dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
monmap->get_inst(*m->quorum.begin()));
}
}
else {
//如果對方也不是當前多數派的一員,並且是屬於monmap的一員,那麼把它列入到在多數派外麵的人
if (monmap->contains(m->name)) {
dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
outside_quorum.insert(m->name);
} else {
dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
m->put();
return;
}
//一旦發現不在多數派的人數超過2F + 1 (包括自己), 說明集群不存在多數派,就可以通過選舉來形成多數派
unsigned need = monmap->size() / 2 + 1;
dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
if (outside_quorum.size() >= need) {
if (outside_quorum.count(name)) {
dout(10) << " that's enough to form a new quorum, calling election" << dendl;
start_election();
} else {
dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
}
} else {
dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
}
}
本章總結
ceph monitor通過bootstrap過程,探測服務器列表中的各個服務器,比對log的最小記錄號和最大記錄號,直到本機數據的log曆史(第一條記錄和最後一條記錄)都與所有其他服務器有交集,說明本機沒有漏掉數據,從而進入多數派的形成過程,為paxos核心部分隻通過傳播log就可以同步數據創造條件。在boostrap階段,服務器分析是否存在一個多數派,必要是通過進入競選形成多數派。在這個階段的全量同步和部分數據傳輸,沒有介紹,因為相對簡單,可以通過閱讀ceph源碼獲得。
本章並未涉及ceph paxos設計最核心部分,有時間再介紹。
最後更新:2017-06-13 11:31:36