from __future__ import print_function

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# Default partition expiration: 30 days, in milliseconds.
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3

32 """Authenticates with cloud platform and gets a BiqQuery service object
34 creds = GoogleCredentials.get_application_default()
35 return discovery.build(
'bigquery',
38 cache_discovery=
False)
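# Usage sketch (assumes Application Default Credentials are configured, e.g.
# via the GOOGLE_APPLICATION_CREDENTIALS environment variable; the project and
# dataset IDs below are hypothetical):
#
#   big_query = create_big_query()
#   create_dataset(big_query, 'my-project', 'my_dataset')
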
def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }
    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            print('Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error))
            is_success = False
    return is_success

def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)

def create_partitioned_table(big_query, project_id, dataset_id, table_id,
                             table_schema, description, partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table; by default a date-partitioned table, with
    each partition expiring 30 days after it was last modified."""
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)

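# Example call (hedged sketch; the schema below is a hypothetical list of
# (name, type, description) tuples, as expected by create_partitioned_table):
#
#   schema = [('timestamp', 'TIMESTAMP', 'When the row was recorded'),
#             ('value', 'FLOAT', 'Measured value')]
#   create_partitioned_table(big_query, 'my-project', 'my_dataset',
#                            'my_table', schema, 'Example table')
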
def create_table2(big_query, project_id, dataset_id, table_id, fields_schema,
                  description, partition_type=None, expiration_ms=None):
    is_success = True
    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error))
            is_success = False
    return is_success

def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True
    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }
    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success

def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False
    return is_success

def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job

202 """row_values_dict is a dictionary of column name and column value.
204 return {
'insertId': unique_row_id,
'json': row_values_dict}
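

# Minimal end-to-end sketch showing how these helpers compose. Illustration
# only: the project, dataset and table IDs and the schema are hypothetical
# placeholders, and running it requires working Application Default
# Credentials with access to BigQuery.
if __name__ == '__main__':
    import uuid

    bq = create_big_query()
    project = 'my-project'        # hypothetical project ID
    dataset = 'example_dataset'   # hypothetical dataset ID
    table = 'example_table'       # hypothetical table ID
    schema = [('name', 'STRING', 'Row name'),
              ('count', 'INTEGER', 'Row count')]

    if create_dataset(bq, project, dataset) and create_table(
            bq, project, dataset, table, schema, 'Example table'):
        rows = [make_row(str(uuid.uuid4()), {'name': 'demo', 'count': 1})]
        insert_rows(bq, project, dataset, table, rows)
        print(sync_query_job(bq, project, 'SELECT 1'))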