34 __author__ =
"mferguson@willowgarage.com (Michael Ferguson)"
40 import multiprocessing
48 from serial
import Serial, SerialException, SerialTimeoutException
52 from std_msgs.msg
import Time
53 from rosserial_msgs.msg
import TopicInfo, Log
54 from rosserial_msgs.srv
import RequestParamRequest, RequestParamResponse
56 import diagnostic_msgs.msg
58 ERROR_MISMATCHED_PROTOCOL =
"Mismatched protocol version in packet: lost sync or rosserial_python is from different ros release than the rosserial client"
59 ERROR_NO_SYNC =
"no sync with device"
60 ERROR_PACKET_FAILED =
"Packet Failed : Failed to read msg data"
66 imp.find_module(package)
68 roslib.load_manifest(package)
70 m = __import__( package +
'.' + directory )
72 rospy.logerr(
"Cannot import package : %s"% package )
73 rospy.logerr(
"sys.path was " + str(path) )
79 m2 = getattr(m,
'msg')
80 return getattr(m2, message)
85 srv = getattr(s, service)
86 mreq = getattr(s, service+
"Request")
87 mres = getattr(s, service+
"Response")
92 Publisher forwards messages from the serial device to ROS.
95 """ Create a new publisher. """
96 self.
topic = topic_info.topic_name
99 package, message = topic_info.message_type.split(
'/')
101 if self.
message._md5sum == topic_info.md5sum:
104 raise Exception(
'Checksum does not match: ' + self.
message._md5sum +
',' + topic_info.md5sum)
107 """ Forward message to ROS network. """
115 Subscriber forwards messages from ROS to the serial device.
120 self.
id = topic_info.topic_id
124 package, message = topic_info.message_type.split(
'/')
126 if self.
message._md5sum == topic_info.md5sum:
129 raise Exception(
'Checksum does not match: ' + self.
message._md5sum +
',' + topic_info.md5sum)
132 """ Forward message to serial device. """
133 data_buffer = io.BytesIO()
134 msg.serialize(data_buffer)
138 rospy.loginfo(
"Removing subscriber: %s", self.
topic)
143 ServiceServer responds to requests from ROS.
151 package, service = topic_info.message_type.split(
'/')
153 s = getattr(s,
'srv')
154 self.
mreq = getattr(s, service+
"Request")
155 self.
mres = getattr(s, service+
"Response")
156 srv = getattr(s, service)
163 rospy.loginfo(
"Removing service: %s", self.
topic)
167 """ Forward request to serial device. """
168 data_buffer = io.BytesIO()
169 req.serialize(data_buffer)
171 self.
parent.
send(self.id, data_buffer.getvalue())
177 """ Forward response to ROS network. """
185 ServiceServer responds to requests from ROS.
193 package, service = topic_info.message_type.split(
'/')
195 s = getattr(s,
'srv')
196 self.
mreq = getattr(s, service+
"Request")
197 self.
mres = getattr(s, service+
"Response")
198 srv = getattr(s, service)
199 rospy.loginfo(
"Starting service client, waiting for service '" + self.
topic +
"'")
200 rospy.wait_for_service(self.
topic)
204 """ Forward request to ROS network. """
206 req.deserialize(data)
208 resp = self.
proxy(req)
210 data_buffer = io.BytesIO()
211 resp.serialize(data_buffer)
212 self.
parent.
send(self.id, data_buffer.getvalue())
216 RosSerialServer waits for a socket connection then passes itself, forked as a
217 new process, to SerialClient which uses it as a serial port. It continues to listen
218 for additional connections. Each forked process is a new ros node, and proxies ros
219 operations (e.g. publish/subscribe) from its connection to the rest of ros.
221 def __init__(self, tcp_portnum, fork_server=False):
222 rospy.loginfo(
"Fork_server is: %s" % fork_server)
228 self.
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
235 rospy.loginfo(
"Waiting for socket connection")
236 while not rospy.is_shutdown():
239 except socket.timeout:
243 rospy.loginfo(
"Established a socket connection from %s on port %s" % address)
248 rospy.loginfo(
"Forking a socket server process")
250 process.daemon =
True
252 rospy.loginfo(
"launched startSocketServer")
254 rospy.loginfo(
"calling startSerialClient")
256 rospy.loginfo(
"startSerialClient() exited")
262 except KeyboardInterrupt:
265 rospy.loginfo(
"RuntimeError exception caught")
268 rospy.loginfo(
"socket.error exception caught")
271 rospy.loginfo(
"Client has exited, closing socket.")
273 for sub
in client.subscribers.values():
275 for srv
in client.services.values():
279 rospy.loginfo(
"starting ROS Serial Python Node serial_node-%r" % address)
280 rospy.init_node(
"serial_node_%r" % address)
287 if not self.isConnected:
292 while totalsent < length:
294 totalsent += self.socket.
send(data[totalsent:])
295 except BrokenPipeError:
296 raise RuntimeError(
"RosSerialServer.write() socket connection broken")
303 while len(self.
msg) < rqsted_length:
304 chunk = self.
socket.recv(rqsted_length - len(self.
msg))
306 raise RuntimeError(
"RosSerialServer.read() socket connection broken")
307 self.
msg = self.
msg + chunk
312 chunk = self.
socket.recv(1, socket.MSG_DONTWAIT|socket.MSG_PEEK)
314 raise RuntimeError(
"RosSerialServer.inWaiting() socket connection broken")
316 except BlockingIOError:
321 ServiceServer responds to requests from the serial device.
327 protocol_ver1 = b
'\xff'
328 protocol_ver2 = b
'\xfe'
329 protocol_ver = protocol_ver2
331 def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False):
332 """ Initialize node, connect to bus, attempt to negotiate topics. """
355 rospy.loginfo(
'shutdown hook activated')
356 rospy.on_shutdown(shutdown)
358 self.
pub_diagnostics = rospy.Publisher(
'/diagnostics', diagnostic_msgs.msg.DiagnosticArray, queue_size=10)
363 elif hasattr(port,
'read'):
368 while not rospy.is_shutdown():
372 self.
port = Serial(port, baud, timeout=self.
timeout, write_timeout=10, rtscts=
True, dsrdtr=
True)
374 self.
port = Serial(port, baud, timeout=self.
timeout, write_timeout=10)
376 except SerialException
as e:
377 rospy.logerr(
"Error opening serial: %s", e)
380 if rospy.is_shutdown():
407 """ Determine topics to subscribe/publish. """
408 rospy.loginfo(
'Requesting topics...')
413 self.
port.flushInput()
419 """ Send stop tx request to client before the node exits. """
422 self.
port.flushInput()
425 rospy.loginfo(
"Sending tx stop request")
429 read_start = time.time()
430 bytes_remaining = length
432 while bytes_remaining != 0
and time.time() - read_start < self.
timeout:
434 received = self.
port.read(bytes_remaining)
435 if len(received) != 0:
437 result.extend(received)
438 bytes_remaining -= len(received)
440 if bytes_remaining != 0:
441 raise IOError(
"Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining))
444 except Exception
as e:
445 raise IOError(
"Serial Port read failure: %s" % e)
448 """ Forward recieved messages to appropriate publisher. """
459 while self.
write_thread.is_alive()
and not rospy.is_shutdown():
462 rospy.logerr(
"Lost sync with device, restarting...")
464 rospy.logerr(
"Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino")
466 self.
sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC)
475 if self.
port.inWaiting() < 1:
481 read_step =
'syncflag'
483 if (flag[0] != self.
header):
487 read_step =
'protocol'
490 self.
sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL)
491 rospy.logerr(
"Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1]))
492 protocol_ver_msgs = {
495 b
'\xfd':
'Some future rosserial version'
497 if flag[1]
in protocol_ver_msgs:
498 found_ver_msg =
'Protocol version of client is ' + protocol_ver_msgs[flag[1]]
500 found_ver_msg =
"Protocol version of client is unrecognized"
501 rospy.loginfo(
"%s, expected %s" % (found_ver_msg, protocol_ver_msgs[self.
protocol_ver]))
505 read_step =
'message length'
506 msg_len_bytes = self.
tryRead(3)
507 msg_length, _ = struct.unpack(
"<hB", msg_len_bytes)
510 if sum(array.array(
"B", msg_len_bytes)) % 256 != 255:
511 rospy.loginfo(
"Wrong checksum for msg length, length %d, dropping message." % (msg_length))
515 read_step =
'topic id'
516 topic_id_header = self.
tryRead(2)
517 topic_id, = struct.unpack(
"<H", topic_id_header)
524 self.
sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_PACKET_FAILED)
525 rospy.loginfo(
"Packet Failed : Failed to read msg data")
526 rospy.loginfo(
"expected msg length is %d", msg_length)
530 read_step =
'data checksum'
532 checksum = sum(array.array(
'B', topic_id_header + msg + chk))
535 if checksum % 256 == 255:
541 rospy.logerr(
"Tried to publish before configured, topic id %d" % topic_id)
545 rospy.loginfo(
"wrong checksum for topic id and msg")
547 except IOError
as exc:
548 rospy.logwarn(
'Last read step: %s' % read_step)
549 rospy.logwarn(
'Run loop error: %s' % exc)
553 self.
port.flushInput()
555 self.
port.flushOutput()
562 rospy.loginfo(
"Note: publish buffer size is %d bytes" % self.
buffer_out)
567 rospy.loginfo(
"Note: subscribe buffer size is %d bytes" % self.
buffer_in)
570 """ Register a new publisher. """
573 msg.deserialize(data)
576 self.
callbacks[msg.topic_id] = pub.handlePacket
578 rospy.loginfo(
"Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type) )
579 except Exception
as e:
580 rospy.logerr(
"Creation of publisher failed: %s", e)
583 """ Register a new subscriber. """
586 msg.deserialize(data)
587 if not msg.topic_name
in list(self.
subscribers.keys()):
591 rospy.loginfo(
"Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type) )
592 elif msg.message_type != self.
subscribers[msg.topic_name].message._type:
593 old_message_type = self.
subscribers[msg.topic_name].message._type
598 rospy.loginfo(
"Change the message type of subscriber on %s from [%s] to [%s]" % (msg.topic_name, old_message_type, msg.message_type) )
599 except Exception
as e:
600 rospy.logerr(
"Creation of subscriber failed: %s", e)
603 """ Register a new service server. """
606 msg.deserialize(data)
612 rospy.loginfo(
"Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
614 if srv.mres._md5sum == msg.md5sum:
615 self.
callbacks[msg.topic_id] = srv.handlePacket
617 raise Exception(
'Checksum does not match: ' + srv.mres._md5sum +
',' + msg.md5sum)
618 except Exception
as e:
619 rospy.logerr(
"Creation of service server failed: %s", e)
622 """ Register a new service server. """
625 msg.deserialize(data)
631 rospy.loginfo(
"Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
633 if srv.mreq._md5sum == msg.md5sum:
634 srv.id = msg.topic_id
636 raise Exception(
'Checksum does not match: ' + srv.mreq._md5sum +
',' + msg.md5sum)
637 except Exception
as e:
638 rospy.logerr(
"Creation of service server failed: %s", e)
641 """ Register a new service client. """
644 msg.deserialize(data)
650 rospy.loginfo(
"Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
652 if srv.mreq._md5sum == msg.md5sum:
653 self.
callbacks[msg.topic_id] = srv.handlePacket
655 raise Exception(
'Checksum does not match: ' + srv.mreq._md5sum +
',' + msg.md5sum)
656 except Exception
as e:
657 rospy.logerr(
"Creation of service client failed: %s", e)
660 """ Register a new service client. """
663 msg.deserialize(data)
669 rospy.loginfo(
"Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
671 if srv.mres._md5sum == msg.md5sum:
672 srv.id = msg.topic_id
674 raise Exception(
'Checksum does not match: ' + srv.mres._md5sum +
',' + msg.md5sum)
675 except Exception
as e:
676 rospy.logerr(
"Creation of service client failed: %s", e)
679 """ Respond to device with system time. """
681 t.data = rospy.Time.now()
682 data_buffer = io.BytesIO()
683 t.serialize(data_buffer)
684 self.
send( TopicInfo.ID_TIME, data_buffer.getvalue() )
688 """ Send parameters to device. Supports only simple datatypes and arrays of such. """
689 req = RequestParamRequest()
690 req.deserialize(data)
691 resp = RequestParamResponse()
693 param = rospy.get_param(req.name)
695 rospy.logerr(
"Parameter %s does not exist"%req.name)
699 rospy.logerr(
"Parameter %s does not exist"%req.name)
702 if isinstance(param, dict):
703 rospy.logerr(
"Cannot send param %s because it is a dictionary"%req.name)
705 if not isinstance(param, list):
711 rospy.logerr(
'All Paramers in the list %s must be of the same type'%req.name)
713 if t == int
or t == bool:
719 data_buffer = io.BytesIO()
720 resp.serialize(data_buffer)
721 self.
send(TopicInfo.ID_PARAMETER_REQUEST, data_buffer.getvalue())
724 """ Forward logging information from serial device into ROS. """
726 msg.deserialize(data)
727 if msg.level == Log.ROSDEBUG:
728 rospy.logdebug(msg.msg)
729 elif msg.level == Log.INFO:
730 rospy.loginfo(msg.msg)
731 elif msg.level == Log.WARN:
732 rospy.logwarn(msg.msg)
733 elif msg.level == Log.ERROR:
734 rospy.logerr(msg.msg)
735 elif msg.level == Log.FATAL:
736 rospy.logfatal(msg.msg)
740 Queues data to be written to the serial port.
746 Writes raw data over the serial port. Assumes the data is formatting as a packet. http://wiki.ros.org/rosserial/Overview/Protocol
749 self.
port.write(data)
754 Send a message on a particular topic to the device.
756 length = len(msg_bytes)
758 rospy.logerr(
"Message from ROS network dropped: message larger than buffer.\n%s" % msg)
762 length_bytes = struct.pack(
'<h', length)
763 length_checksum = 255 - (sum(array.array(
'B', length_bytes)) % 256)
764 length_checksum_bytes = struct.pack(
'B', length_checksum)
766 topic_bytes = struct.pack(
'<h', topic)
767 msg_checksum = 255 - (sum(array.array(
'B', topic_bytes + msg_bytes)) % 256)
768 msg_checksum_bytes = struct.pack(
'B', msg_checksum)
770 self.
_write(self.
header + self.
protocol_ver + length_bytes + length_checksum_bytes + topic_bytes + msg_bytes + msg_checksum_bytes)
775 Main loop for the thread that processes outgoing data to write to the serial port.
777 while not rospy.is_shutdown():
784 if isinstance(data, tuple):
786 self.
_send(topic, msg)
787 elif isinstance(data, bytes):
790 rospy.logerr(
"Trying to write invalid data type: %s" % type(data))
792 except SerialTimeoutException
as exc:
793 rospy.logerr(
'Write timeout: %s' % exc)
795 except RuntimeError
as exc:
796 rospy.logerr(
'Write thread exception: %s' % exc)
801 msg = diagnostic_msgs.msg.DiagnosticArray()
802 status = diagnostic_msgs.msg.DiagnosticStatus()
803 status.name =
"rosserial_python"
804 msg.header.stamp = rospy.Time.now()
805 msg.status.append(status)
807 status.message = msg_text
810 status.values.append(diagnostic_msgs.msg.KeyValue())
811 status.values[0].key=
"last sync"
813 status.values[0].value=time.ctime(self.
lastsync.to_sec())
815 status.values[0].value=
"never"
817 status.values.append(diagnostic_msgs.msg.KeyValue())
818 status.values[1].key=
"last sync lost"
819 status.values[1].value=time.ctime(self.
lastsync_lost.to_sec())