17 TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual
22 from typing
import Iterator, List, Optional
31 logger = logging.getLogger(__name__)
34 _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
35 _XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient
36 _HealthClient = grpc_testing.HealthClient
41 Represents RPC services implemented in Server component of the xDS test app.
42 https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
49 maintenance_port: Optional[int] =
None,
50 secure_mode: Optional[bool] =
False,
51 server_id: Optional[str] =
None,
52 xds_host: Optional[str] =
None,
53 xds_port: Optional[int] =
None,
54 rpc_host: Optional[str] =
None,
55 pod_name: Optional[str] =
None):
56 super().
__init__(rpc_host=(rpc_host
or ip))
62 self.xds_host, self.
xds_port = xds_host, xds_port
66 @functools.lru_cache(
None)
71 @functools.lru_cache(
None)
77 @functools.lru_cache(
None)
82 logger.info(
'Setting health status to serving')
84 logger.info(
'Server reports %s', self.
health_client.check_health())
87 logger.info(
'Setting health status to not serving')
89 logger.info(
'Server reports %s', self.
health_client.check_health())
92 self.xds_host, self.
xds_port = xds_host, xds_port
100 return f
'{self.xds_host}:{self.xds_port}'
104 if not self.xds_host:
106 return f
'xds:///{self.xds_address}'
109 """Return channelz representation of a server running TestService.
112 GrpcApp.NotFound: Test server not found.
117 f
'Server listening on port {self.rpc_port} not found')
121 """List all sockets of the test server.
124 GrpcApp.NotFound: Test server not found.
127 return self.
channelz.list_server_sockets(server)
130 client_socket: grpc_channelz.Socket):
131 """Find test server socket that matches given test client socket.
133 Sockets are matched using TCP endpoints (ip:port), further on "address".
134 Server socket remote address matched with client socket local address.
137 GrpcApp.NotFound: Server socket matching client socket not found.
139 client_local = self.
channelz.sock_address_to_str(client_socket.local)
140 logger.debug(
'Looking for a server socket connected to the client %s',
143 server_socket = self.
channelz.find_server_socket_matching_client(
145 if not server_socket:
147 f
'Server socket to client {client_local} not found')
149 logger.info(
'Found matching socket pair: server(%s) <-> client(%s)',
150 self.
channelz.sock_addresses_pretty(server_socket),
151 self.
channelz.sock_addresses_pretty(client_socket))
156 DEFAULT_TEST_PORT = 8080
157 DEFAULT_MAINTENANCE_PORT = 8080
158 DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
167 gcp_api_manager: gcp.api.GcpApiManager,
169 gcp_service_account: str,
170 service_account_name=
None,
175 deployment_template=
'server.deployment.yaml',
176 service_account_template=
'service-account.yaml',
177 service_template=
'server.service.yaml',
179 reuse_namespace=
False,
180 namespace_template=
None,
181 debug_use_port_forwarding=
False,
182 enable_workload_identity=
True):
183 super().
__init__(k8s_namespace, namespace_template, reuse_namespace)
194 self.
neg_name = neg_name
or (f
'{self.k8s_namespace.name}-'
195 f
'{self.service_name}')
218 self.
gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
221 self.
deployment: Optional[k8s.V1Deployment] =
None
223 self.
service: Optional[k8s.V1Service] =
None
229 test_port=DEFAULT_TEST_PORT,
230 maintenance_port=None,
233 replica_count=1) -> List[XdsTestServer]:
238 if maintenance_port
is None:
244 if secure_mode
and maintenance_port == test_port:
245 raise ValueError(
'port and maintenance_port must be different '
246 'when running test server in secure mode')
248 if not (isinstance(test_port, int)
and
249 isinstance(maintenance_port, int)):
250 raise TypeError(
'Port numbers must be integer')
253 raise ValueError(
'Secure mode requires Workload Identity enabled.')
256 'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
257 'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s',
259 maintenance_port, secure_mode, server_id, replica_count)
261 namespace_name=self.k8s_namespace.name,
276 namespace_name=self.k8s_namespace.name,
294 namespace_name=self.k8s_namespace.name,
302 namespace_name=self.k8s_namespace.name,
307 replica_count=replica_count,
309 maintenance_port=maintenance_port,
311 secure_mode=secure_mode)
317 pods = self.k8s_namespace.list_deployment_pods(self.
deployment)
321 pod_name = pod.metadata.name
324 pod_ip = pod.status.pod_ip
327 local_port = maintenance_port
329 logger.info(
'LOCAL DEV MODE: Enabling port forwarding to %s:%s',
330 pod_ip, maintenance_port)
331 port_forwarder = self.k8s_namespace.port_forward_pod(
332 pod, remote_port=maintenance_port)
334 local_port = port_forwarder.local_port
335 rpc_host = port_forwarder.local_address
340 maintenance_port=local_port,
341 secure_mode=secure_mode,
347 def cleanup(self, *, force=False, force_namespace=False):
350 port_forwarder.close()
365 super().
cleanup(force=(force_namespace
and force))
369 resource_prefix: str,
370 resource_suffix: str,
371 name: str =
'server') -> str:
372 """A helper to make consistent XdsTestServer kubernetes namespace name
373 for given resource prefix and suffix.
375 Note: the idea is to intentionally produce different namespace name for
376 the test server, and the test client, as that closely mimics real-world