metadata_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior around the metadata mechanism."""
15 
16 import asyncio
17 import logging
18 import platform
19 import random
20 import unittest
21 
22 import grpc
23 from grpc.experimental import aio
24 
25 from tests_aio.unit import _common
26 from tests_aio.unit._test_base import AioTestBase
27 
# Method paths used by the handlers and by the client-side tests.
_TEST_CLIENT_TO_SERVER = '/test/TestClientToServer'
_TEST_SERVER_TO_CLIENT = '/test/TestServerToClient'
_TEST_TRAILING_METADATA = '/test/TestTrailingMetadata'
_TEST_ECHO_INITIAL_METADATA = '/test/TestEchoInitialMetadata'
_TEST_GENERIC_HANDLER = '/test/TestGenericHandler'
_TEST_UNARY_STREAM = '/test/TestUnaryStream'
_TEST_STREAM_UNARY = '/test/TestStreamUnary'
_TEST_STREAM_STREAM = '/test/TestStreamStream'
_TEST_INSPECT_CONTEXT = '/test/TestInspectContext'

# Opaque three-byte payloads exchanged by every RPC in this module.
_REQUEST = b'\x00' * 3
_RESPONSE = b'\x01' * 3

# Each metadata set pairs a text entry with a binary ("-bin" suffixed) entry.
_INITIAL_METADATA_FROM_CLIENT_TO_SERVER = aio.Metadata(
    ('client-to-server', 'question'),
    ('client-to-server-bin', b'\x07' * 3),
)
_INITIAL_METADATA_FROM_SERVER_TO_CLIENT = aio.Metadata(
    ('server-to-client', 'answer'),
    ('server-to-client-bin', b'\x06' * 3),
)
_TRAILING_METADATA = aio.Metadata(
    ('a-trailing-metadata', 'stack-trace'),
    ('a-trailing-metadata-bin', b'\x05' * 3),
)

# Metadata that the generic handler checks for in the call details.
_INITIAL_METADATA_FOR_GENERIC_HANDLER = aio.Metadata(
    ('a-must-have-key', 'secret'),
)
# Malformed metadata used by test_invalid_metadata. Each entry is
# (expected_exception_type, metadata), where metadata is a one-element tuple
# holding a single (key, value) pair. The original table listed the
# dict/dict case twice; the duplicate added no coverage and is dropped.
_INVALID_METADATA_TEST_CASES = (
    # Integers for both key and value.
    (TypeError, ((42, 42),)),
    # Dicts for both key and value.
    (TypeError, (({}, {}),)),
    # None as the key.
    (TypeError, ((None, {}),)),
    # String key paired with an arbitrary object as the value.
    (TypeError, (('normal', object()),)),
)
78 
# Status details and code that the inspect-context handler sets on its RPC.
_DETAILS = 'Test details!'
_NON_OK_CODE = grpc.StatusCode.NOT_FOUND
81 
82 
84 
85  def __init__(self):
86  self._routing_table = {
87  _TEST_CLIENT_TO_SERVER:
89  ),
90  _TEST_SERVER_TO_CLIENT:
92  ),
93  _TEST_TRAILING_METADATA:
95  ),
96  _TEST_UNARY_STREAM:
98  _TEST_STREAM_UNARY:
100  _TEST_STREAM_STREAM:
102  _TEST_INSPECT_CONTEXT:
104  }
105 
@staticmethod
async def _test_client_to_server(request, context):
    """Unary-unary handler that checks the metadata sent by the client.

    Asserts the payload equals ``_REQUEST`` and that
    ``_common.seen_metadata`` accepts the invocation metadata against
    ``_INITIAL_METADATA_FROM_CLIENT_TO_SERVER``, then returns ``_RESPONSE``.
    """
    assert request == _REQUEST
    received = context.invocation_metadata()
    assert _common.seen_metadata(_INITIAL_METADATA_FROM_CLIENT_TO_SERVER,
                                 received)
    return _RESPONSE
112 
@staticmethod
async def _test_server_to_client(request, context):
    """Unary-unary handler that sends initial metadata to the client.

    Asserts the payload equals ``_REQUEST``, sends
    ``_INITIAL_METADATA_FROM_SERVER_TO_CLIENT`` as initial metadata and
    returns ``_RESPONSE``.
    """
    assert request == _REQUEST
    outgoing = _INITIAL_METADATA_FROM_SERVER_TO_CLIENT
    await context.send_initial_metadata(outgoing)
    return _RESPONSE
119 
@staticmethod
async def _test_trailing_metadata(request, context):
    """Unary-unary handler that attaches ``_TRAILING_METADATA`` to the RPC.

    Asserts the payload equals ``_REQUEST`` before setting the trailing
    metadata, then returns ``_RESPONSE``.
    """
    assert request == _REQUEST
    trailers = _TRAILING_METADATA
    context.set_trailing_metadata(trailers)
    return _RESPONSE
125 
@staticmethod
async def _test_unary_stream(request, context):
    """Unary-stream handler exercising metadata in both directions.

    Checks the request and the client's invocation metadata, sends the
    server's initial metadata, yields a single ``_RESPONSE`` and finally
    sets ``_TRAILING_METADATA``.
    """
    assert request == _REQUEST
    received = context.invocation_metadata()
    assert _common.seen_metadata(_INITIAL_METADATA_FROM_CLIENT_TO_SERVER,
                                 received)

    outgoing = _INITIAL_METADATA_FROM_SERVER_TO_CLIENT
    await context.send_initial_metadata(outgoing)

    yield _RESPONSE
    # Trailing metadata is set only after the response has been yielded.
    context.set_trailing_metadata(_TRAILING_METADATA)
135 
@staticmethod
async def _test_stream_unary(request_iterator, context):
    """Stream-unary handler exercising metadata in both directions.

    Checks the client's invocation metadata, sends the server's initial
    metadata, asserts every streamed message equals ``_REQUEST``, sets
    ``_TRAILING_METADATA`` and returns ``_RESPONSE``.
    """
    received = context.invocation_metadata()
    assert _common.seen_metadata(_INITIAL_METADATA_FROM_CLIENT_TO_SERVER,
                                 received)

    outgoing = _INITIAL_METADATA_FROM_SERVER_TO_CLIENT
    await context.send_initial_metadata(outgoing)

    # Drain the request stream, validating each message as it arrives.
    async for incoming in request_iterator:
        assert incoming == _REQUEST

    context.set_trailing_metadata(_TRAILING_METADATA)
    return _RESPONSE
148 
@staticmethod
async def _test_stream_stream(request_iterator, context):
    """Stream-stream handler exercising metadata in both directions.

    Checks the client's invocation metadata, sends the server's initial
    metadata, asserts every streamed message equals ``_REQUEST``, yields a
    single ``_RESPONSE`` and finally sets ``_TRAILING_METADATA``.
    """
    received = context.invocation_metadata()
    assert _common.seen_metadata(_INITIAL_METADATA_FROM_CLIENT_TO_SERVER,
                                 received)

    outgoing = _INITIAL_METADATA_FROM_SERVER_TO_CLIENT
    await context.send_initial_metadata(outgoing)

    # Drain the request stream, validating each message as it arrives.
    async for incoming in request_iterator:
        assert incoming == _REQUEST

    # NOTE(review): the listing lost its indentation; the yield is taken to
    # follow the loop (one response per call) -- confirm against the source.
    yield _RESPONSE
    context.set_trailing_metadata(_TRAILING_METADATA)
161 
@staticmethod
async def _test_inspect_context(request, context):
    """Unary-unary handler that sets status data and reads it back.

    Sets ``_NON_OK_CODE``, ``_DETAILS`` and ``_TRAILING_METADATA`` on the
    context, asserts that the matching getters return those same values,
    and returns ``_RESPONSE``.
    """
    assert request == _REQUEST

    context.set_code(_NON_OK_CODE)
    context.set_details(_DETAILS)
    context.set_trailing_metadata(_TRAILING_METADATA)

    # Everything written to the context must be readable from it again.
    observed_code = context.get_code()
    assert observed_code == _NON_OK_CODE
    observed_details = context.get_details()
    assert observed_details == _DETAILS
    observed_trailers = context.get_trailing_metadata()
    assert observed_trailers == _TRAILING_METADATA
    return _RESPONSE
174 
def service(self, handler_call_details):
    """Look up the handler registered for the called method.

    Returns the ``_routing_table`` entry keyed by
    ``handler_call_details.method``, or ``None`` when no entry exists.
    """
    method_name = handler_call_details.method
    return self._routing_table.get(method_name)
177 
178 
180 
@staticmethod
async def _method(request, unused_context):
    """Unary-unary behavior: assert the payload is ``_REQUEST``, reply ``_RESPONSE``."""
    assert request == _REQUEST
    reply = _RESPONSE
    return reply
185 
186  def service(self, handler_call_details):
187  assert _common.seen_metadata(_INITIAL_METADATA_FOR_GENERIC_HANDLER,
188  handler_call_details.invocation_metadata)
190 
191 
192 async def _start_test_server():
193  server = aio.server()
194  port = server.add_insecure_port('[::]:0')
195  server.add_generic_rpc_handlers((
198  ))
199  await server.start()
200  return 'localhost:%d' % port, server
201 
202 
204 
async def setUp(self):
    """Start the test server and open an insecure channel to it."""
    started = await _start_test_server()
    target, self._server = started
    self._client = aio.insecure_channel(target)
208 
async def tearDown(self):
    """Close the client channel, then stop the server with a ``None`` grace."""
    channel, server = self._client, self._server
    await channel.close()
    await server.stop(None)
212 
async def test_from_client_to_server(self):
    """Client metadata passed as ``aio.Metadata`` yields an OK unary call."""
    invoke = self._client.unary_unary(_TEST_CLIENT_TO_SERVER)
    call = invoke(_REQUEST,
                  metadata=_INITIAL_METADATA_FROM_CLIENT_TO_SERVER)

    response = await call
    self.assertEqual(_RESPONSE, response)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
219 
async def test_from_server_to_client(self):
    """Initial metadata sent by the server is what the client call reports."""
    invoke = self._client.unary_unary(_TEST_SERVER_TO_CLIENT)
    call = invoke(_REQUEST)

    initial_metadata = await call.initial_metadata()
    self.assertEqual(_INITIAL_METADATA_FROM_SERVER_TO_CLIENT,
                     initial_metadata)
    response = await call
    self.assertEqual(_RESPONSE, response)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
228 
async def test_trailing_metadata(self):
    """Trailing metadata set by the handler is what the client call reports."""
    invoke = self._client.unary_unary(_TEST_TRAILING_METADATA)
    call = invoke(_REQUEST)

    trailers = await call.trailing_metadata()
    self.assertEqual(_TRAILING_METADATA, trailers)
    response = await call
    self.assertEqual(_RESPONSE, response)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
235 
237  multicallable = self._client.unary_unary(_TEST_CLIENT_TO_SERVER)
238  call = multicallable(
239  _REQUEST, metadata=list(_INITIAL_METADATA_FROM_CLIENT_TO_SERVER)) # pytype: disable=wrong-arg-types
240  self.assertEqual(_RESPONSE, await call)
241  self.assertEqual(grpc.StatusCode.OK, await call.code())
242 
@unittest.skipIf(platform.system() == 'Windows',
                 'https://github.com/grpc/grpc/issues/21943')
async def test_invalid_metadata(self):
    """Each malformed metadata case raises its paired exception type.

    Iterates ``_INVALID_METADATA_TEST_CASES`` and runs every case as its own
    sub-test so one failure does not hide the others.
    """
    invoke = self._client.unary_unary(_TEST_CLIENT_TO_SERVER)
    for expected_error, metadata in _INVALID_METADATA_TEST_CASES:
        with self.subTest(metadata=metadata), \
                self.assertRaises(expected_error):
            # The error may surface on invocation or on awaiting the call,
            # so both stay inside the assertRaises scope.
            call = invoke(_REQUEST, metadata=metadata)
            await call
252 
async def test_generic_handler(self):
    """A call carrying the generic handler's metadata completes with OK."""
    invoke = self._client.unary_unary(_TEST_GENERIC_HANDLER)
    call = invoke(_REQUEST,
                  metadata=_INITIAL_METADATA_FOR_GENERIC_HANDLER)

    response = await call
    self.assertEqual(_RESPONSE, response)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
259 
async def test_unary_stream(self):
    """Unary-stream call: initial metadata, one response, trailers, OK."""
    invoke = self._client.unary_stream(_TEST_UNARY_STREAM)
    call = invoke(_REQUEST,
                  metadata=_INITIAL_METADATA_FROM_CLIENT_TO_SERVER)

    initial_metadata = await call.initial_metadata()
    self.assertTrue(
        _common.seen_metadata(_INITIAL_METADATA_FROM_SERVER_TO_CLIENT,
                              initial_metadata))

    # Collect everything the server streams back.
    responses = [response async for response in call]
    self.assertSequenceEqual([_RESPONSE], responses)

    trailers = await call.trailing_metadata()
    self.assertEqual(_TRAILING_METADATA, trailers)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
274 
async def test_stream_unary(self):
    """Stream-unary call: one request written, metadata and status checked."""
    invoke = self._client.stream_unary(_TEST_STREAM_UNARY)
    call = invoke(metadata=_INITIAL_METADATA_FROM_CLIENT_TO_SERVER)

    # Send a single request, then signal that the stream is finished.
    await call.write(_REQUEST)
    await call.done_writing()

    initial_metadata = await call.initial_metadata()
    self.assertTrue(
        _common.seen_metadata(_INITIAL_METADATA_FROM_SERVER_TO_CLIENT,
                              initial_metadata))
    response = await call
    self.assertEqual(_RESPONSE, response)

    trailers = await call.trailing_metadata()
    self.assertEqual(_TRAILING_METADATA, trailers)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
288 
async def test_stream_stream(self):
    """Stream-stream call: one request written, one response, metadata checked."""
    invoke = self._client.stream_stream(_TEST_STREAM_STREAM)
    call = invoke(metadata=_INITIAL_METADATA_FROM_CLIENT_TO_SERVER)

    # Send a single request, then signal that the stream is finished.
    await call.write(_REQUEST)
    await call.done_writing()

    initial_metadata = await call.initial_metadata()
    self.assertTrue(
        _common.seen_metadata(_INITIAL_METADATA_FROM_SERVER_TO_CLIENT,
                              initial_metadata))

    responses = [response async for response in call]
    self.assertSequenceEqual([_RESPONSE], responses)

    trailers = await call.trailing_metadata()
    self.assertEqual(_TRAILING_METADATA, trailers)
    status = await call.code()
    self.assertEqual(grpc.StatusCode.OK, status)
302 
304  metadata_obj = aio.Metadata(('key', '42'), ('key-2', 'value'))
305  self.assertEqual(metadata_obj, tuple(metadata_obj))
306  self.assertEqual(tuple(metadata_obj), metadata_obj)
307 
308  expected_sum = tuple(metadata_obj) + (('third', '3'),)
309  self.assertEqual(expected_sum, metadata_obj + (('third', '3'),))
310  self.assertEqual(expected_sum, metadata_obj + aio.Metadata(
311  ('third', '3')))
312 
async def test_inspect_context(self):
    """The inspect-context RPC fails with ``grpc.RpcError`` carrying ``_NON_OK_CODE``."""
    invoke = self._client.unary_unary(_TEST_INSPECT_CONTEXT)
    call = invoke(_REQUEST)

    with self.assertRaises(grpc.RpcError) as raised:
        await call

    rpc_error = raised.exception
    self.assertEqual(_NON_OK_CODE, rpc_error.code())
321 
322 
if __name__ == '__main__':
    # Running the file directly: log at DEBUG level and print each test name.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_server_to_client
def _test_server_to_client(request, context)
Definition: metadata_test.py:114
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_inspect_context
def _test_inspect_context(request, context)
Definition: metadata_test.py:163
tests_aio.unit.metadata_test.TestMetadata._client
_client
Definition: metadata_test.py:207
tests_aio.unit.metadata_test.TestMetadata.test_trailing_metadata
def test_trailing_metadata(self)
Definition: metadata_test.py:229
tests_aio.unit.metadata_test._start_test_server
def _start_test_server()
Definition: metadata_test.py:192
grpc.stream_unary_rpc_method_handler
def stream_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1550
grpc._simple_stubs.unary_stream
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:250
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.metadata_test._TestGenericHandlerItself.service
def service(self, handler_call_details)
Definition: metadata_test.py:186
grpc.unary_stream_rpc_method_handler
def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1530
tests_aio.unit.metadata_test.TestMetadata.test_generic_handler
def test_generic_handler(self)
Definition: metadata_test.py:253
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_client_to_server
def _test_client_to_server(request, context)
Definition: metadata_test.py:107
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_trailing_metadata
def _test_trailing_metadata(request, context)
Definition: metadata_test.py:121
tests_aio.unit.metadata_test._TestGenericHandlerItself
Definition: metadata_test.py:179
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.metadata_test.TestMetadata.setUp
def setUp(self)
Definition: metadata_test.py:205
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_stream_unary
def _test_stream_unary(request_iterator, context)
Definition: metadata_test.py:137
grpc._simple_stubs.stream_stream
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:410
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
close
#define close
Definition: test-fs.c:48
tests_aio.unit.metadata_test.TestMetadata.test_from_client_to_server
def test_from_client_to_server(self)
Definition: metadata_test.py:213
tests_aio.unit.metadata_test.TestMetadata._server
_server
Definition: metadata_test.py:206
tests_aio.unit.metadata_test.TestMetadata.test_from_server_to_client
def test_from_server_to_client(self)
Definition: metadata_test.py:220
grpc._simple_stubs.stream_unary
ResponseType stream_unary(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:330
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._routing_table
_routing_table
Definition: metadata_test.py:86
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests_aio.unit.metadata_test.TestMetadata.tearDown
def tearDown(self)
Definition: metadata_test.py:209
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_unary_stream
def _test_unary_stream(request, context)
Definition: metadata_test.py:127
tests_aio.unit.metadata_test.TestMetadata.test_unary_stream
def test_unary_stream(self)
Definition: metadata_test.py:260
tests_aio.unit.metadata_test._TestGenericHandlerForMethods.service
def service(self, handler_call_details)
Definition: metadata_test.py:175
grpc.stream_stream_rpc_method_handler
def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1570
tests_aio.unit.metadata_test._TestGenericHandlerItself._method
def _method(request, unused_context)
Definition: metadata_test.py:182
tests_aio.unit.metadata_test.TestMetadata
Definition: metadata_test.py:203
tests_aio.unit.metadata_test.TestMetadata.test_from_client_to_server_with_list
def test_from_client_to_server_with_list(self)
Definition: metadata_test.py:236
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.unit.metadata_test.TestMetadata.test_compatibility_with_tuple
def test_compatibility_with_tuple(self)
Definition: metadata_test.py:303
tests_aio.unit.metadata_test._TestGenericHandlerForMethods
Definition: metadata_test.py:83
tests_aio.unit.metadata_test.TestMetadata.test_stream_stream
def test_stream_stream(self)
Definition: metadata_test.py:289
tests_aio.unit.metadata_test._TestGenericHandlerForMethods.__init__
def __init__(self)
Definition: metadata_test.py:85
tests_aio.unit.metadata_test._TestGenericHandlerForMethods._test_stream_stream
def _test_stream_stream(request_iterator, context)
Definition: metadata_test.py:150
tests_aio.unit.metadata_test.TestMetadata.test_invalid_metadata
def test_invalid_metadata(self)
Definition: metadata_test.py:245
tests_aio.unit.metadata_test.TestMetadata.test_inspect_context
def test_inspect_context(self)
Definition: metadata_test.py:313
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49
tests_aio.unit.metadata_test.TestMetadata.test_stream_unary
def test_stream_unary(self)
Definition: metadata_test.py:275


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:39