compatibility_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Testing the compatibility between AsyncIO stack and the old stack."""
15 
16 import asyncio
17 from concurrent.futures import ThreadPoolExecutor
18 import logging
19 import os
20 import random
21 import threading
22 from typing import Callable, Iterable, Sequence, Tuple
23 import unittest
24 
25 import grpc
26 from grpc.experimental import aio
27 
28 from src.proto.grpc.testing import messages_pb2
29 from src.proto.grpc.testing import test_pb2_grpc
30 from tests.unit.framework.common import test_constants
31 from tests_aio.unit import _common
32 from tests_aio.unit._test_base import AioTestBase
33 from tests_aio.unit._test_server import TestServiceServicer
34 from tests_aio.unit._test_server import start_test_server
35 
# Number of messages exchanged by each streaming RPC in these tests.
_NUM_STREAM_RESPONSES = 5
# Size in bytes of the payload body placed in streaming-input requests.
_REQUEST_PAYLOAD_SIZE = 7
# Payload size requested from the test service for each response message.
_RESPONSE_PAYLOAD_SIZE = 42
# Raw bytes request sent to the ad-hoc (generic handler) method.
_REQUEST = b'\x03\x07'
40 
41 
def _unique_options() -> Sequence[Tuple[str, float]]:
    """Build channel options holding a single randomly valued 'iv' entry.

    Every call draws a fresh value from random.random(), so separate calls
    produce different option tuples.
    NOTE(review): presumably this keeps the sync and async stacks from
    sharing one underlying channel -- confirm.

    Returns:
      A one-element tuple containing the pair ('iv', <random float>).
    """
    random_entry = ('iv', random.random())
    return (random_entry,)
44 
45 
46 @unittest.skipIf(
47  os.environ.get('GRPC_ASYNCIO_ENGINE', '').lower() == 'custom_io_manager',
48  'Compatible mode needs POLLER completion queue.')
50 
51  async def setUp(self):
52  self._async_server = aio.server(
53  options=(('grpc.so_reuseport', 0),),
54  migration_thread_pool=ThreadPoolExecutor())
55 
56  test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(),
57  self._async_server)
59  self._async_server.add_generic_rpc_handlers((self._adhoc_handlers,))
60 
61  port = self._async_server.add_insecure_port('[::]:0')
62  address = 'localhost:%d' % port
63  await self._async_server.start()
64 
65  # Create async stub
66  self._async_channel = aio.insecure_channel(address,
67  options=_unique_options())
68  self._async_stub = test_pb2_grpc.TestServiceStub(self._async_channel)
69 
70  # Create sync stub
72  options=_unique_options())
73  self._sync_stub = test_pb2_grpc.TestServiceStub(self._sync_channel)
74 
75  async def tearDown(self):
76  self._sync_channel.close()
77  await self._async_channel.close()
78  await self._async_server.stop(None)
79 
80  async def _run_in_another_thread(self, func: Callable[[], None]):
81  work_done = asyncio.Event()
82 
83  def thread_work():
84  func()
85  self.loop.call_soon_threadsafe(work_done.set)
86 
87  thread = threading.Thread(target=thread_work, daemon=True)
88  thread.start()
89  await work_done.wait()
90  thread.join()
91 
92  async def test_unary_unary(self):
93  # Calling async API in this thread
94  await self._async_stub.UnaryCall(messages_pb2.SimpleRequest(),
95  timeout=test_constants.LONG_TIMEOUT)
96 
97  # Calling sync API in a different thread
98  def sync_work() -> None:
99  response, call = self._sync_stub.UnaryCall.with_call(
101  timeout=test_constants.LONG_TIMEOUT)
102  self.assertIsInstance(response, messages_pb2.SimpleResponse)
103  self.assertEqual(grpc.StatusCode.OK, call.code())
104 
105  await self._run_in_another_thread(sync_work)
106 
107  async def test_unary_stream(self):
109  for _ in range(_NUM_STREAM_RESPONSES):
110  request.response_parameters.append(
111  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
112 
113  # Calling async API in this thread
114  call = self._async_stub.StreamingOutputCall(request)
115 
116  for _ in range(_NUM_STREAM_RESPONSES):
117  await call.read()
118  self.assertEqual(grpc.StatusCode.OK, await call.code())
119 
120  # Calling sync API in a different thread
121  def sync_work() -> None:
122  response_iterator = self._sync_stub.StreamingOutputCall(request)
123  for response in response_iterator:
124  assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)
125  self.assertEqual(grpc.StatusCode.OK, response_iterator.code())
126 
127  await self._run_in_another_thread(sync_work)
128 
129  async def test_stream_unary(self):
130  payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
131  request = messages_pb2.StreamingInputCallRequest(payload=payload)
132 
133  # Calling async API in this thread
134  async def gen():
135  for _ in range(_NUM_STREAM_RESPONSES):
136  yield request
137 
138  response = await self._async_stub.StreamingInputCall(gen())
139  self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
140  response.aggregated_payload_size)
141 
142  # Calling sync API in a different thread
143  def sync_work() -> None:
144  response = self._sync_stub.StreamingInputCall(
145  iter([request] * _NUM_STREAM_RESPONSES))
146  self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
147  response.aggregated_payload_size)
148 
149  await self._run_in_another_thread(sync_work)
150 
151  async def test_stream_stream(self):
153  request.response_parameters.append(
154  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
155 
156  # Calling async API in this thread
157  call = self._async_stub.FullDuplexCall()
158 
159  for _ in range(_NUM_STREAM_RESPONSES):
160  await call.write(request)
161  response = await call.read()
162  assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)
163 
164  await call.done_writing()
165  assert await call.code() == grpc.StatusCode.OK
166 
167  # Calling sync API in a different thread
168  def sync_work() -> None:
169  response_iterator = self._sync_stub.FullDuplexCall(iter([request]))
170  for response in response_iterator:
171  assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)
172  self.assertEqual(grpc.StatusCode.OK, response_iterator.code())
173 
174  await self._run_in_another_thread(sync_work)
175 
176  async def test_server(self):
177 
178  class GenericHandlers(grpc.GenericRpcHandler):
179 
180  def service(self, handler_call_details):
181  return grpc.unary_unary_rpc_method_handler(lambda x, _: x)
182 
183  # It's fine to instantiate server object in the event loop thread.
184  # The server will spawn its own serving thread.
185  server = grpc.server(ThreadPoolExecutor(),
186  handlers=(GenericHandlers(),))
187  port = server.add_insecure_port('localhost:0')
188  server.start()
189 
190  def sync_work() -> None:
191  for _ in range(100):
192  with grpc.insecure_channel('localhost:%d' % port) as channel:
193  response = channel.unary_unary('/test/test')(b'\x07\x08')
194  self.assertEqual(response, b'\x07\x08')
195 
196  await self._run_in_another_thread(sync_work)
197 
198  async def test_many_loop(self):
199  address, server = await start_test_server()
200 
201  # Run another loop in another thread
202  def sync_work():
203 
204  async def async_work():
205  # Create async stub
206  async_channel = aio.insecure_channel(address,
207  options=_unique_options())
208  async_stub = test_pb2_grpc.TestServiceStub(async_channel)
209 
210  call = async_stub.UnaryCall(messages_pb2.SimpleRequest())
211  response = await call
212  self.assertIsInstance(response, messages_pb2.SimpleResponse)
213  self.assertEqual(grpc.StatusCode.OK, await call.code())
214 
215  loop = asyncio.new_event_loop()
216  loop.run_until_complete(async_work())
217 
218  await self._run_in_another_thread(sync_work)
219  await server.stop(None)
220 
221  async def test_sync_unary_unary_success(self):
222 
223  @grpc.unary_unary_rpc_method_handler
224  def echo_unary_unary(request: bytes, unused_context):
225  return request
226 
227  self._adhoc_handlers.set_adhoc_handler(echo_unary_unary)
228  response = await self._async_channel.unary_unary(_common.ADHOC_METHOD
229  )(_REQUEST)
230  self.assertEqual(_REQUEST, response)
231 
232  async def test_sync_unary_unary_metadata(self):
233  metadata = (('unique', 'key-42'),)
234 
235  @grpc.unary_unary_rpc_method_handler
236  def metadata_unary_unary(request: bytes, context: grpc.ServicerContext):
237  context.send_initial_metadata(metadata)
238  return request
239 
240  self._adhoc_handlers.set_adhoc_handler(metadata_unary_unary)
241  call = self._async_channel.unary_unary(_common.ADHOC_METHOD)(_REQUEST)
242  self.assertTrue(
243  _common.seen_metadata(aio.Metadata(*metadata), await
244  call.initial_metadata()))
245 
246  async def test_sync_unary_unary_abort(self):
247 
248  @grpc.unary_unary_rpc_method_handler
249  def abort_unary_unary(request: bytes, context: grpc.ServicerContext):
250  context.abort(grpc.StatusCode.INTERNAL, 'Test')
251 
252  self._adhoc_handlers.set_adhoc_handler(abort_unary_unary)
253  with self.assertRaises(aio.AioRpcError) as exception_context:
254  await self._async_channel.unary_unary(_common.ADHOC_METHOD
255  )(_REQUEST)
256  self.assertEqual(grpc.StatusCode.INTERNAL,
257  exception_context.exception.code())
258 
259  async def test_sync_unary_unary_set_code(self):
260 
261  @grpc.unary_unary_rpc_method_handler
262  def set_code_unary_unary(request: bytes, context: grpc.ServicerContext):
263  context.set_code(grpc.StatusCode.INTERNAL)
264 
265  self._adhoc_handlers.set_adhoc_handler(set_code_unary_unary)
266  with self.assertRaises(aio.AioRpcError) as exception_context:
267  await self._async_channel.unary_unary(_common.ADHOC_METHOD
268  )(_REQUEST)
269  self.assertEqual(grpc.StatusCode.INTERNAL,
270  exception_context.exception.code())
271 
272  async def test_sync_unary_stream_success(self):
273 
274  @grpc.unary_stream_rpc_method_handler
275  def echo_unary_stream(request: bytes, unused_context):
276  for _ in range(_NUM_STREAM_RESPONSES):
277  yield request
278 
279  self._adhoc_handlers.set_adhoc_handler(echo_unary_stream)
280  call = self._async_channel.unary_stream(_common.ADHOC_METHOD)(_REQUEST)
281  async for response in call:
282  self.assertEqual(_REQUEST, response)
283 
284  async def test_sync_unary_stream_error(self):
285 
286  @grpc.unary_stream_rpc_method_handler
287  def error_unary_stream(request: bytes, unused_context):
288  for _ in range(_NUM_STREAM_RESPONSES):
289  yield request
290  raise RuntimeError('Test')
291 
292  self._adhoc_handlers.set_adhoc_handler(error_unary_stream)
293  call = self._async_channel.unary_stream(_common.ADHOC_METHOD)(_REQUEST)
294  with self.assertRaises(aio.AioRpcError) as exception_context:
295  async for response in call:
296  self.assertEqual(_REQUEST, response)
297  self.assertEqual(grpc.StatusCode.UNKNOWN,
298  exception_context.exception.code())
299 
300  async def test_sync_stream_unary_success(self):
301 
302  @grpc.stream_unary_rpc_method_handler
303  def echo_stream_unary(request_iterator: Iterable[bytes],
304  unused_context):
305  self.assertEqual(len(list(request_iterator)), _NUM_STREAM_RESPONSES)
306  return _REQUEST
307 
308  self._adhoc_handlers.set_adhoc_handler(echo_stream_unary)
309  request_iterator = iter([_REQUEST] * _NUM_STREAM_RESPONSES)
310  response = await self._async_channel.stream_unary(_common.ADHOC_METHOD
311  )(request_iterator)
312  self.assertEqual(_REQUEST, response)
313 
314  async def test_sync_stream_unary_error(self):
315 
316  @grpc.stream_unary_rpc_method_handler
317  def echo_stream_unary(request_iterator: Iterable[bytes],
318  unused_context):
319  self.assertEqual(len(list(request_iterator)), _NUM_STREAM_RESPONSES)
320  raise RuntimeError('Test')
321 
322  self._adhoc_handlers.set_adhoc_handler(echo_stream_unary)
323  request_iterator = iter([_REQUEST] * _NUM_STREAM_RESPONSES)
324  with self.assertRaises(aio.AioRpcError) as exception_context:
325  response = await self._async_channel.stream_unary(
326  _common.ADHOC_METHOD)(request_iterator)
327  self.assertEqual(grpc.StatusCode.UNKNOWN,
328  exception_context.exception.code())
329 
330  async def test_sync_stream_stream_success(self):
331 
332  @grpc.stream_stream_rpc_method_handler
333  def echo_stream_stream(request_iterator: Iterable[bytes],
334  unused_context):
335  for request in request_iterator:
336  yield request
337 
338  self._adhoc_handlers.set_adhoc_handler(echo_stream_stream)
339  request_iterator = iter([_REQUEST] * _NUM_STREAM_RESPONSES)
340  call = self._async_channel.stream_stream(
341  _common.ADHOC_METHOD)(request_iterator)
342  async for response in call:
343  self.assertEqual(_REQUEST, response)
344 
345  async def test_sync_stream_stream_error(self):
346 
347  @grpc.stream_stream_rpc_method_handler
348  def echo_stream_stream(request_iterator: Iterable[bytes],
349  unused_context):
350  for request in request_iterator:
351  yield request
352  raise RuntimeError('test')
353 
354  self._adhoc_handlers.set_adhoc_handler(echo_stream_stream)
355  request_iterator = iter([_REQUEST] * _NUM_STREAM_RESPONSES)
356  call = self._async_channel.stream_stream(
357  _common.ADHOC_METHOD)(request_iterator)
358  with self.assertRaises(aio.AioRpcError) as exception_context:
359  async for response in call:
360  self.assertEqual(_REQUEST, response)
361  self.assertEqual(grpc.StatusCode.UNKNOWN,
362  exception_context.exception.code())
363 
364 
# Script entry point: enable debug logging, then run the tests verbosely.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
tests_aio.unit._common.AdhocGenericHandler
Definition: tests/tests_aio/unit/_common.py:107
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit.compatibility_test.TestCompatibility._async_server
_async_server
Definition: compatibility_test.py:52
tests.unit._abort_test.abort_unary_unary
def abort_unary_unary(request, servicer_context)
Definition: _abort_test.py:52
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit.compatibility_test.TestCompatibility
Definition: compatibility_test.py:49
grpc._simple_stubs.unary_stream
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:250
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.compatibility_test.TestCompatibility._adhoc_handlers
_adhoc_handlers
Definition: compatibility_test.py:58
start
static uint64_t start
Definition: benchmark-pound.c:74
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.compatibility_test.TestCompatibility.setUp
def setUp(self)
Definition: compatibility_test.py:51
grpc._simple_stubs.stream_stream
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:410
tests_aio.unit._test_server.TestServiceServicer
Definition: tests_aio/unit/_test_server.py:52
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
close
#define close
Definition: test-fs.c:48
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
tests_aio.unit.compatibility_test.TestCompatibility._async_stub
_async_stub
Definition: compatibility_test.py:68
tests.unit.test_common.test_server
def test_server(max_workers=10, reuse_port=False)
Definition: test_common.py:103
gen
OPENSSL_EXPORT GENERAL_NAME * gen
Definition: x509v3.h:495
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
grpc._simple_stubs.stream_unary
ResponseType stream_unary(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:330
tests_aio.unit.compatibility_test.TestCompatibility._async_channel
_async_channel
Definition: compatibility_test.py:66
grpc.ServicerContext
Definition: src/python/grpcio/grpc/__init__.py:1083
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
func
const EVP_CIPHER *(* func)(void)
Definition: cipher_extra.c:73
tests_aio.unit.compatibility_test.TestCompatibility._sync_channel
_sync_channel
Definition: compatibility_test.py:71
tests_aio.unit.compatibility_test.TestCompatibility._sync_stub
_sync_stub
Definition: compatibility_test.py:73
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
iter
Definition: test_winkernel.cpp:47
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
tests_aio.unit.compatibility_test._unique_options
Sequence[Tuple[str, float]] _unique_options()
Definition: compatibility_test.py:42
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:52