00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 from __future__ import division, with_statement
00026
00027 PACKAGE_NAME='mongodb_log'
00028 NODE_NAME='mongodb_log'
00029 NODE_NAME_TEMPLATE='%smongodb_log'
00030 WORKER_NODE_NAME = "%smongodb_log_worker_%d_%s"
00031 QUEUE_MAXSIZE = 100
00032
00033
00034 import rospy
00035
00036
00037 import mongodb_store.util
00038
00039 import os
00040 import re
00041 import sys
00042 import time
00043 import pprint
00044 import string
00045 import signal
00046 import subprocess
00047 from threading import Thread, Timer
00048
00049 from Queue import Empty
00050 from optparse import OptionParser
00051 from tempfile import mktemp
00052 from datetime import datetime, timedelta
00053 from time import sleep
00054 from random import randint
00055 from tf.msg import tfMessage
00056 from sensor_msgs.msg import PointCloud, CompressedImage
00057 from roslib.packages import find_node
00058
00059
00060 use_setproctitle = True
00061 try:
00062 from setproctitle import setproctitle
00063 except ImportError:
00064 use_setproctitle = False
00065
00066
00067 use_processes = False
00068
00069
00070 from multiprocessing import Process, Lock, Condition, Queue, Value, current_process, Event
00071 import multiprocessing as mp
00072
00073
00074
00075
00076
00077
00078
00079 import genpy
00080 import rosgraph.masterapi
00081 import roslib.message
00082
00083 import rostopic
00084
00085
00086 from pymongo import SLOW_ONLY
00087 from pymongo.errors import InvalidDocument, InvalidStringData
00088
00089 MongoClient = mongodb_store.util.import_MongoClient()
00090
00091 BACKLOG_WARN_LIMIT = 100
00092 STATS_LOOPTIME = 10
00093 STATS_GRAPHTIME = 60
00094
00095 class Counter(object):
00096 def __init__(self, value = None, lock = True):
00097 self.count = value or Value('i', 0, lock=lock)
00098 self.mutex = Lock()
00099
00100 def increment(self, by = 1):
00101 with self.mutex: self.count.value += by
00102
00103 def value(self):
00104 with self.mutex: return self.count.value
00105
00106 class Barrier(object):
00107 def __init__(self, num_threads):
00108 self.num_threads = num_threads
00109 self.threads_left = Value('i', num_threads, lock=True)
00110 self.mutex = Lock()
00111 self.waitcond = Condition(self.mutex)
00112
00113 def wait(self):
00114 self.mutex.acquire()
00115 self.threads_left.value -= 1
00116 if self.threads_left.value == 0:
00117 self.threads_left.value = self.num_threads
00118 self.waitcond.notify_all()
00119 self.mutex.release()
00120 else:
00121 self.waitcond.wait()
00122 self.mutex.release()
00123
00124
00125 class WorkerProcess(object):
00126 def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
00127 drop_counter_value, queue_maxsize,
00128 mongodb_host, mongodb_port, mongodb_name, nodename_prefix):
00129 self.name = "WorkerProcess-%4d-%s" % (idnum, topic)
00130 self.id = idnum
00131 self.topic = topic
00132 self.collname = collname
00133 self.queue = Queue(queue_maxsize)
00134 self.out_counter = Counter(out_counter_value)
00135 self.in_counter = Counter(in_counter_value)
00136 self.drop_counter = Counter(drop_counter_value)
00137 self.worker_out_counter = Counter()
00138 self.worker_in_counter = Counter()
00139 self.worker_drop_counter = Counter()
00140 self.mongodb_host = mongodb_host
00141 self.mongodb_port = mongodb_port
00142 self.mongodb_name = mongodb_name
00143 self.nodename_prefix = nodename_prefix
00144 self.quit = Value('i', 0)
00145
00146
00147 self.process = Process(name=self.name, target=self.run)
00148
00149
00150 self.process.start()
00151
00152
00153 def init(self):
00154 global use_setproctitle
00155 if use_setproctitle:
00156 setproctitle("mongodb_log %s" % self.topic)
00157
00158 self.mongoconn = MongoClient(self.mongodb_host, self.mongodb_port)
00159 self.mongodb = self.mongoconn[self.mongodb_name]
00160 self.mongodb.set_profiling_level = SLOW_ONLY
00161
00162 self.collection = self.mongodb[self.collname]
00163 self.collection.count()
00164
00165 self.queue.cancel_join_thread()
00166
00167
00168 signal.signal(signal.SIGTERM, signal.SIG_DFL)
00169 signal.signal(signal.SIGINT, signal.SIG_DFL)
00170
00171 worker_node_name = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)
00172
00173 rospy.init_node(worker_node_name, anonymous=False)
00174
00175 self.subscriber = None
00176 while not self.subscriber and not self.is_quit():
00177 try:
00178 msg_class, real_topic, msg_eval = rostopic.get_topic_class(self.topic, blocking=True)
00179 self.subscriber = rospy.Subscriber(real_topic, msg_class, self.enqueue, self.topic)
00180 except rostopic.ROSTopicIOException:
00181 print("FAILED to subscribe, will keep trying %s" % self.name)
00182 time.sleep(randint(1,10))
00183 except rospy.ROSInitException:
00184 print("FAILED to initialize, will keep trying %s" % self.name)
00185 time.sleep(randint(1,10))
00186 self.subscriber = None
00187
00188 def run(self):
00189 self.init()
00190
00191 print("ACTIVE: %s" % self.name)
00192
00193
00194 self.dequeue()
00195
00196
00197
00198
00199 def is_quit(self):
00200 return self.quit.value == 1
00201
00202 def shutdown(self):
00203 if not self.is_quit():
00204
00205 self.quit.value = 1
00206 self.queue.put("shutdown")
00207 while not self.queue.empty(): sleep(0.1)
00208
00209 self.process.join()
00210 self.process.terminate()
00211
00212
00213
00214
00215 def qsize(self):
00216 return self.queue.qsize()
00217
00218 def enqueue(self, data, topic, current_time=None):
00219
00220 if not self.is_quit():
00221 if self.queue.full():
00222 try:
00223 self.queue.get_nowait()
00224 self.drop_counter.increment()
00225 self.worker_drop_counter.increment()
00226 except Empty:
00227 pass
00228
00229 self.queue.put((topic, data, rospy.get_time(), data._connection_header))
00230 self.in_counter.increment()
00231 self.worker_in_counter.increment()
00232
00233 def dequeue(self):
00234 while not self.is_quit():
00235 t = None
00236 try:
00237 t = self.queue.get(True)
00238 except IOError:
00239
00240
00241 self.quit.value = 1
00242 break
00243 if isinstance(t, tuple):
00244 self.out_counter.increment()
00245 self.worker_out_counter.increment()
00246 topic = t[0]
00247 msg = t[1]
00248 ctime = t[2]
00249 connection_header = t[3]
00250
00251 if isinstance(msg, rospy.Message):
00252
00253 try:
00254
00255
00256 meta = {}
00257
00258
00259 meta["topic"] = topic
00260
00261 if connection_header['latching'] == '1':
00262 meta['latch'] = True
00263 else:
00264 meta['latch'] = False
00265
00266 if ctime is not None:
00267 meta['inserted_at'] = datetime.utcfromtimestamp(ctime)
00268 else:
00269 meta['inserted_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec())
00270
00271
00272 mongodb_store.util.store_message(self.collection, msg, meta)
00273
00274 except InvalidDocument, e:
00275 print("InvalidDocument " + current_process().name + "@" + topic +": \n")
00276 print e
00277 except InvalidStringData, e:
00278 print("InvalidStringData " + current_process().name + "@" + topic +": \n")
00279 print e
00280
00281 else:
00282
00283 self.quit.value = 1
00284
00285
00286
00287
00288 self.subscriber.unregister()
00289 self.subscriber = None
00290 while not self.queue.empty():
00291 t = self.queue.get_nowait()
00292 print("STOPPED: %s" % self.name)
00293
00294
00295 class SubprocessWorker(object):
00296 def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
00297 drop_counter_value, queue_maxsize,
00298 mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger):
00299
00300 self.name = "SubprocessWorker-%4d-%s" % (idnum, topic)
00301 self.id = idnum
00302 self.topic = topic
00303 self.collname = collname
00304 self.queue = Queue(queue_maxsize)
00305 self.out_counter = Counter(out_counter_value)
00306 self.in_counter = Counter(in_counter_value)
00307 self.drop_counter = Counter(drop_counter_value)
00308 self.worker_out_counter = Counter()
00309 self.worker_in_counter = Counter()
00310 self.worker_drop_counter = Counter()
00311 self.mongodb_host = mongodb_host
00312 self.mongodb_port = mongodb_port
00313 self.mongodb_name = mongodb_name
00314 self.nodename_prefix = nodename_prefix
00315 self.quit = False
00316 self.qsize = 0
00317
00318 self.thread = Thread(name=self.name, target=self.run)
00319
00320 mongodb_host_port = "%s:%d" % (mongodb_host, mongodb_port)
00321 collection = "%s.%s" % (mongodb_name, collname)
00322 nodename = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)
00323
00324 self.process = subprocess.Popen([cpp_logger[0], "-t", topic, "-n", nodename,
00325 "-m", mongodb_host_port, "-c", collection],
00326 stdout=subprocess.PIPE)
00327
00328 self.thread.start()
00329
00330 def qsize(self):
00331 return self.qsize
00332
00333 def run(self):
00334 while not self.quit:
00335 line = self.process.stdout.readline().rstrip()
00336 if line == "": continue
00337 arr = string.split(line, ":")
00338 self.in_counter.increment(int(arr[0]))
00339 self.out_counter.increment(int(arr[1]))
00340 self.drop_counter.increment(int(arr[2]))
00341 self.qsize = int(arr[3])
00342
00343 self.worker_in_counter.increment(int(arr[0]))
00344 self.worker_out_counter.increment(int(arr[1]))
00345 self.worker_drop_counter.increment(int(arr[2]))
00346
00347 def shutdown(self):
00348 self.quit = True
00349 self.process.kill()
00350 self.process.wait()
00351
00352
00353 class MongoWriter(object):
00354 def __init__(self, topics = [], treat_as_regex=False,
00355 all_topics = False, all_topics_interval = 5,
00356 exclude_topics = [],
00357 mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None,
00358 no_specific=False, nodename_prefix=""):
00359 self.all_topics = all_topics
00360 self.all_topics_interval = all_topics_interval
00361 self.exclude_topics = exclude_topics
00362 self.mongodb_host = mongodb_host
00363 self.mongodb_port = mongodb_port
00364 self.mongodb_name = mongodb_name
00365 self.mongodb_collection = mongodb_collection
00366 self.no_specific = no_specific
00367 self.nodename_prefix = nodename_prefix
00368 self.quit = False
00369 self.topics = set()
00370 self.collnames = set()
00371
00372 self.sep = "\n"
00373 self.in_counter = Counter()
00374 self.out_counter = Counter()
00375 self.drop_counter = Counter()
00376 self.workers = []
00377
00378 global use_setproctitle
00379 if use_setproctitle:
00380 setproctitle("mongodb_log MAIN")
00381
00382 self.exclude_regex = []
00383 for et in self.exclude_topics:
00384 self.exclude_regex.append(re.compile(et))
00385 self.exclude_already = []
00386
00387 if treat_as_regex:
00388 topics = self.expand_regex_to_topics(topics)
00389
00390 self.missing_topics = self.subscribe_topics(set(topics))
00391 self.fill_in_topics()
00392
00393
00394 if self.all_topics:
00395 print("All topics")
00396 self.ros_master = rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % self.nodename_prefix)
00397 self.update_topics(restart=False)
00398
00399 self.start_all_topics_timer()
00400
00401 def expand_regex_to_topics(self, topics):
00402 expanded_topics = []
00403 published_topics = [t[0] for t in rospy.get_published_topics()]
00404 for pattern in topics:
00405 exp = re.compile(pattern)
00406 expanded_topics += filter(lambda t: exp.match(t) is not None, published_topics)
00407 return expanded_topics
00408
00409 def subscribe_topics(self, topics):
00410
00411
00412
00413
00414
00415 missing_topics = set()
00416 for topic in topics:
00417 if topic and topic[-1] == '/':
00418 topic = topic[:-1]
00419
00420 if topic in self.topics: continue
00421 if topic in self.exclude_already: continue
00422
00423 do_continue = False
00424 for tre in self.exclude_regex:
00425 if tre.match(topic):
00426 print("*** IGNORING topic %s due to exclusion rule" % topic)
00427 do_continue = True
00428 self.exclude_already.append(topic)
00429 break
00430 if do_continue: continue
00431
00432
00433
00434
00435
00436 if self.mongodb_collection:
00437 collname = self.mongodb_collection
00438 else:
00439 collname = mongodb_store.util.topic_name_to_collection_name(topic)
00440 if collname in self.collnames:
00441 print("Two converted topic names clash: %s, ignoring topic %s"
00442 % (collname, topic))
00443 continue
00444 try:
00445 print("Adding topic %s" % topic)
00446 w = self.create_worker(len(self.workers), topic, collname)
00447 self.workers.append(w)
00448 self.collnames |= set([collname])
00449 self.topics |= set([topic])
00450 except Exception, e:
00451 print('Failed to subsribe to %s due to %s' % (topic, e))
00452 missing_topics.add(topic)
00453
00454 return missing_topics
00455
00456
00457 def create_worker(self, idnum, topic, collname):
00458 try:
00459 msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=False)
00460 except Exception, e:
00461 print('Topic %s not announced, cannot get type: %s' % (topic, e))
00462 raise
00463
00464 if real_topic is None:
00465 raise rostopic.ROSTopicException('topic type was empty, probably not announced')
00466
00467 w = None
00468 node_path = None
00469
00470 if not self.no_specific and msg_class == tfMessage:
00471 print("DETECTED transform topic %s, using fast C++ logger" % topic)
00472 node_path = find_node(PACKAGE_NAME, "mongodb_log_tf")
00473 if not node_path:
00474 print("FAILED to detect mongodb_log_tf, falling back to generic logger (did not build package?)")
00475 elif not self.no_specific and msg_class == PointCloud:
00476 print("DETECTED point cloud topic %s, using fast C++ logger" % topic)
00477 node_path = find_node(PACKAGE_NAME, "mongodb_log_pcl")
00478 if not node_path:
00479 print("FAILED to detect mongodb_log_pcl, falling back to generic logger (did not build package?)")
00480 elif not self.no_specific and msg_class == CompressedImage:
00481 print("DETECTED compressed image topic %s, using fast C++ logger" % topic)
00482 node_path = find_node(PACKAGE_NAME, "mongodb_log_cimg")
00483 if not node_path:
00484 print("FAILED to detect mongodb_log_cimg, falling back to generic logger (did not build package?)")
00485 """
00486 elif msg_class == TriangleMesh:
00487 print("DETECTED triangle mesh topic %s, using fast C++ logger" % topic)
00488 node_path = find_node(PACKAGE_NAME, "mongodb_log_trimesh")
00489 if not node_path:
00490 print("FAILED to detect mongodb_log_trimesh, falling back to generic logger (did not build package?)")
00491 """
00492
00493
00494
00495 if node_path:
00496 w = SubprocessWorker(idnum, topic, collname,
00497 self.in_counter.count, self.out_counter.count,
00498 self.drop_counter.count, QUEUE_MAXSIZE,
00499 self.mongodb_host, self.mongodb_port, self.mongodb_name,
00500 self.nodename_prefix, node_path)
00501
00502 if not w:
00503 print("GENERIC Python logger used for topic %s" % topic)
00504 w = WorkerProcess(idnum, topic, collname,
00505 self.in_counter.count, self.out_counter.count,
00506 self.drop_counter.count, QUEUE_MAXSIZE,
00507 self.mongodb_host, self.mongodb_port, self.mongodb_name,
00508 self.nodename_prefix)
00509
00510 return w
00511
00512
00513 def run(self):
00514 looping_threshold = timedelta(0, STATS_LOOPTIME, 0)
00515
00516 while not self.quit:
00517 started = datetime.now()
00518
00519
00520
00521 td = datetime.now() - started
00522 while not self.quit and td < looping_threshold:
00523 sleeptime = STATS_LOOPTIME - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
00524 if sleeptime > 0: sleep(sleeptime)
00525 td = datetime.now() - started
00526
00527
00528 def shutdown(self):
00529 self.quit = True
00530 if hasattr(self, "all_topics_timer"): self.all_topics_timer.cancel()
00531 for w in self.workers:
00532
00533 w.shutdown()
00534
00535
00536 def start_all_topics_timer(self):
00537 if not self.all_topics or self.quit: return
00538 self.all_topics_timer = Timer(self.all_topics_interval, self.update_topics)
00539 self.all_topics_timer.start()
00540
00541 def start_fill_in_topics_timer(self):
00542 if len(self.missing_topics) == 0 or self.quit: return
00543 self.fill_in_topics_timer = Timer(self.all_topics_interval, self.fill_in_topics)
00544 self.fill_in_topics_timer.start()
00545
00546
00547
00548 def update_topics(self, restart=True):
00549 """
00550 Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (e.g. --all-topics flag is given).
00551 """
00552 if not self.all_topics or self.quit: return
00553 ts = rospy.get_published_topics()
00554 topics = set([t for t, t_type in ts if t != "/rosout" and t != "/rosout_agg"])
00555 new_topics = topics - self.topics
00556 self.subscribe_topics(new_topics)
00557 if restart: self.start_all_topics_timer()
00558
00559 def fill_in_topics(self, restart=True):
00560 """
00561 Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (e.g. --all-topics flag is given).
00562 """
00563 if len(self.missing_topics) == 0 or self.quit: return
00564 self.missing_topics = self.subscribe_topics(self.missing_topics)
00565 if restart: self.start_fill_in_topics_timer()
00566
00567
00568 def get_memory_usage_for_pid(self, pid):
00569
00570 scale = {'kB': 1024, 'mB': 1024 * 1024,
00571 'KB': 1024, 'MB': 1024 * 1024}
00572 try:
00573 f = open("/proc/%d/status" % pid)
00574 t = f.read()
00575 f.close()
00576 except:
00577 return (0, 0, 0)
00578
00579 if t == "": return (0, 0, 0)
00580
00581 try:
00582 tmp = t[t.index("VmSize:"):].split(None, 3)
00583 size = int(tmp[1]) * scale[tmp[2]]
00584 tmp = t[t.index("VmRSS:"):].split(None, 3)
00585 rss = int(tmp[1]) * scale[tmp[2]]
00586 tmp = t[t.index("VmStk:"):].split(None, 3)
00587 stack = int(tmp[1]) * scale[tmp[2]]
00588 return (size, rss, stack)
00589 except ValueError:
00590 return (0, 0, 0)
00591
00592 def get_memory_usage(self):
00593 size, rss, stack = 0, 0, 0
00594 for w in self.workers:
00595 pmem = self.get_memory_usage_for_pid(w.process.pid)
00596 size += pmem[0]
00597 rss += pmem[1]
00598 stack += pmem[2]
00599
00600 return (size, rss, stack)
00601
00602
00603 def main(argv):
00604 parser = OptionParser()
00605 parser.usage += " [TOPICs...]"
00606 parser.add_option("--nodename-prefix", dest="nodename_prefix",
00607 help="Prefix for worker node names", metavar="ROS_NODE_NAME",
00608 default="")
00609 parser.add_option("--mongodb-host", dest="mongodb_host",
00610 help="Hostname of MongoDB", metavar="HOST",
00611 default=rospy.get_param("mongodb_host", "localhost"))
00612 parser.add_option("--mongodb-port", dest="mongodb_port",
00613 help="Hostname of MongoDB", type="int",
00614 metavar="PORT", default=rospy.get_param("mongodb_port", 27017))
00615 parser.add_option("--mongodb-name", dest="mongodb_name",
00616 help="Name of DB in which to store values",
00617 metavar="NAME", default=rospy.get_param("robot/database"))
00618 parser.add_option("--mongodb-collection", dest="mongodb_collection",
00619 help="Name of Collection in which to store values. All topics are stored in the collection if used this option, otherwise topic names are used as collections",
00620 metavar="COLLECTION", default=rospy.get_param("robot/name"))
00621 parser.add_option("-a", "--all-topics", dest="all_topics", default=False,
00622 action="store_true",
00623 help="Log all existing topics (still excludes /rosout, /rosout_agg)")
00624 parser.add_option("-e", "--regex", dest="treat_as_regex", default=False,
00625 help="Log topics matching the follow regular expression",
00626 action="store_true")
00627 parser.add_option("--all-topics-interval", dest="all_topics_interval", default=5,
00628 help="Time in seconds between checks for new topics", type="int")
00629 parser.add_option("-x", "--exclude", dest="exclude",
00630 help="Exclude topics matching REGEX, may be given multiple times",
00631 action="append", type="string", metavar="REGEX", default=[])
00632 parser.add_option("--no-specific", dest="no_specific", default=False,
00633 action="store_true", help="Disable specific loggers")
00634
00635 (options, args) = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])
00636
00637 if not options.all_topics and len(args) == 0:
00638 parser.print_help()
00639 return
00640
00641 try:
00642 rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % options.nodename_prefix).getPid()
00643 except socket.error:
00644 print("Failed to communicate with master")
00645
00646 mongowriter = MongoWriter(topics=args,
00647 treat_as_regex=options.treat_as_regex,
00648 all_topics=options.all_topics,
00649 all_topics_interval = options.all_topics_interval,
00650 exclude_topics = options.exclude,
00651 mongodb_host=options.mongodb_host,
00652 mongodb_port=options.mongodb_port,
00653 mongodb_name=options.mongodb_name,
00654 mongodb_collection=options.mongodb_collection,
00655 no_specific=options.no_specific,
00656 nodename_prefix=options.nodename_prefix)
00657
00658 def signal_handler(signal, frame):
00659 mongowriter.shutdown()
00660
00661 signal.signal(signal.SIGTERM, signal_handler)
00662 signal.signal(signal.SIGINT, signal_handler)
00663
00664 mongowriter.run()
00665
00666 if __name__ == "__main__":
00667 main(sys.argv)