14 """Defines a number of module-scope gRPC scenarios to test clean exit."""
29 UNSTARTED_SERVER =
'unstarted_server'
30 RUNNING_SERVER =
'running_server'
31 POLL_CONNECTIVITY_NO_SERVER =
'poll_connectivity_no_server'
32 POLL_CONNECTIVITY =
'poll_connectivity'
33 IN_FLIGHT_UNARY_UNARY_CALL =
'in_flight_unary_unary_call'
34 IN_FLIGHT_UNARY_STREAM_CALL =
'in_flight_unary_stream_call'
35 IN_FLIGHT_STREAM_UNARY_CALL =
'in_flight_stream_unary_call'
36 IN_FLIGHT_STREAM_STREAM_CALL =
'in_flight_stream_stream_call'
37 IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL =
'in_flight_partial_unary_stream_call'
38 IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL =
'in_flight_partial_stream_unary_call'
39 IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL =
'in_flight_partial_stream_stream_call'
41 UNARY_UNARY = b
'/test/UnaryUnary'
42 UNARY_STREAM = b
'/test/UnaryStream'
43 STREAM_UNARY = b
'/test/StreamUnary'
44 STREAM_STREAM = b
'/test/StreamStream'
45 PARTIAL_UNARY_STREAM = b
'/test/PartialUnaryStream'
46 PARTIAL_STREAM_UNARY = b
'/test/PartialStreamUnary'
47 PARTIAL_STREAM_STREAM = b
'/test/PartialStreamStream'
50 IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
51 IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
52 IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
53 IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
54 IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
55 IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
56 IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
69 for _
in range(test_constants.STREAM_LENGTH // 2):
79 for _
in range(test_constants.STREAM_LENGTH // 2):
80 next(request_iterator)
89 for _
in range(test_constants.STREAM_LENGTH // 2):
90 yield next(request_iterator)
96 def __init__(self, request_streaming, response_streaming, partial_hang):
127 if handler_call_details.method == UNARY_UNARY:
129 elif handler_call_details.method == UNARY_STREAM:
131 elif handler_call_details.method == STREAM_UNARY:
133 elif handler_call_details.method == STREAM_STREAM:
135 elif handler_call_details.method == PARTIAL_UNARY_STREAM:
137 elif handler_call_details.method == PARTIAL_STREAM_UNARY:
139 elif handler_call_details.method == PARTIAL_STREAM_STREAM:
151 thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
164 if __name__ ==
'__main__':
165 logging.basicConfig()
166 parser = argparse.ArgumentParser()
167 parser.add_argument(
'scenario', type=str)
168 parser.add_argument(
'--wait_for_interrupt',
169 dest=
'wait_for_interrupt',
171 args = parser.parse_args()
173 if args.scenario == UNSTARTED_SERVER:
175 if args.wait_for_interrupt:
176 time.sleep(WAIT_TIME)
177 elif args.scenario == RUNNING_SERVER:
179 port = server.add_insecure_port(
'[::]:0')
181 if args.wait_for_interrupt:
182 time.sleep(WAIT_TIME)
183 elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
189 channel.subscribe(connectivity_callback, try_to_connect=
True)
190 if args.wait_for_interrupt:
191 time.sleep(WAIT_TIME)
192 elif args.scenario == POLL_CONNECTIVITY:
193 server =
grpc.server(DaemonPool(), options=((
'grpc.so_reuseport', 0),))
194 port = server.add_insecure_port(
'[::]:0')
201 channel.subscribe(connectivity_callback, try_to_connect=
True)
202 if args.wait_for_interrupt:
203 time.sleep(WAIT_TIME)
208 port = server.add_insecure_port(
'[::]:0')
209 server.add_generic_rpc_handlers((handler,))
213 method = TEST_TO_METHOD[args.scenario]
215 if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
216 multi_callable = channel.unary_unary(method)
217 future = multi_callable.future(REQUEST)
218 result, call = multi_callable.with_call(REQUEST)
219 elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL
or
220 args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
221 multi_callable = channel.unary_stream(method)
223 for response
in response_iterator:
225 elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL
or
226 args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
227 multi_callable = channel.stream_unary(method)
229 result, call = multi_callable.with_call(
230 iter([REQUEST] * test_constants.STREAM_LENGTH))
231 elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL
or
232 args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
233 multi_callable = channel.stream_stream(method)
235 for response
in response_iterator: