15 from typing
import Optional
17 from framework
import xds_k8s_testcase
24 logger = logging.getLogger(__name__)
27 TrafficDirectorManager = traffic_director.TrafficDirectorManager
28 XdsTestServer = xds_k8s_testcase.XdsTestServer
29 XdsTestClient = xds_k8s_testcase.XdsTestClient
30 KubernetesServerRunner = server_app.KubernetesServerRunner
31 KubernetesClientRunner = client_app.KubernetesClientRunner
35 """Common functionality to support testing of bootstrap generator versions
36 across gRPC clients and servers."""
40 """Hook method for setting up class fixture before running tests in
46 KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT
56 logger.info(
'Test run resource prefix: %s, suffix: %s',
70 cls.
td.create_firewall_rule(
107 td_bootstrap_image: Optional[str] =
None) -> KubernetesServerRunner:
108 if not td_bootstrap_image:
114 td_bootstrap_image=td_bootstrap_image,
130 **kwargs) -> XdsTestServer:
131 test_server = server_runner.run(replica_count=replica_count,
133 maintenance_port=maintenance_port,
135 test_server.set_xds_address(xds_host, xds_port)
140 td_bootstrap_image: Optional[str] =
None) -> KubernetesClientRunner:
141 if not td_bootstrap_image:
148 td_bootstrap_image=td_bootstrap_image,
160 **kwargs) -> XdsTestClient:
161 test_client = self.client_runner.
run(server_target=test_server.xds_uri,
163 test_client.wait_for_active_server_channel()