base_runner.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import contextlib
15 import datetime
16 import logging
17 import pathlib
18 from typing import Dict, Optional
19 import urllib.parse
20 
21 import mako.template
22 import yaml
23 
26 from framework.infrastructure import gcp
27 from framework.infrastructure import k8s
28 
29 logger = logging.getLogger(__name__)
30 
31 # Type aliases
33 _helper_datetime = framework.helpers.datetime
34 timedelta = datetime.timedelta
35 
36 
37 def _logs_explorer_query(query: Dict[str, str]) -> str:
38  return '\n'.join(f'{k}="{v}"' for k, v in query.items())
39 
40 
41 def _logs_explorer_request(req: Dict[str, str]) -> str:
42  return ';'.join(f'{k}={_logs_explorer_quote(v)}' for k, v in req.items())
43 
44 
45 def _logs_explorer_quote(value: str):
46  return urllib.parse.quote_plus(value, safe=':')
47 
48 
49 class RunnerError(Exception):
50  """Error running app"""
51 
52 
54  TEMPLATE_DIR_NAME = 'kubernetes-manifests'
55  TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}'
56  ROLE_WORKLOAD_IDENTITY_USER = 'roles/iam.workloadIdentityUser'
57 
58  def __init__(self,
59  k8s_namespace,
60  namespace_template=None,
61  reuse_namespace=False):
63 
64  # Kubernetes namespaced resources manager
65  self.k8s_namespace: k8s.KubernetesNamespace = k8s_namespace
66  self.reuse_namespace = reuse_namespace
67  self.namespace_template = namespace_template or 'namespace.yaml'
68 
69  # Mutable state
70  self.namespace: Optional[k8s.V1Namespace] = None
71 
72  def run(self, **kwargs):
73  del kwargs
74  if self.reuse_namespace:
76  if not self.namespace:
77  self.namespace = self._create_namespace(
78  self.namespace_template, namespace_name=self.k8s_namespace.name)
79 
80  def cleanup(self, *, force=False):
81  if (self.namespace and not self.reuse_namespace) or force:
82  self.delete_namespace()
83  self.namespace = None
84 
85  @staticmethod
86  def _render_template(template_file, **kwargs):
87  template = mako.template.Template(filename=str(template_file))
88  return template.render(**kwargs)
89 
90  @staticmethod
91  def _manifests_from_yaml_file(yaml_file):
92  with open(yaml_file) as f:
93  with contextlib.closing(yaml.safe_load_all(f)) as yml:
94  for manifest in yml:
95  yield manifest
96 
97  @staticmethod
98  def _manifests_from_str(document):
99  with contextlib.closing(yaml.safe_load_all(document)) as yml:
100  for manifest in yml:
101  yield manifest
102 
103  @classmethod
104  def _template_file_from_name(cls, template_name):
105  templates_path = (pathlib.Path(__file__).parent /
107  return templates_path.joinpath(template_name).resolve()
108 
109  def _create_from_template(self, template_name, **kwargs):
110  template_file = self._template_file_from_name(template_name)
111  logger.debug("Loading k8s manifest template: %s", template_file)
112 
113  yaml_doc = self._render_template(template_file, **kwargs)
114  logger.info("Rendered template %s/%s:\n%s", self.TEMPLATE_DIR_NAME,
115  template_name, self._highlighter.highlight(yaml_doc))
116 
117  manifests = self._manifests_from_str(yaml_doc)
118  manifest = next(manifests)
119  # Error out on multi-document yaml
120  if next(manifests, False):
121  raise RunnerError('Exactly one document expected in manifest '
122  f'{template_file}')
123  k8s_objects = self.k8s_namespace.apply_manifest(manifest)
124  if len(k8s_objects) != 1:
125  raise RunnerError('Expected exactly one object must created from '
126  f'manifest {template_file}')
127 
128  logger.info('%s %s created', k8s_objects[0].kind,
129  k8s_objects[0].metadata.name)
130  return k8s_objects[0]
131 
132  def _reuse_deployment(self, deployment_name) -> k8s.V1Deployment:
133  deployment = self.k8s_namespace.get_deployment(deployment_name)
134  # TODO(sergiitk): check if good or must be recreated
135  return deployment
136 
137  def _reuse_service(self, service_name) -> k8s.V1Service:
138  service = self.k8s_namespace.get_service(service_name)
139  # TODO(sergiitk): check if good or must be recreated
140  return service
141 
142  def _reuse_namespace(self) -> k8s.V1Namespace:
143  return self.k8s_namespace.get()
144 
145  def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace:
146  namespace = self._create_from_template(template, **kwargs)
147  if not isinstance(namespace, k8s.V1Namespace):
148  raise RunnerError('Expected V1Namespace to be created '
149  f'from manifest {template}')
150  if namespace.metadata.name != kwargs['namespace_name']:
151  raise RunnerError('V1Namespace created with unexpected name: '
152  f'{namespace.metadata.name}')
153  logger.debug('V1Namespace %s created at %s',
154  namespace.metadata.self_link,
155  namespace.metadata.creation_timestamp)
156  return namespace
157 
158  @staticmethod
159  def _get_workload_identity_member_name(project, namespace_name,
160  service_account_name):
161  """
162  Returns workload identity member name used to authenticate Kubernetes
163  service accounts.
164 
165  https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity
166  """
167  return (f'serviceAccount:{project}.svc.id.goog'
168  f'[{namespace_name}/{service_account_name}]')
169 
170  def _grant_workload_identity_user(self, *, gcp_iam, gcp_service_account,
171  service_account_name):
172  workload_identity_member = self._get_workload_identity_member_name(
173  gcp_iam.project, self.k8s_namespace.name, service_account_name)
174  logger.info('Granting %s to %s for GCP Service Account %s',
175  self.ROLE_WORKLOAD_IDENTITY_USER, workload_identity_member,
176  gcp_service_account)
177 
178  gcp_iam.add_service_account_iam_policy_binding(
179  gcp_service_account, self.ROLE_WORKLOAD_IDENTITY_USER,
180  workload_identity_member)
181 
182  def _revoke_workload_identity_user(self, *, gcp_iam, gcp_service_account,
183  service_account_name):
184  workload_identity_member = self._get_workload_identity_member_name(
185  gcp_iam.project, self.k8s_namespace.name, service_account_name)
186  logger.info('Revoking %s from %s for GCP Service Account %s',
187  self.ROLE_WORKLOAD_IDENTITY_USER, workload_identity_member,
188  gcp_service_account)
189  try:
190  gcp_iam.remove_service_account_iam_policy_binding(
191  gcp_service_account, self.ROLE_WORKLOAD_IDENTITY_USER,
192  workload_identity_member)
193  except gcp.api.Error as error:
194  logger.warning('Failed %s from %s for Service Account %s: %r',
196  workload_identity_member, gcp_service_account, error)
197 
198  def _create_service_account(self, template,
199  **kwargs) -> k8s.V1ServiceAccount:
200  resource = self._create_from_template(template, **kwargs)
201  if not isinstance(resource, k8s.V1ServiceAccount):
202  raise RunnerError('Expected V1ServiceAccount to be created '
203  f'from manifest {template}')
204  if resource.metadata.name != kwargs['service_account_name']:
205  raise RunnerError('V1ServiceAccount created with unexpected name: '
206  f'{resource.metadata.name}')
207  logger.debug('V1ServiceAccount %s created at %s',
208  resource.metadata.self_link,
209  resource.metadata.creation_timestamp)
210  return resource
211 
212  def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment:
213  deployment = self._create_from_template(template, **kwargs)
214  if not isinstance(deployment, k8s.V1Deployment):
215  raise RunnerError('Expected V1Deployment to be created '
216  f'from manifest {template}')
217  if deployment.metadata.name != kwargs['deployment_name']:
218  raise RunnerError('V1Deployment created with unexpected name: '
219  f'{deployment.metadata.name}')
220  logger.debug('V1Deployment %s created at %s',
221  deployment.metadata.self_link,
222  deployment.metadata.creation_timestamp)
223  return deployment
224 
225  def _create_service(self, template, **kwargs) -> k8s.V1Service:
226  service = self._create_from_template(template, **kwargs)
227  if not isinstance(service, k8s.V1Service):
228  raise RunnerError('Expected V1Service to be created '
229  f'from manifest {template}')
230  if service.metadata.name != kwargs['service_name']:
231  raise RunnerError('V1Service created with unexpected name: '
232  f'{service.metadata.name}')
233  logger.debug('V1Service %s created at %s', service.metadata.self_link,
234  service.metadata.creation_timestamp)
235  return service
236 
237  def _delete_deployment(self, name, wait_for_deletion=True):
238  logger.info('Deleting deployment %s', name)
239  try:
240  self.k8s_namespace.delete_deployment(name)
241  except k8s.ApiException as e:
242  logger.info('Deployment %s deletion failed, error: %s %s', name,
243  e.status, e.reason)
244  return
245 
246  if wait_for_deletion:
247  self.k8s_namespace.wait_for_deployment_deleted(name)
248  logger.debug('Deployment %s deleted', name)
249 
250  def _delete_service(self, name, wait_for_deletion=True):
251  logger.info('Deleting service %s', name)
252  try:
253  self.k8s_namespace.delete_service(name)
254  except k8s.ApiException as e:
255  logger.info('Service %s deletion failed, error: %s %s', name,
256  e.status, e.reason)
257  return
258 
259  if wait_for_deletion:
260  self.k8s_namespace.wait_for_service_deleted(name)
261  logger.debug('Service %s deleted', name)
262 
263  def _delete_service_account(self, name, wait_for_deletion=True):
264  logger.info('Deleting service account %s', name)
265  try:
266  self.k8s_namespace.delete_service_account(name)
267  except k8s.ApiException as e:
268  logger.info('Service account %s deletion failed, error: %s %s',
269  name, e.status, e.reason)
270  return
271 
272  if wait_for_deletion:
273  self.k8s_namespace.wait_for_service_account_deleted(name)
274  logger.debug('Service account %s deleted', name)
275 
276  def delete_namespace(self, wait_for_deletion=True):
277  logger.info('Deleting namespace %s', self.k8s_namespace.name)
278  try:
279  self.k8s_namespace.delete()
280  except k8s.ApiException as e:
281  logger.info('Namespace %s deletion failed, error: %s %s',
282  self.k8s_namespace.name, e.status, e.reason)
283  return
284 
285  if wait_for_deletion:
286  self.k8s_namespace.wait_for_namespace_deleted()
287  logger.debug('Namespace %s deleted', self.k8s_namespace.name)
288 
289  def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs):
290  logger.info('Waiting for deployment %s to have %s available replica(s)',
291  name, count)
292  self.k8s_namespace.wait_for_deployment_available_replicas(
293  name, count, **kwargs)
294  deployment = self.k8s_namespace.get_deployment(name)
295  logger.info('Deployment %s has %i replicas available',
296  deployment.metadata.name,
297  deployment.status.available_replicas)
298 
299  def _wait_pod_started(self, name, **kwargs):
300  logger.info('Waiting for pod %s to start', name)
301  self.k8s_namespace.wait_for_pod_started(name, **kwargs)
302  pod = self.k8s_namespace.get_pod(name)
303  logger.info('Pod %s ready, IP: %s', pod.metadata.name,
304  pod.status.pod_ip)
305 
306  def _wait_service_neg(self, name, service_port, **kwargs):
307  logger.info('Waiting for NEG for service %s', name)
308  self.k8s_namespace.wait_for_service_neg(name, **kwargs)
309  neg_name, neg_zones = self.k8s_namespace.get_service_neg(
310  name, service_port)
311  logger.info("Service %s: detected NEG=%s in zones=%s", name, neg_name,
312  neg_zones)
313 
314  @staticmethod
316  deployment_name: str,
317  namespace_name: str,
318  gcp_project: str,
319  gcp_ui_url: str,
320  end_delta: Optional[timedelta] = None) -> None:
321  """Output the link to test server/client logs in GCP Logs Explorer."""
322  if end_delta is None:
323  end_delta = timedelta(hours=1)
324 
325  time_now = _helper_datetime.iso8601_utc_time()
326  time_end = _helper_datetime.iso8601_utc_time(end_delta)
327  query = _logs_explorer_query({
328  'resource.type': 'k8s_container',
329  'resource.labels.project_id': gcp_project,
330  'resource.labels.container_name': deployment_name,
331  'resource.labels.namespace_name': namespace_name,
332  })
333  req = _logs_explorer_request({
334  'query': query,
335  'timeRange': f'{time_now}/{time_end}',
336  })
337 
338  link = f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}'
339  # A whitespace at the end to indicate the end of the url.
340  logger.info("GCP Logs Explorer link to %s:\n%s ", deployment_name, link)
341 
342  @staticmethod
343  def _make_namespace_name(resource_prefix: str, resource_suffix: str,
344  name: str) -> str:
345  """A helper to make consistent test app kubernetes namespace name
346  for given resource prefix and suffix."""
347  parts = [resource_prefix, name]
348  # Avoid trailing dash when the suffix is empty.
349  if resource_suffix:
350  parts.append(resource_suffix)
351  return '-'.join(parts)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
framework.test_app.base_runner.KubernetesBaseRunner.namespace_template
namespace_template
Definition: base_runner.py:64
framework.test_app.base_runner.KubernetesBaseRunner.__init__
def __init__(self, k8s_namespace, namespace_template=None, reuse_namespace=False)
Definition: base_runner.py:58
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
framework.test_app.base_runner.KubernetesBaseRunner
Definition: base_runner.py:53
framework.test_app.base_runner.KubernetesBaseRunner._render_template
def _render_template(template_file, **kwargs)
Definition: base_runner.py:86
framework.helpers.datetime
Definition: datetime.py:1
framework.test_app.base_runner.KubernetesBaseRunner._delete_service
def _delete_service(self, name, wait_for_deletion=True)
Definition: base_runner.py:250
framework.test_app.base_runner.KubernetesBaseRunner._grant_workload_identity_user
def _grant_workload_identity_user(self, *gcp_iam, gcp_service_account, service_account_name)
Definition: base_runner.py:170
framework.test_app.base_runner._logs_explorer_request
str _logs_explorer_request(Dict[str, str] req)
Definition: base_runner.py:41
framework.test_app.base_runner.KubernetesBaseRunner._wait_deployment_with_available_replicas
def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs)
Definition: base_runner.py:289
framework.test_app.base_runner.KubernetesBaseRunner._manifests_from_yaml_file
def _manifests_from_yaml_file(yaml_file)
Definition: base_runner.py:91
framework.test_app.base_runner.KubernetesBaseRunner._wait_pod_started
def _wait_pod_started(self, name, **kwargs)
Definition: base_runner.py:299
run_xds_tests.delete
delete
Definition: run_xds_tests.py:3329
framework.test_app.base_runner.timedelta
timedelta
Definition: base_runner.py:34
framework.test_app.base_runner.KubernetesBaseRunner.TEMPLATE_DIR_NAME
string TEMPLATE_DIR_NAME
Definition: base_runner.py:54
framework.test_app.base_runner.KubernetesBaseRunner.run
def run(self, **kwargs)
Definition: base_runner.py:72
framework.test_app.base_runner.KubernetesBaseRunner._highlighter
_highlighter
Definition: base_runner.py:59
framework.test_app.base_runner.KubernetesBaseRunner._logs_explorer_link
None _logs_explorer_link(*str deployment_name, str namespace_name, str gcp_project, str gcp_ui_url, Optional[timedelta] end_delta=None)
Definition: base_runner.py:315
framework.test_app.base_runner._logs_explorer_query
str _logs_explorer_query(Dict[str, str] query)
Definition: base_runner.py:37
framework.test_app.base_runner.KubernetesBaseRunner._get_workload_identity_member_name
def _get_workload_identity_member_name(project, namespace_name, service_account_name)
Definition: base_runner.py:159
framework.test_app.base_runner.KubernetesBaseRunner.reuse_namespace
reuse_namespace
Definition: base_runner.py:63
framework.test_app.base_runner.KubernetesBaseRunner._delete_deployment
def _delete_deployment(self, name, wait_for_deletion=True)
Definition: base_runner.py:237
framework.test_app.base_runner.KubernetesBaseRunner.cleanup
def cleanup(self, *force=False)
Definition: base_runner.py:80
framework.test_app.base_runner.KubernetesBaseRunner._revoke_workload_identity_user
def _revoke_workload_identity_user(self, *gcp_iam, gcp_service_account, service_account_name)
Definition: base_runner.py:182
framework.test_app.base_runner.KubernetesBaseRunner._create_service_account
k8s.V1ServiceAccount _create_service_account(self, template, **kwargs)
Definition: base_runner.py:198
framework.infrastructure
Definition: tools/run_tests/xds_k8s_test_driver/framework/infrastructure/__init__.py:1
framework.test_app.base_runner.KubernetesBaseRunner._delete_service_account
def _delete_service_account(self, name, wait_for_deletion=True)
Definition: base_runner.py:263
framework.test_app.base_runner.KubernetesBaseRunner._reuse_namespace
k8s.V1Namespace _reuse_namespace(self)
Definition: base_runner.py:142
framework.test_app.base_runner.KubernetesBaseRunner._create_deployment
k8s.V1Deployment _create_deployment(self, template, **kwargs)
Definition: base_runner.py:212
framework.test_app.base_runner.KubernetesBaseRunner._manifests_from_str
def _manifests_from_str(document)
Definition: base_runner.py:98
framework.helpers.highlighter.HighlighterYaml
Definition: highlighter.py:91
framework.test_app.base_runner.KubernetesBaseRunner.namespace
namespace
Definition: base_runner.py:75
framework.test_app.base_runner.KubernetesBaseRunner._make_namespace_name
str _make_namespace_name(str resource_prefix, str resource_suffix, str name)
Definition: base_runner.py:343
framework.test_app.base_runner.KubernetesBaseRunner.ROLE_WORKLOAD_IDENTITY_USER
string ROLE_WORKLOAD_IDENTITY_USER
Definition: base_runner.py:56
framework.test_app.base_runner.KubernetesBaseRunner._wait_service_neg
def _wait_service_neg(self, name, service_port, **kwargs)
Definition: base_runner.py:306
framework.test_app.base_runner.KubernetesBaseRunner._create_namespace
k8s.V1Namespace _create_namespace(self, template, **kwargs)
Definition: base_runner.py:145
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
framework.test_app.base_runner.RunnerError
Definition: base_runner.py:49
framework.test_app.base_runner.KubernetesBaseRunner._create_service
k8s.V1Service _create_service(self, template, **kwargs)
Definition: base_runner.py:225
framework.test_app.base_runner.KubernetesBaseRunner.delete_namespace
def delete_namespace(self, wait_for_deletion=True)
Definition: base_runner.py:276
open
#define open
Definition: test-fs.c:46
framework.test_app.base_runner.KubernetesBaseRunner._create_from_template
def _create_from_template(self, template_name, **kwargs)
Definition: base_runner.py:109
framework.test_app.base_runner.KubernetesBaseRunner.TEMPLATE_DIR_RELATIVE_PATH
string TEMPLATE_DIR_RELATIVE_PATH
Definition: base_runner.py:55
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
framework.test_app.base_runner.KubernetesBaseRunner._reuse_deployment
k8s.V1Deployment _reuse_deployment(self, deployment_name)
Definition: base_runner.py:132
framework.test_app.base_runner._logs_explorer_quote
def _logs_explorer_quote(str value)
Definition: base_runner.py:45
framework.test_app.base_runner.KubernetesBaseRunner._template_file_from_name
def _template_file_from_name(cls, template_name)
Definition: base_runner.py:104
framework.test_app.base_runner._HighlighterYaml
_HighlighterYaml
Definition: base_runner.py:32
framework.test_app.base_runner.KubernetesBaseRunner._reuse_service
k8s.V1Service _reuse_service(self, service_name)
Definition: base_runner.py:137
framework.helpers.highlighter
Definition: highlighter.py:1


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:36