from __future__ import print_function

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# Partitions expire 30 days (in milliseconds) after they were last modified.
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
# Each BigQuery request is retried this many times before giving up.
NUM_RETRIES = 3
19 """Authenticates with cloud platform and gets a BiqQuery service object
21 creds = GoogleCredentials.get_application_default()
22 return discovery.build(
23 'bigquery',
'v2', credentials=creds, cache_discovery=
False)
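

# Usage sketch (illustrative, not part of the original module). Building the
# client assumes application-default credentials are configured, e.g. via
# `gcloud auth application-default login` or by pointing
# GOOGLE_APPLICATION_CREDENTIALS at a service-account key file:
#
#   big_query = create_big_query()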


def create_dataset(big_query, project_id, dataset_id):
    """Creates a dataset; returns True on success or if it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            print('Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error))
            is_success = False
    return is_success
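

# Usage sketch (illustrative; the project and dataset ids are hypothetical
# placeholders). Note that a 409 from an already-existing dataset still
# returns True:
#
#   big_query = create_big_query()
#   assert create_dataset(big_query, 'my-project', 'my_dataset')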


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)
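

# Usage sketch (illustrative; all names are hypothetical placeholders).
# table_schema is a list of (name, type, description) tuples using BigQuery
# column types:
#
#   _SCHEMA = [
#       ('job_name', 'STRING', 'Name of the benchmark job'),
#       ('timestamp', 'TIMESTAMP', 'When the result was recorded'),
#       ('qps', 'FLOAT', 'Queries per second'),
#   ]
#   create_table(big_query, 'my-project', 'my_dataset', 'results', _SCHEMA,
#                'Benchmark results')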


def create_partitioned_table(big_query, project_id, dataset_id, table_id,
                             table_schema, description, partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created and each partition expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)
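

# Usage sketch (illustrative; values are hypothetical, reusing the _SCHEMA
# tuples sketched above). Partitions here would expire after 90 days instead
# of the default 30:
#
#   create_partitioned_table(big_query, 'my-project', 'my_dataset', 'results',
#                            _SCHEMA, 'Benchmark results',
#                            partition_type='DAY',
#                            expiration_ms=90 * 24 * 60 * 60 * 1000)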


def create_table2(big_query, project_id, dataset_id, table_id, fields_schema,
                  description, partition_type=None, expiration_ms=None):
    """Creates a table from a prebuilt fields schema, optionally partitioned."""
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error))
            is_success = False
    return is_success
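

# Usage sketch (illustrative). create_table2 can also be called directly with
# ready-made schema dictionaries instead of (name, type, description) tuples:
#
#   fields = [{'name': 'qps', 'type': 'FLOAT', 'description': 'Queries/sec'}]
#   create_table2(big_query, 'my-project', 'my_dataset', 'results', fields,
#                 'Benchmark results')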


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Updates the schema of an existing table in place."""
    is_success = True
    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
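

# Usage sketch (illustrative; names are hypothetical). Patching with a
# superset of the current fields is one way a nullable column could be added
# to an existing table:
#
#   new_fields = [
#       {'name': 'qps', 'type': 'FLOAT', 'description': 'Queries/sec'},
#       {'name': 'branch', 'type': 'STRING', 'description': 'Git branch'},
#   ]
#   patch_table(big_query, 'my-project', 'my_dataset', 'results', new_fields)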


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows into a table; returns False if any row fails to insert."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        is_success = False
    return is_success
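

# Usage sketch (illustrative). Each element of rows_list is a dict with an
# 'insertId' (used for best-effort deduplication) and a 'json' payload, which
# is exactly the shape make_row() below produces:
#
#   rows = [{'insertId': 'row-1', 'json': {'qps': 1234.5}}]
#   insert_rows(big_query, 'my-project', 'my_dataset', 'results', rows)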


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query job; returns the response, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
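

# Usage sketch (illustrative; the query is hypothetical). On success the
# response carries rows in BigQuery's f/v cell encoding:
#
#   res = sync_query_job(big_query, 'my-project',
#                        'SELECT qps FROM my_dataset.results LIMIT 10')
#   if res and res.get('rows'):
#       first_value = res['rows'][0]['f'][0]['v']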
187 """row_values_dict is a dictionary of column name and column value.
189 return {
'insertId': unique_row_id,
'json': row_values_dict}
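

# Usage sketch (illustrative): a uuid makes a reasonable insertId, letting
# BigQuery deduplicate retried inserts of the same row:
#
#   import uuid
#   row = make_row(str(uuid.uuid4()), {'job_name': 'smoke', 'qps': 1234.5})
#   insert_rows(big_query, 'my-project', 'my_dataset', 'results', [row])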