connection_manager.py
Go to the documentation of this file.
00001 '''
00002 Created on 09/08/2011
00003 
00004 @author: snorri
00005 '''
00006 ##############################################################################
00007 # Imports
00008 ##############################################################################
00009 
00010 import os
00011 import xmlrpclib
00012 import threading
00013 import socket #socket.error
00014 import sys
00015 
00016 # Ros imports
00017 import roslib; roslib.load_manifest('rocon_conductor')
00018 import rospy
00019 
00020 # Rocon imports
00021 import zeroconf_avahi
00022 import concert_msgs
00023 from zeroconf_msgs.msg import *
00024 from zeroconf_msgs.srv import *
00025 from concert_msgs.msg import *
00026 
00027 # Local imports
00028 import zeroconf
00029 from .utilities import platform_id_to_string
00030 from .utilities import system_id_to_string
00031 
00032 ##############################################################################
00033 # Class
00034 ##############################################################################
00035 
00036 
00037 class ConcertClientError(Exception):
00038     '''
00039        Input value should be a string
00040     '''
00041     def __init__(self, value):
00042         self.value = value
00043 
00044     def __str__(self):
00045         return self.value
00046 
00047 
00048 class ConcertClient(object):
00049     '''
00050       Initialises the client with zeroconf information and a few extra
00051       details from xmlrpc handshaking. These clients are actually created and info stored,
00052       even if they do not actually connect to the concert. I may have to revert on this decision later.
00053 
00054       @param master zconf_info : zero-configuration to store.
00055 
00056       @var zconf_info : the zero-configuration for this client [zeroconf_msgs.msg.DiscoveredService]
00057       @var connected  : connection status
00058       @var app_manager_uri : address and port number of the app manager's xmlrpc handshaking server.
00059     '''
00060     def __init__(self, unique_name_generator, zconf_info=None):
00061         '''
00062           Can be initialised via zeroconf identification, or we get a connection out of the blue,
00063           in which case we have no zeroconf information.
00064         '''
00065         # Description variables
00066         self.zconf_info = None
00067         # System triple
00068         self.platform_info = concert_msgs.msg.PlatformInfo() 
00069         self.app_manager_uri = None
00070 
00071         # Status variables
00072         self.is_connected = False
00073         self.last_connection_timestamp = None
00074 
00075         # Functionality variables
00076         self.platform_info_subscriber = None
00077          
00078         if zconf_info is not None:
00079             self.zconf_info = zconf_info
00080             rospy.logwarn("Conductor: update this to handle multiple addresses - Todo!!")
00081             # Need to better check for number of addresses here
00082             self.app_manager_uri = "http://" + self.zconf_info.ipv4_addresses[0] + ":" + str(self.zconf_info.port)
00083             s = self.configuration_server()
00084             # get system-platform-robot triple + key (random hex string) + suggested name
00085             rospy.loginfo("Conductor: requesting information from the app manager at %s"%self.app_manager_uri)
00086             try:
00087                 self.platform_info.platform = s.platform()
00088                 self.platform_info.system = s.system()
00089                 self.platform_info.robot = s.robot()
00090                 self.platform_info.key = s.key()
00091                 self.platform_info.suggested_name = s.suggested_name()
00092                 # configure this client with a uniquely generated name
00093                 self.platform_info.unique_name = unique_name_generator(self.platform_info.suggested_name)
00094             except xmlrpclib.Fault, err:
00095                 raise ConcertClientError("could not retrieve platform information [%s][%s][%s]"%(err.faultCode,err.faultString,zconf_info.name))
00096             except xmlrpclib.ProtocolError:
00097                 raise ConcertClientError("could not contact client on the zeroconf url [%s][%s]"%(self.app_manager_uri,zconf_info.name))
00098             except IOError, err: # socket.error
00099                 raise ConcertClientError("io error connecting to client [%s][%s][%s]"%(err.errno,os.strerror(err.errno),zconf_info.name))
00100             except:
00101                 raise ConcertClientError("unexpected error contacting the client's xmlrpc server [%s]"%zconf_info.name)
00102         
00103     def to_msg_format(self):
00104         client = concert_msgs.msg.ConcertClient()
00105         client.zeroconf = self.zconf_info if (self.zconf_info is not None) else zeroconf_msgs.msg.DiscoveredService() 
00106         client.platform = platform_id_to_string(self.platform_info.platform) 
00107         client.system = system_id_to_string(self.platform_info.system) 
00108         client.robot = self.platform_info.robot if (self.platform_info.robot is not None) else "unknown" 
00109         client.suggested_name = self.platform_info.suggested_name if (self.platform_info.suggested_name is not None) else "unknown" 
00110         client.unique_name = self.platform_info.unique_name if (self.platform_info.unique_name is not None) else "unknown" 
00111         client.key = self.platform_info.key if (self.platform_info.key is not None) else "none" 
00112         client.app_manager_uri = self.app_manager_uri if (self.app_manager_uri is not None) else "unknown"
00113         client.is_connected = self.is_connected
00114         client.last_connection_timestamp =  self.last_connection_timestamp if (self.last_connection_timestamp is not None) else rospy.Time() # 0sec, 0nsec
00115         return client
00116 
00117     def register_connection(self):
00118         '''
00119           Used by the connections manager to save a timestamp highlighting
00120           when the last client connection was registered. 
00121         '''
00122         rospy.loginfo("Conductor: timestamping new connection [%s]"%self.platform_info.unique_name)
00123         self.last_connection_timestamp = rospy.Time.now()
00124         self.is_connected = True 
00125         
00126     def has_platform_information(self):
00127         '''
00128           Part of the android mess where we couldn't query xmlrpc server for 
00129           platform information - this lets us get it via ros publisher
00130           after the client is already connected.
00131         '''
00132         if self.platform_info_subscriber is not None:
00133             # We're currently waiting for the callback to kick in
00134             return False
00135         elif self.platform_info.platform is None or self.platform_info.system is None:
00136             return False
00137         else:
00138             return True
00139             
00140 #    def retrieve_platform_information(self, unique_name):    
00141 #        '''
00142 #          Part of the android mess where we couldn't query xmlrpc server for 
00143 #          platform information - this lets us get it via ros publisher
00144 #          after the client is already connected.
00145 #        '''
00146 #        self.platform_info.unique_name = unique_name
00147 #        if self.platform_info_subscriber is not None:
00148 #            rospy.logerr("Conductor: client's platform info publisher was already started (shouldn't be).")
00149 #            return
00150 #        self.platform_info_subscriber = rospy.Subscriber( 
00151 #                        roslib.names.make_global_ns(roslib.names.ns_join(unique_name,"platform_info")), 
00152 #                        concert_msgs.msg.PlatformInfo,
00153 #                        self.retrieve_platform_information_callback) 
00154 #
00155 #    def retrieve_platform_information_callback(self, data):
00156 #        '''
00157 #           This will usually fire immediately once the publisher is initiated as its latched.
00158 #        '''
00159 #        self.platform_info = data
00160 #
00161 #        self.platform_info_subscriber.unregister()
00162 #        del self.platform_info_subscriber
00163 #        self.platform_info_subscriber = None
00164 #        # this is naive, move to the spin() loop to make sure we check all clients for connectivity.
00165 #        self.register_connection()
00166 #        rospy.loginfo("Conductor: client connected [%s][%s][%s]"%(self.unique_name,self.platform,self.system))
00167         
00168     def configuration_server(self):                
00169         return xmlrpclib.ServerProxy(self.app_manager_uri)
00170     
00171     def invite_to_concert(self, concert_master_name):
00172         '''
00173           Sends an invite to the client (contact's the client's xmlrpc invitation server).
00174           
00175           @param concert_master_name : name of the concert master it is being invited to [str]
00176           @return bool : success or failure
00177           
00178           Assumptions: 
00179             1) for now we're always inviting, and it always works.
00180         '''
00181         rospy.loginfo("Conductor: inviting client [%s][%s][%s]"%(self.platform_info.unique_name, self.zconf_info.name, self.app_manager_uri))
00182         s = self.configuration_server()
00183         # Set the unique name for the app manager to pair with this concert master
00184         s.unique_name(concert_master_name, self.platform_info.unique_name)
00185         try:
00186             if s.invite(concert_master_name):
00187                 rospy.loginfo("Conductor: client accepted the invitation [%s][%s]"%(self.platform_info.unique_name, self.app_manager_uri))
00188             else:
00189                 rospy.loginfo("Conductor: client rejected the invitation [%s][%s]"%(self.platform_info.unique_name, self.app_manager_uri))
00190         except xmlrpclib.Fault, err:
00191             rospy.logwarn("Conductor: a fault occurred")
00192             rospy.logwarn("Conductor: fault code: %d" % err.faultCode)
00193             rospy.logwarn("Conductor: fault string: %s" % err.faultString)
00194             rospy.logwarn("Conductor: client does not support being invited [%s]"%self.platform_info.unique_name)
00195         except socket.timeout, err:
00196             rospy.logwarn("Conductor: socket timed out when trying to invite client [%s][%s][%s]"%(err.errno,os.strerror(err.errno),self.platform_info.unique_name))
00197 
00198 class Connections(threading.Thread):
00199     
00200     unique_id_counter = 0
00201 
00202     '''
00203     @var concert_master_name : zeroconf name for the concert master [str]
00204     @var concert_clients : list of concert clients [ConcertClient[]]
00205     @var auto_connect_clients : whether to auto-invite clients or not
00206     
00207     @param auto_invite_clients
00208     '''
00209     def __init__(self):
00210         '''
00211            Subscriber callback that listens to the zeroconf new_connection topic.
00212         '''
00213         threading.Thread.__init__(self)        
00214         self.concert_clients = set([])
00215         self.concert_master_name = zeroconf.concert_master_name()
00216         rospy.loginfo("Conductor: initialised for concert '%s'"%self.concert_master_name)
00217         # This is stub functionality, we'll expand auto-invitations properly later
00218         self._auto_invite_clients = rospy.get_param("auto_invite_clients", True)
00219 
00220         # Ros comms
00221         self.concert_clients_publisher = rospy.Publisher("concert_clients", concert_msgs.msg.ConcertClients, latch=True)
00222         rospy.Subscriber("new_connections",zeroconf_msgs.msg.DiscoveredService,self.new_connection_cb)
00223         rospy.Subscriber("lost_connections",zeroconf_msgs.msg.DiscoveredService,self.lost_connection_cb)
00224         
00225         for zconf_client in zeroconf.discover_concert_clients():
00226             try:
00227                 concert_client = ConcertClient(self.unique_name_generator, zconf_client)
00228                 # should check that its not already in in the concert_clients list
00229                 if self._auto_invite_clients:
00230                     concert_client.invite_to_concert(self.concert_master_name)
00231                 self.concert_clients.add( concert_client ) # add it whether the invite succeeded or not, details of the success are in the client class
00232             except ConcertClientError as e:
00233                 rospy.logerr("Conductor: %s"%e)
00234                 rospy.logerr("Conductor: not appending to the concert client list")
00235         self.publish_discovered_clients()
00236                 
00237 
00238     def unique_name_generator(self, suggested_name):
00239         '''
00240           Checks if the suggested name is currently unique and if not, stamps it with a # to guarantee 
00241           uniqueness.
00242           
00243           This could be a little bit 'nicer'. The # increments for every robot, not in parallel for every 
00244           robot with a particular suggested name. i.e. irobi, robosem, irobo_1, robosem_2, robosem_3, 
00245           irobi_4.
00246         '''
00247         existing_client = next((client for client in self.concert_clients if client.platform_info.unique_name == suggested_name), None)
00248         if existing_client is None:
00249             return suggested_name
00250         else:
00251             unique_name = suggested_name + "_" + str(Connections.unique_id_counter)
00252             Connections.unique_id_counter += 1
00253             return unique_name
00254         
00255     def new_registration_cb(self, client):
00256         rospy.loginfo("Conductor: new registration [%s][%s]"%(client.key, client.unique_name))
00257         
00258     def new_connection_cb(self,new_zconf_client):
00259         '''
00260            Relays removed zeroconf connections from the zeroconf/new_connections topic.
00261         '''
00262         duplicated_client = next((client for client in self.concert_clients if client.zconf_info.name == new_zconf_client.name), None)
00263         if duplicated_client is not None:
00264             # Not really checking for this, but if zeroconf is not buggy, this is typically true (might be being naive!).
00265             rospy.loginfo("Conductor: discovered existing client on a new interface [ignoring][%s]"%new_zconf_client.name)
00266             #rospy.logwarn("Conductor: existing connection: \n%s"%zeroconf_avahi.utilities.service_to_str(duplicated_client.zconf_info))
00267             #rospy.logwarn("Conductor: new connection: \n%s"%zeroconf_avahi.utilities.service_to_str(new_zconf_client))
00268             return
00269         try:
00270             client = ConcertClient(self.unique_name_generator, new_zconf_client)
00271             client.invite_to_concert(self.concert_master_name)
00272             self.concert_clients.add( client )
00273             self.publish_discovered_clients()
00274         except ConcertClientError as err:
00275             rospy.logerr("Conductor: %s"%err)
00276             rospy.logerr("Conductor: not appending to the concert client list")
00277             
00278 
00279     def lost_connection_cb(self,lost_zconf_client):
00280         '''
00281            Relays removed zeroconf connections from the zeroconf/lost_connections topic.
00282         '''
00283         lost_client = next((client for client in self.concert_clients if client.zconf_info.name == lost_zconf_client.name), None)
00284         if lost_client is not None:
00285             rospy.loginfo("Conductor: removing client [%s]"%lost_zconf_client.name)
00286             self.concert_clients.remove(lost_client)
00287             self.publish_discovered_clients()
00288         else:
00289             rospy.logwarn("Conductor: tried to remove a non-attached client")
00290     
00291     def watchdog(self, master):
00292         '''
00293           This keeps tab on the ros master's xmlrpc api to check for new incoming connections.
00294           It's a bit naive, but it does the job - at least until we can do proper invites across
00295           the board (android is on-connect only atm).
00296         '''
00297         caller_id = '/script'
00298         code, msg, val = master.getSystemState(caller_id)
00299         if code == 1:
00300             pubs, unused_subs, unused_srvs = val
00301             # Later shift the search to a service
00302             # Here find the publishers with form: _unique_name_/platform_info
00303             for pub, unused_node in pubs:
00304                 if pub.find("platform_info") != -1:
00305                     unique_name = pub.split('/')[1]  # is of form /ros_robot/platform_info
00306                     client = next((client for client in self.concert_clients if client.platform_info.unique_name == unique_name), None)
00307                     
00308                     # Initially, our android clients werent' getting found by zeroconf, so 
00309                     # we had to create the ConcertClient and get platform information here
00310                     if client is None:
00311                         # client = ConcertClient(self.unique_name_generator)
00312                         # self.concert_clients.add( client )
00313                         # Now we just pass here and wait for zeroconf to create it
00314                         continue
00315                     # Now we always get our platform info from the xmlrpc servers, so skip this too
00316                     # if not client.has_platform_information():
00317                     #    client.retrieve_platform_information(unique_name)
00318                     if not client.is_connected:
00319                         client.register_connection()
00320                         self.publish_discovered_clients()
00321         else:
00322             rospy.logerr("Conductor: failed to call the concert master for the system state [%s][%s]."%(code,msg))
00323 
00324     ##############################################################################
00325     # Come in spinner!
00326     ##############################################################################
00327 
00328     def run(self):
00329         '''
00330           This loop currently runs a watchdog and publishes current client list.
00331           See the watchdog method for more detailed information.
00332            
00333           Todo: fix a race condition when calling concert_clients in this thread.
00334         '''
00335         master = xmlrpclib.ServerProxy(os.environ['ROS_MASTER_URI'])
00336         while not rospy.is_shutdown():
00337             self.watchdog(master)
00338             rospy.sleep(2.0)
00339 
00340     def spin(self):
00341         '''
00342           Convenient label if we're spinning in the main thread (i.e. not using this class's thread)..
00343         '''
00344         self.run()
00345         
00346     def join(self):
00347         ''' 
00348             Wait for the thread to terminate. Note that it doesn't need help - it will terminate
00349             when ros shuts down.
00350         '''
00351         self.join()
00352     
00353     ##############################################################################
00354     # Utilities
00355     ##############################################################################
00356 
00357     def publish_discovered_clients(self):
00358         '''
00359             Provide a list of currently discovered clients. This goes onto a 
00360             latched publisher, so subscribers will always have the latest
00361             without the need to poll a service.
00362         '''
00363         discovered_concert = concert_msgs.msg.ConcertClients()
00364         for client in self.concert_clients:
00365             discovered_concert.clients.append(client.to_msg_format())
00366         self.concert_clients_publisher.publish(discovered_concert)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends


graveyard_rocon_conductor
Author(s): Daniel Stonier
autogenerated on Wed Jan 23 2013 13:42:04