Copycat - command
client.submit(new PutCommand("foo", "Hello world!"));
ServerContext
connection.handler(CommandRequest.class, request -> state.command(request));
State.command
從ReserveState開始,非leader的狀態都會把command forward到leader,只有leader可以處理command
/**
 * Handles a client command by forwarding it to the current leader.
 *
 * @param request the command request to forward
 * @return a future holding the forwarded response; it holds a
 *         {@code NO_LEADER_ERROR} response instead when no leader is known
 *         or when forwarding completes exceptionally
 */
@Override
public CompletableFuture<CommandResponse> command(CommandRequest request) {
  context.checkThread();
  logRequest(request);

  // No known leader: answer immediately with an error instead of forwarding.
  if (context.getLeader() == null) {
    CommandResponse noLeader = CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.NO_LEADER_ERROR)
      .build();
    return CompletableFuture.completedFuture(logResponse(noLeader));
  }

  // Forward the request; any forwarding failure is reported as NO_LEADER_ERROR as well.
  CompletableFuture<CommandResponse> forwarded = this.<CommandRequest, CommandResponse>forward(request);
  return forwarded
    .exceptionally(failure -> CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.NO_LEADER_ERROR)
      .build())
    .thenApply(this::logResponse);
}
LeaderState.Command
public CompletableFuture<CommandResponse> command(final CommandRequest request) { context.checkThread(); logRequest(request); // Get the client's server session. If the session doesn't exist, return an unknown session error. ServerSessionContext session = context.getStateMachine().executor().context().sessions().getSession(request.session()); if (session == null) { //如果session不存在,無法處理該command return CompletableFuture.completedFuture(logResponse(CommandResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.UNKNOWN_SESSION_ERROR) .build())); } ComposableFuture<CommandResponse> future = new ComposableFuture<>(); sequenceCommand(request, session, future); return future; }
sequenceCommand
/** * Sequences the given command to the log. */ private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) { // If the command is LINEARIZABLE and the session's current sequence number is less then one prior to the request // sequence number, queue this request for handling later. We want to handle command requests in the order in which // they were sent by the client. Note that it's possible for the session sequence number to be greater than the request // sequence number. In that case, it's likely that the command was submitted more than once to the // cluster, and the command will be deduplicated once applied to the state machine. if (request.sequence() > session.nextRequestSequence()) { //session中的request需要按sequence執行,所以如果request的sequence num大於我們期望的,說明這個request需要等之前的request先執行 // If the request sequence number is more than 1k requests above the last sequenced request, reject the request. // The client should resubmit a request that fails with a COMMAND_ERROR. if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) { //如果request的sequence大的太多,和當前sequence比,大100以上 future.complete(CommandResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.COMMAND_ERROR) //拒絕該command .build()); } // Register the request in the request queue if it's not too far ahead of the current sequence number. else { session.registerRequest(request.sequence(), () -> applyCommand(request, session, future)); //放入queue等待 } } else { applyCommand(request, session, future); //apply該command } }
如果request的sequence比期望的大,就先通過下面的方法把它註冊到等待隊列:
session.registerRequest
ServerSessionContext
/**
 * Stores a callback under the given request sequence number.
 *
 * @param sequence the request sequence number the callback is keyed by
 * @param runnable the callback to store
 * @return this session context, for chaining
 */
ServerSessionContext registerRequest(long sequence, Runnable runnable) {
  this.commands.put(sequence, runnable);
  return this;
}
可以看到會把sequence id和對應的function註冊到commands裡面,這裡的function就是applyCommand
問題是,這個commands會在什麼時候被觸發執行?
ServerSessionContext setRequestSequence(long request) { if (request > this.requestSequence) { this.requestSequence = request; // When the request sequence number is incremented, get the next queued request callback and call it. // This will allow the command request to be evaluated in sequence. Runnable command = this.commands.remove(nextRequestSequence()); if (command != null) { command.run(); } } return this; }
在setRequestSequence的時候,
當set的時候,去commands裡面看一下是否有下一個request在等待,如果有就直接執行掉
applyCommand
/** * Applies the given command to the log. */ private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) { final Command command = request.command(); final long term = context.getTerm(); final long timestamp = System.currentTimeMillis(); final long index; // Create a CommandEntry and append it to the log. try (CommandEntry entry = context.getLog().create(CommandEntry.class)) { entry.setTerm(term) .setSession(request.session()) .setTimestamp(timestamp) .setSequence(request.sequence()) .setCommand(command); index = context.getLog().append(entry); //把CommandEntry寫入log LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry); } // Replicate the command to followers. appendCommand(index, future); // Set the last processed request for the session. This will cause sequential command callbacks to be executed. session.setRequestSequence(request.sequence()); //更新session的sequence,這裏會試圖去check session.commands是否有next request }
appendCommand
/** * Sends append requests for a command to followers. */ private void appendCommand(long index, CompletableFuture<CommandResponse> future) { appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries到該index context.checkThread(); if (isOpen()) { if (commitError == null) { applyCommand(index, future); //如果成功,applyCommand } else { future.complete(logResponse(CommandResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.INTERNAL_ERROR) .build())); } } }); }
applyCommand(注意:這是與前面寫log的applyCommand同名、但參數不同的重載,函數名不能換換嗎)
/**
 * Applies the command at the given log index to the state machine and
 * completes the response future with the outcome.
 *
 * @param index  the log index of the command entry
 * @param future the future to complete with the command response
 */
private void applyCommand(long index, CompletableFuture<CommandResponse> future) {
  CompletableFuture<ServerStateMachine.Result> applied =
    context.getStateMachine().<ServerStateMachine.Result>apply(index);

  applied.whenComplete((result, error) -> {
    // Nothing is done if this state has been closed in the meantime.
    if (!isOpen()) {
      return;
    }
    completeOperation(result, CommandResponse.builder(), error, future);
  });
}
apply:我收到command後,首先要把它寫到log裡面,然後同步給follower,最後才需要真正去執行command,比如修改狀態機裡面的值(a=1)
ServerContext.getStateMachine(),返回
private ServerStateMachine stateMachine;
這裏調用ServerStateMachine.apply(index)
調用apply(entry)
調用apply((CommandEntry) entry)
private CompletableFuture<Result> apply(CommandEntry entry) { final CompletableFuture<Result> future = new CompletableFuture<>(); final ThreadContext context = ThreadContext.currentContextOrThrow(); //這裏保留當前thread的引用 // First check to ensure that the session exists. ServerSessionContext session = executor.context().sessions().getSession(entry.getSession()); // If the session is null, return an UnknownSessionException. Commands applied to the state machine must // have a session. We ensure that session register/unregister entries are not compacted from the log // until all associated commands have been cleaned. if (session == null) { //session不存在 log.release(entry.getIndex()); return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession())); } // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the // session registry until all prior commands have been released by the state machine, but new commands can // only be applied for sessions in an active state. else if (!session.state().active()) { //session的狀態非active log.release(entry.getIndex()); return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession())); } // If the command's sequence number is less than the next session sequence number then that indicates that // we've received a command that was previously applied to the state machine. Ensure linearizability by // returning the cached response instead of applying it to the user defined state machine. else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { //已經apply過的entry // Ensure the response check is executed in the state machine thread in order to ensure the // command was applied, otherwise there will be a race condition and concurrent modification issues. long sequence = entry.getSequence(); // Switch to the state machine thread and get the existing response. 
executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); //直接返回之前apply的結果 return future; } // If we've made it this far, the command must have been applied in the proper order as sequenced by the // session. This should be the case for most commands applied to the state machine. else { // Allow the executor to execute any scheduled events. long index = entry.getIndex(); long sequence = entry.getSequence(); // Calculate the updated timestamp for the command. long timestamp = executor.timestamp(entry.getTimestamp()); // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread. ServerCommit commit = commits.acquire(entry, session, timestamp); //這裏有個ServerCommitPool的實現,為了避免反複生成ServerCommit對象,直接從pool裏麵拿一個,用完放回去 executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context)); // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced // at this index receive the index of the command. setLastApplied(index); // Update the session timestamp and command sequence number. This is done in the caller's thread since all // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread. session.setTimestamp(timestamp).setCommandSequence(sequence); return future; } }
executeCommand
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));
注意這裏有兩個線程,
一個是context,是
ThreadContext threadContext
用來響應server請求的
還有一個是executor裡面的stateContext,用來改變stateMachine的狀態的
所以這裡是用executor來執行executeCommand,但把ThreadContext傳入,這樣執行完之後可以回到server線程上去complete future
/** * Executes a state machine command. */ private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) { // Trigger scheduled callbacks in the state machine. executor.tick(index, timestamp); // Update the state machine context with the commit index and local server context. The synchronous flag // indicates whether the server expects linearizable completion of published events. Events will be published // based on the configured consistency level for the context. executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND); // Store the event index to return in the command response. long eventIndex = session.getEventIndex(); try { // Execute the state machine operation and get the result. Object output = executor.executeOperation(commit); // Once the operation has been applied to the state machine, commit events published by the command. // The state machine context will build a composite future for events published to all sessions. executor.commit(); // Store the result for linearizability and complete the command. Result result = new Result(index, eventIndex, output); session.registerResult(sequence, result); // 緩存執行結果 context.executor().execute(() -> future.complete(result)); // complete future,表示future執行結束 } catch (Exception e) { // If an exception occurs during execution of the command, store the exception. Result result = new Result(index, eventIndex, e); session.registerResult(sequence, result); context.executor().execute(() -> future.complete(result)); } }
ServerStateMachineExecutor.tick
根據時間,去觸發scheduledTasks中已經到時間的task
ServerStateMachineExecutor.init
更新state machine的context
/**
 * Initializes the state machine context for the next operation.
 *
 * @param index   the index to set on the context
 * @param instant the instant to set on the context clock
 * @param type    the context type to set
 */
void init(long index, Instant instant, ServerStateMachineContext.Type type) {
  this.context.update(index, instant, type);
}

// ServerStateMachineContext
/**
 * Updates this context's index, type and clock.
 *
 * @param index   the new index
 * @param instant the instant to set the clock to
 * @param type    the new context type
 */
void update(long index, Instant instant, Type type) {
  this.index = index;
  this.type = type;
  this.clock.set(instant);
}
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) { // Get the function registered for the operation. If no function is registered, attempt to // use a global function if available. Function function = operations.get(commit.type()); //從operations找到type對應的function if (function == null) { // If no operation function was found for the class, try to find an operation function // registered with a parent class. for (Map.Entry<Class, Function> entry : operations.entrySet()) { if (entry.getKey().isAssignableFrom(commit.type())) { //如果注冊的type是commit.type的父類 function = entry.getValue(); break; } } // If a parent operation function was found, store the function for future reference. if (function != null) { operations.put(commit.type(), function); } } if (function == null) { throw new IllegalStateException("unknown state machine operation: " + commit.type()); } else { // Execute the operation. If the operation return value is a Future, await the result, // otherwise immediately complete the execution future. try { return (U) function.apply(commit); //真正執行function } catch (Exception e) { throw new ApplicationException(e, "An application error occurred"); } } }
RequestSequence 和 CommandSequence 有什麼不同?下面看看它們分別在什麼地方被set和get。
RequestSequence
Set
ServerStateMachine
private CompletableFuture<Void> apply(KeepAliveEntry entry) {
//…
// Update the session keep alive index for log cleaning.
session.setKeepAliveIndex(entry.getIndex()).setRequestSequence(commandSequence);
}
LeaderState
private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
//……
// Set the last processed request for the session. This will cause sequential command callbacks to be executed.
session.setRequestSequence(request.sequence());
}
Get
ServerSessionContext.setCommandSequence
// If the request sequence number is less than the applied sequence number, update the request // sequence number. This is necessary to ensure that if the local server is a follower that is // later elected leader, its sequences are consistent for commands. if (sequence > requestSequence) { // Only attempt to trigger command callbacks if any are registered. if (!this.commands.isEmpty()) { // For each request sequence number, a command callback completing the command submission may exist. for (long i = this.requestSequence + 1; i <= sequence; i++) { this.requestSequence = i; Runnable command = this.commands.remove(i); if (command != null) { command.run(); } } } else { this.requestSequence = sequence; } }
LeaderState
/** * Sequences the given command to the log. */ private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) { // If the command is LINEARIZABLE and the session's current sequence number is less then one prior to the request // sequence number, queue this request for handling later. We want to handle command requests in the order in which // they were sent by the client. Note that it's possible for the session sequence number to be greater than the request // sequence number. In that case, it's likely that the command was submitted more than once to the // cluster, and the command will be deduplicated once applied to the state machine. if (request.sequence() > session.nextRequestSequence()) { // If the request sequence number is more than 1k requests above the last sequenced request, reject the request. // The client should resubmit a request that fails with a COMMAND_ERROR. if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) {
CommandSequence
Set
ServerSessionContext.setCommandSequence
for (long i = commandSequence + 1; i <= sequence; i++) { commandSequence = i; List<Runnable> queries = this.sequenceQueries.remove(commandSequence); if (queries != null) { for (Runnable query : queries) { query.run(); } queries.clear(); queriesPool.add(queries); } }
ServerStateMachine
private CompletableFuture<Result> apply(CommandEntry entry)
// Update the session timestamp and command sequence number. This is done in the caller's thread since all
// timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
session.setTimestamp(timestamp).setCommandSequence(sequence);
Get
LeaderState
sequenceLinearizableQuery, sequenceBoundedLinearizableQuery
/** * Sequences a bounded linearizable query. */ private void sequenceBoundedLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) { // If the query's sequence number is greater than the session's current sequence number, queue the request for // handling once the state machine is caught up. if (entry.getSequence() > session.getCommandSequence()) { session.registerSequenceQuery(entry.getSequence(), () -> applyQuery(entry, future)); } else { applyQuery(entry, future); } }
PassiveState
private void sequenceQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) { // If the query's sequence number is greater than the session's current sequence number, queue the request for // handling once the state machine is caught up. if (entry.getSequence() > session.getCommandSequence()) { session.registerSequenceQuery(entry.getSequence(), () -> indexQuery(entry, future)); } else { indexQuery(entry, future); } }
最後更新:2017-04-07 21:25:10