Copycat - States
Member.Status
Changes in a member's status are driven by heartbeats.
A heartbeat is simply an AppendRequest with empty entries:
```java
/**
 * Triggers a heartbeat to a majority of the cluster.
 * <p>
 * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
 * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
 * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
 * result in a single AppendRequest to each follower at any given time, and the returned future will be
 * shared by all concurrent calls.
 *
 * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
 */
public CompletableFuture<Long> appendEntries() {
  // If there are no other active members in the cluster, simply complete the append operation.
  if (context.getClusterState().getRemoteMemberStates().isEmpty())
    return CompletableFuture.completedFuture(null);

  // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
  // Create a new heartbeat future and commit to all members in the cluster.
  if (heartbeatFuture == null) {
    CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
    heartbeatFuture = newHeartbeatFuture;
    heartbeatTime = System.currentTimeMillis();
    for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
      appendEntries(member); // send an AppendRequest to every remote member
    }
    return newHeartbeatFuture;
  }
  // ......
}
```
A heartbeat sends an AppendRequest to every member returned by getRemoteMemberStates().
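The javadoc describes a coalescing rule. Only one heartbeat round is in flight at a time, and callers that arrive during a round share the future of the next round. Below is a minimal sketch of that pattern, not Copycat's code. The class `HeartbeatCoalescer`, its `onMajorityAck` callback and the `sendToAll` action are hypothetical stand-ins for `appendEntries(member)` and the majority-ack bookkeeping. The sketch assumes single-threaded use.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Copycat code): at most one heartbeat round is in flight, and
// callers that arrive during a round all share one "next" future. Single-threaded use is
// assumed, as in Copycat where the appender runs on the server's thread context.
class HeartbeatCoalescer {
    private final Runnable sendToAll;          // stands in for appendEntries(member) on every remote member
    private CompletableFuture<Long> current;   // round currently in flight
    private CompletableFuture<Long> next;      // shared by callers that arrive mid-round

    HeartbeatCoalescer(Runnable sendToAll) {
        this.sendToAll = sendToAll;
    }

    CompletableFuture<Long> heartbeat() {
        if (current == null) {
            // No round in flight: start one now.
            current = new CompletableFuture<>();
            sendToAll.run();
            return current;
        }
        // A round is already in flight: piggyback on the next one.
        if (next == null) {
            next = new CompletableFuture<>();
        }
        return next;
    }

    // Called once a majority has acknowledged the round in flight.
    void onMajorityAck(long time) {
        if (current == null) {
            return;
        }
        CompletableFuture<Long> done = current;
        current = next;   // waiting callers become the new round (or null if none)
        next = null;
        if (current != null) {
            sendToAll.run();
        }
        done.complete(time);
    }
}
```

With this design, any number of calls during one round cause exactly one extra round, never one round per caller.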
AVAILABLE
When a ServerMember is created, its status defaults to Status.AVAILABLE:
```java
public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
  private Member.Type type;
  private Status status = Status.AVAILABLE;
  // ......
}
```
LeaderAppender
```java
@Override
protected void succeedAttempt(MemberState member) {
  super.succeedAttempt(member);

  // If the member is currently marked as UNAVAILABLE, change its status to AVAILABLE and update the configuration.
  if (member.getMember().status() == ServerMember.Status.UNAVAILABLE && !leader.configuring()) {
    member.getMember().update(ServerMember.Status.AVAILABLE, Instant.now());
    leader.configure(context.getCluster().members());
  }
}
```
succeedAttempt switches an UNAVAILABLE member back to AVAILABLE. super.succeedAttempt resets the member's failure count.
succeedAttempt is called when an OK AppendResponse is received:
```java
protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
  // Reset the member failure count and update the member's availability status if necessary.
  succeedAttempt(member);
  // ......
}
```
The leader's heartbeat is an empty AppendRequest. If the leader gets an OK response to it, the member is reachable, and therefore AVAILABLE.
UNAVAILABLE
A member is marked UNAVAILABLE in failAttempt:
```java
@Override
protected void failAttempt(MemberState member, Throwable error) {
  super.failAttempt(member, error);

  // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
  // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
  // that a partition occurred and transition back to the FOLLOWER state.
  if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
    LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
    context.setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  // If the number of failures has increased above 3 and the member hasn't been marked as UNAVAILABLE, do so.
  else if (member.getFailureCount() >= 3) {
    // If the member is currently marked as AVAILABLE, change its status to UNAVAILABLE and update the configuration.
    if (member.getMember().status() == ServerMember.Status.AVAILABLE && !leader.configuring()) {
      member.getMember().update(ServerMember.Status.UNAVAILABLE, Instant.now());
      leader.configure(context.getCluster().members());
    }
  }
}
```
super.failAttempt resets the connection and increments the failure count:
```java
member.incrementFailureCount();
```
The first check uses Math.max(heartbeatTime(), leaderTime).
heartbeatTime
```java
/**
 * Returns the last time a majority of the cluster was contacted.
 * <p>
 * This is calculated by sorting the list of active members and getting the last time the majority of
 * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
 * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
 */
private long heartbeatTime() {
  int quorumIndex = quorumIndex();
  if (quorumIndex >= 0) {
    return context.getClusterState()
      .getActiveMemberStates((m1, m2) -> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime()))
      .get(quorumIndex)
      .getHeartbeatTime();
  }
  return System.currentTimeMillis();
}
```
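heartbeatTime() sorts the ACTIVE members by heartbeat time, newest first, and takes the entry at quorumIndex. The result is the oldest heartbeat among the majority most recently contacted, so it is the latest time at which a majority had been heard from.
The check compares the current time against that value and leaderTime. If more than two election timeouts have passed, the leader has not heard from a majority. It then suspects a network partition, in which it may be on the minority side (a split-brain situation).
In that case the leader steps down to follower.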
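The two computations above can be sketched in isolation, as below. This is a hypothetical helper, not Copycat's code. It assumes each remote ACTIVE member's heartbeat time is a plain `long` in milliseconds. It takes `quorumIndex` as an argument because the body of `quorumIndex()` is not shown here. `leaderTime` is not explained in the excerpt; it is presumably the time this node became leader.

```java
import java.util.Arrays;

// Hypothetical helpers mirroring heartbeatTime() and the step-down check in failAttempt().
final class PartitionCheck {
    private PartitionCheck() {}

    // Sort heartbeat times newest-first and take the entry at quorumIndex: the oldest
    // time among the most recently contacted members needed for a majority.
    static long quorumHeartbeatTime(long[] memberHeartbeats, int quorumIndex, long now) {
        if (quorumIndex < 0) {
            return now; // mirrors the fallback to System.currentTimeMillis()
        }
        long[] sorted = memberHeartbeats.clone();
        Arrays.sort(sorted); // ascending
        // Position quorumIndex in descending order is position (n - 1 - quorumIndex) in ascending order.
        return sorted[sorted.length - 1 - quorumIndex];
    }

    // Step down if neither the majority heartbeat nor leaderTime falls within two election timeouts.
    static boolean shouldStepDown(long now, long quorumHeartbeatTime, long leaderTime, long electionTimeoutMs) {
        return now - Math.max(quorumHeartbeatTime, leaderTime) > electionTimeoutMs * 2;
    }
}
```

For example, take heartbeat times {100, 400, 300, 200} and quorumIndex 1. Sorted newest first, the list is [400, 300, 200, 100], so the quorum heartbeat time is 300. With a 300 ms election timeout, at time 1000 the gap is 700 ms, which exceeds 600 ms, and the leader would step down.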
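Now the second check. Its comment says "above 3", but the condition is getFailureCount() >= 3. Once a member's failure count reaches 3 and the member is still AVAILABLE, the leader marks it UNAVAILABLE and reconfigures the cluster. Because the two checks form an if/else-if, this only happens when the partition check did not fire.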
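Taken together, succeedAttempt and failAttempt form a small per-member state machine. The following is a hypothetical model of it, not Copycat's class. The name `MemberAvailability` and the `configuring` flag (standing in for `leader.configuring()`) are assumptions. It leaves out the partition check, which takes precedence in the real failAttempt. A `true` return marks where Copycat would call `leader.configure(...)`.

```java
// Hypothetical model of a member's availability as driven by succeedAttempt()/failAttempt().
final class MemberAvailability {
    enum Status { AVAILABLE, UNAVAILABLE }

    static final int FAILURE_THRESHOLD = 3;   // matches member.getFailureCount() >= 3

    private Status status = Status.AVAILABLE; // ServerMember starts out AVAILABLE
    private int failureCount;
    private boolean configuring;              // stands in for leader.configuring()

    // Returns true if the status changed (where Copycat would reconfigure the cluster).
    boolean succeed() {
        failureCount = 0;                      // super.succeedAttempt() resets the count
        if (status == Status.UNAVAILABLE && !configuring) {
            status = Status.AVAILABLE;
            return true;
        }
        return false;
    }

    boolean fail() {
        failureCount++;                        // super.failAttempt() increments the count
        if (failureCount >= FAILURE_THRESHOLD && status == Status.AVAILABLE && !configuring) {
            status = Status.UNAVAILABLE;
            return true;
        }
        return false;
    }

    Status status() { return status; }

    void setConfiguring(boolean configuring) { this.configuring = configuring; }
}
```

A single successful response is enough to bring a member back. It takes three consecutive failures to mark it unavailable.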
failAttempt is called from the various failure handlers in AbstractAppender:
handleAppendRequestFailure
handleAppendResponseFailure
handleConfigureRequestFailure
handleInstallRequestFailure
CopycatServer.State
```java
public enum State {

  /**
   * Represents the state of an inactive server.
   * <p>
   * All servers start in this state and return to this state when {@link #leave() stopped}.
   */
  INACTIVE,

  /**
   * Represents the state of a server that is a reserve member of the cluster.
   * <p>
   * Reserve servers only receive notification of leader, term, and configuration changes.
   */
  RESERVE,

  /**
   * Represents the state of a server in the process of catching up its log.
   * <p>
   * Upon successfully joining an existing cluster, the server will transition to the passive state and remain there
   * until the leader determines that the server has caught up enough to be promoted to a full member.
   */
  PASSIVE,

  /**
   * Represents the state of a server participating in normal log replication.
   * <p>
   * The follower state is a standard Raft state in which the server receives replicated log entries from the leader.
   */
  FOLLOWER,

  /**
   * Represents the state of a server attempting to become the leader.
   * <p>
   * When a server in the follower state fails to receive communication from a valid leader for some time period,
   * the follower will transition to the candidate state. During this period, the candidate requests votes from
   * each of the other servers in the cluster. If the candidate wins the election by receiving votes from a majority
   * of the cluster, it will transition to the leader state.
   */
  CANDIDATE,

  /**
   * Represents the state of a server which is actively coordinating and replicating logs with other servers.
   * <p>
   * Leaders are responsible for handling and replicating writes from clients. Note that more than one leader can
   * exist at any given time, but Raft guarantees that no two leaders will exist for the same {@link Cluster#term()}.
   */
  LEADER
}
```
When ServerContext is initialized, its state is INACTIVE:
```java
public class ServerContext implements AutoCloseable {
  //......
  protected ServerState state = new InactiveState(this);
  //......
}
```
The tricky part is that Member also defines a Type enum:
```java
enum Type {

  /**
   * Represents an inactive member.
   * <p>
   * The {@code INACTIVE} member type represents a member which does not participate in any communication
   * and is not an active member of the cluster. This is typically the state of a member prior to joining
   * or after leaving a cluster.
   */
  INACTIVE,

  /**
   * Represents a member which does not participate in replication.
   * <p>
   * The {@code RESERVE} member type is representative of a member that does not participate in any
   * replication of state but only maintains contact with the cluster leader and is an active member
   * of the {@link Cluster}. Typically, reserve members act as standby nodes which can be
   * {@link #promote() promoted} to a {@link #PASSIVE} or {@link #ACTIVE} role when needed.
   */
  RESERVE,

  /**
   * Represents a member which participates in asynchronous replication but does not vote in elections
   * or otherwise participate in the Raft consensus algorithm.
   * <p>
   * The {@code PASSIVE} member type is representative of a member that receives state changes from
   * follower nodes asynchronously. As state changes are committed via the {@link #ACTIVE} Raft nodes,
   * committed state changes are asynchronously replicated by followers to passive members. This allows
   * passive members to maintain nearly up-to-date state with minimal impact on the performance of the
   * Raft algorithm itself, and allows passive members to be quickly promoted to {@link #ACTIVE} voting
   * members if necessary.
   */
  PASSIVE,

  /**
   * Represents a full voting member of the Raft cluster which participates fully in leader election
   * and replication algorithms.
   * <p>
   * The {@code ACTIVE} member type represents a full voting member of the Raft cluster. Active members
   * participate in the Raft leader election and replication algorithms and can themselves be elected
   * leaders.
   */
  ACTIVE,
}
```
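Note the difference: Member.Type has ACTIVE, which State does not.
Apart from that, every Type value has a State of the same name.
In other words, a member's type can be INACTIVE, RESERVE, PASSIVE or ACTIVE.
When the member type is INACTIVE, RESERVE or PASSIVE, the server's state is the state of the same name.
When the member type is ACTIVE, the server's state is one of FOLLOWER, CANDIDATE or LEADER.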
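The relation between the two enums can be written as a lookup from member type to the server states allowed for it. The sketch below is hypothetical. The nested enums only mirror the constant names of `Member.Type` and `CopycatServer.State`, and `allowedStates` is not a Copycat method.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the Type -> State relation described above.
final class TypeStateMapping {
    enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }
    enum State { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

    private TypeStateMapping() {}

    static Set<State> allowedStates(Type type) {
        switch (type) {
            case ACTIVE:
                // A voting member is always in one of the three Raft roles.
                return EnumSet.of(State.FOLLOWER, State.CANDIDATE, State.LEADER);
            case PASSIVE:
                return EnumSet.of(State.PASSIVE);
            case RESERVE:
                return EnumSet.of(State.RESERVE);
            default:
                return EnumSet.of(State.INACTIVE);
        }
    }
}
```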
In CopycatServer.Builder, the default member type is ACTIVE:
```java
public static class Builder implements io.atomix.catalyst.util.Builder<CopycatServer> {
  //......
  private Member.Type type = Member.Type.ACTIVE;
  //......
}
```
Note that transition(Member.Type) chooses the target state based on the member type:
```java
/**
 * Transitions the server to the base state for the given member type.
 */
protected void transition(Member.Type type) {
  switch (type) {
    case ACTIVE:
      if (!(state instanceof ActiveState)) {
        transition(CopycatServer.State.FOLLOWER);
      }
      break;
    case PASSIVE:
      if (this.state.type() != CopycatServer.State.PASSIVE) {
        transition(CopycatServer.State.PASSIVE);
      }
      break;
    case RESERVE:
      if (this.state.type() != CopycatServer.State.RESERVE) {
        transition(CopycatServer.State.RESERVE);
      }
      break;
    default:
      if (this.state.type() != CopycatServer.State.INACTIVE) {
        transition(CopycatServer.State.INACTIVE);
      }
      break;
  }
}
```
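Note how ACTIVE is handled.
If the member type is ACTIVE and the current state is not an ActiveState, the server transitions to FOLLOWER. This path never leads directly to CANDIDATE or LEADER; those states are reached only through polls and elections.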
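Here is a compact, hypothetical model of what transition(Member.Type) decides. It returns the target state, or the current state when no transition is needed. It assumes, as the section on ActiveState below states, that FOLLOWER, CANDIDATE and LEADER are the ActiveState subclasses. The enums and the `target` method are illustrative only and are not Copycat API.

```java
// Hypothetical sketch of the decision made by transition(Member.Type).
final class BaseStateTransition {
    enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }
    enum State { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

    private BaseStateTransition() {}

    // FOLLOWER, CANDIDATE and LEADER play the role of ActiveState subclasses here.
    static boolean isActiveState(State s) {
        return s == State.FOLLOWER || s == State.CANDIDATE || s == State.LEADER;
    }

    static State target(Type type, State current) {
        switch (type) {
            case ACTIVE:
                // Already in a Raft role: keep it. Otherwise only FOLLOWER is entered directly.
                return isActiveState(current) ? current : State.FOLLOWER;
            case PASSIVE:
                return State.PASSIVE;
            case RESERVE:
                return State.RESERVE;
            default:
                return State.INACTIVE;
        }
    }
}
```

A server that is already LEADER keeps its role when its type is re-applied as ACTIVE. An INACTIVE or PASSIVE server only ever lands in FOLLOWER.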
As shown above, ServerContext starts in the INACTIVE state.
So when does it become active?
Both bootstrapping a server and joining a cluster end up in ClusterState's join(), which performs the state transition:
```java
@Override
public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
  if (configuration == null) {
    if (member.type() != Member.Type.ACTIVE) {
      return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
    } else {
      // Create a set of active members.
      Set<Member> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.serverAddress()))
        .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
        .collect(Collectors.toSet());

      // Add the local member to the set of active members.
      activeMembers.add(member);

      // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
    }
  }
  return join();
}
```
```java
@Override
public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
  // If no configuration was loaded from disk, create a new configuration.
  if (configuration == null) {
    // Create a set of cluster members, excluding the local member which is joining a cluster.
    Set<Member> activeMembers = cluster.stream()
      .filter(m -> !m.equals(member.serverAddress()))
      .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
      .collect(Collectors.toSet());

    // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
    // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
    configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // update the configuration
  }
  return join();
}

/**
 * Starts the join to the cluster.
 */
private synchronized CompletableFuture<Void> join() {
  joinFuture = new CompletableFuture<>();

  context.getThreadContext().executor().execute(() -> {
    // Transition the server to the appropriate state for the local member type.
    context.transition(member.type()); // the state transition happens here

    // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
    // will result in the member attempting to get elected. This allows initial clusters to form.
    List<MemberState> activeMembers = getActiveMemberStates();
    if (!activeMembers.isEmpty()) {
      join(getActiveMemberStates().iterator());
    } else {
      joinFuture.complete(null);
    }
  });
  // ......
}
```
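The two entry points build the initial member set almost identically. The one difference is that bootstrap() adds the local member, while join(Collection) leaves it out. A small sketch of just that difference follows. It assumes plain `"host:port"` strings in place of Catalyst `Address` objects. The class and method names are hypothetical. It uses a `HashSet` because `Collectors.toSet()` makes no mutability guarantee.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the initial member sets built by join(Collection) and bootstrap().
final class InitialConfiguration {
    private InitialConfiguration() {}

    // join(Collection): every listed address except the local one.
    static Set<String> joinMembers(List<String> cluster, String localAddress) {
        return cluster.stream()
                .filter(address -> !address.equals(localAddress)) // exclude the local member
                .collect(Collectors.toCollection(HashSet::new));  // mutable, so bootstrap can add to it
    }

    // bootstrap(Collection): the same set plus the local member.
    static Set<String> bootstrapMembers(List<String> cluster, String localAddress) {
        Set<String> members = joinMembers(cluster, localAddress);
        members.add(localAddress);
        return members;
    }
}
```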
Next, the conditions under which a server moves between LEADER, CANDIDATE and FOLLOWER.
Leader
A server becomes leader only when, as a candidate, it requests votes and a majority grants them. It then calls context.transition(CopycatServer.State.LEADER):
```java
/**
 * Resets the election timer.
 */
private void sendVoteRequests() {
  //.........

  // Send vote requests to all nodes. The vote request that is sent
  // to this node will be automatically successful.
  // First check if the quorum is null. If the quorum isn't null then that
  // indicates that another vote is already going on.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.LEADER); // invoked from checkComplete()
    } else {
      context.transition(CopycatServer.State.FOLLOWER);
    }
  });

  // Once we got the last log term, iterate through each current member
  // of the cluster and vote each member for a vote.
  for (ServerMember member : votingMembers) {
    LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
    VoteRequest request = VoteRequest.builder()
      .withTerm(context.getTerm())
      .withCandidate(context.getCluster().member().id())
      .withLogIndex(lastIndex)
      .withLogTerm(lastTerm)
      .build();

    context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
      connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
        context.checkThread();
        if (isOpen() && !complete.get()) {
          if (error != null) {
            LOGGER.warn(error.getMessage());
            quorum.fail();
          } else {
            //........
            if (/* ........ */) {
              //........
            } else {
              LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
              quorum.succeed(); // vote granted: succeeded++, then checkComplete()
            }
          }
        }
      }, context.getThreadContext().executor());
    });
  }
}
```
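The Quorum object counts responses and calls its callback once the outcome is decided. The simplified counter below works in the same spirit. It is not Copycat's `Quorum` class. The name `SimpleQuorum` and the `voters` parameter are assumptions. So is the failure rule: give up once more than `voters - quorum` members have refused, because a majority can then no longer be reached.

```java
import java.util.function.Consumer;

// Hypothetical vote counter in the spirit of the Quorum used above (NOT Copycat's class).
final class SimpleQuorum {
    private final int quorum;                  // votes needed to win
    private final int voters;                  // total voting members, including this node
    private final Consumer<Boolean> callback;  // true -> e.g. transition(LEADER), false -> transition(FOLLOWER)
    private int succeeded;
    private int failed;
    private boolean complete;

    SimpleQuorum(int quorum, int voters, Consumer<Boolean> callback) {
        this.quorum = quorum;
        this.voters = voters;
        this.callback = callback;
    }

    void succeed() { succeeded++; checkComplete(); }

    void fail() { failed++; checkComplete(); }

    private void checkComplete() {
        if (complete) {
            return; // the callback fires at most once
        }
        if (succeeded >= quorum) {
            complete = true;
            callback.accept(true);
        } else if (failed > voters - quorum) {
            complete = true; // a majority can no longer be reached
            callback.accept(false);
        }
    }
}
```

In a five-node cluster with a quorum of 3, the candidate counts its own vote with `succeed()`. It wins after two more grants and loses after three refusals.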
Candidate
A follower becomes a candidate only after it sends poll requests and a majority indicates it would vote for it:
```java
/**
 * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
 */
private void sendPollRequests() {
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    // If a majority of the cluster indicated they would vote for us then transition to candidate.
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.CANDIDATE);
    } else {
      resetHeartbeatTimeout();
    }
  });
  //......
}
```
Follower
Leader –> Follower
In LeaderAppender, triggered by heartbeat responses:
```java
/**
 * Handles a {@link Response.Status#OK} response.
 */
protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
  //......
  // If we've received a greater term, update the term and transition back to follower.
  else if (response.term() > context.getTerm()) {
    context.setTerm(response.term()).setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  //......
}
```
If the response is OK but carries a term greater than this server's term, this server is no longer the legitimate leader.
So it steps down to follower.
```java
/**
 * Handles a {@link Response.Status#ERROR} response.
 */
protected void handleAppendResponseError(MemberState member, AppendRequest request, AppendResponse response) {
  // If we've received a greater term, update the term and transition back to follower.
  if (response.term() > context.getTerm()) {
    context.setTerm(response.term()).setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  //......
}
```
The same applies to ERROR responses.
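Both handlers apply the same three steps when they see a higher term: adopt the term, clear the leader, and step down. A minimal hypothetical model of that check is below. `LeaderTermState` and its fields are illustrative stand-ins for the server context, not Copycat API.

```java
// Hypothetical model of the term check in handleAppendResponseOk/handleAppendResponseError.
final class LeaderTermState {
    enum Role { LEADER, FOLLOWER }

    private long term;
    private int leaderId;              // 0 means "no known leader", as in setLeader(0)
    private Role role = Role.LEADER;

    LeaderTermState(long term, int selfId) {
        this.term = term;
        this.leaderId = selfId;
    }

    // A response from a member with a higher term means this node's leadership is stale.
    void onAppendResponse(long responseTerm) {
        if (responseTerm > term) {
            term = responseTerm;       // setTerm(response.term())
            leaderId = 0;              // setLeader(0)
            role = Role.FOLLOWER;      // transition(FOLLOWER)
        }
    }

    long term() { return term; }

    int leaderId() { return leaderId; }

    Role role() { return role; }
}
```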
```java
@Override
protected void failAttempt(MemberState member, Throwable error) {
  super.failAttempt(member, error);

  // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
  // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
  // that a partition occurred and transition back to the FOLLOWER state.
  if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
    LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
    context.setLeader(0);
    context.transition(CopycatServer.State.FOLLOWER);
  }
  //......
}
```
In failAttempt, if the leader has not heard from a majority within two election timeouts, it assumes a network partition has occurred.
It then steps down to follower.
In LeaderState,
when the new leader fails to commit its initial entries,
```java
/**
 * Commits a no-op entry to the log, ensuring any entries from a previous term are committed.
 */
private CompletableFuture<Void> commitInitialEntries() {
  // The Raft protocol dictates that leaders cannot commit entries from previous terms until
  // at least one entry from their current term has been stored on a majority of servers. Thus,
  // we force entries to be appended up to the leader's no-op entry. The LeaderAppender will ensure
  // that the commitIndex is not increased until the no-op entry (appender.index()) is committed.
  CompletableFuture<Void> future = new CompletableFuture<>();
  appender.appendEntries(appender.index()).whenComplete((resultIndex, error) -> {
    context.checkThread();
    if (isOpen()) {
      if (error == null) {
        context.getStateMachine().apply(resultIndex);
        future.complete(null);
      } else {
        context.setLeader(0);
        context.transition(CopycatServer.State.FOLLOWER);
      }
    }
  });
  return future;
}
```
it also steps down to follower.
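The comment in commitInitialEntries refers to the Raft rule behind the no-op entry. A leader may advance the commit index only to an entry from its own term that is stored on a majority. The sketch below shows that rule in its standard Raft form. It is not Copycat's LeaderAppender code. The parameter layout is an assumption: `matchIndexes` holds one replicated index per voting member, including the leader, and `termAt[i]` is the term of log entry `i`.

```java
import java.util.Arrays;

// Standard Raft commit rule (sketch): advance commitIndex to the highest index replicated on a
// majority, but only if that entry was written in the leader's current term.
final class CommitRule {
    private CommitRule() {}

    // matchIndexes: highest replicated index on every voting member, including the leader.
    // termAt[i]: term of the log entry at index i (index 0 unused).
    static long nextCommitIndex(long[] matchIndexes, long[] termAt, long currentTerm, long commitIndex) {
        long[] sorted = matchIndexes.clone();
        Arrays.sort(sorted); // ascending
        int quorum = matchIndexes.length / 2 + 1;
        // After an ascending sort, the entry at position n - quorum is held by at least quorum members.
        long majorityIndex = sorted[matchIndexes.length - quorum];
        if (majorityIndex > commitIndex && termAt[(int) majorityIndex] == currentTerm) {
            return majorityIndex;
        }
        return commitIndex;
    }
}
```

Consider entries 1 and 2 from term 1, plus a no-op at index 3 written in the leader's term 2. With match indexes {3, 2, 2}, nothing is committed, because entry 2 belongs to an older term. Once two members hold index 3, everything up to 3 commits at once.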
Candidate –> Follower
When the vote fails, the candidate falls back to follower:
```java
/**
 * Resets the election timer.
 */
private void sendVoteRequests() {
  //......

  // Send vote requests to all nodes. The vote request that is sent
  // to this node will be automatically successful.
  // First check if the quorum is null. If the quorum isn't null then that
  // indicates that another vote is already going on.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.LEADER);
    } else {
      context.transition(CopycatServer.State.FOLLOWER); // not elected
    }
  });
  //......
}
```
ActiveState –> Follower
The ActiveState subclasses, including LeaderState and CandidateState, run the following logic when handling vote and append requests:
```java
// If the request indicates a term that is greater than the current term then
// assign that term and leader to the current context and transition to follower.
boolean transition = updateTermAndLeader(request.term(), request.leader());

// If a transition is required then transition back to the follower state.
// If the node is already a follower then the transition will be ignored.
if (transition) {
  context.transition(CopycatServer.State.FOLLOWER);
}
```
```java
/**
 * Updates the term and leader.
 */
protected boolean updateTermAndLeader(long term, int leader) {
  // If the request indicates a term that is greater than the current term or no leader has been
  // set for the current term, update leader and term.
  if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
    context.setTerm(term);
    context.setLeader(leader);

    // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
    context.getClusterState().reset();
    return true;
  }
  return false;
}
```
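The condition in updateTermAndLeader can be pulled out as a pure function, as below. This is a hypothetical rewrite for illustration. `currentLeader` is an `Integer` that is `null` when no leader is known, standing in for `context.getLeader()`, and a request leader id of `0` means "no leader".

```java
// Hypothetical pure-function version of the condition in updateTermAndLeader().
final class TermUpdate {
    private TermUpdate() {}

    // currentLeader == null: no leader known for the current term; requestLeader == 0: request names no leader.
    static boolean shouldUpdate(long requestTerm, int requestLeader, long currentTerm, Integer currentLeader) {
        return requestTerm > currentTerm
            || (requestTerm == currentTerm && currentLeader == null && requestLeader != 0);
    }
}
```

The second clause covers a candidate that has no known leader for its term. If it receives a request naming a leader for that same term, it updates and falls back to follower, even though the term did not increase.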
Last updated: 2017-04-07 21:25:10