Unverified Commit 739c6383 authored by Tao Feng's avatar Tao Feng Committed by GitHub

Provide email filter for BQ usage extractor (#97)

parent 7b6bf54f
from collections import namedtuple
from datetime import date, timedelta
import logging
import re
from time import sleep
import google.oauth2.service_account
......@@ -30,6 +31,7 @@ class BigQueryTableUsageExtractor(Extractor):
PAGE_SIZE_KEY = 'page_size'
KEY_PATH_KEY = 'key_path'
_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
EMAIL_PATTERN = 'email_pattern'
NUM_RETRIES = 3
DELAY_TIME = 10
......@@ -55,11 +57,13 @@ class BigQueryTableUsageExtractor(Extractor):
BigQueryTableUsageExtractor.PAGE_SIZE_KEY,
BigQueryTableUsageExtractor.DEFAULT_PAGE_SIZE)
self.email_pattern = conf.get_string(BigQueryTableUsageExtractor.EMAIL_PATTERN, None)
self.table_usage_counts = {}
self._count_usage()
self.iter = iter(self.table_usage_counts)
def _count_usage(self):
def _count_usage(self): # noqa: C901
# type: () -> None
count = 0
for entry in self._retrieve_records():
......@@ -89,6 +93,12 @@ class BigQueryTableUsageExtractor(Extractor):
# https://cloud.google.com/logging/docs/reference/audit/bigquery/rest/Shared.Types/AuditData#JobStatistics
continue
# If an email filter is provided, only usage from emails matching the filter is recorded.
if self.email_pattern:
if not re.match(self.email_pattern, email):
# The usage account does not match the email pattern.
continue
numTablesProcessed = job['jobStatistics']['totalTablesProcessed']
if len(refTables) != numTablesProcessed:
LOGGER.warn('The number of tables listed in job {job_id} is not consistent'
......
from setuptools import setup, find_packages
__version__ = '1.3.4'
__version__ = '1.3.5'
setup(
......
......@@ -288,3 +288,48 @@ class TestBigqueryUsageExtractor(unittest.TestCase):
result = extractor.extract()
self.assertIsNone(result)
@patch('databuilder.extractor.bigquery_usage_extractor.build')
def test_email_filter_not_counted(self, mock_build):
    """
    With the email pattern set to 'emailFilter', extract() is expected
    to return None for the CORRECT_DATA fixture.
    """
    # The patched ``build`` hands back a MockLoggingClient wrapping CORRECT_DATA.
    mock_build.return_value = MockLoggingClient(CORRECT_DATA)

    # Both settings live under the same scoped key prefix.
    key_template = 'extractor.bigquery_table_usage.{}'
    project_id_key = key_template.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY)
    email_pattern_key = key_template.format(BigQueryTableUsageExtractor.EMAIL_PATTERN)
    job_conf = ConfigFactory.from_dict({
        project_id_key: 'your-project-here',
        email_pattern_key: 'emailFilter',
    })

    usage_extractor = BigQueryTableUsageExtractor()
    scoped_conf = Scoped.get_scoped_conf(conf=job_conf,
                                         scope=usage_extractor.get_scope())
    usage_extractor.init(scoped_conf)

    self.assertIsNone(usage_extractor.extract())
@patch('databuilder.extractor.bigquery_usage_extractor.build')
def test_email_filter_counted(self, mock_build):
    """
    With the email pattern set to '.*@test.com.*', extract() is expected
    to return a (TableColumnUsageTuple, int) pair for the CORRECT_DATA
    fixture, with a count of 1.
    """
    # The patched ``build`` hands back a MockLoggingClient wrapping CORRECT_DATA.
    mock_build.return_value = MockLoggingClient(CORRECT_DATA)

    # Both settings live under the same scoped key prefix.
    key_template = 'extractor.bigquery_table_usage.{}'
    project_id_key = key_template.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY)
    email_pattern_key = key_template.format(BigQueryTableUsageExtractor.EMAIL_PATTERN)
    job_conf = ConfigFactory.from_dict({
        project_id_key: 'your-project-here',
        email_pattern_key: '.*@test.com.*',
    })

    usage_extractor = BigQueryTableUsageExtractor()
    scoped_conf = Scoped.get_scoped_conf(conf=job_conf,
                                         scope=usage_extractor.get_scope())
    usage_extractor.init(scoped_conf)

    record = usage_extractor.extract()
    self.assertIsInstance(record, tuple)

    usage_key, usage_count = record
    self.assertIsInstance(usage_key, TableColumnUsageTuple)
    self.assertIsInstance(usage_count, int)

    # Check the key field by field, in the same order as before.
    expected_fields = (
        ('database', 'bigquery'),
        ('cluster', 'bigquery-public-data'),
        ('schema', 'austin_incidents'),
        ('table', 'incidents_2008'),
        ('email', 'your-user-here@test.com'),
    )
    for field_name, expected_value in expected_fields:
        self.assertEqual(getattr(usage_key, field_name), expected_value)

    self.assertEqual(usage_count, 1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment