00001
00002 """
00003 usage: %(progname)s [--debug]
00004 """
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 PKG_NAME = 'rosweb'
00041 import roslib; roslib.load_manifest(PKG_NAME)
00042
00043 import getopt
00044 import cStringIO
00045 import os
00046 import signal
00047 import string
00048 import sys
00049 import threading
00050 import time
00051 import gzip
00052 import traceback
00053 import BaseHTTPServer
00054
00055
00056 import BaseHTTPServer, select, socket, SocketServer, urlparse
00057 SocketServer.ThreadingMixIn.daemon_threads = True
00058
00059 import Cookie
00060 import cgi
00061
00062 import roslib.scriptutil
00063 import rospy
00064 import rosservice
00065 import rosjson
00066
00067 from rosservice import ROSServiceException
00068 from genpy.message import MessageException
00069 from roslib import rosenv
00070
00071 import roslib.message
00072 import roslib.rospack
00073
00074 class ROSWebException(Exception): pass
00075
00076 def splitTopic(topic):
00077 parts = topic.split(":", 2)
00078 topic = parts[0]
00079 subtopic = None
00080 params = None
00081 if len(parts) >= 2:
00082 subtopic = parts[1]
00083 if len(parts) >= 3:
00084 params = parts[2]
00085 return topic, subtopic, params
00086
00087
00088
00089 class RWTFactory(object):
00090 def __init__(self, webserver):
00091 self.map = {}
00092 self.lock = threading.Lock()
00093
00094 self.webserver = webserver
00095
00096 self.counter = 1
00097 self.cond = threading.Condition()
00098
00099 def close(self):
00100 try:
00101 self.lock.acquire()
00102 for topic in self.map.keys():
00103 rwt = self.map[topic]
00104 rwt.factory = None
00105 rwt = None
00106 del self.map[topic]
00107 finally:
00108 self.lock.release()
00109
00110 def unsubscribe(self, topic):
00111 try:
00112 self.lock.acquire()
00113 if topic in self.map:
00114 del self.map[topic]
00115 finally:
00116 self.lock.release()
00117
00118 def subscribe(self, topic):
00119 maintopic, subtopic, params = splitTopic(topic)
00120 main_rwt = None
00121 try:
00122 self.lock.acquire()
00123 if topic in self.map:
00124 return self.map[topic]
00125 main_rwt = ROSWebTopic(maintopic, self)
00126 self.map[maintopic] = main_rwt
00127 rwt = main_rwt
00128 if subtopic:
00129 handlerClass = self.webserver._subtopics["%s:%s" % (maintopic, subtopic)]
00130 rwt = handlerClass(topic, self, main_rwt)
00131 self.map[topic] = rwt
00132 finally:
00133 self.lock.release()
00134 return main_rwt
00135
00136 def get(self, topic):
00137 return self.map.get(topic, None)
00138
00139 def registerVirtualTopic(self, topic, thread):
00140 vt = VirtualTopic(topic, self, thread)
00141 try:
00142 self.lock.acquire()
00143 self.map[topic] = vt
00144 vt.init()
00145 finally:
00146 self.lock.release()
00147
00148
00149 class ROSWebTopic(object):
00150 def __init__(self, topic, factory):
00151 self.topic = topic
00152 self.initialized = False
00153 self.sub = None
00154 self.factory = factory
00155
00156 self.last_message = None
00157 self.messages = []
00158 self.subtopics = []
00159
00160 def init(self):
00161 if self.initialized: return
00162
00163 try:
00164 self.factory.cond.acquire()
00165 m = roslib.scriptutil.get_master()
00166
00167 code, _, topics = m.getPublishedTopics('/'+PKG_NAME, '/')
00168 if code != 1:
00169 raise ROSWebException("unable to communicate with master")
00170
00171 for t, topic_type in topics:
00172 if t == self.topic: break
00173 else:
00174 raise ROSWebException("%s is not a published topic"%self.topic)
00175
00176 msg_class = roslib.message.get_message_class(topic_type)
00177
00178
00179 try:
00180 self.sub = rospy.Subscriber(self.topic, msg_class, self.topic_callback)
00181 except:
00182 print self.topic, msg_class
00183 raise
00184 self.initialized = True
00185 except ROSWebException:
00186 raise
00187 except Exception, e:
00188
00189 rospy.signal_shutdown(str(e))
00190 raise
00191 finally:
00192 self.factory.cond.release()
00193
00194 def close(self):
00195 self.factory = None
00196 for rwt, subtopic in self.subtopics:
00197 subtopic.close()
00198
00199 def topic_callback(self, data):
00200 try:
00201 self.factory.cond.acquire()
00202
00203 self.messages.append((self.factory.counter, data))
00204 self.factory.counter = self.factory.counter + 1
00205 self.messages = self.messages[-1:]
00206
00207 for rwt, subtopic in self.subtopics:
00208 rwt.subtopic_callback(data)
00209
00210 self.factory.cond.notifyAll()
00211 finally:
00212 self.factory.cond.release()
00213
00214 def subscribeSubTopic(self, rwt, subtopic):
00215 self.subtopics.append((rwt, subtopic))
00216
00217
00218 class ROSWebSubTopic(object):
00219 def __init__(self, topic, factory, main_rwt):
00220 self.factory = factory
00221 self.topic = topic
00222
00223 self.messages = []
00224 self.maintopic, self.subtopic, params = splitTopic(topic)
00225 main_rwt.subscribeSubTopic(self, self.subtopic)
00226
00227 def init(self):
00228 pass
00229
00230 def subtopic_callback(self, msg):
00231
00232
00233 newmsg = self.transform(msg)
00234
00235 if not newmsg: return
00236
00237 self.messages.append((self.factory.counter, newmsg))
00238 self.factory.counter = self.factory.counter + 1
00239 self.messages = self.messages[-1:]
00240
00241
00242
00243
00244 class VirtualTopic(object):
00245 def __init__(self, topic, factory, thread, intopics=[]):
00246 self.factory = factory
00247 self.topic = topic
00248 self.intopics = intopics
00249
00250 self.messages = []
00251
00252 self.thread = thread
00253 self.initialized = False
00254
00255 def init(self):
00256 if not self.initialized:
00257 for _topic in self.intopics:
00258 self.factory.subscribe(_topic)
00259
00260 self.initialized = True
00261 self.thread.setCallback(self.callback)
00262 self.thread.start()
00263
00264 def callback(self, data):
00265 try:
00266 self.factory.cond.acquire()
00267
00268 self.messages.append((self.factory.counter, data))
00269 self.factory.counter = self.factory.counter + 1
00270 self.messages = self.messages[-3:]
00271
00272 self.factory.cond.notifyAll()
00273 finally:
00274 self.factory.cond.release()
00275
00276
00277 class ROSWebHandler(BaseHTTPServer.BaseHTTPRequestHandler):
00278 def __init__(self, *args):
00279 self.failed_topics = []
00280 BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)
00281
00282 def subscribe(self, topic):
00283 rwt = self.server.factory.subscribe(topic)
00284 try:
00285 rwt.init()
00286 except ROSWebException, e:
00287 self.server.factory.unsubscribe(topic)
00288 if topic not in self.failed_topics:
00289 rospy.logwarn("Cannot subscribe to %s" % topic)
00290 self.failed_topics.append(topic)
00291
00292 return None
00293 return rwt
00294
00295 def _do_GET_topic(self, topics, since, nowait=False, callback=None):
00296 rwttopics = []
00297
00298 for topic in topics:
00299 rwt = self.server.factory.get(topic)
00300 if rwt:
00301 rwttopics.append((topic, rwt))
00302 else:
00303 rwt = self.subscribe(topic)
00304 if rwt: rwttopics.append((topic, rwt))
00305
00306 if len(rwttopics) == 0:
00307 rospy.logwarn("no valid topics")
00308 self.send_failure()
00309 return
00310
00311 (messages, lastSince) = self._get_messages(rwttopics, since, nowait)
00312
00313 buf = cStringIO.StringIO()
00314 buf.write("{")
00315 buf.write('"since": %s,' % lastSince)
00316 buf.write('"msgs": [')
00317 _i = 0
00318 for (topic, msg) in messages:
00319 _i = _i + 1
00320 if _i > 1: buf.write(',')
00321 buf.write("{")
00322 buf.write('"topic": "%s",' % topic)
00323 buf.write('"msg": ')
00324 buf.write(rosjson.ros_message_to_json(msg))
00325 buf.write("}")
00326 buf.write(']')
00327 buf.write('}')
00328
00329 buf = buf.getvalue()
00330
00331 self.send_success(buf, callback)
00332
00333
00334 def _get_messages(self, rwttopics, since, nowait=False):
00335 messages = []
00336 lastSince = 0
00337 try:
00338 self.server.factory.cond.acquire()
00339
00340
00341 if since > self.server.factory.counter:
00342
00343 rospy.logerr("invalid since counter, reseting... %s" % (since, ))
00344 since = 0
00345
00346
00347 lastSince = since
00348 for topic, rwt in rwttopics:
00349 if not rwt: continue
00350 for t,msg in rwt.messages:
00351 if t > since:
00352 messages.append((topic, msg))
00353 lastSince = max(t, lastSince)
00354
00355 if nowait:
00356
00357 return messages, lastSince
00358
00359
00360 while not messages:
00361 self.server.factory.cond.wait()
00362
00363
00364 for topic, rwt in rwttopics:
00365 if not rwt: continue
00366 for t,msg in rwt.messages:
00367 if t > lastSince:
00368 messages.append((topic, msg))
00369 lastSince = max(t, lastSince)
00370 finally:
00371 self.server.factory.cond.release()
00372 return (messages, lastSince)
00373
00374
00375 def _connect_to(self, netloc, soc):
00376 i = netloc.find(':')
00377 if i >= 0:
00378 host_port = netloc[:i], int(netloc[i+1:])
00379 else:
00380 host_port = netloc, 80
00381
00382 try: soc.connect(host_port)
00383 except socket.error, arg:
00384 try: msg = arg[1]
00385 except:
00386 msg = arg
00387 self.send_error(404, msg)
00388 return 0
00389 return 1
00390
00391 def _send_responsepage(self, retcode=200, buf = "{}", callback = None):
00392 try:
00393 self.send_response(retcode)
00394 if callback:
00395
00396 buf = callback + "=" + buf + ";\nvar _xsajax$transport_status=200;"
00397 self.send_header('Content-Type', 'text/javascript')
00398 else:
00399 self.send_header('Content-Type', 'application/json')
00400
00401 if len(buf) > 500:
00402 acceptEncoding = self.headers.get('Accept-Encoding', '')
00403 if acceptEncoding.find('gzip') != -1:
00404 zbuf = cStringIO.StringIO()
00405 zfile = gzip.GzipFile(None, "wb", 9, zbuf)
00406 zfile.write(buf)
00407 zfile.close()
00408
00409 nbuf = len(buf)
00410 buf = zbuf.getvalue()
00411 self.send_header('Content-encoding', 'gzip')
00412
00413 self.send_header('Content-Length', str(len(buf)))
00414
00415 if self.headers.get("Connection", '').lower() == "keep-alive":
00416 self.send_header('Connection', 'keep-alive')
00417
00418 self.send_header("Access-Control-Allow-Origin", "*")
00419
00420 self.end_headers()
00421
00422
00423 self.wfile.write(buf)
00424 except socket.error, (ecode, reason):
00425 if ecode == 32:
00426 print ecode, reason
00427 else:
00428 raise
00429
00430
00431 def send_success(self, buf = "{}", callback=None):
00432 self._send_responsepage(200, buf, callback)
00433
00434 def send_failure(self, buf = "{}", callback=None):
00435 self._send_responsepage(404, buf, callback)
00436
00437 def send_forbidden(self, buf = "{}", callback=None):
00438 self._send_responsepage(403, buf, callback)
00439
00440 def address_string(self):
00441 host, port = self.client_address[:2]
00442 return host
00443
00444 def handle_ROS(self, path, qdict):
00445 cstring = self.headers.get('Cookie', '')
00446
00447 cookie = Cookie.BaseCookie()
00448 cookie.load(cstring)
00449
00450 path_parts = path.split("/")
00451
00452 username = ""
00453
00454 cmd = path_parts[2]
00455
00456
00457 if cookie.has_key('MB_L1'):
00458 l1 = cookie['MB_L1'].value
00459 if l1.startswith("V1/"):
00460 code = l1[3:]
00461 parts = code.split(":", 1)
00462 username = parts[0]
00463
00464
00465
00466
00467
00468
00469 if cmd == "ping":
00470 self.send_success("{'ping': %f}" % time.time())
00471
00472 if cmd == "service":
00473 name = "/".join(path_parts[3:])
00474 callback = qdict.get("callback", [''])[0]
00475
00476 try:
00477 service_class = rosservice.get_service_class_by_name("/" + name)
00478 except ROSServiceException:
00479 self.send_failure()
00480 return
00481
00482 request = service_class._request_class()
00483
00484 args = qdict.get('args', None)
00485 if not args:
00486 args = qdict.get('json', ['{}'])[0]
00487 args = eval(args)
00488 try:
00489 now = rospy.get_rostime()
00490 keys = { 'now': now, 'auto': rospy.Header(stamp=now) }
00491 roslib.message.fill_message_args(request, args, keys=keys)
00492 except MessageException:
00493 raise ROSServiceException("Not enough arguments to call service.\n"+\
00494 "Args should be: [%s], but are actually [%s] " %
00495 (roslib.message.get_printable_message_args(request), args))
00496
00497 rospy.logdebug("service call: name=%s, args=%s" % (name, args))
00498 rospy.wait_for_service(name)
00499 service_proxy = rospy.ServiceProxy(name, service_class)
00500 try:
00501 msg = service_proxy(request)
00502 except rospy.ServiceException:
00503 self.send_failure()
00504 return
00505 msg = rosjson.ros_message_to_json(msg)
00506
00507 rospy.logdebug("service call: name=%s, resp=%s" % (name, msg))
00508 self.send_success(msg, callback=callback)
00509 return
00510
00511 if cmd == "publish":
00512 topic = string.join(path_parts[3:], "/")
00513
00514 callback = qdict.get("callback", [''])[0]
00515 topic_type = qdict.get("topic_type", [''])[0]
00516
00517 message_klass = roslib.message.get_message_class(topic_type)
00518
00519 message = message_klass()
00520
00521 args = qdict.get('args', None)
00522 if not args:
00523 args = qdict.get('json', ['{}'])[0]
00524 args = eval(args)
00525 try:
00526 now = rospy.get_rostime()
00527 keys = { 'now': now, 'auto': rospy.Header(stamp=now) }
00528 roslib.message.fill_message_args(message, args, keys=keys)
00529 except MessageException:
00530 raise ROSServiceException("Not enough arguments to call service.\n"+\
00531 "Args should be: [%s], but are actually [%s] " %
00532 (roslib.message.get_printable_message_args(message), args))
00533
00534 rospy.logdebug("publish: topic=%s, topic_type=%s, args=%s" % (topic, topic_type, args))
00535
00536 pub = rospy.Publisher(topic, message_klass, latch=True)
00537
00538 try:
00539 pub.publish(message)
00540 except rospy.ServiceException:
00541 self.send_failure()
00542 return
00543
00544 self.send_success(callback=callback)
00545 return
00546
00547 if cmd == "receive":
00548 since = int(qdict.get('since', ['0'])[0])
00549 topics = qdict.get("topic", "")
00550 callback = qdict.get("callback", [''])[0]
00551 nowait = int(qdict.get('nowait', ['0'])[0])
00552 self._do_GET_topic(topics, since, nowait=nowait, callback=callback)
00553 return
00554
00555
00556
00557 def do_GET(self):
00558 if get_robot_type().startswith("texas"):
00559 try:
00560 stored_cookie = open("/var/ros/user_cookie.dat").read()
00561 request_cookie = self.headers.get('Cookie', '')
00562 if stored_cookie.find(request_cookie) == -1:
00563 rospy.logwarn("cookie did not match stored cookie \n%s\n%s" % (stored_cookie, request_cookie))
00564 self.send_forbidden()
00565 return
00566 except:
00567 self.send_forbidden()
00568 return
00569
00570 (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
00571 self.path, 'http')
00572 qdict = cgi.parse_qs(query)
00573 pdict = cgi.parse_qs(str(self.rfile._rbuf))
00574 qdict.update(pdict)
00575
00576 self.log_request()
00577
00578 for handler in self.server._handlers:
00579 if path.startswith(handler['url']):
00580 handler['handler'](self, path, qdict)
00581 return
00582
00583 netloc = "localhost:80"
00584 scm = "http"
00585
00586 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00587 try:
00588 if self._connect_to(netloc, soc):
00589 soc.send("%s %s %s\r\n" % (
00590 self.command,
00591 urlparse.urlunparse(('', '', path, params, query, '')),
00592 self.request_version))
00593 self.headers['Connection'] = 'close'
00594 del self.headers['Proxy-Connection']
00595 for key_val in self.headers.items():
00596 soc.send("%s: %s\r\n" % key_val)
00597 soc.send("\r\n")
00598 self._read_write(soc)
00599 finally:
00600 soc.close()
00601 self.connection.close()
00602
00603
00604 def _read_write(self, soc, max_idling=20):
00605 iw = (self.connection, soc)
00606 ow = ()
00607 count = 0
00608
00609 while 1:
00610 count += 1
00611 (ins, _, exs) = select.select(iw, ow, iw, 3)
00612 if exs: break
00613 if ins:
00614 for i in ins:
00615 if i is soc:
00616 out = self.connection
00617 elif i is self.connection:
00618 out = soc
00619 else:
00620 print "error"
00621 data = i.recv(8192)
00622 if data:
00623 bytes_sent = 0
00624 while bytes_sent != len(data):
00625 n = out.send(data[bytes_sent:])
00626 bytes_sent += n
00627 count = 0
00628 else:
00629 pass
00630 if count == max_idling: break
00631
00632 do_POST = do_GET
00633 do_OPTIONS = do_GET
00634
00635 def log_message(self, format, *args):
00636 self.server.accesslog_fp.write("%s - - [%s] %s\n" %
00637 (self.address_string(),
00638 self.log_date_time_string(),
00639 format%args))
00640 self.server.accesslog_fp.flush()
00641
00642
00643 class MasterMonitor:
00644 def run(self):
00645 global running
00646 master = roslib.scriptutil.get_master()
00647 while running:
00648 try:
00649 master.getSystemState('/rostopic')
00650 except:
00651 import traceback
00652
00653 return
00654 time.sleep(1)
00655
00656 class ThreadingHTTPServer (threading.Thread,
00657 SocketServer.ThreadingMixIn,
00658 BaseHTTPServer.HTTPServer):
00659 def __init__(self, hostport, handlerClass):
00660 threading.Thread.__init__(self)
00661 SocketServer.TCPServer.__init__(self, hostport, handlerClass)
00662
00663 self.factory = RWTFactory(self)
00664
00665 logfn = os.path.join(rosenv.get_log_dir(), "access_log")
00666 self.accesslog_fp = open(logfn, "a+")
00667
00668 self._handlers = []
00669 self.register_handler('/ros', ROSWebHandler.handle_ROS)
00670
00671 self._subtopics = {}
00672
00673 self.__load_plugins()
00674 self.__registerVirtualTopics()
00675
00676 def register_handler(self, urlprefix, handler):
00677 h = {'url': urlprefix, 'handler': handler}
00678 self._handlers.append(h)
00679
00680 def register_subtopic(self, topic, handlerClass):
00681 self._subtopics[topic] = handlerClass
00682
00683 def __load_plugins(self):
00684 plugins = roslib.rospack.rospack_plugins(PKG_NAME)
00685 for (package, plugin) in plugins:
00686 try:
00687 roslib.load_manifest(package)
00688 mods = plugin.split('.')
00689 mod = __import__(plugin)
00690 for sub_mod in mods[1:]:
00691 mod = getattr(mod, sub_mod)
00692
00693 mod.config_plugin(self)
00694
00695 except Exception, reason:
00696 rospy.logerr("plugin %s got exception %s" % (plugin, reason))
00697
00698
00699 def __registerVirtualTopics(self):
00700 import topics
00701 ut = topics.TopicThread()
00702 ut.setDaemon(True)
00703 self.factory.registerVirtualTopic("/topics", ut)
00704
00705 import params
00706 ut = params.ParamThread()
00707 ut.setDaemon(True)
00708 self.factory.registerVirtualTopic("/parameters", ut)
00709
00710 def run(self):
00711 while 1:
00712 self.handle_request()
00713
00714 def rosweb_start():
00715 web_server = None
00716 mm = None
00717
00718 try:
00719 rospy.init_node(PKG_NAME, disable_signals=True)
00720
00721 web_server = ThreadingHTTPServer(('', 8068), ROSWebHandler)
00722
00723 web_server.setDaemon(True)
00724 web_server.start()
00725 rospy.loginfo("starting Web server")
00726
00727 mm = MasterMonitor()
00728 mm.run()
00729
00730 finally:
00731 rospy.signal_shutdown(PKG_NAME + ' exiting')
00732 if web_server: web_server.server_close()
00733
00734 error_log_fp = None
00735
00736 def signal_handler(signal, frame):
00737 global running
00738 running = False
00739
00740 def usage(progname):
00741 print __doc__ % vars()
00742
00743
00744 def get_robot_type():
00745 try:
00746 return open(os.path.join("/etc/ros/env", "ROBOT")).read().strip()
00747 except IOError:
00748 return "desktop"
00749
00750 def main(argv, stdout, environ):
00751 progname = argv[0]
00752 optlist, args = getopt.getopt(argv[1:], "", ["help", "debug"])
00753
00754 debugFlag = False
00755 for (field, val) in optlist:
00756 if field == "--help":
00757 usage(progname)
00758 return
00759 elif field == "--debug":
00760 debugFlag = True
00761
00762 global running, error_log
00763 running = True
00764
00765
00766
00767
00768
00769
00770 signal.signal(signal.SIGINT, signal_handler)
00771
00772
00773 while running:
00774 rospy.loginfo("starting")
00775 try:
00776 rosweb_start()
00777 except:
00778 import traceback
00779 traceback.print_exc()
00780 time.sleep(1)
00781
00782 if error_log_fp: error_log_fp.close()
00783
00784 if __name__ == '__main__':
00785 main(sys.argv, sys.stdout, os.environ)