server_app.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """
15 xDS Test Server.
16 
17 TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual
18 modules.
19 """
20 import functools
21 import logging
22 from typing import Iterator, List, Optional
23 
24 from framework.infrastructure import gcp
25 from framework.infrastructure import k8s
26 import framework.rpc
27 from framework.rpc import grpc_channelz
28 from framework.rpc import grpc_testing
29 from framework.test_app import base_runner
30 
31 logger = logging.getLogger(__name__)
32 
33 # Type aliases
34 _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
35 _XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient
36 _HealthClient = grpc_testing.HealthClient
37 
38 
40  """
41  Represents RPC services implemented in Server component of the xDS test app.
42  https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
43  """
44 
45  def __init__(self,
46  *,
47  ip: str,
48  rpc_port: int,
49  maintenance_port: Optional[int] = None,
50  secure_mode: Optional[bool] = False,
51  server_id: Optional[str] = None,
52  xds_host: Optional[str] = None,
53  xds_port: Optional[int] = None,
54  rpc_host: Optional[str] = None,
55  pod_name: Optional[str] = None):
56  super().__init__(rpc_host=(rpc_host or ip))
57  self.ip = ip
58  self.rpc_port = rpc_port
59  self.maintenance_port = maintenance_port or rpc_port
60  self.secure_mode = secure_mode
61  self.server_id = server_id
62  self.xds_host, self.xds_port = xds_host, xds_port
63  self.pod_name = pod_name
64 
65  @property
66  @functools.lru_cache(None)
67  def channelz(self) -> _ChannelzServiceClient:
69 
70  @property
71  @functools.lru_cache(None)
72  def update_health_service_client(self) -> _XdsUpdateHealthServiceClient:
75 
76  @property
77  @functools.lru_cache(None)
78  def health_client(self) -> _HealthClient:
80 
81  def set_serving(self):
82  logger.info('Setting health status to serving')
84  logger.info('Server reports %s', self.health_client.check_health())
85 
86  def set_not_serving(self):
87  logger.info('Setting health status to not serving')
89  logger.info('Server reports %s', self.health_client.check_health())
90 
91  def set_xds_address(self, xds_host, xds_port: Optional[int] = None):
92  self.xds_host, self.xds_port = xds_host, xds_port
93 
94  @property
95  def xds_address(self) -> str:
96  if not self.xds_host:
97  return ''
98  if not self.xds_port:
99  return self.xds_host
100  return f'{self.xds_host}:{self.xds_port}'
101 
102  @property
103  def xds_uri(self) -> str:
104  if not self.xds_host:
105  return ''
106  return f'xds:///{self.xds_address}'
107 
108  def get_test_server(self) -> grpc_channelz.Server:
109  """Return channelz representation of a server running TestService.
110 
111  Raises:
112  GrpcApp.NotFound: Test server not found.
113  """
114  server = self.channelz.find_server_listening_on_port(self.rpc_port)
115  if not server:
116  raise self.NotFound(
117  f'Server listening on port {self.rpc_port} not found')
118  return server
119 
120  def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]:
121  """List all sockets of the test server.
122 
123  Raises:
124  GrpcApp.NotFound: Test server not found.
125  """
126  server = self.get_test_server()
127  return self.channelz.list_server_sockets(server)
128 
130  client_socket: grpc_channelz.Socket):
131  """Find test server socket that matches given test client socket.
132 
133  Sockets are matched using TCP endpoints (ip:port), further on "address".
134  Server socket remote address matched with client socket local address.
135 
136  Raises:
137  GrpcApp.NotFound: Server socket matching client socket not found.
138  """
139  client_local = self.channelz.sock_address_to_str(client_socket.local)
140  logger.debug('Looking for a server socket connected to the client %s',
141  client_local)
142 
143  server_socket = self.channelz.find_server_socket_matching_client(
144  self.get_test_server_sockets(), client_socket)
145  if not server_socket:
146  raise self.NotFound(
147  f'Server socket to client {client_local} not found')
148 
149  logger.info('Found matching socket pair: server(%s) <-> client(%s)',
150  self.channelz.sock_addresses_pretty(server_socket),
151  self.channelz.sock_addresses_pretty(client_socket))
152  return server_socket
153 
154 
156  DEFAULT_TEST_PORT = 8080
157  DEFAULT_MAINTENANCE_PORT = 8080
158  DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
159 
160  def __init__( # pylint: disable=too-many-locals
161  self,
162  k8s_namespace,
163  *,
164  deployment_name,
165  image_name,
166  td_bootstrap_image,
167  gcp_api_manager: gcp.api.GcpApiManager,
168  gcp_project: str,
169  gcp_service_account: str,
170  service_account_name=None,
171  service_name=None,
172  neg_name=None,
173  xds_server_uri=None,
174  network='default',
175  deployment_template='server.deployment.yaml',
176  service_account_template='service-account.yaml',
177  service_template='server.service.yaml',
178  reuse_service=False,
179  reuse_namespace=False,
180  namespace_template=None,
181  debug_use_port_forwarding=False,
182  enable_workload_identity=True):
183  super().__init__(k8s_namespace, namespace_template, reuse_namespace)
184 
185  # Settings
186  self.deployment_name = deployment_name
187  self.image_name = image_name
188  self.service_name = service_name or deployment_name
189  # xDS bootstrap generator
190  self.td_bootstrap_image = td_bootstrap_image
191  self.xds_server_uri = xds_server_uri
192  # This only works in k8s >= 1.18.10-gke.600
193  # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs
194  self.neg_name = neg_name or (f'{self.k8s_namespace.name}-'
195  f'{self.service_name}')
196  self.network = network
197  self.deployment_template = deployment_template
198  self.service_template = service_template
199  self.reuse_service = reuse_service
200  self.debug_use_port_forwarding = debug_use_port_forwarding
201  self.enable_workload_identity = enable_workload_identity
202  # Service account settings:
203  # Kubernetes service account
204  if self.enable_workload_identity:
205  self.service_account_name = service_account_name or deployment_name
206  self.service_account_template = service_account_template
207  else:
208  self.service_account_name = None
209  self.service_account_template = None
210 
211  # GCP.
212  self.gcp_project = gcp_project
213  self.gcp_ui_url = gcp_api_manager.gcp_ui_url
214  # GCP service account to map to Kubernetes service account
215  self.gcp_service_account = gcp_service_account
216  # GCP IAM API used to grant allow workload service accounts permission
217  # to use GCP service account identity.
218  self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
219 
220  # Mutable state
221  self.deployment: Optional[k8s.V1Deployment] = None
222  self.service_account: Optional[k8s.V1ServiceAccount] = None
223  self.service: Optional[k8s.V1Service] = None
224  self.port_forwarders: List[k8s.PortForwarder] = []
225 
226  def run( # pylint: disable=arguments-differ
227  self,
228  *,
229  test_port=DEFAULT_TEST_PORT,
230  maintenance_port=None,
231  secure_mode=False,
232  server_id=None,
233  replica_count=1) -> List[XdsTestServer]:
234  # Implementation detail: in secure mode, maintenance ("backchannel")
235  # port must be different from the test port so communication with
236  # maintenance services can be reached independently from the security
237  # configuration under test.
238  if maintenance_port is None:
239  if not secure_mode:
240  maintenance_port = self.DEFAULT_MAINTENANCE_PORT
241  else:
242  maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
243 
244  if secure_mode and maintenance_port == test_port:
245  raise ValueError('port and maintenance_port must be different '
246  'when running test server in secure mode')
247  # To avoid bugs with comparing wrong types.
248  if not (isinstance(test_port, int) and
249  isinstance(maintenance_port, int)):
250  raise TypeError('Port numbers must be integer')
251 
252  if secure_mode and not self.enable_workload_identity:
253  raise ValueError('Secure mode requires Workload Identity enabled.')
254 
255  logger.info(
256  'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
257  'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s',
258  self.deployment_name, self.k8s_namespace.name, test_port,
259  maintenance_port, secure_mode, server_id, replica_count)
260  self._logs_explorer_link(deployment_name=self.deployment_name,
261  namespace_name=self.k8s_namespace.name,
262  gcp_project=self.gcp_project,
263  gcp_ui_url=self.gcp_ui_url)
264 
265  # Create namespace.
266  super().run()
267 
268  # Reuse existing if requested, create a new deployment when missing.
269  # Useful for debugging to avoid NEG loosing relation to deleted service.
270  if self.reuse_service:
271  self.service = self._reuse_service(self.service_name)
272  if not self.service:
273  self.service = self._create_service(
274  self.service_template,
275  service_name=self.service_name,
276  namespace_name=self.k8s_namespace.name,
277  deployment_name=self.deployment_name,
278  neg_name=self.neg_name,
279  test_port=test_port)
280  self._wait_service_neg(self.service_name, test_port)
281 
282  if self.enable_workload_identity:
283  # Allow Kubernetes service account to use the GCP service account
284  # identity.
286  gcp_iam=self.gcp_iam,
287  gcp_service_account=self.gcp_service_account,
288  service_account_name=self.service_account_name)
289 
290  # Create service account
293  service_account_name=self.service_account_name,
294  namespace_name=self.k8s_namespace.name,
295  gcp_service_account=self.gcp_service_account)
296 
297  # Always create a new deployment
298  self.deployment = self._create_deployment(
299  self.deployment_template,
300  deployment_name=self.deployment_name,
301  image_name=self.image_name,
302  namespace_name=self.k8s_namespace.name,
303  service_account_name=self.service_account_name,
304  td_bootstrap_image=self.td_bootstrap_image,
305  xds_server_uri=self.xds_server_uri,
306  network=self.network,
307  replica_count=replica_count,
308  test_port=test_port,
309  maintenance_port=maintenance_port,
310  server_id=server_id,
311  secure_mode=secure_mode)
312 
314  replica_count)
315 
316  # Wait for pods running
317  pods = self.k8s_namespace.list_deployment_pods(self.deployment)
318 
319  servers = []
320  for pod in pods:
321  pod_name = pod.metadata.name
322  self._wait_pod_started(pod_name)
323 
324  pod_ip = pod.status.pod_ip
325  rpc_host = None
326  # Experimental, for local debugging.
327  local_port = maintenance_port
329  logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
330  pod_ip, maintenance_port)
331  port_forwarder = self.k8s_namespace.port_forward_pod(
332  pod, remote_port=maintenance_port)
333  self.port_forwarders.append(port_forwarder)
334  local_port = port_forwarder.local_port
335  rpc_host = port_forwarder.local_address
336 
337  servers.append(
338  XdsTestServer(ip=pod_ip,
339  rpc_port=test_port,
340  maintenance_port=local_port,
341  secure_mode=secure_mode,
342  server_id=server_id,
343  rpc_host=rpc_host,
344  pod_name=pod_name))
345  return servers
346 
347  def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
348  if self.port_forwarders:
349  for port_forwarder in self.port_forwarders:
350  port_forwarder.close()
351  self.port_forwarders = []
352  if self.deployment or force:
354  self.deployment = None
355  if (self.service and not self.reuse_service) or force:
356  self._delete_service(self.service_name)
357  self.service = None
358  if self.enable_workload_identity and (self.service_account or force):
360  gcp_iam=self.gcp_iam,
361  gcp_service_account=self.gcp_service_account,
362  service_account_name=self.service_account_name)
364  self.service_account = None
365  super().cleanup(force=(force_namespace and force))
366 
367  @classmethod
369  resource_prefix: str,
370  resource_suffix: str,
371  name: str = 'server') -> str:
372  """A helper to make consistent XdsTestServer kubernetes namespace name
373  for given resource prefix and suffix.
374 
375  Note: the idea is to intentionally produce different namespace name for
376  the test server, and the test client, as that closely mimics real-world
377  deployments.
378  :rtype: object
379  """
380  return cls._make_namespace_name(resource_prefix, resource_suffix, name)
framework.test_app.server_app.XdsTestServer.set_serving
def set_serving(self)
Definition: server_app.py:81
framework.test_app.base_runner.KubernetesBaseRunner
Definition: base_runner.py:53
framework.test_app.server_app.KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT
int DEFAULT_MAINTENANCE_PORT
Definition: server_app.py:157
framework.test_app.base_runner.KubernetesBaseRunner._delete_service
def _delete_service(self, name, wait_for_deletion=True)
Definition: base_runner.py:250
framework.test_app.server_app._HealthClient
_HealthClient
Definition: server_app.py:36
framework.test_app.server_app.KubernetesServerRunner.neg_name
neg_name
Definition: server_app.py:172
framework.test_app.server_app.XdsTestServer.rpc_port
rpc_port
Definition: server_app.py:48
framework.test_app.server_app.KubernetesServerRunner.service_template
service_template
Definition: server_app.py:176
framework.test_app.server_app.KubernetesServerRunner.service_name
service_name
Definition: server_app.py:166
framework.test_app.server_app.KubernetesServerRunner.td_bootstrap_image
td_bootstrap_image
Definition: server_app.py:168
framework.test_app.base_runner.KubernetesBaseRunner._grant_workload_identity_user
def _grant_workload_identity_user(self, *gcp_iam, gcp_service_account, service_account_name)
Definition: base_runner.py:170
framework.test_app.server_app.KubernetesServerRunner.deployment
deployment
Definition: server_app.py:291
framework.test_app.server_app.KubernetesServerRunner.service_account_template
service_account_template
Definition: server_app.py:184
framework.test_app.server_app.XdsTestServer.get_test_server_sockets
Iterator[grpc_channelz.Socket] get_test_server_sockets(self)
Definition: server_app.py:120
framework.test_app.base_runner.KubernetesBaseRunner._wait_deployment_with_available_replicas
def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs)
Definition: base_runner.py:289
framework.test_app.server_app.KubernetesServerRunner.debug_use_port_forwarding
debug_use_port_forwarding
Definition: server_app.py:178
framework.test_app.server_app.KubernetesServerRunner.cleanup
def cleanup(self, *force=False, force_namespace=False)
Definition: server_app.py:347
framework.test_app.server_app.KubernetesServerRunner.image_name
image_name
Definition: server_app.py:165
framework.test_app.server_app.KubernetesServerRunner.service_account_name
service_account_name
Definition: server_app.py:183
framework.test_app.server_app.KubernetesServerRunner.port_forwarders
port_forwarders
Definition: server_app.py:351
framework.test_app.base_runner.KubernetesBaseRunner._wait_pod_started
def _wait_pod_started(self, name, **kwargs)
Definition: base_runner.py:299
framework.test_app.server_app.XdsTestServer.get_server_socket_matching_client
def get_server_socket_matching_client(self, grpc_channelz.Socket client_socket)
Definition: server_app.py:129
framework.test_app.server_app.KubernetesServerRunner
Definition: server_app.py:155
framework.test_app.base_runner.KubernetesBaseRunner._logs_explorer_link
None _logs_explorer_link(*str deployment_name, str namespace_name, str gcp_project, str gcp_ui_url, Optional[timedelta] end_delta=None)
Definition: base_runner.py:315
framework.test_app.server_app.KubernetesServerRunner.gcp_service_account
gcp_service_account
Definition: server_app.py:193
framework.rpc
Definition: tools/run_tests/xds_k8s_test_driver/framework/rpc/__init__.py:1
framework.test_app.server_app.KubernetesServerRunner.xds_server_uri
xds_server_uri
Definition: server_app.py:169
framework.test_app.server_app.XdsTestServer.set_xds_address
def set_xds_address(self, xds_host, Optional[int] xds_port=None)
Definition: server_app.py:91
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
framework.test_app.server_app.KubernetesServerRunner.deployment_name
deployment_name
Definition: server_app.py:164
framework.rpc.grpc.GrpcApp
Definition: grpc.py:63
framework.test_app.base_runner.KubernetesBaseRunner._delete_deployment
def _delete_deployment(self, name, wait_for_deletion=True)
Definition: base_runner.py:237
framework.test_app.server_app.XdsTestServer.update_health_service_client
_XdsUpdateHealthServiceClient update_health_service_client(self)
Definition: server_app.py:72
framework.test_app.base_runner.KubernetesBaseRunner._revoke_workload_identity_user
def _revoke_workload_identity_user(self, *gcp_iam, gcp_service_account, service_account_name)
Definition: base_runner.py:182
framework.test_app.server_app.KubernetesServerRunner.__init__
def __init__(self, k8s_namespace, *deployment_name, image_name, td_bootstrap_image, gcp.api.GcpApiManager gcp_api_manager, str gcp_project, str gcp_service_account, service_account_name=None, service_name=None, neg_name=None, xds_server_uri=None, network='default', deployment_template='server.deployment.yaml', service_account_template='service-account.yaml', service_template='server.service.yaml', reuse_service=False, reuse_namespace=False, namespace_template=None, debug_use_port_forwarding=False, enable_workload_identity=True)
Definition: server_app.py:160
framework.test_app.server_app.XdsTestServer.xds_port
xds_port
Definition: server_app.py:52
framework.rpc.grpc.GrpcApp._make_channel
grpc.Channel _make_channel(self, port)
Definition: grpc.py:78
framework.test_app.base_runner.KubernetesBaseRunner._create_service_account
k8s.V1ServiceAccount _create_service_account(self, template, **kwargs)
Definition: base_runner.py:198
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.rpc.grpc.GrpcApp.NotFound
Definition: grpc.py:66
framework.test_app.base_runner.KubernetesBaseRunner._delete_service_account
def _delete_service_account(self, name, wait_for_deletion=True)
Definition: base_runner.py:263
framework.test_app.server_app.KubernetesServerRunner.gcp_iam
gcp_iam
Definition: server_app.py:196
framework.test_app.server_app.XdsTestServer.secure_mode
secure_mode
Definition: server_app.py:50
framework.test_app.server_app.KubernetesServerRunner.service
service
Definition: server_app.py:264
framework.test_app.base_runner.KubernetesBaseRunner._create_deployment
k8s.V1Deployment _create_deployment(self, template, **kwargs)
Definition: base_runner.py:212
framework.test_app.server_app.XdsTestServer.get_test_server
grpc_channelz.Server get_test_server(self)
Definition: server_app.py:108
framework.test_app.server_app._ChannelzServiceClient
_ChannelzServiceClient
Definition: server_app.py:34
framework.test_app.server_app.XdsTestServer.channelz
_ChannelzServiceClient channelz(self)
Definition: server_app.py:67
framework.test_app.server_app.KubernetesServerRunner.gcp_project
gcp_project
Definition: server_app.py:190
framework.test_app.server_app.KubernetesServerRunner.service_account
service_account
Definition: server_app.py:284
framework.test_app.base_runner.KubernetesBaseRunner._make_namespace_name
str _make_namespace_name(str resource_prefix, str resource_suffix, str name)
Definition: base_runner.py:343
framework.test_app.server_app.KubernetesServerRunner.gcp_ui_url
gcp_ui_url
Definition: server_app.py:191
framework.test_app.server_app.XdsTestServer.xds_address
str xds_address(self)
Definition: server_app.py:95
framework.test_app.server_app.XdsTestServer.ip
ip
Definition: server_app.py:47
framework.test_app.server_app._XdsUpdateHealthServiceClient
_XdsUpdateHealthServiceClient
Definition: server_app.py:35
framework.test_app.server_app.KubernetesServerRunner.network
network
Definition: server_app.py:174
framework.test_app.server_app.XdsTestServer.server_id
server_id
Definition: server_app.py:51
framework.test_app.server_app.KubernetesServerRunner.run
List[XdsTestServer] run(self, *test_port=DEFAULT_TEST_PORT, maintenance_port=None, secure_mode=False, server_id=None, replica_count=1)
Definition: server_app.py:226
framework.test_app.server_app.KubernetesServerRunner.enable_workload_identity
enable_workload_identity
Definition: server_app.py:179
framework.test_app.base_runner.KubernetesBaseRunner._wait_service_neg
def _wait_service_neg(self, name, service_port, **kwargs)
Definition: base_runner.py:306
framework.test_app.server_app.KubernetesServerRunner.deployment_template
deployment_template
Definition: server_app.py:175
framework.test_app.server_app.KubernetesServerRunner.reuse_service
reuse_service
Definition: server_app.py:177
framework.test_app.server_app.XdsTestServer
Definition: server_app.py:39
framework.test_app.server_app.XdsTestServer.pod_name
pod_name
Definition: server_app.py:53
framework.test_app.base_runner.KubernetesBaseRunner._create_service
k8s.V1Service _create_service(self, template, **kwargs)
Definition: base_runner.py:225
framework.test_app.server_app.XdsTestServer.set_not_serving
def set_not_serving(self)
Definition: server_app.py:86
framework.test_app.server_app.KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
int DEFAULT_SECURE_MODE_MAINTENANCE_PORT
Definition: server_app.py:158
cleanup
Definition: cleanup.py:1
framework.test_app.server_app.XdsTestServer.maintenance_port
maintenance_port
Definition: server_app.py:49
framework.test_app.server_app.XdsTestServer.health_client
_HealthClient health_client(self)
Definition: server_app.py:78
framework.test_app.server_app.XdsTestServer.xds_uri
str xds_uri(self)
Definition: server_app.py:103
framework.test_app.server_app.KubernetesServerRunner.make_namespace_name
str make_namespace_name(cls, str resource_prefix, str resource_suffix, str name='server')
Definition: server_app.py:368
framework.test_app.server_app.XdsTestServer.__init__
def __init__(self, *str ip, int rpc_port, Optional[int] maintenance_port=None, Optional[bool] secure_mode=False, Optional[str] server_id=None, Optional[str] xds_host=None, Optional[int] xds_port=None, Optional[str] rpc_host=None, Optional[str] pod_name=None)
Definition: server_app.py:45
framework.test_app.base_runner.KubernetesBaseRunner._reuse_service
k8s.V1Service _reuse_service(self, service_name)
Definition: base_runner.py:137


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:16