common.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 import os
00034 import copy
00035 # choosing multiprocessing over threading for clean Control-C interrupts (provides terminate())
00036 from urlparse import urlparse
00037 from multiprocessing import Process, Manager
00038 from vcstools.vcs_base import VcsError
00039 
class MultiProjectException(Exception):
    """
    Exception type raised by the helpers in this module, e.g. for
    unknown elements, invalid workers or collected worker errors.
    """
00041 
def samefile(f1, f2):
    """
    Test whether two pathnames reference the same actual file.

    Tries os.path.samefile first. If that attribute is unavailable,
    nt._getfinalpathname is used, and if that cannot be imported or is
    not implemented, the absolute pathnames are compared instead.

    :param f1: first pathname
    :param f2: second pathname
    :returns: True if both pathnames are considered the same file
    """
    try:
        return os.path.samefile(f1, f2)
    except AttributeError:
        # os.path.samefile is not available here; use the fallbacks below
        pass
    try:
        from nt import _getfinalpathname as final_path
        return final_path(f1) == final_path(f2)
    except (ImportError, NotImplementedError):
        # On Windows XP and earlier, two files are the same if their
        # absolute pathnames are the same. On other operating systems
        # the same approximation is used.
        return os.path.abspath(f1) == os.path.abspath(f2)
00056 
def conditional_abspath(uri):
    """
    Make a local path absolute, but pass real URIs through untouched.

    :param uri: The uri to check
    :returns: os.path.abspath(uri) if uri parses without a scheme
      (it may be a local file), otherwise uri unchanged
    """
    has_scheme = urlparse(uri).scheme != ''
    return uri if has_scheme else os.path.abspath(uri)
00067 
def is_web_uri(source_uri):
    """
    Decide whether source_uri looks like a non-local URI.

    :param source_uri: string to inspect, may be None or empty
    :returns: False for None or '', and for strings that parse with no
      scheme, no netloc and no '@' in the first path segment; True
      otherwise
    """
    if source_uri is None or source_uri == '':
        return False
    parts = urlparse(source_uri)
    # presumably the '@' test catches scp-like "user@host:path" locations
    first_segment = parts.path.split('/')[0]
    return parts.scheme != '' or parts.netloc != '' or '@' in first_segment
00077 
def normabspath(localname, path):
    """
    Resolve localname to a real path.

    :param localname: absolute or relative pathname
    :param path: base directory for a relative localname, or None
    :returns: os.path.realpath of localname if it is absolute or path is
      None, otherwise os.path.realpath of path joined with localname
    """
    target = localname
    if not (os.path.isabs(localname) or path is None):
        target = os.path.join(path, localname)
    return os.path.realpath(target)
00086 
def realpath_relation(abspath1, abspath2):
    """
    Computes the relationship abspath1 to abspath2.

    Both arguments are resolved with os.path.realpath before comparing.
    Containment is decided on path-separator boundaries, so a sibling
    whose name merely starts with the other path's name (e.g.
    '/foo/bar' and '/foo/barbaz') is not reported as a child.

    :param abspath1: absolute path
    :param abspath2: absolute path
    :returns: None, 'SAME_AS', 'PARENT_OF', 'CHILD_OF'
    :raises AssertionError: if one of the arguments is not absolute
    """
    assert os.path.isabs(abspath1)
    assert os.path.isabs(abspath2)
    realpath1 = os.path.realpath(abspath1)
    realpath2 = os.path.realpath(abspath2)
    if realpath1 == realpath2:
        return 'SAME_AS'
    # Append exactly one separator so that only whole path components
    # match; rstrip first so that a root such as '/' does not become '//'.
    dir_prefix1 = realpath1.rstrip(os.sep) + os.sep
    dir_prefix2 = realpath2.rstrip(os.sep) + os.sep
    if realpath2.startswith(dir_prefix1):
        return 'PARENT_OF'
    if realpath1.startswith(dir_prefix2):
        return 'CHILD_OF'
    return None
00107 
def select_element(elements, localname):
    """
    Pick the entry among elements whose local name or path matches.

    A local-name match wins immediately. Otherwise the last element
    whose real path equals the real path of localname is returned.

    :param elements: iterable of objects offering get_local_name() and
      get_path()
    :param localname: name or path to look for, may be None
    :returns: the matching element, or None if nothing matches
    """
    if localname is None:
        return None
    wanted_path = os.path.realpath(localname)
    path_match = None
    for candidate in elements:
        if localname == candidate.get_local_name():
            # prefer localname matches in case of ambiguity
            return candidate
        if wanted_path == os.path.realpath(candidate.get_path()):
            path_match = candidate
    return path_match
00123 
00124 
def select_elements(config, localnames):
    """
    Selects config elements with given localnames, returns them in the
    order given in config and without duplicates.

    If localnames has exactly one entry and that entry is not a known
    element but resolves to the base path of the config, all elements
    are returned.

    :param config: object offering get_config_elements(),
      get_base_path() and get_elements(), or None
    :param localnames: list of local names or paths, or None for all
    :returns: list of selected elements; [] if config is None
    :raises MultiProjectException: if a localname matches no element
    """
    if config is None:
        return []
    elements = config.get_config_elements()
    if localnames is None:
        return elements
    selected = []
    notfound = []
    for localname in localnames:
        element = select_element(elements, localname)
        if element is None:
            notfound.append(localname)
        else:
            selected.append(element)
    if notfound:
        # This check has to happen before raising: the base path of the
        # config is not itself an element, so it always ends up in
        # notfound and would otherwise never be honoured.
        if (len(localnames) == 1
                and os.path.realpath(localnames[0]) == os.path.realpath(config.get_base_path())):
            # NOTE(review): get_elements() is kept from the original code,
            # while the rest of this function uses get_config_elements();
            # confirm which accessor the config type actually provides.
            return config.get_elements()
        raise MultiProjectException("Unknown elements '%s'"%notfound)
    # select in config order and remove duplicates
    return [element for element in elements if element in selected]
00155   
00156   
00157 ## Multithreading: the following classes help with distributing work
00158 ## over several instances, providing wrappers for starting, joining,
00159 ## collecting results, and catching exceptions. They also provide
00160 ## support for running groups of threads sequentially, for the case
00161 ## that some library is not thread-safe.
00162   
class WorkerThread(Process):
    """
    Process that executes a single worker and stores the outcome as a
    dict at a fixed index of a shared output list.
    """

    def __init__(self, worker, outlist, index):
        """
        :param worker: object with an 'element' attribute and a
          do_work() method
        :param outlist: indexable container receiving the result dict
        :param index: position in outlist to write to
        :raises MultiProjectException: if worker or worker.element is None
        """
        Process.__init__(self)
        self.worker = worker
        if worker is None or worker.element is None:
            raise MultiProjectException("Bug: Invalid Worker")
        self.outlist, self.index = outlist, index

    def run(self):
        """
        Call worker.do_work() and record the outcome in outlist[index].

        The recorded dict holds 'entry' (the element's path spec, once
        it could be obtained), the keys returned by the worker, and
        'error' with the exception if something failed or the worker
        returned None. Exceptions are recorded, not propagated.
        """
        outcome = {}
        try:
            outcome = {'entry': self.worker.element.get_path_spec()}
            work_result = self.worker.do_work()
            if work_result is None:
                outcome['error'] = MultiProjectException("worker returned None")
            else:
                outcome.update(work_result)
        except (MultiProjectException, VcsError, OSError) as exc:
            # anticipated failures: record them without a trace
            outcome['error'] = exc
        except Exception as exc:
            # this would be a bug, and we need the trace to find such
            # bugs in multithreaded cases.
            import sys
            import traceback
            traceback.print_exc(file=sys.stderr)
            outcome['error'] = exc
        self.outlist[self.index] = outcome
00194 
class DistributedWork(object):
    """
    Collects workers, runs each of them as a WorkerThread (a Process),
    and gathers their result dicts.
    """

    def __init__(self, capacity, num_threads=10, silent=True):
        """
        :param capacity: number of result slots, i.e. the maximum number
          of workers that may be added
        :param num_threads: maximum number of workers running at the same
          time; with 1 the workers are run one after the other in the
          calling process
        :param silent: if False, print which workers are still active
          while waiting
        """
        man = Manager() # need managed array since we need the results later
        self.outputs = man.list([None for _ in range(capacity)])
        self.threads = []
        self.sequentializers = {}
        self.index = 0
        self.num_threads = num_threads
        self.silent = silent

    def add_thread(self, worker):
        """
        Wrap worker in a WorkerThread and queue it for run().

        :param worker: worker object accepted by WorkerThread
        :raises MultiProjectException: if the worker is invalid or the
          declared capacity is exceeded
        """
        thread = WorkerThread(worker, self.outputs, self.index)
        if self.index >= len(self.outputs):
            raise MultiProjectException("Bug: Declared capacity exceeded %s >= %s"%(self.index, len(self.outputs)))
        self.index += 1
        self.threads.append(thread)

    # def add_to_sequential_thread_group(self, worker, group):
    #   """Workers in each sequential thread group run one after the other, groups run in parallel"""
    #   class ThreadSequentializer(Process):
    #     """helper class to run 'threads' one after the other"""
    #     def __init__(self):
    #       Process.__init__(self)
    #       self.workers = []
    #     def add_worker(self, worker):
    #       self.workers.append(worker)
    #     def run(self):
    #     for worker in self.workers:
    #       worker.run() # not calling start on purpose
    #   thread = WorkerThread(worker, self.outputs, self.index)
    #   if self.index >= len(self.outputs):
    #     raise MultiProjectException("Bug: Declared capacity exceeded %s >= %s"%(self.index, len(self.outputs)))
    #   self.index += 1
    #   if group not in self.sequentializers:
    #     self.sequentializers[group] = ThreadSequentializer()
    #     self.sequentializers[group].add_worker(thread)
    #     self.threads.append(self.sequentializers[group])
    #   else:
    #     self.sequentializers[group].add_worker(thread)

    def run(self):
        """
        Execute all collected workers, terminate all on KeyboardInterrupt.

        After the workers finished, self.outputs is replaced by a plain
        list of the non-None results.

        :returns: list of result dicts, [] if no worker was added
        :raises MultiProjectException: if any result contains an 'error'
        :raises KeyboardInterrupt: re-raised after terminating the
          workers that are still alive
        """
        if self.threads == []:
            return []
        if self.num_threads == 1:
            # run() instead of start(): execute in the calling process
            for thread in self.threads:
                thread.run()
        else:
            # The following code is rather delicate and may behave differently
            # using threading or multiprocessing. running_threads is
            # intentionally not used as a shrinking list because of all the
            # possible multithreading / interruption corner cases
            # Not using Pool because of KeyboardInterrupt cases
            try:
                waiting_index = 0
                maxthreads = self.num_threads
                running_threads = []
                missing_threads = copy.copy(self.threads)
                # we are done if all threads have finished
                while len(missing_threads) > 0:
                    # we spawn more threads whenever some threads have finished
                    if len(running_threads) < maxthreads:
                        to_index = min(waiting_index + maxthreads - len(running_threads), len(self.threads))
                        for i in range(waiting_index, to_index):
                            self.threads[i].start()
                            running_threads.append(self.threads[i])
                        waiting_index = to_index
                    # threads have exitcode only once they terminated
                    missing_threads = [t for t in missing_threads if t.exitcode is None]
                    running_threads = [t for t in running_threads if t.exitcode is None]
                    if (not self.silent
                            and len(running_threads) > 0):
                        print("[%s] still active"%",".join([th.worker.element.get_local_name() for th in running_threads]))
                    for thread in running_threads:
                        thread.join(1) # this should prevent busy waiting
            except KeyboardInterrupt:
                for thread in self.threads:
                    if thread is not None and thread.is_alive():
                        print("[%s] terminated while active"%thread.worker.element.get_local_name())
                        thread.terminate()
                # bare raise keeps the original traceback
                raise

        # Build a real list: it is iterated below AND returned, which a
        # one-shot iterator (filter() on Python 3) would not survive.
        self.outputs = [output for output in self.outputs if output is not None]
        error_lines = []
        for output in self.outputs:
            if "error" in output:
                if 'entry' in output:
                    error_lines.append("Error processing '%s' : %s\n"%(output['entry'].get_local_name(), output["error"]))
                else:
                    error_lines.append("%s\n"%output["error"])
        message = ''.join(error_lines)
        if message != '':
            raise MultiProjectException(message)
        return self.outputs


win_rosinstall
Author(s): Daniel Stonier
autogenerated on Fri Jan 3 2014 12:16:33