Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 from __future__ import print_function
00011
00012 import sys
00013 import unittest
00014 import rospy
00015 import rocon_gateway
00016 from std_msgs.msg import String
00017 import rosunit
00018 import rosgraph
00019 from gateway_msgs.msg import Rule, ConnectionType
00020
00021
00022
00023
00024
00025
00026 PUBLISHER = ConnectionType.PUBLISHER
00027 SUBSCRIBER = ConnectionType.SUBSCRIBER
00028 SERVICE = ConnectionType.SERVICE
00029 ACTION_SERVER = ConnectionType.ACTION_SERVER
00030 ACTION_CLIENT = ConnectionType.ACTION_CLIENT
00031
00032
00033
00034
00035
00036 def test_connection_cache():
00037 master = rosgraph.Master(rospy.get_name())
00038 connection_cache = rocon_gateway.ConnectionCache(master.getSystemState)
00039
00040 new_system_state = {}
00041 new_system_state[PUBLISHER] = [['/chatter', ['/talker']]]
00042 new_system_state[SUBSCRIBER] = []
00043 new_system_state[SERVICE] = []
00044 connection_cache.update(new_system_state)
00045 new_system_state[PUBLISHER] = [['/chatter', ['/talker', '/babbler']]]
00046 new_connections, lost_connections = connection_cache.update(new_system_state)
00047 assert "/babbler", new_connections[PUBLISHER][0].rule.node