Unverified commit 4f20cc9f, authored by Jacob Scherffenberg, committed by GitHub

chore: Bigquery example preview client (#935)

* Bigquery example preview client
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Fix bad copy paste of imports
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Clean up junk comment in config
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Refactor the way a custom implementation is configured and fix mypy errors
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Optional install of dependencies for bigquery preview client
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Install bigquery deps for unit testing on PR
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Make an all_deps option like in databuilder
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>

* Just use the all option for unit tests
Signed-off-by: JacobSMoller <scherffenberg91@gmail.com>
parent 0c0d08fc
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from http import HTTPStatus
from typing import Dict, List
from amundsen_application.base.base_preview_client import BasePreviewClient
from amundsen_application.models.preview_data import (
ColumnItem,
PreviewData,
PreviewDataSchema,
)
from flask import Response, make_response, jsonify
from google.cloud import bigquery
class BaseBigqueryPreviewClient(BasePreviewClient):
    """
    Base preview client for BigQuery tables.

    Subclasses provide the BigQuery access by overriding ``_bq_list_rows``; this
    class performs the project allow-list check, flattens nested schema fields
    into dotted column names and builds the HTTP response.

    ``get_preview_data`` returns a Response object, where the response data
    represents a json object with the preview data accessible on 'preview_data'
    key. The preview data should match
    amundsen_application.models.preview_data.PreviewDataSchema
    """

    def __init__(self, bq_client: bigquery.Client, preview_limit: int = 5, previewable_projects: List = None) -> None:
        """
        :param bq_client: BigQuery client passed from the custom implementation.
            See example implementation.
        :param preview_limit: Stored for subclasses; this class does not read it.
        :param previewable_projects: Projects approved for previewing. None
            (default) or an empty list approves all google projects.
        """
        self.bq_client = bq_client
        self.preview_limit = preview_limit
        self.previewable_projects = previewable_projects

    def _bq_list_rows(
        self, gcp_project_id: str, table_project_name: str, table_name: str
    ) -> PreviewData:
        """
        Returns PreviewData from bigquery list rows api.

        Hook for subclasses: this base implementation does nothing and returns None.
        """
        pass  # pragma: no cover

    def _column_item_from_bq_schema(self, schemafield: bigquery.SchemaField, key: str = None) -> List:
        """
        Recursively build ColumnItems from the bigquery schema

        :param schemafield: Schema field to convert; RECORD fields are expanded
            into their sub-fields instead of producing a column themselves.
        :param key: Dotted path of the enclosing RECORD fields, if any.
        :return: Flat list of ColumnItems whose names are dot-joined paths,
            e.g. ``parent.child.leaf``. A RECORD without sub-fields yields ``[]``.
        """
        # The dotted path is built the same way for leaves and for RECORDs, so
        # compute it once instead of once per sub-field.
        name = key + "." + schemafield.name if key else schemafield.name
        if schemafield.field_type != "RECORD":
            return [ColumnItem(name, schemafield.field_type)]

        all_fields = []
        for field in schemafield.fields:
            all_fields.extend(self._column_item_from_bq_schema(field, name))
        return all_fields

    def get_preview_data(self, params: Dict, optionalHeaders: Dict = None) -> Response:
        """
        Build the preview response for one table.

        :param params: Must contain the keys "cluster" (the google project),
            "schema" and "tableName"; a missing key raises KeyError.
        :param optionalHeaders: Accepted for interface compatibility; not used here.
        :return: 403 with empty preview data when an allow-list is configured and
            the project is not on it, 200 with the serialized preview data on
            success, or 500 with empty preview data when the serialized data
            fails schema validation.
        """
        if self.previewable_projects and params["cluster"] not in self.previewable_projects:
            return make_response(jsonify({"preview_data": {}}), HTTPStatus.FORBIDDEN)

        preview_data = self._bq_list_rows(
            params["cluster"],
            params["schema"],
            params["tableName"],
        )
        # dump()/load() results are indexed as (data, errors) pairs: serialize
        # first, then load the result back to validate it against the schema.
        data = PreviewDataSchema().dump(preview_data)[0]
        errors = PreviewDataSchema().load(data)[1]
        if errors:
            return make_response(
                jsonify({"preview_data": {}}), HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return make_response(jsonify({"preview_data": data}), HTTPStatus.OK)
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from amundsen_application.base.base_bigquery_preview_client import BaseBigqueryPreviewClient
from amundsen_application.models.preview_data import (
PreviewData,
)
from google.cloud import bigquery
from flatten_dict import flatten
class BigqueryPreviewClient(BaseBigqueryPreviewClient):
    """
    Example preview client that reads rows through the BigQuery list rows api.

    Returns a Response object, where the response data represents a json object
    with the preview data accessible on 'preview_data' key. The preview data should
    match amundsen_application.models.preview_data.PreviewDataSchema
    """

    def __init__(self) -> None:
        # Requires access to a service account eg.
        # GOOGLE_APPLICATION_CREDENTIALS=path/serviceaccount.json or a mounted service kubernetes service account.
        # "your project here" is a placeholder to be replaced with a real project.
        super().__init__(bq_client=bigquery.Client("your project here"))

    def _bq_list_rows(
        self, gcp_project_id: str, table_project_name: str, table_name: str
    ) -> PreviewData:
        """
        Returns PreviewData from bigquery list rows api.

        At most ``self.preview_limit`` rows are requested. Nested values are
        flattened into dot-separated keys matching the flattened column names.
        """
        table_id = f"{gcp_project_id}.{table_project_name}.{table_name}"
        row_iterator = self.bq_client.list_rows(table_id, max_results=self.preview_limit)

        # One flat-key ColumnItem per leaf field of the table schema.
        column_items = [
            item
            for schema_field in row_iterator.schema
            for item in self._column_item_from_bq_schema(schema_field)
        ]

        # Flatten every row and give each column absent from it an explicit
        # None, to avoid errors with undefined values in frontend.
        flattened_rows = []
        for record in row_iterator:
            flat_record = flatten(dict(record), reducer="dot")
            for column in column_items:
                flat_record.setdefault(column.column_name, None)
            flattened_rows.append(flat_record)
        return PreviewData(column_items, flattened_rows)
......@@ -92,3 +92,4 @@ retrying>=1.3.3,<2.0
# License: Apache 2.0
# Upstream url: https://pypi.org/project/dataclasses/
dataclasses==0.8; python_version < '3.7'
......@@ -39,6 +39,10 @@ with open(requirements_path) as requirements_file:
__version__ = '3.5.0'
# Optional dependency groups; each one is exposed through `extras_require` in setup().
# NOTE(review): `oicd` and `pyarrrow` look like misspellings of "oidc" and "pyarrow".
# They are only local variable names, but a rename must also update their uses
# in `extras_require`.
oicd = ['flaskoidc==0.1.1']
pyarrrow = ['pyarrow==3.0.0']
# Packages for the BigQuery preview client.
bigquery_preview = ['google-cloud-bigquery>=2.8.0,<3.0.0', 'flatten-dict==0.3.0']
# Base requirements plus every optional group above.
all_deps = requirements + oicd + pyarrrow + bigquery_preview
setup(
name='amundsen-frontend',
......@@ -53,8 +57,10 @@ setup(
setup_requires=['cython >= 0.29'],
install_requires=requirements,
extras_require={
'oidc': ['flaskoidc==0.1.1'],
'pyarrow': ['pyarrow==3.0.0'],
'oidc': oicd,
'pyarrow': pyarrrow,
'bigquery_preview': bigquery_preview,
'all': all_deps,
},
python_requires=">=3.6",
entry_points="""
......
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import flask
import json
import unittest
from http import HTTPStatus
from amundsen_application.models.preview_data import (
PreviewData,
)
from amundsen_application.base.base_bigquery_preview_client import BaseBigqueryPreviewClient
from google.cloud.bigquery import SchemaField, Row
from flatten_dict import flatten
# Flask app configured with LocalConfig; the tests use it to open a request context
# around the calls to get_preview_data().
app = flask.Flask(__name__)
app.config.from_object('amundsen_application.config.LocalConfig')
# Schema fixture: two scalar columns plus a nested RECORD
# (enrichments -> geo_ip -> coordinates) so that column names have to be
# flattened across several levels.
test_schema = [
SchemaField('test_row', 'STRING', 'NULLABLE', 'Some data', (), None),
SchemaField('test_row1', 'INTEGER', 'NULLABLE', 'Some other data', (), None),
SchemaField('enrichments', 'RECORD', 'NULLABLE', None, (
SchemaField('geo_ip', 'RECORD', 'NULLABLE', None, (
SchemaField('country_code', 'STRING', 'NULLABLE', None, (), None),
SchemaField('region', 'STRING', 'NULLABLE', None, (), None),
SchemaField('city', 'STRING', 'NULLABLE', None, (), None),
SchemaField('coordinates', 'RECORD', 'NULLABLE', None, (
SchemaField('latitude', 'FLOAT', 'REQUIRED', None, (), None),
SchemaField('longitude', 'FLOAT', 'REQUIRED', None, (), None)), None)), None),), None),
]
# Row fixtures: each Row is built from a tuple of values and a dict mapping
# field names to positions in that tuple. The geo_ip values carry no
# 'coordinates' entry, so the latitude/longitude columns have no data in the rows.
test_rows = [
Row(('testdata', 7357,
{'geo_ip': {'country_code': 'US', 'region': '', 'city': '', }}), {
'test_row': 0, 'test_row1': 1, 'enrichments': 2}),
Row(('testdata_1', 7357,
{'geo_ip': {'country_code': 'US', 'region': '', 'city': '', }}), {
'test_row': 0, 'test_row1': 1, 'enrichments': 2}),
]
# Expected 'preview_data' payload for the fixtures above: dotted column names
# for the nested fields, and None for the coordinates columns missing from the rows.
expected_results = {'columns': [{'column_name': 'test_row', 'column_type': 'STRING'},
{'column_name': 'test_row1', 'column_type': 'INTEGER'},
{'column_name': 'enrichments.geo_ip.country_code',
'column_type': 'STRING'},
{'column_name': 'enrichments.geo_ip.region',
'column_type': 'STRING'},
{'column_name': 'enrichments.geo_ip.city',
'column_type': 'STRING'},
{'column_name': 'enrichments.geo_ip.coordinates.latitude',
'column_type': 'FLOAT'},
{'column_name': 'enrichments.geo_ip.coordinates.longitude',
'column_type': 'FLOAT'}],
'data': [{'enrichments.geo_ip.city': '',
'enrichments.geo_ip.coordinates.latitude': None,
'enrichments.geo_ip.coordinates.longitude': None,
'enrichments.geo_ip.country_code': 'US',
'enrichments.geo_ip.region': '',
'test_row': 'testdata',
'test_row1': 7357},
{'enrichments.geo_ip.city': '',
'enrichments.geo_ip.coordinates.latitude': None,
'enrichments.geo_ip.coordinates.longitude': None,
'enrichments.geo_ip.country_code': 'US',
'enrichments.geo_ip.region': '',
'test_row': 'testdata_1',
'test_row1': 7357}],
'error_text': ''}
class BigQueryMockClient:
    """Empty stand-in handed to the preview clients as ``bq_client`` in these tests."""
class MockClient(BaseBigqueryPreviewClient):
    """Preview client without project restrictions that serves the module-level fixtures."""

    def __init__(self) -> None:
        super().__init__(bq_client=BigQueryMockClient)

    def _bq_list_rows(
        self, gcp_project_id: str, table_project_name: str, table_name: str
    ) -> PreviewData:
        """
        Build PreviewData from ``test_schema`` and ``test_rows``; the
        arguments are ignored.
        """
        # Flat, dotted-name ColumnItems for every leaf field in the fixture schema.
        column_items = [
            item
            for schema_field in test_schema
            for item in self._column_item_from_bq_schema(schema_field)
        ]

        # Flatten each fixture row and fill columns it lacks with None.
        flattened_rows = []
        for record in test_rows:
            flat_record = flatten(dict(record), reducer="dot")
            for column in column_items:
                flat_record.setdefault(column.column_name, None)
            flattened_rows.append(flat_record)
        return PreviewData(column_items, flattened_rows)
class MockClientNonPreviewableDataset(BaseBigqueryPreviewClient):
    """Preview client whose allow-list contains only 'test-project-y'."""

    def __init__(self) -> None:
        approved_projects = ['test-project-y']
        super().__init__(
            bq_client=BigQueryMockClient,
            previewable_projects=approved_projects,
        )

    def _bq_list_rows(
        self, gcp_project_id: str, table_project_name: str, table_name: str
    ) -> PreviewData:
        """Return a default-constructed PreviewData; the arguments are ignored."""
        empty_preview = PreviewData()
        return empty_preview
class BigqueryPreviewClientTest(unittest.TestCase):
    """Checks the responses built by get_preview_data() for the mocked clients."""

    def test_bigquery_get_preview_data_correct_data_shape(self) -> None:
        """
        Test get_preview_data() on a client without project restrictions,
        which should result in a response with 200 and previewable data.
        :return:
        """
        request_params = {"cluster": "test-project-x", "schema": "test-schema", "tableName": "foo"}
        with app.test_request_context():
            response = MockClient().get_preview_data(params=request_params)
            body = json.loads(response.data)
            self.assertEqual(body.get('preview_data'), expected_results)
            self.assertEqual(response.status_code, HTTPStatus.OK)

    def test_bigquery_get_preview_data_non_previable_dataset(self) -> None:
        """
        Test get_preview_data() for a project outside the allow-list, which
        should result in a response with 403 and empty preview data because
        the project is not approved for previewing.
        :return:
        """
        request_params = {"cluster": "test-project-x", "schema": "test-schema", "tableName": "foo"}
        with app.test_request_context():
            response = MockClientNonPreviewableDataset().get_preview_data(params=request_params)
            body = json.loads(response.data)
            self.assertEqual(body.get('preview_data'), {})
            self.assertEqual(response.status_code, HTTPStatus.FORBIDDEN)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment