36 __author__ =
"mferguson@willowgarage.com (Michael Ferguson)" 41 import multiprocessing
48 from Queue
import Queue
50 from serial
import Serial, SerialException, SerialTimeoutException
54 from std_msgs.msg
import Time
55 from rosserial_msgs.msg
import TopicInfo, Log
56 from rosserial_msgs.srv
import RequestParamRequest, RequestParamResponse
58 import diagnostic_msgs.msg
60 ERROR_MISMATCHED_PROTOCOL =
"Mismatched protocol version in packet: lost sync or rosserial_python is from different ros release than the rosserial client" 61 ERROR_NO_SYNC =
"no sync with device" 62 ERROR_PACKET_FAILED =
"Packet Failed : Failed to read msg data" 68 imp.find_module(package)
70 roslib.load_manifest(package)
72 m = __import__( package +
'.' + directory )
74 rospy.logerr(
"Cannot import package : %s"% package )
75 rospy.logerr(
"sys.path was " + str(path) )
81 m2 = getattr(m,
'msg')
82 return getattr(m2, message)
87 srv = getattr(s, service)
88 mreq = getattr(s, service+
"Request")
89 mres = getattr(s, service+
"Response")
94 Publisher forwards messages from the serial device to ROS. 97 """ Create a new publisher. """ 98 self.
topic = topic_info.topic_name
101 package, message = topic_info.message_type.split(
'/')
103 if self.
message._md5sum == topic_info.md5sum:
106 raise Exception(
'Checksum does not match: ' + self.
message._md5sum +
',' + topic_info.md5sum)
109 """ Forward message to ROS network. """ 117 Subscriber forwards messages from ROS to the serial device. 122 self.
id = topic_info.topic_id
126 package, message = topic_info.message_type.split(
'/')
128 if self.
message._md5sum == topic_info.md5sum:
131 raise Exception(
'Checksum does not match: ' + self.
message._md5sum +
',' + topic_info.md5sum)
134 """ Forward message to serial device. """ 135 data_buffer = StringIO.StringIO()
136 msg.serialize(data_buffer)
140 rospy.loginfo(
"Removing subscriber: %s", self.
topic)
145 ServiceServer responds to requests from ROS. 153 package, service = topic_info.message_type.split(
'/')
155 s = getattr(s,
'srv')
156 self.
mreq = getattr(s, service+
"Request")
157 self.
mres = getattr(s, service+
"Response")
158 srv = getattr(s, service)
165 rospy.loginfo(
"Removing service: %s", self.
topic)
169 """ Forward request to serial device. """ 170 data_buffer = StringIO.StringIO()
171 req.serialize(data_buffer)
173 if self.
parent.
send(self.id, data_buffer.getvalue()) >= 0:
179 """ Forward response to ROS network. """ 187 ServiceServer responds to requests from ROS. 195 package, service = topic_info.message_type.split(
'/')
197 s = getattr(s,
'srv')
198 self.
mreq = getattr(s, service+
"Request")
199 self.
mres = getattr(s, service+
"Response")
200 srv = getattr(s, service)
201 rospy.loginfo(
"Starting service client, waiting for service '" + self.
topic +
"'")
202 rospy.wait_for_service(self.
topic)
206 """ Forward request to ROS network. """ 208 req.deserialize(data)
210 resp = self.
proxy(req)
212 data_buffer = StringIO.StringIO()
213 resp.serialize(data_buffer)
214 self.
parent.
send(self.id, data_buffer.getvalue())
218 RosSerialServer waits for a socket connection then passes itself, forked as a 219 new process, to SerialClient which uses it as a serial port. It continues to listen 220 for additional connections. Each forked process is a new ros node, and proxies ros 221 operations (e.g. publish/subscribe) from its connection to the rest of ros. 223 def __init__(self, tcp_portnum, fork_server=False):
224 print(
"Fork_server is: ", fork_server)
230 self.
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
237 print(
"waiting for socket connection")
241 rospy.loginfo(
"Established a socket connection from %s on port %s" % (address))
246 rospy.loginfo(
"Forking a socket server process")
248 process.daemon =
True 250 rospy.loginfo(
"launched startSocketServer")
252 rospy.loginfo(
"calling startSerialClient")
254 rospy.loginfo(
"startSerialClient() exited")
260 except KeyboardInterrupt:
263 rospy.loginfo(
"RuntimeError exception caught")
266 rospy.loginfo(
"socket.error exception caught")
270 for sub
in client.subscribers.values():
272 for srv
in client.services.values():
277 rospy.loginfo(
"starting ROS Serial Python Node serial_node-%r" % (address,))
278 rospy.init_node(
"serial_node_%r" % (address,))
285 if not self.isConnected:
290 while totalsent < length:
291 sent = self.socket.
send(data[totalsent:])
293 raise RuntimeError(
"RosSerialServer.write() socket connection broken")
294 totalsent = totalsent + sent
301 while len(self.
msg) < rqsted_length:
302 chunk = self.
socket.recv(rqsted_length - len(self.
msg))
304 raise RuntimeError(
"RosSerialServer.read() socket connection broken")
305 self.
msg = self.
msg + chunk
310 chunk = self.
socket.recv(1, socket.MSG_DONTWAIT|socket.MSG_PEEK)
312 raise RuntimeError(
"RosSerialServer.inWaiting() socket connection broken")
314 except socket.error
as e:
315 if e.args[0] == errno.EWOULDBLOCK:
321 ServiceServer responds to requests from the serial device. 324 def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False):
325 """ Initialize node, connect to bus, attempt to negotiate topics. """ 342 self.
pub_diagnostics = rospy.Publisher(
'/diagnostics', diagnostic_msgs.msg.DiagnosticArray, queue_size=10)
347 elif hasattr(port,
'read'):
352 while not rospy.is_shutdown():
356 self.
port = Serial(port, baud, timeout=self.
timeout, write_timeout=10, rtscts=
True, dsrdtr=
True)
358 self.
port = Serial(port, baud, timeout=self.
timeout, write_timeout=10)
360 except SerialException
as e:
361 rospy.logerr(
"Error opening serial: %s", e)
364 if rospy.is_shutdown():
403 """ Determine topics to subscribe/publish. """ 404 rospy.loginfo(
'Requesting topics...')
409 self.
port.flushInput()
415 """ send stop tx request to arduino when receive SIGINT(Ctrl-c)""" 418 self.
port.flushInput()
423 rospy.loginfo(
"Send tx stop request")
428 read_start = time.time()
429 bytes_remaining = length
431 while bytes_remaining != 0
and time.time() - read_start < self.
timeout:
433 received = self.
port.read(bytes_remaining)
434 if len(received) != 0:
436 result.extend(received)
437 bytes_remaining -= len(received)
439 if bytes_remaining != 0:
440 raise IOError(
"Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining))
443 except Exception
as e:
444 raise IOError(
"Serial Port read failure: %s" % e)
447 """ Forward recieved messages to appropriate publisher. """ 458 while not rospy.is_shutdown():
461 rospy.logerr(
"Lost sync with device, restarting...")
463 rospy.logerr(
"Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino")
465 self.
sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC)
474 if self.
port.inWaiting() < 1:
480 read_step =
'syncflag' 482 if (flag[0] !=
'\xff'):
486 read_step =
'protocol' 489 self.
sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL)
490 rospy.logerr(
"Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1]))
491 protocol_ver_msgs = {
'\xff':
'Rev 0 (rosserial 0.4 and earlier)',
'\xfe':
'Rev 1 (rosserial 0.5+)',
'\xfd':
'Some future rosserial version'}
492 if flag[1]
in protocol_ver_msgs:
493 found_ver_msg =
'Protocol version of client is ' + protocol_ver_msgs[flag[1]]
495 found_ver_msg =
"Protocol version of client is unrecognized" 496 rospy.loginfo(
"%s, expected %s" % (found_ver_msg, protocol_ver_msgs[self.
protocol_ver]))
500 read_step =
'message length' 501 msg_len_bytes = self.
tryRead(2)
502 msg_length, = struct.unpack(
"<h", msg_len_bytes)
505 read_step =
'message length checksum' 507 msg_len_checksum = sum(map(ord, msg_len_bytes)) + ord(msg_len_chk)
510 if msg_len_checksum % 256 != 255:
511 rospy.loginfo(
"wrong checksum for msg length, length %d" %(msg_length))
512 rospy.loginfo(
"chk is %d" % ord(msg_len_chk))
516 read_step =
'topic id' 517 topic_id_header = self.
tryRead(2)
518 topic_id, = struct.unpack(
"<h", topic_id_header)
525 self.
sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_PACKET_FAILED)
526 rospy.loginfo(
"Packet Failed : Failed to read msg data")
527 rospy.loginfo(
"expected msg length is %d", msg_length)
531 read_step =
'data checksum' 533 checksum = sum(map(ord, topic_id_header) ) + sum(map(ord, msg)) + ord(chk)
536 if checksum % 256 == 255:
542 rospy.logerr(
"Tried to publish before configured, topic id %d" % topic_id)
546 rospy.loginfo(
"wrong checksum for topic id and msg")
548 except IOError
as exc:
549 rospy.logwarn(
'Last read step: %s' % read_step)
550 rospy.logwarn(
'Run loop error: %s' % exc)
554 self.
port.flushInput()
556 self.
port.flushOutput()
562 rospy.loginfo(
"Note: publish buffer size is %d bytes" % self.
buffer_out)
567 rospy.loginfo(
"Note: subscribe buffer size is %d bytes" % self.
buffer_in)
570 """ Register a new publisher. """ 573 msg.deserialize(data)
576 self.
callbacks[msg.topic_id] = pub.handlePacket
578 rospy.loginfo(
"Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type) )
579 except Exception
as e:
580 rospy.logerr(
"Creation of publisher failed: %s", e)
583 """ Register a new subscriber. """ 586 msg.deserialize(data)
591 rospy.loginfo(
"Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type) )
592 elif msg.message_type != self.
subscribers[msg.topic_name].message._type:
593 old_message_type = self.
subscribers[msg.topic_name].message._type
598 rospy.loginfo(
"Change the message type of subscriber on %s from [%s] to [%s]" % (msg.topic_name, old_message_type, msg.message_type) )
599 except Exception
as e:
600 rospy.logerr(
"Creation of subscriber failed: %s", e)
603 """ Register a new service server. """ 606 msg.deserialize(data)
612 rospy.loginfo(
"Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
614 if srv.mres._md5sum == msg.md5sum:
615 self.
callbacks[msg.topic_id] = srv.handlePacket
617 raise Exception(
'Checksum does not match: ' + srv.mres._md5sum +
',' + msg.md5sum)
618 except Exception
as e:
619 rospy.logerr(
"Creation of service server failed: %s", e)
622 """ Register a new service server. """ 625 msg.deserialize(data)
631 rospy.loginfo(
"Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
633 if srv.mreq._md5sum == msg.md5sum:
634 srv.id = msg.topic_id
636 raise Exception(
'Checksum does not match: ' + srv.mreq._md5sum +
',' + msg.md5sum)
637 except Exception
as e:
638 rospy.logerr(
"Creation of service server failed: %s", e)
641 """ Register a new service client. """ 644 msg.deserialize(data)
650 rospy.loginfo(
"Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
652 if srv.mreq._md5sum == msg.md5sum:
653 self.
callbacks[msg.topic_id] = srv.handlePacket
655 raise Exception(
'Checksum does not match: ' + srv.mreq._md5sum +
',' + msg.md5sum)
656 except Exception
as e:
657 rospy.logerr(
"Creation of service client failed: %s", e)
660 """ Register a new service client. """ 663 msg.deserialize(data)
669 rospy.loginfo(
"Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
671 if srv.mres._md5sum == msg.md5sum:
672 srv.id = msg.topic_id
674 raise Exception(
'Checksum does not match: ' + srv.mres._md5sum +
',' + msg.md5sum)
675 except Exception
as e:
676 rospy.logerr(
"Creation of service client failed: %s", e)
679 """ Respond to device with system time. """ 681 t.data = rospy.Time.now()
682 data_buffer = StringIO.StringIO()
683 t.serialize(data_buffer)
684 self.
send( TopicInfo.ID_TIME, data_buffer.getvalue() )
688 """ Send parameters to device. Supports only simple datatypes and arrays of such. """ 689 req = RequestParamRequest()
690 req.deserialize(data)
691 resp = RequestParamResponse()
693 param = rospy.get_param(req.name)
695 rospy.logerr(
"Parameter %s does not exist"%req.name)
699 rospy.logerr(
"Parameter %s does not exist"%req.name)
702 if isinstance(param, dict):
703 rospy.logerr(
"Cannot send param %s because it is a dictionary"%req.name)
705 if not isinstance(param, list):
711 rospy.logerr(
'All Paramers in the list %s must be of the same type'%req.name)
713 if t == int
or t == bool:
719 data_buffer = StringIO.StringIO()
720 resp.serialize(data_buffer)
721 self.
send(TopicInfo.ID_PARAMETER_REQUEST, data_buffer.getvalue())
724 """ Forward logging information from serial device into ROS. """ 726 msg.deserialize(data)
727 if msg.level == Log.ROSDEBUG:
728 rospy.logdebug(msg.msg)
729 elif msg.level == Log.INFO:
730 rospy.loginfo(msg.msg)
731 elif msg.level == Log.WARN:
732 rospy.logwarn(msg.msg)
733 elif msg.level == Log.ERROR:
734 rospy.logerr(msg.msg)
735 elif msg.level == Log.FATAL:
736 rospy.logfatal(msg.msg)
740 Queues data to be written to the serial port. 746 Writes raw data over the serial port. Assumes the data is formatting as a packet. http://wiki.ros.org/rosserial/Overview/Protocol 749 self.
port.write(data)
754 Send a message on a particular topic to the device. 758 rospy.logerr(
"Message from ROS network dropped: message larger than buffer.\n%s" % msg)
763 msg_len_checksum = 255 - ( ((length&255) + (length>>8))%256 )
764 msg_checksum = 255 - ( ((topic&255) + (topic>>8) + sum([ord(x)
for x
in msg]))%256 )
765 data =
"\xff" + self.
protocol_ver + chr(length&255) + chr(length>>8) + chr(msg_len_checksum) + chr(topic&255) + chr(topic>>8)
766 data = data + msg + chr(msg_checksum)
772 Main loop for the thread that processes outgoing data to write to the serial port. 774 while not rospy.is_shutdown():
781 if isinstance(data, tuple):
783 self.
_send(topic, msg)
784 elif isinstance(data, basestring):
787 rospy.logerr(
"Trying to write invalid data type: %s" % type(data))
789 except SerialTimeoutException
as exc:
790 rospy.logerr(
'Write timeout: %s' % exc)
794 msg = diagnostic_msgs.msg.DiagnosticArray()
795 status = diagnostic_msgs.msg.DiagnosticStatus()
796 status.name =
"rosserial_python" 797 msg.header.stamp = rospy.Time.now()
798 msg.status.append(status)
800 status.message = msg_text
803 status.values.append(diagnostic_msgs.msg.KeyValue())
804 status.values[0].key=
"last sync" 806 status.values[0].value=time.ctime(self.
lastsync.to_sec())
808 status.values[0].value=
"never" 810 status.values.append(diagnostic_msgs.msg.KeyValue())
811 status.values[1].key=
"last sync lost" 812 status.values[1].value=time.ctime(self.
lastsync_lost.to_sec())
def __init__(self, tcp_portnum, fork_server=False)
def handleTimeRequest(self, data)
def handleParameterRequest(self, data)
def load_pkg_module(package, directory)
def startSocketServer(self, port, address)
def __init__(self, topic_info)
def setupServiceClientPublisher(self, data)
def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False)
def startSerialClient(self)
def __init__(self, topic_info, parent)
def handlePacket(self, data)
def __init__(self, topic_info, parent)
def setSubscribeSize(self, bytes)
def handleLoggingRequest(self, data)
def sendDiagnostics(self, level, msg_text)
def load_message(package, message)
def setupSubscriber(self, data)
def setupServiceServerPublisher(self, data)
def _send(self, topic, msg)
def handlePacket(self, data)
def processWriteQueue(self)
def load_service(package, service)
def send(self, topic, msg)
def read(self, rqsted_length)
def __init__(self, topic_info, parent)
def handlePacket(self, data)
def setupServiceClientSubscriber(self, data)
def tryRead(self, length)
def txStopRequest(self, signal, frame)
def setPublishSize(self, bytes)
def setupServiceServerSubscriber(self, data)
def setupPublisher(self, data)