Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 import random
00032 import threading
00033 import unittest
00034
00035 import actionlib
00036 from actionlib.msg import TestAction
00037 import rosnode
00038 import rospy
00039
00040
00041 class Constants:
00042 pkg = "actionlib"
00043 node = "test_simple_action_server_deadlock"
00044 topic = "deadlock"
00045 deadlock_timeout = 45
00046 shutdown_timeout = 2
00047 max_action_duration = 3
00048
00049
00050 class DeadlockTest(unittest.TestCase):
00051
00052 def test_deadlock(self):
00053
00054 self.condition = threading.Condition()
00055 self.last_execution_time = None
00056
00057
00058 self.action_server = actionlib.SimpleActionServer(
00059 Constants.topic,
00060 TestAction,
00061 execute_cb=self.execute_callback,
00062 auto_start=False)
00063
00064 self.action_server.register_preempt_callback(self.preempt_callback)
00065 self.action_server.start()
00066
00067
00068 rospy.sleep(Constants.deadlock_timeout)
00069
00070
00071 running_nodes = set(rosnode.get_node_names())
00072 required_nodes = {
00073 "/deadlock_companion_1",
00074 "/deadlock_companion_2",
00075 "/deadlock_companion_3",
00076 "/deadlock_companion_4",
00077 "/deadlock_companion_5"}
00078
00079 self.assertTrue(
00080 required_nodes.issubset(running_nodes),
00081 "Required companion nodes are not currently running")
00082
00083
00084 termination_time = rospy.Time.now()
00085 rosnode.kill_nodes(required_nodes)
00086
00087 rospy.sleep(Constants.shutdown_timeout)
00088
00089
00090 self.assertIsNotNone(
00091 self.last_execution_time is None,
00092 "Execute Callback was never executed")
00093
00094 time_since_last_execution = (
00095 termination_time - self.last_execution_time).to_sec()
00096
00097 self.assertTrue(
00098 time_since_last_execution < 2 * Constants.max_action_duration,
00099 "Too long since last goal was executed; likely due to a deadlock")
00100
00101 def execute_callback(self, goal):
00102
00103 self.last_execution_time = rospy.Time.now()
00104
00105
00106 action_duration = random.uniform(0, Constants.max_action_duration)
00107
00108 with self.condition:
00109 if not self.action_server.is_preempt_requested():
00110 self.condition.wait(action_duration)
00111
00112 if self.action_server.is_preempt_requested():
00113 self.action_server.set_preempted()
00114 else:
00115 self.action_server.set_succeeded()
00116
00117 def preempt_callback(self):
00118 with self.condition:
00119 self.condition.notify()
00120
00121
00122 if __name__ == '__main__':
00123 import rostest
00124 rospy.init_node(Constants.node)
00125 rostest.rosrun(Constants.pkg, Constants.node, DeadlockTest)