Package concert_conductor :: Module conductor
[frames] | no frames]

Source Code for Module concert_conductor.conductor

  1  #!/usr/bin/env python 
  2  # 
  3  # License: BSD 
  4  #   https://raw.github.com/robotics-in-concert/rocon_concert/concert_conductor/LICENSE 
  5  # 
  6  ############################################################################## 
  7  # Imports 
  8  ############################################################################## 
  9   
 10  import os 
 11  import rospy 
 12  import rocon_app_manager_msgs.srv as rapp_manager_srvs 
 13  import concert_msgs.msg as concert_msgs 
 14  import concert_msgs.srv as concert_srvs 
 15  import gateway_msgs.msg as gateway_msgs 
 16  import gateway_msgs.srv as gateway_srvs 
 17  import rocon_utilities 
 18  import xmlrpclib 
 19   
 20  # local imports 
 21  from .concert_client import ConcertClient, ConcertClientException 
 22   
 23  ############################################################################## 
 24  # Conductor 
 25  ############################################################################## 
 26   
 27   
28 -class Conductor(object):
29
30 - def __init__(self):
31 ################################## 32 # Pubs, Subs and Services 33 ################################## 34 self.publishers = {} 35 # private spammy list_concert_clients publisher - used by web applications since they can't yet handle latched 36 self.publishers["spammy_list_concert_clients"] = rospy.Publisher("~list_concert_clients", concert_msgs.ConcertClients) 37 # efficient latched publisher, put in the public concert namespace. 38 self.publishers["list_concert_clients"] = rospy.Publisher("list_concert_clients", concert_msgs.ConcertClients, latch=True) 39 self.services = {} 40 self.services['invite_concert_clients'] = rospy.Service('~invite_concert_clients', concert_srvs.Invite, self._process_invitation_request) 41 # service clients 42 self._remote_gateway_info_service = rospy.ServiceProxy("~remote_gateway_info", gateway_srvs.RemoteGatewayInfo) 43 try: 44 self._remote_gateway_info_service.wait_for_service() 45 except rospy.ServiceException, e: 46 raise e 47 48 ################################## 49 # Variables 50 ################################## 51 # Keys are client human friendly names, values the client class themselves 52 self._concert_clients = {} # List of concert_clients, both visible and out of range 53 self._invited_clients = {} # Clients that have previously been invited, but disappeared 54 # List of gateway names identifying bad clients 55 self._bad_clients = [] # Used to remember clients that are bad.so we don't try and pull them again 56 self._concert_name = None 57 self._watcher_period = 1.0 # Period for the watcher thread (i.e. update rate) 58 59 ################################## 60 # Initialisation 61 ################################## 62 self._get_concert_name() 63 self._param = self._setup_ros_parameters() 64 self._publish_discovered_concert_clients() # Publish an empty list, to latch it and start
65
66 - def invite(self, concert_name, clientnames, ok_flag):
67 for name in clientnames: 68 try: 69 unused_resp = self._concert_clients[name].invite(concert_name, name, ok_flag) 70 rospy.loginfo("Conductor : successfully invited [%s]" % str(name)) 71 self._invited_clients[name] = ok_flag 72 except KeyError: # raised when name is not in the self._concert_clients keys 73 rospy.logerr("Conductor : tried to invite unknown concert client [%s]" % name) 74 return False 75 except Exception as e: 76 rospy.logerr("Conductor : failed to invite concert client [%s]" % str(e)) 77 return False 78 return True
79
80 - def spin(self):
81 ''' 82 Maintains the clientele list. We have to manage for two kinds here. Currently I use the same 83 class interface for both with just a flag to differentiate, but it could probably use a split somewhere in the future. 84 ''' 85 master = xmlrpclib.ServerProxy(os.environ['ROS_MASTER_URI']) 86 while not rospy.is_shutdown(): 87 gateway_clients = self._get_gateway_clients() # list of clients identified by gateway hash names 88 local_clients = [client for client in self._get_local_clients(master) if client not in gateway_clients] 89 visible_clients = gateway_clients + local_clients 90 number_of_pruned_clients = self._prune_client_list(visible_clients) 91 number_of_new_clients = 0 92 new_clients = [c for c in visible_clients if (c not in [client.gateway_name for client in self._concert_clients.values()]) 93 and (c not in self._bad_clients)] 94 # Create new clients info instance 95 for gateway_hash_name in new_clients: 96 is_local_client = True if gateway_hash_name in local_clients and gateway_hash_name not in visible_clients else False 97 gateway_name = rocon_utilities.gateway_basename(gateway_hash_name) 98 try: 99 # remove the 16 byte hex hash from the name 100 same_name_count = 0 101 for client in self._concert_clients.values(): 102 if gateway_name == rocon_utilities.gateway_basename(client.gateway_name): 103 same_name_count += 1 104 concert_name = gateway_name if same_name_count == 0 else gateway_name + str(same_name_count + 1) 105 self._concert_clients[concert_name] = ConcertClient(concert_name, gateway_hash_name, is_local_client=is_local_client) 106 rospy.loginfo("Conductor : new client found [%s]" % concert_name) 107 number_of_new_clients += 1 108 109 # re-invitation of clients that disappeared and came back 110 if concert_name in self._invited_clients: 111 self.invite(self._concert_name, [concert_name], True) 112 except Exception as e: 113 self._bad_clients.append(gateway_name) 114 rospy.loginfo("Conductor : failed to establish client [%s][%s][%s]" % (str(gateway_hash_name), str(e), type(e))) 115 116 if self._param['config']['auto_invite']: 117 client_list = [client for client in self._concert_clients 118 if (client not in self._invited_clients) 119 or (client in self._invited_clients and self._invited_clients[client] == False)] 120 self.invite(self._concert_name, client_list, True) 121 # Continually publish so it goes to web apps for now (inefficient). 122 self._publish_discovered_concert_clients(self.publishers["spammy_list_concert_clients"]) 123 # Long term solution 124 if number_of_pruned_clients != 0 or number_of_new_clients != 0: 125 self._publish_discovered_concert_clients() 126 rospy.sleep(self._watcher_period)
127 128 ########################################################################### 129 # Ros Callbacks 130 ########################################################################### 131
132 - def _process_invitation_request(self, req):
133 ''' 134 Handles service requests from the concert master to invite a set of concert clients. 135 ''' 136 mastername = req.mastername 137 resp = self.invite(mastername, req.clientnames, req.ok_flag) 138 return concert_srvs.InviteResponse("Success to invite[" + str(resp) + "] : " + str(req.clientnames))
139 140 ########################################################################### 141 # Helpers 142 ########################################################################### 143
144 - def _get_local_clients(self, master):
145 ''' 146 This keeps tab on the ros master's xmlrpc api to check for new incoming connections. 147 It's a bit naive, but it does the job - at least until we can do proper invites across 148 the board (android is on-connect only atm). 149 150 @param master : comes from `master = xmlrpclib.ServerProxy(os.environ['ROS_MASTER_URI'])` 151 @type xmlrpclib server proxy 152 ''' 153 caller_id = '/script' 154 error_code, msg, val = master.getSystemState(caller_id) 155 client_hash_name_list = [] 156 if error_code == 1: 157 unused_pubs, unused_subs, srvs = val 158 for srv, unused_node in srvs: 159 if srv.find("platform_info") != -1: 160 client_hash_name_list.append(srv.split('/')[1]) # is of form /client_hash_name/platform_info 161 else: 162 rospy.logerr("Conductor: failed to call the concert master for the system state [%s][%s]." % (error_code, msg)) 163 return client_hash_name_list
164
165 - def _get_gateway_clients(self):
166 ''' 167 Return the list of clients currently visible on network. This 168 currently just checks the remote gateways for 'platform_info' 169 services ...which should be representative of a 170 concert client (aye, not real safe like). 171 172 @return visible_clients : list of visible clients identified by their gateway hash names 173 @rtype list of str 174 ''' 175 suffix = 'platform_info' 176 visible_clients = [] 177 remote_gateway_info = self._remote_gateway_info_service() 178 for remote_gateway in remote_gateway_info.gateways: 179 for rule in remote_gateway.public_interface: 180 if rule.name.endswith(suffix): 181 visible_clients.append(rule.name[1:-(len(suffix) + 1)]) # escape the initial '/' and the trailing '/platform_info' 182 return visible_clients
183
184 - def _prune_client_list(self, new_clients):
185 ''' 186 Remove from the current client list any whose topics and services have disappeared. 187 188 @param new_clients : a list of new clients identified by their gateway hash names 189 @type list of str 190 @return number of pruned clients 191 @rtype int 192 ''' 193 number_of_pruned_clients = 0 194 # Gateway names are unique hashed names, client names are also unique, but more human friendly 195 for (client_name, gateway_name) in [(name, client.gateway_name) for name, client in self._concert_clients.items()]: 196 if not gateway_name in new_clients: 197 number_of_pruned_clients += 1 198 rospy.loginfo("Conductor : client left : " + client_name) 199 del self._concert_clients[client_name] 200 return number_of_pruned_clients
201
202 - def _publish_discovered_concert_clients(self, list_concert_clients_publisher=None):
203 ''' 204 Provide a list of currently discovered clients. This goes onto a 205 latched publisher, so subscribers will always have the latest 206 without the need to poll a service. 207 ''' 208 discovered_concert = concert_msgs.ConcertClients() 209 for unused_client, client_info in self._concert_clients.iteritems(): 210 try: 211 discovered_concert.clients.append(client_info.to_msg_format()) 212 except ConcertClientException: 213 # service was broken, quietly do not add it 214 # (it will be deleted from client list next pass) 215 pass 216 if not list_concert_clients_publisher: 217 list_concert_clients_publisher = self.publishers["list_concert_clients"] # default 218 list_concert_clients_publisher.publish(discovered_concert)
219 220 ########################################################################### 221 # Private Initialisation 222 ########################################################################### 223
224 - def _get_concert_name(self):
225 # Get concert name (i.e. gateway name) 226 gateway_info_proxy = rocon_utilities.SubscriberProxy("~gateway_info", gateway_msgs.GatewayInfo) 227 try: 228 gateway_info_proxy.wait_for_publishers() 229 except rospy.exceptions.ROSInterruptException: 230 rospy.logwarn("Conductor : ros shut down before gateway info could be found.") 231 232 while not rospy.is_shutdown(): 233 gateway_info = gateway_info_proxy(rospy.Duration(0.2)) 234 if gateway_info: 235 if gateway_info.connected: 236 self._concert_name = gateway_info.name 237 break 238 else: 239 rospy.loginfo("Conductor : no hub yet available, spinning...") 240 rospy.sleep(1.0)
241
242 - def _setup_ros_parameters(self):
243 param = {} 244 param['config'] = {} 245 param['config']['auto_invite'] = rospy.get_param('~auto_invite', False) 246 return param
247