_error_message_encoding_test.py
Go to the documentation of this file.
1 # Copyright 2018 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests 'utf-8' encoded error message."""
15 
16 import unittest
17 import weakref
18 
19 import grpc
20 
21 from tests.unit import test_common
22 from tests.unit.framework.common import test_constants
23 
24 _UNICODE_ERROR_MESSAGES = [
25  b'\xe2\x80\x9d'.decode('utf-8'),
26  b'abc\x80\xd0\xaf'.decode('latin-1'),
27  b'\xc3\xa9'.decode('utf-8'),
28 ]
29 
30 _REQUEST = b'\x00\x00\x00'
31 _RESPONSE = b'\x00\x00\x00'
32 
33 _UNARY_UNARY = '/test/UnaryUnary'
34 
35 
37 
38  def __init__(self, request_streaming=None, response_streaming=None):
39  self.request_streaming = request_streaming
40  self.response_streaming = response_streaming
42  self.response_serializer = None
43  self.unary_stream = None
44  self.stream_unary = None
45  self.stream_stream = None
46 
47  def unary_unary(self, request, servicer_context):
48  servicer_context.set_code(grpc.StatusCode.UNKNOWN)
49  servicer_context.set_details(request.decode('utf-8'))
50  return _RESPONSE
51 
52 
54 
55  def __init__(self, test):
56  self._test = test
57 
58  def service(self, handler_call_details):
59  return _MethodHandler()
60 
61 
62 class ErrorMessageEncodingTest(unittest.TestCase):
63 
64  def setUp(self):
65  self._server = test_common.test_server()
66  self._server.add_generic_rpc_handlers(
67  (_GenericHandler(weakref.proxy(self)),))
68  port = self._server.add_insecure_port('[::]:0')
69  self._server.start()
70  self._channel = grpc.insecure_channel('localhost:%d' % port)
71 
72  def tearDown(self):
73  self._server.stop(0)
74  self._channel.close()
75 
77  for message in _UNICODE_ERROR_MESSAGES:
78  multi_callable = self._channel.unary_unary(_UNARY_UNARY)
79  with self.assertRaises(grpc.RpcError) as cm:
80  multi_callable(message.encode('utf-8'))
81 
82  self.assertEqual(cm.exception.code(), grpc.StatusCode.UNKNOWN)
83  self.assertEqual(cm.exception.details(), message)
84 
85 
86 if __name__ == '__main__':
87  unittest.main(verbosity=2)
tests.unit._error_message_encoding_test._MethodHandler.stream_stream
stream_stream
Definition: _error_message_encoding_test.py:45
tests.unit._error_message_encoding_test.ErrorMessageEncodingTest
Definition: _error_message_encoding_test.py:62
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._error_message_encoding_test._MethodHandler.__init__
def __init__(self, request_streaming=None, response_streaming=None)
Definition: _error_message_encoding_test.py:38
tests.unit._error_message_encoding_test._MethodHandler
Definition: _error_message_encoding_test.py:36
tests.unit._error_message_encoding_test.ErrorMessageEncodingTest._channel
_channel
Definition: _error_message_encoding_test.py:70
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
tests.unit._error_message_encoding_test._MethodHandler.unary_unary
def unary_unary(self, request, servicer_context)
Definition: _error_message_encoding_test.py:47
tests.unit._error_message_encoding_test._MethodHandler.request_streaming
request_streaming
Definition: _error_message_encoding_test.py:39
tests.unit._error_message_encoding_test._GenericHandler.__init__
def __init__(self, test)
Definition: _error_message_encoding_test.py:55
tests.unit._error_message_encoding_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _error_message_encoding_test.py:58
tests.unit._error_message_encoding_test.ErrorMessageEncodingTest._server
_server
Definition: _error_message_encoding_test.py:65
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.unit._error_message_encoding_test._MethodHandler.request_deserializer
request_deserializer
Definition: _error_message_encoding_test.py:41
tests.unit._exit_scenarios.multi_callable
multi_callable
Definition: _exit_scenarios.py:216
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests.unit._error_message_encoding_test._GenericHandler
Definition: _error_message_encoding_test.py:53
close
#define close
Definition: test-fs.c:48
tests.unit._error_message_encoding_test._MethodHandler.stream_unary
stream_unary
Definition: _error_message_encoding_test.py:44
tests.unit._error_message_encoding_test._MethodHandler.response_streaming
response_streaming
Definition: _error_message_encoding_test.py:40
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._error_message_encoding_test.ErrorMessageEncodingTest.setUp
def setUp(self)
Definition: _error_message_encoding_test.py:64
grpc._common.decode
def decode(b)
Definition: grpc/_common.py:75
tests.unit._error_message_encoding_test._MethodHandler.response_serializer
response_serializer
Definition: _error_message_encoding_test.py:42
tests.unit._error_message_encoding_test.ErrorMessageEncodingTest.testMessageEncoding
def testMessageEncoding(self)
Definition: _error_message_encoding_test.py:76
tests.unit._error_message_encoding_test._MethodHandler.unary_stream
unary_stream
Definition: _error_message_encoding_test.py:43
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit._error_message_encoding_test.ErrorMessageEncodingTest.tearDown
def tearDown(self)
Definition: _error_message_encoding_test.py:72
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._error_message_encoding_test._GenericHandler._test
_test
Definition: _error_message_encoding_test.py:56
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27