12 import rocon_console.console
as console
13 from rocon_gateway
import Graph
14 from rocon_gateway
import GatewayError
17 import rocon_gateway_utils
25 print(
"[TEST] %s" % msg)
35 rospy.init_node(
'test_graph')
43 flips = self.graph._local_gateway.flip_watchlist
44 rospy.rostime.wallsleep(0.2)
45 printtest(
"********************************************************************")
47 printtest(
"********************************************************************")
48 printtest(
"%s" % self.graph._local_gateway)
49 self.assertEquals(
"5", str(len(flips)))
52 self.assertEquals(len([flip
for flip
in flips
if flip.gateway ==
"remote_gateway" and flip.rule.name ==
"/add_two_ints" and flip.rule.type ==
"service"]), 1)
53 self.assertEquals(len([flip
for flip
in flips
if flip.gateway ==
"remote_gateway" and flip.rule.name ==
"/chatter" and flip.rule.type ==
"publisher"]), 1)
54 self.assertEquals(len([flip
for flip
in flips
if flip.gateway ==
"remote_gateway" and flip.rule.name ==
"/chatter" and flip.rule.type ==
"subscriber"]), 1)
55 self.assertEquals(len([flip
for flip
in flips
if flip.gateway ==
"remote_gateway" and flip.rule.name ==
"/fibonacci" and flip.rule.type ==
"action_server"]), 1)
56 self.assertEquals(len([flip
for flip
in flips
if flip.gateway ==
"remote_gateway" and flip.rule.name ==
"/fibonacci" and flip.rule.type ==
"action_client"]), 1)
58 printtest(
"********************************************************************")
60 printtest(
"********************************************************************")
61 printtest(
"%s" % self.graph._remote_gateways)
62 for remote_gateway
in self.graph._remote_gateways:
63 self.assertEquals(
"remote_gateway", rocon_gateway_utils.gateway_basename(remote_gateway.name))
if __name__ == '__main__':
    # Script entry point: hand the TestGraph case to rosunit under the
    # test name 'test_graph', forwarding the command-line arguments and
    # naming 'rocon_gateway' as the package to collect coverage for.
    rosunit.unitrun(
        'test_graph',
        NAME,
        TestGraph,
        sys.argv,
        coverage_packages=['rocon_gateway'],
    )
def printtest(msg)
Logging helper: prints msg to stdout with a "[TEST]" prefix.