12 import rocon_console.console
as console
13 from rocon_gateway
import samples
14 from rocon_gateway
import GatewaySampleRuntimeError
15 from rocon_gateway
import Graph
16 from rocon_gateway
import GatewayError
28 print(
"\n********************************************************************")
29 print(
"* Pull Tests Setup")
30 print(
"********************************************************************")
31 rospy.init_node(
'test_pulls')
35 print(
"\n********************************************************************")
37 print(
"********************************************************************")
40 except GatewaySampleRuntimeError
as e:
41 self.fail(
"Runtime error caught when advertising all connections.")
43 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
44 self.assertEquals(
"5", str(len(pulled_interface)))
46 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/add_two_ints" and remote_rule.rule.node.split(
',')[0] ==
"/add_two_ints_server" and remote_rule.rule.type ==
"service"]), 1)
47 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/chatter" and remote_rule.rule.node.split(
',')[0] ==
"/talker" and remote_rule.rule.type ==
"publisher"]), 1)
48 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/chatter" and remote_rule.rule.node.split(
',')[0] ==
"/listener" and remote_rule.rule.type ==
"subscriber"]), 1)
49 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/fibonacci/server" and remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_server" and remote_rule.rule.type ==
"action_server"]), 1)
50 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/fibonacci/client" and remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_client" and remote_rule.rule.type ==
"action_client"]), 1)
54 samples.pull_all(cancel=
True)
55 except GatewaySampleRuntimeError
as e:
56 self.fail(
"Runtime error caught when unadvertising all connections.")
60 print(
"\n********************************************************************")
61 print(
"* Pull Tutorials")
62 print(
"********************************************************************")
64 samples.pull_tutorials()
65 except GatewaySampleRuntimeError
as e:
66 self.fail(
"Runtime error caught when advertising tutorial connections.")
68 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
69 self.assertEquals(
"5", str(len(pulled_interface)))
71 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/add_two_ints" and remote_rule.rule.node.split(
',')[0] ==
"/add_two_ints_server" and remote_rule.rule.type ==
"service"]), 1)
72 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/chatter" and remote_rule.rule.node.split(
',')[0] ==
"/talker" and remote_rule.rule.type ==
"publisher"]), 1)
73 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/chatter" and remote_rule.rule.node.split(
',')[0] ==
"/listener" and remote_rule.rule.type ==
"subscriber"]), 1)
74 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/fibonacci/server" and remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_server" and remote_rule.rule.type ==
"action_server"]), 1)
75 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/fibonacci/client" and remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_client" and remote_rule.rule.type ==
"action_client"]), 1)
78 samples.pull_tutorials(cancel=
True)
79 except GatewaySampleRuntimeError
as e:
80 self.fail(
"Runtime error caught when unadvertising tutorial connections.")
84 print(
"\n********************************************************************")
85 print(
"* Pull Regex Tutorials")
86 print(
"********************************************************************")
88 samples.pull_tutorials(regex_patterns=
True)
89 except GatewaySampleRuntimeError
as e:
90 self.fail(
"Runtime error caught when advertising tutorial connections.")
92 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
93 self.assertEquals(
"5", str(len(pulled_interface)))
95 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/add_two_ints" and remote_rule.rule.node.split(
',')[0] ==
"/add_two_ints_server" and remote_rule.rule.type ==
"service"]), 1)
96 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/chatter" and remote_rule.rule.node.split(
',')[0] ==
"/talker" and remote_rule.rule.type ==
"publisher"]), 1)
97 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/chatter" and remote_rule.rule.node.split(
',')[0] ==
"/listener" and remote_rule.rule.type ==
"subscriber"]), 1)
98 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/fibonacci/server" and remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_server" and remote_rule.rule.type ==
"action_server"]), 1)
99 self.assertEquals(len([remote_rule
for remote_rule
in pulled_interface
if remote_rule.rule.name ==
"/fibonacci/client" and remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_client" and remote_rule.rule.type ==
"action_client"]), 1)
102 samples.pull_tutorials(cancel=
True, regex_patterns=
True)
103 except GatewaySampleRuntimeError
as e:
104 self.fail(
"Runtime error caught when unadvertising tutorial connections.")
115 pulled_interface =
None 116 while not pulled_interface:
118 pulled_interface = self.graph._local_gateway.pulled_connections
120 return pulled_interface
123 start_time = rospy.Time.now()
126 pulled_interface = self.graph._local_gateway.pulled_connections
127 if not pulled_interface:
132 if rospy.Time.now() - start_time > rospy.Duration(2.0):
133 result =
"timed out waiting for pulled interface to clear" 135 self.assertEqual(
"cleared", result)
138 if __name__ ==
'__main__':
139 rosunit.unitrun(
'test_pulls', NAME, TestPulls, sys.argv, coverage_packages=[
'rocon_gateway'])
def _assert_cleared_pulled_interface(self)
def _wait_for_pulled_interface(self)
Utility methods.
def test_pull_regex_tutorials(self)
def test_pull_tutorials(self)