SerialClient.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 #####################################################################
4 # Software License Agreement (BSD License)
5 #
6 # Copyright (c) 2011, Willow Garage, Inc.
7 # All rights reserved.
8 #
9 # Redistribution and use in source and binary forms, with or without
10 # modification, are permitted provided that the following conditions
11 # are met:
12 #
13 # * Redistributions of source code must retain the above copyright
14 # notice, this list of conditions and the following disclaimer.
15 # * Redistributions in binary form must reproduce the above
16 # copyright notice, this list of conditions and the following
17 # disclaimer in the documentation and/or other materials provided
18 # with the distribution.
19 # * Neither the name of Willow Garage, Inc. nor the names of its
20 # contributors may be used to endorse or promote products derived
21 # from this software without specific prior written permission.
22 #
23 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
31 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
32 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
33 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 # POSSIBILITY OF SUCH DAMAGE.
35 
36 __author__ = "mferguson@willowgarage.com (Michael Ferguson)"
37 
38 import imp
39 import threading
40 import sys
41 import multiprocessing
42 import StringIO
43 import errno
44 import signal
45 import socket
46 import struct
47 import time
48 from Queue import Queue
49 
50 from serial import Serial, SerialException, SerialTimeoutException
51 
52 import roslib
53 import rospy
54 from std_msgs.msg import Time
55 from rosserial_msgs.msg import TopicInfo, Log
56 from rosserial_msgs.srv import RequestParamRequest, RequestParamResponse
57 
58 import diagnostic_msgs.msg
59 
60 ERROR_MISMATCHED_PROTOCOL = "Mismatched protocol version in packet: lost sync or rosserial_python is from different ros release than the rosserial client"
61 ERROR_NO_SYNC = "no sync with device"
62 ERROR_PACKET_FAILED = "Packet Failed : Failed to read msg data"
63 
64 def load_pkg_module(package, directory):
65  #check if its in the python path
66  path = sys.path
67  try:
68  imp.find_module(package)
69  except ImportError:
70  roslib.load_manifest(package)
71  try:
72  m = __import__( package + '.' + directory )
73  except ImportError:
74  rospy.logerr( "Cannot import package : %s"% package )
75  rospy.logerr( "sys.path was " + str(path) )
76  return None
77  return m
78 
79 def load_message(package, message):
80  m = load_pkg_module(package, 'msg')
81  m2 = getattr(m, 'msg')
82  return getattr(m2, message)
83 
84 def load_service(package,service):
85  s = load_pkg_module(package, 'srv')
86  s = getattr(s, 'srv')
87  srv = getattr(s, service)
88  mreq = getattr(s, service+"Request")
89  mres = getattr(s, service+"Response")
90  return srv,mreq,mres
91 
92 class Publisher:
93  """
94  Publisher forwards messages from the serial device to ROS.
95  """
96  def __init__(self, topic_info):
97  """ Create a new publisher. """
98  self.topic = topic_info.topic_name
99 
100  # find message type
101  package, message = topic_info.message_type.split('/')
102  self.message = load_message(package, message)
103  if self.message._md5sum == topic_info.md5sum:
104  self.publisher = rospy.Publisher(self.topic, self.message, queue_size=10)
105  else:
106  raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum)
107 
108  def handlePacket(self, data):
109  """ Forward message to ROS network. """
110  m = self.message()
111  m.deserialize(data)
112  self.publisher.publish(m)
113 
114 
116  """
117  Subscriber forwards messages from ROS to the serial device.
118  """
119 
120  def __init__(self, topic_info, parent):
121  self.topic = topic_info.topic_name
122  self.id = topic_info.topic_id
123  self.parent = parent
124 
125  # find message type
126  package, message = topic_info.message_type.split('/')
127  self.message = load_message(package, message)
128  if self.message._md5sum == topic_info.md5sum:
129  self.subscriber = rospy.Subscriber(self.topic, self.message, self.callback)
130  else:
131  raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum)
132 
133  def callback(self, msg):
134  """ Forward message to serial device. """
135  data_buffer = StringIO.StringIO()
136  msg.serialize(data_buffer)
137  self.parent.send(self.id, data_buffer.getvalue())
138 
139  def unregister(self):
140  rospy.loginfo("Removing subscriber: %s", self.topic)
141  self.subscriber.unregister()
142 
144  """
145  ServiceServer responds to requests from ROS.
146  """
147 
148  def __init__(self, topic_info, parent):
149  self.topic = topic_info.topic_name
150  self.parent = parent
151 
152  # find message type
153  package, service = topic_info.message_type.split('/')
154  s = load_pkg_module(package, 'srv')
155  s = getattr(s, 'srv')
156  self.mreq = getattr(s, service+"Request")
157  self.mres = getattr(s, service+"Response")
158  srv = getattr(s, service)
159  self.service = rospy.Service(self.topic, srv, self.callback)
160 
161  # response message
162  self.data = None
163 
164  def unregister(self):
165  rospy.loginfo("Removing service: %s", self.topic)
166  self.service.shutdown()
167 
168  def callback(self, req):
169  """ Forward request to serial device. """
170  data_buffer = StringIO.StringIO()
171  req.serialize(data_buffer)
172  self.response = None
173  if self.parent.send(self.id, data_buffer.getvalue()) >= 0:
174  while self.response is None:
175  pass
176  return self.response
177 
178  def handlePacket(self, data):
179  """ Forward response to ROS network. """
180  r = self.mres()
181  r.deserialize(data)
182  self.response = r
183 
184 
186  """
187  ServiceServer responds to requests from ROS.
188  """
189 
190  def __init__(self, topic_info, parent):
191  self.topic = topic_info.topic_name
192  self.parent = parent
193 
194  # find message type
195  package, service = topic_info.message_type.split('/')
196  s = load_pkg_module(package, 'srv')
197  s = getattr(s, 'srv')
198  self.mreq = getattr(s, service+"Request")
199  self.mres = getattr(s, service+"Response")
200  srv = getattr(s, service)
201  rospy.loginfo("Starting service client, waiting for service '" + self.topic + "'")
202  rospy.wait_for_service(self.topic)
203  self.proxy = rospy.ServiceProxy(self.topic, srv)
204 
205  def handlePacket(self, data):
206  """ Forward request to ROS network. """
207  req = self.mreq()
208  req.deserialize(data)
209  # call service proxy
210  resp = self.proxy(req)
211  # serialize and publish
212  data_buffer = StringIO.StringIO()
213  resp.serialize(data_buffer)
214  self.parent.send(self.id, data_buffer.getvalue())
215 
217  """
218  RosSerialServer waits for a socket connection then passes itself, forked as a
219  new process, to SerialClient which uses it as a serial port. It continues to listen
220  for additional connections. Each forked process is a new ros node, and proxies ros
221  operations (e.g. publish/subscribe) from its connection to the rest of ros.
222  """
223  def __init__(self, tcp_portnum, fork_server=False):
224  print "Fork_server is: ", fork_server
225  self.tcp_portnum = tcp_portnum
226  self.fork_server = fork_server
227 
228  def listen(self):
229  self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
230  self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
231  #bind the socket to a public host, and a well-known port
232  self.serversocket.bind(("", self.tcp_portnum)) #become a server socket
233  self.serversocket.listen(1)
234 
235  while True:
236  #accept connections
237  print "waiting for socket connection"
238  (clientsocket, address) = self.serversocket.accept()
239 
240  #now do something with the clientsocket
241  rospy.loginfo("Established a socket connection from %s on port %s" % (address))
242  self.socket = clientsocket
243  self.isConnected = True
244 
245  if self.fork_server: # if configured to launch server in a separate process
246  rospy.loginfo("Forking a socket server process")
247  process = multiprocessing.Process(target=self.startSocketServer, args=(address))
248  process.daemon = True
249  process.start()
250  rospy.loginfo("launched startSocketServer")
251  else:
252  rospy.loginfo("calling startSerialClient")
253  self.startSerialClient()
254  rospy.loginfo("startSerialClient() exited")
255 
256  def startSerialClient(self):
257  client = SerialClient(self)
258  try:
259  client.run()
260  except KeyboardInterrupt:
261  pass
262  except RuntimeError:
263  rospy.loginfo("RuntimeError exception caught")
264  self.isConnected = False
265  except socket.error:
266  rospy.loginfo("socket.error exception caught")
267  self.isConnected = False
268  finally:
269  self.socket.close()
270  for sub in client.subscribers.values():
271  sub.unregister()
272  for srv in client.services.values():
273  srv.unregister()
274  #pass
275 
276  def startSocketServer(self, port, address):
277  rospy.loginfo("starting ROS Serial Python Node serial_node-%r" % (address,))
278  rospy.init_node("serial_node_%r" % (address,))
279  self.startSerialClient()
280 
281  def flushInput(self):
282  pass
283 
284  def write(self, data):
285  if not self.isConnected:
286  return
287  length = len(data)
288  totalsent = 0
289 
290  while totalsent < length:
291  sent = self.socket.send(data[totalsent:])
292  if sent == 0:
293  raise RuntimeError("RosSerialServer.write() socket connection broken")
294  totalsent = totalsent + sent
295 
296  def read(self, rqsted_length):
297  self.msg = ''
298  if not self.isConnected:
299  return self.msg
300 
301  while len(self.msg) < rqsted_length:
302  chunk = self.socket.recv(rqsted_length - len(self.msg))
303  if chunk == '':
304  raise RuntimeError("RosSerialServer.read() socket connection broken")
305  self.msg = self.msg + chunk
306  return self.msg
307 
308  def inWaiting(self):
309  try: # the caller checks just for <1, so we'll peek at just one byte
310  chunk = self.socket.recv(1, socket.MSG_DONTWAIT|socket.MSG_PEEK)
311  if chunk == '':
312  raise RuntimeError("RosSerialServer.inWaiting() socket connection broken")
313  return len(chunk)
314  except socket.error, e:
315  if e.args[0] == errno.EWOULDBLOCK:
316  return 0
317  raise
318 
319 class SerialClient(object):
320  """
321  ServiceServer responds to requests from the serial device.
322  """
323 
324  def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False):
325  """ Initialize node, connect to bus, attempt to negotiate topics. """
326 
327  self.read_lock = threading.RLock()
328 
329  self.write_lock = threading.RLock()
330  self.write_queue = Queue()
331  self.write_thread = None
332 
333  self.lastsync = rospy.Time(0)
334  self.lastsync_lost = rospy.Time(0)
335  self.lastsync_success = rospy.Time(0)
336  self.last_read = rospy.Time(0)
337  self.last_write = rospy.Time(0)
338  self.timeout = timeout
339  self.synced = False
340  self.fix_pyserial_for_test = fix_pyserial_for_test
341 
342  self.pub_diagnostics = rospy.Publisher('/diagnostics', diagnostic_msgs.msg.DiagnosticArray, queue_size=10)
343 
344  if port is None:
345  # no port specified, listen for any new port?
346  pass
347  elif hasattr(port, 'read'):
348  #assume its a filelike object
349  self.port=port
350  else:
351  # open a specific port
352  while not rospy.is_shutdown():
353  try:
354  if self.fix_pyserial_for_test:
355  # see https://github.com/pyserial/pyserial/issues/59
356  self.port = Serial(port, baud, timeout=self.timeout, write_timeout=10, rtscts=True, dsrdtr=True)
357  else:
358  self.port = Serial(port, baud, timeout=self.timeout, write_timeout=10)
359  break
360  except SerialException as e:
361  rospy.logerr("Error opening serial: %s", e)
362  time.sleep(3)
363 
364  if rospy.is_shutdown():
365  return
366 
367  time.sleep(0.1) # Wait for ready (patch for Uno)
368 
369  # hydro introduces protocol ver2 which must match node_handle.h
370  # The protocol version is sent as the 2nd sync byte emitted by each end
371  self.protocol_ver1 = '\xff'
372  self.protocol_ver2 = '\xfe'
374 
375  self.publishers = dict() # id:Publishers
376  self.subscribers = dict() # topic:Subscriber
377  self.services = dict() # topic:Service
378 
379  self.buffer_out = -1
380  self.buffer_in = -1
381 
382  self.callbacks = dict()
383  # endpoints for creating new pubs/subs
384  self.callbacks[TopicInfo.ID_PUBLISHER] = self.setupPublisher
385  self.callbacks[TopicInfo.ID_SUBSCRIBER] = self.setupSubscriber
386  # service client/servers have 2 creation endpoints (a publisher and a subscriber)
387  self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_PUBLISHER] = self.setupServiceServerPublisher
388  self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_SUBSCRIBER] = self.setupServiceServerSubscriber
389  self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_PUBLISHER] = self.setupServiceClientPublisher
390  self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_SUBSCRIBER] = self.setupServiceClientSubscriber
391  # custom endpoints
392  self.callbacks[TopicInfo.ID_PARAMETER_REQUEST] = self.handleParameterRequest
393  self.callbacks[TopicInfo.ID_LOG] = self.handleLoggingRequest
394  self.callbacks[TopicInfo.ID_TIME] = self.handleTimeRequest
395 
396  rospy.sleep(2.0)
397  self.requestTopics()
398  self.lastsync = rospy.Time.now()
399 
400  signal.signal(signal.SIGINT, self.txStopRequest)
401 
402  def requestTopics(self):
403  """ Determine topics to subscribe/publish. """
404  rospy.loginfo('Requesting topics...')
405 
406  # TODO remove if possible
407  if not self.fix_pyserial_for_test:
408  with self.read_lock:
409  self.port.flushInput()
410 
411  # request topic sync
412  self.write_queue.put("\xff" + self.protocol_ver + "\x00\x00\xff\x00\x00\xff")
413 
414  def txStopRequest(self, signal, frame):
415  """ send stop tx request to arduino when receive SIGINT(Ctrl-c)"""
416  if not self.fix_pyserial_for_test:
417  with self.read_lock:
418  self.port.flushInput()
419 
420  self.write_queue.put("\xff" + self.protocol_ver + "\x00\x00\xff\x0b\x00\xf4")
421 
422  # tx_stop_request is x0b
423  rospy.loginfo("Send tx stop request")
424  sys.exit(0)
425 
426  def tryRead(self, length):
427  try:
428  read_start = time.time()
429  bytes_remaining = length
430  result = bytearray()
431  while bytes_remaining != 0 and time.time() - read_start < self.timeout:
432  with self.read_lock:
433  received = self.port.read(bytes_remaining)
434  if len(received) != 0:
435  self.last_read = rospy.Time.now()
436  result.extend(received)
437  bytes_remaining -= len(received)
438 
439  if bytes_remaining != 0:
440  raise IOError("Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining))
441 
442  return bytes(result)
443  except Exception as e:
444  raise IOError("Serial Port read failure: %s" % e)
445 
446  def run(self):
447  """ Forward recieved messages to appropriate publisher. """
448 
449  # Launch write thread.
450  if self.write_thread is None:
451  self.write_thread = threading.Thread(target=self.processWriteQueue)
452  self.write_thread.daemon = True
453  self.write_thread.start()
454 
455  # Handle reading.
456  data = ''
457  read_step = None
458  while not rospy.is_shutdown():
459  if (rospy.Time.now() - self.lastsync).to_sec() > (self.timeout * 3):
460  if self.synced:
461  rospy.logerr("Lost sync with device, restarting...")
462  else:
463  rospy.logerr("Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino")
464  self.lastsync_lost = rospy.Time.now()
465  self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC)
466  self.requestTopics()
467  self.lastsync = rospy.Time.now()
468 
469  # This try-block is here because we make multiple calls to read(). Any one of them can throw
470  # an IOError if there's a serial problem or timeout. In that scenario, a single handler at the
471  # bottom attempts to reconfigure the topics.
472  try:
473  with self.read_lock:
474  if self.port.inWaiting() < 1:
475  time.sleep(0.001)
476  continue
477 
478  # Find sync flag.
479  flag = [0, 0]
480  read_step = 'syncflag'
481  flag[0] = self.tryRead(1)
482  if (flag[0] != '\xff'):
483  continue
484 
485  # Find protocol version.
486  read_step = 'protocol'
487  flag[1] = self.tryRead(1)
488  if flag[1] != self.protocol_ver:
489  self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL)
490  rospy.logerr("Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1]))
491  protocol_ver_msgs = {'\xff': 'Rev 0 (rosserial 0.4 and earlier)', '\xfe': 'Rev 1 (rosserial 0.5+)', '\xfd': 'Some future rosserial version'}
492  if flag[1] in protocol_ver_msgs:
493  found_ver_msg = 'Protocol version of client is ' + protocol_ver_msgs[flag[1]]
494  else:
495  found_ver_msg = "Protocol version of client is unrecognized"
496  rospy.loginfo("%s, expected %s" % (found_ver_msg, protocol_ver_msgs[self.protocol_ver]))
497  continue
498 
499  # Read message length.
500  read_step = 'message length'
501  msg_len_bytes = self.tryRead(2)
502  msg_length, = struct.unpack("<h", msg_len_bytes)
503 
504  # Read message length checksum.
505  read_step = 'message length checksum'
506  msg_len_chk = self.tryRead(1)
507  msg_len_checksum = sum(map(ord, msg_len_bytes)) + ord(msg_len_chk)
508 
509  # Validate message length checksum.
510  if msg_len_checksum % 256 != 255:
511  rospy.loginfo("wrong checksum for msg length, length %d" %(msg_length))
512  rospy.loginfo("chk is %d" % ord(msg_len_chk))
513  continue
514 
515  # Read topic id (2 bytes)
516  read_step = 'topic id'
517  topic_id_header = self.tryRead(2)
518  topic_id, = struct.unpack("<h", topic_id_header)
519 
520  # Read serialized message data.
521  read_step = 'data'
522  try:
523  msg = self.tryRead(msg_length)
524  except IOError:
525  self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_PACKET_FAILED)
526  rospy.loginfo("Packet Failed : Failed to read msg data")
527  rospy.loginfo("expected msg length is %d", msg_length)
528  raise
529 
530  # Reada checksum for topic id and msg
531  read_step = 'data checksum'
532  chk = self.tryRead(1)
533  checksum = sum(map(ord, topic_id_header) ) + sum(map(ord, msg)) + ord(chk)
534 
535  # Validate checksum.
536  if checksum % 256 == 255:
537  self.synced = True
538  self.lastsync_success = rospy.Time.now()
539  try:
540  self.callbacks[topic_id](msg)
541  except KeyError:
542  rospy.logerr("Tried to publish before configured, topic id %d" % topic_id)
543  self.requestTopics()
544  rospy.sleep(0.001)
545  else:
546  rospy.loginfo("wrong checksum for topic id and msg")
547 
548  except IOError as exc:
549  rospy.logwarn('Last read step: %s' % read_step)
550  rospy.logwarn('Run loop error: %s' % exc)
551  # One of the read calls had an issue. Just to be safe, request that the client
552  # reinitialize their topics.
553  with self.read_lock:
554  self.port.flushInput()
555  with self.write_lock:
556  self.port.flushOutput()
557  self.requestTopics()
558 
559  def setPublishSize(self, bytes):
560  if self.buffer_out < 0:
561  self.buffer_out = bytes
562  rospy.loginfo("Note: publish buffer size is %d bytes" % self.buffer_out)
563 
564  def setSubscribeSize(self, bytes):
565  if self.buffer_in < 0:
566  self.buffer_in = bytes
567  rospy.loginfo("Note: subscribe buffer size is %d bytes" % self.buffer_in)
568 
569  def setupPublisher(self, data):
570  """ Register a new publisher. """
571  try:
572  msg = TopicInfo()
573  msg.deserialize(data)
574  pub = Publisher(msg)
575  self.publishers[msg.topic_id] = pub
576  self.callbacks[msg.topic_id] = pub.handlePacket
577  self.setPublishSize(msg.buffer_size)
578  rospy.loginfo("Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type) )
579  except Exception as e:
580  rospy.logerr("Creation of publisher failed: %s", e)
581 
582  def setupSubscriber(self, data):
583  """ Register a new subscriber. """
584  try:
585  msg = TopicInfo()
586  msg.deserialize(data)
587  if not msg.topic_name in self.subscribers.keys():
588  sub = Subscriber(msg, self)
589  self.subscribers[msg.topic_name] = sub
590  self.setSubscribeSize(msg.buffer_size)
591  rospy.loginfo("Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type) )
592  elif msg.message_type != self.subscribers[msg.topic_name].message._type:
593  old_message_type = self.subscribers[msg.topic_name].message._type
594  self.subscribers[msg.topic_name].unregister()
595  sub = Subscriber(msg, self)
596  self.subscribers[msg.topic_name] = sub
597  self.setSubscribeSize(msg.buffer_size)
598  rospy.loginfo("Change the message type of subscriber on %s from [%s] to [%s]" % (msg.topic_name, old_message_type, msg.message_type) )
599  except Exception as e:
600  rospy.logerr("Creation of subscriber failed: %s", e)
601 
603  """ Register a new service server. """
604  try:
605  msg = TopicInfo()
606  msg.deserialize(data)
607  self.setPublishSize(msg.buffer_size)
608  try:
609  srv = self.services[msg.topic_name]
610  except KeyError:
611  srv = ServiceServer(msg, self)
612  rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
613  self.services[msg.topic_name] = srv
614  if srv.mres._md5sum == msg.md5sum:
615  self.callbacks[msg.topic_id] = srv.handlePacket
616  else:
617  raise Exception('Checksum does not match: ' + srv.mres._md5sum + ',' + msg.md5sum)
618  except Exception as e:
619  rospy.logerr("Creation of service server failed: %s", e)
620 
622  """ Register a new service server. """
623  try:
624  msg = TopicInfo()
625  msg.deserialize(data)
626  self.setSubscribeSize(msg.buffer_size)
627  try:
628  srv = self.services[msg.topic_name]
629  except KeyError:
630  srv = ServiceServer(msg, self)
631  rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
632  self.services[msg.topic_name] = srv
633  if srv.mreq._md5sum == msg.md5sum:
634  srv.id = msg.topic_id
635  else:
636  raise Exception('Checksum does not match: ' + srv.mreq._md5sum + ',' + msg.md5sum)
637  except Exception as e:
638  rospy.logerr("Creation of service server failed: %s", e)
639 
641  """ Register a new service client. """
642  try:
643  msg = TopicInfo()
644  msg.deserialize(data)
645  self.setPublishSize(msg.buffer_size)
646  try:
647  srv = self.services[msg.topic_name]
648  except KeyError:
649  srv = ServiceClient(msg, self)
650  rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
651  self.services[msg.topic_name] = srv
652  if srv.mreq._md5sum == msg.md5sum:
653  self.callbacks[msg.topic_id] = srv.handlePacket
654  else:
655  raise Exception('Checksum does not match: ' + srv.mreq._md5sum + ',' + msg.md5sum)
656  except Exception as e:
657  rospy.logerr("Creation of service client failed: %s", e)
658 
660  """ Register a new service client. """
661  try:
662  msg = TopicInfo()
663  msg.deserialize(data)
664  self.setSubscribeSize(msg.buffer_size)
665  try:
666  srv = self.services[msg.topic_name]
667  except KeyError:
668  srv = ServiceClient(msg, self)
669  rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
670  self.services[msg.topic_name] = srv
671  if srv.mres._md5sum == msg.md5sum:
672  srv.id = msg.topic_id
673  else:
674  raise Exception('Checksum does not match: ' + srv.mres._md5sum + ',' + msg.md5sum)
675  except Exception as e:
676  rospy.logerr("Creation of service client failed: %s", e)
677 
678  def handleTimeRequest(self, data):
679  """ Respond to device with system time. """
680  t = Time()
681  t.data = rospy.Time.now()
682  data_buffer = StringIO.StringIO()
683  t.serialize(data_buffer)
684  self.send( TopicInfo.ID_TIME, data_buffer.getvalue() )
685  self.lastsync = rospy.Time.now()
686 
687  def handleParameterRequest(self, data):
688  """ Send parameters to device. Supports only simple datatypes and arrays of such. """
689  req = RequestParamRequest()
690  req.deserialize(data)
691  resp = RequestParamResponse()
692  try:
693  param = rospy.get_param(req.name)
694  except KeyError:
695  rospy.logerr("Parameter %s does not exist"%req.name)
696  return
697 
698  if param is None:
699  rospy.logerr("Parameter %s does not exist"%req.name)
700  return
701 
702  if isinstance(param, dict):
703  rospy.logerr("Cannot send param %s because it is a dictionary"%req.name)
704  return
705  if not isinstance(param, list):
706  param = [param]
707  #check to make sure that all parameters in list are same type
708  t = type(param[0])
709  for p in param:
710  if t!= type(p):
711  rospy.logerr('All Paramers in the list %s must be of the same type'%req.name)
712  return
713  if t == int or t == bool:
714  resp.ints = param
715  if t == float:
716  resp.floats =param
717  if t == str:
718  resp.strings = param
719  data_buffer = StringIO.StringIO()
720  resp.serialize(data_buffer)
721  self.send(TopicInfo.ID_PARAMETER_REQUEST, data_buffer.getvalue())
722 
723  def handleLoggingRequest(self, data):
724  """ Forward logging information from serial device into ROS. """
725  msg = Log()
726  msg.deserialize(data)
727  if msg.level == Log.ROSDEBUG:
728  rospy.logdebug(msg.msg)
729  elif msg.level == Log.INFO:
730  rospy.loginfo(msg.msg)
731  elif msg.level == Log.WARN:
732  rospy.logwarn(msg.msg)
733  elif msg.level == Log.ERROR:
734  rospy.logerr(msg.msg)
735  elif msg.level == Log.FATAL:
736  rospy.logfatal(msg.msg)
737 
738  def send(self, topic, msg):
739  """
740  Queues data to be written to the serial port.
741  """
742  self.write_queue.put((topic, msg))
743 
744  def _write(self, data):
745  """
746  Writes raw data over the serial port. Assumes the data is formatting as a packet. http://wiki.ros.org/rosserial/Overview/Protocol
747  """
748  with self.write_lock:
749  self.port.write(data)
750  self.last_write = rospy.Time.now()
751 
752  def _send(self, topic, msg):
753  """
754  Send a message on a particular topic to the device.
755  """
756  length = len(msg)
757  if self.buffer_in > 0 and length > self.buffer_in:
758  rospy.logerr("Message from ROS network dropped: message larger than buffer.\n%s" % msg)
759  return -1
760  else:
761  #modified frame : header(2 bytes) + msg_len(2 bytes) + msg_len_chk(1 byte) + topic_id(2 bytes) + msg(x bytes) + msg_topic_id_chk(1 byte)
762  # second byte of header is protocol version
763  msg_len_checksum = 255 - ( ((length&255) + (length>>8))%256 )
764  msg_checksum = 255 - ( ((topic&255) + (topic>>8) + sum([ord(x) for x in msg]))%256 )
765  data = "\xff" + self.protocol_ver + chr(length&255) + chr(length>>8) + chr(msg_len_checksum) + chr(topic&255) + chr(topic>>8)
766  data = data + msg + chr(msg_checksum)
767  self._write(data)
768  return length
769 
770  def processWriteQueue(self):
771  """
772  Main loop for the thread that processes outgoing data to write to the serial port.
773  """
774  while not rospy.is_shutdown():
775  if self.write_queue.empty():
776  time.sleep(0.01)
777  else:
778  data = self.write_queue.get()
779  while True:
780  try:
781  if isinstance(data, tuple):
782  topic, msg = data
783  self._send(topic, msg)
784  elif isinstance(data, basestring):
785  self._write(data)
786  else:
787  rospy.logerr("Trying to write invalid data type: %s" % type(data))
788  break
789  except SerialTimeoutException as exc:
790  rospy.logerr('Write timeout: %s' % exc)
791  time.sleep(1)
792 
793  def sendDiagnostics(self, level, msg_text):
794  msg = diagnostic_msgs.msg.DiagnosticArray()
795  status = diagnostic_msgs.msg.DiagnosticStatus()
796  status.name = "rosserial_python"
797  msg.header.stamp = rospy.Time.now()
798  msg.status.append(status)
799 
800  status.message = msg_text
801  status.level = level
802 
803  status.values.append(diagnostic_msgs.msg.KeyValue())
804  status.values[0].key="last sync"
805  if self.lastsync.to_sec()>0:
806  status.values[0].value=time.ctime(self.lastsync.to_sec())
807  else:
808  status.values[0].value="never"
809 
810  status.values.append(diagnostic_msgs.msg.KeyValue())
811  status.values[1].key="last sync lost"
812  status.values[1].value=time.ctime(self.lastsync_lost.to_sec())
813 
814  self.pub_diagnostics.publish(msg)
def __init__(self, tcp_portnum, fork_server=False)
def load_pkg_module(package, directory)
Definition: SerialClient.py:64
def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False)
def __init__(self, topic_info, parent)
def __init__(self, topic_info, parent)
def sendDiagnostics(self, level, msg_text)
def load_message(package, message)
Definition: SerialClient.py:79
def load_service(package, service)
Definition: SerialClient.py:84
def __init__(self, topic_info, parent)
def txStopRequest(self, signal, frame)


rosserial_python
Author(s): Michael Ferguson
autogenerated on Fri Jun 7 2019 22:02:51