Go to the documentation of this file.
5 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
6 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
20 #include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp>
21 #include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp>
25 namespace dynamic_node_id_server
169 static const int32_t randomization_range_msec = AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS -
170 AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS;
210 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
"Election complete, won: %d",
int(won));
234 RequestVote::Request req;
247 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
248 "Requesting vote from %d",
int(node_id.
get()));
278 AppendEntries::Request req;
292 req.prev_log_term = entry->term;
299 if (req.entries.size() == req.entries.capacity())
328 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
"State switch: %d --> %d",
384 uint8_t num_nodes_with_next_log_entry_available = 1;
390 num_nodes_with_next_log_entry_available++;
485 if (prev_entry->term != request.prev_log_term)
488 response.setResponseEnabled(res >= 0);
512 for (
uint8_t i = 0; i < request.entries.size(); i++)
625 const bool log_is_up_to_date =
628 response.vote_granted = can_vote && log_is_up_to_date;
715 MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS))
793 2 /
max(
static_cast<uint8_t>(2), num_followers));
795 UAVCAN_TRACE(
"dynamic_node_id_server::distributed::RaftCore",
796 "Update interval: %ld msec",
static_cast<long>(update_interval.
toMSec()));
837 entry.node_id = node_id.
get();
838 entry.unique_id = unique_id;
877 template <
typename Predicate>
889 ret.template construct<const LogEntryInfo&>(
info);
917 #endif // Include guard
const std::string response
MethodBinder< RaftCore *, void(RaftCore::*)(const ReceivedDataStructure< RequestVote::Request > &, ServiceResponseDataStructure< RequestVote::Response > &)> RequestVoteCallback
uint8_t getClusterSize() const
bool isClusterDiscovered() const
MonotonicDuration getPeriod() const
MonotonicTime getMonotonicTime() const
TraceRaftNewEntryCommitted
TraceRaftVoteRequestSucceeded
StorageType< Entry::FieldTypes::term >::Type Term
bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const
virtual void handleLogCommitOnLeader(const Entry &committed_entry)=0
void decrementServerNextIndex(NodeID server_node_id)
bool isVotedForSet() const
ServiceCallID getCallID() const
ServerState server_state_
NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const
const Entry * getEntryAtIndex(Index index) const
void propagateCommitIndex()
bool isActivityTimedOut() const
TraceRaftNewerTermInResponse
static MonotonicDuration fromMSec(int64_t ms)
bool areAllLogEntriesCommitted() const
int setVotedFor(NodeID node_id)
void handleRequestVoteResponse(const ServiceCallResult< RequestVote > &result)
TraceRaftPersistStateUpdateError
IRaftLeaderMonitor & leader_monitor_
RaftCore(INode &node, IStorageBackend &storage, IEventTracer &tracer, IRaftLeaderMonitor &leader_monitor)
Log::Index getServerNextIndex(NodeID server_node_id) const
#define UAVCAN_TRACE(...)
MonotonicDuration getUpdateInterval() const
bool isKnownServer(NodeID node_id) const
void appendLog(const Entry::FieldTypes::unique_id &unique_id, NodeID node_id)
void checkInvariants() const
int append(const Entry &entry)
void tryIncrementCurrentTermFromResponse(Term new_term)
void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
ServiceClient< RequestVote, RequestVoteResponseCallback > request_vote_client_
MonotonicDuration getRandomizedTimeout() const
Log::Index getServerMatchIndex(NodeID server_node_id) const
void startPeriodic(MonotonicDuration period)
MethodBinder< RaftCore *, void(RaftCore::*)(const ServiceCallResult< AppendEntries > &)> AppendEntriesResponseCallback
Log::Index prev_log_index
MonotonicTime last_activity_timestamp_
const INode & getNode() const
static const uavcan::int16_t ErrLogic
Internal logic error.
int removeEntriesWhereIndexGreaterOrEqual(Index index)
virtual ~IRaftLeaderMonitor()
int setCurrentTerm(Term term)
const UAVCAN_EXPORT T & max(const T &a, const T &b)
MethodBinder< RaftCore *, void(RaftCore::*)(const ServiceCallResult< RequestVote > &)> RequestVoteResponseCallback
void handleRequestVoteRequest(const ReceivedDataStructure< RequestVote::Request > &request, ServiceResponseDataStructure< RequestVote::Response > &response)
void resetAllServerIndices()
void trace(TraceCode event, int64_t argument)
Log::Index getNumAllocations() const
const UAVCAN_EXPORT T & min(const T &a, const T &b)
ServiceServer< RequestVote, RequestVoteCallback > request_vote_srv_
TraceRaftAppendEntriesCallFailure
PersistentState persistent_state_
virtual void handleTimerEvent(const TimerEvent &)
const ResponseFieldType & getResponse() const
TraceRaftVoteRequestInitiation
bool isSuccessful() const
uint8_t next_server_index_
Next server to query AE from.
TraceRaftElectionComplete
ServerState getServerState() const
uint8_t num_votes_received_in_this_campaign_
virtual void onEvent(TraceCode event_code, int64_t event_argument)=0
int init(const uint8_t cluster_size, const TransferPriority priority)
virtual void handleLocalLeadershipChange(bool local_node_is_leader)=0
PendingAppendEntriesFields()
int removeEntriesWhereIndexGreater(Index index)
TraceRaftAppendEntriesRespUnsucfl
const PersistentState & getPersistentState() const
MonotonicTime getLastActivityTimestamp() const
void handlePersistentStateUpdateError(int error)
MonotonicDuration randomized_activity_timeout_
void switchState(ServerState new_state)
void handleAppendEntriesRequest(const ReceivedDataStructure< AppendEntries::Request > &request, ServiceResponseDataStructure< AppendEntries::Response > &response)
int init(const uint8_t init_cluster_size, const TransferPriority priority)
void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
TraceRaftVoteRequestReceived
const ClusterManager & getClusterManager() const
LazyConstructor< LogEntryInfo > traverseLogFromEndUntil(const Predicate &predicate) const
uint8_t getQuorumSize() const
uint8_t getNumKnownServers() const
Log::Index getCommitIndex() const
LogEntryInfo(const Entry &arg_entry, bool arg_committed)
NodeID getSrcNodeID() const
Term getCurrentTerm() const
ServiceServer< AppendEntries, AppendEntriesCallback > append_entries_srv_
void addServer(NodeID node_id)
void handleAppendEntriesResponse(const ServiceCallResult< AppendEntries > &result)
NodeID getVotedFor() const
PendingAppendEntriesFields pending_append_entries_fields_
MethodBinder< RaftCore *, void(RaftCore::*)(const ReceivedDataStructure< AppendEntries::Request > &, ServiceResponseDataStructure< AppendEntries::Response > &)> AppendEntriesCallback
TraceRaftCommitIndexUpdate
ServiceClient< AppendEntries, AppendEntriesResponseCallback > append_entries_client_
Index getLastIndex() const