grpc_testing.py
Go to the documentation of this file.
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """
15 This contains helpers for gRPC services defined in
16 https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto
17 """
18 import logging
19 from typing import Iterable, Optional, Tuple
20 
21 import grpc
22 from grpc_health.v1 import health_pb2
23 from grpc_health.v1 import health_pb2_grpc
24 
25 import framework.rpc
26 from src.proto.grpc.testing import empty_pb2
27 from src.proto.grpc.testing import messages_pb2
28 from src.proto.grpc.testing import test_pb2_grpc
29 
# Type aliases for the load balancer stats protobuf messages.
# Request message types (underscore-prefixed names).
_LoadBalancerStatsRequest = messages_pb2.LoadBalancerStatsRequest
_LoadBalancerAccumulatedStatsRequest = (
    messages_pb2.LoadBalancerAccumulatedStatsRequest)
# Response message types.
LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse
LoadBalancerAccumulatedStatsResponse = (
    messages_pb2.LoadBalancerAccumulatedStatsResponse)
35 
36 
38  stub: test_pb2_grpc.LoadBalancerStatsServiceStub
39  STATS_PARTIAL_RESULTS_TIMEOUT_SEC = 1200
40  STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC = 600
41 
def __init__(self, channel: grpc.Channel):
    """Initialize the helper for the LoadBalancerStatsService stub.

    Args:
        channel: gRPC channel forwarded to the parent initializer along
            with the LoadBalancerStatsService stub class.
    """
    stub_class = test_pb2_grpc.LoadBalancerStatsServiceStub
    super().__init__(channel, stub_class)
44 
46  self,
47  *,
48  num_rpcs: int,
49  timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC,
50  ) -> LoadBalancerStatsResponse:
51  if timeout_sec is None:
52  timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC
53 
54  return self.call_unary_with_deadline(rpc='GetClientStats',
56  num_rpcs=num_rpcs,
57  timeout_sec=timeout_sec),
58  deadline_sec=timeout_sec,
59  log_level=logging.INFO)
60 
62  self,
63  *,
64  timeout_sec: Optional[int] = None
65  ) -> LoadBalancerAccumulatedStatsResponse:
66  if timeout_sec is None:
67  timeout_sec = self.STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC
68 
69  return self.call_unary_with_deadline(
70  rpc='GetClientAccumulatedStats',
72  deadline_sec=timeout_sec,
73  log_level=logging.INFO)
74 
75 
77  ):
78  stub: test_pb2_grpc.XdsUpdateClientConfigureServiceStub
79  CONFIGURE_TIMEOUT_SEC: int = 5
80 
def __init__(self, channel: grpc.Channel):
    """Initialize the helper for the XdsUpdateClientConfigureService stub.

    Args:
        channel: gRPC channel forwarded to the parent initializer along
            with the XdsUpdateClientConfigureService stub class.
    """
    stub_class = test_pb2_grpc.XdsUpdateClientConfigureServiceStub
    super().__init__(channel, stub_class)
84 
def configure(
    self,
    *,
    rpc_types: Iterable[str],
    metadata: Optional[Iterable[Tuple[str, str, str]]] = None,
    app_timeout: Optional[int] = None,
    timeout_sec: int = CONFIGURE_TIMEOUT_SEC,
) -> None:
    """Build a ClientConfigureRequest and send it via the 'Configure' RPC.

    Args:
        rpc_types: Names of RpcType enum values; each is converted with
            ``RpcType.Value`` and appended to ``request.types``.
        metadata: Optional ``(rpc type name, key, value)`` entries; each
            becomes one ``ClientConfigureRequest.Metadata`` message.
        app_timeout: When truthy, stored in ``request.timeout_sec``.
        timeout_sec: Passed as ``deadline_sec`` for the RPC call itself.
    """
    config_request_cls = messages_pb2.ClientConfigureRequest
    rpc_type_enum = config_request_cls.RpcType

    request = config_request_cls()
    for rpc_type in rpc_types:
        request.types.append(rpc_type_enum.Value(rpc_type))

    # A falsy ``metadata`` (None or empty) contributes no entries.
    for entry in metadata or ():
        md_message = config_request_cls.Metadata(
            type=rpc_type_enum.Value(entry[0]),
            key=entry[1],
            value=entry[2],
        )
        request.metadata.append(md_message)

    if app_timeout:
        request.timeout_sec = app_timeout

    # Configure's response is empty, so the call result is not returned.
    self.call_unary_with_deadline(rpc='Configure',
                                  req=request,
                                  deadline_sec=timeout_sec,
                                  log_level=logging.INFO)
113 
114 
116  stub: test_pb2_grpc.XdsUpdateHealthServiceStub
117 
def __init__(self, channel: grpc.Channel):
    """Initialize the helper for the XdsUpdateHealthService stub.

    Args:
        channel: gRPC channel forwarded to the parent initializer along
            with the XdsUpdateHealthService stub class.
    """
    stub_class = test_pb2_grpc.XdsUpdateHealthServiceStub
    super().__init__(channel, stub_class)
120 
def set_serving(self):
    """Issue the 'SetServing' unary RPC with an empty request message.

    The call is logged at INFO level; its result is not returned.
    """
    empty_request = empty_pb2.Empty()
    self.call_unary_with_deadline(rpc='SetServing',
                                  req=empty_request,
                                  log_level=logging.INFO)
125 
def set_not_serving(self):
    """Issue the 'SetNotServing' unary RPC with an empty request message.

    The call is logged at INFO level; its result is not returned.
    """
    empty_request = empty_pb2.Empty()
    self.call_unary_with_deadline(rpc='SetNotServing',
                                  req=empty_request,
                                  log_level=logging.INFO)
130 
131 
133  stub: health_pb2_grpc.HealthStub
134 
def __init__(self, channel: grpc.Channel):
    """Initialize the helper for the gRPC Health stub.

    Args:
        channel: gRPC channel forwarded to the parent initializer along
            with the Health stub class.
    """
    stub_class = health_pb2_grpc.HealthStub
    super().__init__(channel, stub_class)
137 
def check_health(self):
    """Issue the 'Check' unary RPC and return its result.

    A default-constructed HealthCheckRequest is sent, and the call is
    logged at INFO level.
    """
    health_request = health_pb2.HealthCheckRequest()
    response = self.call_unary_with_deadline(rpc='Check',
                                             req=health_request,
                                             log_level=logging.INFO)
    return response
framework.rpc.grpc_testing._LoadBalancerStatsRequest
_LoadBalancerStatsRequest
Definition: grpc_testing.py:31
framework.rpc.grpc_testing.XdsUpdateHealthServiceClient.set_not_serving
def set_not_serving(self)
Definition: grpc_testing.py:126
framework.rpc.grpc_testing.XdsUpdateClientConfigureServiceClient.__init__
def __init__(self, grpc.Channel channel)
Definition: grpc_testing.py:81
framework.rpc.grpc_testing.LoadBalancerStatsServiceClient.get_client_stats
LoadBalancerStatsResponse get_client_stats(self, *int num_rpcs, Optional[int] timeout_sec=STATS_PARTIAL_RESULTS_TIMEOUT_SEC)
Definition: grpc_testing.py:45
framework.rpc.grpc_testing._LoadBalancerAccumulatedStatsRequest
_LoadBalancerAccumulatedStatsRequest
Definition: grpc_testing.py:33
framework.rpc.grpc.GrpcClientHelper
Definition: grpc.py:28
framework.rpc.grpc_testing.HealthClient.check_health
def check_health(self)
Definition: grpc_testing.py:138
framework.rpc.grpc_testing.LoadBalancerStatsServiceClient.STATS_PARTIAL_RESULTS_TIMEOUT_SEC
int STATS_PARTIAL_RESULTS_TIMEOUT_SEC
Definition: grpc_testing.py:39
grpc_health.v1
Definition: src/python/grpcio_health_checking/grpc_health/v1/__init__.py:1
framework.rpc
Definition: tools/run_tests/xds_k8s_test_driver/framework/rpc/__init__.py:1
framework.rpc.grpc_testing.HealthClient.__init__
def __init__(self, grpc.Channel channel)
Definition: grpc_testing.py:135
framework.rpc.grpc.GrpcClientHelper.call_unary_with_deadline
Message call_unary_with_deadline(self, *str rpc, Message req, Optional[int] deadline_sec=DEFAULT_RPC_DEADLINE_SEC, Optional[int] log_level=logging.DEBUG)
Definition: grpc.py:39
framework.rpc.grpc_testing.LoadBalancerStatsServiceClient
Definition: grpc_testing.py:37
framework.rpc.grpc_testing.XdsUpdateHealthServiceClient.set_serving
def set_serving(self)
Definition: grpc_testing.py:121
framework.rpc.grpc_testing.HealthClient
Definition: grpc_testing.py:132
framework.rpc.grpc_testing.XdsUpdateClientConfigureServiceClient
Definition: grpc_testing.py:77
framework.rpc.grpc_testing.LoadBalancerStatsServiceClient.STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC
int STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC
Definition: grpc_testing.py:40
framework.rpc.grpc_testing.XdsUpdateHealthServiceClient
Definition: grpc_testing.py:115
framework.rpc.grpc_testing.XdsUpdateClientConfigureServiceClient.configure
None configure(self, *Iterable[str] rpc_types, Optional[Iterable[Tuple[str, str, str]]] metadata=None, Optional[int] app_timeout=None, int timeout_sec=CONFIGURE_TIMEOUT_SEC)
Definition: grpc_testing.py:85
framework.rpc.grpc_testing.LoadBalancerStatsServiceClient.get_client_accumulated_stats
LoadBalancerAccumulatedStatsResponse get_client_accumulated_stats(self, *Optional[int] timeout_sec=None)
Definition: grpc_testing.py:61
framework.rpc.grpc_testing.XdsUpdateHealthServiceClient.__init__
def __init__(self, grpc.Channel channel)
Definition: grpc_testing.py:118
framework.rpc.grpc_testing.LoadBalancerStatsServiceClient.__init__
def __init__(self, grpc.Channel channel)
Definition: grpc_testing.py:42


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:47