14 """Run a group of subprocesses and then finish."""
18 import multiprocessing
# Switch for CPU-cost measurement, off by default. Later code in this file
# tests it: when set, a job's command line is prefixed with `time -p`, and a
# real/user/sys timing regex is used to fill in result.cpu_measured.
29 measure_cpu_costs =
False
# Fallback job limit: 16 times the CPU count reported by multiprocessing.
# Later code substitutes it when maxjobs or maxjobs_cpu_agnostic is None.
31 _DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Result-size cap (64 * 1024). Only the trailing _MAX_RESULT_SIZE elements of
# a job's stdout are stored as result.message (stdout[-_MAX_RESULT_SIZE:]).
34 _MAX_RESULT_SIZE = 64 * 1024
42 return ''.join(c
for c
in s
if ord(c) < 128)
47 for key, value
in list(env.items()):
53 if platform.system() ==
'Windows':
55 elif platform.system()[:7] ==
'MSYS_NT':
57 elif platform.system() ==
'Darwin':
59 elif platform.system() ==
'Linux':
75 signal.signal(signal.SIGCHLD,
lambda unused_signum, unused_frame:
None)
76 signal.signal(signal.SIGALRM, alarm_handler)
# Terminal control sequences placed at the front of the formatted status text
# built in message().
# ESC [ 0 G: move the cursor back to the start of the current line.
93 _BEGINNING_OF_LINE =
'\x1b[0G'
# ESC [ 2 K: erase the whole current line.
94 _CLEAR_LINE =
'\x1b[2K'
99 'TIMEOUT_FLAKE':
'purple',
# Log record layout: the timestamp left-justified in a field at least 15
# characters wide, then a space, then the message text.
110 _FORMAT =
'%(asctime)-15s %(message)s'
# Configure the root logger with an INFO threshold and the layout above.
# NOTE(review): indentation is not visible in this excerpt; presumably this
# runs at module import time -- confirm.
111 logging.basicConfig(level=logging.INFO, format=_FORMAT)
115 """Run fn until it doesn't stop because of EINTR"""
120 if e.errno != errno.EINTR:
124 def message(tag, msg, explanatory_text=None, do_newline=False):
125 if message.old_tag == tag
and message.old_msg == msg
and not explanatory_text:
127 message.old_tag = tag
128 message.old_msg = msg
130 if isinstance(explanatory_text, bytes):
131 explanatory_text = explanatory_text.decode(
'utf8', errors=
'replace')
136 logging.info(explanatory_text)
137 logging.info(
'%s: %s', tag, msg)
140 '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
141 (_BEGINNING_OF_LINE, _CLEAR_LINE,
'\n%s' %
142 explanatory_text
if explanatory_text
is not None else '',
143 _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
145 if do_newline
or explanatory_text
is not None else ''))
149 if e.errno != errno.EINTR:
160 for path
in os.environ[
'PATH'].
split(os.pathsep):
161 if os.path.exists(os.path.join(path, filename)):
162 return os.path.join(path, filename)
163 raise Exception(
'%s not found' % filename)
167 """Specifies what to run for a job."""
175 timeout_seconds=5 * 60,
180 verbose_success=False,
184 cmdline: a list of arguments to pass as the command line
185 environ: a dictionary of environment variables to set in the child process
186 kill_handler: a handler that will be called whenever job.kill() is invoked
187 cpu_cost: number of cores per second this job needs
188 logfilename: use given file to store job's output, rather than using a temporary file
194 self.
shortname = cmdline[0]
if shortname
is None else shortname
207 'Cannot use custom logfile when retries are enabled')
216 return self.
identity() == other.identity()
219 return self.
identity() < other.identity()
222 return 'JobSpec(shortname=%s, cmdline=%s)' % (self.
shortname,
226 return '%s: %s %s' % (self.
shortname,
' '.join(
227 '%s=%s' % kv
for kv
in list(self.
environ.
items())),
' '.join(
250 """Manages one job."""
257 quiet_success=False):
275 if self.
_spec.logfilename:
277 logfile_dir = os.path.dirname(
278 os.path.abspath(self.
_spec.logfilename))
279 if not os.path.exists(logfile_dir):
280 os.makedirs(logfile_dir)
286 self.
_logfile = tempfile.NamedTemporaryFile()
287 env = dict(os.environ)
288 env.update(self.
_spec.environ)
292 cmdline = self.
_spec.cmdline
295 global measure_cpu_costs
296 if measure_cpu_costs
and not 'vsprojects\\build' in cmdline[0]:
297 cmdline = [
'time',
'-p'] + cmdline
299 measure_cpu_costs =
False
300 try_start =
lambda: subprocess.Popen(args=cmdline,
301 stderr=subprocess.STDOUT,
304 shell=self.
_spec.shell,
307 for i
in range(0, 4):
313 'WARNING',
'Failed to start %s, retrying in %f seconds' %
314 (self.
_spec.shortname, delay))
322 """Poll current state of the job. Prints messages at completion."""
326 self.
result.message = stdout[-_MAX_RESULT_SIZE:]
330 elapsed = time.time() - self.
_start
331 self.
result.elapsed_time = elapsed
335 '%s [ret=%d, pid=%d]' %
341 self.
result.num_failures += 1
349 '%s [ret=%d, pid=%d, time=%.1fsec]' %
354 self.
result.state =
'FAILED'
355 self.
result.num_failures += 1
360 if measure_cpu_costs:
362 r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
364 real = float(m.group(1))
365 user = float(m.group(2))
366 sys = float(m.group(3))
368 cores = (user + sys) / real
369 self.
result.cpu_measured = float(
'%.01f' % cores)
370 self.
result.cpu_estimated = float(
'%.01f' %
372 measurement =
'; cpu_cost=%.01f; estimated=%.01f' % (
376 '%s [time=%.1fsec, retries=%d:%d%s]' %
381 self.
result.state =
'PASSED'
382 elif (self.
_state == _RUNNING
and
383 self.
_spec.timeout_seconds
is not None and
384 time.time() - self.
_start > self.
_spec.timeout_seconds):
385 elapsed = time.time() - self.
_start
386 self.
result.elapsed_time = elapsed
394 self.
result.num_failures += 1
396 if self.
_spec.kill_handler:
397 self.
_spec.kill_handler(self)
403 '%s [pid=%d, time=%.1fsec]' %
408 self.
result.state =
'TIMEOUT'
409 self.
result.num_failures += 1
413 if self.
_state == _RUNNING:
415 if self.
_spec.kill_handler:
416 self.
_spec.kill_handler(self)
424 """Manages one run of jobs."""
426 def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
427 newline_on_success, travis, stop_on_failure, add_env,
428 quiet_success, max_time):
455 c += job._spec.cpu_cost
459 """Start a job. Return True on success, False on failure."""
464 skipped_job_result.state =
'SKIPPED'
465 message(
'SKIPPED', spec.shortname, do_newline=
True)
466 self.
resultset[spec.shortname] = [skipped_job_result]
471 if current_cpu_cost == 0:
473 if current_cpu_cost + spec.cpu_cost <= self.
_maxjobs:
476 self.
reap(spec.shortname, spec.cpu_cost)
482 if job.GetSpec().shortname
not in self.
resultset:
483 self.
resultset[job.GetSpec().shortname] = []
486 def reap(self, waiting_for=None, waiting_for_cost=None):
487 """Collect the dead jobs."""
494 if st == _FAILURE
or st == _KILLED:
505 self.
resultset[job.GetSpec().shortname].append(job.result)
516 rstr =
'ETA %.1f sec; %s' % (remaining, rstr)
517 if waiting_for
is not None:
518 wstr =
' next: %s @ %.2f cpu' % (waiting_for,
524 '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
534 """Poll for cancellation."""
562 if len(staging) > 5000:
563 yield (staging.pop(0),
None)
565 for i, x
in enumerate(staging):
570 check_cancelled=_never_cancelled,
572 maxjobs_cpu_agnostic=None,
573 newline_on_success=False,
576 stop_on_failure=False,
584 skipped_job_result.state =
'SKIPPED'
586 message(
'SKIPPED', job.shortname, do_newline=
True)
587 resultset[job.shortname] = [skipped_job_result]
590 check_cancelled, maxjobs
if maxjobs
is not None else _DEFAULT_MAX_JOBS,
591 maxjobs_cpu_agnostic
if maxjobs_cpu_agnostic
is not None else
592 _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, add_env,
593 quiet_success, max_time)
595 if not js.start(cmdline):
597 if remaining
is not None:
598 js.set_remaining(remaining)
600 return js.get_num_failures(), js.resultset