mongodb_log.py
#!/usr/bin/python

###########################################################################
#  mongodb_log.py - Python based ROS to MongoDB logger (multi-process)
#
#  Created: Sun Dec 05 19:45:51 2010
#  Copyright  2010-2012  Tim Niemueller [www.niemueller.de]
#             2010-2011  Carnegie Mellon University
#             2010       Intel Labs Pittsburgh
###########################################################################

#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Library General Public License for more details.
#
#  Read the full text in the LICENSE.GPL file in the doc directory.

# make sure we aren't using floor division
from __future__ import division, with_statement

PACKAGE_NAME='mongodb_log'
NODE_NAME='mongodb_log'
NODE_NAME_TEMPLATE='%smongodb_log'
WORKER_NODE_NAME = "%smongodb_log_worker_%d_%s"
QUEUE_MAXSIZE = 100

# import roslib; roslib.load_manifest(PACKAGE_NAME)
import rospy

# for msg_to_document
import mongodb_store.util

import os
import re
import sys
import socket  # needed for socket.error handling in main()
import time
import pprint
import string
import signal
import subprocess
from threading import Thread, Timer

from Queue import Empty
from optparse import OptionParser
from tempfile import mktemp
from datetime import datetime, timedelta
from time import sleep
from random import randint
from tf.msg import tfMessage
from sensor_msgs.msg import PointCloud, CompressedImage
from roslib.packages import find_node
#from rviz_intel.msg import TriangleMesh

use_setproctitle = True
try:
    from setproctitle import setproctitle
except ImportError:
    use_setproctitle = False


use_processes = False

# if use_processes:
from multiprocessing import Process, Lock, Condition, Queue, Value, current_process, Event
import multiprocessing as mp
# else:
#     from threading import Lock, Condition, Event
#     from Queue import Queue
#     def Value(t, val, lock=None):
#         return val


import genpy
import rosgraph.masterapi
import roslib.message
#from rospy import Time, Duration
import rostopic


from pymongo import SLOW_ONLY
from pymongo.errors import InvalidDocument, InvalidStringData

MongoClient = mongodb_store.util.import_MongoClient()

BACKLOG_WARN_LIMIT = 100
STATS_LOOPTIME = 10
STATS_GRAPHTIME = 60

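# Counter: a small helper wrapping a multiprocessing.Value plus a Lock so
# message counts can be shared and safely incremented across processes.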
class Counter(object):
    def __init__(self, value = None, lock = True):
        self.count = value or Value('i', 0, lock=lock)
        self.mutex = Lock()

    def increment(self, by = 1):
        with self.mutex: self.count.value += by

    def value(self):
        with self.mutex: return self.count.value

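# Barrier: simple reusable barrier built on a shared value and a condition
# variable. Kept from the original code base; it is not used elsewhere in
# this file.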
class Barrier(object):
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.threads_left = Value('i', num_threads, lock=True)
        self.mutex = Lock()
        self.waitcond = Condition(self.mutex)

    def wait(self):
        self.mutex.acquire()
        self.threads_left.value -= 1
        if self.threads_left.value == 0:
            self.threads_left.value = self.num_threads
            self.waitcond.notify_all()
            self.mutex.release()
        else:
            self.waitcond.wait()
            self.mutex.release()

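# WorkerProcess: runs the generic Python logger for a single topic in its own
# process. It subscribes to the topic, pushes incoming messages into a bounded
# multiprocessing queue and stores them in MongoDB via
# mongodb_store.util.store_message().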
class WorkerProcess(object):
    def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
                 drop_counter_value, queue_maxsize,
                 mongodb_host, mongodb_port, mongodb_name, nodename_prefix):
        self.name = "WorkerProcess-%4d-%s" % (idnum, topic)
        self.id = idnum
        self.topic = topic
        self.collname = collname
        self.queue = Queue(queue_maxsize)
        self.out_counter = Counter(out_counter_value)
        self.in_counter = Counter(in_counter_value)
        self.drop_counter = Counter(drop_counter_value)
        self.worker_out_counter = Counter()
        self.worker_in_counter = Counter()
        self.worker_drop_counter = Counter()
        self.mongodb_host = mongodb_host
        self.mongodb_port = mongodb_port
        self.mongodb_name = mongodb_name
        self.nodename_prefix = nodename_prefix
        self.quit = Value('i', 0)

        # print "Creating process %s" % self.name
        self.process = Process(name=self.name, target=self.run)
        # self.process = Thread(name=self.name, target=self.run)
        # print "created %s" % self.process
        self.process.start()
        # print "started %s" % self.process

    def init(self):
        global use_setproctitle
        if use_setproctitle:
            setproctitle("mongodb_log %s" % self.topic)

        self.mongoconn = MongoClient(self.mongodb_host, self.mongodb_port)
        self.mongodb = self.mongoconn[self.mongodb_name]
        self.mongodb.set_profiling_level(SLOW_ONLY)

        self.collection = self.mongodb[self.collname]
        self.collection.count()

        self.queue.cancel_join_thread()

        # clear signal handlers in this child process, rospy will handle signals for us
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

        worker_node_name = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)
        # print "Calling init_node with %s from process %s" % (worker_node_name, mp.current_process())
        rospy.init_node(worker_node_name, anonymous=False)

        self.subscriber = None
        while not self.subscriber and not self.is_quit():
            try:
                msg_class, real_topic, msg_eval = rostopic.get_topic_class(self.topic, blocking=True)
                self.subscriber = rospy.Subscriber(real_topic, msg_class, self.enqueue, self.topic)
            except rostopic.ROSTopicIOException:
                print("FAILED to subscribe, will keep trying %s" % self.name)
                time.sleep(randint(1,10))
            except rospy.ROSInitException:
                print("FAILED to initialize, will keep trying %s" % self.name)
                time.sleep(randint(1,10))
                self.subscriber = None

    def run(self):
        self.init()

        print("ACTIVE: %s" % self.name)

        # run the thread
        self.dequeue()

        # free connection
        # self.mongoconn.end_request()

    def is_quit(self):
        return self.quit.value == 1

    def shutdown(self):
        if not self.is_quit():
            #print("SHUTDOWN %s qsize %d" % (self.name, self.queue.qsize()))
            self.quit.value = 1
            self.queue.put("shutdown")
            while not self.queue.empty(): sleep(0.1)
        #print("JOIN %s qsize %d" % (self.name, self.queue.qsize()))
        self.process.join()
        self.process.terminate()

    def qsize(self):
        return self.queue.qsize()

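    # Subscriber callback: push the message onto the bounded queue. If the
    # queue is full, the oldest entry is dropped first, so the logger prefers
    # recent data over completeness when it falls behind.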
    def enqueue(self, data, topic, current_time=None):
        if not self.is_quit():
            if self.queue.full():
                try:
                    self.queue.get_nowait()
                    self.drop_counter.increment()
                    self.worker_drop_counter.increment()
                except Empty:
                    pass
            #self.queue.put((topic, data, current_time or datetime.now()))
            self.queue.put((topic, data, rospy.get_time(), data._connection_header))
            self.in_counter.increment()
            self.worker_in_counter.increment()

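    # Worker main loop: take (topic, msg, time, connection header) tuples off
    # the queue, build the metadata document (topic, latch flag, inserted_at)
    # and store the message in the MongoDB collection. Any non-tuple entry
    # (the "shutdown" marker) ends the loop.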
    def dequeue(self):
        while not self.is_quit():
            t = None
            try:
                t = self.queue.get(True)
            except IOError:
                # Anticipate Ctrl-C
                #print("Quit W1: %s" % self.name)
                self.quit.value = 1
                break
            if isinstance(t, tuple):
                self.out_counter.increment()
                self.worker_out_counter.increment()
                topic = t[0]
                msg = t[1]
                ctime = t[2]
                connection_header = t[3]

                if isinstance(msg, rospy.Message):
                    try:
                        #print(self.sep + threading.current_thread().getName() + "@" + topic+": ")
                        #pprint.pprint(doc)
                        meta = {}
                        # switched to use inserted_at to match message_store
                        # meta["recorded"] = ctime or datetime.now()
                        meta["topic"] = topic

                        if connection_header['latching'] == '1':
                            meta['latch'] = True
                        else:
                            meta['latch'] = False

                        if ctime is not None:
                            meta['inserted_at'] = datetime.utcfromtimestamp(ctime)
                        else:
                            meta['inserted_at'] = datetime.utcfromtimestamp(rospy.get_rostime().to_sec())

                        mongodb_store.util.store_message(self.collection, msg, meta)

                    except InvalidDocument, e:
                        print("InvalidDocument " + current_process().name + "@" + topic + ": \n")
                        print e
                    except InvalidStringData, e:
                        print("InvalidStringData " + current_process().name + "@" + topic + ": \n")
                        print e

            else:
                #print("Quit W2: %s" % self.name)
                self.quit.value = 1

        # we must make sure to clear the queue before exiting,
        # or the parent thread might deadlock otherwise
        #print("Quit W3: %s" % self.name)
        self.subscriber.unregister()
        self.subscriber = None
        while not self.queue.empty():
            t = self.queue.get_nowait()
        print("STOPPED: %s" % self.name)


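# SubprocessWorker: wraps one of the specialized C++ loggers (e.g.
# mongodb_log_tf) as a child process and parses the statistics it prints to
# stdout so the main process can aggregate in/out/drop counts for it as well.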
class SubprocessWorker(object):
    def __init__(self, idnum, topic, collname, in_counter_value, out_counter_value,
                 drop_counter_value, queue_maxsize,
                 mongodb_host, mongodb_port, mongodb_name, nodename_prefix, cpp_logger):
        self.name = "SubprocessWorker-%4d-%s" % (idnum, topic)
        self.id = idnum
        self.topic = topic
        self.collname = collname
        self.queue = Queue(queue_maxsize)
        self.out_counter = Counter(out_counter_value)
        self.in_counter = Counter(in_counter_value)
        self.drop_counter = Counter(drop_counter_value)
        self.worker_out_counter = Counter()
        self.worker_in_counter = Counter()
        self.worker_drop_counter = Counter()
        self.mongodb_host = mongodb_host
        self.mongodb_port = mongodb_port
        self.mongodb_name = mongodb_name
        self.nodename_prefix = nodename_prefix
        self.quit = False
        self.qsize = 0

        self.thread = Thread(name=self.name, target=self.run)

        mongodb_host_port = "%s:%d" % (mongodb_host, mongodb_port)
        collection = "%s.%s" % (mongodb_name, collname)
        nodename = WORKER_NODE_NAME % (self.nodename_prefix, self.id, self.collname)

        self.process = subprocess.Popen([cpp_logger[0], "-t", topic, "-n", nodename,
                                         "-m", mongodb_host_port, "-c", collection],
                                        stdout=subprocess.PIPE)

        self.thread.start()

    def qsize(self):
        return self.qsize

    def run(self):
        while not self.quit:
            line = self.process.stdout.readline().rstrip()
            if line == "": continue
            arr = string.split(line, ":")
            self.in_counter.increment(int(arr[0]))
            self.out_counter.increment(int(arr[1]))
            self.drop_counter.increment(int(arr[2]))
            self.qsize = int(arr[3])

            self.worker_in_counter.increment(int(arr[0]))
            self.worker_out_counter.increment(int(arr[1]))
            self.worker_drop_counter.increment(int(arr[2]))

    def shutdown(self):
        self.quit = True
        self.process.kill()
        self.process.wait()


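# MongoWriter: the coordinator run in the main process. It resolves the
# requested topics, creates one worker per topic (a C++ subprocess where a
# specialized logger exists, the generic Python WorkerProcess otherwise),
# periodically re-checks for new or missing topics and keeps aggregate
# message statistics.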
class MongoWriter(object):
    def __init__(self, topics = [], treat_as_regex=False,
                 all_topics = False, all_topics_interval = 5,
                 exclude_topics = [],
                 mongodb_host=None, mongodb_port=None, mongodb_name="roslog", mongodb_collection=None,
                 no_specific=False, nodename_prefix=""):
        self.all_topics = all_topics
        self.all_topics_interval = all_topics_interval
        self.exclude_topics = exclude_topics
        self.mongodb_host = mongodb_host
        self.mongodb_port = mongodb_port
        self.mongodb_name = mongodb_name
        self.mongodb_collection = mongodb_collection
        self.no_specific = no_specific
        self.nodename_prefix = nodename_prefix
        self.quit = False
        self.topics = set()
        self.collnames = set()
        #self.str_fn = roslib.message.strify_message
        self.sep = "\n" #'\033[2J\033[;H'
        self.in_counter = Counter()
        self.out_counter = Counter()
        self.drop_counter = Counter()
        self.workers = []

        global use_setproctitle
        if use_setproctitle:
            setproctitle("mongodb_log MAIN")

        self.exclude_regex = []
        for et in self.exclude_topics:
            self.exclude_regex.append(re.compile(et))
        self.exclude_already = []

        if treat_as_regex:
            topics = self.expand_regex_to_topics(topics)

        self.missing_topics = self.subscribe_topics(set(topics))
        self.fill_in_topics()

        if self.all_topics:
            print("All topics")
            self.ros_master = rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % self.nodename_prefix)
            self.update_topics(restart=False)

        self.start_all_topics_timer()

    def expand_regex_to_topics(self, topics):
        expanded_topics = []
        published_topics = [t[0] for t in rospy.get_published_topics()]
        for pattern in topics:
            exp = re.compile(pattern)
            expanded_topics += filter(lambda t: exp.match(t) is not None, published_topics)
        return expanded_topics

    def subscribe_topics(self, topics):

        # print "existing topics %s" % self.topics
        # print "subscribing to topics %s" % topics

        missing_topics = set()
        for topic in topics:
            if topic and topic[-1] == '/':
                topic = topic[:-1]

            if topic in self.topics: continue
            if topic in self.exclude_already: continue

            do_continue = False
            for tre in self.exclude_regex:
                if tre.match(topic):
                    print("*** IGNORING topic %s due to exclusion rule" % topic)
                    do_continue = True
                    self.exclude_already.append(topic)
                    break
            if do_continue: continue

            # Converting the topic name is not strictly necessary, since MongoDB could
            # handle plain topic names as collection names and we could then use
            # mongodb[topic]. We still convert to names that work better with the query
            # tools, even though this introduces a theoretical possibility of name
            # clashes (hence the check below).
            if self.mongodb_collection:
                collname = self.mongodb_collection
            else:
                collname = mongodb_store.util.topic_name_to_collection_name(topic)
                if collname in self.collnames:
                    print("Two converted topic names clash: %s, ignoring topic %s"
                          % (collname, topic))
                    continue
            try:
                print("Adding topic %s" % topic)
                w = self.create_worker(len(self.workers), topic, collname)
                self.workers.append(w)
                self.collnames |= set([collname])
                self.topics |= set([topic])
            except Exception, e:
                print('Failed to subscribe to %s due to %s' % (topic, e))
                missing_topics.add(topic)

        return missing_topics

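    # Pick the best logger for a topic: use the specialized C++ logger for
    # tf, PointCloud and CompressedImage topics when the corresponding node
    # was built, otherwise fall back to the generic Python WorkerProcess.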
    def create_worker(self, idnum, topic, collname):
        try:
            msg_class, real_topic, msg_eval = rostopic.get_topic_class(topic, blocking=False)
        except Exception, e:
            print('Topic %s not announced, cannot get type: %s' % (topic, e))
            raise

        if real_topic is None:
            raise rostopic.ROSTopicException('topic type was empty, probably not announced')

        w = None
        node_path = None

        if not self.no_specific and msg_class == tfMessage:
            print("DETECTED transform topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_tf")
            if not node_path:
                print("FAILED to detect mongodb_log_tf, falling back to generic logger (did not build package?)")
        elif not self.no_specific and msg_class == PointCloud:
            print("DETECTED point cloud topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_pcl")
            if not node_path:
                print("FAILED to detect mongodb_log_pcl, falling back to generic logger (did not build package?)")
        elif not self.no_specific and msg_class == CompressedImage:
            print("DETECTED compressed image topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_cimg")
            if not node_path:
                print("FAILED to detect mongodb_log_cimg, falling back to generic logger (did not build package?)")
        """
        elif msg_class == TriangleMesh:
            print("DETECTED triangle mesh topic %s, using fast C++ logger" % topic)
            node_path = find_node(PACKAGE_NAME, "mongodb_log_trimesh")
            if not node_path:
                print("FAILED to detect mongodb_log_trimesh, falling back to generic logger (did not build package?)")
        """

        if node_path:
            w = SubprocessWorker(idnum, topic, collname,
                                 self.in_counter.count, self.out_counter.count,
                                 self.drop_counter.count, QUEUE_MAXSIZE,
                                 self.mongodb_host, self.mongodb_port, self.mongodb_name,
                                 self.nodename_prefix, node_path)

        if not w:
            print("GENERIC Python logger used for topic %s" % topic)
            w = WorkerProcess(idnum, topic, collname,
                              self.in_counter.count, self.out_counter.count,
                              self.drop_counter.count, QUEUE_MAXSIZE,
                              self.mongodb_host, self.mongodb_port, self.mongodb_name,
                              self.nodename_prefix)

        return w

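    # Main-process loop: wake up roughly once per STATS_LOOPTIME seconds
    # until shutdown() is called, compensating for interrupted sleeps and
    # for the time spent in the loop body itself.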
    def run(self):
        looping_threshold = timedelta(0, STATS_LOOPTIME, 0)

        while not self.quit:
            started = datetime.now()

            # the following code makes sure we run once per STATS_LOOPTIME, taking
            # varying run-times and interrupted sleeps into account
            td = datetime.now() - started
            while not self.quit and td < looping_threshold:
                sleeptime = STATS_LOOPTIME - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
                if sleeptime > 0: sleep(sleeptime)
                td = datetime.now() - started

    def shutdown(self):
        self.quit = True
        if hasattr(self, "all_topics_timer"): self.all_topics_timer.cancel()
        for w in self.workers:
            #print("Shutdown %s" % name)
            w.shutdown()

    def start_all_topics_timer(self):
        if not self.all_topics or self.quit: return
        self.all_topics_timer = Timer(self.all_topics_interval, self.update_topics)
        self.all_topics_timer.start()

    def start_fill_in_topics_timer(self):
        if len(self.missing_topics) == 0 or self.quit: return
        self.fill_in_topics_timer = Timer(self.all_topics_interval, self.fill_in_topics)
        self.fill_in_topics_timer.start()

    def update_topics(self, restart=True):
        """
        Called at a fixed interval (see start_all_topics_timer) to update the list
        of topics if we are logging all topics (i.e. the --all-topics flag is given).
        """
        if not self.all_topics or self.quit: return
        ts = rospy.get_published_topics()
        topics = set([t for t, t_type in ts if t != "/rosout" and t != "/rosout_agg"])
        new_topics = topics - self.topics
        self.subscribe_topics(new_topics)
        if restart: self.start_all_topics_timer()

    def fill_in_topics(self, restart=True):
        """
        Called at a fixed interval (see start_fill_in_topics_timer) to retry
        subscribing to topics that could not be subscribed to so far.
        """
        if len(self.missing_topics) == 0 or self.quit: return
        self.missing_topics = self.subscribe_topics(self.missing_topics)
        if restart: self.start_fill_in_topics_timer()

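    # Read VmSize/VmRSS/VmStk for one worker PID from /proc/<pid>/status and
    # return (size, rss, stack) in bytes, or zeros if the values cannot be read.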
    def get_memory_usage_for_pid(self, pid):
        scale = {'kB': 1024, 'mB': 1024 * 1024,
                 'KB': 1024, 'MB': 1024 * 1024}
        try:
            f = open("/proc/%d/status" % pid)
            t = f.read()
            f.close()
        except:
            return (0, 0, 0)

        if t == "": return (0, 0, 0)

        try:
            tmp = t[t.index("VmSize:"):].split(None, 3)
            size = int(tmp[1]) * scale[tmp[2]]
            tmp = t[t.index("VmRSS:"):].split(None, 3)
            rss = int(tmp[1]) * scale[tmp[2]]
            tmp = t[t.index("VmStk:"):].split(None, 3)
            stack = int(tmp[1]) * scale[tmp[2]]
            return (size, rss, stack)
        except ValueError:
            return (0, 0, 0)

    def get_memory_usage(self):
        size, rss, stack = 0, 0, 0
        for w in self.workers:
            pmem = self.get_memory_usage_for_pid(w.process.pid)
            size += pmem[0]
            rss += pmem[1]
            stack += pmem[2]
        #print("Size: %d RSS: %s Stack: %s" % (size, rss, stack))
        return (size, rss, stack)


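# Command line entry point: parse options, check that the ROS master is
# reachable, create the MongoWriter and block in its run() loop until
# SIGINT/SIGTERM triggers a clean shutdown.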
def main(argv):
    parser = OptionParser()
    parser.usage += " [TOPICs...]"
    parser.add_option("--nodename-prefix", dest="nodename_prefix",
                      help="Prefix for worker node names", metavar="ROS_NODE_NAME",
                      default="")
    parser.add_option("--mongodb-host", dest="mongodb_host",
                      help="Hostname of MongoDB", metavar="HOST",
                      default=rospy.get_param("mongodb_host", "localhost"))
    parser.add_option("--mongodb-port", dest="mongodb_port",
                      help="Port of MongoDB", type="int",
                      metavar="PORT", default=rospy.get_param("mongodb_port", 27017))
    parser.add_option("--mongodb-name", dest="mongodb_name",
                      help="Name of DB in which to store values",
                      metavar="NAME", default="roslog")
    parser.add_option("--mongodb-collection", dest="mongodb_collection",
                      help="Name of the collection in which to store values. If given, "
                           "all topics are stored in this single collection; otherwise "
                           "converted topic names are used as collection names",
                      metavar="COLLECTION", default=None)
    parser.add_option("-a", "--all-topics", dest="all_topics", default=False,
                      action="store_true",
                      help="Log all existing topics (still excludes /rosout, /rosout_agg)")
    parser.add_option("-e", "--regex", dest="treat_as_regex", default=False,
                      help="Treat the given topic names as regular expressions and log all matching topics",
                      action="store_true")
    parser.add_option("--all-topics-interval", dest="all_topics_interval", default=5,
                      help="Time in seconds between checks for new topics", type="int")
    parser.add_option("-x", "--exclude", dest="exclude",
                      help="Exclude topics matching REGEX, may be given multiple times",
                      action="append", type="string", metavar="REGEX", default=[])
    parser.add_option("--no-specific", dest="no_specific", default=False,
                      action="store_true", help="Disable specific loggers")

    (options, args) = parser.parse_args(rospy.myargv(argv=sys.argv)[1:])

    if not options.all_topics and len(args) == 0:
        parser.print_help()
        return

    try:
        rosgraph.masterapi.Master(NODE_NAME_TEMPLATE % options.nodename_prefix).getPid()
    except socket.error:
        print("Failed to communicate with master")

    mongowriter = MongoWriter(topics=args,
                              treat_as_regex=options.treat_as_regex,
                              all_topics=options.all_topics,
                              all_topics_interval=options.all_topics_interval,
                              exclude_topics=options.exclude,
                              mongodb_host=options.mongodb_host,
                              mongodb_port=options.mongodb_port,
                              mongodb_name=options.mongodb_name,
                              mongodb_collection=options.mongodb_collection,
                              no_specific=options.no_specific,
                              nodename_prefix=options.nodename_prefix)

    def signal_handler(signal, frame):
        mongowriter.shutdown()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    mongowriter.run()

if __name__ == "__main__":
    main(sys.argv)