from __future__ import division, with_statement
PACKAGE_NAME = 'mongodb_log'
NODE_NAME = 'mongodb_log'
NODE_NAME_TEMPLATE = '%smongodb_log'
WORKER_NODE_NAME = "%smongodb_log_worker_%d_%s"

import mongodb_store.util
from threading import Thread, Timer
from Queue import Empty
from optparse import OptionParser
from tempfile import mktemp
from datetime import datetime, timedelta
from time import sleep
from random import randint
from tf.msg import tfMessage
from sensor_msgs.msg import PointCloud, CompressedImage
from roslib.packages import find_node
# setproctitle is optional; fall back gracefully if it is not installed
use_setproctitle = True
try:
    from setproctitle import setproctitle
except ImportError:
    use_setproctitle = False

from multiprocessing import Process, Lock, Condition, Queue, Value, current_process, Event
import multiprocessing as mp
import rosgraph.masterapi

from pymongo import SLOW_ONLY
from pymongo.errors import InvalidDocument, InvalidStringData

MongoClient = mongodb_store.util.import_MongoClient()

BACKLOG_WARN_LIMIT = 100
# Counter.__init__: a process-shared integer protected by a separate Lock
self.count = value or Value('i', 0, lock=lock)

# Counter.increment(self, by=1): bump the shared value under the mutex
with self.mutex:
    self.count.value += by

# read the current value under the mutex
with self.mutex:
    return self.count.value
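# --- Illustrative sketch (not part of the original file) ---
# The Counter fragments above implement a process-shared integer: a
# multiprocessing.Value holds the count and a separate Lock guards updates.
# A minimal, self-contained version of the same pattern (the class and
# function names below are made up for illustration):
from multiprocessing import Process, Value, Lock

class SharedCounter(object):
    def __init__(self):
        self.count = Value('i', 0)   # shared 32-bit integer
        self.mutex = Lock()

    def increment(self, by=1):
        with self.mutex:
            self.count.value += by

    def value(self):
        with self.mutex:
            return self.count.value

def _bump(counter, n):
    # worker process: increment the shared counter n times
    for _ in range(n):
        counter.increment()

def _counter_demo():
    c = SharedCounter()
    procs = [Process(target=_bump, args=(c, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(c.value())   # expected: 4000
# --- end of sketch ---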
# barrier-style synchronisation: the last thread to arrive wakes all waiters
self.threads_left.value -= 1
if self.threads_left.value == 0:
    self.waitcond.notify_all()
def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
             drop_counter_value, queue_maxsize,
             mongodb_host, mongodb_port, mongodb_name, nodename_prefix):
    self.name = "WorkerProcess-%4d-%s" % (idnum, topic)
global use_setproctitle
if use_setproctitle:
    setproctitle("mongodb_log %s" % self.topic)
# profile slow operations only (the original assigned to the method instead of calling it)
self.mongodb.set_profiling_level(SLOW_ONLY)

self.collection.count()
self.queue.cancel_join_thread()
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)

rospy.init_node(worker_node_name, anonymous=False)
try:
    msg_class, real_topic, msg_eval = rostopic.get_topic_class(self.topic, blocking=True)
except rostopic.ROSTopicIOException:
    print("FAILED to subscribe, will keep trying %s" % self.name)
    time.sleep(randint(1, 10))
except rospy.ROSInitException:
    print("FAILED to initialize, will keep trying %s" % self.name)
    time.sleep(randint(1, 10))
print("ACTIVE: %s" % self.name)
# True once the shared quit flag has been set
return self.quit.value == 1

# shutdown: put a sentinel on the queue, wait for it to drain, then terminate the worker process
self.queue.put("shutdown")
while not self.queue.empty():
    sleep(0.1)
self.process.terminate()

return self.queue.qsize()
def enqueue(self, data, topic, current_time=None):
    if self.queue.full():
        # drop the oldest queued message to make room for the new one
        self.queue.get_nowait()
        self.drop_counter.increment()
        self.worker_drop_counter.increment()
    self.queue.put((topic, data, rospy.get_time(), data._connection_header))
    self.in_counter.increment()
    self.worker_in_counter.increment()
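# --- Illustrative sketch (not part of the original file) ---
# enqueue() above implements a bounded, drop-oldest queue: when the queue is
# full, the oldest entry is discarded so the newest message is still accepted,
# and the drop counters record the loss. A stand-alone sketch of the same
# policy; it uses the synchronous Queue.Queue for a deterministic demo, while
# the worker above uses a multiprocessing.Queue (whose full()/qsize() are
# approximate). The function name and payloads are hypothetical.
from Queue import Queue, Empty, Full   # Python 2 module, matching the imports above

def put_drop_oldest(q, item):
    if q.full():
        try:
            q.get_nowait()       # drop the oldest queued item
        except Empty:
            pass                 # a consumer emptied the queue in the meantime
    try:
        q.put_nowait(item)
    except Full:
        pass                     # still full; give up on this item instead

def _queue_demo():
    q = Queue(maxsize=2)
    for i in range(5):
        put_drop_oldest(q, i)
    print([q.get() for _ in range(q.qsize())])   # -> [3, 4]
# --- end of sketch ---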
t = self.queue.get(True)

if isinstance(t, tuple):
    self.out_counter.increment()
    self.worker_out_counter.increment()

    connection_header = t[3]

    if isinstance(msg, rospy.Message):
        meta["topic"] = topic

        if connection_header['latching'] == '1':
            meta['latch'] = True
        else:
            meta['latch'] = False

        if ctime is not None:
            meta['inserted_at'] = datetime.utcfromtimestamp(ctime)
        else:
            meta['inserted_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec())

        mongodb_store.util.store_message(self.collection, msg, meta)
except InvalidDocument as e:
    print("InvalidDocument " + current_process().name + "@" + topic + ": \n")
    print(e)
except InvalidStringData as e:
    print("InvalidStringData " + current_process().name + "@" + topic + ": \n")
    print(e)
self.subscriber.unregister()

while not self.queue.empty():
    t = self.queue.get_nowait()

print("STOPPED: %s" % self.name)
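# --- Illustrative sketch (not part of the original file) ---
# The dequeue fragment above attaches a small "meta" document to every stored
# message. Based on the assignments shown, it carries the topic name, whether
# the topic is latched, and the insertion time. A hypothetical helper that
# builds the same structure (the rospy rostime fallback is replaced by
# datetime.utcnow() so this runs without ROS):
from datetime import datetime

def build_meta(topic, connection_header, ctime=None):
    meta = {}
    meta["topic"] = topic
    meta["latch"] = connection_header.get("latching") == "1"
    if ctime is not None:
        meta["inserted_at"] = datetime.utcfromtimestamp(ctime)
    else:
        meta["inserted_at"] = datetime.utcnow()
    return meta

# build_meta("/tf", {"latching": "0"}, ctime=1500000000.0)
# -> {'topic': '/tf', 'latch': False,
#     'inserted_at': datetime.datetime(2017, 7, 14, 2, 40)}
# --- end of sketch ---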
def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
             drop_counter_value, queue_maxsize,
             mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger):
    self.name = "SubprocessWorker-%4d-%s" % (idnum, topic)
mongodb_host_port = "%s:%d" % (mongodb_host, mongodb_port)
collection = "%s.%s" % (mongodb_name, collname)

self.process = subprocess.Popen([cpp_logger[0], "-t", topic, "-n", nodename,
                                 "-m", mongodb_host_port, "-c", collection],
                                stdout=subprocess.PIPE)
line = self.process.stdout.readline().rstrip()
if line == "":
    continue
arr = line.split(":")
self.in_counter.increment(int(arr[0]))
self.out_counter.increment(int(arr[1]))
self.drop_counter.increment(int(arr[2]))
self.qsize = int(arr[3])

self.worker_in_counter.increment(int(arr[0]))
self.worker_out_counter.increment(int(arr[1]))
self.worker_drop_counter.increment(int(arr[2]))
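# --- Illustrative sketch (not part of the original file) ---
# The loop above reads one line per update from the C++ logger's stdout and
# splits it on ":" into four integer fields. Judging from how the fields are
# consumed, the format is "<in>:<out>:<drop>:<qsize>". A hypothetical parser
# for such a line:
def parse_stats_line(line):
    arr = line.rstrip().split(":")
    return {
        "in":    int(arr[0]),   # messages received from the topic
        "out":   int(arr[1]),   # messages written to MongoDB
        "drop":  int(arr[2]),   # messages dropped because the queue was full
        "qsize": int(arr[3]),   # current queue length
    }

# parse_stats_line("12:10:1:1\n")
# -> {'in': 12, 'out': 10, 'drop': 1, 'qsize': 1}
# --- end of sketch ---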
def __init__(self, topics=[], treat_as_regex=False,
             all_topics=False, all_topics_interval=5,
             exclude_topics=[],
             mongodb_host=None, mongodb_port=None,
             mongodb_name="roslog", mongodb_collection=None,
             no_specific=False, nodename_prefix=""):
global use_setproctitle
if use_setproctitle:
    setproctitle("mongodb_log MAIN")

self.exclude_regex.append(re.compile(et))
published_topics = [t[0] for t in rospy.get_published_topics()]
for pattern in topics:
    exp = re.compile(pattern)
    expanded_topics += filter(lambda t: exp.match(t) is not None, published_topics)
return expanded_topics
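# --- Illustrative sketch (not part of the original file) ---
# expand_regex_to_topics() above matches each pattern against the topics that
# are currently published. The same matching logic applied to a fixed topic
# list, so it can be run without a ROS master (topic names are examples only):
import re

def expand(patterns, published_topics):
    expanded = []
    for pattern in patterns:
        exp = re.compile(pattern)
        expanded += [t for t in published_topics if exp.match(t) is not None]
    return expanded

# expand(["/camera/.*"], ["/camera/rgb/image_raw", "/camera/depth/points", "/tf"])
# -> ['/camera/rgb/image_raw', '/camera/depth/points']
# --- end of sketch ---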
missing_topics = set()

# strip a trailing slash from the topic name
if topic and topic[-1] == '/':
    topic = topic[:-1]

if topic in self.topics:
    continue

print("*** IGNORING topic %s due to exclusion rule" % topic)
self.exclude_already.append(topic)

if do_continue:
    continue

collname = mongodb_store.util.topic_name_to_collection_name(topic)
print("Two converted topic names clash: %s, ignoring topic %s"
      % (collname, topic))

print("Adding topic %s" % topic)
self.workers.append(w)
self.topics |= set([topic])

print('Failed to subscribe to %s due to %s' % (topic, e))
missing_topics.add(topic)

return missing_topics
msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=False)

print('Topic %s not announced, cannot get type: %s' % (topic, e))

if real_topic is None:
    raise rostopic.ROSTopicException('topic type was empty, probably not announced')
if not self.no_specific and msg_class == tfMessage:
    print("DETECTED transform topic %s, using fast C++ logger" % topic)
    node_path = find_node(PACKAGE_NAME, "mongodb_log_tf")
    if not node_path:
        print("FAILED to detect mongodb_log_tf, falling back to generic logger (did not build package?)")
elif not self.no_specific and msg_class == PointCloud:
    print("DETECTED point cloud topic %s, using fast C++ logger" % topic)
    node_path = find_node(PACKAGE_NAME, "mongodb_log_pcl")
    if not node_path:
        print("FAILED to detect mongodb_log_pcl, falling back to generic logger (did not build package?)")
elif not self.no_specific and msg_class == CompressedImage:
    print("DETECTED compressed image topic %s, using fast C++ logger" % topic)
    node_path = find_node(PACKAGE_NAME, "mongodb_log_cimg")
    if not node_path:
        print("FAILED to detect mongodb_log_cimg, falling back to generic logger (did not build package?)")
elif msg_class == TriangleMesh:
    print("DETECTED triangle mesh topic %s, using fast C++ logger" % topic)
    node_path = find_node(PACKAGE_NAME, "mongodb_log_trimesh")
    if not node_path:
        print("FAILED to detect mongodb_log_trimesh, falling back to generic logger (did not build package?)")

# (arguments passed on to the SubprocessWorker constructor)
self.in_counter.count, self.out_counter.count,
self.drop_counter.count, QUEUE_MAXSIZE,
print("GENERIC Python logger used for topic %s" % topic)

# (arguments passed on to the WorkerProcess constructor)
self.in_counter.count, self.out_counter.count,
self.drop_counter.count, QUEUE_MAXSIZE,
looping_threshold = timedelta(0, STATS_LOOPTIME, 0)

started = datetime.now()

td = datetime.now() - started
while not self.quit and td < looping_threshold:
    # remaining sleep time in seconds (equivalent to STATS_LOOPTIME - td.total_seconds())
    sleeptime = STATS_LOOPTIME - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
    if sleeptime > 0:
        sleep(sleeptime)
    td = datetime.now() - started
if hasattr(self, "all_topics_timer"):
    self.all_topics_timer.cancel()

self.all_topics_timer.start()

self.fill_in_topics_timer.start()
# update_topics: called at a fixed interval (see start_all_topics_timer) to update the
# list of topics if we are logging all topics (e.g. the --all-topics flag is given).
ts = rospy.get_published_topics()
topics = set([t for t, t_type in ts if t != "/rosout" and t != "/rosout_agg"])
new_topics = topics - self.topics

# fill_in_topics: called at a fixed interval (see start_fill_in_topics_timer) to retry
# subscribing to topics that were requested but not yet available.

scale = {'kB': 1024, 'mB': 1024 * 1024,
         'KB': 1024, 'MB': 1024 * 1024}
f = open("/proc/%d/status" % pid)

if t == "":
    return (0, 0, 0)

tmp = t[t.index("VmSize:"):].split(None, 3)
size = int(tmp[1]) * scale[tmp[2]]
tmp = t[t.index("VmRSS:"):].split(None, 3)
rss = int(tmp[1]) * scale[tmp[2]]
tmp = t[t.index("VmStk:"):].split(None, 3)
stack = int(tmp[1]) * scale[tmp[2]]
return (size, rss, stack)
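# --- Illustrative sketch (not part of the original file) ---
# get_memory_usage_for_pid() above pulls the VmSize, VmRSS and VmStk fields out
# of /proc/<pid>/status and converts them to bytes via the scale table. The
# same parsing applied to an in-memory sample, so it runs on any platform
# (SAMPLE_STATUS and parse_vm_fields are made up for illustration):
SCALE = {'kB': 1024, 'KB': 1024, 'mB': 1024 * 1024, 'MB': 1024 * 1024}

SAMPLE_STATUS = "Name:\tpython\nVmSize:\t  123456 kB\nVmRSS:\t   65432 kB\nVmStk:\t     136 kB\n"

def parse_vm_fields(t):
    result = {}
    for key in ("VmSize:", "VmRSS:", "VmStk:"):
        tmp = t[t.index(key):].split(None, 3)
        result[key.rstrip(":")] = int(tmp[1]) * SCALE[tmp[2]]
    return result

# parse_vm_fields(SAMPLE_STATUS)
# -> {'VmSize': 126418944, 'VmRSS': 67002368, 'VmStk': 139264}
# --- end of sketch ---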
size, rss, stack = 0, 0, 0

return (size, rss, stack)
parser = OptionParser()
parser.usage += " [TOPICs...]"
parser.add_option("--nodename-prefix", dest="nodename_prefix",
                  help="Prefix for worker node names", metavar="ROS_NODE_NAME",
                  default="")
parser.add_option("--mongodb-host", dest="mongodb_host",
                  help="Hostname of MongoDB", metavar="HOST",
                  default=rospy.get_param("mongodb_host", "localhost"))
parser.add_option("--mongodb-port", dest="mongodb_port",
                  help="Port of MongoDB", type="int",
                  metavar="PORT", default=rospy.get_param("mongodb_port", 27017))
parser.add_option("--mongodb-name", dest="mongodb_name",
                  help="Name of DB in which to store values",
                  metavar="NAME", default="roslog")
parser.add_option("--mongodb-collection", dest="mongodb_collection",
                  help="Name of the collection in which to store values. If given, all topics are stored in this single collection; otherwise each topic name is used as its own collection name",
                  metavar="COLLECTION", default=None)
parser.add_option("-a", "--all-topics", dest="all_topics", default=False,
                  action="store_true",
                  help="Log all existing topics (still excludes /rosout, /rosout_agg)")
parser.add_option("-e", "--regex", dest="treat_as_regex", default=False,
                  action="store_true",
                  help="Log topics matching the given regular expression(s)")
parser.add_option("--all-topics-interval", dest="all_topics_interval", default=5,
                  help="Time in seconds between checks for new topics", type="int")
parser.add_option("-x", "--exclude", dest="exclude",
                  help="Exclude topics matching REGEX, may be given multiple times",
                  action="append", type="string", metavar="REGEX", default=[])
parser.add_option("--no-specific", dest="no_specific", default=False,
                  action="store_true", help="Disable specific loggers")
(options, args) = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])

if not options.all_topics and len(args) == 0:

rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % options.nodename_prefix).getPid()

print("Failed to communicate with master")
# (keyword arguments passed to the MongoWriter constructor)
treat_as_regex=options.treat_as_regex,
all_topics=options.all_topics,
all_topics_interval=options.all_topics_interval,
exclude_topics=options.exclude,
mongodb_host=options.mongodb_host,
mongodb_port=options.mongodb_port,
mongodb_name=options.mongodb_name,
mongodb_collection=options.mongodb_collection,
no_specific=options.no_specific,
nodename_prefix=options.nodename_prefix)
def signal_handler(signal, frame):
    mongowriter.shutdown()

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
if __name__ == "__main__":
def create_worker(self, idnum, topic, collname)
def get_memory_usage_for_pid(self, pid)
def fill_in_topics(self, restart=True)
def expand_regex_to_topics(self, topics)
def __init__(self, topics=[], treat_as_regex=False, all_topics=False, all_topics_interval=5, exclude_topics=[], mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None, no_specific=False, nodename_prefix="")
def start_all_topics_timer(self)
def start_fill_in_topics_timer(self)
def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value, drop_counter_value, queue_maxsize, mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger)
def enqueue(self, data, topic, current_time=None)
def increment(self, by=1)
def __init__(self, value=None, lock=True)
def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value, drop_counter_value, queue_maxsize, mongodb_host, mongodb_port, mongodb_name, nodename_prefix)
def update_topics(self, restart=True)
def subscribe_topics(self, topics)
def __init__(self, num_threads)
def get_memory_usage(self)
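# --- Illustrative usage (not part of the original file) ---
# Based on the command-line options registered above, typical invocations
# would look like the following; the "rosrun mongodb_log mongodb_log.py"
# launch form and the topic names are examples only:
#
#   rosrun mongodb_log mongodb_log.py /tf /odom
#   rosrun mongodb_log mongodb_log.py -a --mongodb-host db.example.org --mongodb-port 27017
#   rosrun mongodb_log mongodb_log.py -a -x "/camera/.*" --all-topics-interval 10
#   rosrun mongodb_log mongodb_log.py -e "/robot_.*/pose" --mongodb-name roslog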