Package concert_orchestra :: Module orchestration
[frames | no frames]

Source Code for Module concert_orchestra.orchestration

  1  #!/usr/bin/env python 
  2  # 
  3  # License: BSD 
  4  #   https://raw.github.com/robotics-in-concert/rocon_concert/hydro-devel/concert_orchestra/LICENSE 
  5  # 
  6  ############################################################################## 
  7  # Imports 
  8  ############################################################################## 
  9   
 10  import re 
 11  import copy 
 12  import rospy 
 13  import rocon_app_manager_msgs.msg as rapp_manager_msgs 
 14  import rocon_app_manager_msgs.srv as rapp_manager_srvs 
 15  import concert_msgs.msg as concert_msgs 
 16  import concert_msgs.srv as concert_srvs 
 17   
 18  # Local imports 
 19  from .implementation import Implementation 
 20  from .compatibility_tree import create_compatibility_tree, prune_compatibility_tree, CompatibilityTree 
 21   
 22  ############################################################################## 
 23  # Orchestration 
 24  ############################################################################## 
 25   
 26   
class Orchestration(object):
    '''
      Tracks the concert clients published by the conductor, works out
      whether they satisfy the implementation's nodes (via the compatibility
      tree) and starts/stops the corresponding apps on the clients.

      Note that rospy runs subscriber and service callbacks in their own
      threads, so the methods of this class may be entered concurrently.
    '''

    def __init__(self):
        self._implementation = Implementation()
        self._compatibility_tree = create_compatibility_tree(self._implementation.nodes, {})
        # Populated by the first client list update that yields pruned branches.
        # Kept as None until then so callbacks can detect the 'nothing evaluated
        # yet' state instead of tripping over a missing attribute.
        self._pruned_compatibility_tree = None
        self._solution_running = False
        self._concert_clients = {}  # dictionary of human friendly name - concert_msgs.ConcertClient pairs

        # parameters
        self._params = {}
        self._params['auto_start'] = rospy.get_param("~auto_start", False)

        self._services = {}
        # later disassemble these to start_apps/stop_apps (plural) to the conductor
        self._services['stop_solution'] = rospy.Service('stop_solution', concert_srvs.StopSolution, self._process_stop_solution)
        self._services['start_solution'] = rospy.Service('start_solution', concert_srvs.StartSolution, self._process_start_solution)

        # Subscribe last: the topic is latched, so the callback can fire as soon
        # as the subscriber exists and it needs all of the state set up above.
        rospy.Subscriber("list_concert_clients", concert_msgs.ConcertClients, self._callback_concert_clients)

    def _callback_concert_clients(self, concert):
        '''
          The conductor publishes the concert client list, which also happens to
          be latched so you'll always get the latest list.

          It stores the concert clients in a dictionary of concert clients keyed
          by the client names.

          While the solution is not running the compatibility tree is rebuilt
          from scratch; once it is running, only the clients that joined or left
          since the previous update are processed.

          @todo - write what is the imperative reason for a dic over a simple list

          @param concert : up to date concert client list provided by the conductor
          @type concert_msgs.ConcertClients
        '''
        rospy.loginfo("Orchestration : updated concert clients list:")
        # The previous dictionary is never mutated again (it gets replaced below),
        # so holding a reference to it is sufficient - no copy required.
        old_concert_clients = self._concert_clients
        # Build the new dictionary on the side and swap it in with a single
        # assignment so other threads never observe a half filled dictionary.
        concert_clients = {}
        for concert_client in concert.clients:
            # create a dictionary of concert client objects, keyed by the human consumable name
            concert_clients[concert_client.name] = copy.deepcopy(concert_client)
            rospy.loginfo(" Client: %s" % (concert_client.name))
            rospy.loginfo(" %s.%s.%s" % (concert_client.platform, concert_client.system, concert_client.robot))
            rospy.loginfo(" %s" % concert_client.client_status)
        self._concert_clients = concert_clients
        if self._solution_running:
            self._update_running_solution(old_concert_clients)
        else:
            self._rebuild_compatibility_tree()

    def _rebuild_compatibility_tree(self):
        '''
          Recomputes the compatibility tree from the current concert clients and,
          if the pruned tree is valid and auto_start is set, starts the solution.
        '''
        self._compatibility_tree = create_compatibility_tree(self._implementation.nodes, self._concert_clients)
        pruned_branches = prune_compatibility_tree(self._compatibility_tree, verbosity=True)
        if pruned_branches:
            self._pruned_compatibility_tree = CompatibilityTree(pruned_branches)
        pruned_tree = self._pruned_compatibility_tree
        if pruned_tree is None:
            # No update has produced any pruned branches yet, nothing to validate.
            rospy.loginfo("Orchestration : solution not yet ready [%s]" % "no compatible clients found yet")
            return
        if pruned_tree.is_valid():
            rospy.loginfo("Orchestration : solution is ready to run")
            # Could do this, but no-one is listening right now.
            # self._implementation.rebuild(node_client_matches)
            # self._implementation.publish()
            if self._params['auto_start']:
                self._process_start_solution(concert_srvs.StartSolutionRequest())
        else:
            rospy.loginfo("Orchestration : solution not yet ready [%s]" % pruned_tree.error_message)
            pruned_tree.print_branches("Current Branches")

    def _update_running_solution(self, old_concert_clients):
        '''
          Handles clients joining/leaving while the solution is running: newly
          arrived clients are added to the pruned tree and have their app
          started, departed clients are removed and, if that invalidates the
          tree, the solution is stopped.

          @param old_concert_clients : the client dictionary from before this update
          @type dict of name - concert_msgs.ConcertClient pairs
        '''
        pruned_tree = self._pruned_compatibility_tree
        new_client_names = [name for name in self._concert_clients if name not in old_concert_clients]
        lost_client_names = [name for name in old_concert_clients if name not in self._concert_clients]
        for new_client_name in new_client_names:
            new_client = self._concert_clients[new_client_name]
            branch = pruned_tree.add_leaf(new_client)
            if branch is None:
                rospy.logerr("Orchestra: this client is not compatible for any node in this implementation.")
                # should get listed as a bad client?
            else:
                self._process_start_client(new_client, branch)
        for lost_client_name in lost_client_names:
            lost_client = old_concert_clients[lost_client_name]
            pruned_tree.remove_leaf(lost_client)
            # Only stop once - further lost clients are still removed from the
            # tree, but there is nothing left to stop after the first failure.
            if self._solution_running and not pruned_tree.is_valid():
                rospy.logerr("Orchestra: client disengaged and the solution is now no longer valid [%s]" % lost_client_name)
                self._process_stop_solution()

    ##########################################################################
    # Ros Callbacks
    ##########################################################################
    # These should be moved to the conductor under the guise of
    # 'start apps', 'stop apps' (plural).

    def _process_start_solution(self, req):
        '''
          Service callback that starts the app of every node on each of the
          clients (leaves) matched to it in the pruned compatibility tree.

          @param req : the (unused) start solution request
          @type concert_srvs.StartSolutionRequest

          @return response with success set to False (and an explanatory message)
                  if the tree is not valid, the solution is already running or
                  any app failed to start.
          @rtype concert_srvs.StartSolutionResponse
        '''
        response = concert_srvs.StartSolutionResponse()
        pruned_tree = self._pruned_compatibility_tree
        if pruned_tree is None:
            # Called before any client list update produced a pruned tree.
            response.success = False
            response.message = "cowardly refused to start the solution [%s]..." % "no compatible clients found yet"
            rospy.loginfo("Orchestration : %s" % response.message)
            return response
        if not pruned_tree.is_valid():
            response.success = False
            response.message = "cowardly refused to start the solution [%s]..." % pruned_tree.error_message
            rospy.loginfo("Orchestration : %s" % response.message)
            pruned_tree.print_branches()
            return response
        if self._solution_running:
            # Fill in the message before logging it.
            response.success = False
            response.message = "chincha? the solution is already running..."
            rospy.logwarn("Orchestration : %s" % response.message)
            return response
        implementation = self._implementation.to_msg()
        response.success = True
        response.message = "bonza"
        link_graph = implementation.link_graph

        rospy.loginfo("Orchestra : starting solution [%s]" % implementation.name)
        for branch in pruned_tree.branches:
            app_name = branch.node.tuple.split('.')[3]
            node_name = branch.node.id
            remappings = self._remappings_for_node(link_graph, node_name)
            for leaf in branch.leaves:
                concert_client_name = leaf.name
                gateway_name = self._concert_clients[concert_client_name].gateway_name
                if not self._start_app(node_name, app_name, concert_client_name, gateway_name, remappings):
                    response.success = False
                    response.message = "aigoo, failed to start app %s of %s" % (app_name, concert_client_name)
        if response.success:
            rospy.loginfo("Orchestra: All clients' app are started")
        else:
            rospy.logwarn("Orchestra: " + response.message)

        # Flagged as running even after a partial failure so that the apps which
        # did start can still be shut down via stop_solution.
        self._solution_running = True
        return response

    def _process_start_client(self, client, branch):
        '''
          Used to start a single client. This is done when a client dynamically joins after the solution has started.

          @param client : the client that just joined
          @type concert_msgs.ConcertClient

          @param branch : the compatibility tree branch the client was added to
        '''
        app_name = branch.node.tuple.split('.')[3]
        node_name = branch.node.id
        link_graph = self._implementation.to_msg().link_graph
        remappings = self._remappings_for_node(link_graph, node_name)
        self._start_app(node_name, app_name, client.name, client.gateway_name, remappings)

    @staticmethod
    def _remappings_for_node(link_graph, node_name):
        '''
          Collects the remappings of every edge that starts or finishes at the node.

          @param link_graph : object with an 'edges' sequence; each edge provides
                              start, finish, remap_from and remap_to
          @param node_name : id of the node to collect remappings for

          @return list of (remap_from, remap_to) tuples, in edge order
        '''
        return [(edge.remap_from, edge.remap_to)
                for edge in link_graph.edges
                if edge.start == node_name or edge.finish == node_name]

    def _start_app(self, node_name, app_name, client_name, gateway_name, remappings):
        '''
          Calls the start_app service of a single client.

          @param node_name : id of the implementation node (logging only)
          @param app_name : name of the app to start
          @param client_name : human friendly client name (logging only)
          @param gateway_name : gateway name of the client, used to build the service name
          @param remappings : list of (remap_from, remap_to) tuples

          @return the 'started' flag of the service response
        '''
        rospy.loginfo(" node: %s/%s" % (node_name, client_name))
        rospy.loginfo(" app: %s" % app_name)
        rospy.loginfo(" remaps")
        for (remap_from, remap_to) in remappings:
            rospy.loginfo(" %s->%s" % (remap_from, remap_to))
        start_app_name = '/' + gateway_name + '/start_app'
        # No timeout is given, so this blocks until the client's service shows up.
        rospy.wait_for_service(start_app_name)
        start_app = rospy.ServiceProxy(start_app_name, rapp_manager_srvs.StartApp)
        request = rapp_manager_srvs.StartAppRequest()
        request.name = app_name
        request.remappings = [rapp_manager_msgs.Remapping(remap_from, remap_to) for (remap_from, remap_to) in remappings]
        rospy.loginfo(" Starting...")
        result = start_app(request)
        if not result.started:
            rospy.logwarn(" failed to start app %s" % (app_name))
        return result.started

    def _process_stop_solution(self, req=None):
        '''
          Service callback (also called internally) that stops the app on every
          client (leaf) of the pruned compatibility tree.

          @param req : the (unused) stop solution request, None when called internally
          @type concert_srvs.StopSolutionRequest

          @return response with success set to False (and an explanatory message)
                  if the solution was not running or any app failed to stop.
          @rtype concert_srvs.StopSolutionResponse
        '''
        response = concert_srvs.StopSolutionResponse()
        response.success = True
        response.message = "Bonza"
        if not self._solution_running:
            response.success = False
            response.message = "chincha? the solution is not running..."
            rospy.logwarn("Orchestration : %s" % response.message)
            return response
        self._solution_running = False
        rospy.loginfo("Orchestra : stopping the solution.")
        for branch in self._pruned_compatibility_tree.branches:
            app_name = branch.node.tuple.split('.')[3]
            for leaf in branch.leaves:
                stop_app_name = '/' + leaf.gateway_name + '/stop_app'
                rospy.loginfo("Orchestra : stopping %s" % stop_app_name)
                # check first if it exists, also timeouts?
                rospy.wait_for_service(stop_app_name)
                stop_app = rospy.ServiceProxy(stop_app_name, rapp_manager_srvs.StopApp)
                result = stop_app(rapp_manager_srvs.StopAppRequest())
                if not result.stopped:
                    response.success = False
                    response.message = "aigoo, failed to stop app %s" % app_name
        # Only claim success if every stop_app call actually reported it.
        if response.success:
            rospy.loginfo("Orchestra : the solution has stopped successfully")
        else:
            rospy.logwarn("Orchestra : %s" % response.message)
        return response
221 222 ############################################################################## 223 # Graveyard 224 ############################################################################## 225 226 # def _implementation_ready_graveyard(self): 227 # ''' 228 # Checks if the listed concert clients are a match with the 229 # implementation. 230 # 231 # @return list of (node, client) tuples or None 232 # ''' 233 # clients = copy.deepcopy(self._concert_clients.values()) 234 # matched = [] 235 # for node in self._implementation.nodes: 236 # #print "Node %s" % str(node) 237 # index = 0 238 # possible_match_indices = [] 239 # for client in clients: 240 # if self._match(node, client): 241 # possible_match_indices.append(index) 242 # index += 1 243 # #print "Possible match indices %s" % str(possible_match_indices) 244 # if not possible_match_indices: 245 # #print "Match failed: %s" % str(node) 246 # return None 247 # elif len(possible_match_indices) == 1: 248 # matched.append((node['id'], clients[possible_match_indices[0]].name)) 249 # del clients[possible_match_indices[0]] 250 # else: 251 # matching_index = possible_match_indices[0] 252 # for index in possible_match_indices: 253 # if node['id'] == clients[index].name: 254 # matching_index = index 255 # break 256 # matched.append((node['id'], clients[matching_index].name)) 257 # #print "Appending matched %s-%s" % (node['id'], clients[matching_index].name) 258 # del clients[matching_index] 259 # return matched 260