14 """Tests behavior around the Core channel arguments.""" 
   26 from src.proto.grpc.testing 
import messages_pb2
 
   27 from src.proto.grpc.testing 
import test_pb2_grpc
 
   34 _ENABLE_REUSE_PORT = 
'SO_REUSEPORT enabled' 
   35 _DISABLE_REUSE_PORT = 
'SO_REUSEPORT disabled' 
   36 _SOCKET_OPT_SO_REUSEPORT = 
'grpc.so_reuseport' 
   38     (_ENABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 1),)),
 
   39     (_DISABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 0),)),
 
   42 _NUM_SERVER_CREATED = 5
 
   44 _GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH = 
'grpc.max_receive_message_length' 
   45 _MAX_MESSAGE_LENGTH = 1024
 
   47 _ADDRESS_TOKEN_ERRNO = errno.EADDRINUSE, errno.ENOSR
 
   56 _TEST_CHANNEL_ARGS = (
 
   57     (
'arg1', b
'bytes_val'),
 
   64 _INVALID_TEST_CHANNEL_ARGS = [
 
   74     port = server.add_insecure_port(
'localhost:0')
 
   78         with common.bound_socket(
 
   79                 bind_address=
'localhost',
 
   82         ) 
as (unused_host, bound_port):
 
   83             assert bound_port == port
 
   85         if e.errno 
in _ADDRESS_TOKEN_ERRNO:
 
   97         random.seed(_RANDOM_SEED)
 
   99     @unittest.skipIf(platform.system() == 
'Windows',
 
  100                      'SO_REUSEPORT only available in Linux-like OS.')
 
  101     @unittest.skipIf(
'aarch64' in platform.machine(),
 
  102                      'SO_REUSEPORT needs to be enabled in Core\'s port.h.')
 
  105         async 
def test_body():
 
  106             fact, options = random.choice(_OPTIONS)
 
  107             server = aio.server(options=options)
 
  110                 if fact == _ENABLE_REUSE_PORT 
and not result:
 
  112                         'Enabled reuse port in options, but not observed in socket' 
  114                 elif fact == _DISABLE_REUSE_PORT 
and result:
 
  116                         'Disabled reuse port in options, but observed in socket' 
  119                 await server.stop(
None)
 
  122         await asyncio.gather(*(test_body() 
for _ 
in range(_NUM_SERVER_CREATED)))
 
  126         channel = aio.insecure_channel(
'[::]:0', options=_TEST_CHANNEL_ARGS)
 
  127         await channel.close()
 
  131         server = aio.server(options=_TEST_CHANNEL_ARGS)
 
  132         await server.stop(
None)
 
  135         for invalid_arg 
in _INVALID_TEST_CHANNEL_ARGS:
 
  136             self.assertRaises((ValueError, TypeError),
 
  137                               aio.insecure_channel,
 
  144         async 
with aio.insecure_channel(
 
  146                 options=((_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
 
  147                           _MAX_MESSAGE_LENGTH),)) 
as channel:
 
  148             stub = test_pb2_grpc.TestServiceStub(channel)
 
  152             request.response_parameters.append(
 
  155             request.response_parameters.append(
 
  158             call = stub.StreamingOutputCall(request)
 
  160             response = await call.read()
 
  161             self.assertEqual(_MAX_MESSAGE_LENGTH // 2,
 
  162                              len(response.payload.body))
 
  164             with self.assertRaises(aio.AioRpcError) 
as exception_context:
 
  166             rpc_error = exception_context.exception
 
  167             self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED,
 
  169             self.assertIn(
str(_MAX_MESSAGE_LENGTH), rpc_error.details())
 
  171             self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await
 
  174         await server.stop(
None)
 
  177 if __name__ == 
'__main__':
 
  178     logging.basicConfig(level=logging.DEBUG)
 
  179     unittest.main(verbosity=2)