5 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED 6 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED 20 #include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp> 21 #include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp> 25 namespace dynamic_node_id_server
153 persistent_state_.
getVotedFor() == getNode().getNodeID());
167 last_activity_timestamp_ = getNode().getMonotonicTime();
169 static const int32_t randomization_range_msec = AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS -
170 AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS;
174 randomized_activity_timeout_ =
177 UAVCAN_ASSERT(randomized_activity_timeout_.
toMSec() > AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS);
178 UAVCAN_ASSERT(randomized_activity_timeout_.
toMSec() <= AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS);
183 return getNode().getMonotonicTime() > (last_activity_timestamp_ + randomized_activity_timeout_);
190 switchState(ServerStateFollower);
196 if (isActivityTimedOut())
198 switchState(ServerStateCandidate);
205 if (num_votes_received_in_this_campaign_ > 0)
208 const bool won = num_votes_received_in_this_campaign_ >= cluster_.
getQuorumSize();
210 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
"Election complete, won: %d",
int(won));
212 switchState(won ? ServerStateLeader : ServerStateFollower);
217 int res = persistent_state_.
setVotedFor(getNode().getNodeID());
220 handlePersistentStateUpdateError(res);
228 handlePersistentStateUpdateError(res);
232 num_votes_received_in_this_campaign_ = 1;
234 RequestVote::Request req;
239 for (
uint8_t i = 0; i < MaxNumFollowers; i++)
247 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
248 "Requesting vote from %d",
int(node_id.
get()));
251 res = request_vote_client_.
call(node_id, req);
272 next_server_index_++;
275 next_server_index_ = 0;
278 AppendEntries::Request req;
280 req.leader_commit = commit_index_;
288 handlePersistentStateUpdateError(-
ErrLogic);
292 req.prev_log_term = entry->term;
299 if (req.entries.size() == req.entries.capacity())
305 pending_append_entries_fields_.
num_entries = req.entries.size();
306 pending_append_entries_fields_.
prev_log_index = req.prev_log_index;
308 const int res = append_entries_client_.
call(node_id, req);
315 propagateCommitIndex();
320 if (server_state_ == new_state)
328 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
"State switch: %d --> %d",
329 int(server_state_),
int(new_state));
336 server_state_ = new_state;
343 next_server_index_ = 0;
344 num_votes_received_in_this_campaign_ = 0;
353 if ((old_state == ServerStateLeader) ||
354 (new_state == ServerStateLeader))
369 switchState(ServerStateFollower);
384 uint8_t num_nodes_with_next_log_entry_available = 1;
388 if (match_index > commit_index_)
390 num_nodes_with_next_log_entry_available++;
394 if (num_nodes_with_next_log_entry_available >= cluster_.
getQuorumSize())
436 handlePersistentStateUpdateError(res);
444 handlePersistentStateUpdateError(res);
454 response.success =
false;
467 switchState(ServerStateFollower);
485 if (prev_entry->term != request.prev_log_term)
512 for (
uint8_t i = 0; i < request.entries.size(); i++)
514 const int res = persistent_state_.
getLog().
append(request.entries[i]);
527 if (request.leader_commit > commit_index_)
534 response.success =
true;
549 tryIncrementCurrentTermFromResponse(result.
getResponse().term);
593 switchState(ServerStateFollower);
598 handlePersistentStateUpdateError(res);
606 handlePersistentStateUpdateError(res);
617 if (request.term < response.term)
619 response.vote_granted =
false;
625 const bool log_is_up_to_date =
628 response.vote_granted = can_vote && log_is_up_to_date;
630 if (response.vote_granted)
632 switchState(ServerStateFollower);
660 tryIncrementCurrentTermFromResponse(result.
getResponse().term);
666 num_votes_received_in_this_campaign_++;
677 switch (server_state_)
679 case ServerStateFollower:
684 case ServerStateCandidate:
689 case ServerStateLeader:
709 , leader_monitor_(leader_monitor)
710 , persistent_state_(storage, tracer)
711 , cluster_(node, storage, persistent_state_.getLog(), tracer)
713 , last_activity_timestamp_(node.getMonotonicTime())
714 , randomized_activity_timeout_(
715 MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS))
716 , server_state_(ServerStateFollower)
717 , next_server_index_(0)
718 , num_votes_received_in_this_campaign_(0)
719 , append_entries_srv_(node)
720 , append_entries_client_(node)
721 , request_vote_srv_(node)
722 , request_vote_client_(node)
736 server_state_ = ServerStateFollower;
737 next_server_index_ = 0;
738 num_votes_received_in_this_campaign_ = 0;
746 int res = persistent_state_.
init();
752 res = cluster_.
init(cluster_size, priority);
770 res = append_entries_client_.
init(priority);
778 res = request_vote_client_.
init(priority);
793 2 /
max(static_cast<uint8_t>(2), num_followers));
795 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
796 "Update interval: %ld msec", static_cast<long>(update_interval.
toMSec()));
804 startPeriodic(update_interval);
825 bool isLeader()
const {
return server_state_ == ServerStateLeader; }
837 entry.node_id = node_id.
get();
838 entry.unique_id = unique_id;
842 const int res = persistent_state_.
getLog().
append(entry);
845 handlePersistentStateUpdateError(res);
864 , committed(arg_committed)
877 template <
typename Predicate>
881 for (
int index = static_cast<int>(persistent_state_.
getLog().
getLastIndex()); index >= 0; index--)
889 ret.template construct<const LogEntryInfo&>(
info);
917 #endif // Include guard IRaftLeaderMonitor & leader_monitor_
void appendLog(const Entry::FieldTypes::unique_id &unique_id, NodeID node_id)
uint8_t num_votes_received_in_this_campaign_
TraceRaftPersistStateUpdateError
RaftCore(INode &node, IStorageBackend &storage, IEventTracer &tracer, IRaftLeaderMonitor &leader_monitor)
ServiceCallID getCallID() const
void trace(TraceCode event, int64_t argument)
NodeID getSrcNodeID() const
TraceRaftNewerTermInResponse
int removeEntriesWhereIndexGreater(Index index)
TraceRaftVoteRequestSucceeded
PendingAppendEntriesFields pending_append_entries_fields_
bool isVotedForSet() const
LazyConstructor< LogEntryInfo > traverseLogFromEndUntil(const Predicate &predicate) const
virtual void handleLocalLeadershipChange(bool local_node_is_leader)=0
void resetAllServerIndices()
bool areAllLogEntriesCommitted() const
uint8_t getClusterSize() const
uint8_t getNumKnownServers() const
TraceRaftCommitIndexUpdate
void decrementServerNextIndex(NodeID server_node_id)
TraceRaftNewEntryCommitted
ServerState server_state_
Log::Index getCommitIndex() const
PersistentState persistent_state_
void handleRequestVoteRequest(const ReceivedDataStructure< RequestVote::Request > &request, ServiceResponseDataStructure< RequestVote::Response > &response)
const INode & getNode() const
MonotonicDuration getUpdateInterval() const
TraceRaftVoteRequestInitiation
bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const
void switchState(ServerState new_state)
ServiceServer< AppendEntries, AppendEntriesCallback > append_entries_srv_
virtual void onEvent(TraceCode event_code, int64_t event_argument)=0
void handleRequestVoteResponse(const ServiceCallResult< RequestVote > &result)
TraceRaftAppendEntriesRespUnsucfl
void setResponseEnabled(bool x)
#define UAVCAN_TRACE(...)
void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
uint8_t getQuorumSize() const
int init(const uint8_t init_cluster_size, const TransferPriority priority)
bool hasPendingCalls() const
ServiceClient< AppendEntries, AppendEntriesResponseCallback > append_entries_client_
int init(const uint8_t cluster_size, const TransferPriority priority)
const Entry * getEntryAtIndex(Index index) const
Log::Index getServerNextIndex(NodeID server_node_id) const
NodeID getVotedFor() const
void propagateCommitIndex()
bool isActivityTimedOut() const
Log::Index prev_log_index
UAVCAN_EXPORT const T & max(const T &a, const T &b)
bool isResponseEnabled() const
PendingAppendEntriesFields()
const ClusterManager & getClusterManager() const
TraceRaftElectionComplete
void checkInvariants() const
virtual ~IRaftLeaderMonitor()
int setCurrentTerm(Term term)
void handlePersistentStateUpdateError(int error)
MonotonicTime getLastActivityTimestamp() const
int setVotedFor(NodeID node_id)
MonotonicTime last_activity_timestamp_
bool isKnownServer(NodeID node_id) const
void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
LogEntryInfo(const Entry &arg_entry, bool arg_committed)
UAVCAN_EXPORT const T & min(const T &a, const T &b)
MonotonicDuration randomized_activity_timeout_
void handleAppendEntriesResponse(const ServiceCallResult< AppendEntries > &result)
ServiceClient< RequestVote, RequestVoteResponseCallback > request_vote_client_
const ResponseFieldType & getResponse() const
NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const
void tryIncrementCurrentTermFromResponse(Term new_term)
MonotonicDuration getRandomizedTimeout() const
void setCallback(const Callback &cb)
Log::Index getNumAllocations() const
int start(const Callback &callback)
void setRequestTimeout(MonotonicDuration timeout)
Term getCurrentTerm() const
Index getLastIndex() const
ServerState getServerState() const
int removeEntriesWhereIndexGreaterOrEqual(Index index)
StorageType< Entry::FieldTypes::term >::Type Term
static MonotonicDuration getDefaultRequestTimeout()
static MonotonicDuration fromMSec(int64_t ms)
unsigned getNumPendingCalls() const
TraceRaftVoteRequestReceived
bool isClusterDiscovered() const
void handleAppendEntriesRequest(const ReceivedDataStructure< AppendEntries::Request > &request, ServiceResponseDataStructure< AppendEntries::Response > &response)
ServiceServer< RequestVote, RequestVoteCallback > request_vote_srv_
void addServer(NodeID node_id)
virtual void handleTimerEvent(const TimerEvent &)
const PersistentState & getPersistentState() const
Log::Index getServerMatchIndex(NodeID server_node_id) const
int call(NodeID server_node_id, const RequestType &request)
bool isSuccessful() const
virtual void handleLogCommitOnLeader(const Entry &committed_entry)=0
uint8_t next_server_index_
Next server to query AE from.
TraceRaftAppendEntriesCallFailure
int append(const Entry &entry)
static const uavcan::int16_t ErrLogic
Internal logic error.