wait_for_connection_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior of the wait for connection API on client side."""
15 
16 import asyncio
17 import datetime
18 import logging
19 from typing import Callable, Tuple
20 import unittest
21 
22 import grpc
23 from grpc.experimental import aio
24 
25 from src.proto.grpc.testing import messages_pb2
26 from src.proto.grpc.testing import test_pb2_grpc
27 from tests_aio.unit import _common
28 from tests_aio.unit._constants import UNREACHABLE_TARGET
29 from tests_aio.unit._test_base import AioTestBase
30 from tests_aio.unit._test_server import start_test_server
31 
32 _REQUEST = b'\x01\x02\x03'
33 _TEST_METHOD = '/test/Test'
34 
35 _NUM_STREAM_RESPONSES = 5
36 _REQUEST_PAYLOAD_SIZE = 7
37 _RESPONSE_PAYLOAD_SIZE = 42
38 
39 
41  """Tests if wait_for_connection raises connectivity issue."""
42 
43  async def setUp(self):
44  address, self._server = await start_test_server()
45  self._channel = aio.insecure_channel(address)
46  self._phony_channel = aio.insecure_channel(UNREACHABLE_TARGET)
47  self._stub = test_pb2_grpc.TestServiceStub(self._channel)
48 
49  async def tearDown(self):
50  await self._phony_channel.close()
51  await self._channel.close()
52  await self._server.stop(None)
53 
54  async def test_unary_unary_ok(self):
55  call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
56 
57  # No exception raised and no message swallowed.
58  await call.wait_for_connection()
59 
60  response = await call
61  self.assertIsInstance(response, messages_pb2.SimpleResponse)
62 
63  async def test_unary_stream_ok(self):
65  for _ in range(_NUM_STREAM_RESPONSES):
66  request.response_parameters.append(
67  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
68 
69  call = self._stub.StreamingOutputCall(request)
70 
71  # No exception raised and no message swallowed.
72  await call.wait_for_connection()
73 
74  response_cnt = 0
75  async for response in call:
76  response_cnt += 1
77  self.assertIs(type(response),
78  messages_pb2.StreamingOutputCallResponse)
79  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
80 
81  self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
82  self.assertEqual(await call.code(), grpc.StatusCode.OK)
83 
84  async def test_stream_unary_ok(self):
85  call = self._stub.StreamingInputCall()
86 
87  # No exception raised and no message swallowed.
88  await call.wait_for_connection()
89 
90  payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
91  request = messages_pb2.StreamingInputCallRequest(payload=payload)
92 
93  for _ in range(_NUM_STREAM_RESPONSES):
94  await call.write(request)
95  await call.done_writing()
96 
97  response = await call
98  self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
99  self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
100  response.aggregated_payload_size)
101 
102  self.assertEqual(await call.code(), grpc.StatusCode.OK)
103 
104  async def test_stream_stream_ok(self):
105  call = self._stub.FullDuplexCall()
106 
107  # No exception raised and no message swallowed.
108  await call.wait_for_connection()
109 
111  request.response_parameters.append(
112  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
113 
114  for _ in range(_NUM_STREAM_RESPONSES):
115  await call.write(request)
116  response = await call.read()
117  self.assertIsInstance(response,
118  messages_pb2.StreamingOutputCallResponse)
119  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
120 
121  await call.done_writing()
122 
123  self.assertEqual(grpc.StatusCode.OK, await call.code())
124 
125  async def test_unary_unary_error(self):
126  call = self._phony_channel.unary_unary(_TEST_METHOD)(_REQUEST)
127 
128  with self.assertRaises(aio.AioRpcError) as exception_context:
129  await call.wait_for_connection()
130  rpc_error = exception_context.exception
131  self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
132 
133  async def test_unary_stream_error(self):
134  call = self._phony_channel.unary_stream(_TEST_METHOD)(_REQUEST)
135 
136  with self.assertRaises(aio.AioRpcError) as exception_context:
137  await call.wait_for_connection()
138  rpc_error = exception_context.exception
139  self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
140 
141  async def test_stream_unary_error(self):
142  call = self._phony_channel.stream_unary(_TEST_METHOD)()
143 
144  with self.assertRaises(aio.AioRpcError) as exception_context:
145  await call.wait_for_connection()
146  rpc_error = exception_context.exception
147  self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
148 
149  async def test_stream_stream_error(self):
150  call = self._phony_channel.stream_stream(_TEST_METHOD)()
151 
152  with self.assertRaises(aio.AioRpcError) as exception_context:
153  await call.wait_for_connection()
154  rpc_error = exception_context.exception
155  self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
156 
157 
158 if __name__ == '__main__':
159  logging.basicConfig(level=logging.DEBUG)
160  unittest.main(verbosity=2)
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit.wait_for_connection_test.TestWaitForConnection.setUp
def setUp(self)
Definition: wait_for_connection_test.py:43
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
grpc._simple_stubs.unary_stream
Iterator[ResponseType] unary_stream(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:250
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.wait_for_connection_test.TestWaitForConnection
Definition: wait_for_connection_test.py:40
grpc::experimental
Definition: include/grpcpp/channel.h:46
grpc._simple_stubs.stream_stream
Iterator[ResponseType] stream_stream(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:410
tests_aio.unit.wait_for_connection_test.TestWaitForConnection._channel
_channel
Definition: wait_for_connection_test.py:45
close
#define close
Definition: test-fs.c:48
tests_aio.unit.wait_for_connection_test.TestWaitForConnection._server
_server
Definition: wait_for_connection_test.py:44
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
grpc._simple_stubs.stream_unary
ResponseType stream_unary(Iterator[RequestType] request_iterator, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:330
grpc._simple_stubs.unary_unary
ResponseType unary_unary(RequestType request, str target, str method, Optional[Callable[[Any], bytes]] request_serializer=None, Optional[Callable[[bytes], Any]] response_deserializer=None, Sequence[Tuple[AnyStr, AnyStr]] options=(), Optional[grpc.ChannelCredentials] channel_credentials=None, bool insecure=False, Optional[grpc.CallCredentials] call_credentials=None, Optional[grpc.Compression] compression=None, Optional[bool] wait_for_ready=None, Optional[float] timeout=_DEFAULT_TIMEOUT, Optional[Sequence[Tuple[str, Union[str, bytes]]]] metadata=None)
Definition: _simple_stubs.py:169
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.unit.wait_for_connection_test.TestWaitForConnection._phony_channel
_phony_channel
Definition: wait_for_connection_test.py:46
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit.wait_for_connection_test.TestWaitForConnection._stub
_stub
Definition: wait_for_connection_test.py:47
tests_aio.unit._constants
Definition: _constants.py:1
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:51