auto_recover.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import copy
19 import rospy
20 
21 from cob_msgs.msg import EmergencyStopState
22 from diagnostic_msgs.msg import DiagnosticArray
23 from std_srvs.srv import Trigger, TriggerResponse
24 
25 from simple_script_server import *
27 
28 class AutoRecover():
29 
30  def __init__(self):
31  now = rospy.Time.now()
32  self.em_state = 0
33  self.recover_emergency = rospy.get_param('~recover_emergency', True)
34  self.recover_diagnostics = rospy.get_param('~recover_diagnostics', True)
35  self.components = rospy.get_param('~components', {})
37  for component in list(self.components.keys()):
38  self.components_recover_time[component] = now
39 
40  rospy.Service('~enable',Trigger,self.enable_cb)
41  rospy.Service('~disable',Trigger,self.disable_cb)
42  self.enabled = True
43  self.subscribe()
44 
45  # auto recover based on emergency stop
46  def em_cb(self, msg):
47  if not self.recover_emergency:
48  return
49  if msg.emergency_state == 0 and self.em_state != 0:
50  rospy.loginfo("auto_recover from emergency state")
51  self.recover(list(self.components.keys()))
52  self.em_state = copy.deepcopy(msg.emergency_state)
53 
54  # auto recover based on diagnostics
55  def diagnostics_cb(self, msg):
56  if not self.recover_diagnostics:
57  return
58  for status in msg.status:
59  for component in list(self.components.keys()):
60  if status.name.lower().startswith(self.components[component].lower()) and status.level > 0 and self.em_state == 0 and (rospy.Time.now() - self.components_recover_time[component] > rospy.Duration(10)):
61  rospy.loginfo("auto_recover from diagnostic failure")
62  self.recover([component])
63 
64  # callback for enable service
65  def enable_cb(self, req):
66  self.enabled = True
67  self.subscribe()
68  return TriggerResponse(True, "auto recover enabled")
69 
70  # callback for disable service
71  def disable_cb(self, req):
72  self.enabled = False
73  self.unsubscribe()
74  return TriggerResponse(True, "auto recover disabled")
75 
76  def subscribe(self):
77  self.em_stop_state_subscriber = rospy.Subscriber("/emergency_stop_state", EmergencyStopState, self.em_cb, queue_size=1)
78  self.diagnostics_subscriber = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diagnostics_cb, queue_size=1)
79 
80  def unsubscribe(self):
81  self.em_stop_state_subscriber.unregister()
82  self.diagnostics_subscriber.unregister()
83 
84  def recover(self, components):
85  if self.enabled:
86  for component in components:
87  handle = sss.recover(component)
88  if not (handle.get_error_code() == 0):
89  rospy.logerr("[auto_recover]: Could not recover %s", component)
90  else:
91  rospy.loginfo("[auto_recover]: Component %s recovered successfully", component)
92  self.components_recover_time[component] = rospy.Time.now()
93 
94 if __name__ == "__main__":
95  rospy.init_node("auto_recover")
96  AR = AutoRecover()
97  rospy.loginfo("auto recover running")
98  rospy.spin()
def em_cb(self, msg)
Definition: auto_recover.py:46
def recover(self, components)
Definition: auto_recover.py:84
def enable_cb(self, req)
Definition: auto_recover.py:65
def disable_cb(self, req)
Definition: auto_recover.py:71
def diagnostics_cb(self, msg)
Definition: auto_recover.py:55


cob_helper_tools
Author(s): Felix Messmer
autogenerated on Wed Apr 7 2021 03:03:09