local_interop_test.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Conducts interop tests locally."""
15 
16 import logging
17 import unittest
18 
19 import grpc
20 from grpc.experimental import aio
21 
22 from src.proto.grpc.testing import test_pb2_grpc
23 from tests.interop import resources
24 from tests_aio.interop import methods
25 from tests_aio.unit._test_base import AioTestBase
26 from tests_aio.unit._test_server import start_test_server
27 
28 _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
29 
30 
32  """Unit test methods.
33 
34  This class must be mixed in with unittest.TestCase and a class that defines
35  setUp and tearDown methods that manage a stub attribute.
36  """
37  _stub: test_pb2_grpc.TestServiceStub
38 
39  async def test_empty_unary(self):
40  await methods.test_interoperability(methods.TestCase.EMPTY_UNARY,
41  self._stub, None)
42 
43  async def test_large_unary(self):
44  await methods.test_interoperability(methods.TestCase.LARGE_UNARY,
45  self._stub, None)
46 
47  async def test_server_streaming(self):
48  await methods.test_interoperability(methods.TestCase.SERVER_STREAMING,
49  self._stub, None)
50 
51  async def test_client_streaming(self):
52  await methods.test_interoperability(methods.TestCase.CLIENT_STREAMING,
53  self._stub, None)
54 
55  async def test_ping_pong(self):
56  await methods.test_interoperability(methods.TestCase.PING_PONG,
57  self._stub, None)
58 
59  async def test_cancel_after_begin(self):
60  await methods.test_interoperability(methods.TestCase.CANCEL_AFTER_BEGIN,
61  self._stub, None)
62 
63  async def test_cancel_after_first_response(self):
64  await methods.test_interoperability(
65  methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE, self._stub, None)
66 
67  async def test_timeout_on_sleeping_server(self):
68  await methods.test_interoperability(
69  methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER, self._stub, None)
70 
71  async def test_empty_stream(self):
72  await methods.test_interoperability(methods.TestCase.EMPTY_STREAM,
73  self._stub, None)
74 
75  async def test_status_code_and_message(self):
76  await methods.test_interoperability(
77  methods.TestCase.STATUS_CODE_AND_MESSAGE, self._stub, None)
78 
79  async def test_unimplemented_method(self):
80  await methods.test_interoperability(
81  methods.TestCase.UNIMPLEMENTED_METHOD, self._stub, None)
82 
83  async def test_unimplemented_service(self):
84  await methods.test_interoperability(
85  methods.TestCase.UNIMPLEMENTED_SERVICE, self._stub, None)
86 
87  async def test_custom_metadata(self):
88  await methods.test_interoperability(methods.TestCase.CUSTOM_METADATA,
89  self._stub, None)
90 
91  async def test_special_status_message(self):
92  await methods.test_interoperability(
93  methods.TestCase.SPECIAL_STATUS_MESSAGE, self._stub, None)
94 
95 
97 
98  async def setUp(self):
99  address, self._server = await start_test_server()
100  self._channel = aio.insecure_channel(address)
101  self._stub = test_pb2_grpc.TestServiceStub(self._channel)
102 
103  async def tearDown(self):
104  await self._channel.close()
105  await self._server.stop(None)
106 
107 
109 
110  async def setUp(self):
111  server_credentials = grpc.ssl_server_credentials([
112  (resources.private_key(), resources.certificate_chain())
113  ])
114  channel_credentials = grpc.ssl_channel_credentials(
115  resources.test_root_certificates())
116  channel_options = ((
117  'grpc.ssl_target_name_override',
118  _SERVER_HOST_OVERRIDE,
119  ),)
120 
121  address, self._server = await start_test_server(
122  secure=True, server_credentials=server_credentials)
123  self._channel = aio.secure_channel(address, channel_credentials,
124  channel_options)
125  self._stub = test_pb2_grpc.TestServiceStub(self._channel)
126 
127  async def tearDown(self):
128  await self._channel.close()
129  await self._server.stop(None)
130 
131 
132 if __name__ == '__main__':
133  logging.basicConfig(level=logging.INFO)
134  unittest.main(verbosity=2)
run_xds_tests.test_ping_pong
def test_ping_pong(gcp, backend_service, instance_group)
Definition: run_xds_tests.py:795
tests_aio.interop.local_interop_test.SecureLocalInteropTest._server
_server
Definition: local_interop_test.py:121
tests_aio.interop.local_interop_test.InteropTestCaseMixin
Definition: local_interop_test.py:31
tests_aio.unit._test_server
Definition: tests_aio/unit/_test_server.py:1
tests_aio.unit._test_server.start_test_server
def start_test_server(port=0, secure=False, server_credentials=None, interceptors=None)
Definition: tests_aio/unit/_test_server.py:128
tests_aio.interop.local_interop_test.InteropTestCaseMixin.test_empty_unary
def test_empty_unary(self)
Definition: local_interop_test.py:39
tests_aio.unit._test_base
Definition: _test_base.py:1
tests_aio.interop.local_interop_test.SecureLocalInteropTest._channel
_channel
Definition: local_interop_test.py:123
test_server_streaming
static void test_server_streaming(grpc_end2end_test_config config, int num_messages)
Definition: server_streaming.cc:89
grpc::experimental
Definition: include/grpcpp/channel.h:46
grpc.ssl_server_credentials
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False)
Definition: src/python/grpcio/grpc/__init__.py:1709
tests_aio.interop.local_interop_test.InsecureLocalInteropTest._channel
_channel
Definition: local_interop_test.py:100
close
#define close
Definition: test-fs.c:48
tests_aio.interop.local_interop_test.InsecureLocalInteropTest
Definition: local_interop_test.py:96
tests_aio.interop.local_interop_test.SecureLocalInteropTest._stub
_stub
Definition: local_interop_test.py:125
tests_aio.interop.local_interop_test.InsecureLocalInteropTest._stub
_stub
Definition: local_interop_test.py:101
tests_aio.interop.local_interop_test.InsecureLocalInteropTest.setUp
def setUp(self)
Definition: local_interop_test.py:98
tests_aio.interop
Definition: src/python/grpcio_tests/tests_aio/interop/__init__.py:1
tests.interop
Definition: src/python/grpcio_tests/tests/interop/__init__.py:1
grpc.ssl_channel_credentials
def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None)
Definition: src/python/grpcio/grpc/__init__.py:1607
tests_aio.interop.local_interop_test.InsecureLocalInteropTest._server
_server
Definition: local_interop_test.py:99
stop
static const char stop[]
Definition: benchmark-async-pummel.c:35
tests_aio.interop.local_interop_test.SecureLocalInteropTest
Definition: local_interop_test.py:108
test_client_streaming
static void test_client_streaming(grpc_end2end_test_config config, int messages)
Definition: client_streaming.cc:90
tests_aio.unit._test_base.AioTestBase
Definition: _test_base.py:49
tests_aio.interop.local_interop_test.SecureLocalInteropTest.setUp
def setUp(self)
Definition: local_interop_test.py:110


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:16