1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import os
34 import re
35 import sys
36 import subprocess
37 import time
38
39 import rospkg
40 import threading
41
42 if sys.hexversion > 0x03000000:
43 python3 = True
44 else:
45 python3 = False
46
48 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
49 std_out, std_err = p.communicate()
50 if python3:
51 return std_out.decode()
52 else:
53 return std_out
54
56 """
57 Detects the number of CPUs on a system. Cribbed from pp.
58 """
59
60 if hasattr(os, "sysconf"):
61 if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
62
63 ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
64 if isinstance(ncpus, int) and ncpus > 0:
65 return ncpus
66 else:
67 return int(_read_stdout(["sysctl", "-n", "hw.ncpu"])) or 1
68
69 if "NUMBER_OF_PROCESSORS" in os.environ:
70 ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]);
71 if ncpus > 0:
72 return ncpus
73 return 1
74
75
77 """ Track dependencies between packages. This is basically a
78 caching way to call rospkg. It also will allow you to specifiy a
79 range of packages over which to track dependencies. This is useful
80 if you are only building a subset of the tree. For example with the
81 --specified-only option. """
82 - def __init__(self, valid_packages=None, rospack=None):
83 """
84 @param valid_packages: defaults to rospack list
85 """
86 if rospack is None:
87 self.rospack = rospkg.RosPack()
88 else:
89 self.rospack = rospack
90 if valid_packages is None:
91 valid_packages = self.rospack.list()
92 self.valid_packages = valid_packages
93 self.deps_1 = {}
94 self.deps = {}
95
97 if not package in self.deps_1:
98 self.deps_1[package] = []
99 try:
100 potential_dependencies = self.rospack.get_depends(package, implicit=False)
101 except rospkg.ResourceNotFound:
102 potential_dependencies = []
103 for p in potential_dependencies:
104 if p in self.valid_packages:
105 self.deps_1[package].append(p)
106
107 return self.deps_1[package]
108
110 if not package in self.deps:
111 self.deps[package] = []
112 try:
113 potential_dependencies = self.rospack.get_depends(package)
114 except rospkg.ResourceNotFound:
115 potential_dependencies = []
116
117 for p in potential_dependencies:
118 if p in self.valid_packages:
119 self.deps[package].append(p)
120 return self.deps[package]
121
123 self.deps = deps
124 self.deps_1 = deps1
125 return
126
127
129 """ This is the class which is used as the thread for parallel
130 builds. This class will query the build queue object for new
131 commands and block on its calls until the build queue says that
132 building is done. """
133 - def __init__(self, name, build_queue, rosmakeall, argument = None):
134 threading.Thread.__init__(self)
135 self.build_queue = build_queue
136 self.rosmakeall = rosmakeall
137 self.argument = argument
138 self.name = name
139 self.logging_enabled = True
140
142 while not self.build_queue.is_done():
143 pkg = self.build_queue.get_valid_package()
144 if not pkg:
145 if self.build_queue.succeeded():
146 self.rosmakeall.printer.print_verbose("[ Build Completed Thread Exiting ]", thread_name=self.name);
147 else:
148 self.rosmakeall.printer.print_verbose("[ Build Terminated Thread Exiting ]", thread_name=self.name)
149 break
150
151
152 self.rosmakeall.update_status(self.argument ,
153 self.build_queue.get_started_threads(),
154 self.build_queue.progress_str())
155
156 if self.argument:
157 self.rosmakeall.printer.print_all ("Starting >>> %s [ make %s ]"%(pkg, self.argument), thread_name=self.name)
158 else:
159 self.rosmakeall.printer.print_all ("Starting >>> %s [ make ] "%pkg, thread_name=self.name)
160 (result, result_string) = self.rosmakeall.build(pkg, self.argument, self.build_queue.robust_build)
161 self.rosmakeall.printer.print_all("Finished <<< %s %s"%(pkg, result_string), thread_name= self.name)
162
163 self.build_queue.return_built(pkg, result)
164
165 if result or self.build_queue.robust_build:
166 pass
167 else:
168 if result_string.find("[Interrupted]") != -1:
169 self.rosmakeall.printer.print_all("Caught Interruption", thread_name=self.name)
170 self.build_queue.stop()
171 break
172 self.rosmakeall.printer.print_all("Halting due to failure in package %s. \n[ rosmake ] Waiting for other threads to complete."%pkg)
173 self.build_queue.stop()
174 break
175
176
177 self.rosmakeall.update_status(self.argument ,
178 self.build_queue.get_started_threads(),
179 self.build_queue.progress_str())
180
181
182
183
184
185
186
187 self.rosmakeall.update_status(self.argument ,
188 self.build_queue.get_started_threads(),
189 self.build_queue.progress_str())
190
191
193 """ This class provides a thread safe build queue. Which will do
194 the sequencing for many CompileThreads. """
195 - def __init__(self, package_list, dependency_tracker, robust_build = False):
196 self._total_pkgs = len(package_list)
197 self.dependency_tracker = dependency_tracker
198 self.to_build = package_list[:]
199 self.built = []
200 self.failed = []
201 self.condition = threading.Condition()
202 self._done = False
203 self.robust_build = robust_build
204 self._started = {}
205 self._hack_end_counter = 0
206
208 return "[ %d Active %d/%d Complete ]"%(len(self._started), len(self.built), self._total_pkgs)
209
211 return self._started.copy()
212
214 """Return if the build queue has been completed """
215 return len(self.built)+ len(self.failed) == self._total_pkgs
216
218 """Return if the build queue has been completed """
219 return self.is_completed() or self._done
220
222 """ Return whether the build queue has completed all packages successfully. """
223 return len(self.built) == self._total_pkgs
224
226 """ Stop the build queue, including waking all blocking
227 threads. It will not stop in flight builds."""
228 self._done = True
229 with self.condition:
230 self.condition.notifyAll()
231
233 """ The thread which completes a package marks it as done with
234 this method."""
235 with self.condition:
236 if successful:
237 self.built.append(package)
238 else:
239 self.failed.append(package)
240 if package in self._started.keys():
241 self._started.pop(package)
242 else:
243 pass
244 if self.is_completed():
245 self._done = True
246 self.condition.notifyAll()
247
249 """ This is a blocking call which will return a package which has
250 all dependencies met. If interrupted or done it will return
251 None"""
252 with self.condition:
253 while (not self.is_done() and len(self.to_build) > 0):
254 for p in self.to_build:
255 dependencies_met = True
256 for d in self.dependency_tracker.get_deps(p):
257 if d not in self.built and not (self.robust_build and d in self.failed):
258 dependencies_met = False
259
260 break
261 if dependencies_met:
262 self.to_build.remove(p)
263 self._started[p] = time.time()
264 self._hack_end_counter = 0
265 return p
266 elif len(self._started) == 0 and self._hack_end_counter > 2:
267
268 return None
269
270 self.condition.wait(1.0)
271
272 self._hack_end_counter += 1
273 if self.is_done():
274 break
275
276 return None
277