devices.py
Go to the documentation of this file.
1 # License: Apache 2.0. See LICENSE file in root directory.
2 # Copyright(c) 2021 Intel Corporation. All Rights Reserved.
3 
4 from rspy import log
5 import sys, os, re
6 
7 
8 
9 # We need both pyrealsense2 and acroname. We can work without acroname, but
10 # without rs no devices at all will be returned.
11 try:
12  import pyrealsense2 as rs
13  log.d( rs )
14  #
15  # Have to add site-packages, just in case: if -S was used, or parent script played with
16  # sys.path (as run-unit-tests does), then we may not have it!
17  sys.path += [os.path.join( os.path.dirname( sys.executable ), 'lib', 'site-packages')]
18  #
19  try:
20  from rspy import acroname
21  except ModuleNotFoundError:
22  # Error should have already been printed
23  # We assume there's no brainstem library, meaning no acroname either
24  log.d( 'sys.path=', sys.path )
25  acroname = None
26  #
27  sys.path = sys.path[:-1] # remove what we added
28 except ModuleNotFoundError:
29  log.w( 'No pyrealsense2 library is available! Running as if no cameras available...' )
30  import sys
31  log.d( 'sys.path=', sys.path )
32  rs = None
33  acroname = None
34 
35 import time
36 
37 _device_by_sn = dict()
38 _context = None
39 
40 
41 class Device:
42  def __init__( self, sn, dev ):
43  self._sn = sn
44  self._dev = dev
45  self._name = None
46  if dev.supports( rs.camera_info.name ):
47  self._name = dev.get_info( rs.camera_info.name )
48  self._product_line = None
49  if dev.supports( rs.camera_info.product_line ):
50  self._product_line = dev.get_info( rs.camera_info.product_line )
51  self._physical_port = dev.supports( rs.camera_info.physical_port ) and dev.get_info( rs.camera_info.physical_port ) or None
53  self._port = None
54  if acroname:
55  try:
56  self._port = _get_port_by_loc( self._usb_location )
57  except Exception as e:
58  log.e( 'Failed to get device port:', e )
59  log.d( ' physical port is', self._physical_port )
60  log.d( ' USB location is', self._usb_location )
61  self._removed = False
62 
63  @property
64  def serial_number( self ):
65  return self._sn
66 
67  @property
68  def name( self ):
69  return self._name
70 
71  @property
72  def product_line( self ):
73  return self._product_line
74 
75  @property
76  def physical_port( self ):
77  return self._physical_port
78 
79  @property
80  def usb_location( self ):
81  return self._usb_location
82 
83  @property
84  def port( self ):
85  return self._port
86 
87  @property
88  def handle( self ):
89  return self._dev
90 
91  @property
92  def enabled( self ):
93  return self._removed is False
94 
95 
96 def query( monitor_changes = True ):
97  """
98  Start a new LRS context, and collect all devices
99  :param monitor_changes: If True, devices will update dynamically as they are removed/added
100  """
101  global rs
102  if not rs:
103  return
104  #
105  # Before we can start a context and query devices, we need to enable all the ports
106  # on the acroname, if any:
107  if acroname:
108  acroname.connect() # MAY THROW!
109  acroname.enable_ports( sleep_on_change = 5 ) # make sure all connected!
110  #
111  # Get all devices, and store by serial-number
112  global _device_by_sn, _context, _port_to_sn
113  _context = rs.context()
114  _device_by_sn = dict()
115  try:
116  log.d( 'discovering devices ...' )
117  log.debug_indent()
118  for retry in range(3):
119  try:
120  devices = _context.query_devices()
121  break
122  except RuntimeError as e:
123  log.d( 'FAILED to query devices:', e )
124  if retry > 1:
125  log.e( 'FAILED to query devices', retry + 1, 'times!' )
126  raise
127  else:
128  time.sleep( 1 )
129  for dev in devices:
130  if dev.is_update_device():
131  sn = dev.get_info( rs.camera_info.firmware_update_id )
132  else:
133  sn = dev.get_info( rs.camera_info.serial_number )
134  device = Device( sn, dev )
135  _device_by_sn[sn] = device
136  log.d( '... port {}:'.format( device.port is None and '?' or device.port ), dev )
137  finally:
138  log.debug_unindent()
139  #
140  if monitor_changes:
141  _context.set_devices_changed_callback( _device_change_callback )
142 
143 
145  """
146  Called when librealsense detects a device change (see query())
147  """
148  global _device_by_sn
149  for device in _device_by_sn.values():
150  if device.enabled and info.was_removed( device.handle ):
151  log.d( 'device removed:', device.serial_number )
152  device._removed = True
153  for handle in info.get_new_devices():
154  if handle.is_update_device():
155  sn = handle.get_info( rs.camera_info.firmware_update_id )
156  else:
157  sn = handle.get_info( rs.camera_info.serial_number )
158  log.d( 'device added:', handle )
159  if sn in _device_by_sn:
160  _device_by_sn[sn]._removed = False
161  else:
162  log.d( 'New device detected!?' ) # shouldn't see new devices...
163  _device_by_sn[sn] = Device( sn, handle )
164 
165 
166 def all():
167  """
168  :return: A set of all device serial-numbers at the time of query()
169  """
170  global _device_by_sn
171  return _device_by_sn.keys()
172 
173 
174 def enabled():
175  """
176  :return: A set of all device serial-numbers that are currently enabled
177  """
178  global _device_by_sn
179  return { device.serial_number for device in _device_by_sn.values() if device.enabled }
180 
181 
182 def by_product_line( product_line ):
183  """
184  :param product_line: The product line we're interested in, as a string ("L500", etc.)
185  :return: A set of device serial-numbers
186  """
187  global _device_by_sn
188  return { device.serial_number for device in _device_by_sn.values() if device.product_line == product_line }
189 
190 
191 def by_name( name ):
192  """
193  :param name: Part of the product name to search for ("L515" would match "Intel RealSense L515")
194  :return: A set of device serial-numbers
195  """
196  global _device_by_sn
197  return { device.serial_number for device in _device_by_sn.values() if device.name and device.name.find( name ) >= 0 }
198 
199 
200 def _get_sns_from_spec( spec ):
201  """
202  Helper function for by_configuration. Yields all serial-numbers matching the given spec
203  """
204  if spec.endswith( '*' ):
205  for sn in by_product_line( spec[:-1] ):
206  yield sn
207  else:
208  for sn in by_name( spec ):
209  yield sn
210 
211 
212 def by_configuration( config ):
213  """
214  Yields the serial numbers fitting the given configuration. If configuration includes an 'each' directive
215  will yield all fitting serial numbers one at a time. Otherwise yields one set of serial numbers fitting the configuration
216 
217  :param config: A test:device line collection of arguments (e.g., [L515 D400*])
218 
219  If no device matches the configuration devices specified, a RuntimeError will be
220  raised!
221  """
222  if len( config ) == 1 and re.fullmatch( r'each\(.+\)', config[0], re.IGNORECASE ):
223  spec = config[0][5:-1]
224  for sn in _get_sns_from_spec( spec ):
225  yield { sn }
226  else:
227  sns = set()
228  for spec in config:
229  old_len = len(sns)
230  for sn in _get_sns_from_spec( spec ):
231  if sn not in sns:
232  sns.add( sn )
233  break
234  new_len = len(sns)
235  if new_len == old_len:
236  if old_len:
237  raise RuntimeError( 'no device matches configuration "' + spec + '" (after already matching ' + str(sns) + ')' )
238  else:
239  raise RuntimeError( 'no device matches configuration "' + spec + '"' )
240  if sns:
241  yield sns
242 
243 
244 def get( sn ):
245  """
246  NOTE: will raise an exception if the SN is unknown!
247 
248  :param sn: The serial-number of the requested device
249  :return: The pyrealsense2.device object with the given SN, or None
250  """
251  global _device_by_sn
252  return _device_by_sn.get(sn).handle
253 
254 
255 def get_port( sn ):
256  """
257  NOTE: will raise an exception if the SN is unknown!
258 
259  :param sn: The serial-number of the requested device
260  :return: The port number (0-7) the device is on, or None if Acroname interface is unavailable
261  """
262  global _device_by_sn
263  return _device_by_sn.get(sn).port
264 
265 
266 def enable_only( serial_numbers, recycle = False, timeout = 5 ):
267  """
268  Enable only the devices corresponding to the given serial-numbers. This can work either
269  with or without Acroname: without, the devices will simply be HW-reset, but other devices
270  will still be present.
271 
272  NOTE: will raise an exception if any SN is unknown!
273 
274  :param serial_numbers: A collection of serial-numbers to enable - all others' ports are
275  disabled and will no longer be usable!
276  :param recycle: If False, the devices will not be reset if they were already enabled. If
277  True, the devices will be recycled by disabling the port, waiting, then
278  re-enabling
279  :param timeout: The maximum seconds to wait to make sure the devices are indeed online
280  """
281  if acroname:
282  #
283  ports = [ get_port( sn ) for sn in serial_numbers ]
284  #
285  if recycle:
286  #
287  log.d( 'recycling ports via acroname:', ports )
288  #
289  acroname.disable_ports( acroname.ports() )
290  _wait_until_removed( serial_numbers, timeout = timeout )
291  #
292  acroname.enable_ports( ports )
293  #
294  else:
295  #
296  acroname.enable_ports( ports, disable_other_ports = True )
297  #
298  _wait_for( serial_numbers, timeout = timeout )
299  #
300  elif recycle:
301  #
302  hw_reset( serial_numbers )
303  #
304  else:
305  log.d( 'no acroname; ports left as-is' )
306 
307 
309  """
310  Enables all ports on an Acroname -- without an Acroname, this does nothing!
311  """
312  if acroname:
313  acroname.enable_ports()
314 
315 
316 def _wait_until_removed( serial_numbers, timeout = 5 ):
317  """
318  Wait until the given serial numbers are all offline
319 
320  :param serial_numbers: A collection of serial-numbers to wait until removed
321  :param timeout: Number of seconds of maximum wait time
322  :return: True if all have come offline; False if timeout was reached
323  """
324  while True:
325  have_devices = False
326  enabled_sns = enabled()
327  for sn in serial_numbers:
328  if sn in enabled_sns:
329  have_devices = True
330  break
331  if not have_devices:
332  return True
333  #
334  if timeout <= 0:
335  return False
336  timeout -= 1
337  time.sleep( 1 )
338 
339 
340 def _wait_for( serial_numbers, timeout = 5 ):
341  """
342  Wait until the given serial numbers are all online
343 
344  :param serial_numbers: A collection of serial-numbers to wait for
345  :param timeout: Number of seconds of maximum wait time
346  :return: True if all have come online; False if timeout was reached
347  """
348  did_some_waiting = False
349  while True:
350  #
351  have_all_devices = True
352  enabled_sns = enabled()
353  for sn in serial_numbers:
354  if sn not in enabled_sns:
355  have_all_devices = False
356  break
357  #
358  if have_all_devices:
359  if did_some_waiting:
360  # Wait an extra second, just in case -- let the devices properly power up
361  #log.d( 'all devices powered up' )
362  time.sleep( 1 )
363  return True
364  #
365  if timeout <= 0:
366  if did_some_waiting:
367  log.d( 'timed out' )
368  return False
369  timeout -= 1
370  time.sleep( 1 )
371  did_some_waiting = True
372 
373 
374 def hw_reset( serial_numbers, timeout = 5 ):
375  """
376  Recycles the given devices manually, using a hardware-reset (rather than any acroname port
377  reset). The devices are sent a HW-reset command and then we'll wait until they come back
378  online.
379 
380  NOTE: will raise an exception if any SN is unknown!
381 
382  :param serial_numbers: A collection of serial-numbers to reset
383  :param timeout: Maximum # of seconds to wait for the devices to come back online
384  :return: True if all devices have come back online before timeout
385  """
386  for sn in serial_numbers:
387  dev = get( sn )
388  dev.hardware_reset()
389  #
390  _wait_until_removed( serial_numbers )
391  #
392  return _wait_for( serial_numbers, timeout = timeout )
393 
394 
395 ###############################################################################################
396 import platform
397 if 'windows' in platform.system().lower():
398  #
399  def _get_usb_location( physical_port ):
400  """
401  Helper method to get windows USB location from registry
402  """
403  if not physical_port:
404  return None
405  # physical port example:
406  # \\?\usb#vid_8086&pid_0b07&mi_00#6&8bfcab3&0&0000#{e5323777-f976-4f5b-9b55-b94699c46e44}\global
407  #
408  re_result = re.match( r'.*\\(.*)#vid_(.*)&pid_(.*)(?:&mi_(.*))?#(.*)#', physical_port, flags = re.IGNORECASE )
409  dev_type = re_result.group(1)
410  vid = re_result.group(2)
411  pid = re_result.group(3)
412  mi = re_result.group(4)
413  unique_identifier = re_result.group(5)
414  #
415  import winreg
416  if mi:
417  registry_path = "SYSTEM\CurrentControlSet\Enum\{}\VID_{}&PID_{}&MI_{}\{}".format(
418  dev_type, vid, pid, mi, unique_identifier
419  )
420  else:
421  registry_path = "SYSTEM\CurrentControlSet\Enum\{}\VID_{}&PID_{}\{}".format(
422  dev_type, vid, pid, unique_identifier
423  )
424  try:
425  reg_key = winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, registry_path )
426  except FileNotFoundError:
427  log.e( 'Could not find registry key for port:', registry_path )
428  log.e( ' usb location:', physical_port )
429  return None
430  result = winreg.QueryValueEx( reg_key, "LocationInformation" )
431  # location example: 0000.0014.0000.016.003.004.003.000.000
432  # and, for T265: Port_#0002.Hub_#0006
433  return result[0]
434  #
435  def _get_port_by_loc( usb_location ):
436  """
437  """
438  if usb_location:
439  #
440  # T265 locations look differently...
441  match = re.fullmatch( r'Port_#(\d+)\.Hub_#(\d+)', usb_location, re.IGNORECASE )
442  if match:
443  # We don't know how to get the port from these yet!
444  return None #int(match.group(2))
445  else:
446  split_location = [int(x) for x in usb_location.split('.') if int(x) > 0]
447  # only the last two digits are necessary
448  return acroname.get_port_from_usb( split_location[-2], split_location[-1] )
449  #
450 else:
451  #
452  def _get_usb_location( physical_port ):
453  """
454  """
455  if not physical_port:
456  return None
457  # physical port example:
458  # u'/sys/devices/pci0000:00/0000:00:14.0/usb2/2-3/2-3.3/2-3.3.1/2-3.3.1:1.0/video4linux/video0'
459  #
460  split_location = physical_port.split( '/' )
461  port_location = split_location[-4]
462  # location example: 2-3.3.1
463  return port_location
464  #
465  def _get_port_by_loc( usb_location ):
466  """
467  """
468  if usb_location:
469  split_port_location = usb_location.split( '.' )
470  first_port_coordinate = int( split_port_location[-2] )
471  second_port_coordinate = int( split_port_location[-1] )
472  port = acroname.get_port_from_usb( first_port_coordinate, second_port_coordinate )
473  return port
474 
def _get_usb_location(physical_port)
Definition: devices.py:399
def port(self)
Definition: devices.py:84
def name(self)
Definition: devices.py:68
def enabled(self)
Definition: devices.py:92
def by_configuration(config)
Definition: devices.py:212
def _wait_until_removed(serial_numbers, timeout=5)
Definition: devices.py:316
def all()
Definition: devices.py:166
def hw_reset(serial_numbers, timeout=5)
Definition: devices.py:374
def usb_location(self)
Definition: devices.py:80
def enable_all()
Definition: devices.py:308
def get(sn)
Definition: devices.py:244
def by_name(name)
Definition: devices.py:191
def physical_port(self)
Definition: devices.py:76
def _wait_for(serial_numbers, timeout=5)
Definition: devices.py:340
def enable_only(serial_numbers, recycle=False, timeout=5)
Definition: devices.py:266
def query(monitor_changes=True)
Definition: devices.py:96
def handle(self)
Definition: devices.py:88
def _get_sns_from_spec(spec)
Definition: devices.py:200
def __init__(self, sn, dev)
Definition: devices.py:42
def get_port(sn)
Definition: devices.py:255
def serial_number(self)
Definition: devices.py:64
def _get_port_by_loc(usb_location)
Definition: devices.py:435
def _device_change_callback(info)
Definition: devices.py:144
def product_line(self)
Definition: devices.py:72
def enabled()
Definition: devices.py:174
def by_product_line(product_line)
Definition: devices.py:182


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:47:12