00001 '''
00002 Created on 09/08/2011
00003
00004 @author: snorri
00005 '''
00006
00007
00008
00009
00010 import os
00011 import xmlrpclib
00012 import threading
00013 import socket
00014 import sys
00015
00016
00017 import roslib; roslib.load_manifest('rocon_conductor')
00018 import rospy
00019
00020
00021 import zeroconf_avahi
00022 import concert_msgs
00023 from zeroconf_msgs.msg import *
00024 from zeroconf_msgs.srv import *
00025 from concert_msgs.msg import *
00026
00027
00028 import zeroconf
00029 from .utilities import platform_id_to_string
00030 from .utilities import system_id_to_string
00031
00032
00033
00034
00035
00036
00037 class ConcertClientError(Exception):
00038 '''
00039 Input value should be a string
00040 '''
00041 def __init__(self, value):
00042 self.value = value
00043
00044 def __str__(self):
00045 return self.value
00046
00047
00048 class ConcertClient(object):
00049 '''
00050 Initialises the client with zeroconf information and a few extra
00051 details from xmlrpc handshaking. These clients are actually created and info stored,
00052 even if they do not actually connect to the concert. I may have to revert on this decision later.
00053
00054 @param master zconf_info : zero-configuration to store.
00055
00056 @var zconf_info : the zero-configuration for this client [zeroconf_msgs.msg.DiscoveredService]
00057 @var connected : connection status
00058 @var app_manager_uri : address and port number of the app manager's xmlrpc handshaking server.
00059 '''
00060 def __init__(self, unique_name_generator, zconf_info=None):
00061 '''
00062 Can be initialised via zeroconf identification, or we get a connection out of the blue,
00063 in which case we have no zeroconf information.
00064 '''
00065
00066 self.zconf_info = None
00067
00068 self.platform_info = concert_msgs.msg.PlatformInfo()
00069 self.app_manager_uri = None
00070
00071
00072 self.is_connected = False
00073 self.last_connection_timestamp = None
00074
00075
00076 self.platform_info_subscriber = None
00077
00078 if zconf_info is not None:
00079 self.zconf_info = zconf_info
00080 rospy.logwarn("Conductor: update this to handle multiple addresses - Todo!!")
00081
00082 self.app_manager_uri = "http://" + self.zconf_info.ipv4_addresses[0] + ":" + str(self.zconf_info.port)
00083 s = self.configuration_server()
00084
00085 rospy.loginfo("Conductor: requesting information from the app manager at %s"%self.app_manager_uri)
00086 try:
00087 self.platform_info.platform = s.platform()
00088 self.platform_info.system = s.system()
00089 self.platform_info.robot = s.robot()
00090 self.platform_info.key = s.key()
00091 self.platform_info.suggested_name = s.suggested_name()
00092
00093 self.platform_info.unique_name = unique_name_generator(self.platform_info.suggested_name)
00094 except xmlrpclib.Fault, err:
00095 raise ConcertClientError("could not retrieve platform information [%s][%s][%s]"%(err.faultCode,err.faultString,zconf_info.name))
00096 except xmlrpclib.ProtocolError:
00097 raise ConcertClientError("could not contact client on the zeroconf url [%s][%s]"%(self.app_manager_uri,zconf_info.name))
00098 except IOError, err:
00099 raise ConcertClientError("io error connecting to client [%s][%s][%s]"%(err.errno,os.strerror(err.errno),zconf_info.name))
00100 except:
00101 raise ConcertClientError("unexpected error contacting the client's xmlrpc server [%s]"%zconf_info.name)
00102
00103 def to_msg_format(self):
00104 client = concert_msgs.msg.ConcertClient()
00105 client.zeroconf = self.zconf_info if (self.zconf_info is not None) else zeroconf_msgs.msg.DiscoveredService()
00106 client.platform = platform_id_to_string(self.platform_info.platform)
00107 client.system = system_id_to_string(self.platform_info.system)
00108 client.robot = self.platform_info.robot if (self.platform_info.robot is not None) else "unknown"
00109 client.suggested_name = self.platform_info.suggested_name if (self.platform_info.suggested_name is not None) else "unknown"
00110 client.unique_name = self.platform_info.unique_name if (self.platform_info.unique_name is not None) else "unknown"
00111 client.key = self.platform_info.key if (self.platform_info.key is not None) else "none"
00112 client.app_manager_uri = self.app_manager_uri if (self.app_manager_uri is not None) else "unknown"
00113 client.is_connected = self.is_connected
00114 client.last_connection_timestamp = self.last_connection_timestamp if (self.last_connection_timestamp is not None) else rospy.Time()
00115 return client
00116
00117 def register_connection(self):
00118 '''
00119 Used by the connections manager to save a timestamp highlighting
00120 when the last client connection was registered.
00121 '''
00122 rospy.loginfo("Conductor: timestamping new connection [%s]"%self.platform_info.unique_name)
00123 self.last_connection_timestamp = rospy.Time.now()
00124 self.is_connected = True
00125
00126 def has_platform_information(self):
00127 '''
00128 Part of the android mess where we couldn't query xmlrpc server for
00129 platform information - this lets us get it via ros publisher
00130 after the client is already connected.
00131 '''
00132 if self.platform_info_subscriber is not None:
00133
00134 return False
00135 elif self.platform_info.platform is None or self.platform_info.system is None:
00136 return False
00137 else:
00138 return True
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168 def configuration_server(self):
00169 return xmlrpclib.ServerProxy(self.app_manager_uri)
00170
00171 def invite_to_concert(self, concert_master_name):
00172 '''
00173 Sends an invite to the client (contact's the client's xmlrpc invitation server).
00174
00175 @param concert_master_name : name of the concert master it is being invited to [str]
00176 @return bool : success or failure
00177
00178 Assumptions:
00179 1) for now we're always inviting, and it always works.
00180 '''
00181 rospy.loginfo("Conductor: inviting client [%s][%s][%s]"%(self.platform_info.unique_name, self.zconf_info.name, self.app_manager_uri))
00182 s = self.configuration_server()
00183
00184 s.unique_name(concert_master_name, self.platform_info.unique_name)
00185 try:
00186 if s.invite(concert_master_name):
00187 rospy.loginfo("Conductor: client accepted the invitation [%s][%s]"%(self.platform_info.unique_name, self.app_manager_uri))
00188 else:
00189 rospy.loginfo("Conductor: client rejected the invitation [%s][%s]"%(self.platform_info.unique_name, self.app_manager_uri))
00190 except xmlrpclib.Fault, err:
00191 rospy.logwarn("Conductor: a fault occurred")
00192 rospy.logwarn("Conductor: fault code: %d" % err.faultCode)
00193 rospy.logwarn("Conductor: fault string: %s" % err.faultString)
00194 rospy.logwarn("Conductor: client does not support being invited [%s]"%self.platform_info.unique_name)
00195 except socket.timeout, err:
00196 rospy.logwarn("Conductor: socket timed out when trying to invite client [%s][%s][%s]"%(err.errno,os.strerror(err.errno),self.platform_info.unique_name))
00197
00198 class Connections(threading.Thread):
00199
00200 unique_id_counter = 0
00201
00202 '''
00203 @var concert_master_name : zeroconf name for the concert master [str]
00204 @var concert_clients : list of concert clients [ConcertClient[]]
00205 @var auto_connect_clients : whether to auto-invite clients or not
00206
00207 @param auto_invite_clients
00208 '''
00209 def __init__(self):
00210 '''
00211 Subscriber callback that listens to the zeroconf new_connection topic.
00212 '''
00213 threading.Thread.__init__(self)
00214 self.concert_clients = set([])
00215 self.concert_master_name = zeroconf.concert_master_name()
00216 rospy.loginfo("Conductor: initialised for concert '%s'"%self.concert_master_name)
00217
00218 self._auto_invite_clients = rospy.get_param("auto_invite_clients", True)
00219
00220
00221 self.concert_clients_publisher = rospy.Publisher("concert_clients", concert_msgs.msg.ConcertClients, latch=True)
00222 rospy.Subscriber("new_connections",zeroconf_msgs.msg.DiscoveredService,self.new_connection_cb)
00223 rospy.Subscriber("lost_connections",zeroconf_msgs.msg.DiscoveredService,self.lost_connection_cb)
00224
00225 for zconf_client in zeroconf.discover_concert_clients():
00226 try:
00227 concert_client = ConcertClient(self.unique_name_generator, zconf_client)
00228
00229 if self._auto_invite_clients:
00230 concert_client.invite_to_concert(self.concert_master_name)
00231 self.concert_clients.add( concert_client )
00232 except ConcertClientError as e:
00233 rospy.logerr("Conductor: %s"%e)
00234 rospy.logerr("Conductor: not appending to the concert client list")
00235 self.publish_discovered_clients()
00236
00237
00238 def unique_name_generator(self, suggested_name):
00239 '''
00240 Checks if the suggested name is currently unique and if not, stamps it with a # to guarantee
00241 uniqueness.
00242
00243 This could be a little bit 'nicer'. The # increments for every robot, not in parallel for every
00244 robot with a particular suggested name. i.e. irobi, robosem, irobo_1, robosem_2, robosem_3,
00245 irobi_4.
00246 '''
00247 existing_client = next((client for client in self.concert_clients if client.platform_info.unique_name == suggested_name), None)
00248 if existing_client is None:
00249 return suggested_name
00250 else:
00251 unique_name = suggested_name + "_" + str(Connections.unique_id_counter)
00252 Connections.unique_id_counter += 1
00253 return unique_name
00254
00255 def new_registration_cb(self, client):
00256 rospy.loginfo("Conductor: new registration [%s][%s]"%(client.key, client.unique_name))
00257
00258 def new_connection_cb(self,new_zconf_client):
00259 '''
00260 Relays removed zeroconf connections from the zeroconf/new_connections topic.
00261 '''
00262 duplicated_client = next((client for client in self.concert_clients if client.zconf_info.name == new_zconf_client.name), None)
00263 if duplicated_client is not None:
00264
00265 rospy.loginfo("Conductor: discovered existing client on a new interface [ignoring][%s]"%new_zconf_client.name)
00266
00267
00268 return
00269 try:
00270 client = ConcertClient(self.unique_name_generator, new_zconf_client)
00271 client.invite_to_concert(self.concert_master_name)
00272 self.concert_clients.add( client )
00273 self.publish_discovered_clients()
00274 except ConcertClientError as err:
00275 rospy.logerr("Conductor: %s"%err)
00276 rospy.logerr("Conductor: not appending to the concert client list")
00277
00278
00279 def lost_connection_cb(self,lost_zconf_client):
00280 '''
00281 Relays removed zeroconf connections from the zeroconf/lost_connections topic.
00282 '''
00283 lost_client = next((client for client in self.concert_clients if client.zconf_info.name == lost_zconf_client.name), None)
00284 if lost_client is not None:
00285 rospy.loginfo("Conductor: removing client [%s]"%lost_zconf_client.name)
00286 self.concert_clients.remove(lost_client)
00287 self.publish_discovered_clients()
00288 else:
00289 rospy.logwarn("Conductor: tried to remove a non-attached client")
00290
00291 def watchdog(self, master):
00292 '''
00293 This keeps tab on the ros master's xmlrpc api to check for new incoming connections.
00294 It's a bit naive, but it does the job - at least until we can do proper invites across
00295 the board (android is on-connect only atm).
00296 '''
00297 caller_id = '/script'
00298 code, msg, val = master.getSystemState(caller_id)
00299 if code == 1:
00300 pubs, unused_subs, unused_srvs = val
00301
00302
00303 for pub, unused_node in pubs:
00304 if pub.find("platform_info") != -1:
00305 unique_name = pub.split('/')[1]
00306 client = next((client for client in self.concert_clients if client.platform_info.unique_name == unique_name), None)
00307
00308
00309
00310 if client is None:
00311
00312
00313
00314 continue
00315
00316
00317
00318 if not client.is_connected:
00319 client.register_connection()
00320 self.publish_discovered_clients()
00321 else:
00322 rospy.logerr("Conductor: failed to call the concert master for the system state [%s][%s]."%(code,msg))
00323
00324
00325
00326
00327
00328 def run(self):
00329 '''
00330 This loop currently runs a watchdog and publishes current client list.
00331 See the watchdog method for more detailed information.
00332
00333 Todo: fix a race condition when calling concert_clients in this thread.
00334 '''
00335 master = xmlrpclib.ServerProxy(os.environ['ROS_MASTER_URI'])
00336 while not rospy.is_shutdown():
00337 self.watchdog(master)
00338 rospy.sleep(2.0)
00339
00340 def spin(self):
00341 '''
00342 Convenient label if we're spinning in the main thread (i.e. not using this class's thread)..
00343 '''
00344 self.run()
00345
00346 def join(self):
00347 '''
00348 Wait for the thread to terminate. Note that it doesn't need help - it will terminate
00349 when ros shuts down.
00350 '''
00351 self.join()
00352
00353
00354
00355
00356
00357 def publish_discovered_clients(self):
00358 '''
00359 Provide a list of currently discovered clients. This goes onto a
00360 latched publisher, so subscribers will always have the latest
00361 without the need to poll a service.
00362 '''
00363 discovered_concert = concert_msgs.msg.ConcertClients()
00364 for client in self.concert_clients:
00365 discovered_concert.clients.append(client.to_msg_format())
00366 self.concert_clients_publisher.publish(discovered_concert)