Copycat - Overview
Copycat’s primary role is as a framework for building highly consistent, fault-tolerant replicated state machines.
Copycat servers receive state machine operations from clients, log and replicate the operations as necessary, and apply them to a state machine on each server.
State machine operations are guaranteed to be applied in the same order on all servers, and Copycat handles the persistence and replication of the state machine state internally.
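Copycat manages distributed state machines: it guarantees that every operation is executed in the same order on each server, so all state machines end up in a consistent state.
For fault tolerance, so that a state machine can recover after a crash, each operation is first written to a log, and the logs on all servers are kept consistent; replaying the log is then enough to rebuild a consistent state.
This replication technique is called operation transfer.
The alternative is state transfer.
The Kudu paper is also a useful reference: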
Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log.
The physical storage of each replica of a tablet is fully decoupled.
With this approach the server's state machine, or in Kudu's terms the tablet storage, is unaware of the distribution; it is fully decoupled.
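The operation-transfer idea described above can be illustrated with a small sketch. It uses plain JDK collections only; the `Op` and `Replica` types and the `sampleLog` helper are hypothetical names invented for this illustration and are not part of Copycat or Kudu. Two replicas never exchange state, only log entries, and they still converge because they apply the same entries in the same order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of operation transfer; none of these types are Copycat APIs.
class OperationTransferSketch {

    // One logged operation. A null value means "remove the key".
    static final class Op {
        final String key;
        final String value;

        Op(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // A replica keeps private state and learns about changes only through the log.
    static final class Replica {
        private final Map<String, String> state = new HashMap<>();
        private int applied = 0; // count of log entries already applied

        // Apply every entry not yet applied, strictly in log order.
        void catchUp(List<Op> log) {
            while (applied < log.size()) {
                Op op = log.get(applied++);
                if (op.value == null) {
                    state.remove(op.key);
                } else {
                    state.put(op.key, op.value);
                }
            }
        }

        Map<String, String> state() {
            return state;
        }
    }

    // A small example log: put x, put y, overwrite x, remove y.
    static List<Op> sampleLog() {
        List<Op> log = new ArrayList<>();
        log.add(new Op("x", "1"));
        log.add(new Op("y", "2"));
        log.add(new Op("x", "3"));
        log.add(new Op("y", null));
        return log;
    }

    public static void main(String[] args) {
        List<Op> log = sampleLog();

        Replica a = new Replica();
        a.catchUp(log.subList(0, 2)); // sees the first two entries early
        a.catchUp(log);               // then the remaining ones

        Replica b = new Replica();    // e.g. a restarted server with empty state
        b.catchUp(log);               // replays the whole log from the start

        System.out.println("replicas agree: " + a.state().equals(b.state()));
    }
}
```

The replica's storage (here a `HashMap`) never appears in the log, which is the sense in which the storage is decoupled from replication.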
To use Copycat,
the user first creates a StateMachine class; this is the object that will be replicated:
public class MapStateMachine extends StateMachine { }
Copycat replicated state machines are modified and queried by defining operations through which a client and state machine can communicate.
Operations are replicated by the Copycat cluster and are translated into arguments to methods on the replicated state machine.
Users must define the interface between the client and the cluster by implementing Operation classes that clients will submit to the replicated state machine.
Next, the user defines the operations on this StateMachine.
There are two kinds of operations:
Command: can modify the state machine
Query: read-only
Command
For example, in a map state machine some commands might include put and remove. To implement a state machine command, simply implement the Command interface.
public class PutCommand implements Command<Object> {
  private final Object key;
  private final Object value;

  public PutCommand(Object key, Object value) {
    this.key = key;
    this.value = value;
  }

  public Object key() {
    return key;
  }

  public Object value() {
    return value;
  }
}
The code above defines a put command, which puts a key:value pair into the state machine.
Query
Queries are state machine operations that read the system’s state but do not modify it. For example, in a map state machine some queries might include get, size, and isEmpty. To implement a state machine query, implement the Query interface.
public class GetQuery implements Query<Object> {
  private final Object key;

  public GetQuery(Object key) {
    this.key = key;
  }

  public Object key() {
    return key;
  }
}
Next, these operations have to be implemented on the state machine.
Implementing State Machine Operations
State machine operations are implemented as public methods on the state machine class which accept a single Commit parameter, where the generic argument for the commit is the operation accepted by the method. Copycat automatically detects the command or query that applies to a given state machine method based on the generic argument to the Commit parameter.
public class MapStateMachine extends StateMachine {
  private Map<Object, Object> map = new HashMap<>();

  public Object put(Commit<PutCommand> commit) {
    try {
      // Return the previous value so the method satisfies its Object return type.
      return map.put(commit.operation().key(), commit.operation().value());
    } finally {
      commit.close();
    }
  }

  public Object get(Commit<GetQuery> commit) {
    try {
      return map.get(commit.operation().key());
    } finally {
      commit.close();
    }
  }
}
A Commit can be thought of as a wrapper around the command.
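How a framework might detect which method handles which operation from the generic argument of the Commit parameter can be sketched with plain Java reflection. This is only an illustration of the general technique, not Copycat's actual implementation; the `Commit`, `PutCommand`, `GetQuery` and `MapStateMachine` classes below are simplified local stand-ins, and `index` and `apply` are hypothetical helper names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: local stand-in classes, not the Copycat ones.
class CommitDispatchSketch {

    public static final class Commit<T> {
        private final T operation;
        public Commit(T operation) { this.operation = operation; }
        public T operation() { return operation; }
    }

    public static final class PutCommand {
        final String key;
        final String value;
        public PutCommand(String key, String value) { this.key = key; this.value = value; }
    }

    public static final class GetQuery {
        final String key;
        public GetQuery(String key) { this.key = key; }
    }

    public static final class MapStateMachine {
        private final Map<String, String> map = new HashMap<>();

        public Object put(Commit<PutCommand> commit) {
            return map.put(commit.operation().key, commit.operation().value);
        }

        public Object get(Commit<GetQuery> commit) {
            return map.get(commit.operation().key);
        }
    }

    // Index every public method that takes exactly one Commit<X> by the class X.
    static Map<Class<?>, Method> index(Class<?> stateMachineType) {
        Map<Class<?>, Method> handlers = new HashMap<>();
        for (Method method : stateMachineType.getMethods()) {
            Type[] params = method.getGenericParameterTypes();
            if (params.length != 1 || !(params[0] instanceof ParameterizedType)) {
                continue; // not a single generic parameter
            }
            ParameterizedType param = (ParameterizedType) params[0];
            if (param.getRawType() != Commit.class) {
                continue; // generic, but not a Commit
            }
            Type argument = param.getActualTypeArguments()[0];
            if (argument instanceof Class) {
                handlers.put((Class<?>) argument, method);
            }
        }
        return handlers;
    }

    // Wrap the operation in a Commit and invoke the matching handler.
    static Object apply(Object stateMachine, Map<Class<?>, Method> handlers, Object operation) {
        Method handler = handlers.get(operation.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("no handler for " + operation.getClass());
        }
        try {
            Commit<Object> commit = new Commit<>(operation);
            return handler.invoke(stateMachine, commit);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MapStateMachine machine = new MapStateMachine();
        Map<Class<?>, Method> handlers = index(MapStateMachine.class);
        apply(machine, handlers, new PutCommand("k", "v"));
        System.out.println(apply(machine, handlers, new GetQuery("k")));
    }
}
```

The point of the sketch is that the method names (`put`, `get`) play no role in routing; only the type argument of the Commit parameter does.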
Implementing the snapshot logic:
State machine operations are replicated and written to a log on disk on each server in the cluster.
As commands are submitted to the cluster over time, the disk capacity will eventually be consumed.
Copycat must periodically remove unneeded commands from the replicated log to conserve disk space. This is known as log compaction.
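As the log keeps growing, old entries have to be deleted. To avoid losing data, the current state of the state machine is first stored as a snapshot; every log entry that precedes that state can then be removed.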
public class MapStateMachine extends StateMachine implements Snapshottable {
  private Map<Object, Object> map = new HashMap<>();

  @Override
  public void snapshot(SnapshotWriter writer) {
    writer.writeObject(map);
  }

  @Override
  public void install(SnapshotReader reader) {
    map = reader.readObject();
  }
}
For snapshottable state machines, Copycat will periodically request a binary snapshot of the state machine’s state and write the snapshot to disk. If the server is restarted, the state machine’s state will be recovered from the on-disk snapshot. When a new server joins the cluster, the snapshot of the state machine will be replicated to the joining server to catch up its state. This allows Copycat to remove commits that contributed to the snapshot from the replicated log, thus conserving disk space.
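The relationship between a snapshot and the compacted log can be sketched without Copycat at all. The `Server` class and its `put`, `compact` and `recover` methods below are hypothetical names for this illustration; a real implementation would write the snapshot and the log to disk instead of keeping them in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of snapshot-based log compaction; not Copycat code.
class SnapshotCompactionSketch {

    static final class Server {
        // Log entries ({key, value} puts) written since the last snapshot.
        final List<String[]> log = new ArrayList<>();
        // State as of the last snapshot; covers every entry already removed from the log.
        Map<String, String> snapshot = new HashMap<>();
        // Live state machine state.
        final Map<String, String> state = new HashMap<>();

        // Log the operation first, then apply it to the state machine.
        void put(String key, String value) {
            log.add(new String[] {key, value});
            state.put(key, value);
        }

        // Snapshot the current state, then drop the log entries it already reflects.
        void compact() {
            snapshot = new HashMap<>(state);
            log.clear();
        }

        // Recovery after a restart: install the snapshot, then replay the remaining log.
        Map<String, String> recover() {
            Map<String, String> recovered = new HashMap<>(snapshot);
            for (String[] entry : log) {
                recovered.put(entry[0], entry[1]);
            }
            return recovered;
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.put("a", "1");
        server.put("b", "2");
        server.put("a", "3");
        server.compact();     // three entries collapse into one snapshot
        server.put("c", "4"); // only this entry remains in the log

        System.out.println("log entries kept: " + server.log.size());
        System.out.println("recovered matches live state: "
            + server.recover().equals(server.state));
    }
}
```

Recovery only works because the snapshot is taken before the covered entries are deleted; reversing the two steps in `compact` would open a window in which data could be lost.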
Finally, create the cluster.
1. First, build a server
Once a state machine and its operations have been defined, we can create a CopycatServer to manage the state machine.
// "123.456.789.0" is a placeholder, not a valid IP; use the server's real host and port.
Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);
The server is started with the MapStateMachine defined above.
builder.withTransport(NettyTransport.builder()
  .withThreads(4)
  .build());

builder.withStorage(Storage.builder()
  .withDirectory(new File("logs"))
  .withStorageLevel(StorageLevel.DISK)
  .build());

CopycatServer server = builder.build();
Both the transport and the storage can be customized.
Register the commands we defined:
One final task is necessary to complete the configuration of the server. We’ve created two state machine operations, PutCommand and GetQuery, which are Serializable. By default, Copycat’s serialization framework will serialize these operations using Java’s serialization. However, users can explicitly register serializable classes and implement custom binary serializers for more efficient serialization.
server.serializer().register(PutCommand.class);
server.serializer().register(GetQuery.class);
The serializer defaults to Java’s serialization; if performance matters, you can implement your own serialization.
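Why a custom binary format can be more efficient than Java's default serialization is easy to see with the JDK alone. The sketch below does not use Copycat's serializer API; the `PutCommand` here is a simplified local stand-in with String fields, and `javaSerialize`, `customSerialize` and `customDeserialize` are hypothetical helpers. Default Java serialization writes a class descriptor alongside the data, while the hand-written format writes only the two strings.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch comparing encodings; it does not use Copycat's serializer.
class SerializationSketch {

    static final class PutCommand implements Serializable {
        private static final long serialVersionUID = 1L;
        final String key;
        final String value;

        PutCommand(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Default Java serialization: class metadata plus field data.
    static byte[] javaSerialize(PutCommand command) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(command);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hand-written format: just the two strings, each length-prefixed by writeUTF.
    static byte[] customSerialize(PutCommand command) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(command.key);
                out.writeUTF(command.value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order they were written.
    static PutCommand customDeserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String key = in.readUTF();
            String value = in.readUTF();
            return new PutCommand(key, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PutCommand command = new PutCommand("foo", "bar");
        System.out.println("java serialization bytes: " + javaSerialize(command).length);
        System.out.println("custom format bytes: " + customSerialize(command).length);
    }
}
```

The trade-off is that the custom format carries no type information, so the reader has to know in advance which class it is decoding; registering classes with a serializer is one common way to supply that knowledge.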
2. Start the cluster
Bootstrapping the Cluster
Once the server has been built, we can bootstrap a new cluster by calling the bootstrap() method:
CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();
When a server is bootstrapped, it forms a new single node cluster to which additional servers can be joined.
3. Join an existing cluster
Joining an Existing Cluster
Once an initial cluster has been bootstrapped, additional servers can be added to the cluster via the join() method. When joining an existing cluster, the existing cluster configuration must be provided to the join method:
Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();
Last updated: 2017-04-07 21:25:10