25 from __future__
import division, with_statement
27 PACKAGE_NAME=
'mongodb_log'
28 NODE_NAME=
'mongodb_log'
29 NODE_NAME_TEMPLATE=
'%smongodb_log'
30 WORKER_NODE_NAME =
"%smongodb_log_worker_%d_%s"
37 import mongodb_store.util
47 from threading
import Thread, Timer
50 from queue
import Empty
52 from Queue
import Empty
53 from optparse
import OptionParser
54 from tempfile
import mktemp
55 from datetime
import datetime, timedelta
56 from time
import sleep
57 from random
import randint
58 from tf.msg
import tfMessage
59 from sensor_msgs.msg
import PointCloud, CompressedImage
60 from roslib.packages
import find_node
63 use_setproctitle =
True
65 from setproctitle
import setproctitle
67 use_setproctitle =
False
73 from multiprocessing
import Process, Lock, Condition, Queue, Value, current_process, Event
74 import multiprocessing
as mp
83 import rosgraph.masterapi
89 from pymongo
import SLOW_ONLY
90 from pymongo.errors
import InvalidDocument, InvalidStringData
92 MongoClient = mongodb_store.util.import_MongoClient()
94 BACKLOG_WARN_LIMIT = 100
100 self.
count = value
or Value(
'i', 0, lock=lock)
129 def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
130 drop_counter_value, queue_maxsize,
131 mongodb_host, mongodb_port, mongodb_name, nodename_prefix):
132 self.
name =
"WorkerProcess-%4d-%s" % (idnum, topic)
147 self.
quit = Value(
'i', 0)
157 global use_setproctitle
159 setproctitle(
"mongodb_log %s" % self.
topic)
163 self.
mongodb.set_profiling_level = SLOW_ONLY
168 self.
queue.cancel_join_thread()
171 signal.signal(signal.SIGTERM, signal.SIG_DFL)
172 signal.signal(signal.SIGINT, signal.SIG_DFL)
176 rospy.init_node(worker_node_name, anonymous=
False)
181 msg_class, real_topic, msg_eval = rostopic.get_topic_class(self.
topic, blocking=
True)
183 except rostopic.ROSTopicIOException:
184 print(
"FAILED to subscribe, will keep trying %s" % self.
name)
185 time.sleep(randint(1,10))
186 except rospy.ROSInitException:
187 print(
"FAILED to initialize, will keep trying %s" % self.
name)
188 time.sleep(randint(1,10))
194 print(
"ACTIVE: %s" % self.
name)
203 return self.
quit.value == 1
209 self.
queue.put(
"shutdown")
210 while not self.
queue.empty(): sleep(0.1)
221 def enqueue(self, data, topic, current_time=None):
224 if self.
queue.full():
226 self.
queue.get_nowait()
232 self.
queue.put((topic, data, rospy.get_time(), data._connection_header))
240 t = self.
queue.get(
True)
246 if isinstance(t, tuple):
252 connection_header = t[3]
254 if isinstance(msg, rospy.Message):
262 meta[
"topic"] = topic
264 if connection_header[
'latching'] ==
'1':
267 meta[
'latch'] =
False
269 if ctime
is not None:
270 meta[
'inserted_at'] = datetime.utcfromtimestamp(ctime)
272 meta[
'inserted_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec())
275 mongodb_store.util.store_message(self.
collection, msg, meta)
277 except InvalidDocument
as e:
278 print(
"InvalidDocument " + current_process().name +
"@" + topic +
": \n")
280 except InvalidStringData
as e:
281 print(
"InvalidStringData " + current_process().name +
"@" + topic +
": \n")
293 while not self.
queue.empty():
294 t = self.
queue.get_nowait()
295 print(
"STOPPED: %s" % self.
name)
299 def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
300 drop_counter_value, queue_maxsize,
301 mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger):
303 self.
name =
"SubprocessWorker-%4d-%s" % (idnum, topic)
323 mongodb_host_port =
"%s:%d" % (mongodb_host, mongodb_port)
324 collection =
"%s.%s" % (mongodb_name, collname)
327 self.
process = subprocess.Popen([cpp_logger[0],
"-t", topic,
"-n", nodename,
328 "-m", mongodb_host_port,
"-c", collection],
329 stdout=subprocess.PIPE)
338 line = self.
process.stdout.readline().rstrip()
339 if line ==
"":
continue
340 arr = string.split(line,
":")
344 self.
qsize = int(arr[3])
357 def __init__(self, topics = [], treat_as_regex=False,
358 all_topics = False, all_topics_interval = 5,
360 mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None,
361 no_specific=False, nodename_prefix=""):
381 global use_setproctitle
383 setproctitle(
"mongodb_log MAIN")
406 published_topics = [t[0]
for t
in rospy.get_published_topics()]
407 for pattern
in topics:
408 exp = re.compile(pattern)
409 expanded_topics += filter(
lambda t: exp.match(t)
is not None, published_topics)
410 return expanded_topics
418 missing_topics = set()
420 if topic
and topic[-1] ==
'/':
423 if topic
in self.
topics:
continue
429 print(
"*** IGNORING topic %s due to exclusion rule" % topic)
433 if do_continue:
continue
442 collname = mongodb_store.util.topic_name_to_collection_name(topic)
444 print(
"Two converted topic names clash: %s, ignoring topic %s"
448 print(
"Adding topic %s" % topic)
452 self.
topics |= set([topic])
453 except Exception
as e:
454 print(
'Failed to subscribe to %s due to %s' % (topic, e))
455 missing_topics.add(topic)
457 return missing_topics
462 msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=
False)
463 except Exception
as e:
464 print(
'Topic %s not announced, cannot get type: %s' % (topic, e))
467 if real_topic
is None:
468 raise rostopic.ROSTopicException(
'topic type was empty, probably not announced')
473 if not self.
no_specific and msg_class == tfMessage:
474 print(
"DETECTED transform topic %s, using fast C++ logger" % topic)
475 node_path = find_node(PACKAGE_NAME,
"mongodb_log_tf")
477 print(
"FAILED to detect mongodb_log_tf, falling back to generic logger (did not build package?)")
478 elif not self.
no_specific and msg_class == PointCloud:
479 print(
"DETECTED point cloud topic %s, using fast C++ logger" % topic)
480 node_path = find_node(PACKAGE_NAME,
"mongodb_log_pcl")
482 print(
"FAILED to detect mongodb_log_pcl, falling back to generic logger (did not build package?)")
483 elif not self.
no_specific and msg_class == CompressedImage:
484 print(
"DETECTED compressed image topic %s, using fast C++ logger" % topic)
485 node_path = find_node(PACKAGE_NAME,
"mongodb_log_cimg")
487 print(
"FAILED to detect mongodb_log_cimg, falling back to generic logger (did not build package?)")
489 elif msg_class == TriangleMesh:
490 print("DETECTED triangle mesh topic %s, using fast C++ logger" % topic)
491 node_path = find_node(PACKAGE_NAME, "mongodb_log_trimesh")
493 print("FAILED to detect mongodb_log_trimesh, falling back to generic logger (did not build package?)")
506 print(
"GENERIC Python logger used for topic %s" % topic)
517 looping_threshold = timedelta(0, STATS_LOOPTIME, 0)
520 started = datetime.now()
524 td = datetime.now() - started
525 while not self.
quit and td < looping_threshold:
526 sleeptime = STATS_LOOPTIME - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
527 if sleeptime > 0: sleep(sleeptime)
528 td = datetime.now() - started
553 Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (e.g. --all-topics flag is given).
556 ts = rospy.get_published_topics()
557 topics = set([t
for t, t_type
in ts
if t !=
"/rosout" and t !=
"/rosout_agg"])
558 new_topics = topics - self.
topics
564 Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (e.g. --all-topics flag is given).
573 scale = {
'kB': 1024,
'mB': 1024 * 1024,
574 'KB': 1024,
'MB': 1024 * 1024}
576 f = open(
"/proc/%d/status" % pid)
582 if t ==
"":
return (0, 0, 0)
585 tmp = t[t.index(
"VmSize:"):].split(
None, 3)
586 size = int(tmp[1]) * scale[tmp[2]]
587 tmp = t[t.index(
"VmRSS:"):].split(
None, 3)
588 rss = int(tmp[1]) * scale[tmp[2]]
589 tmp = t[t.index(
"VmStk:"):].split(
None, 3)
590 stack = int(tmp[1]) * scale[tmp[2]]
591 return (size, rss, stack)
596 size, rss, stack = 0, 0, 0
603 return (size, rss, stack)
607 parser = OptionParser()
608 parser.usage +=
" [TOPICs...]"
609 parser.add_option(
"--nodename-prefix", dest=
"nodename_prefix",
610 help=
"Prefix for worker node names", metavar=
"ROS_NODE_NAME",
612 parser.add_option(
"--mongodb-host", dest=
"mongodb_host",
613 help=
"Hostname of MongoDB", metavar=
"HOST",
614 default=rospy.get_param(
"mongodb_host",
"localhost"))
615 parser.add_option(
"--mongodb-port", dest=
"mongodb_port",
616 help=
"Hostname of MongoDB", type=
"int",
617 metavar=
"PORT", default=rospy.get_param(
"mongodb_port", 27017))
618 parser.add_option(
"--mongodb-name", dest=
"mongodb_name",
619 help=
"Name of DB in which to store values",
620 metavar=
"NAME", default=
"roslog")
621 parser.add_option(
"--mongodb-collection", dest=
"mongodb_collection",
622 help=
"Name of Collection in which to store values. All topics are stored in the collection if used this option, otherwise topic names are used as collections",
623 metavar=
"COLLECTION", default=
None)
624 parser.add_option(
"-a",
"--all-topics", dest=
"all_topics", default=
False,
626 help=
"Log all existing topics (still excludes /rosout, /rosout_agg)")
627 parser.add_option(
"-e",
"--regex", dest=
"treat_as_regex", default=
False,
628 help=
"Log topics matching the follow regular expression",
630 parser.add_option(
"--all-topics-interval", dest=
"all_topics_interval", default=5,
631 help=
"Time in seconds between checks for new topics", type=
"int")
632 parser.add_option(
"-x",
"--exclude", dest=
"exclude",
633 help=
"Exclude topics matching REGEX, may be given multiple times",
634 action=
"append", type=
"string", metavar=
"REGEX", default=[])
635 parser.add_option(
"--no-specific", dest=
"no_specific", default=
False,
636 action=
"store_true", help=
"Disable specific loggers")
638 (options, args) = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])
640 if not options.all_topics
and len(args) == 0:
645 rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % options.nodename_prefix).getPid()
647 print(
"Failed to communicate with master")
650 treat_as_regex=options.treat_as_regex,
651 all_topics=options.all_topics,
652 all_topics_interval = options.all_topics_interval,
653 exclude_topics = options.exclude,
654 mongodb_host=options.mongodb_host,
655 mongodb_port=options.mongodb_port,
656 mongodb_name=options.mongodb_name,
657 mongodb_collection=options.mongodb_collection,
658 no_specific=options.no_specific,
659 nodename_prefix=options.nodename_prefix)
661 def signal_handler(signal, frame):
662 mongowriter.shutdown()
664 signal.signal(signal.SIGTERM, signal_handler)
665 signal.signal(signal.SIGINT, signal_handler)
669 if __name__ ==
"__main__":