00001
00002
00003 import roslib; roslib.load_manifest('flexbe_mirror')
00004 import rospy
00005 import smach
00006 import threading
00007 import zlib
00008
00009 from flexbe_core import JumpableStateMachine, LockableStateMachine
00010 from flexbe_core.core import PreemptableState
00011 from mirror_state import MirrorState
00012
00013 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
00014 from flexbe_msgs.msg import ContainerStructure, BehaviorSync, BEStatus
00015 from std_msgs.msg import Empty, String, Int32, UInt8
00016
00017 '''
00018 Created on 08.05.2013
00019
00020 @author: Philipp Schillinger
00021 '''
00022 class VigirBehaviorMirror(object):
00023 '''
00024 classdocs
00025 '''
00026
00027
00028 def __init__(self):
00029 '''
00030 Constructor
00031 '''
00032 self._sm = None
00033
00034 smach.set_loggers (
00035 rospy.logdebug,
00036 rospy.logwarn,
00037 rospy.logdebug,
00038 rospy.logerr
00039 )
00040
00041
00042
00043 self._pub = ProxyPublisher({'flexbe/behavior_update': String,
00044 'flexbe/request_mirror_structure': Int32})
00045
00046 self._running = False
00047 self._stopping = False
00048 self._active_id = 0
00049 self._starting_path = None
00050 self._current_struct = None
00051 self._sync_lock = threading.Lock()
00052
00053 self._outcome_topic = 'flexbe/mirror/outcome'
00054
00055 self._struct_buffer = list()
00056
00057
00058 self._sub = ProxySubscriberCached()
00059 self._sub.subscribe(self._outcome_topic, UInt8)
00060 self._sub.enable_buffer(self._outcome_topic)
00061
00062 self._sub.subscribe('flexbe/mirror/structure', ContainerStructure, self._mirror_callback)
00063 self._sub.subscribe('flexbe/status', BEStatus, self._status_callback)
00064 self._sub.subscribe('flexbe/mirror/sync', BehaviorSync, self._sync_callback)
00065 self._sub.subscribe('flexbe/mirror/preempt', Empty, self._preempt_callback)
00066
00067
00068 def _mirror_callback(self, msg):
00069 rate = rospy.Rate(10)
00070 while self._stopping:
00071 rate.sleep()
00072
00073 if self._running:
00074 rospy.logwarn('Received a new mirror structure while mirror is already running, adding to buffer (checksum: %s).' % str(msg.behavior_id))
00075 elif self._active_id != 0 and msg.behavior_id != self._active_id:
00076 rospy.logwarn('checksum of received mirror structure (%s) does not match expected checksum (%s), will ignore.' % (str(msg.behavior_id), str(self._active_id)))
00077 return
00078 else:
00079 rospy.loginfo('Received a new mirror structure for checksum %s' % str(msg.behavior_id))
00080
00081 self._struct_buffer.append(msg)
00082
00083 if self._active_id == msg.behavior_id:
00084 self._struct_buffer = list()
00085 self._mirror_state_machine(msg)
00086 rospy.loginfo('Mirror built.')
00087
00088 self._execute_mirror()
00089
00090
00091 def _status_callback(self, msg):
00092 if msg.code == BEStatus.STARTED:
00093 thread = threading.Thread(target=self._start_mirror, args=[msg])
00094 thread.daemon = True
00095 thread.start()
00096 else:
00097 thread = threading.Thread(target=self._stop_mirror, args=[msg])
00098 thread.daemon = True
00099 thread.start()
00100
00101
00102 def _start_mirror(self, msg):
00103 self._sync_lock.acquire()
00104 rate = rospy.Rate(10)
00105 while self._stopping:
00106 rate.sleep()
00107
00108 if self._running:
00109 rospy.logwarn('Tried to start mirror while it is already running, will ignore.')
00110 return
00111
00112 if len(msg.args) > 0:
00113 self._starting_path = "/" + msg.args[0][1:].replace("/", "_mirror/") + "_mirror"
00114
00115 self._active_id = msg.behavior_id
00116
00117 while self._sm is None and len(self._struct_buffer) > 0:
00118 struct = self._struct_buffer[0]
00119 self._struct_buffer = self._struct_buffer[1:]
00120 if struct.behavior_id == self._active_id:
00121 self._mirror_state_machine(struct)
00122 rospy.loginfo('Mirror built.')
00123 else:
00124 rospy.logwarn('Discarded mismatching buffered structure for checksum %s' % str(struct.behavior_id))
00125
00126 if self._sm is None:
00127 rospy.logwarn('Missing correct mirror structure, requesting...')
00128 rospy.sleep(0.2)
00129 self._pub.publish('flexbe/request_mirror_structure', Int32(msg.behavior_id))
00130 self._active_id = msg.behavior_id
00131 return
00132 self._sync_lock.release()
00133
00134 self._execute_mirror()
00135
00136
00137 def _stop_mirror(self, msg):
00138 self._sync_lock.acquire()
00139 self._stopping = True
00140 if self._sm is not None and self._running:
00141 if msg is not None and msg.code == BEStatus.FINISHED:
00142 rospy.loginfo('Onboard behavior finished successfully.')
00143 self._pub.publish('flexbe/behavior_update', String())
00144 elif msg is not None and msg.code == BEStatus.SWITCHING:
00145 self._starting_path = None
00146 rospy.loginfo('Onboard performing behavior switch.')
00147 elif msg is not None and msg.code == BEStatus.READY:
00148 rospy.loginfo('Onboard engine just started, stopping currently running mirror.')
00149 self._pub.publish('flexbe/behavior_update', String())
00150 elif msg is not None:
00151 rospy.logwarn('Onboard behavior failed!')
00152 self._pub.publish('flexbe/behavior_update', String())
00153
00154 PreemptableState.preempt = True
00155 rate = rospy.Rate(10)
00156 while self._running:
00157 rate.sleep()
00158 else:
00159 rospy.loginfo('No onboard behavior is active.')
00160
00161 self._active_id = 0
00162 self._sm = None
00163 self._current_struct = None
00164 self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
00165
00166 if msg is not None and msg.code != BEStatus.SWITCHING:
00167 rospy.loginfo('\033[92m--- Behavior Mirror ready! ---\033[0m')
00168
00169 self._stopping = False
00170 self._sync_lock.release()
00171
00172
00173 def _sync_callback(self, msg):
00174 if msg.behavior_id == self._active_id:
00175 thread = threading.Thread(target=self._restart_mirror, args=[msg])
00176 thread.daemon = True
00177 thread.start()
00178 else:
00179 rospy.logerr('Cannot synchronize! Different behavior is running onboard, please stop execution!')
00180 thread = threading.Thread(target=self._stop_mirror, args=[None])
00181 thread.daemon = True
00182 thread.start()
00183
00184
00185 def _restart_mirror(self, msg):
00186 self._sync_lock.acquire()
00187 rospy.loginfo('Restarting mirror for synchronization...')
00188 self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
00189 if self._sm is not None and self._running:
00190 PreemptableState.preempt = True
00191 rate = rospy.Rate(10)
00192 while self._running:
00193 rate.sleep()
00194 self._sm = None
00195 if msg.current_state_checksum in self._state_checksums:
00196 current_state_path = self._state_checksums[msg.current_state_checksum]
00197 self._starting_path = "/" + current_state_path[1:].replace("/", "_mirror/") + "_mirror"
00198 rospy.loginfo('Current state: %s' % current_state_path)
00199 try:
00200 self._mirror_state_machine(self._current_struct)
00201 rospy.loginfo('Mirror built.')
00202 except (AttributeError, smach.InvalidStateError):
00203 rospy.loginfo('Stopping synchronization because behavior has stopped.')
00204 self._sync_lock.release()
00205
00206 self._execute_mirror()
00207
00208
00209 def _execute_mirror(self):
00210 self._running = True
00211
00212 rospy.loginfo("Executing mirror...")
00213 if self._starting_path is not None:
00214 LockableStateMachine.path_for_switch = self._starting_path
00215 rospy.loginfo("Starting mirror in state " + self._starting_path)
00216 self._starting_path = None
00217
00218 result = 'preempted'
00219 try:
00220 result = self._sm.execute()
00221 except Exception as e:
00222 rospy.loginfo('Caught exception on preempt:\n%s' % str(e))
00223 result = 'preempted'
00224
00225 if self._sm is not None:
00226 self._sm.destroy()
00227 self._running = False
00228
00229 rospy.loginfo('Mirror finished with result ' + result)
00230
00231
00232 def _mirror_state_machine(self, msg):
00233 self._current_struct = msg
00234 self._state_checksums = dict()
00235 root = None
00236 for con_msg in msg.containers:
00237 if con_msg.path.find('/') == -1:
00238 root = con_msg.path
00239 break
00240 self._add_node(msg, root)
00241
00242 for con_msg in msg.containers:
00243 if con_msg.path.find('/') != -1:
00244 self._state_checksums[zlib.adler32(con_msg.path)] = con_msg.path
00245
00246
00247 def _add_node(self, msg, path):
00248
00249 container = None
00250 for con_msg in msg.containers:
00251 if con_msg.path == path:
00252 container = con_msg
00253 break
00254
00255 transitions = None
00256 if container.transitions is not None:
00257 transitions = {}
00258 for i in range(len(container.transitions)):
00259 transitions[container.outcomes[i]] = container.transitions[i] + '_mirror'
00260
00261 path_frags = path.split('/')
00262 container_name = path_frags[len(path_frags)-1]
00263 if len(container.children) > 0:
00264 sm_outcomes = []
00265 for outcome in container.outcomes: sm_outcomes.append(outcome + '_mirror')
00266 sm = JumpableStateMachine(outcomes=sm_outcomes)
00267 with sm:
00268 for child in container.children:
00269 self._add_node(msg, path+'/'+child)
00270 if len(transitions) > 0:
00271 container_transitions = {}
00272 for i in range(len(container.transitions)): container_transitions[sm_outcomes[i]] = transitions[container.outcomes[i]]
00273 JumpableStateMachine.add(container_name + '_mirror', sm, transitions=container_transitions)
00274 else:
00275 self._sm = sm
00276 else:
00277 JumpableStateMachine.add(container_name + '_mirror', MirrorState(container_name, path, container.outcomes, container.autonomy), transitions=transitions)
00278
00279
00280 def _jump_callback(self, msg):
00281 start_time = rospy.Time.now()
00282 current_elapsed = 0
00283 jumpable = True
00284 while self._sm is None or not self._sm.is_running():
00285 current_elapsed = rospy.Time.now() - start_time
00286 if current_elapsed > rospy.Duration.from_sec(10):
00287 jumpable = False
00288 break
00289 rospy.sleep(0.05)
00290 if jumpable:
00291 self._sm.jump_to(msg.stateName)
00292 else:
00293 rospy.logwarn('Could not jump in mirror: Mirror does not exist or is not running!')
00294
00295
00296 def _preempt_callback(self, msg):
00297 if self._sm is not None and self._sm.is_running():
00298 rospy.logwarn('Explicit preempting is currently ignored, mirror should be preempted by onboard behavior.')
00299 else:
00300 rospy.logwarn('Could not preempt mirror because it seems not to be running!')
00301
00302
00303
00304