00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 import os
00012 import socket
00013 import errno
00014 import roslib
00015 roslib.load_manifest('rocon_gateway')
00016 import rospy
00017 import rosgraph
00018 import rostopic
00019 import rosservice
00020 import roslib.names
00021 from rosmaster.util import xmlrpcapi
00022 try:
00023 import urllib.parse as urlparse
00024 except ImportError:
00025 import urlparse
00026 import re
00027 from gateway_msgs.msg import Rule, ConnectionType
00028 from utils import Connection
00029
00030
00031
00032
00033
00034
00035 class LocalMaster(rosgraph.Master):
00036 '''
00037 Representing a ros master (local ros master). Just contains a
00038 few utility methods for retrieving master related information as well
00039 as handles for registering and unregistering rules that have
00040 been pulled or flipped in from another gateway.
00041 '''
00042
00043 def __init__(self):
00044 rosgraph.Master.__init__(self, rospy.get_name())
00045
00046
00047
00048
00049
00050 def register(self, registration):
00051 '''
00052 Registers a rule with the local master.
00053
00054 @param registration : registration details
00055 @type utils.Registration
00056
00057 @return the updated registration object (only adds an anonymously generated local node name)
00058 @rtype utils.Registration
00059 '''
00060 registration.local_node = self._get_anonymous_node_name(registration.connection.rule.node)
00061 rospy.loginfo("Gateway : registering a new node [%s] for [%s]" % (registration.local_node, registration))
00062
00063
00064
00065
00066 node_master = rosgraph.Master(registration.local_node)
00067 if registration.connection.rule.type == ConnectionType.PUBLISHER:
00068 node_master.registerPublisher(registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
00069 return registration
00070 elif registration.connection.rule.type == ConnectionType.SUBSCRIBER:
00071 self._register_subscriber(node_master, registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
00072 return registration
00073 elif registration.connection.rule.type == ConnectionType.SERVICE:
00074 if rosservice.get_service_node(registration.connection.rule.name):
00075 rospy.logwarn("Gateway : tried to register a service that is already locally available, aborting [%s]" % registration.connection.rule.name)
00076 return None
00077 else:
00078 node_master.registerService(registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
00079 return registration
00080 elif registration.connection.rule.type == ConnectionType.ACTION_SERVER:
00081
00082 self._register_subscriber(node_master, registration.connection.rule.name + "/goal", registration.connection.type_info + "ActionGoal", registration.connection.xmlrpc_uri)
00083 self._register_subscriber(node_master, registration.connection.rule.name + "/cancel", "actionlib_msgs/GoalID", registration.connection.xmlrpc_uri)
00084 node_master.registerPublisher(registration.connection.rule.name + "/status", "actionlib_msgs/GoalStatusArray", registration.connection.xmlrpc_uri)
00085 node_master.registerPublisher(registration.connection.rule.name + "/feedback", registration.connection.type_info + "ActionFeedback", registration.connection.xmlrpc_uri)
00086 node_master.registerPublisher(registration.connection.rule.name + "/result", registration.connection.type_info + "ActionResult", registration.connection.xmlrpc_uri)
00087 return registration
00088 elif registration.connection.rule.type == ConnectionType.ACTION_CLIENT:
00089 node_master.registerPublisher(registration.connection.rule.name + "/goal", registration.connection.type_info + "ActionGoal", registration.connection.xmlrpc_uri)
00090 node_master.registerPublisher(registration.connection.rule.name + "/cancel", "actionlib_msgs/GoalID", registration.connection.xmlrpc_uri)
00091 self._register_subscriber(node_master, registration.connection.rule.name + "/status", "actionlib_msgs/GoalStatusArray", registration.connection.xmlrpc_uri)
00092 self._register_subscriber(node_master, registration.connection.rule.name + "/feedback", registration.connection.type_info + "ActionFeedback", registration.connection.xmlrpc_uri)
00093 self._register_subscriber(node_master, registration.connection.rule.name + "/result", registration.connection.type_info + "ActionResult", registration.connection.xmlrpc_uri)
00094 return registration
00095 return None
00096
00097 def unregister(self, registration):
00098 '''
00099 Unregisters a rule with the local master.
00100
00101 @param registration : registration details for an existing gateway registered rule
00102 @type utils.Registration
00103 '''
00104 node_master = rosgraph.Master(registration.local_node)
00105 rospy.loginfo("Gateway : unregistering local node [%s] for [%s]" % (registration.local_node, registration))
00106 if registration.connection.rule.type == ConnectionType.PUBLISHER:
00107 node_master.unregisterPublisher(registration.connection.rule.name, registration.connection.xmlrpc_uri)
00108 elif registration.connection.rule.type == ConnectionType.SUBSCRIBER:
00109 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name)
00110 elif registration.connection.rule.type == ConnectionType.SERVICE:
00111 node_master.unregisterService(registration.connection.rule.name, registration.connection.type_info)
00112 elif registration.connection.rule.type == ConnectionType.ACTION_SERVER:
00113 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/goal")
00114 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/cancel")
00115 node_master.unregisterPublisher(registration.connection.rule.name + "/status", registration.connection.xmlrpc_uri)
00116 node_master.unregisterPublisher(registration.connection.rule.name + "/feedback", registration.connection.xmlrpc_uri)
00117 node_master.unregisterPublisher(registration.connection.rule.name + "/result", registration.connection.xmlrpc_uri)
00118 elif registration.connection.rule.type == ConnectionType.ACTION_CLIENT:
00119 node_master.unregisterPublisher(registration.connection.rule.name + "/goal", registration.connection.xmlrpc_uri)
00120 node_master.unregisterPublisher(registration.connection.rule.name + "/cancel", registration.connection.xmlrpc_uri)
00121 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/status")
00122 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/feedback")
00123 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/result")
00124
00125 def _register_subscriber(self, node_master, name, type_info, xmlrpc_uri):
00126 '''
00127 This one is not necessary, since you can pretty much guarantee the
00128 existence of the subscriber here, but it pays to be safe - we've seen
00129 some errors come out here when the ROS_MASTER_URI was only set to
00130 localhost.
00131
00132 @param node_master : node-master xmlrpc method handler
00133 @param type_info : type of the subscriber message
00134 @param xmlrpc_uri : the uri of the node (xmlrpc server)
00135 @type string
00136 @param name : fully resolved subscriber name
00137 '''
00138
00139
00140 pub_uri_list = node_master.registerSubscriber(name, type_info, xmlrpc_uri)
00141 try:
00142 xmlrpcapi(xmlrpc_uri).publisherUpdate('/master', name, pub_uri_list)
00143 except socket.error, v:
00144 errorcode = v[0]
00145 if errorcode != errno.ECONNREFUSED:
00146 rospy.logerr("Gateway : error registering subscriber (is ROS_MASTER_URI and ROS_HOSTNAME or ROS_IP correctly set?)")
00147 rospy.logerr("Gateway : errorcode [%s] xmlrpc_uri [%s]" % (str(errorcode), xmlrpc_uri))
00148 raise
00149 else:
00150 pass
00151
00152 def _unregister_subscriber(self, node_master, xmlrpc_uri, name):
00153 '''
00154 It is a special case as it requires xmlrpc handling to inform the subscriber of
00155 the disappearance of publishers it was connected to. It also needs to handle the
00156 case when that information doesn't get to the subscriber because it is down.
00157
00158 @param node_master : node-master xmlrpc method handler
00159 @param xmlrpc_uri : the uri of the node (xmlrpc server)
00160 @type string
00161 @param name : fully resolved subscriber name
00162 '''
00163
00164
00165 try:
00166 xmlrpcapi(xmlrpc_uri).publisherUpdate('/master', name, [])
00167 except socket.error, v:
00168 errorcode = v[0]
00169 if errorcode != errno.ECONNREFUSED:
00170 raise
00171 else:
00172 pass
00173 node_master.unregisterSubscriber(name, xmlrpc_uri)
00174
00175
00176
00177
00178
00179 def generate_connection_details(self, type, name, node):
00180 '''
00181 Creates all the extra details to create a connection object from a
00182 rule.
00183
00184 @param type : the connection type (one of gateway_msgs.msg.ConnectionType)
00185 @type string
00186 @param name : the name of the connection
00187 @type string
00188 @param node : the master node name it comes from
00189 @param string
00190
00191 @return the utils.Connection object complete with type_info and xmlrpc_uri
00192 @type utils.Connection
00193 '''
00194 xmlrpc_uri = self.lookupNode(node)
00195 connections = []
00196 if type == ConnectionType.PUBLISHER or type == ConnectionType.SUBSCRIBER:
00197 type_info = rostopic.get_topic_type(name)[0]
00198 connections.append(Connection(Rule(type, name, node), type_info, xmlrpc_uri))
00199 elif type == ConnectionType.SERVICE:
00200 type_info = rosservice.get_service_uri(name)
00201 connections.append(Connection(Rule(type, name, node), type_info, xmlrpc_uri))
00202 elif type == ConnectionType.ACTION_SERVER:
00203 type_info = rostopic.get_topic_type(name + '/goal')[0]
00204 connections.append(Connection(Rule(ConnectionType.SUBSCRIBER, name + '/goal', node), type_info, xmlrpc_uri))
00205 type_info = rostopic.get_topic_type(name + '/cancel')[0]
00206 connections.append(Connection(Rule(ConnectionType.SUBSCRIBER, name + '/cancel', node), type_info, xmlrpc_uri))
00207 type_info = rostopic.get_topic_type(name + '/status')[0]
00208 connections.append(Connection(Rule(ConnectionType.PUBLISHER, name + '/status', node), type_info, xmlrpc_uri))
00209 type_info = rostopic.get_topic_type(name + '/feedback')[0]
00210 connections.append(Connection(Rule(ConnectionType.PUBLISHER, name + '/feedback', node), type_info, xmlrpc_uri))
00211 type_info = rostopic.get_topic_type(name + '/result')[0]
00212 connections.append(Connection(Rule(ConnectionType.PUBLISHER, name + '/result', node), type_info, xmlrpc_uri))
00213 elif type == ConnectionType.ACTION_CLIENT:
00214 type_info = rostopic.get_topic_type(name + '/goal')[0]
00215 connections.append(Connection(Rule(ConnectionType.PUBLISHER, name + '/goal', node), type_info, xmlrpc_uri))
00216 type_info = rostopic.get_topic_type(name + '/cancel')[0]
00217 connections.append(Connection(Rule(ConnectionType.PUBLISHER, name + '/cancel', node), type_info, xmlrpc_uri))
00218 type_info = rostopic.get_topic_type(name + '/status')[0]
00219 connections.append(Connection(Rule(ConnectionType.SUBSCRIBER, name + '/status', node), type_info, xmlrpc_uri))
00220 type_info = rostopic.get_topic_type(name + '/feedback')[0]
00221 connections.append(Connection(Rule(ConnectionType.SUBSCRIBER, name + '/feedback', node), type_info, xmlrpc_uri))
00222 type_info = rostopic.get_topic_type(name + '/result')[0]
00223 connections.append(Connection(Rule(ConnectionType.SUBSCRIBER, name + '/result', node), type_info, xmlrpc_uri))
00224 return connections
00225
00226 def get_ros_ip(self):
00227 o = urlparse.urlparse(rosgraph.get_master_uri())
00228 if o.hostname == 'localhost':
00229 ros_ip = ''
00230 try:
00231 ros_ip = os.environ['ROS_IP']
00232 except Exception:
00233 try:
00234
00235 ros_ip = os.environ['ROS_HOSTNAME']
00236 except Exception:
00237
00238 rospy.logwarn("Gateway: no valid ip found for this host, just setting 'localhost'")
00239 return 'localhost'
00240 return ros_ip
00241 else:
00242 return o.hostname
00243
00244 def _isTopicNodeInList(self, topic, node, list):
00245
00246 available = False
00247 for candidate in list:
00248 if candidate[0] == topic and node in candidate[1]:
00249 available = True
00250 break
00251 return available
00252
00253 def _getActions(self, pubs, subs):
00254 '''
00255 Return actions and pruned publisher, subscriber lists.
00256
00257 @param publishers
00258 @type list of publishers in the form returned by rosgraph.Master.getSystemState
00259 @param subscribers
00260 @type list of subscribers in the form returned by rosgraph.Master.getSystemState
00261 @return list of actions, pruned_publishers, pruned_subscribers
00262 @rtype [base_topic, [nodes]], as param type, as param type
00263 '''
00264
00265 actions = []
00266 for goal_candidate in pubs:
00267 if re.search('goal$', goal_candidate[0]):
00268
00269 base_topic = re.sub('\/goal$', '', goal_candidate[0])
00270 nodes = goal_candidate[1]
00271 action_nodes = []
00272
00273
00274 for node in nodes:
00275 is_action = True
00276 is_action &= self._isTopicNodeInList(base_topic + '/cancel', node, pubs)
00277 is_action &= self._isTopicNodeInList(base_topic + '/status', node, subs)
00278 is_action &= self._isTopicNodeInList(base_topic + '/feedback', node, subs)
00279 is_action &= self._isTopicNodeInList(base_topic + '/result', node, subs)
00280
00281 if is_action:
00282 action_nodes.append(node)
00283
00284 if len(action_nodes) != 0:
00285
00286 actions.append([base_topic, action_nodes])
00287
00288 for connection in pubs:
00289 if connection[0] in [base_topic + '/goal', base_topic + '/cancel']:
00290 connection[1].remove(node)
00291 for connection in subs:
00292 if connection[0] in [base_topic + '/status', base_topic + '/feedback', base_topic + '/result']:
00293 connection[1].remove(node)
00294 pubs[:] = [connection for connection in pubs if len(connection[1]) != 0]
00295 subs[:] = [connection for connection in subs if len(connection[1]) != 0]
00296 return actions, pubs, subs
00297
00298 def getActionServers(self, publishers, subscribers):
00299 '''
00300 Return action servers and pruned publisher, subscriber lists.
00301
00302 @param publishers
00303 @type list of publishers in the form returned by rosgraph.Master.getSystemState
00304 @param subscribers
00305 @type list of subscribers in the form returned by rosgraph.Master.getSystemState
00306 @return list of actions, pruned_publishers, pruned_subscribers
00307 @rtype [base_topic, [nodes]], as param type, as param type
00308 '''
00309 actions, subs, pubs = self._getActions(subscribers, publishers)
00310 return actions, pubs, subs
00311
00312 def getActionClients(self, publishers, subscribers):
00313 '''
00314 Return action clients and pruned publisher, subscriber lists.
00315
00316 @param publishers
00317 @type list of publishers in the form returned by rosgraph.Master.getSystemState
00318 @param subscribers
00319 @type list of subscribers in the form returned by rosgraph.Master.getSystemState
00320 @return list of actions, pruned_publishers, pruned_subscribers
00321 @rtype [base_topic, [nodes]], as param type, as param type
00322 '''
00323 actions, pubs, subs = self._getActions(publishers, subscribers)
00324 return actions, pubs, subs
00325
00326 def getConnectionsFromPubSubList(self, list, type):
00327 connections = []
00328 for topic in list:
00329 topic_name = topic[0]
00330 topic_type = rostopic.get_topic_type(topic_name)
00331 topic_type = topic_type[0]
00332 nodes = topic[1]
00333 for node in nodes:
00334 try:
00335 node_uri = self.lookupNode(node)
00336 except:
00337 continue
00338 rule = Rule(type, topic_name, node)
00339 connection = Connection(rule, topic_type, node_uri)
00340 connections.append(connection)
00341 return connections
00342
00343 def getConnectionsFromActionList(self, list, type):
00344 connections = []
00345 for action in list:
00346 action_name = action[0]
00347 goal_topic = action_name + '/goal'
00348 goal_topic_type = rostopic.get_topic_type(goal_topic)
00349 topic_type = re.sub('ActionGoal$', '', goal_topic_type[0])
00350 nodes = action[1]
00351 for node in nodes:
00352 try:
00353 node_uri = self.lookupNode(node)
00354 except:
00355 continue
00356 rule = Rule(type, action_name, node)
00357 connection = Connection(rule, topic_type, node_uri)
00358 connections.append(connection)
00359 return connections
00360
00361 def getConnectionsFromServiceList(self, list, type):
00362 connections = []
00363 for service in list:
00364 service_name = service[0]
00365 service_uri = rosservice.get_service_uri(service_name)
00366 nodes = service[1]
00367 for node in nodes:
00368 try:
00369 node_uri = self.lookupNode(node)
00370 except:
00371 continue
00372 rule = Rule(type,service_name,node)
00373 connection = Connection(rule, service_uri, node_uri)
00374 connections.append(connection)
00375 return connections
00376
00377 def getConnectionState(self):
00378 connections = {}
00379 publishers, subscribers, services = self.getSystemState()
00380 action_servers, publishers, subscribers = self.getActionServers(publishers, subscribers)
00381 action_clients, publishers, subscribers = self.getActionClients(publishers, subscribers)
00382 connections[ConnectionType.PUBLISHER] = self.getConnectionsFromPubSubList(publishers, ConnectionType.PUBLISHER)
00383 connections[ConnectionType.SUBSCRIBER] = self.getConnectionsFromPubSubList(subscribers, ConnectionType.SUBSCRIBER)
00384 connections[ConnectionType.SERVICE] = self.getConnectionsFromServiceList(services, ConnectionType.SERVICE)
00385 connections[ConnectionType.ACTION_SERVER] = self.getConnectionsFromActionList(action_servers, ConnectionType.ACTION_SERVER)
00386 connections[ConnectionType.ACTION_CLIENT] = self.getConnectionsFromActionList(action_clients, ConnectionType.ACTION_CLIENT)
00387 return connections
00388
00389 def _get_anonymous_node_name(self, topic):
00390 t = topic[1:len(topic)]
00391 name = roslib.names.anonymous_name(t)
00392 return name
00393
00394
00395
00396
00397
00398 def findGatewayNamespace(self):
00399 '''
00400 Assists a script to find the (hopefully) unique gateway namespace.
00401 Note that unique is a necessary condition, there should only be one
00402 gateway per ros system.
00403
00404 @return Namespace of the gateway node.
00405 @rtype string
00406 '''
00407 unused_publishers, unused_subscribers, services = self.getSystemState()
00408 for service in services:
00409 service_name = service[0]
00410 if re.search(r'remote_gateway_info', service_name):
00411 if service_name == '/remote_gateway_info':
00412 return "/"
00413 else:
00414 return re.sub(r'/remote_gateway_info', '', service_name)
00415 return None