Package smach_ros :: Module introspection

Source Code for Module smach_ros.introspection

  1   
  2  import roslib; roslib.load_manifest('smach_ros') 
  3  import rospy 
  4  from std_msgs.msg import Header 
  5   
  6  import pickle 
  7  import threading 
  8   
  9  import rostopic 
 10  import smach 
 11   
 12  from smach_msgs.msg import SmachContainerStatus,SmachContainerInitialStatusCmd,SmachContainerStructure 
 13   
 14   
 15  __all__ = ['IntrospectionClient','IntrospectionServer'] 
 16   
 17  # Topic names 
 18  STATUS_TOPIC = '/smach/container_status' 
 19  INIT_TOPIC = '/smach/container_init' 
 20  STRUCTURE_TOPIC = '/smach/container_structure' 
 21   
22 -class IntrospectionClient():
23 - def get_servers(self):
24 """Get the base names that are broadcasting smach states.""" 25 26 # Get the currently broadcasted smach introspection topics 27 topics = rostopic.find_by_type('smach_msgs/SmachContainerStatus') 28 29 return [t[:t.rfind(STATUS_TOPIC)] for t in topics]
30
31 - def set_initial_state(self, 32 server, 33 path, 34 initial_states, 35 initial_userdata = smach.UserData(), 36 timeout = None):
37 """Set the initial state of a smach server. 38 39 @type server: string 40 @param server: The name of the introspection server to which this client 41 should connect. 42 43 @type path: string 44 @param path: The path to the target container in the state machine. 45 46 @type initial_states: list of string 47 @param inital_state: The state the target container should take when it 48 starts. This is as list of at least one state label. 49 50 @type initial_userdata: UserData 51 @param initial_userdata: The userdata to inject into the target container. 52 53 @type timeout: rospy.Duration 54 @param timeout: Timeout for this call. If this is set to None, it will not 55 block, and the initial state may not be set before the target state machine 56 goes active. 57 """ 58 59 # Construct initial state command 60 initial_status_msg = SmachContainerInitialStatusCmd( 61 path = path, 62 initial_states = initial_states, 63 local_data = pickle.dumps(initial_userdata._data,2)) 64 65 # A status message to receive confirmation that the state was set properly 66 msg_response = SmachContainerStatus() 67 68 # Define a local callback to just stuff a local message 69 def local_cb(msg, msg_response): 70 rospy.logdebug("Received status response: "+str(msg)) 71 msg_response.path = msg.path 72 msg_response.initial_states = msg.initial_states 73 msg_response.local_data = msg.local_data
74 75 # Create a subscriber to verify the request went through 76 state_sub = rospy.Subscriber(server+STATUS_TOPIC, SmachContainerStatus, 77 callback=local_cb, callback_args=msg_response) 78 79 # Create a publisher to send the command 80 rospy.logdebug("Sending initial state command: "+str(initial_status_msg.path)+" on topic '"+server+INIT_TOPIC+"'") 81 init_pub = rospy.Publisher(server+INIT_TOPIC, 82 SmachContainerInitialStatusCmd, queue_size=1) 83 init_pub.publish(initial_status_msg) 84 85 start_time = rospy.Time.now() 86 87 # Block until we get a new state back 88 if timeout is not None: 89 while rospy.Time.now() - start_time < timeout: 90 # Send the initial state command 91 init_pub.publish(initial_status_msg) 92 93 # Filter messages that are from other containers 94 if msg_response.path == path: 95 # Check if the heartbeat came back to match 96 state_match = all([s in msg_response.initial_states for s in initial_states]) 97 local_data = smach.UserData() 98 local_data._data = pickle.loads(msg_response.local_data) 99 ud_match = all([\ 100 (key in local_data and local_data._data[key] == initial_userdata._data[key])\ 101 for key in initial_userdata._data]) 102 103 rospy.logdebug("STATE MATCH: "+str(state_match)+", UD_MATCH: "+str(ud_match)) 104 105 if state_match and ud_match: 106 return True 107 rospy.sleep(0.3) 108 return False
109 110
111 -class ContainerProxy():
112 """Smach Container Introspection proxy. 113 114 This class is used as a container for introspection and debugging. 115 """
116 - def __init__(self, server_name, container, path, update_rate=rospy.Duration(2.0)):
117 """Constructor for tree-wide data structure. 118 """ 119 self._path = path 120 self._container = container 121 self._update_rate = update_rate 122 self._status_pub_lock = threading.Lock() 123 124 # Advertise init service 125 self._init_cmd = rospy.Subscriber( 126 server_name + INIT_TOPIC, 127 SmachContainerInitialStatusCmd, 128 self._init_cmd_cb) 129 130 # Advertise structure publisher 131 self._structure_pub = rospy.Publisher( 132 name=server_name + STRUCTURE_TOPIC, 133 data_class=SmachContainerStructure, 134 queue_size=1) 135 136 # Advertise status publisher 137 self._status_pub = rospy.Publisher( 138 name=server_name + STATUS_TOPIC, 139 data_class=SmachContainerStatus, 140 queue_size=1) 141 142 # Set transition callback 143 container.register_transition_cb(self._transition_cb) 144 145 # Create thread to constantly publish 146 self._status_pub_thread = threading.Thread(name=server_name+':status_publisher',target=self._status_pub_loop) 147 148 self._structure_pub_thread = threading.Thread(name=server_name+':structure_publisher',target=self._structure_pub_loop) 149 150 self._keep_running = False
151
152 - def start(self):
153 self._keep_running = True 154 self._status_pub_thread.start() 155 self._structure_pub_thread.start()
156
157 - def stop(self):
158 self._keep_running = False
159
160 - def _status_pub_loop(self):
161 """Loop to publish the status and structure heartbeats.""" 162 while not rospy.is_shutdown() and self._keep_running: 163 #TODO 164 self._publish_status('HEARTBEAT') 165 try: 166 end_time = rospy.Time.now() + self._update_rate 167 while not rospy.is_shutdown() and rospy.Time.now() < end_time: 168 rospy.sleep(0.1) 169 except: 170 pass
171
172 - def _structure_pub_loop(self):
173 """Loop to publish the status and structure heartbeats.""" 174 while not rospy.is_shutdown() and self._keep_running: 175 self._publish_structure('HEARTBEAT') 176 try: 177 end_time = rospy.Time.now() + self._update_rate 178 while not rospy.is_shutdown() and rospy.Time.now() < end_time: 179 rospy.sleep(0.1) 180 except: 181 pass
182
183 - def _publish_structure(self, info_str=''):
184 path = self._path 185 children = list(self._container.get_children().keys()) 186 187 internal_outcomes = [] 188 outcomes_from = [] 189 outcomes_to = [] 190 for (outcome, from_label, to_label) in self._container.get_internal_edges(): 191 internal_outcomes.append(str(outcome)) 192 outcomes_from.append(str(from_label)) 193 outcomes_to.append(str(to_label)) 194 container_outcomes = self._container.get_registered_outcomes() 195 196 # Construct structure message 197 structure_msg = SmachContainerStructure( 198 Header(stamp = rospy.Time.now()), 199 path, 200 children, 201 internal_outcomes, 202 outcomes_from, 203 outcomes_to, 204 container_outcomes) 205 try: 206 self._structure_pub.publish(structure_msg) 207 except: 208 if not rospy.is_shutdown(): 209 rospy.logerr("Publishing SMACH introspection structure message failed.")
210
211 - def _publish_status(self, info_str=''):
212 """Publish current state of this container.""" 213 # Construct messages 214 with self._status_pub_lock: 215 path = self._path 216 217 #print str(structure_msg) 218 # Construct status message 219 #print self._container.get_active_states() 220 state_msg = SmachContainerStatus( 221 Header(stamp = rospy.Time.now()), 222 path, 223 self._container.get_initial_states(), 224 self._container.get_active_states(), 225 pickle.dumps(self._container.userdata._data,2), 226 info_str) 227 # Publish message 228 self._status_pub.publish(state_msg)
229 230 ### Transition reporting
231 - def _transition_cb(self, *args, **kwargs):
232 """Transition callback, passed to all internal nodes in the tree. 233 This callback locks an internal mutex, preventing any hooked transitions 234 from occurring while we're walking the tree. 235 """ 236 info_str = (str(args) + ', ' + str(kwargs)) 237 rospy.logdebug("Transitioning: "+info_str) 238 self._publish_status(info_str)
239
240 - def _init_cmd_cb(self, msg):
241 """Initialize a tree's state and userdata.""" 242 initial_states = msg.initial_states 243 local_data = msg.local_data 244 245 # Check if this init message is directed at this path 246 rospy.logdebug('Received init message for path: '+msg.path+' to '+str(initial_states)) 247 if msg.path == self._path: 248 if all(s in self._container.get_children() for s in initial_states): 249 ud = smach.UserData() 250 ud._data = pickle.loads(msg.local_data) 251 rospy.logdebug("Setting initial state in smach path: '"+self._path+"' to '"+str(initial_states)+"' with userdata: "+str(ud._data)) 252 253 # Set the initial state 254 self._container.set_initial_state( 255 initial_states, 256 ud) 257 # Publish initial state 258 self._publish_status("REMOTE_INIT") 259 else: 260 rospy.logerr("Attempting to set initial state in container '"+self._path+"' to '"+str(initial_states)+"', but this container only has states: "+str(self._container.get_children()))
261 262
263 -class IntrospectionServer():
264 """Server for providing introspection and control for smach."""
265 - def __init__(self, server_name, state, path):
266 """Traverse the smach tree starting at root, and construct introspection 267 proxies for getting and setting debug state.""" 268 269 # A list of introspection proxies 270 self._proxies = [] 271 272 # Store args 273 self._server_name = server_name 274 self._state = state 275 self._path = path
276
277 - def start(self):
278 # Construct proxies 279 self.construct(self._server_name, self._state, self._path)
280
281 - def stop(self):
282 for proxy in self._proxies: 283 proxy.stop()
284
285 - def construct(self, server_name, state, path):
286 """Recursively construct proxies to containers.""" 287 # Construct a new proxy 288 proxy = ContainerProxy(server_name, state, path) 289 290 if path == '/': 291 path = '' 292 293 # Get a list of children that are also containers 294 for (label, child) in state.get_children().items(): 295 # If this is also a container, recurse into it 296 if isinstance(child, smach.container.Container): 297 self.construct(server_name, child, path+'/'+label) 298 299 # Publish initial state 300 proxy._publish_status("Initial state") 301 302 # Start publisher threads 303 proxy.start() 304 305 # Store the proxy 306 self._proxies.append(proxy)
307
308 - def clear(self):
309 """Clear all proxies in this server.""" 310 self._proxies = []
311