12 import pyrealsense2
as rs
17 sys.path += [os.path.join( os.path.dirname( sys.executable ),
'lib',
'site-packages')]
20 from rspy
import acroname
21 except ModuleNotFoundError:
24 log.d(
'sys.path=', sys.path )
27 sys.path = sys.path[:-1]
28 except ModuleNotFoundError:
29 log.w(
'No pyrealsense2 library is available! Running as if no cameras available...' )
31 log.d(
'sys.path=', sys.path )
37 _device_by_sn = dict()
46 if dev.supports( rs.camera_info.name ):
47 self.
_name = dev.get_info( rs.camera_info.name )
49 if dev.supports( rs.camera_info.product_line ):
50 self.
_product_line = dev.get_info( rs.camera_info.product_line )
51 self.
_physical_port = dev.supports( rs.camera_info.physical_port )
and dev.get_info( rs.camera_info.physical_port )
or None 57 except Exception
as e:
58 log.e(
'Failed to get device port:', e )
96 def query( monitor_changes = True ):
98 Start a new LRS context, and collect all devices 99 :param monitor_changes: If True, devices will update dynamically as they are removed/added 109 acroname.enable_ports( sleep_on_change = 5 )
112 global _device_by_sn, _context, _port_to_sn
113 _context = rs.context()
114 _device_by_sn = dict()
116 log.d(
'discovering devices ...' )
118 for retry
in range(3):
120 devices = _context.query_devices()
122 except RuntimeError
as e:
123 log.d(
'FAILED to query devices:', e )
125 log.e(
'FAILED to query devices', retry + 1,
'times!' )
130 if dev.is_update_device():
131 sn = dev.get_info( rs.camera_info.firmware_update_id )
133 sn = dev.get_info( rs.camera_info.serial_number )
134 device =
Device( sn, dev )
135 _device_by_sn[sn] = device
136 log.d(
'... port {}:'.format( device.port
is None and '?' or device.port ), dev )
141 _context.set_devices_changed_callback( _device_change_callback )
def _device_change_callback( info ):
    """
    Called when librealsense detects a device change (see query())

    Devices we already track are flagged as removed when the notification says they are gone;
    every newly-reported device is either revived (if its serial-number is already tracked) or
    registered as a brand new Device.

    :param info: The change notification; queried via was_removed() and get_new_devices()
    """
    # First pass: anything currently enabled that the notification reports as gone
    for known in _device_by_sn.values():
        if not known.enabled:
            continue
        if info.was_removed( known.handle ):
            log.d( 'device removed:', known.serial_number )
            known._removed = True

    # Second pass: anything the notification reports as newly connected
    for handle in info.get_new_devices():
        # A device in update mode is keyed by its firmware-update ID rather than its serial-number
        id_field = ( rs.camera_info.firmware_update_id
                     if handle.is_update_device()
                     else rs.camera_info.serial_number )
        sn = handle.get_info( id_field )
        log.d( 'device added:', handle )

        if sn not in _device_by_sn:
            # Not seen at query() time -- start tracking it
            log.d( 'New device detected!?' )
            _device_by_sn[sn] = Device( sn, handle )
            continue

        # A device we already know about has come back
        _device_by_sn[sn]._removed = False
168 :return: A set of all device serial-numbers at the time of query() 171 return _device_by_sn.keys()
def enabled():
    """
    :return: A set of all device serial-numbers that are currently enabled
    """
    active = set()
    for dev in _device_by_sn.values():
        # Only devices whose 'enabled' attribute is truthy are reported
        if dev.enabled:
            active.add( dev.serial_number )
    return active
def by_product_line( product_line ):
    """
    Collect the serial-numbers of all known devices belonging to one product line.

    :param product_line: The product line we're interested in, as a string ("L500", etc.)
    :return: A set of device serial-numbers
    """
    matching = set()
    for dev in _device_by_sn.values():
        # Exact comparison against the product line recorded on the device
        if dev.product_line == product_line:
            matching.add( dev.serial_number )
    return matching
193 :param name: Part of the product name to search for ("L515" would match "Intel RealSense L515") 194 :return: A set of device serial-numbers 197 return { device.serial_number
for device
in _device_by_sn.values()
if device.name
and device.name.find( name ) >= 0 }
202 Helper function for by_configuration. Yields all serial-numbers matching the given spec 204 if spec.endswith(
'*' ):
214 Yields the serial numbers fitting the given configuration. If configuration includes an 'each' directive 215 will yield all fitting serial numbers one at a time. Otherwise yields one set of serial numbers fitting the configuration 217 :param config: A test:device line collection of arguments (e.g., [L515 D400*]) 219 If no device matches the configuration devices specified, a RuntimeError will be 222 if len( config ) == 1
and re.fullmatch(
r'each\(.+\)', config[0], re.IGNORECASE ):
223 spec = config[0][5:-1]
235 if new_len == old_len:
237 raise RuntimeError(
'no device matches configuration "' + spec +
'" (after already matching ' +
str(sns) +
')' )
239 raise RuntimeError(
'no device matches configuration "' + spec +
'"' )
246 NOTE: will raise an exception if the SN is unknown! 248 :param sn: The serial-number of the requested device 249 :return: The pyrealsense2.device object with the given SN, or None 252 return _device_by_sn.get(sn).handle
def get_port( sn ):
    """
    NOTE: will raise an exception if the SN is unknown!

    :param sn: The serial-number of the requested device
    :return: The port number (0-7) the device is on, or None if Acroname interface is unavailable
    """
    # An unknown SN yields None from the lookup, so the attribute access below is what raises
    device = _device_by_sn.get( sn )
    return device.port
268 Enable only the devices corresponding to the given serial-numbers. This can work either 269 with or without Acroname: without, the devices will simply be HW-reset, but other devices 270 will still be present. 272 NOTE: will raise an exception if any SN is unknown! 274 :param serial_numbers: A collection of serial-numbers to enable - all others' ports are 275 disabled and will no longer be usable! 276 :param recycle: If False, the devices will not be reset if they were already enabled. If 277 True, the devices will be recycled by disabling the port, waiting, then 279 :param timeout: The maximum seconds to wait to make sure the devices are indeed online 283 ports = [
get_port( sn )
for sn
in serial_numbers ]
287 log.d(
'recycling ports via acroname:', ports )
289 acroname.disable_ports( acroname.ports() )
292 acroname.enable_ports( ports )
296 acroname.enable_ports( ports, disable_other_ports =
True )
298 _wait_for( serial_numbers, timeout = timeout )
305 log.d(
'no acroname; ports left as-is' )
310 Enables all ports on an Acroname -- without an Acroname, this does nothing! 313 acroname.enable_ports()
318 Wait until the given serial numbers are all offline 320 :param serial_numbers: A collection of serial-numbers to wait until removed 321 :param timeout: Number of seconds of maximum wait time 322 :return: True if all have come offline; False if timeout was reached 326 enabled_sns = enabled()
327 for sn
in serial_numbers:
328 if sn
in enabled_sns:
342 Wait until the given serial numbers are all online 344 :param serial_numbers: A collection of serial-numbers to wait for 345 :param timeout: Number of seconds of maximum wait time 346 :return: True if all have come online; False if timeout was reached 348 did_some_waiting =
False 351 have_all_devices =
True 352 enabled_sns = enabled()
353 for sn
in serial_numbers:
354 if sn
not in enabled_sns:
355 have_all_devices =
False 371 did_some_waiting =
True 376 Recycles the given devices manually, using a hardware-reset (rather than any acroname port 377 reset). The devices are sent a HW-reset command and then we'll wait until they come back 380 NOTE: will raise an exception if any SN is unknown! 382 :param serial_numbers: A collection of serial-numbers to reset 383 :param timeout: Maximum # of seconds to wait for the devices to come back online 384 :return: True if all devices have come back online before timeout 386 for sn
in serial_numbers:
392 return _wait_for( serial_numbers, timeout = timeout )
397 if 'windows' in platform.system().lower():
401 Helper method to get windows USB location from registry 403 if not physical_port:
408 re_result = re.match(
r'.*\\(.*)#vid_(.*)&pid_(.*)(?:&mi_(.*))?#(.*)#', physical_port, flags = re.IGNORECASE )
409 dev_type = re_result.group(1)
410 vid = re_result.group(2)
411 pid = re_result.group(3)
412 mi = re_result.group(4)
413 unique_identifier = re_result.group(5)
417 registry_path =
"SYSTEM\CurrentControlSet\Enum\{}\VID_{}&PID_{}&MI_{}\{}".format(
418 dev_type, vid, pid, mi, unique_identifier
421 registry_path =
"SYSTEM\CurrentControlSet\Enum\{}\VID_{}&PID_{}\{}".format(
422 dev_type, vid, pid, unique_identifier
425 reg_key = winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, registry_path )
426 except FileNotFoundError:
427 log.e(
'Could not find registry key for port:', registry_path )
428 log.e(
' usb location:', physical_port )
430 result = winreg.QueryValueEx( reg_key,
"LocationInformation" )
441 match = re.fullmatch(
r'Port_#(\d+)\.Hub_#(\d+)', usb_location, re.IGNORECASE )
446 split_location = [int(x)
for x
in usb_location.split(
'.')
if int(x) > 0]
448 return acroname.get_port_from_usb( split_location[-2], split_location[-1] )
455 if not physical_port:
460 split_location = physical_port.split(
'/' )
461 port_location = split_location[-4]
469 split_port_location = usb_location.split(
'.' )
470 first_port_coordinate = int( split_port_location[-2] )
471 second_port_coordinate = int( split_port_location[-1] )
472 port = acroname.get_port_from_usb( first_port_coordinate, second_port_coordinate )
def _get_usb_location(physical_port)
def by_configuration(config)
def _wait_until_removed(serial_numbers, timeout=5)
def hw_reset(serial_numbers, timeout=5)
def _wait_for(serial_numbers, timeout=5)
def enable_only(serial_numbers, recycle=False, timeout=5)
def query(monitor_changes=True)
def _get_sns_from_spec(spec)
def __init__(self, sn, dev)
def _get_port_by_loc(usb_location)
def _device_change_callback(info)
def by_product_line(product_line)