acroname.py
Go to the documentation of this file.
1 # License: Apache 2.0. See LICENSE file in root directory.
2 # Copyright(c) 2021 Intel Corporation. All Rights Reserved.
3 
4 """
5 Brainstem Acroname Hub
6 
7 See documentation for brainstem here:
8 https://acroname.com/reference/api/python/index.html
9 """
10 
11 from rspy import log, device_hub
12 import time
13 import platform, re
14 
if __name__ == '__main__':
    import os, sys, getopt

    def usage():
        """
        Print the command-line syntax and exit the process with status 2
        """
        print( 'Syntax: acroname [options]' )
        print( ' Control the acroname USB hub' )
        print( 'Options:' )
        print( ' --enable Enable all ports' )
        print( ' --disable Disable all ports' )
        print( ' --recycle Recycle all ports' )
        print( ' --reset Reset the acroname' )
        sys.exit(2)

    try:
        opts,args = getopt.getopt( sys.argv[1:], '',
            longopts = [ 'help', 'recycle', 'enable', 'disable', 'reset' ])
    except getopt.GetoptError as err:
        print( '-F-', err ) # something like "option -a not recognized"
        usage()
    # '--help' is a legal option for getopt, so it has to be routed to usage() explicitly;
    # positional arguments or no options at all are also treated as a request for help
    if args or not opts or any( opt == '--help' for opt, _ in opts ):
        usage()
    # See the end of the file for all the option handling
36 
37 
# brainstem is the Acroname Python package (see the link in the module docstring). When it is
# not installed we leave a debug note and re-raise, so importing this module fails with the
# original ModuleNotFoundError.
try:
    import brainstem
except ModuleNotFoundError:
    log.d( 'no acroname library is available' )
    raise
43 
class NoneFoundError( RuntimeError ):
    """
    Raised when no Acroname module can be found

    :param message: optional text; a default message is used when omitted or empty
    """
    def __init__( self, message = None ):
        # super() is already bound: passing self again would make args == (self, message)
        # and break str() of the exception
        super().__init__( message or 'no Acroname module found' )
47 
49 
50 
51 
52  def __init__(self):
53  super().__init__()
54  if discover() is None: # raise an error if there is no hub connected
55  raise NoneFoundError()
56  self.hub = None
57  self.all_hubs = None
58 
59  def connect(self, reset = False, req_spec = None ):
60  """
61  Connect to the hub. Raises RuntimeError on failure
62  :param reset: When true, the acroname will be reset as part of the connection process
63  :param req_spec: Required spec to connect to.
64  """
65  if not self.hub:
66  self.hub = brainstem.stem.USBHub3p()
67 
68  if req_spec:
69  specs = [req_spec]
70  else:
71  specs = discover()
72 
73  spec = specs[0]
74  result = self.hub.connectFromSpec( spec )
75  if result != brainstem.result.Result.NO_ERROR:
76  raise RuntimeError( "failed to connect to Acroname (result={})".format( result ))
77  elif len(specs) > 1:
78  log.d( 'connected to', spec )
79 
80  if reset:
81  log.d("resetting Acroname...")
82  result = self.hub.system.reset()
83  # According to brainstem support:
84  # Result error is expected, so we do not check it
85  # * there is also a brainstem internal console print on Linux "error release -4"
86  # Disconnection is needed after a reset command
87  self.hub.disconnect()
88 
89  result = None
90  for i in range(10):
91  result = self.hub.connectFromSpec(spec)
92  if result != brainstem.result.Result.NO_ERROR:
93  time.sleep(1)
94  else:
95  log.d('reconnected')
96  return
97  raise RuntimeError("failed to reconnect to Acroname (result={})".format(result))
98 
99 
100  def is_connected(self):
101  return self.hub is not None and self.hub.isConnected()
102 
103 
104  def disconnect(self):
105  if self.hub:
106  self.hub.disconnect()
107  del self.hub
108  self.hub = None
109 
110 
111  def all_ports(self):
112  """
113  :return: a list of all possible ports, even if currently unoccupied or disabled
114  """
115  return range(8)
116 
117 
118  def ports(self):
119  """
120  :return: a list of all ports currently occupied (and enabled)
121  """
122  occupied_ports = []
123  for port in self.all_ports():
124  if self._port_power(port) > 0.0:
125  occupied_ports.append( port )
126  return occupied_ports
127 
128 
129  def is_port_enabled(self, port ):
130  """
131  query if the input port number is enabled through Acroname
132  It doesn't mean the device is enumerated as sometimes if the FW crashed we can have a situation where the port is "Disconnected" and not "OK"
133  :param port: port number;
134  :return: True if Acroname enabled this port, False otherwise
135  """
136 
137  return self.port_state( port ) != "Disabled"
138 
139 
140  def port_state(self, port ):
141  if port < 0 or port > 7:
142  raise ValueError( "port number must be [0-7]" )
143  #
144  status = self.hub.usb.getPortState( port )
145  #log.d("getPortState for port", port ,"return", port_state_bitmask_to_string( status.value ))
146  return self.port_state_bitmask_to_string( status.value )
147 
148 
149  def port_state_bitmask_to_string(self, bitmask ):
150  """
151  https://acroname.com/reference/devices/usbhub3p/entities/usb.html#usb-port-state
152  Bit | Port State: Result Bitwise Description
153  -----------------------------------------------
154  0 | USB Vbus Enabled - Port is Disabled/Enabled by the Acroname
155  1 | USB2 Data Enabled
156  2 | Reserved
157  3 | USB3 Data Enabled
158  4:10 | Reserved
159  11 | USB2 Device Attached
160  12 | USB3 Device Attached
161  13:18 | Reserved
162  19 | USB Error Flag
163  20 | USB2 Boost Enabled
164  21:22 | Reserved
165  23 | Device Attached
166  24:31 | Reserved
167  """
168 
169  if bitmask == 0: # Port is disabled by Acroname
170  return "Disabled"
171  if bitmask == 11: # Port is enabled but no device was detected
172  return "Disconnected"
173  if bitmask > 100: # Normally we hope it will cover "Device Attached" use cases (Note, can also be turn on when 'USB Error Flag' is on...but we havn't seen that )
174  return "OK"
175  return "Unknown Error ({})".format( bitmask )
176 
177 
178  def enable_ports(self, ports = None, disable_other_ports = False, sleep_on_change = 0 ):
179  """
180  Set enable state to provided ports
181  :param ports: List of port numbers; if not provided, enable all ports
182  :param disable_other_ports: if True, the ports not in the list will be disabled
183  :param sleep_on_change: Number of seconds to sleep if any change is made
184  :return: True if no errors found, False otherwise
185  """
186  result = True
187  changed = False
188  for port in self.all_ports():
189  #
190  if ports is None or port in ports:
191  if not self.is_port_enabled( port ):
192  #log.d( "enabling port", port)
193  action_result = self.hub.usb.setPortEnable( port )
194  if action_result != brainstem.result.Result.NO_ERROR:
195  result = False
196  log.e("Failed to enable port", port)
197  else:
198  changed = True
199  #
200  elif disable_other_ports:
201  if self.is_port_enabled( port ):
202  #log.d("disabling port", port)
203  action_result = self.hub.usb.setPortDisable( port )
204  if action_result != brainstem.result.Result.NO_ERROR:
205  result = False
206  else:
207  changed = True
208  #
209  if changed and sleep_on_change:
210  import time
211  time.sleep( sleep_on_change )
212  #
213  return result
214 
215 
216  def disable_ports(self, ports = None, sleep_on_change = 0 ):
217  """
218  :param ports: List of port numbers; if not provided, disable all ports
219  :param sleep_on_change: Number of seconds to sleep if any change is made
220  :return: True if no errors found, False otherwise
221  """
222  result = True
223  changed = False
224  for port in self.all_ports():
225  if ports is None or port in ports:
226  if self.is_port_enabled( port ):
227  #log.d("disabling port", port)
228  action_result = self.hub.usb.setPortDisable( port )
229  if action_result != brainstem.result.Result.NO_ERROR:
230  result = False
231  log.e("Failed to disable port", port)
232  else:
233  changed = True
234  if changed and sleep_on_change:
235  import time
236  time.sleep( sleep_on_change )
237  #
238  return result
239 
240 
241  def set_ports_usb2(self, portlist = None, timeout = 100e-3 ):
242  """
243  Set USB ports to USB2
244  """
245  if portlist is None:
246  portlist = self.ports()
247  #
248  self.recycle_ports( portlist, timeout = timeout )
249  #
250  for port in portlist:
251  self.hub.usb.setSuperSpeedDataEnable( port )
252  self.hub.usb.setHiSpeedDataEnable( port )
253  self.hub.usb.setSuperSpeedDataDisable( port )
254 
255 
256  def set_ports_usb3(self, portlist = None, timeout = 100e-3 ):
257  """
258  Set USB ports to support USB3
259  """
260  if portlist is None:
261  portlist = self.ports()
262  #
263  self.recycle_ports( portlist, timeout = timeout )
264  #
265  for port in portlist:
266  self.hub.usb.setSuperSpeedDataEnable( port )
267  self.hub.usb.setHiSpeedDataEnable( port )
268  self.hub.usb.setHiSpeedDataDisable( port )
269 
270 
271  def _port_power(self, port ):
272  """
273  """
274  if port < 0 or port > 7:
275  raise ValueError("port number can be only within 0 and 7 (inclusive)")
276  #
277  micro_volt = self.hub.usb.getPortVoltage( port )
278  micro_curr = self.hub.usb.getPortCurrent( port )
279  volt = float(micro_volt.value) / 10.0 ** 6
280  amps = float(micro_curr.value) / 10.0 ** 6
281  #
282  return volt * amps
283 
284  if 'windows' in platform.system().lower():
285  def get_port_by_location(self, usb_location):
286  """
287  """
288  if usb_location:
289  #
290  # T265 locations look differently...
291  match = re.fullmatch(r'Port_#(\d+)\.Hub_#(\d+)', usb_location, re.IGNORECASE)
292  if match:
293  # We don't know how to get the port from these yet!
294  return None # int(match.group(2))
295  else:
296  split_location = [int(x) for x in usb_location.split('.')]
297  # lambda helper to return the last 2 non-zero numbers, used when connecting using an additional hub
298  # ex: laptop -> hub -> acroname
299  get_last_two_digits = lambda array: tuple(
300  reversed(list(reversed([i for i in array if i != 0]))[:2]))
301  # only the last two digits are necessary
302  first_index, second_index = get_last_two_digits(split_location)
303 
304  return get_port_from_usb(first_index, second_index)
305  else:
306 
307  def get_port_by_location(self, usb_location):
308  """
309  """
310  if not self.all_hubs:
311  self.all_hubs = set(device_hub.find_all_hubs('24ff')) # 24ff is Acroname VendorID
312  if usb_location:
313  #
314  # Devices connected through an acroname will be in one of two sub-hubs under the acroname main
315  # hub. Each is a 4-port hub with a different port (4 for ports 0-3, 3 for ports 4-7):
316  # /: Bus 02.Port 1: Dev 1, Class=root_hub, Driver=xhci_hcd/6p, 10000M
317  # |__ Port 2: Dev 2, If 0, Class=Hub, Driver=hub/4p, 5000M <--- ACRONAME
318  # |__ Port 3: Dev 3, If 0, Class=Hub, Driver=hub/4p, 5000M
319  # |__ Port X: Dev, If...
320  # |__ Port Y: ...
321  # |__ Port 4: Dev 4, If 0, Class=Hub, Driver=hub/4p, 5000M
322  # |__ Port Z: ...
323  # (above is output from 'lsusb -t')
324  # For the above acroname at '2-2' (bus 2, port 2), there are at least 3 devices:
325  # 2-2.3.X
326  # 2-2.3.Y
327  # 2-2.4.Z
328  # Given the two sub-ports (3.X, 3.Y, 4.Z), we can get the port number.
329  # NOTE: some of our devices are hubs themselves! For example, the SR300 will show as '2-2.3.2.1' --
330  # we must start a known hub or else the ports we look at are meaningless...
331  #
332  for port in self.all_hubs:
333  if usb_location.startswith(port + '.'):
334  match = re.search(r'^(\d+)\.(\d+)', usb_location[len(port) + 1:])
335  if match:
336  return get_port_from_usb(int(match.group(1)), int(match.group(2)))
337 
specs = None
def discover(retries = 0):
    """
    Return all Acroname module specs in a list. Raise NoneFoundError if one is not found!

    A successful discovery is cached in the module-level 'specs' and reused by later calls.
    A failed discovery is not cached, so the next call looks again (and raises again if there
    is still nothing) rather than handing back an empty list.

    :param retries: number of additional attempts to make, one second apart
    """
    global specs
    # see https://acroname.com/reference/_modules/brainstem/module.html#Module.discoverAndConnect
    if specs:
        return specs

    log.d('discovering Acroname modules ...')
    found = None
    try:
        log.debug_indent()
        for attempt in range(retries + 1):
            if attempt:
                # Wait between attempts only; there is nothing to wait for after the last one
                time.sleep(1)
            found = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
            if found:
                for spec in found:
                    log.d( '...', spec )
                break
    finally:
        log.debug_unindent()

    if not found:
        raise NoneFoundError()
    specs = found
    return specs
363 
364 
def get_port_from_usb(first_usb_index, second_usb_index ):
    """
    Based on last two USB location index, provide the port number.
    Raises KeyError for a pair that does not map to a port.
    """
    # First index 4 maps to ports 0-3 and first index 3 to ports 4-7; within each, second
    # indexes 4..1 map to consecutive ports in descending order
    acroname_port_usb_map = {
        (first, second): base + (4 - second)
        for first, base in ((4, 0), (3, 4))
        for second in range(1, 5)
    }
    return acroname_port_usb_map[(first_usb_index, second_usb_index)]
379 
380 
381 
if __name__ == '__main__':
    # Command-line handling; 'opts' comes from the getopt parsing at the top of the file
    acroname = Acroname()
    for opt,arg in opts:
        # Options are compared with ==; `opt in ('--enable')` would be a substring test on a
        # plain string, not membership in a tuple
        if opt == '--enable':
            acroname.connect()
            acroname.enable_ports()   # so ports() will return all
        elif opt == '--disable':
            acroname.connect()
            acroname.disable_ports()
        elif opt == '--recycle':
            acroname.connect()
            acroname.enable_ports()   # so ports() will return all
            acroname.recycle_ports()
        elif opt == '--reset':
            acroname.connect( reset = True )
rspy.acroname.NoneFoundError
Definition: acroname.py:44
rspy.device_hub.device_hub
Definition: device_hub.py:15
rspy.acroname.Acroname._port_power
def _port_power(self, port)
Definition: acroname.py:271
rspy.acroname.Acroname.disable_ports
def disable_ports(self, ports=None, sleep_on_change=0)
Definition: acroname.py:216
rspy.acroname.Acroname.connect
def connect(self, reset=False, req_spec=None)
Definition: acroname.py:59
rspy.acroname.discover
def discover(retries=0)
Definition: acroname.py:339
rspy.acroname.Acroname.get_port_by_location
def get_port_by_location(self, usb_location)
Definition: acroname.py:285
rspy.acroname.Acroname.all_ports
def all_ports(self)
Definition: acroname.py:111
rspy.acroname.get_port_from_usb
def get_port_from_usb(first_usb_index, second_usb_index)
Definition: acroname.py:365
rspy.acroname.Acroname.hub
hub
Definition: acroname.py:56
rspy.acroname.NoneFoundError.__init__
def __init__(self, message=None)
Definition: acroname.py:45
rspy.acroname.Acroname.ports
def ports(self)
Definition: acroname.py:118
rspy.acroname.usage
def usage()
Definition: acroname.py:17
rspy.acroname.Acroname
Definition: acroname.py:48
rspy.acroname.Acroname.enable_ports
def enable_ports(self, ports=None, disable_other_ports=False, sleep_on_change=0)
Definition: acroname.py:178
rspy.device_hub.device_hub.recycle_ports
def recycle_ports(self, portlist=None, timeout=2)
Definition: device_hub.py:91
rspy.acroname.Acroname.is_connected
def is_connected(self)
Definition: acroname.py:100
rspy.acroname.Acroname.port_state_bitmask_to_string
def port_state_bitmask_to_string(self, bitmask)
Definition: acroname.py:149
realdds::print
std::string print(dds_guid const &guid, dds_guid_prefix const &base_prefix, bool readable_name)
Definition: dds-guid.cpp:75
rspy.acroname.Acroname.port_state
def port_state(self, port)
Definition: acroname.py:140
rspy.acroname.Acroname.is_port_enabled
def is_port_enabled(self, port)
Definition: acroname.py:129
rspy.acroname.Acroname.all_hubs
all_hubs
Definition: acroname.py:57
rspy.acroname.Acroname.set_ports_usb2
def set_ports_usb2(self, portlist=None, timeout=100e-3)
Definition: acroname.py:241
rspy.acroname.Acroname.disconnect
def disconnect(self)
Definition: acroname.py:104
rspy.acroname.Acroname.__init__
def __init__(self)
Definition: acroname.py:52
rspy.acroname.Acroname.set_ports_usb3
def set_ports_usb3(self, portlist=None, timeout=100e-3)
Definition: acroname.py:256


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Mon Apr 22 2024 02:12:54