arm.py
Go to the documentation of this file.
1 
2 import glob
3 import serial
4 import time
5 import usb
6 import string
7 import sys
8 import time
9 
10 # Arm controller serial properties.
11 BAUD_RATE = 19200
12 PARITY = serial.PARITY_NONE
13 STOP_BITS = serial.STOPBITS_TWO
14 BYTE_SIZE = serial.EIGHTBITS
15 
16 OUTPUT_ENCODING = 'latin_1'
17 
18 # The read timeout is intentionally long, so that a complete calibration cycle
19 # can happen before timing out.
20 READ_TIMEOUT = 0.1
21 READ_SLEEP_TIME = 0.05
22 
23 CMD_SUCCESS = 'OK'
24 CMD_ERROR = 'ABORTED'
25 RESPONSE_END_WORDS = [CMD_SUCCESS, CMD_ERROR]
26 
27 # Assumption: Strings end with a keyword, followed only by ASCII
28 # whitespace and >, which is to be stripped off.
29 OUTPUT_STRIP_CHARS = string.whitespace + '>'
30 
31 
32 class ArmException(Exception):
33  ''' Exception raised when things go wrong with the robot arm. '''
34  pass
35 
36 
37 def r12_serial_port(port):
38  ''' Create a serial connect to the arm. '''
39  return serial.Serial(port, baudrate=BAUD_RATE, parity=PARITY,
40  stopbits=STOP_BITS, bytesize=BYTE_SIZE)
41 
42 
43 def search_for_port(port_glob, req, expected_res):
44  ''' Find the serial port the arm is connected to. '''
45 
46  # Check that the USB port actually exists, based on the known vendor and
47  # product ID.
48  if usb.core.find(idVendor=0x0403, idProduct=0x6001) is None:
49  return None
50 
51  # Find ports matching the supplied glob.
52  ports = glob.glob(port_glob)
53  if len(ports) == 0:
54  return None
55 
56  for port in ports:
57  with r12_serial_port(port) as ser:
58  if not ser.isOpen():
59  ser.open()
60 
61  '''# Write a request out.
62  if sys.version_info[0] == 2:
63  ser.write(str(req).encode('utf-8'))
64  else:
65  ser.write(bytes(req, 'utf-8'))
66 
67  # Wait a short period to allow the connection to generate output.
68  time.sleep(0.1)
69 
70  # Read output from the serial connection check if it's what we want.
71  res = ser.read(ser.in_waiting).decode(OUTPUT_ENCODING)
72  if expected_res in res:'''
73  return port
74 
75  raise ArmException('ST Robotics connection found, but is not responsive.'
76  + ' Is the arm powered on?')
77  return None
78 
79 
80 def ending_in(s, li):
81  ''' If s ends with an element of the list li, that element will be
82  returned. If multiple elements match, the first will be returned. If no
83  elements match, returns None. '''
84  for ending in li:
85  if s.endswith(ending):
86  return ending
87  return None
88 
89 
90 class Arm(object):
91  ''' Represents an ST Robotics arm. '''
92 
93  def __init__(self):
94  self.ser = None
95  self.port = None
96 
97 
98  def connect(self, port=None):
99  ''' Open a serial connection to the arm. '''
100  if port is None:
101  self.port = search_for_port('/dev/ttyUSB*', 'ROBOFORTH\r\n',
102  'ROBOFORTH')
103  else:
104  self.port = port
105 
106  if self.port is None:
107  raise ArmException('ST Robotics connection not found.')
108 
109  self.ser = r12_serial_port(self.port)
110 
111  if not self.ser.isOpen():
112  self.ser.open()
113 
114  if not self.ser.isOpen():
115  raise ArmException('Failed to open serial port. Exiting.')
116 
117  return self.port
118 
119 
120  def disconnect(self):
121  ''' Disconnect from the arm. '''
122  self.ser.close()
123  self.ser = None
124  self.port = None
125 
126 
127  def write(self, text):
128  ''' Write text out to the arm. '''
129  # Output is converted to bytes with Windows-style line endings.
130  if sys.version_info[0] == 2:
131  text_bytes = str(text.upper()).encode('utf-8')
132  else:
133  text_bytes = bytes(text.upper(), 'utf-8')
134  self.ser.write(text_bytes)
135  time.sleep(0.05)
136 
137 
138  def read(self, timeout=READ_TIMEOUT, raw=False):
139  ''' Read data from the arm. Data is returned as a latin_1 encoded
140  string, or raw bytes if 'raw' is True. '''
141  time.sleep(READ_SLEEP_TIME)
142  raw_out = self.ser.read(self.ser.in_waiting)
143  out = raw_out.decode(OUTPUT_ENCODING)
144  time_waiting = 0
145  while len(out) == 0 or ending_in(out.strip(OUTPUT_STRIP_CHARS), RESPONSE_END_WORDS) is None:
146  time.sleep(READ_SLEEP_TIME)
147  time_waiting += READ_SLEEP_TIME
148  raw_out += self.ser.read(self.ser.in_waiting)
149  out = raw_out.decode(OUTPUT_ENCODING)
150  # TODO how to handle timeouts, if they're now unexpected?
151  if time_waiting >= timeout:
152  break
153  if raw:
154  return raw_out
155  return out
156 
157  def stop_route(self):
158  '''Sends a command to stop the arm during the $RUN procedure'''
159  self.read()
160  self.write('S')
161  self.write('\r\n')
162  self.read()
163  return
164 
165  def read_pos(self, pos):
166  '''Reads arm position during $RUN and returns joint counts as an integer list, ordered upwards from waist to wrist. Takes input last known position in joint counts'''
167  self.read()
168  self.write('$')
169  self.write('\r\n')
170  out = self.read()
171  if (out.find('SPEED = ')) <> -1 : #Strip away SPEED = NUMBER output at start of $RUN
172  out = out[ (out.find('SPEED = ') + 8) :]
173  for i in range (0, len(out)):
174  if not out[i].isdigit() :
175  break
176  out = out[i:]
177  out = out.strip()
178  if len(out) > 0 :
179  for i in range (0,5) :
180  space = True
181  while space == True :
182  if not (out[0].isdigit() or out[0] == '-') :
183  out = out[1:]
184  else :
185  space = False
186  num = ''
187  while space == False :
188  if (out[0].isdigit() or out[0] == '-') :
189  num += out[0]
190  out = out[1:]
191  else :
192  space = True
193  pos[i] = int(num)
194  return pos
195 
196 
197  def dump(self, raw=False):
198  ''' Dump all output currently in the arm's output queue. '''
199  raw_out = self.ser.read(self.ser.in_waiting)
200  if raw:
201  return raw_out
202  return raw_out.decode(OUTPUT_ENCODING)
203 
204 
205  def is_connected(self):
206  ''' True if the serial connection to arm is open. False otherwise. '''
207  return self.ser.isOpen() if self.ser else False
208 
209 
210  def get_info(self):
211  ''' Returns status of the robot arm. '''
212  return {
213  'Connected': self.is_connected(),
214  'Port': self.port,
215  'Bytes Waiting': self.ser.in_waiting if self.ser else 0
216  }
217 
def disconnect(self)
Definition: arm.py:120
def dump(self, raw=False)
Definition: arm.py:197
def stop_route(self)
Definition: arm.py:157
def __init__(self)
Definition: arm.py:93
def r12_serial_port(port)
Definition: arm.py:37
def ending_in(s, li)
Definition: arm.py:80
ser
Definition: arm.py:94
def get_info(self)
Definition: arm.py:210
def search_for_port(port_glob, req, expected_res)
Definition: arm.py:43
port
Definition: arm.py:95
def write(self, text)
Definition: arm.py:127
Definition: arm.py:90
def is_connected(self)
Definition: arm.py:205
def connect(self, port=None)
Definition: arm.py:98
def read(self, timeout=READ_TIMEOUT, raw=False)
Definition: arm.py:138
def read_pos(self, pos)
Definition: arm.py:165


r12_hardware_interface
Author(s): Andreas Hogstrand
autogenerated on Mon Feb 28 2022 23:20:33