test_pulls.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # License: BSD
4 # https://raw.github.com/robotics-in-concert/rocon_multimaster/hydro_devel/rocon_gateway_tests/LICENSE
5 #
6 ##############################################################################
7 # Imports
8 ##############################################################################
9 
10 import rospy
11 import sys
12 import rocon_console.console as console
13 from rocon_gateway import samples
14 from rocon_gateway import GatewaySampleRuntimeError
15 from rocon_gateway import Graph
16 from rocon_gateway import GatewayError
17 import unittest
18 import rosunit
19 
20 
21 ##############################################################################
22 # Main
23 ##############################################################################
24 
class TestPulls(unittest.TestCase):
    """Check that the gateway sample 'pull' helpers populate and clear the
    local gateway's pulled interface.

    Every scenario follows the same shape: issue a pull through
    ``rocon_gateway.samples``, wait for the local gateway to report pulled
    connections, verify that exactly the expected tutorial rules are present,
    then cancel the pull and verify that the pulled interface empties again.
    """

    # (rule name, node, connection type) of every rule expected to be pulled.
    EXPECTED_RULES = (
        ("/add_two_ints", "/add_two_ints_server", "service"),
        ("/chatter", "/talker", "publisher"),
        ("/chatter", "/listener", "subscriber"),
        ("/fibonacci/server", "/fibonacci_server", "action_server"),
        ("/fibonacci/client", "/fibonacci_client", "action_client"),
    )

    # Seconds to wait for pulled connections to appear before failing the test
    # (previously the wait was unbounded and could hang the whole test run).
    PULL_TIMEOUT = 30.0

    # Seconds to wait for the pulled interface to empty after a cancel.
    CLEAR_TIMEOUT = 2.0

    def setUp(self):
        self._print_banner("Pull Tests Setup")
        rospy.init_node('test_pulls')
        self.graph = Graph()

    def test_pull_all(self):
        # Pull everything, verify, then revert.
        self._print_banner("Pull All")
        try:
            samples.pull_all()
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when pulling all connections.")
        self._assert_tutorials_pulled()

        # Revert state
        try:
            samples.pull_all(cancel=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when unpulling all connections.")
        self._assert_cleared_pulled_interface()

    def test_pull_tutorials(self):
        # Pull the tutorial connections by explicit name, verify, then revert.
        self._print_banner("Pull Tutorials")
        try:
            samples.pull_tutorials()
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when pulling tutorial connections.")
        self._assert_tutorials_pulled()

        # Revert state
        try:
            samples.pull_tutorials(cancel=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when unpulling tutorial connections.")
        self._assert_cleared_pulled_interface()

    def test_pull_regex_tutorials(self):
        # Pull the tutorial connections via regex patterns, verify, then revert.
        self._print_banner("Pull Regex Tutorials")
        try:
            samples.pull_tutorials(regex_patterns=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when pulling tutorial connections.")
        self._assert_tutorials_pulled()

        # Revert state
        try:
            samples.pull_tutorials(cancel=True, regex_patterns=True)
        except GatewaySampleRuntimeError:
            self.fail("Runtime error caught when unpulling tutorial connections.")
        self._assert_cleared_pulled_interface()

    def tearDown(self):
        pass

    ##########################################################################
    # Utility methods
    ##########################################################################

    @staticmethod
    def _print_banner(title):
        """Print the starred three-line console banner announcing a test stage."""
        print("\n********************************************************************")
        print("* %s" % title)
        print("********************************************************************")

    def _assert_tutorials_pulled(self):
        """Wait for the pulled interface and assert it holds exactly EXPECTED_RULES.

        Fails the test if the number of pulled connections differs from the
        number of expected rules, or if any expected (name, node, type)
        combination does not occur exactly once.
        """
        pulled_interface = self._wait_for_pulled_interface()
        rospy.loginfo(console.cyan + "  - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
        self.assertEqual(len(self.EXPECTED_RULES), len(pulled_interface))

        for name, node, connection_type in self.EXPECTED_RULES:
            matches = [
                remote_rule for remote_rule in pulled_interface
                if remote_rule.rule.name == name
                # only the part of the node field before the first comma is compared
                and remote_rule.rule.node.split(',')[0] == node
                and remote_rule.rule.type == connection_type
            ]
            self.assertEqual(
                1, len(matches),
                "expected exactly one pulled %s '%s' from node '%s', found %d"
                % (connection_type, name, node, len(matches)))

    def _wait_for_pulled_interface(self, timeout=None):
        """Poll the gateway graph until it reports pulled connections.

        :param timeout: seconds to wait before failing the test; defaults to
            ``PULL_TIMEOUT`` when ``None``.
        :returns: the non-empty ``pulled_connections`` of the local gateway.
        """
        if timeout is None:
            timeout = self.PULL_TIMEOUT
        start_time = rospy.Time.now()
        pulled_interface = None
        while not pulled_interface:
            if rospy.Time.now() - start_time > rospy.Duration(timeout):
                self.fail("timed out waiting for the pulled interface to be populated")
            self.graph.update()
            pulled_interface = self.graph._local_gateway.pulled_connections
            rospy.sleep(1.0)
        return pulled_interface

    def _assert_cleared_pulled_interface(self):
        """Assert that the pulled interface becomes empty within CLEAR_TIMEOUT.

        Polls the gateway graph every 0.2 seconds and fails the test if pulled
        connections are still reported once the timeout has elapsed.
        """
        start_time = rospy.Time.now()
        while True:
            self.graph.update()
            if not self.graph._local_gateway.pulled_connections:
                result = "cleared"
                break
            rospy.sleep(0.2)
            if rospy.Time.now() - start_time > rospy.Duration(self.CLEAR_TIMEOUT):
                result = "timed out waiting for pulled interface to clear"
                break
        self.assertEqual("cleared", result)
136 
# Test name handed to rosunit.unitrun (its second positional argument).
NAME = 'test_pulls'

if __name__ == '__main__':
    # Run the TestPulls case through rosunit, forwarding the command-line
    # arguments and requesting coverage for the rocon_gateway package.
    rosunit.unitrun(
        'test_pulls',
        NAME,
        TestPulls,
        sys.argv,
        coverage_packages=['rocon_gateway']
    )
140 
def _assert_cleared_pulled_interface(self)
Definition: test_pulls.py:122
def test_pull_all(self)
Definition: test_pulls.py:34
def _wait_for_pulled_interface(self)
Utility methods.
Definition: test_pulls.py:114
def test_pull_regex_tutorials(self)
Definition: test_pulls.py:83
def test_pull_tutorials(self)
Definition: test_pulls.py:59


rocon_gateway_tests
Author(s): Daniel Stonier, Jihoon Lee, Piyush Khandelwal
autogenerated on Mon Jun 10 2019 14:40:15