test_block.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import unittest
4 
5 import rospy
6 
7 try:
8  from std_msgs.msg import String
9 except:
10  import roslib
11  roslib.load_manifest("jsk_topic_tools")
12  from std_msgs.msg import String
13 
14 
15 PKG = 'jsk_topic_tools'
16 NAME = 'test_block'
17 
18 
19 class TestBlock(unittest.TestCase):
20  input_msg = None
21  subscriber = None
22 
23  def __init__(self, *args):
24  super(TestBlock, self).__init__(*args)
25  rospy.init_node(NAME)
26  self.output_original_pub = rospy.Publisher(
27  'output_original', String, queue_size=1)
28  self.input_original_sub = rospy.Subscriber(
29  'input_original', String, self.original_topic_cb)
30 
31  def original_topic_cb(self, msg):
32  self.running = True
33  self.output_original_pub.publish(msg)
34 
35  def reset_subscriber(self):
36  if self.subscriber:
37  self.subscriber.unregister()
38  self.subscriber = None
39  self.input_msg = None
40  self.running = False
41 
42  def test_subscribe(self):
43  self.reset_subscriber()
44  self.subscriber = rospy.Subscriber("output", String, self.cb)
45  rospy.loginfo("wait 10 seconds...")
46  rospy.sleep(10)
47  self.assertFalse(self.input_msg is None)
48 
49  def cb(self, msg):
50  self.input_msg = msg
51 
52  def test_no_subscribe(self):
53  self.reset_subscriber()
54  # do not subscribe
55  rospy.loginfo("wait 10 seconds...")
56  rospy.sleep(10)
57  self.assertTrue(self.input_msg is None)
58 
59 
60 if __name__ == "__main__":
61  import rostest
62  rostest.rosrun(PKG, NAME, TestBlock)
def __init__(self, args)
Definition: test_block.py:23
def test_no_subscribe(self)
Definition: test_block.py:52
def reset_subscriber(self)
Definition: test_block.py:35
def test_subscribe(self)
Definition: test_block.py:42
def original_topic_cb(self, msg)
Definition: test_block.py:31
def cb(self, msg)
Definition: test_block.py:49


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:19