channel_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior of the grpc.aio.Channel class."""
15 
16 import logging
17 import os
18 import unittest
19 
20 import grpc
21 from grpc.experimental import aio
22 
23 from src.proto.grpc.testing import messages_pb2
24 from src.proto.grpc.testing import test_pb2_grpc
25 from tests.unit.framework.common import test_constants
26 from tests_aio.unit._constants import UNARY_CALL_WITH_SLEEP_VALUE
27 from tests_aio.unit._constants import UNREACHABLE_TARGET
28 from tests_aio.unit._test_base import AioTestBase
29 from tests_aio.unit._test_server import start_test_server
30 
31 _UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall'
32 _UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep'
33 _STREAMING_OUTPUT_CALL_METHOD = '/grpc.testing.TestService/StreamingOutputCall'
34 
35 _INVOCATION_METADATA = (
36  ('x-grpc-test-echo-initial', 'initial-md-value'),
37  ('x-grpc-test-echo-trailing-bin', b'\x00\x02'),
38 )
39 
40 _NUM_STREAM_RESPONSES = 5
41 _REQUEST_PAYLOAD_SIZE = 7
42 _RESPONSE_PAYLOAD_SIZE = 42
43 
44 
46 
47  async def setUp(self):
48  self._server_target, self._server = await start_test_server()
49 
50  async def tearDown(self):
51  await self._server.stop(None)
52 
53  async def test_async_context(self):
54  async with aio.insecure_channel(self._server_target) as channel:
55  hi = channel.unary_unary(
56  _UNARY_CALL_METHOD,
57  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
58  response_deserializer=messages_pb2.SimpleResponse.FromString)
59  await hi(messages_pb2.SimpleRequest())
60 
61  async def test_unary_unary(self):
62  async with aio.insecure_channel(self._server_target) as channel:
63  hi = channel.unary_unary(
64  _UNARY_CALL_METHOD,
65  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
66  response_deserializer=messages_pb2.SimpleResponse.FromString)
67  response = await hi(messages_pb2.SimpleRequest())
68 
69  self.assertIsInstance(response, messages_pb2.SimpleResponse)
70 
71  async def test_unary_call_times_out(self):
72  async with aio.insecure_channel(self._server_target) as channel:
73  hi = channel.unary_unary(
74  _UNARY_CALL_METHOD_WITH_SLEEP,
75  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
76  response_deserializer=messages_pb2.SimpleResponse.FromString,
77  )
78 
79  with self.assertRaises(grpc.RpcError) as exception_context:
80  await hi(messages_pb2.SimpleRequest(),
81  timeout=UNARY_CALL_WITH_SLEEP_VALUE / 2)
82 
83  _, details = grpc.StatusCode.DEADLINE_EXCEEDED.value # pylint: disable=unused-variable
84  self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
85  exception_context.exception.code())
86  self.assertEqual(details.title(),
87  exception_context.exception.details())
88  self.assertIsNotNone(exception_context.exception.initial_metadata())
89  self.assertIsNotNone(
90  exception_context.exception.trailing_metadata())
91 
92  @unittest.skipIf(os.name == 'nt',
93  'TODO: https://github.com/grpc/grpc/issues/21658')
94  async def test_unary_call_does_not_times_out(self):
95  async with aio.insecure_channel(self._server_target) as channel:
96  hi = channel.unary_unary(
97  _UNARY_CALL_METHOD_WITH_SLEEP,
98  request_serializer=messages_pb2.SimpleRequest.SerializeToString,
99  response_deserializer=messages_pb2.SimpleResponse.FromString,
100  )
101 
102  call = hi(messages_pb2.SimpleRequest(),
103  timeout=UNARY_CALL_WITH_SLEEP_VALUE * 5)
104  self.assertEqual(await call.code(), grpc.StatusCode.OK)
105 
106  async def test_unary_stream(self):
107  channel = aio.insecure_channel(self._server_target)
108  stub = test_pb2_grpc.TestServiceStub(channel)
109 
110  # Prepares the request
112  for _ in range(_NUM_STREAM_RESPONSES):
113  request.response_parameters.append(
114  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
115 
116  # Invokes the actual RPC
117  call = stub.StreamingOutputCall(request)
118 
119  # Validates the responses
120  response_cnt = 0
121  async for response in call:
122  response_cnt += 1
123  self.assertIs(type(response),
124  messages_pb2.StreamingOutputCallResponse)
125  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
126 
127  self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
128  self.assertEqual(await call.code(), grpc.StatusCode.OK)
129  await channel.close()
130 
131  async def test_stream_unary_using_write(self):
132  channel = aio.insecure_channel(self._server_target)
133  stub = test_pb2_grpc.TestServiceStub(channel)
134 
135  # Invokes the actual RPC
136  call = stub.StreamingInputCall()
137 
138  # Prepares the request
139  payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
140  request = messages_pb2.StreamingInputCallRequest(payload=payload)
141 
142  # Sends out requests
143  for _ in range(_NUM_STREAM_RESPONSES):
144  await call.write(request)
145  await call.done_writing()
146 
147  # Validates the responses
148  response = await call
149  self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
150  self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
151  response.aggregated_payload_size)
152 
153  self.assertEqual(await call.code(), grpc.StatusCode.OK)
154  await channel.close()
155 
156  async def test_stream_unary_using_async_gen(self):
157  channel = aio.insecure_channel(self._server_target)
158  stub = test_pb2_grpc.TestServiceStub(channel)
159 
160  # Prepares the request
161  payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
162  request = messages_pb2.StreamingInputCallRequest(payload=payload)
163 
164  async def gen():
165  for _ in range(_NUM_STREAM_RESPONSES):
166  yield request
167 
168  # Invokes the actual RPC
169  call = stub.StreamingInputCall(gen())
170 
171  # Validates the responses
172  response = await call
173  self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
174  self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
175  response.aggregated_payload_size)
176 
177  self.assertEqual(await call.code(), grpc.StatusCode.OK)
178  await channel.close()
179 
180  async def test_stream_stream_using_read_write(self):
181  channel = aio.insecure_channel(self._server_target)
182  stub = test_pb2_grpc.TestServiceStub(channel)
183 
184  # Invokes the actual RPC
185  call = stub.FullDuplexCall()
186 
187  # Prepares the request
189  request.response_parameters.append(
190  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
191 
192  for _ in range(_NUM_STREAM_RESPONSES):
193  await call.write(request)
194  response = await call.read()
195  self.assertIsInstance(response,
196  messages_pb2.StreamingOutputCallResponse)
197  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
198 
199  await call.done_writing()
200 
201  self.assertEqual(grpc.StatusCode.OK, await call.code())
202  await channel.close()
203 
204  async def test_stream_stream_using_async_gen(self):
205  channel = aio.insecure_channel(self._server_target)
206  stub = test_pb2_grpc.TestServiceStub(channel)
207 
208  # Prepares the request
210  request.response_parameters.append(
211  messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
212 
213  async def gen():
214  for _ in range(_NUM_STREAM_RESPONSES):
215  yield request
216 
217  # Invokes the actual RPC
218  call = stub.FullDuplexCall(gen())
219 
220  async for response in call:
221  self.assertIsInstance(response,
222  messages_pb2.StreamingOutputCallResponse)
223  self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
224 
225  self.assertEqual(grpc.StatusCode.OK, await call.code())
226  await channel.close()
227 
228 
229 if __name__ == '__main__':
230  logging.basicConfig(level=logging.DEBUG)
231  unittest.main(verbosity=2)
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
grpc.RpcError
Definition: src/python/grpcio/grpc/__init__.py:302
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.channel_test.TestChannel
Definition: channel_test.py:45
gen
OPENSSL_EXPORT GENERAL_NAME * gen
Definition: x509v3.h:495
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
tests_aio.unit.channel_test.TestChannel.setUp
def setUp(self)
Definition: channel_test.py:47
tests_aio.unit.channel_test.TestChannel._server
_server
Definition: channel_test.py:48
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests.unit.framework.common
Definition: src/python/grpcio_tests/tests/unit/framework/common/__init__.py:1
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit._constants
Definition: _constants.py:1
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:44