gpio_control_utils.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 """
4 Control GPIO pins via ROS. Made to be as generic as possible, allowing the same node to be
5 used in multiple configurations/devices.
6 
7 @author cst <chris thierauf, christopher.thierauf@tufts.edu>
8 @version 0.0.1
9 @license Apache 2.0
10 @copyright Christopher Thierauf 2020.
11 
12 The copyright holder uses this copyright to release the code in accordance with the license of
13 this repository, which is a Free and Open Source license: you may use, modify, and share this
14 code in any way you see fit as described by the terms of the license. You are not legally
15 obligated to provide attribution, but it would be greatly appreciated.
16 """
17 
18 # core stuff
19 import os
20 import random
21 import subprocess
22 import sys
23 
24 import rospy
25 
26 # messages
27 from std_msgs.msg import Header
28 from gpio_control.msg import InputState, OutputState
29 
30 # a list of devices we have support for
31 VALID_DEVICES = [
32  # pi support via pigpio
33  'pi',
34  # jetson support also via pigpio
35  'jetson',
36  # beaglebone support via Adafruit_BBIO
37  'beaglebone-experimental',
38  # onion support via onionGpio, though I don't know how popular an option this is.
39  'onion-experimental',
40  # Support of standard Linux systems via the LFS
41  'generic',
42  # Don't actually do anything, just print to the screen
43  'simulated'
44 ]
45 
46 # rate at which to run the node (by default)
47 _DEFAULT_RATE_VAL = 10
48 
49 # valid imports are going to depend on our hardware and what's installed. We'll try to import
50 # everything we might use, and if we fail, keep track of it for later.
51 _IMPORTED_PIGPIO = False
52 _IMPORTED_ADAFRUIT_BBIO = False
53 _IMPORTED_ONION_GPIO = False
54 _IMPORTED_GPIO_API = False
55 
56 try:
57  import pigpio
58 
59  _IMPORTED_PIGPIO = True
60  _IMPORTED_GPIO_API = True
61 except ImportError:
62  pass
63 
64 try:
65  import Adafruit_BBIO.GPIO as BBGPIO
66 
67  _IMPORTED_ADAFRUIT_BBIO = True
68  _IMPORTED_GPIO_API = True
69 except ImportError:
70  pass
71 
72 try:
73  import onionGpio
74 
75  _IMPORTED_ONION_GPIO = True
76  _IMPORTED_GPIO_API = True
77 except ImportError:
78  pass
79 
80 # This was originally done using the 'word2num' python package, but it's pretty simple
81 # to implement via this stack overflow post:
82 # https://stackoverflow.com/questions/19504350/how-to-convert-numbers-to-words-without-using-num2word-library#19504396
83 # So we're removing a dependency here.
84 _num2words1 = {1: 'One', 2: 'Two', 3: 'Three', 4: 'Four', 5: 'Five',
85  6: 'Six', 7: 'Seven', 8: 'Eight', 9: 'Nine', 10: 'Ten',
86  11: 'Eleven', 12: 'Twelve', 13: 'Thirteen', 14: 'Fourteen',
87  15: 'Fifteen', 16: 'Sixteen', 17: 'Seventeen', 18: 'Eighteen', 19: 'Nineteen'}
88 _num2words2 = ['Twenty', 'Thirty', 'Forty', 'Fifty', 'Sixty', 'Seventy', 'Eighty', 'Ninety']
89 
90 
91 def _num2word(num: int):
92  if 0 <= num <= 19:
93  return _num2words1[num]
94  elif 20 <= num <= 99:
95  tens, remainder = divmod(num, 10)
96  return _num2words2[tens-2] + _num2words1[remainder] if remainder else _num2words2[tens-2]
97  else:
98  return _num2word(int(num/10)) + "_thousand"
99 
100 
102  """
103  Class to provide consistent function calls to different pin outputs.
104 
105  Takes in three functions as arguments, which allows us to call 'configure', 'set_low',
106  and 'set_high' on this class later, despite it being any actual implementation of gpio control
107  Also provide the option of an 'additional_shutdown' to run on close() if necessary.
108 
109  We're using functional programming here and elsewhere. If you aren't familiar with that, give
110  that topic a quick read: https://docs.python.org/3/howto/functional.html.
111  If you're reading this because you'd like to contribute (thanks!) but are feeling daunted,
112  please feel free to open an issue on the tracker so I can work with you on that.
113  """
114 
115  def __init__(self, configured_pinbus_object, pin, set_high_=None, set_low_=None):
116  self.type = 'output'
117  if configured_pinbus_object is not None:
118  self.configured_pinbus_object = configured_pinbus_object
119  self.set_high_func = set_high_
120  self.set_low_func = set_low_
121  self.pin = pin
122 
123  def set_low(self):
124  """ Pull the pin low (0v), but don't try to run None """
125  return self.set_low_func() if self.set_low_func is not None else None
126 
127  def set_high(self):
128  """ Pull the pin high (typically 3.3v or 5v), but don't try to run None """
129  return self.set_high_func() if self.set_high_func is not None else None
130 
131  def close(self):
132  """ Make sure we're cleaning up while we shut down here """
133  self.set_low()
134 
135 
137  """
138  Class to provide consistent function calls to different pin inputs.
139 
140  As before, takes in functions to be called later.
141  """
142 
143  def __init__(self, configured_pinbus_object, pin, get_=None):
144  self.type = 'input'
145  if configured_pinbus_object is not None:
146  self.configured_pinbus_object = configured_pinbus_object
147  self.get_func = get_
148  self.pin = pin
149 
150  def get(self):
151  """ Get the current state of the GPIO pin. Returns true/false to indicate high/low """
152  return self.get_func()
153 
154 
155 def _configure_input(pin, device: str, bus):
156  """
157  Configure the node as an input. Takes in a pin to access, and a string which is what was
158  passed in at the command line.
159  """
160 
161  # Run through our options, configure the generic input interfaces, and then return.
162  if device in ('pi', 'jetson'):
163  return_input = _GenericInputPin(bus, int(pin))
164  return_input.get_func = lambda: return_input.configured_pinbus_object.read(int(pin))
165  return return_input
166 
167  if device == 'beaglebone':
168  BBGPIO.setup(pin, BBGPIO.IN)
169  return _GenericInputPin(bus, pin, (lambda: BBGPIO.input(pin)))
170 
171  if device == 'onion':
172  return_input = _GenericInputPin(bus, pin)
173  return_input.configured_pinbus_object.setInputDirection()
174  return_input.get_func = return_input.configured_pinbus_object.getValue
175  return return_input
176 
177  if device == 'generic':
178  # todo: o boy do i hate this implementation
179  os.system('echo ' + str(pin) + ' > /sys/class/gpio/export')
180  os.system('echo in > /sys/class/gpio/gpio' + str(pin) + '/direction')
181 
182  return _GenericInputPin(bus,
183  pin,
184  lambda: int(subprocess.check_output(
185  ['cat', '/sys/class/gpio/gpio' + str(pin) + '/value']
186  ).decode("utf-8"))
187  )
188 
189  if device == 'simulated':
190  return _GenericInputPin(None, pin, get_=(lambda: random.choice([True, False])))
191 
192  raise RuntimeError('Device was invalid: ' + device)
193 
194 
195 def _configure_output(pin: int, device: str, bus):
196  """
197  Configure the node as an output. Takes in a pin to access, and a string which is what was
198  passed in at the command line.
199  """
200  if device in ('pi', 'jetson'):
201  # as with configure_input, we can use pigpio for both
202  return_output = _GenericOutputPin(bus, pin)
203  return_output.set_low_func = (
204  lambda: return_output.configured_pinbus_object.write(int(pin), pigpio.LOW)
205  )
206  return_output.set_high_func = (
207  lambda: return_output.configured_pinbus_object.write(int(pin), pigpio.HIGH)
208  )
209  return return_output
210 
211  if device == 'beaglebone':
212  return _GenericOutputPin(
213  lambda: BBGPIO.setup(pin, BBGPIO.OUT),
214  lambda: BBGPIO.output(pin, BBGPIO.LOW),
215  lambda: BBGPIO.output(pin, BBGPIO.HIGH),
216  )
217 
218  if device == 'onion':
219  return_output = _GenericOutputPin(onionGpio.OnionGpio(pin), pin)
220  return_output.configured_pinbus_object.setOutputDirection()
221  return_output.set_low_func = lambda: return_output.configured_pinbus_object.setValue(0)
222  return_output.set_high_func = lambda: return_output.configured_pinbus_object.setValue(1)
223  return return_output
224 
225  if device == 'generic':
226  # todo: as above
227  os.system('echo ' + str(pin) + ' > /sys/class/gpio/export')
228  os.system('echo out > /sys/class/gpio/gpio' + str(pin) + '/direction')
229 
230  return _GenericOutputPin(None,
231  pin,
232  set_low_=lambda: os.system(
233  'echo 0 > /sys/class/gpio/gpio' + str(pin) + '/value'),
234  set_high_=lambda: os.system(
235  'echo 1 > /sys/class/gpio/gpio' + str(pin) + '/value')
236  )
237 
238  if device == 'simulated':
239  return _GenericOutputPin(None, pin, set_high_=(lambda: print("[simulated] high!")),
240  set_low_=(lambda: print("[simulated] low!")))
241 
242  raise RuntimeError('Device was invalid: ' + device)
243 
244 
245 def _to_valid_ros_topic_name(input_string):
246  """
247  Convert input to a valid ROS name (alphabetic). This is necessary because ROS best practice is
248  to have topic names be only alphabetic (hyphens/slashes optional). Most GPIO pins will have
249  numbers in them, which can be an issue.
250  """
251 
252  # Don't bother dealing with things that are just numbers, convert them right away
253  if isinstance(input_string, int) or input_string.isnumeric():
254  return _num2word(int(input_string)).lower()
255 
256  # If it's alphabetic, convert numbers to characters, append numbers, and separate the
257  # two using underscores. Creates something like P8U4 -> p_eight_u_4
258  output_string = ""
259  just_did_str = False
260  just_did_num = False
261  for character in input_string:
262  if character.isalpha():
263  if just_did_num:
264  output_string += '_'
265  just_did_num = False
266  output_string += character.lower()
267  just_did_str = True
268  elif character.isnumeric():
269  if just_did_str:
270  output_string += '_'
271  just_did_str = False
272  output_string += _num2word(character).lower()
273  just_did_num = True
274  else:
275  # Don't bother putting in special characters (even though I don't think they'll come up)
276  pass
277 
278  return output_string
279 
280 
281 def configure_bus(device):
282  """
283  Configure a GPIO bus for the specific hardware we're dealing with. Return an object of the
284  appropriate hardware type, if the specific implementation requires that.
285  """
286  # Both the Pi and Jetson have the same pinout and can use the same lib.
287  if device in ('pi', 'jetson'):
288  if not _IMPORTED_PIGPIO:
289  rospy.logfatal("You want to control your device with pigpio, but it didn't import "
290  "properly. Node will exit.")
291  sys.exit(2)
292 
293  return pigpio.pi()
294 
295  if device == 'beaglebone':
296  rospy.loginfo("This implementation is currently in beta. If you encounter bugs, please "
297  "report them!")
298  if not _IMPORTED_ADAFRUIT_BBIO:
299  rospy.logfatal("You want to control your device using the Adafruit BeagleBone Black "
300  "GPIO library, but it didn't import properly. Is it installed? Node "
301  "will exit.")
302  sys.exit(2)
303 
304  if device == 'onion':
305  rospy.loginfo("This implementation is currently in beta. If you encounter bugs, please "
306  "report them!")
307  if not _IMPORTED_ONION_GPIO:
308  rospy.logfatal("You want to control your device using the Onion Omega "
309  "GPIO library, but it didn't import properly. Is it installed? Node "
310  "will exit.")
311  sys.exit(2)
312 
313  if device == 'generic':
314  rospy.logwarn('You are using the generic GPIO controller, which operates using the Linux '
315  'FHS to control GPIO states. It should not be considered as stable or safe '
316  'as non-generic options.')
317 
318  return None
319 
320 
321 def configure_cleanup(device):
322  """
323  If the device in question requires cleanup functions, give them a run.
324  """
325  if device == 'beaglebone':
326  return BBGPIO.cleanup()
327 
328  if device == 'generic':
329  unexporter = open('/sys/class/gpio/unexport', 'w')
330  # unexporter.write(str(pin)) # todo: make this a 'foreach' in gpio dir. Nothing bad will happen if we don't (probably), it's just to be nice and clean up after ourselves.
331  unexporter.close()
332 
333  return None
334 
335 
337  """
338  Generic control of a GPIO device. Wraps the setup and then provides a spinner to run.
339  """
340 
341  def __init__(self, device: str):
342  self._device = device
343  self._publishers = {}
345  self._subscribers = {}
346  self._bus = configure_bus(device)
348 
349  if device not in VALID_DEVICES:
350  rospy.logerr("I don't know that device (" + device + "). Valid devices: " +
351  str(VALID_DEVICES) + "\nExiting.")
352  sys.exit(1)
353 
354  if device not in ['generic', 'simulated'] and not _IMPORTED_GPIO_API:
355  rospy.logfatal("You're trying to use a device-specific API, but we weren't able to "
356  "import the appropriate Python library. Please make sure that you have "
357  "the right library installed properly:\n "
358  "\t* Raspberry Pi: pigpio \n"
359  "\t* Nvidia Jetson: pigpio\n"
360  "\t* Beaglebone Black: Adafruit_BBIO\n"
361  "\t* Onion Omega: onionGpio\n"
362  "Cannot recover, exiting.")
363  sys.exit(3)
364 
365  def add_input_pin(self, pin):
366  """ Add a pin to perform input IO operations. """
367  input_pin_obj = _configure_input(pin, self._device, self._bus)
368  self._generic_pin_objects[pin] = input_pin_obj
369  self._publishers[pin] = rospy.Publisher("gpio_inputs/" + _to_valid_ros_topic_name(pin),
370  InputState,
371  queue_size=1)
372 
373  def add_output_pin(self, pin):
374  """ Add a pin to perform output IO operations. """
375  output_pin_obj = _configure_output(pin, self._device, self._bus)
376 
377  def subscriber_callback(msg):
378  """
379  Subscriber callback.
380  Called every time a message comes in telling us to change a pin state.
381  """
382  if msg.state:
383  output_pin_obj.set_high_func()
384  elif not msg.state:
385  output_pin_obj.set_low_func()
386  else:
387  rospy.logerr("Not sure how to deal with " + str(msg))
388 
389  self._generic_pin_objects[pin] = output_pin_obj
390  self._subscribers[pin] = rospy.Subscriber("gpio_outputs/" + _to_valid_ros_topic_name(pin),
391  OutputState,
392  subscriber_callback)
393 
394  def spin(self, rate_val: int = None):
395  """ Wrapping the spinner function. """
396  # Here's where we're doing the actual spinning: read the pin, set up a message, publish,
397  # rate.sleep(), repeat.
398  if rate_val is None:
399  rate_val = _DEFAULT_RATE_VAL
400  rate = rospy.Rate(rate_val)
401  while not rospy.is_shutdown():
402  for pin_obj in self._generic_pin_objects.values():
403  if pin_obj.type == 'input':
404  val = pin_obj.get()
405 
406  header = Header()
407  header.stamp = rospy.Time.now()
408  # Different implementations might give us this in int or bool form.
409  # Check both and do the appropriate thing with each.
410  if (isinstance(val, int) and val == 1) or (isinstance(val, bool) and val):
411  val = True
412  elif (isinstance(val, int) and val == 0) or (isinstance(val, bool) and not val):
413  val = False
414  else:
415  rospy.logerr("Not sure how to deal with " + str(val))
416 
417  try:
418  self._publishers[str(pin_obj.pin)].publish(
419  InputState(header, val, str(pin_obj.pin))
420  )
421  except KeyError:
422  rospy.logfatal_once("KeyError, you tried getting " +
423  str(pin_obj.pin) + " but only " +
424  str(self._publishers.keys()) +
425  " is acceptable")
426 
427  rate.sleep()
428 
429  def set_pin(self, pin, state: bool):
430  """
431  If using this code as an import, provide a simple function to set the pin.
432  """
433  if pin not in self._generic_pin_objects.keys():
434  rospy.logerr("The pin you requested (" + str(pin) +
435  ") isn't in the list of ones we know about: " +
436  str(self._generic_pin_objects.keys()))
437 
438  if not self._generic_pin_objects[pin].type == 'output':
439  raise EnvironmentError("This pin is not an output! Can't set the state.")
440 
441  if state:
442  pin.set_high()
443  else:
444  pin.set_low()
445 
446  def get_pin(self, pin):
447  """
448  If using this code as an import, provide a simple function to get the state of the pin.
449  """
450  if pin not in self._generic_pin_objects.keys():
451  rospy.logerr("The pin you requested (" + str(pin) +
452  ") isn't in the list of ones we know about: " +
453  str(self._generic_pin_objects.keys()))
454 
455  if not self._generic_pin_objects[pin].type == 'output':
456  raise EnvironmentError("This pin is not an output! Can't set the state.")
457 
458  return pin.get()
def __init__(self, configured_pinbus_object, pin, set_high_=None, set_low_=None)
def __init__(self, configured_pinbus_object, pin, get_=None)
def _to_valid_ros_topic_name(input_string)


gpio_control
Author(s):
autogenerated on Thu Apr 22 2021 02:55:39