test_bagger_node.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 PKG = 'bagger'
4 NAME = 'test_bagger_node'
5 
6 import unittest
7 import rosunit
8 import rospy
9 import time
10 import subprocess
11 from random import *
12 from bagger.msg import BaggingState
13 from bagger.srv import SetBagState
14 from nav_msgs.msg import Odometry
15 from geometry_msgs.msg import Point
16 
17 
18 class BagStateCallback(object):
19  def __init__(self):
20  self.received_msgs = 0
21  self.last_msg = None
22 
23  def __call__(self, msg):
24  self.received_msgs += 1
25  self.last_msg = msg
26  self.bag_names_states = {}
27  for i in range(0, len(self.last_msg.bag_profile_names)):
28  self.bag_names_states[msg.bag_profile_names[i]] = msg.bagging[i]
29 
30 class TestBaggerNode(unittest.TestCase):
31  def __init__(self, *args):
32  super(TestBaggerNode, self).__init__(*args)
33  self.node = rospy.init_node(NAME)
34 
36  """Test that bagging state messages are correctly published"""
37 
38  # Create callback handler and subscribe
39  bagging_state_cb = BagStateCallback()
40  rospy.Subscriber('/bagger/bag_states', BaggingState, bagging_state_cb)
41  set_bagging_state_srv = rospy.ServiceProxy('/bagger/set_bag_state', SetBagState)
42 
43  # Sleep a little and assert that both bag profiles are in the off state
44  rospy.sleep(0.5)
45  self.assertEquals(bagging_state_cb.bag_names_states["everything"], False)
46  self.assertEquals(bagging_state_cb.bag_names_states["nav"], False)
47 
48  # Set bag states to false - reset state
49  self.assertTrue(set_bagging_state_srv("everything", False))
50  self.assertTrue(set_bagging_state_srv("nav", False))
51 
52  # Sleep to make sure any bag state msgs from the last set_bagging state are published
53  # before the next test is run
54  rospy.sleep(0.5)
55 
57  """Test turning on/off a couple of bags and seeing that their status is updated correctly"""
58 
59  # Create callback handler
60  set_bagging_state_srv = rospy.ServiceProxy('/bagger/set_bag_state', SetBagState)
61 
62  # Now that the states are known, subscribe and setup cb
63  bagging_state_cb = BagStateCallback()
64  rospy.Subscriber('/bagger/bag_states', BaggingState, bagging_state_cb)
65 
66  rospy.sleep(0.5)
67 
68  # Turn on the everything bag and the navigation bag
69  self.assertTrue(set_bagging_state_srv("everything", True).success)
70  self.assertTrue(set_bagging_state_srv("nav", True).success)
71 
72  # Wait a bit to make sure the status is updated
73  rospy.sleep(0.5)
74  self.assertEquals(bagging_state_cb.bag_names_states["everything"], True)
75  self.assertEquals(bagging_state_cb.bag_names_states["nav"], True)
76  self.assertEquals(bagging_state_cb.received_msgs, 3)
77 
78  # Turn on the everything and nav bags again - they're already on so they should just stay that way
79  self.assertFalse(set_bagging_state_srv("everything", True).success)
80  self.assertFalse(set_bagging_state_srv("nav", True).success)
81 
82  # Wait a bit to make sure the status is updated
83  rospy.sleep(1.0)
84  self.assertEquals(bagging_state_cb.bag_names_states["everything"], True)
85  self.assertEquals(bagging_state_cb.bag_names_states["nav"], True)
86 
87  # Set bag states to false - reset state
88  self.assertTrue(set_bagging_state_srv("everything", False))
89  self.assertTrue(set_bagging_state_srv("nav", False))
90 
91  # Sleep to make sure any bag state msgs from the last set_bagging state are published
92  # before the next test is run
93  rospy.sleep(0.5)
94 
96  """Test attempting to turn on a non-existant bag profile"""
97 
98  set_bagging_state_srv = rospy.ServiceProxy('/bagger/set_bag_state', SetBagState)
99 
100  # Turn on a non-existant bag
101  set_bagging_state_srv("incorrect", True)
102  self.assertFalse(set_bagging_state_srv("missing", True).success)
103 
104  # Set bag states to false - reset state
105  self.assertTrue(set_bagging_state_srv("everything", False))
106  self.assertTrue(set_bagging_state_srv("nav", False))
107 
108  # Sleep to make sure any bag state msgs from the last set_bagging state are published
109  # before the next test is run
110  rospy.sleep(0.5)
111 
112  @unittest.skip("Something is wrong with spawning the rosbag processes in Docker")
114  """Turn on bag profiles, publish a bunch of applicable ros messages, then make sure the bags captured them"""
115 
116  # Make sure both bag profiles are recording
117  set_bagging_state_srv = rospy.ServiceProxy('/bagger/set_bag_state', SetBagState)
118  set_bagging_state_srv("everything", True)
119  set_bagging_state_srv("nav", True)
120 
121  rospy.sleep(1.0)
122 
123  # Set up publisher for both odom and point messages
124  odom_pub = rospy.Publisher('odometry', Odometry, queue_size=10)
125  point_pub = rospy.Publisher('point', Point, queue_size=10)
126 
127  for i in range(0,1000):
128  # Publish a bunch of point and odometry messages
129  odom = Odometry()
130  odom.header.frame_id = "body"
131  odom.header.stamp = rospy.Time.now()
132  odom.pose.pose.position.x = random() * 100
133  odom.pose.pose.position.y = random() * 100
134  odom.pose.pose.position.z = random() * 100
135  odom.pose.pose.orientation.x = random()
136  odom.pose.pose.orientation.y = random()
137  odom.pose.pose.orientation.z = random()
138  odom.pose.pose.orientation.w = random()
139  odom.twist.twist.linear.x = random()
140  odom.twist.twist.linear.y = random()
141  odom.twist.twist.linear.z = random()
142  odom.twist.twist.angular.x = random()
143  odom.twist.twist.angular.y = random()
144  odom.twist.twist.angular.z = random()
145 
146  point = Point()
147  point.x = random() * 10
148  point.y = random() * 10
149  point.z = random() * 10
150 
151  odom_pub.publish(odom)
152  point_pub.publish(point)
153  rospy.sleep(0.005)
154 
155  # Sleep for a little bit, the turn both bag profiles off
156  rospy.sleep(1.0)
157 
158  self.assertTrue(set_bagging_state_srv("everything", False))
159  self.assertTrue(set_bagging_state_srv("nav", False))
160 
161  rospy.sleep(1.0)
162 
163  # verify that bags have been created in the appropriate directory and that they contain the correct messages
164  everything_bag_info = subprocess.check_output(["rosbag", "info", "everything.bag"])
165  self.assertTrue("nav_msgs/Odometry" in everything_bag_info)
166  self.assertTrue("geometry_msgs/Point" in everything_bag_info)
167 
168  nav_bag_info = subprocess.check_output(["rosbag", "info", "nav.bag"])
169  self.assertTrue("nav_msgs/Odometry" in nav_bag_info)
170  self.assertFalse("geometry_msgs/Point" in nav_bag_info)
171 
172  # Set bag states to false - reset state
173  self.assertTrue(set_bagging_state_srv("everything", False))
174  self.assertTrue(set_bagging_state_srv("nav", False))
175 
176  # Sleep to make sure any bag state msgs from the last set_bagging state are published
177  # before the next test is run
178  rospy.sleep(0.5)
179 
180 
181 if __name__ == '__main__':
182  rosunit.unitrun(PKG, NAME, TestBaggerNode)


bagger
Author(s): Brenden Gibbons
autogenerated on Tue Apr 27 2021 02:29:15