Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import os
00034 import copy
00035
00036 from urlparse import urlparse
00037 from multiprocessing import Process, Manager
00038 from vcstools.vcs_base import VcsError
00039
class MultiProjectException(Exception):
    """Exception raised by the multi-project helpers in this module."""
00041
def samefile(f1, f2):
    """
    Test whether two pathnames reference the same actual file.

    Tries ``os.path.samefile`` first. If it is missing (AttributeError), tries
    ``nt._getfinalpathname``. If that is unavailable as well, compares the
    absolute paths textually.
    """
    try:
        return os.path.samefile(f1, f2)
    except AttributeError:
        pass
    # os.path.samefile does not exist on this platform; try the Windows API
    try:
        from nt import _getfinalpathname
        return _getfinalpathname(f1) == _getfinalpathname(f2)
    except (NotImplementedError, ImportError):
        pass
    # last resort: purely textual comparison of absolute paths
    return os.path.abspath(f1) == os.path.abspath(f2)
00056
def conditional_abspath(uri):
    """
    Make local paths absolute while leaving URLs untouched.

    @param uri: The uri to check
    @return: uri unchanged if it has a URL scheme, otherwise abspath(uri)
    """
    if urlparse(uri).scheme != '':
        return uri
    return os.path.abspath(uri)
00067
def is_web_uri(source_uri):
    """
    Decide whether source_uri refers to a remote location.

    Returns False for None or ''. Otherwise returns True when the uri has a
    scheme or netloc, or when its first path segment contains '@'
    (e.g. 'git@github.com:repo.git').
    """
    if source_uri in (None, ''):
        return False
    parsed = urlparse(source_uri)
    looks_local = (parsed.scheme == ''
                   and parsed.netloc == ''
                   and '@' not in parsed.path.split('/')[0])
    return not looks_local
00077
def normabspath(localname, path):
    """
    Resolve localname to a normalized real path.

    An absolute localname (or any localname when path is None) is resolved
    on its own; a relative one is first joined onto path.
    """
    target = localname
    if not os.path.isabs(localname) and path is not None:
        target = os.path.join(path, localname)
    return os.path.realpath(target)
00086
def realpath_relation(abspath1, abspath2):
    """
    Computes the relationship of abspath1 to abspath2 after resolving symlinks.

    :param abspath1: absolute path
    :param abspath2: absolute path
    :returns: None, 'SAME_AS', 'PARENT_OF' (abspath2 lies inside abspath1),
              'CHILD_OF' (abspath1 lies inside abspath2)
    """
    assert os.path.isabs(abspath1)
    assert os.path.isabs(abspath2)
    realpath1 = os.path.realpath(abspath1)
    realpath2 = os.path.realpath(abspath2)

    def _contains(parent, child):
        # Compare whole path components: os.path.commonprefix works per
        # character and would treat '/foo/bar' as a parent of '/foo/barbaz'.
        prefix = parent if parent.endswith(os.sep) else parent + os.sep
        return child.startswith(prefix)

    if os.path.dirname(realpath1) == os.path.dirname(realpath2):
        if os.path.basename(realpath1) == os.path.basename(realpath2):
            return 'SAME_AS'
        return None
    if _contains(realpath1, realpath2):
        return 'PARENT_OF'
    if _contains(realpath2, realpath1):
        return 'CHILD_OF'
    return None
00107
def select_element(elements, localname):
    """
    Pick the element that matches localname by local name or by path.

    An element whose local name equals localname wins immediately. Otherwise
    the last element whose resolved path equals realpath(localname) is
    returned. Returns None when localname is None or nothing matches.
    """
    if localname is None:
        return None
    wanted_path = os.path.realpath(localname)
    by_path = None
    for candidate in elements:
        if localname == candidate.get_local_name():
            return candidate
        if wanted_path == os.path.realpath(candidate.get_path()):
            by_path = candidate
    return by_path
00123
00124
def select_elements(config, localnames):
    """
    Selects config elements with the given localnames, in config order.

    If localnames has exactly one entry that matches no element but resolves
    to the base path of the config, all config elements are returned.

    :param config: config object, or None (then [] is returned)
    :param localnames: local names or paths to select, or None for all elements
    :returns: list of selected config elements in config order, without duplicates
    :raises MultiProjectException: if any localname matches no element
    """
    if config is None:
        return []
    if localnames is None:
        return config.get_config_elements()
    elements = config.get_config_elements()
    selected = []
    notfound = []
    for localname in localnames:
        element = select_element(elements, localname)
        if element is not None:
            selected.append(element)
        else:
            notfound.append(localname)
    if notfound:
        # The workspace path itself selects every element. This must be
        # checked before raising, otherwise the base path is always rejected
        # as an unknown element.
        if (len(localnames) == 1
                and os.path.realpath(localnames[0]) == os.path.realpath(config.get_base_path())):
            return config.get_config_elements()
        raise MultiProjectException("Unknown elements '%s'" % notfound)
    # keep config order and drop duplicate selections
    return [element for element in elements if element in selected]
00155
00156
00157
00158
00159
00160
00161
00162
class WorkerThread(Process):
    """
    Process that runs ``worker.do_work()`` and stores the outcome in a shared list.

    The stored value is a dict holding ``'entry'`` (the element's path spec),
    the keys returned by ``do_work()``, and ``'error'`` if anything failed.
    """

    def __init__(self, worker, outlist, index):
        """
        :param worker: object exposing ``element`` and ``do_work()``
        :param outlist: indexable container receiving the result dict
        :param index: slot in outlist reserved for this worker
        :raises MultiProjectException: if worker or worker.element is None
        """
        Process.__init__(self)
        self.worker = worker
        if worker is None or worker.element is None:
            raise MultiProjectException("Bug: Invalid Worker")
        self.outlist = outlist
        self.index = index

    def run(self):
        """Execute the worker, recording any exception under the 'error' key."""
        outcome = {}
        try:
            outcome = {'entry': self.worker.element.get_path_spec()}
            work_result = self.worker.do_work()
            if work_result is None:
                outcome['error'] = MultiProjectException("worker returned None")
            else:
                outcome.update(work_result)
        except (MultiProjectException, VcsError, OSError) as exc:
            # anticipated failure types: report them without a traceback
            outcome['error'] = exc
        except Exception as exc:
            # unexpected failure: only the exception object is stored in the
            # result, so print the traceback here to keep it visible
            import sys
            import traceback
            traceback.print_exc(file=sys.stderr)
            outcome['error'] = exc
        self.outlist[self.index] = outcome
00194
class DistributedWork():
    """
    Runs a set of workers, each wrapped in a WorkerThread (a multiprocessing
    Process), and gathers their result dicts.
    """

    def __init__(self, capacity, num_threads=10, silent=True):
        """
        :param capacity: maximum number of workers that may be added
        :param num_threads: maximum number of processes running at once;
            1 or less (including 0/None) runs all workers sequentially in the
            current process
        :param silent: if False, print which workers are still active while waiting
        """
        man = Manager()
        # Manager-backed list so child processes can report back, one slot per worker
        self.outputs = man.list([None for _ in range(capacity)])
        self.threads = []
        self.sequentializers = {}
        self.index = 0
        self.num_threads = num_threads
        self.silent = silent

    def add_thread(self, worker):
        """
        Register a worker to be executed by run().

        :raises MultiProjectException: if more than ``capacity`` workers are
            added, or (from WorkerThread) if the worker is invalid
        """
        # check capacity before constructing the process object
        if self.index >= len(self.outputs):
            raise MultiProjectException("Bug: Declared capacity exceeded %s >= %s" % (self.index, len(self.outputs)))
        thread = WorkerThread(worker, self.outputs, self.index)
        self.index += 1
        self.threads.append(thread)

    def run(self):
        """
        Execute all collected workers, terminate all on KeyboardInterrupt.

        :returns: list of result dicts, one per worker that stored a result
        :raises MultiProjectException: if any worker reported an 'error'
        """
        if not self.threads:
            return []
        if not self.num_threads or self.num_threads <= 1:
            # Sequential mode. This also covers 0/negative/None, which
            # previously left the parallel loop spinning forever without
            # starting any process.
            for thread in self.threads:
                thread.run()
        else:
            self._run_parallel()
        return self._collect_results()

    def _run_parallel(self):
        """Run at most num_threads worker processes at a time until all have finished."""
        maxthreads = self.num_threads
        waiting_index = 0
        running_threads = []
        missing_threads = copy.copy(self.threads)
        try:
            while missing_threads:
                # top up the pool with workers that have not been started yet
                if len(running_threads) < maxthreads:
                    to_index = min(waiting_index + maxthreads - len(running_threads), len(self.threads))
                    for thread in self.threads[waiting_index:to_index]:
                        thread.start()
                        running_threads.append(thread)
                    waiting_index = to_index
                # exitcode is None until a process has terminated (also for unstarted ones)
                missing_threads = [t for t in missing_threads if t.exitcode is None]
                running_threads = [t for t in running_threads if t.exitcode is None]
                if not self.silent and running_threads:
                    print("[%s] still active" % ",".join([th.worker.element.get_local_name() for th in running_threads]))
                for thread in running_threads:
                    thread.join(1)
        except KeyboardInterrupt:
            for thread in self.threads:
                if thread is not None and thread.is_alive():
                    print("[%s] terminated while active" % thread.worker.element.get_local_name())
                    thread.terminate()
            # bare raise keeps the original traceback
            raise

    def _collect_results(self):
        """Drop empty slots, raise on reported errors, else return the result dicts."""
        # A real list, not filter(): under Python 3 filter() returns a one-shot
        # iterator that the error scan below would exhaust before returning it.
        # NOTE(review): a worker that never stored a result (e.g. its process
        # was killed) leaves None in its slot and is silently dropped here.
        self.outputs = [output for output in self.outputs if output is not None]
        messages = []
        for output in self.outputs:
            if "error" in output:
                if 'entry' in output:
                    messages.append("Error processing '%s' : %s\n" % (output['entry'].get_local_name(), output["error"]))
                else:
                    messages.append("%s\n" % output["error"])
        if messages:
            raise MultiProjectException(''.join(messages))
        return self.outputs