dynamixel_io.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 #
00004 # Software License Agreement (BSD License)
00005 #
00006 # Copyright (c) 2010-2011, Cody Jorgensen, Antons Rebguns.
00007 # All rights reserved.
00008 #
00009 # Redistribution and use in source and binary forms, with or without
00010 # modification, are permitted provided that the following conditions
00011 # are met:
00012 #
00013 #  * Redistributions of source code must retain the above copyright
00014 #    notice, this list of conditions and the following disclaimer.
00015 #  * Redistributions in binary form must reproduce the above
00016 #    copyright notice, this list of conditions and the following
00017 #    disclaimer in the documentation and/or other materials provided
00018 #    with the distribution.
00019 #  * Neither the name of University of Arizona nor the names of its
00020 #    contributors may be used to endorse or promote products derived
00021 #    from this software without specific prior written permission.
00022 #
00023 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00024 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00025 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00026 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00027 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00028 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00029 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00030 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00031 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00032 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00033 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00034 # POSSIBILITY OF SUCH DAMAGE.
00035 
00036 
00037 __author__ = 'Cody Jorgensen, Antons Rebguns'
00038 __copyright__ = 'Copyright (c) 2010-2011 Cody Jorgensen, Antons Rebguns'
00039 
00040 __license__ = 'BSD'
00041 __maintainer__ = 'Antons Rebguns'
00042 __email__ = 'anton@email.arizona.edu'
00043 
00044 
00045 import time
00046 import serial
00047 from array import array
00048 from binascii import b2a_hex
00049 from threading import Lock
00050 
00051 from dynamixel_const import *
00052 
00053 exception = None
00054 
00055 class DynamixelIO(object):
00056     """ Provides low level IO with the Dynamixel servos through pyserial. Has the
00057     ability to write instruction packets, request and read register value
00058     packets, send and receive a response to a ping packet, and send a SYNC WRITE
00059     multi-servo instruction packet.
00060     """
00061 
00062     def __init__(self, port, baudrate, readback_echo=False):
00063         """ Constructor takes serial port and baudrate as arguments. """
00064         try:
00065             self.serial_mutex = Lock()
00066             self.ser = None
00067             self.ser = serial.Serial(port, baudrate, timeout=0.015)
00068             self.port_name = port
00069             self.readback_echo = readback_echo
00070         except SerialOpenError:
00071            raise SerialOpenError(port, baudrate)
00072 
00073     def __del__(self):
00074         """ Destructor calls DynamixelIO.close """
00075         self.close()
00076 
00077     def close(self):
00078         """
00079         Be nice, close the serial port.
00080         """
00081         if self.ser:
00082             self.ser.flushInput()
00083             self.ser.flushOutput()
00084             self.ser.close()
00085 
00086     def __write_serial(self, data):
00087         self.ser.flushInput()
00088         self.ser.flushOutput()
00089         self.ser.write(data)
00090         if self.readback_echo:
00091             self.ser.read(len(data))
00092 
00093     def __read_response(self, servo_id):
00094         data = []
00095 
00096         try:
00097             data.extend(self.ser.read(4))
00098             if not data[0:2] == ['\xff', '\xff']: raise Exception('Wrong packet prefix %s' % data[0:2])
00099             data.extend(self.ser.read(ord(data[3])))
00100             data = array('B', ''.join(data)).tolist() # [int(b2a_hex(byte), 16) for byte in data]
00101         except Exception, e:
00102             raise DroppedPacketError('Invalid response received from motor %d. %s' % (servo_id, e))
00103 
00104         # verify checksum
00105         checksum = 255 - sum(data[2:-1]) % 256
00106         if not checksum == data[-1]: raise ChecksumError(servo_id, data, checksum)
00107 
00108         return data
00109 
00110     def read(self, servo_id, address, size):
00111         """ Read "size" bytes of data from servo with "servo_id" starting at the
00112         register with "address". "address" is an integer between 0 and 57. It is
00113         recommended to use the constants in module dynamixel_const for readability.
00114 
00115         To read the position from servo with id 1, the method should be called
00116         like:
00117             read(1, DXL_GOAL_POSITION_L, 2)
00118         """
00119         # Number of bytes following standard header (0xFF, 0xFF, id, length)
00120         length = 4  # instruction, address, size, checksum
00121 
00122         # directly from AX-12 manual:
00123         # Check Sum = ~ (ID + LENGTH + INSTRUCTION + PARAM_1 + ... + PARAM_N)
00124         # If the calculated value is > 255, the lower byte is the check sum.
00125         checksum = 255 - ( (servo_id + length + DXL_READ_DATA + address + size) % 256 )
00126 
00127         # packet: FF  FF  ID LENGTH INSTRUCTION PARAM_1 ... CHECKSUM
00128         packet = [0xFF, 0xFF, servo_id, length, DXL_READ_DATA, address, size, checksum]
00129         packetStr = array('B', packet).tostring() # same as: packetStr = ''.join([chr(byte) for byte in packet])
00130 
00131         with self.serial_mutex:
00132             self.__write_serial(packetStr)
00133 
00134             # wait for response packet from the motor
00135             timestamp = time.time()
00136             time.sleep(0.0013)#0.00235)
00137 
00138             # read response
00139             data = self.__read_response(servo_id)
00140             data.append(timestamp)
00141 
00142         return data
00143 
00144     def write(self, servo_id, address, data):
00145         """ Write the values from the "data" list to the servo with "servo_id"
00146         starting with data[0] at "address", continuing through data[n-1] at
00147         "address" + (n-1), where n = len(data). "address" is an integer between
00148         0 and 49. It is recommended to use the constants in module dynamixel_const
00149         for readability. "data" is a list/tuple of integers.
00150 
00151         To set servo with id 1 to position 276, the method should be called
00152         like:
00153             write(1, DXL_GOAL_POSITION_L, (20, 1))
00154         """
00155         # Number of bytes following standard header (0xFF, 0xFF, id, length)
00156         length = 3 + len(data)  # instruction, address, len(data), checksum
00157 
00158         # directly from AX-12 manual:
00159         # Check Sum = ~ (ID + LENGTH + INSTRUCTION + PARAM_1 + ... + PARAM_N)
00160         # If the calculated value is > 255, the lower byte is the check sum.
00161         checksum = 255 - ((servo_id + length + DXL_WRITE_DATA + address + sum(data)) % 256)
00162 
00163         # packet: FF  FF  ID LENGTH INSTRUCTION PARAM_1 ... CHECKSUM
00164         packet = [0xFF, 0xFF, servo_id, length, DXL_WRITE_DATA, address]
00165         packet.extend(data)
00166         packet.append(checksum)
00167 
00168         packetStr = array('B', packet).tostring() # packetStr = ''.join([chr(byte) for byte in packet])
00169 
00170         with self.serial_mutex:
00171             self.__write_serial(packetStr)
00172 
00173             # wait for response packet from the motor
00174             timestamp = time.time()
00175             time.sleep(0.0013)
00176 
00177             # read response
00178             data = self.__read_response(servo_id)
00179             data.append(timestamp)
00180 
00181         return data
00182 
00183     def sync_write(self, address, data):
00184         """ Use Broadcast message to send multiple servos instructions at the
00185         same time. No "status packet" will be returned from any servos.
00186         "address" is an integer between 0 and 49. It is recommended to use the
00187         constants in module dynamixel_const for readability. "data" is a tuple of
00188         tuples. Each tuple in "data" must contain the servo id followed by the
00189         data that should be written from the starting address. The amount of
00190         data can be as long as needed.
00191 
00192         To set servo with id 1 to position 276 and servo with id 2 to position
00193         550, the method should be called like:
00194             sync_write(DXL_GOAL_POSITION_L, ( (1, 20, 1), (2 ,38, 2) ))
00195         """
00196         # Calculate length and sum of all data
00197         flattened = [value for servo in data for value in servo]
00198 
00199         # Number of bytes following standard header (0xFF, 0xFF, id, length) plus data
00200         length = 4 + len(flattened)
00201 
00202         checksum = 255 - ((DXL_BROADCAST + length + \
00203                           DXL_SYNC_WRITE + address + len(data[0][1:]) + \
00204                           sum(flattened)) % 256)
00205 
00206         # packet: FF  FF  ID LENGTH INSTRUCTION PARAM_1 ... CHECKSUM
00207         packet = [0xFF, 0xFF, DXL_BROADCAST, length, DXL_SYNC_WRITE, address, len(data[0][1:])]
00208         packet.extend(flattened)
00209         packet.append(checksum)
00210 
00211         packetStr = array('B', packet).tostring() # packetStr = ''.join([chr(byte) for byte in packet])
00212 
00213         with self.serial_mutex:
00214             self.__write_serial(packetStr)
00215 
00216     def ping(self, servo_id):
00217         """ Ping the servo with "servo_id". This causes the servo to return a
00218         "status packet". This can tell us if the servo is attached and powered,
00219         and if so, if there are any errors.
00220         """
00221         # Number of bytes following standard header (0xFF, 0xFF, id, length)
00222         length = 2  # instruction, checksum
00223 
00224         # directly from AX-12 manual:
00225         # Check Sum = ~ (ID + LENGTH + INSTRUCTION + PARAM_1 + ... + PARAM_N)
00226         # If the calculated value is > 255, the lower byte is the check sum.
00227         checksum = 255 - ((servo_id + length + DXL_PING) % 256)
00228 
00229         # packet: FF  FF  ID LENGTH INSTRUCTION CHECKSUM
00230         packet = [0xFF, 0xFF, servo_id, length, DXL_PING, checksum]
00231         packetStr = array('B', packet).tostring()
00232 
00233         with self.serial_mutex:
00234             self.__write_serial(packetStr)
00235 
00236             # wait for response packet from the motor
00237             timestamp = time.time()
00238             time.sleep(0.0013)
00239 
00240             # read response
00241             try:
00242                 response = self.__read_response(servo_id)
00243                 response.append(timestamp)
00244             except Exception, e:
00245                 response = []
00246 
00247         if response:
00248             self.exception_on_error(response[4], servo_id, 'ping')
00249         return response
00250 
00251     def test_bit(self, number, offset):
00252         mask = 1 << offset
00253         return (number & mask)
00254 
00255 
00256     ######################################################################
00257     # These function modify EEPROM data which persists after power cycle #
00258     ######################################################################
00259 
00260     def set_id(self, old_id, new_id):
00261         """
00262         Sets a new unique number to identify a motor. The range from 1 to 253
00263         (0xFD) can be used.
00264         """
00265         response = self.write(old_id, DXL_ID, [new_id])
00266         if response:
00267             self.exception_on_error(response[4], old_id, 'setting id to %d' % new_id)
00268         return response
00269 
00270     def set_baud_rate(self, servo_id, baud_rate):
00271         """
00272         Sets servo communication speed. The range from 0 to 254.
00273         """
00274         response = self.write(servo_id, DXL_BAUD_RATE, [baud_rate])
00275         if response:
00276             self.exception_on_error(response[4], servo_id, 'setting baud rate to %d' % baud_rate)
00277         return response
00278 
00279     def set_return_delay_time(self, servo_id, delay):
00280         """
00281         Sets the delay time from the transmission of Instruction Packet until
00282         the return of Status Packet. 0 to 254 (0xFE) can be used, and the delay
00283         time per data value is 2 usec.
00284         """
00285         response = self.write(servo_id, DXL_RETURN_DELAY_TIME, [delay])
00286         if response:
00287             self.exception_on_error(response[4], servo_id, 'setting return delay time to %d' % delay)
00288         return response
00289 
00290     def set_angle_limit_cw(self, servo_id, angle_cw):
00291         """
00292         Set the min (CW) angle of rotation limit.
00293         """
00294         loVal = int(angle_cw % 256)
00295         hiVal = int(angle_cw >> 8)
00296 
00297         response = self.write(servo_id, DXL_CW_ANGLE_LIMIT_L, (loVal, hiVal))
00298         if response:
00299             self.exception_on_error(response[4], servo_id, 'setting CW angle limits to %d' % angle_cw)
00300         return response
00301 
00302     def set_angle_limit_ccw(self, servo_id, angle_ccw):
00303         """
00304         Set the max (CCW) angle of rotation limit.
00305         """
00306         loVal = int(angle_ccw % 256)
00307         hiVal = int(angle_ccw >> 8)
00308 
00309         response = self.write(servo_id, DXL_CCW_ANGLE_LIMIT_L, (loVal, hiVal))
00310         if response:
00311             self.exception_on_error(response[4], servo_id, 'setting CCW angle limits to %d' % angle_ccw)
00312         return response
00313 
00314     def set_angle_limits(self, servo_id, min_angle, max_angle):
00315         """
00316         Set the min (CW) and max (CCW) angle of rotation limits.
00317         """
00318         loMinVal = int(min_angle % 256)
00319         hiMinVal = int(min_angle >> 8)
00320         loMaxVal = int(max_angle % 256)
00321         hiMaxVal = int(max_angle >> 8)
00322 
00323         # set 4 register values with low and high bytes for min and max angles
00324         response = self.write(servo_id, DXL_CW_ANGLE_LIMIT_L, (loMinVal, hiMinVal, loMaxVal, hiMaxVal))
00325         if response:
00326             self.exception_on_error(response[4], servo_id, 'setting CW and CCW angle limits to %d and %d' %(min_angle, max_angle))
00327         return response
00328 
00329     def set_drive_mode(self, servo_id, is_slave=False, is_reverse=False):
00330         """
00331         Sets the drive mode for EX-106 motors
00332         """
00333         drive_mode = (is_slave << 1) + is_reverse
00334 
00335         response = self.write(servo_id, DXL_DRIVE_MODE, [drive_mode])
00336         if response:
00337             self.exception_on_error(response[4], servo_id, 'setting drive mode to %d' % drive_mode)
00338         return response
00339 
00340     def set_voltage_limit_min(self, servo_id, min_voltage):
00341         """
00342         Set the minimum voltage limit.
00343         NOTE: the absolute min is 5v
00344         """
00345 
00346         if min_voltage < 5: min_voltage = 5
00347         minVal = int(min_voltage * 10)
00348 
00349         response = self.write(servo_id, DXL_DOWN_LIMIT_VOLTAGE, [minVal])
00350         if response:
00351             self.exception_on_error(response[4], servo_id, 'setting minimum voltage level to %d' % min_voltage)
00352         return response
00353 
00354     def set_voltage_limit_max(self, servo_id, max_voltage):
00355         """
00356         Set the maximum voltage limit.
00357         NOTE: the absolute min is 25v
00358         """
00359 
00360         if max_voltage > 25: max_voltage = 25
00361         maxVal = int(max_voltage * 10)
00362 
00363         response = self.write(servo_id, DXL_UP_LIMIT_VOLTAGE, [maxVal])
00364         if response:
00365             self.exception_on_error(response[4], servo_id, 'setting maximum voltage level to %d' % max_voltage)
00366         return response
00367 
00368     def set_voltage_limits(self, servo_id, min_voltage, max_voltage):
00369         """
00370         Set the min and max voltage limits.
00371         NOTE: the absolute min is 5v and the absolute max is 25v
00372         """
00373 
00374         if min_voltage < 5: min_voltage = 5
00375         if max_voltage > 25: max_voltage = 25
00376 
00377         minVal = int(min_voltage * 10)
00378         maxVal = int(max_voltage * 10)
00379 
00380         response = self.write(servo_id, DXL_DOWN_LIMIT_VOLTAGE, (minVal, maxVal))
00381         if response:
00382             self.exception_on_error(response[4], servo_id, 'setting min and max voltage levels to %d and %d' %(min_voltage, max_voltage))
00383         return response
00384 
00385 
00386     ###############################################################
00387     # These functions can send a single command to a single servo #
00388     ###############################################################
00389 
00390     def set_torque_enabled(self, servo_id, enabled):
00391         """
00392         Sets the value of the torque enabled register to 1 or 0. When the
00393         torque is disabled the servo can be moved manually while the motor is
00394         still powered.
00395         """
00396         response = self.write(servo_id, DXL_TORQUE_ENABLE, [enabled])
00397         if response:
00398             self.exception_on_error(response[4], servo_id, '%sabling torque' % 'en' if enabled else 'dis')
00399         return response
00400 
00401     def set_compliance_margin_cw(self, servo_id, margin):
00402         """
00403         The error between goal position and present position in CW direction.
00404         The range of the value is 0 to 255, and the unit is the same as Goal Position.
00405         The greater the value, the more difference occurs.
00406         """
00407         response = self.write(servo_id, DXL_CW_COMPLIANCE_MARGIN, [margin])
00408         if response:
00409             self.exception_on_error(response[4], servo_id, 'setting CW compliance margin to %d' % margin)
00410         return response
00411 
00412     def set_compliance_margin_ccw(self, servo_id, margin):
00413         """
00414         The error between goal position and present position in CCW direction.
00415         The range of the value is 0 to 255, and the unit is the same as Goal Position.
00416         The greater the value, the more difference occurs.
00417         """
00418         response = self.write(servo_id, DXL_CCW_COMPLIANCE_MARGIN, [margin])
00419         if response:
00420             self.exception_on_error(response[4], servo_id, 'setting CCW compliance margin to %d' % margin)
00421         return response
00422 
00423     def set_compliance_margins(self, servo_id, margin_cw, margin_ccw):
00424         """
00425         The error between goal position and present position in CCW direction.
00426         The range of the value is 0 to 255, and the unit is the same as Goal Position.
00427         The greater the value, the more difference occurs.
00428         """
00429         response = self.write(servo_id, DXL_CW_COMPLIANCE_MARGIN, (margin_cw, margin_ccw))
00430         if response:
00431             self.exception_on_error(response[4], servo_id, 'setting CW and CCW compliance margins to values %d and %d' %(margin_cw, margin_ccw))
00432         return response
00433 
00434     def set_compliance_slope_cw(self, servo_id, slope):
00435         """
00436         Sets the level of Torque near the goal position in CW direction.
00437         Compliance Slope is set in 7 steps, the higher the value, the more flexibility is obtained.
00438         """
00439         response = self.write(servo_id, DXL_CW_COMPLIANCE_SLOPE, [slope])
00440         if response:
00441             self.exception_on_error(response[4], servo_id, 'setting CW compliance slope to %d' %  slope)
00442         return response
00443 
00444     def set_compliance_slope_ccw(self, servo_id, slope):
00445         """
00446         Sets the level of Torque near the goal position in CCW direction.
00447         Compliance Slope is set in 7 steps, the higher the value, the more flexibility is obtained.
00448         """
00449         response = self.write(servo_id, DXL_CCW_COMPLIANCE_SLOPE, [slope])
00450         if response:
00451             self.exception_on_error(response[4], servo_id, 'setting CCW compliance slope to %d' % slope)
00452         return response
00453 
00454     def set_compliance_slopes(self, servo_id, slope_cw, slope_ccw):
00455         """
00456         Sets the level of Torque near the goal position in CW/CCW direction.
00457         Compliance Slope is set in 7 steps, the higher the value, the more flexibility is obtained.
00458         """
00459         response = self.write(servo_id, DXL_CW_COMPLIANCE_SLOPE, (slope_cw, slope_ccw))
00460         if response:
00461             self.exception_on_error(response[4], servo_id, 'setting CW and CCW compliance slopes to %d and %d' %(slope_cw, slope_ccw))
00462         return response
00463 
00464     def set_d_gain(self, servo_id, d_gain):
00465         """
00466         Sets the value of derivative action of PID controller.
00467         Gain value is in range 0 to 254.
00468         """
00469         response = self.write(servo_id, DXL_D_GAIN, [d_gain])
00470         if response:
00471             self.exception_on_error(response[4], servo_id, 'setting D gain value of PID controller to %d' % d_gain)
00472         return response
00473 
00474     def set_i_gain(self, servo_id, i_gain):
00475         """
00476         Sets the value of integral action of PID controller.
00477         Gain value is in range 0 to 254.
00478         """
00479         response = self.write(servo_id, DXL_I_GAIN, [i_gain])
00480         if response:
00481             self.exception_on_error(response[4], servo_id, 'setting I gain value of PID controller to %d' % i_gain)
00482         return response
00483 
00484     def set_p_gain(self, servo_id, p_gain):
00485         """
00486         Sets the value of proportional action of PID controller.
00487         Gain value is in range 0 to 254.
00488         """
00489         response = self.write(servo_id, DXL_P_GAIN, [p_gain])
00490         if response:
00491             self.exception_on_error(response[4], servo_id, 'setting P gain value of PID controller to %d' % p_gain)
00492         return response
00493 
00494     def set_punch(self, servo_id, punch):
00495         """
00496         Sets the limit value of torque being reduced when the output torque is
00497         decreased in the Compliance Slope area. In other words, it is the mimimum
00498         torque. The initial value is 32 (0x20) and can be extended up to 1023
00499         (0x3FF). (Refer to Compliance margin & Slope)
00500         """
00501         loVal = int(punch % 256)
00502         hiVal = int(punch >> 8)
00503         response = self.write(servo_id, DXL_PUNCH_L, (loVal, hiVal))
00504         if response:
00505             self.exception_on_error(response[4], servo_id, 'setting punch to %d' % punch)
00506         return response
00507 
00508     def set_acceleration(self, servo_id, acceleration):
00509         """
00510         Sets the acceleration. The unit is 8.583 Degree / sec^2.
00511         0 - acceleration control disabled, 1-254 - valid range for acceleration.
00512         """
00513 
00514         model = self.get_model_number(servo_id)
00515         if not model in DXL_MODEL_TO_PARAMS:
00516             raise UnsupportedFeatureError(model, DXL_GOAL_ACCELERATION)
00517 
00518         if DXL_GOAL_ACCELERATION in DXL_MODEL_TO_PARAMS[model]['features']:
00519             response = self.write(servo_id, DXL_GOAL_ACCELERATION, (acceleration, ))
00520             if response:
00521                 self.exception_on_error(response[4], servo_id, 'setting acceleration to %d' % acceleration)
00522             return response
00523         else:
00524             raise UnsupportedFeatureError(model, DXL_GOAL_ACCELERATION)
00525 
00526     def set_position(self, servo_id, position):
00527         """
00528         Set the servo with servo_id to the specified goal position.
00529         Position value must be positive.
00530         """
00531         loVal = int(position % 256)
00532         hiVal = int(position >> 8)
00533 
00534         response = self.write(servo_id, DXL_GOAL_POSITION_L, (loVal, hiVal))
00535         if response:
00536             self.exception_on_error(response[4], servo_id, 'setting goal position to %d' % position)
00537         return response
00538 
00539     def set_speed(self, servo_id, speed):
00540         """
00541         Set the servo with servo_id to the specified goal speed.
00542         Speed can be negative only if the dynamixel is in "freespin" mode.
00543         """
00544         # split speed into 2 bytes
00545         if speed >= 0:
00546             loVal = int(speed % 256)
00547             hiVal = int(speed >> 8)
00548         else:
00549             loVal = int((1023 - speed) % 256)
00550             hiVal = int((1023 - speed) >> 8)
00551 
00552         # set two register values with low and high byte for the speed
00553         response = self.write(servo_id, DXL_GOAL_SPEED_L, (loVal, hiVal))
00554         if response:
00555             self.exception_on_error(response[4], servo_id, 'setting moving speed to %d' % speed)
00556         return response
00557 
00558     def set_torque_limit(self, servo_id, torque):
00559         """
00560         Sets the value of the maximum torque limit for servo with id servo_id.
00561         Valid values are 0 to 1023 (0x3FF), and the unit is about 0.1%.
00562         For example, if the value is 512 only 50% of the maximum torque will be used.
00563         If the power is turned on, the value of Max Torque (Address 14, 15) is used as the initial value.
00564         """
00565         loVal = int(torque % 256)
00566         hiVal = int(torque >> 8)
00567 
00568         response = self.write(servo_id, DXL_TORQUE_LIMIT_L, (loVal, hiVal))
00569         if response:
00570             self.exception_on_error(response[4], servo_id, 'setting torque limit to %d' % torque)
00571         return response
00572 
00573     def set_goal_torque(self, servo_id, torque):
00574         """
00575         Set the servo to torque control mode (similar to wheel mode, but controlling the torque)
00576         Valid values are from -1023 to 1023.
00577         Anything outside this range or 'None' disables torque control.
00578         """
00579 
00580         model = self.get_model_number(servo_id)
00581         if not model in DXL_MODEL_TO_PARAMS:
00582             raise UnsupportedFeatureError(model, DXL_TORQUE_CONTROL_MODE)
00583 
00584         valid_torque = torque is not None and torque >= -1023 and torque <= 1023
00585         if torque is not None and torque < 0:
00586             torque = 1024 - torque
00587 
00588         if DXL_TORQUE_CONTROL_MODE in DXL_MODEL_TO_PARAMS[model]['features']:
00589             if valid_torque:
00590                 loVal = int(torque % 256); hiVal = int(torque >> 8);
00591                 response = self.write(servo_id, DXL_GOAL_TORQUE_L, (loVal, hiVal))
00592                 if response:
00593                     self.exception_on_error(response[4], servo_id, 'setting goal torque to %d' % torque)
00594             response = self.write(servo_id, DXL_TORQUE_CONTROL_MODE, (int(valid_torque), ))
00595             if response:
00596                 self.exception_on_error(response[4], servo_id, 'enabling torque mode')
00597             return response
00598         else:
00599             raise UnsupportedFeatureError(model, DXL_TORQUE_CONTROL_MODE)
00600 
00601     def set_position_and_speed(self, servo_id, position, speed):
00602         """
00603         Set the servo with servo_id to specified position and speed.
00604         Speed can be negative only if the dynamixel is in "freespin" mode.
00605         """
00606         # split speed into 2 bytes
00607         if speed >= 0:
00608             loSpeedVal = int(speed % 256)
00609             hiSpeedVal = int(speed >> 8)
00610         else:
00611             loSpeedVal = int((1023 - speed) % 256)
00612             hiSpeedVal = int((1023 - speed) >> 8)
00613 
00614         # split position into 2 bytes
00615         loPositionVal = int(position % 256)
00616         hiPositionVal = int(position >> 8)
00617 
00618         response = self.write(servo_id, DXL_GOAL_POSITION_L, (loPositionVal, hiPositionVal, loSpeedVal, hiSpeedVal))
00619         if response:
00620             self.exception_on_error(response[4], servo_id, 'setting goal position to %d and moving speed to %d' %(position, speed))
00621         return response
00622 
00623     def set_led(self, servo_id, led_state):
00624         """
00625         Turn the LED of servo motor on/off.
00626         Possible boolean state values:
00627             True - turn the LED on,
00628             False - turn the LED off.
00629         """
00630         response = self.write(servo_id, DXL_LED, [led_state])
00631         if response:
00632             self.exception_on_error(response[4], servo_id,
00633                     'setting a LED to %s' % led_state)
00634         return response
00635 
00636 
00637     #################################################################
00638     # These functions can send multiple commands to multiple servos #
00639     # These commands are used in ROS wrapper as they don't send a   #
00640     # response packet, ROS wrapper gets motor states at a set rate  #
00641     #################################################################
00642 
00643     def set_multi_torque_enabled(self, valueTuples):
00644         """
00645         Method to set multiple servos torque enabled.
00646         Should be called as such:
00647         set_multi_servos_to_torque_enabled( (id1, True), (id2, True), (id3, True) )
00648         """
00649         # use sync write to broadcast multi servo message
00650         self.sync_write(DXL_TORQUE_ENABLE, tuple(valueTuples))
00651 
00652     def set_multi_compliance_margin_cw(self, valueTuples):
00653         """
00654         Set different CW compliance margin for multiple servos.
00655         Should be called as such:
00656         set_multi_compliance_margin_cw( ( (id1, margin1), (id2, margin2), (id3, margin3) ) )
00657         """
00658         self.sync_write(DXL_CW_COMPLIANCE_MARGIN, tuple(valueTuples))
00659 
00660     def set_multi_compliance_margin_ccw(self, valueTuples):
00661         """
00662         Set different CCW compliance margin for multiple servos.
00663         Should be called as such:
00664         set_multi_compliance_margin_ccw( ( (id1, margin1), (id2, margin2), (id3, margin3) ) )
00665         """
00666         self.sync_write(DXL_CCW_COMPLIANCE_MARGIN, tuple(valueTuples))
00667 
00668     def set_multi_compliance_margins(self, valueTuples):
00669         """
00670         Set different CW and CCW compliance margins for multiple servos.
00671         Should be called as such:
00672         set_multi_compliance_margins( ( (id1, cw_margin1, ccw_margin1), (id2, cw_margin2, ccw_margin2) ) )
00673         """
00674         self.sync_write(DXL_CW_COMPLIANCE_MARGIN, tuple(valueTuples))
00675 
00676     def set_multi_compliance_slope_cw(self, valueTuples):
00677         """
00678         Set different CW compliance slope for multiple servos.
00679         Should be called as such:
00680         set_multi_compliance_slope_cw( ( (id1, slope1), (id2, slope2), (id3, slope3) ) )
00681         """
00682         self.sync_write(DXL_CW_COMPLIANCE_SLOPE, tuple(valueTuples))
00683 
00684     def set_multi_compliance_slope_ccw(self, valueTuples):
00685         """
00686         Set different CCW compliance slope for multiple servos.
00687         Should be called as such:
00688         set_multi_compliance_slope_ccw( ( (id1, slope1), (id2, slope2), (id3, slope3) ) )
00689         """
00690         self.sync_write(DXL_CCW_COMPLIANCE_SLOPE, tuple(valueTuples))
00691 
00692     def set_multi_compliance_slopes(self, valueTuples):
00693         """
00694         Set different CW and CCW compliance slopes for multiple servos.
00695         Should be called as such:
00696         set_multi_compliance_slopes( ( (id1, cw_slope1, ccw_slope1), (id2, cw_slope2, ccw_slope2) ) )
00697         """
00698         self.sync_write(DXL_CW_COMPLIANCE_SLOPE, tuple(valueTuples))
00699 
00700     def set_multi_punch(self, valueTuples):
00701         """
00702         Set different punch values for multiple servos.
00703         NOTE: according to documentation, currently this value is not being used.
00704         Should be called as such:
00705         set_multi_punch( ( (id1, punch1), (id2, punch2), (id3, punch3) ) )
00706         """
00707         # prepare value tuples for call to syncwrite
00708         writeableVals = []
00709 
00710         for sid,punch in valueTuples:
00711             # split punch into 2 bytes
00712             loVal = int(punch % 256)
00713             hiVal = int(punch >> 8)
00714             writeableVals.append( (sid, loVal, hiVal) )
00715 
00716         # use sync write to broadcast multi servo message
00717         self.sync_write(DXL_PUNCH_L, writeableVals)
00718 
00719     def set_multi_position(self, valueTuples):
00720         """
00721         Set different positions for multiple servos.
00722         Should be called as such:
00723         set_multi_position( ( (id1, position1), (id2, position2), (id3, position3) ) )
00724         """
00725         # prepare value tuples for call to syncwrite
00726         writeableVals = []
00727 
00728         for vals in valueTuples:
00729             sid = vals[0]
00730             position = vals[1]
00731             # split position into 2 bytes
00732             loVal = int(position % 256)
00733             hiVal = int(position >> 8)
00734             writeableVals.append( (sid, loVal, hiVal) )
00735 
00736         # use sync write to broadcast multi servo message
00737         self.sync_write(DXL_GOAL_POSITION_L, writeableVals)
00738 
00739     def set_multi_speed(self, valueTuples):
00740         """
00741         Set different speeds for multiple servos.
00742         Should be called as such:
00743         set_multi_speed( ( (id1, speed1), (id2, speed2), (id3, speed3) ) )
00744         """
00745         # prepare value tuples for call to syncwrite
00746         writeableVals = []
00747 
00748         for vals in valueTuples:
00749             sid = vals[0]
00750             speed = vals[1]
00751 
00752             # split speed into 2 bytes
00753             if speed >= 0:
00754                 loVal = int(speed % 256)
00755                 hiVal = int(speed >> 8)
00756             else:
00757                 loVal = int((1023 - speed) % 256)
00758                 hiVal = int((1023 - speed) >> 8)
00759 
00760             writeableVals.append( (sid, loVal, hiVal) )
00761 
00762         # use sync write to broadcast multi servo message
00763         self.sync_write(DXL_GOAL_SPEED_L, writeableVals)
00764 
00765     def set_multi_torque_limit(self, valueTuples):
00766         """
00767         Set different torque limits for multiple servos.
00768         Should be called as such:
00769         set_multi_torque_limit( ( (id1, torque1), (id2, torque2), (id3, torque3) ) )
00770         """
00771         # prepare value tuples for call to syncwrite
00772         writeableVals = []
00773 
00774         for sid,torque in valueTuples:
00775             # split torque into 2 bytes
00776             loVal = int(torque % 256)
00777             hiVal = int(torque >> 8)
00778             writeableVals.append( (sid, loVal, hiVal) )
00779 
00780         # use sync write to broadcast multi servo message
00781         self.sync_write(DXL_TORQUE_LIMIT_L, writeableVals)
00782 
00783     def set_multi_position_and_speed(self, valueTuples):
00784         """
00785         Set different positions and speeds for multiple servos.
00786         Should be called as such:
00787         set_multi_position_and_speed( ( (id1, position1, speed1), (id2, position2, speed2), (id3, position3, speed3) ) )
00788         """
00789         # prepare value tuples for call to syncwrite
00790         writeableVals = []
00791 
00792         for vals in valueTuples:
00793             sid = vals[0]
00794             position = vals[1]
00795             speed = vals[2]
00796 
00797             # split speed into 2 bytes
00798             if speed >= 0:
00799                 loSpeedVal = int(speed % 256)
00800                 hiSpeedVal = int(speed >> 8)
00801             else:
00802                 loSpeedVal = int((1023 - speed) % 256)
00803                 hiSpeedVal = int((1023 - speed) >> 8)
00804 
00805             # split position into 2 bytes
00806             loPositionVal = int(position % 256)
00807             hiPositionVal = int(position >> 8)
00808             writeableVals.append( (sid, loPositionVal, hiPositionVal, loSpeedVal, hiSpeedVal) )
00809 
00810         # use sync write to broadcast multi servo message
00811         self.sync_write(DXL_GOAL_POSITION_L, tuple(writeableVals))
00812 
00813 
00814     #################################
00815     # Servo status access functions #
00816     #################################
00817 
00818     def get_model_number(self, servo_id):
00819         """ Reads the servo's model number (e.g. 12 for AX-12+). """
00820         response = self.read(servo_id, DXL_MODEL_NUMBER_L, 2)
00821         if response:
00822             self.exception_on_error(response[4], servo_id, 'fetching model number')
00823         return response[5] + (response[6] << 8)
00824 
00825     def get_firmware_version(self, servo_id):
00826         """ Reads the servo's firmware version. """
00827         response = self.read(servo_id, DXL_VERSION, 1)
00828         if response:
00829             self.exception_on_error(response[4], servo_id, 'fetching firmware version')
00830         return response[5]
00831 
00832     def get_return_delay_time(self, servo_id):
00833         """ Reads the servo's return delay time. """
00834         response = self.read(servo_id, DXL_RETURN_DELAY_TIME, 1)
00835         if response:
00836             self.exception_on_error(response[4], servo_id, 'fetching return delay time')
00837         return response[5]
00838 
00839     def get_angle_limits(self, servo_id):
00840         """
00841         Returns the min and max angle limits from the specified servo.
00842         """
00843         # read in 4 consecutive bytes starting with low value of clockwise angle limit
00844         response = self.read(servo_id, DXL_CW_ANGLE_LIMIT_L, 4)
00845         if response:
00846             self.exception_on_error(response[4], servo_id, 'fetching CW/CCW angle limits')
00847         # extract data valus from the raw data
00848         cwLimit = response[5] + (response[6] << 8)
00849         ccwLimit = response[7] + (response[8] << 8)
00850 
00851         # return the data in a dictionary
00852         return {'min':cwLimit, 'max':ccwLimit}
00853 
00854     def get_drive_mode(self, servo_id):
00855         """ Reads the servo's drive mode. """
00856         response = self.read(servo_id, DXL_DRIVE_MODE, 1)
00857         if response:
00858             self.exception_on_error(response[4], servo_id, 'fetching drive mode')
00859         return response[5]
00860 
00861     def get_voltage_limits(self, servo_id):
00862         """
00863         Returns the min and max voltage limits from the specified servo.
00864         """
00865         response = self.read(servo_id, DXL_DOWN_LIMIT_VOLTAGE, 2)
00866         if response:
00867             self.exception_on_error(response[4], servo_id, 'fetching voltage limits')
00868         # extract data valus from the raw data
00869         min_voltage = response[5] / 10.0
00870         max_voltage = response[6] / 10.0
00871 
00872         # return the data in a dictionary
00873         return {'min':min_voltage, 'max':max_voltage}
00874 
00875     def get_position(self, servo_id):
00876         """ Reads the servo's position value from its registers. """
00877         response = self.read(servo_id, DXL_PRESENT_POSITION_L, 2)
00878         if response:
00879             self.exception_on_error(response[4], servo_id, 'fetching present position')
00880         position = response[5] + (response[6] << 8)
00881         return position
00882 
00883     def get_speed(self, servo_id):
00884         """ Reads the servo's speed value from its registers. """
00885         response = self.read(servo_id, DXL_PRESENT_SPEED_L, 2)
00886         if response:
00887             self.exception_on_error(response[4], servo_id, 'fetching present speed')
00888         speed = response[5] + (response[6] << 8)
00889         if speed > 1023:
00890             return 1023 - speed
00891         return speed
00892 
00893     def get_voltage(self, servo_id):
00894         """ Reads the servo's voltage. """
00895         response = self.read(servo_id, DXL_PRESENT_VOLTAGE, 1)
00896         if response:
00897             self.exception_on_error(response[4], servo_id, 'fetching supplied voltage')
00898         return response[5] / 10.0
00899 
00900     def get_current(self, servo_id):
00901         """ Reads the servo's current consumption (if supported by model) """
00902         model = self.get_model_number(servo_id)
00903         if not model in DXL_MODEL_TO_PARAMS:
00904             raise UnsupportedFeatureError(model, DXL_CURRENT_L)
00905 
00906         if DXL_CURRENT_L in DXL_MODEL_TO_PARAMS[model]['features']:
00907             response = self.read(servo_id, DXL_CURRENT_L, 2)
00908             if response:
00909                 self.exception_on_error(response[4], servo_id, 'fetching sensed current')
00910             current = response[5] + (response[6] << 8)
00911             return 0.0045 * (current - 2048)
00912 
00913         if DXL_SENSED_CURRENT_L in DXL_MODEL_TO_PARAMS[model]['features']:
00914             response = self.read(servo_id, DXL_SENSED_CURRENT_L, 2)
00915             if response:
00916                 self.exception_on_error(response[4], servo_id, 'fetching sensed current')
00917             current = response[5] + (response[6] << 8)
00918             return 0.01 * (current - 512)
00919 
00920         else:
00921             raise UnsupportedFeatureError(model, DXL_CURRENT_L)
00922 
00923 
00924     def get_feedback(self, servo_id):
00925         """
00926         Returns the id, goal, position, error, speed, load, voltage, temperature
00927         and moving values from the specified servo.
00928         """
00929         # read in 17 consecutive bytes starting with low value for goal position
00930         response = self.read(servo_id, DXL_GOAL_POSITION_L, 17)
00931 
00932         if response:
00933             self.exception_on_error(response[4], servo_id, 'fetching full servo status')
00934         if len(response) == 24:
00935             # extract data values from the raw data
00936             goal = response[5] + (response[6] << 8)
00937             position = response[11] + (response[12] << 8)
00938             error = position - goal
00939             speed = response[13] + ( response[14] << 8)
00940             if speed > 1023: speed = 1023 - speed
00941             load_raw = response[15] + (response[16] << 8)
00942             load_direction = 1 if self.test_bit(load_raw, 10) else 0
00943             load = (load_raw & int('1111111111', 2)) / 1024.0
00944             if load_direction == 1: load = -load
00945             voltage = response[17] / 10.0
00946             temperature = response[18]
00947             moving = response[21]
00948             timestamp = response[-1]
00949 
00950             # return the data in a dictionary
00951             return { 'timestamp': timestamp,
00952                      'id': servo_id,
00953                      'goal': goal,
00954                      'position': position,
00955                      'error': error,
00956                      'speed': speed,
00957                      'load': load,
00958                      'voltage': voltage,
00959                      'temperature': temperature,
00960                      'moving': bool(moving) }
00961 
00962     def get_led(self, servo_id):
00963         """
00964         Get status of the LED. Boolean return values:
00965             True - LED is on,
00966             False - LED is off.
00967         """
00968         response = self.read(servo_id, DXL_LED, 1)
00969         if response:
00970             self.exception_on_error(response[4], servo_id,
00971                 'fetching LED status')
00972 
00973         return bool(response[5])
00974 
00975 
00976     def exception_on_error(self, error_code, servo_id, command_failed):
00977         global exception
00978         exception = None
00979         ex_message = '[servo #%d on %s@%sbps]: %s failed' % (servo_id, self.ser.port, self.ser.baudrate, command_failed)
00980 
00981         if not isinstance(error_code, int):
00982             msg = 'Communcation Error ' + ex_message
00983             exception = NonfatalErrorCodeError(msg, 0)
00984             return
00985         if not error_code & DXL_OVERHEATING_ERROR == 0:
00986             msg = 'Overheating Error ' + ex_message
00987             exception = FatalErrorCodeError(msg, error_code)
00988         if not error_code & DXL_OVERLOAD_ERROR == 0:
00989             msg = 'Overload Error ' + ex_message
00990             exception = FatalErrorCodeError(msg, error_code)
00991         if not error_code & DXL_INPUT_VOLTAGE_ERROR == 0:
00992             msg = 'Input Voltage Error ' + ex_message
00993             exception = NonfatalErrorCodeError(msg, error_code)
00994         if not error_code & DXL_ANGLE_LIMIT_ERROR == 0:
00995             msg = 'Angle Limit Error ' + ex_message
00996             exception = NonfatalErrorCodeError(msg, error_code)
00997         if not error_code & DXL_RANGE_ERROR == 0:
00998             msg = 'Range Error ' + ex_message
00999             exception = NonfatalErrorCodeError(msg, error_code)
01000         if not error_code & DXL_CHECKSUM_ERROR == 0:
01001             msg = 'Checksum Error ' + ex_message
01002             exception = NonfatalErrorCodeError(msg, error_code)
01003         if not error_code & DXL_INSTRUCTION_ERROR == 0:
01004             msg = 'Instruction Error ' + ex_message
01005             exception = NonfatalErrorCodeError(msg, error_code)
01006 
01007 class SerialOpenError(Exception):
01008     def __init__(self, port, baud):
01009         Exception.__init__(self)
01010         self.message = "Cannot open port '%s' at %d bps" %(port, baud)
01011         self.port = port
01012         self.baud = baud
01013     def __str__(self):
01014         return self.message
01015 
01016 class ChecksumError(Exception):
01017     def __init__(self, servo_id, response, checksum):
01018         Exception.__init__(self)
01019         self.message = 'Checksum received from motor %d does not match the expected one (%d != %d)' \
01020                        %(servo_id, response[-1], checksum)
01021         self.response_data = response
01022         self.expected_checksum = checksum
01023     def __str__(self):
01024         return self.message
01025 
01026 class FatalErrorCodeError(Exception):
01027     def __init__(self, message, ec_const):
01028         Exception.__init__(self)
01029         self.message = message
01030         self.error_code = ec_const
01031     def __str__(self):
01032         return self.message
01033 
01034 class NonfatalErrorCodeError(Exception):
01035     def __init__(self, message, ec_const):
01036         Exception.__init__(self)
01037         self.message = message
01038         self.error_code = ec_const
01039     def __str__(self):
01040         return self.message
01041 
01042 class ErrorCodeError(Exception):
01043     def __init__(self, message, ec_const):
01044         Exception.__init__(self)
01045         self.message = message
01046         self.error_code = ec_const
01047     def __str__(self):
01048         return self.message
01049 
01050 class DroppedPacketError(Exception):
01051     def __init__(self, message):
01052         Exception.__init__(self)
01053         self.message = message
01054     def __str__(self):
01055         return self.message
01056 
01057 class UnsupportedFeatureError(Exception):
01058     def __init__(self, model_id, feature_id):
01059         Exception.__init__(self)
01060         if model_id in DXL_MODEL_TO_PARAMS:
01061             model = DXL_MODEL_TO_PARAMS[model_id]['name']
01062         else:
01063             model = 'Unknown'
01064         self.message = "Feature %d not supported by model %d (%s)" %(feature_id, model_id, model)
01065     def __str__(self):
01066         return self.message
01067 


dynamixel_driver
Author(s): Antons Rebguns, Cody Jorgensen
autogenerated on Thu Jun 6 2019 18:17:33