test_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import rospy
4 from std_msgs.msg import Int64, Bool
5 from threading import Thread
6 import mongodb_store_msgs.srv as dc_srv
7 import mongodb_store.util as dc_util
8 from mongodb_store.message_store import MessageStoreProxy
9 
class TestPublisher(object):
    """Publish a fixed run of integer messages on one topic from its own thread.

    The constructor only prepares the publisher and the worker thread; the
    caller is responsible for starting (and joining) ``self.thread``.
    """

    def __init__(self, topic, delay, rate, count=20):
        """Set up the publisher and the (not yet started) worker thread.

        Args:
            topic: Name of the topic to publish on; also used as the
                worker thread's name.
            delay: Value handed to ``rospy.sleep`` before the first message
                is published.
            rate: Object whose ``sleep()`` is called after every published
                message to pace the output.
            count: Number of messages to publish (default 20).
        """
        super(TestPublisher, self).__init__()
        self.delay = delay
        self.rate = rate
        self.count = count
        # The outgoing queue never needs to be longer than the number of
        # messages that will be sent, and is capped at 10.
        queue_length = min(10, count)
        self.publisher = rospy.Publisher(topic, Int64, queue_size=queue_length)
        self.thread = Thread(target=self.publish, name=topic)

    def publish(self):
        """Wait for the initial delay, then publish 0 .. count-1 in order.

        This is the target of ``self.thread``; it sleeps on ``self.rate``
        after each message.
        """
        rospy.sleep(self.delay)

        sequence_number = 0
        while sequence_number < self.count:
            self.publisher.publish(sequence_number)
            self.rate.sleep()
            sequence_number += 1
27 
28 
29 
30 
if __name__ == '__main__':
    # Script entry point: publish two test topics in parallel, wait for both
    # publishers to finish, then check how many Int64 messages the message
    # store returns for each topic's collection.

    rospy.init_node("mongodb_log_test_publisher")

    # Each entry is (topic, delay, rate, count), i.e. the TestPublisher
    # constructor arguments in order.
    to_publish = [
        ('test_0', rospy.Duration(10), rospy.Rate(1), 20),
        ('test_1', rospy.Duration(20), rospy.Rate(1), 20),
    ]

    publishers = [TestPublisher(*args) for args in to_publish]

    # Use explicit loops rather than map(): on Python 3 map() is lazy, so
    # calling it only for its side effects would never start or join the
    # threads.
    for pub in publishers:
        pub.thread.start()
    for pub in publishers:
        pub.thread.join()

    # now I should be able to get these back from the datacentre

    for topic, _delay, _rate, count in to_publish:
        msg_store = MessageStoreProxy(database='roslog', collection=topic)
        # Prints True when the number of stored Int64 messages equals the
        # number published. A single parenthesised argument prints the same
        # on Python 2 and Python 3.
        print(len(msg_store.query(Int64._type)) == count)
49 
50 
def __init__(self, topic, delay, rate, count=20)


mongodb_log
Author(s): Tim Niemueller
autogenerated on Sat Sep 7 2019 03:40:41