Copycat - Membership
To make the implementation easier, Copycat divides members into three types:
active, passive, and reserve members — each of which play some role in supporting rapid replacement of failed servers.
Active members are full voting members which participate in all aspects of the Raft consensus algorithm. Active servers are always in one of the Raft states — follower, candidate, or leader — at any given time.
An active member takes one of the Raft roles — follower, candidate, or leader — i.e., it is a member in normal service.
Passive member
When a new server is added to a Raft cluster, the server typically must be caught up to within some bound of the leader before it can become a full voting member of the cluster. Adding a new server without first warming up its log will result in some period of decreased availability.
Systems can maintain servers that are virtually kept in sync with the rest of the cluster at all times. We call these servers passive servers.
Members that only stay in sync with the rest of the cluster but do not take part in voting are called passive.
The benefit of passive nodes is that adding a new active member or replacing a failed one requires no catch-up phase; a passive node can take over directly.
Passive nodes can also serve as read-only replicas.
Passive Replication
Passive nodes do not catch up by receiving AppendEntries RPCs directly from the leader.
Each follower is responsible for sending AppendEntries RPCs to a subset of passive servers at regular intervals.
- Each follower sends AppendEntries RPCs only to a subset of passive servers
- Followers send only committed entries to passive servers
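To picture the "subset" idea, here is an illustrative toy that simply spreads passive members across followers round-robin. Copycat computes the real assignment inside ClusterState (getAssignedPassiveMemberStates, shown later in FollowerAppender); the sketch below only illustrates the concept and is not Copycat's actual algorithm.

import java.util.*;

// Illustrative only: assigns each passive member to one follower round-robin, so
// every follower replicates committed entries to a subset of the passive members.
// This is NOT Copycat's real assignment scheme.
public class PassiveAssignmentSketch {
  static Map<String, List<String>> assign(List<String> followers, List<String> passives) {
    Map<String, List<String>> assignment = new HashMap<>();
    for (String follower : followers) {
      assignment.put(follower, new ArrayList<>());
    }
    for (int i = 0; i < passives.size(); i++) {
      // Round-robin: passive i goes to follower (i mod #followers).
      String follower = followers.get(i % followers.size());
      assignment.get(follower).add(passives.get(i));
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<String> followers = Arrays.asList("f1", "f2");
    List<String> passives = Arrays.asList("p1", "p2", "p3");
    // Prints {f1=[p1, p3], f2=[p2]}
    System.out.println(assign(followers, passives));
  }
}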
Reserve Members
For large clusters, though, the overhead of maintaining passive servers can by itself become a drain on the cluster’s resources.
Each additional passive server imposes the overhead of replicating all committed log entries, and this is significant even if done by followers. Thus, to ease the load on large clusters, we introduce the reserve member type.
For large clusters, maintaining passive members is itself costly, so the reserve member type is introduced to reduce that cost.
Reserve members serve as standbys to passive members.
Reserve servers do not maintain state machines and need not know about committed entries.
However, because reserve servers can be promoted to passive, they do need to have some mechanism for learning about configuration changes.
Reserve members are standbys for passive members, so the cluster does not have to maintain too many passives; when a passive member is promoted to active, a reserve member is promoted into the passive set to replace it.
Therefore a reserve member does not need to maintain a full state machine, but it does need to learn about configuration changes.
Member
public interface Member {

  enum Type {
    INACTIVE,
    RESERVE,
    PASSIVE,
    ACTIVE,
  }

  CompletableFuture<Void> promote();

  CompletableFuture<Void> demote();
}
The interface defines the member types and allows a member to be promoted or demoted.
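A minimal usage sketch, assuming a Member reference has already been obtained from the cluster (how it is looked up is outside this snippet): promote() and demote() return CompletableFuture<Void>, so callers chain on completion rather than blocking.

// Promote a member one level (e.g. RESERVE -> PASSIVE) and log the outcome.
static CompletableFuture<Void> promoteOneLevel(Member member) {
  return member.promote().whenComplete((v, error) -> {
    if (error == null) {
      System.out.println("member promoted one level");
    } else {
      System.err.println("promotion failed: " + error);
    }
  });
}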
ServerMember
public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {

  @Override
  public CompletableFuture<Void> promote() {
    return configure(Type.values()[type.ordinal() + 1]);
  }

  @Override
  public CompletableFuture<Void> demote() {
    return configure(Type.values()[type.ordinal() - 1]);
  }

  /**
   * Recursively reconfigures the cluster.
   */
  private void configure(Member.Type type, CompletableFuture<Void> future) {
    // Set a timer to retry the attempt to leave the cluster.
    configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> {
      configure(type, future);
    });

    // Attempt to leave the cluster by submitting a LeaveRequest directly to the server state.
    // Non-leader states should forward the request to the leader if there is one. Leader states
    // will log, replicate, and commit the reconfiguration.
    cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder() // send a reconfigure request to the ServerState
      .withIndex(cluster.getConfiguration().index())
      .withTerm(cluster.getConfiguration().term())
      .withMember(new ServerMember(type, serverAddress(), clientAddress(), updated))
      .build()).whenComplete((response, error) -> {
        //.......
      });
  }
Next, let's look at ServerState.
ServerState defines all the requests a server state can accept:
public interface ServerState extends Managed<ServerState> {
  CopycatServer.State type();
  CompletableFuture<RegisterResponse> register(RegisterRequest request);
  CompletableFuture<ConnectResponse> connect(ConnectRequest request, Connection connection);
  CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request);
  CompletableFuture<UnregisterResponse> unregister(UnregisterRequest request);
  CompletableFuture<PublishResponse> publish(PublishRequest request);
  CompletableFuture<ConfigureResponse> configure(ConfigureRequest request);
  CompletableFuture<InstallResponse> install(InstallRequest request);
  CompletableFuture<JoinResponse> join(JoinRequest request);
  CompletableFuture<ReconfigureResponse> reconfigure(ReconfigureRequest request);
  CompletableFuture<LeaveResponse> leave(LeaveRequest request);
  CompletableFuture<AppendResponse> append(AppendRequest request);
  CompletableFuture<PollResponse> poll(PollRequest request);
  CompletableFuture<VoteResponse> vote(VoteRequest request);
  CompletableFuture<CommandResponse> command(CommandRequest request);
  CompletableFuture<QueryResponse> query(QueryRequest request);
}
AbstractState
public abstract class AbstractState implements ServerState {

  /**
   * Forwards the given request to the leader if possible.
   */
  protected <T extends Request, U extends Response> CompletableFuture<U> forward(T request) {
    CompletableFuture<U> future = new CompletableFuture<>();
    context.getConnections().getConnection(context.getLeader().serverAddress()).whenComplete((connection, connectError) -> {
      if (connectError == null) {
        connection.<T, U>send(request).whenComplete((response, responseError) -> {
          if (responseError == null) {
            future.complete(response);
          } else {
            future.completeExceptionally(responseError);
          }
        });
      } else {
        future.completeExceptionally(connectError);
      }
    });
    return future;
  }

  /**
   * Updates the term and leader.
   */
  protected boolean updateTermAndLeader(long term, int leader) {
    // If the request indicates a term that is greater than the current term or no leader has been
    // set for the current term, update leader and term.
    if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
      context.setTerm(term);
      context.setLeader(leader);

      // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
      context.getClusterState().reset();
      return true;
    }
    return false;
  }
}
It mainly adds utility methods shared by the concrete states.
InactiveState
InactiveState only handles configure requests; every other request returns an error.
class InactiveState extends AbstractState {

  @Override
  public CompletableFuture<ConfigureResponse> configure(ConfigureRequest request) {
    updateTermAndLeader(request.term(), request.leader());

    Configuration configuration = new Configuration(request.index(), request.term(), request.timestamp(), request.members());

    // Configure the cluster membership. This will cause this server to transition to the
    // appropriate state if its type has changed.
    context.getClusterState().configure(configuration); // update the state of the corresponding server in the ClusterState

    // If the configuration is already committed, commit it to disk.
    // Check against the actual cluster Configuration rather than the received configuration in
    // case the received configuration was an older configuration that was not applied.
    if (context.getCommitIndex() >= context.getClusterState().getConfiguration().index()) {
      context.getClusterState().commit();
    }

    return CompletableFuture.completedFuture(logResponse(ConfigureResponse.builder()
      .withStatus(Response.Status.OK)
      .build()));
  }
context.getClusterState().configure
First, what is ClusterState?
Manages the persistent state of the Copycat cluster from the perspective of a single server
In other words, every server keeps a ClusterState describing the whole cluster from its own point of view.
ClusterState.member represents the local server itself,
and ClusterState.members records the state of every member in the cluster.
The configure logic here mainly updates member and members according to the incoming configuration.
context.getClusterState().commit()
Its core logic is to commit the updated configuration to disk:
if (context.getMetaStore().loadConfiguration().index() < configuration.index()) {
  context.getMetaStore().storeConfiguration(configuration);
}
ReserveState
Handling append requests:
class ReserveState extends InactiveState {

  @Override
  public CompletableFuture<AppendResponse> append(AppendRequest request) {
    context.checkThread();
    logRequest(request);
    updateTermAndLeader(request.term(), request.leader());

    // Update the local commitIndex and globalIndex.
    context.setCommitIndex(request.commitIndex());
    context.setGlobalIndex(request.globalIndex()); // sets the maximum compaction index for major compaction

    return CompletableFuture.completedFuture(logResponse(AppendResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withSucceeded(true)
      .withLogIndex(0)
      .build()));
  }
AbstractState.updateTermAndLeader
protected boolean updateTermAndLeader(long term, int leader) {
  // If the request indicates a term that is greater than the current term or no leader has been
  // set for the current term, update leader and term.
  if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
    context.setTerm(term);     // if the term/leader is newer, update the context
    context.setLeader(leader);

    // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
    context.getClusterState().reset(); // reset to the last committed configuration to avoid stale data from the old leader
    return true;
  }
  return false;
}
context.setCommitIndex
ServerContext setCommitIndex(long commitIndex) {
  long previousCommitIndex = this.commitIndex;
  if (commitIndex > previousCommitIndex) {
    this.commitIndex = commitIndex;
    log.commit(Math.min(commitIndex, log.lastIndex())); // commit the log up to this index

    long configurationIndex = cluster.getConfiguration().index();
    if (configurationIndex > previousCommitIndex && configurationIndex <= commitIndex) {
      cluster.commit(); // commit the cluster configuration
    }
  }
  return this;
}
Command, query, register, keepAlive, unregister, join, leave, and reconfigure requests are all forwarded to the leader.
In other words, every state from reserve upward forwards at least these requests to the leader, roughly as sketched below.
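A sketch of what such an override plausibly looks like, reusing the AbstractState.forward() helper shown above; the exact bodies in Copycat may differ (for example in how a missing leader is handled).

// ReserveState (and the states above it) can delegate client-facing requests:
// check the thread, log the request, and relay it to the current leader.
@Override
public CompletableFuture<CommandResponse> command(CommandRequest request) {
  context.checkThread();
  logRequest(request);
  return this.<CommandRequest, CommandResponse>forward(request).thenApply(this::logResponse);
}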
PassiveState
The passive state's main job is to stay in sync with the rest of the cluster.
On open it first truncates uncommitted log entries; every state from PassiveState upward performs this step.
@Override
public CompletableFuture<ServerState> open() {
  return super.open()
    .thenRun(this::truncateUncommittedEntries)
    .thenApply(v -> this);
}
private void truncateUncommittedEntries() {
  if (type() == CopycatServer.State.PASSIVE) {
    context.getLog().truncate(Math.min(context.getCommitIndex(), context.getLog().lastIndex()));
  }
}
When is open called?
In ServerContext.transition, whenever the server moves from one state to another:
// Close the old state.
try {
  this.state.close().get();
} catch (InterruptedException | ExecutionException e) {
  throw new IllegalStateException("failed to close Raft state", e);
}

// Force state transitions to occur synchronously in order to prevent race conditions.
try {
  this.state = createState(state);
  this.state.open().get();
} catch (InterruptedException | ExecutionException e) {
  throw new IllegalStateException("failed to initialize Raft state", e);
}
It closes the old state and then opens the new one.
Note the trailing get() calls: the transition blocks and executes synchronously.
Handling connect requests — register the connection:
context.getStateMachine().executor().context().sessions().registerConnection(request.client(), connection);
Handling append requests. The passive append logic is noticeably more involved than the reserve one, because a reserve member only needs to track the configuration while a passive member also has to replicate the state machine's data.
@Override
public CompletableFuture<AppendResponse> append(final AppendRequest request) {
  context.checkThread();
  logRequest(request);
  updateTermAndLeader(request.term(), request.leader());
  return CompletableFuture.completedFuture(logResponse(handleAppend(request)));
}
handleAppend
protected AppendResponse handleAppend(AppendRequest request) {
  // If the request term is less than the current term then immediately
  // reply false and return our current term. The leader will receive
  // the updated term and step down.
  if (request.term() < context.getTerm()) {
    LOGGER.debug("{} - Rejected {}: request term is less than the current term ({})", context.getCluster().member().address(), request, context.getTerm());
    return AppendResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withSucceeded(false)
      .withLogIndex(context.getLog().lastIndex())
      .build();
  } else {
    return checkGlobalIndex(request);
  }
}
When the request's term is older than the local term, the append is rejected immediately: the sender is a stale leader, and the returned term will cause it to step down.
checkGlobalIndex in turn calls appendEntries:
/**
 * Appends entries to the local log.
 */
protected AppendResponse appendEntries(AppendRequest request) {
  // Get the last entry index or default to the request log index.
  long lastEntryIndex = request.logIndex();
  if (!request.entries().isEmpty()) {
    lastEntryIndex = request.entries().get(request.entries().size() - 1).getIndex();
  }

  // Ensure the commitIndex is not increased beyond the index of the last entry in the request.
  long commitIndex = Math.max(context.getCommitIndex(), Math.min(request.commitIndex(), lastEntryIndex));

  // Append entries to the log starting at the last log index.
  for (Entry entry : request.entries()) {
    // If the entry index is greater than the last index and less than the commit index, append the entry.
    // We perform no additional consistency checks here since passive members may only receive committed entries.
    if (context.getLog().lastIndex() < entry.getIndex() && entry.getIndex() <= commitIndex) {
      context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
      LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
    }
  }

  // Update the context commit and global indices.
  context.setCommitIndex(commitIndex);
  context.setGlobalIndex(request.globalIndex());

  // Apply commits to the state machine in batch.
  context.getStateMachine().applyAll(context.getCommitIndex());

  return AppendResponse.builder()
    .withStatus(Response.Status.OK)
    .withTerm(context.getTerm())
    .withSucceeded(true)
    .withLogIndex(context.getLog().lastIndex())
    .build();
}
The key is the comment in the middle of the loop:
an entry is appended only when its index is greater than the local last index and no greater than the commit index; a standalone walk-through of this rule follows.
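A self-contained toy model of this rule (not Copycat's Log API): an entry is appended only when its index lies in (lastIndex, commitIndex], and everything else in the batch is silently ignored, since a passive member only ever receives committed entries and needs no further consistency checks.

import java.util.*;

// Toy model of the passive append rule: append an entry only when
// lastIndex < entryIndex <= commitIndex; skip entries that are already
// present or not yet committed.
public class PassiveAppendSketch {
  public static void main(String[] args) {
    long lastIndex = 3;              // local log currently ends at index 3
    long commitIndex = 6;            // leader says entries up to 6 are committed
    long[] incoming = {2, 4, 6, 8};  // entry indices carried by the AppendRequest

    List<Long> appended = new ArrayList<>();
    for (long index : incoming) {
      if (lastIndex < index && index <= commitIndex) {
        // In Copycat this is log.skip(index - lastIndex - 1).append(entry).
        appended.add(index);
        lastIndex = index;
      }
    }
    // 2 is already in the log, 8 is not yet committed -> prints [4, 6].
    System.out.println(appended);
  }
}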
Handling query requests: under certain conditions a passive member can serve queries as well.
@Override
public CompletableFuture<QueryResponse> query(QueryRequest request) {
  context.checkThread();
  logRequest(request);

  // If the query was submitted with RYW or monotonic read consistency, attempt to apply the query to the local state machine.
  if (request.query().consistency() == Query.ConsistencyLevel.SEQUENTIAL) {

    // If this server has not yet applied entries up to the client's session ID, forward the
    // query to the leader. This ensures that a follower does not tell the client its session
    // doesn't exist if the follower hasn't had a chance to see the session's registration entry.
    if (context.getStateMachine().getLastApplied() < request.session()) {
      LOGGER.debug("{} - State out of sync, forwarding query to leader");
      return queryForward(request);
    }

    // If the commit index is not in the log then we've fallen too far behind the leader to perform a local query.
    // Forward the request to the leader.
    if (context.getLog().lastIndex() < context.getCommitIndex()) {
      LOGGER.debug("{} - State out of sync, forwarding query to leader");
      return queryForward(request);
    }

    QueryEntry entry = context.getLog().create(QueryEntry.class)
      .setIndex(request.index())
      .setTerm(context.getTerm())
      .setTimestamp(System.currentTimeMillis())
      .setSession(request.session())
      .setSequence(request.sequence())
      .setQuery(request.query());

    return queryLocal(entry); // query the local data
  } else {
    return queryForward(request);
  }
}
ActiveState
abstract class ActiveState extends PassiveState — ActiveState is abstract because an active member is always concretely one of follower, candidate, or leader.
As a full voting member, an active member also has to handle requests such as poll, vote, and append.
First look at the append request and how it differs from the passive version.
@Override
public CompletableFuture<AppendResponse> append(final AppendRequest request) {
  context.checkThread();
  logRequest(request);

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and transition to follower.
  boolean transition = updateTermAndLeader(request.term(), request.leader()); // if the request's term is greater than ours, this server certainly is not the leader

  CompletableFuture<AppendResponse> future = CompletableFuture.completedFuture(logResponse(handleAppend(request)));

  // If a transition is required then transition back to the follower state.
  // If the node is already a follower then the transition will be ignored.
  if (transition) {
    context.transition(CopycatServer.State.FOLLOWER); // step down to follower
  }
  return future;
}
handleAppend eventually still calls
appendEntries
// Iterate through request entries and append them to the log.
for (Entry entry : request.entries()) {
  // If the entry index is greater than the last log index, skip missing entries.
  if (context.getLog().lastIndex() < entry.getIndex()) {
    context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
    LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
  } else if (context.getCommitIndex() >= entry.getIndex()) {
    continue;
  } else {
    // Compare the term of the received entry with the matching entry in the log.
    long term = context.getLog().term(entry.getIndex());
    if (term != 0) {
      if (entry.getTerm() != term) {
        // We found an invalid entry in the log. Remove the invalid entry and append the new entry.
        // If appending to the log fails, apply commits and reply false to the append request.
        LOGGER.debug("{} - Appended entry term does not match local log, removing incorrect entries", context.getCluster().member().address());
        context.getLog().truncate(entry.getIndex() - 1).append(entry);
        LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
      }
    } else {
      context.getLog().truncate(entry.getIndex() - 1).append(entry);
      LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
    }
  }
}
This block is where the logic differs from the passive version (a toy walk-through follows):
- If the entry index is greater than the last log index, the entry is appended directly; the commit index is not consulted.
- If the entry is already in the log and its index is no greater than the commit index, it is already committed locally and is simply skipped.
- Otherwise the entry overlaps an uncommitted part of the log, which may be stale: if the local term at that index differs from the entry's term (or there is no real entry there), the log is truncated to entry.getIndex() - 1 and the leader's entry is appended.
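A toy walk-through of the three cases, using a sorted map from index to term as a stand-in for the log (again, not Copycat's Log API); it only illustrates the decision logic.

import java.util.*;

// Toy model of the active append rules: append past lastIndex, skip committed
// entries, and truncate-then-append when an uncommitted entry's term conflicts.
public class ActiveAppendSketch {
  public static void main(String[] args) {
    TreeMap<Long, Long> log = new TreeMap<>();  // index -> term
    log.put(1L, 1L); log.put(2L, 1L); log.put(3L, 2L);
    long commitIndex = 2;

    long[][] incoming = {{2, 1}, {3, 3}, {4, 3}}; // {index, term} pairs from the leader
    for (long[] e : incoming) {
      long index = e[0], term = e[1];
      if (log.isEmpty() || index > log.lastKey()) {
        log.put(index, term);                     // case 1: past lastIndex, append
      } else if (index <= commitIndex) {
        continue;                                 // case 2: already committed, ignore
      } else if (log.get(index) != term) {
        log.tailMap(index, true).clear();         // case 3: conflict, truncate to index - 1
        log.put(index, term);                     // ...then append the leader's entry
      }
    }
    // Prints {1=1, 2=1, 3=3, 4=3}: entry 3 was rewritten with term 3, entry 4 appended.
    System.out.println(log);
  }
}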
Handling poll requests:
/**
 * Handles a poll request.
 */
protected PollResponse handlePoll(PollRequest request) {
  // If the request term is not as great as the current context term then don't
  // vote for the candidate. We want to vote for candidates that are at least
  // as up to date as us.
  if (request.term() < context.getTerm()) {
    LOGGER.debug("{} - Rejected {}: candidate's term is less than the current term", context.getCluster().member().address(), request);
    return PollResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withAccepted(false) // reject
      .build();
  } else if (isLogUpToDate(request.logIndex(), request.logTerm(), request)) {
    return PollResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withAccepted(true) // accept
      .build();
  } else {
    return PollResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withAccepted(false)
      .build();
  }
}
isLogUpToDate
boolean isLogUpToDate(long lastIndex, long lastTerm, Request request) {
  // If the log is empty then vote for the candidate.
  if (context.getLog().isEmpty()) {
    LOGGER.debug("{} - Accepted {}: candidate's log is up-to-date", context.getCluster().member().address(), request);
    return true;
  }

  // Read the last entry index and term from the log.
  long localLastIndex = context.getLog().lastIndex();
  long localLastTerm = context.getLog().term(localLastIndex);

  // If the candidate's last log term is lower than the local log's last entry term, reject the request.
  if (lastTerm < localLastTerm) {
    LOGGER.debug("{} - Rejected {}: candidate's last log entry ({}) is at a lower term than the local log ({})", context.getCluster().member().address(), request, lastTerm, localLastTerm);
    return false;
  }

  // If the candidate's last term is equal to the local log's last entry term, reject the request if the
  // candidate's last index is less than the local log's last index. If the candidate's last log term is
  // greater than the local log's last term then it's considered up to date, and if both have the same term
  // then the candidate's last index must be greater than the local log's last index.
  if (lastTerm == localLastTerm && lastIndex < localLastIndex) {
    LOGGER.debug("{} - Rejected {}: candidate's last log entry ({}) is at a lower index than the local log ({})", context.getCluster().member().address(), request, lastIndex, localLastIndex);
    return false;
  }

  // If we made it this far, the candidate's last term is greater than or equal to the local log's last
  // term, and if equal to the local log's last term, the candidate's last index is equal to or greater
  // than the local log's last index.
  LOGGER.debug("{} - Accepted {}: candidate's log is up-to-date", context.getCluster().member().address(), request);
  return true;
}
The rules for accepting a poll:
- if the local log is empty, accept;
- if the candidate's last log term is lower than the local log's last term, reject;
- if the last terms are equal but the candidate's last index is lower than the local last index, reject.
In short, a server only endorses candidates that are "at least as up to date as us"; the sketch below restates the check as a standalone predicate.
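The same rule as a standalone predicate with a few made-up sample values, to make the term-first, index-second comparison explicit (the empty-log case from the real code is omitted here):

// Toy restatement of the "at least as up to date" check.
public class UpToDateSketch {
  static boolean isUpToDate(long candidateLastIndex, long candidateLastTerm,
                            long localLastIndex, long localLastTerm) {
    if (candidateLastTerm != localLastTerm) {
      return candidateLastTerm > localLastTerm;   // higher last term wins
    }
    return candidateLastIndex >= localLastIndex;  // same term: the longer (or equal) log wins
  }

  public static void main(String[] args) {
    System.out.println(isUpToDate(7, 3, 9, 3));   // false: same term, shorter log
    System.out.println(isUpToDate(5, 4, 9, 3));   // true: higher last term
    System.out.println(isUpToDate(9, 3, 9, 3));   // true: identical logs are acceptable
  }
}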
Handling vote requests:
/**
 * Handles a vote request.
 */
protected VoteResponse handleVote(VoteRequest request) {
  // If the request term is not as great as the current context term then don't
  // vote for the candidate. We want to vote for candidates that are at least
  // as up to date as us.
  if (request.term() < context.getTerm()) { // the request's term is stale
    LOGGER.debug("{} - Rejected {}: candidate's term is less than the current term", context.getCluster().member().address(), request);
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
  // If a leader was already determined for this term then reject the request.
  else if (context.getLeader() != null) { // a leader already exists for this term
    LOGGER.debug("{} - Rejected {}: leader already exists", context.getCluster().member().address(), request);
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
  // If the requesting candidate is not a known member of the cluster (to this
  // node) then don't vote for it. Only vote for candidates that we know about.
  else if (!context.getClusterState().getRemoteMemberStates().stream().<Integer>map(m -> m.getMember().id()).collect(Collectors.toSet()).contains(request.candidate())) {
    LOGGER.debug("{} - Rejected {}: candidate is not known to the local member", context.getCluster().member().address(), request);
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
  // If no vote has been cast, check the log and cast a vote if necessary.
  else if (context.getLastVotedFor() == 0) { // we have not voted for any candidate yet
    if (isLogUpToDate(request.logIndex(), request.logTerm(), request)) { // the candidate's log is up to date
      context.setLastVotedFor(request.candidate());
      return VoteResponse.builder()
        .withStatus(Response.Status.OK)
        .withTerm(context.getTerm())
        .withVoted(true) // accept
        .build();
    } else {
      return VoteResponse.builder()
        .withStatus(Response.Status.OK)
        .withTerm(context.getTerm())
        .withVoted(false)
        .build();
    }
  }
  // If we already voted for the requesting server, respond successfully.
  else if (context.getLastVotedFor() == request.candidate()) { // we previously voted for this same candidate
    LOGGER.debug("{} - Accepted {}: already voted for {}", context.getCluster().member().address(), request, context.getCluster().member(context.getLastVotedFor()).address());
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(true) // accept
      .build();
  }
  // In this case, we've already voted for someone else.
  else {
    LOGGER.debug("{} - Rejected {}: already voted for {}", context.getCluster().member().address(), request, context.getCluster().member(context.getLastVotedFor()).address());
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
}
FollowerState
The key logic: wait for the heartbeat timeout to expire and then try to become a candidate.
public synchronized CompletableFuture<ServerState> open() {
  return super.open().thenRun(this::startHeartbeatTimeout).thenApply(v -> this);
}
open runs the follow-up asynchronously; thenRun is used because the next step does not depend on the previous stage's result.
startHeartbeatTimeout -> resetHeartbeatTimeout
private void resetHeartbeatTimeout() {
  // Set the election timeout in a semi-random fashion with the random range
  // being election timeout and 2 * election timeout.
  Duration delay = context.getElectionTimeout().plus(Duration.ofMillis(random.nextInt((int) context.getElectionTimeout().toMillis()))); // generate a random delay
  heartbeatTimer = context.getThreadContext().schedule(delay, () -> { // when the delay expires
    heartbeatTimer = null;
    if (isOpen()) {
      context.setLeader(0);  // clear the leader
      sendPollRequests();    // send poll requests
    }
  });
}
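The timeout computation in isolation, as a runnable sketch: the follower waits a random duration in [electionTimeout, 2 * electionTimeout), which staggers the moments at which different followers give up on the leader and start polling.

import java.time.Duration;
import java.util.Random;

// Reproduces the delay computation above with a fixed 500ms election timeout.
public class ElectionTimeoutSketch {
  public static void main(String[] args) {
    Duration electionTimeout = Duration.ofMillis(500);
    Random random = new Random();
    for (int i = 0; i < 3; i++) {
      Duration delay = electionTimeout
          .plus(Duration.ofMillis(random.nextInt((int) electionTimeout.toMillis())));
      System.out.println(delay.toMillis() + "ms");  // always in [500, 1000)
    }
  }
}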
When the timeout fires, can this server immediately become a candidate? Raft requires that the eventual leader's log contain every committed entry, so a server should only stand for election if its log is sufficiently up to date.
How does it know whether it is?
It polls the cluster first: if a majority accepts the poll, its log is at least as up to date as a majority of members, which means it holds the latest committed entries.
Note that resetHeartbeatTimeout is not only called here; it is invoked in practically every follower request handler, so any interaction with the leader keeps restarting the timer.
Only when heartbeats from the leader stop arriving does sendPollRequests run and the follower attempt to become a candidate.
/**
 * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
 */
private void sendPollRequests() {
  // Create a quorum that will track the number of nodes that have responded to the poll request.
  final AtomicBoolean complete = new AtomicBoolean();
  final Set<ServerMember> votingMembers = new HashSet<>(context.getClusterState().getActiveMemberStates().stream().map(MemberState::getMember).collect(Collectors.toList())); // collect all active members and build the quorum from them

  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    // If a majority of the cluster indicated they would vote for us then transition to candidate.
    complete.set(true);
    if (elected) { // the poll succeeded
      context.transition(CopycatServer.State.CANDIDATE); // transition to candidate
    } else {
      resetHeartbeatTimeout();
    }
  });

  // Once we got the last log term, iterate through each current member
  // of the cluster and vote each member for a vote.
  for (ServerMember member : votingMembers) {
    LOGGER.debug("{} - Polling {} for next term {}", context.getCluster().member().address(), member, context.getTerm() + 1);
    PollRequest request = PollRequest.builder()
      .withTerm(context.getTerm())
      .withCandidate(context.getCluster().member().id())
      .withLogIndex(lastIndex) // this server's last log index
      .withLogTerm(lastTerm)   // this server's last log term; peers use the index and term to decide whether to accept the poll
      .build();
    context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
      connection.<PollRequest, PollResponse>send(request).whenCompleteAsync((response, error) -> { // send the request asynchronously with a callback
        context.checkThread();
        if (isOpen() && !complete.get()) {
          if (error != null) {
            LOGGER.warn("{} - {}", context.getCluster().member().address(), error.getMessage());
            quorum.fail();
          } else {
            if (response.term() > context.getTerm()) {
              context.setTerm(response.term());
            }
            if (!response.accepted()) {
              LOGGER.debug("{} - Received rejected poll from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else if (response.term() != context.getTerm()) {
              LOGGER.debug("{} - Received accepted poll for a different term from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else {
              LOGGER.debug("{} - Received accepted poll from {}", context.getCluster().member().address(), member);
              quorum.succeed(); // only this branch counts as an accepted poll; every other outcome is a failure
            }
          }
        }
      }, context.getThreadContext().executor());
    });
  }
}
The Quorum implementation is straightforward:
public class Quorum {
  private final int quorum;
  private int succeeded = 1;
  private int failed;
  private Consumer<Boolean> callback;
  private boolean complete;

  public Quorum(int quorum, Consumer<Boolean> callback) {
    this.quorum = quorum;
    this.callback = callback;
  }

  private void checkComplete() {
    if (!complete && callback != null) {
      if (succeeded >= quorum) {
        complete = true;
        callback.accept(true);
      } else if (failed >= quorum) {
        complete = true;
        callback.accept(false);
      }
    }
  }

  /**
   * Indicates that a call in the quorum succeeded.
   */
  public Quorum succeed() {
    succeeded++;
    checkComplete();
    return this;
  }
succeed() just increments the counter and calls checkComplete; once the success count reaches the quorum, the callback fires. A short usage sketch follows.
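A usage sketch of Quorum for a three-member cluster (quorum = 2). Note that succeeded starts at 1 because the local member implicitly counts itself, so a single accepted poll from a peer already completes the quorum.

// Quorum size 2: one peer acceptance plus the implicit self-count triggers the callback.
Quorum quorum = new Quorum(2, elected -> {
  if (elected) {
    System.out.println("majority reached, transition to CANDIDATE");
  } else {
    System.out.println("majority rejected, stay FOLLOWER");
  }
});
quorum.succeed();  // 1 (self) + 1 = 2 >= quorum -> callback fires with true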
FollowerState's append handler:
@Override
public CompletableFuture<AppendResponse> append(AppendRequest request) {
  CompletableFuture<AppendResponse> future = super.append(request);

  // Reset the heartbeat timeout.
  resetHeartbeatTimeout();

  // Send AppendEntries requests to passive members if necessary.
  appender.appendEntries();

  return future;
}
Besides calling super.append and resetting the heartbeat timeout, it also calls appender.appendEntries(): this is the follower taking on the responsibility of replicating committed entries to its assigned passive members.
final class FollowerAppender extends AbstractAppender {

  public FollowerAppender(ServerContext context) {
    super(context);
  }

  /**
   * Sends append requests to assigned passive members.
   */
  public void appendEntries() {
    if (open) {
      for (MemberState member : context.getClusterState().getAssignedPassiveMemberStates()) {
        appendEntries(member);
      }
    }
  }
CandidateState
A candidate's job is to win a vote and become leader.
public synchronized CompletableFuture<ServerState> open() {
  return super.open().thenRun(this::startElection).thenApply(v -> this);
}
startElection -> sendVoteRequests
/**
 * Resets the election timer.
 */
private void sendVoteRequests() {
  // When the election timer is reset, increment the current term and
  // restart the election.
  context.setTerm(context.getTerm() + 1).setLastVotedFor(context.getCluster().member().id()); // a new election, so increment the term; setLastVotedFor(self) casts the first vote for ourselves

  final AtomicBoolean complete = new AtomicBoolean();
  final Set<ServerMember> votingMembers = new HashSet<>(context.getClusterState().getActiveMemberStates().stream().map(MemberState::getMember).collect(Collectors.toList()));

  // Send vote requests to all nodes. The vote request that is sent
  // to this node will be automatically successful.
  // First check if the quorum is null. If the quorum isn't null then that
  // indicates that another vote is already going on.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.LEADER); // the vote succeeded, become leader
    } else {
      context.transition(CopycatServer.State.FOLLOWER);
    }
  });

  // Once we got the last log term, iterate through each current member
  // of the cluster and vote each member for a vote.
  for (ServerMember member : votingMembers) {
    LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
    VoteRequest request = VoteRequest.builder()
      .withTerm(context.getTerm())
      .withCandidate(context.getCluster().member().id())
      .withLogIndex(lastIndex)
      .withLogTerm(lastTerm)
      .build();
    context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
      connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
        context.checkThread();
        if (isOpen() && !complete.get()) {
          if (error != null) {
            LOGGER.warn(error.getMessage());
            quorum.fail();
          } else {
            if (response.term() > context.getTerm()) {
              LOGGER.debug("{} - Received greater term from {}", context.getCluster().member().address(), member);
              context.setTerm(response.term());
              complete.set(true);
              context.transition(CopycatServer.State.FOLLOWER);
            } else if (!response.voted()) {
              LOGGER.debug("{} - Received rejected vote from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else if (response.term() != context.getTerm()) {
              LOGGER.debug("{} - Received successful vote for a different term from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else {
              LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
              quorum.succeed();
            }
          }
        }
      }, context.getThreadContext().executor());
    });
  }
}
Handling append requests:
public CompletableFuture<AppendResponse> append(AppendRequest request) {
  context.checkThread();

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and step down as a candidate.
  if (request.term() >= context.getTerm()) { // a term at least as large as ours means a leader has already been elected
    context.setTerm(request.term());
    context.transition(CopycatServer.State.FOLLOWER); // step down to follower
  }
  return super.append(request);
}
Handling vote requests:
@Override
public CompletableFuture<VoteResponse> vote(VoteRequest request) {
  context.checkThread();
  logRequest(request);

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and step down as a candidate.
  if (updateTermAndLeader(request.term(), 0)) { // the request carries a newer term, so this candidacy is stale
    CompletableFuture<VoteResponse> future = super.vote(request);
    context.transition(CopycatServer.State.FOLLOWER); // step down to follower
    return future;
  }

  // If the vote request is not for this candidate then reject the vote.
  if (request.candidate() == context.getCluster().member().id()) { // otherwise, grant the vote only if the requested candidate is ourselves
    return CompletableFuture.completedFuture(logResponse(VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(true)
      .build()));
  } else { // a candidate never votes for another candidate
    return CompletableFuture.completedFuture(logResponse(VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build()));
  }
}
LeaderState
LeaderState is involved enough to warrant a separate post.