

Copycat - Overview

Copycat’s primary role is as a framework for building highly consistent, fault-tolerant replicated state machines.

Copycat servers receive state machine operations from clients, log and replicate the operations as necessary, and apply them to a state machine on each server.

State machine operations are guaranteed to be applied in the same order on all servers, and Copycat handles the persistence and replication of the state machine state internally.

 

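Copycat manages a distributed state machine. It must guarantee that all operations are executed in the same order on every server, so that every server ends up with a consistent state machine state.

To be fault-tolerant, a state machine must be recoverable after a crash. Each operation is therefore written to a log first, and the logs on all servers are kept consistent, so a consistent state can be rebuilt simply by replaying the log.

This replication technique is known as operation transfer.

The alternative is state transfer.

For background, see "Data replication 同步技術" (data replication synchronization techniques).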

 

The Kudu paper is also a useful reference:

Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log. 
The physical storage of each replica of a tablet is fully decoupled.

With this approach, each server's state machine (the tablet storage, in Kudu's terms) is unaware of the distribution layer; it is fully decoupled.
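The operation-transfer idea described above can be illustrated with a small plain-Java sketch. It is not Copycat code: the Op class and the replay method are hypothetical names invented for this example, and the state machine is assumed to be a simple String-to-String map. It only shows that replicas holding the same ordered log rebuild the same state by replaying it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are part of Copycat.
final class OperationTransferSketch {

  /** A logged operation: a put when value is non-null, a remove otherwise. */
  static final class Op {
    final String key;
    final String value;

    Op(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Rebuilds a map-backed state machine by applying the log in order. */
  static Map<String, String> replay(List<Op> log) {
    Map<String, String> state = new HashMap<>();
    for (Op op : log) {
      if (op.value != null) {
        state.put(op.key, op.value);
      } else {
        state.remove(op.key);
      }
    }
    return state;
  }

  public static void main(String[] args) {
    List<Op> log = new ArrayList<>();
    log.add(new Op("a", "1"));
    log.add(new Op("b", "2"));
    log.add(new Op("a", "3"));   // overwrites the first put
    log.add(new Op("b", null));  // removes b

    // Two replicas that hold the same log reach the same state.
    Map<String, String> replica1 = replay(log);
    Map<String, String> replica2 = replay(new ArrayList<>(log));
    System.out.println(replica1.equals(replica2));
    System.out.println(replica1);
  }
}
```

Only the log is shared between the replicas here; how each replica stores its state is a local detail, which matches the "fully decoupled" point from the Kudu quote.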

 

To use Copycat, the user first creates a state machine class. This is the object whose state will be replicated:

public class MapStateMachine extends StateMachine {
}

 

 

Copycat replicated state machines are modified and queried by defining operations through which a client and state machine can communicate.

Operations are replicated by the Copycat cluster and are translated into arguments to methods on the replicated state machine. 
Users must define the interface between the client and the cluster by implementing Operation classes that clients will submit to the replicated state machine.

Next, the user defines the operations on this StateMachine.

Operations fall into two categories:

Command: can modify the state machine

Query: read-only

 

Command

For example, in a map state machine some commands might include put and remove. To implement a state machine command, simply implement the Command interface.

public class PutCommand implements Command<Object> {
  private final Object key;
  private final Object value;

  public PutCommand(Object key, Object value) {
    this.key = key;
    this.value = value;
  }

  public Object key() {
    return key;
  }

  public Object value() {
    return value;
  }
}

The code above defines a put command, which puts a key:value pair into the state machine.

 

Query

Queries are state machine operations that read the system’s state but do not modify it. For example, in a map state machine some queries might include get, size, and isEmpty. To implement a state machine query, implement the Query interface.

public class GetQuery implements Query<Object> {
  private final Object key;

  public GetQuery(Object key) {
    this.key = key;
  }

  public Object key() {
    return key;
  }
}

 

Next, these operations must be implemented on the state machine.

Implementing State Machine Operations

State machine operations are implemented as public methods on the state machine class which accept a single Commit parameter, where the generic argument for the commit is the operation accepted by the method. Copycat automatically detects the command or query that applies to a given state machine method based on the generic argument to the Commit parameter.

public class MapStateMachine extends StateMachine {
  private Map<Object, Object> map = new HashMap<>();

  public Object put(Commit<PutCommand> commit) {
    try {
      // Return the previous value for the key; the method is declared to return Object.
      return map.put(commit.operation().key(), commit.operation().value());
    } finally {
      commit.close();
    }
  }

  public Object get(Commit<GetQuery> commit) {
    try {
      return map.get(commit.operation().key());
    } finally {
      commit.close();
    }
  }
}

A Commit can be thought of as a wrapper around an operation (a command or a query).
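To make the "detects the operation from the generic argument" idea more concrete, here is a rough sketch of how such a lookup could be done with Java reflection. This is an assumption about the general technique, not Copycat's actual implementation: Commit, PutCommand, GetQuery and MapStateMachine below are simplified stand-ins defined inside the sketch, and index and apply are hypothetical helper names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: these types only mimic the shape of the Copycat API.
final class CommitDispatchSketch {

  /** Minimal stand-in for a commit that wraps one operation. */
  static final class Commit<T> {
    private final T operation;
    Commit(T operation) { this.operation = operation; }
    T operation() { return operation; }
  }

  static final class PutCommand {
    final String key;
    final String value;
    PutCommand(String key, String value) { this.key = key; this.value = value; }
  }

  static final class GetQuery {
    final String key;
    GetQuery(String key) { this.key = key; }
  }

  static final class MapStateMachine {
    private final Map<String, String> map = new HashMap<>();

    public Object put(Commit<PutCommand> commit) {
      return map.put(commit.operation().key, commit.operation().value);
    }

    public Object get(Commit<GetQuery> commit) {
      return map.get(commit.operation().key);
    }
  }

  /** Indexes public methods by the type argument of their single Commit<T> parameter. */
  static Map<Class<?>, Method> index(Class<?> stateMachineType) {
    Map<Class<?>, Method> handlers = new HashMap<>();
    for (Method m : stateMachineType.getMethods()) {
      Type[] params = m.getGenericParameterTypes();
      if (params.length == 1 && params[0] instanceof ParameterizedType) {
        ParameterizedType p = (ParameterizedType) params[0];
        if (p.getRawType() == Commit.class) {
          handlers.put((Class<?>) p.getActualTypeArguments()[0], m);
        }
      }
    }
    return handlers;
  }

  /** Wraps the operation in a Commit and invokes the method registered for its class. */
  static Object apply(Object stateMachine, Map<Class<?>, Method> handlers, Object operation) {
    Method m = handlers.get(operation.getClass());
    if (m == null) {
      throw new IllegalArgumentException("no handler for " + operation.getClass());
    }
    try {
      return m.invoke(stateMachine, new Commit<>(operation));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    MapStateMachine stateMachine = new MapStateMachine();
    Map<Class<?>, Method> handlers = index(MapStateMachine.class);
    apply(stateMachine, handlers, new PutCommand("k", "v"));
    System.out.println(apply(stateMachine, handlers, new GetQuery("k")));
  }
}
```

The point of the design is that the state machine author never writes a dispatch table; the method signatures themselves declare which operation each method handles.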

Implementing the snapshot logic:

State machine operations are replicated and written to a log on disk on each server in the cluster.

As commands are submitted to the cluster over time, the disk capacity will eventually be consumed. 
Copycat must periodically remove unneeded commands from the replicated log to conserve disk space. This is known as log compaction.

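As the log grows, old entries have to be deleted. To avoid losing data, the current state of the state machine is first saved as a snapshot; the log entries that precede that state can then be deleted.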

public class MapStateMachine extends StateMachine implements Snapshottable {
  private Map<Object, Object> map = new HashMap<>();

  @Override
  public void snapshot(SnapshotWriter writer) {
    writer.writeObject(map);
  }

  @Override
  public void install(SnapshotReader reader) {
    map = reader.readObject();
  }
}

For snapshottable state machines, Copycat will periodically request a binary snapshot of the state machine’s state and write the snapshot to disk. If the server is restarted, the state machine’s state will be recovered from the on-disk snapshot. When a new server joins the cluster, the snapshot of the state machine will be replicated to the joining server to catch up its state. This allows Copycat to remove commits that contributed to the snapshot from the replicated log, thus conserving disk space.
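The relationship between snapshots and log compaction can be sketched in plain Java. This is a toy model under simplifying assumptions, not Copycat's storage layer: the class name, the in-memory log, and the compact and recover methods are all hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a toy replica, not Copycat's storage implementation.
final class LogCompactionSketch {
  // {key, value} entries that are not yet covered by a snapshot.
  private final List<String[]> log = new ArrayList<>();
  // State as of the last compaction.
  private Map<String, String> snapshot = new HashMap<>();
  // The live state machine.
  private final Map<String, String> state = new HashMap<>();

  /** Logs the put first, then applies it to the live state. */
  void put(String key, String value) {
    log.add(new String[] {key, value});
    state.put(key, value);
  }

  /** Saves the current state and drops the log entries it already reflects. */
  void compact() {
    snapshot = new HashMap<>(state);
    log.clear();
  }

  /** Rebuilds state as a restarted server would: snapshot first, then the remaining log. */
  Map<String, String> recover() {
    Map<String, String> recovered = new HashMap<>(snapshot);
    for (String[] entry : log) {
      recovered.put(entry[0], entry[1]);
    }
    return recovered;
  }

  int logSize() {
    return log.size();
  }

  Map<String, String> state() {
    return new HashMap<>(state);
  }

  public static void main(String[] args) {
    LogCompactionSketch replica = new LogCompactionSketch();
    replica.put("a", "1");
    replica.put("b", "2");
    replica.compact();      // the log is now empty; the snapshot holds a and b
    replica.put("a", "3");  // only this entry remains in the log

    System.out.println(replica.logSize());
    System.out.println(replica.recover().equals(replica.state()));
  }
}
```

Recovery never needs the deleted entries because their effect is already captured in the snapshot, which is why taking a snapshot makes it safe to truncate the log.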

 

Finally, create the cluster.

1. First, build a server

Once a state machine and its operations have been defined, we can create a CopycatServer to manage the state machine.

Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);

This configures the server to use the MapStateMachine defined above.

builder.withTransport(NettyTransport.builder()
  .withThreads(4)
  .build());

builder.withStorage(Storage.builder()
  .withDirectory(new File("logs"))
  .withStorageLevel(StorageLevel.DISK)
  .build());

CopycatServer server = builder.build();

Both the transport and the storage can be customized.

Register the operations we defined:

One final task is necessary to complete the configuration of the server. We’ve created two state machine operations, PutCommand and GetQuery, which are Serializable. By default, Copycat’s serialization framework will serialize these operations using Java’s serialization. However, users can explicitly register serializable classes and implement custom binary serializers for more efficient serialization.

server.serializer().register(PutCommand.class);
server.serializer().register(GetQuery.class);

The serializer defaults to Java’s serialization; if performance matters, you can implement your own serialization.
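To see why a hand-written binary format can be more efficient than Java's default serialization, here is a small comparison using only java.io. It deliberately does not use Copycat's serializer interface, which this article does not show. The PutCommand here is a simplified stand-in with String fields (the one above uses Object), and javaSerialize, customSerialize and customDeserialize are hypothetical helper names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch: plain java.io, not Copycat's serialization framework.
final class SerializationSketch {

  /** Simplified stand-in for the article's PutCommand, with String fields. */
  static final class PutCommand implements Serializable {
    private static final long serialVersionUID = 1L;
    final String key;
    final String value;

    PutCommand(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Default Java serialization: the stream also carries class metadata. */
  static byte[] javaSerialize(PutCommand command) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(command);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Hand-written format: just the two fields as length-prefixed strings. */
  static byte[] customSerialize(PutCommand command) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(command.key);
      out.writeUTF(command.value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static PutCommand customDeserialize(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String key = in.readUTF();
      String value = in.readUTF();
      return new PutCommand(key, value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    PutCommand command = new PutCommand("k", "v");
    System.out.println("java serialization bytes: " + javaSerialize(command).length);
    System.out.println("custom format bytes: " + customSerialize(command).length);
  }
}
```

The custom format is smaller because it writes only the field data, while Java serialization also writes a stream header and a description of the class. The cost is that the reader and writer must agree on the field order by hand.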

 

2. Bootstrap the cluster

Bootstrapping the Cluster

Once the server has been built, we can bootstrap a new cluster by calling the bootstrap() method:

CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();

When a server is bootstrapped, it forms a new single node cluster to which additional servers can be joined.
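Since bootstrap() returns a CompletableFuture, calling join() on it blocks the calling thread until startup finishes. As a side note, the same future can also be consumed without blocking. The sketch below shows both styles with the standard java.util.concurrent API only; fakeBootstrap is a hypothetical stand-in that completes with a plain String instead of a CopycatServer.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: fakeBootstrap stands in for server.bootstrap().
final class BootstrapFutureSketch {

  /** Completes asynchronously with the given server name. */
  static CompletableFuture<String> fakeBootstrap(String name) {
    return CompletableFuture.supplyAsync(() -> name);
  }

  /** Blocking style, as in the article: join() waits and returns the result. */
  static String startBlocking(String name) {
    return fakeBootstrap(name).join();
  }

  /** Non-blocking style: chain a continuation and hand back a new future. */
  static CompletableFuture<String> startAsync(String name) {
    return fakeBootstrap(name).thenApply(server -> server + " is ready");
  }

  public static void main(String[] args) {
    System.out.println(startBlocking("server-1"));
    // The continuation runs once the bootstrap future completes; join() is
    // used here only so the demo does not exit before the result is printed.
    System.out.println(startAsync("server-2").join());
  }
}
```

Blocking with join() is the simplest choice in a main method; chaining is more useful when the caller has other work to do while the server starts.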

 

3. Join an existing cluster

Joining an Existing Cluster

Once an initial cluster has been bootstrapped, additional servers can be added to the cluster via the join() method. When joining an existing cluster, the existing cluster configuration must be provided to the join method:

Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();

Last updated: 2017-04-07 21:25:10
