src/python/grpcio_tests/tests_aio/interop/client.py
Go to the documentation of this file.
1 # Copyright 2019 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import argparse
16 import asyncio
17 import logging
18 import os
19 
20 import grpc
21 from grpc.experimental import aio
22 
23 from tests.interop import client as interop_client_lib
24 from tests_aio.interop import methods
25 
# Module-level logger for the interop client; its level is forced to DEBUG
# here regardless of how the root logger is configured.
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)
28 
29 
def _create_channel(args):
    """Build the asyncio channel used to reach the interop server.

    The address is ``args.server_host:args.server_port``. When TLS, ALTS or
    a custom credentials type is requested, the credentials and channel
    options are obtained from
    ``interop_client_lib.get_secure_channel_parameters`` and a secure
    channel is returned; otherwise an insecure channel is returned.

    Args:
        args: Parsed interop client arguments.

    Returns:
        The channel created by ``aio.secure_channel`` or
        ``aio.insecure_channel``.
    """
    address = f'{args.server_host}:{args.server_port}'

    wants_secure = (args.use_tls or args.use_alts or
                    args.custom_credentials_type is not None)
    if not wants_secure:
        return aio.insecure_channel(address)

    credentials, channel_options = (
        interop_client_lib.get_secure_channel_parameters(args))
    return aio.secure_channel(address, credentials, channel_options)
39 
40 
def _test_case_from_arg(test_case_arg):
    """Return the ``methods.TestCase`` member whose value equals the argument.

    Args:
        test_case_arg: The test case value to look up.

    Returns:
        The first ``methods.TestCase`` member whose ``value`` compares equal
        to ``test_case_arg``.

    Raises:
        ValueError: If no member has that value.
    """
    candidates = (case for case in methods.TestCase
                  if test_case_arg == case.value)
    selected = next(candidates, None)
    if selected is None:
        raise ValueError('No test case "%s"!' % test_case_arg)
    return selected
47 
48 
50 
async def test_interoperability():
    """Run the interop test case selected on the command line.

    Parses the interop client arguments, creates the channel and stub for
    them, resolves ``args.test_case`` to a ``methods.TestCase`` member and
    awaits ``methods.test_interoperability`` with that case.

    Raises:
        ValueError: If ``args.test_case`` does not match any test case
            (raised by ``_test_case_from_arg``).
    """
    args = interop_client_lib.parse_interop_client_args()
    channel = _create_channel(args)
    stub = interop_client_lib.create_stub(channel, args)
    test_case = _test_case_from_arg(args.test_case)
    await methods.test_interoperability(test_case, stub, args)
56 
57 
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # Create and install the event loop explicitly: relying on
    # asyncio.get_event_loop() to create one when no loop is running is
    # deprecated and no longer works on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    try:
        loop.run_until_complete(test_interoperability())
    finally:
        # Release the loop's resources even if the test case raised.
        loop.close()
tests_aio.interop.methods.TestCase
Definition: aio/interop/methods.py:398
grpc::experimental
Definition: include/grpcpp/channel.h:46
tests_aio.interop.client._create_channel
def _create_channel(args)
Definition: src/python/grpcio_tests/tests_aio/interop/client.py:30
tests_aio.interop.client.test_interoperability
def test_interoperability()
Definition: src/python/grpcio_tests/tests_aio/interop/client.py:49
tests_aio.interop
Definition: src/python/grpcio_tests/tests_aio/interop/__init__.py:1
tests.interop
Definition: src/python/grpcio_tests/tests/interop/__init__.py:1
tests_aio.interop.client._test_case_from_arg
def _test_case_from_arg(test_case_arg)
Definition: src/python/grpcio_tests/tests_aio/interop/client.py:41


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:54