loadtest_config.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # Copyright 2021 The gRPC Authors
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 # Script to generate test configurations for the OSS benchmarks framework.
17 #
18 # This script filters test scenarios and generates uniquely named configurations
19 # for each test. Configurations are dumped in multipart YAML format.
20 #
21 # See documentation below:
22 # https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
23 
24 import argparse
25 import collections
26 import copy
27 import datetime
28 import itertools
29 import json
30 import os
31 import string
32 import sys
33 from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Type
34 
35 import yaml
36 
37 sys.path.append(os.path.dirname(os.path.abspath(__file__)))
38 import scenario_config
39 import scenario_config_exporter
40 
# Comment block describing where generated configuration files come from.
# The leading and trailing empty elements reproduce the newlines that
# surround the comment text.
CONFIGURATION_FILE_HEADER_COMMENT = '\n'.join((
    '',
    '# Load test configurations generated from a template by loadtest_config.py.',
    '# See documentation below:',
    '# https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks',
    '',
))
46 
47 
def safe_name(language: str) -> str:
    """Maps a language key to a name usable in labels and file names.

    The value is the ``safename`` attribute of the matching entry in
    ``scenario_config.LANGUAGES``; an unknown key raises ``KeyError``.
    """
    language_entry = scenario_config.LANGUAGES[language]
    return language_entry.safename
51 
52 
def default_prefix() -> str:
    """Returns the prefix used for LoadTest names when none is supplied.

    This is the value of the USER environment variable, or 'loadtest' when
    that variable is not set.
    """
    try:
        return os.environ['USER']
    except KeyError:
        return 'loadtest'
56 
57 
def now_string() -> str:
    """Formats the current local date and time as YYYYmmddHHMMSS."""
    timestamp = datetime.datetime.now()
    return '{:%Y%m%d%H%M%S}'.format(timestamp)
61 
62 
def validate_loadtest_name(name: str) -> None:
    """Validates that a LoadTest name is in the expected format.

    A valid name is non-empty, at most 253 characters long, consists only of
    ASCII lowercase letters, ASCII digits and '-', starts with a letter and
    does not end with '-'.

    Args:
        name: Candidate name for a LoadTest resource.

    Raises:
        ValueError: If the name violates any of the rules above.
    """
    if len(name) > 253:
        raise ValueError(
            'LoadTest name must be at most 253 characters long: %s' % name)
    # str.isalnum() also accepts non-ASCII letters and digits, so membership
    # is checked against an explicit ASCII alphabet instead.
    allowed = set(string.ascii_lowercase + string.digits + '-')
    if not allowed.issuperset(name):
        raise ValueError('Invalid characters in LoadTest name: %s' % name)
    if not name or not name[0].isalpha() or name[-1] == '-':
        raise ValueError('Invalid format for LoadTest name: %s' % name)
72 
73 
def loadtest_base_name(scenario_name: str,
                       uniquifier_elements: Iterable[str]) -> str:
    """Builds the base name of a LoadTest resource.

    The scenario name is split on '_', the uniquifier elements are appended,
    and every piece is lowercased and joined with '-'.
    """
    parts = [*scenario_name.split('_'), *uniquifier_elements]
    return '-'.join(part.lower() for part in parts)
80 
81 
def loadtest_name(prefix: str, scenario_name: str,
                  uniquifier_elements: Iterable[str]) -> str:
    """Constructs and returns a valid name for a LoadTest resource.

    Args:
        prefix: Optional name prefix; omitted from the name when empty.
        scenario_name: Scenario name the base name is derived from.
        uniquifier_elements: Extra elements appended to make the name unique.

    Returns:
        The prefix (if any) and the base name joined with '-'.

    Raises:
        ValueError: If the resulting name is not a valid LoadTest name.
    """
    base_name = loadtest_base_name(scenario_name, uniquifier_elements)
    name_elements = []
    if prefix:
        name_elements.append(prefix)
    name_elements.append(base_name)
    name = '-'.join(name_elements)
    # Reject malformed names here rather than handing them on to callers.
    validate_loadtest_name(name)
    return name
93 
94 
def component_name(elements: Iterable[str]) -> str:
    """Joins the non-empty elements with '-' to form a component name."""
    return '-'.join(filter(None, elements))
98 
99 
def validate_annotations(annotations: Dict[str, str]) -> None:
    """Rejects annotations that use names reserved by the config generator.

    The names 'scenario' and 'uniquifier' are added automatically by the
    generator, so they must not be supplied by the caller.

    Raises:
        ValueError: If any reserved name is present in the annotations.
    """
    reserved = ('scenario', 'uniquifier')
    names = {name for name in reserved if name in annotations}
    if not names:
        return
    raise ValueError('Annotations contain reserved names: %s' % names)
108 
109 
def gen_run_indices(runs_per_test: int) -> Iterable[str]:
    """Yields one index string per run.

    With fewer than two runs a single empty string is yielded. Otherwise the
    indices 0..runs_per_test-1 are yielded, zero-padded to the width of the
    largest index.
    """
    if runs_per_test >= 2:
        width = len(str(runs_per_test - 1))
        for run in range(runs_per_test):
            yield str(run).zfill(width)
    else:
        yield ''
119 
120 
def scenario_name(base_name: str, client_channels: Optional[int],
                  server_threads: Optional[int], offered_load: Optional[int]):
    """Builds a scenario name from a base name and its modifiers.

    Each modifier that is set (truthy) contributes a '<value><unit>' element;
    all elements are joined to the base name with '_'.
    """
    modifiers = (
        (client_channels, 'channels'),
        (server_threads, 'threads'),
        (offered_load, 'load'),
    )
    suffixes = [
        '{:d}{}'.format(value, unit) for value, unit in modifiers if value
    ]
    return '_'.join([base_name, *suffixes])
133 
134 
def scenario_transform_function(
    client_channels: Optional[int], server_threads: Optional[int],
    offered_loads: Optional[Iterable[int]]
) -> Optional[Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str,
                                                                       Any]]]]:
    """Returns a transform to be applied to a list of scenarios.

    Args:
        client_channels: Number of client channels to set, if any.
        server_threads: Number of async server threads to set, if any.
        offered_loads: Offered load values; one scenario is produced per
            value. May be None or empty.

    Returns:
        The identity function when no modifier is set. Otherwise a generator
        function that yields the modified scenarios, renamed via
        scenario_name() to reflect the modifiers applied.
    """
    # Materialize the loads once: the argument may be None or a one-shot
    # iterable, and the values are needed again for every scenario.
    loads = list(offered_loads) if offered_loads is not None else []
    if not any((client_channels, server_threads, loads)):
        return lambda s: s

    def _transform(
        scenarios: Iterable[Mapping[str,
                                    Any]]) -> Iterable[Mapping[str, Any]]:
        """Transforms scenarios by inserting num of client channels, number of async_server_threads and offered_load."""

        for base_scenario in scenarios:
            base_name = base_scenario['name']
            if client_channels:
                base_scenario['client_config'][
                    'client_channels'] = client_channels

            if server_threads:
                base_scenario['server_config'][
                    'async_server_threads'] = server_threads

            if not loads:
                base_scenario['name'] = scenario_name(base_name,
                                                      client_channels,
                                                      server_threads, 0)
                yield base_scenario
                # Move on to the next scenario; returning here would drop
                # every scenario after the first one.
                continue

            for offered_load in loads:
                # Each load gets its own copy so scenarios do not share
                # nested configuration dictionaries.
                scenario = copy.deepcopy(base_scenario)
                scenario['client_config']['load_params'] = {
                    'poisson': {
                        'offered_load': offered_load
                    }
                }
                scenario['name'] = scenario_name(base_name, client_channels,
                                                 server_threads, offered_load)
                yield scenario

    return _transform
179 
def _named_client_instances(client_templates: List[Mapping[str, Any]],
                            language: str,
                            instances_per_client: int) -> List[Dict[str, Any]]:
    """Returns index-named copies of the template clients for a language."""
    clients = [
        client for client in client_templates
        if client['language'] == language
    ]
    if not clients:
        raise IndexError('Client language not found in template: %s' %
                         language)

    # Several instances per client are only distinguishable if the template
    # clients for this language already have unique names.
    if instances_per_client > 1:
        name_counts = collections.Counter(
            (client.get('name', '') for client in clients))
        if max(name_counts.values()) > 1:
            raise ValueError(
                ('Multiple instances of multiple clients requires '
                 'unique names, name counts for language %s: %s') %
                (language, name_counts.most_common()))

    # Name client instances with an index starting from zero.
    instances = []
    for i in range(instances_per_client):
        for client in copy.deepcopy(clients):
            client['name'] = component_name((client.get('name', ''), str(i)))
            instances.append(client)
    return instances


def _named_servers(server_templates: List[Mapping[str, Any]],
                   language: str) -> List[Dict[str, Any]]:
    """Returns index-named copies of the template servers for a language."""
    servers = copy.deepcopy([
        server for server in server_templates
        if server['language'] == language
    ])
    if not servers:
        raise IndexError('Server language not found in template: %s' %
                         language)

    # Name servers with an index for consistency with clients.
    for i, server in enumerate(servers):
        server['name'] = component_name((server.get('name', ''), str(i)))
    return servers


def gen_loadtest_configs(
    base_config: Mapping[str, Any],
    base_config_clients: Iterable[Mapping[str, Any]],
    base_config_servers: Iterable[Mapping[str, Any]],
    scenario_name_regex: str,
    language_config: scenario_config_exporter.LanguageConfig,
    loadtest_name_prefix: str,
    uniquifier_elements: Iterable[str],
    annotations: Mapping[str, str],
    instances_per_client: int = 1,
    runs_per_test: int = 1,
    scenario_transform: Callable[[Iterable[Mapping[str, Any]]],
                                 List[Dict[str, Any]]] = lambda s: s
) -> Iterable[Dict[str, Any]]:
    """Generates LoadTest configurations for a given language config.

    The LoadTest configurations are generated as YAML objects.

    Args:
        base_config: Template configuration; deep-copied for every test.
        base_config_clients: Client entries from the template.
        base_config_servers: Server entries from the template.
        scenario_name_regex: Regex used to select scenarios.
        language_config: Language, category and optional client/server
            language overrides.
        loadtest_name_prefix: Name prefix; default_prefix() is used if empty.
        uniquifier_elements: Elements appended to each test name.
        annotations: Extra metadata annotations; must not contain the
            reserved names 'scenario' and 'uniquifier'.
        instances_per_client: Number of instances generated per client.
        runs_per_test: Number of configurations generated per scenario.
        scenario_transform: Transform applied to the generated scenarios.

    Yields:
        One configuration per scenario and run index.

    Raises:
        ValueError: If the annotations contain reserved names, or if
            several instances are requested for clients without unique
            names.
        IndexError: If the template has no client or server for the
            required language.
    """
    validate_annotations(annotations)
    prefix = loadtest_name_prefix or default_prefix()
    cl = safe_name(language_config.client_language or language_config.language)
    sl = safe_name(language_config.server_language or language_config.language)
    scenario_filter = scenario_config_exporter.scenario_filter(
        scenario_name_regex=scenario_name_regex,
        category=language_config.category,
        client_language=language_config.client_language,
        server_language=language_config.server_language)

    scenarios = scenario_transform(
        scenario_config_exporter.gen_scenarios(language_config.language,
                                               scenario_filter))

    # The arguments are only promised to be iterables: materialize them so
    # they support list concatenation and can be traversed for every test.
    base_uniq = list(uniquifier_elements)
    client_templates = list(base_config_clients)
    server_templates = list(base_config_servers)

    for scenario in scenarios:
        for run_index in gen_run_indices(runs_per_test):
            uniq = base_uniq + [run_index] if run_index else base_uniq
            name = loadtest_name(prefix, scenario['name'], uniq)
            # NOTE(review): the indent string may have had its whitespace
            # collapsed in the listing this was taken from - confirm.
            scenario_str = json.dumps({'scenarios': scenario},
                                      indent=' ') + '\n'

            config = copy.deepcopy(base_config)

            metadata = config['metadata']
            metadata['name'] = name
            labels = metadata.setdefault('labels', dict())
            labels['language'] = safe_name(language_config.language)
            labels['prefix'] = prefix
            config_annotations = metadata.setdefault('annotations', dict())
            config_annotations.update(annotations)
            config_annotations.update({
                'scenario': scenario['name'],
                'uniquifier': '-'.join(uniq),
            })

            spec = config['spec']

            # Selection happens inside the loop so that a missing language
            # is only reported when there is a scenario that needs it.
            spec['clients'] = _named_client_instances(client_templates, cl,
                                                      instances_per_client)
            spec['servers'] = _named_servers(server_templates, sl)

            # Add driver, if needed.
            if 'driver' not in spec:
                spec['driver'] = dict()

            # Ensure driver has language and run fields.
            driver = spec['driver']
            if 'language' not in driver:
                driver['language'] = safe_name('c++')
            if 'run' not in driver:
                driver['run'] = dict()

            # Name the driver with an index for consistency with workers.
            # There is only one driver, so the index is zero.
            if not driver.get('name'):
                driver['name'] = '0'

            spec['scenariosJSON'] = scenario_str

            yield config
305 
def parse_key_value_args(args: Optional[Iterable[str]]) -> Dict[str, str]:
    """Parses arguments in the form key=value into a dictionary.

    Args:
        args: Strings of the form key=value, or None.

    Returns:
        A dictionary mapping each key to its value. Only the first '=' of an
        argument separates key from value. Empty if args is None.

    Raises:
        ValueError: If an argument does not contain '='.
    """
    d = dict()
    if args is None:
        return d
    for arg in args:
        key, equals, value = arg.partition('=')
        if equals != '=':
            # Report the whole argument: the value part is always empty when
            # the separator is missing.
            raise ValueError('Expected key=value: ' + arg)
        d[key] = value
    return d
317 
318 
def clear_empty_fields(config: Dict[str, Any]) -> None:
    """Removes fields that string substitution left with empty values.

    Operates in place on config['spec']: empty 'pool' entries are removed
    from clients, servers and the driver, an empty driver run image is
    removed, and 'results' is removed unless it has a non-empty
    'bigQueryTable'.
    """

    def drop_if_empty(mapping, key):
        if key in mapping and not mapping[key]:
            del mapping[key]

    spec = config['spec']

    for group in ('clients', 'servers'):
        if group in spec:
            for worker in spec[group]:
                drop_if_empty(worker, 'pool')

    if 'driver' in spec:
        driver = spec['driver']
        drop_if_empty(driver, 'pool')
        if 'run' in driver:
            drop_if_empty(driver['run'], 'image')

    if 'results' in spec:
        results = spec['results']
        if 'bigQueryTable' not in results or not results['bigQueryTable']:
            del spec['results']
341 
def config_dumper(header_comment: str) -> Type[yaml.SafeDumper]:
    """Builds the YAML dumper class used to write configurations.

    The returned class writes header_comment when the output stream starts
    and represents strings that contain a newline in literal block style.
    """

    class ConfigDumper(yaml.SafeDumper):

        def expect_stream_start(self):
            super().expect_stream_start()
            # Emit the header comment once the stream has been started.
            if isinstance(self.event, yaml.StreamStartEvent):
                self.write_indent()
                self.write_indicator(header_comment, need_whitespace=False)

    def str_presenter(dumper, data):
        string_tag = 'tag:yaml.org,2002:str'
        if '\n' not in data:
            return dumper.represent_scalar(string_tag, data)
        # Multi-line strings are dumped in literal block style.
        return dumper.represent_scalar(string_tag, data, style='|')

    ConfigDumper.add_representer(str, str_presenter)

    return ConfigDumper
363 
364 
def main() -> None:
    """Generates LoadTest configurations from a template.

    Parses the command line, substitutes variables in the template file,
    generates one configuration per selected scenario, language combination
    and run, and dumps all of them as a multi-document YAML stream to the
    output file, or to stdout if no output file is given.
    """
    language_choices = sorted(scenario_config.LANGUAGES.keys())
    argp = argparse.ArgumentParser(
        description='Generates load test configs from a template.',
        fromfile_prefix_chars='@')
    argp.add_argument('-l',
                      '--language',
                      action='append',
                      choices=language_choices,
                      required=True,
                      help='Language(s) to benchmark.',
                      dest='languages')
    argp.add_argument('-t',
                      '--template',
                      type=str,
                      required=True,
                      help='LoadTest configuration yaml file template.')
    argp.add_argument('-s',
                      '--substitution',
                      action='append',
                      default=[],
                      help='Template substitution(s), in the form key=value.',
                      dest='substitutions')
    argp.add_argument('-p',
                      '--prefix',
                      default='',
                      type=str,
                      help='Test name prefix.')
    argp.add_argument('-u',
                      '--uniquifier_element',
                      action='append',
                      default=[],
                      help='String element(s) to make the test name unique.',
                      dest='uniquifier_elements')
    argp.add_argument(
        '-d',
        action='store_true',
        help='Use creation date and time as an additional uniquifier element.')
    argp.add_argument('-a',
                      '--annotation',
                      action='append',
                      default=[],
                      help='metadata.annotation(s), in the form key=value.',
                      dest='annotations')
    argp.add_argument('-r',
                      '--regex',
                      default='.*',
                      type=str,
                      help='Regex to select scenarios to run.')
    argp.add_argument(
        '--category',
        choices=['all', 'inproc', 'scalable', 'smoketest', 'sweep', 'psm'],
        default='all',
        help='Select a category of tests to run.')
    argp.add_argument(
        '--allow_client_language',
        action='append',
        choices=language_choices,
        default=[],
        help='Allow cross-language scenarios with this client language.',
        dest='allow_client_languages')
    argp.add_argument(
        '--allow_server_language',
        action='append',
        choices=language_choices,
        default=[],
        help='Allow cross-language scenarios with this server language.',
        dest='allow_server_languages')
    argp.add_argument('--instances_per_client',
                      default=1,
                      type=int,
                      help="Number of instances to generate for each client.")
    argp.add_argument('--runs_per_test',
                      default=1,
                      type=int,
                      help='Number of copies to generate for each test.')
    argp.add_argument('-o',
                      '--output',
                      type=str,
                      help='Output file name. Output to stdout if not set.')
    argp.add_argument('--client_channels',
                      type=int,
                      help='Number of client channels.')
    argp.add_argument('--server_threads',
                      type=int,
                      help='Number of async server threads.')
    argp.add_argument(
        '--offered_loads',
        nargs="*",
        type=int,
        default=[],
        help='A list of QPS values at which each load test scenario will be run.'
    )
    args = argp.parse_args()

    if args.instances_per_client < 1:
        argp.error('instances_per_client must be greater than zero.')

    if args.runs_per_test < 1:
        argp.error('runs_per_test must be greater than zero.')

    # Config generation ignores environment variables that are passed by the
    # controller at runtime.
    substitutions = {
        'DRIVER_PORT': '${DRIVER_PORT}',
        'KILL_AFTER': '${KILL_AFTER}',
        'POD_TIMEOUT': '${POD_TIMEOUT}',
    }

    # The user can override the ignored variables above by passing them in as
    # substitution keys.
    substitutions.update(parse_key_value_args(args.substitutions))

    # Work on a copy so the list owned by the argument parser is not mutated.
    uniquifier_elements = list(args.uniquifier_elements)
    if args.d:
        uniquifier_elements.append(now_string())

    annotations = parse_key_value_args(args.annotations)

    transform = scenario_transform_function(args.client_channels,
                                            args.server_threads,
                                            args.offered_loads)

    with open(args.template) as f:
        base_config = yaml.safe_load(
            string.Template(f.read()).substitute(substitutions))

    clear_empty_fields(base_config)

    # Clients and servers are taken out of the base config; each generated
    # configuration receives its own language-specific selection.
    spec = base_config['spec']
    base_config_clients = spec.pop('clients')
    base_config_servers = spec.pop('servers')

    # The empty string stands for "no client/server language override".
    client_languages = [''] + args.allow_client_languages
    server_languages = [''] + args.allow_server_languages
    config_generators = []
    for language, cl, sl in itertools.product(args.languages,
                                              client_languages,
                                              server_languages):
        language_config = scenario_config_exporter.LanguageConfig(
            category=args.category,
            language=language,
            client_language=cl,
            server_language=sl)
        config_generators.append(
            gen_loadtest_configs(base_config,
                                 base_config_clients,
                                 base_config_servers,
                                 args.regex,
                                 language_config,
                                 loadtest_name_prefix=args.prefix,
                                 uniquifier_elements=uniquifier_elements,
                                 annotations=annotations,
                                 instances_per_client=args.instances_per_client,
                                 runs_per_test=args.runs_per_test,
                                 scenario_transform=transform))
    configs = itertools.chain.from_iterable(config_generators)

    dumper = config_dumper(CONFIGURATION_FILE_HEADER_COMMENT.strip())
    if args.output:
        with open(args.output, 'w') as f:
            yaml.dump_all(configs,
                          stream=f,
                          Dumper=dumper,
                          default_flow_style=False)
    else:
        # sys.stdout is deliberately not used as a context manager: leaving
        # the with-block would close it.
        yaml.dump_all(configs,
                      stream=sys.stdout,
                      Dumper=dumper,
                      default_flow_style=False)


if __name__ == '__main__':
    main()
xds_interop_client.str
str
Definition: xds_interop_client.py:487
test_group_name.all
all
Definition: test_group_name.py:241
http2_test_server.format
format
Definition: http2_test_server.py:118
grpc._common._transform
def _transform(message, transformer, exception_message)
Definition: grpc/_common.py:81
performance.loadtest_config.main
None main()
Definition: loadtest_config.py:365
performance.loadtest_config.loadtest_name
str loadtest_name(str prefix, str scenario_name, Iterable[str] uniquifier_elements)
Definition: loadtest_config.py:82
capstone.range
range
Definition: third_party/bloaty/third_party/capstone/bindings/python/capstone/__init__.py:6
performance.loadtest_config.default_prefix
str default_prefix()
Definition: loadtest_config.py:53
performance.loadtest_config.scenario_transform_function
Optional[Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str, Any]]]] scenario_transform_function(Optional[int] client_channels, Optional[int] server_threads, Optional[Iterable[int]] offered_loads)
Definition: loadtest_config.py:135
performance.loadtest_config.config_dumper
Type[yaml.SafeDumper] config_dumper(str header_comment)
Definition: loadtest_config.py:342
performance.loadtest_config.validate_annotations
None validate_annotations(Dict[str, str] annotations)
Definition: loadtest_config.py:100
performance.loadtest_config.scenario_name
def scenario_name(str base_name, Optional[int] client_channels, Optional[int] server_threads, Optional[int] offered_load)
Definition: loadtest_config.py:121
max
int max
Definition: bloaty/third_party/zlib/examples/enough.c:170
performance.loadtest_config.loadtest_base_name
str loadtest_base_name(str scenario_name, Iterable[str] uniquifier_elements)
Definition: loadtest_config.py:74
performance.loadtest_config.validate_loadtest_name
None validate_loadtest_name(str name)
Definition: loadtest_config.py:63
performance.loadtest_config.gen_loadtest_configs
Iterable[Dict[str, Any]] gen_loadtest_configs(Mapping[str, Any] base_config, Iterable[Mapping[str, Any]] base_config_clients, Iterable[Mapping[str, Any]] base_config_servers, str scenario_name_regex, scenario_config_exporter.LanguageConfig language_config, str loadtest_name_prefix, Iterable[str] uniquifier_elements, Mapping[str, str] annotations, int instances_per_client=1, int runs_per_test=1, Callable[[Iterable[Mapping[str, Any]]], List[Dict[str, Any]]] scenario_transform=lambda s:s)
Definition: loadtest_config.py:180
main
Definition: main.py:1
performance.loadtest_config.gen_run_indices
Iterable[str] gen_run_indices(int runs_per_test)
Definition: loadtest_config.py:110
cpp.gmock_class.set
set
Definition: bloaty/third_party/googletest/googlemock/scripts/generator/cpp/gmock_class.py:44
performance.loadtest_config.now_string
str now_string()
Definition: loadtest_config.py:58
performance.loadtest_config.clear_empty_fields
None clear_empty_fields(Dict[str, Any] config)
Definition: loadtest_config.py:319
open
#define open
Definition: test-fs.c:46
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
performance.loadtest_config.safe_name
str safe_name(str language)
Definition: loadtest_config.py:48
performance.loadtest_config.parse_key_value_args
Dict[str, str] parse_key_value_args(Optional[Iterable[str]] args)
Definition: loadtest_config.py:306
update
absl::optional< XdsClusterResource > update
Definition: cds.cc:150
performance.loadtest_config.component_name
str component_name(Iterable[str] elements)
Definition: loadtest_config.py:95


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:29