Unverified Commit 9cd1a841 authored by Tao Feng's avatar Tao Feng Committed by GitHub

Add delay handling in BQ usage extractor (#89)

parent 822bd16b
from collections import namedtuple from collections import namedtuple
from datetime import date, timedelta from datetime import date, timedelta
import logging import logging
from time import sleep
import google.oauth2.service_account import google.oauth2.service_account
import google_auth_httplib2 import google_auth_httplib2
...@@ -30,6 +31,7 @@ class BigQueryTableUsageExtractor(Extractor): ...@@ -30,6 +31,7 @@ class BigQueryTableUsageExtractor(Extractor):
KEY_PATH_KEY = 'key_path' KEY_PATH_KEY = 'key_path'
_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',) _DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
NUM_RETRIES = 3 NUM_RETRIES = 3
DELAY_TIME = 10
def init(self, conf): def init(self, conf):
# type: (ConfigTree) -> None # type: (ConfigTree) -> None
...@@ -52,6 +54,7 @@ class BigQueryTableUsageExtractor(Extractor): ...@@ -52,6 +54,7 @@ class BigQueryTableUsageExtractor(Extractor):
self.pagesize = conf.get_int( self.pagesize = conf.get_int(
BigQueryTableUsageExtractor.PAGE_SIZE_KEY, BigQueryTableUsageExtractor.PAGE_SIZE_KEY,
BigQueryTableUsageExtractor.DEFAULT_PAGE_SIZE) BigQueryTableUsageExtractor.DEFAULT_PAGE_SIZE)
self.table_usage_counts = {} self.table_usage_counts = {}
self._count_usage() self._count_usage()
self.iter = iter(self.table_usage_counts) self.iter = iter(self.table_usage_counts)
...@@ -64,7 +67,11 @@ class BigQueryTableUsageExtractor(Extractor): ...@@ -64,7 +67,11 @@ class BigQueryTableUsageExtractor(Extractor):
if count % self.pagesize == 0: if count % self.pagesize == 0:
LOGGER.info('Aggregated {} records'.format(count)) LOGGER.info('Aggregated {} records'.format(count))
try:
job = entry['protoPayload']['serviceData']['jobCompletedEvent']['job'] job = entry['protoPayload']['serviceData']['jobCompletedEvent']['job']
except Exception:
# Skip the record if the record missing certain fields
continue
if job['jobStatus']['state'] != 'DONE': if job['jobStatus']['state'] != 'DONE':
# This job seems not to have finished yet, so we ignore it. # This job seems not to have finished yet, so we ignore it.
continue continue
...@@ -134,12 +141,16 @@ class BigQueryTableUsageExtractor(Extractor): ...@@ -134,12 +141,16 @@ class BigQueryTableUsageExtractor(Extractor):
while response: while response:
yield response yield response
try:
if 'nextPageToken' in response: if 'nextPageToken' in response:
body['pageToken'] = response['nextPageToken'] body['pageToken'] = response['nextPageToken']
response = self.logging_service.entries().list(body=body).execute( response = self.logging_service.entries().list(body=body).execute(
num_retries=BigQueryTableUsageExtractor.NUM_RETRIES) num_retries=BigQueryTableUsageExtractor.NUM_RETRIES)
else: else:
response = None response = None
except Exception:
# Add a delay when BQ quota exceeds limitation
sleep(BigQueryTableUsageExtractor.DELAY_TIME)
def get_scope(self): def get_scope(self):
# type: () -> str # type: () -> str
......
from setuptools import setup, find_packages from setuptools import setup, find_packages
__version__ = '1.3.0' __version__ = '1.3.1'
setup( setup(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment