cluster_manager.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
3  */
4 
5 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED
6 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED
7 
9 #include <uavcan/debug.hpp>
11 #include <uavcan/node/timer.hpp>
18 // UAVCAN types
19 #include <uavcan/protocol/dynamic_node_id/server/Discovery.hpp>
20 
21 namespace uavcan
22 {
23 namespace dynamic_node_id_server
24 {
25 namespace distributed
26 {
30 class ClusterManager : private TimerBase
31 {
32 public:
33  enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize };
34 
35 private:
37  void (ClusterManager::*)
40 
41  struct Server
42  {
46 
48  : next_index(0)
49  , match_index(0)
50  { }
51 
52  void resetIndices(const Log& log)
53  {
54  next_index = Log::Index(log.getLastIndex() + 1U);
55  match_index = 0;
56  }
57  };
58 
61  const Log& log_;
62 
65 
67 
70 
71  static IStorageBackend::String getStorageKeyForClusterSize() { return "cluster_size"; }
72 
73  INode& getNode() { return discovery_sub_.getNode(); }
74  const INode& getNode() const { return discovery_sub_.getNode(); }
75 
76  const Server* findServer(NodeID node_id) const { return const_cast<ClusterManager*>(this)->findServer(node_id); }
78  {
79  for (uint8_t i = 0; i < num_known_servers_; i++)
80  {
81  UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
82  if (servers_[i].node_id == node_id)
83  {
84  return &servers_[i];
85  }
86  }
87  return UAVCAN_NULLPTR;
88  }
89 
/**
 * Timer callback that publishes one Discovery message.
 *
 * The message carries the configured cluster size and the list of node IDs known to this server:
 * the local node ID first, followed by every entry of servers_ in order.
 * A failed broadcast is traced and reported through INode::registerInternalFailure().
 * Once isClusterDiscovered() holds, the timer is stopped, so no further messages are published from here.
 *
 * NOTE(review): listing lines 92 and 94 are absent from this extract; whatever statements they held
 * (they precede the message construction) are not documented here.
 */
90  virtual void handleTimerEvent(const TimerEvent&)
91  {
93 
95 
96  /*
97  * Filling the message
98  */
99  Discovery msg;
100  msg.configured_cluster_size = cluster_size_;
101 
102  msg.known_nodes.push_back(getNode().getNodeID().get()); // Putting ourselves at index 0
103 
104  for (uint8_t i = 0; i < num_known_servers_; i++)
105  {
106  UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
107  msg.known_nodes.push_back(servers_[i].node_id.get());
108  }
109 
// The local node plus every known remote server must have been listed
110  UAVCAN_ASSERT(msg.known_nodes.size() == (num_known_servers_ + 1));
111 
112  /*
113  * Broadcasting
114  */
115  UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager",
116  "Broadcasting Discovery message; known nodes: %d of %d",
117  int(msg.known_nodes.size()), int(cluster_size_));
118 
119  const int res = discovery_pub_.broadcast(msg);
120  if (res < 0)
121  {
// A negative result is reported but does not abort the handler; the termination check below still runs
122  UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Discovery broadcst failed: %d", res);
123  getNode().registerInternalFailure("Raft discovery broadcast");
124  }
125 
126  /*
127  * Termination condition
128  */
129  if (isClusterDiscovered())
130  {
131  UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager",
132  "Discovery broadcasting timer stopped");
133  stop();
134  }
135  }
136 
138  {
140 
141  /*
142  * Validating cluster configuration
143  * If there's a case of misconfiguration, the message will be ignored.
144  */
145  if (msg.configured_cluster_size != cluster_size_)
146  {
147  tracer_.onEvent(TraceRaftBadClusterSizeReceived, msg.configured_cluster_size);
148  getNode().registerInternalFailure("Bad Raft cluster size");
149  return;
150  }
151 
152  /*
153  * Updating the set of known servers
154  */
155  for (uint8_t i = 0; i < msg.known_nodes.size(); i++)
156  {
157  if (isClusterDiscovered())
158  {
159  break;
160  }
161 
162  const NodeID node_id(msg.known_nodes[i]);
163  if (node_id.isUnicast() && !isKnownServer(node_id))
164  {
165  addServer(node_id);
166  }
167  }
168 
169  /*
170  * Publishing a new Discovery request if the publishing server needs to learn about more servers.
171  */
172  if (msg.configured_cluster_size > msg.known_nodes.size())
173  {
175  }
176  }
177 
179  {
180  if (!isRunning())
181  {
182  startPeriodic(MonotonicDuration::fromMSec(Discovery::BROADCASTING_PERIOD_MS));
183  }
184  }
185 
186 public:
/**
 * Special cluster size value understood by init():
 * when passed, the cluster size is read from the storage rather than taken from the argument.
 */
enum
{
    ClusterSizeUnknown = 0
};
188 
/**
 * Binds the manager to its collaborators; no I/O is performed by the visible initializers.
 * The cluster size and the number of known servers both start at zero.
 *
 * @param node      Node object handed to the TimerBase base class.
 * @param storage   Storage backend, kept in storage_.
 * @param log       Raft log, kept as a const reference in log_.
 * @param tracer    Event tracer, kept in tracer_.
 *
 * NOTE(review): listing lines 199-200 (two more member initializers) are absent from this extract.
 */
194  ClusterManager(INode& node, IStorageBackend& storage, const Log& log, IEventTracer& tracer)
195  : TimerBase(node)
196  , storage_(storage)
197  , tracer_(tracer)
198  , log_(log)
201  , cluster_size_(0)
202  , num_known_servers_(0)
203  { }
204 
/**
 * Determines the cluster size and initializes the Discovery publisher.
 *
 * @param init_cluster_size  Either ClusterSizeUnknown, in which case the size is loaded from the storage
 *                           under the key returned by getStorageKeyForClusterSize(), or the desired size,
 *                           which is then written to the storage and read back for verification.
 * @param priority           Transfer priority passed to discovery_pub_.init().
 *
 * @return 0 on success, otherwise a negative value:
 *         - the error returned by the storage read, if the size is not configured and cannot be loaded;
 *         - -ErrInvalidConfiguration if the loaded size is zero or exceeds MaxClusterSize;
 *         - -ErrInvalidParam if the supplied size exceeds MaxClusterSize;
 *         - -ErrFailure if the size could not be stored or the read-back value differs;
 *         - the error returned by discovery_pub_.init() or by the statement preceding the second check.
 *
 * NOTE(review): listing lines 218, 243, 253, 255-256, 267, 273 and 278 are absent from this extract.
 * That includes the declaration of `io` and the statement whose result the second `res < 0` check tests.
 */
210  int init(const uint8_t init_cluster_size, const TransferPriority priority)
211  {
212  /*
213  * Figuring out the cluster size
214  */
215  if (init_cluster_size == ClusterSizeUnknown)
216  {
217  // Reading from the storage
219  uint32_t value = 0;
220  int res = io.get(getStorageKeyForClusterSize(), value);
221  if (res < 0)
222  {
223  UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager",
224  "Cluster size is neither configured nor stored in the storage");
225  return res;
226  }
227  if ((value == 0) || (value > MaxClusterSize))
228  {
229  UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Cluster size is invalid");
230  return -ErrInvalidConfiguration;
231  }
232  cluster_size_ = static_cast<uint8_t>(value);
233  }
234  else
235  {
// ClusterSizeUnknown is 0, so a zero size never reaches this branch; only the upper bound can fail here
236  if ((init_cluster_size == 0) || (init_cluster_size > MaxClusterSize))
237  {
238  return -ErrInvalidParam;
239  }
240  cluster_size_ = init_cluster_size;
241 
242  // Writing the storage
244  uint32_t value = init_cluster_size;
245  int res = io.setAndGetBack(getStorageKeyForClusterSize(), value);
// The value is read back into `value`; a mismatch is treated the same way as a write error
246  if ((res < 0) || (value != init_cluster_size))
247  {
248  UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Failed to store cluster size");
249  return -ErrFailure;
250  }
251  }
252 
254 
257 
258  /*
259  * Initializing pub/sub and timer
260  */
261  int res = discovery_pub_.init(priority);
262  if (res < 0)
263  {
264  return res;
265  }
266 
268  if (res < 0)
269  {
270  return res;
271  }
272 
274 
275  /*
276  * Misc
277  */
279  return 0;
280  }
281 
/**
 * Registers a new remote server.
 * The node ID must be unicast and must not be known yet (see isKnownServer(), which also treats the
 * local node as known); otherwise the call trips an assertion and changes nothing.
 * On success the count of known servers grows by one.
 *
 * NOTE(review): listing lines 287 and 289-292 are absent from this extract; presumably that is where the
 * new entry is written into servers_ - confirm against the complete file.
 */
285  void addServer(NodeID node_id)
286  {
288  if (!isKnownServer(node_id) && node_id.isUnicast())
289  {
293  num_known_servers_ = static_cast<uint8_t>(num_known_servers_ + 1U);
294  }
295  else
296  {
297  UAVCAN_ASSERT(0);
298  }
299  }
300 
304  bool isKnownServer(NodeID node_id) const
305  {
306  if (node_id == getNode().getNodeID())
307  {
308  return true;
309  }
310  for (uint8_t i = 0; i < num_known_servers_; i++)
311  {
312  UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
313  UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID());
314  if (servers_[i].node_id == node_id)
315  {
316  return true;
317  }
318  }
319  return false;
320  }
321 
327  {
328  if (index < num_known_servers_)
329  {
330  return servers_[index].node_id;
331  }
332  return NodeID();
333  }
334 
338  Log::Index getServerNextIndex(NodeID server_node_id) const
339  {
340  const Server* const s = findServer(server_node_id);
341  if (s != UAVCAN_NULLPTR)
342  {
343  return s->next_index;
344  }
345  UAVCAN_ASSERT(0);
346  return 0;
347  }
348 
349  void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
350  {
351  Server* const s = findServer(server_node_id);
352  if (s != UAVCAN_NULLPTR)
353  {
354  s->next_index = Log::Index(s->next_index + increment);
355  }
356  else
357  {
358  UAVCAN_ASSERT(0);
359  }
360  }
361 
362  void decrementServerNextIndex(NodeID server_node_id)
363  {
364  Server* const s = findServer(server_node_id);
365  if (s != UAVCAN_NULLPTR)
366  {
367  s->next_index--;
368  }
369  else
370  {
371  UAVCAN_ASSERT(0);
372  }
373  }
374 
378  Log::Index getServerMatchIndex(NodeID server_node_id) const
379  {
380  const Server* const s = findServer(server_node_id);
381  if (s != UAVCAN_NULLPTR)
382  {
383  return s->match_index;
384  }
385  UAVCAN_ASSERT(0);
386  return 0;
387  }
388 
389  void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
390  {
391  Server* const s = findServer(server_node_id);
392  if (s != UAVCAN_NULLPTR)
393  {
394  s->match_index = match_index;
395  }
396  else
397  {
398  UAVCAN_ASSERT(0);
399  }
400  }
401 
406  {
407  for (uint8_t i = 0; i < num_known_servers_; i++)
408  {
409  UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
411  }
412  }
413 
419 
424  uint8_t getQuorumSize() const { return static_cast<uint8_t>(cluster_size_ / 2U + 1U); }
425 
426  bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); }
427 };
428 
429 }
430 }
431 }
432 
433 #endif // Include guard
uavcan::dynamic_node_id_server::distributed::ClusterManager::getClusterSize
uint8_t getClusterSize() const
Definition: cluster_manager.hpp:423
uavcan::dynamic_node_id_server::distributed::ClusterManager::isClusterDiscovered
bool isClusterDiscovered() const
Definition: cluster_manager.hpp:426
uavcan::TimerBase
Definition: timer.hpp:46
subscriber.hpp
UAVCAN_NULLPTR
#define UAVCAN_NULLPTR
Definition: libuavcan/libuavcan/include/uavcan/build_config.hpp:51
uavcan::dynamic_node_id_server::distributed::ClusterManager::Server::resetIndices
void resetIndices(const Log &log)
Definition: cluster_manager.hpp:52
uavcan::dynamic_node_id_server::distributed::ClusterManager::servers_
Server servers_[MaxClusterSize - 1]
Minus one because the local server is not listed there.
Definition: cluster_manager.hpp:66
uavcan::dynamic_node_id_server::distributed::ClusterManager::handleTimerEvent
virtual void handleTimerEvent(const TimerEvent &)
Definition: cluster_manager.hpp:90
uavcan::dynamic_node_id_server::distributed::ClusterManager::ClusterManager
ClusterManager(INode &node, IStorageBackend &storage, const Log &log, IEventTracer &tracer)
Definition: cluster_manager.hpp:194
debug.hpp
uavcan::uint32_t
std::uint32_t uint32_t
Definition: std.hpp:26
uavcan::Subscriber
Definition: subscriber.hpp:45
uavcan::Publisher< Discovery >
uavcan::dynamic_node_id_server::distributed::ClusterManager::decrementServerNextIndex
void decrementServerNextIndex(NodeID server_node_id)
Definition: cluster_manager.hpp:362
get
ROSCPP_DECL bool get(const std::string &key, bool &b)
uavcan::dynamic_node_id_server::distributed::ClusterManager::DiscoveryCallback
MethodBinder< ClusterManager *, void(ClusterManager::*)(const ReceivedDataStructure< Discovery > &)> DiscoveryCallback
Definition: cluster_manager.hpp:39
uavcan::dynamic_node_id_server::distributed::Log::Index
uint8_t Index
Definition: log.hpp:28
uavcan::dynamic_node_id_server::distributed::ClusterManager::getRemoteServerNodeIDAtIndex
NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const
Definition: cluster_manager.hpp:326
uavcan::dynamic_node_id_server::distributed::ClusterManager::Server::match_index
Log::Index match_index
Definition: cluster_manager.hpp:45
uavcan::NodeID::get
uint8_t get() const
Definition: transfer.hpp:132
uavcan::dynamic_node_id_server::distributed::ClusterManager::Server::node_id
NodeID node_id
Definition: cluster_manager.hpp:43
TraceRaftNewServerDiscovered
TraceRaftNewServerDiscovered
Definition: event.hpp:33
uavcan::NodeID
Definition: transfer.hpp:112
uavcan::dynamic_node_id_server::StorageMarshaller
Definition: storage_marshaller.hpp:25
uavcan::dynamic_node_id_server::distributed::ClusterManager::discovery_pub_
Publisher< Discovery > discovery_pub_
Definition: cluster_manager.hpp:64
uavcan::DurationBase< MonotonicDuration >::fromMSec
static MonotonicDuration fromMSec(int64_t ms)
Definition: time.hpp:41
event.hpp
uavcan::dynamic_node_id_server::StorageMarshaller::setAndGetBack
int setAndGetBack(const IStorageBackend::String &key, uint32_t &inout_value)
Definition: storage_marshaller.hpp:62
uavcan::dynamic_node_id_server::distributed::ClusterManager::cluster_size_
uint8_t cluster_size_
Definition: cluster_manager.hpp:68
uavcan::ReceivedDataStructure
Definition: generic_subscriber.hpp:39
uavcan::dynamic_node_id_server::distributed::ClusterManager::getServerNextIndex
Log::Index getServerNextIndex(NodeID server_node_id) const
Definition: cluster_manager.hpp:338
uavcan::dynamic_node_id_server::StorageMarshaller::get
int get(const IStorageBackend::String &key, uint32_t &out_value) const
Definition: storage_marshaller.hpp:90
UAVCAN_TRACE
#define UAVCAN_TRACE(...)
Definition: libuavcan/libuavcan/include/uavcan/debug.hpp:31
uavcan::dynamic_node_id_server::distributed::ClusterManager::isKnownServer
bool isKnownServer(NodeID node_id) const
Definition: cluster_manager.hpp:304
types.hpp
uavcan::dynamic_node_id_server::distributed::ClusterManager::getNode
INode & getNode()
Definition: cluster_manager.hpp:73
uavcan::dynamic_node_id_server::distributed::ClusterManager::findServer
Server * findServer(NodeID node_id)
Definition: cluster_manager.hpp:77
uavcan::dynamic_node_id_server::distributed::ClusterManager::log_
const Log & log_
Definition: cluster_manager.hpp:61
publisher.hpp
uavcan::dynamic_node_id_server::distributed::ClusterManager::incrementServerNextIndexBy
void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
Definition: cluster_manager.hpp:349
timer.hpp
uavcan::dynamic_node_id_server::distributed::ClusterManager::Server
Definition: cluster_manager.hpp:41
uavcan::dynamic_node_id_server::distributed::ClusterManager::getServerMatchIndex
Log::Index getServerMatchIndex(NodeID server_node_id) const
Definition: cluster_manager.hpp:378
uavcan::dynamic_node_id_server::distributed::ClusterManager::Server::Server
Server()
Definition: cluster_manager.hpp:47
uavcan::dynamic_node_id_server::distributed::ClusterManager::storage_
IStorageBackend & storage_
Definition: cluster_manager.hpp:59
uavcan::TimerBase::startPeriodic
void startPeriodic(MonotonicDuration period)
Definition: uc_timer.cpp:42
uavcan::Array
Definition: array.hpp:424
uavcan::INode::registerInternalFailure
virtual void registerInternalFailure(const char *msg)=0
uavcan::dynamic_node_id_server::distributed::ClusterManager::getStorageKeyForClusterSize
static IStorageBackend::String getStorageKeyForClusterSize()
Definition: cluster_manager.hpp:71
uavcan::uint8_t
std::uint8_t uint8_t
Definition: std.hpp:24
uavcan::dynamic_node_id_server::IStorageBackend
Definition: storage_backend.hpp:22
uavcan::dynamic_node_id_server::distributed::ClusterManager::ClusterSizeUnknown
@ ClusterSizeUnknown
Definition: cluster_manager.hpp:187
uavcan::dynamic_node_id_server::distributed::ClusterManager::tracer_
IEventTracer & tracer_
Definition: cluster_manager.hpp:60
uavcan::dynamic_node_id_server::distributed::ClusterManager::resetAllServerIndices
void resetAllServerIndices()
Definition: cluster_manager.hpp:405
uavcan::NodeID::isUnicast
bool isUnicast() const
Definition: transfer.hpp:136
TraceRaftDiscoveryBroadcast
TraceRaftDiscoveryBroadcast
Definition: event.hpp:32
method_binder.hpp
uavcan::INode
Definition: abstract_node.hpp:19
uavcan::dynamic_node_id_server::distributed::ClusterManager::MaxClusterSize
@ MaxClusterSize
Definition: cluster_manager.hpp:33
uavcan::TransferPriority
Definition: transfer.hpp:28
TraceRaftClusterSizeInited
TraceRaftClusterSizeInited
Definition: event.hpp:36
build_config.hpp
uavcan::dynamic_node_id_server::distributed::ClusterManager::getNode
const INode & getNode() const
Definition: cluster_manager.hpp:74
uavcan::DeadlineHandler::isRunning
bool isRunning() const
Definition: uc_scheduler.cpp:32
uavcan::dynamic_node_id_server::distributed::ClusterManager::Server::next_index
Log::Index next_index
Definition: cluster_manager.hpp:44
uavcan::DeadlineHandler::stop
void stop()
Definition: uc_scheduler.cpp:27
uavcan::dynamic_node_id_server::IEventTracer::onEvent
virtual void onEvent(TraceCode event_code, int64_t event_argument)=0
uavcan::GenericPublisher::init
int init()
Definition: generic_publisher.hpp:118
uavcan::dynamic_node_id_server::IEventTracer
Definition: event.hpp:90
uavcan::MethodBinder
Definition: method_binder.hpp:20
TraceRaftBadClusterSizeReceived
TraceRaftBadClusterSizeReceived
Definition: event.hpp:37
uavcan::dynamic_node_id_server::distributed::ClusterManager::discovery_sub_
Subscriber< Discovery, DiscoveryCallback > discovery_sub_
Definition: cluster_manager.hpp:63
pyuavcan_v0.dsdl.signature.s
s
Definition: signature.py:73
log.hpp
uavcan::dynamic_node_id_server::distributed::ClusterManager::init
int init(const uint8_t init_cluster_size, const TransferPriority priority)
Definition: cluster_manager.hpp:210
uavcan::dynamic_node_id_server::distributed::ClusterManager::setServerMatchIndex
void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
Definition: cluster_manager.hpp:389
uavcan::dynamic_node_id_server::distributed::ClusterManager::findServer
const Server * findServer(NodeID node_id) const
Definition: cluster_manager.hpp:76
uavcan::Publisher::broadcast
int broadcast(const DataType &message)
Definition: publisher.hpp:52
uavcan::dynamic_node_id_server::distributed::ClusterManager::getQuorumSize
uint8_t getQuorumSize() const
Definition: cluster_manager.hpp:424
uavcan::dynamic_node_id_server::distributed::Log
Definition: log.hpp:25
TraceRaftDiscoveryReceived
TraceRaftDiscoveryReceived
Definition: event.hpp:35
uavcan::dynamic_node_id_server::distributed::ClusterManager::getNumKnownServers
uint8_t getNumKnownServers() const
Definition: cluster_manager.hpp:418
pyuavcan_v0.introspect.node
node
Definition: introspect.py:398
uavcan::dynamic_node_id_server::distributed::ClusterManager::num_known_servers_
uint8_t num_known_servers_
Definition: cluster_manager.hpp:69
uavcan
Definition: libuavcan/libuavcan/include/uavcan/build_config.hpp:204
uavcan::ReceivedDataStructure::getSrcNodeID
NodeID getSrcNodeID() const
Definition: generic_subscriber.hpp:75
uavcan::dynamic_node_id_server::distributed::ClusterManager::startDiscoveryPublishingTimerIfNotRunning
void startDiscoveryPublishingTimerIfNotRunning()
Definition: cluster_manager.hpp:178
uavcan::dynamic_node_id_server::distributed::ClusterManager::handleDiscovery
void handleDiscovery(const ReceivedDataStructure< Discovery > &msg)
Definition: cluster_manager.hpp:137
uavcan::dynamic_node_id_server::distributed::ClusterManager::addServer
void addServer(NodeID node_id)
Definition: cluster_manager.hpp:285
storage_marshaller.hpp
UAVCAN_ASSERT
#define UAVCAN_ASSERT(x)
Definition: libuavcan/libuavcan/include/uavcan/build_config.hpp:184
uavcan::TimerEvent
Definition: timer.hpp:32
uavcan::dynamic_node_id_server::distributed::ClusterManager
Definition: cluster_manager.hpp:30
uavcan::dynamic_node_id_server::distributed::Log::getLastIndex
Index getLastIndex() const
Definition: log.hpp:291


uavcan_communicator
Author(s):
autogenerated on Fri Dec 13 2024 03:10:02