raft_core.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
3  */
4 
5 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
6 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
7 
8 #include <cstdlib>
10 #include <uavcan/debug.hpp>
12 #include <uavcan/node/timer.hpp>
19 // UAVCAN types
20 #include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp>
21 #include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp>
22 
23 namespace uavcan
24 {
25 namespace dynamic_node_id_server
26 {
27 namespace distributed
28 {
33 {
34 public:
38  virtual void handleLogCommitOnLeader(const Entry& committed_entry) = 0;
39 
45  virtual void handleLocalLeadershipChange(bool local_node_is_leader) = 0;
46 
47  virtual ~IRaftLeaderMonitor() { }
48 };
49 
64 class RaftCore : private TimerBase
65 {
66 public:
68  {
71  ServerStateLeader
72  };
73 
74 private:
78 
81 
85 
88 
90  {
93 
95  : prev_log_index(0)
96  , num_entries(0)
97  { }
98  };
99 
100  /*
101  * Constants
102  */
// Upper bound on the number of remote servers: the maximum cluster size minus one.
// Used below as the iteration limit when vote requests are sent to the remote servers.
// NOTE(review): presumably the excluded slot is the local server -- confirm against ClusterManager.
103  enum { MaxNumFollowers = ClusterManager::MaxClusterSize - 1 };
104 
107 
108  /*
109  * States
110  */
114 
118 
121 
123 
124  /*
125  * Transport
126  */
131 
132  /*
133  * Methods
134  */
135  void trace(TraceCode event, int64_t argument) { tracer_.onEvent(event, argument); }
136 
137  INode& getNode() { return append_entries_srv_.getNode(); }
138  const INode& getNode() const { return append_entries_srv_.getNode(); }
139 
// Consistency check of the Raft state. Every condition is wrapped in UAVCAN_ASSERT;
// the method is const and returns nothing.
140  void checkInvariants() const
141  {
142  // Commit index: must never run past the last entry of the local log
143  UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex());
144 
145  // Term: the last log entry is looked up, and its term must not exceed the current term
     // NOTE(review): the right-hand operand of the comparison on the next line is missing from
     // this listing (source line 147 is absent) -- restore it from the original header.
146  UAVCAN_ASSERT(persistent_state_.getLog().getEntryAtIndex(persistent_state_.getLog().getLastIndex()) !=
148  UAVCAN_ASSERT(persistent_state_.getLog().getEntryAtIndex(persistent_state_.getLog().getLastIndex())->term <=
149  persistent_state_.getCurrentTerm());
150 
151  // Elections: a candidate with pending vote requests must have voted for the local node,
     // and the vote counter cannot exceed the cluster size
152  UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !request_vote_client_.hasPendingCalls() ||
153  persistent_state_.getVotedFor() == getNode().getNodeID());
154  UAVCAN_ASSERT(num_votes_received_in_this_campaign_ <= cluster_.getClusterSize());
155 
156  // Transport: at most one AppendEntries call in flight; RequestVote calls are bounded by the
     // number of known servers; a candidate has no pending AppendEntries calls, a leader has no
     // pending RequestVote calls, and a follower has neither
157  UAVCAN_ASSERT(append_entries_client_.getNumPendingCalls() <= 1);
158  UAVCAN_ASSERT(request_vote_client_.getNumPendingCalls() <= cluster_.getNumKnownServers());
159  UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !append_entries_client_.hasPendingCalls());
160  UAVCAN_ASSERT(server_state_ != ServerStateLeader || !request_vote_client_.hasPendingCalls());
161  UAVCAN_ASSERT(server_state_ != ServerStateFollower ||
162  (!append_entries_client_.hasPendingCalls() && !request_vote_client_.hasPendingCalls()));
163  }
164 
166  {
167  last_activity_timestamp_ = getNode().getMonotonicTime();
168 
169  static const int32_t randomization_range_msec = AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS -
170  AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS;
171  // coverity[dont_call]
172  const int32_t random_msec = (std::rand() % randomization_range_msec) + 1;
173 
174  randomized_activity_timeout_ =
175  MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS + random_msec);
176 
177  UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() > AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS);
178  UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() <= AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS);
179  }
180 
181  bool isActivityTimedOut() const
182  {
183  return getNode().getMonotonicTime() > (last_activity_timestamp_ + randomized_activity_timeout_);
184  }
185 
187  {
188  UAVCAN_ASSERT(error < 0);
189  trace(TraceRaftPersistStateUpdateError, error);
190  switchState(ServerStateFollower);
191  registerActivity(); // Deferring reelections
192  }
193 
195  {
196  if (isActivityTimedOut())
197  {
198  switchState(ServerStateCandidate);
199  registerActivity();
200  }
201  }
202 
204  {
205  if (num_votes_received_in_this_campaign_ > 0)
206  {
207  trace(TraceRaftElectionComplete, num_votes_received_in_this_campaign_);
208  const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize();
209 
210  UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "Election complete, won: %d", int(won));
211 
212  switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader
213  }
214  else
215  {
216  // Set votedFor, abort on failure
217  int res = persistent_state_.setVotedFor(getNode().getNodeID());
218  if (res < 0)
219  {
220  handlePersistentStateUpdateError(res);
221  return;
222  }
223 
224  // Increment current term, abort on failure
225  res = persistent_state_.setCurrentTerm(persistent_state_.getCurrentTerm() + 1U);
226  if (res < 0)
227  {
228  handlePersistentStateUpdateError(res);
229  return;
230  }
231 
232  num_votes_received_in_this_campaign_ = 1; // Voting for self
233 
234  RequestVote::Request req;
235  req.last_log_index = persistent_state_.getLog().getLastIndex();
236  req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term;
237  req.term = persistent_state_.getCurrentTerm();
238 
239  for (uint8_t i = 0; i < MaxNumFollowers; i++)
240  {
241  const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i);
242  if (!node_id.isUnicast())
243  {
244  break;
245  }
246 
247  UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore",
248  "Requesting vote from %d", int(node_id.get()));
249  trace(TraceRaftVoteRequestInitiation, node_id.get());
250 
251  res = request_vote_client_.call(node_id, req);
252  if (res < 0)
253  {
254  trace(TraceError, res);
255  }
256  }
257  }
258  }
259 
261  {
262  if (append_entries_client_.hasPendingCalls())
263  {
264  append_entries_client_.cancelAllCalls(); // Refer to the response callback to learn why
265  }
266 
267  if (cluster_.getClusterSize() > 1)
268  {
269  const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_);
270  UAVCAN_ASSERT(node_id.isUnicast());
271 
272  next_server_index_++;
273  if (next_server_index_ >= cluster_.getNumKnownServers())
274  {
275  next_server_index_ = 0;
276  }
277 
278  AppendEntries::Request req;
279  req.term = persistent_state_.getCurrentTerm();
280  req.leader_commit = commit_index_;
281 
282  req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U);
283 
284  const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index);
285  if (entry == UAVCAN_NULLPTR)
286  {
287  UAVCAN_ASSERT(0);
288  handlePersistentStateUpdateError(-ErrLogic);
289  return;
290  }
291 
292  req.prev_log_term = entry->term;
293 
294  for (Log::Index index = cluster_.getServerNextIndex(node_id);
295  index <= persistent_state_.getLog().getLastIndex();
296  index++)
297  {
298  req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index));
299  if (req.entries.size() == req.entries.capacity())
300  {
301  break;
302  }
303  }
304 
305  pending_append_entries_fields_.num_entries = req.entries.size();
306  pending_append_entries_fields_.prev_log_index = req.prev_log_index;
307 
308  const int res = append_entries_client_.call(node_id, req);
309  if (res < 0)
310  {
312  }
313  }
314 
315  propagateCommitIndex();
316  }
317 
318  void switchState(ServerState new_state)
319  {
320  if (server_state_ == new_state)
321  {
322  return;
323  }
324 
325  /*
326  * Logging
327  */
328  UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "State switch: %d --> %d",
329  int(server_state_), int(new_state));
330  trace(TraceRaftStateSwitch, new_state);
331 
332  /*
333  * Updating the current state
334  */
335  const ServerState old_state = server_state_;
336  server_state_ = new_state;
337 
338  /*
339  * Resetting specific states
340  */
341  cluster_.resetAllServerIndices();
342 
343  next_server_index_ = 0;
344  num_votes_received_in_this_campaign_ = 0;
345 
346  request_vote_client_.cancelAllCalls();
347  append_entries_client_.cancelAllCalls();
348 
349  /*
350  * Calling the switch handler
351  * Note that the handler may commit to the log directly
352  */
353  if ((old_state == ServerStateLeader) ||
354  (new_state == ServerStateLeader))
355  {
356  leader_monitor_.handleLocalLeadershipChange(new_state == ServerStateLeader);
357  }
358  }
359 
361  {
362  trace(TraceRaftNewerTermInResponse, new_term);
363  const int res = persistent_state_.setCurrentTerm(new_term);
364  if (res < 0)
365  {
367  }
368  registerActivity(); // Deferring future elections
369  switchState(ServerStateFollower);
370  }
371 
373  {
374  // Objective is to estimate whether we can safely increment commit index value
375  UAVCAN_ASSERT(server_state_ == ServerStateLeader);
376  UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex());
377 
378  if (commit_index_ < persistent_state_.getLog().getLastIndex())
379  {
380  /*
381  * Not all local entries are committed.
382  * Deciding if it is safe to increment commit index.
383  */
384  uint8_t num_nodes_with_next_log_entry_available = 1; // Local node
385  for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++)
386  {
387  const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i));
388  if (match_index > commit_index_)
389  {
390  num_nodes_with_next_log_entry_available++;
391  }
392  }
393 
394  if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize())
395  {
396  commit_index_++;
397  UAVCAN_ASSERT(commit_index_ > 0); // Index 0 is always committed
398  trace(TraceRaftNewEntryCommitted, commit_index_);
399 
400  // AT THIS POINT ALLOCATION IS COMPLETE
401  leader_monitor_.handleLogCommitOnLeader(*persistent_state_.getLog().getEntryAtIndex(commit_index_));
402  }
403  }
404  }
405 
408  {
409  checkInvariants();
410 
411  if (!cluster_.isKnownServer(request.getSrcNodeID()))
412  {
413  if (cluster_.isClusterDiscovered())
414  {
415  trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
416  response.setResponseEnabled(false);
417  return;
418  }
419  else
420  {
421  cluster_.addServer(request.getSrcNodeID());
422  }
423  }
424 
425  UAVCAN_ASSERT(response.isResponseEnabled()); // This is default
426 
427  /*
428  * Checking if our current state is up to date.
429  * The request will be ignored if persistent state cannot be updated.
430  */
431  if (request.term > persistent_state_.getCurrentTerm())
432  {
433  int res = persistent_state_.setCurrentTerm(request.term);
434  if (res < 0)
435  {
436  handlePersistentStateUpdateError(res);
437  response.setResponseEnabled(false);
438  return;
439  }
440 
441  res = persistent_state_.resetVotedFor();
442  if (res < 0)
443  {
444  handlePersistentStateUpdateError(res);
445  response.setResponseEnabled(false);
446  return;
447  }
448  }
449 
450  /*
451  * Preparing the response
452  */
453  response.term = persistent_state_.getCurrentTerm();
454  response.success = false;
455 
456  /*
457  * Step 1 (see Raft paper)
458  * Reject the request if the leader has stale term number.
459  */
460  if (request.term < persistent_state_.getCurrentTerm())
461  {
462  response.setResponseEnabled(true);
463  return;
464  }
465 
466  registerActivity();
467  switchState(ServerStateFollower);
468 
469  /*
470  * Step 2
471  * Reject the request if the assumed log index does not exist on the local node.
472  */
473  const Entry* const prev_entry = persistent_state_.getLog().getEntryAtIndex(request.prev_log_index);
474  if (prev_entry == UAVCAN_NULLPTR)
475  {
476  response.setResponseEnabled(true);
477  return;
478  }
479 
480  /*
481  * Step 3
482  * Drop log entries if term number does not match.
483  * Ignore the request if the persistent state cannot be updated.
484  */
485  if (prev_entry->term != request.prev_log_term)
486  {
487  const int res = persistent_state_.getLog().removeEntriesWhereIndexGreaterOrEqual(request.prev_log_index);
488  response.setResponseEnabled(res >= 0);
489  if (res < 0)
490  {
492  }
493  return;
494  }
495 
496  /*
497  * Step 4
498  * Update the log with new entries - this will possibly require to rewrite existing entries.
499  * Ignore the request if the persistent state cannot be updated.
500  */
501  if (request.prev_log_index != persistent_state_.getLog().getLastIndex())
502  {
503  const int res = persistent_state_.getLog().removeEntriesWhereIndexGreater(request.prev_log_index);
504  if (res < 0)
505  {
507  response.setResponseEnabled(false);
508  return;
509  }
510  }
511 
512  for (uint8_t i = 0; i < request.entries.size(); i++)
513  {
514  const int res = persistent_state_.getLog().append(request.entries[i]);
515  if (res < 0)
516  {
518  response.setResponseEnabled(false);
519  return; // Response will not be sent, the server will assume that we're dead
520  }
521  }
522 
523  /*
524  * Step 5
525  * Update the commit index.
526  */
527  if (request.leader_commit > commit_index_)
528  {
529  commit_index_ = min(request.leader_commit, persistent_state_.getLog().getLastIndex());
530  trace(TraceRaftCommitIndexUpdate, commit_index_);
531  }
532 
533  response.setResponseEnabled(true);
534  response.success = true;
535  }
536 
538  {
539  UAVCAN_ASSERT(server_state_ == ServerStateLeader); // When state switches, all requests must be cancelled
540  checkInvariants();
541 
542  if (!result.isSuccessful())
543  {
544  return;
545  }
546 
547  if (result.getResponse().term > persistent_state_.getCurrentTerm())
548  {
549  tryIncrementCurrentTermFromResponse(result.getResponse().term);
550  }
551  else
552  {
553  if (result.getResponse().success)
554  {
555  cluster_.incrementServerNextIndexBy(result.getCallID().server_node_id,
556  pending_append_entries_fields_.num_entries);
557  cluster_.setServerMatchIndex(result.getCallID().server_node_id,
558  Log::Index(pending_append_entries_fields_.prev_log_index +
559  pending_append_entries_fields_.num_entries));
560  }
561  else
562  {
563  cluster_.decrementServerNextIndex(result.getCallID().server_node_id);
564  trace(TraceRaftAppendEntriesRespUnsucfl, result.getCallID().server_node_id.get());
565  }
566  }
567 
568  pending_append_entries_fields_ = PendingAppendEntriesFields();
569  // Rest of the logic is implemented in periodic update handlers.
570  }
571 
574  {
575  checkInvariants();
576  trace(TraceRaftVoteRequestReceived, request.getSrcNodeID().get());
577 
578  if (!cluster_.isKnownServer(request.getSrcNodeID()))
579  {
580  trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
581  response.setResponseEnabled(false);
582  return;
583  }
584 
585  UAVCAN_ASSERT(response.isResponseEnabled()); // This is default
586 
587  /*
588  * Checking if our current state is up to date.
589  * The request will be ignored if persistent state cannot be updated.
590  */
591  if (request.term > persistent_state_.getCurrentTerm())
592  {
593  switchState(ServerStateFollower); // Our term is stale, so we can't serve as leader
594 
595  int res = persistent_state_.setCurrentTerm(request.term);
596  if (res < 0)
597  {
598  handlePersistentStateUpdateError(res);
599  response.setResponseEnabled(false);
600  return;
601  }
602 
603  res = persistent_state_.resetVotedFor();
604  if (res < 0)
605  {
606  handlePersistentStateUpdateError(res);
607  response.setResponseEnabled(false);
608  return;
609  }
610  }
611 
612  /*
613  * Preparing the response
614  */
615  response.term = persistent_state_.getCurrentTerm();
616 
617  if (request.term < response.term)
618  {
619  response.vote_granted = false;
620  }
621  else
622  {
623  const bool can_vote = !persistent_state_.isVotedForSet() ||
624  (persistent_state_.getVotedFor() == request.getSrcNodeID());
625  const bool log_is_up_to_date =
626  persistent_state_.getLog().isOtherLogUpToDate(request.last_log_index, request.last_log_term);
627 
628  response.vote_granted = can_vote && log_is_up_to_date;
629 
630  if (response.vote_granted)
631  {
632  switchState(ServerStateFollower); // Avoiding race condition when Candidate
633  registerActivity(); // This is necessary to avoid excessive elections
634 
635  const int res = persistent_state_.setVotedFor(request.getSrcNodeID());
636  if (res < 0)
637  {
639  response.setResponseEnabled(false);
640  return;
641  }
642  }
643  }
644  }
645 
647  {
648  UAVCAN_ASSERT(server_state_ == ServerStateCandidate); // When state switches, all requests must be cancelled
649  checkInvariants();
650 
651  if (!result.isSuccessful())
652  {
653  return;
654  }
655 
656  trace(TraceRaftVoteRequestSucceeded, result.getCallID().server_node_id.get());
657 
658  if (result.getResponse().term > persistent_state_.getCurrentTerm())
659  {
660  tryIncrementCurrentTermFromResponse(result.getResponse().term);
661  }
662  else
663  {
664  if (result.getResponse().vote_granted)
665  {
666  num_votes_received_in_this_campaign_++;
667  }
668  }
669  // Rest of the logic is implemented in periodic update handlers.
670  // I'm no fan of asynchronous programming. At all.
671  }
672 
673  virtual void handleTimerEvent(const TimerEvent&)
674  {
675  checkInvariants();
676 
677  switch (server_state_)
678  {
679  case ServerStateFollower:
680  {
681  updateFollower();
682  break;
683  }
684  case ServerStateCandidate:
685  {
686  updateCandidate();
687  break;
688  }
689  case ServerStateLeader:
690  {
691  updateLeader();
692  break;
693  }
694  default:
695  {
696  UAVCAN_ASSERT(0);
697  break;
698  }
699  }
700  }
701 
702 public:
704  IStorageBackend& storage,
705  IEventTracer& tracer,
706  IRaftLeaderMonitor& leader_monitor)
707  : TimerBase(node)
708  , tracer_(tracer)
709  , leader_monitor_(leader_monitor)
710  , persistent_state_(storage, tracer)
711  , cluster_(node, storage, persistent_state_.getLog(), tracer)
712  , commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero
713  , last_activity_timestamp_(node.getMonotonicTime())
714  , randomized_activity_timeout_(
715  MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS))
716  , server_state_(ServerStateFollower)
717  , next_server_index_(0)
718  , num_votes_received_in_this_campaign_(0)
719  , append_entries_srv_(node)
720  , append_entries_client_(node)
721  , request_vote_srv_(node)
722  , request_vote_client_(node)
723  { }
724 
// Brings the Raft core into its initial state and starts the periodic update timer.
// The volatile state is reset to that of a follower with commit index zero. The persistent
// state, the cluster manager, both service servers and both service clients are then
// initialized in that order; the first negative result aborts the sequence and is returned.
// cluster_size is forwarded to cluster_.init(); priority is forwarded to cluster_.init()
// and to init() of both service clients.
// Returns 0 on success, or the negative result of the step that failed.
731  int init(const uint8_t cluster_size, const TransferPriority priority)
732  {
733  /*
734  * Initializing state variables
735  */
736  server_state_ = ServerStateFollower;
737  next_server_index_ = 0;
738  num_votes_received_in_this_campaign_ = 0;
739  commit_index_ = 0;
740 
741  registerActivity();
742 
743  /*
744  * Initializing internals
745  */
746  int res = persistent_state_.init();
747  if (res < 0)
748  {
749  return res;
750  }
751 
752  res = cluster_.init(cluster_size, priority);
753  if (res < 0)
754  {
755  return res;
756  }
757 
758  res = append_entries_srv_.start(AppendEntriesCallback(this, &RaftCore::handleAppendEntriesRequest));
759  if (res < 0)
760  {
761  return res;
762  }
763 
764  res = request_vote_srv_.start(RequestVoteCallback(this, &RaftCore::handleRequestVoteRequest));
765  if (res < 0)
766  {
767  return res;
768  }
769 
770  res = append_entries_client_.init(priority);
771  if (res < 0)
772  {
773  return res;
774  }
     // NOTE(review): the statement below is cut off in this listing (source line 776 is absent).
775  append_entries_client_.setCallback(AppendEntriesResponseCallback(this,
777 
778  res = request_vote_client_.init(priority);
779  if (res < 0)
780  {
781  return res;
782  }
     // NOTE(review): source line 783 is absent from this listing; presumably it installs the
     // response callback of the RequestVote client -- confirm against the original header.
784 
785  /*
786  * Initializing timing constants
787  * Refer to the specification for the formula
788  */
789  const uint8_t num_followers = static_cast<uint8_t>(cluster_.getClusterSize() - 1);
790 
     // update interval = minimum election timeout / 2 / max(2, number of followers)
791  const MonotonicDuration update_interval =
792  MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS /
793  2 / max(static_cast<uint8_t>(2), num_followers));
794 
795  UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore",
796  "Update interval: %ld msec", static_cast<long>(update_interval.toMSec()));
797 
     // The request timeout of either client is capped at the update interval
798  append_entries_client_.setRequestTimeout(min(append_entries_client_.getDefaultRequestTimeout(),
799  update_interval));
800 
801  request_vote_client_.setRequestTimeout(min(request_vote_client_.getDefaultRequestTimeout(),
802  update_interval));
803 
804  startPeriodic(update_interval);
805 
806  trace(TraceRaftCoreInited, update_interval.toUSec());
807 
     // Every failure path has returned earlier, hence the last result cannot be negative here
808  UAVCAN_ASSERT(res >= 0);
809  return 0;
810  }
811 
815  Log::Index getCommitIndex() const { return commit_index_; }
816 
820  bool areAllLogEntriesCommitted() const { return commit_index_ == persistent_state_.getLog().getLastIndex(); }
821 
825  bool isLeader() const { return server_state_ == ServerStateLeader; }
826 
832  void appendLog(const Entry::FieldTypes::unique_id& unique_id, NodeID node_id)
833  {
834  if (isLeader())
835  {
836  Entry entry;
837  entry.node_id = node_id.get();
838  entry.unique_id = unique_id;
839  entry.term = persistent_state_.getCurrentTerm();
840 
841  trace(TraceRaftNewLogEntry, entry.node_id);
842  const int res = persistent_state_.getLog().append(entry);
843  if (res < 0)
844  {
845  handlePersistentStateUpdateError(res);
846  }
847  }
848  else
849  {
850  UAVCAN_ASSERT(0);
851  }
852  }
853 
858  {
860  bool committed;
861 
862  LogEntryInfo(const Entry& arg_entry, bool arg_committed)
863  : entry(arg_entry)
864  , committed(arg_committed)
865  { }
866  };
867 
// Walks the local log from the last index down to index zero and stops at the first entry
// for which the predicate returns true. The predicate receives a LogEntryInfo holding the
// entry and a flag that is set when the entry index does not exceed the commit index.
// On a match the result object is constructed from that LogEntryInfo and returned.
// NOTE(review): source lines 888 and 893 are absent from this listing (the declaration of
// `ret` and the statement reached when nothing matches); presumably an unconstructed
// LazyConstructor is returned in the latter case -- confirm against the original header.
877  template <typename Predicate>
878  inline LazyConstructor<LogEntryInfo> traverseLogFromEndUntil(const Predicate& predicate) const
879  {
     // presumably a guard against a null predicate -- verify what coerceOrFallback does
880  UAVCAN_ASSERT(coerceOrFallback<bool>(predicate, true));
     // A signed counter is used so that the loop can terminate after visiting index zero
881  for (int index = static_cast<int>(persistent_state_.getLog().getLastIndex()); index >= 0; index--)
882  {
883  const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(Log::Index(index));
884  UAVCAN_ASSERT(entry != UAVCAN_NULLPTR);
885  const LogEntryInfo info(*entry, Log::Index(index) <= commit_index_);
886  if (predicate(info))
887  {
889  ret.template construct<const LogEntryInfo&>(info);
890  return ret;
891  }
892  }
894  }
895 
897  {
898  // Remember that index zero contains a special-purpose entry that doesn't count as allocation
899  return persistent_state_.getLog().getLastIndex();
900  }
901 
905  const PersistentState& getPersistentState() const { return persistent_state_; }
906  const ClusterManager& getClusterManager() const { return cluster_; }
907  MonotonicTime getLastActivityTimestamp() const { return last_activity_timestamp_; }
908  ServerState getServerState() const { return server_state_; }
909  MonotonicDuration getUpdateInterval() const { return getPeriod(); }
910  MonotonicDuration getRandomizedTimeout() const { return randomized_activity_timeout_; }
911 };
912 
913 }
914 }
915 }
916 
917 #endif // Include guard
std::uint8_t uint8_t
Definition: std.hpp:24
void appendLog(const Entry::FieldTypes::unique_id &unique_id, NodeID node_id)
Definition: raft_core.hpp:832
TraceRaftPersistStateUpdateError
Definition: event.hpp:23
RaftCore(INode &node, IStorageBackend &storage, IEventTracer &tracer, IRaftLeaderMonitor &leader_monitor)
Definition: raft_core.hpp:703
ServiceCallID getCallID() const
void trace(TraceCode event, int64_t argument)
Definition: raft_core.hpp:135
TraceRaftNewerTermInResponse
Definition: event.hpp:23
TraceRaftVoteRequestSucceeded
Definition: event.hpp:23
PendingAppendEntriesFields pending_append_entries_fields_
Definition: raft_core.hpp:122
LazyConstructor< LogEntryInfo > traverseLogFromEndUntil(const Predicate &predicate) const
Definition: raft_core.hpp:878
virtual void handleLocalLeadershipChange(bool local_node_is_leader)=0
TraceRaftStateSwitch
Definition: event.hpp:23
bool isUnicast() const
Definition: transfer.hpp:136
TraceRaftCommitIndexUpdate
Definition: event.hpp:23
TraceRaftNewEntryCommitted
Definition: event.hpp:23
void handleRequestVoteRequest(const ReceivedDataStructure< RequestVote::Request > &request, ServiceResponseDataStructure< RequestVote::Response > &response)
Definition: raft_core.hpp:572
TraceRaftVoteRequestInitiation
Definition: event.hpp:23
bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const
Definition: log.hpp:293
ServiceServer< AppendEntries, AppendEntriesCallback > append_entries_srv_
Definition: raft_core.hpp:127
virtual void onEvent(TraceCode event_code, int64_t event_argument)=0
void handleRequestVoteResponse(const ServiceCallResult< RequestVote > &result)
Definition: raft_core.hpp:646
TraceRaftAppendEntriesRespUnsucfl
Definition: event.hpp:23
void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
int64_t toUSec() const
Definition: time.hpp:43
int init(const uint8_t init_cluster_size, const TransferPriority priority)
bool hasPendingCalls() const
ServiceClient< AppendEntries, AppendEntriesResponseCallback > append_entries_client_
Definition: raft_core.hpp:128
int init(const uint8_t cluster_size, const TransferPriority priority)
Definition: raft_core.hpp:731
const Entry * getEntryAtIndex(Index index) const
Definition: log.hpp:285
Log::Index getServerNextIndex(NodeID server_node_id) const
uint8_t get() const
Definition: transfer.hpp:132
TraceRaftRequestIgnored
Definition: event.hpp:23
UAVCAN_EXPORT const T & max(const T &a, const T &b)
Definition: templates.hpp:291
int64_t toMSec() const
Definition: time.hpp:44
const ClusterManager & getClusterManager() const
Definition: raft_core.hpp:906
int rand()
Definition: main.cpp:28
TraceRaftElectionComplete
Definition: event.hpp:23
TraceRaftNewLogEntry
Definition: event.hpp:23
def error(fmt, args)
Definition: parser.py:722
void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
LogEntryInfo(const Entry &arg_entry, bool arg_committed)
Definition: raft_core.hpp:862
UAVCAN_EXPORT const T & min(const T &a, const T &b)
Definition: templates.hpp:281
void handleAppendEntriesResponse(const ServiceCallResult< AppendEntries > &result)
Definition: raft_core.hpp:537
ServiceClient< RequestVote, RequestVoteResponseCallback > request_vote_client_
Definition: raft_core.hpp:130
const ResponseFieldType & getResponse() const
TraceRaftCoreInited
Definition: event.hpp:23
void setCallback(const Callback &cb)
int start(const Callback &callback)
std::int64_t int64_t
Definition: std.hpp:32
void setRequestTimeout(MonotonicDuration timeout)
int removeEntriesWhereIndexGreaterOrEqual(Index index)
Definition: log.hpp:242
static MonotonicDuration getDefaultRequestTimeout()
static MonotonicDuration fromMSec(int64_t ms)
Definition: time.hpp:41
unsigned getNumPendingCalls() const
TraceRaftVoteRequestReceived
Definition: event.hpp:23
TraceError
Definition: event.hpp:23
void handleAppendEntriesRequest(const ReceivedDataStructure< AppendEntries::Request > &request, ServiceResponseDataStructure< AppendEntries::Response > &response)
Definition: raft_core.hpp:406
ServiceServer< RequestVote, RequestVoteCallback > request_vote_srv_
Definition: raft_core.hpp:129
virtual void handleTimerEvent(const TimerEvent &)
Definition: raft_core.hpp:673
const PersistentState & getPersistentState() const
Definition: raft_core.hpp:905
Log::Index getServerMatchIndex(NodeID server_node_id) const
int call(NodeID server_node_id, const RequestType &request)
std::int32_t int32_t
Definition: std.hpp:31
virtual void handleLogCommitOnLeader(const Entry &committed_entry)=0
uint8_t next_server_index_
Next server to query AE from.
Definition: raft_core.hpp:119
TraceRaftAppendEntriesCallFailure
Definition: event.hpp:23


uavcan_communicator
Author(s):
autogenerated on Wed Jan 11 2023 03:59:39