bq_upload_result.py
#!/usr/bin/env python3
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Uploads a performance benchmark result file to BigQuery.

from __future__ import print_function

import argparse
import calendar
import json
import os
import sys
import time
import uuid

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import massage_qps_stats

gcp_utils_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(gcp_utils_dir)
import big_query_utils

_PROJECT_ID = 'grpc-testing'

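# The netperf result file is expected to contain a single CSV line with three
# comma-separated values: the 50th, 90th and 99th percentile latencies.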
def _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id, result_file):
    with open(result_file, 'r') as f:
        (col1, col2, col3) = f.read().split(',')
        latency50 = float(col1.strip()) * 1000
        latency90 = float(col2.strip()) * 1000
        latency99 = float(col3.strip()) * 1000

        scenario_result = {
            'scenario': {
                'name': 'netperf_tcp_rr'
            },
            'summary': {
                'latency50': latency50,
                'latency90': latency90,
                'latency99': latency99
            }
        }

    bq = big_query_utils.create_big_query()
    _create_results_table(bq, dataset_id, table_id)

    if not _insert_result(
            bq, dataset_id, table_id, scenario_result, flatten=False):
        print('Error uploading result to bigquery.')
        sys.exit(1)

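# Uploads a scenario result JSON file, together with test metadata, node
# information and Prometheus query results, as a single BigQuery row.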
def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file,
                                        metadata_file, node_info_file,
                                        prometheus_query_results_file):
    with open(result_file, 'r') as f:
        scenario_result = json.loads(f.read())

    bq = big_query_utils.create_big_query()
    _create_results_table(bq, dataset_id, table_id)

    if not _insert_scenario_result(bq, dataset_id, table_id, scenario_result,
                                   metadata_file, node_info_file,
                                   prometheus_query_results_file):
        print('Error uploading result to bigquery.')
        sys.exit(1)

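# Each result is inserted as a single row keyed by a random UUID; by default
# the nested scenario result is flattened first so it fits the table schema.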
def _insert_result(bq, dataset_id, table_id, scenario_result, flatten=True):
    if flatten:
        _flatten_result_inplace(scenario_result)
    _populate_metadata_inplace(scenario_result)
    row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
    return big_query_utils.insert_rows(bq, _PROJECT_ID, dataset_id, table_id,
                                       [row])

def _insert_scenario_result(bq,
                            dataset_id,
                            table_id,
                            scenario_result,
                            test_metadata_file,
                            node_info_file,
                            prometheus_query_results_file,
                            flatten=True):
    if flatten:
        _flatten_result_inplace(scenario_result)
    _populate_metadata_from_file(scenario_result, test_metadata_file)
    _populate_node_metadata_from_file(scenario_result, node_info_file)
    _populate_prometheus_query_results_from_file(scenario_result,
                                                 prometheus_query_results_file)
    row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
    return big_query_utils.insert_rows(bq, _PROJECT_ID, dataset_id, table_id,
                                       [row])

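# The BigQuery schema for result rows is read from scenario_result_schema.json
# located next to this script.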
def _create_results_table(bq, dataset_id, table_id):
    with open(os.path.dirname(__file__) + '/scenario_result_schema.json',
              'r') as f:
        table_schema = json.loads(f.read())
    desc = 'Results of performance benchmarks.'
    return big_query_utils.create_table2(bq, _PROJECT_ID, dataset_id, table_id,
                                         table_schema, desc)

def _flatten_result_inplace(scenario_result):
    """Bigquery is not really great for handling deeply nested data
    and repeated fields. To maintain values of some fields while keeping
    the schema relatively simple, we artificially leave some of the fields
    as JSON strings.
    """
    scenario_result['scenario']['clientConfig'] = json.dumps(
        scenario_result['scenario']['clientConfig'])
    scenario_result['scenario']['serverConfig'] = json.dumps(
        scenario_result['scenario']['serverConfig'])
    scenario_result['latencies'] = json.dumps(scenario_result['latencies'])
    scenario_result['serverCpuStats'] = []
    for stats in scenario_result['serverStats']:
        scenario_result['serverCpuStats'].append(dict())
        scenario_result['serverCpuStats'][-1]['totalCpuTime'] = stats.pop(
            'totalCpuTime', None)
        scenario_result['serverCpuStats'][-1]['idleCpuTime'] = stats.pop(
            'idleCpuTime', None)
    for stats in scenario_result['clientStats']:
        stats['latencies'] = json.dumps(stats['latencies'])
        stats.pop('requestResults', None)
    scenario_result['serverCores'] = json.dumps(scenario_result['serverCores'])
    scenario_result['clientSuccess'] = json.dumps(
        scenario_result['clientSuccess'])
    scenario_result['serverSuccess'] = json.dumps(
        scenario_result['serverSuccess'])
    scenario_result['requestResults'] = json.dumps(
        scenario_result.get('requestResults', []))
    scenario_result['serverCpuUsage'] = scenario_result['summary'].pop(
        'serverCpuUsage', None)
    scenario_result['summary'].pop('successfulRequestsPerSecond', None)
    scenario_result['summary'].pop('failedRequestsPerSecond', None)
    massage_qps_stats.massage_qps_stats(scenario_result)

def _populate_metadata_inplace(scenario_result):
    """Populates metadata based on environment variables set by the CI system (Kokoro)."""
    # NOTE: Grabbing the Kokoro environment variables will only work if the
    # driver is running locally on the same machine where Kokoro has started
    # the job. For our setup, this is currently the case, so just assume that.
    build_number = os.getenv('KOKORO_BUILD_NUMBER')
    build_url = 'https://source.cloud.google.com/results/invocations/%s' % os.getenv(
        'KOKORO_BUILD_ID')
    job_name = os.getenv('KOKORO_JOB_NAME')
    git_commit = os.getenv('KOKORO_GIT_COMMIT')
    # The actual commit is the head of the PR that is being tested.
    # TODO(jtattermusch): unclear how to obtain on Kokoro
    git_actual_commit = os.getenv('ghprbActualCommit')

    utc_timestamp = str(calendar.timegm(time.gmtime()))
    metadata = {'created': utc_timestamp}

    if build_number:
        metadata['buildNumber'] = build_number
    if build_url:
        metadata['buildUrl'] = build_url
    if job_name:
        metadata['jobName'] = job_name
    if git_commit:
        metadata['gitCommit'] = git_commit
    if git_actual_commit:
        metadata['gitActualCommit'] = git_actual_commit

    scenario_result['metadata'] = metadata

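# Reads test metadata from a Kubernetes object metadata JSON file; annotations
# prefixed with 'ci_' are copied into the corresponding BigQuery metadata keys.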
def _populate_metadata_from_file(scenario_result, test_metadata_file):
    utc_timestamp = str(calendar.timegm(time.gmtime()))
    metadata = {'created': utc_timestamp}

    _annotation_to_bq_metadata_key_map = {
        'ci_' + key: key for key in (
            'buildNumber',
            'buildUrl',
            'jobName',
            'gitCommit',
            'gitActualCommit',
        )
    }

    if os.access(test_metadata_file, os.R_OK):
        with open(test_metadata_file, 'r') as f:
            test_metadata = json.loads(f.read())

        # Eliminate managedFields from the metadata set.
        if 'managedFields' in test_metadata:
            del test_metadata['managedFields']

        annotations = test_metadata.get('annotations', {})

        # When `kubectl apply ...` is used, kubectl appends the last applied
        # configuration to the annotations; drop that field since it contains
        # a lot of irrelevant information.
        if 'kubectl.kubernetes.io/last-applied-configuration' in annotations:
            del annotations['kubectl.kubernetes.io/last-applied-configuration']

        # Dump all remaining metadata as JSON into the testMetadata field.
        scenario_result['testMetadata'] = json.dumps(test_metadata)
        for key, value in _annotation_to_bq_metadata_key_map.items():
            if key in annotations:
                metadata[value] = annotations[key]

    scenario_result['metadata'] = metadata

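# The node info file is expected to contain 'Driver', 'Clients' and 'Servers'
# entries, each carrying 'Name', 'PodIP' and 'NodeName' fields.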
def _populate_node_metadata_from_file(scenario_result, node_info_file):
    node_metadata = {'driver': {}, 'servers': [], 'clients': []}
    _node_info_to_bq_node_metadata_key_map = {
        'Name': 'name',
        'PodIP': 'podIP',
        'NodeName': 'nodeName',
    }

    if os.access(node_info_file, os.R_OK):
        with open(node_info_file, 'r') as f:
            file_metadata = json.loads(f.read())
        for key, value in _node_info_to_bq_node_metadata_key_map.items():
            node_metadata['driver'][value] = file_metadata['Driver'][key]
        for clientNodeInfo in file_metadata['Clients']:
            node_metadata['clients'].append({
                value: clientNodeInfo[key] for key, value in
                _node_info_to_bq_node_metadata_key_map.items()
            })
        for serverNodeInfo in file_metadata['Servers']:
            node_metadata['servers'].append({
                value: serverNodeInfo[key] for key, value in
                _node_info_to_bq_node_metadata_key_map.items()
            })

    scenario_result['nodeMetadata'] = node_metadata

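# The Prometheus query result file is expected to map client and server pod
# names to per-container 'cpuSeconds' and 'memoryMean' measurements, plus a
# top-level 'testDurationSeconds' value.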
def _populate_prometheus_query_results_from_file(scenario_result,
                                                 prometheus_query_result_file):
    """Populates the scenario result with data from the Prometheus query results file."""
    if os.access(prometheus_query_result_file, os.R_OK):
        with open(prometheus_query_result_file, 'r', encoding='utf8') as f:
            file_query_results = json.loads(f.read())

            scenario_result['testDurationSeconds'] = file_query_results[
                'testDurationSeconds']
            clientsPrometheusData = []
            if 'clients' in file_query_results:
                for client_name, client_data in file_query_results[
                        'clients'].items():
                    clientPrometheusData = {'name': client_name}
                    containersPrometheusData = []
                    for container_name, container_data in client_data.items():
                        containerPrometheusData = {
                            'name': container_name,
                            'cpuSeconds': container_data['cpuSeconds'],
                            'memoryMean': container_data['memoryMean'],
                        }
                        containersPrometheusData.append(containerPrometheusData)
                    clientPrometheusData[
                        'containers'] = containersPrometheusData
                    clientsPrometheusData.append(clientPrometheusData)
                scenario_result['clientsPrometheusData'] = clientsPrometheusData

            serversPrometheusData = []
            if 'servers' in file_query_results:
                for server_name, server_data in file_query_results[
                        'servers'].items():
                    serverPrometheusData = {'name': server_name}
                    containersPrometheusData = []
                    for container_name, container_data in server_data.items():
                        containerPrometheusData = {
                            'name': container_name,
                            'cpuSeconds': container_data['cpuSeconds'],
                            'memoryMean': container_data['memoryMean'],
                        }
                        containersPrometheusData.append(containerPrometheusData)
                    serverPrometheusData[
                        'containers'] = containersPrometheusData
                    serversPrometheusData.append(serverPrometheusData)
                scenario_result['serversPrometheusData'] = serversPrometheusData

argp = argparse.ArgumentParser(description='Upload result to big query.')
argp.add_argument('--bq_result_table',
                  required=True,
                  default=None,
                  type=str,
                  help='Bigquery "dataset.table" to upload results to.')
argp.add_argument('--file_to_upload',
                  default='scenario_result.json',
                  type=str,
                  help='Report file to upload.')
argp.add_argument('--metadata_file_to_upload',
                  default='metadata.json',
                  type=str,
                  help='Metadata file to upload.')
argp.add_argument('--node_info_file_to_upload',
                  default='node_info.json',
                  type=str,
                  help='Node information file to upload.')
argp.add_argument('--prometheus_query_results_to_upload',
                  default='prometheus_query_result.json',
                  type=str,
                  help='Prometheus query result file to upload.')
argp.add_argument('--file_format',
                  choices=['scenario_result', 'netperf_latency_csv'],
                  default='scenario_result',
                  help='Format of the file to upload.')

args = argp.parse_args()

dataset_id, table_id = args.bq_result_table.split('.', 2)

if args.file_format == 'netperf_latency_csv':
    _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id,
                                            args.file_to_upload)
else:
    _upload_scenario_result_to_bigquery(dataset_id, table_id,
                                        args.file_to_upload,
                                        args.metadata_file_to_upload,
                                        args.node_info_file_to_upload,
                                        args.prometheus_query_results_to_upload)
print('Successfully uploaded %s, %s, %s and %s to BigQuery.\n' %
      (args.file_to_upload, args.metadata_file_to_upload,
       args.node_info_file_to_upload, args.prometheus_query_results_to_upload))
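
# Example invocation (the dataset/table name below is illustrative; the table
# is created in the grpc-testing project and the file paths are the defaults):
#   python3 bq_upload_result.py \
#       --bq_result_table=my_dataset.my_table \
#       --file_to_upload=scenario_result.json \
#       --prometheus_query_results_to_upload=prometheus_query_result.json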