Package smach_ros :: Module introspection

Source Code for Module smach_ros.introspection

  1   
  2  import roslib; roslib.load_manifest('smach_ros') 
  3  import rospy 
  4  from std_msgs.msg import Header 
  5   
  6  import pickle 
  7  import threading 
  8   
  9  import rostopic 
 10  import smach 
 11   
 12  from smach_msgs.msg import SmachContainerStatus,SmachContainerInitialStatusCmd,SmachContainerStructure 
 13   
 14   
 15  __all__ = ['IntrospectionClient','IntrospectionServer'] 
 16   
 17  # Topic names 
 18  STATUS_TOPIC = '/smach/container_status' 
 19  INIT_TOPIC = '/smach/container_init' 
 20  STRUCTURE_TOPIC = '/smach/container_structure' 
 21   
22 -class IntrospectionClient():
23 - def get_servers(self):
24 """Get the base names that are broadcasting smach states.""" 25 26 # Get the currently broadcasting smach introspection topics 27 topics = rostopic.find_by_type('smach_msgs/SmachContainerStatus') 28 29 return [t[:t.rfind(STATUS_TOPIC)] for t in topics]
30
31 - def set_initial_state(self, 32 server, 33 path, 34 initial_states, 35 initial_userdata = smach.UserData(), 36 timeout = None):
37 """Set the initial state of a smach server. 38 39 @type server: string 40 @param server: The name of the introspection server to which this client 41 should connect. 42 43 @type path: string 44 @param path: The path to the target container in the state machine. 45 46 @type initial_states: list of string 47 @param inital_state: The state the target container should take when it 48 starts. This is as list of at least one state label. 49 50 @type initial_userdata: UserData 51 @param initial_userdata: The userdata to inject into the target container. 52 53 @type timeout: rospy.Duration 54 @param timeout: Timeout for this call. If this is set to None, it will not 55 block, and the initial state may not be set before the target state machine 56 goes active. 57 """ 58 59 # Construct initial state command 60 initial_status_msg = SmachContainerInitialStatusCmd( 61 path = path, 62 initial_states = initial_states, 63 local_data = pickle.dumps(initial_userdata._data,2)) 64 65 # A status message to receive confirmation that the state was set properly 66 msg_response = SmachContainerStatus() 67 68 # Define a local callback to just stuff a local message 69 def local_cb(msg, msg_response): 70 rospy.logdebug("Received status response: "+str(msg)) 71 msg_response.path = msg.path 72 msg_response.initial_states = msg.initial_states 73 msg_response.local_data = msg.local_data
74 75 # Create a subscriber to verify the request went trhough 76 state_sub = rospy.Subscriber(server+STATUS_TOPIC,SmachContainerStatus, 77 callback = local_cb, callback_args = msg_response) 78 79 # Create a publisher to send the command 80 rospy.logdebug("Sending initial state command: "+str(initial_status_msg.path)+" on topic '"+server+INIT_TOPIC+"'") 81 init_pub = rospy.Publisher(server+INIT_TOPIC,SmachContainerInitialStatusCmd) 82 init_pub.publish(initial_status_msg) 83 84 start_time = rospy.Time.now() 85 86 # Block until we get a new state back 87 if timeout is not None: 88 while rospy.Time.now()-start_time < timeout: 89 # Send the initial state command 90 init_pub.publish(initial_status_msg) 91 92 # Filter messages that are from other containers 93 if msg_response.path == path: 94 # Check if the heartbeat came back to match 95 state_match = all([s in msg_response.initial_states for s in initial_states]) 96 local_data = smach.UserData() 97 local_data._data = pickle.loads(msg_response.local_data) 98 ud_match = all([\ 99 (key in local_data and local_data._data[key] == initial_userdata._data[key])\ 100 for (key,value)\ 101 in initial_userdata._data.iteritems()]) 102 103 rospy.logdebug("STATE MATCH: "+str(state_match)+", UD_MATCH: "+str(ud_match)) 104 105 if state_match and ud_match: 106 return True 107 rospy.sleep(0.3) 108 return False
109 110
111 -class ContainerProxy():
112 """Smach Container Introspection proxy. 113 114 This class is used as a container for introspection and debugging. 115 """
116 - def __init__(self, server_name, container, path, update_rate = rospy.Duration(2.0)):
117 """Constructor for tree-wide data structure. 118 """ 119 self._path = path 120 self._container = container 121 self._update_rate = update_rate 122 self._status_pub_lock = threading.Lock() 123 124 # Advertise init service 125 self._init_cmd= rospy.Subscriber( 126 server_name+INIT_TOPIC, 127 SmachContainerInitialStatusCmd, 128 self._init_cmd_cb) 129 130 # Advertise structure publisher 131 self._structure_pub = rospy.Publisher( 132 name = server_name+STRUCTURE_TOPIC, 133 data_class = SmachContainerStructure) 134 135 # Advertise status publisher 136 self._status_pub = rospy.Publisher( 137 name = server_name+STATUS_TOPIC, 138 data_class = SmachContainerStatus) 139 140 # Set transition callback 141 container.register_transition_cb(self._transition_cb) 142 143 # Create thread to constantly publish 144 self._status_pub_thread = threading.Thread(name=server_name+':status_publisher',target=self._status_pub_loop) 145 146 self._structure_pub_thread = threading.Thread(name=server_name+':structure_publisher',target=self._structure_pub_loop)
147
148 - def start(self):
149 self._keep_running = True 150 self._status_pub_thread.start() 151 self._structure_pub_thread.start()
152
153 - def stop(self):
154 self._keep_running = False
155
156 - def _status_pub_loop(self):
157 """Loop to publish the status and structure heartbeats.""" 158 while not rospy.is_shutdown() and self._keep_running: 159 #TODO 160 self._publish_status('HEARTBEAT') 161 try: 162 end_time = rospy.Time.now() + self._update_rate 163 while not rospy.is_shutdown() and rospy.Time.now() < end_time: 164 rospy.sleep(0.1) 165 except: 166 pass
167
168 - def _structure_pub_loop(self):
169 """Loop to publish the status and structure heartbeats.""" 170 while not rospy.is_shutdown() and self._keep_running: 171 self._publish_structure('HEARTBEAT') 172 try: 173 end_time = rospy.Time.now() + self._update_rate 174 while not rospy.is_shutdown() and rospy.Time.now() < end_time: 175 rospy.sleep(0.1) 176 except: 177 pass
178
179 - def _publish_structure(self, info_str = ''):
180 path = self._path 181 children = self._container.get_children().keys() 182 183 internal_outcomes = [] 184 outcomes_from = [] 185 outcomes_to = [] 186 for (outcome, from_label, to_label) in self._container.get_internal_edges(): 187 internal_outcomes.append(str(outcome)) 188 outcomes_from.append(str(from_label)) 189 outcomes_to.append(str(to_label)) 190 container_outcomes = self._container.get_registered_outcomes() 191 192 # Construct structure message 193 structure_msg = SmachContainerStructure( 194 Header(stamp = rospy.Time.now()), 195 path, 196 children, 197 internal_outcomes, 198 outcomes_from, 199 outcomes_to, 200 container_outcomes) 201 try: 202 self._structure_pub.publish(structure_msg) 203 except: 204 if not rospy.is_shutdown(): 205 rospy.logerr("Publishing SMACH introspection structure message failed.")
206
207 - def _publish_status(self, info_str = ''):
208 """Publish current state of this container.""" 209 # Construct messages 210 with self._status_pub_lock: 211 path = self._path 212 children = self._container.get_children().keys() 213 214 #print str(structure_msg) 215 # Construct status message 216 #print self._container.get_active_states() 217 state_msg = SmachContainerStatus( 218 Header(stamp = rospy.Time.now()), 219 path, 220 self._container.get_initial_states(), 221 self._container.get_active_states(), 222 pickle.dumps(self._container.userdata._data,2), 223 info_str) 224 # Publish message 225 self._status_pub.publish(state_msg)
226 227 ### Transition reporting
228 - def _transition_cb(self, *args, **kwargs):
229 """Transition callback, passed to all internal nodes in the tree. 230 This callback locks an internal mutex, preventing any hooked transitions 231 from occurring while we're walking the tree. 232 """ 233 info_str = (str(args) + ', ' + str(kwargs)) 234 rospy.logdebug("Transitioning: "+info_str) 235 self._publish_status(info_str)
236
237 - def _init_cmd_cb(self, msg):
238 """Initialize a tree's state and userdata.""" 239 initial_states = msg.initial_states 240 local_data = msg.local_data 241 242 # Check if this init message is directed at this path 243 rospy.logdebug('Received init message for path: '+msg.path+' to '+str(initial_states)) 244 if msg.path == self._path: 245 if all([s in self._container.get_children() for s in initial_states]): 246 ud = smach.UserData() 247 ud._data = pickle.loads(msg.local_data) 248 rospy.logdebug("Setting initial state in smach path: '"+self._path+"' to '"+str(initial_states)+"' with userdata: "+str(ud._data)) 249 250 # Set the initial state 251 self._container.set_initial_state( 252 initial_states, 253 ud) 254 # Publish initial state 255 self._publish_status("REMOTE_INIT") 256 else: 257 rospy.logerr("Attempting to set initial state in container '"+self._path+"' to '"+str(initial_states)+"', but this container only has states: "+str(self._container.get_children()))
258
259 -class IntrospectionServer():
260 """Server for providing introspection and control for smach."""
261 - def __init__(self, server_name, state, path):
262 """Traverse the smach tree starting at root, and construct introspection 263 proxies for getting and setting debug state.""" 264 265 # A list of introspection proxies 266 self._proxies = [] 267 268 # Store args 269 self._server_name = server_name 270 self._state = state 271 self._path = path
272
273 - def start(self):
274 # Construct proxies 275 self.construct(self._server_name, self._state, self._path)
276
277 - def stop(self):
278 for proxy in self._proxies: 279 proxy.stop()
280
281 - def construct(self, server_name, state, path):
282 """Recursively construct proxies to containers.""" 283 # Construct a new proxy 284 proxy = ContainerProxy(server_name, state, path) 285 286 if path == '/': 287 path = '' 288 289 # Get a list of children that are also containers 290 for (label, child) in state.get_children().iteritems(): 291 # If this is also a container, recursei into it 292 if isinstance(child, smach.container.Container): 293 self.construct(server_name, child, path+'/'+label) 294 295 # Publish initial state 296 proxy._publish_status("Initial state") 297 298 # Start publisher threads 299 proxy.start() 300 301 # Store the proxy 302 self._proxies.append(proxy)
303
304 - def clear(self):
305 """Clear all proxies in this server.""" 306 self._proxies = []
307