14 """Tests behavior around the Core channel arguments."""
26 from src.proto.grpc.testing
import messages_pb2
27 from src.proto.grpc.testing
import test_pb2_grpc
34 _ENABLE_REUSE_PORT =
'SO_REUSEPORT enabled'
35 _DISABLE_REUSE_PORT =
'SO_REUSEPORT disabled'
36 _SOCKET_OPT_SO_REUSEPORT =
'grpc.so_reuseport'
38 (_ENABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 1),)),
39 (_DISABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 0),)),
42 _NUM_SERVER_CREATED = 5
44 _GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH =
'grpc.max_receive_message_length'
45 _MAX_MESSAGE_LENGTH = 1024
47 _ADDRESS_TOKEN_ERRNO = errno.EADDRINUSE, errno.ENOSR
56 _TEST_CHANNEL_ARGS = (
57 (
'arg1', b
'bytes_val'),
64 _INVALID_TEST_CHANNEL_ARGS = [
74 port = server.add_insecure_port(
'localhost:0')
78 with common.bound_socket(
79 bind_address=
'localhost',
82 )
as (unused_host, bound_port):
83 assert bound_port == port
85 if e.errno
in _ADDRESS_TOKEN_ERRNO:
97 random.seed(_RANDOM_SEED)
99 @unittest.skipIf(platform.system() ==
'Windows',
100 'SO_REUSEPORT only available in Linux-like OS.')
101 @unittest.skipIf(
'aarch64' in platform.machine(),
102 'SO_REUSEPORT needs to be enabled in Core\'s port.h.')
105 async
def test_body():
106 fact, options = random.choice(_OPTIONS)
107 server = aio.server(options=options)
110 if fact == _ENABLE_REUSE_PORT
and not result:
112 'Enabled reuse port in options, but not observed in socket'
114 elif fact == _DISABLE_REUSE_PORT
and result:
116 'Disabled reuse port in options, but observed in socket'
119 await server.stop(
None)
122 await asyncio.gather(*(test_body()
for _
in range(_NUM_SERVER_CREATED)))
126 channel = aio.insecure_channel(
'[::]:0', options=_TEST_CHANNEL_ARGS)
127 await channel.close()
131 server = aio.server(options=_TEST_CHANNEL_ARGS)
132 await server.stop(
None)
135 for invalid_arg
in _INVALID_TEST_CHANNEL_ARGS:
136 self.assertRaises((ValueError, TypeError),
137 aio.insecure_channel,
144 async
with aio.insecure_channel(
146 options=((_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
147 _MAX_MESSAGE_LENGTH),))
as channel:
148 stub = test_pb2_grpc.TestServiceStub(channel)
152 request.response_parameters.append(
155 request.response_parameters.append(
158 call = stub.StreamingOutputCall(request)
160 response = await call.read()
161 self.assertEqual(_MAX_MESSAGE_LENGTH // 2,
162 len(response.payload.body))
164 with self.assertRaises(aio.AioRpcError)
as exception_context:
166 rpc_error = exception_context.exception
167 self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED,
169 self.assertIn(
str(_MAX_MESSAGE_LENGTH), rpc_error.details())
171 self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await
174 await server.stop(
None)
177 if __name__ ==
'__main__':
178 logging.basicConfig(level=logging.DEBUG)
179 unittest.main(verbosity=2)