Package smach_ros :: Module introspection

Source Code for Module smach_ros.introspection

  1   
  2  import roslib; roslib.load_manifest('smach_ros') 
  3  import rospy 
  4  from std_msgs.msg import Header 
  5   
  6  import pickle 
  7  import threading 
  8   
  9  import rostopic 
 10  import smach 
 11   
 12  from smach_msgs.msg import SmachContainerStatus,SmachContainerInitialStatusCmd,SmachContainerStructure 
 13   
 14   
 15  __all__ = ['IntrospectionClient','IntrospectionServer'] 
 16   
 17  # Topic names 
 18  STATUS_TOPIC = '/smach/container_status' 
 19  INIT_TOPIC = '/smach/container_init' 
 20  STRUCTURE_TOPIC = '/smach/container_structure' 
 21   
22 -class IntrospectionClient():
23 - def get_servers(self):
24 """Get the base names that are broadcasting smach states.""" 25 26 # Get the currently broadcasted smach introspection topics 27 topics = rostopic.find_by_type('smach_msgs/SmachContainerStatus') 28 29 return [t[:t.rfind(STATUS_TOPIC)] for t in topics]
30
31 - def set_initial_state(self, 32 server, 33 path, 34 initial_states, 35 initial_userdata = smach.UserData(), 36 timeout = None):
37 """Set the initial state of a smach server. 38 39 @type server: string 40 @param server: The name of the introspection server to which this client 41 should connect. 42 43 @type path: string 44 @param path: The path to the target container in the state machine. 45 46 @type initial_states: list of string 47 @param inital_state: The state the target container should take when it 48 starts. This is as list of at least one state label. 49 50 @type initial_userdata: UserData 51 @param initial_userdata: The userdata to inject into the target container. 52 53 @type timeout: rospy.Duration 54 @param timeout: Timeout for this call. If this is set to None, it will not 55 block, and the initial state may not be set before the target state machine 56 goes active. 57 """ 58 59 # Construct initial state command 60 initial_status_msg = SmachContainerInitialStatusCmd( 61 path = path, 62 initial_states = initial_states, 63 local_data = pickle.dumps(initial_userdata._data,2)) 64 65 # A status message to receive confirmation that the state was set properly 66 msg_response = SmachContainerStatus() 67 68 # Define a local callback to just stuff a local message 69 def local_cb(msg, msg_response): 70 rospy.logdebug("Received status response: "+str(msg)) 71 msg_response.path = msg.path 72 msg_response.initial_states = msg.initial_states 73 msg_response.local_data = msg.local_data
74 75 # Create a subscriber to verify the request went through 76 state_sub = rospy.Subscriber(server+STATUS_TOPIC, SmachContainerStatus, 77 callback=local_cb, callback_args=msg_response) 78 79 # Create a publisher to send the command 80 rospy.logdebug("Sending initial state command: "+str(initial_status_msg.path)+" on topic '"+server+INIT_TOPIC+"'") 81 init_pub = rospy.Publisher(server+INIT_TOPIC,SmachContainerInitialStatusCmd) 82 init_pub.publish(initial_status_msg) 83 84 start_time = rospy.Time.now() 85 86 # Block until we get a new state back 87 if timeout is not None: 88 while rospy.Time.now() - start_time < timeout: 89 # Send the initial state command 90 init_pub.publish(initial_status_msg) 91 92 # Filter messages that are from other containers 93 if msg_response.path == path: 94 # Check if the heartbeat came back to match 95 state_match = all([s in msg_response.initial_states for s in initial_states]) 96 local_data = smach.UserData() 97 local_data._data = pickle.loads(msg_response.local_data) 98 ud_match = all([\ 99 (key in local_data and local_data._data[key] == initial_userdata._data[key])\ 100 for key in initial_userdata._data]) 101 102 rospy.logdebug("STATE MATCH: "+str(state_match)+", UD_MATCH: "+str(ud_match)) 103 104 if state_match and ud_match: 105 return True 106 rospy.sleep(0.3) 107 return False
108 109
110 -class ContainerProxy():
111 """Smach Container Introspection proxy. 112 113 This class is used as a container for introspection and debugging. 114 """
115 - def __init__(self, server_name, container, path, update_rate=rospy.Duration(2.0)):
116 """Constructor for tree-wide data structure. 117 """ 118 self._path = path 119 self._container = container 120 self._update_rate = update_rate 121 self._status_pub_lock = threading.Lock() 122 123 # Advertise init service 124 self._init_cmd = rospy.Subscriber( 125 server_name + INIT_TOPIC, 126 SmachContainerInitialStatusCmd, 127 self._init_cmd_cb) 128 129 # Advertise structure publisher 130 self._structure_pub = rospy.Publisher( 131 name=server_name + STRUCTURE_TOPIC, 132 data_class=SmachContainerStructure) 133 134 # Advertise status publisher 135 self._status_pub = rospy.Publisher( 136 name=server_name + STATUS_TOPIC, 137 data_class=SmachContainerStatus) 138 139 # Set transition callback 140 container.register_transition_cb(self._transition_cb) 141 142 # Create thread to constantly publish 143 self._status_pub_thread = threading.Thread(name=server_name+':status_publisher',target=self._status_pub_loop) 144 145 self._structure_pub_thread = threading.Thread(name=server_name+':structure_publisher',target=self._structure_pub_loop) 146 147 self._keep_running = False
148
149 - def start(self):
150 self._keep_running = True 151 self._status_pub_thread.start() 152 self._structure_pub_thread.start()
153
154 - def stop(self):
155 self._keep_running = False
156
157 - def _status_pub_loop(self):
158 """Loop to publish the status and structure heartbeats.""" 159 while not rospy.is_shutdown() and self._keep_running: 160 #TODO 161 self._publish_status('HEARTBEAT') 162 try: 163 end_time = rospy.Time.now() + self._update_rate 164 while not rospy.is_shutdown() and rospy.Time.now() < end_time: 165 rospy.sleep(0.1) 166 except: 167 pass
168
169 - def _structure_pub_loop(self):
170 """Loop to publish the status and structure heartbeats.""" 171 while not rospy.is_shutdown() and self._keep_running: 172 self._publish_structure('HEARTBEAT') 173 try: 174 end_time = rospy.Time.now() + self._update_rate 175 while not rospy.is_shutdown() and rospy.Time.now() < end_time: 176 rospy.sleep(0.1) 177 except: 178 pass
179
180 - def _publish_structure(self, info_str=''):
181 path = self._path 182 children = list(self._container.get_children().keys()) 183 184 internal_outcomes = [] 185 outcomes_from = [] 186 outcomes_to = [] 187 for (outcome, from_label, to_label) in self._container.get_internal_edges(): 188 internal_outcomes.append(str(outcome)) 189 outcomes_from.append(str(from_label)) 190 outcomes_to.append(str(to_label)) 191 container_outcomes = self._container.get_registered_outcomes() 192 193 # Construct structure message 194 structure_msg = SmachContainerStructure( 195 Header(stamp = rospy.Time.now()), 196 path, 197 children, 198 internal_outcomes, 199 outcomes_from, 200 outcomes_to, 201 container_outcomes) 202 try: 203 self._structure_pub.publish(structure_msg) 204 except: 205 if not rospy.is_shutdown(): 206 rospy.logerr("Publishing SMACH introspection structure message failed.")
207
208 - def _publish_status(self, info_str=''):
209 """Publish current state of this container.""" 210 # Construct messages 211 with self._status_pub_lock: 212 path = self._path 213 214 #print str(structure_msg) 215 # Construct status message 216 #print self._container.get_active_states() 217 state_msg = SmachContainerStatus( 218 Header(stamp = rospy.Time.now()), 219 path, 220 self._container.get_initial_states(), 221 self._container.get_active_states(), 222 pickle.dumps(self._container.userdata._data,2), 223 info_str) 224 # Publish message 225 self._status_pub.publish(state_msg)
226 227 ### Transition reporting
228 - def _transition_cb(self, *args, **kwargs):
229 """Transition callback, passed to all internal nodes in the tree. 230 This callback locks an internal mutex, preventing any hooked transitions 231 from occurring while we're walking the tree. 232 """ 233 info_str = (str(args) + ', ' + str(kwargs)) 234 rospy.logdebug("Transitioning: "+info_str) 235 self._publish_status(info_str)
236
237 - def _init_cmd_cb(self, msg):
238 """Initialize a tree's state and userdata.""" 239 initial_states = msg.initial_states 240 local_data = msg.local_data 241 242 # Check if this init message is directed at this path 243 rospy.logdebug('Received init message for path: '+msg.path+' to '+str(initial_states)) 244 if msg.path == self._path: 245 if all(s in self._container.get_children() for s in initial_states): 246 ud = smach.UserData() 247 ud._data = pickle.loads(msg.local_data) 248 rospy.logdebug("Setting initial state in smach path: '"+self._path+"' to '"+str(initial_states)+"' with userdata: "+str(ud._data)) 249 250 # Set the initial state 251 self._container.set_initial_state( 252 initial_states, 253 ud) 254 # Publish initial state 255 self._publish_status("REMOTE_INIT") 256 else: 257 rospy.logerr("Attempting to set initial state in container '"+self._path+"' to '"+str(initial_states)+"', but this container only has states: "+str(self._container.get_children()))
258 259
260 -class IntrospectionServer():
261 """Server for providing introspection and control for smach."""
262 - def __init__(self, server_name, state, path):
263 """Traverse the smach tree starting at root, and construct introspection 264 proxies for getting and setting debug state.""" 265 266 # A list of introspection proxies 267 self._proxies = [] 268 269 # Store args 270 self._server_name = server_name 271 self._state = state 272 self._path = path
273
274 - def start(self):
275 # Construct proxies 276 self.construct(self._server_name, self._state, self._path)
277
278 - def stop(self):
279 for proxy in self._proxies: 280 proxy.stop()
281
282 - def construct(self, server_name, state, path):
283 """Recursively construct proxies to containers.""" 284 # Construct a new proxy 285 proxy = ContainerProxy(server_name, state, path) 286 287 if path == '/': 288 path = '' 289 290 # Get a list of children that are also containers 291 for (label, child) in state.get_children().items(): 292 # If this is also a container, recurse into it 293 if isinstance(child, smach.container.Container): 294 self.construct(server_name, child, path+'/'+label) 295 296 # Publish initial state 297 proxy._publish_status("Initial state") 298 299 # Start publisher threads 300 proxy.start() 301 302 # Store the proxy 303 self._proxies.append(proxy)
304
305 - def clear(self):
306 """Clear all proxies in this server.""" 307 self._proxies = []
308