36 from actionlib.msg
import TestAction
43 node =
"test_simple_action_server_deadlock" 47 max_action_duration = 3
65 self.action_server.start()
68 rospy.sleep(Constants.deadlock_timeout)
71 running_nodes = set(rosnode.get_node_names())
73 "/deadlock_companion_1",
74 "/deadlock_companion_2",
75 "/deadlock_companion_3",
76 "/deadlock_companion_4",
77 "/deadlock_companion_5"}
80 required_nodes.issubset(running_nodes),
81 "Required companion nodes are not currently running")
84 termination_time = rospy.Time.now()
85 rosnode.kill_nodes(required_nodes)
87 rospy.sleep(Constants.shutdown_timeout)
92 "Execute Callback was never executed")
94 time_since_last_execution = (
98 time_since_last_execution < 2 * Constants.max_action_duration,
99 "Too long since last goal was executed; likely due to a deadlock")
106 action_duration = random.uniform(0, Constants.max_action_duration)
109 if not self.action_server.is_preempt_requested():
110 self.condition.wait(action_duration)
112 if self.action_server.is_preempt_requested():
113 self.action_server.set_preempted()
115 self.action_server.set_succeeded()
119 self.condition.notify()
122 if __name__ ==
'__main__':
124 rospy.init_node(Constants.node)
125 rostest.rosrun(Constants.pkg, Constants.node, DeadlockTest)
def preempt_callback(self)
def execute_callback(self, goal)