20 from typing
import List, Optional, Tuple
22 from kubernetes
import client
23 from kubernetes
import utils
24 import kubernetes.config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Short module-level aliases for classes exposed by the kubernetes `client`
# package, so they can be referenced without the `client.` prefix.
V1Deployment = client.V1Deployment
V1ServiceAccount = client.V1ServiceAccount
V1PodList = client.V1PodList
V1Service = client.V1Service
V1Namespace = client.V1Namespace
ApiException = client.ApiException
41 def wrap_not_found_return_none(*args, **kwargs):
43 return func(*args, **kwargs)
44 except client.ApiException
as e:
50 return wrap_not_found_return_none
54 return ','.join(f
'{k}=={v}' for k, v
in labels.items())
69 @functools.lru_cache(
None)
71 client_instance = kubernetes.config.new_client_from_config(
73 logger.info(
'Using kubernetes context "%s", active host: %s', context,
74 client_instance.configuration.host)
75 return client_instance
79 """Error forwarding port"""
83 PORT_FORWARD_LOCAL_ADDRESS: str =
'127.0.0.1'
90 local_port: Optional[int] =
None,
91 local_address: Optional[str] =
None):
96 self.
local_address = local_address
or self.PORT_FORWARD_LOCAL_ADDRESS
98 self.
subprocess: Optional[subprocess.Popen] =
None
102 port_mapping = f
"{self.local_port}:{self.remote_port}"
104 port_mapping = f
":{self.remote_port}"
111 stdout=subprocess.PIPE,
112 stderr=subprocess.STDOUT,
113 universal_newlines=
True)
116 local_port_expected = (
117 f
"Forwarding from {self.local_address}:{self.local_port}"
118 f
" -> {self.remote_port}")
120 local_port_re = re.compile(
121 f
"Forwarding from {self.local_address}:([0-9]+) -> {self.remote_port}"
126 output = self.
subprocess.stdout.readline().strip()
129 if return_code
is not None:
132 for error
in self.
subprocess.stdout.readlines()
135 'Error forwarding port, kubectl return '
136 f
'code {return_code}, output {errors}')
143 if output != local_port_expected:
145 f
'Error forwarding port, unexpected output {output}'
148 groups = local_port_re.search(output)
151 f
'Error forwarding port, unexpected output {output}'
164 logger.info(
'Shutting down port forwarding, pid %s',
167 stdout, _ = self.
subprocess.communicate(timeout=5)
168 logger.info(
'Port forwarding stopped')
169 logger.debug(
'Port forwarding remaining stdout: %s', stdout)
# Metadata key for NEG status.
# NOTE(review): presumably the annotation key read when parsing a service's
# NEG info; the read site is not visible in this view, so confirm.
NEG_STATUS_META = 'cloud.google.com/neg-status'
# Default for the `grace_period_seconds` argument that the delete methods
# pass on to client.V1DeleteOptions.
DELETE_GRACE_PERIOD_SEC: int = 5
# Wait presets, all in seconds. A *_TIMEOUT_SEC value is the default
# `timeout_sec` (overall limit) and a *_SLEEP_SEC value is the default
# `wait_sec` (pause between attempts) of the wait helpers; both are
# multiplied by 1000 before being handed to retrying.retry.
# Short: default for the service, service account, service NEG and pod waits.
WAIT_SHORT_TIMEOUT_SEC: int = 60
WAIT_SHORT_SLEEP_SEC: int = 1
# Medium: default for the deployment waits.
WAIT_MEDIUM_TIMEOUT_SEC: int = 5 * 60
WAIT_MEDIUM_SLEEP_SEC: int = 10
# Long: default for the namespace deletion wait.
WAIT_LONG_TIMEOUT_SEC: int = 10 * 60
WAIT_LONG_SLEEP_SEC: int = 30
183 def __init__(self, api: KubernetesApiManager, name: str):
188 return utils.create_from_dict(self.
api.client,
194 return self.
api.core.read_namespaced_service(name, self.
name)
198 return self.
api.core.read_namespaced_service_account(name, self.
name)
202 grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
203 self.
api.core.delete_namespaced_service(
206 body=client.V1DeleteOptions(
207 propagation_policy=
'Foreground',
208 grace_period_seconds=grace_period_seconds))
212 grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
213 self.
api.core.delete_namespaced_service_account(
216 body=client.V1DeleteOptions(
217 propagation_policy=
'Foreground',
218 grace_period_seconds=grace_period_seconds))
def get(self) -> V1Namespace:
    """Read this namespace from the cluster.

    Returns:
        Whatever the core API `read_namespace` call returns for
        `self.name`.
    """
    # Resolve the API handle first, then the name, keeping the attribute
    # access order of a single chained call.
    core_api = self.api.core
    namespace_name = self.name
    return core_api.read_namespace(namespace_name)
224 def delete(self, grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
225 self.
api.core.delete_namespace(
227 body=client.V1DeleteOptions(
228 propagation_policy=
'Foreground',
229 grace_period_seconds=grace_period_seconds))
233 timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
234 wait_sec=WAIT_SHORT_SLEEP_SEC):
236 @retrying.retry(retry_on_result=
lambda r: r
is not None,
237 stop_max_delay=timeout_sec * 1000,
238 wait_fixed=wait_sec * 1000)
239 def _wait_for_deleted_service_with_retry():
241 if service
is not None:
242 logger.debug(
'Waiting for service %s to be deleted',
243 service.metadata.name)
246 _wait_for_deleted_service_with_retry()
250 timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
251 wait_sec=WAIT_SHORT_SLEEP_SEC):
253 @retrying.retry(retry_on_result=
lambda r: r
is not None,
254 stop_max_delay=timeout_sec * 1000,
255 wait_fixed=wait_sec * 1000)
256 def _wait_for_deleted_service_account_with_retry():
258 if service_account
is not None:
259 logger.debug(
'Waiting for service account %s to be deleted',
260 service_account.metadata.name)
261 return service_account
263 _wait_for_deleted_service_account_with_retry()
266 timeout_sec=WAIT_LONG_TIMEOUT_SEC,
267 wait_sec=WAIT_LONG_SLEEP_SEC):
269 @retrying.retry(retry_on_result=
lambda r: r
is not None,
270 stop_max_delay=timeout_sec * 1000,
271 wait_fixed=wait_sec * 1000)
272 def _wait_for_deleted_namespace_with_retry():
273 namespace = self.
get()
274 if namespace
is not None:
275 logger.debug(
'Waiting for namespace %s to be deleted',
276 namespace.metadata.name)
279 _wait_for_deleted_namespace_with_retry()
283 timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
284 wait_sec=WAIT_SHORT_SLEEP_SEC):
286 @retrying.retry(retry_on_result=
lambda r:
not r,
287 stop_max_delay=timeout_sec * 1000,
288 wait_fixed=wait_sec * 1000)
289 def _wait_for_service_neg():
292 logger.debug(
'Waiting for service %s NEG',
293 service.metadata.name)
297 _wait_for_service_neg()
300 service_port: int) -> Tuple[str, List[str]]:
302 neg_info: dict = json.loads(
304 neg_name: str = neg_info[
'network_endpoint_groups'][
str(service_port)]
305 neg_zones: List[str] = neg_info[
'zones']
306 return neg_name, neg_zones
310 return self.
api.apps.read_namespaced_deployment(name, self.
name)
314 grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
315 self.
api.apps.delete_namespaced_deployment(
318 body=client.V1DeleteOptions(
319 propagation_policy=
'Foreground',
320 grace_period_seconds=grace_period_seconds))
330 timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC,
331 wait_sec=WAIT_MEDIUM_SLEEP_SEC):
335 stop_max_delay=timeout_sec * 1000,
336 wait_fixed=wait_sec * 1000)
337 def _wait_for_deployment_available_replicas():
340 'Waiting for deployment %s to have %s available '
341 'replicas, current count %s', deployment.metadata.name, count,
342 deployment.status.available_replicas)
345 _wait_for_deployment_available_replicas()
348 deployment_name: str,
349 timeout_sec=WAIT_MEDIUM_TIMEOUT_SEC,
350 wait_sec=WAIT_MEDIUM_SLEEP_SEC):
352 @retrying.retry(retry_on_result=
lambda r: r
is not None,
353 stop_max_delay=timeout_sec * 1000,
354 wait_fixed=wait_sec * 1000)
355 def _wait_for_deleted_deployment_with_retry():
357 if deployment
is not None:
359 'Waiting for deployment %s to be deleted. '
360 'Non-terminated replicas: %s', deployment.metadata.name,
361 deployment.status.replicas)
364 _wait_for_deleted_deployment_with_retry()
367 pod_list: V1PodList = self.
api.core.list_namespaced_pod(
369 return pod_list.items
372 return self.
api.core.read_namespaced_pod(name, self.
name)
376 timeout_sec=WAIT_SHORT_TIMEOUT_SEC,
377 wait_sec=WAIT_SHORT_SLEEP_SEC):
379 @retrying.retry(retry_on_result=
lambda r:
not self.
_pod_started(r),
380 stop_max_delay=timeout_sec * 1000,
381 wait_fixed=wait_sec * 1000)
382 def _wait_for_pod_started():
384 logger.debug(
'Waiting for pod %s to start, current phase: %s',
385 pod.metadata.name, pod.status.phase)
388 _wait_for_pod_started()
394 local_port: Optional[int] =
None,
395 local_address: Optional[str] =
None,
398 f
"pod/{pod.metadata.name}", remote_port, local_port,
405 return pod.status.phase
not in (
'Pending',
'Unknown')
409 return (deployment
is not None and
410 deployment.status.available_replicas
is not None and
411 deployment.status.available_replicas >= count)