done_callback_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Testing the done callbacks mechanism."""
15 
16 import asyncio
17 import logging
18 import unittest
19 
20 import grpc
21 from grpc.experimental import aio
22 
23 from src.proto.grpc.testing import messages_pb2
24 from src.proto.grpc.testing import test_pb2_grpc
25 from tests_aio.unit._common import inject_callbacks
26 from tests_aio.unit._test_base import AioTestBase
27 from tests_aio.unit._test_server import start_test_server
28 
29 _NUM_STREAM_RESPONSES = 5
30 _REQUEST_PAYLOAD_SIZE = 7
31 _RESPONSE_PAYLOAD_SIZE = 42
32 _REQUEST = b'\x01\x02\x03'
33 _RESPONSE = b'\x04\x05\x06'
34 _TEST_METHOD = '/test/Test'
35 _FAKE_METHOD = '/test/Fake'
36 
37 
39 
40  async def setUp(self):
41  address, self._server = await start_test_server()
42  self._channel = aio.insecure_channel(address)
43  self._stub = test_pb2_grpc.TestServiceStub(self._channel)
44 
45  async def tearDown(self):
46  await self._channel.close()
47  await self._server.stop(None)
48 
49  async def test_add_after_done(self):
50  call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
51  self.assertEqual(grpc.StatusCode.OK, await call.code())
52 
53  validation = inject_callbacks(call)
54  await validation
55 
56  async def test_unary_unary(self):
57  call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
58  validation = inject_callbacks(call)
59 
60  self.assertEqual(grpc.StatusCode.OK, await call.code())
61 
62  await validation
63 
64  async def test_unary_stream(self):
66  for _ in range(_NUM_STREAM_RESPONSES):
67  request.response_parameters.append(
68  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
69 
70  call = self._stub.StreamingOutputCall(request)
71  validation = inject_callbacks(call)
72 
73  response_cnt = 0
74  async for response in call:
75  response_cnt += 1
76  self.assertIsInstance(response,
77  messages_pb2.StreamingOutputCallResponse)
78  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
79 
80  self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
81  self.assertEqual(grpc.StatusCode.OK, await call.code())
82 
83  await validation
84 
85  async def test_stream_unary(self):
86  payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
87  request = messages_pb2.StreamingInputCallRequest(payload=payload)
88 
89  async def gen():
90  for _ in range(_NUM_STREAM_RESPONSES):
91  yield request
92 
93  call = self._stub.StreamingInputCall(gen())
94  validation = inject_callbacks(call)
95 
96  response = await call
97  self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
98  self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
99  response.aggregated_payload_size)
100  self.assertEqual(grpc.StatusCode.OK, await call.code())
101 
102  await validation
103 
104  async def test_stream_stream(self):
105  call = self._stub.FullDuplexCall()
106  validation = inject_callbacks(call)
107 
109  request.response_parameters.append(
110  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
111 
112  for _ in range(_NUM_STREAM_RESPONSES):
113  await call.write(request)
114  response = await call.read()
115  self.assertIsInstance(response,
116  messages_pb2.StreamingOutputCallResponse)
117  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
118 
119  await call.done_writing()
120 
121  self.assertEqual(grpc.StatusCode.OK, await call.code())
122  await validation
123 
124 
126 
127  async def setUp(self):
128  self._server = aio.server()
129  port = self._server.add_insecure_port('[::]:0')
130  self._channel = aio.insecure_channel('localhost:%d' % port)
131 
132  async def tearDown(self):
133  await self._channel.close()
134  await self._server.stop(None)
135 
136  async def _register_method_handler(self, method_handler):
137  """Registers the method handler and starts the server."""
138  generic_handler = grpc.method_handlers_generic_handler(
139  'test',
140  dict(Test=method_handler),
141  )
142  self._server.add_generic_rpc_handlers((generic_handler,))
143  await self._server.start()
144 
145  async def test_unary_unary(self):
146  validation_future = self.loop.create_future()
147 
148  async def test_handler(request: bytes, context: aio.ServicerContext):
149  self.assertEqual(_REQUEST, request)
150  validation_future.set_result(inject_callbacks(context))
151  return _RESPONSE
152 
153  await self._register_method_handler(
155  response = await self._channel.unary_unary(_TEST_METHOD)(_REQUEST)
156  self.assertEqual(_RESPONSE, response)
157 
158  validation = await validation_future
159  await validation
160 
161  async def test_unary_stream(self):
162  validation_future = self.loop.create_future()
163 
164  async def test_handler(request: bytes, context: aio.ServicerContext):
165  self.assertEqual(_REQUEST, request)
166  validation_future.set_result(inject_callbacks(context))
167  for _ in range(_NUM_STREAM_RESPONSES):
168  yield _RESPONSE
169 
170  await self._register_method_handler(
172  call = self._channel.unary_stream(_TEST_METHOD)(_REQUEST)
173  async for response in call:
174  self.assertEqual(_RESPONSE, response)
175 
176  validation = await validation_future
177  await validation
178 
179  async def test_stream_unary(self):
180  validation_future = self.loop.create_future()
181 
182  async def test_handler(request_iterator, context: aio.ServicerContext):
183  validation_future.set_result(inject_callbacks(context))
184 
185  async for request in request_iterator:
186  self.assertEqual(_REQUEST, request)
187  return _RESPONSE
188 
189  await self._register_method_handler(
191  call = self._channel.stream_unary(_TEST_METHOD)()
192  for _ in range(_NUM_STREAM_RESPONSES):
193  await call.write(_REQUEST)
194  await call.done_writing()
195  self.assertEqual(_RESPONSE, await call)
196 
197  validation = await validation_future
198  await validation
199 
200  async def test_stream_stream(self):
201  validation_future = self.loop.create_future()
202 
203  async def test_handler(request_iterator, context: aio.ServicerContext):
204  validation_future.set_result(inject_callbacks(context))
205 
206  async for request in request_iterator:
207  self.assertEqual(_REQUEST, request)
208  return _RESPONSE
209 
210  await self._register_method_handler(
212  call = self._channel.stream_stream(_TEST_METHOD)()
213  for _ in range(_NUM_STREAM_RESPONSES):
214  await call.write(_REQUEST)
215  await call.done_writing()
216  async for response in call:
217  self.assertEqual(_RESPONSE, response)
218 
219  validation = await validation_future
220  await validation
221 
222  async def test_error_in_handler(self):
223  """Errors in the handler still trigger callbacks."""
224  validation_future = self.loop.create_future()
225 
226  async def test_handler(request: bytes, context: aio.ServicerContext):
227  self.assertEqual(_REQUEST, request)
228  validation_future.set_result(inject_callbacks(context))
229  raise RuntimeError('A test RuntimeError')
230 
231  await self._register_method_handler(
233  with self.assertRaises(aio.AioRpcError) as exception_context:
234  await self._channel.unary_unary(_TEST_METHOD)(_REQUEST)
235  rpc_error = exception_context.exception
236  self.assertEqual(grpc.StatusCode.UNKNOWN, rpc_error.code())
237 
238  validation = await validation_future
239  await validation
240 
241  async def test_error_in_callback(self):
242  """Errors in the callback won't be propagated to the client."""
243  validation_future = self.loop.create_future()
244 
245  async def test_handler(request: bytes, context: aio.ServicerContext):
246  self.assertEqual(_REQUEST, request)
247 
248  def exception_raiser(unused_context):
249  raise RuntimeError('A test RuntimeError')
250 
251  context.add_done_callback(exception_raiser)
252  validation_future.set_result(inject_callbacks(context))
253  return _RESPONSE
254 
255  await self._register_method_handler(
257 
258  response = await self._channel.unary_unary(_TEST_METHOD)(_REQUEST)
259  self.assertEqual(_RESPONSE, response)
260 
261  # Subsequent callbacks won't be invoked if one of the callbacks crashes.
262  validation = await validation_future
263  with self.assertRaises(asyncio.TimeoutError):
264  await validation
265 
266  # Invoke RPC one more time to ensure the toxic callback won't break the
267  # server.
268  with self.assertRaises(aio.AioRpcError) as exception_context:
269  await self._channel.unary_unary(_FAKE_METHOD)(_REQUEST)
270  rpc_error = exception_context.exception
271  self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, rpc_error.code())
272 
273 
274 if __name__ == '__main__':
275  logging.basicConfig(level=logging.DEBUG)
276  unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
grpc.stream_unary_rpc_method_handler
def stream_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1550
tests_aio.unit.done_callback_test.TestServerSideDoneCallback
Definition: done_callback_test.py:125
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
grpc._simple_stubs.unary_stream
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:250
tests_aio.unit._test_base
Definition: _test_base.py:1
grpc.unary_stream_rpc_method_handler
def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1530
tests_aio.unit.done_callback_test.TestClientSideDoneCallback._stub
_stub
Definition: done_callback_test.py:43
tests_aio.unit.done_callback_test.TestServerSideDoneCallback._channel
_channel
Definition: done_callback_test.py:130
tests_aio.unit.done_callback_test.TestServerSideDoneCallback.setUp
def setUp(self)
Definition: done_callback_test.py:127
start
static uint64_t start
Definition: benchmark-pound.c:74
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.done_callback_test.TestClientSideDoneCallback._channel
_channel
Definition: done_callback_test.py:42
tests_aio.unit.done_callback_test.TestClientSideDoneCallback._server
_server
Definition: done_callback_test.py:41
tests_aio.unit.done_callback_test.TestClientSideDoneCallback
Definition: done_callback_test.py:38
grpc._simple_stubs.stream_stream
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:410
close
#define close
Definition: test-fs.c:48
gen
OPENSSL_EXPORT GENERAL_NAME * gen
Definition: x509v3.h:495
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests_aio.unit.done_callback_test.TestServerSideDoneCallback._server
_server
Definition: done_callback_test.py:128
tests_aio.unit._common.inject_callbacks
def inject_callbacks(aio.Call call)
Definition: tests/tests_aio/unit/_common.py:48
grpc._simple_stubs.stream_unary
ResponseType stream_unary(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:330
grpc.method_handlers_generic_handler
def method_handlers_generic_handler(service, method_handlers)
Definition: src/python/grpcio/grpc/__init__.py:1590
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests_aio.unit.done_callback_test.TestClientSideDoneCallback.setUp
def setUp(self)
Definition: done_callback_test.py:40
tests_aio.unit._common
Definition: tests/tests_aio/unit/_common.py:1
grpc.stream_stream_rpc_method_handler
def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1570
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:12