Unverified commit 46207eee authored by Carter Landis, committed by GitHub

feat: Add Tableau dashboard metadata extractors (#333)

Signed-off-by: Tao Feng <fengtao04@gmail.com>
parent 60c837d5
### [TableauDashboardExtractor](./databuilder/extractor/dashboard/tableau/tableau_dashboard_extractor.py)
The included `TableauDashboardExtractor` provides support for extracting basic metadata for Tableau workbooks. All Tableau extractors, including this one, use the [Tableau Metadata GraphQL API](https://help.tableau.com/current/api/metadata_api/en-us/index.html) to gather the metadata. Tableau "workbooks" are mapped to Amundsen dashboards, and the top-level project in which these workbooks reside is the dashboard group. The metadata it gathers is as follows:
- Dashboard name (Workbook name)
- Dashboard description (Workbook description)
- Dashboard creation timestamp (Workbook creation timestamp)
- Dashboard group name (Workbook top-level folder name)
- Dashboard and dashboard group URL
If you wish to exclude top-level projects from being loaded, specify their names in the `tableau_excluded_projects` list and workbooks from any of those projects will not be indexed.
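For example, to skip two hypothetical top-level projects:
```python
# Workbooks in these (hypothetical) top-level projects will not be indexed.
tableau_excluded_projects = ['Archived', 'Sandbox']
```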
Tableau's concept of "owners" does not map cleanly onto Amundsen's understanding of owners: the owner of a Tableau workbook is simply whoever updated it last, even if they made only a trivial change. This makes it hard to determine the true point of contact for a workbook, so ownership is simply omitted for now. Similarly, the hierarchy of `dashboard/query/chart` in Amundsen does not map onto Tableau, where `charts` have only an optional relation to queries and vice versa. For these reasons, there are no extractors for either entity.
The Tableau Metadata API also does not support usage or execution statistics, so there are no extractors for these entities either.
Sample job config:
```python
extractor = TableauDashboardExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())
job_config = ConfigFactory.from_dict({
'extractor.tableau_dashboard_metadata.tableau_host': tableau_host,
'extractor.tableau_dashboard_metadata.api_version': tableau_api_version,
'extractor.tableau_dashboard_metadata.site_name': tableau_site_name,
'extractor.tableau_dashboard_metadata.tableau_personal_access_token_name': tableau_personal_access_token_name,
'extractor.tableau_dashboard_metadata.tableau_personal_access_token_secret': tableau_personal_access_token_secret,
'extractor.tableau_dashboard_metadata.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_metadata.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_metadata.database': tableau_dashboard_database,
'extractor.tableau_dashboard_metadata.transformer.timestamp_str_to_epoch.timestamp_format': "%Y-%m-%dT%H:%M:%SZ",
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```
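The `timestamp_str_to_epoch` transformer configured above converts Tableau's ISO-8601 creation timestamps into epoch seconds. Conceptually, it does something like the following (a minimal sketch using the standard library, not the transformer's actual implementation):
```python
from datetime import datetime, timezone

# Parse a Tableau-style timestamp with the configured format string and
# convert it to epoch seconds, the value ultimately stored on the dashboard.
created = datetime.strptime('2020-04-08T05:32:01Z', '%Y-%m-%dT%H:%M:%SZ')
epoch_seconds = int(created.replace(tzinfo=timezone.utc).timestamp())
print(epoch_seconds)  # 1586323921
```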
### [TableauDashboardTableExtractor](./databuilder/extractor/dashboard/tableau/tableau_dashboard_table_extractor.py)
The included `TableauDashboardTableExtractor` provides support for extracting table metadata from Tableau workbooks. The extractor assumes all the table entities have already been created; if you are interested in using the provided `TableauExternalTableExtractor`, make sure that job runs before this one, as it creates the tables required by this job (see the run-order sketch after the sample config below). It also assumes that the dashboards use their names as the primary ID.
A sample job config is shown below. Configuration related to the loader and publisher is omitted, as it is mostly the same for all Tableau extractors; see the [TableauDashboardExtractor example](#TableauDashboardExtractor) above for the loader and publisher configuration.
```python
extractor = TableauDashboardTableExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())
job_config = ConfigFactory.from_dict({
'extractor.tableau_dashboard_table.tableau_host': tableau_host,
'extractor.tableau_dashboard_table.api_version': tableau_api_version,
'extractor.tableau_dashboard_table.site_name': tableau_site_name,
'extractor.tableau_dashboard_table.tableau_personal_access_token_name': tableau_personal_access_token_name,
'extractor.tableau_dashboard_table.tableau_personal_access_token_secret': tableau_personal_access_token_secret,
'extractor.tableau_dashboard_table.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_table.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_table.database': tableau_dashboard_database,
'extractor.tableau_dashboard_table.external_cluster_name': tableau_external_table_cluster,
'extractor.tableau_dashboard_table.external_schema_name': tableau_external_table_schema,
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```
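Run order matters here: the sample script included in this changeset launches the external table job before this one for exactly this reason.
```python
# From the sample script's __main__ block: the external table nodes are
# created before the dashboard <> table relationships that point at them.
run_tableau_external_table_job()
run_tableau_table_job()
```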
### [TableauDashboardQueryExtractor](./databuilder/extractor/dashboard/tableau/tableau_dashboard_query_extractor.py)
The included `TableauDashboardQueryExtractor` provides support for extracting query metadata from Tableau workbooks. It retrieves the name and query text for each custom SQL query.
A sample job config is shown below. Configuration related to the loader and publisher is omitted, as it is mostly the same for all Tableau extractors; see the [TableauDashboardExtractor example](#TableauDashboardExtractor) above for the loader and publisher configuration.
```python
extractor = TableauDashboardQueryExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())
job_config = ConfigFactory.from_dict({
'extractor.tableau_dashboard_query.tableau_host': tableau_host,
'extractor.tableau_dashboard_query.api_version': tableau_api_version,
'extractor.tableau_dashboard_query.site_name': tableau_site_name,
'extractor.tableau_dashboard_query.tableau_personal_access_token_name': tableau_personal_access_token_name,
'extractor.tableau_dashboard_query.tableau_personal_access_token_secret': tableau_personal_access_token_secret,
'extractor.tableau_dashboard_query.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_query.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_query.database': tableau_dashboard_database,
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```
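For reference, the shape of the record this extractor yields per query/workbook pair follows the GraphQL query it issues (`customSQLTables` with `downstreamWorkbooks`); a rough sketch with illustrative values only:
```python
# Illustrative GraphQL response shape (values are made up) and how the
# extractor flattens it: one record per downstream workbook of each query.
response = {
    'customSQLTables': [{
        'id': 'abc-123',
        'name': 'sales_by_quarter',
        'query': 'SELECT * FROM sales',
        'downstreamWorkbooks': [{'name': 'Sales', 'projectName': 'Finance'}],
    }]
}
for query in response['customSQLTables']:
    for workbook in query['downstreamWorkbooks']:
        print(workbook['projectName'], workbook['name'], query['name'])
```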
### [TableauDashboardLastModifiedExtractor](./databuilder/extractor/dashboard/tableau/tableau_dashboard_last_modified_extractor.py)
The included `TableauDashboardLastModifiedExtractor` provides support for extracting the last updated timestamp for Tableau workbooks.
A sample job config is shown below. Configuration related to the loader and publisher is omitted, as it is mostly the same for all Tableau extractors; see the [TableauDashboardExtractor example](#TableauDashboardExtractor) above for the loader and publisher configuration.
```python
extractor = TableauDashboardLastModifiedExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())
job_config = ConfigFactory.from_dict({
'extractor.tableau_dashboard_last_modified.tableau_host': tableau_host,
'extractor.tableau_dashboard_last_modified.api_version': tableau_api_version,
'extractor.tableau_dashboard_last_modified.site_name': tableau_site_name,
'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_name': tableau_personal_access_token_name,
'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_secret': tableau_personal_access_token_secret,
'extractor.tableau_dashboard_last_modified.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_last_modified.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_last_modified.database': tableau_dashboard_database,
'extractor.tableau_dashboard_last_modified.transformer.timestamp_str_to_epoch.timestamp_format': "%Y-%m-%dT%H:%M:%SZ",
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```
### [TableauExternalTableExtractor](./databuilder/extractor/dashboard/tableau/tableau_external_table_extractor.py)
The included `TableauExternalTableExtractor` provides support for extracting external table entities referenced by Tableau workbooks. In this context, "external" tables are "tables" that are not from a typical database, and are loaded using some other data format, like CSV files.
This extractor has been tested with the following types of external tables; feel free to add others, but it's recommended
to test them in a non-production instance first to be safe.
- Excel spreadsheets
- Text files (including CSV files)
- Salesforce connections
- Google Sheets connections
Use the `external_table_types` list config option to specify which external connection types you would like to index;
refer to your Tableau instance for the exact formatting of each connection type string.
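For example, the connection type strings for the external table types listed above look like the following (these particular strings appear in this changeset's extractor code; confirm the exact values against your own Tableau instance):
```python
tableau_external_table_types = ['excel-direct', 'salesforce', 'google-sheets']
```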
Excel spreadsheets, Salesforce connections, and Google Sheets connections are all classified as
"databases" in terms of Tableau's Metadata API, with their "subsheets" forming their "tables" when
present. However, these tables are not assigned a schema, so this extractor uses the name
of the parent sheet as the schema and assigns a new table to each subsheet. The connection type is
always used as the database, and for text files, the schema is set using the `external_schema_name`
config option. Since these external tables are usually named for human consumption only and often
contain a wider range of characters, all inputs are sanitized to remove any problematic
occurrences before they are inserted: see the `sanitize` methods in `TableauDashboardUtils` for specifics.
A more concrete example: if one had a Google Sheet titled "Growth by Region" with 2 subsheets called
"FY19 Report" and "FY20 Report", two tables would be generated with the following keys:
`googlesheets://external.growth_by_region/FY_19_Report`
`googlesheets://external.growth_by_region/FY_20_Report`
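These keys are assembled with the standard Amundsen table key format (as used via `TableMetadata.TABLE_KEY_FORMAT` in this changeset); a minimal sketch of the derivation for the example above:
```python
# The connection type 'google-sheets' loses its hyphen via
# sanitize_database_name, the external cluster name is 'external', and the
# sanitized sheet/subsheet names become the schema and table.
TABLE_KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}'
key = TABLE_KEY_FORMAT.format(db='googlesheets', cluster='external',
                              schema='growth_by_region', tbl='FY_19_Report')
print(key)  # googlesheets://external.growth_by_region/FY_19_Report
```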
A sample job config is shown below. Configuration related to the loader and publisher is omitted, as it is mostly the same for all Tableau extractors; see the [TableauDashboardExtractor example](#TableauDashboardExtractor) above for the loader and publisher configuration.
```python
extractor = TableauExternalTableExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())
job_config = ConfigFactory.from_dict({
'extractor.tableau_external_table.tableau_host': tableau_host,
'extractor.tableau_external_table.api_version': tableau_api_version,
'extractor.tableau_external_table.site_name': tableau_site_name,
'extractor.tableau_external_table.tableau_personal_access_token_name': tableau_personal_access_token_name,
'extractor.tableau_external_table.tableau_personal_access_token_secret': tableau_personal_access_token_secret,
'extractor.tableau_external_table.excluded_projects': tableau_excluded_projects,
'extractor.tableau_external_table.cluster': tableau_dashboard_cluster,
'extractor.tableau_external_table.database': tableau_dashboard_database,
'extractor.tableau_external_table.external_cluster_name': tableau_external_table_cluster,
'extractor.tableau_external_table.external_schema_name': tableau_external_table_schema,
'extractor.tableau_external_table.external_table_types': tableau_external_table_types
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```
## List of transformers
#### [ChainedTransformer](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/transformer/base_transformer.py#L41 "ChainedTransformer")
API_VERSION = 'api_version'
TABLEAU_HOST = 'tableau_host'
SITE_NAME = 'site_name'
TABLEAU_ACCESS_TOKEN_NAME = 'tableau_personal_access_token_name'
TABLEAU_ACCESS_TOKEN_SECRET = 'tableau_personal_access_token_secret'
EXCLUDED_PROJECTS = 'excluded_projects'
EXTERNAL_CLUSTER_NAME = 'external_cluster_name'
EXTERNAL_SCHEMA_NAME = 'external_schema_name'
EXTERNAL_TABLE_TYPES = 'external_table_types'
CLUSTER = 'cluster'
DATABASE = 'database'
VERIFY_REQUEST = 'verify_request'
import logging
from typing import Any, Dict, Iterator, List
from pyhocon import ConfigFactory, ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils import TableauGraphQLApiExtractor,\
TableauDashboardUtils
from databuilder.extractor.restapi.rest_api_extractor import STATIC_RECORD_DICT
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.base_transformer import Transformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.transformer.timestamp_string_to_epoch import TimestampStringToEpoch, FIELD_NAME
LOGGER = logging.getLogger(__name__)
class TableauGraphQLApiMetadataExtractor(TableauGraphQLApiExtractor):
"""
Implements the extraction-time logic for parsing the GraphQL result and transforming into a dict
that fills the DashboardMetadata model. Allows workbooks to be excluded based on their project.
"""
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
TABLEAU_HOST = const.TABLEAU_HOST
def execute(self) -> Iterator[Dict[str, Any]]:
response = self.execute_query()
workbooks_data = [workbook for workbook in response['workbooks']
if workbook['projectName'] not in
self._conf.get_list(TableauGraphQLApiMetadataExtractor.EXCLUDED_PROJECTS)]
for workbook in workbooks_data:
data = {
'dashboard_group': workbook['projectName'],
'dashboard_name': TableauDashboardUtils.sanitize_workbook_name(workbook['name']),
'description': workbook.get('description', ''),
'created_timestamp': workbook['createdAt'],
'dashboard_group_url': 'https://{}/#/projects/{}'.format(
self._conf.get(TableauGraphQLApiMetadataExtractor.TABLEAU_HOST),
workbook['projectVizportalUrlId']
),
'dashboard_url': 'https://{}/#/workbooks/{}/views'.format(
self._conf.get(TableauGraphQLApiMetadataExtractor.TABLEAU_HOST),
workbook['vizportalUrlId']
),
'cluster': self._conf.get_string(TableauGraphQLApiMetadataExtractor.CLUSTER)
}
yield data
class TableauDashboardExtractor(Extractor):
"""
Extracts core metadata about Tableau "dashboards".
For the purposes of this extractor, Tableau "workbooks" are mapped to Amundsen dashboards, and the
top-level project in which these workbooks reside is the dashboard group. The metadata it gathers is:
Dashboard name (Workbook name)
Dashboard description (Workbook description)
Dashboard creation timestamp (Workbook creation timestamp)
Dashboard group name (Workbook top-level folder name)
Uses the Metadata API: https://help.tableau.com/current/api/metadata_api/en-us/index.html
"""
API_VERSION = const.API_VERSION
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
SITE_NAME = const.SITE_NAME
TABLEAU_HOST = const.TABLEAU_HOST
TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
VERIFY_REQUEST = const.VERIFY_REQUEST
def init(self, conf: ConfigTree) -> None:
self._conf = conf
self.query = """query {
workbooks {
id
name
createdAt
description
projectName
projectVizportalUrlId
vizportalUrlId
}
}"""
self._extractor = self._build_extractor()
transformers: List[Transformer] = []
timestamp_str_to_epoch_transformer = TimestampStringToEpoch()
timestamp_str_to_epoch_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, timestamp_str_to_epoch_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict({FIELD_NAME: 'created_timestamp', })))
transformers.append(timestamp_str_to_epoch_transformer)
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.dashboard.dashboard_metadata.DashboardMetadata'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self) -> Any:
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self) -> str:
return 'extractor.tableau_dashboard_metadata'
def _build_extractor(self) -> TableauGraphQLApiMetadataExtractor:
"""
Builds a TableauGraphQLApiMetadataExtractor. All data required can be retrieved with a single GraphQL call.
:return: A TableauGraphQLApiMetadataExtractor that provides core dashboard metadata.
"""
extractor = TableauGraphQLApiMetadataExtractor()
tableau_extractor_conf = \
Scoped.get_scoped_conf(self._conf, extractor.get_scope())\
.with_fallback(self._conf)\
.with_fallback(ConfigFactory.from_dict({TableauGraphQLApiExtractor.QUERY: self.query,
STATIC_RECORD_DICT: {'product': 'tableau'}
}
)
)
extractor.init(conf=tableau_extractor_conf)
return extractor
import logging
from typing import Any, Dict, Iterator, List
from pyhocon import ConfigFactory, ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils import TableauGraphQLApiExtractor,\
TableauDashboardUtils
from databuilder.extractor.restapi.rest_api_extractor import STATIC_RECORD_DICT
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.base_transformer import Transformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.transformer.timestamp_string_to_epoch import TimestampStringToEpoch, FIELD_NAME
LOGGER = logging.getLogger(__name__)
class TableauGraphQLApiLastModifiedExtractor(TableauGraphQLApiExtractor):
"""
Implements the extraction-time logic for parsing the GraphQL result and transforming into a dict
that fills the DashboardLastModifiedTimestamp model. Allows workbooks to be excluded based on their project.
"""
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
def execute(self) -> Iterator[Dict[str, Any]]:
response = self.execute_query()
workbooks_data = [workbook for workbook in response['workbooks']
if workbook['projectName'] not in
self._conf.get_list(TableauGraphQLApiLastModifiedExtractor.EXCLUDED_PROJECTS)]
for workbook in workbooks_data:
data = {
'dashboard_group_id': workbook['projectName'],
'dashboard_id': TableauDashboardUtils.sanitize_workbook_name(workbook['name']),
'last_modified_timestamp': workbook['updatedAt'],
'cluster': self._conf.get_string(TableauGraphQLApiLastModifiedExtractor.CLUSTER)
}
yield data
class TableauDashboardLastModifiedExtractor(Extractor):
"""
Extracts metadata about the time of last update for Tableau dashboards.
For the purposes of this extractor, Tableau "workbooks" are mapped to Amundsen dashboards, and the
top-level project in which these workbooks reside is the dashboard group. The metadata it gathers is:
Dashboard last modified timestamp (Workbook last modified timestamp)
"""
API_VERSION = const.API_VERSION
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
SITE_NAME = const.SITE_NAME
TABLEAU_HOST = const.TABLEAU_HOST
TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
VERIFY_REQUEST = const.VERIFY_REQUEST
def init(self, conf: ConfigTree) -> None:
self._conf = conf
self.query = """query {
workbooks {
id
name
projectName
updatedAt
}
}"""
self._extractor = self._build_extractor()
transformers: List[Transformer] = []
timestamp_str_to_epoch_transformer = TimestampStringToEpoch()
timestamp_str_to_epoch_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, timestamp_str_to_epoch_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict({FIELD_NAME: 'last_modified_timestamp', })))
transformers.append(timestamp_str_to_epoch_transformer)
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS:
'databuilder.models.dashboard.dashboard_last_modified.DashboardLastModifiedTimestamp'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self) -> Any:
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self) -> str:
return 'extractor.tableau_dashboard_last_modified'
def _build_extractor(self) -> TableauGraphQLApiLastModifiedExtractor:
"""
Builds a TableauGraphQLApiLastModifiedExtractor. All data required can be retrieved with a single GraphQL call.
:return: A TableauGraphQLApiLastModifiedExtractor that provides dashboard update metadata.
"""
extractor = TableauGraphQLApiLastModifiedExtractor()
tableau_extractor_conf = \
Scoped.get_scoped_conf(self._conf, extractor.get_scope())\
.with_fallback(self._conf)\
.with_fallback(ConfigFactory.from_dict({TableauGraphQLApiExtractor.QUERY: self.query,
STATIC_RECORD_DICT: {'product': 'tableau'}
}
)
)
extractor.init(conf=tableau_extractor_conf)
return extractor
import logging
from typing import Any, Dict, Iterator
from pyhocon import ConfigFactory, ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils import TableauGraphQLApiExtractor,\
TableauDashboardUtils
from databuilder.extractor.restapi.rest_api_extractor import STATIC_RECORD_DICT
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
LOGGER = logging.getLogger(__name__)
class TableauGraphQLApiQueryExtractor(TableauGraphQLApiExtractor):
"""
Implements the extraction-time logic for parsing the GraphQL result and transforming into a dict
that fills the DashboardQuery model. Allows workbooks to be excluded based on their project.
"""
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
def execute(self) -> Iterator[Dict[str, Any]]:
response = self.execute_query()
for query in response['customSQLTables']:
for workbook in query['downstreamWorkbooks']:
if workbook['projectName'] not in \
self._conf.get_list(TableauGraphQLApiQueryExtractor.EXCLUDED_PROJECTS):
data = {
'dashboard_group_id': workbook['projectName'],
'dashboard_id': TableauDashboardUtils.sanitize_workbook_name(workbook['name']),
'query_name': query['name'],
'query_id': query['id'],
'query_text': query['query'],
'cluster': self._conf.get_string(TableauGraphQLApiQueryExtractor.CLUSTER)
}
yield data
class TableauDashboardQueryExtractor(Extractor):
"""
Extracts metadata about the queries associated with Tableau workbooks.
In terms of Tableau's Metadata API, these queries are called "custom SQL tables".
However, not every workbook uses custom SQL queries, and most are built with a mixture of using the
datasource fields directly and various "calculated" columns.
This extractor iterates through one query at a time, yielding a new relationship for every downstream
workbook that uses the query.
"""
API_VERSION = const.API_VERSION
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
SITE_NAME = const.SITE_NAME
TABLEAU_HOST = const.TABLEAU_HOST
TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
VERIFY_REQUEST = const.VERIFY_REQUEST
def init(self, conf: ConfigTree) -> None:
self._conf = conf
self.query = """query {
customSQLTables {
id
name
query
downstreamWorkbooks {
name
projectName
}
}
}"""
self._extractor = self._build_extractor()
transformers = []
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.dashboard.dashboard_query.DashboardQuery'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self) -> Any:
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self) -> str:
return 'extractor.tableau_dashboard_query'
def _build_extractor(self) -> TableauGraphQLApiQueryExtractor:
"""
Builds a TableauGraphQLApiQueryExtractor. All data required can be retrieved with a single GraphQL call.
:return: A TableauGraphQLApiQueryExtractor that provides dashboard query metadata.
"""
extractor = TableauGraphQLApiQueryExtractor()
tableau_extractor_conf = \
Scoped.get_scoped_conf(self._conf, extractor.get_scope())\
.with_fallback(self._conf)\
.with_fallback(ConfigFactory.from_dict({TableauGraphQLApiExtractor.QUERY: self.query,
STATIC_RECORD_DICT: {'product': 'tableau'}
}
)
)
extractor.init(conf=tableau_extractor_conf)
return extractor
import logging
from typing import Any, Dict, Iterator
from pyhocon import ConfigFactory, ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils import TableauGraphQLApiExtractor,\
TableauDashboardUtils
from databuilder.extractor.restapi.rest_api_extractor import STATIC_RECORD_DICT
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.models.table_metadata import TableMetadata
LOGGER = logging.getLogger(__name__)
class TableauGraphQLDashboardTableExtractor(TableauGraphQLApiExtractor):
"""
Implements the extraction-time logic for parsing the GraphQL result and transforming into a dict
that fills the DashboardTable model. Allows workbooks to be excluded based on their project.
"""
CLUSTER = const.CLUSTER
DATABASE = const.DATABASE
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
EXTERNAL_CLUSTER_NAME = const.EXTERNAL_CLUSTER_NAME
def execute(self) -> Iterator[Dict[str, Any]]:
response = self.execute_query()
workbooks_data = [workbook for workbook in response['workbooks']
if workbook['projectName'] not in
self._conf.get_list(TableauGraphQLDashboardTableExtractor.EXCLUDED_PROJECTS)]
for workbook in workbooks_data:
data = {
'dashboard_group_id': workbook['projectName'],
'dashboard_id': TableauDashboardUtils.sanitize_workbook_name(workbook['name']),
'cluster': self._conf.get_string(TableauGraphQLDashboardTableExtractor.CLUSTER),
'table_ids': []
}
for table in workbook['upstreamTables']:
# external tables have no schema, so they must be parsed differently
# see TableauExternalTableExtractor for more specifics
if table['schema'] != '':
cluster = self._conf.get_string(TableauGraphQLDashboardTableExtractor.CLUSTER)
database = self._conf.get_string(TableauGraphQLDashboardTableExtractor.DATABASE)
# Tableau sometimes incorrectly assigns the "schema" value
# based on how the datasource connection is used in a workbook.
# It will hide the real schema inside the table name, like "real_schema.real_table",
# and set the "schema" value to "wrong_schema". In every case discovered so far, the schema
# key is incorrect, so the "inner" schema from the table name is used instead.
if '.' in table['name']:
schema, name = table['name'].split('.', 1)  # split on the first period only
else:
schema, name = table['schema'], table['name']
schema = TableauDashboardUtils.sanitize_schema_name(schema)
name = TableauDashboardUtils.sanitize_table_name(name)
else:
cluster = self._conf.get_string(TableauGraphQLDashboardTableExtractor.EXTERNAL_CLUSTER_NAME)
database = TableauDashboardUtils.sanitize_database_name(
table['database']['connectionType']
)
schema = TableauDashboardUtils.sanitize_schema_name(table['database']['name'])
name = TableauDashboardUtils.sanitize_table_name(table['name'])
table_id = TableMetadata.TABLE_KEY_FORMAT.format(
db=database,
cluster=cluster,
schema=schema,
tbl=name,
)
data['table_ids'].append(table_id)
yield data
class TableauDashboardTableExtractor(Extractor):
"""
Extracts metadata about the tables associated with Tableau workbooks.
It can handle both "regular" database tables as well as "external" tables
(see TableauExternalTableExtractor for more info on external tables).
Assumes that all the nodes for both the dashboards and the tables have already been created.
"""
API_VERSION = const.API_VERSION
CLUSTER = const.CLUSTER
DATABASE = const.DATABASE
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
EXTERNAL_CLUSTER_NAME = const.EXTERNAL_CLUSTER_NAME
SITE_NAME = const.SITE_NAME
TABLEAU_HOST = const.TABLEAU_HOST
TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
VERIFY_REQUEST = const.VERIFY_REQUEST
def init(self, conf: ConfigTree) -> None:
self._conf = conf
self.query = """query {
workbooks {
name
projectName
upstreamTables {
name
schema
database {
name
connectionType
}
}
}
}"""
self._extractor = self._build_extractor()
transformers = []
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.dashboard.dashboard_table.DashboardTable'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self) -> Any:
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self) -> str:
return 'extractor.tableau_dashboard_table'
def _build_extractor(self) -> TableauGraphQLDashboardTableExtractor:
"""
Builds a TableauGraphQLDashboardTableExtractor. All data required can be retrieved with a single GraphQL call.
:return: A TableauGraphQLDashboardTableExtractor that creates dashboard <> table relationships.
"""
extractor = TableauGraphQLDashboardTableExtractor()
tableau_extractor_conf = \
Scoped.get_scoped_conf(self._conf, extractor.get_scope())\
.with_fallback(self._conf)\
.with_fallback(ConfigFactory.from_dict({TableauGraphQLApiExtractor.QUERY: self.query,
STATIC_RECORD_DICT: {'product': 'tableau'}
}
)
)
extractor.init(conf=tableau_extractor_conf)
return extractor
import json
import requests
import re
from typing import Any, Dict, Iterator, Optional
from pyhocon import ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.restapi.rest_api_extractor import STATIC_RECORD_DICT
class TableauDashboardUtils:
"""
Provides various utility functions specific to the Tableau dashboard extractors.
"""
@staticmethod
def sanitize_schema_name(schema_name: str) -> str:
"""
Sanitizes a given string so that it can safely be used as a table's schema.
Sanitization behaves as follows:
- all spaces and periods are replaced by underscores
- any [], (), -, &, and ? characters are deleted
"""
# this indentation looks a little odd, but otherwise the linter complains
return re.sub(r' ', '_',
re.sub(r'\.', '_',
re.sub(r'(\[|\]|\(|\)|\-|\&|\?)', '', schema_name)))
@staticmethod
def sanitize_database_name(database_name: str) -> str:
"""
Sanitizes a given string so that it can safely be used as a table's database.
Sanitization behaves as follows:
- all hyphens are deleted
"""
return re.sub(r'-', '', database_name)
@staticmethod
def sanitize_table_name(table_name: str) -> str:
"""
Sanitizes a given string so that it can safely be used as a table name.
Replicates the current behavior of sanitize_workbook_name, but this is purely coincidental.
As more breaking characters/patterns are found, each method should be updated to reflect the specifics.
Sanitization behaves as follows:
- all forward slash and single quote characters are deleted
"""
return re.sub(r'(\/|\')', '', table_name)
@staticmethod
def sanitize_workbook_name(workbook_name: str) -> str:
"""
Sanitizes a given string so that it can safely be used as a workbook ID.
Mimics the current behavior of sanitize_table_name for now, but this is purely coincidental.
As more breaking characters/patterns are found, each method should be updated to reflect the specifics.
Sanitization behaves as follows:
- all forward slash and single quote characters are deleted
"""
return re.sub(r'(\/|\')', '', workbook_name)
class TableauGraphQLApiExtractor(Extractor):
"""
Base class for querying the Tableau Metadata API, which uses a GraphQL schema.
"""
QUERY = 'query'
QUERY_VARIABLES = 'query_variables'
TABLEAU_HOST = const.TABLEAU_HOST
VERIFY_REQUEST = 'verify_request'
def init(self, conf: ConfigTree) -> None:
self._conf = conf
self._auth_token = TableauDashboardAuth(self._conf).token
self._query = self._conf.get(TableauGraphQLApiExtractor.QUERY)
self._iterator: Optional[Iterator[Dict[str, Any]]] = None
self._static_dict = conf.get(STATIC_RECORD_DICT, dict())
self._metadata_url = 'https://{TABLEAU_HOST}/api/metadata/graphql'.format(
TABLEAU_HOST=self._conf.get_string(TableauGraphQLApiExtractor.TABLEAU_HOST)
)
self._query_variables = self._conf.get(TableauGraphQLApiExtractor.QUERY_VARIABLES, {})
self._verify_request = self._conf.get(TableauGraphQLApiExtractor.VERIFY_REQUEST, True)
def execute_query(self) -> Dict[str, Any]:
"""
Executes the extractor's given query and returns the data from the results.
"""
query_payload = json.dumps({
'query': self._query,
'variables': self._query_variables
})
headers = {
'Content-Type': 'application/json',
'X-Tableau-Auth': self._auth_token
}
params = {
'data': query_payload,
'headers': headers,
'verify': self._verify_request
}
response = requests.post(url=self._metadata_url, **params)
return response.json()['data']
def execute(self) -> Iterator[Dict[str, Any]]:
"""
Must be overridden by any extractor using this class. This should parse the result and yield each entity's
metadata one by one.
"""
pass
def extract(self) -> Any:
"""
Fetch one result at a time from the generator created by self.execute(), updating using the
static record values if needed.
"""
if not self._iterator:
self._iterator = self.execute()
try:
record = next(self._iterator)
except StopIteration:
return None
if self._static_dict:
record.update(self._static_dict)
return record
class TableauDashboardAuth:
"""
Attempts to authenticate against the Tableau REST API using the provided personal access token credentials.
When successful, it will create a valid token that must be used on all subsequent requests.
https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_concepts_auth.htm
"""
API_VERSION = const.API_VERSION
SITE_NAME = const.SITE_NAME
TABLEAU_HOST = const.TABLEAU_HOST
TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
VERIFY_REQUEST = const.VERIFY_REQUEST
def __init__(self, conf: ConfigTree) -> None:
self._token: Optional[str] = None
self._conf = conf
self._access_token_name = self._conf.get_string(TableauDashboardAuth.TABLEAU_ACCESS_TOKEN_NAME)
self._access_token_secret = self._conf.get_string(TableauDashboardAuth.TABLEAU_ACCESS_TOKEN_SECRET)
self._api_version = self._conf.get_string(TableauDashboardAuth.API_VERSION)
self._site_name = self._conf.get_string(TableauDashboardAuth.SITE_NAME)
self._tableau_host = self._conf.get_string(TableauDashboardAuth.TABLEAU_HOST)
self._verify_request = self._conf.get(TableauDashboardAuth.VERIFY_REQUEST, True)
@property
def token(self) -> Optional[str]:
if not self._token:
self._token = self._authenticate()
return self._token
def _authenticate(self) -> str:
"""
Queries the auth/signin endpoint for the given Tableau instance using a personal access token.
The API version differs with your version of Tableau.
See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_concepts_versions.htm
for details or ask your Tableau server administrator.
"""
self._auth_url = "https://{tableau_host}/api/{api_version}/auth/signin".format(
tableau_host=self._tableau_host,
api_version=self._api_version
)
payload = json.dumps({
'credentials': {
'personalAccessTokenName': self._access_token_name,
'personalAccessTokenSecret': self._access_token_secret,
'site': {
'contentUrl': self._site_name
}
}
})
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
# verify=False is sometimes needed to bypass occasional (valid) self-signed cert errors. TODO: actually fix it!!
params = {
'headers': headers,
'verify': self._verify_request
}
response_json = requests.post(url=self._auth_url, data=payload, **params).json()
return response_json['credentials']['token']
import logging
from typing import Any, Dict, Iterator
from pyhocon import ConfigFactory, ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils import TableauGraphQLApiExtractor,\
TableauDashboardUtils
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
LOGGER = logging.getLogger(__name__)
class TableauGraphQLExternalTableExtractor(TableauGraphQLApiExtractor):
"""
Implements the extraction-time logic for parsing the GraphQL result and transforming into a dict
that fills the TableMetadata model.
"""
EXTERNAL_CLUSTER_NAME = const.EXTERNAL_CLUSTER_NAME
EXTERNAL_SCHEMA_NAME = const.EXTERNAL_SCHEMA_NAME
def execute(self) -> Iterator[Dict[str, Any]]:
response = self.execute_query()
for table in response['databases']:
if table['connectionType'] in ['google-sheets', 'salesforce', 'excel-direct']:
for downstreamTable in table['tables']:
data = {
'cluster': self._conf.get_string(TableauGraphQLExternalTableExtractor.EXTERNAL_CLUSTER_NAME),
'database': TableauDashboardUtils.sanitize_database_name(
table['connectionType']
),
'schema': TableauDashboardUtils.sanitize_schema_name(table['name']),
'name': TableauDashboardUtils.sanitize_table_name(downstreamTable['name']),
'description': table['description']
}
yield data
else:
data = {
'cluster': self._conf.get_string(TableauGraphQLExternalTableExtractor.EXTERNAL_CLUSTER_NAME),
'database': TableauDashboardUtils.sanitize_database_name(table['connectionType']),
'schema': self._conf.get_string(TableauGraphQLExternalTableExtractor.EXTERNAL_SCHEMA_NAME),
'name': TableauDashboardUtils.sanitize_table_name(table['name']),
'description': table['description']
}
yield data
class TableauDashboardExternalTableExtractor(Extractor):
"""
Creates the "external" Tableau tables.
In this context, "external" tables are "tables" that are not from a typical database, and are loaded
using some other data format, like CSV files.
This extractor has been tested with the following types of external tables:
Excel spreadsheets
Text files (including CSV files)
Salesforce connections
Google Sheets connections
Excel spreadsheets, Salesforce connections, and Google Sheets connections are all classified as
"databases" in terms of Tableau's Metadata API, with their "subsheets" forming their "tables" when
present. However, these tables are not assigned a schema, so this extractor uses the name of the
parent sheet as the schema and assigns a new table to each subsheet. The connection type is
always used as the database, and for text files, the schema is set using the EXTERNAL_SCHEMA_NAME
config option. Since these external tables are usually named for human consumption only and often
contain a wider range of characters, all inputs are transformed to remove any problematic
occurrences before they are inserted: see the sanitize methods in TableauDashboardUtils for specifics.
A more concrete example: if one had a Google Sheet titled "Growth by Region & County" with 2 subsheets called
"FY19 Report" and "FY20 Report", two tables would be generated with the following keys:
googlesheets://external.growth_by_region_county/FY_19_Report
googlesheets://external.growth_by_region_county/FY_20_Report
"""
API_VERSION = const.API_VERSION
CLUSTER = const.CLUSTER
EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
EXTERNAL_CLUSTER_NAME = const.EXTERNAL_CLUSTER_NAME
EXTERNAL_SCHEMA_NAME = const.EXTERNAL_SCHEMA_NAME
EXTERNAL_TABLE_TYPES = const.EXTERNAL_TABLE_TYPES
SITE_NAME = const.SITE_NAME
TABLEAU_HOST = const.TABLEAU_HOST
TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
VERIFY_REQUEST = const.VERIFY_REQUEST
def init(self, conf: ConfigTree) -> None:
self._conf = conf
self.query = """query externalTables($externalTableTypes: [String]) {
databases (filter: {connectionTypeWithin: $externalTableTypes}) {
name
connectionType
description
tables {
name
}
}
}"""
self.query_variables = {
'externalTableTypes': self._conf.get_list(TableauDashboardExternalTableExtractor.EXTERNAL_TABLE_TYPES)}
self._extractor = self._build_extractor()
transformers = []
dict_to_model_transformer = DictToModel()
dict_to_model_transformer.init(
conf=Scoped.get_scoped_conf(self._conf, dict_to_model_transformer.get_scope()).with_fallback(
ConfigFactory.from_dict(
{MODEL_CLASS: 'databuilder.models.table_metadata.TableMetadata'})))
transformers.append(dict_to_model_transformer)
self._transformer = ChainedTransformer(transformers=transformers)
def extract(self) -> Any:
record = self._extractor.extract()
if not record:
return None
return self._transformer.transform(record=record)
def get_scope(self) -> str:
return 'extractor.tableau_external_table'
def _build_extractor(self) -> TableauGraphQLExternalTableExtractor:
"""
Builds a TableauGraphQLExternalTableExtractor. All data required can be retrieved with a single GraphQL call.
:return: A TableauGraphQLExternalTableExtractor that creates external table metadata entities.
"""
extractor = TableauGraphQLExternalTableExtractor()
config_dict = {
TableauGraphQLApiExtractor.QUERY_VARIABLES: self.query_variables,
TableauGraphQLApiExtractor.QUERY: self.query}
tableau_extractor_conf = \
Scoped.get_scoped_conf(self._conf, extractor.get_scope())\
.with_fallback(self._conf)\
.with_fallback(ConfigFactory.from_dict(config_dict))
extractor.init(conf=tableau_extractor_conf)
return extractor
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is an example script demonstrating how to load data into Neo4j and
Elasticsearch without using an Airflow DAG.
It contains several jobs:
- `run_tableau_*_job`: executes a job that runs a specific Tableau extractor
and publishes the resulting metadata to Neo4j
- `create_es_publisher_sample_job`: creates a job that extracts data from Neo4j
and publishes it into Elasticsearch.
For other available extractors, please take a look at
https://github.com/lyft/amundsendatabuilder#list-of-extractors
"""
import logging
import os
import sys
import uuid
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher.elasticsearch_constants import DASHBOARD_ELASTICSEARCH_INDEX_MAPPING
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.dashboard.tableau.tableau_dashboard_extractor import TableauDashboardExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_last_modified_extractor import \
TableauDashboardLastModifiedExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_query_extractor import TableauDashboardQueryExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_table_extractor import TableauDashboardTableExtractor
from databuilder.extractor.dashboard.tableau.tableau_external_table_extractor import \
TableauDashboardExternalTableExtractor
es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost')
neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost')
es_port = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_PORT', 9200)
neo_port = os.getenv('CREDENTIALS_NEO4J_PROXY_PORT', 7687)
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]
es = Elasticsearch([
{'host': es_host, 'port': es_port},
])
Base = declarative_base()
NEO4J_ENDPOINT = 'bolt://{}:{}'.format(neo_host, neo_port)
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
LOGGER = logging.getLogger(__name__)
tableau_host = ""
tableau_api_version = 0
tableau_site_name = ""
tableau_personal_access_token_name = ""
tableau_personal_access_token_secret = ""
tableau_excluded_projects = []
tableau_dashboard_cluster = ""
tableau_dashboard_database = ""
tableau_external_table_cluster = ""
tableau_external_table_schema = ""
tableau_external_table_types = []
tableau_verify_request = True
common_tableau_config = {
'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
'publisher.neo4j.neo4j_user': neo4j_user,
'publisher.neo4j.neo4j_password': neo4j_password,
'publisher.neo4j.neo4j_encrypted': False,
'publisher.neo4j.job_publish_tag': 'unique_tag', # should use unique tag here like {ds}
}
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
entity_type='table',
elasticsearch_mapping=None):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_{uuid}`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param entity_type: Entity type handed to the `Neo4jSearchDataExtractor` class, used to determine
Cypher query to extract data from Neo4j. Defaults to `table`.
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` query baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())
# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = '{}_'.format(elasticsearch_doc_type_key) + str(uuid.uuid4())
job_config = ConfigFactory.from_dict({
'extractor.search_data.entity_type': entity_type,
'extractor.search_data.extractor.neo4j.graph_url': neo4j_endpoint,
'extractor.search_data.extractor.neo4j.model_class': model_name,
'extractor.search_data.extractor.neo4j.neo4j_auth_user': neo4j_user,
'extractor.search_data.extractor.neo4j.neo4j_auth_pw': neo4j_password,
'extractor.search_data.extractor.neo4j.neo4j_encrypted': False,
'loader.filesystem.elasticsearch.file_path': extracted_search_data_path,
'loader.filesystem.elasticsearch.mode': 'w',
'publisher.elasticsearch.file_path': extracted_search_data_path,
'publisher.elasticsearch.mode': 'r',
'publisher.elasticsearch.client': elasticsearch_client,
'publisher.elasticsearch.new_index': elasticsearch_new_index_key,
'publisher.elasticsearch.doc_type': elasticsearch_doc_type_key,
'publisher.elasticsearch.alias': elasticsearch_index_alias,
})
# only optionally add these keys, so need to dynamically `put` them
if elasticsearch_mapping:
job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
elasticsearch_mapping)
job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job
def run_tableau_metadata_job():
task = DefaultTask(extractor=TableauDashboardExtractor(), loader=FsNeo4jCSVLoader())
tmp_folder = '/var/tmp/amundsen/tableau_dashboard_metadata'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
dict_config = dict(common_tableau_config)  # copy so jobs don't mutate the shared config
dict_config.update({
'extractor.tableau_dashboard_metadata.tableau_host': tableau_host,
'extractor.tableau_dashboard_metadata.api_version': tableau_api_version,
'extractor.tableau_dashboard_metadata.site_name': tableau_site_name,
'extractor.tableau_dashboard_metadata.tableau_personal_access_token_name':
tableau_personal_access_token_name,
'extractor.tableau_dashboard_metadata.tableau_personal_access_token_secret':
tableau_personal_access_token_secret,
'extractor.tableau_dashboard_metadata.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_metadata.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_metadata.database': tableau_dashboard_database,
'extractor.tableau_dashboard_metadata.transformer.timestamp_str_to_epoch.timestamp_format':
"%Y-%m-%dT%H:%M:%SZ",
'extractor.tableau_dashboard_metadata.verify_request': tableau_verify_request,
'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
'loader.filesystem_csv_neo4j.delete_created_directories': True,
'task.progress_report_frequency': 100,
'publisher.neo4j.node_files_directory': node_files_folder,
'publisher.neo4j.relation_files_directory': relationship_files_folder,
})
job_config = ConfigFactory.from_dict(dict_config)
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
def run_tableau_last_modified_job():
task = DefaultTask(extractor=TableauDashboardLastModifiedExtractor(), loader=FsNeo4jCSVLoader())
tmp_folder = '/var/tmp/amundsen/tableau_dashboard_user'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
dict_config = dict(common_tableau_config)  # copy so jobs don't mutate the shared config
dict_config.update({
'extractor.tableau_dashboard_last_modified.tableau_host': tableau_host,
'extractor.tableau_dashboard_last_modified.api_version': tableau_api_version,
'extractor.tableau_dashboard_last_modified.site_name': tableau_site_name,
'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_name':
tableau_personal_access_token_name,
'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_secret':
tableau_personal_access_token_secret,
'extractor.tableau_dashboard_last_modified.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_last_modified.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_last_modified.database': tableau_dashboard_database,
'extractor.tableau_dashboard_last_modified.transformer.timestamp_str_to_epoch.timestamp_format':
"%Y-%m-%dT%H:%M:%SZ",
'extractor.tableau_dashboard_last_modified.verify_request': tableau_verify_request,
'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
'loader.filesystem_csv_neo4j.delete_created_directories': True,
'task.progress_report_frequency': 100,
'publisher.neo4j.node_files_directory': node_files_folder,
'publisher.neo4j.relation_files_directory': relationship_files_folder,
})
job_config = ConfigFactory.from_dict(dict_config)
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
def run_tableau_query_job():
task = DefaultTask(extractor=TableauDashboardQueryExtractor(), loader=FsNeo4jCSVLoader())
tmp_folder = '/var/tmp/amundsen/tableau_dashboard_query'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
dict_config = dict(common_tableau_config)  # copy so jobs don't mutate the shared config
dict_config.update({
'extractor.tableau_dashboard_query.tableau_host': tableau_host,
'extractor.tableau_dashboard_query.api_version': tableau_api_version,
'extractor.tableau_dashboard_query.site_name': tableau_site_name,
'extractor.tableau_dashboard_query.tableau_personal_access_token_name':
tableau_personal_access_token_name,
'extractor.tableau_dashboard_query.tableau_personal_access_token_secret':
tableau_personal_access_token_secret,
'extractor.tableau_dashboard_query.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_query.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_query.database': tableau_dashboard_database,
'extractor.tableau_dashboard_query.transformer.timestamp_str_to_epoch.timestamp_format':
"%Y-%m-%dT%H:%M:%SZ",
'extractor.tableau_dashboard_query.verify_request': tableau_verify_request,
'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
'loader.filesystem_csv_neo4j.delete_created_directories': True,
'task.progress_report_frequency': 100,
'publisher.neo4j.node_files_directory': node_files_folder,
'publisher.neo4j.relation_files_directory': relationship_files_folder,
})
job_config = ConfigFactory.from_dict(dict_config)
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
def run_tableau_table_job():
task = DefaultTask(extractor=TableauDashboardTableExtractor(), loader=FsNeo4jCSVLoader())
tmp_folder = '/var/tmp/amundsen/tableau_dashboard_table'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
dict_config = dict(common_tableau_config)  # copy so jobs don't mutate the shared config
dict_config.update({
'extractor.tableau_dashboard_table.tableau_host': tableau_host,
'extractor.tableau_dashboard_table.api_version': tableau_api_version,
'extractor.tableau_dashboard_table.site_name': tableau_site_name,
'extractor.tableau_dashboard_table.tableau_personal_access_token_name':
tableau_personal_access_token_name,
'extractor.tableau_dashboard_table.tableau_personal_access_token_secret':
tableau_personal_access_token_secret,
'extractor.tableau_dashboard_table.excluded_projects': tableau_excluded_projects,
'extractor.tableau_dashboard_table.cluster': tableau_dashboard_cluster,
'extractor.tableau_dashboard_table.database': tableau_dashboard_database,
'extractor.tableau_dashboard_table.external_cluster_name': tableau_external_table_cluster,
'extractor.tableau_dashboard_table.external_schema_name': tableau_external_table_schema,
'extractor.tableau_dashboard_table.transformer.timestamp_str_to_epoch.timestamp_format': "%Y-%m-%dT%H:%M:%SZ",
'extractor.tableau_dashboard_table.verify_request': tableau_verify_request,
'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
'loader.filesystem_csv_neo4j.delete_created_directories': True,
'task.progress_report_frequency': 100,
'publisher.neo4j.node_files_directory': node_files_folder,
'publisher.neo4j.relation_files_directory': relationship_files_folder,
})
job_config = ConfigFactory.from_dict(dict_config)
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
def run_tableau_external_table_job():
task = DefaultTask(extractor=TableauDashboardExternalTableExtractor(), loader=FsNeo4jCSVLoader())
tmp_folder = '/var/tmp/amundsen/tableau_dashboard_external_table'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
dict_config = dict(common_tableau_config)  # copy so jobs don't mutate the shared config
dict_config.update({
'extractor.tableau_external_table.tableau_host': tableau_host,
'extractor.tableau_external_table.api_version': tableau_api_version,
'extractor.tableau_external_table.site_name': tableau_site_name,
'extractor.tableau_external_table.tableau_personal_access_token_name':
tableau_personal_access_token_name,
'extractor.tableau_external_table.tableau_personal_access_token_secret':
tableau_personal_access_token_secret,
'extractor.tableau_external_table.excluded_projects': tableau_excluded_projects,
'extractor.tableau_external_table.cluster': tableau_dashboard_cluster,
'extractor.tableau_external_table.database': tableau_dashboard_database,
'extractor.tableau_external_table.external_cluster_name': tableau_external_table_cluster,
'extractor.tableau_external_table.external_schema_name': tableau_external_table_schema,
'extractor.tableau_external_table.external_table_types': tableau_external_table_types,
'extractor.tableau_external_table.verify_request': tableau_verify_request,
'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
'loader.filesystem_csv_neo4j.delete_created_directories': True,
'task.progress_report_frequency': 100,
'publisher.neo4j.node_files_directory': node_files_folder,
'publisher.neo4j.relation_files_directory': relationship_files_folder,
})
job_config = ConfigFactory.from_dict(dict_config)
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
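
# The external-table job runs before the table job so that the table nodes it
# creates exist when the dashboard-to-table relationships are published; the
# Elasticsearch jobs at the end rebuild the table and dashboard search indexes.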
if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    # logging.basicConfig(level=logging.INFO)

    run_tableau_metadata_job()
    run_tableau_external_table_job()
    run_tableau_table_job()
    run_tableau_query_job()
    run_tableau_last_modified_job()

    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        entity_type='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()

    job_es_dashboard = create_es_publisher_sample_job(
        elasticsearch_index_alias='dashboard_search_index',
        elasticsearch_doc_type_key='dashboard',
        model_name='databuilder.models.dashboard_elasticsearch_document.DashboardESDocument',
        entity_type='dashboard',
        elasticsearch_mapping=DASHBOARD_ELASTICSEARCH_INDEX_MAPPING)
    job_es_dashboard.launch()
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
import unittest
from typing import Any, Dict

from mock import patch
from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.dashboard.tableau.tableau_dashboard_extractor import TableauDashboardExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils \
    import TableauDashboardAuth, TableauGraphQLApiExtractor

logging.basicConfig(level=logging.INFO)
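
# Canned Tableau Metadata API response and access token; patched over
# TableauGraphQLApiExtractor.execute_query and TableauDashboardAuth below so
# the extractor runs without a live Tableau server.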
def mock_query(*_args: Any, **_kwargs: Any) -> Dict[str, Any]:
    return {
        'workbooks': [
            {
                'id': 'fake-id',
                'name': 'Test Workbook',
                'createdAt': '2020-04-08T05:32:01Z',
                'description': '',
                'projectName': 'Test Project',
                'projectVizportalUrlId': 123,
                'vizportalUrlId': 456
            }
        ]
    }


def mock_token(*_args: Any, **_kwargs: Any) -> str:
    return '123-abc'

class TestTableauDashboardExtractor(unittest.TestCase):
    @patch.object(TableauDashboardAuth, '_authenticate', mock_token)
    @patch.object(TableauGraphQLApiExtractor, 'execute_query', mock_query)
    def test_dashboard_metadata_extractor(self) -> None:
        config = ConfigFactory.from_dict({
            'extractor.tableau_dashboard_metadata.tableau_host': 'tableau_host',
            'extractor.tableau_dashboard_metadata.api_version': 'tableau_api_version',
            'extractor.tableau_dashboard_metadata.site_name': 'tableau_site_name',
            'extractor.tableau_dashboard_metadata.tableau_personal_access_token_name':
                'tableau_personal_access_token_name',
            'extractor.tableau_dashboard_metadata.tableau_personal_access_token_secret':
                'tableau_personal_access_token_secret',
            'extractor.tableau_dashboard_metadata.excluded_projects': [],
            'extractor.tableau_dashboard_metadata.cluster': 'tableau_dashboard_cluster',
            'extractor.tableau_dashboard_metadata.database': 'tableau_dashboard_database',
            'extractor.tableau_dashboard_metadata.transformer.timestamp_str_to_epoch.timestamp_format':
                '%Y-%m-%dT%H:%M:%SZ',
        })
        extractor = TableauDashboardExtractor()
        extractor.init(Scoped.get_scoped_conf(conf=config, scope=extractor.get_scope()))

        record = extractor.extract()
        self.assertEqual(record.dashboard_id, 'Test Workbook')
        self.assertEqual(record.dashboard_name, 'Test Workbook')
        self.assertEqual(record.dashboard_group_id, 'Test Project')
        self.assertEqual(record.dashboard_group, 'Test Project')
        self.assertEqual(record.product, 'tableau')
        self.assertEqual(record.cluster, 'tableau_dashboard_cluster')
        # 2020-04-08T05:32:01Z in the mocked createdAt == 1586323921 epoch seconds (UTC)
        self.assertEqual(record.created_timestamp, 1586323921)


if __name__ == '__main__':
    unittest.main()
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
import unittest
from typing import Any, Dict

from mock import patch
from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.dashboard.tableau.tableau_dashboard_last_modified_extractor \
    import TableauDashboardLastModifiedExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils \
    import TableauDashboardAuth, TableauGraphQLApiExtractor

logging.basicConfig(level=logging.INFO)
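
# Minimal mocked workbook payload; the updatedAt timestamp below is what the
# last-modified extractor converts to an epoch value.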
def mock_query(*_args: Any, **_kwargs: Any) -> Dict[str, Any]:
    return {
        'workbooks': [
            {
                'id': 'fake-workbook-id',
                'name': 'Test Workbook',
                'projectName': 'Test Project',
                'updatedAt': '2020-08-04T20:16:05Z'
            }
        ]
    }


def mock_token(*_args: Any, **_kwargs: Any) -> str:
    return '123-abc'

class TestTableauDashboardLastModified(unittest.TestCase):
    @patch.object(TableauDashboardAuth, '_authenticate', mock_token)
    @patch.object(TableauGraphQLApiExtractor, 'execute_query', mock_query)
    def test_dashboard_last_modified_extractor(self) -> None:
        config = ConfigFactory.from_dict({
            'extractor.tableau_dashboard_last_modified.tableau_host': 'tableau_host',
            'extractor.tableau_dashboard_last_modified.api_version': 'tableau_api_version',
            'extractor.tableau_dashboard_last_modified.site_name': 'tableau_site_name',
            'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_name':
                'tableau_personal_access_token_name',
            'extractor.tableau_dashboard_last_modified.tableau_personal_access_token_secret':
                'tableau_personal_access_token_secret',
            'extractor.tableau_dashboard_last_modified.excluded_projects': [],
            'extractor.tableau_dashboard_last_modified.cluster': 'tableau_dashboard_cluster',
            'extractor.tableau_dashboard_last_modified.database': 'tableau_dashboard_database',
            'extractor.tableau_dashboard_last_modified.transformer.timestamp_str_to_epoch.timestamp_format':
                '%Y-%m-%dT%H:%M:%SZ',
        })
        extractor = TableauDashboardLastModifiedExtractor()
        extractor.init(Scoped.get_scoped_conf(conf=config, scope=extractor.get_scope()))

        record = extractor.extract()
        self.assertEqual(record._dashboard_id, 'Test Workbook')
        self.assertEqual(record._dashboard_group_id, 'Test Project')
        self.assertEqual(record._product, 'tableau')
        self.assertEqual(record._cluster, 'tableau_dashboard_cluster')
        # 2020-08-04T20:16:05Z in the mocked updatedAt == 1596572165 epoch seconds (UTC)
        self.assertEqual(record._last_modified_timestamp, 1596572165)


if __name__ == '__main__':
    unittest.main()
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
import unittest
from typing import Any, Dict

from mock import patch
from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.dashboard.tableau.tableau_dashboard_query_extractor import TableauDashboardQueryExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils \
    import TableauDashboardAuth, TableauGraphQLApiExtractor

logging.basicConfig(level=logging.INFO)
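
# Mocked customSQLTables payload: one custom SQL query plus the downstream
# workbook that consumes it, mirroring the shape of the Metadata API response.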
def mock_query(*_args: Any, **_kwargs: Any) -> Dict[str, Any]:
    return {
        'customSQLTables': [
            {
                'id': 'fake-query-id',
                'name': 'Test Query',
                'query': 'SELECT * FROM foo',
                'downstreamWorkbooks': [
                    {
                        'name': 'Test Workbook',
                        'projectName': 'Test Project'
                    }
                ]
            }
        ]
    }


def mock_token(*_args: Any, **_kwargs: Any) -> str:
    return '123-abc'

class TestTableauDashboardQuery(unittest.TestCase):
    @patch.object(TableauDashboardAuth, '_authenticate', mock_token)
    @patch.object(TableauGraphQLApiExtractor, 'execute_query', mock_query)
    def test_dashboard_query_extractor(self) -> None:
        config = ConfigFactory.from_dict({
            'extractor.tableau_dashboard_query.tableau_host': 'tableau_host',
            'extractor.tableau_dashboard_query.api_version': 'tableau_api_version',
            'extractor.tableau_dashboard_query.site_name': 'tableau_site_name',
            'extractor.tableau_dashboard_query.tableau_personal_access_token_name':
                'tableau_personal_access_token_name',
            'extractor.tableau_dashboard_query.tableau_personal_access_token_secret':
                'tableau_personal_access_token_secret',
            'extractor.tableau_dashboard_query.excluded_projects': [],
            'extractor.tableau_dashboard_query.cluster': 'tableau_dashboard_cluster',
            'extractor.tableau_dashboard_query.database': 'tableau_dashboard_database',
            'extractor.tableau_dashboard_query.transformer.timestamp_str_to_epoch.timestamp_format':
                '%Y-%m-%dT%H:%M:%SZ',
        })
        extractor = TableauDashboardQueryExtractor()
        extractor.init(Scoped.get_scoped_conf(conf=config, scope=extractor.get_scope()))

        record = extractor.extract()
        self.assertEqual(record._query_name, 'Test Query')
        self.assertEqual(record._query_text, 'SELECT * FROM foo')
        self.assertEqual(record._dashboard_id, 'Test Workbook')
        self.assertEqual(record._dashboard_group_id, 'Test Project')


if __name__ == '__main__':
    unittest.main()
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
import unittest
from typing import Any, Dict

from mock import patch
from pyhocon import ConfigFactory

from databuilder import Scoped
from databuilder.extractor.dashboard.tableau.tableau_dashboard_table_extractor import TableauDashboardTableExtractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils \
    import TableauDashboardAuth, TableauGraphQLApiExtractor

logging.basicConfig(level=logging.INFO)
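
# Mocked workbook payload with two upstream Redshift tables; the test below
# checks that each is turned into a fully qualified Amundsen table key.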
def mock_query(*_args: Any, **_kwargs: Any) -> Dict[str, Any]:
    return {
        'workbooks': [
            {
                'name': 'Test Workbook',
                'projectName': 'Test Project',
                'upstreamTables': [
                    {
                        'name': 'test_table_1',
                        'schema': 'test_schema_1',
                        'database': {
                            'name': 'test_database_1',
                            'connectionType': 'redshift'
                        }
                    },
                    {
                        'name': 'test_table_2',
                        'schema': 'test_schema_2',
                        'database': {
                            'name': 'test_database_2',
                            'connectionType': 'redshift'
                        }
                    }
                ]
            }
        ]
    }


def mock_token(*_args: Any, **_kwargs: Any) -> str:
    return '123-abc'

class TestTableauDashboardTable(unittest.TestCase):
    @patch.object(TableauDashboardAuth, '_authenticate', mock_token)
    @patch.object(TableauGraphQLApiExtractor, 'execute_query', mock_query)
    def test_dashboard_table_extractor(self) -> None:
        config = ConfigFactory.from_dict({
            'extractor.tableau_dashboard_table.tableau_host': 'tableau_host',
            'extractor.tableau_dashboard_table.api_version': 'tableau_api_version',
            'extractor.tableau_dashboard_table.site_name': 'tableau_site_name',
            'extractor.tableau_dashboard_table.tableau_personal_access_token_name':
                'tableau_personal_access_token_name',
            'extractor.tableau_dashboard_table.tableau_personal_access_token_secret':
                'tableau_personal_access_token_secret',
            'extractor.tableau_dashboard_table.excluded_projects': [],
            'extractor.tableau_dashboard_table.cluster': 'tableau_dashboard_cluster',
            'extractor.tableau_dashboard_table.database': 'tableau_dashboard_database',
            'extractor.tableau_dashboard_table.transformer.timestamp_str_to_epoch.timestamp_format':
                '%Y-%m-%dT%H:%M:%SZ',
        })
        extractor = TableauDashboardTableExtractor()
        extractor.init(Scoped.get_scoped_conf(conf=config, scope=extractor.get_scope()))

        record = extractor.extract()
        self.assertEqual(record._dashboard_id, 'Test Workbook')
        self.assertEqual(record._dashboard_group_id, 'Test Project')
        self.assertEqual(record._product, 'tableau')
        self.assertEqual(record._cluster, 'tableau_dashboard_cluster')
        # Table keys follow the Amundsen format database://cluster.schema/table,
        # built from the configured database and cluster rather than Tableau's own.
        self.assertEqual(record._table_ids, [
            'tableau_dashboard_database://tableau_dashboard_cluster.test_schema_1/test_table_1',
            'tableau_dashboard_database://tableau_dashboard_cluster.test_schema_2/test_table_2'])


if __name__ == '__main__':
    unittest.main()
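
# To run these tests locally, something like the following should work
# (test directory path assumed, not taken from this change):
#   python -m pytest tests/unit/extractor/dashboard/tableau/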