16 from absl
import flags
17 from absl.testing
import absltest
19 from framework
import xds_k8s_testcase
21 logger = logging.getLogger(__name__)
22 flags.adopt_module_key_flags(xds_k8s_testcase)
24 _XdsTestServer = xds_k8s_testcase.XdsTestServer
25 _XdsTestClient = xds_k8s_testcase.XdsTestClient
28 class AppNetTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase):
31 with self.subTest(
'0_create_health_check'):
34 with self.subTest(
'1_create_backend_service'):
35 self.td.create_backend_service()
37 with self.subTest(
'2_create_mesh'):
40 with self.subTest(
'3_create_grpc_route'):
41 self.td.create_grpc_route(self.server_xds_host,
44 test_server: _XdsTestServer
45 with self.subTest(
'4_start_test_server'):
46 test_server = self.startTestServers(replica_count=1)[0]
48 with self.subTest(
'5_setup_server_backends'):
49 self.setupServerBackends()
51 test_client: _XdsTestClient
52 with self.subTest(
'6_start_test_client'):
53 test_client = self.startTestClient(test_server,
54 config_mesh=self.td.mesh.name)
56 with self.subTest(
'7_assert_xds_config_exists'):
57 self.assertXdsConfigExists(test_client)
59 with self.subTest(
'8_assert_successful_rpcs'):
60 self.assertSuccessfulRpcs(test_client)
63 if __name__ ==
'__main__':
64 absltest.main(failfast=
True)