fork/methods.py
Go to the documentation of this file.
1 # Copyright 2018 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Implementations of fork support test methods."""
15 
16 import enum
17 import json
18 import logging
19 import multiprocessing
20 import os
21 import threading
22 import time
23 
24 import grpc
25 from six.moves import queue
26 
27 from src.proto.grpc.testing import empty_pb2
28 from src.proto.grpc.testing import messages_pb2
29 from src.proto.grpc.testing import test_pb2_grpc
30 
31 _LOGGER = logging.getLogger(__name__)
32 _RPC_TIMEOUT_S = 10
33 _CHILD_FINISH_TIMEOUT_S = 60
34 
35 
def _channel(args):
    """Open a gRPC channel to the test server described by ``args``.

    Args:
      args: Mapping providing the 'server_host', 'server_port' and 'use_tls'
        entries.

    Returns:
      A secure channel built with default SSL channel credentials when
      ``args['use_tls']`` is truthy, otherwise an insecure channel.
    """
    target = '{}:{}'.format(args['server_host'], args['server_port'])
    if not args['use_tls']:
        return grpc.insecure_channel(target)
    return grpc.secure_channel(target, grpc.ssl_channel_credentials())
44 
45 
46 def _validate_payload_type_and_length(response, expected_type, expected_length):
47  if response.payload.type is not expected_type:
48  raise ValueError('expected payload type %s, got %s' %
49  (expected_type, type(response.payload.type)))
50  elif len(response.payload.body) != expected_length:
51  raise ValueError('expected payload body size %d, got %d' %
52  (expected_length, len(response.payload.body)))
53 
54 
def _async_unary(stub):
    """Issue one UnaryCall through the future API and validate the response.

    Args:
      stub: Test service stub exposing ``UnaryCall``.

    Raises:
      ValueError: If the response payload type or size is not what was
        requested.
    """
    size = 314159
    request = messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=size,
        payload=messages_pb2.Payload(body=b'\x00' * 271828))
    response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
    # result() blocks until the RPC has completed.
    response = response_future.result()
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
64 
65 
def _blocking_unary(stub):
    """Issue one blocking UnaryCall and validate the response.

    Args:
      stub: Test service stub exposing ``UnaryCall``.

    Raises:
      ValueError: If the response payload type or size is not what was
        requested.
    """
    size = 314159
    request = messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=size,
        payload=messages_pb2.Payload(body=b'\x00' * 271828))
    response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
74 
75 
76 class _Pipe(object):
77 
78  def __init__(self):
79  self._condition = threading.Condition()
80  self._values = []
81  self._open = True
82 
83  def __iter__(self):
84  return self
85 
86  def __next__(self):
87  return self.next()
88 
89  def next(self):
90  with self._condition:
91  while not self._values and self._open:
92  self._condition.wait()
93  if self._values:
94  return self._values.pop(0)
95  else:
96  raise StopIteration()
97 
98  def add(self, value):
99  with self._condition:
100  self._values.append(value)
101  self._condition.notify()
102 
103  def close(self):
104  with self._condition:
105  self._open = False
106  self._condition.notify()
107 
108  def __enter__(self):
109  return self
110 
111  def __exit__(self, type, value, traceback):
112  self.close()
113 
114 
class _ChildProcess(object):
    """Runs a task in a separate process and surfaces its failures.

    Exceptions raised by the task are placed on a multiprocessing queue inside
    the child process and re-reported by finish() in the parent.
    """

    def __init__(self, task, args=None):
        """Prepare, without starting, a process that runs ``task(*args)``.

        Args:
          task: Callable to invoke in the child process.
          args: Optional positional arguments for ``task``; defaults to none.
        """
        task_args = () if args is None else args
        self._exceptions = multiprocessing.Queue()

        def record_exceptions():
            # Runs in the child: hand any failure back through the queue
            # instead of letting it escape.
            try:
                task(*task_args)
            except grpc.RpcError as rpc_error:
                # RPC errors are sent to the parent as formatted text.
                self._exceptions.put('RpcError: %s' % rpc_error)
            except Exception as e:  # pylint: disable=broad-except
                self._exceptions.put(e)

        self._process = multiprocessing.Process(target=record_exceptions)

    def start(self):
        """Start the child process."""
        self._process.start()

    def finish(self):
        """Wait for the child process and raise if it did not succeed.

        Raises:
          RuntimeError: If the child is still alive after
            _CHILD_FINISH_TIMEOUT_S seconds.
          ValueError: If the child exited with a non-zero exit code, or if it
            recorded an exception while running the task.
        """
        process = self._process
        process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
        if process.is_alive():
            raise RuntimeError('Child process did not terminate')
        exit_code = process.exitcode
        if exit_code != 0:
            raise ValueError('Child process failed with exitcode %d' %
                             exit_code)
        try:
            exception = self._exceptions.get(block=False)
        except queue.Empty:
            # Nothing was recorded: the task completed without raising.
            return
        raise ValueError('Child process failed: "%s": "%s"' %
                         (repr(exception), exception))
148 
149 
def _async_unary_same_channel(channel):
    """Check that a forked child cannot issue a future-based unary RPC on the
    parent's channel, while the parent can keep using it.

    Args:
      channel: Parent channel used both before and after the child is started.
    """

    def child_target():
        try:
            _async_unary(stub)
            raise Exception(
                'Child should not be able to re-use channel after fork')
        except ValueError:
            # Expected: using the inherited channel in the child must fail.
            pass

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    # The parent's channel must remain usable while the child is running.
    _async_unary(stub)
    child_process.finish()
166 
167 
def _async_unary_new_channel(channel, args):
    """Run future-based unary RPCs in the parent while a child process runs one
    on a channel it creates itself.

    Args:
      channel: Parent channel.
      args: Connection settings handed to _channel() inside the child.
    """
    parent_stub = test_pb2_grpc.TestServiceStub(channel)

    def child_target():
        # The child opens its own channel instead of using the parent's.
        with _channel(args) as child_channel:
            _async_unary(test_pb2_grpc.TestServiceStub(child_channel))
            child_channel.close()

    _async_unary(parent_stub)
    child = _ChildProcess(child_target)
    child.start()
    _async_unary(parent_stub)
    child.finish()
182 
183 
def _blocking_unary_same_channel(channel):
    """Check that a forked child cannot issue a blocking unary RPC on the
    parent's channel.

    Args:
      channel: Parent channel used before the child is started.
    """

    def child_target():
        try:
            _blocking_unary(stub)
            raise Exception(
                'Child should not be able to re-use channel after fork')
        except ValueError:
            # Expected: using the inherited channel in the child must fail.
            pass

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    child_process.finish()
199 
200 
def _blocking_unary_new_channel(channel, args):
    """Run blocking unary RPCs in the parent while a child process runs one on
    a channel it creates itself.

    Args:
      channel: Parent channel.
      args: Connection settings handed to _channel() inside the child.
    """
    parent_stub = test_pb2_grpc.TestServiceStub(channel)

    def child_target():
        # The child opens its own channel instead of using the parent's.
        with _channel(args) as child_channel:
            _blocking_unary(test_pb2_grpc.TestServiceStub(child_channel))

    _blocking_unary(parent_stub)
    child = _ChildProcess(child_target)
    child.start()
    _blocking_unary(parent_stub)
    child.finish()
214 
215 
def _close_channel_before_fork(channel, args):
    """Verify that the fork channel registry can handle already closed channels.

    The original channel is used once and closed before any child is started;
    a second channel is then opened and a child is started while it is open.

    Args:
      channel: Parent channel; closed by this function before forking.
      args: Connection settings handed to _channel().
    """

    def child_target():
        # new_channel is bound by the enclosing function before the child
        # starts; the child closes its copy and then works on its own channel.
        new_channel.close()
        with _channel(args) as child_channel:
            _blocking_unary(test_pb2_grpc.TestServiceStub(child_channel))

    _blocking_unary(test_pb2_grpc.TestServiceStub(channel))
    channel.close()

    with _channel(args) as new_channel:
        new_stub = test_pb2_grpc.TestServiceStub(new_channel)
        child = _ChildProcess(child_target)
        child.start()
        _blocking_unary(new_stub)
        child.finish()
235 
236 
def _connectivity_watch(channel, args):
    """Exercise connectivity subscriptions in both the parent and a child.

    The parent subscribes to its channel and then starts a child. The child
    subscribes to a channel of its own, waits for it to become READY, and
    fails if its copy of the parent's state list holds more than one entry.

    Args:
      channel: Parent channel to subscribe to.
      args: Connection settings handed to _channel() inside the child.

    Raises:
      ValueError: If the parent channel does not report READY within
        _RPC_TIMEOUT_S seconds.
    """
    parent_states = []
    parent_ready = threading.Event()

    def parent_connectivity_callback(state):
        parent_states.append(state)
        if state is grpc.ChannelConnectivity.READY:
            parent_ready.set()

    def child_target():
        child_ready = threading.Event()

        def child_connectivity_callback(state):
            if state is grpc.ChannelConnectivity.READY:
                child_ready.set()

        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            child_channel.subscribe(child_connectivity_callback)
            _async_unary(child_stub)
            if not child_ready.wait(timeout=_RPC_TIMEOUT_S):
                raise ValueError('Channel did not move to READY')
            if len(parent_states) > 1:
                raise ValueError(
                    'Received connectivity updates on parent callback',
                    parent_states)
            child_channel.unsubscribe(child_connectivity_callback)

    channel.subscribe(parent_connectivity_callback)
    stub = test_pb2_grpc.TestServiceStub(channel)
    child = _ChildProcess(child_target)
    child.start()
    _async_unary(stub)
    if not parent_ready.wait(timeout=_RPC_TIMEOUT_S):
        raise ValueError('Channel did not move to READY')
    channel.unsubscribe(parent_connectivity_callback)
    child.finish()
276 
277 
def _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target, run_after_close=True):
    """Drive a FullDuplexCall ping-pong, starting child processes mid-call.

    One request is sent per size pair and its response validated. A child is
    started after every received response, before every receive once the
    first response has arrived, and optionally once more after the request
    stream has been closed. All children are awaited at the end.

    Args:
      channel: Parent channel carrying the bidirectional call.
      args: Value forwarded to ``child_target`` as its third argument.
      child_target: Callable run in each child as
        ``child_target(parent_bidi_call, channel, args)``.
      run_after_close: Whether to start one more child after the request
        stream has been closed.

    Raises:
      ValueError: If a response payload does not match what was requested.
    """
    request_response_sizes = (
        31415,
        9,
        2653,
        58979,
    )
    request_payload_sizes = (
        27182,
        8,
        1828,
        45904,
    )
    stub = test_pb2_grpc.TestServiceStub(channel)
    pipe = _Pipe()
    parent_bidi_call = stub.FullDuplexCall(pipe)
    child_processes = []

    def start_child():
        # Start one child running child_target and remember it for finish().
        child_process = _ChildProcess(child_target,
                                      (parent_bidi_call, channel, args))
        child_process.start()
        child_processes.append(child_process)

    first_message_received = False
    for response_size, payload_size in zip(request_response_sizes,
                                           request_payload_sizes):
        request = messages_pb2.StreamingOutputCallRequest(
            response_type=messages_pb2.COMPRESSABLE,
            response_parameters=(messages_pb2.ResponseParameters(
                size=response_size),),
            payload=messages_pb2.Payload(body=b'\x00' * payload_size))
        pipe.add(request)
        if first_message_received:
            # Start a child while this request's response is still pending.
            start_child()
        response = next(parent_bidi_call)
        first_message_received = True
        start_child()
        _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
                                          response_size)
    pipe.close()
    if run_after_close:
        start_child()
    for child_process in child_processes:
        child_process.finish()
326 
327 
def _in_progress_bidi_continue_call(channel):
    """Check what a forked child sees of an in-progress bidirectional call.

    Each child must fail to issue a new RPC on the inherited channel, and must
    observe the inherited call as CANCELLED with the details
    'Channel closed due to fork'.

    Args:
      channel: Parent channel carrying the bidirectional call.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        try:
            _async_unary(stub)
            raise Exception(
                'Child should not be able to re-use channel after fork')
        except ValueError:
            # Expected: using the inherited channel in the child must fail.
            pass
        inherited_code = parent_bidi_call.code()
        inherited_details = parent_bidi_call.details()
        if inherited_code != grpc.StatusCode.CANCELLED:
            raise ValueError('Expected inherited code CANCELLED, got %s' %
                             inherited_code)
        if inherited_details != 'Channel closed due to fork':
            raise ValueError(
                'Expected inherited details Channel closed due to fork, got %s'
                % inherited_details)

    # Don't run child_target after closing the parent call, as the call may have
    # received a status from the server before fork occurs.
    _ping_pong_with_child_processes_after_first_response(channel,
                                                         None,
                                                         child_target,
                                                         run_after_close=False)
354 
355 
def _in_progress_bidi_same_channel_async_call(channel):
    """Check that children forked during a bidirectional call cannot issue a
    future-based unary RPC on the parent's channel.

    Args:
      channel: Parent channel carrying the bidirectional call.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        try:
            _async_unary(stub)
            raise Exception(
                'Child should not be able to re-use channel after fork')
        except ValueError:
            # Expected: using the inherited channel in the child must fail.
            pass

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)
369 
370 
def _in_progress_bidi_same_channel_blocking_call(channel):
    """Check that children forked during a bidirectional call cannot issue a
    blocking unary RPC on the parent's channel.

    Args:
      channel: Parent channel carrying the bidirectional call.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        try:
            _blocking_unary(stub)
            raise Exception(
                'Child should not be able to re-use channel after fork')
        except ValueError:
            # Expected: using the inherited channel in the child must fail.
            pass

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)
384 
385 
def _in_progress_bidi_new_channel_async_call(channel, args):
    """Check that children forked during a bidirectional call can issue a
    future-based unary RPC on a channel they create themselves.

    Args:
      channel: Parent channel carrying the bidirectional call.
      args: Connection settings handed to _channel() inside each child.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        # The inherited call and channel are ignored; a new channel is used.
        with _channel(args) as channel:
            stub = test_pb2_grpc.TestServiceStub(channel)
            _async_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)
395 
396 
def _in_progress_bidi_new_channel_blocking_call(channel, args):
    """Check that children forked during a bidirectional call can issue a
    blocking unary RPC on a channel they create themselves.

    Args:
      channel: Parent channel carrying the bidirectional call.
      args: Connection settings handed to _channel() inside each child.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        # The inherited call and channel are ignored; a new channel is used.
        with _channel(args) as channel:
            stub = test_pb2_grpc.TestServiceStub(channel)
            _blocking_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)
406 
407 
@enum.unique
class TestCase(enum.Enum):
    """Fork support test cases, keyed by their string names."""

    CONNECTIVITY_WATCH = 'connectivity_watch'
    CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
    ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
    ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
    BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
    BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
    IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'

    def run_test(self, args):
        """Run this test case against the server described by ``args``.

        Args:
          args: Mapping providing 'server_host', 'server_port' and 'use_tls',
            as consumed by _channel().

        Raises:
          NotImplementedError: If no implementation is wired up for this
            member.
        """
        _LOGGER.info("Running %s", self)
        channel = _channel(args)
        try:
            if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
                _async_unary_same_channel(channel)
            elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
                _async_unary_new_channel(channel, args)
            elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
                _blocking_unary_same_channel(channel)
            elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
                _blocking_unary_new_channel(channel, args)
            elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
                _close_channel_before_fork(channel, args)
            elif self is TestCase.CONNECTIVITY_WATCH:
                _connectivity_watch(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
                _in_progress_bidi_continue_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_same_channel_async_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_same_channel_blocking_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_new_channel_async_call(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_new_channel_blocking_call(channel, args)
            else:
                raise NotImplementedError('Test case "%s" not implemented!' %
                                          self.name)
        finally:
            # Release the channel even when the test case raises.
            channel.close()
tests.fork.methods._ChildProcess.finish
def finish(self)
Definition: fork/methods.py:135
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.fork.methods._ChildProcess.__init__
def __init__(self, task, args=None)
Definition: fork/methods.py:117
tests.fork.methods._blocking_unary
def _blocking_unary(stub)
Definition: fork/methods.py:66
tests.fork.methods._blocking_unary_new_channel
def _blocking_unary_new_channel(channel, args)
Definition: fork/methods.py:201
http2_test_server.format
format
Definition: http2_test_server.py:118
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
tests.fork.methods._ChildProcess
Definition: fork/methods.py:115
tests.fork.methods._Pipe.__next__
def __next__(self)
Definition: fork/methods.py:86
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests.fork.methods._ChildProcess._exceptions
_exceptions
Definition: fork/methods.py:120
tests.fork.methods._in_progress_bidi_new_channel_blocking_call
def _in_progress_bidi_new_channel_blocking_call(channel, args)
Definition: fork/methods.py:397
tests.fork.methods._Pipe.__iter__
def __iter__(self)
Definition: fork/methods.py:83
tests.fork.methods._Pipe.add
def add(self, value)
Definition: fork/methods.py:98
tests.fork.methods._connectivity_watch
def _connectivity_watch(channel, args)
Definition: fork/methods.py:237
tests.fork.methods._ChildProcess.start
def start(self)
Definition: fork/methods.py:132
tests.fork.methods._ping_pong_with_child_processes_after_first_response
def _ping_pong_with_child_processes_after_first_response(channel, args, child_target, run_after_close=True)
Definition: fork/methods.py:278
tests.fork.methods._ChildProcess._process
_process
Definition: fork/methods.py:130
tests.fork.methods._Pipe.__init__
def __init__(self)
Definition: fork/methods.py:78
tests.fork.methods._async_unary_new_channel
def _async_unary_new_channel(channel, args)
Definition: fork/methods.py:168
tests.fork.methods.TestCase.run_test
def run_test(self, args)
Definition: fork/methods.py:423
tests.fork.methods._Pipe.__exit__
def __exit__(self, type, value, traceback)
Definition: fork/methods.py:111
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
tests.fork.methods._Pipe.__enter__
def __enter__(self)
Definition: fork/methods.py:108
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests.fork.methods._blocking_unary_same_channel
def _blocking_unary_same_channel(channel)
Definition: fork/methods.py:184
tests.fork.methods._Pipe.next
def next(self)
Definition: fork/methods.py:89
tests.fork.methods._close_channel_before_fork
def _close_channel_before_fork(channel, args)
Definition: fork/methods.py:217
tests.fork.methods._in_progress_bidi_same_channel_async_call
def _in_progress_bidi_same_channel_async_call(channel)
Definition: fork/methods.py:356
tests.fork.methods._in_progress_bidi_continue_call
def _in_progress_bidi_continue_call(channel)
Definition: fork/methods.py:328
tests.fork.methods._async_unary
def _async_unary(stub)
Definition: fork/methods.py:55
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
grpc.ssl_channel_credentials
def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None)
Definition: src/python/grpcio/grpc/__init__.py:1607
tests.fork.methods._in_progress_bidi_new_channel_async_call
def _in_progress_bidi_new_channel_async_call(channel, args)
Definition: fork/methods.py:386
wait
static void wait(notification *n)
Definition: alts_tsi_handshaker_test.cc:114
tests.fork.methods._in_progress_bidi_same_channel_blocking_call
def _in_progress_bidi_same_channel_blocking_call(channel)
Definition: fork/methods.py:371
tests.fork.methods._validate_payload_type_and_length
def _validate_payload_type_and_length(response, expected_type, expected_length)
Definition: fork/methods.py:46
tests.fork.methods._Pipe._condition
_condition
Definition: fork/methods.py:79
tests.fork.methods._async_unary_same_channel
def _async_unary_same_channel(channel)
Definition: fork/methods.py:150
tests.fork.methods.TestCase
Definition: fork/methods.py:409
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests.fork.methods._Pipe._open
_open
Definition: fork/methods.py:81
tests.fork.methods._Pipe
Definition: fork/methods.py:76
tests.fork.methods._Pipe.close
def close(self)
Definition: fork/methods.py:103
tests.fork.methods._channel
def _channel(args)
Definition: fork/methods.py:36
tests.fork.methods._Pipe._values
_values
Definition: fork/methods.py:80
grpc.secure_channel
def secure_channel(target, credentials, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1982


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:29