_runner.py
Go to the documentation of this file.
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 from __future__ import absolute_import
16 
17 import collections
18 import os
19 import select
20 import signal
21 import sys
22 import tempfile
23 import threading
24 import time
25 import unittest
26 import uuid
27 
28 import six
29 from six import moves
30 
31 from tests import _loader
32 from tests import _result
33 
34 
class CaptureFile(object):
    """A context-managed file to redirect output to a byte array.

    Use by invoking `start` (`__enter__`) and at some point invoking `stop`
    (`__exit__`). At any point after the initial call to `start` call `output`
    to get the current redirected output. Note that we don't currently use file
    locking, so calling `output` between calls to `start` and `stop` may muddle
    the result (you should only be doing this during a Python-handled interrupt
    as a last ditch effort to provide output to the user).

    Attributes:
      _redirected_fd (int): File descriptor of file to redirect writes from.
      _saved_fd (int or None): A copy of the original value of the redirected
        file descriptor. None once `close` has released it.
      _into_file (TemporaryFile or None): File to which writes are redirected.
        None until the first call to `start`.
    """

    def __init__(self, fd):
        """Remember `fd` and keep a duplicate of what it currently refers to.

        Arguments:
          fd (int): File descriptor whose writes will be captured.
        """
        self._redirected_fd = fd
        self._saved_fd = os.dup(self._redirected_fd)
        self._into_file = None

    def output(self):
        """Get all output from the redirected-to file if it exists.

        Returns:
          bytes: Everything written to the capture file so far, or empty bytes
            when `start` has never been called.
        """
        if self._into_file is None:
            return bytes()
        # Reading rewinds and then consumes to EOF, so the shared file offset
        # ends up back at the end of the captured data.
        self._into_file.seek(0)
        return bytes(self._into_file.read())

    def start(self):
        """Start redirection of writes to the file descriptor."""
        self._into_file = tempfile.TemporaryFile()
        os.dup2(self._into_file.fileno(), self._redirected_fd)

    def stop(self):
        """Stop redirection of writes to the file descriptor."""
        # n.b. this dup2 call auto-closes self._redirected_fd
        os.dup2(self._saved_fd, self._redirected_fd)

    def write_bypass(self, value):
        """Bypass the redirection and write directly to the original file.

        Arguments:
          value (str): What to write to the original file. Byte strings are
            written unchanged; text is encoded as ASCII with any non-ASCII
            character emitted as a backslash escape, so this never raises
            while reporting during an interrupt.
        """
        # `bytes` is `str` on Python 2, so native strings pass through there.
        if not isinstance(value, bytes):
            value = value.encode('ascii', 'backslashreplace')
        if self._saved_fd is None:
            # No saved copy is held (after `close`); write to the descriptor
            # itself rather than to a stale descriptor number.
            os.write(self._redirected_fd, value)
        else:
            os.write(self._saved_fd, value)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, type, value, traceback):
        self.stop()

    def close(self):
        """Close any resources used by self not closed by stop().

        Safe to call more than once; later calls do nothing.
        """
        if self._saved_fd is not None:
            os.close(self._saved_fd)
            # Forget the number so it cannot be reused after the OS recycles it.
            self._saved_fd = None
99 
100 
class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
    """A test case with a guaranteed unique externally specified identifier.

    Attributes:
      case (unittest.TestCase): TestCase we're decorating with an additional
        identifier.
      id (object): Any identifier that may be considered 'unique' for testing
        purposes.
    """

    def __new__(cls, case, id=None):
        """Build the tuple, generating a random UUID when no id is supplied.

        Arguments:
          case (unittest.TestCase): The test case to wrap.
          id (object): Optional identifier; defaults to a fresh `uuid.uuid4()`.
        """
        if id is None:
            id = uuid.uuid4()
        # super(<this class>, cls): the arguments must be in this order for
        # subclasses of AugmentedCase to be constructible.
        return super(AugmentedCase, cls).__new__(cls, case, id)
115 
116 
# NOTE(lidiz) This complex wrapper is not triggering setUpClass nor
# tearDownClass. Do not use those methods, or fix this wrapper!
class Runner(object):
    """Runs the cases of a test suite one by one and writes a JUnit report."""

    def __init__(self, dedicated_threads=False):
        """Constructs the Runner object.

        Args:
          dedicated_threads: A bool indicates whether to spawn each unit test
            in separate thread or not.
        """
        self._skipped_tests = []
        self._dedicated_threads = dedicated_threads

    def skip_tests(self, tests):
        """Replaces the collection of tests to skip.

        Args:
          tests: Strings; a case is not run when any of them occurs as a
            substring of the case's id.
        """
        self._skipped_tests = tests

    def run(self, suite):
        """See setuptools' test_runner setup argument for information.

        Runs every case of `suite` whose id starts with the value of the
        GRPC_PYTHON_TESTRUNNER_FILTER environment variable (all cases when it
        is unset or empty) and is not skipped, then writes `report.xml` into
        the current working directory.

        Args:
          suite: The test suite whose cases are to be run.

        Returns:
          The `_result.TerminalResult` the cases reported into.
        """
        # only run test cases with id starting with given prefix
        testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
        filtered_cases = [
            case for case in _loader.iterate_suite_cases(suite)
            if not testcase_filter or case.id().startswith(testcase_filter)
        ]

        # Ensure that every test case has no collision with any other test case
        # in the augmented results.
        augmented_cases = [
            AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
        ]
        case_id_by_case = {
            augmented_case.case: augmented_case.id
            for augmented_case in augmented_cases
        }
        result_out = moves.cStringIO()
        result = _result.TerminalResult(
            result_out, id_map=lambda case: case_id_by_case[case])
        stdout_pipe = CaptureFile(sys.stdout.fileno())
        stderr_pipe = CaptureFile(sys.stderr.fileno())
        kill_flag = [False]

        def sigint_handler(signal_number, frame):
            # Only record the request here; the main loop acts on it in
            # check_kill_self(). A second SIGINT gets the default behaviour.
            if signal_number == signal.SIGINT:
                kill_flag[0] = True  # Python 2.7 not having 'local'... :-(
            signal.signal(signal_number, signal.SIG_DFL)

        def fault_handler(signal_number, frame):
            # Dump whatever was captured straight to the real stdout and exit.
            # The captured output is bytes and is formatted as-is.
            stdout_pipe.write_bypass(
                'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'.format(
                    signal_number, stdout_pipe.output(), stderr_pipe.output()))
            os._exit(1)

        def check_kill_self():
            # Exit immediately, after flushing everything gathered so far, once
            # an interrupt has been requested.
            if kill_flag[0]:
                stdout_pipe.write_bypass('Stopping tests short...')
                result.stopTestRun()
                stdout_pipe.write_bypass(result_out.getvalue())
                stdout_pipe.write_bypass('\ninterrupted stdout:\n{}\n'.format(
                    stdout_pipe.output().decode()))
                stderr_pipe.write_bypass('\ninterrupted stderr:\n{}\n'.format(
                    stderr_pipe.output().decode()))
                os._exit(1)

        def try_set_handler(name, handler):
            # Not every platform defines every signal name; skip missing ones.
            try:
                signal.signal(getattr(signal, name), handler)
            except AttributeError:
                pass

        try_set_handler('SIGINT', sigint_handler)
        try_set_handler('SIGBUS', fault_handler)
        try_set_handler('SIGABRT', fault_handler)
        try_set_handler('SIGFPE', fault_handler)
        try_set_handler('SIGILL', fault_handler)
        # Sometimes output will lag after a test has successfully finished; we
        # ignore such writes to our pipes.
        try_set_handler('SIGPIPE', signal.SIG_IGN)

        # Run the tests
        result.startTestRun()
        try:
            for augmented_case in augmented_cases:
                case_id = augmented_case.case.id()
                if any(skipped_test in case_id
                       for skipped_test in self._skipped_tests):
                    continue
                sys.stdout.write('Running {}\n'.format(case_id))
                sys.stdout.flush()
                if self._dedicated_threads:
                    # (Deprecated) Spawns dedicated thread for each test case.
                    case_thread = threading.Thread(
                        target=augmented_case.case.run, args=(result,))
                    # Leaving the with-block restores stdout/stderr even when
                    # an exception propagates out of it.
                    with stdout_pipe, stderr_pipe:
                        case_thread.start()
                        # If the thread is exited unexpected, stop testing.
                        while case_thread.is_alive():
                            check_kill_self()
                            time.sleep(0)
                        case_thread.join()
                    # Records the result of the test case run.
                    result.set_output(augmented_case.case,
                                      stdout_pipe.output(),
                                      stderr_pipe.output())
                    sys.stdout.write(result_out.getvalue())
                    sys.stdout.flush()
                    # Rewind before truncating: truncating alone leaves the
                    # stream position where it was, and on Python 3 the next
                    # write would then be padded with NUL characters.
                    result_out.seek(0)
                    result_out.truncate()
                    check_kill_self()
                else:
                    # Donates current thread to test case execution.
                    augmented_case.case.run(result)
            result.stopTestRun()
        finally:
            # Release the duplicated descriptors even if a case run raised.
            stdout_pipe.close()
            stderr_pipe.close()

        # Report results
        sys.stdout.write(result_out.getvalue())
        sys.stdout.flush()
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        with open('report.xml', 'wb') as report_xml_file:
            _result.jenkins_junit_xml(result).write(report_xml_file)
        return result
http2_test_server.format
format
Definition: http2_test_server.py:118
tests._result.TerminalResult
Definition: _result.py:261
tests._runner.Runner._dedicated_threads
_dedicated_threads
Definition: _runner.py:129
tests._runner.Runner.skip_tests
def skip_tests(self, tests)
Definition: _runner.py:131
write
#define write
Definition: test-fs.c:47
tests._runner.CaptureFile._into_file
_into_file
Definition: _runner.py:56
tests._runner.CaptureFile.__enter__
def __enter__(self)
Definition: _runner.py:89
tests._runner.CaptureFile.__exit__
def __exit__(self, type, value, traceback)
Definition: _runner.py:93
sigint_handler
static void sigint_handler(int)
Definition: server_common.cc:35
tests._runner.CaptureFile.start
def start(self)
Definition: _runner.py:66
tests._runner.CaptureFile.output
def output(self)
Definition: _runner.py:58
tests._runner.CaptureFile.write_bypass
def write_bypass(self, value)
Definition: _runner.py:76
tests._runner.CaptureFile.stop
def stop(self)
Definition: _runner.py:71
tests._runner.CaptureFile.close
def close(self)
Definition: _runner.py:96
tests._runner.CaptureFile._redirected_fd
_redirected_fd
Definition: _runner.py:54
tests._runner.Runner.__init__
def __init__(self, dedicated_threads=False)
Definition: _runner.py:121
tests._runner.CaptureFile
Definition: _runner.py:35
tests._runner.CaptureFile._saved_fd
_saved_fd
Definition: _runner.py:55
tests._runner.CaptureFile.__init__
def __init__(self, fd)
Definition: _runner.py:53
read
int read(izstream &zs, T *x, Items items)
Definition: bloaty/third_party/zlib/contrib/iostream2/zstream.h:115
bytes
uint8 bytes[10]
Definition: bloaty/third_party/protobuf/src/google/protobuf/io/coded_stream_unittest.cc:153
tests._runner.Runner._skipped_tests
_skipped_tests
Definition: _runner.py:128
tests._runner.AugmentedCase
Definition: _runner.py:101
grpc._common.decode
def decode(b)
Definition: grpc/_common.py:75
open
#define open
Definition: test-fs.c:46
tests._runner.AugmentedCase.__new__
def __new__(cls, case, id=None)
Definition: _runner.py:111
tests._runner.Runner.run
def run(self, suite)
Definition: _runner.py:134
tests._runner.Runner
Definition: _runner.py:119


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27