big_query_utils.py
#!/usr/bin/env python2.7

from __future__ import print_function
import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        'bigquery', 'v2', credentials=creds, cache_discovery=False)
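
# Example (a sketch, not part of the original module; assumes
# application-default credentials are configured in the environment,
# e.g. via `gcloud auth application-default login`):
#
#   big_query = create_big_query()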


def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error))
            is_success = False
    return is_success
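
# Example (a sketch; 'my-project' and 'benchmark_results' are hypothetical
# names). A 409 from an existing dataset is treated as success:
#
#   big_query = create_big_query()
#   create_dataset(big_query, 'my-project', 'benchmark_results')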


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    # table_schema is a list of (column name, column type, description) tuples
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)
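
# Example (a sketch; the schema below is hypothetical):
#
#   schema = [('job_name', 'STRING', 'Name of the benchmark job'),
#             ('qps', 'FLOAT', 'Queries per second')]
#   create_table(big_query, 'my-project', 'benchmark_results', 'results',
#                schema, 'Benchmark results table')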


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created and each partition expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)
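
# Example (a sketch; reuses the hypothetical schema from the create_table
# example above, with the default DAY partitioning and 30-day expiration):
#
#   create_partitioned_table(big_query, 'my-project', 'benchmark_results',
#                            'daily_results', schema, 'Partitioned results')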


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error))
            is_success = False
    return is_success
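
# For reference, with partitioning enabled the request body gains an entry
# shaped like:
#
#   'timePartitioning': {'type': 'DAY', 'expirationMs': 2592000000}
#
# (2592000000 ms = 30 days, matching _EXPIRATION_MS above.)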


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
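
# Example (a sketch; a common use is appending a hypothetical new column
# to an existing table's schema):
#
#   new_fields = fields + [{'name': 'notes', 'type': 'STRING',
#                           'description': 'Free-form notes'}]
#   patch_table(big_query, 'my-project', 'benchmark_results', 'results',
#               new_fields)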


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s: %s' % (table_id,
                                                            http_error))
        is_success = False

    return is_success
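
# Example (a sketch; uses make_row below to build the insertAll payload, with
# a fresh UUID as the deduplication insertId):
#
#   rows = [make_row(str(uuid.uuid4()), {'job_name': 'smoke', 'qps': 1234.5})]
#   insert_rows(big_query, 'my-project', 'benchmark_results', 'results', rows)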


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
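
# Example (a sketch; the table name is hypothetical, and 'rows' may be absent
# from the response if the query did not complete within the timeout):
#
#   job = sync_query_job(big_query, 'my-project',
#                        'SELECT COUNT(*) FROM benchmark_results.results')
#   if job and job.get('jobComplete'):
#       print(job.get('rows', []))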


def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value."""
    return {'insertId': unique_row_id, 'json': row_values_dict}
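

# A minimal end-to-end sketch (not part of the original module). The project
# and dataset names are hypothetical; running this requires
# application-default credentials with BigQuery access.
if __name__ == '__main__':
    bq = create_big_query()
    if create_dataset(bq, 'my-project', 'benchmark_results'):
        schema = [('job_name', 'STRING', 'Name of the benchmark job'),
                  ('qps', 'FLOAT', 'Queries per second')]
        create_table(bq, 'my-project', 'benchmark_results', 'results', schema,
                     'Benchmark results table')
        row = make_row(str(uuid.uuid4()), {'job_name': 'smoke', 'qps': 1234.5})
        insert_rows(bq, 'my-project', 'benchmark_results', 'results', [row])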