channel_argument_test.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests behavior around the Core channel arguments."""
15 
16 import asyncio
17 import errno
18 import logging
19 import platform
20 import random
21 import unittest
22 
23 import grpc
24 from grpc.experimental import aio
25 
26 from src.proto.grpc.testing import messages_pb2
27 from src.proto.grpc.testing import test_pb2_grpc
28 from tests.unit.framework import common
29 from tests_aio.unit._test_base import AioTestBase
30 from tests_aio.unit._test_server import start_test_server
31 
# Seed handed to random.seed() so the randomly picked option set is repeatable.
_RANDOM_SEED = 42

# Each entry of _OPTIONS pairs a human-readable label with the server options
# that the label describes.
_ENABLE_REUSE_PORT = 'SO_REUSEPORT enabled'
_DISABLE_REUSE_PORT = 'SO_REUSEPORT disabled'
_SOCKET_OPT_SO_REUSEPORT = 'grpc.so_reuseport'
_OPTIONS = (
    (_ENABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 1),)),
    (_DISABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 0),)),
)

# Number of servers started concurrently by the SO_REUSEPORT test.
_NUM_SERVER_CREATED = 5

# Channel argument name and value used to cap the inbound message size.
_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH = 'grpc.max_receive_message_length'
_MAX_MESSAGE_LENGTH = 1024

# OSError errno values treated as "the port could not be bound a second time".
_ADDRESS_TOKEN_ERRNO = (errno.EADDRINUSE, errno.ENOSR)
48 
49 
class _TestPointerWrapper:
    """Object whose only behaviour is converting to a fixed integer.

    Used as a channel-argument value that is neither ``str``, ``bytes`` nor
    ``int`` but still supports ``int()``.
    """

    def __int__(self):
        """Return the constant integer this wrapper stands for."""
        return 123456
54 
55 
# Well-formed (key, value) channel arguments: str and bytes keys combined with
# bytes, str, int and int-convertible values.
_TEST_CHANNEL_ARGS = (
    ('arg1', b'bytes_val'),
    ('arg2', 'str_val'),
    ('arg3', 1),
    (b'arg4', 'str_val'),
    ('arg6', _TestPointerWrapper()),
)

# Malformed ``options`` values: a dict, a tuple holding a one-element tuple
# (key without value), and a bare string.
_INVALID_TEST_CHANNEL_ARGS = [
    {'foo': 'bar'},
    (('key',),),
    'str',
]
71 
72 
async def test_if_reuse_port_enabled(server: aio.Server):
    """Report whether the port of a started server can be bound a second time.

    Adds an insecure port on ``localhost`` to ``server``, starts it, and then
    tries to obtain the same port through ``common.bound_socket``.

    Args:
        server: A not-yet-started aio server; it is started here and left
            running for the caller to stop.

    Returns:
        True if the second bind succeeded, False if it failed with one of the
        errno values in ``_ADDRESS_TOKEN_ERRNO``.

    Raises:
        OSError: Binding failed with any other errno (logged before re-raise).
        AssertionError: The helper yielded a port different from the server's.
    """
    port = server.add_insecure_port('localhost:0')
    await server.start()

    try:
        with common.bound_socket(
                bind_address='localhost',
                port=port,
                listen=False,
        ) as (_unused_host, rebound_port):
            assert rebound_port == port
    except OSError as bind_error:
        if bind_error.errno not in _ADDRESS_TOKEN_ERRNO:
            # Unexpected failure: record it and let the caller see it.
            logging.exception(bind_error)
            raise
        return False
    return True
92 
93 
95 
class TestChannelArgument(AioTestBase):
    """Tests how aio channels and servers handle Core channel arguments."""

    async def setUp(self):
        # Fixed seed keeps the random.choice() picks below repeatable.
        random.seed(_RANDOM_SEED)

    @unittest.skipIf(platform.system() == 'Windows',
                     'SO_REUSEPORT only available in Linux-like OS.')
    @unittest.skipIf('aarch64' in platform.machine(),
                     'SO_REUSEPORT needs to be enabled in Core\'s port.h.')
    async def test_server_so_reuse_port_is_set_properly(self):
        """Servers honour the ``grpc.so_reuseport`` option they were given."""

        async def test_body():
            # Pick "enabled" or "disabled" at random and check that the
            # observed socket behaviour matches the chosen option.
            fact, options = random.choice(_OPTIONS)
            server = aio.server(options=options)
            try:
                result = await test_if_reuse_port_enabled(server)
                if fact == _ENABLE_REUSE_PORT and not result:
                    self.fail(
                        'Enabled reuse port in options, but not observed in socket'
                    )
                elif fact == _DISABLE_REUSE_PORT and result:
                    self.fail(
                        'Disabled reuse port in options, but observed in socket'
                    )
            finally:
                await server.stop(None)

        # Creating a lot of servers concurrently
        await asyncio.gather(*(test_body() for _ in range(_NUM_SERVER_CREATED)))

    async def test_client(self):
        """A channel can be created with the valid test args and closed."""
        # Do not segfault, or raise exception!
        channel = aio.insecure_channel('[::]:0', options=_TEST_CHANNEL_ARGS)
        await channel.close()

    async def test_server(self):
        """A server can be created with the valid test args and stopped."""
        # Do not segfault, or raise exception!
        server = aio.server(options=_TEST_CHANNEL_ARGS)
        await server.stop(None)

    async def test_invalid_client_args(self):
        """Every malformed ``options`` value raises ValueError or TypeError."""
        for invalid_arg in _INVALID_TEST_CHANNEL_ARGS:
            self.assertRaises((ValueError, TypeError),
                              aio.insecure_channel,
                              '[::]:0',
                              options=invalid_arg)

    async def test_max_message_length_applied(self):
        """Responses above ``grpc.max_receive_message_length`` fail the call."""
        address, server = await start_test_server()

        # Stop the server even when an assertion below fails, so a failing
        # run does not leave it listening.
        try:
            async with aio.insecure_channel(
                    address,
                    options=((_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
                              _MAX_MESSAGE_LENGTH),)) as channel:
                stub = test_pb2_grpc.TestServiceStub(channel)

                request = messages_pb2.StreamingOutputCallRequest()
                # First request will pass
                request.response_parameters.append(
                    messages_pb2.ResponseParameters(
                        size=_MAX_MESSAGE_LENGTH // 2,))
                # Second request should fail
                request.response_parameters.append(
                    messages_pb2.ResponseParameters(
                        size=_MAX_MESSAGE_LENGTH * 2,))

                call = stub.StreamingOutputCall(request)

                response = await call.read()
                self.assertEqual(_MAX_MESSAGE_LENGTH // 2,
                                 len(response.payload.body))

                with self.assertRaises(aio.AioRpcError) as exception_context:
                    await call.read()
                rpc_error = exception_context.exception
                self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED,
                                 rpc_error.code())
                self.assertIn(str(_MAX_MESSAGE_LENGTH), rpc_error.details())

                self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await
                                 call.code())
        finally:
            await server.stop(None)
175 
176 
# Script entry point: turn on debug logging, then run the tests verbosely.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
tests.unit.framework
Definition: src/python/grpcio_tests/tests/unit/framework/__init__.py:1
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.unit.channel_argument_test._TestPointerWrapper.__int__
def __int__(self)
Definition: channel_argument_test.py:52
tests_aio.unit.channel_argument_test.test_if_reuse_port_enabled
def test_if_reuse_port_enabled(server: aio.Server)
Definition: channel_argument_test.py:73
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.unit.channel_argument_test.TestChannelArgument.test_client
def test_client(self)
Definition: channel_argument_test.py:124
tests_aio.unit.channel_argument_test.TestChannelArgument.test_invalid_client_args
def test_invalid_client_args(self)
Definition: channel_argument_test.py:134
messages_pb2.ResponseParameters
ResponseParameters
Definition: messages_pb2.py:625
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
tests_aio.unit.channel_argument_test.TestChannelArgument.test_max_message_length_applied
def test_max_message_length_applied(self)
Definition: channel_argument_test.py:141
tests_aio.unit.channel_argument_test._TestPointerWrapper
Definition: channel_argument_test.py:50
tests_aio.unit.channel_argument_test.TestChannelArgument.test_server_so_reuse_port_is_set_properly
def test_server_so_reuse_port_is_set_properly(self)
Definition: channel_argument_test.py:103
tests_aio.unit.channel_argument_test.TestChannelArgument.setUp
def setUp(self)
Definition: channel_argument_test.py:96
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
tests_aio.unit.channel_argument_test.TestChannelArgument.test_server
def test_server(self)
Definition: channel_argument_test.py:129
tests_aio.unit.channel_argument_test.TestChannelArgument
Definition: channel_argument_test.py:94
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:43