_empty_message_test.py
Go to the documentation of this file.
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import logging
16 import unittest
17 
18 import grpc
19 
20 from tests.unit import test_common
21 from tests.unit.framework.common import test_constants
22 
# Both the request and the response payload are zero-length byte strings.
_REQUEST = b''
_RESPONSE = b''

# Method paths, one per combination of unary/streaming request and
# unary/streaming response.
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
30 
31 
def handle_unary_unary(request, servicer_context):
    """Answer a single request with the module's empty response.

    Neither ``request`` nor ``servicer_context`` is inspected.
    """
    return _RESPONSE
34 
35 
def handle_unary_stream(request, servicer_context):
    """Yield the empty response ``test_constants.STREAM_LENGTH`` times.

    Neither ``request`` nor ``servicer_context`` is inspected.
    """
    responses_left = test_constants.STREAM_LENGTH
    while responses_left > 0:
        yield _RESPONSE
        responses_left -= 1
39 
40 
def handle_stream_unary(request_iterator, servicer_context):
    """Consume every incoming request, then return the empty response.

    The requests themselves are discarded; ``servicer_context`` is unused.
    """
    # Exhaust the request iterator before replying.
    for _discarded in request_iterator:
        pass
    return _RESPONSE
45 
46 
def handle_stream_stream(request_iterator, servicer_context):
    """Yield one empty response for each request pulled from the iterator.

    The request contents are ignored; ``servicer_context`` is unused.
    """
    for _discarded in request_iterator:
        yield _RESPONSE
50 
51 
53 
class _MethodHandler(grpc.RpcMethodHandler):
    """Method handler exposing exactly one behavior, chosen by streaming flags.

    No request deserializer or response serializer is installed (both
    attributes are None).
    """

    def __init__(self, request_streaming, response_streaming):
        """Select the handler function matching the given arity.

        Args:
            request_streaming: Truthy if the request side is a stream.
            response_streaming: Truthy if the response side is a stream.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = None
        self.response_serializer = None
        # Start with every behavior slot empty; exactly one is filled below.
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        if self.request_streaming and self.response_streaming:
            self.stream_stream = handle_stream_stream
        elif self.request_streaming:
            self.stream_unary = handle_stream_unary
        elif self.response_streaming:
            self.unary_stream = handle_unary_stream
        else:
            self.unary_unary = handle_unary_unary
71 
72 
74 
class _GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that routes the four test method paths."""

    def service(self, handler_call_details):
        """Return the method handler for the called method, if it is known.

        Args:
            handler_call_details: Call details; only its ``method``
                attribute is read.

        Returns:
            A ``_MethodHandler`` configured for the matching arity, or None
            when ``handler_call_details.method`` is not one of the four
            test method paths.
        """
        method = handler_call_details.method
        if method == _UNARY_UNARY:
            return _MethodHandler(False, False)
        if method == _UNARY_STREAM:
            return _MethodHandler(False, True)
        if method == _STREAM_UNARY:
            return _MethodHandler(True, False)
        if method == _STREAM_STREAM:
            return _MethodHandler(True, True)
        return None
86 
87 
class EmptyMessageTest(unittest.TestCase):
    """Sends zero-length messages through each of the four RPC arities."""

    def setUp(self):
        """Start a server on a server-chosen port and open a channel to it."""
        self._server = test_common.test_server()
        self._server.add_generic_rpc_handlers((_GenericHandler(),))
        # Port 0 lets add_insecure_port pick the port; it returns the choice.
        bound_port = self._server.add_insecure_port('[::]:0')
        self._server.start()
        self._channel = grpc.insecure_channel('localhost:%d' % bound_port)

    def tearDown(self):
        """Stop the server with a zero grace period, then close the channel."""
        self._server.stop(0)
        self._channel.close()

    def testUnaryUnary(self):
        """A single empty request yields a single empty response."""
        multi_callable = self._channel.unary_unary(_UNARY_UNARY)
        response = multi_callable(_REQUEST)
        self.assertEqual(_RESPONSE, response)

    def testUnaryStream(self):
        """A single empty request yields STREAM_LENGTH empty responses."""
        multi_callable = self._channel.unary_stream(_UNARY_STREAM)
        response_iterator = multi_callable(_REQUEST)
        expected_responses = [_RESPONSE] * test_constants.STREAM_LENGTH
        self.assertSequenceEqual(expected_responses, list(response_iterator))

    def testStreamUnary(self):
        """STREAM_LENGTH empty requests yield a single empty response."""
        multi_callable = self._channel.stream_unary(_STREAM_UNARY)
        requests = iter([_REQUEST] * test_constants.STREAM_LENGTH)
        response = multi_callable(requests)
        self.assertEqual(_RESPONSE, response)

    def testStreamStream(self):
        """STREAM_LENGTH empty requests yield STREAM_LENGTH empty responses."""
        multi_callable = self._channel.stream_stream(_STREAM_STREAM)
        requests = iter([_REQUEST] * test_constants.STREAM_LENGTH)
        response_iterator = multi_callable(requests)
        expected_responses = [_RESPONSE] * test_constants.STREAM_LENGTH
        self.assertSequenceEqual(expected_responses, list(response_iterator))
120 
121 
if __name__ == '__main__':
    # Set up default logging configuration before handing control to the
    # unittest runner, which is asked for verbose per-test output.
    logging.basicConfig()
    unittest.main(verbosity=2)
tests.unit._empty_message_test.EmptyMessageTest.testUnaryUnary
def testUnaryUnary(self)
Definition: _empty_message_test.py:101
tests.unit._empty_message_test._GenericHandler
Definition: _empty_message_test.py:73
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
tests.unit._empty_message_test.EmptyMessageTest.testStreamUnary
def testStreamUnary(self)
Definition: _empty_message_test.py:110
tests.unit._empty_message_test.handle_unary_unary
def handle_unary_unary(request, servicer_context)
Definition: _empty_message_test.py:32
tests.unit._empty_message_test._MethodHandler.response_serializer
response_serializer
Definition: _empty_message_test.py:58
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests.unit._empty_message_test._MethodHandler.stream_stream
stream_stream
Definition: _empty_message_test.py:62
grpc._simple_stubs.unary_stream
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:250
tests.unit._empty_message_test.EmptyMessageTest
Definition: _empty_message_test.py:88
tests.unit._empty_message_test._MethodHandler.request_deserializer
request_deserializer
Definition: _empty_message_test.py:57
tests.unit._empty_message_test._MethodHandler.request_streaming
request_streaming
Definition: _empty_message_test.py:55
tests.unit._empty_message_test._GenericHandler.service
def service(self, handler_call_details)
Definition: _empty_message_test.py:75
start
static uint64_t start
Definition: benchmark-pound.c:74
tests.unit._empty_message_test.EmptyMessageTest.setUp
def setUp(self)
Definition: _empty_message_test.py:90
grpc._simple_stubs.stream_stream
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:410
tests.unit._empty_message_test.EmptyMessageTest.testUnaryStream
def testUnaryStream(self)
Definition: _empty_message_test.py:105
grpc.GenericRpcHandler
Definition: src/python/grpcio/grpc/__init__.py:1333
tests.unit._empty_message_test._MethodHandler
Definition: _empty_message_test.py:52
tests.unit._empty_message_test._MethodHandler.response_streaming
response_streaming
Definition: _empty_message_test.py:56
close
#define close
Definition: test-fs.c:48
tests.unit._empty_message_test.handle_stream_unary
def handle_stream_unary(request_iterator, servicer_context)
Definition: _empty_message_test.py:41
tests.unit._empty_message_test.EmptyMessageTest.tearDown
def tearDown(self)
Definition: _empty_message_test.py:97
grpc._simple_stubs.stream_unary
ResponseType stream_unary(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:330
tests.unit._empty_message_test.handle_unary_stream
def handle_unary_stream(request, servicer_context)
Definition: _empty_message_test.py:36
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
tests.unit
Definition: src/python/grpcio_tests/tests/unit/__init__.py:1
tests.unit._empty_message_test._MethodHandler.unary_stream
unary_stream
Definition: _empty_message_test.py:60
tests.unit._empty_message_test.handle_stream_stream
def handle_stream_stream(request_iterator, servicer_context)
Definition: _empty_message_test.py:47
tests.unit._empty_message_test.EmptyMessageTest._server
_server
Definition: _empty_message_test.py:91
tests.unit._empty_message_test._MethodHandler.stream_unary
stream_unary
Definition: _empty_message_test.py:61
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
iter
Definition: test_winkernel.cpp:47
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
tests.unit._empty_message_test._MethodHandler.__init__
def __init__(self, request_streaming, response_streaming)
Definition: _empty_message_test.py:54
tests.unit._empty_message_test._MethodHandler.unary_unary
unary_unary
Definition: _empty_message_test.py:59
grpc.RpcMethodHandler
Definition: src/python/grpcio/grpc/__init__.py:1288
tests.unit._empty_message_test.EmptyMessageTest._channel
_channel
Definition: _empty_message_test.py:95
tests.unit._empty_message_test.EmptyMessageTest.testStreamStream
def testStreamStream(self)
Definition: _empty_message_test.py:115


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:27