test_advertisements.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro_devel/rocon_gateway_tests/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import time
11 import os
12 import sys
13 try:
14  import pyros_setup
15  import rospy
16  import rocon_console.console as console
17  from rocon_gateway import samples
18  from rocon_gateway import GatewaySampleRuntimeError
19  from rocon_gateway import Graph
20  from rocon_gateway import GatewayError
21  import unittest
22  import rosunit
23  import roslaunch
24 except ImportError:
25  raise
26 
27  # import pyros_setup
28  # sys.modules["pyros_setup"] = pyros_setup.delayed_import_auto(base_path=os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
29  # import rospy
30  # import rocon_console.console as console
31  # from rocon_gateway import samples
32  # from rocon_gateway import GatewaySampleRuntimeError
33  # from rocon_gateway import Graph
34  # from rocon_gateway import GatewayError
35  # import unittest
36  # import rosunit
37  # import roslaunch
38 
39 # roscore_process = None
40 # master = None
41 # launch = None
42 
43 
44 # def setup_module():
45 # global master
46 # global roscore_process
47 # os.environ['ROS_MASTER_URI'] = "http://localhost:11301"
48 # master, roscore_process = pyros_setup.get_master()
49 # assert master.is_online()
50 #
51 # global launch
52 # # initialize the launch environment first
53 # launch = roslaunch.scriptapi.ROSLaunch()
54 # launch.start()
55 
56 
57 # def teardown_module():
58 # # finishing all process
59 # if roscore_process is not None:
60 # roscore_process.terminate() # make sure everything is stopped
61 # rospy.signal_shutdown('test complete')
62 # while roscore_process and roscore_process.is_alive():
63 # time.sleep(0.2) # waiting for roscore to die
64 # assert not (roscore_process and master.is_online())
65 
66 
67 ##############################################################################
68 # Main
69 ##############################################################################
70 
class TestAdvertisements(unittest.TestCase):
    """Exercise the ``rocon_gateway.samples`` advertise helpers.

    Each test advertises connections through one of the sample helpers,
    checks the rules that show up on the local gateway's public interface,
    then cancels the advertisement again.

    NOTE(review): the listing this class was restored from was missing the
    ``def`` lines of ``test_advertise_tutorials``,
    ``test_advertise_regex_tutorials`` and ``_assert_cleared_public_interface``
    as well as one statement after each revert step.  Those names and the
    ``self._assert_cleared_public_interface()`` calls are reconstructed --
    confirm against the upstream file.
    """

    # (name, node, type) of each rule the tests expect to find exactly once
    # on the public interface.
    EXPECTED_RULES = (
        ("/add_two_ints", "/add_two_ints_server", "service"),
        ("/chatter", "/talker", "publisher"),
        ("/chatter", "/listener", "subscriber"),
        ("/fibonacci/server", "/fibonacci_server", "action_server"),
        ("/fibonacci/client", "/fibonacci_client", "action_client"),
    )

    @classmethod
    def setUpClass(cls):
        # Note this should be called only once by process.
        # We only run one process here, for all 3 tests
        rospy.init_node('test_advertisements')

    @classmethod
    def tearDownClass(cls):
        # shutting down process here
        pass

    def setUp(self):
        """Create a fresh gateway ``Graph``; fail the test if that raises."""
        try:
            self.graph = Graph()
        except Exception as exc:
            # Report the cause instead of failing with an empty message.
            self.fail("Could not create the gateway graph: %s" % exc)

    def test_advertise_all(self):
        """Advertise everything, verify the public interface, then revert."""
        rospy.loginfo(console.cyan + "********************************************************************" + console.reset)
        rospy.loginfo(console.cyan + "* Advertise All" + console.reset)
        rospy.loginfo(console.cyan + "********************************************************************" + console.reset)
        try:
            samples.advertise_all()
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when advertising all connections.")
        time.sleep(2)
        public_interface = self._wait_for_public_interface()
        rospy.loginfo(console.cyan + " - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
        self._assert_expected_rules(public_interface)

        # Revert state
        try:
            samples.advertise_all(cancel=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when unadvertising all connections.")
        self._assert_cleared_public_interface()

    def test_advertise_tutorials(self):
        """Advertise the tutorial connections, verify them, then revert."""
        rospy.loginfo(console.cyan + "********************************************************************" + console.reset)
        rospy.loginfo(console.cyan + "* Advertise Tutorials" + console.reset)
        rospy.loginfo(console.cyan + "********************************************************************" + console.reset)
        try:
            samples.advertise_tutorials()
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when advertising tutorial connections.")
        time.sleep(2)
        public_interface = self._wait_for_public_interface()
        rospy.loginfo(console.cyan + " - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
        self._assert_expected_rules(public_interface)

        # Revert state
        try:
            samples.advertise_tutorials(cancel=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when unadvertising tutorial connections.")
        self._assert_cleared_public_interface()

    def test_advertise_regex_tutorials(self):
        """Advertise the tutorials via regex patterns, verify, then revert."""
        print("\n********************************************************************")
        print("* Advertise Regex Tutorials")
        print("********************************************************************")
        try:
            samples.advertise_tutorials(regex_patterns=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when advertising tutorial connections.")
        time.sleep(2)
        public_interface = self._wait_for_public_interface()
        print("%s" % self.graph._local_gateway)
        self._assert_expected_rules(public_interface)

        # Revert state
        try:
            samples.advertise_tutorials(cancel=True, regex_patterns=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when unadvertising tutorial connections.")
        self._assert_cleared_public_interface()

    def tearDown(self):
        pass

    ##########################################################################
    # Utility methods
    ##########################################################################

    def _assert_expected_rules(self, public_interface):
        """Assert the interface holds exactly the rules in EXPECTED_RULES.

        :param public_interface: iterable of rules exposing ``name``,
            ``node`` and ``type`` attributes.
        """
        self.assertEqual(len(self.EXPECTED_RULES), len(public_interface))
        for name, node, connection_type in self.EXPECTED_RULES:
            matches = [
                rule for rule in public_interface
                if rule.name == name and rule.node == node and rule.type == connection_type
            ]
            self.assertEqual(
                len(matches), 1,
                "expected exactly one %s rule %s from node %s, found %d"
                % (connection_type, name, node, len(matches)))

    def _wait_for_public_interface(self, timeout=30.0):
        """Poll the graph until the local public interface is non-empty.

        :param timeout: seconds to wait before failing the test, so a
            gateway that never advertises cannot hang the run forever.
        :returns: the non-empty public interface of the local gateway.
        """
        start_time = rospy.Time.now()
        while True:
            self.graph.update()
            public_interface = self.graph._local_gateway.public_interface
            if public_interface:
                return public_interface
            if rospy.Time.now() - start_time > rospy.Duration(timeout):
                self.fail("timed out waiting for public interface to be populated")
            rospy.rostime.wallsleep(0.2)

    def _assert_cleared_public_interface(self, timeout=10.0):
        """Poll the graph until the local public interface is empty.

        :param timeout: seconds to wait before the assertion fails.
        """
        start_time = rospy.Time.now()
        while True:
            self.graph.update()
            rospy.loginfo(console.cyan + " - getting public interface" + console.reset)
            public_interface = self.graph._local_gateway.public_interface
            rospy.loginfo(console.cyan + " - public interface: \n" + console.reset + "%s" % public_interface)
            if not public_interface:
                result = "cleared"
                break
            rospy.rostime.wallsleep(0.2)
            if rospy.Time.now() - start_time > rospy.Duration(timeout):
                result = "timed out waiting for public interface to clear"
                break
        self.assertEqual("cleared", result)
197 
# Test name handed to rosunit as its second argument.
NAME = 'test_advertisements'

if __name__ == '__main__':
    # setup_module()  # because this is not nose
    rosunit.unitrun(
        'test_advertisements',
        NAME,
        TestAdvertisements,
        sys.argv,
        coverage_packages=['rocon_gateway']
    )
    # teardown_module()  # because this is not nose
203 
def _wait_for_public_interface(self)
Utility methods.


rocon_gateway_tests
Author(s): Daniel Stonier, Jihoon Lee, Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:15