Commit bbefcd8f authored by Gerard Toonstra, committed by Tao Feng

Implements big query usage extractor (#48)

parent 112a4f3d
from collections import namedtuple
from datetime import date, timedelta
import logging

import google.auth
import google.oauth2.service_account
import google_auth_httplib2
from googleapiclient.discovery import build
import httplib2
from pyhocon import ConfigTree  # noqa: F401
from typing import Dict, Optional  # noqa: F401

from databuilder.extractor.base_extractor import Extractor

TableColumnUsageTuple = namedtuple('TableColumnUsageTuple', ['database', 'cluster', 'schema',
                                                             'table', 'column', 'email'])

LOGGER = logging.getLogger(__name__)


class BigQueryTableUsageExtractor(Extractor):
    """
    An aggregate extractor for BigQuery table usage. It pulls job-completed entries
    from the Stackdriver Logging API, filtering on timestamp and bigquery_resource,
    and aggregates the referencedTables found in each entry.
    """
    TIMESTAMP_KEY = 'timestamp'
    PROJECT_ID_KEY = 'project_id'
    DEFAULT_PAGE_SIZE = 300
    PAGE_SIZE_KEY = 'page_size'
    KEY_PATH_KEY = 'key_path'
    _DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
    NUM_RETRIES = 3

    def init(self, conf):
        # type: (ConfigTree) -> None
        self.key_path = conf.get_string(BigQueryTableUsageExtractor.KEY_PATH_KEY, None)
        if self.key_path:
            credentials = (
                google.oauth2.service_account.Credentials.from_service_account_file(
                    self.key_path, scopes=BigQueryTableUsageExtractor._DEFAULT_SCOPES))
        else:
            credentials, _ = google.auth.default(scopes=BigQueryTableUsageExtractor._DEFAULT_SCOPES)

        http = httplib2.Http()
        authed_http = google_auth_httplib2.AuthorizedHttp(credentials, http=http)
        self.logging_service = build('logging', 'v2', http=authed_http, cache_discovery=False)

        self.timestamp = conf.get_string(
            BigQueryTableUsageExtractor.TIMESTAMP_KEY,
            (date.today() - timedelta(days=1)).strftime('%Y-%m-%dT00:00:00Z'))
        self.projectid = conf.get_string(BigQueryTableUsageExtractor.PROJECT_ID_KEY)
        self.pagesize = conf.get_int(
            BigQueryTableUsageExtractor.PAGE_SIZE_KEY,
            BigQueryTableUsageExtractor.DEFAULT_PAGE_SIZE)

        self.table_usage_counts = {}
        self._count_usage()
        self.iter = iter(self.table_usage_counts)

    def _count_usage(self):
        # type: () -> None
        count = 0
        for entry in self._retrieve_records():
            count += 1
            if count % self.pagesize == 0:
                LOGGER.info('Aggregated {} records'.format(count))

            job = entry['protoPayload']['serviceData']['jobCompletedEvent']['job']
            if job['jobStatus']['state'] != 'DONE':
                # This job seems not to have finished yet, so we ignore it.
                continue
            if len(job['jobStatus'].get('error', {})) > 0:
                # This job has errors, so we ignore it.
                continue

            email = entry['protoPayload']['authenticationInfo']['principalEmail']

            refTables = job['jobStatistics'].get('referencedTables', None)
            if not refTables:
                # Query results can be cached and if the source tables remain untouched,
                # bigquery will return it from a 24 hour cache result instead. In that
                # case, referencedTables has been observed to be empty:
                # https://cloud.google.com/logging/docs/reference/audit/bigquery/rest/Shared.Types/AuditData#JobStatistics
                continue

            numTablesProcessed = job['jobStatistics']['totalTablesProcessed']
            if len(refTables) != numTablesProcessed:
                LOGGER.warning('The number of tables listed in job {job_id} is not consistent'
                               .format(job_id=job['jobName']['jobId']))

            for refTable in refTables:
                key = TableColumnUsageTuple(database='bigquery',
                                            cluster=refTable['projectId'],
                                            schema=refTable['datasetId'],
                                            table=refTable['tableId'],
                                            column='*',
                                            email=email)
                new_count = self.table_usage_counts.get(key, 0) + 1
                self.table_usage_counts[key] = new_count

    def _retrieve_records(self):
        # type: () -> Optional[Dict]
        """
        Extracts bigquery log data by looking at the principalEmail in the
        authenticationInfo block and referencedTables in the jobStatistics.

        :return: Provides a record, or None if there are no more to extract
        """
        body = {
            'resourceNames': [
                'projects/{projectid}'.format(projectid=self.projectid)
            ],
            'pageSize': self.pagesize,
            'filter': 'resource.type="bigquery_resource" AND '
                      'protoPayload.methodName="jobservice.jobcompleted" AND '
                      'timestamp >= "{timestamp}"'.format(timestamp=self.timestamp)
        }
        for page in self._page_over_results(body):
            for entry in page['entries']:
                yield entry

    def extract(self):
        # type: () -> Optional[tuple]
        try:
            key = next(self.iter)
            return key, self.table_usage_counts[key]
        except StopIteration:
            return None

    def _page_over_results(self, body):
        # type: (Dict) -> Optional[Dict]
        response = self.logging_service.entries().list(body=body).execute(
            num_retries=BigQueryTableUsageExtractor.NUM_RETRIES)
        while response:
            yield response
            if 'nextPageToken' in response:
                body['pageToken'] = response['nextPageToken']
                response = self.logging_service.entries().list(body=body).execute(
                    num_retries=BigQueryTableUsageExtractor.NUM_RETRIES)
            else:
                response = None

    def get_scope(self):
        # type: () -> str
        return 'extractor.bigquery_table_usage'
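
For reference, a minimal standalone sketch of driving the extractor outside of a full databuilder job is shown below. It assumes that application default credentials are available (otherwise a service account key would be passed via KEY_PATH_KEY), and the project id and timestamp values are placeholders; the config keys are the ones defined on the class above.

from pyhocon import ConfigFactory

from databuilder.extractor.bigquery_usage_extractor import BigQueryTableUsageExtractor

# Placeholder values; init() requires PROJECT_ID_KEY and immediately queries the
# Logging API and aggregates usage counts.
conf = ConfigFactory.from_dict({
    BigQueryTableUsageExtractor.PROJECT_ID_KEY: 'your-project-here',
    # Optional: only consider jobs completed at or after this timestamp
    # (defaults to the start of the previous day).
    BigQueryTableUsageExtractor.TIMESTAMP_KEY: '2019-01-01T00:00:00Z',
})

extractor = BigQueryTableUsageExtractor()
extractor.init(conf)

# extract() returns (TableColumnUsageTuple, count) tuples until exhausted, then None.
record = extractor.extract()
while record:
    print(record)
    record = extractor.extract()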
from pyhocon import ConfigTree  # noqa: F401
from typing import Dict, Optional  # noqa: F401

from databuilder.transformer.base_transformer import Transformer
from databuilder.models.table_column_usage import ColumnReader, TableColumnUsage
from databuilder.extractor.bigquery_usage_extractor import TableColumnUsageTuple


class BigqueryUsageTransformer(Transformer):

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Transformer to convert TableColumnUsageTuple data to bigquery usage data
        which can be uploaded to Neo4j
        """
        self.conf = conf

    def transform(self, record):
        # type: (Dict) -> Optional[TableColumnUsage]
        if not record:
            return None

        (key, count) = record

        if not isinstance(key, TableColumnUsageTuple):
            raise Exception("BigqueryUsageTransformer expects record of type TableColumnUsageTuple")

        col_readers = []
        col_readers.append(ColumnReader(database=key.database,
                                        cluster=key.cluster,
                                        schema=key.schema,
                                        table=key.table,
                                        column=key.column,
                                        user_email=key.email,
                                        read_count=count))
        return TableColumnUsage(col_readers=col_readers)

    def get_scope(self):
        # type: () -> str
        return 'transformer.bigquery_usage'
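
To illustrate the hand-off between the two pieces: each record the extractor emits is a (TableColumnUsageTuple, count) pair, and the transformer wraps it into a TableColumnUsage holding a single ColumnReader. A small sketch with made-up values:

from pyhocon import ConfigFactory

from databuilder.extractor.bigquery_usage_extractor import TableColumnUsageTuple
from databuilder.transformer.bigquery_usage_transformer import BigqueryUsageTransformer

transformer = BigqueryUsageTransformer()
transformer.init(ConfigFactory.from_dict({}))

# A record in the shape produced by BigQueryTableUsageExtractor.extract();
# the project, dataset, table and email values here are made up.
record = (TableColumnUsageTuple(database='bigquery',
                                cluster='your-project-here',
                                schema='dataset',
                                table='table',
                                column='*',
                                email='your-user-here@test.com'),
          305)

usage = transformer.transform(record)  # TableColumnUsage with one ColumnReader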
"""
This is a example script for extracting BigQuery usage results
"""
import logging
from pyhocon import ConfigFactory
import sqlite3
from databuilder.extractor.bigquery_usage_extractor import BigQueryTableUsageExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.bigquery_usage_transformer import BigqueryUsageTransformer
logging.basicConfig(level=logging.INFO)
# replace localhost with docker host ip
# todo: get the ip from input argument
NEO4J_ENDPOINT = 'bolt://localhost:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
def create_connection(db_file):
try:
conn = sqlite3.connect(db_file)
return conn
except Exception:
logging.exception('exception')
return None
# todo: Add a second model
def create_bq_job(metadata_type, gcloud_project):
tmp_folder = '/var/tmp/amundsen/{metadata_type}'.format(metadata_type=metadata_type)
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
bq_usage_extractor = BigQueryTableUsageExtractor()
csv_loader = FsNeo4jCSVLoader()
task = DefaultTask(extractor=bq_usage_extractor,
loader=csv_loader,
transformer=BigqueryUsageTransformer())
job_config = ConfigFactory.from_dict({
'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY):
gcloud_project,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
relationship_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR):
True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
'unique_tag', # should use unique tag here like {ds}
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
return job
if __name__ == "__main__":
# start table job
job1 = create_bq_job('bigquery_usage', 'your-project-here')
job1.launch()
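
The script above only sets the extractor's project id. The other options defined on BigQueryTableUsageExtractor (key_path, timestamp, page_size) can be supplied through the same job config. A small sketch, with placeholder values, of a config tree that could be merged into job_config before building the job:

from pyhocon import ConfigFactory

from databuilder.extractor.bigquery_usage_extractor import BigQueryTableUsageExtractor

# Placeholder service-account key path and timestamp; both keys are optional and
# are defined on BigQueryTableUsageExtractor.
extra_conf = ConfigFactory.from_dict({
    'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.KEY_PATH_KEY):
        '/path/to/service_account_key.json',
    'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.TIMESTAMP_KEY):
        '2019-01-01T00:00:00Z',
})
# e.g. job_config = extra_conf.with_fallback(job_config) before constructing DefaultJob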
@@ -54,3 +54,12 @@ pytz==2018.4
antlr4-python2-runtime==4.7.1
statsd==3.2.1
retrying==1.3.3
# Python API client for google
# License: Apache Software License
# Upstream url: https://github.com/googleapis/google-api-python-client
google-api-python-client>=1.6.0, <2.0.0dev
google-auth-httplib2>=0.0.1
google-auth>=1.0.0, <2.0.0dev
httplib2~=0.9.2
import unittest

from pyhocon import ConfigFactory

from databuilder.transformer.bigquery_usage_transformer import BigqueryUsageTransformer
from databuilder.extractor.bigquery_usage_extractor import TableColumnUsageTuple
from databuilder.models.table_column_usage import TableColumnUsage


class TestBigQueryUsageTransform(unittest.TestCase):
    DATABASE = 'bigquery'
    CLUSTER = 'your-project-here'
    DATASET = 'dataset'
    TABLE = 'table'
    COLUMN = '*'
    EMAIL = 'your-user-here@test.com'
    READ_COUNT = 305

    def test_transform_function(self):
        # type: () -> None
        config = ConfigFactory.from_dict({})
        transformer = BigqueryUsageTransformer()
        transformer.init(config)

        key = TableColumnUsageTuple(database=TestBigQueryUsageTransform.DATABASE,
                                    cluster=TestBigQueryUsageTransform.CLUSTER,
                                    schema=TestBigQueryUsageTransform.DATASET,
                                    table=TestBigQueryUsageTransform.TABLE,
                                    column=TestBigQueryUsageTransform.COLUMN,
                                    email=TestBigQueryUsageTransform.EMAIL)
        t1 = (key, TestBigQueryUsageTransform.READ_COUNT)

        xformed = transformer.transform(t1)
        self.assertIsInstance(xformed, TableColumnUsage)
        self.assertEqual(len(xformed.col_readers), 1)

        col_reader = xformed.col_readers[0]
        self.assertEqual(col_reader.cluster, TestBigQueryUsageTransform.CLUSTER)
        self.assertEqual(col_reader.database, TestBigQueryUsageTransform.DATABASE)
        self.assertEqual(col_reader.schema, TestBigQueryUsageTransform.DATASET)
        self.assertEqual(col_reader.table, TestBigQueryUsageTransform.TABLE)
        self.assertEqual(col_reader.column, TestBigQueryUsageTransform.COLUMN)
        self.assertEqual(col_reader.user_email, TestBigQueryUsageTransform.EMAIL)
        self.assertEqual(col_reader.read_count, TestBigQueryUsageTransform.READ_COUNT)

    def test_scope(self):
        config = ConfigFactory.from_dict({})
        transformer = BigqueryUsageTransformer()
        transformer.init(config)
        self.assertEqual(transformer.get_scope(), 'transformer.bigquery_usage')