Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 from std_msgs.msg import String, Empty
00019
00020 import threading
00021 import smach
00022 import rospy
00023 import airbus_ssm_core.msg
00024 from ast import literal_eval
00025
00026
class ssmIntrospection(object):
    """Publish a string snapshot of a state machine's tree view over ROS.

    The tree view of every container is collected into one dict keyed by
    container label (the root container is stored under ``"ROOT"``).  The
    ``str()`` form of that dict is published on ``<server_name>/ssm_status``
    each time a registered container callback fires, or when an ``Empty``
    message arrives on ``<server_name>/status_request``.  When an action
    server is supplied, the same string is also sent as
    ``SSMFeedback.current_active_states``.
    """

    def __init__(self, state_machine, action_server=None):
        """Create the status publisher and the status-request subscriber.

        Args:
            state_machine: Root container to introspect.  It is expected to
                provide ``_create_tree_view()``, ``get_tree_view()``,
                ``register_tree_view_cb()`` and ``get_children()``.
            action_server: Optional object exposing ``publish_feedback()``;
                when given, every tree-view update is mirrored as feedback.
        """
        self._state_machine = state_machine
        self._as = action_server
        self._tree_view = {}
        # Topic prefix; falls back to '/ssm' when the parameter is not set.
        self._server_name = rospy.get_param('ssm_server_name', '/ssm')
        self._update_pub = rospy.Publisher(
            self._server_name + "/ssm_status", String, queue_size=1)
        self._request_update = rospy.Subscriber(
            self._server_name + "/status_request", Empty,
            self._status_request_cb, queue_size=1)

    def start(self):
        """Build the tree view, starting from the root state machine."""
        self._state_machine._create_tree_view()
        self.construct(self._state_machine, "ROOT")

    def stop(self):
        """Unregister the status-request subscriber and drop the tree view.

        The status publisher and the callbacks registered on the containers
        are left untouched.
        """
        self._request_update.unregister()
        self._tree_view = {}

    def construct(self, state, _key):
        """Recursively record the tree view of ``state`` and its containers.

        Args:
            state: Container whose tree view is stored and whose children
                are visited.
            _key: Key under which the tree view of ``state`` is stored.

        NOTE(review): entries are keyed by label only, so two nested
        containers sharing a label overwrite each other's entry.
        """
        self._tree_view[_key] = state.get_tree_view()
        state.register_tree_view_cb(self._update_tree_view_cb)

        for label, child in state.get_children().items():
            # Only containers are descended into; other children are skipped.
            if isinstance(child, smach.container.Container):
                child._create_tree_view()
                self.construct(child, label)

    def _update_tree_view_cb(self, *args, **kwargs):
        """Publish the current tree view; arguments are ignored."""
        # Serialize once so the topic and the action feedback carry the very
        # same snapshot, even if a container updates its view in between.
        snapshot = str(self._tree_view)
        self._update_pub.publish(snapshot)
        if self._as is not None:
            feedback = airbus_ssm_core.msg.SSMFeedback()
            feedback.current_active_states = snapshot
            self._as.publish_feedback(feedback)

    def _status_request_cb(self, msg):
        """Answer a status request by publishing the current tree view."""
        self._update_pub.publish(str(self._tree_view))
00069
00070