_simple_stubs_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests for Simple Stubs."""
15 
16 # TODO(https://github.com/grpc/grpc/issues/21965): Run under setuptools.
17 
18 import os
19 
# Limits that the tests in this module assert against.
_MAXIMUM_CHANNELS = 10
_DEFAULT_TIMEOUT = 1.0

# Exported before the `grpc` imports further down; presumably the library
# reads these knobs when it is first loaded, so the order matters.
os.environ.update({
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS": "2",
    "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM": str(_MAXIMUM_CHANNELS),
    "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS": str(_DEFAULT_TIMEOUT),
})
27 
28 import contextlib
29 import datetime
30 import inspect
31 import logging
32 import sys
33 import threading
34 import time
35 from typing import Callable, Optional
36 import unittest
37 
38 import grpc
39 import grpc.experimental
40 
41 from tests.unit import resources
42 from tests.unit import test_common
43 from tests.unit.framework.common import get_socket
44 
# Payload sent by every RPC in this module.
_REQUEST = b"0000"

# Sizing of the timing experiment run by `assert_cached`.
_CACHE_EPOCHS = 8
_CACHE_TRIALS = 6

# Number of messages exchanged by the streaming handlers and tests.
_SERVER_RESPONSE_COUNT = 10
_CLIENT_REQUEST_COUNT = _SERVER_RESPONSE_COUNT

# Number of distinct channels requested by the stress test.
_STRESS_EPOCHS = 10 * _MAXIMUM_CHANNELS

# Fully-qualified method names routed by the generic handler.
_UNARY_UNARY = "/test/UnaryUnary"
_UNARY_STREAM = "/test/UnaryStream"
_STREAM_UNARY = "/test/StreamUnary"
_STREAM_STREAM = "/test/StreamStream"
_BLACK_HOLE = "/test/BlackHole"
60 
61 
@contextlib.contextmanager
def _env(key: str, value: str):
    """Temporarily set an environment variable for the body of a `with`.

    On exit the variable is put back to whatever it was before entry (or
    removed if it was not set), even when the body raises.

    Args:
      key: Name of the environment variable to set.
      value: Value the variable holds while the `with` body runs.

    Yields:
      None.
    """
    previous = os.environ.get(key)
    os.environ[key] = value
    try:
        yield
    finally:
        if previous is None:
            # pop() rather than del: tolerate the body having removed the key.
            os.environ.pop(key, None)
        else:
            os.environ[key] = previous
67 
68 
def _unary_unary_handler(request, context):
    """Echo the request back as the response; `context` is not used."""
    response = request
    return response
71 
72 
def _unary_stream_handler(request, context):
    """Yield the request `_SERVER_RESPONSE_COUNT` times; `context` is unused."""
    remaining = _SERVER_RESPONSE_COUNT
    while remaining > 0:
        yield request
        remaining -= 1
76 
77 
def _stream_unary_handler(request_iterator, context):
    """Drain the request stream and return its final element.

    Returns:
      The last request produced by `request_iterator`, or None when the
      iterator yields nothing.
    """
    last_seen = None
    for last_seen in request_iterator:
        pass
    return last_seen
83 
84 
def _stream_stream_handler(request_iterator, context):
    """Echo every incoming request back, in order; `context` is unused."""
    yield from request_iterator
88 
89 
def _black_hole_handler(request, context):
    """Never produce a response; block until the RPC is over.

    A termination callback is registered on `context` and the handler
    sleeps until that callback fires.

    Args:
      request: The incoming request; ignored.
      context: The servicer context for the RPC.

    Returns:
      None, once the RPC has terminated.
    """
    finished = threading.Event()
    # Per the grpc ServicerContext.add_callback contract, a False result
    # means the callback was not registered and will never run (e.g. the RPC
    # has already terminated). Waiting in that case would block forever.
    if context.add_callback(finished.set):
        finished.wait()
99 
100 
102 
103  def service(self, handler_call_details):
104  if handler_call_details.method == _UNARY_UNARY:
105  return grpc.unary_unary_rpc_method_handler(_unary_unary_handler)
106  elif handler_call_details.method == _UNARY_STREAM:
107  return grpc.unary_stream_rpc_method_handler(_unary_stream_handler)
108  elif handler_call_details.method == _STREAM_UNARY:
109  return grpc.stream_unary_rpc_method_handler(_stream_unary_handler)
110  elif handler_call_details.method == _STREAM_STREAM:
111  return grpc.stream_stream_rpc_method_handler(_stream_stream_handler)
112  elif handler_call_details.method == _BLACK_HOLE:
113  return grpc.unary_unary_rpc_method_handler(_black_hole_handler)
114  else:
115  raise NotImplementedError()
116 
117 
def _time_invocation(to_time: Callable[[], None]) -> datetime.timedelta:
    """Measure how long a single call to `to_time` takes.

    Args:
      to_time: Zero-argument callable to invoke exactly once.

    Returns:
      The elapsed time of the call as a `datetime.timedelta`.
    """
    # perf_counter is monotonic, so the measurement cannot go negative or
    # jump if the wall clock is adjusted while the call is running.
    start = time.perf_counter()
    to_time()
    return datetime.timedelta(seconds=time.perf_counter() - start)
122 
123 
@contextlib.contextmanager
def _server(credentials: Optional[grpc.ServerCredentials]):
    """Run a test server for the duration of a `with` block.

    Args:
      credentials: Server credentials for a secure port, or None to open an
        insecure port.

    Yields:
      The port number the server was bound to.
    """
    # Created outside the try block: if construction itself fails there is
    # nothing to stop, and the cleanup below must not mask the real error
    # with an UnboundLocalError.
    server = test_common.test_server()
    try:
        target = '[::]:0'
        if credentials is None:
            port = server.add_insecure_port(target)
        else:
            port = server.add_secure_port(target, credentials)
        server.add_generic_rpc_handlers((_GenericHandler(),))
        server.start()
        yield port
    finally:
        server.stop(None)
138 
139 
140 class SimpleStubsTest(unittest.TestCase):
141 
142  def assert_cached(self, to_check: Callable[[str], None]) -> None:
143  """Asserts that a function caches intermediate data/state.
144 
145  To be specific, given a function whose caching behavior is
146  deterministic in the value of a supplied string, this function asserts
147  that, on average, subsequent invocations of the function for a specific
148  string are faster than first invocations with that same string.
149 
150  Args:
151  to_check: A function returning nothing, that caches values based on
152  an arbitrary supplied string.
153  """
154  initial_runs = []
155  cached_runs = []
156  for epoch in range(_CACHE_EPOCHS):
157  runs = []
158  text = str(epoch)
159  for trial in range(_CACHE_TRIALS):
160  runs.append(_time_invocation(lambda: to_check(text)))
161  initial_runs.append(runs[0])
162  cached_runs.extend(runs[1:])
163  average_cold = sum((run for run in initial_runs),
164  datetime.timedelta()) / len(initial_runs)
165  average_warm = sum((run for run in cached_runs),
166  datetime.timedelta()) / len(cached_runs)
167  self.assertLess(average_warm, average_cold)
168 
170  predicate: Callable[[], bool],
171  *,
172  timeout: Optional[datetime.timedelta] = None,
173  message: Optional[Callable[[], str]] = None) -> None:
174  message = message or (lambda: "Proposition did not evaluate to true")
175  timeout = timeout or datetime.timedelta(seconds=10)
176  end = datetime.datetime.now() + timeout
177  while datetime.datetime.now() < end:
178  if predicate():
179  break
180  time.sleep(0.5)
181  else:
182  self.fail(message() + " after " + str(timeout))
183 
185  with _server(None) as port:
186  target = f'localhost:{port}'
187  response = grpc.experimental.unary_unary(
188  _REQUEST,
189  target,
190  _UNARY_UNARY,
191  channel_credentials=grpc.experimental.
193  timeout=None)
194  self.assertEqual(_REQUEST, response)
195 
197  with _server(grpc.local_server_credentials()) as port:
198  target = f'localhost:{port}'
199  response = grpc.experimental.unary_unary(
200  _REQUEST,
201  target,
202  _UNARY_UNARY,
203  channel_credentials=grpc.local_channel_credentials(),
204  timeout=None)
205  self.assertEqual(_REQUEST, response)
206 
208  with _server(grpc.local_server_credentials()) as port:
209  target = f'localhost:{port}'
210  test_name = inspect.stack()[0][3]
211  args = (_REQUEST, target, _UNARY_UNARY)
212  kwargs = {"channel_credentials": grpc.local_channel_credentials()}
213 
214  def _invoke(seed: str):
215  run_kwargs = dict(kwargs)
216  run_kwargs["options"] = ((test_name + seed, ""),)
217  grpc.experimental.unary_unary(*args, **run_kwargs)
218 
219  self.assert_cached(_invoke)
220 
222  with _server(grpc.local_server_credentials()) as port:
223  target = f'localhost:{port}'
224  response = grpc.experimental.unary_unary(
225  _REQUEST,
226  target,
227  _UNARY_UNARY,
228  channel_credentials=grpc.local_channel_credentials())
229  self.assert_eventually(
231  )._test_only_channel_count() == 0,
232  message=lambda:
233  f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain"
234  )
235 
237  with _server(grpc.local_server_credentials()) as port:
238  target = f'localhost:{port}'
239  for i in range(_STRESS_EPOCHS):
240  # Ensure we get a new channel each time.
241  options = (("foo", str(i)),)
242  # Send messages at full blast.
243  grpc.experimental.unary_unary(
244  _REQUEST,
245  target,
246  _UNARY_UNARY,
247  options=options,
248  channel_credentials=grpc.local_channel_credentials())
249  self.assert_eventually(
251  )._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1,
252  message=lambda:
253  f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain"
254  )
255 
256  def test_unary_stream(self):
257  with _server(grpc.local_server_credentials()) as port:
258  target = f'localhost:{port}'
259  for response in grpc.experimental.unary_stream(
260  _REQUEST,
261  target,
262  _UNARY_STREAM,
263  channel_credentials=grpc.local_channel_credentials()):
264  self.assertEqual(_REQUEST, response)
265 
266  def test_stream_unary(self):
267 
268  def request_iter():
269  for _ in range(_CLIENT_REQUEST_COUNT):
270  yield _REQUEST
271 
272  with _server(grpc.local_server_credentials()) as port:
273  target = f'localhost:{port}'
274  response = grpc.experimental.stream_unary(
275  request_iter(),
276  target,
277  _STREAM_UNARY,
278  channel_credentials=grpc.local_channel_credentials())
279  self.assertEqual(_REQUEST, response)
280 
282 
283  def request_iter():
284  for _ in range(_CLIENT_REQUEST_COUNT):
285  yield _REQUEST
286 
287  with _server(grpc.local_server_credentials()) as port:
288  target = f'localhost:{port}'
289  for response in grpc.experimental.stream_stream(
290  request_iter(),
291  target,
292  _STREAM_STREAM,
293  channel_credentials=grpc.local_channel_credentials()):
294  self.assertEqual(_REQUEST, response)
295 
296  def test_default_ssl(self):
297  _private_key = resources.private_key()
298  _certificate_chain = resources.certificate_chain()
299  _server_certs = ((_private_key, _certificate_chain),)
300  _server_host_override = 'foo.test.google.fr'
301  _test_root_certificates = resources.test_root_certificates()
302  _property_options = ((
303  'grpc.ssl_target_name_override',
304  _server_host_override,
305  ),)
306  cert_dir = os.path.join(os.path.dirname(resources.__file__),
307  "credentials")
308  cert_file = os.path.join(cert_dir, "ca.pem")
309  with _env("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", cert_file):
310  server_creds = grpc.ssl_server_credentials(_server_certs)
311  with _server(server_creds) as port:
312  target = f'localhost:{port}'
313  response = grpc.experimental.unary_unary(
314  _REQUEST, target, _UNARY_UNARY, options=_property_options)
315 
317  with _server(None) as port:
318  target = f'localhost:{port}'
319  response = grpc.experimental.unary_unary(_REQUEST,
320  target,
321  _UNARY_UNARY,
322  insecure=True)
323  self.assertEqual(_REQUEST, response)
324 
326  with _server(None) as port:
327  target = f'localhost:{port}'
328  with self.assertRaises(ValueError):
329  response = grpc.experimental.unary_unary(
330  _REQUEST,
331  target,
332  _UNARY_UNARY,
333  insecure=True,
334  channel_credentials=grpc.local_channel_credentials())
335 
337  addr, port, sock = get_socket()
338  sock.close()
339  target = f'{addr}:{port}'
340  channel = grpc._simple_stubs.ChannelCache.get().get_channel(
341  target, (), None, True, None)
342  rpc_finished_event = threading.Event()
343  rpc_failed_event = threading.Event()
344  server = None
345 
346  def _on_connectivity_changed(connectivity):
347  nonlocal server
348  if connectivity is grpc.ChannelConnectivity.TRANSIENT_FAILURE:
349  self.assertFalse(rpc_finished_event.is_set())
350  self.assertFalse(rpc_failed_event.is_set())
351  server = test_common.test_server()
352  server.add_insecure_port(target)
353  server.add_generic_rpc_handlers((_GenericHandler(),))
354  server.start()
355  channel.unsubscribe(_on_connectivity_changed)
356  elif connectivity in (grpc.ChannelConnectivity.IDLE,
357  grpc.ChannelConnectivity.CONNECTING):
358  pass
359  else:
360  self.fail("Encountered unknown state.")
361 
362  channel.subscribe(_on_connectivity_changed)
363 
364  def _send_rpc():
365  try:
366  response = grpc.experimental.unary_unary(_REQUEST,
367  target,
368  _UNARY_UNARY,
369  timeout=None,
370  insecure=True)
371  rpc_finished_event.set()
372  except Exception as e:
373  rpc_failed_event.set()
374 
375  t = threading.Thread(target=_send_rpc)
376  t.start()
377  t.join()
378  self.assertFalse(rpc_failed_event.is_set())
379  self.assertTrue(rpc_finished_event.is_set())
380  if server is not None:
381  server.stop(None)
382 
383  def assert_times_out(self, invocation_args):
384  with _server(None) as port:
385  target = f'localhost:{port}'
386  with self.assertRaises(grpc.RpcError) as cm:
387  response = grpc.experimental.unary_unary(_REQUEST,
388  target,
389  _BLACK_HOLE,
390  insecure=True,
391  **invocation_args)
392  self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
393  cm.exception.code())
394 
396  not_present = object()
397  wait_for_ready_values = [True, not_present]
398  timeout_values = [0.5, not_present]
399  cases = []
400  for wait_for_ready in wait_for_ready_values:
401  for timeout in timeout_values:
402  case = {}
403  if timeout is not not_present:
404  case["timeout"] = timeout
405  if wait_for_ready is not not_present:
406  case["wait_for_ready"] = wait_for_ready
407  cases.append(case)
408 
409  for case in cases:
410  with self.subTest(**case):
411  self.assert_times_out(case)
412 
413 
if __name__ == "__main__":
    # Script entry point: turn on INFO logging, then run the suite verbosely.
    logging.basicConfig(level=logging.INFO)
    unittest.main(verbosity=2)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
grpc::experimental.insecure_channel_credentials
def insecure_channel_credentials()
Definition: src/python/grpcio/grpc/experimental/__init__.py:51
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_default_timeout
def test_default_timeout(self)
Definition: _simple_stubs_test.py:395
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_total_channels_enforced
def test_total_channels_enforced(self)
Definition: _simple_stubs_test.py:236
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_stream_unary
def test_stream_unary(self)
Definition: _simple_stubs_test.py:266
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_unary_unary_insecure
def test_unary_unary_insecure(self)
Definition: _simple_stubs_test.py:184
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
grpc.stream_unary_rpc_method_handler
def stream_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1550
grpc::testing::sum
double sum(const T &container, F functor)
Definition: test/cpp/qps/stats.h:30
tests_py3_only.unit._simple_stubs_test._unary_stream_handler
def _unary_stream_handler(request, context)
Definition: _simple_stubs_test.py:73
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_unary_stream
def test_unary_stream(self)
Definition: _simple_stubs_test.py:256
grpc.unary_stream_rpc_method_handler
def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1530
tests_py3_only.unit._simple_stubs_test._stream_unary_handler
def _stream_unary_handler(request_iterator, context)
Definition: _simple_stubs_test.py:78
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
grpc.local_server_credentials
def local_server_credentials(local_connect_type=LocalConnectionType.LOCAL_TCP)
Definition: src/python/grpcio/grpc/__init__.py:1863
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_channels_cached
def test_channels_cached(self)
Definition: _simple_stubs_test.py:207
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_unary_unary_secure
def test_unary_unary_secure(self)
Definition: _simple_stubs_test.py:196
tests_py3_only.unit._simple_stubs_test._time_invocation
datetime.timedelta _time_invocation(Callable[[], None] to_time)
Definition: _simple_stubs_test.py:118
message
char * message
Definition: libuv/docs/code/tty-gravity/main.c:12
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_channels_evicted
def test_channels_evicted(self)
Definition: _simple_stubs_test.py:221
tests_py3_only.unit._simple_stubs_test._server
def _server(Optional[grpc.ServerCredentials] credentials)
Definition: _simple_stubs_test.py:125
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.assert_cached
None assert_cached(self, Callable[[str], None] to_check)
Definition: _simple_stubs_test.py:142
tests_py3_only.unit._simple_stubs_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _simple_stubs_test.py:103
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_stream_stream
def test_stream_stream(self)
Definition: _simple_stubs_test.py:281
tests_py3_only.unit._simple_stubs_test._unary_unary_handler
def _unary_unary_handler(request, context)
Definition: _simple_stubs_test.py:69
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_default_wait_for_ready
def test_default_wait_for_ready(self)
Definition: _simple_stubs_test.py:336
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_insecure_sugar_mutually_exclusive
def test_insecure_sugar_mutually_exclusive(self)
Definition: _simple_stubs_test.py:325
grpc.ssl_server_credentials
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False)
Definition: src/python/grpcio/grpc/__init__.py:1709
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest
Definition: _simple_stubs_test.py:140
grpc::ServerCredentials
Wrapper around grpc_server_credentials, a way to authenticate a server.
Definition: include/grpcpp/security/server_credentials.h:76
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_default_ssl
def test_default_ssl(self)
Definition: _simple_stubs_test.py:296
tests.unit.framework.common.get_socket
def get_socket(bind_address='localhost', port=0, listen=True, sock_options=_DEFAULT_SOCK_OPTIONS)
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:26
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
grpc.local_channel_credentials
def local_channel_credentials(local_connect_type=LocalConnectionType.LOCAL_TCP)
Definition: src/python/grpcio/grpc/__init__.py:1833
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.test_insecure_sugar
def test_insecure_sugar(self)
Definition: _simple_stubs_test.py:316
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.assert_eventually
None assert_eventually(self, Callable[[], bool] predicate, *Optional[datetime.timedelta] timeout=None, Optional[Callable[[], str]] message=None)
Definition: _simple_stubs_test.py:169
tests_py3_only.unit._simple_stubs_test._black_hole_handler
def _black_hole_handler(request, context)
Definition: _simple_stubs_test.py:90
tests_py3_only.unit._simple_stubs_test._env
def _env(str key, str value)
Definition: _simple_stubs_test.py:63
grpc._simple_stubs.ChannelCache.get
def get()
Definition: _simple_stubs.py:89
grpc.stream_stream_rpc_method_handler
def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1570
tests_py3_only.unit._simple_stubs_test._stream_stream_handler
def _stream_stream_handler(request_iterator, context)
Definition: _simple_stubs_test.py:85
tests_py3_only.unit._simple_stubs_test.SimpleStubsTest.assert_times_out
def assert_times_out(self, invocation_args)
Definition: _simple_stubs_test.py:383
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_py3_only.unit._simple_stubs_test._GenericHandler
Definition: _simple_stubs_test.py:101


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:39