Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 class Constants:
00034 pkg = "actionlib"
00035 node = "test_simple_action_server_deadlock"
00036 topic = "deadlock"
00037 deadlock_timeout = 45
00038 shutdown_timeout = 2
00039 max_action_duration = 3
00040
00041 import random
00042 import sys
00043 import threading
00044 import unittest
00045
00046 import actionlib
00047 from actionlib.msg import TestAction
00048 import rosnode
00049 import rospy
00050
00051
00052 class DeadlockTest(unittest.TestCase):
00053
00054 def test_deadlock(self):
00055
00056 self.condition = threading.Condition()
00057 self.last_execution_time = None
00058
00059
00060 self.action_server = actionlib.SimpleActionServer(
00061 Constants.topic,
00062 TestAction,
00063 execute_cb=self.execute_callback,
00064 auto_start=False)
00065
00066 self.action_server.register_preempt_callback(self.preempt_callback)
00067 self.action_server.start()
00068
00069
00070 rospy.sleep(Constants.deadlock_timeout)
00071
00072
00073 running_nodes = set(rosnode.get_node_names())
00074 required_nodes = {
00075 "/deadlock_companion_1",
00076 "/deadlock_companion_2",
00077 "/deadlock_companion_3",
00078 "/deadlock_companion_4",
00079 "/deadlock_companion_5"}
00080
00081 self.assertTrue(required_nodes.issubset(running_nodes),
00082 "Required companion nodes are not currently running")
00083
00084
00085 termination_time = rospy.Time.now()
00086 rosnode.kill_nodes(required_nodes)
00087
00088 rospy.sleep(Constants.shutdown_timeout)
00089
00090
00091 self.assertIsNotNone(self.last_execution_time is None,
00092 "Execute Callback was never executed")
00093
00094 time_since_last_execution = (
00095 termination_time - self.last_execution_time).to_sec()
00096
00097 self.assertTrue(
00098 time_since_last_execution < 2 * Constants.max_action_duration,
00099 "Too long since last goal was executed; likely due to a deadlock")
00100
00101 def execute_callback(self, goal):
00102
00103 self.last_execution_time = rospy.Time.now()
00104
00105
00106 action_duration = random.uniform(0, Constants.max_action_duration)
00107
00108 with self.condition:
00109 if not self.action_server.is_preempt_requested():
00110 self.condition.wait(action_duration)
00111
00112 if self.action_server.is_preempt_requested():
00113 self.action_server.set_preempted()
00114 else:
00115 self.action_server.set_succeeded()
00116
00117 def preempt_callback(self):
00118 with self.condition:
00119 self.condition.notify()
00120
00121
00122 if __name__ == '__main__':
00123 import rostest
00124 rospy.init_node(Constants.node)
00125 rostest.rosrun(Constants.pkg, Constants.node, DeadlockTest)