tools/gcp/utils/big_query_utils.py
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid

from apiclient import discovery
from apiclient.errors import HttpError
import httplib2
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with Cloud Platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)


def create_dataset(biq_query, project_id, dataset_id):
    """Creates a dataset; returns True on success or if it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = biq_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success
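

# Example sketch (not part of the original module): a minimal usage of the two
# helpers above with hypothetical project and dataset IDs. It assumes
# application default credentials are configured in the environment.
def _example_create_dataset():
    bq = create_big_query()
    return create_dataset(bq, 'my-project', 'my_dataset')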


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table; table_schema is a list of
    (column name, column type, description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created with each partition lasting 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a table from an explicit fields schema, optionally time-partitioned."""
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success
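

# Example sketch (not part of the original module): creates a hypothetical
# day-partitioned results table. The project/dataset/table IDs and the schema
# below are illustrative placeholders.
def _example_create_results_table():
    bq = create_big_query()
    schema = [
        ('test_name', 'STRING', 'Name of the test case'),
        ('passed', 'BOOLEAN', 'Whether the test passed'),
        ('duration_secs', 'FLOAT', 'Test runtime in seconds'),
    ]
    return create_partitioned_table(bq, 'my-project', 'my_dataset',
                                    'test_results', schema,
                                    'Example results table')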


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Updates an existing table's schema in place."""
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
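

# Schema-patch sketch (not part of the original module): resubmits the fields of
# the hypothetical table above with one extra column appended. Patching is
# typically used for additive changes such as new, nullable columns.
def _example_add_column():
    bq = create_big_query()
    fields = [
        {'name': 'test_name', 'type': 'STRING', 'description': 'Name of the test case'},
        {'name': 'passed', 'type': 'BOOLEAN', 'description': 'Whether the test passed'},
        {'name': 'duration_secs', 'type': 'FLOAT', 'description': 'Test runtime in seconds'},
        # New column being added to the existing schema.
        {'name': 'build_id', 'type': 'STRING', 'description': 'CI build identifier'},
    ]
    return patch_table(bq, 'my-project', 'my_dataset', 'test_results', fields)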


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows into a table via tabledata().insertAll()."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query and returns the raw response, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
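

# Query sketch (not part of the original module): runs a hypothetical legacy-SQL
# query and prints the cell values. In the BigQuery v2 REST response, rows are
# returned under 'rows' as {'f': [{'v': value}, ...]} when the job completes
# within the timeout.
def _example_query():
    bq = create_big_query()
    job = sync_query_job(
        bq, 'my-project',
        'SELECT test_name, passed FROM [my-project:my_dataset.test_results] LIMIT 10')
    if job and job.get('jobComplete'):
        for row in job.get('rows', []):
            print([cell['v'] for cell in row['f']])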


def make_row(unique_row_id, row_values_dict):
    """Builds a row entry for insert_rows().

    row_values_dict is a dictionary of column name and column value.
    """
    return {'insertId': unique_row_id, 'json': row_values_dict}
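

# Row-insertion sketch (not part of the original module): builds rows with
# make_row() and streams them into the hypothetical table used in the examples
# above.
def _example_insert_rows():
    bq = create_big_query()
    rows = [
        make_row(str(uuid.uuid4()), {
            'test_name': 'foo_test',
            'passed': True,
            'duration_secs': 1.5
        }),
    ]
    return insert_rows(bq, 'my-project', 'my_dataset', 'test_results', rows)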