src/cob_auto_tools/auto_recover.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import copy
19 import rospy
20 
21 from cob_msgs.msg import EmergencyStopState
22 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
23 from std_srvs.srv import Trigger, TriggerResponse
24 
25 from simple_script_server import *
27 
class AutoRecover():
    """Automatically trigger component recovery via the script server.

    Recovery is attempted in two situations:

    * the emergency stop state returns to ``EMFREE`` after having been in any
      other state (all configured components are recovered), and
    * an aggregated diagnostics status whose name starts with a configured
      prefix reports a level above ``OK`` (only that component is recovered,
      at most once per cooldown period after a successful recovery).

    Private ROS parameters read in the constructor:

    * ``~recover_emergency`` (default ``True``)
    * ``~recover_diagnostics`` (default ``True``)
    * ``~components`` (default ``{}``): mapping of component name to the
      diagnostics name prefix used to match status entries.

    The ``~enable`` / ``~disable`` Trigger services switch the behaviour on
    and off at runtime by (un)subscribing from the input topics.
    """

    # Minimum time in seconds between two diagnostics-triggered recover
    # attempts of the same component (measured from the last success).
    RECOVER_COOLDOWN_SEC = 10

    def __init__(self):
        """Read parameters, subscribe to the inputs and advertise services."""
        now = rospy.Time.now()
        self.em_state = EmergencyStopState.EMFREE
        self.recover_emergency = rospy.get_param('~recover_emergency', True)
        self.recover_diagnostics = rospy.get_param('~recover_diagnostics', True)
        self.components = rospy.get_param('~components', {})

        # Time of the last successful recovery per component; starting at
        # "now" delays the first diagnostics-based recovery by one cooldown.
        self.components_recover_time = {}
        for component in self.components:
            self.components_recover_time[component] = now

        # Create all state (including the subscriber attributes) before the
        # services are advertised, so that an early enable/disable call never
        # sees a half-initialised object.
        self.enabled = True
        self.subscribe()
        rospy.Service('~enable', Trigger, self.enable_cb)
        rospy.Service('~disable', Trigger, self.disable_cb)

    @staticmethod
    def _status_matches(status_name, prefix):
        """Return True if *status_name* starts with *prefix*, ignoring case."""
        return status_name.lower().startswith(prefix.lower())

    # auto recover based on emergency stop
    def em_cb(self, msg):
        """Recover all components when the emergency stop gets released.

        :param msg: incoming ``EmergencyStopState`` message.
        """
        released = (msg.emergency_state == EmergencyStopState.EMFREE
                    and self.em_state != EmergencyStopState.EMFREE)
        if released:
            if not self.recover_emergency:
                rospy.loginfo("auto_recover from emergency state is disabled")
            else:
                rospy.loginfo("auto_recover from emergency state")
                self.recover(list(self.components.keys()))
        # Remember the latest state for the next transition check.
        self.em_state = copy.deepcopy(msg.emergency_state)

    # auto recover based on diagnostics
    def diagnostics_cb(self, msg):
        """Recover components whose diagnostics report a non-OK level.

        A component is only recovered while the emergency stop is free and if
        its last successful recovery is older than the cooldown period.

        :param msg: incoming ``DiagnosticArray`` message.
        """
        if not self.recover_diagnostics:
            rospy.loginfo_once("auto_recover from diagnostic failure is disabled")  # pylint: disable=no-member
            return

        cooldown = rospy.Duration(self.RECOVER_COOLDOWN_SEC)
        for status in msg.status:
            if not status.level > DiagnosticStatus.OK:
                continue
            for component in self.components:
                if not self._status_matches(status.name, self.components[component]):
                    continue
                # Re-checked per component: the emergency state may change
                # while a previous recover call was being processed.
                if self.em_state != EmergencyStopState.EMFREE:
                    continue
                if rospy.Time.now() - self.components_recover_time[component] > cooldown:
                    rospy.loginfo("auto_recover from diagnostic failure")
                    self.recover([component])

    # callback for enable service
    def enable_cb(self, req):
        """Enable auto recover; does nothing if it is already enabled.

        :param req: Trigger request (unused).
        :return: ``TriggerResponse`` with ``success=True``.
        """
        if not self.enabled:
            self.enabled = True
            self.subscribe()
        return TriggerResponse(True, "auto recover enabled")

    # callback for disable service
    def disable_cb(self, req):
        """Disable auto recover; does nothing if it is already disabled.

        :param req: Trigger request (unused).
        :return: ``TriggerResponse`` with ``success=True``.
        """
        if self.enabled:
            self.enabled = False
            self.unsubscribe()
        return TriggerResponse(True, "auto recover disabled")

    def subscribe(self):
        """Subscribe to the emergency stop state and aggregated diagnostics."""
        self.em_stop_state_subscriber = rospy.Subscriber(
            "/emergency_stop_state", EmergencyStopState, self.em_cb, queue_size=1)
        self.diagnostics_subscriber = rospy.Subscriber(
            "/diagnostics_agg", DiagnosticArray, self.diagnostics_cb, queue_size=1)

    def unsubscribe(self):
        """Unregister both input subscribers."""
        self.em_stop_state_subscriber.unregister()
        self.diagnostics_subscriber.unregister()

    def recover(self, components):
        """Recover the given components if auto recover is enabled.

        The recover timestamp of a component is only updated when the script
        server reports success (error code 0).

        :param components: iterable of component names to recover.
        """
        if not self.enabled:
            return
        for component in components:
            # NOTE(review): ``sss`` is not defined in this class; presumably a
            # module-level simple_script_server instance -- confirm.
            handle = sss.recover(component)
            if handle.get_error_code() != 0:
                rospy.logerr("[auto_recover]: Could not recover %s", component)
            else:
                rospy.loginfo("[auto_recover]: Component %s recovered successfully", component)
                self.components_recover_time[component] = rospy.Time.now()
97 
cob_auto_tools.auto_recover.AutoRecover.diagnostics_subscriber
diagnostics_subscriber
Definition: src/cob_auto_tools/auto_recover.py:82
cob_auto_tools.auto_recover.AutoRecover.subscribe
def subscribe(self)
Definition: src/cob_auto_tools/auto_recover.py:80
cob_auto_tools.auto_recover.AutoRecover.em_state
em_state
Definition: src/cob_auto_tools/auto_recover.py:32
cob_auto_tools.auto_recover.AutoRecover.recover_diagnostics
recover_diagnostics
Definition: src/cob_auto_tools/auto_recover.py:34
cob_auto_tools.auto_recover.AutoRecover.components_recover_time
components_recover_time
Definition: src/cob_auto_tools/auto_recover.py:36
cob_auto_tools.auto_recover.AutoRecover.em_cb
def em_cb(self, msg)
Definition: src/cob_auto_tools/auto_recover.py:46
simple_script_server
cob_auto_tools.auto_recover.AutoRecover.diagnostics_cb
def diagnostics_cb(self, msg)
Definition: src/cob_auto_tools/auto_recover.py:56
cob_auto_tools.auto_recover.AutoRecover.components
components
Definition: src/cob_auto_tools/auto_recover.py:35
cob_auto_tools.auto_recover.AutoRecover.disable_cb
def disable_cb(self, req)
Definition: src/cob_auto_tools/auto_recover.py:74
cob_auto_tools.auto_recover.AutoRecover.recover_emergency
recover_emergency
Definition: src/cob_auto_tools/auto_recover.py:33
cob_auto_tools.auto_recover.AutoRecover.__init__
def __init__(self)
Definition: src/cob_auto_tools/auto_recover.py:30
cob_auto_tools.auto_recover.AutoRecover.enable_cb
def enable_cb(self, req)
Definition: src/cob_auto_tools/auto_recover.py:67
cob_auto_tools.auto_recover.AutoRecover.recover
def recover(self, components)
Definition: src/cob_auto_tools/auto_recover.py:88
cob_auto_tools.auto_recover.AutoRecover
Definition: src/cob_auto_tools/auto_recover.py:28
cob_auto_tools.auto_recover.AutoRecover.enabled
enabled
Definition: src/cob_auto_tools/auto_recover.py:42
cob_auto_tools.auto_recover.AutoRecover.em_stop_state_subscriber
em_stop_state_subscriber
Definition: src/cob_auto_tools/auto_recover.py:81
cob_auto_tools.auto_recover.AutoRecover.unsubscribe
def unsubscribe(self)
Definition: src/cob_auto_tools/auto_recover.py:84


cob_helper_tools
Author(s): Felix Messmer
autogenerated on Fri Aug 2 2024 09:45:50