mongodb_log.py
#!/usr/bin/python

###########################################################################
#  mongodb_log.py - Python based ROS to MongoDB logger (multi-process)
#
#  Created: Sun Dec 05 19:45:51 2010
#  Copyright  2010-2012  Tim Niemueller [www.niemueller.de]
#             2010-2011  Carnegie Mellon University
#             2010       Intel Labs Pittsburgh
###########################################################################

#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Library General Public License for more details.
#
#  Read the full text in the LICENSE.GPL file in the doc directory.

# make sure we aren't using floor division
from __future__ import division, with_statement

PACKAGE_NAME='mongodb_log'
NODE_NAME='mongodb_log'
NODE_NAME_TEMPLATE='%smongodb_log'
WORKER_NODE_NAME = "%smongodb_log_worker_%d_%s"
QUEUE_MAXSIZE = 100

# import roslib; roslib.load_manifest(PACKAGE_NAME)
import rospy

# for msg_to_document
import mongodb_store.util

import os
import re
import sys
import time
import socket  # needed for the socket.error handling when contacting the master in main()
import pprint
import string
import signal
import subprocess
from threading import Thread, Timer

from Queue import Empty
from optparse import OptionParser
from tempfile import mktemp
from datetime import datetime, timedelta
from time import sleep
from random import randint
from tf.msg import tfMessage
from sensor_msgs.msg import PointCloud, CompressedImage
from roslib.packages import find_node
#from rviz_intel.msg import TriangleMesh
use_setproctitle = True
try:
    from setproctitle import setproctitle
except ImportError:
    use_setproctitle = False


use_processes = False

# if use_processes:
from multiprocessing import Process, Lock, Condition, Queue, Value, current_process, Event
import multiprocessing as mp
# else:
    # from threading import Lock, Condition, Event
    # from Queue import Queue
    # def Value(t, val, lock=None):
        # return val


import genpy
import rosgraph.masterapi
import roslib.message
#from rospy import Time, Duration
import rostopic


from pymongo import SLOW_ONLY
from pymongo.errors import InvalidDocument, InvalidStringData

MongoClient = mongodb_store.util.import_MongoClient()

BACKLOG_WARN_LIMIT = 100
STATS_LOOPTIME     = 10
STATS_GRAPHTIME    = 60

class Counter(object):
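    """Counter shared between processes.

    Wraps a multiprocessing.Value protected by a Lock so that several
    worker processes can update the same in/out/drop statistics.
    """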
    def __init__(self, value = None, lock = True):
        self.count = value or Value('i', 0, lock=lock)
        self.mutex = Lock()

    def increment(self, by = 1):
        with self.mutex: self.count.value += by

    def value(self):
        with self.mutex: return self.count.value

class Barrier(object):
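    """Simple barrier built from a shared Value and a Condition.

    wait() blocks until num_threads callers have arrived, then releases
    them all and resets itself for the next round.
    """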
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.threads_left = Value('i', num_threads, lock=True)
        self.mutex = Lock()
        self.waitcond = Condition(self.mutex)

    def wait(self):
        self.mutex.acquire()
        self.threads_left.value -= 1
        if self.threads_left.value == 0:
            self.threads_left.value = self.num_threads
            self.waitcond.notify_all()
            self.mutex.release()
        else:
            self.waitcond.wait()
            self.mutex.release()


class WorkerProcess(object):
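    """Generic Python logger for a single topic.

    Runs in its own process: it subscribes to the topic, pushes incoming
    messages into a bounded multiprocessing.Queue and drains that queue
    into a MongoDB collection via mongodb_store.util.store_message().
    """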
    def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
                 drop_counter_value, queue_maxsize,
                 mongodb_host, mongodb_port, mongodb_name, nodename_prefix):
        self.name = "WorkerProcess-%4d-%s" % (idnum, topic)
        self.id = idnum
        self.topic = topic
        self.collname = collname
        self.queue = Queue(queue_maxsize)
        self.out_counter = Counter(out_counter_value)
        self.in_counter  = Counter(in_counter_value)
        self.drop_counter = Counter(drop_counter_value)
        self.worker_out_counter = Counter()
        self.worker_in_counter  = Counter()
        self.worker_drop_counter = Counter()
        self.mongodb_host = mongodb_host
        self.mongodb_port = mongodb_port
        self.mongodb_name = mongodb_name
        self.nodename_prefix = nodename_prefix
        self.quit = Value('i', 0)

        # print "Creating process %s" % self.name
        self.process = Process(name=self.name, target=self.run)
        # self.process = Thread(name=self.name, target=self.run)
        # print "created %s" % self.process
        self.process.start()
        # print "started %s" % self.process

    def init(self):
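        """Set up the worker in the child process.

        Connects to MongoDB, registers a dedicated ROS node for this
        worker and keeps trying to subscribe to the topic until it
        succeeds or the worker is asked to quit.
        """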
        global use_setproctitle
        if use_setproctitle:
            setproctitle("mongodb_log %s" % self.topic)

        self.mongoconn = MongoClient(self.mongodb_host, self.mongodb_port)
        self.mongodb = self.mongoconn[self.mongodb_name]
        self.mongodb.set_profiling_level(SLOW_ONLY)

        self.collection = self.mongodb[self.collname]
        self.collection.count()

        self.queue.cancel_join_thread()

        # clear signal handlers in this child process, rospy will handle signals for us
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

        worker_node_name = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)
        # print "Calling init_node with %s from process %s" % (worker_node_name, mp.current_process())
        rospy.init_node(worker_node_name, anonymous=False)

        self.subscriber = None
        while not self.subscriber and not self.is_quit():
            try:
                msg_class, real_topic, msg_eval = rostopic.get_topic_class(self.topic, blocking=True)
                self.subscriber = rospy.Subscriber(real_topic, msg_class, self.enqueue, self.topic)
            except rostopic.ROSTopicIOException:
                print("FAILED to subscribe, will keep trying %s" % self.name)
                time.sleep(randint(1,10))
            except rospy.ROSInitException:
                print("FAILED to initialize, will keep trying %s" % self.name)
                time.sleep(randint(1,10))
                self.subscriber = None

    def run(self):
        self.init()

        print("ACTIVE: %s" % self.name)

        # run the thread
        self.dequeue()

        # free connection
        # self.mongoconn.end_request()

    def is_quit(self):
        return self.quit.value == 1

    def shutdown(self):
        if not self.is_quit():
            #print("SHUTDOWN %s qsize %d" % (self.name, self.queue.qsize()))
            self.quit.value = 1
            self.queue.put("shutdown")
            while not self.queue.empty(): sleep(0.1)
        #print("JOIN %s qsize %d" % (self.name, self.queue.qsize()))
        self.process.join()
        self.process.terminate()

    def qsize(self):
        return self.queue.qsize()

    def enqueue(self, data, topic, current_time=None):
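        """Subscriber callback: queue an incoming message for storage.

        If the queue is full, the oldest queued message is dropped (and
        counted as a drop) so that logging keeps up with the newest data.
        """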

        if not self.is_quit():
            if self.queue.full():
                try:
                    self.queue.get_nowait()
                    self.drop_counter.increment()
                    self.worker_drop_counter.increment()
                except Empty:
                    pass
            #self.queue.put((topic, data, current_time or datetime.now()))
            self.queue.put((topic, data, rospy.get_time(), data._connection_header))
            self.in_counter.increment()
            self.worker_in_counter.increment()

    def dequeue(self):
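        """Worker main loop: pop queued messages and write them to MongoDB.

        Builds the meta document (topic, latch flag, inserted_at) and stores
        the message with mongodb_store.util.store_message(); any non-tuple
        item on the queue is treated as a shutdown request.
        """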
        while not self.is_quit():
            t = None
            try:
                t = self.queue.get(True)
            except IOError:
                # Anticipate Ctrl-C
                #print("Quit W1: %s" % self.name)
                self.quit.value = 1
                break
            if isinstance(t, tuple):
                self.out_counter.increment()
                self.worker_out_counter.increment()
                topic = t[0]
                msg   = t[1]
                ctime = t[2]
                connection_header = t[3]

                if isinstance(msg, rospy.Message):

                    try:
                        #print(self.sep + threading.current_thread().getName() + "@" + topic+": ")
                        #pprint.pprint(doc)
                        meta = {}
                        # switched to use inserted_at to match message_store
                        # meta["recorded"] = ctime or datetime.now()
                        meta["topic"]    = topic

                        if connection_header['latching'] == '1':
                            meta['latch'] = True
                        else:
                            meta['latch'] = False

                        if ctime is not None:
                            meta['inserted_at'] = datetime.utcfromtimestamp(ctime)
                        else:
                            meta['inserted_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec())


                        mongodb_store.util.store_message(self.collection, msg, meta)

                    except InvalidDocument, e:
                        print("InvalidDocument " + current_process().name + "@" + topic +": \n")
                        print e
                    except InvalidStringData, e:
                        print("InvalidStringData " + current_process().name + "@" + topic +": \n")
                        print e

            else:
                #print("Quit W2: %s" % self.name)
                self.quit.value = 1

        # we must make sure to clear the queue before exiting,
        # or the parent thread might deadlock otherwise
        #print("Quit W3: %s" % self.name)
        self.subscriber.unregister()
        self.subscriber = None
        while not self.queue.empty():
            t = self.queue.get_nowait()
        print("STOPPED: %s" % self.name)


class SubprocessWorker(object):
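    """Wrapper around one of the specialized C++ loggers.

    Spawns the C++ logger as a subprocess and parses the colon-separated
    status lines it prints on stdout (messages in, messages out, messages
    dropped, queue size) to update the shared counters.
    """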
    def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
                 drop_counter_value, queue_maxsize,
                 mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger):

        self.name = "SubprocessWorker-%4d-%s" % (idnum, topic)
        self.id = idnum
        self.topic = topic
        self.collname = collname
        self.queue = Queue(queue_maxsize)
        self.out_counter = Counter(out_counter_value)
        self.in_counter  = Counter(in_counter_value)
        self.drop_counter = Counter(drop_counter_value)
        self.worker_out_counter = Counter()
        self.worker_in_counter  = Counter()
        self.worker_drop_counter = Counter()
        self.mongodb_host = mongodb_host
        self.mongodb_port = mongodb_port
        self.mongodb_name = mongodb_name
        self.nodename_prefix = nodename_prefix
        self.quit = False
        self.current_qsize = 0  # last queue size reported by the C++ logger

        self.thread = Thread(name=self.name, target=self.run)

        mongodb_host_port = "%s:%d" % (mongodb_host, mongodb_port)
        collection = "%s.%s" % (mongodb_name, collname)
        nodename = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)

        self.process = subprocess.Popen([cpp_logger[0], "-t", topic, "-n", nodename,
                                         "-m", mongodb_host_port, "-c", collection],
                                        stdout=subprocess.PIPE)

        self.thread.start()

    def qsize(self):
        return self.current_qsize

    def run(self):
        while not self.quit:
            line = self.process.stdout.readline().rstrip()
            if line == "": continue
            arr = string.split(line, ":")
            self.in_counter.increment(int(arr[0]))
            self.out_counter.increment(int(arr[1]))
            self.drop_counter.increment(int(arr[2]))
            self.current_qsize = int(arr[3])

            self.worker_in_counter.increment(int(arr[0]))
            self.worker_out_counter.increment(int(arr[1]))
            self.worker_drop_counter.increment(int(arr[2]))

    def shutdown(self):
        self.quit = True
        self.process.kill()
        self.process.wait()


class MongoWriter(object):
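    """Top-level coordinator.

    Resolves the requested topics, decides per topic whether to start a
    specialized C++ logger (SubprocessWorker) or the generic Python logger
    (WorkerProcess), and keeps checking for new or not-yet-available topics.
    """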
    def __init__(self, topics = [], treat_as_regex=False,
                 all_topics = False, all_topics_interval = 5,
                 exclude_topics = [],
                 mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None,
                 no_specific=False, nodename_prefix=""):
        self.all_topics = all_topics
        self.all_topics_interval = all_topics_interval
        self.exclude_topics = exclude_topics
        self.mongodb_host = mongodb_host
        self.mongodb_port = mongodb_port
        self.mongodb_name = mongodb_name
        self.mongodb_collection = mongodb_collection
        self.no_specific = no_specific
        self.nodename_prefix = nodename_prefix
        self.quit = False
        self.topics = set()
        self.collnames = set()
        #self.str_fn = roslib.message.strify_message
        self.sep = "\n" #'\033[2J\033[;H'
        self.in_counter = Counter()
        self.out_counter = Counter()
        self.drop_counter = Counter()
        self.workers = []

        global use_setproctitle
        if use_setproctitle:
            setproctitle("mongodb_log MAIN")

        self.exclude_regex = []
        for et in self.exclude_topics:
            self.exclude_regex.append(re.compile(et))
        self.exclude_already = []

        if treat_as_regex:
            topics = self.expand_regex_to_topics(topics)

        self.missing_topics = self.subscribe_topics(set(topics))
        self.fill_in_topics()


        if self.all_topics:
            print("All topics")
            self.ros_master = rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % self.nodename_prefix)
            self.update_topics(restart=False)

        self.start_all_topics_timer()

    def expand_regex_to_topics(self, topics):
        expanded_topics = []
        published_topics = [t[0] for t in rospy.get_published_topics()]
        for pattern in topics:
            exp = re.compile(pattern)
            expanded_topics += filter(lambda t: exp.match(t) is not None, published_topics)
        return expanded_topics

    def subscribe_topics(self, topics):
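        """Start a worker for every topic in topics that is not excluded.

        Topics already logged, matching an exclusion regex, or clashing with
        an existing collection name are skipped; topics that cannot be
        subscribed to yet are returned so they can be retried later.
        """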

        # print "existing topics %s" % self.topics

        # print "subscribing to topics %s" % topics

        missing_topics = set()
        for topic in topics:
            if topic and topic[-1] == '/':
                topic = topic[:-1]

            if topic in self.topics: continue
            if topic in self.exclude_already: continue

            do_continue = False
            for tre in self.exclude_regex:
                if tre.match(topic):
                    print("*** IGNORING topic %s due to exclusion rule" % topic)
                    do_continue = True
                    self.exclude_already.append(topic)
                    break
            if do_continue: continue

            # Although converting the name is not strictly necessary (MongoDB could handle
            # plain topic names as collection names and we could then use mongodb[topic]),
            # we want names that work better with the query tools, even though this creates
            # a theoretical possibility of name clashes (hence the check).
            if self.mongodb_collection:
                collname = self.mongodb_collection
            else:
                collname = mongodb_store.util.topic_name_to_collection_name(topic)
                if collname in self.collnames:
                    print("Two converted topic names clash: %s, ignoring topic %s"
                          % (collname, topic))
                    continue
            try:
                print("Adding topic %s" % topic)
                w = self.create_worker(len(self.workers), topic, collname)
                self.workers.append(w)
                self.collnames |= set([collname])
                self.topics |= set([topic])
            except Exception, e:
                print('Failed to subscribe to %s due to %s' % (topic, e))
                missing_topics.add(topic)

        return missing_topics


    def create_worker(self, idnum, topic, collname):
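        """Create the most suitable worker for a topic.

        tf, PointCloud and CompressedImage topics use the fast C++ loggers
        when the corresponding binaries are available (and --no-specific is
        not set); everything else falls back to the generic Python worker.
        """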
        try:
            msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=False)
        except Exception, e:
            print('Topic %s not announced, cannot get type: %s' % (topic, e))
            raise

        if real_topic is None:
            raise rostopic.ROSTopicException('topic type was empty, probably not announced')

        w = None
        node_path = None

        if not self.no_specific and msg_class == tfMessage:
            print("DETECTED transform topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_tf")
            if not node_path:
                print("FAILED to detect mongodb_log_tf, falling back to generic logger (did not build package?)")
        elif not self.no_specific and msg_class == PointCloud:
            print("DETECTED point cloud topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_pcl")
            if not node_path:
                print("FAILED to detect mongodb_log_pcl, falling back to generic logger (did not build package?)")
        elif not self.no_specific and msg_class == CompressedImage:
            print("DETECTED compressed image topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_cimg")
            if not node_path:
                print("FAILED to detect mongodb_log_cimg, falling back to generic logger (did not build package?)")
        """
        elif msg_class == TriangleMesh:
            print("DETECTED triangle mesh topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_trimesh")
            if not node_path:
                print("FAILED to detect mongodb_log_trimesh, falling back to generic logger (did not build package?)")
        """


        if node_path:
            w = SubprocessWorker(idnum, topic, collname,
                                 self.in_counter.count, self.out_counter.count,
                                 self.drop_counter.count, QUEUE_MAXSIZE,
                                 self.mongodb_host, self.mongodb_port, self.mongodb_name,
                                 self.nodename_prefix, node_path)

        if not w:
            print("GENERIC Python logger used for topic %s" % topic)
            w = WorkerProcess(idnum, topic, collname,
                              self.in_counter.count, self.out_counter.count,
                              self.drop_counter.count, QUEUE_MAXSIZE,
                              self.mongodb_host, self.mongodb_port, self.mongodb_name,
                              self.nodename_prefix)

        return w


    def run(self):
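        """Keep the main process alive.

        Loops until shutdown() is called, waking up roughly every
        STATS_LOOPTIME seconds regardless of interrupted sleeps.
        """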
        looping_threshold = timedelta(0, STATS_LOOPTIME,  0)

        while not self.quit:
            started = datetime.now()

            # the following code makes sure we run once per STATS_LOOPTIME, taking
            # varying run-times and interrupted sleeps into account
            td = datetime.now() - started
            while not self.quit and td < looping_threshold:
                sleeptime = STATS_LOOPTIME - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
                if sleeptime > 0: sleep(sleeptime)
                td = datetime.now() - started


    def shutdown(self):
        self.quit = True
        if hasattr(self, "all_topics_timer"): self.all_topics_timer.cancel()
        for w in self.workers:
            #print("Shutdown %s" % name)
            w.shutdown()


    def start_all_topics_timer(self):
        if not self.all_topics or self.quit: return
        self.all_topics_timer = Timer(self.all_topics_interval, self.update_topics)
        self.all_topics_timer.start()

    def start_fill_in_topics_timer(self):
        if len(self.missing_topics) == 0 or self.quit: return
        self.fill_in_topics_timer = Timer(self.all_topics_interval, self.fill_in_topics)
        self.fill_in_topics_timer.start()


    def update_topics(self, restart=True):
        """
        Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (i.e. the --all-topics flag is given).
        """
        if not self.all_topics or self.quit: return
        ts = rospy.get_published_topics()
        topics = set([t for t, t_type in ts if t != "/rosout" and t != "/rosout_agg"])
        new_topics = topics - self.topics
        self.subscribe_topics(new_topics)
        if restart: self.start_all_topics_timer()

    def fill_in_topics(self, restart=True):
        """
        Called at a fixed interval (see start_fill_in_topics_timer) to retry topics that were requested but could not be subscribed to yet.
        """
        if len(self.missing_topics) == 0 or self.quit: return
        self.missing_topics = self.subscribe_topics(self.missing_topics)
        if restart: self.start_fill_in_topics_timer()


    def get_memory_usage_for_pid(self, pid):
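        """Return (size, rss, stack) in bytes for one process.

        Parses VmSize, VmRSS and VmStk from /proc/<pid>/status; returns
        (0, 0, 0) if the file cannot be read or parsed.
        """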

        scale = {'kB': 1024, 'mB': 1024 * 1024,
                 'KB': 1024, 'MB': 1024 * 1024}
        try:
            f = open("/proc/%d/status" % pid)
            t = f.read()
            f.close()
        except:
            return (0, 0, 0)

        if t == "": return (0, 0, 0)

        try:
            tmp   = t[t.index("VmSize:"):].split(None, 3)
            size  = int(tmp[1]) * scale[tmp[2]]
            tmp   = t[t.index("VmRSS:"):].split(None, 3)
            rss   = int(tmp[1]) * scale[tmp[2]]
            tmp   = t[t.index("VmStk:"):].split(None, 3)
            stack = int(tmp[1]) * scale[tmp[2]]
            return (size, rss, stack)
        except ValueError:
            return (0, 0, 0)

    def get_memory_usage(self):
        size, rss, stack = 0, 0, 0
        for w in self.workers:
            pmem = self.get_memory_usage_for_pid(w.process.pid)
            size  += pmem[0]
            rss   += pmem[1]
            stack += pmem[2]
        #print("Size: %d  RSS: %s  Stack: %s" % (size, rss, stack))
        return (size, rss, stack)


def main(argv):
    parser = OptionParser()
    parser.usage += " [TOPICs...]"
    parser.add_option("--nodename-prefix", dest="nodename_prefix",
                      help="Prefix for worker node names", metavar="ROS_NODE_NAME",
                      default="")
    parser.add_option("--mongodb-host", dest="mongodb_host",
                      help="Hostname of MongoDB", metavar="HOST",
                      default=rospy.get_param("mongodb_host", "localhost"))
    parser.add_option("--mongodb-port", dest="mongodb_port",
                      help="Port of MongoDB", type="int",
                      metavar="PORT", default=rospy.get_param("mongodb_port", 27017))
    parser.add_option("--mongodb-name", dest="mongodb_name",
                      help="Name of DB in which to store values",
                      metavar="NAME", default=rospy.get_param("robot/database"))
    parser.add_option("--mongodb-collection", dest="mongodb_collection",
                      help="Name of the collection in which to store values. If this option is given, all topics are stored in this single collection; otherwise each topic gets its own collection",
                      metavar="COLLECTION", default=rospy.get_param("robot/name"))
    parser.add_option("-a", "--all-topics", dest="all_topics", default=False,
                      action="store_true",
                      help="Log all existing topics (still excludes /rosout, /rosout_agg)")
    parser.add_option("-e", "--regex", dest="treat_as_regex", default=False,
                      help="Treat the given topic names as regular expressions and log all matching topics",
                      action="store_true")
    parser.add_option("--all-topics-interval", dest="all_topics_interval", default=5,
                      help="Time in seconds between checks for new topics", type="int")
    parser.add_option("-x", "--exclude", dest="exclude",
                      help="Exclude topics matching REGEX, may be given multiple times",
                      action="append", type="string", metavar="REGEX", default=[])
    parser.add_option("--no-specific", dest="no_specific", default=False,
                      action="store_true", help="Disable specific loggers")

    (options, args) = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])

    if not options.all_topics and len(args) == 0:
        parser.print_help()
        return

    try:
        rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % options.nodename_prefix).getPid()
    except socket.error:
        print("Failed to communicate with master")

    mongowriter = MongoWriter(topics=args,
                              treat_as_regex=options.treat_as_regex,
                              all_topics=options.all_topics,
                              all_topics_interval = options.all_topics_interval,
                              exclude_topics = options.exclude,
                              mongodb_host=options.mongodb_host,
                              mongodb_port=options.mongodb_port,
                              mongodb_name=options.mongodb_name,
                              mongodb_collection=options.mongodb_collection,
                              no_specific=options.no_specific,
                              nodename_prefix=options.nodename_prefix)

    def signal_handler(signal, frame):
        mongowriter.shutdown()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    mongowriter.run()

if __name__ == "__main__":
    main(sys.argv)
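
# Example invocations (a sketch only; it assumes a running roscore, a reachable
# MongoDB instance, and that the "robot/database" / "robot/name" parameters are
# set, since they are the defaults for --mongodb-name and --mongodb-collection
# above; the topic names are placeholders):
#
#   rosrun mongodb_log mongodb_log.py --mongodb-name roslog /tf /odom
#   rosrun mongodb_log mongodb_log.py -a --all-topics-interval 10
#   rosrun mongodb_log mongodb_log.py -e "/camera/.*" -x "/camera/depth/.*"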

