33 from typing
import Any, Callable, Dict, Iterable, List, Mapping, Optional, Type
37 sys.path.append(os.path.dirname(os.path.abspath(__file__)))
38 import scenario_config
39 import scenario_config_exporter
41 CONFIGURATION_FILE_HEADER_COMMENT =
"""
42 # Load test configurations generated from a template by loadtest_config.py.
43 # See documentation below:
44 # https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md#grpc-oss-benchmarks
# NOTE(review): the `def` line was missing from the damaged source; the name and
# parameter are taken from the call sites (`safe_name(language_config.language)`)
# and from the body's use of `language`.
def safe_name(language: str) -> str:
    """Returns a name that is safe to use in labels and file names.

    Args:
        language: Key used to index `scenario_config.LANGUAGES`.

    Returns:
        The `safename` attribute of the selected language entry.
    """
    return scenario_config.LANGUAGES[language].safename
# NOTE(review): the `def` line was missing from the damaged source; the function
# name is inferred from the docstring -- confirm against callers.
def default_prefix() -> str:
    """Constructs and returns a default prefix for LoadTest names.

    Returns:
        The value of the `USER` environment variable, or 'loadtest' when
        that variable is not set.
    """
    return os.environ.get('USER', 'loadtest')
# NOTE(review): the `def` line was missing from the damaged source; the function
# name is inferred from the docstring -- confirm against callers.
def now_string() -> str:
    """Returns the current date and time in string format.

    Returns:
        The local time from `datetime.datetime.now()`, formatted as
        `%Y%m%d%H%M%S` (14 digits, no separators).
    """
    return datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# NOTE(review): the `def` line was missing from the damaged source; the function
# name is inferred from the docstring -- confirm against callers.
def validate_loadtest_name(name: str) -> None:
    """Validates that a LoadTest name is in the expected format.

    A name is accepted when every character other than '-' is alphanumeric
    and not uppercase, the name is non-empty, its first character is
    alphabetic, and its last character is not '-'.

    Args:
        name: Candidate LoadTest name.

    Raises:
        ValueError: If the name is too long, contains a disallowed character,
            is empty, does not start with a letter, or ends with '-'.
    """
    # NOTE(review): the length condition itself was lost in the damaged source
    # (only the error message survived). `> 253` is assumed here -- confirm.
    if len(name) > 253:
        raise ValueError(
            'LoadTest name must be less than 253 characters long: %s' % name)
    # '-' is the only non-alphanumeric character allowed, so it is excluded
    # from the per-character check.
    if not all(c.isalnum() and not c.isupper() for c in name if c != '-'):
        raise ValueError('Invalid characters in LoadTest name: %s' % name)
    # The emptiness test comes first so that the indexing below is safe.
    if not name or not name[0].isalpha() or name[-1] == '-':
        raise ValueError('Invalid format for LoadTest name: %s' % name)
# NOTE(review): the first line of the signature was missing from the damaged
# source; the function name is inferred from the docstring and the
# `scenario_name` parameter from the body -- confirm against callers.
def loadtest_base_name(scenario_name: str,
                       uniquifier_elements: Iterable[str]) -> str:
    """Constructs and returns the base name for a LoadTest resource.

    Args:
        scenario_name: Scenario name; it is split on '_' into elements.
        uniquifier_elements: Extra elements appended after the scenario
            name elements.

    Returns:
        All elements, lowercased and joined with '-'.
    """
    name_elements = scenario_name.split('_')
    name_elements.extend(uniquifier_elements)
    return '-'.join(element.lower() for element in name_elements)
83 uniquifier_elements: Iterable[str]) -> str:
84 """Constructs and returns a valid name for a LoadTest resource."""
88 name_elements.append(prefix)
89 name_elements.append(base_name)
90 name =
'-'.join(name_elements)
# NOTE(review): the `def` line was missing from the damaged source; the function
# name is inferred from the docstring -- confirm against callers.
def component_name(elements: Iterable[str]) -> str:
    """Constructs a component name from possibly empty elements.

    Args:
        elements: Name elements; falsy entries (such as '') are skipped.

    Returns:
        The remaining elements joined with '-'; '' if none remain.
    """
    return '-'.join(e for e in elements if e)
# NOTE(review): the `def` line was missing from the damaged source; the function
# name is inferred from the docstring -- confirm against callers.
def validate_annotations(annotations: Mapping[str, str]) -> None:
    """Validates that annotations do not contain reserved names.

    These names are automatically added by the config generator.

    Args:
        annotations: Annotations keyed by name.

    Raises:
        ValueError: If 'scenario' or 'uniquifier' is used as a key.
    """
    names = {'scenario', 'uniquifier'}.intersection(annotations)
    # NOTE(review): this guard was lost in the damaged source; it is restored
    # because an unconditional raise would reject every input.
    if names:
        raise ValueError('Annotations contain reserved names: %s' % names)
# NOTE(review): the `def` line was missing from the damaged source; the function
# name is inferred from the docstring -- confirm against callers.
def gen_run_indices(runs_per_test: int) -> Iterable[str]:
    """Generates run indices for multiple runs, as formatted strings.

    Args:
        runs_per_test: Number of runs to generate indices for.

    Yields:
        For two or more runs, the indices 0..runs_per_test-1, zero-padded to
        the width of the largest index. For fewer than two runs, a single
        empty string.
    """
    if runs_per_test < 2:
        # NOTE(review): the body of this branch was lost in the damaged
        # source. A single empty index is assumed, since the visible caller
        # tests `if run_index` before appending it -- confirm.
        yield ''
        return
    # Pad every index to the number of digits in the largest one.
    index_length = len('{:d}'.format(runs_per_test - 1))
    index_fmt = '{{:0{:d}d}}'.format(index_length)
    for i in range(runs_per_test):
        yield index_fmt.format(i)
# NOTE(review): the first line of the signature was missing from the damaged
# source; the function name is inferred from the docstring and the leading
# parameters from the body -- confirm against callers.
def scenario_name(base_name: str, client_channels: Optional[int],
                  server_threads: Optional[int],
                  offered_load: Optional[int]) -> str:
    """Constructs scenario name from base name and modifiers.

    Args:
        base_name: Name of the unmodified scenario.
        client_channels: If set, adds a '<n>channels' element.
        server_threads: If set, adds a '<n>threads' element.
        offered_load: If set, adds a '<n>load' element.

    Returns:
        The base name and any modifier elements, joined with '_'.
    """
    elements = [base_name]
    # NOTE(review): the three guards below were lost in the damaged source.
    # They are restored as truthiness checks because the parameters are
    # Optional and '{:d}' cannot format None -- confirm the exact condition.
    if client_channels:
        elements.append('{:d}channels'.format(client_channels))
    if server_threads:
        elements.append('{:d}threads'.format(server_threads))
    if offered_load:
        elements.append('{:d}load'.format(offered_load))
    return '_'.join(elements)
136 client_channels: Optional[int], server_threads: Optional[int],
137 offered_loads: Optional[Iterable[int]]
138 ) -> Optional[Callable[[Iterable[Mapping[str, Any]]], Iterable[Mapping[str,
140 """Returns a transform to be applied to a list of scenarios."""
141 if not any((client_channels, server_threads,
len(offered_loads))):
145 scenarios: Iterable[Mapping[str,
146 Any]]) -> Iterable[Mapping[str, Any]]:
147 """Transforms scenarios by inserting num of client channels, number of async_server_threads and offered_load."""
149 for base_scenario
in scenarios:
150 base_name = base_scenario[
'name']
152 base_scenario[
'client_config'][
153 'client_channels'] = client_channels
156 base_scenario[
'server_config'][
157 'async_server_threads'] = server_threads
159 if not offered_loads:
166 for offered_load
in offered_loads:
167 scenario = copy.deepcopy(base_scenario)
168 scenario[
'client_config'][
'load_params'] = {
170 'offered_load': offered_load
174 server_threads, offered_load)
181 base_config: Mapping[str, Any],
182 base_config_clients: Iterable[Mapping[str, Any]],
183 base_config_servers: Iterable[Mapping[str, Any]],
184 scenario_name_regex: str,
185 language_config: scenario_config_exporter.LanguageConfig,
186 loadtest_name_prefix: str,
187 uniquifier_elements: Iterable[str],
188 annotations: Mapping[str, str],
189 instances_per_client: int = 1,
190 runs_per_test: int = 1,
191 scenario_transform: Callable[[Iterable[Mapping[str, Any]]],
192 List[Dict[str, Any]]] =
lambda s: s
193 ) -> Iterable[Dict[str, Any]]:
194 """Generates LoadTest configurations for a given language config.
196 The LoadTest configurations are generated as YAML objects.
200 cl =
safe_name(language_config.client_language
or language_config.language)
201 sl =
safe_name(language_config.server_language
or language_config.language)
202 scenario_filter = scenario_config_exporter.scenario_filter(
203 scenario_name_regex=scenario_name_regex,
204 category=language_config.category,
205 client_language=language_config.client_language,
206 server_language=language_config.server_language)
208 scenarios = scenario_transform(
209 scenario_config_exporter.gen_scenarios(language_config.language,
212 for scenario
in scenarios:
214 uniq = (uniquifier_elements +
215 [run_index]
if run_index
else uniquifier_elements)
217 scenario_str = json.dumps({
'scenarios': scenario},
220 config = copy.deepcopy(base_config)
222 metadata = config[
'metadata']
223 metadata[
'name'] = name
224 if 'labels' not in metadata:
225 metadata[
'labels'] = dict()
226 metadata[
'labels'][
'language'] =
safe_name(language_config.language)
227 metadata[
'labels'][
'prefix'] = prefix
228 if 'annotations' not in metadata:
229 metadata[
'annotations'] = dict()
230 metadata[
'annotations'].
update(annotations)
231 metadata[
'annotations'].
update({
232 'scenario': scenario[
'name'],
233 'uniquifier':
'-'.join(uniq),
236 spec = config[
'spec']
240 client
for client
in base_config_clients
241 if client[
'language'] == cl
244 raise IndexError(
'Client language not found in template: %s' %
248 if instances_per_client > 1:
249 c = collections.Counter(
250 (client.get(
'name',
'')
for client
in clients))
251 if max(c.values()) > 1:
253 (
'Multiple instances of multiple clients requires '
254 'unique names, name counts for language %s: %s') %
255 (cl, c.most_common()))
258 client_instances = []
259 for i
in range(instances_per_client):
260 client_instances.extend(copy.deepcopy(clients))
261 for client
in client_instances[-
len(clients):]:
266 spec[
'clients'] = client_instances
269 servers = copy.deepcopy([
270 server
for server
in base_config_servers
271 if server[
'language'] == sl
274 raise IndexError(
'Server language not found in template: %s' %
278 for i, server
in enumerate(servers):
283 spec[
'servers'] = servers
286 if 'driver' not in spec:
287 spec[
'driver'] = dict()
290 driver = spec[
'driver']
291 if 'language' not in driver:
293 if 'run' not in driver:
294 driver[
'run'] = dict()
298 if 'name' not in driver
or not driver[
'name']:
301 spec[
'scenariosJSON'] = scenario_str
307 """Parses arguments in the form key=value into a dictionary."""
312 key, equals, value = arg.partition(
'=')
314 raise ValueError(
'Expected key=value: ' + value)
320 """Clears fields set to empty values by string substitution."""
321 spec = config[
'spec']
322 if 'clients' in spec:
323 for client
in spec[
'clients']:
324 if 'pool' in client
and not client[
'pool']:
326 if 'servers' in spec:
327 for server
in spec[
'servers']:
328 if 'pool' in server
and not server[
'pool']:
331 driver = spec[
'driver']
332 if 'pool' in driver
and not driver[
'pool']:
334 if (
'run' in driver
and 'image' in driver[
'run']
and
335 not driver[
'run'][
'image']):
336 del driver[
'run'][
'image']
337 if 'results' in spec
and not (
'bigQueryTable' in spec[
'results']
and
338 spec[
'results'][
'bigQueryTable']):
343 """Returns a custom dumper to dump configurations in the expected format."""
345 class ConfigDumper(yaml.SafeDumper):
347 def expect_stream_start(self):
348 super().expect_stream_start()
349 if isinstance(self.event, yaml.StreamStartEvent):
351 self.write_indicator(header_comment, need_whitespace=
False)
353 def str_presenter(dumper, data):
355 return dumper.represent_scalar(
'tag:yaml.org,2002:str',
358 return dumper.represent_scalar(
'tag:yaml.org,2002:str', data)
360 ConfigDumper.add_representer(str, str_presenter)
366 language_choices = sorted(scenario_config.LANGUAGES.keys())
367 argp = argparse.ArgumentParser(
368 description=
'Generates load test configs from a template.',
369 fromfile_prefix_chars=
'@')
370 argp.add_argument(
'-l',
373 choices=language_choices,
375 help=
'Language(s) to benchmark.',
377 argp.add_argument(
'-t',
381 help=
'LoadTest configuration yaml file template.')
382 argp.add_argument(
'-s',
386 help=
'Template substitution(s), in the form key=value.',
387 dest=
'substitutions')
388 argp.add_argument(
'-p',
392 help=
'Test name prefix.')
393 argp.add_argument(
'-u',
394 '--uniquifier_element',
397 help=
'String element(s) to make the test name unique.',
398 dest=
'uniquifier_elements')
402 help=
'Use creation date and time as an additional uniquifier element.')
403 argp.add_argument(
'-a',
407 help=
'metadata.annotation(s), in the form key=value.',
409 argp.add_argument(
'-r',
413 help=
'Regex to select scenarios to run.')
416 choices=[
'all',
'inproc',
'scalable',
'smoketest',
'sweep',
'psm'],
418 help=
'Select a category of tests to run.')
420 '--allow_client_language',
422 choices=language_choices,
424 help=
'Allow cross-language scenarios with this client language.',
425 dest=
'allow_client_languages')
427 '--allow_server_language',
429 choices=language_choices,
431 help=
'Allow cross-language scenarios with this server language.',
432 dest=
'allow_server_languages')
433 argp.add_argument(
'--instances_per_client',
436 help=
"Number of instances to generate for each client.")
437 argp.add_argument(
'--runs_per_test',
440 help=
'Number of copies to generate for each test.')
441 argp.add_argument(
'-o',
444 help=
'Output file name. Output to stdout if not set.')
445 argp.add_argument(
'--client_channels',
447 help=
'Number of client channels.')
448 argp.add_argument(
'--server_threads',
450 help=
'Number of async server threads.')
456 help=
'A list of QPS values at which each load test scenario will be run.'
458 args = argp.parse_args()
460 if args.instances_per_client < 1:
461 argp.error(
'instances_per_client must be greater than zero.')
463 if args.runs_per_test < 1:
464 argp.error(
'runs_per_test must be greater than zero.')
469 'DRIVER_PORT':
'${DRIVER_PORT}',
470 'KILL_AFTER':
'${KILL_AFTER}',
471 'POD_TIMEOUT':
'${POD_TIMEOUT}',
478 uniquifier_elements = args.uniquifier_elements
488 with open(args.template)
as f:
489 base_config = yaml.safe_load(
490 string.Template(f.read()).substitute(substitutions))
494 spec = base_config[
'spec']
495 base_config_clients = spec[
'clients']
497 base_config_servers = spec[
'servers']
500 client_languages = [
''] + args.allow_client_languages
501 server_languages = [
''] + args.allow_server_languages
502 config_generators = []
503 for l, cl, sl
in itertools.product(args.languages, client_languages,
505 language_config = scenario_config_exporter.LanguageConfig(
506 category=args.category,
510 config_generators.append(
516 loadtest_name_prefix=args.prefix,
517 uniquifier_elements=uniquifier_elements,
518 annotations=annotations,
519 instances_per_client=args.instances_per_client,
520 runs_per_test=args.runs_per_test,
521 scenario_transform=transform))
522 configs = (config
for config
in itertools.chain(*config_generators))
524 with open(args.output,
'w')
if args.output
else sys.stdout
as f:
525 yaml.dump_all(configs,
528 CONFIGURATION_FILE_HEADER_COMMENT.strip()),
529 default_flow_style=
False)
532 if __name__ ==
'__main__':