distributed/server.hpp
/*
 * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
 */

#ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED
#define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED

#include <uavcan/build_config.hpp>
#include <uavcan/debug.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
#include <uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp>
#include <uavcan/protocol/dynamic_node_id_server/abstract_server.hpp>
#include <uavcan/protocol/dynamic_node_id_server/node_id_selector.hpp>
#include <uavcan/protocol/dynamic_node_id_server/event.hpp>

namespace uavcan
{
namespace dynamic_node_id_server
{
namespace distributed
{
/**
 * This class implements the top-level allocation logic and server API.
 */
class UAVCAN_EXPORT Server : public AbstractServer
                           , IAllocationRequestHandler
                           , INodeDiscoveryHandler
                           , IRaftLeaderMonitor
{
    struct UniqueIDLogPredicate
    {
        const UniqueID unique_id;

        UniqueIDLogPredicate(const UniqueID& uid)
            : unique_id(uid)
        { }

        bool operator()(const RaftCore::LogEntryInfo& info) const
        {
            return info.entry.unique_id == unique_id;
        }
    };

    struct NodeIDLogPredicate
    {
        const NodeID node_id;

        NodeIDLogPredicate(const NodeID nid)
            : node_id(nid)
        { }

        bool operator()(const RaftCore::LogEntryInfo& info) const
        {
            return info.entry.node_id == node_id.get();
        }
    };

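    /*
     * These predicate functors are consumed by RaftCore::traverseLogFromEndUntil(), which scans the Raft
     * log backwards and returns a LazyConstructor<LogEntryInfo> that is constructed only if some entry
     * satisfied the predicate. A minimal sketch of another predicate of the same shape (TermLogPredicate
     * is hypothetical and not part of this header):
     *
     *     struct TermLogPredicate
     *     {
     *         const Term term;
     *
     *         TermLogPredicate(Term t)
     *             : term(t)
     *         { }
     *
     *         bool operator()(const RaftCore::LogEntryInfo& info) const
     *         {
     *             return info.entry.term == term;    // Matches the newest log entry with the given term
     *         }
     *     };
     */
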
    /*
     * States
     */
    RaftCore raft_core_;

    /*
     * Methods of IAllocationRequestHandler
     */
    virtual bool canPublishFollowupAllocationResponse() const
    {
        /*
         * The server is allowed to publish follow-up allocation responses only if both conditions are met:
         * - The server is the leader.
         * - The last allocation request has been completed successfully.
         *
         * Why the second condition? Imagine a case where there are two Raft nodes, A and B, that can't hear
         * each other, and both of them are leaders (but only A can commit to the log, because B is in a
         * minority partition); then there's a client X that can exchange with both leaders, and a client Y
         * that can exchange only with A. Such a situation can occur in case of a very unlikely failure of
         * redundant interfaces.
         *
         * Both clients X and Y initially send a first-stage Allocation request; A responds to Y with a
         * first-stage response, whereas B responds to X. Both X and Y will issue follow-up second-stage
         * requests, which may cause A to mix second-stage Allocation requests from different nodes, leading
         * to reception of an invalid unique ID. When both leaders receive full unique IDs (A will receive an
         * invalid one, B will receive the valid unique ID of X), only A will be able to make a commit,
         * because B is in a minority. Since both clients were unable to receive node ID values in this
         * round, they will try again later.
         *
         * Now, in order to prevent B from disrupting client-server communication the second time around, we
         * introduce this second restriction: the server cannot exchange with clients as long as its log
         * contains uncommitted entries.
         *
         * Note that this restriction does not apply to allocation requests sent via CAN FD frames, as in
         * that case no follow-up responses are necessary. So only CAN FD can offer a reliable Allocation
         * exchange.
         */
        return raft_core_.isLeader() && raft_core_.areAllLogEntriesCommitted();
    }

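    /*
     * A worked illustration of the partition scenario above (a sketch based on the standard Raft majority
     * rule; quorumOf() is a hypothetical helper, not part of this library). With a three-server cluster
     * split into partitions {A, C} and {B}, only A can assemble the majority quorum of two and commit:
     *
     *     static inline uint8_t quorumOf(uint8_t cluster_size)
     *     {
     *         return static_cast<uint8_t>(cluster_size / 2U + 1U);   // Majority quorum
     *     }
     *     // quorumOf(3) == 2: the partition {A, C} can commit, whereas the partition {B} cannot.
     */
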
    virtual void handleAllocationRequest(const UniqueID& unique_id, const NodeID preferred_node_id)
    {
        /*
         * Note that it is possible that the local node is not the leader. We will still perform the log
         * search and try to find the node that requested allocation. If the node is found, a response will
         * be sent; otherwise the request will be ignored, because only the leader can add new allocations.
         */
        const LazyConstructor<RaftCore::LogEntryInfo> result =
            raft_core_.traverseLogFromEndUntil(UniqueIDLogPredicate(unique_id));

        if (result.isConstructed())
        {
            if (result->committed)
            {
                tryPublishAllocationResult(result->entry);
                UAVCAN_TRACE("dynamic_node_id_server::distributed::Server",
                             "Allocation request served with existing allocation; node ID %d",
                             int(result->entry.node_id));
            }
            else
            {
                UAVCAN_TRACE("dynamic_node_id_server::distributed::Server",
                             "Allocation request ignored - allocation exists but not committed yet; node ID %d",
                             int(result->entry.node_id));
            }
        }
        else
        {
            if (raft_core_.isLeader() && !node_discoverer_.hasUnknownNodes())
            {
                allocateNewNode(unique_id, preferred_node_id);
            }
        }
    }

    /*
     * Methods of INodeDiscoveryHandler
     */
    virtual bool canDiscoverNewNodes() const
    {
        return raft_core_.isLeader();
    }

    virtual NodeAwareness checkNodeAwareness(NodeID node_id) const
    {
        const LazyConstructor<RaftCore::LogEntryInfo> result =
            raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id));
        if (result.isConstructed())
        {
            return result->committed ? NodeAwarenessKnownAndCommitted : NodeAwarenessKnownButNotCommitted;
        }
        else
        {
            return NodeAwarenessUnknown;
        }
    }

    virtual void handleNewNodeDiscovery(const UniqueID* unique_id_or_null, NodeID node_id)
    {
        if (raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id)).isConstructed())
        {
            UAVCAN_ASSERT(0);   // This node is already known; the calling class should have checked for that
            return;
        }

        const UniqueID uid = (unique_id_or_null == UAVCAN_NULLPTR) ? UniqueID() : *unique_id_or_null;

        if (raft_core_.isLeader())
        {
            raft_core_.appendLog(uid, node_id);
        }
    }

    /*
     * Methods of IRaftLeaderMonitor
     */
    virtual void handleLogCommitOnLeader(const protocol::dynamic_node_id::server::Entry& entry)
    {
        /*
         * Maybe this node did not request allocation at all; we don't care - we publish anyway.
         */
        tryPublishAllocationResult(entry);
    }

    virtual void handleLocalLeadershipChange(bool local_node_is_leader)
    {
        if (!local_node_is_leader)
        {
            return;
        }

        const LazyConstructor<RaftCore::LogEntryInfo> result =
            raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID()));

        if (!result.isConstructed())
        {
            raft_core_.appendLog(getOwnUniqueID(), node_.getNodeID());
        }
    }

    /*
     * Private methods
     */
    bool isNodeIDTaken(const NodeID node_id) const
    {
        UAVCAN_TRACE("dynamic_node_id_server::distributed::Server",
                     "Testing if node ID %d is taken", int(node_id.get()));
        return raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id));
    }

    void allocateNewNode(const UniqueID& unique_id, const NodeID preferred_node_id)
    {
        const NodeID allocated_node_id =
            NodeIDSelector<Server>(this, &Server::isNodeIDTaken).findFreeNodeID(preferred_node_id);
        if (!allocated_node_id.isUnicast())
        {
            UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", "Request ignored - no free node ID left");
            return;
        }

        UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", "New node ID allocated: %d",
                     int(allocated_node_id.get()));
        raft_core_.appendLog(unique_id, allocated_node_id);
    }

    void tryPublishAllocationResult(const protocol::dynamic_node_id::server::Entry& entry)
    {
        const int res = allocation_request_manager_.broadcastAllocationResponse(entry.unique_id, entry.node_id);
        if (res < 0)
        {
            tracer_.onEvent(TraceError, res);
            node_.registerInternalFailure("Dynamic allocation response");
        }
    }

public:
    Server(INode& node,
           IStorageBackend& storage,
           IEventTracer& tracer)
        : AbstractServer(node, tracer)
        , raft_core_(node, storage, tracer, *this)
    { }

    int init(const UniqueID& own_unique_id,
             const uint8_t cluster_size = ClusterManager::ClusterSizeUnknown,
             const TransferPriority priority = TransferPriority::OneHigherThanLowest)
    {
        /*
         * Initializing the Raft core first, because the next step requires the Log to be loaded
         */
        int res = raft_core_.init(cluster_size, priority);
        if (res < 0)
        {
            return res;
        }

        /*
         * Common logic
         */
        res = AbstractServer::init(own_unique_id, priority);
        if (res < 0)
        {
            return res;
        }

        /*
         * Making sure that the server is started with the same node ID
         */
        const LazyConstructor<RaftCore::LogEntryInfo> own_log_entry =
            raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID()));

        if (own_log_entry.isConstructed())
        {
            if (own_log_entry->entry.unique_id != getOwnUniqueID())
            {
                return -ErrInvalidConfiguration;
            }
        }

        return 0;
    }

    Log::Index getNumAllocations() const { return raft_core_.getNumAllocations(); }

    /**
     * This accessor is useful for debugging and testing.
     */
    const RaftCore& getRaftCore() const { return raft_core_; }
};
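
/*
 * Usage sketch (illustrative only; `node`, `storage`, `tracer` and `own_unique_id` stand for
 * application-provided objects, and the error handling shown is just one possible approach):
 *
 *     uavcan::dynamic_node_id_server::distributed::Server server(node, storage, tracer);
 *
 *     const int res = server.init(own_unique_id);        // Cluster size is discovered automatically
 *     if (res < 0)
 *     {
 *         // Initialization failed; the negative value is a libuavcan error code
 *     }
 */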

/**
 * This structure represents a snapshot of the server's state; it is intended for debugging,
 * monitoring and visualization purposes.
 */
struct UAVCAN_EXPORT StateReport
{
    uint8_t cluster_size;

    RaftCore::ServerState state;

    Log::Index last_log_index;
    Log::Index commit_index;

    Term last_log_term;
    Term current_term;

    NodeID voted_for;

    MonotonicTime last_activity_timestamp;
    MonotonicDuration randomized_timeout;

    uint8_t num_unknown_nodes;

    struct FollowerState
    {
        NodeID node_id;
        Log::Index next_index;
        Log::Index match_index;

        FollowerState()
            : next_index(0)
            , match_index(0)
        { }
    } followers[ClusterManager::MaxClusterSize - 1];

    StateReport(const Server& s)
        : cluster_size           (s.getRaftCore().getClusterManager().getClusterSize())
        , state                  (s.getRaftCore().getServerState())
        , last_log_index         (s.getRaftCore().getPersistentState().getLog().getLastIndex())
        , commit_index           (s.getRaftCore().getCommitIndex())
        , last_log_term          (0)    // See below
        , current_term           (s.getRaftCore().getPersistentState().getCurrentTerm())
        , voted_for              (s.getRaftCore().getPersistentState().getVotedFor())
        , last_activity_timestamp(s.getRaftCore().getLastActivityTimestamp())
        , randomized_timeout     (s.getRaftCore().getRandomizedTimeout())
        , num_unknown_nodes      (s.getNodeDiscoverer().getNumUnknownNodes())
    {
        const Entry* const e = s.getRaftCore().getPersistentState().getLog().getEntryAtIndex(last_log_index);
        UAVCAN_ASSERT(e != UAVCAN_NULLPTR);
        if (e != UAVCAN_NULLPTR)
        {
            last_log_term = e->term;
        }

        for (uint8_t i = 0; i < (cluster_size - 1U); i++)
        {
            const ClusterManager& mgr = s.getRaftCore().getClusterManager();
            const NodeID node_id = mgr.getRemoteServerNodeIDAtIndex(i);
            if (node_id.isUnicast())
            {
                followers[i].node_id     = node_id;
                followers[i].next_index  = mgr.getServerNextIndex(node_id);
                followers[i].match_index = mgr.getServerMatchIndex(node_id);
            }
        }
    }
};
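
/*
 * Usage sketch (illustrative only; `server` refers to the Server instance from the previous example,
 * and the printed fields are just a subset of what StateReport captures):
 *
 *     const StateReport report(server);                  // Snapshot is taken at construction time
 *     std::printf("Commit index: %u of %u\n",
 *                 unsigned(report.commit_index), unsigned(report.last_log_index));
 */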

}
}
}

#endif // Include guard