acroname.py
Go to the documentation of this file.
1 # License: Apache 2.0. See LICENSE file in root directory.
2 # Copyright(c) 2021 Intel Corporation. All Rights Reserved.
3 
4 """
5 Brainstem Acroname Hub
6 
7 See documentation for brainstem here:
8 https://acroname.com/reference/python/index.html
9 """
10 
11 from rspy import log
12 
13 
if __name__ == '__main__':
    # Command-line handling is split in two: the arguments are validated here, and the
    # options are acted upon at the end of the file.
    import os, sys, getopt

    def usage():
        """
        Print the command-line syntax and exit with status 2
        """
        print( 'Syntax: acroname [options]' )
        print( ' Control the acroname USB hub' )
        print( 'Options:' )
        print( ' --enable Enable all ports' )
        print( ' --recycle Recycle all ports' )
        print( ' --help Show this help' )
        sys.exit(2)

    try:
        opts,args = getopt.getopt( sys.argv[1:], '',
            longopts = [ 'help', 'recycle', 'enable' ])
    except getopt.GetoptError as err:
        print( '-F-', err )   # something like "option -a not recognized"
        usage()
    # '--help' is accepted by getopt but nothing acts on it later, so route it to usage()
    # here rather than letting it silently do nothing
    if args or not opts or any( opt == '--help' for opt, _ in opts ):
        usage()
    # See the end of the file for all the option handling
33 
34 
35 try:
36  import brainstem
37 except ModuleNotFoundError:
38  log.w( 'No acroname library is available!' )
39  raise
40 
41 hub = None
42 
43 
class NoneFoundError( RuntimeError ):
    """
    Error used to report that no Acroname module was found

    :param message: optional text to use instead of the default message
    """
    def __init__( self, message = None ):
        # super() already binds 'self'; passing it again would put the exception object itself
        # into self.args and garble str(exception)
        super().__init__( message or 'no Acroname module found' )
49 
50 
def discover():
    """
    Look for Acroname modules on USB.

    :return: the module specs reported by brainstem (never empty)
    :raises NoneFoundError: if brainstem reports no modules at all
    """
    log.d( 'discovering Acroname modules ...' )
    # see https://acroname.com/reference/_modules/brainstem/module.html#Module.discoverAndConnect
    try:
        log.debug_indent()
        found = brainstem.discover.findAllModules( brainstem.link.Spec.USB )
        if found:
            # list each spec in the debug log, indented under the header line above
            for module_spec in found:
                log.d( '...', module_spec )
        else:
            raise NoneFoundError()
    finally:
        # always undo the indent, even when nothing was found
        log.debug_unindent()

    return found
69 
70 
def connect( spec = None ):
    """
    Connect the module-level hub object to an Acroname module.

    :param spec: module spec to connect to; if not provided, the first one found by discover() is used
    :raises RuntimeError: if the connection attempt does not return NO_ERROR
    """
    global hub
    if not hub:
        # create the hub object on first use
        hub = brainstem.stem.USBHub3p()

    if not spec:
        candidates = discover()
        spec = candidates[0]
    else:
        candidates = [spec]

    status = hub.connectFromSpec( spec )
    if status != brainstem.result.Result.NO_ERROR:
        raise RuntimeError( "failed to connect to acroname (result={})".format( status ))
    # only worth mentioning which one we picked when there was more than one to pick from
    if len(candidates) > 1:
        log.d( 'connected to', spec )
91 
92 
def find_all_hubs():
    """
    Yields all hub port numbers

    Only top-level hubs are yielded: a device whose port string extends (with a '.') the port
    of a hub that was already yielded is skipped.
    """
    from rspy import lsusb
    #
    # 24ff:8013 =
    #   iManufacturer Acroname Inc.
    #   iProduct USBHub3p-3[A]
    hubs = set( lsusb.devices_by_vendor( '24ff' ))
    ports = set()
    # go thru the tree and find only the top-level ones (which we should encounter first)
    for dev,dev_port in lsusb.tree():
        if dev not in hubs:
            continue
        # ignore everything inside existing hubs - we only want the top-level
        if any( dev_port.startswith( port + '.' ) for port in ports ):
            continue
        ports.add( dev_port )
        yield dev_port
115 
116 
def is_connected():
    """
    Report whether a connection to the hub can be made.

    NOTE: this does not just query state -- it calls connect() and reports whether that raised.

    :return: True if connect() completed without an exception, False otherwise
    """
    try:
        connect()
        return True
    except Exception:
        return False
123 
124 
def disconnect():
    """
    Disconnect from the hub and reset the module-level hub object to None
    """
    global hub
    hub.disconnect()
    # rebinding drops the old hub object; a separate 'del' beforehand adds nothing
    hub = None
130 
131 
def all_ports():
    """
    :return: every possible port number (0 through 7), regardless of whether the port is
             occupied or enabled
    """
    port_count = 8
    return range( port_count )
137 
138 
def ports():
    """
    :return: a list of the ports that currently draw power, i.e. those for which port_power()
             is above zero
    """
    return [ port for port in all_ports() if port_power( port ) > 0.0 ]
148 
149 
def is_port_enabled( port ):
    """
    :param port: port number
    :return: True if port_state() reports "OK" for the port, False otherwise
    """
    state = port_state( port )
    return state == "OK"
152 
153 
def port_state( port ):
    """
    Translate the state value the hub reports for a port into a readable string.

    :param port: port number, 0 through 7
    :return: "Disabled", "Disconnected", "OK", or "Unknown Error (<value>)"
    :raises ValueError: if the port number is outside 0-7
    """
    if port > 7 or port < 0:
        raise ValueError( "port number must be [0-7]" )
    #
    global hub
    raw = hub.usb.getPortState( port ).value
    #
    if raw == 0:
        text = "Disabled"
    elif raw == 11:
        text = "Disconnected"
    elif raw > 100:
        text = "OK"
    else:
        text = "Unknown Error ({})".format( raw )
    return text
168 
169 
def enable_ports( ports = None, disable_other_ports = False, sleep_on_change = 0 ):
    """
    Enable the given ports, optionally disabling all the others.

    Ports that are already in the requested state are left alone.

    :param ports: List of port numbers; if not provided, enable all ports
    :param disable_other_ports: if True, the ports not in the list will be disabled
    :param sleep_on_change: Number of seconds to sleep if any change is made
    :return: True if no errors found, False otherwise
    """
    global hub
    ok = True
    any_change = False
    for port in all_ports():
        if ports is None or port in ports:
            # wanted on: nothing to do if it already is
            if is_port_enabled( port ):
                continue
            outcome = hub.usb.setPortEnable( port )
        elif disable_other_ports:
            # wanted off: nothing to do if it already is
            if not is_port_enabled( port ):
                continue
            outcome = hub.usb.setPortDisable( port )
        else:
            continue
        #
        if outcome != brainstem.result.Result.NO_ERROR:
            ok = False
        else:
            any_change = True
    #
    if any_change and sleep_on_change:
        import time
        time.sleep( sleep_on_change )
    #
    return ok
204 
205 
def disable_ports( ports ):
    """
    Disable every port in the list; a failure on one port does not stop the others.

    :param ports: List of port numbers
    :return: True if no errors found, False otherwise
    """
    global hub
    all_ok = True
    for port in ports:
        if hub.usb.setPortDisable( port ) != brainstem.result.Result.NO_ERROR:
            all_ok = False
    return all_ok
220 
221 
def recycle_ports( portlist = None, timeout = 2 ):
    """
    Disable ports, wait, then enable them again.

    :param portlist: List of port numbers; if not provided, whatever ports() returns is used
    :param timeout: how long to wait (in seconds) before re-enabling
    :return: True if everything OK, False otherwise
    """
    import time
    targets = ports() if portlist is None else portlist
    #
    disabled_ok = disable_ports( targets )
    time.sleep( timeout )
    # the ports are re-enabled even if disabling reported an error
    enabled_ok = enable_ports( targets )
    #
    return enabled_ok and disabled_ok
239 
240 
def set_ports_usb2( portlist = None, timeout = 100e-3 ):
    """
    Set USB ports to USB2: recycle them, then leave each one with Hi-Speed data enabled and
    SuperSpeed data disabled.

    :param portlist: List of port numbers; if not provided, whatever ports() returns is used
    :param timeout: passed on to recycle_ports()
    """
    targets = portlist if portlist is not None else ports()
    #
    recycle_ports( targets, timeout = timeout )
    #
    global hub
    for port in targets:
        usb = hub.usb
        usb.setSuperSpeedDataEnable( port )
        usb.setHiSpeedDataEnable( port )
        usb.setSuperSpeedDataDisable( port )
255 
256 
def set_ports_usb3( portlist = None, timeout = 100e-3 ):
    """
    Set USB ports to support USB3: recycle them, then leave each one with SuperSpeed data
    enabled and Hi-Speed data disabled.

    :param portlist: List of port numbers; if not provided, whatever ports() returns is used
    :param timeout: passed on to recycle_ports()
    """
    targets = portlist if portlist is not None else ports()
    #
    recycle_ports( targets, timeout = timeout )
    #
    global hub
    for port in targets:
        usb = hub.usb
        usb.setSuperSpeedDataEnable( port )
        usb.setHiSpeedDataEnable( port )
        usb.setHiSpeedDataDisable( port )
271 
272 
def port_power( port ):
    """
    Compute the power on a port from the voltage and current the hub reports for it.

    :param port: port number, 0 through 7
    :return: the product of the reported voltage and current, each first divided by 10^6
    :raises ValueError: if the port number is outside 0-7
    """
    if port > 7 or port < 0:
        raise ValueError("port number can be only within 0 and 7 (inclusive)")
    #
    global hub
    micro = 10.0 ** 6
    raw_voltage = hub.usb.getPortVoltage( port )
    raw_current = hub.usb.getPortCurrent( port )
    volts = float( raw_voltage.value ) / micro
    amperes = float( raw_current.value ) / micro
    #
    return volts * amperes
286 
287 
def get_port_from_usb( first_usb_index, second_usb_index ):
    """
    Map the last two indices of a USB location to the hub port number.

    :param first_usb_index: next-to-last index of the USB location (4 or 3)
    :param second_usb_index: last index of the USB location (4 down to 1)
    :return: the port number, 0 through 7
    :raises KeyError: if the pair of indices is not one of the known combinations
    """
    # (4,4) is port 0, (4,3) is port 1, ... (3,1) is port 7: enumerate the pairs in that order
    location_pairs = [ (first, second) for first in (4, 3) for second in (4, 3, 2, 1) ]
    port_by_location = { pair: port for port, pair in enumerate( location_pairs ) }
    return port_by_location[ (first_usb_index, second_usb_index) ]
302 
303 
if __name__ == '__main__':
    # Act on the options that were parsed and validated at the top of the file.
    for opt,arg in opts:
        # compare with '==': ('--enable') is just a parenthesized string, not a tuple, so 'in'
        # would do a substring test
        if opt == '--enable':
            connect()
            enable_ports()   # so ports() will return all
        elif opt == '--recycle':
            connect()
            enable_ports()   # so ports() will return all
            recycle_ports()
def is_port_enabled(port)
Definition: acroname.py:150
def enable_ports(ports=None, disable_other_ports=False, sleep_on_change=0)
Definition: acroname.py:170
def port_power(port)
Definition: acroname.py:273
def port_state(port)
Definition: acroname.py:154
def set_ports_usb3(portlist=None, timeout=100e-3)
Definition: acroname.py:257
def ports()
Definition: acroname.py:139
def disable_ports(ports)
Definition: acroname.py:206
def usage()
Definition: acroname.py:16
def __init__(self, message=None)
Definition: acroname.py:47
def disconnect()
Definition: acroname.py:125
def is_connected()
Definition: acroname.py:117
def set_ports_usb2(portlist=None, timeout=100e-3)
Definition: acroname.py:241
def all_ports()
Definition: acroname.py:132
def connect(spec=None)
Definition: acroname.py:71
def get_port_from_usb(first_usb_index, second_usb_index)
Definition: acroname.py:288
static std::string print(const transformation &tf)
def find_all_hubs()
Definition: acroname.py:93
def recycle_ports(portlist=None, timeout=2)
Definition: acroname.py:222
def discover()
Definition: acroname.py:51


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Thu Dec 22 2022 03:41:41