17 TODO(sergiitk): separate XdsTestClient and KubernetesClientRunner to individual
23 from typing
import Iterable, List, Optional
34 logger = logging.getLogger(__name__)
37 _timedelta = datetime.timedelta
38 _LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient
39 _XdsUpdateClientConfigureServiceClient = grpc_testing.XdsUpdateClientConfigureServiceClient
40 _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
41 _ChannelzChannel = grpc_channelz.Channel
42 _ChannelzChannelState = grpc_channelz.ChannelState
43 _ChannelzSubchannel = grpc_channelz.Subchannel
44 _ChannelzSocket = grpc_channelz.Socket
45 _CsdsClient = grpc_csds.CsdsClient
50 Represents RPC services implemented in Client component of the xds test app.
51 https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#client
59 rpc_host: Optional[str] =
None,
60 maintenance_port: Optional[int] =
None):
61 super().
__init__(rpc_host=(rpc_host
or ip))
68 @functools.lru_cache(
None)
74 @functools.lru_cache(
None)
80 @functools.lru_cache(
None)
85 @functools.lru_cache(
None)
86 def csds(self) -> _CsdsClient:
93 timeout_sec: Optional[int] =
None,
94 ) -> grpc_testing.LoadBalancerStatsResponse:
96 Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
99 num_rpcs=num_rpcs, timeout_sec=timeout_sec)
104 timeout_sec: Optional[int] =
None,
105 ) -> grpc_testing.LoadBalancerAccumulatedStatsResponse:
106 """Shortcut to LoadBalancerStatsServiceClient.get_client_accumulated_stats()"""
108 timeout_sec=timeout_sec)
111 """Wait for the channel to the server to transition to READY.
114 GrpcApp.NotFound: If the channel never transitioned to READY.
120 _ChannelzChannelState.READY)
123 'Retrieving client -> server socket, '
124 'channel_id: %s, subchannel: %s', channel.ref.channel_id,
125 channel.subchannel_ref[0].name)
126 subchannel, *subchannels = list(
127 self.
channelz.list_channel_subchannels(channel))
129 logger.warning(
'Unexpected subchannels: %r', subchannels)
131 socket, *sockets = list(
132 self.
channelz.list_subchannels_sockets(subchannel))
134 logger.warning(
'Unexpected sockets: %r', subchannels)
135 logger.debug(
'Found client -> server socket: %s', socket.ref.name)
140 state: _ChannelzChannelState,
142 timeout: Optional[_timedelta] =
None,
143 rpc_deadline: Optional[_timedelta] =
None) -> _ChannelzChannel:
146 if rpc_deadline
is None:
150 retryer = retryers.exponential_retryer_with_timeout(
153 timeout=
_timedelta(minutes=5)
if timeout
is None else timeout)
155 logger.info(
'Waiting for client %s to report a %s channel to %s',
156 self.
ip, _ChannelzChannelState.Name(state),
160 rpc_deadline=rpc_deadline)
161 logger.info(
'Client %s channel to %s transitioned to state %s:\n%s',
163 _ChannelzChannelState.Name(state), channel)
168 state: _ChannelzChannelState,
170 rpc_deadline: Optional[_timedelta] =
None,
171 check_subchannel=
True) -> _ChannelzChannel:
173 if rpc_deadline
is not None:
174 rpc_params[
'deadline_sec'] = rpc_deadline.total_seconds()
177 channel_state: _ChannelzChannelState = channel.data.state.state
178 logger.info(
'Server channel: %s, state: %s', channel.ref.name,
179 _ChannelzChannelState.Name(channel_state))
180 if channel_state
is state:
186 channel, state, **rpc_params)
187 logger.info(
'Found subchannel in state %s: %s',
188 _ChannelzChannelState.Name(state),
192 logger.info(e.message)
197 f
'Client has no {_ChannelzChannelState.Name(state)} channel with '
205 state: _ChannelzChannelState,
206 **kwargs) -> _ChannelzSubchannel:
207 subchannels = self.
channelz.list_channel_subchannels(channel, **kwargs)
208 for subchannel
in subchannels:
209 if subchannel.data.state.state
is state:
213 f
'Not found a {_ChannelzChannelState.Name(state)} '
214 f
'subchannel for channel_id {channel.ref.channel_id}')
217 **kwargs) -> List[_ChannelzSubchannel]:
219 for channel
in self.
channelz.find_channels_for_target(
221 for subchannel
in self.
channelz.list_channel_subchannels(
223 if subchannel.data.state.state
is state:
224 subchannels.append(subchannel)
237 gcp_api_manager: gcp.api.GcpApiManager,
239 gcp_service_account: str,
242 service_account_name=
None,
244 deployment_template=
'client.deployment.yaml',
245 service_account_template=
'service-account.yaml',
246 reuse_namespace=
False,
247 namespace_template=
None,
248 debug_use_port_forwarding=
False,
249 enable_workload_identity=
True):
250 super().
__init__(k8s_namespace, namespace_template, reuse_namespace)
278 self.
gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
281 self.
deployment: Optional[k8s.V1Deployment] =
None
295 print_response=False) -> XdsTestClient:
297 'Deploying xDS test client "%s" to k8s namespace %s: '
298 'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
300 server_target, rpc, qps, metadata, secure_mode, print_response)
302 namespace_name=self.k8s_namespace.name,
320 namespace_name=self.k8s_namespace.name,
328 namespace_name=self.k8s_namespace.name,
334 server_target=server_target,
338 secure_mode=secure_mode,
339 config_mesh=config_mesh,
340 print_response=print_response)
345 pod = self.k8s_namespace.list_deployment_pods(self.
deployment)[0]
347 pod_ip = pod.status.pod_ip
353 logger.info(
'LOCAL DEV MODE: Enabling port forwarding to %s:%s',
362 server_target=server_target,
365 def cleanup(self, *, force=False, force_namespace=False):
379 super().
cleanup(force=force_namespace
and force)
383 resource_prefix: str,
384 resource_suffix: str,
385 name: str =
'client') -> str:
386 """A helper to make consistent XdsTestClient kubernetes namespace name
387 for given resource prefix and suffix.
389 Note: the idea is to intentionally produce different namespace name for
390 the test server, and the test client, as that closely mimics real-world