xds_interop_client.py
Go to the documentation of this file.
1 # Copyright 2020 The gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import argparse
16 import collections
17 from concurrent import futures
18 import datetime
19 import logging
20 import signal
21 import sys
22 import threading
23 import time
24 from typing import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple
25 
26 import grpc
27 import grpc_admin
28 from grpc_channelz.v1 import channelz
29 
30 from src.proto.grpc.testing import empty_pb2
31 from src.proto.grpc.testing import messages_pb2
32 from src.proto.grpc.testing import test_pb2
33 from src.proto.grpc.testing import test_pb2_grpc
34 
# Root logger for the process; records are written to the console using a
# timestamped, level-aligned format.
logger = logging.getLogger()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
40 
# RPC methods this client knows how to drive, in CamelCase.
_SUPPORTED_METHODS = ("UnaryCall", "EmptyCall")

# CamelCase method name -> CAPS_SNAKE name used as a key in stats responses.
_METHOD_CAMEL_TO_CAPS_SNAKE = dict(
    UnaryCall="UNARY_CALL",
    EmptyCall="EMPTY_CALL",
)

# CamelCase method name -> ClientConfigureRequest RPC type enum value.
_METHOD_STR_TO_ENUM = {
    camel_name: getattr(messages_pb2.ClientConfigureRequest, caps_name)
    for camel_name, caps_name in _METHOD_CAMEL_TO_CAPS_SNAKE.items()
}

# Inverse of _METHOD_STR_TO_ENUM.
_METHOD_ENUM_TO_STR = {
    enum_value: camel_name
    for camel_name, enum_value in _METHOD_STR_TO_ENUM.items()
}

# Method name -> sequence of (key, value) metadata pairs.
PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]]

# Upper bound on how long an idle channel thread waits on its configuration
# condition before re-checking for shutdown.
_CONFIG_CHANGE_TIMEOUT = datetime.timedelta(milliseconds=500)
61 
62 
class _StatsWatcher:
    """Collects results for the RPCs whose request ID is in [start, end).

    Completed RPCs are reported through on_rpc_complete(); a caller blocks in
    await_rpc_stats_response() until every RPC in the window has been
    reported or the timeout elapses.
    """
    _start: int
    _end: int
    _rpcs_needed: int
    _rpcs_by_peer: DefaultDict[str, int]
    _rpcs_by_method: DefaultDict[str, DefaultDict[str, int]]
    _no_remote_peer: int
    # Declared for completeness; this class only ever assigns _condition.
    _lock: threading.Lock
    _condition: threading.Condition

    def __init__(self, start: int, end: int):
        """Creates a watcher for request IDs start (inclusive) to end (exclusive)."""
        self._start = start
        self._end = end
        # Number of RPCs in the window that have not been reported yet.
        self._rpcs_needed = end - start
        self._rpcs_by_peer = collections.defaultdict(int)
        self._rpcs_by_method = collections.defaultdict(
            lambda: collections.defaultdict(int))
        self._condition = threading.Condition()
        # RPCs in the window that completed without a peer name.
        self._no_remote_peer = 0

    def on_rpc_complete(self, request_id: int, peer: str, method: str) -> None:
        """Records statistics for a single RPC.

        RPCs whose request_id lies outside the watched window are ignored.
        A falsy peer is counted as "no remote peer" rather than attributed
        to a peer.
        """
        if self._start <= request_id < self._end:
            with self._condition:
                if not peer:
                    self._no_remote_peer += 1
                else:
                    self._rpcs_by_peer[peer] += 1
                    self._rpcs_by_method[method][peer] += 1
                self._rpcs_needed -= 1
                self._condition.notify()

    def await_rpc_stats_response(
            self, timeout_sec: int) -> messages_pb2.LoadBalancerStatsResponse:
        """Blocks until a full response has been collected.

        Waits at most timeout_sec seconds for every RPC in the window to be
        reported, then builds the response from whatever was collected.
        RPCs still outstanding at that point are counted as failures, along
        with those that completed without a peer.
        """
        with self._condition:
            self._condition.wait_for(lambda: not self._rpcs_needed,
                                     timeout=float(timeout_sec))
            response = messages_pb2.LoadBalancerStatsResponse()
            for peer, count in self._rpcs_by_peer.items():
                response.rpcs_by_peer[peer] = count
            for method, count_by_peer in self._rpcs_by_method.items():
                for peer, count in count_by_peer.items():
                    response.rpcs_by_method[method].rpcs_by_peer[peer] = count
            response.num_failures = self._no_remote_peer + self._rpcs_needed
        return response
109 
110 
# Held while reading or updating the module-level RPC counters, the RPC ID
# counter and the watcher set below.
_global_lock = threading.Lock()
# Set on SIGINT; channel threads poll it to know when to exit.
_stop_event = threading.Event()
# ID handed to the next RPC that is started.
_global_rpc_id: int = 0
_watchers: Set[_StatsWatcher] = set()
# The stats/configuration server; None until _run() creates it.
_global_server = None
# Per-method counters. These are mutated in place, so they are typed as
# DefaultDict rather than the read-only Mapping.
_global_rpcs_started: DefaultDict[str, int] = collections.defaultdict(int)
_global_rpcs_succeeded: DefaultDict[str, int] = collections.defaultdict(int)
_global_rpcs_failed: DefaultDict[str, int] = collections.defaultdict(int)

# DefaultDict[method, DefaultDict[status_code, count]]
_global_rpc_statuses: DefaultDict[str, DefaultDict[
    int, int]] = collections.defaultdict(lambda: collections.defaultdict(int))
123 
124 
def _handle_sigint(sig, frame) -> None:
    """SIGINT handler: requests shutdown of the channel threads and server.

    The arguments follow the signal.signal() handler convention and are not
    used.
    """
    logger.warning("Received SIGINT")
    _stop_event.set()
    # _global_server is None until _run() has created the server; a signal
    # arriving before that must not raise AttributeError from the handler.
    if _global_server is not None:
        _global_server.stop(None)
129 
130 
class _LoadBalancerStatsServicer(
        test_pb2_grpc.LoadBalancerStatsServiceServicer):
    """Serves per-window and accumulated RPC statistics for this client."""

    def __init__(self):
        # Zero-argument super() dispatches to the base class initializer;
        # the one-argument form super(cls) creates an unbound proxy and does
        # not.
        super().__init__()

    def GetClientStats(
        self, request: messages_pb2.LoadBalancerStatsRequest,
        context: grpc.ServicerContext
    ) -> messages_pb2.LoadBalancerStatsResponse:
        """Reports how the next request.num_rpcs RPCs were distributed.

        Registers a watcher for the upcoming window of request IDs, waits up
        to request.timeout_sec seconds for it to fill, and returns what was
        collected.
        """
        logger.info("Received stats request.")
        with _global_lock:
            start = _global_rpc_id + 1
            end = start + request.num_rpcs
            watcher = _StatsWatcher(start, end)
            _watchers.add(watcher)
        try:
            response = watcher.await_rpc_stats_response(request.timeout_sec)
        finally:
            # Always deregister, so a failed wait does not leave a stale
            # watcher that every later RPC completion would still notify.
            with _global_lock:
                _watchers.remove(watcher)
        logger.info("Returning stats response: %s", response)
        return response

    def GetClientAccumulatedStats(
        self, request: messages_pb2.LoadBalancerAccumulatedStatsRequest,
        context: grpc.ServicerContext
    ) -> messages_pb2.LoadBalancerAccumulatedStatsResponse:
        """Returns the started/succeeded/failed counters for every method."""
        logger.info("Received cumulative stats request.")
        response = messages_pb2.LoadBalancerAccumulatedStatsResponse()
        with _global_lock:
            for method in _SUPPORTED_METHODS:
                caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method]
                response.num_rpcs_started_by_method[
                    caps_method] = _global_rpcs_started[method]
                response.num_rpcs_succeeded_by_method[
                    caps_method] = _global_rpcs_succeeded[method]
                response.num_rpcs_failed_by_method[
                    caps_method] = _global_rpcs_failed[method]
                response.stats_per_method[
                    caps_method].rpcs_started = _global_rpcs_started[method]
                for code, count in _global_rpc_statuses[method].items():
                    response.stats_per_method[caps_method].result[code] = count
        logger.info("Returning cumulative stats response.")
        return response
177 
178 
def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]],
               request_id: int, stub: test_pb2_grpc.TestServiceStub,
               timeout: float, futures: Mapping[int, Tuple[grpc.Future,
                                                           str]]) -> None:
    """Starts one asynchronous RPC and stores its future under request_id.

    Raises:
        ValueError: if method is neither "UnaryCall" nor "EmptyCall".
    """
    logger.debug(f"Sending {method} request to backend: {request_id}")
    # Select the stub callable and its request message, then issue the call
    # from a single place.
    if method == "UnaryCall":
        rpc_callable = stub.UnaryCall
        request_message = messages_pb2.SimpleRequest()
    elif method == "EmptyCall":
        rpc_callable = stub.EmptyCall
        request_message = empty_pb2.Empty()
    else:
        raise ValueError(f"Unrecognized method '{method}'.")
    pending = rpc_callable.future(request_message,
                                  metadata=metadata,
                                  timeout=timeout)
    futures[request_id] = (pending, method)
195 
196 
def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str,
                 print_response: bool) -> None:
    """Records the outcome of a finished RPC and notifies every watcher.

    Updates the per-method status-code, success and failure counters, works
    out which backend answered (the "hostname" initial-metadata entry if
    present, otherwise the response's hostname field), and reports the
    completion to all registered watchers. Failed RPCs are reported with an
    empty hostname.
    """
    exception = future.exception()
    status = future.code()
    hostname = ""
    with _global_lock:
        _global_rpc_statuses[method][status.value[0]] += 1
    if exception is not None:
        with _global_lock:
            _global_rpcs_failed[method] += 1
        if exception.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            logger.error(f"RPC {rpc_id} timed out")
        else:
            logger.error(exception)
    else:
        response = future.result()
        for metadatum in future.initial_metadata():
            if metadatum[0] == "hostname":
                hostname = metadatum[1]
                break
        else:
            # No "hostname" metadata entry: fall back to the response body.
            hostname = response.hostname
        succeeded = status == grpc.StatusCode.OK
        with _global_lock:
            if succeeded:
                _global_rpcs_succeeded[method] += 1
            else:
                _global_rpcs_failed[method] += 1
        if print_response:
            if succeeded:
                logger.debug("Successful response.")
            else:
                # Previously interpolated an undefined name (`call`), which
                # raised NameError; identify the RPC by its ID instead.
                logger.debug(f"RPC failed: {rpc_id}")
    with _global_lock:
        for watcher in _watchers:
            watcher.on_rpc_complete(rpc_id, hostname, method)
233 
234 
def _remove_completed_rpcs(futures: Dict[int, Tuple[grpc.Future, str]],
                           print_response: bool) -> None:
    """Processes every finished RPC in futures and drops it from the dict.

    Args:
        futures: in-flight RPCs keyed by request ID; each value is a
            (future, method name) pair. Mutated in place.
        print_response: forwarded to _on_rpc_done for each finished RPC.
    """
    logger.debug("Removing completed RPCs")
    done = []
    for future_id, (future, method) in futures.items():
        if future.done():
            # Use the caller-supplied flag; the module-level `args` only
            # exists when this file is run as a script.
            _on_rpc_done(future_id, future, method, print_response)
            done.append(future_id)
    # Delete after the scan so the dict is not resized while iterating.
    for rpc_id in done:
        del futures[rpc_id]
245 
246 
def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
    """Cancels the future of every RPC still tracked in futures."""
    logger.info("Cancelling all remaining RPCs")
    for pending_rpc, _method in futures.values():
        pending_rpc.cancel()
251 
252 
class _ChannelConfiguration:
    """Configuration for a single client channel.

    Instances of this class are meant to be dealt with as PODs. That is,
    data members should be accessed directly. This class is not thread-safe.
    When accessing any of its members, the `condition` member should be held.
    """

    def __init__(self, method: str, metadata: Sequence[Tuple[str, str]],
                 qps: int, server: str, rpc_timeout_sec: int,
                 print_response: bool, secure_mode: bool):
        """Stores each argument, unmodified, on the attribute of the same name."""
        # condition is signalled when a change is made to the config.
        self.condition = threading.Condition()

        self.method = method
        self.metadata = metadata
        self.qps = qps
        self.server = server
        self.rpc_timeout_sec = rpc_timeout_sec
        self.print_response = print_response
        self.secure_mode = secure_mode
274 
275 
def _run_single_channel(config: _ChannelConfiguration) -> None:
    """Drives RPCs over one channel at the configured rate until shutdown.

    Creates the channel described by config, then loops until _stop_event is
    set: each iteration starts one RPC, processes the RPCs that have
    finished, and sleeps out the remainder of the 1/qps interval. While qps
    is 0 the thread idles on the config's condition instead. RPCs still in
    flight at shutdown are cancelled.
    """
    global _global_rpc_id  # pylint: disable=global-statement
    with config.condition:
        server = config.server
        secure_mode = config.secure_mode
    if secure_mode:
        # xDS credentials need fallback credentials to use when the xDS
        # configuration does not supply security settings.
        fallback_creds = grpc.experimental.insecure_channel_credentials()
        channel_creds = grpc.xds_channel_credentials(fallback_creds)
        channel = grpc.secure_channel(server, channel_creds)
    else:
        channel = grpc.insecure_channel(server)
    with channel:
        stub = test_pb2_grpc.TestServiceStub(channel)
        futures: Dict[int, Tuple[grpc.Future, str]] = {}
        while not _stop_event.is_set():
            with config.condition:
                if config.qps == 0:
                    # Idle until reconfigured, waking periodically so the
                    # stop event is still noticed.
                    config.condition.wait(
                        timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds())
                    continue
                # Snapshot everything this iteration needs while holding the
                # condition, so a concurrent reconfiguration cannot be
                # observed half-applied.
                duration_per_query = 1.0 / float(config.qps)
                method = config.method
                metadata = config.metadata
                rpc_timeout = float(config.rpc_timeout_sec)
                print_response = config.print_response
            with _global_lock:
                request_id = _global_rpc_id
                _global_rpc_id += 1
                _global_rpcs_started[method] += 1
            start = time.time()
            end = start + duration_per_query
            _start_rpc(method, metadata, request_id, stub, rpc_timeout,
                       futures)
            _remove_completed_rpcs(futures, print_response)
            logger.debug(f"Currently {len(futures)} in-flight RPCs")
            now = time.time()
            while now < end:
                time.sleep(end - now)
                now = time.time()
        _cancel_all_rpcs(futures)
315 
316 
class _XdsUpdateClientConfigureServicer(
        test_pb2_grpc.XdsUpdateClientConfigureServiceServicer):
    """Lets a test driver change which RPCs the client sends at runtime."""

    def __init__(self, per_method_configs: Mapping[str, _ChannelConfiguration],
                 qps: int):
        """Stores the per-method configs and the QPS used for enabled methods."""
        # Zero-argument super() dispatches to the base class initializer;
        # the one-argument form super(cls) does not.
        super().__init__()
        self._per_method_configs = per_method_configs
        self._qps = qps

    def Configure(
        self, request: messages_pb2.ClientConfigureRequest,
        context: grpc.ServicerContext
    ) -> messages_pb2.ClientConfigureResponse:
        """Applies request to the configuration of every supported method.

        Methods listed in request.types run at the servicer's QPS with the
        metadata entries addressed to them; all other methods are set to 0
        QPS with no metadata. A method's timeout is replaced only when it is
        enabled and request.timeout_sec is non-zero.
        """
        logger.info("Received Configure RPC: %s", request)
        enabled_methods = {_METHOD_ENUM_TO_STR[t] for t in request.types}
        for method in _SUPPORTED_METHODS:
            method_enum = _METHOD_STR_TO_ENUM[method]
            channel_config = self._per_method_configs[method]
            enabled = method in enabled_methods
            if enabled:
                metadata = [(md.key, md.value)
                            for md in request.metadata
                            if md.type == method_enum]
            else:
                metadata = []
            with channel_config.condition:
                channel_config.qps = self._qps if enabled else 0
                channel_config.metadata = metadata
                # For backward compatibility, a default (zero) timeout and a
                # disabled method both leave the existing timeout untouched.
                # Deciding this under the condition avoids writing back a
                # value that was read before the condition was acquired.
                if enabled and request.timeout_sec != 0:
                    channel_config.rpc_timeout_sec = request.timeout_sec
                channel_config.condition.notify_all()
        return messages_pb2.ClientConfigureResponse()
356  return messages_pb2.ClientConfigureResponse()
357 
358 
class _MethodHandle:
    """An object grouping together threads driving RPCs for a method."""

    _channel_threads: List[threading.Thread]

    def __init__(self, num_channels: int,
                 channel_config: _ChannelConfiguration):
        """Creates and starts a group of threads running the indicated method.

        num_channels threads are started; each runs _run_single_channel with
        the same channel_config object.
        """
        self._channel_threads = []
        for _ in range(num_channels):
            thread = threading.Thread(target=_run_single_channel,
                                      args=(channel_config,))
            thread.start()
            self._channel_threads.append(thread)

    def stop(self) -> None:
        """Joins all threads referenced by the handle."""
        for channel_thread in self._channel_threads:
            channel_thread.join()
378 
379 
def _run(args: argparse.Namespace, methods: Sequence[str],
         per_method_metadata: PerMethodMetadataType) -> None:
    """Starts the channel threads and the stats server, then blocks.

    One configuration and one group of channel threads is created per
    supported method; methods not listed in `methods` start at 0 QPS. The
    call returns once the server has terminated and every channel thread has
    been joined.
    """
    global _global_server  # pylint: disable=global-statement
    logger.info("Starting python xDS Interop Client.")

    configs_by_method = {}
    handles = []
    for method in _SUPPORTED_METHODS:
        initial_qps = args.qps if method in methods else 0
        config = _ChannelConfiguration(method,
                                       per_method_metadata.get(method, []),
                                       initial_qps, args.server,
                                       args.rpc_timeout_sec,
                                       args.print_response, args.secure_mode)
        configs_by_method[method] = config
        handles.append(_MethodHandle(args.num_channels, config))

    _global_server = grpc.server(futures.ThreadPoolExecutor())
    _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}")
    stats_servicer = _LoadBalancerStatsServicer()
    test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server(
        stats_servicer, _global_server)
    configure_servicer = _XdsUpdateClientConfigureServicer(
        configs_by_method, args.qps)
    test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(
        configure_servicer, _global_server)
    channelz.add_channelz_servicer(_global_server)
    grpc_admin.add_admin_servicers(_global_server)
    _global_server.start()
    _global_server.wait_for_termination()

    for handle in handles:
        handle.stop()
409 
410 
def parse_metadata_arg(metadata_arg: str) -> PerMethodMetadataType:
    """Parses a --metadata value into per-method (key, value) lists.

    Args:
        metadata_arg: comma-delimited 'METHOD:KEY:VALUE' entries; an empty
            string yields an empty mapping.

    Returns:
        A defaultdict mapping each method name to its list of (key, value)
        tuples, in the order given.

    Raises:
        ValueError: if an entry does not have exactly three ':'-separated
            parts, or names a method that is not supported.
    """
    # Parse the argument that was passed in; the module-level `args` only
    # exists when this file is run as a script.
    metadata = metadata_arg.split(",") if metadata_arg else []
    per_method_metadata = collections.defaultdict(list)
    for metadatum in metadata:
        elems = metadatum.split(":")
        if len(elems) != 3:
            raise ValueError(
                f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'")
        if elems[0] not in _SUPPORTED_METHODS:
            raise ValueError(f"Unrecognized method '{elems[0]}'")
        per_method_metadata[elems[0]].append((elems[1], elems[2]))
    return per_method_metadata
423 
424 
def parse_rpc_arg(rpc_arg: str) -> Sequence[str]:
    """Splits a comma-delimited --rpc value into a list of method names.

    Raises:
        ValueError: if any listed method is not in _SUPPORTED_METHODS.
    """
    requested = rpc_arg.split(",")
    if not set(requested).issubset(_SUPPORTED_METHODS):
        supported = ", ".join(_SUPPORTED_METHODS)
        raise ValueError("--rpc supported methods: {}".format(supported))
    return requested
431 
432 
def bool_arg(arg: str) -> bool:
    """Converts a command-line string to a bool, ignoring case.

    "true", "yes" and "y" map to True; "false", "no" and "n" map to False.

    Raises:
        argparse.ArgumentTypeError: for any other value.
    """
    lowered = arg.lower()
    if lowered in ("true", "yes", "y"):
        return True
    if lowered in ("false", "no", "n"):
        return False
    raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
440 
441 
if __name__ == "__main__":
    # Command-line interface. Options are registered in the same order as
    # before so the generated --help text is unchanged.
    parser = argparse.ArgumentParser(
        description='Run Python XDS interop client.')
    parser.add_argument(
        "--num_channels",
        type=int,
        default=1,
        help="The number of channels from which to send requests.")
    parser.add_argument("--print_response",
                        type=bool_arg,
                        default="False",
                        help="Write RPC response to STDOUT.")
    parser.add_argument(
        "--qps",
        type=int,
        default=1,
        help="The number of queries to send from each channel per second.")
    parser.add_argument("--rpc_timeout_sec",
                        type=int,
                        default=30,
                        help="The per-RPC timeout in seconds.")
    parser.add_argument("--server",
                        default="localhost:50051",
                        help="The address of the server.")
    parser.add_argument(
        "--stats_port",
        type=int,
        default=50052,
        help="The port on which to expose the peer distribution stats service.")
    parser.add_argument(
        "--secure_mode",
        type=bool_arg,
        default="False",
        help="If specified, uses xDS credentials to connect to the server.")
    parser.add_argument('--verbose',
                        action='store_true',
                        default=False,
                        help='verbose log output')
    parser.add_argument("--log_file",
                        type=str,
                        default=None,
                        help="A file to log to.")
    rpc_help = ("A comma-delimited list of RPC methods to run. Must be one of "
                + ", ".join(_SUPPORTED_METHODS) + ".")
    parser.add_argument("--rpc", type=str, default="UnaryCall", help=rpc_help)
    metadata_help = (
        "A comma-delimited list of 3-tuples of the form "
        "METHOD:KEY:VALUE, e.g. "
        "EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3")
    parser.add_argument("--metadata", type=str, default="", help=metadata_help)
    args = parser.parse_args()

    # Install the handler before any thread or server is started.
    signal.signal(signal.SIGINT, _handle_sigint)

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    if args.log_file:
        # Mirror console output into the requested file, appending to it.
        file_handler = logging.FileHandler(args.log_file, mode='a')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    selected_methods = parse_rpc_arg(args.rpc)
    metadata_by_method = parse_metadata_arg(args.metadata)
    _run(args, selected_methods, metadata_by_method)
xds_interop_client._ChannelConfiguration.method
method
Definition: xds_interop_client.py:265
xds_interop_client._run_single_channel
None _run_single_channel(_ChannelConfiguration config)
Definition: xds_interop_client.py:276
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
grpc.insecure_channel
def insecure_channel(target, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1962
grpc::experimental.insecure_channel_credentials
def insecure_channel_credentials()
Definition: src/python/grpcio/grpc/experimental/__init__.py:51
http2_test_server.format
format
Definition: http2_test_server.py:118
xds_interop_client._XdsUpdateClientConfigureServicer._per_method_configs
_per_method_configs
Definition: xds_interop_client.py:322
xds_interop_client._on_rpc_done
None _on_rpc_done(int rpc_id, grpc.Future future, str method, bool print_response)
Definition: xds_interop_client.py:197
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
xds_interop_client._ChannelConfiguration.server
server
Definition: xds_interop_client.py:268
xds_interop_client._run
None _run(argparse.Namespace args, Sequence[str] methods, PerMethodMetadataType per_method_metadata)
Definition: xds_interop_client.py:380
xds_interop_client._StatsWatcher._no_remote_peer
_no_remote_peer
Definition: xds_interop_client.py:81
xds_interop_client._MethodHandle.__init__
def __init__(self, int num_channels, _ChannelConfiguration channel_config)
Definition: xds_interop_client.py:364
xds_interop_client._StatsWatcher._start
_start
Definition: xds_interop_client.py:74
xds_interop_client._remove_completed_rpcs
None _remove_completed_rpcs(Mapping[int, grpc.Future] futures, bool print_response)
Definition: xds_interop_client.py:235
xds_interop_client._handle_sigint
None _handle_sigint(sig, frame)
Definition: xds_interop_client.py:125
xds_interop_client._ChannelConfiguration.print_response
print_response
Definition: xds_interop_client.py:270
xds_interop_client._LoadBalancerStatsServicer
Definition: xds_interop_client.py:132
xds_interop_client._StatsWatcher._rpcs_by_method
_rpcs_by_method
Definition: xds_interop_client.py:78
xds_interop_client._LoadBalancerStatsServicer.__init__
def __init__(self)
Definition: xds_interop_client.py:134
xds_interop_client._ChannelConfiguration.condition
condition
Definition: xds_interop_client.py:263
xds_interop_client._StatsWatcher
Definition: xds_interop_client.py:63
xds_interop_client._ChannelConfiguration.__init__
def __init__(self, str method, Sequence[Tuple[str, str]] metadata, int qps, str server, int rpc_timeout_sec, bool print_response, bool secure_mode)
Definition: xds_interop_client.py:261
xds_interop_client._StatsWatcher.await_rpc_stats_response
messages_pb2.LoadBalancerStatsResponse await_rpc_stats_response(self, int timeout_sec)
Definition: xds_interop_client.py:95
xds_interop_client._XdsUpdateClientConfigureServicer._qps
_qps
Definition: xds_interop_client.py:323
xds_interop_client._XdsUpdateClientConfigureServicer.Configure
messages_pb2.ClientConfigureResponse Configure(self, messages_pb2.ClientConfigureRequest request, grpc.ServicerContext context)
Definition: xds_interop_client.py:326
xds_interop_client._StatsWatcher.on_rpc_complete
None on_rpc_complete(self, int request_id, str peer, str method)
Definition: xds_interop_client.py:83
grpc.Future
Definition: src/python/grpcio/grpc/__init__.py:48
xds_interop_client._StatsWatcher._rpcs_by_peer
_rpcs_by_peer
Definition: xds_interop_client.py:77
grpc.server
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False)
Definition: src/python/grpcio/grpc/__init__.py:2034
xds_interop_client.parse_metadata_arg
PerMethodMetadataType parse_metadata_arg(str metadata_arg)
Definition: xds_interop_client.py:411
xds_manager.items
items
Definition: xds_manager.py:55
xds_interop_client._StatsWatcher._rpcs_needed
_rpcs_needed
Definition: xds_interop_client.py:76
xds_interop_client._ChannelConfiguration
Definition: xds_interop_client.py:253
xds_interop_client._StatsWatcher.__init__
def __init__(self, int start, int end)
Definition: xds_interop_client.py:73
grpc.ServicerContext
Definition: src/python/grpcio/grpc/__init__.py:1083
xds_interop_client._ChannelConfiguration.secure_mode
secure_mode
Definition: xds_interop_client.py:271
grpc_channelz.v1
Definition: src/python/grpcio_channelz/grpc_channelz/v1/__init__.py:1
xds_interop_client._XdsUpdateClientConfigureServicer
Definition: xds_interop_client.py:318
xds_interop_client._cancel_all_rpcs
None _cancel_all_rpcs(Mapping[int, Tuple[grpc.Future, str]] futures)
Definition: xds_interop_client.py:247
grpc.xds_channel_credentials
def xds_channel_credentials(fallback_credentials=None)
Definition: src/python/grpcio/grpc/__init__.py:1629
xds_interop_client._MethodHandle
Definition: xds_interop_client.py:359
xds_interop_client._MethodHandle.stop
None stop(self)
Definition: xds_interop_client.py:374
xds_interop_client.parse_rpc_arg
Sequence[str] parse_rpc_arg(str rpc_arg)
Definition: xds_interop_client.py:425
xds_interop_client._LoadBalancerStatsServicer.GetClientStats
messages_pb2.LoadBalancerStatsResponse GetClientStats(self, messages_pb2.LoadBalancerStatsRequest request, grpc.ServicerContext context)
Definition: xds_interop_client.py:137
xds_interop_client._ChannelConfiguration.rpc_timeout_sec
rpc_timeout_sec
Definition: xds_interop_client.py:269
xds_interop_client._MethodHandle._channel_threads
_channel_threads
Definition: xds_interop_client.py:366
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
xds_interop_client._XdsUpdateClientConfigureServicer.__init__
def __init__(self, Mapping[str, _ChannelConfiguration] per_method_configs, int qps)
Definition: xds_interop_client.py:320
xds_interop_client._ChannelConfiguration.metadata
metadata
Definition: xds_interop_client.py:266
xds_interop_client._ChannelConfiguration.qps
qps
Definition: xds_interop_client.py:267
xds_interop_client.bool_arg
bool bool_arg(str arg)
Definition: xds_interop_client.py:433
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
grpc_admin.add_admin_servicers
def add_admin_servicers(server)
Definition: src/python/grpcio_admin/grpc_admin/__init__.py:20
xds_interop_client._start_rpc
None _start_rpc(str method, Sequence[Tuple[str, str]] metadata, int request_id, test_pb2_grpc.TestServiceStub stub, float timeout, Mapping[int, Tuple[grpc.Future, str]] futures)
Definition: xds_interop_client.py:179
xds_interop_client._LoadBalancerStatsServicer.GetClientAccumulatedStats
messages_pb2.LoadBalancerAccumulatedStatsResponse GetClientAccumulatedStats(self, messages_pb2.LoadBalancerAccumulatedStatsRequest request, grpc.ServicerContext context)
Definition: xds_interop_client.py:156
grpc.secure_channel
def secure_channel(target, credentials, options=None, compression=None)
Definition: src/python/grpcio/grpc/__init__.py:1982
xds_interop_client._StatsWatcher._end
_end
Definition: xds_interop_client.py:75
xds_interop_client._StatsWatcher._condition
_condition
Definition: xds_interop_client.py:80


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:59