robotis_servo_sim.py
Go to the documentation of this file.
00001 
00002 # ROS imports
00003 import roslib
00004 roslib.load_manifest('rfid_people_following')
00005 roslib.load_manifest('robotis')
00006 import rospy
00007 
00008 from std_msgs.msg import Float64
00009 from robotis.srv import None_Float
00010 from robotis.srv import None_FloatResponse
00011 from robotis.srv import MoveAng
00012 from robotis.srv import MoveAngResponse
00013 from robotis.srv import None_Int32
00014 from robotis.srv import None_Int32Response
00015 
00016 import time
00017 import numpy as np, math
00018 from threading import Thread
00019 
00020 def standard_rad(t):
00021     if t > 0:
00022         return ((t + np.pi) % (np.pi * 2))  - np.pi
00023     else:
00024         return ((t - np.pi) % (np.pi * -2)) + np.pi
00025 
00026 
00027 class USB2Dynamixel_Device():
00028     ''' Class that manages serial port contention between servos on same bus
00029     '''
00030     def __init__( self, dev_name = '/dev/ttyUSB0', baudrate = 57600 ):
00031         self.dev_name = dev_name
00032 
00033 
00034 
00035 class Robotis_Servo( Thread ):
00036     ''' Class to use a robotis RX-28 or RX-64 servo.
00037     '''
00038     def __init__(self, USB2Dynamixel, servo_id ):
00039         ''' USB2Dynamixel - USB2Dynamixel_Device object to handle serial port.
00040                             Handles threadsafe operation for multiple servos
00041             servo_id - servo ids connected to USB2Dynamixel 1,2,3,4 ... (1 to 253)
00042                        [0 is broadcast if memory serves]
00043         '''
00044         Thread.__init__( self )
00045 
00046         try:
00047             rospy.init_node( 'RobotisServo_'+str(servo_id) )
00048         except rospy.ROSException:
00049             pass
00050 
00051         self.should_run = True
00052         self.is_moving_flag = False
00053         self.curr_angle = 0.0
00054         self.rate = 0.0
00055         self.des_angle = 0.0
00056         self.max_speed = math.radians( 50 )
00057 
00058 
00059         self.dyn = USB2Dynamixel
00060 
00061         # ID exists on bus?
00062         self.servo_id = servo_id
00063         self.start()
00064 
00065     def is_moving(self):
00066         ''' returns True if servo is moving.
00067         '''
00068         return self.is_moving_flag
00069 
00070     def write_address(self, address, data):
00071         ''' writes data at the address.
00072             data = [n1,n2 ...] list of numbers.
00073             return [n1,n2 ...] (list of return parameters)
00074         '''
00075         return 
00076 
00077     def read_angle(self):
00078         ''' returns the current servo angle (radians)
00079         '''
00080         return self.curr_angle
00081 
00082     def move_angle(self, ang, angvel=None, blocking=True):
00083         ''' move to angle (radians)
00084         '''
00085         self.rate = angvel
00086         self.des_angle = ang
00087         rospy.sleep( 0.05 )
00088         while blocking and self.is_moving_flag:
00089             rospy.sleep( 0.05 )
00090         return
00091 
00092     def run( self ):
00093         #last_time = time.time()
00094         last_time = rospy.Time.now().to_nsec() * 10**-9
00095         while self.should_run and not rospy.is_shutdown():
00096             rospy.sleep( 1.0 / 30.0 )
00097             new_time = rospy.Time.now().to_nsec() * 10**-9
00098             curr = self.curr_angle
00099             des = self.des_angle
00100             
00101             if curr == des:
00102                 self.is_moving_flag = False
00103             else:
00104                 ang_diff = standard_rad( des - curr )
00105                 move_mag = np.sign( ang_diff ) * (new_time - last_time) * self.rate
00106                 new_ang = curr + move_mag
00107                 if np.abs( ang_diff ) < np.abs( move_mag ):
00108                     self.curr_angle = des
00109                     self.is_moving_flag = False
00110                 else:
00111                     self.is_moving_flag = True
00112                     self.curr_angle = new_ang
00113             
00114             last_time = new_time
00115                 
00116 
00117 class ROS_Robotis_Server():
00118     # This class provides ROS services for a select few lib_robotis functions
00119     def __init__(self, servo = None, name = '' ):
00120         if servo == None:
00121             raise RuntimeError( 'ROS_Robotis_Servo: No servo specified for server.\n' )
00122 
00123         self.servo = servo
00124         self.name = name
00125         
00126         try:
00127             rospy.init_node( 'robotis_servo_' + self.name )
00128         except rospy.ROSException:
00129             pass
00130 
00131         #self.servo.disable_torque()
00132 
00133         rospy.logout( 'ROS_Robotis_Servo: Starting Server /robotis/servo_' + self.name )
00134         self.channel = rospy.Publisher('/robotis/servo_' + self.name, Float64)
00135 
00136         self.__service_ang = rospy.Service('/robotis/servo_' + name + '_readangle',
00137                                            None_Float, self.__cb_readangle)
00138 
00139         self.__service_ismove = rospy.Service('/robotis/servo_' + name + '_ismoving',
00140                                               None_Int32, self.__cb_ismoving)
00141 
00142         self.__service_moveang = rospy.Service('/robotis/servo_' + name + '_moveangle',
00143                                                MoveAng, self.__cb_moveangle)
00144 
00145     def __cb_readangle( self, request ):
00146         ang = self.update_server()
00147         return None_FloatResponse( ang )
00148 
00149     def __cb_ismoving( self, request ):
00150         status = self.servo.is_moving()
00151         return None_Int32Response( int(status) )
00152 
00153     def __cb_moveangle( self, request ):
00154         rospy.logout( 'RobotisServo: Move %s to angle %3.2f' % (self.name, math.degrees( request.angle )))
00155         ang = request.angle
00156         angvel = request.angvel
00157         blocking = bool( request.blocking )
00158         self.servo.move_angle( ang, angvel, blocking )
00159         rospy.logout( 'RobotisServo: Move %s Done' % (self.name))
00160         return MoveAngResponse()
00161 
00162     def update_server(self):
00163         ang = self.servo.read_angle()
00164         self.channel.publish( Float64(ang) )
00165         return ang
00166 
00167 
00168 class ROS_Robotis_Poller( Thread ):
00169     # A utility class that will setup and poll a number of ROS_Robotis_Servos on one USB2Dynamixel
00170     def __init__( self, dev_name, ids, names ):
00171         Thread.__init__(self)
00172 
00173         self.should_run = True
00174         self.dev_name = dev_name
00175         self.ids = ids
00176         self.names = names
00177 
00178         for n in self.names:
00179             rospy.logout( 'ROS_Robotis_Servo: Starting Up /robotis/servo_' + n + ' on ' + self.dev_name )
00180 
00181         self.dyn = USB2Dynamixel_Device( self.dev_name )
00182         self.servos = [ Robotis_Servo( self.dyn, i ) for i in self.ids ]
00183         self.ros_servers = [ ROS_Robotis_Server( s, n ) for s,n in zip( self.servos, self.names ) ]
00184 
00185         rospy.logout( 'ROS_Robotis_Servo: Setup Complete on ' + self.dev_name )
00186 
00187         self.start()
00188 
00189     def run( self ):
00190         while self.should_run:
00191             [ s.update_server() for s in self.ros_servers ]
00192             rospy.sleep(0.001)
00193 
00194         for n in self.names:
00195             rospy.logout( 'ROS_Robotis_Servo: Shutting Down /robotis/servo_' + n + ' on ' + self.dev_name )
00196 
00197     def stop(self):
00198         self.should_run = False
00199         self.join(3)
00200         if (self.isAlive()):
00201             raise RuntimeError("ROS_Robotis_Servo: unable to stop thread")
00202 
00203 
00204 
00205 
00206 # Client should come from robotis package.  All services defined.
00207 


rfid_people_following
Author(s): Travis Deyle (Healthcare Robotics Lab, Georgia Tech)
autogenerated on Wed Nov 27 2013 11:38:30