Package node_manager_fkie :: Module launch_server_handler
[frames] | no frames]

Source Code for Module node_manager_fkie.launch_server_handler

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from python_qt_binding.QtCore import QObject, Signal 
 34  import random 
 35  import socket 
 36  import threading 
 37  import time 
 38  import xmlrpclib 
 39   
 40  import rospy 
 41   
 42   
class LaunchServerHandler(QObject):
    '''
    A class to retrieve the state of launch servers. Each request is executed
    in its own L{LaunchServerUpdateThread}; the result of the thread is
    forwarded through the signals of this class.
    '''
    launch_server_signal = Signal(str, int, list)
    '''
    @ivar: launch_server_signal is a signal (serveruri, pid, nodes), which is
    emitted if the info from a launch server was successfully retrieved.
    '''
    error_signal = Signal(str, str)
    '''
    @ivar: error_signal is a signal (serveruri, error message), which is
    emitted if an error occurred while retrieving a launch server info.
    '''

    def __init__(self):
        '''
        Creates a handler without running or pending updates.
        '''
        QObject.__init__(self)
        # serveruri -> update thread currently registered for this URI
        self.__updateThreads = {}
        # serveruri -> delay (seconds) of the update requested while a thread
        # for the same URI was still registered
        self.__requestedUpdates = {}
        # guards both dictionaries above
        self._lock = threading.RLock()

    def stop(self):
        '''
        Drops all pending update requests and disconnects the signals of all
        registered update threads, so their results are no longer forwarded.
        Does nothing if no update thread is registered.
        '''
        # Both dictionaries are inspected and modified under the lock, like
        # everywhere else in this class.
        with self._lock:
            if not self.__updateThreads:
                return
            # A parenthesized single-argument print produces the same output
            # as a print statement and as a print function call.
            print(" Shutdown launch update threads...")
            self.__requestedUpdates.clear()
            for thread in self.__updateThreads.values():
                thread.launch_server_signal.disconnect()
                thread.error_signal.disconnect()
            print(" Launch update threads are off!")

    def updateLaunchServerInfo(self, serveruri, delayed_exec=0.0):
        '''
        This method starts a thread to get the information about the launch
        server by the given RPC URI of the launch server. If all information is
        retrieved, a C{launch_server_signal} of this class will be emitted. If
        for the given serveruri a thread is already running, the request will be
        inserted to the requested updates. For the same serveruri only one
        requested update can be stored.
        On update error the requested update will be ignored.
        This method is thread safe.

        @param serveruri: the URI of the remote launch server
        @type serveruri: C{str}
        @param delayed_exec: Delay the execution of the request for given seconds.
        @type delayed_exec: C{float}
        '''
        with self._lock:
            try:
                if serveruri in self.__updateThreads:
                    # only the most recent request per URI is remembered
                    self.__requestedUpdates[serveruri] = delayed_exec
                else:
                    self.__create_update_thread(serveruri, delayed_exec)
            except Exception as err:
                # Stay best-effort (never raise into the caller), but report
                # the failure instead of dropping it silently.
                rospy.logwarn("Update of launch server @ %s could not be started: %s", str(serveruri), err)

    def _on_launch_server_info(self, serveruri, pid, nodes):
        '''
        Slot for the C{launch_server_signal} of an update thread: forwards the
        result and then starts a pending update for the same URI, if any.
        '''
        self.launch_server_signal.emit(serveruri, pid, nodes)
        self.__handle_requests(serveruri)

    def _on_error(self, serveruri, error):
        '''
        Slot for the C{error_signal} of an update thread: forwards the error
        and then starts a pending update for the same URI, if any.
        '''
        self.error_signal.emit(serveruri, error)
        self.__handle_requests(serveruri)

    def __handle_requests(self, serveruri):
        '''
        Unregisters the finished thread of the given URI and, if an update was
        requested in the meantime, starts a new thread for it.

        @param serveruri: the URI of the launch server whose thread has finished
        @type serveruri: C{str}
        '''
        with self._lock:
            try:
                # A KeyError from either pop means there is nothing (more) to do.
                self.__updateThreads.pop(serveruri)
                delayed_exec = self.__requestedUpdates.pop(serveruri)
                self.__create_update_thread(serveruri, delayed_exec)
            except KeyError:
                pass
            except Exception:
                import traceback
                print(traceback.format_exc(2))

    def __create_update_thread(self, serveruri, delayed_exec):
        '''
        Creates, registers and starts an update thread for the given URI.
        The caller has to hold C{self._lock}.

        @param serveruri: the URI of the remote launch server
        @type serveruri: C{str}
        @param delayed_exec: Delay the execution of the request for given seconds.
        @type delayed_exec: C{float}
        @raise Exception: re-raises any error of the thread start
        '''
        upthread = LaunchServerUpdateThread(serveruri, delayed_exec)
        upthread.launch_server_signal.connect(self._on_launch_server_info)
        upthread.error_signal.connect(self._on_error)
        self.__updateThreads[serveruri] = upthread
        try:
            upthread.start()
        except Exception:
            # A thread that never started will never emit a signal, so its
            # entry would never be removed and would block every further
            # update of this URI. Unregister it before reporting the error.
            self.__updateThreads.pop(serveruri, None)
            raise
126 127
class LaunchServerUpdateThread(QObject, threading.Thread):
    '''
    A daemon thread which asks a launch server for its pid and its node names
    and publishes the answer, or the failure, by emitting a QT signal.
    '''
    launch_server_signal = Signal(str, int, list)
    error_signal = Signal(str, str)

    def __init__(self, launch_serveruri, delayed_exec=0.0, parent=None):
        '''
        @param launch_serveruri: the URI of the launch server to query
        @type launch_serveruri: C{str}
        @param delayed_exec: seconds to wait in addition to the built-in delay
        before the request is sent
        @type delayed_exec: C{float}
        @param parent: accepted, but not used by this class
        '''
        QObject.__init__(self)
        threading.Thread.__init__(self)
        self._launch_serveruri = launch_serveruri
        self._delayed_exec = delayed_exec
        self.daemon = True

    def run(self):
        '''
        Waits for the configured delay, requests pid and node names from the
        launch server and emits C{launch_server_signal} with the result. On any
        failure a warning is logged and C{error_signal} is emitted with the last
        line of the formatted traceback.
        '''
        uri = self._launch_serveruri
        try:
            # requested delay plus 0.5 s plus a random part below one second
            wait_seconds = self._delayed_exec + 0.5 + random.random()
            time.sleep(wait_seconds)
            # NOTE: this sets the default timeout of the socket module, not
            # only the timeout of the connection used below.
            socket.setdefaulttimeout(25)
            proxy = xmlrpclib.ServerProxy(uri)
            # both calls answer with a (code, msg, value) triple
            _code, _msg, pid = proxy.get_pid()
            _code, _msg, nodes = proxy.get_node_names()
            self.launch_server_signal.emit(uri, pid, nodes)
        except:
            import traceback
            # only the last line of the traceback is reported
            reason = traceback.format_exc(1).splitlines()[-1]
            rospy.logwarn("Connection to launch server @ %s failed:\n\t%s", str(uri), reason)
            self.error_signal.emit(uri, reason)
        finally:
            # NOTE(review): the None check presumably protects against module
            # globals being cleared while the interpreter shuts down - confirm.
            if socket is not None:
                socket.setdefaulttimeout(None)
164