16 import rocon_console.console
as console
17 from rocon_gateway
import samples
18 from rocon_gateway
import GatewaySampleRuntimeError
19 from rocon_gateway
import Graph
20 from rocon_gateway
import GatewayError
77 rospy.init_node(
'test_pullments')
88 print(console.bold +
"\n********************************************************************" + console.reset)
89 print(console.bold +
"* Flip All" + console.reset)
90 print(console.bold +
"********************************************************************" + console.reset)
93 except GatewaySampleRuntimeError
as e:
94 self.fail(
"Runtime error caught when flipping all connections [%s]" % str(e))
96 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
97 self.assertEquals(
"5", str(len(flipped_interface)))
99 self.assertEquals(len([flip
for flip
in flipped_interface
if flip.remote_rule.rule.name ==
"/add_two_ints" and flip.remote_rule.rule.node.split(
',')[0] ==
"/add_two_ints_server" and flip.remote_rule.rule.type ==
"service"]), 1)
100 self.assertEquals(len([flip
for flip
in flipped_interface
if flip.remote_rule.rule.name ==
"/chatter" and flip.remote_rule.rule.node.split(
',')[0] ==
"/talker" and flip.remote_rule.rule.type ==
"publisher"]), 1)
101 self.assertEquals(len([flip
for flip
in flipped_interface
if flip.remote_rule.rule.name ==
"/chatter" and flip.remote_rule.rule.node.split(
',')[0] ==
"/listener" and flip.remote_rule.rule.type ==
"subscriber"]), 1)
102 self.assertEquals(len([flip
for flip
in flipped_interface
if flip.remote_rule.rule.name ==
"/fibonacci/server" and flip.remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_server" and flip.remote_rule.rule.type ==
"action_server"]), 1)
103 self.assertEquals(len([flip
for flip
in flipped_interface
if flip.remote_rule.rule.name ==
"/fibonacci/client" and flip.remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_client" and flip.remote_rule.rule.type ==
"action_client"]), 1)
106 self.assertTrue(result)
109 samples.flip_all(cancel=
True)
110 except GatewaySampleRuntimeError
as e:
111 self.fail(
"Runtime error caught when unflipping all connections [%s]" % str(e))
115 print(console.bold +
"\n********************************************************************" + console.reset)
116 print(console.bold +
"* Flip Tutorials" + console.reset)
117 print(console.bold +
"********************************************************************" + console.reset)
119 samples.flip_tutorials()
120 except GatewaySampleRuntimeError
as e:
121 self.fail(
"Runtime error caught when flipping tutorial connections.")
123 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
124 self.assertEquals(
"5", str(len(flipped_interface)))
126 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/add_two_ints" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/add_two_ints_server" and remote_rule.remote_rule.rule.type ==
"service"]), 1)
127 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/chatter" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/talker" and remote_rule.remote_rule.rule.type ==
"publisher"]), 1)
128 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/chatter" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/listener" and remote_rule.remote_rule.rule.type ==
"subscriber"]), 1)
129 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/fibonacci/server" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_server" and remote_rule.remote_rule.rule.type ==
"action_server"]), 1)
130 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/fibonacci/client" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_client" and remote_rule.remote_rule.rule.type ==
"action_client"]), 1)
133 self.assertTrue(result)
135 samples.flip_tutorials(cancel=
True)
136 except GatewaySampleRuntimeError
as e:
137 self.fail(
"Runtime error caught when unflipping tutorial connections.")
141 print(console.bold +
"\n********************************************************************" + console.reset)
142 print(console.bold +
"* Flip Regex Tutorials" + console.reset)
143 print(console.bold +
"********************************************************************" + console.reset)
145 samples.flip_tutorials(regex_patterns=
True)
146 except GatewaySampleRuntimeError
as e:
147 self.fail(
"Runtime error caught when flipping tutorial connections.")
149 rospy.loginfo(console.cyan +
" - local gateway graph : \n%s" % self.graph._local_gateway + console.reset)
150 self.assertEquals(
"5", str(len(flipped_interface)))
152 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/add_two_ints" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/add_two_ints_server" and remote_rule.remote_rule.rule.type ==
"service"]), 1)
153 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/chatter" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/talker" and remote_rule.remote_rule.rule.type ==
"publisher"]), 1)
154 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/chatter" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/listener" and remote_rule.remote_rule.rule.type ==
"subscriber"]), 1)
155 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/fibonacci/server" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_server" and remote_rule.remote_rule.rule.type ==
"action_server"]), 1)
156 self.assertEquals(len([remote_rule
for remote_rule
in flipped_interface
if remote_rule.remote_rule.rule.name ==
"/fibonacci/client" and remote_rule.remote_rule.rule.node.split(
',')[0] ==
"/fibonacci_client" and remote_rule.remote_rule.rule.type ==
"action_client"]), 1)
159 self.assertTrue(result)
161 samples.flip_tutorials(cancel=
True, regex_patterns=
True)
162 except GatewaySampleRuntimeError
as e:
163 self.fail(
"Runtime error caught when unflipping tutorial connections.")
175 start_time = rospy.Time.now()
176 while not result
and (rospy.Time.now() - start_time) < rospy.Duration(5.0):
180 flipped_interface = self.graph._local_gateway.flipped_connections
181 if not flipped_interface:
183 for remote_rule
in flipped_interface:
185 if remote_rule.status !=
'accepted':
191 flipped_interface =
None 192 while not flipped_interface:
194 flipped_interface = self.graph._local_gateway.flipped_connections
196 return flipped_interface
199 start_time = rospy.Time.now()
202 flipped_interface = self.graph._local_gateway.flipped_connections
203 if not flipped_interface:
208 if rospy.Time.now() - start_time > rospy.Duration(2.0):
209 result =
"timed out waiting for flipped interface to clear" 211 self.assertEqual(
"cleared", result)
214 if __name__ ==
'__main__':
216 rosunit.unitrun(
'test_flips', NAME, TestFlips, sys.argv, coverage_packages=[
'rocon_gateway'])
def _wait_for_accepted_flipped_interface(self)
Utility methods.
def test_flip_regex_tutorials(self)
def _assert_cleared_flipped_interface(self)
def test_flip_tutorials(self)
def _wait_for_flipped_interface(self)