

Copycat - State

Member.Status

Changes to a member's status are driven by heartbeats.

A heartbeat is simply an append with empty entries:

/**
   * Triggers a heartbeat to a majority of the cluster.
   * <p>
   * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
   * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
   * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
   * result in a single AppendRequest to each follower at any given time, and the returned future will be
   * shared by all concurrent calls.
   *
   * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
   */
  public CompletableFuture<Long> appendEntries() {
    // If there are no other active members in the cluster, simply complete the append operation.
    if (context.getClusterState().getRemoteMemberStates().isEmpty())
      return CompletableFuture.completedFuture(null);

    // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
    // Create a new heartbeat future and commit to all members in the cluster.
    if (heartbeatFuture == null) {
      CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
      heartbeatFuture = newHeartbeatFuture;
      heartbeatTime = System.currentTimeMillis();
      for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
        appendEntries(member); // send appendEntries to every remote member
      }
      return newHeartbeatFuture;
    }

The heartbeat logic sends a heartbeat to every member returned by getRemoteMemberStates().
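The Javadoc says concurrent callers share a single heartbeat future. Here is a minimal sketch of that idea, not the Copycat implementation. The class `HeartbeatCoalescer`, the `requestsSent` counter and `onMajorityAcknowledged` are hypothetical names. The excerpt is cut off before the branch that handles an in-flight heartbeat, so simply returning the existing future is an assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: concurrent heartbeat calls share one in-flight future. */
final class HeartbeatCoalescer {
  private final List<String> remoteMembers;        // stand-in for getRemoteMemberStates()
  private CompletableFuture<Long> heartbeatFuture; // null means no heartbeat is in flight
  int requestsSent = 0;                            // counts simulated AppendRequests

  HeartbeatCoalescer(List<String> remoteMembers) {
    this.remoteMembers = remoteMembers;
  }

  /** Starts a heartbeat, or joins the one already under way (assumed behavior). */
  CompletableFuture<Long> appendEntries() {
    // With no remote members there is nobody to contact, so complete immediately.
    if (remoteMembers.isEmpty()) {
      return CompletableFuture.completedFuture(null);
    }
    if (heartbeatFuture == null) {
      heartbeatFuture = new CompletableFuture<>();
      for (String member : remoteMembers) {
        requestsSent++; // the real code would send an empty AppendRequest here
      }
    }
    return heartbeatFuture;
  }

  /** Called once a majority has acknowledged; completes every waiting caller. */
  void onMajorityAcknowledged(long heartbeatTime) {
    CompletableFuture<Long> future = heartbeatFuture;
    heartbeatFuture = null; // the next appendEntries() call starts a fresh heartbeat
    if (future != null) {
      future.complete(heartbeatTime);
    }
  }
}
```

Two back-to-back calls to `appendEntries()` return the same future and send only one round of requests.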

 

AVAILABLE

On initialization, every ServerMember defaults to Status.AVAILABLE:

public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
  private Member.Type type;
  private Status status = Status.AVAILABLE;

 

LeaderAppender

@Override
  protected void succeedAttempt(MemberState member) {
    super.succeedAttempt(member);

    // If the member is currently marked as UNAVAILABLE, change its status to AVAILABLE and update the configuration.
    if (member.getMember().status() == ServerMember.Status.UNAVAILABLE && !leader.configuring()) {
      member.getMember().update(ServerMember.Status.AVAILABLE, Instant.now());
      leader.configure(context.getCluster().members());
    }
  }

 

succeedAttempt switches an UNAVAILABLE member back to AVAILABLE; super.succeedAttempt resets the failure count.

 

It is called when an OK AppendResponse is received:

protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
    // Reset the member failure count and update the member's availability status if necessary.
    succeedAttempt(member);

The leader's heartbeat is implemented as an empty AppendRequest, so receiving an OK response means the member is available.

 

UNAVAILABLE

A member is marked UNAVAILABLE in failAttempt:

@Override
  protected void failAttempt(MemberState member, Throwable error) {
    super.failAttempt(member, error);

    // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
    // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
    // that a partition occurred and transition back to the FOLLOWER state.
    if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
      LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
      context.setLeader(0);
      context.transition(CopycatServer.State.FOLLOWER);
    }
    // If the number of failures has increased above 3 and the member hasn't been marked as UNAVAILABLE, do so.
    else if (member.getFailureCount() >= 3) {
      // If the member is currently marked as AVAILABLE, change its status to UNAVAILABLE and update the configuration.
      if (member.getMember().status() == ServerMember.Status.AVAILABLE && !leader.configuring()) {
        member.getMember().update(ServerMember.Status.UNAVAILABLE, Instant.now());
        leader.configure(context.getCluster().members());
      }
    }
  }

super.failAttempt resets the connection and increments the failure count:

member.incrementFailureCount();

 

The first check uses Math.max(heartbeatTime(), leaderTime).

heartbeatTime

/**
   * Returns the last time a majority of the cluster was contacted.
   * <p>
   * This is calculated by sorting the list of active members and getting the last time the majority of
   * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
   * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
   */
  private long heartbeatTime() {
    int quorumIndex = quorumIndex();
    if (quorumIndex >= 0) {
      return context.getClusterState().getActiveMemberStates((m1, m2)-> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
    }
    return System.currentTimeMillis();
  }

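This sorts the active members by heartbeat time, newest first, and takes the heartbeat at quorumIndex, i.e. the oldest heartbeat within the most recently contacted majority.
If that time is more than two election timeouts old, the leader is no longer getting valid heartbeats back from a majority, which points to a split brain (network partition).

In that case the leader steps down to follower.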
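A small worked example of the sort-and-index step, as a sketch only. `QuorumHeartbeat` is a hypothetical helper. The excerpt does not show how `quorumIndex()` is computed, so it is passed in as a parameter, and `now` stands in for `System.currentTimeMillis()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper mirroring the sort-and-index logic of heartbeatTime(). */
final class QuorumHeartbeat {
  /**
   * Sorts heartbeat timestamps newest-first and returns the one at quorumIndex.
   * A negative quorumIndex means there is no remote quorum to wait for, so `now` is returned.
   */
  static long heartbeatTime(List<Long> memberHeartbeats, int quorumIndex, long now) {
    if (quorumIndex < 0) {
      return now;
    }
    List<Long> sorted = new ArrayList<>(memberHeartbeats); // do not mutate the caller's list
    sorted.sort(Collections.reverseOrder());               // newest heartbeat first
    return sorted.get(quorumIndex);
  }
}
```

With heartbeats 100, 300 and 200 and a quorumIndex of 1, the sorted list is 300, 200, 100, so the result is 200: two members were heard from at time 200 or later.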

 

The second check: once a member's failure count reaches 3 (getFailureCount() >= 3), the member is marked UNAVAILABLE.
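The AVAILABLE/UNAVAILABLE bookkeeping can be modeled as a tiny state machine. This is a sketch under assumptions: `MemberAvailability` is a hypothetical class, and it leaves out the `leader.configuring()` guard and the configuration update that the real code performs.

```java
/** Hypothetical sketch of the AVAILABLE/UNAVAILABLE bookkeeping. */
final class MemberAvailability {
  enum Status { AVAILABLE, UNAVAILABLE }

  static final int FAILURE_THRESHOLD = 3; // mirrors the `>= 3` check in failAttempt

  private Status status = Status.AVAILABLE; // members start out AVAILABLE
  private int failureCount = 0;

  /** A successful response resets the counter and restores availability. */
  void succeedAttempt() {
    failureCount = 0;
    status = Status.AVAILABLE;
  }

  /** A failure bumps the counter; the third failure in a row marks the member down. */
  void failAttempt() {
    failureCount++;
    if (failureCount >= FAILURE_THRESHOLD) {
      status = Status.UNAVAILABLE;
    }
  }

  Status status() { return status; }

  int failureCount() { return failureCount; }
}
```

Two failures leave the member AVAILABLE, the third marks it UNAVAILABLE, and a single success brings it back.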

 

failAttempt itself is called from the various failure handlers:

AbstractAppender
handleAppendRequestFailure,
handleAppendResponseFailure,
handleConfigureRequestFailure,
handleInstallRequestFailure

 

 

CopycatServer.State

 

public enum State {

    /**
     * Represents the state of an inactive server.
     * <p>
     * All servers start in this state and return to this state when {@link #leave() stopped}.
     */
    INACTIVE,

    /**
     * Represents the state of a server that is a reserve member of the cluster.
     * <p>
     * Reserve servers only receive notification of leader, term, and configuration changes.
     */
    RESERVE,

    /**
     * Represents the state of a server in the process of catching up its log.
     * <p>
     * Upon successfully joining an existing cluster, the server will transition to the passive state and remain there
     * until the leader determines that the server has caught up enough to be promoted to a full member.
     */
    PASSIVE,

    /**
     * Represents the state of a server participating in normal log replication.
     * <p>
     * The follower state is a standard Raft state in which the server receives replicated log entries from the leader.
     */
    FOLLOWER,

    /**
     * Represents the state of a server attempting to become the leader.
     * <p>
     * When a server in the follower state fails to receive communication from a valid leader for some time period,
     * the follower will transition to the candidate state. During this period, the candidate requests votes from
     * each of the other servers in the cluster. If the candidate wins the election by receiving votes from a majority
     * of the cluster, it will transition to the leader state.
     */
    CANDIDATE,

    /**
     * Represents the state of a server which is actively coordinating and replicating logs with other servers.
     * <p>
     * Leaders are responsible for handling and replicating writes from clients. Note that more than one leader can
     * exist at any given time, but Raft guarantees that no two leaders will exist for the same {@link Cluster#term()}.
     */
    LEADER

  }

 

When ServerContext is initialized, its state is Inactive:

public class ServerContext implements AutoCloseable {
  //......
  protected ServerState state = new InactiveState(this);

 

The tricky part is that Member also defines a Type enum:

enum Type {

    /**
     * Represents an inactive member.
     * <p>
     * The {@code INACTIVE} member type represents a member which does not participate in any communication
     * and is not an active member of the cluster. This is typically the state of a member prior to joining
     * or after leaving a cluster.
     */
    INACTIVE,

    /**
     * Represents a member which does not participate in replication.
     * <p>
     * The {@code RESERVE} member type is representative of a member that does not participate in any
     * replication of state but only maintains contact with the cluster leader and is an active member
     * of the {@link Cluster}. Typically, reserve members act as standby nodes which can be
     * {@link #promote() promoted} to a {@link #PASSIVE} or {@link #ACTIVE} role when needed.
     */
    RESERVE,

    /**
     * Represents a member which participates in asynchronous replication but does not vote in elections
     * or otherwise participate in the Raft consensus algorithm.
     * <p>
     * The {@code PASSIVE} member type is representative of a member that receives state changes from
     * follower nodes asynchronously. As state changes are committed via the {@link #ACTIVE} Raft nodes,
     * committed state changes are asynchronously replicated by followers to passive members. This allows
     * passive members to maintain nearly up-to-date state with minimal impact on the performance of the
     * Raft algorithm itself, and allows passive members to be quickly promoted to {@link #ACTIVE} voting
     * members if necessary.
     */
    PASSIVE,

    /**
     * Represents a full voting member of the Raft cluster which participates fully in leader election
     * and replication algorithms.
     * <p>
     * The {@code ACTIVE} member type represents a full voting member of the Raft cluster. Active members
     * participate in the Raft leader election and replication algorithms and can themselves be elected
     * leaders.
     */
    ACTIVE,

  }

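Note the difference: Type has ACTIVE, which State does not.

Apart from that, State covers every value of Type.

In other words, a member can be INACTIVE, RESERVE, PASSIVE or ACTIVE.

When the member is INACTIVE, RESERVE or PASSIVE, the server's state is the corresponding one.

When the member is ACTIVE, the server's state is one of FOLLOWER, CANDIDATE or LEADER.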
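The relationship between the two enums can be written down as a lookup. A sketch only: `TypeStateMapping` and `statesFor` are hypothetical names, and the enums are local copies for illustration rather than the Copycat classes.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical lookup from a member type to the server states it permits. */
final class TypeStateMapping {
  enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }

  enum State { INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER }

  /** Returns the server states a member of the given type can be in. */
  static Set<State> statesFor(Type type) {
    switch (type) {
      case ACTIVE:
        // An ACTIVE member takes part in Raft, so it is in one of the three Raft states.
        return EnumSet.of(State.FOLLOWER, State.CANDIDATE, State.LEADER);
      case PASSIVE:
        return EnumSet.of(State.PASSIVE);
      case RESERVE:
        return EnumSet.of(State.RESERVE);
      default:
        return EnumSet.of(State.INACTIVE);
    }
  }
}
```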

 

In CopycatServer.Builder, the type defaults to ACTIVE:

public static class Builder implements io.atomix.catalyst.util.Builder<CopycatServer> {
  //......
  private Member.Type type = Member.Type.ACTIVE;

 

Note that this transition overload picks the target state from Member.Type:

/**
   * Transitions the server to the base state for the given member type.
   */
  protected void transition(Member.Type type) {
    switch (type) {
      case ACTIVE:
        if (!(state instanceof ActiveState)) {
          transition(CopycatServer.State.FOLLOWER);
        }
        break;
      case PASSIVE:
        if (this.state.type() != CopycatServer.State.PASSIVE) {
          transition(CopycatServer.State.PASSIVE);
        }
        break;
      case RESERVE:
        if (this.state.type() != CopycatServer.State.RESERVE) {
          transition(CopycatServer.State.RESERVE);
        }
        break;
      default:
        if (this.state.type() != CopycatServer.State.INACTIVE) {
          transition(CopycatServer.State.INACTIVE);
        }
        break;
    }
  }

Note how ACTIVE is handled.

When Member.Type is ACTIVE and the current state is not an ActiveState, the server transitions to FOLLOWER; CANDIDATE and LEADER clearly cannot be entered directly this way.
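The same rule as a pure function, to make the ACTIVE special case explicit. This is a sketch: `BaseStateTransition` and `isActive()` are hypothetical, and it returns the next state instead of mutating a context as the real method does.

```java
/** Hypothetical pure-function version of transition(Member.Type). */
final class BaseStateTransition {
  enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }

  enum State {
    INACTIVE, RESERVE, PASSIVE, FOLLOWER, CANDIDATE, LEADER;

    /** Stand-in for the `state instanceof ActiveState` check. */
    boolean isActive() {
      return this == FOLLOWER || this == CANDIDATE || this == LEADER;
    }
  }

  /** Returns the state the server should be in after applying the member type. */
  static State transition(State current, Type type) {
    switch (type) {
      case ACTIVE:
        // Already a follower, candidate or leader: stay put. Otherwise start as FOLLOWER.
        return current.isActive() ? current : State.FOLLOWER;
      case PASSIVE:
        return State.PASSIVE;
      case RESERVE:
        return State.RESERVE;
      default:
        return State.INACTIVE;
    }
  }
}
```

An INACTIVE server given type ACTIVE becomes a FOLLOWER, while a LEADER given type ACTIVE stays LEADER.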

 

As shown above, ServerContext starts out in the inactive state.
So when does it become active?

When a server bootstraps or joins a cluster, both paths end up in ClusterState.join(), which performs the state transition:

@Override
  public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {

    if (configuration == null) {
      if (member.type() != Member.Type.ACTIVE) {
        return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
      } else {
        // Create a set of active members.
        Set<Member> activeMembers = cluster.stream()
          .filter(m -> !m.equals(member.serverAddress()))
          .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
          .collect(Collectors.toSet());

        // Add the local member to the set of active members.
        activeMembers.add(member);

        // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
        configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
      }
    }
    return join();
  }

 

@Override
  public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {

    // If no configuration was loaded from disk, create a new configuration.
    if (configuration == null) {
      // Create a set of cluster members, excluding the local member which is joining a cluster.
      Set<Member> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.serverAddress()))
        .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
        .collect(Collectors.toSet());

      // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
      // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // update the configuration
    }
    return join();
  }

  /**
   * Starts the join to the cluster.
   */
  private synchronized CompletableFuture<Void> join() {
    joinFuture = new CompletableFuture<>();

    context.getThreadContext().executor().execute(() -> {
      // Transition the server to the appropriate state for the local member type.
      context.transition(member.type()); //transition state

      // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
      // will result in the member attempting to get elected. This allows initial clusters to form.
      List<MemberState> activeMembers = getActiveMemberStates();
      if (!activeMembers.isEmpty()) {
        join(getActiveMemberStates().iterator());
      } else {
        joinFuture.complete(null);
      }
    });

 

 

Next, the conditions for moving between leader, candidate and follower.

Leader

A server becomes leader only when, as a candidate, it requests votes and a majority grants them:

context.transition(CopycatServer.State.LEADER)
/**
   * Resets the election timer.
   */
  private void sendVoteRequests() {
    //.........
    // Send vote requests to all nodes. The vote request that is sent
    // to this node will be automatically successful.
    // First check if the quorum is null. If the quorum isn't null then that
    // indicates that another vote is already going on.
    final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
      complete.set(true);
      if (elected) {
        context.transition(CopycatServer.State.LEADER); // invoked from checkComplete()
      } else {
        context.transition(CopycatServer.State.FOLLOWER);
      }
    });

    // Once we got the last log term, iterate through each current member
    // of the cluster and vote each member for a vote.
    for (ServerMember member : votingMembers) {
      LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
      VoteRequest request = VoteRequest.builder()
        .withTerm(context.getTerm())
        .withCandidate(context.getCluster().member().id())
        .withLogIndex(lastIndex)
        .withLogTerm(lastTerm)
        .build();

      context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
        connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
          context.checkThread();
          if (isOpen() && !complete.get()) {
            if (error != null) {
              LOGGER.warn(error.getMessage());
              quorum.fail();
            } else {
                //........
              } else {
                LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
                quorum.succeed(); // vote granted: succeeded++; checkComplete();
              }
            }
          }
        }, context.getThreadContext().executor());
      });

 

Candidate

A server becomes candidate only after, as a follower, it sends poll requests and a majority agrees:

  /**
   * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
   */
  private void sendPollRequests() {
   final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
      // If a majority of the cluster indicated they would vote for us then transition to candidate.
      complete.set(true);
      if (elected) {
        context.transition(CopycatServer.State.CANDIDATE);
      } else {
        resetHeartbeatTimeout();
      }
    });
    
    //......
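Both the poll and the vote rely on a quorum counter that fires a callback once the outcome is decided. A minimal sketch of such a counter follows, assuming nothing about Copycat's own Quorum class beyond the `succeed()`/`fail()` calls seen in the excerpts. `VoteQuorum` is a hypothetical name, and counting the local vote through an explicit `succeed()` call is an assumption.

```java
import java.util.function.Consumer;

/** Hypothetical quorum counter: reports true on a majority, false once a majority is impossible. */
final class VoteQuorum {
  private final int total;                  // number of voters, including the local node
  private final int quorum;                 // votes needed to win
  private final Consumer<Boolean> callback; // receives the final outcome exactly once
  private int succeeded;
  private int failed;
  private boolean complete;

  VoteQuorum(int total, Consumer<Boolean> callback) {
    this.total = total;
    this.quorum = total / 2 + 1;
    this.callback = callback;
  }

  /** Records a granted vote. */
  void succeed() {
    succeeded++;
    checkComplete();
  }

  /** Records a rejected or failed vote. */
  void fail() {
    failed++;
    checkComplete();
  }

  private void checkComplete() {
    if (complete) {
      return; // the outcome was already reported
    }
    if (succeeded >= quorum) {
      complete = true;
      callback.accept(true);
    } else if (failed > total - quorum) {
      // Too many failures: the remaining voters cannot produce a majority.
      complete = true;
      callback.accept(false);
    }
  }
}
```

In a 5-node cluster the callback fires with true on the third `succeed()`; in a 3-node cluster two `fail()` calls fire it with false.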

 

Follower

Leader –> Follower

In LeaderAppender, triggered by heartbeats:

/**
   * Handles a {@link Response.Status#OK} response.
   */
  protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
    //......
    // If we've received a greater term, update the term and transition back to follower.
    else if (response.term() > context.getTerm()) {
      context.setTerm(response.term()).setLeader(0);
      context.transition(CopycatServer.State.FOLLOWER);
    }

If the response is OK but carries a term greater than this server's own, this server is no longer the leader,
so it steps down to follower.

/**
   * Handles a {@link Response.Status#ERROR} response.
   */
  protected void handleAppendResponseError(MemberState member, AppendRequest request, AppendResponse response) {
    // If we've received a greater term, update the term and transition back to follower.
    if (response.term() > context.getTerm()) {
      context.setTerm(response.term()).setLeader(0);
      context.transition(CopycatServer.State.FOLLOWER);

The same applies to an ERROR response.

@Override
  protected void failAttempt(MemberState member, Throwable error) {
    super.failAttempt(member, error);

    // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
    // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
    // that a partition occurred and transition back to the FOLLOWER state.
    if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
      LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
      context.setLeader(0);
      context.transition(CopycatServer.State.FOLLOWER);
    }

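In failAttempt, if the leader has not heard back from a majority within two election timeouts (getElectionTimeout() * 2), it assumes a partition has occurred
and steps down to follower.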
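The step-down condition is plain arithmetic, so it is easy to check in isolation. A sketch with hypothetical names (`PartitionCheck`, `shouldStepDown`). Treating `leaderTime` as the moment this server became leader is an assumption, since the excerpt does not show where it is set.

```java
import java.time.Duration;

/** Hypothetical standalone version of the partition check in failAttempt. */
final class PartitionCheck {
  /**
   * Returns true when the last majority contact (or the start of leadership,
   * whichever is later) is more than two election timeouts in the past.
   */
  static boolean shouldStepDown(long now, long quorumHeartbeatTime, long leaderTime, Duration electionTimeout) {
    long lastContact = Math.max(quorumHeartbeatTime, leaderTime);
    return now - lastContact > electionTimeout.toMillis() * 2;
  }
}
```

With a 500 ms election timeout the limit is 1000 ms: at now = 2000, a quorum heartbeat at 900 triggers a step-down (1100 > 1000), while one at exactly 1000 does not. Taking the max with `leaderTime` keeps a leader whose `leaderTime` is recent from stepping down on stale heartbeat times.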

 

In LeaderState, when the leader's initialization fails:

/**
   * Commits a no-op entry to the log, ensuring any entries from a previous term are committed.
   */
  private CompletableFuture<Void> commitInitialEntries() {
    // The Raft protocol dictates that leaders cannot commit entries from previous terms until
    // at least one entry from their current term has been stored on a majority of servers. Thus,
    // we force entries to be appended up to the leader's no-op entry. The LeaderAppender will ensure
    // that the commitIndex is not increased until the no-op entry (appender.index()) is committed.
    CompletableFuture<Void> future = new CompletableFuture<>();
    appender.appendEntries(appender.index()).whenComplete((resultIndex, error) -> {
      context.checkThread();
      if (isOpen()) {
        if (error == null) {
          context.getStateMachine().apply(resultIndex);
          future.complete(null);
        } else {
          context.setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }
      }
    });
    return future;
  }

the leader likewise steps down to follower.

 

Candidate –> Follower

When the vote fails, the candidate falls back to follower:

/**
   * Resets the election timer.
   */
  private void sendVoteRequests() {
    //......
    // Send vote requests to all nodes. The vote request that is sent
    // to this node will be automatically successful.
    // First check if the quorum is null. If the quorum isn't null then that
    // indicates that another vote is already going on.
    final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
      complete.set(true);
      if (elected) {
        context.transition(CopycatServer.State.LEADER);
      } else {
        context.transition(CopycatServer.State.FOLLOWER); // not elected
      }
    });

 

ActiveState –> Follower

This covers LeaderState and CandidateState: when handling vote and append requests, both run the following logic:

    // If the request indicates a term that is greater than the current term then
    // assign that term and leader to the current context and transition to follower.
    boolean transition = updateTermAndLeader(request.term(), request.leader());
    
    // If a transition is required then transition back to the follower state.
    // If the node is already a follower then the transition will be ignored.
    if (transition) {
      context.transition(CopycatServer.State.FOLLOWER);
    }

 

/**
   * Updates the term and leader.
   */
  protected boolean updateTermAndLeader(long term, int leader) {
    // If the request indicates a term that is greater than the current term or no leader has been
    // set for the current term, update leader and term.
    if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
      context.setTerm(term);
      context.setLeader(leader);

      // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
      context.getClusterState().reset();
      return true;
    }
    return false;
  }
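The term rule in updateTermAndLeader can be exercised on its own. This is a sketch, not the Copycat class: `TermTracker` is hypothetical, it uses the integer 0 for "no known leader" where the excerpt compares `context.getLeader()` against null, and it skips the cluster configuration reset.

```java
/** Hypothetical standalone version of the term/leader update rule. */
final class TermTracker {
  private long term;
  private int leader; // 0 means "no known leader" in this sketch

  /**
   * Adopts the request's term and leader when the term is newer, or when the term
   * is the same but no leader is known yet and the request names one.
   * Returns true when the caller should transition to follower.
   */
  boolean updateTermAndLeader(long requestTerm, int requestLeader) {
    if (requestTerm > term || (requestTerm == term && leader == 0 && requestLeader != 0)) {
      term = requestTerm;
      leader = requestLeader;
      return true;
    }
    return false;
  }

  long term() { return term; }

  int leader() { return leader; }
}
```

A request with a stale term, or one that repeats the leader already known for the current term, returns false and causes no transition.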

Last updated: 2017-04-07 21:25:10
