abort_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import asyncio
16 import gc
17 import logging
18 import time
19 import unittest
20 
21 import grpc
22 from grpc.experimental import aio
23 
24 from tests.unit.framework.common import test_constants
25 from tests_aio.unit._test_base import AioTestBase
26 
# Fully-qualified method names served by the generic handler in this module.
_UNARY_UNARY_ABORT = '/test/UnaryUnaryAbort'
_SUPPRESS_ABORT = '/test/SuppressAbort'
_REPLACE_ABORT = '/test/ReplaceAbort'
_ABORT_AFTER_REPLY = '/test/AbortAfterReply'

# Opaque request/response payloads; no (de)serializers are configured, so
# raw bytes are sent as-is.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'
_NUM_STREAM_RESPONSES = 5

# Status that the handlers pass to context.abort() and the tests assert on.
_ABORT_CODE = grpc.StatusCode.RESOURCE_EXHAUSTED
_ABORT_DETAILS = 'Phony error details'
38 
39 
41 
class _GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler whose methods call ``context.abort`` in various ways.

    Each behavior is registered under one of the ``/test/...`` method names
    defined at module level and is looked up by :meth:`service`.
    """

    @staticmethod
    async def _unary_unary_abort(unused_request, context):
        """Abort the RPC immediately with ``_ABORT_CODE``/``_ABORT_DETAILS``."""
        await context.abort(_ABORT_CODE, _ABORT_DETAILS)
        # abort() is expected to raise, so control must never get here.
        raise RuntimeError('This line should not be executed')

    @staticmethod
    async def _suppress_abort(unused_request, context):
        """Abort, swallow the resulting ``AbortError``, then return a reply.

        The swallow is deliberate: the accompanying test checks that the
        client still sees the abort status rather than ``_RESPONSE``.
        """
        try:
            await context.abort(_ABORT_CODE, _ABORT_DETAILS)
        except aio.AbortError:
            # Intentionally ignored; this is the scenario under test.
            pass
        return _RESPONSE

    @staticmethod
    async def _replace_abort(unused_request, context):
        """Abort, then try to abort again with a different status.

        The accompanying test expects the first status to be the one the
        client observes.
        """
        try:
            await context.abort(_ABORT_CODE, _ABORT_DETAILS)
        except aio.AbortError:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT,
                                'Override abort!')

    @staticmethod
    async def _abort_after_reply(unused_request, context):
        """Stream one response, then abort the RPC."""
        yield _RESPONSE
        await context.abort(_ABORT_CODE, _ABORT_DETAILS)
        # abort() is expected to raise, so control must never get here.
        raise RuntimeError('This line should not be executed')

    def service(self, handler_details):
        """Return the method handler for ``handler_details.method``.

        Args:
            handler_details: Call details object exposing the requested
                fully-qualified method name as ``.method``.

        Returns:
            The matching RPC method handler, or ``None`` when the method is
            not one served by this handler.
        """
        if handler_details.method == _UNARY_UNARY_ABORT:
            return grpc.unary_unary_rpc_method_handler(self._unary_unary_abort)
        if handler_details.method == _SUPPRESS_ABORT:
            return grpc.unary_unary_rpc_method_handler(self._suppress_abort)
        if handler_details.method == _REPLACE_ABORT:
            return grpc.unary_unary_rpc_method_handler(self._replace_abort)
        if handler_details.method == _ABORT_AFTER_REPLY:
            return grpc.unary_stream_rpc_method_handler(self._abort_after_reply)
        return None
78 
79 
async def _start_test_server():
    """Start an aio server exposing ``_GenericHandler`` on an ephemeral port.

    Returns:
        A ``(address, server)`` tuple, where ``address`` is the
        ``localhost:<port>`` string for the port that was bound and
        ``server`` is the started server object.
    """
    test_server = aio.server()
    # Port 0 asks for any free port; the bound port number is returned.
    bound_port = test_server.add_insecure_port('[::]:0')
    handlers = (_GenericHandler(),)
    test_server.add_generic_rpc_handlers(handlers)
    await test_server.start()
    address = 'localhost:%d' % bound_port
    return address, test_server
86 
87 
89 
class TestAbort(AioTestBase):
    """Checks the status a client observes when a handler calls ``abort``."""

    async def setUp(self):
        """Start a fresh test server and open an insecure channel to it."""
        address, self._server = await _start_test_server()
        self._channel = aio.insecure_channel(address)

    async def tearDown(self):
        """Close the channel, then stop the server without a grace period."""
        await self._channel.close()
        await self._server.stop(None)

    async def test_unary_unary_abort(self):
        """A plain abort surfaces its code/details on the call and the error."""
        method = self._channel.unary_unary(_UNARY_UNARY_ABORT)
        call = method(_REQUEST)

        self.assertEqual(_ABORT_CODE, await call.code())
        self.assertEqual(_ABORT_DETAILS, await call.details())

        with self.assertRaises(aio.AioRpcError) as exception_context:
            await call

        rpc_error = exception_context.exception
        self.assertEqual(_ABORT_CODE, rpc_error.code())
        self.assertEqual(_ABORT_DETAILS, rpc_error.details())

    async def test_suppress_abort(self):
        """Swallowing ``AbortError`` in the handler does not undo the abort."""
        method = self._channel.unary_unary(_SUPPRESS_ABORT)
        call = method(_REQUEST)

        with self.assertRaises(aio.AioRpcError) as exception_context:
            await call

        rpc_error = exception_context.exception
        self.assertEqual(_ABORT_CODE, rpc_error.code())
        self.assertEqual(_ABORT_DETAILS, rpc_error.details())

    async def test_replace_abort(self):
        """A second abort does not override the status of the first one."""
        method = self._channel.unary_unary(_REPLACE_ABORT)
        call = method(_REQUEST)

        with self.assertRaises(aio.AioRpcError) as exception_context:
            await call

        # The original status is expected, not INVALID_ARGUMENT.
        rpc_error = exception_context.exception
        self.assertEqual(_ABORT_CODE, rpc_error.code())
        self.assertEqual(_ABORT_DETAILS, rpc_error.details())

    async def test_abort_after_reply(self):
        """Aborting after a streamed reply fails a subsequent read."""
        method = self._channel.unary_stream(_ABORT_AFTER_REPLY)
        call = method(_REQUEST)

        with self.assertRaises(aio.AioRpcError) as exception_context:
            # Two reads: the handler yields one response before aborting.
            await call.read()
            await call.read()

        rpc_error = exception_context.exception
        self.assertEqual(_ABORT_CODE, rpc_error.code())
        self.assertEqual(_ABORT_DETAILS, rpc_error.details())

        self.assertEqual(_ABORT_CODE, await call.code())
        self.assertEqual(_ABORT_DETAILS, await call.details())
148 
149 
if __name__ == '__main__':
    # Run this module directly: enable debug-level logging, then hand over
    # to the unittest runner with verbose per-test output.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
grpc.unary_unary_rpc_method_handler
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1510
grpc._simple_stubs.unary_stream
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:250
tests_aio.unit._test_base
Definition: _test_base.py:1
grpc.unary_stream_rpc_method_handler
def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None)
Definition: src/python/grpcio/grpc/__init__.py:1530
grpc.GenericRpcHandler.service
def service(self, handler_call_details)
Definition: src/python/grpcio/grpc/__init__.py:1337
tests_aio.unit.abort_test.TestAbort.setUp
def setUp(self)
Definition: abort_test.py:90
tests_aio.unit.abort_test._GenericHandler
Definition: abort_test.py:40
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.abort_test.TestAbort
Definition: abort_test.py:88
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
close
#define close
Definition: test-fs.c:48
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests_aio.unit.abort_test.TestAbort._channel
_channel
Definition: abort_test.py:92
tests_aio.unit.abort_test._start_test_server
def _start_test_server()
Definition: abort_test.py:80
tests_aio.unit.abort_test.TestAbort._server
_server
Definition: abort_test.py:91
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
method
NSString * method
Definition: ProtoMethod.h:28
tests_aio.unit.abort_test._GenericHandler._unary_unary_abort
def _unary_unary_abort(unused_request, context)
Definition: abort_test.py:43
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:28