00001
00002 """
00003 usage: %(progname)s [--debug]
00004 """
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 PKG_NAME = 'rosweb'
00041 import roslib; roslib.load_manifest(PKG_NAME)
00042
00043 import getopt
00044 import cStringIO
00045 import os
00046 import signal
00047 import string
00048 import sys
00049 import threading
00050 import time
00051 import gzip
00052 import traceback
00053 import BaseHTTPServer
00054
00055
00056 import BaseHTTPServer, select, socket, SocketServer, urlparse
00057 SocketServer.ThreadingMixIn.daemon_threads = True
00058
00059 import Cookie
00060 import cgi
00061
00062 import roslib.scriptutil
00063 import rospy
00064 import rosservice
00065 import rosjson
00066
00067 from rosservice import ROSServiceException
00068 from roslib.message import ROSMessageException
00069 from roslib import rosenv
00070
00071 import roslib.message
00072
00073 class ROSWebException(Exception): pass
00074
00075 def splitTopic(topic):
00076 parts = topic.split(":", 2)
00077 topic = parts[0]
00078 subtopic = None
00079 params = None
00080 if len(parts) >= 2:
00081 subtopic = parts[1]
00082 if len(parts) >= 3:
00083 params = parts[2]
00084 return topic, subtopic, params
00085
00086
00087
00088 class RWTFactory(object):
00089 def __init__(self, webserver):
00090 self.map = {}
00091 self.lock = threading.Lock()
00092
00093 self.webserver = webserver
00094
00095 self.counter = 1
00096 self.cond = threading.Condition()
00097
00098 def close(self):
00099 try:
00100 self.lock.acquire()
00101 for topic in self.map.keys():
00102 rwt = self.map[topic]
00103 rwt.factory = None
00104 rwt = None
00105 del self.map[topic]
00106 finally:
00107 self.lock.release()
00108
00109 def unsubscribe(self, topic):
00110 try:
00111 self.lock.acquire()
00112 if topic in self.map:
00113 del self.map[topic]
00114 finally:
00115 self.lock.release()
00116
00117 def subscribe(self, topic):
00118 maintopic, subtopic, params = splitTopic(topic)
00119 main_rwt = None
00120 try:
00121 self.lock.acquire()
00122 if topic in self.map:
00123 return self.map[topic]
00124 main_rwt = ROSWebTopic(maintopic, self)
00125 self.map[maintopic] = main_rwt
00126 rwt = main_rwt
00127 if subtopic:
00128 handlerClass = self.webserver._subtopics["%s:%s" % (maintopic, subtopic)]
00129 rwt = handlerClass(topic, self, main_rwt)
00130 self.map[topic] = rwt
00131 finally:
00132 self.lock.release()
00133 return main_rwt
00134
00135 def get(self, topic):
00136 return self.map.get(topic, None)
00137
00138 def registerVirtualTopic(self, topic, thread):
00139 vt = VirtualTopic(topic, self, thread)
00140 try:
00141 self.lock.acquire()
00142 self.map[topic] = vt
00143 vt.init()
00144 finally:
00145 self.lock.release()
00146
00147
00148 class ROSWebTopic(object):
00149 def __init__(self, topic, factory):
00150 self.topic = topic
00151 self.initialized = False
00152 self.sub = None
00153 self.factory = factory
00154
00155 self.last_message = None
00156 self.messages = []
00157 self.subtopics = []
00158
00159 def init(self):
00160 if self.initialized: return
00161
00162 try:
00163 self.factory.cond.acquire()
00164 m = roslib.scriptutil.get_master()
00165
00166 code, _, topics = m.getPublishedTopics('/'+PKG_NAME, '/')
00167 if code != 1:
00168 raise ROSWebException("unable to communicate with master")
00169
00170 for t, topic_type in topics:
00171 if t == self.topic: break
00172 else:
00173 raise ROSWebException("%s is not a published topic"%self.topic)
00174
00175 msg_class = roslib.message.get_message_class(topic_type)
00176
00177
00178 try:
00179 self.sub = rospy.Subscriber(self.topic, msg_class, self.topic_callback)
00180 except:
00181 print self.topic, msg_class
00182 raise
00183 self.initialized = True
00184 except ROSWebException:
00185 raise
00186 except Exception, e:
00187
00188 rospy.signal_shutdown(str(e))
00189 raise
00190 finally:
00191 self.factory.cond.release()
00192
00193 def close(self):
00194 self.factory = None
00195 for rwt, subtopic in self.subtopics:
00196 subtopic.close()
00197
00198 def topic_callback(self, data):
00199 try:
00200 self.factory.cond.acquire()
00201
00202 self.messages.append((self.factory.counter, data))
00203 self.factory.counter = self.factory.counter + 1
00204 self.messages = self.messages[-1:]
00205
00206 for rwt, subtopic in self.subtopics:
00207 rwt.subtopic_callback(data)
00208
00209 self.factory.cond.notifyAll()
00210 finally:
00211 self.factory.cond.release()
00212
00213 def subscribeSubTopic(self, rwt, subtopic):
00214 self.subtopics.append((rwt, subtopic))
00215
00216
00217 class ROSWebSubTopic(object):
00218 def __init__(self, topic, factory, main_rwt):
00219 self.factory = factory
00220 self.topic = topic
00221
00222 self.messages = []
00223 self.maintopic, self.subtopic, params = splitTopic(topic)
00224 main_rwt.subscribeSubTopic(self, self.subtopic)
00225
00226 def init(self):
00227 pass
00228
00229 def subtopic_callback(self, msg):
00230
00231
00232 newmsg = self.transform(msg)
00233
00234 if not newmsg: return
00235
00236 self.messages.append((self.factory.counter, newmsg))
00237 self.factory.counter = self.factory.counter + 1
00238 self.messages = self.messages[-1:]
00239
00240
00241
00242
00243 class VirtualTopic(object):
00244 def __init__(self, topic, factory, thread, intopics=[]):
00245 self.factory = factory
00246 self.topic = topic
00247 self.intopics = intopics
00248
00249 self.messages = []
00250
00251 self.thread = thread
00252 self.initialized = False
00253
00254 def init(self):
00255 if not self.initialized:
00256 for _topic in self.intopics:
00257 self.factory.subscribe(_topic)
00258
00259 self.initialized = True
00260 self.thread.setCallback(self.callback)
00261 self.thread.start()
00262
00263 def callback(self, data):
00264 try:
00265 self.factory.cond.acquire()
00266
00267 self.messages.append((self.factory.counter, data))
00268 self.factory.counter = self.factory.counter + 1
00269 self.messages = self.messages[-3:]
00270
00271 self.factory.cond.notifyAll()
00272 finally:
00273 self.factory.cond.release()
00274
00275
00276 class ROSWebHandler(BaseHTTPServer.BaseHTTPRequestHandler):
00277 def __init__(self, *args):
00278 self.failed_topics = []
00279 BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)
00280
00281 def subscribe(self, topic):
00282 rwt = self.server.factory.subscribe(topic)
00283 try:
00284 rwt.init()
00285 except ROSWebException, e:
00286 self.server.factory.unsubscribe(topic)
00287 if topic not in self.failed_topics:
00288 rospy.logwarn("Cannot subscribe to %s" % topic)
00289 self.failed_topics.append(topic)
00290
00291 return None
00292 return rwt
00293
00294 def _do_GET_topic(self, topics, since, nowait=False, callback=None):
00295 rwttopics = []
00296
00297 for topic in topics:
00298 rwt = self.server.factory.get(topic)
00299 if rwt:
00300 rwttopics.append((topic, rwt))
00301 else:
00302 rwt = self.subscribe(topic)
00303 if rwt: rwttopics.append((topic, rwt))
00304
00305 if len(rwttopics) == 0:
00306 rospy.logwarn("no valid topics")
00307 self.send_failure()
00308 return
00309
00310 (messages, lastSince) = self._get_messages(rwttopics, since, nowait)
00311
00312 buf = cStringIO.StringIO()
00313 buf.write("{")
00314 buf.write('"since": %s,' % lastSince)
00315 buf.write('"msgs": [')
00316 _i = 0
00317 for (topic, msg) in messages:
00318 _i = _i + 1
00319 if _i > 1: buf.write(',')
00320 buf.write("{")
00321 buf.write('"topic": "%s",' % topic)
00322 buf.write('"msg": ')
00323 buf.write(rosjson.ros_message_to_json(msg))
00324 buf.write("}")
00325 buf.write(']')
00326 buf.write('}')
00327
00328 buf = buf.getvalue()
00329
00330 self.send_success(buf, callback)
00331
00332
00333 def _get_messages(self, rwttopics, since, nowait=False):
00334 messages = []
00335 lastSince = 0
00336 try:
00337 self.server.factory.cond.acquire()
00338
00339
00340 if since > self.server.factory.counter:
00341
00342 rospy.logerr("invalid since counter, reseting... %s" % (since, ))
00343 since = 0
00344
00345
00346 lastSince = since
00347 for topic, rwt in rwttopics:
00348 if not rwt: continue
00349 for t,msg in rwt.messages:
00350 if t > since:
00351 messages.append((topic, msg))
00352 lastSince = max(t, lastSince)
00353
00354 if nowait:
00355
00356 return messages, lastSince
00357
00358
00359 while not messages:
00360 self.server.factory.cond.wait()
00361
00362
00363 for topic, rwt in rwttopics:
00364 if not rwt: continue
00365 for t,msg in rwt.messages:
00366 if t > lastSince:
00367 messages.append((topic, msg))
00368 lastSince = max(t, lastSince)
00369 finally:
00370 self.server.factory.cond.release()
00371 return (messages, lastSince)
00372
00373
00374 def _connect_to(self, netloc, soc):
00375 i = netloc.find(':')
00376 if i >= 0:
00377 host_port = netloc[:i], int(netloc[i+1:])
00378 else:
00379 host_port = netloc, 80
00380
00381 try: soc.connect(host_port)
00382 except socket.error, arg:
00383 try: msg = arg[1]
00384 except:
00385 msg = arg
00386 self.send_error(404, msg)
00387 return 0
00388 return 1
00389
00390 def _send_responsepage(self, retcode=200, buf = "{}", callback = None):
00391 try:
00392 self.send_response(retcode)
00393 if callback:
00394
00395 buf = callback + "=" + buf + ";\nvar _xsajax$transport_status=200;"
00396 self.send_header('Content-Type', 'text/javascript')
00397 else:
00398 self.send_header('Content-Type', 'application/json')
00399
00400 if len(buf) > 500:
00401 acceptEncoding = self.headers.get('Accept-Encoding', '')
00402 if acceptEncoding.find('gzip') != -1:
00403 zbuf = cStringIO.StringIO()
00404 zfile = gzip.GzipFile(None, "wb", 9, zbuf)
00405 zfile.write(buf)
00406 zfile.close()
00407
00408 nbuf = len(buf)
00409 buf = zbuf.getvalue()
00410 self.send_header('Content-encoding', 'gzip')
00411
00412 self.send_header('Content-Length', str(len(buf)))
00413
00414 if self.headers.get("Connection", '').lower() == "keep-alive":
00415 self.send_header('Connection', 'keep-alive')
00416
00417 self.send_header("Access-Control-Allow-Origin", "*")
00418
00419 self.end_headers()
00420
00421
00422 self.wfile.write(buf)
00423 except socket.error, (ecode, reason):
00424 if ecode == 32:
00425 print ecode, reason
00426 else:
00427 raise
00428
00429
00430 def send_success(self, buf = "{}", callback=None):
00431 self._send_responsepage(200, buf, callback)
00432
00433 def send_failure(self, buf = "{}", callback=None):
00434 self._send_responsepage(404, buf, callback)
00435
00436 def send_forbidden(self, buf = "{}", callback=None):
00437 self._send_responsepage(403, buf, callback)
00438
00439 def address_string(self):
00440 host, port = self.client_address[:2]
00441 return host
00442
00443 def handle_ROS(self, path, qdict):
00444 cstring = self.headers.get('Cookie', '')
00445
00446 cookie = Cookie.BaseCookie()
00447 cookie.load(cstring)
00448
00449 path_parts = path.split("/")
00450
00451 username = ""
00452
00453 cmd = path_parts[2]
00454
00455
00456 if cookie.has_key('MB_L1'):
00457 l1 = cookie['MB_L1'].value
00458 if l1.startswith("V1/"):
00459 code = l1[3:]
00460 parts = code.split(":", 1)
00461 username = parts[0]
00462
00463
00464
00465
00466
00467
00468 if cmd == "ping":
00469 self.send_success("{'ping': %f}" % time.time())
00470
00471 if cmd == "service":
00472 name = "/".join(path_parts[3:])
00473 callback = qdict.get("callback", [''])[0]
00474
00475 try:
00476 service_class = rosservice.get_service_class_by_name("/" + name)
00477 except ROSServiceException:
00478 self.send_failure()
00479 return
00480
00481 request = service_class._request_class()
00482
00483 args = qdict.get('args', None)
00484 if not args:
00485 args = qdict.get('json', ['{}'])[0]
00486 args = eval(args)
00487 try:
00488 now = rospy.get_rostime()
00489 keys = { 'now': now, 'auto': rospy.Header(stamp=now) }
00490 roslib.message.fill_message_args(request, args, keys=keys)
00491 except ROSMessageException:
00492 raise ROSServiceException("Not enough arguments to call service.\n"+\
00493 "Args should be: [%s], but are actually [%s] " %
00494 (roslib.message.get_printable_message_args(request), args))
00495
00496 rospy.logdebug("service call: name=%s, args=%s" % (name, args))
00497 rospy.wait_for_service(name)
00498 service_proxy = rospy.ServiceProxy(name, service_class)
00499 try:
00500 msg = service_proxy(request)
00501 except rospy.ServiceException:
00502 self.send_failure()
00503 return
00504 msg = rosjson.ros_message_to_json(msg)
00505
00506 rospy.logdebug("service call: name=%s, resp=%s" % (name, msg))
00507 self.send_success(msg, callback=callback)
00508 return
00509
00510 if cmd == "publish":
00511 topic = string.join(path_parts[3:], "/")
00512
00513 callback = qdict.get("callback", [''])[0]
00514 topic_type = qdict.get("topic_type", [''])[0]
00515
00516 message_klass = roslib.message.get_message_class(topic_type)
00517
00518 message = message_klass()
00519
00520 args = qdict.get('args', None)
00521 if not args:
00522 args = qdict.get('json', ['{}'])[0]
00523 args = eval(args)
00524 try:
00525 now = rospy.get_rostime()
00526 keys = { 'now': now, 'auto': rospy.Header(stamp=now) }
00527 roslib.message.fill_message_args(message, args, keys=keys)
00528 except ROSMessageException:
00529 raise ROSServiceException("Not enough arguments to call service.\n"+\
00530 "Args should be: [%s], but are actually [%s] " %
00531 (roslib.message.get_printable_message_args(message), args))
00532
00533 rospy.logdebug("publish: topic=%s, topic_type=%s, args=%s" % (topic, topic_type, args))
00534
00535 pub = rospy.Publisher(topic, message_klass, latch=True)
00536
00537 try:
00538 pub.publish(message)
00539 except rospy.ServiceException:
00540 self.send_failure()
00541 return
00542
00543 self.send_success(callback=callback)
00544 return
00545
00546 if cmd == "receive":
00547 since = int(qdict.get('since', ['0'])[0])
00548 topics = qdict.get("topic", "")
00549 callback = qdict.get("callback", [''])[0]
00550 nowait = int(qdict.get('nowait', ['0'])[0])
00551 self._do_GET_topic(topics, since, nowait=nowait, callback=callback)
00552 return
00553
00554
00555
00556 def do_GET(self):
00557 if get_robot_type().startswith("texas"):
00558 try:
00559 stored_cookie = open("/var/ros/user_cookie.dat").read()
00560 request_cookie = self.headers.get('Cookie', '')
00561 if stored_cookie.find(request_cookie) == -1:
00562 rospy.logwarn("cookie did not match stored cookie \n%s\n%s" % (stored_cookie, request_cookie))
00563 self.send_forbidden()
00564 return
00565 except:
00566 self.send_forbidden()
00567 return
00568
00569 (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
00570 self.path, 'http')
00571 qdict = cgi.parse_qs(query)
00572 pdict = cgi.parse_qs(str(self.rfile._rbuf))
00573 qdict.update(pdict)
00574
00575 self.log_request()
00576
00577 for handler in self.server._handlers:
00578 if path.startswith(handler['url']):
00579 handler['handler'](self, path, qdict)
00580 return
00581
00582 netloc = "localhost:80"
00583 scm = "http"
00584
00585 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00586 try:
00587 if self._connect_to(netloc, soc):
00588 soc.send("%s %s %s\r\n" % (
00589 self.command,
00590 urlparse.urlunparse(('', '', path, params, query, '')),
00591 self.request_version))
00592 self.headers['Connection'] = 'close'
00593 del self.headers['Proxy-Connection']
00594 for key_val in self.headers.items():
00595 soc.send("%s: %s\r\n" % key_val)
00596 soc.send("\r\n")
00597 self._read_write(soc)
00598 finally:
00599 soc.close()
00600 self.connection.close()
00601
00602
00603 def _read_write(self, soc, max_idling=20):
00604 iw = (self.connection, soc)
00605 ow = ()
00606 count = 0
00607
00608 while 1:
00609 count += 1
00610 (ins, _, exs) = select.select(iw, ow, iw, 3)
00611 if exs: break
00612 if ins:
00613 for i in ins:
00614 if i is soc:
00615 out = self.connection
00616 elif i is self.connection:
00617 out = soc
00618 else:
00619 print "error"
00620 data = i.recv(8192)
00621 if data:
00622 bytes_sent = 0
00623 while bytes_sent != len(data):
00624 n = out.send(data[bytes_sent:])
00625 bytes_sent += n
00626 count = 0
00627 else:
00628 pass
00629 if count == max_idling: break
00630
00631 do_POST = do_GET
00632 do_OPTIONS = do_GET
00633
00634 def log_message(self, format, *args):
00635 self.server.accesslog_fp.write("%s - - [%s] %s\n" %
00636 (self.address_string(),
00637 self.log_date_time_string(),
00638 format%args))
00639 self.server.accesslog_fp.flush()
00640
00641
00642 class MasterMonitor:
00643 def run(self):
00644 global running
00645 master = roslib.scriptutil.get_master()
00646 while running:
00647 try:
00648 master.getSystemState('/rostopic')
00649 except:
00650 import traceback
00651
00652 return
00653 time.sleep(1)
00654
00655 class ThreadingHTTPServer (threading.Thread,
00656 SocketServer.ThreadingMixIn,
00657 BaseHTTPServer.HTTPServer):
00658 def __init__(self, hostport, handlerClass):
00659 threading.Thread.__init__(self)
00660 SocketServer.TCPServer.__init__(self, hostport, handlerClass)
00661
00662 self.factory = RWTFactory(self)
00663
00664 logfn = os.path.join(rosenv.get_log_dir(), "access_log")
00665 self.accesslog_fp = open(logfn, "a+")
00666
00667 self._handlers = []
00668 self.register_handler('/ros', ROSWebHandler.handle_ROS)
00669
00670 self._subtopics = {}
00671
00672 self.__load_plugins()
00673 self.__registerVirtualTopics()
00674
00675 def register_handler(self, urlprefix, handler):
00676 h = {'url': urlprefix, 'handler': handler}
00677 self._handlers.append(h)
00678
00679 def register_subtopic(self, topic, handlerClass):
00680 self._subtopics[topic] = handlerClass
00681
00682 def __load_plugins(self):
00683 plugins = roslib.rospack.rospack_plugins(PKG_NAME)
00684 for (package, plugin) in plugins:
00685 try:
00686 roslib.load_manifest(package)
00687 mods = plugin.split('.')
00688 mod = __import__(plugin)
00689 for sub_mod in mods[1:]:
00690 mod = getattr(mod, sub_mod)
00691
00692 mod.config_plugin(self)
00693
00694 except Exception, reason:
00695 rospy.logerr("plugin %s got exception %s" % (plugin, reason))
00696
00697
00698 def __registerVirtualTopics(self):
00699 import topics
00700 ut = topics.TopicThread()
00701 ut.setDaemon(True)
00702 self.factory.registerVirtualTopic("/topics", ut)
00703
00704 import params
00705 ut = params.ParamThread()
00706 ut.setDaemon(True)
00707 self.factory.registerVirtualTopic("/parameters", ut)
00708
00709 def run(self):
00710 while 1:
00711 self.handle_request()
00712
00713 def rosweb_start():
00714 web_server = None
00715 mm = None
00716
00717 try:
00718 rospy.init_node(PKG_NAME, disable_signals=True)
00719
00720 web_server = ThreadingHTTPServer(('', 8068), ROSWebHandler)
00721
00722 web_server.setDaemon(True)
00723 web_server.start()
00724 rospy.loginfo("starting Web server")
00725
00726 mm = MasterMonitor()
00727 mm.run()
00728
00729 finally:
00730 rospy.signal_shutdown(PKG_NAME + ' exiting')
00731 if web_server: web_server.server_close()
00732
00733 error_log_fp = None
00734
00735 def signal_handler(signal, frame):
00736 global running
00737 running = False
00738
00739 def usage(progname):
00740 print __doc__ % vars()
00741
00742
00743 def get_robot_type():
00744 try:
00745 return open(os.path.join("/etc/ros/env", "ROBOT")).read().strip()
00746 except IOError:
00747 return "desktop"
00748
00749 def main(argv, stdout, environ):
00750 progname = argv[0]
00751 optlist, args = getopt.getopt(argv[1:], "", ["help", "debug"])
00752
00753 debugFlag = False
00754 for (field, val) in optlist:
00755 if field == "--help":
00756 usage(progname)
00757 return
00758 elif field == "--debug":
00759 debugFlag = True
00760
00761 global running, error_log
00762 running = True
00763
00764
00765
00766
00767
00768
00769 signal.signal(signal.SIGINT, signal_handler)
00770
00771
00772 while running:
00773 rospy.loginfo("starting")
00774 try:
00775 rosweb_start()
00776 except:
00777 import traceback
00778 traceback.print_exc()
00779 time.sleep(1)
00780
00781 if error_log_fp: error_log_fp.close()
00782
00783 if __name__ == '__main__':
00784 main(sys.argv, sys.stdout, os.environ)