wait_for_ready_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Testing the wait_for_ready mechanism."""
15 
16 import asyncio
17 import gc
18 import logging
19 import platform
20 import time
21 import unittest
22 
23 import grpc
24 from grpc.experimental import aio
25 
26 from src.proto.grpc.testing import messages_pb2
27 from src.proto.grpc.testing import test_pb2_grpc
28 from tests.unit.framework.common import get_socket
29 from tests.unit.framework.common import test_constants
30 from tests_aio.unit import _common
31 from tests_aio.unit._test_base import AioTestBase
32 from tests_aio.unit._test_server import start_test_server
33 
# Number of messages exchanged per streaming RPC in the helpers below.
34 _NUM_STREAM_RESPONSES = 5
# Length, in bytes, of the zero-filled request payload body.
35 _REQUEST_PAYLOAD_SIZE = 7
# Response size requested through ResponseParameters(size=...).
36 _RESPONSE_PAYLOAD_SIZE = 42
37 
38 
async def _perform_unary_unary(stub, wait_for_ready):
    """Issue a single UnaryCall on ``stub`` and await its completion.

    ``wait_for_ready`` is forwarded unchanged to the call, together with the
    long test timeout.
    """
    request = messages_pb2.SimpleRequest()
    call_options = {
        'timeout': test_constants.LONG_TIMEOUT,
        'wait_for_ready': wait_for_ready,
    }
    await stub.UnaryCall(request, **call_options)
43 
44 
async def _perform_unary_stream(stub, wait_for_ready):
    """Run one StreamingOutputCall and read every requested response.

    The request asks for ``_NUM_STREAM_RESPONSES`` responses of
    ``_RESPONSE_PAYLOAD_SIZE`` each; the same number of messages is then read
    from the call and the final status is asserted to be ``OK``.

    ``wait_for_ready`` is forwarded unchanged to the call.
    """
    # The request object must exist before response parameters are appended.
    request = messages_pb2.StreamingOutputCallRequest()
    for _ in range(_NUM_STREAM_RESPONSES):
        request.response_parameters.append(
            messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))

    call = stub.StreamingOutputCall(request,
                                    timeout=test_constants.LONG_TIMEOUT,
                                    wait_for_ready=wait_for_ready)

    for _ in range(_NUM_STREAM_RESPONSES):
        await call.read()
    assert await call.code() == grpc.StatusCode.OK
58 
59 
async def _perform_stream_unary(stub, wait_for_ready):
    """Send a fixed number of identical requests through StreamingInputCall.

    The same request, carrying a zero-filled payload of
    ``_REQUEST_PAYLOAD_SIZE`` bytes, is yielded ``_NUM_STREAM_RESPONSES``
    times; the call is then awaited. ``wait_for_ready`` is forwarded
    unchanged to the call.
    """
    request = messages_pb2.StreamingInputCallRequest(
        payload=messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE))

    async def request_iterator():
        remaining = _NUM_STREAM_RESPONSES
        while remaining > 0:
            yield request
            remaining -= 1

    await stub.StreamingInputCall(
        request_iterator(),
        timeout=test_constants.LONG_TIMEOUT,
        wait_for_ready=wait_for_ready,
    )
71 
72 
async def _perform_stream_stream(stub, wait_for_ready):
    """Run a FullDuplexCall with alternating writes and reads.

    For each of ``_NUM_STREAM_RESPONSES`` rounds, one request is written and
    one response is read; the response body length is asserted to equal
    ``_RESPONSE_PAYLOAD_SIZE``. After the writes are finished the final status
    is asserted to be ``OK``.

    ``wait_for_ready`` is forwarded unchanged to the call.
    """
    call = stub.FullDuplexCall(timeout=test_constants.LONG_TIMEOUT,
                               wait_for_ready=wait_for_ready)

    # The request object must exist before response parameters are appended.
    request = messages_pb2.StreamingOutputCallRequest()
    request.response_parameters.append(
        messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))

    for _ in range(_NUM_STREAM_RESPONSES):
        await call.write(request)
        response = await call.read()
        assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)

    await call.done_writing()
    assert await call.code() == grpc.StatusCode.OK
88 
89 
# The RPC helper coroutines that the test cases iterate over; each one is
# called as action(stub, wait_for_ready).
90 _RPC_ACTIONS = (
91  _perform_unary_unary,
92  _perform_unary_stream,
93  _perform_stream_unary,
94  _perform_stream_stream,
95 )
96 
97 
99 
class TestWaitForReady(AioTestBase):
    """Tests for the ``wait_for_ready`` call option.

    The channel under test is created in ``setUp`` before any server is
    started on its port; individual tests decide whether a server is started.
    """

    async def setUp(self):
        # Obtain an address/port pair without listening on it, build the
        # channel and stub against that target, then close the socket. Only
        # the port number is kept for a later start_test_server() call.
        address, self._port, self._socket = get_socket(listen=False)
        self._channel = aio.insecure_channel(f"{address}:{self._port}")
        self._stub = test_pb2_grpc.TestServiceStub(self._channel)
        self._socket.close()

    async def tearDown(self):
        await self._channel.close()

    async def _connection_fails_fast(self, wait_for_ready):
        """Assert every RPC action raises ``AioRpcError`` with UNAVAILABLE.

        Args:
            wait_for_ready: value forwarded to each RPC action.
        """
        for action in _RPC_ACTIONS:
            # Label subtests by function name, matching the enabled test.
            with self.subTest(name=action.__name__):
                with self.assertRaises(aio.AioRpcError) as exception_context:
                    await action(self._stub, wait_for_ready)
                rpc_error = exception_context.exception
                self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())

    async def test_call_wait_for_ready_default(self):
        """RPC should fail immediately after connection failed."""
        await self._connection_fails_fast(None)

    async def test_call_wait_for_ready_disabled(self):
        """RPC should fail immediately after connection failed."""
        await self._connection_fails_fast(False)

    @unittest.skipIf(platform.system() == 'Windows',
                     'https://github.com/grpc/grpc/pull/26729')
    async def test_call_wait_for_ready_enabled(self):
        """RPC will wait until the connection is ready."""
        for action in _RPC_ACTIONS:
            with self.subTest(name=action.__name__):
                # Starts the RPC
                action_task = self.loop.create_task(action(self._stub, True))

                # Wait for TRANSIENT_FAILURE, and RPC is not aborting
                await _common.block_until_certain_state(
                    self._channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE)

                # Reset on every iteration: if start_test_server() raises,
                # the finally clause must not hit an unbound name or stop a
                # server left over from a previous iteration.
                server = None
                try:
                    # Start the server
                    _, server = await start_test_server(port=self._port)

                    # The RPC should recover itself
                    await action_task
                finally:
                    if server is not None:
                        await server.stop(None)
147 
148 
# When run as a script: enable DEBUG-level logging, then run the tests with
# verbose output.
149 if __name__ == '__main__':
150  logging.basicConfig(level=logging.DEBUG)
151  unittest.main(verbosity=2)
tests_aio.unit.wait_for_ready_test._perform_stream_unary
def _perform_stream_unary(stub, wait_for_ready)
Definition: wait_for_ready_test.py:60
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
tests_aio.unit
Definition: src/python/grpcio_tests/tests_aio/unit/__init__.py:1
tests_aio.unit._test_base.AioTestBase.loop
def loop(self)
Definition: _test_base.py:55
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit.wait_for_ready_test.TestWaitForReady._channel
_channel
Definition: wait_for_ready_test.py:102
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.wait_for_ready_test.TestWaitForReady._socket
_socket
Definition: wait_for_ready_test.py:101
tests_aio.unit.wait_for_ready_test._perform_unary_unary
def _perform_unary_unary(stub, wait_for_ready)
Definition: wait_for_ready_test.py:39
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests.unit.framework.common.get_socket
def get_socket(bind_address='localhost', port=0, listen=True, sock_options=_DEFAULT_SOCK_OPTIONS)
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:26
close
#define close
Definition: test-fs.c:48
gen
OPENSSL_EXPORT GENERAL_NAME * gen
Definition: x509v3.h:495
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests_aio.unit.wait_for_ready_test.TestWaitForReady._stub
_stub
Definition: wait_for_ready_test.py:103
client.action
action
Definition: examples/python/xds/client.py:49
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
tests_aio.unit.wait_for_ready_test._perform_unary_stream
def _perform_unary_stream(stub, wait_for_ready)
Definition: wait_for_ready_test.py:45
tests_aio.unit.wait_for_ready_test.TestWaitForReady
Definition: wait_for_ready_test.py:98
tests_aio.unit.wait_for_ready_test.TestWaitForReady.setUp
def setUp(self)
Definition: wait_for_ready_test.py:100
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit.wait_for_ready_test._perform_stream_stream
def _perform_stream_stream(stub, wait_for_ready)
Definition: wait_for_ready_test.py:73
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:51