xds_k8s_testcase.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import abc
15 import datetime
16 import enum
17 import hashlib
18 import logging
19 import re
20 import time
21 from typing import List, Optional, Tuple
22 
23 from absl import flags
24 from absl.testing import absltest
25 from google.protobuf import json_format
26 import grpc
27 
28 from framework import xds_flags
29 from framework import xds_k8s_flags
30 from framework import xds_url_map_testcase
31 from framework.helpers import rand as helpers_rand
32 from framework.helpers import retryers
33 from framework.helpers import skips
34 from framework.infrastructure import gcp
35 from framework.infrastructure import k8s
36 from framework.infrastructure import traffic_director
37 from framework.rpc import grpc_channelz
38 from framework.rpc import grpc_csds
39 from framework.rpc import grpc_testing
40 from framework.test_app import client_app
41 from framework.test_app import server_app
42 
43 logger = logging.getLogger(__name__)
44 # TODO(yashkt): We will no longer need this flag once Core exposes local certs
45 # from channelz
46 _CHECK_LOCAL_CERTS = flags.DEFINE_bool(
47  "check_local_certs",
48  default=True,
49  help="Security Tests also check the value of local certs")
50 flags.adopt_module_key_flags(xds_flags)
51 flags.adopt_module_key_flags(xds_k8s_flags)
52 
53 # Type aliases
54 TrafficDirectorManager = traffic_director.TrafficDirectorManager
55 TrafficDirectorAppNetManager = traffic_director.TrafficDirectorAppNetManager
56 TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager
57 XdsTestServer = server_app.XdsTestServer
58 XdsTestClient = client_app.XdsTestClient
59 KubernetesServerRunner = server_app.KubernetesServerRunner
60 KubernetesClientRunner = client_app.KubernetesClientRunner
61 LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
62 _ChannelState = grpc_channelz.ChannelState
63 _timedelta = datetime.timedelta
64 ClientConfig = grpc_csds.ClientConfig
65 
66 _TD_CONFIG_MAX_WAIT_SEC = 600
67 
68 
69 class TdPropagationRetryableError(Exception):
70  """Indicates that TD config hasn't propagated yet, and it's safe to retry"""
71 
72 
73 class XdsKubernetesBaseTestCase(absltest.TestCase):
74  client_namespace: str
75  client_runner: KubernetesClientRunner
76  ensure_firewall: bool
77  force_cleanup: bool
78  gcp_api_manager: gcp.api.GcpApiManager
79  gcp_service_account: Optional[str]
80  k8s_api_manager: k8s.KubernetesApiManager
81  secondary_k8s_api_manager: k8s.KubernetesApiManager
82  network: str
83  project: str
84  resource_prefix: str
85  resource_suffix: str = ''
86  # Whether to randomize resources names for each test by appending a
87  # unique suffix.
88  resource_suffix_randomize: bool = True
89  server_maintenance_port: Optional[int]
90  server_namespace: str
91  server_runner: KubernetesServerRunner
92  server_xds_host: str
93  server_xds_port: int
94  td: TrafficDirectorManager
95  td_bootstrap_image: str
96 
97  @staticmethod
98  def is_supported(config: skips.TestConfig) -> bool:
99  """Overridden by the test class to decide if the config is supported.
100 
101  Returns:
102  A bool indicates if the given config is supported.
103  """
104  del config
105  return True
106 
107  @classmethod
108  def setUpClass(cls):
109  """Hook method for setting up class fixture before running tests in
110  the class.
111  """
112  logger.info('----- Testing %s -----', cls.__name__)
113  logger.info('Logs timezone: %s', time.localtime().tm_zone)
114 
115  # Raises unittest.SkipTest if given client/server/version does not
116  # support current test case.
117  skips.evaluate_test_config(cls.is_supported)
118 
119  # GCP
120  cls.project = xds_flags.PROJECT.value
121  cls.network = xds_flags.NETWORK.value
122  cls.gcp_service_account = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value
123  cls.td_bootstrap_image = xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value
124  cls.xds_server_uri = xds_flags.XDS_SERVER_URI.value
125  cls.ensure_firewall = xds_flags.ENSURE_FIREWALL.value
126  cls.firewall_allowed_ports = xds_flags.FIREWALL_ALLOWED_PORTS.value
127  cls.compute_api_version = xds_flags.COMPUTE_API_VERSION.value
128 
129  # Resource names.
130  cls.resource_prefix = xds_flags.RESOURCE_PREFIX.value
131  if xds_flags.RESOURCE_SUFFIX.value is not None:
133  cls.resource_suffix = xds_flags.RESOURCE_SUFFIX.value
134 
135  # Test server
136  cls.server_image = xds_k8s_flags.SERVER_IMAGE.value
137  cls.server_name = xds_flags.SERVER_NAME.value
138  cls.server_port = xds_flags.SERVER_PORT.value
139  cls.server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value
140  cls.server_xds_host = xds_flags.SERVER_NAME.value
141  cls.server_xds_port = xds_flags.SERVER_XDS_PORT.value
142 
143  # Test client
144  cls.client_image = xds_k8s_flags.CLIENT_IMAGE.value
145  cls.client_name = xds_flags.CLIENT_NAME.value
146  cls.client_port = xds_flags.CLIENT_PORT.value
147 
148  # Test suite settings
149  cls.force_cleanup = xds_flags.FORCE_CLEANUP.value
151  xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value
153  xds_k8s_flags.ENABLE_WORKLOAD_IDENTITY.value
154  cls.check_local_certs = _CHECK_LOCAL_CERTS.value
155 
156  # Resource managers
157  cls.k8s_api_manager = k8s.KubernetesApiManager(
158  xds_k8s_flags.KUBE_CONTEXT.value)
159  cls.secondary_k8s_api_manager = k8s.KubernetesApiManager(
160  xds_k8s_flags.SECONDARY_KUBE_CONTEXT.value)
161  cls.gcp_api_manager = gcp.api.GcpApiManager()
162 
163  @classmethod
164  def tearDownClass(cls):
165  cls.k8s_api_manager.close()
167  cls.gcp_api_manager.close()
168 
170  self.td.setup_for_grpc(self.server_xds_host,
171  self.server_xds_port,
172  health_check_port=self.server_maintenance_port)
173 
175  *,
176  wait_for_healthy_status=True,
177  server_runner=None,
178  max_rate_per_endpoint: Optional[int] = None):
179  if server_runner is None:
180  server_runner = self.server_runner
181  # Load Backends
182  neg_name, neg_zones = server_runner.k8s_namespace.get_service_neg(
183  server_runner.service_name, self.server_port)
184 
185  # Add backends to the Backend Service
186  self.td.backend_service_add_neg_backends(
187  neg_name, neg_zones, max_rate_per_endpoint=max_rate_per_endpoint)
188  if wait_for_healthy_status:
189  self.td.wait_for_backends_healthy_status()
190 
191  def removeServerBackends(self, *, server_runner=None):
192  if server_runner is None:
193  server_runner = self.server_runner
194  # Load Backends
195  neg_name, neg_zones = server_runner.k8s_namespace.get_service_neg(
196  server_runner.service_name, self.server_port)
197 
198  # Remove backends from the Backend Service
199  self.td.backend_service_remove_neg_backends(neg_name, neg_zones)
200 
202  test_client: XdsTestClient,
203  num_rpcs: int = 100):
204  lb_stats = self.getClientRpcStats(test_client, num_rpcs)
205  self.assertAllBackendsReceivedRpcs(lb_stats)
206  failed = int(lb_stats.num_failures)
207  self.assertLessEqual(
208  failed,
209  0,
210  msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
211 
212  @staticmethod
214  before: grpc_testing.LoadBalancerAccumulatedStatsResponse,
215  after: grpc_testing.LoadBalancerAccumulatedStatsResponse):
216  """Only diffs stats_per_method, as the other fields are deprecated."""
217  diff = grpc_testing.LoadBalancerAccumulatedStatsResponse()
218  for method, method_stats in after.stats_per_method.items():
219  for status, count in method_stats.result.items():
220  count -= before.stats_per_method[method].result[status]
221  if count < 0:
222  raise AssertionError("Diff of count shouldn't be negative")
223  if count > 0:
224  diff.stats_per_method[method].result[status] = count
225  return diff
226 
227  def assertRpcStatusCodes(self, test_client: XdsTestClient, *,
228  status_code: grpc.StatusCode, duration: _timedelta,
229  method: str) -> None:
230  """Assert all RPCs for a method are completing with a certain status."""
231  # Sending with pre-set QPS for a period of time
232  before_stats = test_client.get_load_balancer_accumulated_stats()
233  response_type = 'LoadBalancerAccumulatedStatsResponse'
234  logging.info('Received %s from test client %s: before:\n%s',
235  response_type, test_client.ip, before_stats)
236  time.sleep(duration.total_seconds())
237  after_stats = test_client.get_load_balancer_accumulated_stats()
238  logging.info('Received %s from test client %s: after:\n%s',
239  response_type, test_client.ip, after_stats)
240 
241  diff_stats = self.diffAccumulatedStatsPerMethod(before_stats,
242  after_stats)
243  stats = diff_stats.stats_per_method[method]
244  status = status_code.value[0]
245  for found_status, count in stats.result.items():
246  if found_status != status and count > 0:
247  self.fail(f"Expected only status {status} but found status "
248  f"{found_status} for method {method}:\n{diff_stats}")
249  self.assertGreater(stats.result[status_code.value[0]], 0)
250 
252  test_client: XdsTestClient,
253  servers: List[XdsTestServer],
254  num_rpcs: int = 100):
255  retryer = retryers.constant_retryer(
256  wait_fixed=datetime.timedelta(seconds=1),
257  timeout=datetime.timedelta(seconds=_TD_CONFIG_MAX_WAIT_SEC),
258  log_level=logging.INFO)
259  try:
260  retryer(self._assertRpcsEventuallyGoToGivenServers, test_client,
261  servers, num_rpcs)
262  except retryers.RetryError:
263  logger.exception(
264  'Rpcs did not go to expected servers before timeout %s',
265  _TD_CONFIG_MAX_WAIT_SEC)
266 
267  def _assertRpcsEventuallyGoToGivenServers(self, test_client: XdsTestClient,
268  servers: List[XdsTestServer],
269  num_rpcs: int):
270  server_names = [server.pod_name for server in servers]
271  logger.info('Verifying RPCs go to %s', server_names)
272  lb_stats = self.getClientRpcStats(test_client, num_rpcs)
273  failed = int(lb_stats.num_failures)
274  self.assertLessEqual(
275  failed,
276  0,
277  msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
278  for server_name in server_names:
279  self.assertIn(server_name, lb_stats.rpcs_by_peer,
280  f'{server_name} did not receive RPCs')
281  for peer in lb_stats.rpcs_by_peer.keys():
282  self.assertIn(peer, server_names,
283  f'Unexpected server {peer} received RPCs')
284 
285  def assertXdsConfigExists(self, test_client: XdsTestClient):
286  config = test_client.csds.fetch_client_status(log_level=logging.INFO)
287  self.assertIsNotNone(config)
288  seen = set()
289  want = frozenset([
290  'listener_config',
291  'cluster_config',
292  'route_config',
293  'endpoint_config',
294  ])
295  for xds_config in config.xds_config:
296  seen.add(xds_config.WhichOneof('per_xds_config'))
297  for generic_xds_config in config.generic_xds_configs:
298  if re.search(r'\.Listener$', generic_xds_config.type_url):
299  seen.add('listener_config')
300  elif re.search(r'\.RouteConfiguration$',
301  generic_xds_config.type_url):
302  seen.add('route_config')
303  elif re.search(r'\.Cluster$', generic_xds_config.type_url):
304  seen.add('cluster_config')
305  elif re.search(r'\.ClusterLoadAssignment$',
306  generic_xds_config.type_url):
307  seen.add('endpoint_config')
308  logger.debug('Received xDS config dump: %s',
309  json_format.MessageToJson(config, indent=2))
310  self.assertSameElements(want, seen)
311 
313  self, test_client: XdsTestClient,
314  previous_route_config_version: str, retry_wait_second: int,
315  timeout_second: int):
316  retryer = retryers.constant_retryer(
317  wait_fixed=datetime.timedelta(seconds=retry_wait_second),
318  timeout=datetime.timedelta(seconds=timeout_second),
319  retry_on_exceptions=(TdPropagationRetryableError,),
320  logger=logger,
321  log_level=logging.INFO)
322  try:
323  for attempt in retryer:
324  with attempt:
325  self.assertSuccessfulRpcs(test_client)
326  raw_config = test_client.csds.fetch_client_status(
327  log_level=logging.INFO)
328  dumped_config = xds_url_map_testcase.DumpedXdsConfig(
329  json_format.MessageToDict(raw_config))
330  route_config_version = dumped_config.rds_version
331  if previous_route_config_version == route_config_version:
332  logger.info(
333  'Routing config not propagated yet. Retrying.')
335  "CSDS not get updated routing config corresponding"
336  " to the second set of url maps")
337  else:
338  self.assertSuccessfulRpcs(test_client)
339  logger.info(
340  ('[SUCCESS] Confirmed successful RPC with the '
341  'updated routing config, version=%s'),
342  route_config_version)
343  except retryers.RetryError as retry_error:
344  logger.info(
345  ('Retry exhausted. TD routing config propagation failed after '
346  'timeout %ds. Last seen client config dump: %s'),
347  timeout_second, dumped_config)
348  raise retry_error
349 
351  test_client: XdsTestClient,
352  num_rpcs: Optional[int] = 100):
353  lb_stats = self.getClientRpcStats(test_client, num_rpcs)
354  failed = int(lb_stats.num_failures)
355  self.assertEqual(
356  failed,
357  num_rpcs,
358  msg=f'Expected all RPCs to fail: {failed} of {num_rpcs} failed')
359 
360  @staticmethod
361  def getClientRpcStats(test_client: XdsTestClient,
362  num_rpcs: int) -> LoadBalancerStatsResponse:
363  lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
364  logger.info(
365  'Received LoadBalancerStatsResponse from test client %s:\n%s',
366  test_client.ip, lb_stats)
367  return lb_stats
368 
369  def assertAllBackendsReceivedRpcs(self, lb_stats):
370  # TODO(sergiitk): assert backends length
371  for backend, rpcs_count in lb_stats.rpcs_by_peer.items():
372  self.assertGreater(
373  int(rpcs_count),
374  0,
375  msg=f'Backend {backend} did not receive a single RPC')
376 
377 
379  metaclass=abc.ABCMeta):
380  """Isolated test case.
381 
382  Base class for tests cases where infra resources are created before
383  each test, and destroyed after.
384  """
385 
386  def setUp(self):
387  """Hook method for setting up the test fixture before exercising it."""
388  super().setUp()
389 
391  self.resource_suffix = helpers_rand.random_resource_suffix()
392  logger.info('Test run resource prefix: %s, suffix: %s',
393  self.resource_prefix, self.resource_suffix)
394 
395  # TD Manager
397 
398  # Test Server runner
399  self.server_namespace = KubernetesServerRunner.make_namespace_name(
400  self.resource_prefix, self.resource_suffix)
402 
403  # Test Client runner
404  self.client_namespace = KubernetesClientRunner.make_namespace_name(
405  self.resource_prefix, self.resource_suffix)
407 
408  # Ensures the firewall exist
409  if self.ensure_firewall:
410  self.td.create_firewall_rule(
411  allowed_ports=self.firewall_allowed_ports)
412 
413  # Randomize xds port, when it's set to 0
414  if self.server_xds_port == 0:
415  # TODO(sergiitk): this is prone to race conditions:
416  # The port might not me taken now, but there's not guarantee
417  # it won't be taken until the tests get to creating
418  # forwarding rule. This check is better than nothing,
419  # but we should find a better approach.
420  self.server_xds_port = self.td.find_unused_forwarding_rule_port()
421  logger.info('Found unused xds port: %s', self.server_xds_port)
422 
423  @abc.abstractmethod
424  def initTrafficDirectorManager(self) -> TrafficDirectorManager:
425  raise NotImplementedError
426 
427  @abc.abstractmethod
428  def initKubernetesServerRunner(self) -> KubernetesServerRunner:
429  raise NotImplementedError
430 
431  @abc.abstractmethod
432  def initKubernetesClientRunner(self) -> KubernetesClientRunner:
433  raise NotImplementedError
434 
435  def tearDown(self):
436  logger.info('----- TestMethod %s teardown -----', self.id())
437  retryer = retryers.constant_retryer(wait_fixed=_timedelta(seconds=10),
438  attempts=3,
439  log_level=logging.INFO)
440  try:
441  retryer(self._cleanup)
442  except retryers.RetryError:
443  logger.exception('Got error during teardown')
444 
445  def _cleanup(self):
446  self.td.cleanup(force=self.force_cleanup)
447  self.client_runner.cleanup(force=self.force_cleanup)
448  self.server_runner.cleanup(force=self.force_cleanup,
449  force_namespace=self.force_cleanup)
450 
451 
453  """Regular test case base class for testing PSM features in isolation."""
454 
455  @classmethod
456  def setUpClass(cls):
457  """Hook method for setting up class fixture before running tests in
458  the class.
459  """
460  super().setUpClass()
461  if cls.server_maintenance_port is None:
463  KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT
464 
465  def initTrafficDirectorManager(self) -> TrafficDirectorManager:
466  return TrafficDirectorManager(
467  self.gcp_api_manager,
468  project=self.project,
469  resource_prefix=self.resource_prefix,
470  resource_suffix=self.resource_suffix,
471  network=self.network,
472  compute_api_version=self.compute_api_version)
473 
474  def initKubernetesServerRunner(self) -> KubernetesServerRunner:
475  return KubernetesServerRunner(
476  k8s.KubernetesNamespace(self.k8s_api_manager,
477  self.server_namespace),
478  deployment_name=self.server_name,
479  image_name=self.server_image,
480  td_bootstrap_image=self.td_bootstrap_image,
481  gcp_project=self.project,
482  gcp_api_manager=self.gcp_api_manager,
483  gcp_service_account=self.gcp_service_account,
484  xds_server_uri=self.xds_server_uri,
485  network=self.network,
486  debug_use_port_forwarding=self.debug_use_port_forwarding,
487  enable_workload_identity=self.enable_workload_identity)
488 
489  def initKubernetesClientRunner(self) -> KubernetesClientRunner:
490  return KubernetesClientRunner(
491  k8s.KubernetesNamespace(self.k8s_api_manager,
492  self.client_namespace),
493  deployment_name=self.client_name,
494  image_name=self.client_image,
495  td_bootstrap_image=self.td_bootstrap_image,
496  gcp_project=self.project,
497  gcp_api_manager=self.gcp_api_manager,
498  gcp_service_account=self.gcp_service_account,
499  xds_server_uri=self.xds_server_uri,
500  network=self.network,
501  debug_use_port_forwarding=self.debug_use_port_forwarding,
502  enable_workload_identity=self.enable_workload_identity,
503  stats_port=self.client_port,
504  reuse_namespace=self.server_namespace == self.client_namespace)
505 
507  replica_count=1,
508  server_runner=None,
509  **kwargs) -> List[XdsTestServer]:
510  if server_runner is None:
511  server_runner = self.server_runner
512  test_servers = server_runner.run(
513  replica_count=replica_count,
514  test_port=self.server_port,
515  maintenance_port=self.server_maintenance_port,
516  **kwargs)
517  for test_server in test_servers:
518  test_server.set_xds_address(self.server_xds_host,
519  self.server_xds_port)
520  return test_servers
521 
522  def startTestClient(self, test_server: XdsTestServer,
523  **kwargs) -> XdsTestClient:
524  test_client = self.client_runner.run(server_target=test_server.xds_uri,
525  **kwargs)
526  test_client.wait_for_active_server_channel()
527  return test_client
528 
529 
531  td: TrafficDirectorAppNetManager
532 
533  def initTrafficDirectorManager(self) -> TrafficDirectorAppNetManager:
535  self.gcp_api_manager,
536  project=self.project,
537  resource_prefix=self.resource_prefix,
538  resource_suffix=self.resource_suffix,
539  network=self.network,
540  compute_api_version=self.compute_api_version)
541 
542 
544  """Test case base class for testing PSM security features in isolation."""
545  td: TrafficDirectorSecureManager
546 
547  class SecurityMode(enum.Enum):
548  MTLS = enum.auto()
549  TLS = enum.auto()
550  PLAINTEXT = enum.auto()
551 
552  @classmethod
553  def setUpClass(cls):
554  """Hook method for setting up class fixture before running tests in
555  the class.
556  """
557  super().setUpClass()
558  if cls.server_maintenance_port is None:
559  # In secure mode, the maintenance port is different from
560  # the test port to keep it insecure, and make
561  # Health Checks and Channelz tests available.
562  # When not provided, use explicit numeric port value, so
563  # Backend Health Checks are created on a fixed port.
565  KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
566 
567  def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager:
569  self.gcp_api_manager,
570  project=self.project,
571  resource_prefix=self.resource_prefix,
572  resource_suffix=self.resource_suffix,
573  network=self.network,
574  compute_api_version=self.compute_api_version)
575 
576  def initKubernetesServerRunner(self) -> KubernetesServerRunner:
577  return KubernetesServerRunner(
578  k8s.KubernetesNamespace(self.k8s_api_manager,
579  self.server_namespace),
580  deployment_name=self.server_name,
581  image_name=self.server_image,
582  td_bootstrap_image=self.td_bootstrap_image,
583  gcp_project=self.project,
584  gcp_api_manager=self.gcp_api_manager,
585  gcp_service_account=self.gcp_service_account,
586  network=self.network,
587  xds_server_uri=self.xds_server_uri,
588  deployment_template='server-secure.deployment.yaml',
589  debug_use_port_forwarding=self.debug_use_port_forwarding)
590 
591  def initKubernetesClientRunner(self) -> KubernetesClientRunner:
592  return KubernetesClientRunner(
593  k8s.KubernetesNamespace(self.k8s_api_manager,
594  self.client_namespace),
595  deployment_name=self.client_name,
596  image_name=self.client_image,
597  td_bootstrap_image=self.td_bootstrap_image,
598  gcp_project=self.project,
599  gcp_api_manager=self.gcp_api_manager,
600  gcp_service_account=self.gcp_service_account,
601  xds_server_uri=self.xds_server_uri,
602  network=self.network,
603  deployment_template='client-secure.deployment.yaml',
604  stats_port=self.client_port,
605  reuse_namespace=self.server_namespace == self.client_namespace,
606  debug_use_port_forwarding=self.debug_use_port_forwarding)
607 
608  def startSecureTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
609  test_server = self.server_runner.run(
610  replica_count=replica_count,
611  test_port=self.server_port,
612  maintenance_port=self.server_maintenance_port,
613  secure_mode=True,
614  **kwargs)[0]
615  test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
616  return test_server
617 
618  def setupSecurityPolicies(self, *, server_tls, server_mtls, client_tls,
619  client_mtls):
620  self.td.setup_client_security(server_namespace=self.server_namespace,
621  server_name=self.server_name,
622  tls=client_tls,
623  mtls=client_mtls)
624  self.td.setup_server_security(server_namespace=self.server_namespace,
625  server_name=self.server_name,
626  server_port=self.server_port,
627  tls=server_tls,
628  mtls=server_mtls)
629 
631  test_server: XdsTestServer,
632  *,
633  wait_for_active_server_channel=True,
634  **kwargs) -> XdsTestClient:
635  test_client = self.client_runner.run(server_target=test_server.xds_uri,
636  secure_mode=True,
637  **kwargs)
638  if wait_for_active_server_channel:
639  test_client.wait_for_active_server_channel()
640  return test_client
641 
642  def assertTestAppSecurity(self, mode: SecurityMode,
643  test_client: XdsTestClient,
644  test_server: XdsTestServer):
645  client_socket, server_socket = self.getConnectedSockets(
646  test_client, test_server)
647  server_security: grpc_channelz.Security = server_socket.security
648  client_security: grpc_channelz.Security = client_socket.security
649  logger.info('Server certs: %s', self.debug_sock_certs(server_security))
650  logger.info('Client certs: %s', self.debug_sock_certs(client_security))
651 
652  if mode is self.SecurityMode.MTLS:
653  self.assertSecurityMtls(client_security, server_security)
654  elif mode is self.SecurityMode.TLS:
655  self.assertSecurityTls(client_security, server_security)
656  elif mode is self.SecurityMode.PLAINTEXT:
657  self.assertSecurityPlaintext(client_security, server_security)
658  else:
659  raise TypeError('Incorrect security mode')
660 
661  def assertSecurityMtls(self, client_security: grpc_channelz.Security,
662  server_security: grpc_channelz.Security):
663  self.assertEqual(client_security.WhichOneof('model'),
664  'tls',
665  msg='(mTLS) Client socket security model must be TLS')
666  self.assertEqual(server_security.WhichOneof('model'),
667  'tls',
668  msg='(mTLS) Server socket security model must be TLS')
669  server_tls, client_tls = server_security.tls, client_security.tls
670 
671  # Confirm regular TLS: server local cert == client remote cert
672  self.assertNotEmpty(client_tls.remote_certificate,
673  msg="(mTLS) Client remote certificate is missing")
674  if self.check_local_certs:
675  self.assertNotEmpty(
676  server_tls.local_certificate,
677  msg="(mTLS) Server local certificate is missing")
678  self.assertEqual(
679  server_tls.local_certificate,
680  client_tls.remote_certificate,
681  msg="(mTLS) Server local certificate must match client's "
682  "remote certificate")
683 
684  # mTLS: server remote cert == client local cert
685  self.assertNotEmpty(server_tls.remote_certificate,
686  msg="(mTLS) Server remote certificate is missing")
687  if self.check_local_certs:
688  self.assertNotEmpty(
689  client_tls.local_certificate,
690  msg="(mTLS) Client local certificate is missing")
691  self.assertEqual(
692  server_tls.remote_certificate,
693  client_tls.local_certificate,
694  msg="(mTLS) Server remote certificate must match client's "
695  "local certificate")
696 
697  def assertSecurityTls(self, client_security: grpc_channelz.Security,
698  server_security: grpc_channelz.Security):
699  self.assertEqual(client_security.WhichOneof('model'),
700  'tls',
701  msg='(TLS) Client socket security model must be TLS')
702  self.assertEqual(server_security.WhichOneof('model'),
703  'tls',
704  msg='(TLS) Server socket security model must be TLS')
705  server_tls, client_tls = server_security.tls, client_security.tls
706 
707  # Regular TLS: server local cert == client remote cert
708  self.assertNotEmpty(client_tls.remote_certificate,
709  msg="(TLS) Client remote certificate is missing")
710  if self.check_local_certs:
711  self.assertNotEmpty(server_tls.local_certificate,
712  msg="(TLS) Server local certificate is missing")
713  self.assertEqual(
714  server_tls.local_certificate,
715  client_tls.remote_certificate,
716  msg="(TLS) Server local certificate must match client "
717  "remote certificate")
718 
719  # mTLS must not be used
720  self.assertEmpty(
721  server_tls.remote_certificate,
722  msg="(TLS) Server remote certificate must be empty in TLS mode. "
723  "Is server security incorrectly configured for mTLS?")
724  self.assertEmpty(
725  client_tls.local_certificate,
726  msg="(TLS) Client local certificate must be empty in TLS mode. "
727  "Is client security incorrectly configured for mTLS?")
728 
729  def assertSecurityPlaintext(self, client_security, server_security):
730  server_tls, client_tls = server_security.tls, client_security.tls
731  # Not TLS
732  self.assertEmpty(
733  server_tls.local_certificate,
734  msg="(Plaintext) Server local certificate must be empty.")
735  self.assertEmpty(
736  client_tls.local_certificate,
737  msg="(Plaintext) Client local certificate must be empty.")
738 
739  # Not mTLS
740  self.assertEmpty(
741  server_tls.remote_certificate,
742  msg="(Plaintext) Server remote certificate must be empty.")
743  self.assertEmpty(
744  client_tls.local_certificate,
745  msg="(Plaintext) Client local certificate must be empty.")
746 
748  self,
749  test_client: XdsTestClient,
750  *,
751  times: Optional[int] = None,
752  delay: Optional[_timedelta] = None):
753  """
754  Asserts that the client repeatedly cannot reach the server.
755 
756  With negative tests we can't be absolutely certain expected failure
757  state is not caused by something else.
758  To mitigate for this, we repeat the checks several times, and expect
759  all of them to succeed.
760 
761  This is useful in case the channel eventually stabilizes, and RPCs pass.
762 
763  Args:
764  test_client: An instance of XdsTestClient
765  times: Optional; A positive number of times to confirm that
766  the server is unreachable. Defaults to `3` attempts.
767  delay: Optional; Specifies how long to wait before the next check.
768  Defaults to `10` seconds.
769  """
770  if times is None or times < 1:
771  times = 3
772  if delay is None:
773  delay = _timedelta(seconds=10)
774 
775  for i in range(1, times + 1):
776  self.assertClientCannotReachServer(test_client)
777  if i < times:
778  logger.info('Check %s passed, waiting %s before the next check',
779  i, delay)
780  time.sleep(delay.total_seconds())
781 
782  def assertClientCannotReachServer(self, test_client: XdsTestClient):
783  self.assertClientChannelFailed(test_client)
784  self.assertFailedRpcs(test_client)
785 
786  def assertClientChannelFailed(self, test_client: XdsTestClient):
787  channel = test_client.wait_for_server_channel_state(
788  state=_ChannelState.TRANSIENT_FAILURE)
789  subchannels = list(
790  test_client.channelz.list_channel_subchannels(channel))
791  self.assertLen(subchannels,
792  1,
793  msg="Client channel must have exactly one subchannel "
794  "in state TRANSIENT_FAILURE.")
795 
796  @staticmethod
798  test_client: XdsTestClient, test_server: XdsTestServer
799  ) -> Tuple[grpc_channelz.Socket, grpc_channelz.Socket]:
800  client_sock = test_client.get_active_server_channel_socket()
801  server_sock = test_server.get_server_socket_matching_client(client_sock)
802  return client_sock, server_sock
803 
804  @classmethod
805  def debug_sock_certs(cls, security: grpc_channelz.Security):
806  if security.WhichOneof('model') == 'other':
807  return f'other: <{security.other.name}={security.other.value}>'
808 
809  return (f'local: <{cls.debug_cert(security.tls.local_certificate)}>, '
810  f'remote: <{cls.debug_cert(security.tls.remote_certificate)}>')
811 
812  @staticmethod
813  def debug_cert(cert):
814  if not cert:
815  return 'missing'
816  sha1 = hashlib.sha1(cert)
817  return f'sha1={sha1.hexdigest()}, len={len(cert)}'
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase.startTestClient
XdsTestClient startTestClient(self, XdsTestServer test_server, **kwargs)
Definition: xds_k8s_testcase.py:522
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.server_name
server_name
Definition: xds_k8s_testcase.py:137
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.tearDown
def tearDown(self)
Definition: xds_k8s_testcase.py:435
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.getConnectedSockets
Tuple[grpc_channelz.Socket, grpc_channelz.Socket] getConnectedSockets(XdsTestClient test_client, XdsTestServer test_server)
Definition: xds_k8s_testcase.py:797
framework.xds_k8s_testcase.TrafficDirectorAppNetManager
TrafficDirectorAppNetManager
Definition: xds_k8s_testcase.py:55
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.gcp_service_account
gcp_service_account
Definition: xds_k8s_testcase.py:122
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.client_port
client_port
Definition: xds_k8s_testcase.py:146
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.server_port
server_port
Definition: xds_k8s_testcase.py:138
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.enable_workload_identity
enable_workload_identity
Definition: xds_k8s_testcase.py:152
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.removeServerBackends
def removeServerBackends(self, *server_runner=None)
Definition: xds_k8s_testcase.py:191
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.gcp_api_manager
gcp_api_manager
Definition: xds_k8s_testcase.py:161
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase.startTestServers
List[XdsTestServer] startTestServers(self, replica_count=1, server_runner=None, **kwargs)
Definition: xds_k8s_testcase.py:506
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.initKubernetesServerRunner
KubernetesServerRunner initKubernetesServerRunner(self)
Definition: xds_k8s_testcase.py:576
framework.xds_k8s_testcase.TdPropagationRetryableError
Definition: xds_k8s_testcase.py:69
framework.xds_k8s_testcase.TrafficDirectorManager
TrafficDirectorManager
Definition: xds_k8s_testcase.py:54
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.getClientRpcStats
LoadBalancerStatsResponse getClientRpcStats(XdsTestClient test_client, int num_rpcs)
Definition: xds_k8s_testcase.py:361
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.setUpClass
def setUpClass(cls)
Definition: xds_k8s_testcase.py:553
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.server_maintenance_port
server_maintenance_port
Definition: xds_k8s_testcase.py:139
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.server_xds_host
server_xds_host
Definition: xds_k8s_testcase.py:140
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase
Definition: xds_k8s_testcase.py:543
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.debug_cert
def debug_cert(cert)
Definition: xds_k8s_testcase.py:813
google::protobuf
Definition: bloaty/third_party/protobuf/benchmarks/util/data_proto2_to_proto3_util.h:12
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase._assertRpcsEventuallyGoToGivenServers
def _assertRpcsEventuallyGoToGivenServers(self, XdsTestClient test_client, List[XdsTestServer] servers, int num_rpcs)
Definition: xds_k8s_testcase.py:267
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.setUpClass
def setUpClass(cls)
Definition: xds_k8s_testcase.py:108
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.server_runner
server_runner
Definition: xds_k8s_testcase.py:401
framework.xds_url_map_testcase.DumpedXdsConfig
Definition: xds_url_map_testcase.py:72
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase
Definition: xds_k8s_testcase.py:452
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.debug_use_port_forwarding
debug_use_port_forwarding
Definition: xds_k8s_testcase.py:150
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertRpcStatusCodes
None assertRpcStatusCodes(self, XdsTestClient test_client, *grpc.StatusCode status_code, _timedelta duration, str method)
Definition: xds_k8s_testcase.py:227
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.server_xds_port
server_xds_port
Definition: xds_k8s_testcase.py:141
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.server_image
server_image
Definition: xds_k8s_testcase.py:136
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertClientChannelFailed
def assertClientChannelFailed(self, XdsTestClient test_client)
Definition: xds_k8s_testcase.py:786
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase.setUpClass
def setUpClass(cls)
Definition: xds_k8s_testcase.py:456
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.resource_prefix
resource_prefix
Definition: xds_k8s_testcase.py:130
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase._cleanup
def _cleanup(self)
Definition: xds_k8s_testcase.py:445
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.td_bootstrap_image
td_bootstrap_image
Definition: xds_k8s_testcase.py:123
framework.xds_k8s_testcase.KubernetesClientRunner
KubernetesClientRunner
Definition: xds_k8s_testcase.py:60
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.compute_api_version
compute_api_version
Definition: xds_k8s_testcase.py:127
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertRouteConfigUpdateTrafficHandoff
def assertRouteConfigUpdateTrafficHandoff(self, XdsTestClient test_client, str previous_route_config_version, int retry_wait_second, int timeout_second)
Definition: xds_k8s_testcase.py:312
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.force_cleanup
force_cleanup
Definition: xds_k8s_testcase.py:149
framework.rpc
Definition: tools/run_tests/xds_k8s_test_driver/framework/rpc/__init__.py:1
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.td
td
Definition: xds_k8s_testcase.py:396
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.client_namespace
client_namespace
Definition: xds_k8s_testcase.py:404
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertRpcsEventuallyGoToGivenServers
def assertRpcsEventuallyGoToGivenServers(self, XdsTestClient test_client, List[XdsTestServer] servers, int num_rpcs=100)
Definition: xds_k8s_testcase.py:251
xds_interop_client.int
int
Definition: xds_interop_client.py:113
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertSecurityPlaintext
def assertSecurityPlaintext(self, client_security, server_security)
Definition: xds_k8s_testcase.py:729
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.network
network
Definition: xds_k8s_testcase.py:121
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.tearDownClass
def tearDownClass(cls)
Definition: xds_k8s_testcase.py:164
grpc.StatusCode
Definition: src/python/grpcio/grpc/__init__.py:232
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
Definition: xds_k8s_testcase.py:547
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.debug_sock_certs
def debug_sock_certs(cls, grpc_channelz.Security security)
Definition: xds_k8s_testcase.py:805
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertAllBackendsReceivedRpcs
def assertAllBackendsReceivedRpcs(self, lb_stats)
Definition: xds_k8s_testcase.py:369
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.xds_server_uri
xds_server_uri
Definition: xds_k8s_testcase.py:124
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.is_supported
bool is_supported(skips.TestConfig config)
Definition: xds_k8s_testcase.py:98
framework.xds_k8s_testcase.KubernetesServerRunner
KubernetesServerRunner
Definition: xds_k8s_testcase.py:59
close
#define close
Definition: test-fs.c:48
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.xds_k8s_testcase.TrafficDirectorSecureManager
TrafficDirectorSecureManager
Definition: xds_k8s_testcase.py:56
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase.initKubernetesServerRunner
KubernetesServerRunner initKubernetesServerRunner(self)
Definition: xds_k8s_testcase.py:474
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.setupSecurityPolicies
def setupSecurityPolicies(self, *server_tls, server_mtls, client_tls, client_mtls)
Definition: xds_k8s_testcase.py:618
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.resource_suffix_randomize
resource_suffix_randomize
Definition: xds_k8s_testcase.py:132
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.client_runner
client_runner
Definition: xds_k8s_testcase.py:406
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.setupTrafficDirectorGrpc
def setupTrafficDirectorGrpc(self)
Definition: xds_k8s_testcase.py:169
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertSecurityMtls
def assertSecurityMtls(self, grpc_channelz.Security client_security, grpc_channelz.Security server_security)
Definition: xds_k8s_testcase.py:661
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.startSecureTestClient
XdsTestClient startSecureTestClient(self, XdsTestServer test_server, *wait_for_active_server_channel=True, **kwargs)
Definition: xds_k8s_testcase.py:630
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertFailedRpcs
def assertFailedRpcs(self, XdsTestClient test_client, Optional[int] num_rpcs=100)
Definition: xds_k8s_testcase.py:350
framework.xds_k8s_testcase.AppNetXdsKubernetesTestCase.initTrafficDirectorManager
TrafficDirectorAppNetManager initTrafficDirectorManager(self)
Definition: xds_k8s_testcase.py:533
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.setupServerBackends
def setupServerBackends(self, *wait_for_healthy_status=True, server_runner=None, Optional[int] max_rate_per_endpoint=None)
Definition: xds_k8s_testcase.py:174
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertClientCannotReachServerRepeatedly
def assertClientCannotReachServerRepeatedly(self, XdsTestClient test_client, *Optional[int] times=None, Optional[_timedelta] delay=None)
Definition: xds_k8s_testcase.py:747
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertSecurityTls
def assertSecurityTls(self, grpc_channelz.Security client_security, grpc_channelz.Security server_security)
Definition: xds_k8s_testcase.py:697
framework.xds_k8s_testcase._timedelta
_timedelta
Definition: xds_k8s_testcase.py:63
client.run
def run()
Definition: examples/python/async_streaming/client.py:109
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.initTrafficDirectorManager
TrafficDirectorManager initTrafficDirectorManager(self)
Definition: xds_k8s_testcase.py:424
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.firewall_allowed_ports
firewall_allowed_ports
Definition: xds_k8s_testcase.py:126
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.initTrafficDirectorManager
TrafficDirectorSecureManager initTrafficDirectorManager(self)
Definition: xds_k8s_testcase.py:567
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.startSecureTestServer
XdsTestServer startSecureTestServer(self, replica_count=1, **kwargs)
Definition: xds_k8s_testcase.py:608
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.k8s_api_manager
k8s_api_manager
Definition: xds_k8s_testcase.py:157
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.initKubernetesServerRunner
KubernetesServerRunner initKubernetesServerRunner(self)
Definition: xds_k8s_testcase.py:428
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.setUp
def setUp(self)
Definition: xds_k8s_testcase.py:386
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertSuccessfulRpcs
def assertSuccessfulRpcs(self, XdsTestClient test_client, int num_rpcs=100)
Definition: xds_k8s_testcase.py:201
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.server_namespace
server_namespace
Definition: xds_k8s_testcase.py:399
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertTestAppSecurity
def assertTestAppSecurity(self, SecurityMode mode, XdsTestClient test_client, XdsTestServer test_server)
Definition: xds_k8s_testcase.py:642
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.ensure_firewall
ensure_firewall
Definition: xds_k8s_testcase.py:125
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase.initTrafficDirectorManager
TrafficDirectorManager initTrafficDirectorManager(self)
Definition: xds_k8s_testcase.py:465
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.assertClientCannotReachServer
def assertClientCannotReachServer(self, XdsTestClient test_client)
Definition: xds_k8s_testcase.py:782
framework.xds_k8s_testcase.RegularXdsKubernetesTestCase.initKubernetesClientRunner
KubernetesClientRunner initKubernetesClientRunner(self)
Definition: xds_k8s_testcase.py:489
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase
Definition: xds_k8s_testcase.py:379
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.assertXdsConfigExists
def assertXdsConfigExists(self, XdsTestClient test_client)
Definition: xds_k8s_testcase.py:285
cleanup
Definition: cleanup.py:1
framework.xds_k8s_testcase.AppNetXdsKubernetesTestCase
Definition: xds_k8s_testcase.py:530
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.secondary_k8s_api_manager
secondary_k8s_api_manager
Definition: xds_k8s_testcase.py:159
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.client_name
client_name
Definition: xds_k8s_testcase.py:145
framework.xds_k8s_testcase.SecurityXdsKubernetesTestCase.initKubernetesClientRunner
KubernetesClientRunner initKubernetesClientRunner(self)
Definition: xds_k8s_testcase.py:591
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.resource_suffix
resource_suffix
Definition: xds_k8s_testcase.py:133
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.project
project
Definition: xds_k8s_testcase.py:120
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.client_image
client_image
Definition: xds_k8s_testcase.py:144
framework.xds_k8s_testcase.IsolatedXdsKubernetesTestCase.initKubernetesClientRunner
KubernetesClientRunner initKubernetesClientRunner(self)
Definition: xds_k8s_testcase.py:432
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase
Definition: xds_k8s_testcase.py:73
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.diffAccumulatedStatsPerMethod
def diffAccumulatedStatsPerMethod(grpc_testing.LoadBalancerAccumulatedStatsResponse before, grpc_testing.LoadBalancerAccumulatedStatsResponse after)
Definition: xds_k8s_testcase.py:213
framework.xds_k8s_testcase.XdsKubernetesBaseTestCase.check_local_certs
check_local_certs
Definition: xds_k8s_testcase.py:154


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:56