moving_arm.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2015 Airbus
00004 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License");
00007 # you may not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 #   http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS,
00014 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 
00018 
00019 import rospy
00020 
00021 from airbus_ssm_core import ssm_state
00022 from std_msgs.msg import Empty
00023 from geometry_msgs.msg  import PoseStamped
00024         
00025     
00026 class MoveArticular(ssm_state.ssmState):
00027 
00028     def __init__(self):
00029         ssm_state.ssmState.__init__(self,
00030                                     outcomes = ["finish"],
00031                                     io_keys = ['target_joint','movement_timeout', 'arm_prefix'])
00032         
00033         self.pub_joint_cmd = None
00034         self.pub_stop_cmd = None
00035         
00036         self.sub_trajectory_finished = None
00037         self.finished_ = False
00038         
00039     def _finished_cb(self, finished):
00040             self.finished_ = True
00041         
00042     def move(self, traj, ud):
00043         self.sub_trajectory_finished = rospy.Subscriber("/"+arm_prefix+"/movement_finished",Empty, self._finished_cb)
00044         self.finished_ = False     
00045  
00046     
00047         if(len(traj.positions) == 0):
00048             rospy.logerr("[Arm Skill] : Reading Path Error")
00049             return "preempt"
00050         
00051         r = rospy.Rate(10)
00052         trajectory_done = False
00053         timeout_ = int(ud.movement_timeout)
00054         if(timeout_ < 10):
00055             rospy.logwarn("[Arm Skill] : Articular Command Timeout set to 10 seconds !")
00056             timeout_ = 10
00057             
00058         now_ = rospy.Time.now()
00059         self.pub_joint_cmd.publish(traj)
00060         rospy.loginfo("[Arm Skill] : Moving")
00061         while (not((self.preempt_requested()) or (self.finished_))):
00062             if ((rospy.Time.now() - now_) > timeout_):
00063                 rospy.logerr("[Arm Skill] : Articular Command Timed out")
00064                 self.stop_cmd_pub.publish()
00065                 self.execution = False
00066                 return "preempt"
00067             else:
00068                 r.sleep()
00069                 
00070         if(self.finished_):
00071             return "finish"
00072         else:
00073             rospy.logerr("[Arm Skill] : Command Preempted")
00074             self.stop_cmd_pub.publish()
00075             return "preempt"    
00076         
00077     def execution(self, ud):
00078         self.pub_joint_cmd = rospy.Publisher("/"+ud.arm_prefix+"/articular_cmd",JointTrajectoryPoint,queue_size=1)
00079         self.pub_stop_cmd = rospy.Publisher("/"+ud.arm_prefix+"/stop_movement",Empty,queue_size=1)
00080         joint = ud.target_joint
00081         rospy.sleep(1)
00082         if(joint == None):
00083             return "preempt"
00084         return self.move(joint, ud)
00085     
00086 class MoveCartesian(ssm_state.ssmState):
00087 
00088     def __init__(self):
00089         ssm_state.ssmState.__init__(self,
00090                                     outcomes = ["finish"],
00091                                     io_keys = ['target_point' , 'tool', 'movement_timeout', 'arm_prefix'])
00092         
00093 
00094         
00095         self.sub_trajectory_finished = None
00096         self.finished_ = False
00097         
00098     def _finished_cb(self, finished):
00099             self.finished_ = True
00100         
00101     def move(self, pt, ud):
00102         self.sub_trajectory_finished = rospy.Subscriber("/"+ud.arm_prefix+"/movement_finished",Empty, self._finished_cb)
00103         self.finished_ = False     
00104 
00105         trajectory_done = False
00106         timeout_ = int(ud.movement_timeout)
00107         if(timeout_ < 10):
00108             rospy.logwarn("[Arm Skill] : Cartesian Command Timeout set to 10 seconds !")
00109             timeout_ = 10
00110             
00111         now_ = rospy.Time.now()
00112         self.pub_cart_cmd.publish(pt)
00113         rospy.loginfo("[Arm Skill] : Moving")
00114         while (not((self.preempt_requested()) or (self.finished_))):
00115             if ((rospy.Time.now() - now_) > timeout_):
00116                 rospy.logerr("[Arm Skill] : Cartesian Command Timed out")
00117                 self.stop_cmd_pub.publish()
00118                 self.execution = False
00119                 return "preempt"
00120             else:
00121                 rospy.sleep(0.1)
00122                 
00123         if(self.finished_):
00124             return "finish"
00125         else:
00126             rospy.logerr("[Arm Skill] : Command Preempted")
00127             self.stop_cmd_pub.publish()
00128             return "preempt"    
00129         
00130     def execution(self, ud):
00131         self.pub_cart_cmd = rospy.Publisher("/"+ud.arm_prefix+"/cartesian_cmd",PoseStamped,queue_size=1)
00132         self.pub_stop_cmd = rospy.Publisher("/"+ud.arm_prefix+"/stop_movement",Empty,queue_size=1)
00133         rospy.sleep(1)
00134         if(ud.target_point == None):
00135             rospy.logerr("[Arm Skill] : Target Point Not Defined !")
00136             return "preempt"
00137         return self.move(ud.target_point, ud)
00138 
00139 
00140 
00141 #End of file
00142 


airbus_ssm_tutorial
Author(s): eads
autogenerated on Thu Jun 6 2019 17:59:32