orchestration.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_orchestration/rocon_orchestra/LICENSE
00005 #
00006 ##############################################################################
00007 # Imports
00008 ##############################################################################
00009 
00010 import copy
00011 import roslib
00012 roslib.load_manifest('rocon_orchestra')
00013 import rospy
00014 import appmanager_msgs.msg as appmanager_msgs
00015 import appmanager_msgs.srv as appmanager_srvs
00016 import concert_msgs.msg as concert_msgs
00017 import concert_msgs.srv as concert_srvs
00018 
00019 # Local imports
00020 from .implementation import Implementation
00021 
00022 ##############################################################################
00023 # Orchestration
00024 ##############################################################################
00025 
00026 
00027 class Orchestration(object):
00028 
00029     def __init__(self):
00030         self._implementation = Implementation()
00031         self._solution_running = False
00032         self._concert_clients = []
00033         rospy.Subscriber("list_concert_clients", concert_msgs.ConcertClients, self._callback_concert_clients)
00034 
00035         self._services = {}
00036         # later disassemble these to start_apps/stop_apps (plural) to the conductor
00037         self._services['stop_solution'] = rospy.Service('stop_solution', concert_srvs.StopSolution, self._process_stop_solution)
00038         self._services['start_solution'] = rospy.Service('start_solution', concert_srvs.StartSolution, self._process_start_solution)
00039 
00040     def _callback_concert_clients(self, concert):
00041         '''
00042           The conductor publishes the concert client list, which also happens to
00043           be latched so you'll always get the latest list.
00044         '''
00045         self._concert_clients = copy.deepcopy(concert.clients)
00046         rospy.loginfo("Orchestration : updated concert clients list:")
00047         for concert_client in concert.clients:
00048             rospy.loginfo("       Client: %s" % (concert_client.name))
00049             rospy.loginfo("               %s.%s.%s" % (concert_client.platform, concert_client.system, concert_client.robot))
00050             rospy.loginfo("               %s" % concert_client.client_status)
00051         node_client_matches = self._implementation_ready()
00052         if node_client_matches:
00053             if not self._solution_running:
00054                 self._implementation.rebuild(node_client_matches)
00055                 self._implementation.publish()
00056                 rospy.loginfo("Orchestration : solution is ready to run")
00057         else:
00058             # means you've lost a client
00059             # probably not robust if you have apps coming and going
00060             self._solution_running = False
00061             #self._process_stop_solution()
00062 
00063     def _implementation_ready(self):
00064         '''
00065           Checks if the listed concert clients are a match with the
00066           implementation.
00067 
00068           @return list of (node, client) tuples or None
00069         '''
00070         clients = copy.deepcopy(self._concert_clients)
00071         matched = []
00072         for node in self._implementation.nodes:
00073             #print "Node %s" % str(node)
00074             index = 0
00075             possible_match_indices = []
00076             for client in clients:
00077                 if self._match(node, client):
00078                     possible_match_indices.append(index)
00079                 index += 1
00080             #print "Possible match indices %s" % str(possible_match_indices)
00081             if not possible_match_indices:
00082                 #print "Match failed: %s" % str(node)
00083                 return None
00084             elif len(possible_match_indices) == 1:
00085                 matched.append((node['id'], clients[possible_match_indices[0]].name))
00086                 del clients[possible_match_indices[0]]
00087             else:
00088                 matching_index = possible_match_indices[0]
00089                 for index in possible_match_indices:
00090                     if node['id'] == clients[index].name:
00091                         matching_index = index
00092                         break
00093                 matched.append((node['id'], clients[matching_index].name))
00094                 #print "Appending matched %s-%s" % (node['id'], clients[matching_index].name)
00095                 del clients[matching_index]
00096         return matched
00097 
00098     def _match(self, node, concert_client):
00099         #print "****** _match ******"
00100         #print str(node)
00101         #print concert_client.name + "-" + concert_client.platform + "." + concert_client.system + "." + concert_client.robot
00102         parts = node['tuple'].split('.')
00103         platform = parts[0]
00104         system = parts[1]
00105         robot = parts[2]
00106         app_name = parts[3]
00107         if platform != concert_client.platform:
00108             return False
00109         if system != concert_client.system:
00110             return False
00111         if robot != concert_client.robot:
00112             return False
00113         for client_app in concert_client.apps:
00114             if app_name == client_app.name:
00115                 return True
00116         return False
00117 
00118     ##########################################################################
00119     # Ros Callbacks
00120     ##########################################################################
00121     # These should be moved to the conductor under the guise of
00122     # 'start apps', 'stop apps' (plural).
00123 
00124     def _process_start_solution(self, req):
00125         # Put in checks to see if a solution is already running
00126         response = concert_srvs.StartSolutionResponse()
00127         if not self._implementation_ready():
00128             response.success = False
00129             response.message = "solution is not yet ready (waiting for clients)..."
00130             return response
00131         if self._solution_running:
00132             rospy.loginfo("Orchestration : chincha? the solution is already running, try restarting anyway")
00133         implementation = self._implementation.to_msg()
00134         response.success = True
00135         link_graph = implementation.link_graph
00136         rospy.loginfo("Orchestra : starting solution [%s]" % implementation.name)
00137         for node in link_graph.nodes:
00138             concert_client_name = node.id
00139             app_name = node.tuple.split('.')[3]
00140             remappings = []
00141             rospy.loginfo("            node: %s" % concert_client_name)
00142             rospy.loginfo("              app: %s" % app_name)
00143             rospy.loginfo("              remaps")
00144             for edge in link_graph.edges:
00145                 if edge.start == concert_client_name or edge.finish == concert_client_name:
00146                     rospy.loginfo("                %s->%s" % (edge.remap_from, edge.remap_to))
00147                     remappings.append((edge.remap_from, edge.remap_to))
00148             # Check to see if start app service exists for the node, abort if not
00149             #self._concert_clients['/' + concert_client_name].start_app(app_name, remappings)
00150             start_app_name = '/' + node.id + '/start_app'
00151             rospy.wait_for_service(start_app_name)
00152             start_app = rospy.ServiceProxy(start_app_name, appmanager_srvs.StartApp)
00153             req = appmanager_srvs.StartAppRequest()
00154             req.name = app_name
00155             req.remappings = []
00156             for remapping in remappings:
00157                 req.remappings.append(appmanager_msgs.Remapping(remapping[0], remapping[1]))
00158             resp = start_app(req)
00159             if not resp.started:
00160                 response.success = False
00161                 response.message = "aigoo, failed to start app %s" % app_name
00162         response.message = "bonza"
00163         self._solution_running = True
00164         return response
00165 
00166     def _process_stop_solution(self, req=None):
00167         response = concert_srvs.StopSolutionResponse()
00168         response.success = True
00169         response.message = "Bonza"
00170         if not self._solution_running:
00171             response.success = False
00172             response.message = "chincha? the solution is not running..."
00173             return response
00174         self._solution_running = False
00175         rospy.loginfo("Orchestra : stopping the solution.")
00176         for node in self._implementation.nodes:
00177             stop_app_name = '/' + node['id'] + '/stop_app'
00178             app_name = node['tuple'].split('.')[3]
00179             # check first if it exists, also timeouts?
00180             rospy.wait_for_service(stop_app_name)
00181             stop_app = rospy.ServiceProxy(stop_app_name, appmanager_srvs.StopApp)
00182             req = appmanager_srvs.StopAppRequest(app_name)
00183             resp = stop_app(req)
00184             if not resp.stopped:
00185                 response.success = False
00186                 response.message = "aigoo, failed to stop app %s" % app_name
00187         return response
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends


rocon_orchestra
Author(s): Daniel Stonier
autogenerated on Tue Jan 22 2013 13:08:37