acroname.py
Go to the documentation of this file.
1 """
2 Brainstem Acroname Hub
3 
4 See documentation for brainstem here:
5 https://acroname.com/reference/python/index.html
6 """
7 
8 from rspy import log
9 
10 
11 if __name__ == '__main__':
12  import os, sys, getopt
13  def usage():
14  ourname = os.path.basename( sys.argv[0] )
15  print( 'Syntax: acroname [options]' )
16  print( ' Control the acroname USB hub' )
17  print( 'Options:' )
18  print( ' --recycle Recycle all ports' )
19  sys.exit(2)
20  try:
21  opts,args = getopt.getopt( sys.argv[1:], '',
22  longopts = [ 'help', 'recycle' ])
23  except getopt.GetoptError as err:
24  print( '-F-', err ) # something like "option -a not recognized"
25  usage()
26  if args or not opts:
27  usage()
28  # See the end of the file for all the option handling
29 
30 
31 try:
32  import brainstem
33 except ModuleNotFoundError:
34  log.w( 'No acroname library is available!' )
35  raise
36 
37 hub = None
38 
39 
40 class NoneFoundError( RuntimeError ):
41  """
42  """
43  def __init__( self, message = None ):
44  super().__init__( self, message or 'no Acroname module found' )
45 
46 
47 def discover():
48  """
49  Return all Acroname module specs in a list. Raise NoneFoundError if one is not found!
50  """
51 
52  log.d( 'discovering Acroname modules ...' )
53  # see https://acroname.com/reference/_modules/brainstem/module.html#Module.discoverAndConnect
54  try:
55  log.debug_indent()
56  specs = brainstem.discover.findAllModules( brainstem.link.Spec.USB )
57  if not specs:
58  raise NoneFoundError()
59  for spec in specs:
60  log.d( '...', spec )
61  finally:
62  log.debug_unindent()
63 
64  return specs
65 
66 
67 def connect( spec = None ):
68  """
69  Connect to the hub. Raises RuntimeError on failure
70  """
71 
72  global hub
73  if not hub:
74  hub = brainstem.stem.USBHub3p()
75 
76  if spec:
77  specs = [spec]
78  else:
79  specs = discover()
80  spec = specs[0]
81 
82  result = hub.connectFromSpec( spec )
83  if result != brainstem.result.Result.NO_ERROR:
84  raise RuntimeError( "failed to connect to acroname (result={})".format( result ))
85  elif len(specs) > 1:
86  log.d( 'connected to', spec )
87 
88 
90  try:
91  connect()
92  return True
93  except Exception:
94  return False
95 
96 
97 def disconnect():
98  global hub
99  hub.disconnect()
100  del hub
101  hub = None
102 
103 
104 def ports():
105  """
106  :return: a list of all ports currently occupied (and enabled)
107  """
108  occupied_ports = []
109  for port in range(8):
110  if port_power( port ) > 0.0:
111  occupied_ports.append( port )
112  return occupied_ports
113 
114 
115 def is_port_enabled( port ):
116  return port_state( port ) == "OK"
117 
118 
119 def port_state( port ):
120  if port < 0 or port > 7:
121  raise ValueError( "port number must be [0-7]" )
122  #
123  global hub
124  status = hub.usb.getPortState( port )
125  #
126  if status.value == 0:
127  return "Disabled"
128  if status.value == 11:
129  return "Disconnected"
130  if status.value > 100:
131  return "OK"
132  return "Unknown Error ({})".format( status.value )
133 
134 
135 def enable_ports( ports = None, disable_other_ports = False, sleep_on_change = 0 ):
136  """
137  Set enable state to provided ports
138  :param ports: List of port numbers; if not provided, enable all ports
139  :param disable_other_ports: if True, the ports not in the list will be disabled
140  :param sleep_on_change: Number of seconds to sleep if any change is made
141  :return: True if no errors found, False otherwise
142  """
143  global hub
144  result = True
145  changed = False
146  for port in range(0, 8):
147  #
148  if ports is None or port in ports:
149  if not is_port_enabled( port ):
150  action_result = hub.usb.setPortEnable( port )
151  if action_result != brainstem.result.Result.NO_ERROR:
152  result = False
153  else:
154  changed = True
155  #
156  elif disable_other_ports:
157  if is_port_enabled( port ):
158  action_result = hub.usb.setPortDisable( port )
159  if action_result != brainstem.result.Result.NO_ERROR:
160  result = False
161  else:
162  changed = True
163  #
164  if changed and sleep_on_change:
165  import time
166  time.sleep( sleep_on_change )
167  #
168  return result
169 
170 
171 def disable_ports( ports ):
172  """
173  :param ports: List of port numbers
174  :return: True if no errors found, False otherwise
175  """
176  global hub
177  result = True
178  for port in ports:
179  #
180  action_result = hub.usb.setPortDisable( port )
181  if action_result != brainstem.result.Result.NO_ERROR:
182  result = False
183  #
184  return result
185 
186 
187 def recycle_ports( portlist = None, timeout = 2 ):
188  """
189  Disable and enable a port
190  :param timeout: how long to wait before re-enabling
191  :return: True if everything OK, False otherwise
192  """
193  if portlist is None:
194  portlist = ports()
195  #
196  result = disable_ports( portlist )
197  #
198  import time
199  time.sleep( timeout )
200  #
201  result = enable_ports( portlist ) and result
202  #
203  return result
204 
205 
206 def set_ports_usb2( portlist = None, timeout = 100e-3 ):
207  """
208  Set USB ports to USB2
209  """
210  if portlist is None:
211  portlist = ports()
212  #
213  recycle_ports( portlist, timeout = timeout )
214  #
215  global hub
216  for port in portlist:
217  hub.usb.setSuperSpeedDataEnable( port )
218  hub.usb.setHiSpeedDataEnable( port )
219  hub.usb.setSuperSpeedDataDisable( port )
220 
221 
222 def set_ports_usb3( portlist = None, timeout = 100e-3 ):
223  """
224  Set USB ports to support USB3
225  """
226  if portlist is None:
227  portlist = ports()
228  #
229  recycle_ports( portlist, timeout = timeout )
230  #
231  global hub
232  for port in portlist:
233  hub.usb.setSuperSpeedDataEnable( port )
234  hub.usb.setHiSpeedDataEnable( port )
235  hub.usb.setHiSpeedDataDisable( port )
236 
237 
238 def port_power( port ):
239  """
240  """
241  if port < 0 or port > 7:
242  raise ValueError("port number can be only within 0 and 7 (inclusive)")
243  #
244  global hub
245  micro_volt = hub.usb.getPortVoltage( port )
246  micro_curr = hub.usb.getPortCurrent( port )
247  volt = float(micro_volt.value) / 10.0 ** 6
248  amps = float(micro_curr.value) / 10.0 ** 6
249  #
250  return volt * amps
251 
252 
253 def get_port_from_usb( first_usb_index, second_usb_index ):
254  """
255  Based on last two USB location index, provide the port number
256  """
257  acroname_port_usb_map = {(4, 4): 0,
258  (4, 3): 1,
259  (4, 2): 2,
260  (4, 1): 3,
261  (3, 4): 4,
262  (3, 3): 5,
263  (3, 2): 6,
264  (3, 1): 7,
265  }
266  return acroname_port_usb_map[(first_usb_index, second_usb_index)]
267 
268 
269 if __name__ == '__main__':
270  for opt,arg in opts:
271  if opt in ('--recycle'):
272  connect()
273  enable_ports() # so ports() will return all
274  recycle_ports()
def is_port_enabled(port)
Definition: acroname.py:115
def enable_ports(ports=None, disable_other_ports=False, sleep_on_change=0)
Definition: acroname.py:135
def port_power(port)
Definition: acroname.py:238
uvc_xu_option< int > super
Definition: l500-options.h:32
def port_state(port)
Definition: acroname.py:119
def set_ports_usb3(portlist=None, timeout=100e-3)
Definition: acroname.py:222
def ports()
Definition: acroname.py:104
def disable_ports(ports)
Definition: acroname.py:171
def usage()
Definition: acroname.py:13
def __init__(self, message=None)
Definition: acroname.py:43
def disconnect()
Definition: acroname.py:97
def is_connected()
Definition: acroname.py:89
def set_ports_usb2(portlist=None, timeout=100e-3)
Definition: acroname.py:206
def connect(spec=None)
Definition: acroname.py:67
def get_port_from_usb(first_usb_index, second_usb_index)
Definition: acroname.py:253
static std::string print(const transformation &tf)
def recycle_ports(portlist=None, timeout=2)
Definition: acroname.py:187
def discover()
Definition: acroname.py:47


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:45:06