Commit c99d9dfd authored by Louis Simoneau, committed by Tao Feng

Implement BigQuery Watermark Extractor for extracting partition ranges (#171)

* Initial working version of Watermark extractor with some tests

* Implement handling for sharded tables (with YYYYMMDD suffix)

* Extract BaseBigQueryExtractor with shared code
parent 9b8e9e61
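For context, here is a minimal sketch of how the new watermark extractor can be wired up, mirroring the configuration used in the tests further down. The project ID is a placeholder, and real GCP credentials (a key file, inline credentials, or application default credentials) are assumed to be available.

from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.bigquery_watermark_extractor import BigQueryWatermarkExtractor

# 'your-project-here' is a placeholder GCP project ID
conf = ConfigFactory.from_dict({
    'extractor.bigquery_watermarks.{}'.format(BigQueryWatermarkExtractor.PROJECT_ID_KEY):
        'your-project-here',
})

extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=conf, scope=extractor.get_scope()))

# extract() returns one Watermark per call and None once everything has been emitted
record = extractor.extract()
while record:
    print(record.part_type, record.table, record.parts)
    record = extractor.extract()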
import json
import logging
from collections import namedtuple
import google.oauth2.service_account
import google_auth_httplib2
from googleapiclient.discovery import build
import httplib2
from pyhocon import ConfigTree # noqa: F401
from typing import List, Any # noqa: F401
from databuilder.extractor.base_extractor import Extractor
DatasetRef = namedtuple('DatasetRef', ['datasetId', 'projectId'])
TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
LOGGER = logging.getLogger(__name__)
class BaseBigQueryExtractor(Extractor):
PROJECT_ID_KEY = 'project_id'
KEY_PATH_KEY = 'key_path'
# sometimes we don't have a key path, but only have a credentials variable
CRED_KEY = 'project_cred'
PAGE_SIZE_KEY = 'page_size'
FILTER_KEY = 'filter'
_DEFAULT_SCOPES = ['https://www.googleapis.com/auth/bigquery.readonly', ]
DEFAULT_PAGE_SIZE = 300
NUM_RETRIES = 3
DATE_LENGTH = 8
def init(self, conf):
# type: (ConfigTree) -> None
# should use key_path, or cred_key if the former doesn't exist
self.key_path = conf.get_string(BaseBigQueryExtractor.KEY_PATH_KEY, None)
self.cred_key = conf.get_string(BaseBigQueryExtractor.CRED_KEY, None)
self.project_id = conf.get_string(BaseBigQueryExtractor.PROJECT_ID_KEY)
self.pagesize = conf.get_int(
BaseBigQueryExtractor.PAGE_SIZE_KEY,
BaseBigQueryExtractor.DEFAULT_PAGE_SIZE)
self.filter = conf.get_string(BaseBigQueryExtractor.FILTER_KEY, '')
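# Credential resolution: an explicit service account key file takes precedence, then inline JSON credentials, then application default credentials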
if self.key_path:
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
self.key_path, scopes=BaseBigQueryExtractor._DEFAULT_SCOPES))
else:
if self.cred_key:
service_account_info = json.loads(self.cred_key)
credentials = (
google.oauth2.service_account.Credentials.from_service_account_info(
service_account_info, scopes=BaseBigQueryExtractor._DEFAULT_SCOPES))
else:
credentials, _ = google.auth.default(scopes=BaseBigQueryExtractor._DEFAULT_SCOPES)
http = httplib2.Http()
authed_http = google_auth_httplib2.AuthorizedHttp(credentials, http=http)
self.bigquery_service = build('bigquery', 'v2', http=authed_http, cache_discovery=False)
self.logging_service = build('logging', 'v2', http=authed_http, cache_discovery=False)
self.iter = iter(self._iterate_over_tables())
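# extract() is invoked repeatedly by the databuilder task; returning None signals that there are no more records to emit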
def extract(self):
# type: () -> Any
try:
return next(self.iter)
except StopIteration:
return None
def _is_sharded_table(self, table_id):
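# Tables whose names end in an 8-digit date suffix (YYYYMMDD), e.g. ga_sessions_20190101, are treated as shards of one logical table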
suffix = table_id[-BaseBigQueryExtractor.DATE_LENGTH:]
return suffix.isdigit()
def _iterate_over_tables(self):
# type: () -> Any
for dataset in self._retrieve_datasets():
for entry in self._retrieve_tables(dataset):
yield entry
def _retrieve_datasets(self):
# type: () -> List[DatasetRef]
datasets = []
for page in self._page_dataset_list_results():
if 'datasets' not in page:
continue
for dataset in page['datasets']:
dataset_ref = dataset['datasetReference']
ref = DatasetRef(**dataset_ref)
datasets.append(ref)
return datasets
def _page_dataset_list_results(self):
# type: () -> Any
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
all=False, # Do not return hidden datasets
filter=self.filter,
maxResults=self.pagesize).execute(
num_retries=BaseBigQueryExtractor.NUM_RETRIES)
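# keep requesting pages while the previous response carries a nextPageToken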
while response:
yield response
if 'nextPageToken' in response:
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
all=True,
filter=self.filter,
pageToken=response['nextPageToken']).execute(
num_retries=BaseBigQueryExtractor.NUM_RETRIES)
else:
response = None
def _page_table_list_results(self, dataset):
# type: (DatasetRef) -> Any
response = self.bigquery_service.tables().list(
projectId=dataset.projectId,
datasetId=dataset.datasetId,
maxResults=self.pagesize).execute(
num_retries=BaseBigQueryExtractor.NUM_RETRIES)
while response:
yield response
if 'nextPageToken' in response:
response = self.bigquery_service.tables().list(
projectId=dataset.projectId,
datasetId=dataset.datasetId,
maxResults=self.pagesize,
pageToken=response['nextPageToken']).execute(
num_retries=BaseBigQueryExtractor.NUM_RETRIES)
else:
response = None
def get_scope(self):
# type: () -> str
return 'extractor.bigquery_table_metadata'
import json
import logging
from collections import namedtuple
import google.oauth2.service_account
import google_auth_httplib2
from googleapiclient.discovery import build
import httplib2
from pyhocon import ConfigTree # noqa: F401
from typing import List, Any # noqa: F401
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.base_bigquery_extractor import BaseBigQueryExtractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
......@@ -19,7 +14,7 @@ TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
LOGGER = logging.getLogger(__name__)
class BigQueryMetadataExtractor(Extractor):
class BigQueryMetadataExtractor(BaseBigQueryExtractor):
""" A metadata extractor for bigquery tables, taking the schema metadata
from the google cloud bigquery API's. This extractor goes through all visible
......@@ -31,97 +26,11 @@ class BigQueryMetadataExtractor(Extractor):
column name.
"""
PROJECT_ID_KEY = 'project_id'
KEY_PATH_KEY = 'key_path'
# sometimes we don't have a key path, but only have a credentials variable
CRED_KEY = 'project_cred'
PAGE_SIZE_KEY = 'page_size'
FILTER_KEY = 'filter'
_DEFAULT_SCOPES = ['https://www.googleapis.com/auth/bigquery.readonly', ]
DEFAULT_PAGE_SIZE = 300
NUM_RETRIES = 3
DATE_LENGTH = 8
def init(self, conf):
# type: (ConfigTree) -> None
# should use key_path, or cred_key if the former doesn't exist
self.key_path = conf.get_string(BigQueryMetadataExtractor.KEY_PATH_KEY, None)
self.cred_key = conf.get_string(BigQueryMetadataExtractor.CRED_KEY, None)
self.project_id = conf.get_string(BigQueryMetadataExtractor.PROJECT_ID_KEY)
self.pagesize = conf.get_int(
BigQueryMetadataExtractor.PAGE_SIZE_KEY,
BigQueryMetadataExtractor.DEFAULT_PAGE_SIZE)
self.filter = conf.get_string(BigQueryMetadataExtractor.FILTER_KEY, '')
if self.key_path:
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
self.key_path, scopes=BigQueryMetadataExtractor._DEFAULT_SCOPES))
else:
if self.cred_key:
service_account_info = json.loads(self.cred_key)
credentials = (
google.oauth2.service_account.Credentials.from_service_account_info(
service_account_info, scopes=BigQueryMetadataExtractor._DEFAULT_SCOPES))
else:
credentials, _ = google.auth.default(scopes=BigQueryMetadataExtractor._DEFAULT_SCOPES)
http = httplib2.Http()
authed_http = google_auth_httplib2.AuthorizedHttp(credentials, http=http)
self.bigquery_service = build('bigquery', 'v2', http=authed_http, cache_discovery=False)
self.datasets = self._retrieve_datasets()
self.iter = iter(self._iterate_over_tables())
BaseBigQueryExtractor.init(self, conf)
self.grouped_tables = set([])
def extract(self):
# type: () -> Any
try:
return next(self.iter)
except StopIteration:
return None
def _iterate_over_tables(self):
# type: () -> Any
for dataset in self.datasets:
for entry in self._retrieve_tables(dataset):
yield(entry)
def _retrieve_datasets(self):
# type: () -> List[DatasetRef]
datasets = []
for page in self._page_dataset_list_results():
if 'datasets' not in page:
continue
for dataset in page['datasets']:
dataset_ref = dataset['datasetReference']
ref = DatasetRef(**dataset_ref)
datasets.append(ref)
return datasets
def _page_dataset_list_results(self):
# type: () -> Any
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
all=False, # Do not return hidden datasets
filter=self.filter,
maxResults=self.pagesize).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)
while response:
yield response
if 'nextPageToken' in response:
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
all=True,
filter=self.filter,
pageToken=response['nextPageToken']).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)
else:
response = None
def _retrieve_tables(self, dataset):
# type: () -> Any
for page in self._page_table_list_results(dataset):
......@@ -130,14 +39,12 @@ class BigQueryMetadataExtractor(Extractor):
for table in page['tables']:
tableRef = table['tableReference']
table_id = tableRef['tableId']
# BigQuery tables that have 8 digits as last characters are
# considered date range tables and are grouped together in the UI.
# ( e.g. ga_sessions_20190101, ga_sessions_20190102, etc. )
last_eight_chars = table_id[-BigQueryMetadataExtractor.DATE_LENGTH:]
if last_eight_chars.isdigit():
if self._is_sharded_table(table_id):
# If the last eight characters are digits, we assume the table is part of a date-range table set
# and we only need one schema definition for the whole set
table_prefix = table_id[:-BigQueryMetadataExtractor.DATE_LENGTH]
......@@ -201,27 +108,6 @@ class BigQueryMetadataExtractor(Extractor):
cols.append(col)
return total_cols + 1
def _page_table_list_results(self, dataset):
# type: (DatasetRef) -> Any
response = self.bigquery_service.tables().list(
projectId=dataset.projectId,
datasetId=dataset.datasetId,
maxResults=self.pagesize).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)
while response:
yield response
if 'nextPageToken' in response:
response = self.bigquery_service.tables().list(
projectId=dataset.projectId,
datasetId=dataset.datasetId,
maxResults=self.pagesize,
pageToken=response['nextPageToken']).execute(
num_retries=BigQueryMetadataExtractor.NUM_RETRIES)
else:
response = None
def get_scope(self):
# type: () -> str
return 'extractor.bigquery_table_metadata'
from collections import namedtuple
from datetime import date, timedelta
import json
import logging
import re
from time import sleep
import google.oauth2.service_account
import google_auth_httplib2
from googleapiclient.discovery import build
import httplib2
from pyhocon import ConfigTree # noqa: F401
from typing import Dict, Optional # noqa: F401
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.base_bigquery_extractor import BaseBigQueryExtractor
TableColumnUsageTuple = namedtuple('TableColumnUsageTuple', ['database', 'cluster', 'schema',
'table', 'column', 'email'])
......@@ -20,51 +15,22 @@ TableColumnUsageTuple = namedtuple('TableColumnUsageTuple', ['database', 'cluste
LOGGER = logging.getLogger(__name__)
class BigQueryTableUsageExtractor(Extractor):
class BigQueryTableUsageExtractor(BaseBigQueryExtractor):
"""
An aggregate extractor for BigQuery table usage. This class takes its data from
the Stackdriver Logging API, filtering on timestamp and bigquery_resource and looking
for referencedTables in the response.
"""
TIMESTAMP_KEY = 'timestamp'
PROJECT_ID_KEY = 'project_id'
DEFAULT_PAGE_SIZE = 300
PAGE_SIZE_KEY = 'page_size'
KEY_PATH_KEY = 'key_path'
# sometimes we don't have a key path, but only have a credentials variable
CRED_KEY = 'project_cred'
_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
EMAIL_PATTERN = 'email_pattern'
NUM_RETRIES = 3
DELAY_TIME = 10
def init(self, conf):
# type: (ConfigTree) -> None
self.key_path = conf.get_string(BigQueryTableUsageExtractor.KEY_PATH_KEY, None)
self.cred_key = conf.get_string(BigQueryTableUsageExtractor.CRED_KEY, None)
if self.key_path:
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
self.key_path, scopes=BigQueryTableUsageExtractor._DEFAULT_SCOPES))
elif self.cred_key:
service_account_info = json.loads(self.cred_key)
credentials = (
google.oauth2.service_account.Credentials.from_service_account_info(
service_account_info, scopes=BigQueryTableUsageExtractor._DEFAULT_SCOPES))
else:
credentials, _ = google.auth.default(scopes=BigQueryTableUsageExtractor._DEFAULT_SCOPES)
http = httplib2.Http()
authed_http = google_auth_httplib2.AuthorizedHttp(credentials, http=http)
self.logging_service = build('logging', 'v2', http=authed_http, cache_discovery=False)
BaseBigQueryExtractor.init(self, conf)
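# default the log filter timestamp to the start of yesterday (UTC) when none is configured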
self.timestamp = conf.get_string(
BigQueryTableUsageExtractor.TIMESTAMP_KEY,
(date.today() - timedelta(days=1)).strftime('%Y-%m-%dT00:00:00Z'))
self.projectid = conf.get_string(BigQueryTableUsageExtractor.PROJECT_ID_KEY)
self.pagesize = conf.get_int(
BigQueryTableUsageExtractor.PAGE_SIZE_KEY,
BigQueryTableUsageExtractor.DEFAULT_PAGE_SIZE)
self.email_pattern = conf.get_string(BigQueryTableUsageExtractor.EMAIL_PATTERN, None)
......@@ -134,7 +100,7 @@ class BigQueryTableUsageExtractor(Extractor):
"""
body = {
'resourceNames': [
'projects/{projectid}'.format(projectid=self.projectid)
'projects/{project_id}'.format(project_id=self.project_id)
],
'pageSize': self.pagesize,
'filter': 'resource.type="bigquery_resource" AND '
......
from collections import namedtuple
import logging
import datetime
import textwrap
from pyhocon import ConfigTree # noqa: F401
from typing import List, Any # noqa: F401
from databuilder.extractor.base_bigquery_extractor import BaseBigQueryExtractor
from databuilder.models.watermark import Watermark
DatasetRef = namedtuple('DatasetRef', ['datasetId', 'projectId'])
TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])
PartitionInfo = namedtuple('PartitionInfo', ['partition_id', 'epoch_created'])
LOGGER = logging.getLogger(__name__)
class BigQueryWatermarkExtractor(BaseBigQueryExtractor):
def init(self, conf):
# type: (ConfigTree) -> None
BaseBigQueryExtractor.init(self, conf)
self.iter = iter(self._iterate_over_tables())
def get_scope(self):
# type: () -> str
return 'extractor.bigquery_watermarks'
def _retrieve_tables(self, dataset):
# type: () -> Any
sharded_table_watermarks = {}
for page in self._page_table_list_results(dataset):
if 'tables' not in page:
continue
for table in page['tables']:
tableRef = table['tableReference']
table_id = tableRef['tableId']
# BigQuery tables that have 8 digits as last characters are
# considered date range tables and are grouped together in the UI.
# ( e.g. ga_sessions_20190101, ga_sessions_20190102, etc. )
# We use these suffixes to determine high and low watermarks
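# The fixed-width YYYYMMDD suffixes sort lexicographically, so min()/max() over them yield the low and high watermarks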
if self._is_sharded_table(table_id):
suffix = table_id[-BigQueryWatermarkExtractor.DATE_LENGTH:]
prefix = table_id[:-BigQueryWatermarkExtractor.DATE_LENGTH]
if prefix in sharded_table_watermarks:
sharded_table_watermarks[prefix]['low'] = min(sharded_table_watermarks[prefix]['low'], suffix)
sharded_table_watermarks[prefix]['high'] = max(sharded_table_watermarks[prefix]['high'], suffix)
else:
sharded_table_watermarks[prefix] = {'high': suffix, 'low': suffix, 'table': table}
else:
partitions = self._get_partitions(table, tableRef)
if not partitions:
continue
low, high = self._get_partition_watermarks(table, tableRef, partitions)
yield low
yield high
for prefix, td in sharded_table_watermarks.items():
table = td['table']
tableRef = table['tableReference']
yield Watermark(
datetime.datetime.fromtimestamp(float(table['creationTime']) / 1000).strftime('%Y-%m-%d %H:%M:%S'),
'bigquery',
tableRef['datasetId'],
prefix,
'__table__={partition_id}'.format(partition_id=td['low']),
part_type="low_watermark",
cluster=tableRef['projectId']
)
yield Watermark(
datetime.datetime.fromtimestamp(float(table['creationTime']) / 1000).strftime('%Y-%m-%d %H:%M:%S'),
'bigquery',
tableRef['datasetId'],
prefix,
'__table__={partition_id}'.format(partition_id=td['high']),
part_type="high_watermark",
cluster=tableRef['projectId']
)
def _get_partitions(self, table, tableRef):
if 'timePartitioning' not in table:
return
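# Legacy SQL exposes per-partition metadata via the $__PARTITIONS_SUMMARY__ meta-table; creation_time there is in milliseconds, hence the division by 1000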
query = textwrap.dedent("""
SELECT partition_id,
TIMESTAMP(creation_time/1000) AS creation_time
FROM [{project}:{dataset}.{table}$__PARTITIONS_SUMMARY__]
WHERE partition_id <> '__UNPARTITIONED__'
AND partition_id <> '__NULL__'
""")
body = {
'query': query.format(
project=tableRef['projectId'],
dataset=tableRef['datasetId'],
table=tableRef['tableId']),
'useLegacySql': True
}
result = self.bigquery_service.jobs().query(projectId=self.project_id, body=body).execute()
if 'rows' not in result:
return
return [PartitionInfo(row['f'][0]['v'], row['f'][1]['v']) for row in result['rows']]
def _get_partition_watermarks(self, table, tableRef, partitions):
if 'field' in table['timePartitioning']:
field = table['timePartitioning']['field']
else:
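# ingestion-time partitioned tables have no explicit field; partitions are addressed via the _PARTITIONTIME pseudo-column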
field = '_PARTITIONTIME'
low = min(partitions, key=lambda t: t.partition_id)
low_wm = Watermark(
datetime.datetime.fromtimestamp(float(low.epoch_created)).strftime('%Y-%m-%d %H:%M:%S'),
'bigquery',
tableRef['datasetId'],
tableRef['tableId'],
'{field}={partition_id}'.format(field=field, partition_id=low.partition_id),
part_type="low_watermark",
cluster=tableRef['projectId']
)
high = max(partitions, key=lambda t: t.partition_id)
high_wm = Watermark(
datetime.datetime.fromtimestamp(float(high.epoch_created)).strftime('%Y-%m-%d %H:%M:%S'),
'bigquery',
tableRef['datasetId'],
tableRef['tableId'],
'{field}={partition_id}'.format(field=field, partition_id=high.partition_id),
part_type="high_watermark",
cluster=tableRef['projectId']
)
return low_wm, high_wm
......@@ -132,7 +132,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
'your-project-here'}
self.conf = ConfigFactory.from_dict(config_dict)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_can_handle_datasets(self, mock_build):
mock_build.return_value = MockBigQueryClient(NO_DATASETS, None, None)
extractor = BigQueryMetadataExtractor()
......@@ -141,7 +141,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_empty_dataset(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, NO_TABLES, None)
extractor = BigQueryMetadataExtractor()
......@@ -150,7 +150,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_accepts_dataset_filter_by_label(self, mock_build):
config_dict = {
'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.PROJECT_ID_KEY):
......@@ -167,7 +167,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsInstance(result, TableMetadata)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_without_columns(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_TABLE, NO_COLS)
extractor = BigQueryMetadataExtractor()
......@@ -183,7 +183,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertEquals(result.columns, [])
self.assertEquals(result.is_view, False)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_view(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_VIEW, VIEW_DATA)
extractor = BigQueryMetadataExtractor()
......@@ -193,7 +193,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertIsInstance(result, TableMetadata)
self.assertEquals(result.is_view, True)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_normal_table(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_TABLE, TABLE_DATA)
extractor = BigQueryMetadataExtractor()
......@@ -213,7 +213,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertEquals(first_col.description, 'some_description')
self.assertEquals(result.is_view, False)
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_with_nested_records(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_TABLE, NESTED_DATA)
extractor = BigQueryMetadataExtractor()
......@@ -231,7 +231,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
self.assertEquals(third_col.name, 'nested.nested2.ahah')
self.assertEquals(third_col.type, 'STRING')
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_keypath_and_pagesize_can_be_set(self, mock_build):
config_dict = {
'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.PROJECT_ID_KEY):
......@@ -250,7 +250,7 @@ class TestBigQueryMetadataExtractor(unittest.TestCase):
extractor.init(Scoped.get_scoped_conf(conf=conf,
scope=extractor.get_scope()))
@patch('databuilder.extractor.bigquery_metadata_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_part_of_table_date_range(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, TABLE_DATE_RANGE, TABLE_DATA)
extractor = BigQueryMetadataExtractor()
......
......@@ -194,7 +194,7 @@ class MockLoggingClient():
)
class TestBigqueryUsageExtractor(unittest.TestCase):
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_basic_extraction(self, mock_build):
"""
Test Extraction using mock class
......@@ -223,7 +223,7 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
self.assertEqual(key.email, 'your-user-here@test.com')
self.assertEqual(value, 1)
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_no_entries(self, mock_build):
config_dict = {
'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY):
......@@ -238,7 +238,7 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_key_path(self, mock_build):
"""
Test key_path can be used
......@@ -268,7 +268,7 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
self.assertEqual(creds.project_id, 'your-project-here')
self.assertEqual(creds.service_account_email, 'test-162@your-project-here.iam.gserviceaccount.com')
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_timestamp_pagesize_settings(self, mock_build):
"""
Test timestamp and pagesize can be set
......@@ -298,7 +298,7 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
self.assertEqual(body['pageSize'], PAGESIZE)
self.assertEqual(TIMESTAMP in body['filter'], True)
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_failed_jobs_should_not_be_counted(self, mock_build):
config_dict = {
......@@ -316,7 +316,7 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_email_filter_not_counted(self, mock_build):
config_dict = {
'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY):
......@@ -333,7 +333,7 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.bigquery_usage_extractor.build')
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_email_filter_counted(self, mock_build):
config_dict = {
'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY):
......
import logging
import unittest
from datetime import datetime
from mock import patch, Mock
from pyhocon import ConfigFactory
from databuilder import Scoped
from databuilder.extractor.bigquery_watermark_extractor import BigQueryWatermarkExtractor
logging.basicConfig(level=logging.INFO)
NO_DATASETS = {'kind': 'bigquery#datasetList', 'etag': '1B2M2Y8AsgTpgAmY7PhCfg=='}
ONE_DATASET = {'kind': 'bigquery#datasetList', 'etag': 'yScH5WIHeNUBF9b/VKybXA==',
'datasets': [{'kind': 'bigquery#dataset', 'id': 'your-project-here:empty', 'datasetReference':
{'datasetId': 'empty', 'projectId': 'your-project-here'}, 'location': 'US'}]} # noqa
NO_TABLES = {'kind': 'bigquery#tableList', 'etag': '1B2M2Y8AsgTpgAmY7PhCfg==', 'totalItems': 0}
ONE_TABLE = {'kind': 'bigquery#tableList', 'etag': 'Iaqrz2TCDIANAOD/Xerkjw==',
'tables': [{'kind': 'bigquery#table', 'id': 'your-project-here:fdgdfgh.nested_recs', 'tableReference':
{'projectId': 'your-project-here', 'datasetId': 'fdgdfgh', 'tableId': 'nested_recs'},
'type': 'TABLE', 'creationTime': '1557578974009'}],
'totalItems': 1} # noqa
TIME_PARTITIONED = {'kind': 'bigquery#tableList', 'etag': 'Iaqrz2TCDIANAOD/Xerkjw==',
'tables': [{'kind': 'bigquery#table', 'id': 'your-project-here:fdgdfgh.other', 'tableReference':
{'projectId': 'your-project-here', 'datasetId': 'fdgdfgh', 'tableId': 'other'},
'type': 'TABLE', 'timePartitioning': {'type': 'DAY', 'requirePartitionFilter': False},
'creationTime': '1557577779306'}], 'totalItems': 1} # noqa
TIME_PARTITIONED_WITH_FIELD = {'kind': 'bigquery#tableList', 'etag': 'Iaqrz2TCDIANAOD/Xerkjw==',
'tables': [{'kind': 'bigquery#table', 'id': 'your-project-here:fdgdfgh.other', 'tableReference':
{'projectId': 'your-project-here', 'datasetId': 'fdgdfgh', 'tableId': 'other'},
'type': 'TABLE', 'timePartitioning': {'type': 'DAY', 'field': 'processed_date',
'requirePartitionFilter': False}, 'creationTime': '1557577779306'}], 'totalItems': 1} # noqa
TABLE_DATE_RANGE = {'kind': 'bigquery#tableList', 'etag': 'Iaqrz2TCDIANAOD/Xerkjw==',
'tables': [{'kind': 'bigquery#table', 'id': 'your-project-here:fdgdfgh.other_20190101', 'tableReference':
{'projectId': 'your-project-here', 'datasetId': 'fdgdfgh', 'tableId': 'date_range_20190101'},
'type': 'TABLE', 'creationTime': '1557577779306'},
{'kind': 'bigquery#table', 'id': 'your-project-here:fdgdfgh.other_20190102', 'tableReference':
{'projectId': 'your-project-here', 'datasetId': 'fdgdfgh', 'tableId': 'date_range_20190102'},
'type': 'TABLE', 'creationTime': '1557577779306'}], 'totalItems': 2} # noqa
PARTITION_DATA = {'kind': 'bigquery#queryResponse',
'schema': {'fields': [{'name': 'partition_id', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'creation_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}]},
'jobReference': {'projectId': 'your-project-here', 'jobId': 'job_bfTRGj3Lv0tRjcrotXbZSgMCpNhY', 'location': 'EU'},
'totalRows': '3',
'rows': [{'f': [{'v': '20180802'}, {'v': '1.547512241348E9'}]},
{'f': [{'v': '20180803'}, {'v': '1.547512241348E9'}]},
{'f': [{'v': '20180804'}, {'v': '1.547512241348E9'}]}],
'totalBytesProcessed': '0', 'jobComplete': True, 'cacheHit': False} # noqa
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError
class MockBigQueryClient():
def __init__(self, dataset_list_data, table_list_data, partition_data):
self.list_execute = Mock()
self.list_execute.execute.return_value = table_list_data
self.tables_method = Mock()
self.tables_method.list.return_value = self.list_execute
self.ds_execute = Mock()
self.ds_execute.execute.return_value = dataset_list_data
self.ds_list = Mock()
self.ds_list.list.return_value = self.ds_execute
self.query_execute = Mock()
self.query_execute.execute.return_value = partition_data
self.jobs_query = Mock()
self.jobs_query.query.return_value = self.query_execute
def datasets(self):
return self.ds_list
def tables(self):
return self.tables_method
def jobs(self):
return self.jobs_query
class TestBigQueryWatermarkExtractor(unittest.TestCase):
def setUp(self):
# type: () -> None
config_dict = {
'extractor.bigquery_watermarks.{}'.format(BigQueryWatermarkExtractor.PROJECT_ID_KEY):
'your-project-here'}
self.conf = ConfigFactory.from_dict(config_dict)
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_can_handle_no_datasets(self, mock_build):
mock_build.return_value = MockBigQueryClient(NO_DATASETS, None, None)
extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=self.conf,
scope=extractor.get_scope()))
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_empty_dataset(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, NO_TABLES, None)
extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=self.conf,
scope=extractor.get_scope()))
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_without_partitions(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_TABLE, None)
extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=self.conf,
scope=extractor.get_scope()))
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_with_default_partitions(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, TIME_PARTITIONED, PARTITION_DATA)
extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=self.conf,
scope=extractor.get_scope()))
result = extractor.extract()
self.assertEquals(result.part_type, 'low_watermark')
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.table, 'other')
self.assertEquals(result.cluster, 'your-project-here')
self.assertEquals(result.create_time, datetime.fromtimestamp(1547512241).strftime('%Y-%m-%d %H:%M:%S'))
self.assertEquals(result.parts, [('_partitiontime', '20180802')])
result = extractor.extract()
self.assertEquals(result.part_type, 'high_watermark')
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.table, 'other')
self.assertEquals(result.cluster, 'your-project-here')
self.assertEquals(result.create_time, datetime.fromtimestamp(1547512241).strftime('%Y-%m-%d %H:%M:%S'))
self.assertEquals(result.parts, [('_partitiontime', '20180804')])
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_with_field_partitions(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, TIME_PARTITIONED_WITH_FIELD, PARTITION_DATA)
extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=self.conf,
scope=extractor.get_scope()))
result = extractor.extract()
self.assertEquals(result.part_type, 'low_watermark')
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.table, 'other')
self.assertEquals(result.cluster, 'your-project-here')
self.assertEquals(result.create_time, datetime.fromtimestamp(1547512241).strftime('%Y-%m-%d %H:%M:%S'))
self.assertEquals(result.parts, [('processed_date', '20180802')])
result = extractor.extract()
self.assertEquals(result.part_type, 'high_watermark')
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.table, 'other')
self.assertEquals(result.cluster, 'your-project-here')
self.assertEquals(result.create_time, datetime.fromtimestamp(1547512241).strftime('%Y-%m-%d %H:%M:%S'))
self.assertEquals(result.parts, [('processed_date', '20180804')])
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_keypath_can_be_set(self, mock_build):
config_dict = {
'extractor.bigquery_watermarks.{}'.format(BigQueryWatermarkExtractor.PROJECT_ID_KEY):
'your-project-here',
'extractor.bigquery_watermarks.{}'.format(BigQueryWatermarkExtractor.KEY_PATH_KEY):
'/tmp/doesnotexist',
}
conf = ConfigFactory.from_dict(config_dict)
mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_TABLE, None)
extractor = BigQueryWatermarkExtractor()
with self.assertRaises(FileNotFoundError):
extractor.init(Scoped.get_scoped_conf(conf=conf,
scope=extractor.get_scope()))
@patch('databuilder.extractor.base_bigquery_extractor.build')
def test_table_part_of_table_date_range(self, mock_build):
mock_build.return_value = MockBigQueryClient(ONE_DATASET, TABLE_DATE_RANGE, None)
extractor = BigQueryWatermarkExtractor()
extractor.init(Scoped.get_scoped_conf(conf=self.conf,
scope=extractor.get_scope()))
result = extractor.extract()
self.assertEquals(result.part_type, 'low_watermark')
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.table, 'date_range_')
self.assertEquals(result.cluster, 'your-project-here')
self.assertEquals(result.create_time, datetime.fromtimestamp(1557577779).strftime('%Y-%m-%d %H:%M:%S'))
self.assertEquals(result.parts, [('__table__', '20190101')])
result = extractor.extract()
self.assertEquals(result.part_type, 'high_watermark')
self.assertEquals(result.database, 'bigquery')
self.assertEquals(result.schema, 'fdgdfgh')
self.assertEquals(result.table, 'date_range_')
self.assertEquals(result.cluster, 'your-project-here')
self.assertEquals(result.create_time, datetime.fromtimestamp(1557577779).strftime('%Y-%m-%d %H:%M:%S'))
self.assertEquals(result.parts, [('__table__', '20190102')])