00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 __author__ = "mferguson@willowgarage.com (Michael Ferguson)"
00037
00038 import roslib;
00039 import rospy
00040
00041 import thread
00042 import multiprocessing
00043 from serial import *
00044 import StringIO
00045
00046 from std_msgs.msg import Time
00047 from rosserial_msgs.msg import *
00048 from rosserial_msgs.srv import *
00049
00050 import diagnostic_msgs.msg
00051
00052 import socket
00053 import time
00054 import struct
00055
00056 def load_pkg_module(package, directory):
00057
00058 in_path = False
00059 path = sys.path
00060 pkg_src = package+'/src'
00061
00062 for entry in sys.path:
00063 if pkg_src in entry:
00064 in_path = True
00065 if not in_path:
00066 roslib.load_manifest(package)
00067 try:
00068 m = __import__( package + '.' + directory )
00069 except:
00070 rospy.logerr( "Cannot import package : %s"% package )
00071 rospy.logerr( "sys.path was " + str(path) )
00072 return None
00073 return m
00074
00075 def load_message(package, message):
00076 m = load_pkg_module(package, 'msg')
00077 m2 = getattr(m, 'msg')
00078 return getattr(m2, message)
00079
00080 class Publisher:
00081 """
00082 Publisher forwards messages from the serial device to ROS.
00083 """
00084 def __init__(self, topic_info):
00085 """ Create a new publisher. """
00086 self.topic = topic_info.topic_name
00087
00088
00089 package, message = topic_info.message_type.split('/')
00090 self.message = load_message(package, message)
00091 if self.message._md5sum == topic_info.md5sum:
00092 self.publisher = rospy.Publisher(self.topic, self.message)
00093 else:
00094 raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum)
00095
00096 def handlePacket(self, data):
00097 """ Forward message to ROS network. """
00098 m = self.message()
00099 m.deserialize(data)
00100 self.publisher.publish(m)
00101
00102
00103 class Subscriber:
00104 """
00105 Subscriber forwards messages from ROS to the serial device.
00106 """
00107
00108 def __init__(self, topic_info, parent):
00109 self.topic = topic_info.topic_name
00110 self.id = topic_info.topic_id
00111 self.parent = parent
00112
00113
00114 package, message = topic_info.message_type.split('/')
00115 self.message = load_message(package, message)
00116 if self.message._md5sum == topic_info.md5sum:
00117 rospy.Subscriber(self.topic, self.message, self.callback)
00118 else:
00119 raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum)
00120
00121 def callback(self, msg):
00122 """ Forward message to serial device. """
00123 data_buffer = StringIO.StringIO()
00124 msg.serialize(data_buffer)
00125 self.parent.send(self.id, data_buffer.getvalue())
00126
00127
00128 class ServiceServer:
00129 """
00130 ServiceServer responds to requests from ROS.
00131 """
00132
00133 def __init__(self, topic_info, parent):
00134 self.topic = topic_info.topic_name
00135 self.parent = parent
00136
00137
00138 package, service = topic_info.message_type.split('/')
00139 s = load_pkg_module(package, 'srv')
00140 s = getattr(s, 'srv')
00141 self.mreq = getattr(s, service+"Request")
00142 self.mres = getattr(s, service+"Response")
00143 srv = getattr(s, service)
00144 self.service = rospy.Service(self.topic, srv, self.callback)
00145
00146
00147 self.data = None
00148
00149 def callback(self, req):
00150 """ Forward request to serial device. """
00151 data_buffer = StringIO.StringIO()
00152 req.serialize(data_buffer)
00153 self.response = None
00154 if self.parent.send(self.id, data_buffer.getvalue()) >= 0:
00155 while self.response == None:
00156 pass
00157 return self.response
00158
00159 def handlePacket(self, data):
00160 """ Forward response to ROS network. """
00161 r = self.mres()
00162 r.deserialize(data)
00163 self.response = r
00164
00165
00166 class ServiceClient:
00167 """
00168 ServiceServer responds to requests from ROS.
00169 """
00170
00171 def __init__(self, topic_info, parent):
00172 self.topic = topic_info.topic_name
00173 self.parent = parent
00174
00175
00176 package, service = topic_info.message_type.split('/')
00177 s = load_pkg_module(package, 'srv')
00178 s = getattr(s, 'srv')
00179 self.mreq = getattr(s, service+"Request")
00180 self.mres = getattr(s, service+"Response")
00181 srv = getattr(s, service)
00182 rospy.loginfo("Starting service client, waiting for service '" + self.topic + "'")
00183 rospy.wait_for_service(self.topic)
00184 self.proxy = rospy.ServiceProxy(self.topic, srv)
00185
00186 def handlePacket(self, data):
00187 """ Forward request to ROS network. """
00188 req = self.mreq()
00189 req.deserialize(data)
00190
00191 resp = self.proxy(req)
00192
00193 data_buffer = StringIO.StringIO()
00194 resp.serialize(data_buffer)
00195 self.parent.send(self.id, data_buffer.getvalue())
00196
00197 class RosSerialServer:
00198 """
00199 RosSerialServer waits for a socket connection then passes itself, forked as a
00200 new process, to SerialClient which uses it as a serial port. It continues to listen
00201 for additional connections. Each forked process is a new ros node, and proxies ros
00202 operations (e.g. publish/subscribe) from its connection to the rest of ros.
00203 """
00204 def __init__(self, tcp_portnum, fork_server=False):
00205 print "Fork_server is: ", fork_server
00206 self.tcp_portnum = tcp_portnum
00207 self.fork_server = fork_server
00208
00209 def listen(self):
00210 self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00211
00212 self.serversocket.bind(("", self.tcp_portnum))
00213 self.serversocket.listen(1)
00214
00215 while True:
00216
00217 print "waiting for socket connection"
00218 (clientsocket, address) = self.serversocket.accept()
00219
00220
00221 rospy.loginfo("Established a socket connection from %s on port %s" % (address))
00222 self.socket = clientsocket
00223 self.isConnected = True
00224
00225 if (self.fork_server == True):
00226 rospy.loginfo("Forking a socket server process")
00227 process = multiprocessing.Process(target=self.startSocketServer, args=(address))
00228 process.daemon = True
00229 process.start()
00230 rospy.loginfo("launched startSocketServer")
00231 else:
00232 rospy.loginfo("calling startSerialClient")
00233 self.startSerialClient()
00234 rospy.loginfo("startSerialClient() exited")
00235
00236 def startSerialClient(self):
00237 client = SerialClient(self)
00238 try:
00239 client.run()
00240 except KeyboardInterrupt:
00241 pass
00242 except RuntimeError:
00243 rospy.loginfo("RuntimeError exception caught")
00244 self.isConnected = False
00245 except socket.error:
00246 rospy.loginfo("socket.error exception caught")
00247 self.isConnected = False
00248 finally:
00249 self.socket.close()
00250
00251
00252 def startSocketServer(self, port, address):
00253 rospy.loginfo("starting ROS Serial Python Node serial_node-%r" % (address,))
00254 rospy.init_node("serial_node_%r" % (address,))
00255 self.startSerialClient()
00256
00257 def flushInput(self):
00258 pass
00259
00260 def write(self, data):
00261 if (self.isConnected == False):
00262 return
00263 length = len(data)
00264 totalsent = 0
00265
00266 while totalsent < length:
00267 sent = self.socket.send(data[totalsent:])
00268 if sent == 0:
00269 raise RuntimeError("RosSerialServer.write() socket connection broken")
00270 totalsent = totalsent + sent
00271
00272 def read(self, rqsted_length):
00273 self.msg = ''
00274 if (self.isConnected == False):
00275 return self.msg
00276
00277 while len(self.msg) < rqsted_length:
00278 chunk = self.socket.recv(rqsted_length - len(self.msg))
00279 if chunk == '':
00280 raise RuntimeError("RosSerialServer.read() socket connection broken")
00281 self.msg = self.msg + chunk
00282 return self.msg
00283
00284 def close(self):
00285 self.port.close()
00286
00287
00288 class SerialClient:
00289 """
00290 ServiceServer responds to requests from the serial device.
00291 """
00292
00293 def __init__(self, port=None, baud=57600, timeout=5.0):
00294 """ Initialize node, connect to bus, attempt to negotiate topics. """
00295 self.mutex = thread.allocate_lock()
00296
00297 self.lastsync = rospy.Time(0)
00298 self.lastsync_lost = rospy.Time(0)
00299 self.timeout = timeout
00300
00301 self.pub_diagnostics = rospy.Publisher('/diagnostics', diagnostic_msgs.msg.DiagnosticArray)
00302
00303 if port== None:
00304
00305 pass
00306 elif hasattr(port, 'read'):
00307
00308 self.port=port
00309 else:
00310
00311 try:
00312 self.port = Serial(port, baud, timeout=self.timeout*0.5)
00313 except SerialException as e:
00314 rospy.logerr("Error opening serial: %s", e)
00315 rospy.signal_shutdown("Error opening serial: %s" % e)
00316 raise SystemExit
00317
00318 self.port.timeout = 0.01
00319
00320 time.sleep(0.1)
00321
00322 self.publishers = dict()
00323 self.subscribers = dict()
00324 self.services = dict()
00325
00326 self.buffer_out = -1
00327 self.buffer_in = -1
00328
00329 self.callbacks = dict()
00330
00331 self.callbacks[TopicInfo.ID_PUBLISHER] = self.setupPublisher
00332 self.callbacks[TopicInfo.ID_SUBSCRIBER] = self.setupSubscriber
00333
00334 self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_PUBLISHER] = self.setupServiceServerPublisher
00335 self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_SUBSCRIBER] = self.setupServiceServerSubscriber
00336 self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_PUBLISHER] = self.setupServiceClientPublisher
00337 self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_SUBSCRIBER] = self.setupServiceClientSubscriber
00338
00339 self.callbacks[TopicInfo.ID_PARAMETER_REQUEST] = self.handleParameterRequest
00340 self.callbacks[TopicInfo.ID_LOG] = self.handleLoggingRequest
00341 self.callbacks[TopicInfo.ID_TIME] = self.handleTimeRequest
00342
00343 rospy.sleep(2.0)
00344 self.requestTopics()
00345 self.lastsync = rospy.Time.now()
00346
00347 def requestTopics(self):
00348 """ Determine topics to subscribe/publish. """
00349 self.port.flushInput()
00350
00351 self.port.write("\xff\xff\x00\x00\x00\x00\xff")
00352
00353 def run(self):
00354 """ Forward recieved messages to appropriate publisher. """
00355 data = ''
00356 while not rospy.is_shutdown():
00357 if (rospy.Time.now() - self.lastsync).to_sec() > (self.timeout * 3):
00358 rospy.logerr("Lost sync with device, restarting...")
00359 self.lastsync_lost = rospy.Time.now()
00360 self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, "no sync with device")
00361 self.requestTopics()
00362 self.lastsync = rospy.Time.now()
00363
00364 flag = [0,0]
00365 flag[0] = self.port.read(1)
00366 if (flag[0] != '\xff'):
00367 continue
00368 flag[1] = self.port.read(1)
00369 if ( flag[1] != '\xff'):
00370 self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Failed Packet Flags")
00371 rospy.loginfo("Failed Packet Flags ")
00372 continue
00373
00374 header = self.port.read(4)
00375 if (len(header) != 4):
00376
00377 continue
00378
00379 topic_id, msg_length = struct.unpack("<hh", header)
00380 msg = self.port.read(msg_length)
00381 if (len(msg) != msg_length):
00382 self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Packet Failed : Failed to read msg data")
00383 rospy.loginfo("Packet Failed : Failed to read msg data")
00384
00385 continue
00386 chk = self.port.read(1)
00387 checksum = sum(map(ord,header) ) + sum(map(ord, msg)) + ord(chk)
00388
00389 if checksum%256 == 255:
00390 try:
00391 self.callbacks[topic_id](msg)
00392 except KeyError:
00393 rospy.logerr("Tried to publish before configured, topic id %d" % topic_id)
00394 rospy.sleep(0.001)
00395
00396 def setPublishSize(self, bytes):
00397 if self.buffer_out < 0:
00398 self.buffer_out = bytes
00399 rospy.loginfo("Note: publish buffer size is %d bytes" % self.buffer_out)
00400
00401 def setSubscribeSize(self, bytes):
00402 if self.buffer_in < 0:
00403 self.buffer_in = bytes
00404 rospy.loginfo("Note: subscribe buffer size is %d bytes" % self.buffer_in)
00405
00406 def setupPublisher(self, data):
00407 """ Register a new publisher. """
00408 try:
00409 msg = TopicInfo()
00410 msg.deserialize(data)
00411 pub = Publisher(msg)
00412 self.publishers[msg.topic_id] = pub
00413 self.callbacks[msg.topic_id] = pub.handlePacket
00414 self.setPublishSize(msg.buffer_size)
00415 rospy.loginfo("Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type) )
00416 except Exception as e:
00417 rospy.logerr("Creation of publisher failed: %s", e)
00418
00419 def setupSubscriber(self, data):
00420 """ Register a new subscriber. """
00421 try:
00422 msg = TopicInfo()
00423 msg.deserialize(data)
00424 sub = Subscriber(msg, self)
00425 self.subscribers[msg.topic_name] = sub
00426 self.setSubscribeSize(msg.buffer_size)
00427 rospy.loginfo("Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type) )
00428 except Exception as e:
00429 rospy.logerr("Creation of subscriber failed: %s", e)
00430
00431 def setupServiceServerPublisher(self, data):
00432 """ Register a new service server. """
00433 try:
00434 msg = TopicInfo()
00435 msg.deserialize(data)
00436 self.setPublishSize(msg.buffer_size)
00437 try:
00438 srv = self.services[msg.topic_name]
00439 except:
00440 srv = ServiceServer(msg, self)
00441 rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
00442 self.services[msg.topic_name] = srv
00443 if srv.mres._md5sum == msg.md5sum:
00444 self.callbacks[msg.topic_id] = srv.handlePacket
00445 else:
00446 raise Exception('Checksum does not match: ' + srv.mres._md5sum + ',' + msg.md5sum)
00447 except Exception as e:
00448 rospy.logerr("Creation of service server failed: %s", e)
00449 def setupServiceServerSubscriber(self, data):
00450 """ Register a new service server. """
00451 try:
00452 msg = TopicInfo()
00453 msg.deserialize(data)
00454 self.setSubscribeSize(msg.buffer_size)
00455 try:
00456 srv = self.services[msg.topic_name]
00457 except:
00458 srv = ServiceServer(msg, self)
00459 rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
00460 self.services[msg.topic_name] = srv
00461 if srv.mreq._md5sum == msg.md5sum:
00462 srv.id = msg.topic_id
00463 else:
00464 raise Exception('Checksum does not match: ' + srv.mreq._md5sum + ',' + msg.md5sum)
00465 except Exception as e:
00466 rospy.logerr("Creation of service server failed: %s", e)
00467
00468 def setupServiceClientPublisher(self, data):
00469 """ Register a new service client. """
00470 try:
00471 msg = TopicInfo()
00472 msg.deserialize(data)
00473 self.setPublishSize(msg.buffer_size)
00474 try:
00475 srv = self.services[msg.topic_name]
00476 except:
00477 srv = ServiceClient(msg, self)
00478 rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
00479 self.services[msg.topic_name] = srv
00480 if srv.mreq._md5sum == msg.md5sum:
00481 self.callbacks[msg.topic_id] = srv.handlePacket
00482 else:
00483 raise Exception('Checksum does not match: ' + srv.mreq._md5sum + ',' + msg.md5sum)
00484 except Exception as e:
00485 rospy.logerr("Creation of service client failed: %s", e)
00486 def setupServiceClientSubscriber(self, data):
00487 """ Register a new service client. """
00488 try:
00489 msg = TopicInfo()
00490 msg.deserialize(data)
00491 self.setSubscribeSize(msg.buffer_size)
00492 try:
00493 srv = self.services[msg.topic_name]
00494 except:
00495 srv = ServiceClient(msg, self)
00496 rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
00497 self.services[msg.topic_name] = srv
00498 if srv.mres._md5sum == msg.md5sum:
00499 srv.id = msg.topic_id
00500 else:
00501 raise Exception('Checksum does not match: ' + srv.mres._md5sum + ',' + msg.md5sum)
00502 except Exception as e:
00503 rospy.logerr("Creation of service client failed: %s", e)
00504
00505 def handleTimeRequest(self, data):
00506 """ Respond to device with system time. """
00507 t = Time()
00508 t.data = rospy.Time.now()
00509 data_buffer = StringIO.StringIO()
00510 t.serialize(data_buffer)
00511 self.send( TopicInfo.ID_TIME, data_buffer.getvalue() )
00512 self.lastsync = rospy.Time.now()
00513
00514 def handleParameterRequest(self, data):
00515 """ Send parameters to device. Supports only simple datatypes and arrays of such. """
00516 req = RequestParamRequest()
00517 req.deserialize(data)
00518 resp = RequestParamResponse()
00519 try:
00520 param = rospy.get_param(req.name)
00521 except KeyError:
00522 rospy.logerr("Parameter %s does not exist"%req.name)
00523 return
00524
00525 if param == None:
00526 rospy.logerr("Parameter %s does not exist"%req.name)
00527 return
00528
00529 if (type(param) == dict):
00530 rospy.logerr("Cannot send param %s because it is a dictionary"%req.name)
00531 return
00532 if (type(param) != list):
00533 param = [param]
00534
00535 t = type(param[0])
00536 for p in param:
00537 if t!= type(p):
00538 rospy.logerr('All Paramers in the list %s must be of the same type'%req.name)
00539 return
00540 if (t == int):
00541 resp.ints= param
00542 if (t == float):
00543 resp.floats=param
00544 if (t == str):
00545 resp.strings = param
00546 data_buffer = StringIO.StringIO()
00547 resp.serialize(data_buffer)
00548 self.send(TopicInfo.ID_PARAMETER_REQUEST, data_buffer.getvalue())
00549
00550 def handleLoggingRequest(self, data):
00551 """ Forward logging information from serial device into ROS. """
00552 msg = Log()
00553 msg.deserialize(data)
00554 if (msg.level == Log.ROSDEBUG):
00555 rospy.logdebug(msg.msg)
00556 elif(msg.level== Log.INFO):
00557 rospy.loginfo(msg.msg)
00558 elif(msg.level== Log.WARN):
00559 rospy.logwarn(msg.msg)
00560 elif(msg.level== Log.ERROR):
00561 rospy.logerr(msg.msg)
00562 elif(msg.level==Log.FATAL):
00563 rospy.logfatal(msg.msg)
00564
00565 def send(self, topic, msg):
00566 """ Send a message on a particular topic to the device. """
00567 with self.mutex:
00568 length = len(msg)
00569 if self.buffer_in > 0 and length > self.buffer_in:
00570 rospy.logerr("Message from ROS network dropped: message larger than buffer.")
00571 print msg
00572 return -1
00573 else:
00574 checksum = 255 - ( ((topic&255) + (topic>>8) + (length&255) + (length>>8) + sum([ord(x) for x in msg]))%256 )
00575 data = '\xff\xff'+ chr(topic&255) + chr(topic>>8) + chr(length&255) + chr(length>>8)
00576 data = data + msg + chr(checksum)
00577 self.port.write(data)
00578 return length
00579
00580 def sendDiagnostics(self, level, msg_text):
00581 msg = diagnostic_msgs.msg.DiagnosticArray()
00582 status = diagnostic_msgs.msg.DiagnosticStatus()
00583 status.name = "rosserial_python"
00584 msg.header.stamp = rospy.Time.now()
00585 msg.status.append(status)
00586
00587 status.message = msg_text
00588 status.level = level
00589
00590 status.values.append(diagnostic_msgs.msg.KeyValue())
00591 status.values[0].key="last sync"
00592 if self.lastsync.to_sec()>0:
00593 status.values[0].value=time.ctime(self.lastsync.to_sec())
00594 else:
00595 status.values[0].value="never"
00596
00597 status.values.append(diagnostic_msgs.msg.KeyValue())
00598 status.values[1].key="last sync lost"
00599 status.values[1].value=time.ctime(self.lastsync_lost.to_sec())
00600
00601 self.pub_diagnostics.publish(msg)
00602