upload_rbe_results.py
#!/usr/bin/env python3
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Uploads RBE results to BigQuery"""

import argparse
import json
import os
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid

gcp_utils_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(gcp_utils_dir)
import big_query_utils

_DATASET_ID = 'jenkins_test_results'
_DESCRIPTION = 'Test results from master RBE builds on Kokoro'
# 365 days in milliseconds
_EXPIRATION_MS = 365 * 24 * 60 * 60 * 1000
_PARTITION_TYPE = 'DAY'
_PROJECT_ID = 'grpc-testing'
_RESULTS_SCHEMA = [
    ('job_name', 'STRING', 'Name of Kokoro job'),
    ('build_id', 'INTEGER', 'Build ID of Kokoro job'),
    ('build_url', 'STRING', 'URL of Kokoro build'),
    ('test_target', 'STRING', 'Bazel target path'),
    ('test_class_name', 'STRING', 'Name of test class'),
    ('test_case', 'STRING', 'Name of test case'),
    ('result', 'STRING', 'Test or build result'),
    ('timestamp', 'TIMESTAMP', 'Timestamp of test run'),
    ('duration', 'FLOAT', 'Duration of the test run'),
]
_TABLE_ID = 'rbe_test_results'


def _get_api_key():
    """Returns string with API key to access ResultStore.
    Intended to be used in Kokoro environment."""
    api_key_directory = os.getenv('KOKORO_GFILE_DIR')
    api_key_file = os.path.join(api_key_directory, 'resultstore_api_key')
    assert os.path.isfile(api_key_file), 'Must add --api_key arg if not on ' \
        'Kokoro or Kokoro environment is not set up properly.'
    with open(api_key_file, 'r') as f:
        return f.read().replace('\n', '')


def _get_invocation_id():
    """Returns string of Bazel invocation ID. Intended to be used in
    Kokoro environment."""
    bazel_id_directory = os.getenv('KOKORO_ARTIFACTS_DIR')
    bazel_id_file = os.path.join(bazel_id_directory, 'bazel_invocation_ids')
    assert os.path.isfile(bazel_id_file), 'bazel_invocation_ids file, written ' \
        'by RBE initialization script, expected but not found.'
    with open(bazel_id_file, 'r') as f:
        return f.read().replace('\n', '')


def _parse_test_duration(duration_str):
    """Parse test duration string in '123.567s' format"""
    try:
        if duration_str.endswith('s'):
            duration_str = duration_str[:-1]
        return float(duration_str)
    except:
        return None
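
# A quick sketch of how _parse_test_duration behaves (added for illustration,
# not part of the original script). ResultStore reports durations as strings
# such as '123.567s'; anything that cannot be parsed as a float yields None.
#
#     _parse_test_duration('123.567s')  # -> 123.567
#     _parse_test_duration('4.2')       # -> 4.2 (trailing 's' is optional)
#     _parse_test_duration('n/a')       # -> None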


def _upload_results_to_bq(rows):
    """Upload test results to a BQ table.

    Args:
        rows: A list of dictionaries containing data for each row to insert
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(bq,
                                             _PROJECT_ID,
                                             _DATASET_ID,
                                             _TABLE_ID,
                                             _RESULTS_SCHEMA,
                                             _DESCRIPTION,
                                             partition_type=_PARTITION_TYPE,
                                             expiration_ms=_EXPIRATION_MS)

    max_retries = 3
    for attempt in range(max_retries):
        if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, _TABLE_ID,
                                       rows):
            break
        else:
            if attempt < max_retries - 1:
                print('Error uploading result to bigquery, will retry.')
            else:
                print(
                    'Error uploading result to bigquery, all attempts failed.')
                sys.exit(1)
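
# For reference (a sketch added here, not part of the original script): each
# element of `rows` is a BigQuery streaming-insert row shaped like the ones
# built in the __main__ block below, with 'json' keys matching _RESULTS_SCHEMA:
#
#     {
#         'insertId': '<uuid4 string>',
#         'json': {
#             'job_name': ..., 'build_id': ..., 'build_url': ...,
#             'test_target': ..., 'test_class_name': ..., 'test_case': ...,
#             'result': ..., 'timestamp': ..., 'duration': ...,
#         },
#     }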


def _get_resultstore_data(api_key, invocation_id):
    """Returns a list of test result actions by querying the ResultStore API.
    Args:
        api_key: String of ResultStore API key
        invocation_id: String of ResultStore invocation ID to fetch results from
    """
    all_actions = []
    page_token = ''
    # ResultStore's API returns data on a limited number of tests per call. When
    # that limit is exceeded, the response includes a 'nextPageToken' field; pass
    # it as the pageToken of the next request and keep requesting until the
    # field is omitted.
    while True:
        req = urllib.request.Request(
            url=
            'https://resultstore.googleapis.com/v2/invocations/%s/targets/-/configuredTargets/-/actions?key=%s&pageToken=%s&fields=next_page_token,actions.id,actions.status_attributes,actions.timing,actions.test_action'
            % (invocation_id, api_key, page_token),
            headers={'Content-Type': 'application/json'})
        ctx_dict = {}
        if os.getenv("PYTHONHTTPSVERIFY") == "0":
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            ctx_dict = {"context": ctx}
        raw_resp = urllib.request.urlopen(req, **ctx_dict).read()
        decoded_resp = raw_resp if isinstance(
            raw_resp, str) else raw_resp.decode('utf-8', 'ignore')
        results = json.loads(decoded_resp)
        all_actions.extend(results['actions'])
        if 'nextPageToken' not in results:
            break
        page_token = results['nextPageToken']
    return all_actions
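
# Rough shape of a single action as consumed by the __main__ block below
# (illustrative only; the Action proto linked in the comment below is the
# authoritative schema):
#
#     {
#         'id': {'actionId': ..., 'targetId': ...},
#         'statusAttributes': {'status': <e.g. 'TIMED_OUT' or 'UNKNOWN'>},
#         'timing': {'startTime': ..., 'duration': '123.567s'},
#         'testAction': {'testSuite': {'tests': [{'testSuite': {'tests': [...]}}]}},
#     }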

if __name__ == "__main__":
    # Arguments are necessary if running in a non-Kokoro environment.
    argp = argparse.ArgumentParser(
        description=
        'Fetches results for given RBE invocation and uploads them to BigQuery table.'
    )
    argp.add_argument('--api_key',
                      default='',
                      type=str,
                      help='The API key to read from ResultStore API')
    argp.add_argument('--invocation_id',
                      default='',
                      type=str,
                      help='UUID of bazel invocation to fetch.')
    argp.add_argument('--bq_dump_file',
                      default=None,
                      type=str,
                      help='Dump JSON data to file just before uploading')
    argp.add_argument('--resultstore_dump_file',
                      default=None,
                      type=str,
                      help='Dump JSON data as received from ResultStore API')
    argp.add_argument('--skip_upload',
                      default=False,
                      action='store_const',
                      const=True,
                      help='Skip uploading to bigquery')
    args = argp.parse_args()

    api_key = args.api_key or _get_api_key()
    invocation_id = args.invocation_id or _get_invocation_id()
    resultstore_actions = _get_resultstore_data(api_key, invocation_id)

    if args.resultstore_dump_file:
        with open(args.resultstore_dump_file, 'w') as f:
            json.dump(resultstore_actions, f, indent=4, sort_keys=True)
        print('Dumped resultstore data to file %s' % args.resultstore_dump_file)
    # google.devtools.resultstore.v2.Action schema:
    # https://github.com/googleapis/googleapis/blob/master/google/devtools/resultstore/v2/action.proto
    bq_rows = []
    for index, action in enumerate(resultstore_actions):
        # Filter out non-test related data, such as build results.
        if 'testAction' not in action:
            continue
        # Some test results contain the fileProcessingErrors field, which
        # indicates an issue with parsing the results of individual test cases.
        if 'fileProcessingErrors' in action:
            test_cases = [{
                'testCase': {
                    'caseName': str(action['id']['actionId']),
                }
            }]
        # Test timeouts have a different dictionary structure compared to pass
        # and fail results.
        elif action['statusAttributes']['status'] == 'TIMED_OUT':
            test_cases = [{
                'testCase': {
                    'caseName': str(action['id']['actionId']),
                    'timedOut': True
                }
            }]
        # When RBE believes its infrastructure is failing, it will abort and
        # mark running tests as UNKNOWN. These infrastructure failures may be
        # related to our tests, so we should investigate if specific tests are
        # repeatedly being marked as UNKNOWN.
        elif action['statusAttributes']['status'] == 'UNKNOWN':
            test_cases = [{
                'testCase': {
                    'caseName': str(action['id']['actionId']),
                    'unknown': True
                }
            }]
            # Take the timestamp from the previous action, which should be
            # a close approximation.
            action['timing'] = {
                'startTime':
                    resultstore_actions[index - 1]['timing']['startTime']
            }
        elif 'testSuite' not in action['testAction']:
            continue
        elif 'tests' not in action['testAction']['testSuite']:
            continue
        else:
            test_cases = []
            for tests_item in action['testAction']['testSuite']['tests']:
                test_cases += tests_item['testSuite']['tests']
        for test_case in test_cases:
            if any(s in test_case['testCase'] for s in ['errors', 'failures']):
                result = 'FAILED'
            elif 'timedOut' in test_case['testCase']:
                result = 'TIMEOUT'
            elif 'unknown' in test_case['testCase']:
                result = 'UNKNOWN'
            else:
                result = 'PASSED'
            try:
                bq_rows.append({
                    'insertId': str(uuid.uuid4()),
                    'json': {
                        'job_name': os.getenv('KOKORO_JOB_NAME'),
                        'build_id': os.getenv('KOKORO_BUILD_NUMBER'),
                        'build_url': 'https://source.cloud.google.com/results/invocations/%s'
                                     % invocation_id,
                        'test_target': action['id']['targetId'],
                        'test_class_name': test_case['testCase'].get('className', ''),
                        'test_case': test_case['testCase']['caseName'],
                        'result': result,
                        'timestamp': action['timing']['startTime'],
                        'duration': _parse_test_duration(action['timing']['duration']),
                    }
                })
            except Exception as e:
                print('Failed to parse test result. Error: %s' % str(e))
                print(json.dumps(test_case, indent=4))
                bq_rows.append({
                    'insertId': str(uuid.uuid4()),
                    'json': {
                        'job_name': os.getenv('KOKORO_JOB_NAME'),
                        'build_id': os.getenv('KOKORO_BUILD_NUMBER'),
                        'build_url': 'https://source.cloud.google.com/results/invocations/%s'
                                     % invocation_id,
                        'test_target': action['id']['targetId'],
                        'test_class_name': 'N/A',
                        'test_case': 'N/A',
                        'result': 'UNPARSEABLE',
                        'timestamp': 'N/A',
                    }
                })

    if args.bq_dump_file:
        with open(args.bq_dump_file, 'w') as f:
            json.dump(bq_rows, f, indent=4, sort_keys=True)
        print('Dumped BQ data to file %s' % args.bq_dump_file)

    if not args.skip_upload:
        # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
        MAX_ROWS = 1000
        for i in range(0, len(bq_rows), MAX_ROWS):
            _upload_results_to_bq(bq_rows[i:i + MAX_ROWS])
    else:
        print('Skipped upload to bigquery.')
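
# Example of running this script outside of Kokoro (illustrative; the flag
# values below are placeholders, and --skip_upload avoids writing to the
# production BigQuery table):
#
#     python3 upload_rbe_results.py \
#         --api_key=<resultstore_api_key> \
#         --invocation_id=<bazel_invocation_uuid> \
#         --resultstore_dump_file=resultstore.json \
#         --bq_dump_file=bq_rows.json \
#         --skip_upload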