test_republisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Unit test for a ROS node that republishes messages from /input_topic to /output_topic.
3 # It uses the unittest framework and rostest for testing.
4 
5 import unittest
6 import rospy
7 import rostest
8 from std_msgs.msg import String
9 
10 class TestRepublisher(unittest.TestCase):
11  def setUp(self):
12  # Initialize the ROS node for testing
13  rospy.init_node('test_republisher', anonymous=True)
14  self.test_pub = rospy.Publisher('/input_topic', String, queue_size=10)
16  rospy.Subscriber('/output_topic', String, self.callback)
17 
18  # Safe way to Wait for publisher connections to be established
19  timeout = rospy.get_time() + 5.0 # Timeout limited to 5 seconds
20  rate = rospy.Rate(5) # ROS Rate at 5Hz
21  while self.test_pub.get_num_connections() == 0:
22  if rospy.get_time() > timeout:
23  self.fail("Test setup failed: Publisher connection timeout.")
24  rate.sleep()
25 
26  def callback(self, msg):
27  # Method to save messages received on /output_topic.
28  self.received_messages.append(msg.data)
29 
31  # Method to send a message (key=value) to /input_topic and
32  # check if the republisher correctly adds the prefix (input_to_output=) and sends it to /output_topic.
33  test_message = "key=value"
34  expected_message = f"input_to_output={test_message}"
35  rospy.loginfo(f"Publishing: {test_message} to /input_topic")
36  self.test_pub.publish(String(data=test_message))
37  try:
38  msg = rospy.wait_for_message('/output_topic', String, timeout=35)
39  print(f"Received message: {msg.data}")
40  except rospy.ROSException:
41  print("Timeout exceeded while waiting for a message.")
42  rospy.loginfo(f"Messages received: {self.received_messages}")
43  self.assertIn(expected_message, self.received_messages)
44 
45 if __name__ == '__main__':
46  rostest.rosrun('my_ros_package', 'test_republisher', TestRepublisher)
test_republisher.TestRepublisher.received_messages
received_messages
Definition: test_republisher.py:15
test_republisher.TestRepublisher.test_pub
test_pub
Definition: test_republisher.py:14
test_republisher.TestRepublisher.callback
def callback(self, msg)
Definition: test_republisher.py:26
test_republisher.TestRepublisher
Definition: test_republisher.py:10
test_republisher.TestRepublisher.test_message_republishing
def test_message_republishing(self)
Definition: test_republisher.py:30
test_republisher.TestRepublisher.setUp
def setUp(self)
Definition: test_republisher.py:11


inorbit_republisher
Author(s):
autogenerated on Sat Feb 1 2025 03:43:34