$search
00001 #!/usr/bin/env python 00002 00003 """ 00004 ArbotiX Node: serial connection to an ArbotiX board w/ PyPose/NUKE/ROS 00005 Copyright (c) 2008-2011 Michael E. Ferguson. All right reserved. 00006 00007 Redistribution and use in source and binary forms, with or without 00008 modification, are permitted provided that the following conditions are met: 00009 * Redistributions of source code must retain the above copyright 00010 notice, this list of conditions and the following disclaimer. 00011 * Redistributions in binary form must reproduce the above copyright 00012 notice, this list of conditions and the following disclaimer in the 00013 documentation and/or other materials provided with the distribution. 00014 * Neither the name of Vanadium Labs LLC nor the names of its 00015 contributors may be used to endorse or promote products derived 00016 from this software without specific prior written permission. 00017 00018 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 00019 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00020 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00021 DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT, 00022 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 00023 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 00025 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 00026 OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 00027 ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00028 """ 00029 00030 import roslib; roslib.load_manifest('arbotix_python') 00031 import rospy 00032 import sys 00033 00034 from arbotix_msgs.msg import * 00035 from arbotix_msgs.srv import * 00036 00037 from arbotix_python.arbotix import ArbotiX 00038 from arbotix_python.diff_controller import DiffController 00039 from arbotix_python.follow_controller import FollowController 00040 from arbotix_python.servo_controller import * 00041 from arbotix_python.linear_controller import * 00042 from arbotix_python.publishers import * 00043 from arbotix_python.io import * 00044 00045 # name: [ControllerClass, pause] 00046 controller_types = { "follow_controller" : FollowController, 00047 "diff_controller" : DiffController, 00048 # "omni_controller" : OmniController, 00049 "linear_controller" : LinearControllerAbsolute, 00050 "linear_controller_i" : LinearControllerIncremental } 00051 00052 ############################################################################### 00053 # Main ROS interface 00054 class ArbotixROS(ArbotiX): 00055 00056 def __init__(self): 00057 pause = False 00058 00059 # load configurations 00060 port = rospy.get_param("~port", "/dev/ttyUSB0") 00061 baud = int(rospy.get_param("~baud", "115200")) 00062 00063 self.rate = rospy.get_param("~rate", 100.0) 00064 self.fake = rospy.get_param("~sim", False) 00065 00066 self.use_sync_read = rospy.get_param("~sync_read",True) # use sync read? 00067 self.use_sync_write = rospy.get_param("~sync_write",True) # use sync write? 00068 00069 # setup publishers 00070 self.diagnostics = DiagnosticsPublisher() 00071 self.joint_state_publisher = JointStatePublisher() 00072 00073 # start an arbotix driver 00074 if not self.fake: 00075 ArbotiX.__init__(self, port, baud) 00076 rospy.sleep(1.0) 00077 rospy.loginfo("Started ArbotiX connection on port " + port + ".") 00078 else: 00079 rospy.loginfo("ArbotiX being simulated.") 00080 00081 # setup joints 00082 self.joints = dict() 00083 for name in rospy.get_param("~joints", dict()).keys(): 00084 joint_type = rospy.get_param("~joints/"+name+"/type", "dynamixel") 00085 if joint_type == "dynamixel": 00086 self.joints[name] = DynamixelServo(self, name) 00087 elif joint_type == "hobby_servo": 00088 self.joints[name] = HobbyServo(self, name) 00089 elif joint_type == "calibrated_linear": 00090 self.joints[name] = LinearJoint(self, name) 00091 00092 # TODO: <BEGIN> REMOVE THIS BEFORE 1.0 00093 if len(rospy.get_param("~dynamixels", dict()).keys()) > 0: 00094 rospy.logwarn("Warning: use of dynamixels as a dictionary is deprecated and will be removed in 0.8.0") 00095 for name in rospy.get_param("~dynamixels", dict()).keys(): 00096 self.joints[name] = DynamixelServo(self, name, "~dynamixels") 00097 if len(rospy.get_param("~servos", dict()).keys()) > 0: 00098 rospy.logwarn("Warning: use of servos as a dictionary is deprecated and will be removed in 0.8.0") 00099 for name in rospy.get_param("~servos", dict()).keys(): 00100 self.joints[name] = HobbyServo(self, name, "~servos") 00101 # TODO: <END> REMOVE THIS BEFORE 1.0 00102 00103 # setup controller 00104 self.controllers = [ServoController(self, "servos"), ] 00105 controllers = rospy.get_param("~controllers", dict()) 00106 for name, params in controllers.items(): 00107 try: 00108 controller = controller_types[params["type"]](self, name) 00109 self.controllers.append( controller ) 00110 pause = pause or controller.pause 00111 except Exception as e: 00112 if type(e) == KeyError: 00113 rospy.logerr("Unrecognized controller: " + params["type"]) 00114 else: 00115 rospy.logerr(str(type(e)) + str(e)) 00116 00117 # wait for arbotix to start up (especially after reset) 00118 if not self.fake: 00119 if rospy.has_param("~digital_servos") or rospy.has_param("~digital_sensors") or rospy.has_param("~analog_sensors"): 00120 pause = True 00121 if pause: 00122 while self.getDigital(1) == -1 and not rospy.is_shutdown(): 00123 rospy.loginfo("Waiting for response...") 00124 rospy.sleep(0.25) 00125 rospy.loginfo("ArbotiX connected.") 00126 00127 for controller in self.controllers: 00128 controller.startup() 00129 00130 # services for io 00131 rospy.Service('~SetupAnalogIn',SetupChannel, self.analogInCb) 00132 rospy.Service('~SetupDigitalIn',SetupChannel, self.digitalInCb) 00133 rospy.Service('~SetupDigitalOut',SetupChannel, self.digitalOutCb) 00134 # initialize digital/analog IO streams 00135 self.io = dict() 00136 if not self.fake: 00137 for v,t in {"digital_servos":DigitalServo,"digital_sensors":DigitalSensor,"analog_sensors":AnalogSensor}.items(): 00138 temp = rospy.get_param("~"+v,dict()) 00139 for name in temp.keys(): 00140 pin = rospy.get_param('~'+v+'/'+name+'/pin',1) 00141 value = rospy.get_param('~'+v+'/'+name+'/value',0) 00142 rate = rospy.get_param('~'+v+'/'+name+'/rate',10) 00143 self.io[name] = t(name, pin, value, rate, self) 00144 00145 r = rospy.Rate(self.rate) 00146 00147 # main loop -- do all the read/write here 00148 while not rospy.is_shutdown(): 00149 00150 # update controllers 00151 for controller in self.controllers: 00152 controller.update() 00153 00154 # update io 00155 for io in self.io.values(): 00156 io.update() 00157 00158 # publish feedback 00159 self.joint_state_publisher.update(self.joints.values(), self.controllers) 00160 self.diagnostics.update(self.joints.values(), self.controllers) 00161 00162 r.sleep() 00163 00164 # do shutdown 00165 for controller in self.controllers: 00166 controller.shutdown() 00167 00168 def analogInCb(self, req): 00169 # TODO: Add check, only 1 service per pin 00170 if not self.fake: 00171 self.io[req.topic_name] = AnalogSensor(req.topic_name, req.pin, req.value, req.rate, self) 00172 return SetupChannelResponse() 00173 00174 def digitalInCb(self, req): 00175 if not self.fake: 00176 self.io[req.topic_name] = DigitalSensor(req.topic_name, req.pin, req.value, req.rate, self) 00177 return SetupChannelResponse() 00178 00179 def digitalOutCb(self, req): 00180 if not self.fake: 00181 self.io[req.topic_name] = DigitalServo(req.topic_name, req.pin, req.value, req.rate, self) 00182 return SetupChannelResponse() 00183 00184 00185 if __name__ == "__main__": 00186 rospy.init_node('arbotix') 00187 a = ArbotixROS() 00188