xds_interop_client_test.py
Go to the documentation of this file.
1 # Copyright 2022 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import collections
16 import contextlib
17 import logging
18 import os
19 import subprocess
20 import sys
21 import tempfile
22 import time
23 from typing import Iterable, List, Mapping, Set, Tuple
24 import unittest
25 
26 import grpc.experimental
27 import xds_interop_client
28 import xds_interop_server
29 
30 from src.proto.grpc.testing import empty_pb2
31 from src.proto.grpc.testing import messages_pb2
32 from src.proto.grpc.testing import test_pb2
33 from src.proto.grpc.testing import test_pb2_grpc
34 import src.python.grpcio_tests.tests.unit.framework.common as framework_common
35 
# Absolute, symlink-resolved paths of the interop client and server modules;
# both are launched as scripts in subprocesses by the test below.
_CLIENT_PATH = os.path.abspath(os.path.realpath(xds_interop_client.__file__))
_SERVER_PATH = os.path.abspath(os.path.realpath(xds_interop_server.__file__))

# Pairs of (ClientConfigureRequest RPC type enum value, key used for that
# method in the accumulated-stats response).
_METHODS = (
    (messages_pb2.ClientConfigureRequest.UNARY_CALL, "UNARY_CALL"),
    (messages_pb2.ClientConfigureRequest.EMPTY_CALL, "EMPTY_CALL"),
)

# Values passed to the client subprocess as --qps and --num_channels.
_QPS = 100
_NUM_CHANNELS = 20

# Number of configure/measure rounds run by the test.
_TEST_ITERATIONS = 10
# Length of the stats sampling window in each round (passed to time.sleep).
_ITERATION_DURATION_SECONDS = 1
# Timeout used when waiting for a killed subprocess to exit.
_SUBPROCESS_TIMEOUT_SECONDS = 2
50 
51 
52 def _set_union(a: Iterable, b: Iterable) -> Set:
53  c = set(a)
54  c.update(b)
55  return c
56 
57 
58 @contextlib.contextmanager
60  file: str, args: List[str]
61 ) -> Tuple[subprocess.Popen, tempfile.TemporaryFile, tempfile.TemporaryFile]:
62  with tempfile.TemporaryFile(mode='r') as stdout:
63  with tempfile.TemporaryFile(mode='r') as stderr:
64  proc = subprocess.Popen((sys.executable, file) + tuple(args),
65  stdout=stdout,
66  stderr=stderr)
67  yield proc, stdout, stderr
68 
69 
70 def _dump_stream(process_name: str, stream_name: str,
71  stream: tempfile.TemporaryFile):
72  sys.stderr.write(f"{process_name} {stream_name}:\n")
73  stream.seek(0)
74  sys.stderr.write(stream.read())
75 
76 
def _dump_streams(process_name: str, stdout: tempfile.TemporaryFile,
                  stderr: tempfile.TemporaryFile):
    """Echo a process's captured stdout and stderr to this process's stderr.

    stdout is dumped first, then stderr, followed by a closing marker line.
    """
    captured = (("stdout", stdout), ("stderr", stderr))
    for label, handle in captured:
        _dump_stream(process_name, label, handle)
    sys.stderr.write(f"End {process_name} output.\n")
82 
83 
def _index_accumulated_stats(
    response: messages_pb2.LoadBalancerAccumulatedStatsResponse
) -> Mapping[str, Mapping[int, int]]:
    """Convert an accumulated-stats response into nested defaultdicts.

    Only the methods listed in `_METHODS` are read from the response.

    Args:
        response: The accumulated-stats response to index.

    Returns:
        A mapping `method name -> status -> count`. Both levels are
        defaultdicts, so looking up an absent method or status yields an
        empty mapping or 0 instead of raising.
    """
    indexed = collections.defaultdict(lambda: collections.defaultdict(int))
    for _, method_str in _METHODS:
        results = response.stats_per_method[method_str].result
        for status, count in results.items():
            indexed[method_str][status] = count
    return indexed
93 
94 
95 def _subtract_indexed_stats(a: Mapping[str, Mapping[int, int]],
96  b: Mapping[str, Mapping[int, int]]):
97  c = collections.defaultdict(lambda: collections.defaultdict(int))
98  all_methods = _set_union(a.keys(), b.keys())
99  for method in all_methods:
100  all_statuses = _set_union(a[method].keys(), b[method].keys())
101  for status in all_statuses:
102  c[method][status] = a[method][status] - b[method][status]
103  return c
104 
105 
def _collect_stats(stats_port: int,
                   duration: int) -> Mapping[str, Mapping[int, int]]:
    """Measure how the client's accumulated stats change over a time window.

    The stats service on `localhost:<stats_port>` is queried once, the
    function sleeps for `duration` seconds, and the service is queried again.

    Returns:
        The second sample minus the first, indexed as
        `method name -> status -> count`.
    """
    rpc_kwargs = {
        "target": f"localhost:{stats_port}",
        "insecure": True,
    }

    def _snapshot() -> Mapping[str, Mapping[int, int]]:
        # Fetch the current accumulated stats and index them by method/status.
        reply = test_pb2_grpc.LoadBalancerStatsService.GetClientAccumulatedStats(
            messages_pb2.LoadBalancerAccumulatedStatsRequest(), **rpc_kwargs)
        return _index_accumulated_stats(reply)

    baseline = _snapshot()
    time.sleep(duration)
    latest = _snapshot()
    return _subtract_indexed_stats(latest, baseline)
120 
121 
class XdsInteropClientTest(unittest.TestCase):
    """Runs the xDS interop client against the interop server as subprocesses.

    The test checks that reconfiguring the client's RPC type at runtime is
    reflected in the client's accumulated stats.
    """

    def _assert_client_consistent(self, server_port: int, stats_port: int,
                                  qps: int, num_channels: int):
        """Reconfigure the client repeatedly and check the resulting stats.

        Each iteration configures the client to send a single RPC type
        (cycling through `_METHODS`), samples the stats delta over
        `_ITERATION_DURATION_SECONDS`, and asserts that only the configured
        method gained results under status 0 while every other reported
        method/status pair stayed unchanged.

        Args:
            server_port: Not used by this method.
            stats_port: Port of the client's stats/configure services.
            qps: Not used by this method.
            num_channels: Not used by this method.
        """
        settings = {
            "target": f"localhost:{stats_port}",
            "insecure": True,
        }
        for i in range(_TEST_ITERATIONS):
            target_method, target_method_str = _METHODS[i % len(_METHODS)]
            test_pb2_grpc.XdsUpdateClientConfigureService.Configure(
                messages_pb2.ClientConfigureRequest(types=[target_method]),
                **settings)
            delta = _collect_stats(stats_port, _ITERATION_DURATION_SECONDS)
            logging.info("Delta: %s", delta)
            for _, method_str in _METHODS:
                for status in delta[method_str]:
                    # Status 0 is presumably the OK status code -- confirm.
                    if status == 0 and method_str == target_method_str:
                        self.assertGreater(delta[method_str][status], 0, delta)
                    else:
                        self.assertEqual(delta[method_str][status], 0, delta)

    def test_configure_consistency(self):
        """Start server and client, then verify stats track configuration."""
        _, server_port, socket = framework_common.get_socket()

        with _start_python_with_args(
                _SERVER_PATH,
            [f"--port={server_port}", f"--maintenance_port={server_port}"
            ]) as (server, server_stdout, server_stderr):
            try:
                # Send RPC to server to make sure it's running.
                logging.info("Sending RPC to server.")
                test_pb2_grpc.TestService.EmptyCall(empty_pb2.Empty(),
                                                    f"localhost:{server_port}",
                                                    insecure=True,
                                                    wait_for_ready=True)
                logging.info("Server successfully started.")
                socket.close()
                _, stats_port, stats_socket = framework_common.get_socket()
                with _start_python_with_args(_CLIENT_PATH, [
                        f"--server=localhost:{server_port}",
                        f"--stats_port={stats_port}", f"--qps={_QPS}",
                        f"--num_channels={_NUM_CHANNELS}"
                ]) as (client, client_stdout, client_stderr):
                    stats_socket.close()
                    try:
                        self._assert_client_consistent(server_port, stats_port,
                                                       _QPS, _NUM_CHANNELS)
                    except BaseException:
                        # Surface both processes' output for debugging, then
                        # let the original failure propagate.
                        _dump_streams("server", server_stdout, server_stderr)
                        _dump_streams("client", client_stdout, client_stderr)
                        raise
                    finally:
                        client.kill()
                        client.wait(timeout=_SUBPROCESS_TIMEOUT_SECONDS)
            finally:
                # Runs even if the readiness RPC or the client launch fails,
                # so the server subprocess is never left running.
                server.kill()
                server.wait(timeout=_SUBPROCESS_TIMEOUT_SECONDS)
178 
179 
# Script entry point: set up default logging, then run this module's tests
# with verbose per-test output.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)
xds_interop_client_test.XdsInteropClientTest._assert_client_consistent
def _assert_client_consistent(self, int server_port, int stats_port, int qps, int num_channels)
Definition: xds_interop_client_test.py:124
keys
const void * keys
Definition: abseil-cpp/absl/random/internal/randen.cc:49
xds_interop_client_test._collect_stats
Mapping[str, Mapping[int, int]] _collect_stats(int stats_port, int duration)
Definition: xds_interop_client_test.py:106
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
xds_interop_client_test._set_union
Set _set_union(Iterable a, Iterable b)
Definition: xds_interop_client_test.py:52
grpc::experimental
Definition: include/grpcpp/channel.h:46
xds_interop_client_test._index_accumulated_stats
Mapping[str, Mapping[int, int]] _index_accumulated_stats(messages_pb2.LoadBalancerAccumulatedStatsResponse response)
Definition: xds_interop_client_test.py:84
xds_interop_client_test._dump_stream
def _dump_stream(str process_name, str stream_name, tempfile.TemporaryFile stream)
Definition: xds_interop_client_test.py:70
xds_interop_client_test._dump_streams
def _dump_streams(str process_name, tempfile.TemporaryFile stdout, tempfile.TemporaryFile stderr)
Definition: xds_interop_client_test.py:77
xds_interop_client_test.XdsInteropClientTest
Definition: xds_interop_client_test.py:122
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
xds_interop_client_test.XdsInteropClientTest.test_configure_consistency
def test_configure_consistency(self)
Definition: xds_interop_client_test.py:144
xds_interop_client_test._start_python_with_args
Tuple[subprocess.Popen, tempfile.TemporaryFile, tempfile.TemporaryFile] _start_python_with_args(str file, List[str] args)
Definition: xds_interop_client_test.py:59
xds_interop_client_test._subtract_indexed_stats
def _subtract_indexed_stats(Mapping[str, Mapping[int, int]] a, Mapping[str, Mapping[int, int]] b)
Definition: xds_interop_client_test.py:95


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:59