fault_injection_test.py
Go to the documentation of this file.
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import time
16 from typing import Tuple
17 
18 from absl import flags
19 from absl.testing import absltest
20 import grpc
21 
22 from framework import xds_url_map_testcase
23 from framework.helpers import skips
24 from framework.test_app import client_app
25 
26 # Type aliases
27 HostRule = xds_url_map_testcase.HostRule
28 PathMatcher = xds_url_map_testcase.PathMatcher
29 GcpResourceManager = xds_url_map_testcase.GcpResourceManager
30 DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
31 RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
32 RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
33 XdsTestClient = client_app.XdsTestClient
34 ExpectedResult = xds_url_map_testcase.ExpectedResult
35 _Lang = skips.Lang
36 
37 logger = logging.getLogger(__name__)
38 flags.adopt_module_key_flags(xds_url_map_testcase)
39 
40 # The first batch of RPCs don't count towards the result of test case. They are
41 # meant to prove the communication between driver and client is fine.
42 _NUM_RPCS = 10
43 _LENGTH_OF_RPC_SENDING_SEC = 16
44 # We are using sleep to synchronize test driver and the client... Even though
45 # the client is sending at QPS rate, we can't assert that exactly QPS *
46 # SLEEP_DURATION number of RPC is finished. The final completed RPC might be
47 # slightly more or less.
48 _NON_RANDOM_ERROR_TOLERANCE = 0.01
49 # For random generator involved test cases, we want to be more loose about the
50 # final result. Otherwise, we will need more test duration (sleep duration) and
51 # more accurate communication mechanism. The accurate of random number
52 # generation is not the intention of this test.
53 _ERROR_TOLERANCE = 0.2
54 _DELAY_CASE_APPLICATION_TIMEOUT_SEC = 1
55 _BACKLOG_WAIT_TIME_SEC = 20
56 
57 
58 def _build_fault_injection_route_rule(abort_percentage: int = 0,
59  delay_percentage: int = 0):
60  return {
61  'priority': 0,
62  'matchRules': [{
63  'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
64  }],
65  'service': GcpResourceManager().default_backend_service(),
66  'routeAction': {
67  'faultInjectionPolicy': {
68  'abort': {
69  'httpStatus': 401,
70  'percentage': abort_percentage,
71  },
72  'delay': {
73  'fixedDelay': {
74  'seconds': '20'
75  },
76  'percentage': delay_percentage,
77  }
78  }
79  },
80  }
81 
82 
83 def _wait_until_backlog_cleared(test_client: XdsTestClient,
84  timeout: int = _BACKLOG_WAIT_TIME_SEC):
85  """ Wait until the completed RPC is close to started RPC.
86 
87  For delay injected test cases, there might be a backlog of RPCs due to slow
88  initialization of the client. E.g., if initialization took 20s and qps is
89  25, then there will be a backlog of 500 RPCs. In normal test cases, this is
90  fine, because RPCs will fail immediately. But for delay injected test cases,
91  the RPC might linger much longer and affect the stability of test results.
92  """
93  logger.info('Waiting for RPC backlog to clear for %d seconds', timeout)
94  deadline = time.time() + timeout
95  while time.time() < deadline:
96  stats = test_client.get_load_balancer_accumulated_stats()
97  ok = True
98  for rpc_type in [RpcTypeUnaryCall, RpcTypeEmptyCall]:
99  started = stats.num_rpcs_started_by_method.get(rpc_type, 0)
100  completed = stats.num_rpcs_succeeded_by_method.get(
101  rpc_type, 0) + stats.num_rpcs_failed_by_method.get(rpc_type, 0)
102  # We consider the backlog is healthy, if the diff between started
103  # RPCs and completed RPCs is less than 1.5 QPS.
104  if abs(started - completed) > xds_url_map_testcase.QPS.value * 1.1:
105  logger.info(
106  'RPC backlog exist: rpc_type=%s started=%s completed=%s',
107  rpc_type, started, completed)
108  time.sleep(_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
109  ok = False
110  else:
111  logger.info(
112  'RPC backlog clear: rpc_type=%s started=%s completed=%s',
113  rpc_type, started, completed)
114  if ok:
115  # Both backlog of both types of RPCs is clear, success, return.
116  return
117 
118  raise RuntimeError('failed to clear RPC backlog in %s seconds' % timeout)
119 
120 
121 def _is_supported(config: skips.TestConfig) -> bool:
122  if config.client_lang == _Lang.NODE:
123  return config.version_gte('v1.4.x')
124  return True
125 
126 
127 class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
128 
129  @staticmethod
130  def is_supported(config: skips.TestConfig) -> bool:
131  return _is_supported(config)
132 
133  @staticmethod
135  host_rule: HostRule,
136  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
137  path_matcher["routeRules"] = [
138  _build_fault_injection_route_rule(abort_percentage=0,
139  delay_percentage=0)
140  ]
141  return host_rule, path_matcher
142 
143  def xds_config_validate(self, xds_config: DumpedXdsConfig):
144  self.assertNumEndpoints(xds_config, 1)
145  filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
146  'typedPerFilterConfig']['envoy.filters.http.fault']
147  self.assertEqual('20s', filter_config['delay']['fixedDelay'])
148  self.assertEqual(
149  0, filter_config['delay']['percentage'].get('numerator', 0))
150  self.assertEqual('MILLION',
151  filter_config['delay']['percentage']['denominator'])
152  self.assertEqual(401, filter_config['abort']['httpStatus'])
153  self.assertEqual(
154  0, filter_config['abort']['percentage'].get('numerator', 0))
155  self.assertEqual('MILLION',
156  filter_config['abort']['percentage']['denominator'])
157 
158  def rpc_distribution_validate(self, test_client: XdsTestClient):
159  self.configure_and_send(test_client,
160  rpc_types=(RpcTypeUnaryCall,),
161  num_rpcs=_NUM_RPCS)
162  self.assertRpcStatusCode(test_client,
163  expected=(ExpectedResult(
164  rpc_type=RpcTypeUnaryCall,
165  status_code=grpc.StatusCode.OK,
166  ratio=1),),
167  length=_LENGTH_OF_RPC_SENDING_SEC,
168  tolerance=_NON_RANDOM_ERROR_TOLERANCE)
169 
170 
171 class TestNonMatchingFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
172  """EMPTY_CALL is not fault injected, so it should succeed."""
173 
174  @staticmethod
175  def is_supported(config: skips.TestConfig) -> bool:
176  return _is_supported(config)
177 
178  @staticmethod
179  def client_init_config(rpc: str, metadata: str):
180  # Python interop client will stuck if the traffic is slow (in this case,
181  # 20s injected). The purpose of this test is examining the un-injected
182  # traffic is not impacted, so it's fine to just send un-injected
183  # traffic.
184  return 'EmptyCall', metadata
185 
186  @staticmethod
188  host_rule: HostRule,
189  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
190  path_matcher["routeRules"] = [
191  _build_fault_injection_route_rule(abort_percentage=100,
192  delay_percentage=100)
193  ]
194  return host_rule, path_matcher
195 
196  def xds_config_validate(self, xds_config: DumpedXdsConfig):
197  self.assertNumEndpoints(xds_config, 1)
198  # The first route rule for UNARY_CALL is fault injected
199  self.assertEqual(
200  "/grpc.testing.TestService/UnaryCall",
201  xds_config.rds['virtualHosts'][0]['routes'][0]['match']['path'])
202  filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
203  'typedPerFilterConfig']['envoy.filters.http.fault']
204  self.assertEqual('20s', filter_config['delay']['fixedDelay'])
205  self.assertEqual(1000000,
206  filter_config['delay']['percentage']['numerator'])
207  self.assertEqual('MILLION',
208  filter_config['delay']['percentage']['denominator'])
209  self.assertEqual(401, filter_config['abort']['httpStatus'])
210  self.assertEqual(1000000,
211  filter_config['abort']['percentage']['numerator'])
212  self.assertEqual('MILLION',
213  filter_config['abort']['percentage']['denominator'])
214  # The second route rule for all other RPCs is untouched
215  self.assertNotIn(
216  'envoy.filters.http.fault',
217  xds_config.rds['virtualHosts'][0]['routes'][1].get(
218  'typedPerFilterConfig', {}))
219 
220  def rpc_distribution_validate(self, test_client: XdsTestClient):
221  self.assertRpcStatusCode(test_client,
222  expected=(ExpectedResult(
223  rpc_type=RpcTypeEmptyCall,
224  status_code=grpc.StatusCode.OK,
225  ratio=1),),
226  length=_LENGTH_OF_RPC_SENDING_SEC,
227  tolerance=_NON_RANDOM_ERROR_TOLERANCE)
228 
229 
230 @absltest.skip('20% RPC might pass immediately, reason unknown')
231 class TestAlwaysDelay(xds_url_map_testcase.XdsUrlMapTestCase):
232 
233  @staticmethod
234  def is_supported(config: skips.TestConfig) -> bool:
235  return _is_supported(config)
236 
237  @staticmethod
239  host_rule: HostRule,
240  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
241  path_matcher["routeRules"] = [
242  _build_fault_injection_route_rule(abort_percentage=0,
243  delay_percentage=100)
244  ]
245  return host_rule, path_matcher
246 
247  def xds_config_validate(self, xds_config: DumpedXdsConfig):
248  self.assertNumEndpoints(xds_config, 1)
249  filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
250  'typedPerFilterConfig']['envoy.filters.http.fault']
251  self.assertEqual('20s', filter_config['delay']['fixedDelay'])
252  self.assertEqual(1000000,
253  filter_config['delay']['percentage']['numerator'])
254  self.assertEqual('MILLION',
255  filter_config['delay']['percentage']['denominator'])
256 
257  def rpc_distribution_validate(self, test_client: XdsTestClient):
258  self.configure_and_send(test_client,
259  rpc_types=(RpcTypeUnaryCall,),
260  num_rpcs=_NUM_RPCS,
261  app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
262  _wait_until_backlog_cleared(test_client)
263  self.assertRpcStatusCode(
264  test_client,
265  expected=(ExpectedResult(
266  rpc_type=RpcTypeUnaryCall,
267  status_code=grpc.StatusCode.DEADLINE_EXCEEDED,
268  ratio=1),),
269  length=_LENGTH_OF_RPC_SENDING_SEC,
270  tolerance=_NON_RANDOM_ERROR_TOLERANCE)
271 
272 
273 class TestAlwaysAbort(xds_url_map_testcase.XdsUrlMapTestCase):
274 
275  @staticmethod
276  def is_supported(config: skips.TestConfig) -> bool:
277  return _is_supported(config)
278 
279  @staticmethod
281  host_rule: HostRule,
282  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
283  path_matcher["routeRules"] = [
284  _build_fault_injection_route_rule(abort_percentage=100,
285  delay_percentage=0)
286  ]
287  return host_rule, path_matcher
288 
289  def xds_config_validate(self, xds_config: DumpedXdsConfig):
290  self.assertNumEndpoints(xds_config, 1)
291  filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
292  'typedPerFilterConfig']['envoy.filters.http.fault']
293  self.assertEqual(401, filter_config['abort']['httpStatus'])
294  self.assertEqual(1000000,
295  filter_config['abort']['percentage']['numerator'])
296  self.assertEqual('MILLION',
297  filter_config['abort']['percentage']['denominator'])
298 
299  def rpc_distribution_validate(self, test_client: XdsTestClient):
300  self.configure_and_send(test_client,
301  rpc_types=(RpcTypeUnaryCall,),
302  num_rpcs=_NUM_RPCS)
303  self.assertRpcStatusCode(
304  test_client,
305  expected=(ExpectedResult(
306  rpc_type=RpcTypeUnaryCall,
307  status_code=grpc.StatusCode.UNAUTHENTICATED,
308  ratio=1),),
309  length=_LENGTH_OF_RPC_SENDING_SEC,
310  tolerance=_NON_RANDOM_ERROR_TOLERANCE)
311 
312 
313 class TestDelayHalf(xds_url_map_testcase.XdsUrlMapTestCase):
314 
315  @staticmethod
316  def is_supported(config: skips.TestConfig) -> bool:
317  return _is_supported(config)
318 
319  @staticmethod
321  host_rule: HostRule,
322  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
323  path_matcher["routeRules"] = [
324  _build_fault_injection_route_rule(abort_percentage=0,
325  delay_percentage=50)
326  ]
327  return host_rule, path_matcher
328 
329  def xds_config_validate(self, xds_config: DumpedXdsConfig):
330  self.assertNumEndpoints(xds_config, 1)
331  filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
332  'typedPerFilterConfig']['envoy.filters.http.fault']
333  self.assertEqual('20s', filter_config['delay']['fixedDelay'])
334  self.assertEqual(500000,
335  filter_config['delay']['percentage']['numerator'])
336  self.assertEqual('MILLION',
337  filter_config['delay']['percentage']['denominator'])
338 
339  def rpc_distribution_validate(self, test_client: XdsTestClient):
340  self.configure_and_send(test_client,
341  rpc_types=(RpcTypeUnaryCall,),
342  num_rpcs=_NUM_RPCS,
343  app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
344  _wait_until_backlog_cleared(test_client)
345  self.assertRpcStatusCode(
346  test_client,
347  expected=(ExpectedResult(
348  rpc_type=RpcTypeUnaryCall,
349  status_code=grpc.StatusCode.DEADLINE_EXCEEDED,
350  ratio=0.5),),
351  length=_LENGTH_OF_RPC_SENDING_SEC,
352  tolerance=_ERROR_TOLERANCE)
353 
354 
355 class TestAbortHalf(xds_url_map_testcase.XdsUrlMapTestCase):
356 
357  @staticmethod
358  def is_supported(config: skips.TestConfig) -> bool:
359  return _is_supported(config)
360 
361  @staticmethod
363  host_rule: HostRule,
364  path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
365  path_matcher["routeRules"] = [
366  _build_fault_injection_route_rule(abort_percentage=50,
367  delay_percentage=0)
368  ]
369  return host_rule, path_matcher
370 
371  def xds_config_validate(self, xds_config: DumpedXdsConfig):
372  self.assertNumEndpoints(xds_config, 1)
373  filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
374  'typedPerFilterConfig']['envoy.filters.http.fault']
375  self.assertEqual(401, filter_config['abort']['httpStatus'])
376  self.assertEqual(500000,
377  filter_config['abort']['percentage']['numerator'])
378  self.assertEqual('MILLION',
379  filter_config['abort']['percentage']['denominator'])
380 
381  def rpc_distribution_validate(self, test_client: XdsTestClient):
382  self.configure_and_send(test_client,
383  rpc_types=(RpcTypeUnaryCall,),
384  num_rpcs=_NUM_RPCS)
385  self.assertRpcStatusCode(
386  test_client,
387  expected=(ExpectedResult(
388  rpc_type=RpcTypeUnaryCall,
389  status_code=grpc.StatusCode.UNAUTHENTICATED,
390  ratio=0.5),),
391  length=_LENGTH_OF_RPC_SENDING_SEC,
392  tolerance=_ERROR_TOLERANCE)
393 
394 
395 if __name__ == '__main__':
396  absltest.main()
tests.url_map.fault_injection_test.TestAlwaysDelay.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: fault_injection_test.py:238
get
absl::string_view get(const Cont &c)
Definition: abseil-cpp/absl/strings/str_replace_test.cc:185
tests.url_map.fault_injection_test._wait_until_backlog_cleared
def _wait_until_backlog_cleared(XdsTestClient test_client, int timeout=_BACKLOG_WAIT_TIME_SEC)
Definition: fault_injection_test.py:83
tests.url_map.fault_injection_test.TestDelayHalf.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: fault_injection_test.py:320
tests.url_map.fault_injection_test.TestAlwaysAbort.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: fault_injection_test.py:289
tests.url_map.fault_injection_test.TestDelayHalf.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: fault_injection_test.py:329
tests.url_map.fault_injection_test.TestAlwaysAbort.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: fault_injection_test.py:280
framework.helpers
Definition: tools/run_tests/xds_k8s_test_driver/framework/helpers/__init__.py:1
tests.url_map.fault_injection_test.TestAbortHalf.is_supported
bool is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:358
tests.url_map.fault_injection_test.TestAlwaysAbort
Definition: fault_injection_test.py:273
tests.url_map.fault_injection_test.ExpectedResult
ExpectedResult
Definition: fault_injection_test.py:34
tests.url_map.fault_injection_test.TestDelayHalf.is_supported
bool is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:316
tests.url_map.fault_injection_test.TestNonMatchingFaultInjection.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: fault_injection_test.py:220
framework.test_app
Definition: tools/run_tests/xds_k8s_test_driver/framework/test_app/__init__.py:1
tests.url_map.fault_injection_test.GcpResourceManager
GcpResourceManager
Definition: fault_injection_test.py:29
tests.url_map.fault_injection_test.TestAlwaysDelay
Definition: fault_injection_test.py:231
tests.url_map.fault_injection_test.TestAlwaysDelay.is_supported
bool is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:234
tests.url_map.fault_injection_test.TestNonMatchingFaultInjection.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: fault_injection_test.py:196
tests.url_map.fault_injection_test.TestAbortHalf
Definition: fault_injection_test.py:355
tests.url_map.fault_injection_test.TestZeroPercentFaultInjection
Definition: fault_injection_test.py:127
tests.url_map.fault_injection_test._is_supported
bool _is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:121
tests.url_map.fault_injection_test.TestAlwaysAbort.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: fault_injection_test.py:299
tests.url_map.fault_injection_test._build_fault_injection_route_rule
def _build_fault_injection_route_rule(int abort_percentage=0, int delay_percentage=0)
Definition: fault_injection_test.py:58
tests.url_map.fault_injection_test.TestAbortHalf.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: fault_injection_test.py:362
tests.url_map.fault_injection_test.TestDelayHalf
Definition: fault_injection_test.py:313
tests.url_map.fault_injection_test.TestAlwaysDelay.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: fault_injection_test.py:247
tests.url_map.fault_injection_test.TestAbortHalf.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: fault_injection_test.py:371
tests.url_map.fault_injection_test.TestNonMatchingFaultInjection.client_init_config
def client_init_config(str rpc, str metadata)
Definition: fault_injection_test.py:179
tests.url_map.fault_injection_test.TestNonMatchingFaultInjection.is_supported
bool is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:175
tests.url_map.fault_injection_test.TestAbortHalf.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: fault_injection_test.py:381
tests.url_map.fault_injection_test.TestZeroPercentFaultInjection.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: fault_injection_test.py:158
tests.url_map.fault_injection_test.TestZeroPercentFaultInjection.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: fault_injection_test.py:134
tests.url_map.fault_injection_test.TestNonMatchingFaultInjection.url_map_change
Tuple[HostRule, PathMatcher] url_map_change(HostRule host_rule, PathMatcher path_matcher)
Definition: fault_injection_test.py:187
tests.url_map.fault_injection_test.TestAlwaysDelay.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: fault_injection_test.py:257
tests.url_map.fault_injection_test.TestZeroPercentFaultInjection.is_supported
bool is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:130
tests.url_map.fault_injection_test.TestDelayHalf.rpc_distribution_validate
def rpc_distribution_validate(self, XdsTestClient test_client)
Definition: fault_injection_test.py:339
tests.url_map.fault_injection_test.TestAlwaysAbort.is_supported
bool is_supported(skips.TestConfig config)
Definition: fault_injection_test.py:276
tests.url_map.fault_injection_test.TestZeroPercentFaultInjection.xds_config_validate
def xds_config_validate(self, DumpedXdsConfig xds_config)
Definition: fault_injection_test.py:143
tests.url_map.fault_injection_test.TestNonMatchingFaultInjection
Definition: fault_injection_test.py:171


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:59:19