calibrate_class.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2014 Shadow Robot Company Ltd.
00004 #
00005 # This program is free software: you can redistribute it and/or modify it
00006 # under the terms of the GNU General Public License as published by the Free
00007 # Software Foundation, either version 2 of the License, or (at your option)
00008 # any later version.
00009 #
00010 # This program is distributed in the hope that it will be useful, but WITHOUT
00011 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
00013 # more details.
00014 #
00015 # You should have received a copy of the GNU General Public License along
00016 # with this program.  If not, see <http://www.gnu.org/licenses/>.
00017 
00018 
00019 import threading
00020 from time import sleep
00021 
00022 # Loads interface with the robot.
00023 import rospy
00024 import controller_manager_msgs.srv as controller_manager_srvs
00025 from std_msgs.msg import Bool, Empty
00026 
00027 
00028 class Calibrate(object):
00029 
00030     def __init__(self):
00031         rospy.loginfo("waiting for the controller manager")
00032         rospy.wait_for_service('controller_manager/load_controller', timeout=40.0)
00033         rospy.wait_for_service('controller_manager/unload_controller', timeout=40.0)
00034         rospy.wait_for_service('controller_manager/switch_controller', timeout=40.0)
00035         rospy.loginfo("OK the controller manager is ready")
00036 
00037         self.load_controller = rospy.ServiceProxy('controller_manager/load_controller',
00038                                                   controller_manager_srvs.LoadController)
00039         self.unload_controller = rospy.ServiceProxy('controller_manager/unload_controller',
00040                                                     controller_manager_srvs.UnloadController)
00041         self.switch_controller = rospy.ServiceProxy('controller_manager/switch_controller',
00042                                                     controller_manager_srvs.SwitchController)
00043         self.pub_calibrated = rospy.Publisher('calibrated', Bool, queue_size=1, latch=True)
00044 
00045     def calibrate(self, controllers):
00046 
00047         success = True
00048 
00049         if type(controllers) is not list:
00050             controllers = [controllers]
00051 
00052         launched = []
00053         try:
00054             # Loads the controllers
00055             for c in controllers:
00056                 resp = self.load_controller(c)
00057                 if resp.ok == 0:
00058                     rospy.logerr("Failed: %s" % c)
00059                     success = False
00060                 else:
00061                     launched.append(c)
00062             print("Launched: %s" % ', '.join(launched))
00063 
00064             # Starts the launched controllers
00065             self.switch_controller(launched, [], controller_manager_srvs.SwitchControllerRequest.BEST_EFFORT)
00066 
00067             # Sets up callbacks for calibration completion
00068             waiting_for = launched[:]
00069 
00070             def calibrated(msg, name):  # Somewhat not thread-safe
00071                 if name in waiting_for:
00072                     waiting_for.remove(name)
00073             for name in launched:
00074                 rospy.Subscriber("%s/calibrated" % name, Empty, calibrated, name)
00075 
00076             # Waits until all the controllers have calibrated
00077             while waiting_for and not rospy.is_shutdown():
00078                 print("Waiting for: %s" % ', '.join(waiting_for))
00079                 sleep(0.5)
00080         finally:
00081             for name in launched:
00082                 try:
00083                     resp_stop = self.switch_controller([], [name], controller_manager_srvs.SwitchControllerRequest.STRICT)
00084                     if (resp_stop == 0):
00085                         rospy.logerr("Failed to stop controller %s" % name)
00086                     resp_unload = self.unload_controller(name)
00087                     if (resp_unload == 0):
00088                         rospy.logerr("Failed to unload controller %s" % name)
00089                 except Exception, ex:
00090                     rospy.logerr("Failed to stop/unload controller %s" % name)
00091         return success
00092 
00093     @staticmethod
00094     def calibrate_imu():
00095         class is_calibrated_helper:
00096             def __init__(self):
00097                 self.is_calibrated = False
00098                 self.cond = threading.Condition()
00099 
00100             def callback(self, msg):
00101                 if msg.data:
00102                     with self.cond:
00103                         self.is_calibrated = True
00104                         self.cond.notify()
00105 
00106             def wait_for_calibrated(self, topic, timeout):
00107                 self.sub = rospy.Subscriber(topic, Bool, self.callback)
00108                 try:
00109                     with self.cond:
00110                         if not self.is_calibrated:
00111                             self.cond.wait(timeout)
00112                     return self.is_calibrated
00113                 finally:
00114                     self.sub.unregister()
00115                 return self.is_calibrated
00116 
00117         print("Waiting up to 20s for IMU calibration to complete")
00118         helper = is_calibrated_helper()
00119         if not helper.wait_for_calibrated("torso_lift_imu/is_calibrated", 20):
00120             rospy.logerr("IMU took too long to calibrate.")
00121             return False
00122         return True


ros_ethercat_model
Author(s): Manos Nikolaidis
autogenerated on Thu Jul 4 2019 20:01:55