# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
import urllib.parse
from dataclasses import dataclass
from typing import Any, Dict, List
from amundsen_common.models.dashboard import DashboardSummary, DashboardSummarySchema
from amundsen_common.models.popular_table import PopularTable, PopularTableSchema
from amundsen_common.models.table import Table, TableSchema
from amundsen_application.models.user import load_user, dump_user
from amundsen_application.config import MatchRuleObject
from flask import current_app as app
import re
@dataclass
class TableUri:
    """
    Structured form of a table key shaped like '{database}://{cluster}.{schema}/{table}'.
    """
    database: str
    cluster: str
    schema: str
    table: str

    def __str__(self) -> str:
        """
        Render the components back into the string key form.
        """
        return f"{self.database}://{self.cluster}.{self.schema}/{self.table}"

    @classmethod
    def from_uri(cls, uri: str) -> 'TableUri':
        """
        Parse a table key string into its components.
        :param uri: string shaped like '{database}://{cluster}.{schema}/{table}'
        :return: an instance of the class this method is called on
        :raises ValueError: if the host part of the uri has no '.' separating cluster from schema
        """
        parsed = urllib.parse.urlparse(uri)
        # Split on the last dot only, so that a cluster name may itself contain dots
        cluster, separator, schema = parsed.netloc.rpartition('.')
        if not separator:
            raise ValueError(f'Malformed table URI, expected "database://cluster.schema/table": {uri}')
        # Build through cls (not TableUri) so subclasses get instances of their own type
        return cls(
            database=parsed.scheme,
            cluster=cluster,
            schema=schema,
            table=parsed.path.lstrip('/')
        )
def marshall_table_partial(table_dict: Dict) -> Dict:
    """
    Produce the abbreviated table Dict: the fields kept by PopularTableSchema,
    supplemented with 'key', 'last_updated_timestamp' and 'type'.
    :param table_dict: Dict of partial table object
    :return: partial table Dict

    TODO - Unify data format returned by search and metadata.
    """
    popular_schema = PopularTableSchema(strict=True)
    # TODO: consider migrating to validate() instead of roundtripping
    popular_table: PopularTable = popular_schema.load(table_dict).data
    partial = popular_schema.dump(popular_table).data

    # TODO: Add the 'key' or 'id' to the base PopularTableSchema
    table_key = f'{popular_table.database}://{popular_table.cluster}.{popular_table.schema}/{popular_table.name}'
    partial['key'] = table_key
    # TODO: fix popular tables to provide these? remove if we're not using them?
    partial['last_updated_timestamp'] = None
    partial['type'] = 'table'
    return partial
def _parse_editable_rule(rule: MatchRuleObject,
schema: str,
table: str) -> bool:
"""
Matches table name and schema with corresponding regex in matching rule
:parm rule: MatchRuleObject defined in list UNEDITABLE_TABLE_DESCRIPTION_MATCH_RULES in config file
:parm schema: schema name from Table Dict received from metadata service
:parm table: table name from Table Dict received from metadata service
:return: boolean which determines if table desc is editable or not for given table as per input matching rule
"""
if rule.schema_regex and rule.table_name_regex:
match_schema = re.match(rule.schema_regex, schema)
match_table = re.match(rule.table_name_regex, table)
return not (match_schema and match_table)
if rule.schema_regex:
return not re.match(rule.schema_regex, schema)
if rule.table_name_regex:
return not re.match(rule.table_name_regex, table)
return True
def is_table_editable(schema_name: str, table_name: str, cfg: Any = None) -> bool:
    """
    Decides whether the description of the given table may be edited
    :param schema_name: schema name of the table
    :param table_name: name of the table
    :param cfg: config mapping to consult; the flask app config is used when None
    :return: False if the schema is listed in UNEDITABLE_SCHEMAS or any rule in
        UNEDITABLE_TABLE_DESCRIPTION_MATCH_RULES rejects the table, True otherwise
    """
    config = app.config if cfg is None else cfg

    if schema_name in config['UNEDITABLE_SCHEMAS']:
        return False

    # Editable only if every match rule lets the table through
    return all(
        _parse_editable_rule(rule, schema_name, table_name)
        for rule in config['UNEDITABLE_TABLE_DESCRIPTION_MATCH_RULES']
    )
def marshall_table_full(table_dict: Dict) -> Dict:
    """
    Builds the complete table Dict: the input is round-tripped through TableSchema and then
    supplemented with editable flags, mapped user objects, sorted column stats, a 'key',
    partition data and organized programmatic descriptions
    :param table_dict: Table Dict from metadata service
    :return: Table Dict with sanitized fields
    """
    table_schema = TableSchema(strict=True)
    # TODO: consider migrating to validate() instead of roundtripping
    table: Table = table_schema.load(table_dict).data
    results: Dict[str, Any] = table_schema.dump(table).data

    editable = is_table_editable(results['schema'], results['name'])
    results['is_editable'] = editable

    # TODO - Cleanup https://github.com/lyft/amundsen/issues/296
    #  This code will try to supplement some missing data since the data here is incomplete.
    #  Once the metadata service response provides complete user objects we can remove this.
    results['owners'] = list(map(_map_user_object_to_schema, results['owners']))
    for reader in results['table_readers']:
        reader['user'] = _map_user_object_to_schema(reader['user'])

    def _stat_rank(stat):
        # A stat_type that isn't defined in COLUMN_STAT_ORDER uses the size of the config as its index
        stat_order = app.config['COLUMN_STAT_ORDER']
        return stat_order.get(stat['stat_type'], len(stat_order))

    for column in results['columns']:
        # The columns share the editable state of the table
        column['is_editable'] = editable
        # Stats are only re-ordered when an order has been configured
        if app.config['COLUMN_STAT_ORDER']:
            column['stats'].sort(key=_stat_rank)

    # TODO: Add the 'key' or 'id' to the base TableSchema
    results['key'] = f'{table.database}://{table.cluster}.{table.schema}/{table.name}'
    # Temp code to make 'partition_key' and 'partition_value' part of the table
    results['partition'] = _get_partition_data(results['watermarks'])

    # The programmatic descriptions are arranged in the same spirit as the column stats
    results['programmatic_descriptions'] = _convert_prog_descriptions(results['programmatic_descriptions'])

    return results
def marshall_dashboard_partial(dashboard_dict: Dict) -> Dict:
    """
    Produce the abbreviated dashboard Dict: the fields kept by DashboardSummarySchema,
    supplemented with 'type' and 'key'
    :param dashboard_dict: Dict of partial dashboard metadata
    :return: partial dashboard Dict
    """
    summary_schema = DashboardSummarySchema(strict=True)
    summary: DashboardSummary = summary_schema.load(dashboard_dict).data
    partial = summary_schema.dump(summary).data

    partial['type'] = 'dashboard'
    # TODO: Bookmark logic relies on key, opting to add this here to avoid messy logic in
    #  React app and we have to clean up later.
    dashboard_uri = partial.get('uri', '')
    partial['key'] = dashboard_uri
    return partial
def marshall_dashboard_full(dashboard_dict: Dict) -> Dict:
"""
Cleanup some fields in the dashboard response
:param dashboard_dict: Dashboard response from metadata service.
:return: Dashboard dictionary with sanitized fields, particularly the tables and owners.
"""
# TODO - Cleanup https://github.com/lyft/amundsen/issues/296
# This code will try to supplement some missing data since the data here is incomplete.
# Once the metadata service response provides complete user objects we can remove this.
dashboard_dict['owners'] = [_map_user_object_to_schema(owner) for owner in dashboard_dict['owners']]
dashboard_dict['tables'] = [marshall_table_partial(table) for table in dashboard_dict['tables']]
return dashboard_dict
def _convert_prog_descriptions(prog_descriptions: List = None) -> Dict:
"""
Apply the PROGRAMMATIC_DISPLAY configuration to convert to the structure.
:param prog_descriptions: A list of objects representing programmatic descriptions
:return: A dictionary with organized programmatic_descriptions
"""
left = [] # type: List
right = [] # type: List
other = prog_descriptions or [] # type: List
updated_descriptions = {}
if prog_descriptions:
# We want to make sure there is a display title that is just source
for desc in prog_descriptions:
source = desc.get('source')
if not source:
logging.warning("no source found in: " + str(desc))
# If config is defined for programmatic disply we organize and sort them based on the configuration
prog_display_config = app.config['PROGRAMMATIC_DISPLAY']
if prog_display_config:
left_config = prog_display_config.get('LEFT', {})
left = [x for x in prog_descriptions if x.get('source') in left_config]
left.sort(key=lambda x: _sort_prog_descriptions(left_config, x))
right_config = prog_display_config.get('RIGHT', {})
right = [x for x in prog_descriptions if x.get('source') in right_config]
right.sort(key=lambda x: _sort_prog_descriptions(right_config, x))
other_config = dict(filter(lambda x: x not in ['LEFT', 'RIGHT'], prog_display_config.items()))
other = list(filter(lambda x: x.get('source') not in left_config and x.get('source')
not in right_config, prog_descriptions))
other.sort(key=lambda x: _sort_prog_descriptions(other_config, x))
updated_descriptions['left'] = left
updated_descriptions['right'] = right
updated_descriptions['other'] = other
return updated_descriptions
def _sort_prog_descriptions(base_config: Dict, prog_description: Dict) -> int:
default_order = len(base_config)
prog_description_source = prog_description.get('source')
config_dict = base_config.get(prog_description_source)
if config_dict:
return config_dict.get('display_order', default_order)
return default_order
def _map_user_object_to_schema(u: Dict) -> Dict:
    """
    Round-trips a user Dict through load_user and dump_user
    :param u: user Dict
    :return: the Dict produced by dump_user for the loaded user
    """
    loaded_user = load_user(u)
    return dump_user(loaded_user)
def _get_partition_data(watermarks: Dict) -> Dict:
if watermarks:
high_watermark = next(filter(lambda x: x['watermark_type'] == 'high_watermark', watermarks))
if high_watermark:
return {
'is_partitioned': True,
'key': high_watermark['partition_key'],
'value': high_watermark['partition_value']
}
return {
'is_partitioned': False
}