4 NAME =
'test_bagger_node' 12 from bagger.msg
import BaggingState
13 from bagger.srv
import SetBagState
14 from nav_msgs.msg
import Odometry
15 from geometry_msgs.msg
import Point
27 for i
in range(0, len(self.last_msg.bag_profile_names)):
32 super(TestBaggerNode, self).
__init__(*args)
33 self.
node = rospy.init_node(NAME)
36 """Test that bagging state messages are correctly published""" 40 rospy.Subscriber(
'/bagger/bag_states', BaggingState, bagging_state_cb)
41 set_bagging_state_srv = rospy.ServiceProxy(
'/bagger/set_bag_state', SetBagState)
45 self.assertEquals(bagging_state_cb.bag_names_states[
"everything"],
False)
46 self.assertEquals(bagging_state_cb.bag_names_states[
"nav"],
False)
49 self.assertTrue(set_bagging_state_srv(
"everything",
False))
50 self.assertTrue(set_bagging_state_srv(
"nav",
False))
57 """Test turning on/off a couple of bags and seeing that their status is updated correctly""" 60 set_bagging_state_srv = rospy.ServiceProxy(
'/bagger/set_bag_state', SetBagState)
64 rospy.Subscriber(
'/bagger/bag_states', BaggingState, bagging_state_cb)
69 self.assertTrue(set_bagging_state_srv(
"everything",
True).success)
70 self.assertTrue(set_bagging_state_srv(
"nav",
True).success)
74 self.assertEquals(bagging_state_cb.bag_names_states[
"everything"],
True)
75 self.assertEquals(bagging_state_cb.bag_names_states[
"nav"],
True)
76 self.assertEquals(bagging_state_cb.received_msgs, 3)
79 self.assertFalse(set_bagging_state_srv(
"everything",
True).success)
80 self.assertFalse(set_bagging_state_srv(
"nav",
True).success)
84 self.assertEquals(bagging_state_cb.bag_names_states[
"everything"],
True)
85 self.assertEquals(bagging_state_cb.bag_names_states[
"nav"],
True)
88 self.assertTrue(set_bagging_state_srv(
"everything",
False))
89 self.assertTrue(set_bagging_state_srv(
"nav",
False))
96 """Test attempting to turn on a non-existant bag profile""" 98 set_bagging_state_srv = rospy.ServiceProxy(
'/bagger/set_bag_state', SetBagState)
101 set_bagging_state_srv(
"incorrect",
True)
102 self.assertFalse(set_bagging_state_srv(
"missing",
True).success)
105 self.assertTrue(set_bagging_state_srv(
"everything",
False))
106 self.assertTrue(set_bagging_state_srv(
"nav",
False))
112 @unittest.skip(
"Something is wrong with spawning the rosbag processes in Docker")
114 """Turn on bag profiles, publish a bunch of applicable ros messages, then make sure the bags captured them""" 117 set_bagging_state_srv = rospy.ServiceProxy(
'/bagger/set_bag_state', SetBagState)
118 set_bagging_state_srv(
"everything",
True)
119 set_bagging_state_srv(
"nav",
True)
124 odom_pub = rospy.Publisher(
'odometry', Odometry, queue_size=10)
125 point_pub = rospy.Publisher(
'point', Point, queue_size=10)
127 for i
in range(0,1000):
130 odom.header.frame_id =
"body" 131 odom.header.stamp = rospy.Time.now()
132 odom.pose.pose.position.x =
random() * 100
133 odom.pose.pose.position.y =
random() * 100
134 odom.pose.pose.position.z =
random() * 100
135 odom.pose.pose.orientation.x =
random()
136 odom.pose.pose.orientation.y =
random()
137 odom.pose.pose.orientation.z =
random()
138 odom.pose.pose.orientation.w =
random()
139 odom.twist.twist.linear.x =
random()
140 odom.twist.twist.linear.y =
random()
141 odom.twist.twist.linear.z =
random()
142 odom.twist.twist.angular.x =
random()
143 odom.twist.twist.angular.y =
random()
144 odom.twist.twist.angular.z =
random()
151 odom_pub.publish(odom)
152 point_pub.publish(point)
158 self.assertTrue(set_bagging_state_srv(
"everything",
False))
159 self.assertTrue(set_bagging_state_srv(
"nav",
False))
164 everything_bag_info = subprocess.check_output([
"rosbag",
"info",
"everything.bag"])
165 self.assertTrue(
"nav_msgs/Odometry" in everything_bag_info)
166 self.assertTrue(
"geometry_msgs/Point" in everything_bag_info)
168 nav_bag_info = subprocess.check_output([
"rosbag",
"info",
"nav.bag"])
169 self.assertTrue(
"nav_msgs/Odometry" in nav_bag_info)
170 self.assertFalse(
"geometry_msgs/Point" in nav_bag_info)
173 self.assertTrue(set_bagging_state_srv(
"everything",
False))
174 self.assertTrue(set_bagging_state_srv(
"nav",
False))
181 if __name__ ==
'__main__':
182 rosunit.unitrun(PKG, NAME, TestBaggerNode)
def test_bag_creation_and_bagging(self)
def test_absent_set_bag_state(self)
def test_basic_set_bag_states(self)
def test_bagging_state_names_outputs(self)