16 import rocon_console.console
as console
17 from rocon_gateway
import samples
18 from rocon_gateway
import GatewaySampleRuntimeError
19 from rocon_gateway
import Graph
20 from rocon_gateway
import GatewayError
77 rospy.init_node(
'test_advertisements')
87 except Exception
as exc:
91 rospy.loginfo(console.cyan +
"********************************************************************" + console.reset)
92 rospy.loginfo(console.cyan +
"* Advertise All" + console.reset)
93 rospy.loginfo(console.cyan +
"********************************************************************" + console.reset)
95 samples.advertise_all()
96 except GatewaySampleRuntimeError
as e:
97 self.fail(
"Runtime error caught when advertising all connections.")
100 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
101 self.assertEquals(
"5", str(len(public_interface)))
103 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/add_two_ints" and rule.node ==
"/add_two_ints_server" and rule.type ==
"service"]), 1)
104 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/chatter" and rule.node ==
"/talker" and rule.type ==
"publisher"]), 1)
105 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/chatter" and rule.node ==
"/listener" and rule.type ==
"subscriber"]), 1)
106 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/fibonacci/server" and rule.node ==
"/fibonacci_server" and rule.type ==
"action_server"]), 1)
107 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/fibonacci/client" and rule.node ==
"/fibonacci_client" and rule.type ==
"action_client"]), 1)
111 samples.advertise_all(cancel=
True)
112 except GatewaySampleRuntimeError
as e:
113 self.fail(
"Runtime error caught when unadvertising all connections.")
117 rospy.loginfo(console.cyan +
"********************************************************************" + console.reset)
118 rospy.loginfo(console.cyan +
"* Advertise Tutorials" + console.reset)
119 rospy.loginfo(console.cyan +
"********************************************************************" + console.reset)
121 samples.advertise_tutorials()
122 except GatewaySampleRuntimeError
as e:
123 self.fail(
"Runtime error caught when advertising tutorial connections.")
126 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
127 self.assertEquals(
"5", str(len(public_interface)))
129 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/add_two_ints" and rule.node ==
"/add_two_ints_server" and rule.type ==
"service"]), 1)
130 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/chatter" and rule.node ==
"/talker" and rule.type ==
"publisher"]), 1)
131 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/chatter" and rule.node ==
"/listener" and rule.type ==
"subscriber"]), 1)
132 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/fibonacci/server" and rule.node ==
"/fibonacci_server" and rule.type ==
"action_server"]), 1)
133 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/fibonacci/client" and rule.node ==
"/fibonacci_client" and rule.type ==
"action_client"]), 1)
136 samples.advertise_tutorials(cancel=
True)
137 except GatewaySampleRuntimeError
as e:
138 self.fail(
"Runtime error caught when unadvertising tutorial connections.")
142 print(
"\n********************************************************************")
143 print(
"* Advertise Regex Tutorials")
144 print(
"********************************************************************")
146 samples.advertise_tutorials(regex_patterns=
True)
147 except GatewaySampleRuntimeError
as e:
148 self.fail(
"Runtime error caught when advertising tutorial connections.")
151 print(
"%s" % self.graph._local_gateway)
152 self.assertEquals(
"5", str(len(public_interface)))
154 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/add_two_ints" and rule.node ==
"/add_two_ints_server" and rule.type ==
"service"]), 1)
155 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/chatter" and rule.node ==
"/talker" and rule.type ==
"publisher"]), 1)
156 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/chatter" and rule.node ==
"/listener" and rule.type ==
"subscriber"]), 1)
157 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/fibonacci/server" and rule.node ==
"/fibonacci_server" and rule.type ==
"action_server"]), 1)
158 self.assertEquals(len([rule
for rule
in public_interface
if rule.name ==
"/fibonacci/client" and rule.node ==
"/fibonacci_client" and rule.type ==
"action_client"]), 1)
161 samples.advertise_tutorials(cancel=
True, regex_patterns=
True)
162 except GatewaySampleRuntimeError
as e:
163 self.fail(
"Runtime error caught when unadvertising tutorial connections.")
174 public_interface =
None 175 while not public_interface:
177 public_interface = self.graph._local_gateway.public_interface
178 rospy.rostime.wallsleep(0.2)
179 return public_interface
182 start_time = rospy.Time.now()
185 rospy.loginfo(console.cyan +
" - getting public interface" + console.reset)
186 public_interface = self.graph._local_gateway.public_interface
187 rospy.loginfo(console.cyan +
" - public interface: \n" + console.reset +
"%s" % public_interface)
188 if not public_interface:
192 rospy.rostime.wallsleep(0.2)
193 if rospy.Time.now() - start_time > rospy.Duration(10.0):
194 result =
"timed out waiting for public interface to clear" 196 self.assertEqual(
"cleared", result)
198 NAME =
'test_advertisements' 199 if __name__ ==
'__main__':
201 rosunit.unitrun(
'test_advertisements', NAME, TestAdvertisements, sys.argv, coverage_packages=[
'rocon_gateway'])
def test_advertise_regex_tutorials(self)
def _assert_cleared_public_interface(self)
def test_advertise_tutorials(self)
def test_advertise_all(self)
def _wait_for_public_interface(self)
Utility methods.