k8s.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import functools
15 import json
16 import logging
17 import re
18 import subprocess
19 import time
20 from typing import List, Optional, Tuple
21 
22 from kubernetes import client
23 from kubernetes import utils
24 import kubernetes.config
25 # TODO(sergiitk): replace with tenacity
26 import retrying
27 
logger = logging.getLogger(__name__)

# Type aliases: short module-level names for the kubernetes client models
# and the API exception referenced by the annotations below.
V1Deployment = client.V1Deployment
V1ServiceAccount = client.V1ServiceAccount
V1Pod = client.V1Pod
V1PodList = client.V1PodList
V1Service = client.V1Service
V1Namespace = client.V1Namespace
ApiException = client.ApiException
37 
38 
40 
def simple_resource_get(func):
    """Decorate a resource getter so that a missing resource yields ``None``.

    The wrapped callable is invoked with the caller's arguments unchanged.
    A ``client.ApiException`` whose ``status`` is 404 is swallowed and
    ``None`` is returned instead; any other ``ApiException`` (and any other
    exception type) propagates to the caller.

    Args:
        func: The getter to wrap.

    Returns:
        A wrapper with the same call signature as ``func``.
    """

    # functools.wraps keeps the getter's name and docstring on the wrapper,
    # so logs and tracebacks point at the real method.
    @functools.wraps(func)
    def wrap_not_found_return_none(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except client.ApiException as e:
            if e.status == 404:
                # Ignore 404
                return None
            raise

    return wrap_not_found_return_none
51 
52 
def label_dict_to_selector(labels: dict) -> str:
    """Render a dict of labels as a comma-separated ``key==value`` selector.

    Args:
        labels: Mapping of label name to label value.

    Returns:
        The pairs formatted as ``key==value`` and joined with commas, in the
        dict's iteration order. An empty dict gives an empty string.
    """
    return ','.join(f'{k}=={v}' for k, v in labels.items())
55 
56 
58 
class KubernetesApiManager:
    """Bundles the kubernetes API clients used for one kubeconfig context."""

    def __init__(self, context):
        """Create the Apps and Core API wrappers for ``context``.

        Args:
            context: Name of the kubeconfig context to use.
        """
        self.context = context
        # NOTE(review): this line is missing from the listing. The index at
        # the end of the listing places the `client` attribute here; the
        # right-hand side is inferred from the cached factory below.
        # Confirm against the upstream file.
        self.client = self._cached_api_client_for_context(context)
        self.apps = client.AppsV1Api(self.client)
        self.core = client.CoreV1Api(self.client)

    def close(self):
        """Close the underlying API client."""
        self.client.close()

    @classmethod
    @functools.lru_cache(None)
    def _cached_api_client_for_context(cls, context: str) -> client.ApiClient:
        """Build an API client from the kubeconfig for ``context``.

        The result is memoized without a size limit, so repeated calls with
        the same context return the same client instance.

        Args:
            context: Name of the kubeconfig context to load.

        Returns:
            The API client configured for that context.
        """
        client_instance = kubernetes.config.new_client_from_config(
            context=context)
        logger.info('Using kubernetes context "%s", active host: %s', context,
                    client_instance.configuration.host)
        return client_instance
76 
77 
class PortForwardingError(Exception):
    """Raised when the kubectl port-forward process exits early or prints
    an unexpected line while starting up."""
80 
81 
class PortForwarder:
    """Runs ``kubectl port-forward`` as a subprocess.

    ``connect()`` starts kubectl and blocks until it prints its
    "Forwarding from ..." line; ``close()`` kills the subprocess. When no
    local port is given, kubectl picks one and ``local_port`` is updated
    with the port parsed from kubectl's output.
    """
    # Local address used when the caller does not supply one.
    PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1'

    def __init__(self,
                 context: str,
                 namespace: str,
                 destination: str,
                 remote_port: int,
                 local_port: Optional[int] = None,
                 local_address: Optional[str] = None):
        """Store the forwarding parameters; no process is started here.

        Args:
            context: Value passed to kubectl ``--context``.
            namespace: Value passed to kubectl ``--namespace``.
            destination: kubectl port-forward target, e.g. ``pod/<name>``.
            remote_port: Port on the destination to forward to.
            local_port: Local port to listen on; falsy lets kubectl choose.
            local_address: Local address to bind; falsy selects
                ``PORT_FORWARD_LOCAL_ADDRESS``.
        """
        self.context = context
        self.namespace = namespace
        self.destination = destination
        self.remote_port = remote_port
        self.local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS
        self.local_port: Optional[int] = local_port
        self.subprocess: Optional[subprocess.Popen] = None

    def _local_port_pattern(self) -> "re.Pattern":
        """Compile the regex that captures a kubectl-chosen local port.

        The address is escaped so that regex metacharacters in it (the dots
        of an IPv4 address, the brackets of ``[::1]``) match literally.
        """
        return re.compile(
            f"Forwarding from {re.escape(self.local_address)}:([0-9]+)"
            f" -> {self.remote_port}")

    def connect(self) -> None:
        """Start kubectl and wait for its "Forwarding from" line.

        Raises:
            PortForwardingError: kubectl exited before reporting success, or
                its first non-empty output line was not the expected one.
                The subprocess is closed before the error propagates.
        """
        if self.local_port:
            port_mapping = f"{self.local_port}:{self.remote_port}"
        else:
            # An empty local part asks kubectl to pick a free local port.
            port_mapping = f":{self.remote_port}"
        cmd = [
            "kubectl", "--context", self.context, "--namespace", self.namespace,
            "port-forward", "--address", self.local_address, self.destination,
            port_mapping
        ]
        self.subprocess = subprocess.Popen(cmd,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.STDOUT,
                                           universal_newlines=True)
        # Wait for stdout line indicating successful start.
        if self.local_port:
            local_port_expected = (
                f"Forwarding from {self.local_address}:{self.local_port}"
                f" -> {self.remote_port}")
            local_port_re = None
        else:
            local_port_expected = None
            local_port_re = self._local_port_pattern()
        try:
            while True:
                time.sleep(0.05)
                output = self.subprocess.stdout.readline().strip()
                if not output:
                    return_code = self.subprocess.poll()
                    if return_code is not None:
                        errors = self.subprocess.stdout.readlines()
                        raise PortForwardingError(
                            'Error forwarding port, kubectl return '
                            f'code {return_code}, output {errors}')
                    # If there is no output, and the subprocess is not exiting,
                    # continue waiting for the log line.
                    continue

                # Validate output log
                if local_port_re is None:
                    if output != local_port_expected:
                        raise PortForwardingError(
                            f'Error forwarding port, unexpected output {output}'
                        )
                else:
                    groups = local_port_re.search(output)
                    if groups is None:
                        raise PortForwardingError(
                            f'Error forwarding port, unexpected output {output}'
                        )
                    # Update local port to the randomly picked one
                    self.local_port = int(groups[1])

                logger.info(output)
                break
        except Exception:
            # Do not leave a kubectl process behind on any startup failure.
            self.close()
            raise

    def close(self) -> None:
        """Kill the kubectl subprocess, if one is running, and drop it."""
        if self.subprocess is None:
            return
        logger.info('Shutting down port forwarding, pid %s',
                    self.subprocess.pid)
        self.subprocess.kill()
        # Drain the pipe so the process is reaped and its output is logged.
        stdout, _ = self.subprocess.communicate(timeout=5)
        logger.info('Port forwarding stopped')
        logger.debug('Port forwarding remaining stdout: %s', stdout)
        self.subprocess = None
171 
172 
class KubernetesNamespace:  # pylint: disable=too-many-public-methods
    """Operations on the resources of one Kubernetes namespace.

    Wraps a ``KubernetesApiManager`` and a namespace name, and offers
    get/delete/wait helpers for services, service accounts, deployments,
    pods and the namespace itself. The ``wait_for_*`` methods poll through
    ``retrying.retry``, re-running a small probe every ``wait_sec`` seconds
    for up to ``timeout_sec`` seconds.
    """
    # Service annotation key whose JSON value describes the service's NEGs.
    NEG_STATUS_META = 'cloud.google.com/neg-status'
    # Default grace period passed to every delete call.
    DELETE_GRACE_PERIOD_SEC: int = 5
    # Polling presets (total timeout / pause between probes), in seconds.
    WAIT_SHORT_TIMEOUT_SEC: int = 60
    WAIT_SHORT_SLEEP_SEC: int = 1
    WAIT_MEDIUM_TIMEOUT_SEC: int = 5 * 60
    WAIT_MEDIUM_SLEEP_SEC: int = 10
    WAIT_LONG_TIMEOUT_SEC: int = 10 * 60
    WAIT_LONG_SLEEP_SEC: int = 30

    def __init__(self, api: KubernetesApiManager, name: str):
        """Bind the API manager and the namespace name."""
        self.name = name
        self.api = api

    def apply_manifest(self, manifest):
        """Create the resources described by ``manifest`` in this namespace.

        Returns whatever ``kubernetes.utils.create_from_dict`` returns.
        """
        return utils.create_from_dict(self.api.client,
                                      manifest,
                                      namespace=self.name)

    @simple_resource_get
    def get_service(self, name) -> V1Service:
        """Read service ``name``; ``None`` if the API answers 404."""
        return self.api.core.read_namespaced_service(name, self.name)

    @simple_resource_get
    def get_service_account(self, name) -> V1ServiceAccount:
        """Read service account ``name``; ``None`` if the API answers 404."""
        return self.api.core.read_namespaced_service_account(name, self.name)

    def delete_service(self,
                       name,
                       grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
        """Request foreground deletion of service ``name``."""
        self.api.core.delete_namespaced_service(
            name=name,
            namespace=self.name,
            body=client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=grace_period_seconds))

    def delete_service_account(self,
                               name,
                               grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
        """Request foreground deletion of service account ``name``."""
        self.api.core.delete_namespaced_service_account(
            name=name,
            namespace=self.name,
            body=client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=grace_period_seconds))

    @simple_resource_get
    def get(self) -> V1Namespace:
        """Read this namespace; ``None`` if the API answers 404."""
        return self.api.core.read_namespace(self.name)

    def delete(self, grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
        """Request foreground deletion of this namespace."""
        self.api.core.delete_namespace(
            name=self.name,
            body=client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=grace_period_seconds))

    def wait_for_service_deleted(self,
                                 name: str,
                                 timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
                                 wait_sec=WAIT_SHORT_SLEEP_SEC):
        """Poll until ``get_service(name)`` returns ``None``."""

        @retrying.retry(retry_on_result=lambda r: r is not None,
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _wait_for_deleted_service_with_retry():
            service = self.get_service(name)
            if service is not None:
                logger.debug('Waiting for service %s to be deleted',
                             service.metadata.name)
            return service

        _wait_for_deleted_service_with_retry()

    def wait_for_service_account_deleted(self,
                                         name: str,
                                         timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
                                         wait_sec=WAIT_SHORT_SLEEP_SEC):
        """Poll until ``get_service_account(name)`` returns ``None``."""

        @retrying.retry(retry_on_result=lambda r: r is not None,
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _wait_for_deleted_service_account_with_retry():
            service_account = self.get_service_account(name)
            if service_account is not None:
                logger.debug('Waiting for service account %s to be deleted',
                             service_account.metadata.name)
            return service_account

        _wait_for_deleted_service_account_with_retry()

    def wait_for_namespace_deleted(self,
                                   timeout_sec=WAIT_LONG_TIMEOUT_SEC,
                                   wait_sec=WAIT_LONG_SLEEP_SEC):
        """Poll until ``get()`` returns ``None`` for this namespace."""

        @retrying.retry(retry_on_result=lambda r: r is not None,
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _wait_for_deleted_namespace_with_retry():
            namespace = self.get()
            if namespace is not None:
                logger.debug('Waiting for namespace %s to be deleted',
                             namespace.metadata.name)
            return namespace

        _wait_for_deleted_namespace_with_retry()

    def wait_for_service_neg(self,
                             name: str,
                             timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
                             wait_sec=WAIT_SHORT_SLEEP_SEC):
        """Poll until service ``name`` carries the NEG status annotation."""

        @retrying.retry(retry_on_result=lambda r: not r,
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _wait_for_service_neg():
            service = self.get_service(name)
            if service is None:
                # get_service returns None on 404; keep polling instead of
                # dereferencing it.
                logger.debug('Waiting for service %s NEG', name)
                return False
            # A service without annotations reports None, not an empty dict.
            annotations = service.metadata.annotations or {}
            if self.NEG_STATUS_META not in annotations:
                logger.debug('Waiting for service %s NEG',
                             service.metadata.name)
                return False
            return True

        _wait_for_service_neg()

    def get_service_neg(self, service_name: str,
                        service_port: int) -> Tuple[str, List[str]]:
        """Return the NEG name and zones recorded on a service.

        Parses the JSON stored under the ``NEG_STATUS_META`` annotation of
        the service.

        Args:
            service_name: Name of the service to inspect.
            service_port: Port whose NEG name is looked up (keyed by its
                string form in the annotation).

        Returns:
            ``(neg_name, neg_zones)`` taken from the annotation's
            ``network_endpoint_groups`` and ``zones`` entries.
        """
        service = self.get_service(service_name)
        neg_info: dict = json.loads(
            service.metadata.annotations[self.NEG_STATUS_META])
        neg_name: str = neg_info['network_endpoint_groups'][str(service_port)]
        neg_zones: List[str] = neg_info['zones']
        return neg_name, neg_zones

    @simple_resource_get
    def get_deployment(self, name) -> V1Deployment:
        """Read deployment ``name``; ``None`` if the API answers 404."""
        return self.api.apps.read_namespaced_deployment(name, self.name)

    def delete_deployment(self,
                          name,
                          grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
        """Request foreground deletion of deployment ``name``."""
        self.api.apps.delete_namespaced_deployment(
            name=name,
            namespace=self.name,
            body=client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=grace_period_seconds))

    def list_deployment_pods(self, deployment: V1Deployment) -> List[V1Pod]:
        """List the pods matching the deployment's ``match_labels``."""
        # V1LabelSelector.match_expressions not supported at the moment
        return self.list_pods_with_labels(deployment.spec.selector.match_labels)

    def wait_for_deployment_available_replicas(
            self,
            name,
            count=1,
            timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC,
            wait_sec=WAIT_MEDIUM_SLEEP_SEC):
        """Poll until deployment ``name`` has at least ``count`` available
        replicas."""

        @retrying.retry(
            retry_on_result=lambda r: not self._replicas_available(r, count),
            stop_max_delay=timeout_sec * 1000,
            wait_fixed=wait_sec * 1000)
        def _wait_for_deployment_available_replicas():
            deployment = self.get_deployment(name)
            if deployment is None:
                # get_deployment returns None on 404. _replicas_available
                # treats that as "not ready", so only the log line needs
                # guarding.
                logger.debug(
                    'Waiting for deployment %s to have %s available '
                    'replicas, deployment not found', name, count)
            else:
                logger.debug(
                    'Waiting for deployment %s to have %s available '
                    'replicas, current count %s', deployment.metadata.name,
                    count, deployment.status.available_replicas)
            return deployment

        _wait_for_deployment_available_replicas()

    def wait_for_deployment_deleted(self,
                                    deployment_name: str,
                                    timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC,
                                    wait_sec=WAIT_MEDIUM_SLEEP_SEC):
        """Poll until ``get_deployment(deployment_name)`` returns ``None``."""

        @retrying.retry(retry_on_result=lambda r: r is not None,
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _wait_for_deleted_deployment_with_retry():
            deployment = self.get_deployment(deployment_name)
            if deployment is not None:
                logger.debug(
                    'Waiting for deployment %s to be deleted. '
                    'Non-terminated replicas: %s', deployment.metadata.name,
                    deployment.status.replicas)
            return deployment

        _wait_for_deleted_deployment_with_retry()

    def list_pods_with_labels(self, labels: dict) -> List[V1Pod]:
        """List the pods in this namespace that match all of ``labels``."""
        pod_list: V1PodList = self.api.core.list_namespaced_pod(
            self.name, label_selector=label_dict_to_selector(labels))
        return pod_list.items

    def get_pod(self, name) -> client.V1Pod:
        """Read pod ``name``. Unlike the other getters this one is not
        wrapped, so API errors (including 404) propagate."""
        return self.api.core.read_namespaced_pod(name, self.name)

    def wait_for_pod_started(self,
                             pod_name,
                             timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
                             wait_sec=WAIT_SHORT_SLEEP_SEC):
        """Poll until pod ``pod_name`` leaves the Pending/Unknown phases."""

        @retrying.retry(retry_on_result=lambda r: not self._pod_started(r),
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _wait_for_pod_started():
            pod = self.get_pod(pod_name)
            logger.debug('Waiting for pod %s to start, current phase: %s',
                         pod.metadata.name, pod.status.phase)
            return pod

        _wait_for_pod_started()

    def port_forward_pod(
        self,
        pod: V1Pod,
        remote_port: int,
        local_port: Optional[int] = None,
        local_address: Optional[str] = None,
    ) -> PortForwarder:
        """Open a port forward to ``pod`` and return the connected forwarder.

        The caller owns the returned ``PortForwarder`` and is responsible
        for calling its ``close()``.
        """
        pf = PortForwarder(self.api.context, self.name,
                           f"pod/{pod.metadata.name}", remote_port, local_port,
                           local_address)
        pf.connect()
        return pf

    @staticmethod
    def _pod_started(pod: V1Pod):
        """True once the pod's phase is neither 'Pending' nor 'Unknown'."""
        return pod.status.phase not in ('Pending', 'Unknown')

    @staticmethod
    def _replicas_available(deployment, count):
        """True when ``deployment`` exists and reports at least ``count``
        available replicas."""
        return (deployment is not None and
                deployment.status.available_replicas is not None and
                deployment.status.available_replicas >= count)
xds_interop_client.str
str
Definition: xds_interop_client.py:487
framework.infrastructure.k8s.KubernetesNamespace.wait_for_service_deleted
def wait_for_service_deleted(self, str name, timeout_sec=WAIT_SHORT_TIMEOUT_SEC, wait_sec=WAIT_SHORT_SLEEP_SEC)
Definition: k8s.py:231
framework.infrastructure.k8s.PortForwardingError
Definition: k8s.py:78
framework.infrastructure.k8s.PortForwarder.subprocess
subprocess
Definition: k8s.py:110
framework.infrastructure.k8s.KubernetesNamespace.NEG_STATUS_META
string NEG_STATUS_META
Definition: k8s.py:174
framework.infrastructure.k8s.KubernetesApiManager.__init__
def __init__(self, context)
Definition: k8s.py:59
framework.infrastructure.k8s.KubernetesNamespace.port_forward_pod
PortForwarder port_forward_pod(self, V1Pod pod, int remote_port, Optional[int] local_port=None, Optional[str] local_address=None)
Definition: k8s.py:390
framework.infrastructure.k8s.simple_resource_get
def simple_resource_get(func)
Definition: k8s.py:39
framework.infrastructure.k8s.KubernetesNamespace.wait_for_service_neg
def wait_for_service_neg(self, str name, timeout_sec=WAIT_SHORT_TIMEOUT_SEC, wait_sec=WAIT_SHORT_SLEEP_SEC)
Definition: k8s.py:281
framework.infrastructure.k8s.KubernetesNamespace.delete_deployment
def delete_deployment(self, name, grace_period_seconds=DELETE_GRACE_PERIOD_SEC)
Definition: k8s.py:312
framework.infrastructure.k8s.PortForwarder.remote_port
remote_port
Definition: k8s.py:89
framework.infrastructure.k8s.KubernetesNamespace.delete_service_account
def delete_service_account(self, name, grace_period_seconds=DELETE_GRACE_PERIOD_SEC)
Definition: k8s.py:210
framework.infrastructure.k8s.KubernetesNamespace._pod_started
def _pod_started(V1Pod pod)
Definition: k8s.py:404
framework.infrastructure.k8s.KubernetesNamespace.apply_manifest
def apply_manifest(self, manifest)
Definition: k8s.py:187
framework.infrastructure.k8s.PortForwarder.close
None close(self)
Definition: k8s.py:162
framework.infrastructure.k8s.KubernetesNamespace.get
V1Namespace get(self)
Definition: k8s.py:221
framework.infrastructure.k8s.KubernetesNamespace.get_pod
client.V1Pod get_pod(self, name)
Definition: k8s.py:371
framework.infrastructure.k8s.KubernetesNamespace.wait_for_service_account_deleted
def wait_for_service_account_deleted(self, str name, timeout_sec=WAIT_SHORT_TIMEOUT_SEC, wait_sec=WAIT_SHORT_SLEEP_SEC)
Definition: k8s.py:248
framework.infrastructure.k8s.KubernetesApiManager.core
core
Definition: k8s.py:63
framework.infrastructure.k8s.PortForwarder.__init__
def __init__(self, str context, str namespace, str destination, int remote_port, Optional[int] local_port=None, Optional[str] local_address=None)
Definition: k8s.py:85
framework.infrastructure.k8s.PortForwarder.local_port
local_port
Definition: k8s.py:154
framework.infrastructure.k8s.PortForwarder.context
context
Definition: k8s.py:86
framework.infrastructure.k8s.KubernetesNamespace.__init__
def __init__(self, KubernetesApiManager api, str name)
Definition: k8s.py:183
framework.infrastructure.k8s.PortForwarder
Definition: k8s.py:82
framework.infrastructure.k8s.PortForwarder.connect
None connect(self)
Definition: k8s.py:100
framework.infrastructure.k8s.KubernetesApiManager.context
context
Definition: k8s.py:60
framework.infrastructure.k8s.KubernetesNamespace.get_deployment
V1Deployment get_deployment(self, name)
Definition: k8s.py:309
framework.infrastructure.k8s.KubernetesNamespace.get_service_account
V1Service get_service_account(self, name)
Definition: k8s.py:197
framework.infrastructure.k8s.KubernetesApiManager
Definition: k8s.py:57
xds_interop_client.int
int
Definition: xds_interop_client.py:113
framework.infrastructure.k8s.label_dict_to_selector
str label_dict_to_selector(dict labels)
Definition: k8s.py:53
framework.infrastructure.k8s.PortForwarder.local_address
local_address
Definition: k8s.py:90
framework.infrastructure.k8s.KubernetesNamespace.get_service_neg
Tuple[str, List[str]] get_service_neg(self, str service_name, int service_port)
Definition: k8s.py:299
framework.infrastructure.k8s.KubernetesNamespace.list_pods_with_labels
List[V1Pod] list_pods_with_labels(self, dict labels)
Definition: k8s.py:366
framework.infrastructure.k8s.KubernetesNamespace._replicas_available
def _replicas_available(deployment, count)
Definition: k8s.py:408
framework.infrastructure.k8s.KubernetesNamespace
Definition: k8s.py:173
framework.infrastructure.k8s.KubernetesNamespace.wait_for_pod_started
def wait_for_pod_started(self, pod_name, timeout_sec=WAIT_SHORT_TIMEOUT_SEC, wait_sec=WAIT_SHORT_SLEEP_SEC)
Definition: k8s.py:374
framework.infrastructure.k8s.KubernetesApiManager.close
def close(self)
Definition: k8s.py:65
framework.infrastructure.k8s.KubernetesNamespace.get_service
V1Service get_service(self, name)
Definition: k8s.py:193
framework.infrastructure.k8s.KubernetesNamespace.wait_for_deployment_available_replicas
def wait_for_deployment_available_replicas(self, name, count=1, timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC, wait_sec=WAIT_MEDIUM_SLEEP_SEC)
Definition: k8s.py:326
framework.infrastructure.k8s.KubernetesNamespace.delete
def delete(self, grace_period_seconds=DELETE_GRACE_PERIOD_SEC)
Definition: k8s.py:224
framework.infrastructure.k8s.KubernetesApiManager.client
client
Definition: k8s.py:61
func
const EVP_CIPHER *(* func)(void)
Definition: cipher_extra.c:73
framework.infrastructure.k8s.PortForwarder.destination
destination
Definition: k8s.py:88
framework.infrastructure.k8s.KubernetesNamespace.list_deployment_pods
List[V1Pod] list_deployment_pods(self, V1Deployment deployment)
Definition: k8s.py:322
framework.infrastructure.k8s.KubernetesNamespace.api
api
Definition: k8s.py:185
framework.infrastructure.k8s.PortForwarder.namespace
namespace
Definition: k8s.py:87
framework.infrastructure.k8s.KubernetesApiManager.apps
apps
Definition: k8s.py:62
framework.infrastructure.k8s.KubernetesNamespace.name
name
Definition: k8s.py:184
framework.infrastructure.k8s.KubernetesApiManager._cached_api_client_for_context
client.ApiClient _cached_api_client_for_context(cls, str context)
Definition: k8s.py:70
framework.infrastructure.k8s.KubernetesNamespace.wait_for_deployment_deleted
def wait_for_deployment_deleted(self, str deployment_name, timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC, wait_sec=WAIT_MEDIUM_SLEEP_SEC)
Definition: k8s.py:347
framework.infrastructure.k8s.KubernetesNamespace.delete_service
def delete_service(self, name, grace_period_seconds=DELETE_GRACE_PERIOD_SEC)
Definition: k8s.py:200
framework.infrastructure.k8s.KubernetesNamespace.wait_for_namespace_deleted
def wait_for_namespace_deleted(self, timeout_sec=WAIT_LONG_TIMEOUT_SEC, wait_sec=WAIT_LONG_SLEEP_SEC)
Definition: k8s.py:265


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:27