remove_neg_test.py
Go to the documentation of this file.
1 # Copyright 2021 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 from typing import List
16 
17 from absl import flags
18 from absl.testing import absltest
19 
20 from framework import xds_k8s_testcase
21 from framework.infrastructure import k8s
22 from framework.test_app import server_app
23 
24 logger = logging.getLogger(__name__)
25 flags.adopt_module_key_flags(xds_k8s_testcase)
26 
27 # Type aliases
28 _XdsTestServer = xds_k8s_testcase.XdsTestServer
29 _XdsTestClient = xds_k8s_testcase.XdsTestClient
30 
31 
32 class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
33 
34  def setUp(self):
35  super().setUp()
36  self.alternate_server_runner = server_app.KubernetesServerRunner(
37  k8s.KubernetesNamespace(self.k8s_api_manager,
38  self.server_namespace),
39  deployment_name=self.server_name + '-alt',
40  image_name=self.server_image,
41  gcp_service_account=self.gcp_service_account,
42  td_bootstrap_image=self.td_bootstrap_image,
43  gcp_project=self.project,
44  gcp_api_manager=self.gcp_api_manager,
45  xds_server_uri=self.xds_server_uri,
46  network=self.network,
47  debug_use_port_forwarding=self.debug_use_port_forwarding,
48  reuse_namespace=True)
49 
50  def tearDown(self):
51  if hasattr(self, 'alternate_server_runner'):
53  super().tearDown()
54 
55  def test_remove_neg(self) -> None:
56  with self.subTest('00_create_health_check'):
57  self.td.create_health_check()
58 
59  with self.subTest('01_create_backend_services'):
60  self.td.create_backend_service()
61 
62  with self.subTest('02_create_url_map'):
63  self.td.create_url_map(self.server_xds_host, self.server_xds_port)
64 
65  with self.subTest('03_create_target_proxy'):
66  self.td.create_target_proxy()
67 
68  with self.subTest('04_create_forwarding_rule'):
69  self.td.create_forwarding_rule(self.server_xds_port)
70 
71  default_test_servers: List[_XdsTestServer]
72  same_zone_test_servers: List[_XdsTestServer]
73  with self.subTest('05_start_test_servers'):
74  default_test_servers = self.startTestServers()
75  same_zone_test_servers = self.startTestServers(
76  server_runner=self.alternate_server_runner)
77 
78  with self.subTest('06_add_server_backends_to_backend_services'):
79  self.setupServerBackends()
80  self.setupServerBackends(server_runner=self.alternate_server_runner)
81 
82  test_client: _XdsTestClient
83  with self.subTest('07_start_test_client'):
84  test_client = self.startTestClient(default_test_servers[0])
85 
86  with self.subTest('08_test_client_xds_config_exists'):
87  self.assertXdsConfigExists(test_client)
88 
89  with self.subTest('09_test_server_received_rpcs_from_test_client'):
90  self.assertSuccessfulRpcs(test_client)
91 
92  with self.subTest('10_remove_neg'):
93  self.assertRpcsEventuallyGoToGivenServers(
94  test_client, default_test_servers + same_zone_test_servers)
95  self.removeServerBackends(
96  server_runner=self.alternate_server_runner)
97  self.assertRpcsEventuallyGoToGivenServers(test_client,
98  default_test_servers)
99 
100 
101 if __name__ == '__main__':
102  absltest.main(failfast=True)
tests.remove_neg_test.RemoveNegTest
Definition: remove_neg_test.py:32
tests.remove_neg_test.RemoveNegTest.test_remove_neg
None test_remove_neg(self)
Definition: remove_neg_test.py:55
tests.remove_neg_test.RemoveNegTest.setUp
def setUp(self)
Definition: remove_neg_test.py:34
run_xds_tests.create_target_proxy
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None)
Definition: run_xds_tests.py:2618
run_xds_tests.create_url_map
def create_url_map(gcp, name, backend_service, host_name)
Definition: run_xds_tests.py:2582
tests.remove_neg_test.RemoveNegTest.alternate_server_runner
alternate_server_runner
Definition: remove_neg_test.py:36
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
tests.remove_neg_test.RemoveNegTest.tearDown
def tearDown(self)
Definition: remove_neg_test.py:50
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
cleanup
Definition: cleanup.py:1
run_xds_tests.create_health_check
def create_health_check(gcp, name)
Definition: run_xds_tests.py:2515


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:08