23 from typing
import Iterable, List, Mapping, Set, Tuple
27 import xds_interop_client
28 import xds_interop_server
30 from src.proto.grpc.testing
import empty_pb2
31 from src.proto.grpc.testing
import messages_pb2
32 from src.proto.grpc.testing
import test_pb2
33 from src.proto.grpc.testing
import test_pb2_grpc
34 import src.python.grpcio_tests.tests.unit.framework.common
as framework_common
# Absolute, symlink-resolved filesystem locations of the interop client and
# server scripts, derived from the imported modules' ``__file__`` attributes.
_CLIENT_PATH, _SERVER_PATH = (
    os.path.abspath(os.path.realpath(module.__file__))
    for module in (xds_interop_client, xds_interop_server)
)
40 (messages_pb2.ClientConfigureRequest.UNARY_CALL,
"UNARY_CALL"),
41 (messages_pb2.ClientConfigureRequest.EMPTY_CALL,
"EMPTY_CALL"),
# Duration, in seconds, associated with a single test iteration.
# NOTE(review): the statement that consumes this value is not visible in this
# excerpt -- presumably the stats-collection window; confirm against the caller.
_ITERATION_DURATION_SECONDS = 1
# Timeout, in seconds, passed to ``wait()`` on the server and client
# subprocesses.
_SUBPROCESS_TIMEOUT_SECONDS = 2
58 @contextlib.contextmanager
60 file: str, args: List[str]
61 ) -> Tuple[subprocess.Popen, tempfile.TemporaryFile, tempfile.TemporaryFile]:
62 with tempfile.TemporaryFile(mode=
'r')
as stdout:
63 with tempfile.TemporaryFile(mode=
'r')
as stderr:
64 proc = subprocess.Popen((sys.executable, file) + tuple(args),
67 yield proc, stdout, stderr
71 stream: tempfile.TemporaryFile):
72 sys.stderr.write(f
"{process_name} {stream_name}:\n")
74 sys.stderr.write(stream.read())
78 stderr: tempfile.TemporaryFile):
81 sys.stderr.write(f
"End {process_name} output.\n")
85 response: messages_pb2.LoadBalancerAccumulatedStatsResponse
86 ) -> Mapping[str, Mapping[int, int]]:
87 indexed = collections.defaultdict(
lambda: collections.defaultdict(int))
88 for _, method_str
in _METHODS:
89 for status
in response.stats_per_method[method_str].result.keys():
90 indexed[method_str][status] = response.stats_per_method[
91 method_str].result[status]
96 b: Mapping[str, Mapping[int, int]]):
97 c = collections.defaultdict(
lambda: collections.defaultdict(int))
99 for method
in all_methods:
101 for status
in all_statuses:
102 c[method][status] = a[method][status] - b[method][status]
107 duration: int) -> Mapping[str, Mapping[int, int]]:
109 "target": f
"localhost:{stats_port}",
112 response = test_pb2_grpc.LoadBalancerStatsService.GetClientAccumulatedStats(
113 messages_pb2.LoadBalancerAccumulatedStatsRequest(), **settings)
116 response = test_pb2_grpc.LoadBalancerStatsService.GetClientAccumulatedStats(
117 messages_pb2.LoadBalancerAccumulatedStatsRequest(), **settings)
125 qps: int, num_channels: int):
127 "target": f
"localhost:{stats_port}",
130 for i
in range(_TEST_ITERATIONS):
131 target_method, target_method_str = _METHODS[i %
len(_METHODS)]
132 test_pb2_grpc.XdsUpdateClientConfigureService.Configure(
133 messages_pb2.ClientConfigureRequest(types=[target_method]),
136 logging.info(
"Delta: %s", delta)
137 for _, method_str
in _METHODS:
138 for status
in delta[method_str]:
139 if status == 0
and method_str == target_method_str:
140 self.assertGreater(delta[method_str][status], 0, delta)
142 self.assertEqual(delta[method_str][status], 0, delta)
145 _, server_port, socket = framework_common.get_socket()
149 [f
"--port={server_port}", f
"--maintenance_port={server_port}"
150 ])
as (server, server_stdout, server_stderr):
152 logging.info(
"Sending RPC to server.")
153 test_pb2_grpc.TestService.EmptyCall(empty_pb2.Empty(),
154 f
"localhost:{server_port}",
157 logging.info(
"Server successfully started.")
159 _, stats_port, stats_socket = framework_common.get_socket()
161 f
"--server=localhost:{server_port}",
162 f
"--stats_port={stats_port}", f
"--qps={_QPS}",
163 f
"--num_channels={_NUM_CHANNELS}"
164 ])
as (client, client_stdout, client_stderr):
176 server.wait(timeout=_SUBPROCESS_TIMEOUT_SECONDS)
177 client.wait(timeout=_SUBPROCESS_TIMEOUT_SECONDS)
# Script entry point: only runs when the module is executed directly.
if __name__ == '__main__':
    # Install the default root logging handler before the tests run.
    logging.basicConfig()
    # Discover and run this module's test cases with verbose output.
    unittest.main(verbosity=2)