14 """Implementations of fork support test methods."""
19 import multiprocessing
25 from six.moves
import queue
27 from src.proto.grpc.testing
import empty_pb2
28 from src.proto.grpc.testing
import messages_pb2
29 from src.proto.grpc.testing
import test_pb2_grpc
31 _LOGGER = logging.getLogger(__name__)
33 _CHILD_FINISH_TIMEOUT_S = 60
37 target =
'{}:{}'.
format(args[
'server_host'], args[
'server_port'])
47 if response.payload.type
is not expected_type:
48 raise ValueError(
'expected payload type %s, got %s' %
49 (expected_type,
type(response.payload.type)))
50 elif len(response.payload.body) != expected_length:
51 raise ValueError(
'expected payload body size %d, got %d' %
52 (expected_length,
len(response.payload.body)))
58 response_type=messages_pb2.COMPRESSABLE,
61 response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
62 response = response_future.result()
69 response_type=messages_pb2.COMPRESSABLE,
72 response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
122 def record_exceptions():
127 except Exception
as e:
130 self.
_process = multiprocessing.Process(target=record_exceptions)
136 self.
_process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
138 raise RuntimeError(
'Child process did not terminate')
140 raise ValueError(
'Child process failed with exitcode %d' %
144 raise ValueError(
'Child process failed: "%s": "%s"' %
145 (repr(exception), exception))
156 'Child should not be able to re-use channel after fork')
157 except ValueError
as expected_value_error:
160 stub = test_pb2_grpc.TestServiceStub(channel)
163 child_process.start()
165 child_process.finish()
171 with _channel(args)
as child_channel:
172 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
174 child_channel.close()
176 stub = test_pb2_grpc.TestServiceStub(channel)
179 child_process.start()
181 child_process.finish()
190 'Child should not be able to re-use channel after fork')
191 except ValueError
as expected_value_error:
194 stub = test_pb2_grpc.TestServiceStub(channel)
197 child_process.start()
198 child_process.finish()
204 with _channel(args)
as child_channel:
205 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
208 stub = test_pb2_grpc.TestServiceStub(channel)
211 child_process.start()
213 child_process.finish()
221 with _channel(args)
as child_channel:
222 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
225 stub = test_pb2_grpc.TestServiceStub(channel)
230 new_stub = test_pb2_grpc.TestServiceStub(new_channel)
232 child_process.start()
234 child_process.finish()
240 parent_channel_ready_event = threading.Event()
244 child_channel_ready_event = threading.Event()
246 def child_connectivity_callback(state):
247 if state
is grpc.ChannelConnectivity.READY:
248 child_channel_ready_event.set()
250 with _channel(args)
as child_channel:
251 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
252 child_channel.subscribe(child_connectivity_callback)
254 if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
255 raise ValueError(
'Channel did not move to READY')
256 if len(parent_states) > 1:
258 'Received connectivity updates on parent callback',
260 child_channel.unsubscribe(child_connectivity_callback)
262 def parent_connectivity_callback(state):
263 parent_states.append(state)
264 if state
is grpc.ChannelConnectivity.READY:
265 parent_channel_ready_event.set()
267 channel.subscribe(parent_connectivity_callback)
268 stub = test_pb2_grpc.TestServiceStub(channel)
270 child_process.start()
272 if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
273 raise ValueError(
'Channel did not move to READY')
274 channel.unsubscribe(parent_connectivity_callback)
275 child_process.finish()
279 channel, args, child_target, run_after_close=True):
280 request_response_sizes = (
286 request_payload_sizes = (
292 stub = test_pb2_grpc.TestServiceStub(channel)
294 parent_bidi_call = stub.FullDuplexCall(pipe)
296 first_message_received =
False
297 for response_size, payload_size
in zip(request_response_sizes,
298 request_payload_sizes):
300 response_type=messages_pb2.COMPRESSABLE,
302 size=response_size),),
305 if first_message_received:
307 (parent_bidi_call, channel, args))
308 child_process.start()
309 child_processes.append(child_process)
310 response =
next(parent_bidi_call)
311 first_message_received =
True
313 (parent_bidi_call, channel, args))
314 child_process.start()
315 child_processes.append(child_process)
321 (parent_bidi_call, channel, args))
322 child_process.start()
323 child_processes.append(child_process)
324 for child_process
in child_processes:
325 child_process.finish()
330 def child_target(parent_bidi_call, parent_channel, args):
331 stub = test_pb2_grpc.TestServiceStub(parent_channel)
335 'Child should not be able to re-use channel after fork')
336 except ValueError
as expected_value_error:
338 inherited_code = parent_bidi_call.code()
339 inherited_details = parent_bidi_call.details()
340 if inherited_code != grpc.StatusCode.CANCELLED:
341 raise ValueError(
'Expected inherited code CANCELLED, got %s' %
343 if inherited_details !=
'Channel closed due to fork':
345 'Expected inherited details Channel closed due to fork, got %s'
353 run_after_close=
False)
358 def child_target(parent_bidi_call, parent_channel, args):
359 stub = test_pb2_grpc.TestServiceStub(parent_channel)
363 'Child should not be able to re-use channel after fork')
364 except ValueError
as expected_value_error:
368 channel,
None, child_target)
373 def child_target(parent_bidi_call, parent_channel, args):
374 stub = test_pb2_grpc.TestServiceStub(parent_channel)
378 'Child should not be able to re-use channel after fork')
379 except ValueError
as expected_value_error:
383 channel,
None, child_target)
388 def child_target(parent_bidi_call, parent_channel, args):
390 stub = test_pb2_grpc.TestServiceStub(channel)
394 channel, args, child_target)
399 def child_target(parent_bidi_call, parent_channel, args):
401 stub = test_pb2_grpc.TestServiceStub(channel)
405 channel, args, child_target)
411 CONNECTIVITY_WATCH =
'connectivity_watch'
412 CLOSE_CHANNEL_BEFORE_FORK =
'close_channel_before_fork'
413 ASYNC_UNARY_SAME_CHANNEL =
'async_unary_same_channel'
414 ASYNC_UNARY_NEW_CHANNEL =
'async_unary_new_channel'
415 BLOCKING_UNARY_SAME_CHANNEL =
'blocking_unary_same_channel'
416 BLOCKING_UNARY_NEW_CHANNEL =
'blocking_unary_new_channel'
417 IN_PROGRESS_BIDI_CONTINUE_CALL =
'in_progress_bidi_continue_call'
418 IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL =
'in_progress_bidi_same_channel_async_call'
419 IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL =
'in_progress_bidi_same_channel_blocking_call'
420 IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL =
'in_progress_bidi_new_channel_async_call'
421 IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL =
'in_progress_bidi_new_channel_blocking_call'
424 _LOGGER.info(
"Running %s", self)
426 if self
is TestCase.ASYNC_UNARY_SAME_CHANNEL:
428 elif self
is TestCase.ASYNC_UNARY_NEW_CHANNEL:
430 elif self
is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
432 elif self
is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
434 elif self
is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
436 elif self
is TestCase.CONNECTIVITY_WATCH:
438 elif self
is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
440 elif self
is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
442 elif self
is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
444 elif self
is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
446 elif self
is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
449 raise NotImplementedError(
'Test case "%s" not implemented!' %