client_app.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """
15 xDS Test Client.
16 
17 TODO(sergiitk): separate XdsTestClient and KubernetesClientRunner to individual
18 modules.
19 """
20 import datetime
21 import functools
22 import logging
23 from typing import Iterable, List, Optional
24 
25 from framework.helpers import retryers
26 from framework.infrastructure import gcp
27 from framework.infrastructure import k8s
28 import framework.rpc
29 from framework.rpc import grpc_channelz
30 from framework.rpc import grpc_csds
31 from framework.rpc import grpc_testing
32 from framework.test_app import base_runner
33 
34 logger = logging.getLogger(__name__)
35 
36 # Type aliases
37 _timedelta = datetime.timedelta
38 _LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient
39 _XdsUpdateClientConfigureServiceClient = grpc_testing.XdsUpdateClientConfigureServiceClient
40 _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
41 _ChannelzChannel = grpc_channelz.Channel
42 _ChannelzChannelState = grpc_channelz.ChannelState
43 _ChannelzSubchannel = grpc_channelz.Subchannel
44 _ChannelzSocket = grpc_channelz.Socket
45 _CsdsClient = grpc_csds.CsdsClient
46 
47 
49  """
50  Represents RPC services implemented in Client component of the xds test app.
51  https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#client
52  """
53 
def __init__(self,
             *,
             ip: str,
             rpc_port: int,
             server_target: str,
             rpc_host: Optional[str] = None,
             maintenance_port: Optional[int] = None):
    """Initialize the test client wrapper.

    Args:
        ip: Address of the client; stored as ``self.ip`` and used as the
            RPC host when ``rpc_host`` is not provided.
        rpc_port: Port stored as ``self.rpc_port``.
        server_target: Target of the server this client talks to; stored
            as ``self.server_target``.
        rpc_host: Optional host handed to the base class instead of ``ip``.
        maintenance_port: Port stored as ``self.maintenance_port``; falls
            back to ``rpc_port`` when omitted (or otherwise falsy).
    """
    # An explicit rpc_host wins; otherwise RPCs go straight to the ip.
    effective_host = rpc_host if rpc_host else ip
    super().__init__(rpc_host=effective_host)

    self.ip = ip
    self.server_target = server_target
    self.rpc_port = rpc_port
    self.maintenance_port = (maintenance_port
                             if maintenance_port else rpc_port)
66 
67  @property
68  @functools.lru_cache(None)
69  def load_balancer_stats(self) -> _LoadBalancerStatsServiceClient:
71  self.rpc_port))
72 
73  @property
74  @functools.lru_cache(None)
75  def update_config(self):
77  self._make_channel(self.rpc_port))
78 
79  @property
80  @functools.lru_cache(None)
81  def channelz(self) -> _ChannelzServiceClient:
83 
@property
@functools.lru_cache(None)
def csds(self) -> _CsdsClient:
    """Return a ``_CsdsClient`` bound to a channel on the maintenance port.

    The getter is wrapped in ``functools.lru_cache``, so repeated access on
    the same instance reuses the client built on first access.
    """
    maintenance_channel = self._make_channel(self.maintenance_port)
    return _CsdsClient(maintenance_channel)
88 
90  self,
91  *,
92  num_rpcs: int,
93  timeout_sec: Optional[int] = None,
94  ) -> grpc_testing.LoadBalancerStatsResponse:
95  """
96  Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
97  """
99  num_rpcs=num_rpcs, timeout_sec=timeout_sec)
100 
102  self,
103  *,
104  timeout_sec: Optional[int] = None,
105  ) -> grpc_testing.LoadBalancerAccumulatedStatsResponse:
106  """Shortcut to LoadBalancerStatsServiceClient.get_client_accumulated_stats()"""
108  timeout_sec=timeout_sec)
109 
def wait_for_active_server_channel(self) -> _ChannelzChannel:
    """Wait until the channel to the server reaches the READY state.

    Thin shortcut for ``wait_for_server_channel_state()`` with the READY
    state and that method's default timeouts.

    Returns:
        The channel returned by ``wait_for_server_channel_state()``.

    Raises:
        GrpcApp.NotFound: If the channel never transitioned to READY.
    """
    ready_state = _ChannelzChannelState.READY
    return self.wait_for_server_channel_state(ready_state)
117 
def get_active_server_channel_socket(self) -> _ChannelzSocket:
    """Return the socket backing the READY channel to the server.

    Looks up the READY channel via ``find_server_channel_with_state()``,
    takes the first subchannel that channelz lists for it, and then the
    first socket listed for that subchannel. Extra subchannels or sockets
    are not expected and are reported with a warning.

    Returns:
        The first socket of the first subchannel of the READY channel.

    Raises:
        ValueError: If channelz lists no subchannels for the channel, or no
            sockets for the subchannel (the unpacking below needs at least
            one item).
        Any exception raised by ``find_server_channel_with_state()`` is
        propagated unchanged.
    """
    channel = self.find_server_channel_with_state(
        _ChannelzChannelState.READY)
    # Get the first subchannel of the active channel to the server.
    logger.debug(
        'Retrieving client -> server socket, '
        'channel_id: %s, subchannel: %s', channel.ref.channel_id,
        channel.subchannel_ref[0].name)
    subchannel, *extra_subchannels = (
        self.channelz.list_channel_subchannels(channel))
    if extra_subchannels:
        logger.warning('Unexpected subchannels: %r', extra_subchannels)
    # Get the first socket of the subchannel.
    socket, *extra_sockets = (
        self.channelz.list_subchannels_sockets(subchannel))
    if extra_sockets:
        # Report the surplus sockets themselves; the previous code logged
        # the leftover subchannel list here by mistake.
        logger.warning('Unexpected sockets: %r', extra_sockets)
    logger.debug('Found client -> server socket: %s', socket.ref.name)
    return socket
137 
139  self,
140  state: _ChannelzChannelState,
141  *,
142  timeout: Optional[_timedelta] = None,
143  rpc_deadline: Optional[_timedelta] = None) -> _ChannelzChannel:
144  # When polling for a state, prefer smaller wait times to avoid
145  # exhausting all allowed time on a single long RPC.
146  if rpc_deadline is None:
147  rpc_deadline = _timedelta(seconds=30)
148 
149  # Fine-tuned to wait for the channel to the server.
150  retryer = retryers.exponential_retryer_with_timeout(
151  wait_min=_timedelta(seconds=10),
152  wait_max=_timedelta(seconds=25),
153  timeout=_timedelta(minutes=5) if timeout is None else timeout)
154 
155  logger.info('Waiting for client %s to report a %s channel to %s',
156  self.ip, _ChannelzChannelState.Name(state),
157  self.server_target)
158  channel = retryer(self.find_server_channel_with_state,
159  state,
160  rpc_deadline=rpc_deadline)
161  logger.info('Client %s channel to %s transitioned to state %s:\n%s',
162  self.ip, self.server_target,
163  _ChannelzChannelState.Name(state), channel)
164  return channel
165 
167  self,
168  state: _ChannelzChannelState,
169  *,
170  rpc_deadline: Optional[_timedelta] = None,
171  check_subchannel=True) -> _ChannelzChannel:
172  rpc_params = {}
173  if rpc_deadline is not None:
174  rpc_params['deadline_sec'] = rpc_deadline.total_seconds()
175 
176  for channel in self.get_server_channels(**rpc_params):
177  channel_state: _ChannelzChannelState = channel.data.state.state
178  logger.info('Server channel: %s, state: %s', channel.ref.name,
179  _ChannelzChannelState.Name(channel_state))
180  if channel_state is state:
181  if check_subchannel:
182  # When requested, check if the channel has at least
183  # one subchannel in the requested state.
184  try:
185  subchannel = self.find_subchannel_with_state(
186  channel, state, **rpc_params)
187  logger.info('Found subchannel in state %s: %s',
188  _ChannelzChannelState.Name(state),
189  subchannel)
190  except self.NotFound as e:
191  # Otherwise, keep searching.
192  logger.info(e.message)
193  continue
194  return channel
195 
196  raise self.NotFound(
197  f'Client has no {_ChannelzChannelState.Name(state)} channel with '
198  'the server')
199 
def get_server_channels(self, **kwargs) -> Iterable[_ChannelzChannel]:
    """Find the channelz channels whose target is ``self.server_target``.

    Args:
        **kwargs: Forwarded unchanged to
            ``channelz.find_channels_for_target()``.

    Returns:
        Whatever ``find_channels_for_target()`` returns for the server
        target.
    """
    channelz_client = self.channelz
    return channelz_client.find_channels_for_target(self.server_target,
                                                    **kwargs)
203 
def find_subchannel_with_state(self, channel: _ChannelzChannel,
                               state: _ChannelzChannelState,
                               **kwargs) -> _ChannelzSubchannel:
    """Return the first subchannel of ``channel`` that is in ``state``.

    Args:
        channel: Channel whose subchannels are inspected.
        state: Connectivity state to look for.
        **kwargs: Forwarded unchanged to
            ``channelz.list_channel_subchannels()``.

    Returns:
        The first listed subchannel whose state equals ``state``.

    Raises:
        self.NotFound: If none of the channel's subchannels is in ``state``.
    """
    subchannels = self.channelz.list_channel_subchannels(channel, **kwargs)
    for subchannel in subchannels:
        # Compare by value: whether two equal enum numbers are also the
        # same object is an interpreter implementation detail, so `is`
        # is not a reliable test here.
        if subchannel.data.state.state == state:
            return subchannel

    raise self.NotFound(
        f'Not found a {_ChannelzChannelState.Name(state)} '
        f'subchannel for channel_id {channel.ref.channel_id}')
215 
def find_subchannels_with_state(self, state: _ChannelzChannelState,
                                **kwargs) -> List[_ChannelzSubchannel]:
    """Collect every subchannel in ``state`` across all server channels.

    Args:
        state: Connectivity state to look for.
        **kwargs: Forwarded unchanged to both
            ``channelz.find_channels_for_target()`` and
            ``channelz.list_channel_subchannels()``.

    Returns:
        A list (possibly empty) of the matching subchannels, in the order
        the channels and their subchannels are listed.
    """
    channelz = self.channelz
    return [
        subchannel
        for channel in channelz.find_channels_for_target(
            self.server_target, **kwargs)
        for subchannel in channelz.list_channel_subchannels(
            channel, **kwargs)
        # Value comparison; identity of equal enum numbers is not
        # guaranteed by the language.
        if subchannel.data.state.state == state
    ]
226 
227 
229 
def __init__(  # pylint: disable=too-many-locals
        self,
        k8s_namespace,
        *,
        deployment_name,
        image_name,
        td_bootstrap_image,
        gcp_api_manager: gcp.api.GcpApiManager,
        gcp_project: str,
        gcp_service_account: str,
        xds_server_uri=None,
        network='default',
        service_account_name=None,
        stats_port=8079,
        deployment_template='client.deployment.yaml',
        service_account_template='service-account.yaml',
        reuse_namespace=False,
        namespace_template=None,
        debug_use_port_forwarding=False,
        enable_workload_identity=True):
    """Store the settings used to deploy the xDS test client.

    Nothing is deployed here; the constructor only records configuration,
    builds the IAM API helper, and resets the mutable state.

    Args:
        k8s_namespace: Passed to the base runner.
        deployment_name: Name of the client deployment; also the default
            Kubernetes service account name.
        image_name: Stored as ``self.image_name``.
        td_bootstrap_image: Stored as ``self.td_bootstrap_image``.
        gcp_api_manager: Supplies ``gcp_ui_url`` and is used to construct
            the ``gcp.iam.IamV1`` helper.
        gcp_project: GCP project, stored and passed to the IAM helper.
        gcp_service_account: GCP service account to map to the Kubernetes
            service account.
        xds_server_uri: Stored as ``self.xds_server_uri``.
        network: Stored as ``self.network``.
        service_account_name: Kubernetes service account name; defaults to
            ``deployment_name``. Ignored when workload identity is off.
        stats_port: Stored as ``self.stats_port``.
        deployment_template: Stored as ``self.deployment_template``.
        service_account_template: Stored as
            ``self.service_account_template``. Ignored when workload
            identity is off.
        reuse_namespace: Passed to the base runner.
        namespace_template: Passed to the base runner.
        debug_use_port_forwarding: Stored as
            ``self.debug_use_port_forwarding``.
        enable_workload_identity: When false, the service account name and
            template are both set to ``None``.
    """
    super().__init__(k8s_namespace, namespace_template, reuse_namespace)

    # Deployment settings.
    self.deployment_name = deployment_name
    self.deployment_template = deployment_template
    self.image_name = image_name
    self.stats_port = stats_port
    self.network = network
    self.debug_use_port_forwarding = debug_use_port_forwarding

    # xDS bootstrap generator.
    self.td_bootstrap_image = td_bootstrap_image
    self.xds_server_uri = xds_server_uri

    # Kubernetes service account: only configured with workload identity.
    self.enable_workload_identity = enable_workload_identity
    if not enable_workload_identity:
        k8s_sa_name = None
        k8s_sa_template = None
    else:
        k8s_sa_name = service_account_name or deployment_name
        k8s_sa_template = service_account_template
    self.service_account_name = k8s_sa_name
    self.service_account_template = k8s_sa_template

    # GCP.
    self.gcp_project = gcp_project
    self.gcp_ui_url = gcp_api_manager.gcp_ui_url
    # GCP service account to map to the Kubernetes service account.
    self.gcp_service_account = gcp_service_account
    # GCP IAM API used to grant workload service accounts permission to
    # use the GCP service account identity.
    self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)

    # Mutable state.
    self.deployment: Optional[k8s.V1Deployment] = None
    self.service_account: Optional[k8s.V1ServiceAccount] = None
    self.port_forwarder: Optional[k8s.PortForwarder] = None
284 
285  # TODO(sergiitk): make rpc UnaryCall enum or get it from proto
286  def run( # pylint: disable=arguments-differ
287  self,
288  *,
289  server_target,
290  rpc='UnaryCall',
291  qps=25,
292  metadata='',
293  secure_mode=False,
294  config_mesh=None,
295  print_response=False) -> XdsTestClient:
296  logger.info(
297  'Deploying xDS test client "%s" to k8s namespace %s: '
298  'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
299  'print_response=%s', self.deployment_name, self.k8s_namespace.name,
300  server_target, rpc, qps, metadata, secure_mode, print_response)
301  self._logs_explorer_link(deployment_name=self.deployment_name,
302  namespace_name=self.k8s_namespace.name,
303  gcp_project=self.gcp_project,
304  gcp_ui_url=self.gcp_ui_url)
305 
306  super().run()
307 
309  # Allow Kubernetes service account to use the GCP service account
310  # identity.
312  gcp_iam=self.gcp_iam,
313  gcp_service_account=self.gcp_service_account,
314  service_account_name=self.service_account_name)
315 
316  # Create service account
319  service_account_name=self.service_account_name,
320  namespace_name=self.k8s_namespace.name,
321  gcp_service_account=self.gcp_service_account)
322 
323  # Always create a new deployment
324  self.deployment = self._create_deployment(
325  self.deployment_template,
326  deployment_name=self.deployment_name,
327  image_name=self.image_name,
328  namespace_name=self.k8s_namespace.name,
329  service_account_name=self.service_account_name,
330  td_bootstrap_image=self.td_bootstrap_image,
331  xds_server_uri=self.xds_server_uri,
332  network=self.network,
333  stats_port=self.stats_port,
334  server_target=server_target,
335  rpc=rpc,
336  qps=qps,
337  metadata=metadata,
338  secure_mode=secure_mode,
339  config_mesh=config_mesh,
340  print_response=print_response)
341 
343 
344  # Load test client pod. We need only one client at the moment
345  pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0]
346  self._wait_pod_started(pod.metadata.name)
347  pod_ip = pod.status.pod_ip
348  rpc_port = self.stats_port
349  rpc_host = None
350 
351  # Experimental, for local debugging.
353  logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
354  pod_ip, self.stats_port)
355  self.port_forwarder = self.k8s_namespace.port_forward_pod(
356  pod, remote_port=self.stats_port)
357  rpc_port = self.port_forwarder.local_port
358  rpc_host = self.port_forwarder.local_address
359 
360  return XdsTestClient(ip=pod_ip,
361  rpc_port=rpc_port,
362  server_target=server_target,
363  rpc_host=rpc_host)
364 
365  def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
366  if self.port_forwarder:
367  self.port_forwarder.close()
368  self.port_forwarder = None
369  if self.deployment or force:
371  self.deployment = None
372  if self.enable_workload_identity and (self.service_account or force):
374  gcp_iam=self.gcp_iam,
375  gcp_service_account=self.gcp_service_account,
376  service_account_name=self.service_account_name)
378  self.service_account = None
379  super().cleanup(force=force_namespace and force)
380 
381  @classmethod
383  resource_prefix: str,
384  resource_suffix: str,
385  name: str = 'client') -> str:
386  """A helper to make consistent XdsTestClient kubernetes namespace name
387  for given resource prefix and suffix.
388 
389  Note: the idea is to intentionally produce different namespace name for
390  the test server, and the test client, as that closely mimics real-world
391  deployments.
392  """
393  return cls._make_namespace_name(resource_prefix, resource_suffix, name)
framework.test_app.client_app.KubernetesClientRunner.td_bootstrap_image
td_bootstrap_image
Definition: client_app.py:238
framework.test_app.client_app.XdsTestClient.server_target
server_target
Definition: client_app.py:58
framework.test_app.client_app.XdsTestClient.find_subchannels_with_state
List[_ChannelzSubchannel] find_subchannels_with_state(self, _ChannelzChannelState state, **kwargs)
Definition: client_app.py:216
framework.test_app.client_app.XdsTestClient.channelz
_ChannelzServiceClient channelz(self)
Definition: client_app.py:81
framework.test_app.base_runner.KubernetesBaseRunner
Definition: base_runner.py:53
run_xds_tests.get_client_accumulated_stats
def get_client_accumulated_stats()
Definition: run_xds_tests.py:382
framework.test_app.client_app.KubernetesClientRunner
Definition: client_app.py:228
framework.test_app.client_app.KubernetesClientRunner.make_namespace_name
str make_namespace_name(cls, str resource_prefix, str resource_suffix, str name='client')
Definition: client_app.py:382
framework.test_app.client_app.XdsTestClient.wait_for_server_channel_state
_ChannelzChannel wait_for_server_channel_state(self, _ChannelzChannelState state, *Optional[_timedelta] timeout=None, Optional[_timedelta] rpc_deadline=None)
Definition: client_app.py:138
framework.test_app.client_app.KubernetesClientRunner.enable_workload_identity
enable_workload_identity
Definition: client_app.py:243
framework.test_app.client_app.KubernetesClientRunner.run
XdsTestClient run(self, *server_target, rpc='UnaryCall', qps=25, metadata='', secure_mode=False, config_mesh=None, print_response=False)
Definition: client_app.py:286
framework.test_app.client_app._timedelta
_timedelta
Definition: client_app.py:37
framework.test_app.client_app.XdsTestClient.get_active_server_channel_socket
_ChannelzSocket get_active_server_channel_socket(self)
Definition: client_app.py:118
framework.test_app.client_app.KubernetesClientRunner.service_account_name
service_account_name
Definition: client_app.py:247
framework.test_app.client_app.KubernetesClientRunner.__init__
def __init__(self, k8s_namespace, *deployment_name, image_name, td_bootstrap_image, gcp.api.GcpApiManager gcp_api_manager, str gcp_project, str gcp_service_account, xds_server_uri=None, network='default', service_account_name=None, stats_port=8079, deployment_template='client.deployment.yaml', service_account_template='service-account.yaml', reuse_namespace=False, namespace_template=None, debug_use_port_forwarding=False, enable_workload_identity=True)
Definition: client_app.py:230
framework.test_app.base_runner.KubernetesBaseRunner._grant_workload_identity_user
def _grant_workload_identity_user(self, *gcp_iam, gcp_service_account, service_account_name)
Definition: base_runner.py:170
framework.test_app.client_app.KubernetesClientRunner.service_account_template
service_account_template
Definition: client_app.py:248
framework.test_app.client_app.XdsTestClient.csds
_CsdsClient csds(self)
Definition: client_app.py:86
framework.test_app.client_app.XdsTestClient.find_server_channel_with_state
_ChannelzChannel find_server_channel_with_state(self, _ChannelzChannelState state, *Optional[_timedelta] rpc_deadline=None, check_subchannel=True)
Definition: client_app.py:166
framework.test_app.base_runner.KubernetesBaseRunner._wait_deployment_with_available_replicas
def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs)
Definition: base_runner.py:289
framework.test_app.client_app.XdsTestClient.maintenance_port
maintenance_port
Definition: client_app.py:59
framework.test_app.client_app._ChannelzServiceClient
_ChannelzServiceClient
Definition: client_app.py:40
framework.test_app.client_app.XdsTestClient.rpc_port
rpc_port
Definition: client_app.py:57
framework.test_app.base_runner.KubernetesBaseRunner._wait_pod_started
def _wait_pod_started(self, name, **kwargs)
Definition: base_runner.py:299
framework.test_app.client_app.XdsTestClient.load_balancer_stats
_LoadBalancerStatsServiceClient load_balancer_stats(self)
Definition: client_app.py:69
framework.test_app.client_app.KubernetesClientRunner.debug_use_port_forwarding
debug_use_port_forwarding
Definition: client_app.py:242
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
framework.test_app.base_runner.KubernetesBaseRunner._logs_explorer_link
None _logs_explorer_link(*str deployment_name, str namespace_name, str gcp_project, str gcp_ui_url, Optional[timedelta] end_delta=None)
Definition: base_runner.py:315
framework.test_app.client_app.KubernetesClientRunner.port_forwarder
port_forwarder
Definition: client_app.py:346
framework.rpc
Definition: tools/run_tests/xds_k8s_test_driver/framework/rpc/__init__.py:1
framework.test_app.client_app._LoadBalancerStatsServiceClient
_LoadBalancerStatsServiceClient
Definition: client_app.py:38
framework.test_app.client_app.XdsTestClient
Definition: client_app.py:48
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
run_xds_tests.get_client_stats
def get_client_stats(num_rpcs, timeout_sec)
Definition: run_xds_tests.py:359
framework.test_app.client_app.KubernetesClientRunner.deployment_template
deployment_template
Definition: client_app.py:241
framework.rpc.grpc.GrpcApp
Definition: grpc.py:63
framework.test_app.client_app.XdsTestClient.ip
ip
Definition: client_app.py:56
framework.test_app.base_runner.KubernetesBaseRunner._delete_deployment
def _delete_deployment(self, name, wait_for_deletion=True)
Definition: base_runner.py:237
framework.test_app.client_app.KubernetesClientRunner.deployment
deployment
Definition: client_app.py:315
framework.test_app.client_app.KubernetesClientRunner.xds_server_uri
xds_server_uri
Definition: client_app.py:239
framework.test_app.client_app.KubernetesClientRunner.service_account
service_account
Definition: client_app.py:308
framework.test_app.base_runner.KubernetesBaseRunner._revoke_workload_identity_user
def _revoke_workload_identity_user(self, *gcp_iam, gcp_service_account, service_account_name)
Definition: base_runner.py:182
framework.test_app.client_app.XdsTestClient.wait_for_active_server_channel
_ChannelzChannel wait_for_active_server_channel(self)
Definition: client_app.py:110
framework.rpc.grpc.GrpcApp._make_channel
grpc.Channel _make_channel(self, port)
Definition: grpc.py:78
framework.test_app.base_runner.KubernetesBaseRunner._create_service_account
k8s.V1ServiceAccount _create_service_account(self, template, **kwargs)
Definition: base_runner.py:198
close
#define close
Definition: test-fs.c:48
framework.test_app.client_app._CsdsClient
_CsdsClient
Definition: client_app.py:45
framework.test_app.client_app.XdsTestClient.find_subchannel_with_state
_ChannelzSubchannel find_subchannel_with_state(self, _ChannelzChannel channel, _ChannelzChannelState state, **kwargs)
Definition: client_app.py:204
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.test_app.client_app._XdsUpdateClientConfigureServiceClient
_XdsUpdateClientConfigureServiceClient
Definition: client_app.py:39
framework.rpc.grpc.GrpcApp.NotFound
Definition: grpc.py:66
framework.test_app.client_app.KubernetesClientRunner.gcp_service_account
gcp_service_account
Definition: client_app.py:256
framework.test_app.base_runner.KubernetesBaseRunner._delete_service_account
def _delete_service_account(self, name, wait_for_deletion=True)
Definition: base_runner.py:263
framework.test_app.base_runner.KubernetesBaseRunner._create_deployment
k8s.V1Deployment _create_deployment(self, template, **kwargs)
Definition: base_runner.py:212
framework.test_app.client_app.XdsTestClient.get_server_channels
Iterable[_ChannelzChannel] get_server_channels(self, **kwargs)
Definition: client_app.py:200
framework.test_app.base_runner.KubernetesBaseRunner._make_namespace_name
str _make_namespace_name(str resource_prefix, str resource_suffix, str name)
Definition: base_runner.py:343
framework.test_app.client_app.KubernetesClientRunner.image_name
image_name
Definition: client_app.py:235
framework.test_app.client_app.KubernetesClientRunner.gcp_ui_url
gcp_ui_url
Definition: client_app.py:254
framework.test_app.client_app.XdsTestClient.update_config
def update_config(self)
Definition: client_app.py:75
framework.test_app.client_app.XdsTestClient.get_load_balancer_accumulated_stats
grpc_testing.LoadBalancerAccumulatedStatsResponse get_load_balancer_accumulated_stats(self, *Optional[int] timeout_sec=None)
Definition: client_app.py:101
framework.test_app.client_app.KubernetesClientRunner.stats_port
stats_port
Definition: client_app.py:236
framework.test_app.client_app.KubernetesClientRunner.deployment_name
deployment_name
Definition: client_app.py:234
framework.test_app.client_app.KubernetesClientRunner.network
network
Definition: client_app.py:240
framework.test_app.client_app.KubernetesClientRunner.gcp_iam
gcp_iam
Definition: client_app.py:259
framework.test_app.client_app.XdsTestClient.__init__
def __init__(self, *str ip, int rpc_port, str server_target, Optional[str] rpc_host=None, Optional[int] maintenance_port=None)
Definition: client_app.py:54
cleanup
Definition: cleanup.py:1
framework.test_app.client_app.KubernetesClientRunner.cleanup
def cleanup(self, *force=False, force_namespace=False)
Definition: client_app.py:365
framework.test_app.client_app.XdsTestClient.get_load_balancer_stats
grpc_testing.LoadBalancerStatsResponse get_load_balancer_stats(self, *int num_rpcs, Optional[int] timeout_sec=None)
Definition: client_app.py:89
framework.test_app.client_app.KubernetesClientRunner.gcp_project
gcp_project
Definition: client_app.py:253


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:46