ros_prerelease_tests.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import os
4 import subprocess
5 import sys
6 
7 import urllib.request
8 import yaml
9 
10 from termcolor import cprint
11 print_blue = lambda x: cprint(x, 'blue', attrs=['bold'])
12 print_cyan = lambda x: cprint(x, 'cyan', attrs=['bold'])
13 print_green = lambda x: cprint(x, 'green', attrs=['bold'])
14 print_magenta = lambda x: cprint(x, 'magenta', attrs=['bold'])
15 print_red = lambda x: cprint(x, 'red', attrs=['bold'])
16 print_yellow = lambda x: cprint(x, 'yellow', attrs=['bold'])
17 
18 ROS_DISTROS = ['kinetic', 'lunar', 'melodic']
19 
20 REPO_URL = 'https://raw.githubusercontent.com/ros-infrastructure/ros_buildfarm_config'
21 BRANCH = 'production'
22 INDEX_FILE = 'index.yaml'
23 
24 def run_test(target, branch):
25  test_name = "%(ros_distro)s_%(os_distro)s_%(os_release)s_%(arch)s" % target
26 
27  config = target
28  config['branch'] = branch
29  config['test_name'] = test_name
30 
31  # move working directory into test folder
32  os.mkdir(test_name)
33  os.chdir(test_name)
34 
35  # generate prerelease scripts
36  print("Generating prerelease scripts...")
37  generate_prerelease_command = """
38  generate_prerelease_script.py \\
39  https://raw.githubusercontent.com/ros-infrastructure/ros_buildfarm_config/production/index.yaml \\
40  %(ros_distro)s default %(os_distro)s %(os_release)s %(arch)s \\
41  --custom-repo \\
42  async_com__custom-2:git:https://github.com/dpkoch/async_comm.git:%(branch)s \\
43  --level 0 \\
44  --output-dir ./
45  """ % config
46 
47  with open("../%s-generate.log" % (test_name), 'w') as log_file:
48  process = subprocess.run(generate_prerelease_command, shell=True, stdout=log_file, stderr=subprocess.STDOUT)
49  if process.returncode != 0:
50  print_yellow("Generating prerelease scripts failed!")
51  os.chdir('..')
52  print_red("[Failed]")
53  return False
54 
55  # run prerelease test
56  print("Running prerelease tests...")
57  with open("../%s-test.log" % (test_name), 'w') as log_file:
58  process = subprocess.run("./prerelease.sh", stdout=log_file, stderr=subprocess.STDOUT)
59  os.chdir('..')
60  if process.returncode == 0:
61  print_green("[Passed]")
62  return True
63  else:
64  print_red("[Failed]")
65  return False
66 
67 def get_repo_file(repo_path):
68  url = '/'.join([REPO_URL, BRANCH, repo_path])
69  with urllib.request.urlopen(url) as response:
70  return response.read()
71 
73  targets = []
74  index_yaml = yaml.load(get_repo_file(INDEX_FILE))
75  for ros_distro in ROS_DISTROS:
76  config_files = index_yaml['distributions'][ros_distro]['release_builds']
77  for file_path in config_files.values():
78  config_yaml = yaml.load(get_repo_file(file_path))
79  for os_distro in config_yaml['targets']:
80  for os_release in config_yaml['targets'][os_distro]:
81  for arch in config_yaml['targets'][os_distro][os_release]:
82  targets.append({'ros_distro': ros_distro, \
83  'os_distro': os_distro, \
84  'os_release': os_release, \
85  'arch': arch})
86  return targets
87 
89 
90  print_magenta("Running all tests for the \"%s\" branch" % (branch))
91 
92  print()
93  print_magenta("Running tests for the following ROS distros:")
94  for distro in ROS_DISTROS:
95  print(distro)
96 
97  print()
98  print("Retrieving release targets...")
99  targets = get_prerelease_targets()
100 
101  print()
102  print_magenta("The following release targets will be tested:")
103  for target in targets:
104  print("%(ros_distro)s %(os_distro)s %(os_release)s %(arch)s" % target)
105 
106  # run all tests and aggregate overall result into exit code
107  failed_tests = 0
108  for target in targets:
109  print()
110  print_blue("Testing %(ros_distro)s %(os_distro)s %(os_release)s %(arch)s" % target)
111  if not run_test(target, branch):
112  failed_tests += 1
113 
114  # print and return overall result
115  print()
116  if failed_tests > 0:
117  print_red("Failed %d of %d tests." % (failed_tests, len(targets)))
118  return False
119  else:
120  print_green("Passed %d tests." % (len(targets)))
121  return True
122 
123 if __name__ == '__main__':
124 
125  # check that branch argument has been passed in
126  if len(sys.argv) != 2:
127  print("Usage: %s <branch>" % (sys.argv[0]))
128  exit(2)
129  branch = sys.argv[1]
130 
131  # check that working directory is empty
132  if os.listdir('.'):
133  print("Non-empty directory, please run in an empty directory. Aborting.")
134  exit(3)
135 
136  exit_code = 0 if run_prerelease_tests(branch) else 1
137  print_cyan("Exit code: %d" % (exit_code))
138  exit(exit_code)
def run_test(target, branch)
def get_repo_file(repo_path)


async_comm
Author(s):
autogenerated on Thu May 16 2019 03:03:47