00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 __author__ = "mferguson@willowgarage.com (Michael Ferguson)"
00037
00038 import roslib; roslib.load_manifest("rosserial_python")
00039 import rospy
00040
00041 import thread
00042 import multiprocessing
00043 from serial import *
00044 import StringIO
00045
00046 from std_msgs.msg import Time
00047 from rosserial_msgs.msg import *
00048 from rosserial_msgs.srv import *
00049
00050 import socket
00051 import time
00052 import struct
00053
00054 def load_pkg_module(package, directory):
00055
00056 in_path = False
00057 path = sys.path
00058 pkg_src = package+'/src'
00059
00060 for entry in sys.path:
00061 if pkg_src in entry:
00062 in_path = True
00063 if not in_path:
00064 roslib.load_manifest(package)
00065 try:
00066 m = __import__( package + '.' + directory )
00067 except:
00068 rospy.logerr( "Cannot import package : %s"% package )
00069 rospy.logerr( "sys.path was " + str(path) )
00070 return None
00071 return m
00072
00073 def load_message(package, message):
00074 m = load_pkg_module(package, 'msg')
00075 m2 = getattr(m, 'msg')
00076 return getattr(m2, message)
00077
00078 class Publisher:
00079 """
00080 Publisher forwards messages from the serial device to ROS.
00081 """
00082 def __init__(self, topic_info):
00083 """ Create a new publisher. """
00084 self.topic = topic_info.topic_name
00085
00086
00087 package, message = topic_info.message_type.split('/')
00088 self.message = load_message(package, message)
00089 if self.message._md5sum == topic_info.md5sum:
00090 self.publisher = rospy.Publisher(self.topic, self.message)
00091 else:
00092 raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum)
00093
00094 def handlePacket(self, data):
00095 """ Forward message to ROS network. """
00096 m = self.message()
00097 m.deserialize(data)
00098 self.publisher.publish(m)
00099
00100
00101 class Subscriber:
00102 """
00103 Subscriber forwards messages from ROS to the serial device.
00104 """
00105
00106 def __init__(self, topic_info, parent):
00107 self.topic = topic_info.topic_name
00108 self.id = topic_info.topic_id
00109 self.parent = parent
00110
00111
00112 package, message = topic_info.message_type.split('/')
00113 self.message = load_message(package, message)
00114 if self.message._md5sum == topic_info.md5sum:
00115 rospy.Subscriber(self.topic, self.message, self.callback)
00116 else:
00117 raise Exception('Checksum does not match: ' + self.message._md5sum + ',' + topic_info.md5sum)
00118
00119 def callback(self, msg):
00120 """ Forward message to serial device. """
00121 data_buffer = StringIO.StringIO()
00122 msg.serialize(data_buffer)
00123 self.parent.send(self.id, data_buffer.getvalue())
00124
00125
00126 class ServiceServer:
00127 """
00128 ServiceServer responds to requests from ROS.
00129 """
00130
00131 def __init__(self, topic_info, parent):
00132 self.topic = topic_info.topic_name
00133 self.parent = parent
00134
00135
00136 package, service = topic_info.message_type.split('/')
00137 s = load_pkg_module(package, 'srv')
00138 s = getattr(s, 'srv')
00139 self.mreq = getattr(s, service+"Request")
00140 self.mres = getattr(s, service+"Response")
00141 srv = getattr(s, service)
00142 self.service = rospy.Service(self.topic, srv, self.callback)
00143
00144
00145 self.data = None
00146
00147 def callback(self, req):
00148 """ Forward request to serial device. """
00149 data_buffer = StringIO.StringIO()
00150 req.serialize(data_buffer)
00151 self.response = None
00152 if self.parent.send(self.id, data_buffer.getvalue()) > 0:
00153 while self.response == None:
00154 pass
00155 return self.response
00156
00157 def handlePacket(self, data):
00158 """ Forward response to ROS network. """
00159 r = self.mres()
00160 r.deserialize(data)
00161 self.response = r
00162
00163
00164 class ServiceClient:
00165 """
00166 ServiceServer responds to requests from ROS.
00167 """
00168
00169 def __init__(self, topic_info, parent):
00170 self.topic = topic_info.topic_name
00171 self.parent = parent
00172
00173
00174 package, service = topic_info.message_type.split('/')
00175 s = load_pkg_module(package, 'srv')
00176 s = getattr(s, 'srv')
00177 self.mreq = getattr(s, service+"Request")
00178 self.mres = getattr(s, service+"Response")
00179 srv = getattr(s, service)
00180 rospy.loginfo("Starting service client, waiting for service '" + self.topic + "'")
00181 rospy.wait_for_service(self.topic)
00182 self.proxy = rospy.ServiceProxy(self.topic, srv)
00183
00184 def handlePacket(self, data):
00185 """ Forward request to ROS network. """
00186 req = self.mreq()
00187 req.deserialize(data)
00188
00189 resp = self.proxy(req)
00190
00191 data_buffer = StringIO.StringIO()
00192 resp.serialize(data_buffer)
00193 self.parent.send(self.id, data_buffer.getvalue())
00194
00195 class RosSerialServer:
00196 """
00197 RosSerialServer waits for a socket connection then passes itself, forked as a
00198 new process, to SerialClient which uses it as a serial port. It continues to listen
00199 for additional connections. Each forked process is a new ros node, and proxies ros
00200 operations (e.g. publish/subscribe) from its connection to the rest of ros.
00201 """
00202 def __init__(self, tcp_portnum, fork_server=False):
00203 print "Fork_server is: ", fork_server
00204 self.tcp_portnum = tcp_portnum
00205 self.fork_server = fork_server
00206
00207 def listen(self):
00208 self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00209
00210 self.serversocket.bind(("", self.tcp_portnum))
00211 self.serversocket.listen(1)
00212
00213 while True:
00214
00215 print "waiting for socket connection"
00216 (clientsocket, address) = self.serversocket.accept()
00217
00218
00219 rospy.loginfo("Established a socket connection from %s on port %s" % (address))
00220 self.socket = clientsocket
00221 self.isConnected = True
00222
00223 if (self.fork_server == True):
00224 rospy.loginfo("Forking a socket server process")
00225 process = multiprocessing.Process(target=self.startSocketServer, args=(address))
00226 process.daemon = True
00227 process.start()
00228 rospy.loginfo("launched startSocketServer")
00229 else:
00230 rospy.init_node("serial_node")
00231 rospy.loginfo("ROS Serial Python Node")
00232 rospy.loginfo("calling startSerialClient")
00233 self.startSerialClient()
00234 rospy.loginfo("startSerialClient() exited")
00235
00236 def startSerialClient(self):
00237 client = SerialClient(self)
00238 try:
00239 client.run()
00240 except KeyboardInterrupt:
00241 pass
00242 except RuntimeError:
00243 rospy.loginfo("RuntimeError exception caught")
00244 self.isConnected = False
00245 except socket.error:
00246 rospy.loginfo("socket.error exception caught")
00247 self.isConnected = False
00248 finally:
00249 self.socket.close()
00250
00251
00252 def startSocketServer(self, port, address):
00253 rospy.loginfo("starting ROS Serial Python Node serial_node-%r" % (address,))
00254 rospy.init_node("serial_node_%r" % (address,))
00255 self.startSerialClient()
00256
00257 def flushInput(self):
00258 pass
00259
00260 def write(self, data):
00261 if (self.isConnected == False):
00262 return
00263 length = len(data)
00264 totalsent = 0
00265
00266 while totalsent < length:
00267 sent = self.socket.send(data[totalsent:])
00268 if sent == 0:
00269 raise RuntimeError("RosSerialServer.write() socket connection broken")
00270 totalsent = totalsent + sent
00271
00272 def read(self, rqsted_length):
00273 self.msg = ''
00274 if (self.isConnected == False):
00275 return self.msg
00276
00277 while len(self.msg) < rqsted_length:
00278 chunk = self.socket.recv(rqsted_length - len(self.msg))
00279 if chunk == '':
00280 raise RuntimeError("RosSerialServer.read() socket connection broken")
00281 self.msg = self.msg + chunk
00282 return self.msg
00283
00284 def close(self):
00285 self.port.close()
00286
00287
00288 class SerialClient:
00289 """
00290 ServiceServer responds to requests from the serial device.
00291 """
00292
00293 def __init__(self, port=None, baud=57600, timeout=5.0):
00294 """ Initialize node, connect to bus, attempt to negotiate topics. """
00295 self.mutex = thread.allocate_lock()
00296
00297 self.lastsync = rospy.Time.now()
00298 self.timeout = timeout
00299
00300
00301 if port== None:
00302
00303 pass
00304 elif hasattr(port, 'read'):
00305
00306 self.port=port
00307 else:
00308
00309 self.port = Serial(port, baud, timeout=self.timeout*0.5)
00310
00311 self.port.timeout = 0.01
00312
00313 time.sleep(0.1)
00314
00315 self.publishers = dict()
00316 self.subscribers = dict()
00317 self.services = dict()
00318
00319 self.buffer_out = -1
00320 self.buffer_in = -1
00321
00322 self.callbacks = dict()
00323
00324 self.callbacks[TopicInfo.ID_PUBLISHER] = self.setupPublisher
00325 self.callbacks[TopicInfo.ID_SUBSCRIBER] = self.setupSubscriber
00326
00327 self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_PUBLISHER] = self.setupServiceServerPublisher
00328 self.callbacks[TopicInfo.ID_SERVICE_SERVER+TopicInfo.ID_SUBSCRIBER] = self.setupServiceServerSubscriber
00329 self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_PUBLISHER] = self.setupServiceClientPublisher
00330 self.callbacks[TopicInfo.ID_SERVICE_CLIENT+TopicInfo.ID_SUBSCRIBER] = self.setupServiceClientSubscriber
00331
00332 self.callbacks[TopicInfo.ID_PARAMETER_REQUEST] = self.handleParameterRequest
00333 self.callbacks[TopicInfo.ID_LOG] = self.handleLoggingRequest
00334 self.callbacks[TopicInfo.ID_TIME] = self.handleTimeRequest
00335
00336 rospy.sleep(2.0)
00337 self.requestTopics()
00338
00339 def requestTopics(self):
00340 """ Determine topics to subscribe/publish. """
00341 self.port.flushInput()
00342
00343 self.port.write("\xff\xff\x00\x00\x00\x00\xff")
00344
00345 def run(self):
00346 """ Forward recieved messages to appropriate publisher. """
00347 data = ''
00348 while not rospy.is_shutdown():
00349 if (rospy.Time.now() - self.lastsync).to_sec() > (self.timeout * 3):
00350 rospy.logerr("Lost sync with device, restarting...")
00351 self.requestTopics()
00352 self.lastsync = rospy.Time.now()
00353
00354 flag = [0,0]
00355
00356 flag[0] = self.port.read(1)
00357 if (flag[0] != '\xff'):
00358 continue
00359 flag[1] = self.port.read(1)
00360 if ( flag[1] != '\xff'):
00361 rospy.loginfo("Failed Packet Flags ")
00362 continue
00363
00364 header = self.port.read(4)
00365 if (len(header) != 4):
00366
00367 continue
00368
00369 topic_id, msg_length = struct.unpack("<hh", header)
00370 msg = self.port.read(msg_length)
00371 if (len(msg) != msg_length):
00372 rospy.loginfo("Packet Failed : Failed to read msg data")
00373
00374 continue
00375 chk = self.port.read(1)
00376 checksum = sum(map(ord,header) ) + sum(map(ord, msg)) + ord(chk)
00377
00378 if checksum%256 == 255:
00379 try:
00380 self.callbacks[topic_id](msg)
00381 except KeyError:
00382 rospy.logerr("Tried to publish before configured, topic id %d" % topic_id)
00383 rospy.sleep(0.001)
00384
00385 def setPublishSize(self, bytes):
00386 if self.buffer_out < 0:
00387 self.buffer_out = bytes
00388 rospy.loginfo("Note: publish buffer size is %d bytes" % self.buffer_out)
00389
00390 def setSubscribeSize(self, bytes):
00391 if self.buffer_in < 0:
00392 self.buffer_in = bytes
00393 rospy.loginfo("Note: subscribe buffer size is %d bytes" % self.buffer_in)
00394
00395 def setupPublisher(self, data):
00396 """ Register a new publisher. """
00397 try:
00398 msg = TopicInfo()
00399 msg.deserialize(data)
00400 pub = Publisher(msg)
00401 self.publishers[msg.topic_id] = pub
00402 self.callbacks[msg.topic_id] = pub.handlePacket
00403 self.setPublishSize(msg.buffer_size)
00404 rospy.loginfo("Setup publisher on %s [%s]" % (msg.topic_name, msg.message_type) )
00405 except Exception as e:
00406 rospy.logerr("Creation of publisher failed: %s", e)
00407
00408 def setupSubscriber(self, data):
00409 """ Register a new subscriber. """
00410 try:
00411 msg = TopicInfo()
00412 msg.deserialize(data)
00413 sub = Subscriber(msg, self)
00414 self.subscribers[msg.topic_name] = sub
00415 self.setSubscribeSize(msg.buffer_size)
00416 rospy.loginfo("Setup subscriber on %s [%s]" % (msg.topic_name, msg.message_type) )
00417 except Exception as e:
00418 rospy.logerr("Creation of subscriber failed: %s", e)
00419
00420 def setupServiceServerPublisher(self, data):
00421 """ Register a new service server. """
00422 try:
00423 msg = TopicInfo()
00424 msg.deserialize(data)
00425 self.setPublishSize(msg.buffer_size)
00426 try:
00427 srv = self.services[msg.topic_name]
00428 except:
00429 srv = ServiceServer(msg, self)
00430 rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
00431 self.services[msg.topic_name] = srv
00432 if srv.mres._md5sum == msg.md5sum:
00433 self.callbacks[msg.topic_id] = srv.handlePacket
00434 else:
00435 raise Exception('Checksum does not match: ' + srv.res._md5sum + ',' + msg.md5sum)
00436 except Exception as e:
00437 rospy.logerr("Creation of service server failed: %s", e)
00438 def setupServiceServerSubscriber(self, data):
00439 """ Register a new service server. """
00440 try:
00441 msg = TopicInfo()
00442 msg.deserialize(data)
00443 self.setSubscribeSize(msg.buffer_size)
00444 try:
00445 srv = self.services[msg.topic_name]
00446 except:
00447 srv = ServiceServer(msg, self)
00448 rospy.loginfo("Setup service server on %s [%s]" % (msg.topic_name, msg.message_type) )
00449 self.services[msg.topic_name] = srv
00450 if srv.mreq._md5sum == msg.md5sum:
00451 srv.id = msg.topic_id
00452 else:
00453 raise Exception('Checksum does not match: ' + srv.req._md5sum + ',' + msg.md5sum)
00454 except Exception as e:
00455 rospy.logerr("Creation of service server failed: %s", e)
00456
00457 def setupServiceClientPublisher(self, data):
00458 """ Register a new service client. """
00459 try:
00460 msg = TopicInfo()
00461 msg.deserialize(data)
00462 self.setPublishSize(msg.buffer_size)
00463 try:
00464 srv = self.services[msg.topic_name]
00465 except:
00466 srv = ServiceClient(msg, self)
00467 rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
00468 self.services[msg.topic_name] = srv
00469 if srv.mreq._md5sum == msg.md5sum:
00470 self.callbacks[msg.topic_id] = srv.handlePacket
00471 else:
00472 raise Exception('Checksum does not match: ' + srv.req._md5sum + ',' + msg.md5sum)
00473 except Exception as e:
00474 rospy.logerr("Creation of service client failed: %s", e)
00475 def setupServiceClientSubscriber(self, data):
00476 """ Register a new service client. """
00477 try:
00478 msg = TopicInfo()
00479 msg.deserialize(data)
00480 self.setSubscribeSize(msg.buffer_size)
00481 try:
00482 srv = self.services[msg.topic_name]
00483 except:
00484 srv = ServiceClient(msg, self)
00485 rospy.loginfo("Setup service client on %s [%s]" % (msg.topic_name, msg.message_type) )
00486 self.services[msg.topic_name] = srv
00487 if srv.mres._md5sum == msg.md5sum:
00488 srv.id = msg.topic_id
00489 else:
00490 raise Exception('Checksum does not match: ' + srv.res._md5sum + ',' + msg.md5sum)
00491 except Exception as e:
00492 rospy.logerr("Creation of service client failed: %s", e)
00493
00494 def handleTimeRequest(self, data):
00495 """ Respond to device with system time. """
00496 t = Time()
00497 t.data = rospy.Time.now()
00498 data_buffer = StringIO.StringIO()
00499 t.serialize(data_buffer)
00500 self.send( TopicInfo.ID_TIME, data_buffer.getvalue() )
00501 self.lastsync = rospy.Time.now()
00502
00503 def handleParameterRequest(self, data):
00504 """ Send parameters to device. Supports only simple datatypes and arrays of such. """
00505 req = RequestParamRequest()
00506 req.deserialize(data)
00507 resp = RequestParamResponse()
00508 param = rospy.get_param(req.name)
00509 if param == None:
00510 rospy.logerr("Parameter %s does not exist"%req.name)
00511 return
00512 if (type(param) == dict):
00513 rospy.logerr("Cannot send param %s because it is a dictionary"%req.name)
00514 return
00515 if (type(param) != list):
00516 param = [param]
00517
00518 t = type(param[0])
00519 for p in param:
00520 if t!= type(p):
00521 rospy.logerr('All Paramers in the list %s must be of the same type'%req.name)
00522 return
00523 if (t == int):
00524 resp.ints= param
00525 if (t == float):
00526 resp.floats=param
00527 if (t == str):
00528 resp.strings = param
00529 print resp
00530 data_buffer = StringIO.StringIO()
00531 resp.serialize(data_buffer)
00532 self.send(TopicInfo.ID_PARAMETER_REQUEST, data_buffer.getvalue())
00533
00534 def handleLoggingRequest(self, data):
00535 """ Forward logging information from serial device into ROS. """
00536 msg = Log()
00537 msg.deserialize(data)
00538 if (msg.level == Log.DEBUG):
00539 rospy.logdebug(msg.msg)
00540 elif(msg.level== Log.INFO):
00541 rospy.loginfo(msg.msg)
00542 elif(msg.level== Log.WARN):
00543 rospy.logwarn(msg.msg)
00544 elif(msg.level== Log.ERROR):
00545 rospy.logerr(msg.msg)
00546 elif(msg.level==Log.FATAL):
00547 rospy.logfatal(msg.msg)
00548
00549 def send(self, topic, msg):
00550 """ Send a message on a particular topic to the device. """
00551 with self.mutex:
00552 length = len(msg)
00553 if self.buffer_in > 0 and length > self.buffer_in:
00554 rospy.logerr("Message from ROS network dropped: message larger than buffer.")
00555 print msg
00556 return -1
00557 else:
00558 checksum = 255 - ( ((topic&255) + (topic>>8) + (length&255) + (length>>8) + sum([ord(x) for x in msg]))%256 )
00559 data = '\xff\xff'+ chr(topic&255) + chr(topic>>8) + chr(length&255) + chr(length>>8)
00560 data = data + msg + chr(checksum)
00561 self.port.write(data)
00562 return length
00563