14 """Tests for Simple Stubs."""
# Cap on managed channels; exported below and reused to size the stress test.
20 _MAXIMUM_CHANNELS = 10
# Default timeout, in seconds, exported below.
22 _DEFAULT_TIMEOUT = 1.0
# Configuration is handed over through environment variables, so every value
# is written as a string.
# NOTE(review): presumably these must be set before grpc is imported so the
# library picks them up -- confirm against the import order of the full file.
24 os.environ[
"GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] =
"2"
25 os.environ[
"GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"] =
str(_MAXIMUM_CHANNELS)
26 os.environ[
"GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"] =
str(_DEFAULT_TIMEOUT)
35 from typing
import Callable, Optional
# Iteration counts for server-side responses and client-side requests; the
# client count is deliberately tied to the server count.
50 _SERVER_RESPONSE_COUNT = 10
51 _CLIENT_REQUEST_COUNT = _SERVER_RESPONSE_COUNT
# Number of iterations of the stress loop: ten times the channel maximum.
53 _STRESS_EPOCHS = _MAXIMUM_CHANNELS * 10
# Method paths; handler selection compares handler_call_details.method
# against these values, and the tests pass them as the RPC method.
55 _UNARY_UNARY =
"/test/UnaryUnary"
56 _UNARY_STREAM =
"/test/UnaryStream"
57 _STREAM_UNARY =
"/test/StreamUnary"
58 _STREAM_STREAM =
"/test/StreamStream"
59 _BLACK_HOLE =
"/test/BlackHole"
62 @contextlib.contextmanager
63 def _env(key: str, value: str):
64 os.environ[key] = value
74 for _
in range(_SERVER_RESPONSE_COUNT):
80 for single_request
in request_iterator:
81 request = single_request
86 for request
in request_iterator:
91 event = threading.Event()
96 context.add_callback(_on_done)
97 while not event.is_set():
104 if handler_call_details.method == _UNARY_UNARY:
106 elif handler_call_details.method == _UNARY_STREAM:
108 elif handler_call_details.method == _STREAM_UNARY:
110 elif handler_call_details.method == _STREAM_STREAM:
112 elif handler_call_details.method == _BLACK_HOLE:
115 raise NotImplementedError()
119 start = datetime.datetime.now()
121 return datetime.datetime.now() - start
124 @contextlib.contextmanager
127 server = test_common.test_server()
129 if credentials
is None:
130 port = server.add_insecure_port(target)
132 port = server.add_secure_port(target, credentials)
143 """Asserts that a function caches intermediate data/state.
145 To be specific, given a function whose caching behavior is
146 deterministic in the value of a supplied string, this function asserts
147 that, on average, subsequent invocations of the function for a specific
148 string are faster than first invocations with that same string.
151 to_check: A function returning nothing, that caches values based on
152 an arbitrary supplied string.
156 for epoch
in range(_CACHE_EPOCHS):
159 for trial
in range(_CACHE_TRIALS):
161 initial_runs.append(runs[0])
162 cached_runs.extend(runs[1:])
163 average_cold =
sum((run
for run
in initial_runs),
164 datetime.timedelta()) /
len(initial_runs)
165 average_warm =
sum((run
for run
in cached_runs),
166 datetime.timedelta()) /
len(cached_runs)
167 self.assertLess(average_warm, average_cold)
170 predicate: Callable[[], bool],
172 timeout: Optional[datetime.timedelta] =
None,
173 message: Optional[Callable[[], str]] =
None) ->
None:
174 message = message
or (
lambda:
"Proposition did not evaluate to true")
175 timeout = timeout
or datetime.timedelta(seconds=10)
176 end = datetime.datetime.now() + timeout
177 while datetime.datetime.now() < end:
182 self.fail(
message() +
" after " +
str(timeout))
186 target = f
'localhost:{port}'
187 response = grpc.experimental.unary_unary(
194 self.assertEqual(_REQUEST, response)
198 target = f
'localhost:{port}'
199 response = grpc.experimental.unary_unary(
205 self.assertEqual(_REQUEST, response)
209 target = f
'localhost:{port}'
210 test_name = inspect.stack()[0][3]
211 args = (_REQUEST, target, _UNARY_UNARY)
214 def _invoke(seed: str):
215 run_kwargs = dict(kwargs)
216 run_kwargs[
"options"] = ((test_name + seed,
""),)
217 grpc.experimental.unary_unary(*args, **run_kwargs)
223 target = f
'localhost:{port}'
224 response = grpc.experimental.unary_unary(
231 )._test_only_channel_count() == 0,
233 f
"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain"
238 target = f
'localhost:{port}'
239 for i
in range(_STRESS_EPOCHS):
241 options = ((
"foo",
str(i)),)
243 grpc.experimental.unary_unary(
251 )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1,
253 f
"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain"
258 target = f
'localhost:{port}'
259 for response
in grpc.experimental.unary_stream(
264 self.assertEqual(_REQUEST, response)
269 for _
in range(_CLIENT_REQUEST_COUNT):
273 target = f
'localhost:{port}'
274 response = grpc.experimental.stream_unary(
279 self.assertEqual(_REQUEST, response)
284 for _
in range(_CLIENT_REQUEST_COUNT):
288 target = f
'localhost:{port}'
289 for response
in grpc.experimental.stream_stream(
294 self.assertEqual(_REQUEST, response)
297 _private_key = resources.private_key()
298 _certificate_chain = resources.certificate_chain()
299 _server_certs = ((_private_key, _certificate_chain),)
300 _server_host_override =
'foo.test.google.fr'
301 _test_root_certificates = resources.test_root_certificates()
302 _property_options = ((
303 'grpc.ssl_target_name_override',
304 _server_host_override,
306 cert_dir = os.path.join(os.path.dirname(resources.__file__),
308 cert_file = os.path.join(cert_dir,
"ca.pem")
309 with _env(
"GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file):
311 with _server(server_creds)
as port:
312 target = f
'localhost:{port}'
313 response = grpc.experimental.unary_unary(
314 _REQUEST, target, _UNARY_UNARY, options=_property_options)
318 target = f
'localhost:{port}'
319 response = grpc.experimental.unary_unary(_REQUEST,
323 self.assertEqual(_REQUEST, response)
327 target = f
'localhost:{port}'
328 with self.assertRaises(ValueError):
329 response = grpc.experimental.unary_unary(
339 target = f
'{addr}:{port}'
341 target, (),
None,
True,
None)
342 rpc_finished_event = threading.Event()
343 rpc_failed_event = threading.Event()
346 def _on_connectivity_changed(connectivity):
348 if connectivity
is grpc.ChannelConnectivity.TRANSIENT_FAILURE:
349 self.assertFalse(rpc_finished_event.is_set())
350 self.assertFalse(rpc_failed_event.is_set())
351 server = test_common.test_server()
352 server.add_insecure_port(target)
355 channel.unsubscribe(_on_connectivity_changed)
356 elif connectivity
in (grpc.ChannelConnectivity.IDLE,
357 grpc.ChannelConnectivity.CONNECTING):
360 self.fail(
"Encountered unknown state.")
362 channel.subscribe(_on_connectivity_changed)
366 response = grpc.experimental.unary_unary(_REQUEST,
371 rpc_finished_event.set()
372 except Exception
as e:
373 rpc_failed_event.set()
375 t = threading.Thread(target=_send_rpc)
378 self.assertFalse(rpc_failed_event.is_set())
379 self.assertTrue(rpc_finished_event.is_set())
380 if server
is not None:
385 target = f
'localhost:{port}'
387 response = grpc.experimental.unary_unary(_REQUEST,
392 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
396 not_present = object()
397 wait_for_ready_values = [
True, not_present]
398 timeout_values = [0.5, not_present]
400 for wait_for_ready
in wait_for_ready_values:
401 for timeout
in timeout_values:
403 if timeout
is not not_present:
404 case[
"timeout"] = timeout
405 if wait_for_ready
is not not_present:
406 case[
"wait_for_ready"] = wait_for_ready
410 with self.subTest(**case):
# Script entry point: turn on INFO-level logging, then run the test suite
# with verbose (per-test) output.
414 if __name__ ==
"__main__":
415 logging.basicConfig(level=logging.INFO)
416 unittest.main(verbosity=2)