test_flips.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro_devel/rocon_gateway_tests/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import time
11 import os
12 import sys
13 try:
14  import pyros_setup
15  import rospy
16  import rocon_console.console as console
17  from rocon_gateway import samples
18  from rocon_gateway import GatewaySampleRuntimeError
19  from rocon_gateway import Graph
20  from rocon_gateway import GatewayError
21  import unittest
22  import rosunit
23  import roslaunch
24 except ImportError:
25  raise
26 
27  # import pyros_setup
28  # sys.modules["pyros_setup"] = pyros_setup.delayed_import_auto(base_path=os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
29  # import rospy
30  # import rocon_console.console as console
31  # from rocon_gateway import samples
32  # from rocon_gateway import GatewaySampleRuntimeError
33  # from rocon_gateway import Graph
34  # from rocon_gateway import GatewayError
35  # import unittest
36  # import rosunit
37  # import roslaunch
38 
39 # roscore_process = None
40 # master = None
41 # launch = None
42 
43 
44 # def setup_module():
45 # global master
46 # global roscore_process
47 # os.environ['ROS_MASTER_URI'] = "http://localhost:11331"
48 # master, roscore_process = pyros_setup.get_master()
49 # assert master.is_online()
50 #
51 # global launch
52 # # initialize the launch environment first
53 # launch = roslaunch.scriptapi.ROSLaunch()
54 # launch.start()
55 
56 
57 # def teardown_module():
58 # # finishing all process
59 # if roscore_process is not None:
60 # roscore_process.terminate() # make sure everything is stopped
61 # rospy.signal_shutdown('test complete')
62 # while roscore_process and roscore_process.is_alive():
63 # time.sleep(0.2) # waiting for roscore to die
64 # assert not (roscore_process and master.is_online())
65 
66 
67 ##############################################################################
68 # Main
69 ##############################################################################
70 
71 class TestFlips(unittest.TestCase):
72 
73  @classmethod
74  def setUpClass(cls):
75  # Note this should be called only once by process.
76  # We only run one process here, for all 3 tests
77  rospy.init_node('test_pullments')
78 
79  @classmethod
80  def tearDownClass(cls):
81  # shutting down process here
82  pass
83 
84  def setUp(self):
85  self.graph = Graph()
86 
87  def test_flip_all(self):
88  print(console.bold + "\n********************************************************************" + console.reset)
89  print(console.bold + "* Flip All" + console.reset)
90  print(console.bold + "********************************************************************" + console.reset)
91  try:
92  samples.flip_all()
93  except GatewaySampleRuntimeError as e:
94  self.fail("Runtime error caught when flipping all connections [%s]" % str(e))
95  flipped_interface = self._wait_for_flipped_interface()
96  rospy.loginfo(console.cyan + " - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
97  self.assertEquals("5", str(len(flipped_interface)))
98 
99  self.assertEquals(len([flip for flip in flipped_interface if flip.remote_rule.rule.name == "/add_two_ints" and flip.remote_rule.rule.node.split(',')[0] == "/add_two_ints_server" and flip.remote_rule.rule.type == "service"]), 1)
100  self.assertEquals(len([flip for flip in flipped_interface if flip.remote_rule.rule.name == "/chatter" and flip.remote_rule.rule.node.split(',')[0] == "/talker" and flip.remote_rule.rule.type == "publisher"]), 1)
101  self.assertEquals(len([flip for flip in flipped_interface if flip.remote_rule.rule.name == "/chatter" and flip.remote_rule.rule.node.split(',')[0] == "/listener" and flip.remote_rule.rule.type == "subscriber"]), 1)
102  self.assertEquals(len([flip for flip in flipped_interface if flip.remote_rule.rule.name == "/fibonacci/server" and flip.remote_rule.rule.node.split(',')[0] == "/fibonacci_server" and flip.remote_rule.rule.type == "action_server"]), 1)
103  self.assertEquals(len([flip for flip in flipped_interface if flip.remote_rule.rule.name == "/fibonacci/client" and flip.remote_rule.rule.node.split(',')[0] == "/fibonacci_client" and flip.remote_rule.rule.type == "action_client"]), 1)
104 
106  self.assertTrue(result)
107  # Revert state
108  try:
109  samples.flip_all(cancel=True)
110  except GatewaySampleRuntimeError as e:
111  self.fail("Runtime error caught when unflipping all connections [%s]" % str(e))
113 
115  print(console.bold + "\n********************************************************************" + console.reset)
116  print(console.bold + "* Flip Tutorials" + console.reset)
117  print(console.bold + "********************************************************************" + console.reset)
118  try:
119  samples.flip_tutorials()
120  except GatewaySampleRuntimeError as e:
121  self.fail("Runtime error caught when flipping tutorial connections.")
122  flipped_interface = self._wait_for_flipped_interface()
123  rospy.loginfo(console.cyan + " - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
124  self.assertEquals("5", str(len(flipped_interface)))
125 
126  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/add_two_ints" and remote_rule.remote_rule.rule.node.split(',')[0] == "/add_two_ints_server" and remote_rule.remote_rule.rule.type == "service"]), 1)
127  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/chatter" and remote_rule.remote_rule.rule.node.split(',')[0] == "/talker" and remote_rule.remote_rule.rule.type == "publisher"]), 1)
128  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/chatter" and remote_rule.remote_rule.rule.node.split(',')[0] == "/listener" and remote_rule.remote_rule.rule.type == "subscriber"]), 1)
129  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/fibonacci/server" and remote_rule.remote_rule.rule.node.split(',')[0] == "/fibonacci_server" and remote_rule.remote_rule.rule.type == "action_server"]), 1)
130  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/fibonacci/client" and remote_rule.remote_rule.rule.node.split(',')[0] == "/fibonacci_client" and remote_rule.remote_rule.rule.type == "action_client"]), 1)
131 
133  self.assertTrue(result)
134  try:
135  samples.flip_tutorials(cancel=True)
136  except GatewaySampleRuntimeError as e:
137  self.fail("Runtime error caught when unflipping tutorial connections.")
139 
141  print(console.bold + "\n********************************************************************" + console.reset)
142  print(console.bold + "* Flip Regex Tutorials" + console.reset)
143  print(console.bold + "********************************************************************" + console.reset)
144  try:
145  samples.flip_tutorials(regex_patterns=True)
146  except GatewaySampleRuntimeError as e:
147  self.fail("Runtime error caught when flipping tutorial connections.")
148  flipped_interface = self._wait_for_flipped_interface()
149  rospy.loginfo(console.cyan + " - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
150  self.assertEquals("5", str(len(flipped_interface)))
151 
152  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/add_two_ints" and remote_rule.remote_rule.rule.node.split(',')[0] == "/add_two_ints_server" and remote_rule.remote_rule.rule.type == "service"]), 1)
153  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/chatter" and remote_rule.remote_rule.rule.node.split(',')[0] == "/talker" and remote_rule.remote_rule.rule.type == "publisher"]), 1)
154  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/chatter" and remote_rule.remote_rule.rule.node.split(',')[0] == "/listener" and remote_rule.remote_rule.rule.type == "subscriber"]), 1)
155  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/fibonacci/server" and remote_rule.remote_rule.rule.node.split(',')[0] == "/fibonacci_server" and remote_rule.remote_rule.rule.type == "action_server"]), 1)
156  self.assertEquals(len([remote_rule for remote_rule in flipped_interface if remote_rule.remote_rule.rule.name == "/fibonacci/client" and remote_rule.remote_rule.rule.node.split(',')[0] == "/fibonacci_client" and remote_rule.remote_rule.rule.type == "action_client"]), 1)
157 
159  self.assertTrue(result)
160  try:
161  samples.flip_tutorials(cancel=True, regex_patterns=True)
162  except GatewaySampleRuntimeError as e:
163  self.fail("Runtime error caught when unflipping tutorial connections.")
165 
166  def tearDown(self):
167  pass
168 
169  ##########################################################################
170  # Utility methods
171  ##########################################################################
172 
174  result = False
175  start_time = rospy.Time.now()
176  while not result and (rospy.Time.now() - start_time) < rospy.Duration(5.0):
177  rospy.sleep(0.1)
178  self.graph.update()
179  accepted = True
180  flipped_interface = self.graph._local_gateway.flipped_connections
181  if not flipped_interface:
182  continue
183  for remote_rule in flipped_interface:
184  result = True
185  if remote_rule.status != 'accepted':
186  result = False
187  break
188  return result
189 
191  flipped_interface = None
192  while not flipped_interface:
193  self.graph.update()
194  flipped_interface = self.graph._local_gateway.flipped_connections
195  rospy.sleep(0.2)
196  return flipped_interface
197 
199  start_time = rospy.Time.now()
200  while True:
201  self.graph.update()
202  flipped_interface = self.graph._local_gateway.flipped_connections
203  if not flipped_interface:
204  result = "cleared"
205  break
206  else:
207  rospy.sleep(0.2)
208  if rospy.Time.now() - start_time > rospy.Duration(2.0):
209  result = "timed out waiting for flipped interface to clear"
210  break
211  self.assertEqual("cleared", result)
212 
213 NAME = 'test_flips'
214 if __name__ == '__main__':
215  #setup_module() # because this is not nose
216  rosunit.unitrun('test_flips', NAME, TestFlips, sys.argv, coverage_packages=['rocon_gateway'])
217  #teardown_module() # because this is not nose
def _wait_for_accepted_flipped_interface(self)
Utility methods.
Definition: test_flips.py:173
def test_flip_all(self)
Definition: test_flips.py:87
def test_flip_regex_tutorials(self)
Definition: test_flips.py:140
def tearDownClass(cls)
Definition: test_flips.py:80
def _assert_cleared_flipped_interface(self)
Definition: test_flips.py:198
def test_flip_tutorials(self)
Definition: test_flips.py:114
def _wait_for_flipped_interface(self)
Definition: test_flips.py:190


rocon_gateway_tests
Author(s): Daniel Stonier, Jihoon Lee, Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:15