Package rosmake :: Module parallel_build
[frames] | no frames]

Source Code for Module rosmake.parallel_build

  1  #! /usr/bin/env python 
  2   
  3  # Copyright (c) 2009, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  #  
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions are met: 
  8  #  
  9  #     * Redistributions of source code must retain the above copyright 
 10  #       notice, this list of conditions and the following disclaimer. 
 11  #     * Redistributions in binary form must reproduce the above copyright 
 12  #       notice, this list of conditions and the following disclaimer in the 
 13  #       documentation and/or other materials provided with the distribution. 
 14  #     * Neither the name of Willow Garage, Inc. nor the names of its 
 15  #       contributors may be used to endorse or promote products derived from 
 16  #       this software without specific prior written permission. 
 17  #  
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 19  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 20  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 21  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 22  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 23  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 24  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 25  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 26  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 27  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 28  # POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  # Author Tully Foote/tfoote@willowgarage.com 
 32   
 33  import os 
 34  import re 
 35  import sys 
 36  import subprocess 
 37  import time 
 38   
 39  import rospkg 
 40  import threading 
 41   
 42  if sys.hexversion > 0x03000000: #Python3 
 43      python3 = True 
 44  else: 
 45      python3 = False 
 46   
47 -def _read_stdout(cmd):
48 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 49 std_out, std_err = p.communicate() 50 if python3: 51 return std_out.decode() 52 else: 53 return std_out
54
55 -def num_cpus():
56 """ 57 Detects the number of CPUs on a system. Cribbed from pp. 58 """ 59 # Linux, Unix and MacOS: 60 if hasattr(os, "sysconf"): 61 if "SC_NPROCESSORS_ONLN" in os.sysconf_names: 62 # Linux & Unix: 63 ncpus = os.sysconf("SC_NPROCESSORS_ONLN") 64 if isinstance(ncpus, int) and ncpus > 0: 65 return ncpus 66 else: # OSX: 67 return int(_read_stdout(["sysctl", "-n", "hw.ncpu"])) or 1 68 # Windows: 69 if "NUMBER_OF_PROCESSORS" in os.environ: 70 ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]); 71 if ncpus > 0: 72 return ncpus 73 return 1 # Default
74 75 #TODO: may no longer need this now that we've ported to rospkg
76 -class DependencyTracker:
77 """ Track dependencies between packages. This is basically a 78 caching way to call rospkg. It also will allow you to specifiy a 79 range of packages over which to track dependencies. This is useful 80 if you are only building a subset of the tree. For example with the 81 --specified-only option. """
82 - def __init__(self, valid_packages=None, rospack=None):
83 """ 84 @param valid_packages: defaults to rospack list 85 """ 86 if rospack is None: 87 self.rospack = rospkg.RosPack() 88 else: 89 self.rospack = rospack 90 if valid_packages is None: 91 valid_packages = self.rospack.list() 92 self.valid_packages = valid_packages 93 self.deps_1 = {} 94 self.deps = {}
95
96 - def get_deps_1(self, package):
97 if not package in self.deps_1: 98 self.deps_1[package] = [] 99 try: 100 potential_dependencies = self.rospack.get_depends(package, implicit=False) 101 except rospkg.ResourceNotFound: 102 potential_dependencies = [] 103 for p in potential_dependencies: 104 if p in self.valid_packages: 105 self.deps_1[package].append(p) 106 107 return self.deps_1[package]
108
109 - def get_deps(self, package):
110 if not package in self.deps: 111 self.deps[package] = [] 112 try: 113 potential_dependencies = self.rospack.get_depends(package) 114 except rospkg.ResourceNotFound: 115 potential_dependencies = [] 116 117 for p in potential_dependencies: 118 if p in self.valid_packages: 119 self.deps[package].append(p) 120 return self.deps[package]
121
122 - def load_fake_deps(self, deps, deps1):
123 self.deps = deps 124 self.deps_1 = deps1 125 return
126 127
128 -class CompileThread(threading.Thread):
129 """ This is the class which is used as the thread for parallel 130 builds. This class will query the build queue object for new 131 commands and block on its calls until the build queue says that 132 building is done. """
133 - def __init__(self, name, build_queue, rosmakeall, argument = None):
134 threading.Thread.__init__(self) 135 self.build_queue = build_queue 136 self.rosmakeall = rosmakeall 137 self.argument = argument 138 self.name = name 139 self.logging_enabled = True
140
141 - def run(self):
142 while not self.build_queue.is_done(): 143 pkg = self.build_queue.get_valid_package() 144 if not pkg: 145 if self.build_queue.succeeded(): 146 self.rosmakeall.printer.print_verbose("[ Build Completed Thread Exiting ]", thread_name=self.name); 147 else: 148 self.rosmakeall.printer.print_verbose("[ Build Terminated Thread Exiting ]", thread_name=self.name) 149 break # no more packages must be done 150 151 # update status after accepting build 152 self.rosmakeall.update_status(self.argument , 153 self.build_queue.get_started_threads(), 154 self.build_queue.progress_str()) 155 156 if self.argument: 157 self.rosmakeall.printer.print_all ("Starting >>> %s [ make %s ]"%(pkg, self.argument), thread_name=self.name) 158 else: 159 self.rosmakeall.printer.print_all ("Starting >>> %s [ make ] "%pkg, thread_name=self.name) 160 (result, result_string) = self.rosmakeall.build(pkg, self.argument, self.build_queue.robust_build) 161 self.rosmakeall.printer.print_all("Finished <<< %s %s"%(pkg, result_string), thread_name= self.name) 162 #print "Finished2" 163 self.build_queue.return_built(pkg, result) 164 #print "returned" 165 if result or self.build_queue.robust_build: 166 pass#print "result", result, "robust", self.build_queue.robust_build 167 else: 168 if result_string.find("[Interrupted]") != -1: 169 self.rosmakeall.printer.print_all("Caught Interruption", thread_name=self.name) 170 self.build_queue.stop() #todo move this logic into BuildQueue itself 171 break # unnecessary since build_queue is done now while will quit 172 self.rosmakeall.printer.print_all("Halting due to failure in package %s. \n[ rosmake ] Waiting for other threads to complete."%pkg) 173 self.build_queue.stop() 174 break # unnecessary since build_queue is done now, while will quit 175 # update status after at end of build 176 #print "updating status" 177 self.rosmakeall.update_status(self.argument , 178 self.build_queue.get_started_threads(), 179 self.build_queue.progress_str()) 180 #print "done built", len(self.build_queue.built), self.build_queue.built 181 #print "failed", len(self.build_queue.failed), self.build_queue.failed 182 #print "to_build", len(self.build_queue.to_build), self.build_queue.to_build 183 #print "in progress", len(self.build_queue._started), self.build_queue._started 184 185 #print "last update" 186 # update status before ending thread 187 self.rosmakeall.update_status(self.argument , 188 self.build_queue.get_started_threads(), 189 self.build_queue.progress_str())
190 #print "thread finished" 191
192 -class BuildQueue:
193 """ This class provides a thread safe build queue. Which will do 194 the sequencing for many CompileThreads. """
195 - def __init__(self, package_list, dependency_tracker, robust_build = False):
196 self._total_pkgs = len(package_list) 197 self.dependency_tracker = dependency_tracker 198 self.to_build = package_list[:] # do a copy not a reference 199 self.built = [] 200 self.failed = [] 201 self.condition = threading.Condition() 202 self._done = False 203 self.robust_build = robust_build 204 self._started = {} 205 self._hack_end_counter = 0
206
207 - def progress_str(self):
208 return "[ %d Active %d/%d Complete ]"%(len(self._started), len(self.built), self._total_pkgs)
209
210 - def get_started_threads(self): #TODO sort this other than hash order
211 return self._started.copy()
212
213 - def is_completed(self):
214 """Return if the build queue has been completed """ 215 return len(self.built)+ len(self.failed) == self._total_pkgs
216
217 - def is_done(self):
218 """Return if the build queue has been completed """ 219 return self.is_completed() or self._done # finished or halted
220
221 - def succeeded(self):
222 """ Return whether the build queue has completed all packages successfully. """ 223 return len(self.built) == self._total_pkgs #flag that we're finished
224
225 - def stop(self):
226 """ Stop the build queue, including waking all blocking 227 threads. It will not stop in flight builds.""" 228 self._done = True 229 with self.condition: 230 self.condition.notifyAll() # wake any blocking threads 231
232 - def return_built(self, package, successful=True): # mark that a package is built
233 """ The thread which completes a package marks it as done with 234 this method.""" 235 with self.condition: 236 if successful: 237 self.built.append(package) 238 else: 239 self.failed.append(package) 240 if package in self._started.keys(): 241 self._started.pop(package) 242 else: 243 pass #used early on print "\n\n\nERROR THIS SHOULDN't RETURN %s\n\n\n"%package 244 if self.is_completed(): 245 self._done = True 246 self.condition.notifyAll() #wake up any waiting threads 247
248 - def get_valid_package(self): # blocking call to get a package to build returns none if done
249 """ This is a blocking call which will return a package which has 250 all dependencies met. If interrupted or done it will return 251 None""" 252 with self.condition: 253 while (not self.is_done() and len(self.to_build) > 0): 254 for p in self.to_build: 255 dependencies_met = True 256 for d in self.dependency_tracker.get_deps(p): 257 if d not in self.built and not (self.robust_build and d in self.failed): 258 dependencies_met = False 259 #print "Dependency %s not met for %s"%(d, p) 260 break 261 if dependencies_met: # all dependencies met 262 self.to_build.remove(p) 263 self._started[p] = time.time() 264 self._hack_end_counter = 0 #reset end counter if success 265 return p # break out and return package if found 266 elif len(self._started) == 0 and self._hack_end_counter > 2: 267 # we're hung with broken dependencies 268 return None 269 #print "TTGTTTTHTHT Waiting on condition" 270 self.condition.wait(1.0) # failed to find a package wait for a notify before looping 271 272 self._hack_end_counter += 1 # if we're here too often we will quit 273 if self.is_done(): 274 break 275 276 return None 277