sensor_managers.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import roslib; roslib.load_manifest('calibration_launch')
35 import sys
36 import threading
37 
38 import rospy
39 from calibration_msgs.msg import *
40 import sensor_msgs.msg
41 
42 import message_filters
43 
44 
46  def __init__(self, chain_id, callback, *cb_args):
47  self._chain_id = chain_id
48  self._callback = callback
49  self._cb_args = cb_args
50  self._mode = "off"
51 
52  self._lock = threading.Lock()
53 
54  self._chain_sub = rospy.Subscriber(chain_id + "/chain_state", sensor_msgs.msg.JointState, self.callback)
55 
56  def callback(self, msg):
57  self._lock.acquire()
58  if self._mode is "on":
59  # Populate measurement message
60  m_msg = ChainMeasurement()
61  m_msg.header.stamp = msg.header.stamp
62  m_msg.chain_id = self._chain_id
63  m_msg.chain_state = msg
64  self._callback(self._chain_id, m_msg, *self._cb_args)
65  self._lock.release()
66 
67  def enable(self):
68  self._lock.acquire()
69  self._mode = "on"
70  self._lock.release()
71 
72  def disable(self):
73  self._lock.acquire()
74  self._mode = "off"
75  self._lock.release()
76 
77 class CamManager:
78  def __init__(self, cam_id, callback, *cb_args):
79  self._cam_id = cam_id
80  self._callback = callback
81  self._cb_args = cb_args
82  self._mode = "off"
83 
84  self._lock = threading.Lock()
85 
86  # How do I specify a queue size of 1?
87  self._cam_info_sub = message_filters.Subscriber(cam_id + "/camera_info", sensor_msgs.msg.CameraInfo)
88  self._features_sub = message_filters.Subscriber(cam_id + "/features", calibration_msgs.msg.CalibrationPattern)
89  self._image_sub = message_filters.Subscriber(cam_id + "/image", sensor_msgs.msg.Image)
90  self._image_rect_sub = message_filters.Subscriber(cam_id + "/image_rect", sensor_msgs.msg.Image)
92  self._verbose_sync.registerCallback(self.verbose_callback)
94  self._minimal_sync.registerCallback(self.minimal_callback)
95 
96  def verbose_callback(self, cam_info, features, image, image_rect):
97  self._lock.acquire()
98  if self._mode is "verbose":
99  # Populate measurement message
100  msg = CameraMeasurement()
101  msg.header.stamp = cam_info.header.stamp
102  msg.camera_id = self._cam_id
103  msg.image_points = features.image_points
104  msg.cam_info = cam_info
105  msg.verbose = True
106  msg.image = image
107  msg.image_rect = image_rect
108  msg.features = features
109  self._callback(self._cam_id, msg, *self._cb_args)
110  self._lock.release()
111 
112  def minimal_callback(self, cam_info, features):
113  self._lock.acquire()
114  if self._mode is "minimal":
115  # Populate measurement message
116  msg = CameraMeasurement()
117  msg.header.stamp = cam_info.header.stamp
118  msg.camera_id = self._cam_id
119  msg.image_points = features.image_points
120  msg.cam_info = cam_info
121  msg.verbose = False
122  self._callback(self._cam_id, msg, *self._cb_args)
123  self._lock.release()
124 
125  def enable(self, verbose=False):
126  self._lock.acquire()
127  if verbose:
128  self._mode = "verbose"
129  else:
130  self._mode = "minimal"
131  self._lock.release()
132 
133  def disable(self):
134  self._lock.acquire()
135  self._mode = "off"
136  self._lock.release()
137 
139  def __init__(self, laser_id, callback, *cb_args):
140  self._laser_id = laser_id
141  self._callback = callback
142  self._cb_args = cb_args
143  self._mode = "off"
144 
145  self._lock = threading.Lock()
146 
147  # How do I specify a queue size of 1?
148  self._snapshot_sub = message_filters.Subscriber(laser_id + "/snapshot", calibration_msgs.msg.DenseLaserSnapshot)
149  self._laser_image_sub = message_filters.Subscriber(laser_id + "/laser_image", sensor_msgs.msg.Image)
150  self._image_features_sub = message_filters.Subscriber(laser_id + "/image_features", calibration_msgs.msg.CalibrationPattern)
151  self._joint_features_sub = message_filters.Subscriber(laser_id + "/joint_features", calibration_msgs.msg.JointStateCalibrationPattern)
152  self._duration_sub = message_filters.Subscriber(laser_id + "/laser_interval", calibration_msgs.msg.IntervalStamped)
153 
155  self._laser_image_sub,
156  self._image_features_sub,
157  self._joint_features_sub,
158  self._duration_sub], 10)
160  self._duration_sub], 10)
161 
162  self._verbose_sync.registerCallback(self.verbose_callback)
163  self._minimal_sync.registerCallback(self.minimal_callback)
164 
165  def verbose_callback(self, snapshot, laser_image, image_features, joint_features, laser_duration):
166  self._lock.acquire()
167  if self._mode is "verbose":
168  # Populate measurement message
169  msg = LaserMeasurement()
170  msg.laser_id = self._laser_id
171  msg.joint_points = joint_features.joint_points
172  msg.verbose = True
173  msg.snapshot = snapshot
174  msg.laser_image = laser_image
175  msg.image_features = image_features
176  msg.joint_features = joint_features
177  self._callback(self._laser_id, msg, laser_duration.interval.start, laser_duration.interval.end, *self._cb_args)
178  self._lock.release()
179 
180  def minimal_callback(self, joint_features, laser_duration):
181  self._lock.acquire()
182  if self._mode is "minimal":
183  # Populate measurement message
184  msg = LaserMeasurement()
185  msg.laser_id = self._laser_id
186  msg.joint_points = joint_features.joint_points
187  msg.verbose = True
188  self._callback(self._laser_id, msg, laser_duration.interval.start, laser_duration.interval.end, *self._cb_args)
189  self._lock.release()
190 
191  def enable(self, verbose=False):
192  self._lock.acquire()
193  if verbose:
194  self._mode = "verbose"
195  else:
196  self._mode = "minimal"
197  self._lock.release()
198 
199  def disable(self):
200  self._lock.acquire()
201  self._mode = "off"
202  self._lock.release()
203 
def minimal_callback(self, joint_features, laser_duration)
def __init__(self, laser_id, callback, cb_args)
def minimal_callback(self, cam_info, features)
def verbose_callback(self, snapshot, laser_image, image_features, joint_features, laser_duration)
def __init__(self, cam_id, callback, cb_args)
def verbose_callback(self, cam_info, features, image, image_rect)
def __init__(self, chain_id, callback, cb_args)


calibration_launch
Author(s): Michael Ferguson
autogenerated on Fri Apr 2 2021 02:13:07