dynamic_node_id.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2014-2015 UAVCAN Development Team <uavcan.org>
3 #
4 # This software is distributed under the terms of the MIT License.
5 #
6 # Author: Ben Dyer <ben_dyer@mac.com>
7 # Pavel Kirienko <pavel.kirienko@zubax.com>
8 #
9 
10 from __future__ import division, absolute_import, print_function, unicode_literals
11 import time
12 import sqlite3
13 from logging import getLogger
14 import pyuavcan_v0
15 from pyuavcan_v0 import UAVCANException
16 
17 
18 logger = getLogger(__name__)
19 
20 
22  return ' '.join(['%02X' % x for x in bytearray(uid)]) if uid else None
23 
24 
25 class CentralizedServer(object):
26  QUERY_TIMEOUT = pyuavcan_v0.protocol.dynamic_node_id.Allocation().FOLLOWUP_TIMEOUT_MS / 1000 # @UndefinedVariable
27  DEFAULT_NODE_ID_RANGE = 1, 125
28  DATABASE_STORAGE_MEMORY = ':memory:'
29 
30  class AllocationTable(object):
31  def __init__(self, path):
32  # Disabling same thread check on the assumption that the developer knows what they are doing.
33  self.db = sqlite3.connect(path, check_same_thread=False) # @UndefinedVariable
34 
35  self._modify('''CREATE TABLE IF NOT EXISTS `allocation` (
36  `node_id` INTEGER NOT NULL UNIQUE,
37  `unique_id` blob,
38  `ts` time NOT NULL DEFAULT CURRENT_TIMESTAMP,
39  PRIMARY KEY(node_id));''')
40 
41  def _modify(self, what, *args):
42  c = self.db.cursor()
43  c.execute(what, args) # Tuple!
44  self.db.commit()
45 
46  def close(self):
47  self.db.close()
48 
49  def set(self, unique_id, node_id):
50  if unique_id is not None and unique_id == bytes([0] * len(unique_id)):
51  unique_id = None
52  if unique_id is not None:
53  unique_id = sqlite3.Binary(unique_id)
54  logger.debug('[CentralizedServer] AllocationTable update: %d %s', node_id, _unique_id_to_string(unique_id))
55  self._modify('''insert or replace into allocation (node_id, unique_id) values (?, ?);''',
56  node_id, unique_id)
57 
58  def get_node_id(self, unique_id):
59  assert isinstance(unique_id, bytes)
60  c = self.db.cursor()
61  c.execute('''select node_id from allocation where unique_id = ? order by ts desc limit 1''',
62  (unique_id,))
63  res = c.fetchone()
64  return res[0] if res else None
65 
66  def get_unique_id(self, node_id):
67  assert isinstance(node_id, int)
68  c = self.db.cursor()
69  c.execute('''select unique_id from allocation where node_id = ?''', (node_id,))
70  res = c.fetchone()
71  return res[0] if res else None
72 
73  def is_known_node_id(self, node_id):
74  assert isinstance(node_id, int)
75  c = self.db.cursor()
76  c.execute('''select count(*) from allocation where node_id = ?''', (node_id,))
77  return c.fetchone()[0] > 0
78 
79  def get_entries(self):
80  c = self.db.cursor()
81  c.execute('''select unique_id, node_id from allocation order by ts desc''')
82  return list(c.fetchall())
83 
84  def __init__(self, node, node_monitor, database_storage=None, dynamic_node_id_range=None):
85  """
86  :param node: Node instance.
87 
88  :param node_monitor: Instance of NodeMonitor.
89 
90  :param database_storage: Path to the file where the instance will keep the allocation table.
91  If not provided, the allocation table will be kept in memory.
92 
93  :param dynamic_node_id_range: Range of node ID available for dynamic allocation; defaults to [1, 125].
94  """
95  if node.is_anonymous:
96  raise UAVCANException('Dynamic node ID server cannot be launched on an anonymous node')
97 
98  self._node_monitor = node_monitor
99 
101  self._query = bytes()
103  self._node_monitor_event_handle = node_monitor.add_update_handler(self._handle_monitor_event)
104 
105  self._dynamic_node_id_range = dynamic_node_id_range or CentralizedServer.DEFAULT_NODE_ID_RANGE
106  self._handle = node.add_handler(pyuavcan_v0.protocol.dynamic_node_id.Allocation, # @UndefinedVariable
108 
109  self._allocation_table.set(node.node_info.hardware_version.unique_id.to_bytes(), node.node_id)
110 
111  # Initializing the table
112  for entry in node_monitor.find_all(lambda _: True):
113  unique_id = entry.info.hardware_version.unique_id.to_bytes() if entry.info else None
114  self._allocation_table.set(unique_id, entry.node_id)
115 
117  return self._allocation_table.get_entries()
118 
119  def _handle_monitor_event(self, event):
120  unique_id = event.entry.info.hardware_version.unique_id.to_bytes() if event.entry.info else None
121  self._allocation_table.set(unique_id, event.entry.node_id)
122 
123  def close(self):
124  """Stops the instance and closes the allocation table storage.
125  """
126  self._handle.remove()
127  self._node_monitor_event_handle.remove()
128  self._allocation_table.close()
129 
131  # TODO: request validation
132 
133  # Centralized allocator cannot co-exist with other allocators; this is a network configuration error.
134  if e.transfer.source_node_id != 0:
135  logger.warning('[CentralizedServer] Message from another allocator ignored: %r', e)
136  return
137 
138  # We can't grant allocations as long as there are undiscovered nodes - see specification
139  if not self._node_monitor.are_all_nodes_discovered():
140  logger.info('[CentralizedServer] Request ignored: not all nodes are discovered')
141  return
142 
143  # The local state must be reset after the specified timeout
144  if len(self._query) and time.monotonic() - self._query_timestamp > CentralizedServer.QUERY_TIMEOUT:
145  self._query = bytes()
146  logger.info("[CentralizedServer] Query timeout, resetting query")
147 
148  # Handling the message
149  if e.message.first_part_of_unique_id:
150  # First-phase messages trigger a second-phase query
151  self._query = e.message.unique_id.to_bytes()
152  self._query_timestamp = e.transfer.ts_monotonic
153 
154  response = pyuavcan_v0.protocol.dynamic_node_id.Allocation() # @UndefinedVariable
155  response.first_part_of_unique_id = 0
156  response.node_id = 0
157  response.unique_id.from_bytes(self._query)
158  e.node.broadcast(response)
159 
160  logger.debug("[CentralizedServer] Got first-stage dynamic ID request for %s",
162 
163  elif len(e.message.unique_id) == 6 and len(self._query) == 6:
164  # Second-phase messages trigger a third-phase query
165  self._query += e.message.unique_id.to_bytes()
166  self._query_timestamp = e.transfer.ts_monotonic
167 
168  response = pyuavcan_v0.protocol.dynamic_node_id.Allocation() # @UndefinedVariable
169  response.first_part_of_unique_id = 0
170  response.node_id = 0
171  response.unique_id.from_bytes(self._query)
172  e.node.broadcast(response)
173  logger.debug("[CentralizedServer] Got second-stage dynamic ID request for %s",
175 
176  elif len(e.message.unique_id) == 4 and len(self._query) == 12:
177  # Third-phase messages trigger an allocation
178  self._query += e.message.unique_id.to_bytes()
179  self._query_timestamp = e.transfer.ts_monotonic
180 
181  logger.debug("[CentralizedServer] Got third-stage dynamic ID request for %s",
183 
184  node_requested_id = e.message.node_id
185  node_allocated_id = self._allocation_table.get_node_id(self._query)
186 
187  # If an ID was requested but not allocated yet, allocate the first
188  # ID equal to or higher than the one that was requested
189  if node_requested_id and not node_allocated_id:
190  for node_id in range(node_requested_id, self._dynamic_node_id_range[1]):
191  if not self._allocation_table.is_known_node_id(node_id):
192  node_allocated_id = node_id
193  break
194 
195  # If no ID was allocated in the above step (also if the requested
196  # ID was zero), allocate the highest unallocated node ID
197  if not node_allocated_id:
198  for node_id in range(self._dynamic_node_id_range[1], self._dynamic_node_id_range[0], -1):
199  if not self._allocation_table.is_known_node_id(node_id):
200  node_allocated_id = node_id
201  break
202 
203  if node_allocated_id:
204  self._allocation_table.set(self._query, node_allocated_id)
205 
206  response = pyuavcan_v0.protocol.dynamic_node_id.Allocation() # @UndefinedVariable
207  response.first_part_of_unique_id = 0
208  response.node_id = node_allocated_id
209  response.unique_id.from_bytes(self._query)
210  e.node.broadcast(response)
211 
212  logger.info("[CentralizedServer] Allocated node ID %d to node with unique ID %s",
213  node_allocated_id, _unique_id_to_string(self._query))
214 
215  self._query = bytes() # Resetting the state
216  else:
217  logger.error("[CentralizedServer] Couldn't allocate dynamic node ID")
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._query
_query
Definition: dynamic_node_id.py:101
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.__init__
def __init__(self, path)
Definition: dynamic_node_id.py:31
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.get_allocation_table
def get_allocation_table(self)
Definition: dynamic_node_id.py:116
pyuavcan_v0.app.dynamic_node_id._unique_id_to_string
def _unique_id_to_string(uid)
Definition: dynamic_node_id.py:21
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._handle
_handle
Definition: dynamic_node_id.py:106
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.DATABASE_STORAGE_MEMORY
string DATABASE_STORAGE_MEMORY
Definition: dynamic_node_id.py:28
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.is_known_node_id
def is_known_node_id(self, node_id)
Definition: dynamic_node_id.py:73
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.get_entries
def get_entries(self)
Definition: dynamic_node_id.py:79
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._allocation_table
_allocation_table
Definition: dynamic_node_id.py:100
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable
Definition: dynamic_node_id.py:30
pyuavcan_v0.UAVCANException
Definition: pyuavcan/pyuavcan_v0/__init__.py:45
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._node_monitor
_node_monitor
Definition: dynamic_node_id.py:98
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._query_timestamp
_query_timestamp
Definition: dynamic_node_id.py:102
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.get_node_id
def get_node_id(self, unique_id)
Definition: dynamic_node_id.py:58
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.close
def close(self)
Definition: dynamic_node_id.py:46
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.set
def set(self, unique_id, node_id)
Definition: dynamic_node_id.py:49
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._on_allocation_message
def _on_allocation_message(self, e)
Definition: dynamic_node_id.py:130
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable._modify
def _modify(self, what, *args)
Definition: dynamic_node_id.py:41
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._dynamic_node_id_range
_dynamic_node_id_range
Definition: dynamic_node_id.py:105
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.__init__
def __init__(self, node, node_monitor, database_storage=None, dynamic_node_id_range=None)
Definition: dynamic_node_id.py:84
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.get_unique_id
def get_unique_id(self, node_id)
Definition: dynamic_node_id.py:66
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._handle_monitor_event
def _handle_monitor_event(self, event)
Definition: dynamic_node_id.py:119
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.close
def close(self)
Definition: dynamic_node_id.py:123
pyuavcan_v0.app.dynamic_node_id.CentralizedServer.AllocationTable.db
db
Definition: dynamic_node_id.py:33
pyuavcan_v0.app.dynamic_node_id.CentralizedServer._node_monitor_event_handle
_node_monitor_event_handle
Definition: dynamic_node_id.py:103
pyuavcan_v0.app.dynamic_node_id.CentralizedServer
Definition: dynamic_node_id.py:25


uavcan_communicator
Author(s):
autogenerated on Fri Dec 13 2024 03:10:02