14 """Tests metadata flags feature by testing wait-for-ready semantics"""
24 from six.moves
import queue
# Fully-qualified method paths, one per RPC arity. The generic handler's
# dispatch compares `handler_call_details.method` against these, and the
# client-side call helpers use them to create the matching multi-callables.
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'

# Fixed three-byte payloads. _REQUEST is what the client helpers send
# (repeated STREAM_LENGTH times for request-streaming calls).
# NOTE(review): _RESPONSE is presumably what the handlers send back -- the
# handler bodies are not visible here, so confirm before relying on it.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'
45 for _
in range(test_constants.STREAM_LENGTH):
50 for _
in request_iterator:
56 for _
in request_iterator:
62 def __init__(self, test, request_streaming, response_streaming):
91 if handler_call_details.method == _UNARY_UNARY:
93 elif handler_call_details.method == _UNARY_STREAM:
95 elif handler_call_details.method == _STREAM_UNARY:
97 elif handler_call_details.method == _STREAM_STREAM:
104 """Creating phony channels is a workaround for retries"""
105 host, port, sock =
get_socket(sock_options=(socket.SO_REUSEADDR,))
111 channel.unary_unary(_UNARY_UNARY).__call__(
113 timeout=test_constants.LONG_TIMEOUT,
114 wait_for_ready=wait_for_ready)
118 channel.unary_unary(_UNARY_UNARY).with_call(
120 timeout=test_constants.LONG_TIMEOUT,
121 wait_for_ready=wait_for_ready)
125 channel.unary_unary(_UNARY_UNARY).
future(
127 timeout=test_constants.LONG_TIMEOUT,
128 wait_for_ready=wait_for_ready).
result(
129 timeout=test_constants.LONG_TIMEOUT)
133 response_iterator = channel.unary_stream(_UNARY_STREAM).__call__(
135 timeout=test_constants.LONG_TIMEOUT,
136 wait_for_ready=wait_for_ready)
137 for _
in response_iterator:
142 channel.stream_unary(_STREAM_UNARY).__call__(
143 iter([_REQUEST] * test_constants.STREAM_LENGTH),
144 timeout=test_constants.LONG_TIMEOUT,
145 wait_for_ready=wait_for_ready)
149 channel.stream_unary(_STREAM_UNARY).with_call(
150 iter([_REQUEST] * test_constants.STREAM_LENGTH),
151 timeout=test_constants.LONG_TIMEOUT,
152 wait_for_ready=wait_for_ready)
156 channel.stream_unary(_STREAM_UNARY).
future(
157 iter([_REQUEST] * test_constants.STREAM_LENGTH),
158 timeout=test_constants.LONG_TIMEOUT,
159 wait_for_ready=wait_for_ready).
result(
160 timeout=test_constants.LONG_TIMEOUT)
164 response_iterator = channel.stream_stream(_STREAM_STREAM).__call__(
165 iter([_REQUEST] * test_constants.STREAM_LENGTH),
166 timeout=test_constants.LONG_TIMEOUT,
167 wait_for_ready=wait_for_ready)
168 for _
in response_iterator:
173 perform_unary_unary_call, perform_unary_unary_with_call,
174 perform_unary_unary_future, perform_unary_stream_call,
175 perform_stream_unary_call, perform_stream_unary_with_call,
176 perform_stream_unary_future, perform_stream_stream_call
184 fn(channel, wait_for_ready)
185 self.fail(
"The Call should fail")
186 except BaseException
as e:
187 self.assertIs(grpc.StatusCode.UNAVAILABLE, e.code())
190 for perform_call
in _ALL_CALL_CASES:
195 for perform_call
in _ALL_CALL_CASES:
199 wait_for_ready=
False)
207 unhandled_exceptions = queue.Queue()
210 host, port, sock =
get_socket(sock_options=(socket.SO_REUSEADDR,))
213 addr =
'{}:{}'.
format(host, port)
216 def wait_for_transient_failure(channel_connectivity):
217 if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
220 def test_call(perform_call):
223 channel.subscribe(wait_for_transient_failure)
224 perform_call(channel, wait_for_ready=
True)
225 except BaseException
as e:
230 unhandled_exceptions.put(e,
True)
233 for perform_call
in _ALL_CALL_CASES:
234 test_thread = threading.Thread(target=test_call,
235 args=(perform_call,))
236 test_thread.daemon =
True
237 test_thread.exception =
None
239 test_threads.append(test_thread)
243 server = test_common.test_server(reuse_port=
True)
244 server.add_generic_rpc_handlers((
_GenericHandler(weakref.proxy(self)),))
245 server.add_insecure_port(addr)
248 for test_thread
in test_threads:
254 if not unhandled_exceptions.empty():
255 raise unhandled_exceptions.get(
True)
# Script entry point: enable DEBUG-level logging before handing control to
# the unittest runner, which is invoked with verbose (level 2) output.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)