mongodb_log.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 
3 
11 
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU Library General Public License for more details.
21 #
22 # Read the full text in the LICENSE.GPL file in the doc directory.
23 
24 # make sure we aren't using floor division
25 from __future__ import division, with_statement
26 
27 PACKAGE_NAME='mongodb_log'
28 NODE_NAME='mongodb_log'
29 NODE_NAME_TEMPLATE='%smongodb_log'
30 WORKER_NODE_NAME = "%smongodb_log_worker_%d_%s"
31 QUEUE_MAXSIZE = 100
32 
33 # import roslib; roslib.load_manifest(PACKAGE_NAME)
34 import rospy
35 
36 # for msg_to_document
37 import mongodb_store.util
38 
39 import os
40 import re
41 import sys
42 import time
43 import pprint
44 import string
45 import signal
46 import subprocess
47 from threading import Thread, Timer
48 
49 try:
50  from queue import Empty
51 except ImportError:
52  from Queue import Empty
53 from optparse import OptionParser
54 from tempfile import mktemp
55 from datetime import datetime, timedelta
56 from time import sleep
57 from random import randint
58 from tf.msg import tfMessage
59 from sensor_msgs.msg import PointCloud, CompressedImage
60 from roslib.packages import find_node
61 #from rviz_intel.msg import TriangleMesh
62 
63 use_setproctitle = True
64 try:
65  from setproctitle import setproctitle
66 except ImportError:
67  use_setproctitle = False
68 
69 
70 use_processes = False
71 
72 # if use_processes:
73 from multiprocessing import Process, Lock, Condition, Queue, Value, current_process, Event
74 import multiprocessing as mp
75 # else:
76  # from threading import Lock, Condition, Event
77  # from Queue import Queue
78  # def Value(t, val, lock=None):
79  # return val
80 
81 
82 import genpy
83 import rosgraph.masterapi
84 import roslib.message
85 #from rospy import Time, Duration
86 import rostopic
87 
88 
89 from pymongo import SLOW_ONLY
90 from pymongo.errors import InvalidDocument, InvalidStringData
91 
92 MongoClient = mongodb_store.util.import_MongoClient()
93 
94 BACKLOG_WARN_LIMIT = 100
95 STATS_LOOPTIME = 10
96 STATS_GRAPHTIME = 60
97 
98 class Counter(object):
99  def __init__(self, value = None, lock = True):
100  self.count = value or Value('i', 0, lock=lock)
101  self.mutex = Lock()
102 
103  def increment(self, by = 1):
104  with self.mutex: self.count.value += by
105 
106  def value(self):
107  with self.mutex: return self.count.value
108 
109 class Barrier(object):
110  def __init__(self, num_threads):
111  self.num_threads = num_threads
112  self.threads_left = Value('i', num_threads, lock=True)
113  self.mutex = Lock()
114  self.waitcond = Condition(self.mutex)
115 
116  def wait(self):
117  self.mutex.acquire()
118  self.threads_left.value -= 1
119  if self.threads_left.value == 0:
120  self.threads_left.value = self.num_threads
121  self.waitcond.notify_all()
122  self.mutex.release()
123  else:
124  self.waitcond.wait()
125  self.mutex.release()
126 
127 
128 class WorkerProcess(object):
129  def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
130  drop_counter_value, queue_maxsize,
131  mongodb_host, mongodb_port, mongodb_name, nodename_prefix):
132  self.name = "WorkerProcess-%4d-%s" % (idnum, topic)
133  self.id = idnum
134  self.topic = topic
135  self.collname = collname
136  self.queue = Queue(queue_maxsize)
137  self.out_counter = Counter(out_counter_value)
138  self.in_counter = Counter(in_counter_value)
139  self.drop_counter = Counter(drop_counter_value)
143  self.mongodb_host = mongodb_host
144  self.mongodb_port = mongodb_port
145  self.mongodb_name = mongodb_name
146  self.nodename_prefix = nodename_prefix
147  self.quit = Value('i', 0)
148 
149  # print "Creating process %s" % self.name
150  self.process = Process(name=self.name, target=self.run)
151  # self.process = Thread(name=self.name, target=self.run)
152  # print "created %s" % self.process
153  self.process.start()
154  # print "started %s" % self.process
155 
156  def init(self):
157  global use_setproctitle
158  if use_setproctitle:
159  setproctitle("mongodb_log %s" % self.topic)
160 
162  self.mongodb = self.mongoconn[self.mongodb_name]
163  self.mongodb.set_profiling_level = SLOW_ONLY
164 
165  self.collection = self.mongodb[self.collname]
166  self.collection.count()
167 
168  self.queue.cancel_join_thread()
169 
170  # clear signal handlers in this child process, rospy will handle signals for us
171  signal.signal(signal.SIGTERM, signal.SIG_DFL)
172  signal.signal(signal.SIGINT, signal.SIG_DFL)
173 
174  worker_node_name = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)
175  # print "Calling init_node with %s from process %s" % (worker_node_name, mp.current_process())
176  rospy.init_node(worker_node_name, anonymous=False)
177 
178  self.subscriber = None
179  while not self.subscriber and not self.is_quit():
180  try:
181  msg_class, real_topic, msg_eval = rostopic.get_topic_class(self.topic, blocking=True)
182  self.subscriber = rospy.Subscriber(real_topic, msg_class, self.enqueue, self.topic)
183  except rostopic.ROSTopicIOException:
184  print("FAILED to subscribe, will keep trying %s" % self.name)
185  time.sleep(randint(1,10))
186  except rospy.ROSInitException:
187  print("FAILED to initialize, will keep trying %s" % self.name)
188  time.sleep(randint(1,10))
189  self.subscriber = None
190 
191  def run(self):
192  self.init()
193 
194  print("ACTIVE: %s" % self.name)
195 
196  # run the thread
197  self.dequeue()
198 
199  # free connection
200  # self.mongoconn.end_request()
201 
202  def is_quit(self):
203  return self.quit.value == 1
204 
205  def shutdown(self):
206  if not self.is_quit():
207  #print("SHUTDOWN %s qsize %d" % (self.name, self.queue.qsize()))
208  self.quit.value = 1
209  self.queue.put("shutdown")
210  while not self.queue.empty(): sleep(0.1)
211  #print("JOIN %s qsize %d" % (self.name, self.queue.qsize()))
212  self.process.join()
213  self.process.terminate()
214 
215 
216 
217 
218  def qsize(self):
219  return self.queue.qsize()
220 
221  def enqueue(self, data, topic, current_time=None):
222 
223  if not self.is_quit():
224  if self.queue.full():
225  try:
226  self.queue.get_nowait()
227  self.drop_counter.increment()
228  self.worker_drop_counter.increment()
229  except Empty:
230  pass
231  #self.queue.put((topic, data, current_time or datetime.now()))
232  self.queue.put((topic, data, rospy.get_time(), data._connection_header))
233  self.in_counter.increment()
234  self.worker_in_counter.increment()
235 
236  def dequeue(self):
237  while not self.is_quit():
238  t = None
239  try:
240  t = self.queue.get(True)
241  except IOError:
242  # Anticipate Ctrl-C
243  #print("Quit W1: %s" % self.name)
244  self.quit.value = 1
245  break
246  if isinstance(t, tuple):
247  self.out_counter.increment()
248  self.worker_out_counter.increment()
249  topic = t[0]
250  msg = t[1]
251  ctime = t[2]
252  connection_header = t[3]
253 
254  if isinstance(msg, rospy.Message):
255 
256  try:
257  #print(self.sep + threading.current_thread().getName() + "@" + topic+": ")
258  #pprint.pprint(doc)
259  meta = {}
260  # switched to use inserted_at to match message_store
261  # meta["recorded"] = ctime or datetime.now()
262  meta["topic"] = topic
263 
264  if connection_header['latching'] == '1':
265  meta['latch'] = True
266  else:
267  meta['latch'] = False
268 
269  if ctime is not None:
270  meta['inserted_at'] = datetime.utcfromtimestamp(ctime)
271  else:
272  meta['inserted_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec())
273 
274 
275  mongodb_store.util.store_message(self.collection, msg, meta)
276 
277  except InvalidDocument as e:
278  print("InvalidDocument " + current_process().name + "@" + topic +": \n")
279  print(e)
280  except InvalidStringData as e:
281  print("InvalidStringData " + current_process().name + "@" + topic +": \n")
282  print(e)
283 
284  else:
285  #print("Quit W2: %s" % self.name)
286  self.quit.value = 1
287 
288  # we must make sure to clear the queue before exiting,
289  # or the parent thread might deadlock otherwise
290  #print("Quit W3: %s" % self.name)
291  self.subscriber.unregister()
292  self.subscriber = None
293  while not self.queue.empty():
294  t = self.queue.get_nowait()
295  print("STOPPED: %s" % self.name)
296 
297 
298 class SubprocessWorker(object):
299  def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
300  drop_counter_value, queue_maxsize,
301  mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger):
302 
303  self.name = "SubprocessWorker-%4d-%s" % (idnum, topic)
304  self.id = idnum
305  self.topic = topic
306  self.collname = collname
307  self.queue = Queue(queue_maxsize)
308  self.out_counter = Counter(out_counter_value)
309  self.in_counter = Counter(in_counter_value)
310  self.drop_counter = Counter(drop_counter_value)
314  self.mongodb_host = mongodb_host
315  self.mongodb_port = mongodb_port
316  self.mongodb_name = mongodb_name
317  self.nodename_prefix = nodename_prefix
318  self.quit = False
319  self.qsize = 0
320 
321  self.thread = Thread(name=self.name, target=self.run)
322 
323  mongodb_host_port = "%s:%d" % (mongodb_host, mongodb_port)
324  collection = "%s.%s" % (mongodb_name, collname)
325  nodename = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)
326 
327  self.process = subprocess.Popen([cpp_logger[0], "-t", topic, "-n", nodename,
328  "-m", mongodb_host_port, "-c", collection],
329  stdout=subprocess.PIPE)
330 
331  self.thread.start()
332 
333  def qsize(self):
334  return self.qsize
335 
336  def run(self):
337  while not self.quit:
338  line = self.process.stdout.readline().rstrip()
339  if line == "": continue
340  arr = string.split(line, ":")
341  self.in_counter.increment(int(arr[0]))
342  self.out_counter.increment(int(arr[1]))
343  self.drop_counter.increment(int(arr[2]))
344  self.qsize = int(arr[3])
345 
346  self.worker_in_counter.increment(int(arr[0]))
347  self.worker_out_counter.increment(int(arr[1]))
348  self.worker_drop_counter.increment(int(arr[2]))
349 
350  def shutdown(self):
351  self.quit = True
352  self.process.kill()
353  self.process.wait()
354 
355 
356 class MongoWriter(object):
357  def __init__(self, topics = [], treat_as_regex=False,
358  all_topics = False, all_topics_interval = 5,
359  exclude_topics = [],
360  mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None,
361  no_specific=False, nodename_prefix=""):
362  self.all_topics = all_topics
363  self.all_topics_interval = all_topics_interval
364  self.exclude_topics = exclude_topics
365  self.mongodb_host = mongodb_host
366  self.mongodb_port = mongodb_port
367  self.mongodb_name = mongodb_name
368  self.mongodb_collection = mongodb_collection
369  self.no_specific = no_specific
370  self.nodename_prefix = nodename_prefix
371  self.quit = False
372  self.topics = set()
373  self.collnames = set()
374  #self.str_fn = roslib.message.strify_message
375  self.sep = "\n" #'\033[2J\033[;H'
376  self.in_counter = Counter()
377  self.out_counter = Counter()
378  self.drop_counter = Counter()
379  self.workers = []
380 
381  global use_setproctitle
382  if use_setproctitle:
383  setproctitle("mongodb_log MAIN")
384 
385  self.exclude_regex = []
386  for et in self.exclude_topics:
387  self.exclude_regex.append(re.compile(et))
388  self.exclude_already = []
389 
390  if treat_as_regex:
391  topics = self.expand_regex_to_topics(topics)
392 
393  self.missing_topics = self.subscribe_topics(set(topics))
394  self.fill_in_topics()
395 
396 
397  if self.all_topics:
398  print("All topics")
399  self.ros_master = rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % self.nodename_prefix)
400  self.update_topics(restart=False)
401 
403 
404  def expand_regex_to_topics(self, topics):
405  expanded_topics = []
406  published_topics = [t[0] for t in rospy.get_published_topics()]
407  for pattern in topics:
408  exp = re.compile(pattern)
409  expanded_topics += filter(lambda t: exp.match(t) is not None, published_topics)
410  return expanded_topics
411 
412  def subscribe_topics(self, topics):
413 
414  # print "existing topics %s" % self.topics
415 
416  # print "subscribing to topics %s" % topics
417 
418  missing_topics = set()
419  for topic in topics:
420  if topic and topic[-1] == '/':
421  topic = topic[:-1]
422 
423  if topic in self.topics: continue
424  if topic in self.exclude_already: continue
425 
426  do_continue = False
427  for tre in self.exclude_regex:
428  if tre.match(topic):
429  print("*** IGNORING topic %s due to exclusion rule" % topic)
430  do_continue = True
431  self.exclude_already.append(topic)
432  break
433  if do_continue: continue
434 
435  # although the collections is not strictly necessary, since MongoDB could handle
436  # pure topic names as collection names and we could then use mongodb[topic], we want
437  # to have names that go easier with the query tools, even though there is the theoretical
438  # possibility of name clashes (hence the check)
439  if self.mongodb_collection:
440  collname = self.mongodb_collection
441  else:
442  collname = mongodb_store.util.topic_name_to_collection_name(topic)
443  if collname in self.collnames:
444  print("Two converted topic names clash: %s, ignoring topic %s"
445  % (collname, topic))
446  continue
447  try:
448  print("Adding topic %s" % topic)
449  w = self.create_worker(len(self.workers), topic, collname)
450  self.workers.append(w)
451  self.collnames |= set([collname])
452  self.topics |= set([topic])
453  except Exception as e:
454  print('Failed to subscribe to %s due to %s' % (topic, e))
455  missing_topics.add(topic)
456 
457  return missing_topics
458 
459 
460  def create_worker(self, idnum, topic, collname):
461  try:
462  msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=False)
463  except Exception as e:
464  print('Topic %s not announced, cannot get type: %s' % (topic, e))
465  raise
466 
467  if real_topic is None:
468  raise rostopic.ROSTopicException('topic type was empty, probably not announced')
469 
470  w = None
471  node_path = None
472 
473  if not self.no_specific and msg_class == tfMessage:
474  print("DETECTED transform topic %s, using fast C++ logger" % topic)
475  node_path = find_node(PACKAGE_NAME, "mongodb_log_tf")
476  if not node_path:
477  print("FAILED to detect mongodb_log_tf, falling back to generic logger (did not build package?)")
478  elif not self.no_specific and msg_class == PointCloud:
479  print("DETECTED point cloud topic %s, using fast C++ logger" % topic)
480  node_path = find_node(PACKAGE_NAME, "mongodb_log_pcl")
481  if not node_path:
482  print("FAILED to detect mongodb_log_pcl, falling back to generic logger (did not build package?)")
483  elif not self.no_specific and msg_class == CompressedImage:
484  print("DETECTED compressed image topic %s, using fast C++ logger" % topic)
485  node_path = find_node(PACKAGE_NAME, "mongodb_log_cimg")
486  if not node_path:
487  print("FAILED to detect mongodb_log_cimg, falling back to generic logger (did not build package?)")
488  """
489  elif msg_class == TriangleMesh:
490  print("DETECTED triangle mesh topic %s, using fast C++ logger" % topic)
491  node_path = find_node(PACKAGE_NAME, "mongodb_log_trimesh")
492  if not node_path:
493  print("FAILED to detect mongodb_log_trimesh, falling back to generic logger (did not build package?)")
494  """
495 
496 
497 
498  if node_path:
499  w = SubprocessWorker(idnum, topic, collname,
500  self.in_counter.count, self.out_counter.count,
501  self.drop_counter.count, QUEUE_MAXSIZE,
502  self.mongodb_host, self.mongodb_port, self.mongodb_name,
503  self.nodename_prefix, node_path)
504 
505  if not w:
506  print("GENERIC Python logger used for topic %s" % topic)
507  w = WorkerProcess(idnum, topic, collname,
508  self.in_counter.count, self.out_counter.count,
509  self.drop_counter.count, QUEUE_MAXSIZE,
510  self.mongodb_host, self.mongodb_port, self.mongodb_name,
511  self.nodename_prefix)
512 
513  return w
514 
515 
516  def run(self):
517  looping_threshold = timedelta(0, STATS_LOOPTIME, 0)
518 
519  while not self.quit:
520  started = datetime.now()
521 
522  # the following code makes sure we run once per STATS_LOOPTIME, taking
523  # varying run-times and interrupted sleeps into account
524  td = datetime.now() - started
525  while not self.quit and td < looping_threshold:
526  sleeptime = STATS_LOOPTIME - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
527  if sleeptime > 0: sleep(sleeptime)
528  td = datetime.now() - started
529 
530 
531  def shutdown(self):
532  self.quit = True
533  if hasattr(self, "all_topics_timer"): self.all_topics_timer.cancel()
534  for w in self.workers:
535  #print("Shutdown %s" % name)
536  w.shutdown()
537 
538 
540  if not self.all_topics or self.quit: return
542  self.all_topics_timer.start()
543 
545  if len(self.missing_topics) == 0 or self.quit: return
547  self.fill_in_topics_timer.start()
548 
549 
550 
551  def update_topics(self, restart=True):
552  """
553  Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (e.g. --all-topics flag is given).
554  """
555  if not self.all_topics or self.quit: return
556  ts = rospy.get_published_topics()
557  topics = set([t for t, t_type in ts if t != "/rosout" and t != "/rosout_agg"])
558  new_topics = topics - self.topics
559  self.subscribe_topics(new_topics)
560  if restart: self.start_all_topics_timer()
561 
562  def fill_in_topics(self, restart=True):
563  """
564  Called at a fixed interval (see start_all_topics_timer) to update the list of topics if we are logging all topics (e.g. --all-topics flag is given).
565  """
566  if len(self.missing_topics) == 0 or self.quit: return
568  if restart: self.start_fill_in_topics_timer()
569 
570 
571  def get_memory_usage_for_pid(self, pid):
572 
573  scale = {'kB': 1024, 'mB': 1024 * 1024,
574  'KB': 1024, 'MB': 1024 * 1024}
575  try:
576  f = open("/proc/%d/status" % pid)
577  t = f.read()
578  f.close()
579  except:
580  return (0, 0, 0)
581 
582  if t == "": return (0, 0, 0)
583 
584  try:
585  tmp = t[t.index("VmSize:"):].split(None, 3)
586  size = int(tmp[1]) * scale[tmp[2]]
587  tmp = t[t.index("VmRSS:"):].split(None, 3)
588  rss = int(tmp[1]) * scale[tmp[2]]
589  tmp = t[t.index("VmStk:"):].split(None, 3)
590  stack = int(tmp[1]) * scale[tmp[2]]
591  return (size, rss, stack)
592  except ValueError:
593  return (0, 0, 0)
594 
595  def get_memory_usage(self):
596  size, rss, stack = 0, 0, 0
597  for w in self.workers:
598  pmem = self.get_memory_usage_for_pid(w.process.pid)
599  size += pmem[0]
600  rss += pmem[1]
601  stack += pmem[2]
602  #print("Size: %d RSS: %s Stack: %s" % (size, rss, stack))
603  return (size, rss, stack)
604 
605 
606 def main(argv):
607  parser = OptionParser()
608  parser.usage += " [TOPICs...]"
609  parser.add_option("--nodename-prefix", dest="nodename_prefix",
610  help="Prefix for worker node names", metavar="ROS_NODE_NAME",
611  default="")
612  parser.add_option("--mongodb-host", dest="mongodb_host",
613  help="Hostname of MongoDB", metavar="HOST",
614  default=rospy.get_param("mongodb_host", "localhost"))
615  parser.add_option("--mongodb-port", dest="mongodb_port",
616  help="Hostname of MongoDB", type="int",
617  metavar="PORT", default=rospy.get_param("mongodb_port", 27017))
618  parser.add_option("--mongodb-name", dest="mongodb_name",
619  help="Name of DB in which to store values",
620  metavar="NAME", default="roslog")
621  parser.add_option("--mongodb-collection", dest="mongodb_collection",
622  help="Name of Collection in which to store values. All topics are stored in the collection if used this option, otherwise topic names are used as collections",
623  metavar="COLLECTION", default=None)
624  parser.add_option("-a", "--all-topics", dest="all_topics", default=False,
625  action="store_true",
626  help="Log all existing topics (still excludes /rosout, /rosout_agg)")
627  parser.add_option("-e", "--regex", dest="treat_as_regex", default=False,
628  help="Log topics matching the follow regular expression",
629  action="store_true")
630  parser.add_option("--all-topics-interval", dest="all_topics_interval", default=5,
631  help="Time in seconds between checks for new topics", type="int")
632  parser.add_option("-x", "--exclude", dest="exclude",
633  help="Exclude topics matching REGEX, may be given multiple times",
634  action="append", type="string", metavar="REGEX", default=[])
635  parser.add_option("--no-specific", dest="no_specific", default=False,
636  action="store_true", help="Disable specific loggers")
637 
638  (options, args) = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])
639 
640  if not options.all_topics and len(args) == 0:
641  parser.print_help()
642  return
643 
644  try:
645  rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % options.nodename_prefix).getPid()
646  except socket.error:
647  print("Failed to communicate with master")
648 
649  mongowriter = MongoWriter(topics=args,
650  treat_as_regex=options.treat_as_regex,
651  all_topics=options.all_topics,
652  all_topics_interval = options.all_topics_interval,
653  exclude_topics = options.exclude,
654  mongodb_host=options.mongodb_host,
655  mongodb_port=options.mongodb_port,
656  mongodb_name=options.mongodb_name,
657  mongodb_collection=options.mongodb_collection,
658  no_specific=options.no_specific,
659  nodename_prefix=options.nodename_prefix)
660 
661  def signal_handler(signal, frame):
662  mongowriter.shutdown()
663 
664  signal.signal(signal.SIGTERM, signal_handler)
665  signal.signal(signal.SIGINT, signal_handler)
666 
667  mongowriter.run()
668 
669 if __name__ == "__main__":
670  main(sys.argv)
mongodb_log.WorkerProcess.collname
collname
Definition: mongodb_log.py:133
mongodb_log.MongoWriter.collnames
collnames
Definition: mongodb_log.py:369
mongodb_log.WorkerProcess.run
def run(self)
Definition: mongodb_log.py:191
mongodb_log.Barrier.__init__
def __init__(self, num_threads)
Definition: mongodb_log.py:110
mongodb_log.SubprocessWorker.mongodb_port
mongodb_port
Definition: mongodb_log.py:313
mongodb_log.SubprocessWorker.in_counter
in_counter
Definition: mongodb_log.py:307
mongodb_log.WorkerProcess.topic
topic
Definition: mongodb_log.py:132
mongodb_log.Counter.increment
def increment(self, by=1)
Definition: mongodb_log.py:103
mongodb_log.SubprocessWorker.mongodb_host
mongodb_host
Definition: mongodb_log.py:312
mongodb_log.MongoWriter.create_worker
def create_worker(self, idnum, topic, collname)
Definition: mongodb_log.py:460
mongodb_log.Barrier.mutex
mutex
Definition: mongodb_log.py:113
mongodb_log.MongoWriter
Definition: mongodb_log.py:356
mongodb_log.SubprocessWorker.mongodb_name
mongodb_name
Definition: mongodb_log.py:314
mongodb_log.SubprocessWorker.worker_in_counter
worker_in_counter
Definition: mongodb_log.py:310
mongodb_log.MongoWriter.get_memory_usage_for_pid
def get_memory_usage_for_pid(self, pid)
Definition: mongodb_log.py:571
mongodb_log.Barrier.threads_left
threads_left
Definition: mongodb_log.py:112
mongodb_log.SubprocessWorker.drop_counter
drop_counter
Definition: mongodb_log.py:308
mongodb_log.WorkerProcess.queue
queue
Definition: mongodb_log.py:134
mongodb_log.WorkerProcess.dequeue
def dequeue(self)
Definition: mongodb_log.py:236
mongodb_log.SubprocessWorker.quit
quit
Definition: mongodb_log.py:316
mongodb_log.MongoWriter.fill_in_topics
def fill_in_topics(self, restart=True)
Definition: mongodb_log.py:562
mongodb_log.Barrier.wait
def wait(self)
Definition: mongodb_log.py:116
mongodb_log.SubprocessWorker
Definition: mongodb_log.py:298
mongodb_log.WorkerProcess.nodename_prefix
nodename_prefix
Definition: mongodb_log.py:144
mongodb_log.SubprocessWorker.out_counter
out_counter
Definition: mongodb_log.py:306
mongodb_log.MongoWriter.in_counter
in_counter
Definition: mongodb_log.py:372
mongodb_log.SubprocessWorker.nodename_prefix
nodename_prefix
Definition: mongodb_log.py:315
mongodb_log.WorkerProcess.worker_out_counter
worker_out_counter
Definition: mongodb_log.py:138
mongodb_log.WorkerProcess.enqueue
def enqueue(self, data, topic, current_time=None)
Definition: mongodb_log.py:221
mongodb_log.WorkerProcess.quit
quit
Definition: mongodb_log.py:145
mongodb_log.SubprocessWorker.process
process
Definition: mongodb_log.py:325
mongodb_log.MongoWriter.run
def run(self)
Definition: mongodb_log.py:516
mongodb_log.WorkerProcess.worker_drop_counter
worker_drop_counter
Definition: mongodb_log.py:140
mongodb_log.MongoWriter.workers
workers
Definition: mongodb_log.py:375
mongodb_log.Counter.mutex
mutex
Definition: mongodb_log.py:101
mongodb_log.WorkerProcess.name
name
Definition: mongodb_log.py:130
mongodb_log.MongoWriter.missing_topics
missing_topics
Definition: mongodb_log.py:389
mongodb_log.MongoWriter.subscribe_topics
def subscribe_topics(self, topics)
Definition: mongodb_log.py:412
mongodb_log.Counter
Definition: mongodb_log.py:98
mongodb_log.MongoWriter.exclude_already
exclude_already
Definition: mongodb_log.py:384
mongodb_log.WorkerProcess.mongodb_port
mongodb_port
Definition: mongodb_log.py:142
mongodb_log.WorkerProcess.shutdown
def shutdown(self)
Definition: mongodb_log.py:205
mongodb_log.Barrier.num_threads
num_threads
Definition: mongodb_log.py:111
mongodb_log.MongoWriter.update_topics
def update_topics(self, restart=True)
Definition: mongodb_log.py:551
mongodb_log.WorkerProcess.in_counter
in_counter
Definition: mongodb_log.py:136
mongodb_log.MongoWriter.ros_master
ros_master
Definition: mongodb_log.py:395
mongodb_log.SubprocessWorker.worker_out_counter
worker_out_counter
Definition: mongodb_log.py:309
mongodb_log.MongoWriter.start_all_topics_timer
def start_all_topics_timer(self)
Definition: mongodb_log.py:539
mongodb_log.Counter.__init__
def __init__(self, value=None, lock=True)
Definition: mongodb_log.py:99
mongodb_log.MongoWriter.exclude_topics
exclude_topics
Definition: mongodb_log.py:360
mongodb_log.WorkerProcess
Definition: mongodb_log.py:128
mongodb_log.WorkerProcess.id
id
Definition: mongodb_log.py:131
mongodb_log.MongoWriter.__init__
def __init__(self, topics=[], treat_as_regex=False, all_topics=False, all_topics_interval=5, exclude_topics=[], mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None, no_specific=False, nodename_prefix="")
Definition: mongodb_log.py:357
mongodb_log.MongoClient
MongoClient
Definition: mongodb_log.py:92
mongodb_log.SubprocessWorker.__init__
def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value, drop_counter_value, queue_maxsize, mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger)
Definition: mongodb_log.py:299
mongodb_log.WorkerProcess.mongodb_name
mongodb_name
Definition: mongodb_log.py:143
mongodb_log.WorkerProcess.mongodb_host
mongodb_host
Definition: mongodb_log.py:141
mongodb_log.MongoWriter.topics
topics
Definition: mongodb_log.py:368
mongodb_log.SubprocessWorker.queue
queue
Definition: mongodb_log.py:305
mongodb_log.WorkerProcess.qsize
def qsize(self)
Definition: mongodb_log.py:218
mongodb_log.Counter.value
def value(self)
Definition: mongodb_log.py:106
mongodb_log.MongoWriter.mongodb_port
mongodb_port
Definition: mongodb_log.py:362
mongodb_log.MongoWriter.out_counter
out_counter
Definition: mongodb_log.py:373
mongodb_log.MongoWriter.mongodb_collection
mongodb_collection
Definition: mongodb_log.py:364
mongodb_log.SubprocessWorker.name
name
Definition: mongodb_log.py:301
mongodb_log.Counter.count
count
Definition: mongodb_log.py:100
mongodb_log.MongoWriter.all_topics_interval
all_topics_interval
Definition: mongodb_log.py:359
mongodb_log.WorkerProcess.__init__
def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value, drop_counter_value, queue_maxsize, mongodb_host, mongodb_port, mongodb_name, nodename_prefix)
Definition: mongodb_log.py:129
mongodb_log.MongoWriter.mongodb_host
mongodb_host
Definition: mongodb_log.py:361
mongodb_log.MongoWriter.drop_counter
drop_counter
Definition: mongodb_log.py:374
mongodb_log.Barrier
Definition: mongodb_log.py:109
mongodb_log.SubprocessWorker.shutdown
def shutdown(self)
Definition: mongodb_log.py:350
mongodb_log.MongoWriter.exclude_regex
exclude_regex
Definition: mongodb_log.py:381
mongodb_log.WorkerProcess.out_counter
out_counter
Definition: mongodb_log.py:135
mongodb_log.WorkerProcess.worker_in_counter
worker_in_counter
Definition: mongodb_log.py:139
mongodb_log.MongoWriter.quit
quit
Definition: mongodb_log.py:367
mongodb_log.SubprocessWorker.worker_drop_counter
worker_drop_counter
Definition: mongodb_log.py:311
mongodb_log.SubprocessWorker.id
id
Definition: mongodb_log.py:302
mongodb_log.MongoWriter.get_memory_usage
def get_memory_usage(self)
Definition: mongodb_log.py:595
mongodb_log.WorkerProcess.collection
collection
Definition: mongodb_log.py:165
mongodb_log.MongoWriter.mongodb_name
mongodb_name
Definition: mongodb_log.py:363
mongodb_log.MongoWriter.shutdown
def shutdown(self)
Definition: mongodb_log.py:531
mongodb_log.SubprocessWorker.collname
collname
Definition: mongodb_log.py:304
mongodb_log.SubprocessWorker.thread
thread
Definition: mongodb_log.py:319
mongodb_log.WorkerProcess.init
def init(self)
Definition: mongodb_log.py:156
mongodb_log.SubprocessWorker.qsize
qsize
Definition: mongodb_log.py:317
mongodb_log.MongoWriter.all_topics_timer
all_topics_timer
Definition: mongodb_log.py:541
mongodb_log.MongoWriter.expand_regex_to_topics
def expand_regex_to_topics(self, topics)
Definition: mongodb_log.py:404
mongodb_log.WorkerProcess.drop_counter
drop_counter
Definition: mongodb_log.py:137
mongodb_log.WorkerProcess.process
process
Definition: mongodb_log.py:148
mongodb_log.SubprocessWorker.run
def run(self)
Definition: mongodb_log.py:336
mongodb_log.Barrier.waitcond
waitcond
Definition: mongodb_log.py:114
mongodb_log.MongoWriter.all_topics
all_topics
Definition: mongodb_log.py:358
mongodb_log.SubprocessWorker.topic
topic
Definition: mongodb_log.py:303
mongodb_log.MongoWriter.sep
sep
Definition: mongodb_log.py:371
mongodb_log.WorkerProcess.subscriber
subscriber
Definition: mongodb_log.py:178
mongodb_log.WorkerProcess.mongoconn
mongoconn
Definition: mongodb_log.py:161
mongodb_log.MongoWriter.start_fill_in_topics_timer
def start_fill_in_topics_timer(self)
Definition: mongodb_log.py:544
mongodb_log.main
def main(argv)
Definition: mongodb_log.py:606
mongodb_log.MongoWriter.no_specific
no_specific
Definition: mongodb_log.py:365
mongodb_log.MongoWriter.nodename_prefix
nodename_prefix
Definition: mongodb_log.py:366
mongodb_log.WorkerProcess.mongodb
mongodb
Definition: mongodb_log.py:162
mongodb_log.WorkerProcess.is_quit
def is_quit(self)
Definition: mongodb_log.py:202
mongodb_log.MongoWriter.fill_in_topics_timer
fill_in_topics_timer
Definition: mongodb_log.py:546


mongodb_log
Author(s): Tim Niemueller
autogenerated on Tue Feb 28 2023 03:27:44