_contextvars_propagation_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test of propagation of contextvars to AuthMetadataPlugin threads."""
15 
16 import contextlib
17 import logging
18 import os
19 import sys
20 import threading
21 import unittest
22 
23 import grpc
24 from six.moves import queue
25 
26 from tests.unit import test_common
27 
28 _UNARY_UNARY = "/test/UnaryUnary"
29 _REQUEST = b"0000"
30 
31 
32 def _unary_unary_handler(request, context):
33  return request
34 
35 
37  try:
38  import contextvars
39  return True
40  except ImportError:
41  return False
42 
43 
45 
def service(self, handler_call_details):
    """Return the RPC method handler for the requested method.

    Args:
      handler_call_details: Details of the incoming call; only its
        `method` attribute is inspected.

    Returns:
      A unary-unary method handler wrapping `_unary_unary_handler` when the
      requested method is `_UNARY_UNARY`.

    Raises:
      NotImplementedError: For any other method name.
    """
    # Guard clause: anything but the single supported method is rejected.
    if handler_call_details.method != _UNARY_UNARY:
        raise NotImplementedError()
    return grpc.unary_unary_rpc_method_handler(_unary_unary_handler)
51 
52 
@contextlib.contextmanager
def _server():
    """Run a test server for the duration of a `with` block.

    The server is created with `test_common.test_server()`, bound to
    'localhost:0', given a `_GenericHandler`, and started. It is stopped
    with `server.stop(None)` when the block exits, whether normally or
    through an exception.

    Yields:
      The port value returned by `server.add_insecure_port`.
    """
    # Create the server *before* entering the `try`. If construction fails,
    # the original error propagates as-is; previously the `finally` clause
    # would then touch an unbound `server` and mask it with
    # UnboundLocalError.
    server = test_common.test_server()
    try:
        target = 'localhost:0'
        port = server.add_insecure_port(target)
        server.add_generic_rpc_handlers((_GenericHandler(),))
        server.start()
        yield port
    finally:
        server.stop(None)
64 
65 
67  import contextvars
68 
69  _EXPECTED_VALUE = 24601
70  test_var = contextvars.ContextVar("test_var", default=None)
71 
73  test_var.set(_EXPECTED_VALUE)
74 
76 
def __call__(self, context, callback):
    """Verify the propagated context variable, then invoke `callback`.

    Reads `test_var` on whichever thread runs the plugin, records what was
    seen so that `assert_called` can check it later, and fails if the value
    differs from `_EXPECTED_VALUE` (unless running under gevent).

    Args:
      context: The auth metadata context supplied by gRPC; not used.
      callback: Called as `callback((), None)` once the check has passed.

    Raises:
      AssertionError: If `test_var` does not hold `_EXPECTED_VALUE` and the
        test is not running under gevent.
    """
    observed = test_var.get()
    # Record the invocation. `assert_called` reads these attributes and
    # nothing else in this class assigns them.
    self._invoked = True
    self._recorded_value = observed
    if observed != _EXPECTED_VALUE and not test_common.running_under_gevent():
        # contextvars do not work under gevent, but the rest of this
        # test is still valuable as a test of concurrent runs of the
        # metadata credentials code path.
        raise AssertionError("{} != {}".format(observed, _EXPECTED_VALUE))
    callback((), None)

def assert_called(self, test):
    """Assert that the plugin ran and saw the expected context value.

    Args:
      test: A `unittest.TestCase`-like object providing `assertTrue` and
        `assertEqual`.
    """
    # Use getattr with defaults so that a plugin which was never invoked
    # produces a normal assertion failure rather than an AttributeError.
    test.assertTrue(getattr(self, "_invoked", False))
    test.assertEqual(_EXPECTED_VALUE, getattr(self, "_recorded_value", None))
90 
91 else:
92 
94  pass
95 
class TestCallCredentials(grpc.AuthMetadataPlugin):
    """Fallback plugin used when contextvars is unavailable.

    Performs no context check; it only invokes the callback so the RPC can
    proceed.
    """

    def __call__(self, context, callback):
        """Invoke `callback` with an empty tuple and `None`.

        Args:
          context: The auth metadata context supplied by gRPC; not used.
          callback: Called as `callback((), None)`.
        """
        no_metadata = ()
        callback(no_metadata, None)
100 
101 
102 # TODO(https://github.com/grpc/grpc/issues/22257)
103 @unittest.skipIf(os.name == "nt", "LocalCredentials not supported on Windows.")
104 class ContextVarsPropagationTest(unittest.TestCase):
105 
108  with _server() as port:
109  target = "localhost:{}".format(port)
110  local_credentials = grpc.local_channel_credentials()
111  test_call_credentials = TestCallCredentials()
112  call_credentials = grpc.metadata_call_credentials(
113  test_call_credentials, "test call credentials")
114  composite_credentials = grpc.composite_channel_credentials(
115  local_credentials, call_credentials)
116  with grpc.secure_channel(target, composite_credentials) as channel:
117  stub = channel.unary_unary(_UNARY_UNARY)
118  response = stub(_REQUEST, wait_for_ready=True)
119  self.assertEqual(_REQUEST, response)
120 
122  _THREAD_COUNT = 32
123  _RPC_COUNT = 32
124 
126  with _server() as port:
127  target = "localhost:{}".format(port)
128  local_credentials = grpc.local_channel_credentials()
129  test_call_credentials = TestCallCredentials()
130  call_credentials = grpc.metadata_call_credentials(
131  test_call_credentials, "test call credentials")
132  composite_credentials = grpc.composite_channel_credentials(
133  local_credentials, call_credentials)
134  wait_group = test_common.WaitGroup(_THREAD_COUNT)
135 
def _run_on_thread(exception_queue):
    """Thread body: open a channel and issue `_RPC_COUNT` unary RPCs.

    Any exception raised along the way (including failed assertions) is
    put on `exception_queue` instead of propagating, so the spawning
    thread can re-raise it after joining.

    Args:
      exception_queue: Queue that receives the exception, if one occurs.
    """
    try:
        with grpc.secure_channel(target,
                                 composite_credentials) as channel:
            call = channel.unary_unary(_UNARY_UNARY)
            # NOTE(review): presumably a rendezvous so that all threads
            # start issuing RPCs together -- confirm against
            # test_common.WaitGroup.
            wait_group.done()
            wait_group.wait()
            for _ in range(_RPC_COUNT):
                reply = call(_REQUEST, wait_for_ready=True)
                self.assertEqual(_REQUEST, reply)
    except Exception as error:  # pylint: disable=broad-except
        exception_queue.put(error)
148 
149  threads = []
150 
151  for _ in range(_THREAD_COUNT):
152  q = queue.Queue()
153  thread = threading.Thread(target=_run_on_thread, args=(q,))
154  thread.setDaemon(True)
155  thread.start()
156  threads.append((thread, q))
157 
158  for thread, q in threads:
159  thread.join()
160  if not q.empty():
161  raise q.get()
162 
163 
164 if __name__ == '__main__':
165  logging.basicConfig()
166  unittest.main(verbosity=2)
tests.unit._contextvars_propagation_test._GenericHandler
Definition: _contextvars_propagation_test.py:44
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
http2_test_server.format
format
Definition: http2_test_server.py:118
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
grpc.AuthMetadataPlugin
Definition: src/python/grpcio/grpc/__init__.py:617
tests.unit._contextvars_propagation_test.ContextVarsPropagationTest.test_propagation_to_auth_plugin
def test_propagation_to_auth_plugin(self)
Definition: _contextvars_propagation_test.py:106
async_greeter_client.stub
stub
Definition: hellostreamingworld/async_greeter_client.py:26
tests.unit._contextvars_propagation_test._unary_unary_handler
def _unary_unary_handler(request, context)
Definition: _contextvars_propagation_test.py:32
tests.unit._contextvars_propagation_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _contextvars_propagation_test.py:46
grpc.composite_channel_credentials
def composite_channel_credentials(channel_credentials, *call_credentials)
Definition: src/python/grpcio/grpc/__init__.py:1691
grpc.metadata_call_credentials
def metadata_call_credentials(metadata_plugin, name=None)
Definition: src/python/grpcio/grpc/__init__.py:1644
tests.unit._contextvars_propagation_test.TestCallCredentials.assert_called
def assert_called(self, test)
Definition: _contextvars_propagation_test.py:87
tests.unit._contextvars_propagation_test.ContextVarsPropagationTest.test_concurrent_propagation
def test_concurrent_propagation(self)
Definition: _contextvars_propagation_test.py:121
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
grpc.local_channel_credentials
def local_channel_credentials(local_connect_type=LocalConnectionType.LOCAL_TCP)
Definition: src/python/grpcio/grpc/__init__.py:1833
tests.unit._contextvars_propagation_test.ContextVarsPropagationTest
Definition: _contextvars_propagation_test.py:104
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit.test_common.WaitGroup
Definition: test_common.py:112
tests.unit._contextvars_propagation_test.set_up_expected_context
def set_up_expected_context()
Definition: _contextvars_propagation_test.py:72
tests.unit._contextvars_propagation_test._server
def _server()
Definition: _contextvars_propagation_test.py:54
tests.unit._contextvars_propagation_test.TestCallCredentials
Definition: _contextvars_propagation_test.py:75
tests.unit._contextvars_propagation_test.TestCallCredentials.__call__
def __call__(self, context, callback)
Definition: _contextvars_propagation_test.py:77
tests.unit._contextvars_propagation_test.contextvars_supported
def contextvars_supported()
Definition: _contextvars_propagation_test.py:36
grpc.secure_channel
def secure_channel(target, credentials, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1982


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:38