Unverified Commit 80162a28 authored by Pablo Torre, committed by GitHub

docs: MS SQL Server example (#278)

* Change result headers to lowercase.
  Uppercase headers were causing an error.

* Script to load sample data to MSSQL,
  based on the PostgreSQL sample.

* Change line endings to LF.

* Unit tests.

* Remove unnecessary flag change.

* Whitespace, tabs, and readability.

* flake8
parent e7936c78
@@ -24,28 +24,34 @@ class MSSQLMetadataExtractor(Extractor):
    # SELECT statement from MS SQL to extract table and column metadata
    SQL_STATEMENT = """
-    SELECT DISTINCT {cluster_source} AS CLUSTER,
-    TBL.TABLE_SCHEMA AS [SCHEMA_NAME],
-    TBL.TABLE_NAME AS [NAME],
-    CAST(PROP.VALUE AS NVARCHAR(MAX)) AS [DESCRIPTION],
-    COL.COLUMN_NAME AS [COL_NAME],
-    COL.DATA_TYPE AS [COL_TYPE],
-    CAST(PROP_COL.VALUE AS NVARCHAR(MAX)) AS [COL_DESCRIPTION],
-    COL.ORDINAL_POSITION AS COL_SORT_ORDER
-    FROM INFORMATION_SCHEMA.TABLES TBL
-    INNER JOIN INFORMATION_SCHEMA.COLUMNS COL ON COL.TABLE_NAME = TBL.TABLE_NAME
-    AND COL.TABLE_SCHEMA = TBL.TABLE_SCHEMA
-    LEFT JOIN SYS.EXTENDED_PROPERTIES PROP ON PROP.MAJOR_ID = OBJECT_ID(TBL.TABLE_SCHEMA + '.' + TBL.TABLE_NAME)
-    AND PROP.MINOR_ID = 0
-    AND PROP.NAME = 'MS_Description'
-    LEFT JOIN SYS.EXTENDED_PROPERTIES PROP_COL ON PROP_COL.MAJOR_ID = OBJECT_ID(TBL.TABLE_SCHEMA + '.' + TBL.TABLE_NAME)
-    AND PROP_COL.MINOR_ID = COL.ORDINAL_POSITION
-    AND PROP_COL.NAME = 'MS_Description'
-    WHERE TBL.TABLE_TYPE = 'base table' {where_clause_suffix}
-    ORDER BY CLUSTER,
-    SCHEMA_NAME,
-    NAME,
-    COL_SORT_ORDER;
+    SELECT DISTINCT
+        {cluster_source} AS cluster,
+        TBL.TABLE_SCHEMA AS [schema_name],
+        TBL.TABLE_NAME AS [name],
+        CAST(PROP.VALUE AS NVARCHAR(MAX)) AS [description],
+        COL.COLUMN_NAME AS [col_name],
+        COL.DATA_TYPE AS [col_type],
+        CAST(PROP_COL.VALUE AS NVARCHAR(MAX)) AS [col_description],
+        COL.ORDINAL_POSITION AS col_sort_order
+    FROM INFORMATION_SCHEMA.TABLES TBL
+    INNER JOIN INFORMATION_SCHEMA.COLUMNS COL
+        ON (COL.TABLE_NAME = TBL.TABLE_NAME
+            AND COL.TABLE_SCHEMA = TBL.TABLE_SCHEMA )
+    LEFT JOIN SYS.EXTENDED_PROPERTIES PROP
+        ON (PROP.MAJOR_ID = OBJECT_ID(TBL.TABLE_SCHEMA + '.' + TBL.TABLE_NAME)
+            AND PROP.MINOR_ID = 0
+            AND PROP.NAME = 'MS_Description')
+    LEFT JOIN SYS.EXTENDED_PROPERTIES PROP_COL
+        ON (PROP_COL.MAJOR_ID = OBJECT_ID(TBL.TABLE_SCHEMA + '.' + TBL.TABLE_NAME)
+            AND PROP_COL.MINOR_ID = COL.ORDINAL_POSITION
+            AND PROP_COL.NAME = 'MS_Description')
+    WHERE TBL.TABLE_TYPE = 'base table' {where_clause_suffix}
+    ORDER BY
+        CLUSTER,
+        SCHEMA_NAME,
+        NAME,
+        COL_SORT_ORDER
+    ;
    """

    # CONFIG KEYS

@@ -57,8 +63,10 @@ ORDER BY CLUSTER,
    # Default values
    DEFAULT_CLUSTER_NAME = 'DB_NAME()'

-    DEFAULT_CONFIG = ConfigFactory.from_dict(
-        {WHERE_CLAUSE_SUFFIX_KEY: '', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
+    DEFAULT_CONFIG = ConfigFactory.from_dict({
+        WHERE_CLAUSE_SUFFIX_KEY: '',
+        CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
+        USE_CATALOG_AS_CLUSTER_NAME: True}
    )

    DEFAULT_WHERE_CLAUSE_VALUE = 'and tbl.table_schema in {schemas}'

@@ -67,25 +75,31 @@ ORDER BY CLUSTER,
        # type: (ConfigTree) -> None
        conf = conf.with_fallback(MSSQLMetadataExtractor.DEFAULT_CONFIG)
-        self._cluster = '{}'.format(conf.get_string(MSSQLMetadataExtractor.CLUSTER_KEY))
+        self._cluster = '{}'.format(
+            conf.get_string(MSSQLMetadataExtractor.CLUSTER_KEY))

        if conf.get_bool(MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
            cluster_source = "DB_NAME()"
        else:
            cluster_source = "'{}'".format(self._cluster)

-        database = conf.get_string(MSSQLMetadataExtractor.DATABASE_KEY, default='mssql')
+        database = conf.get_string(
+            MSSQLMetadataExtractor.DATABASE_KEY,
+            default='mssql')
        if six.PY2 and isinstance(database, six.text_type):
            database = database.encode('utf-8', 'ignore')
        self._database = database

-        config_where_clause = conf.get_string(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY)
+        config_where_clause = conf.get_string(
+            MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY)

        logging.info("Crawling for Schemas %s", config_where_clause)

        if len(config_where_clause) > 0:
-            where_clause_suffix = MSSQLMetadataExtractor.DEFAULT_WHERE_CLAUSE_VALUE.format(schemas=config_where_clause)
+            where_clause_suffix = MSSQLMetadataExtractor\
+                .DEFAULT_WHERE_CLAUSE_VALUE\
+                .format(schemas=config_where_clause)
        else:
            where_clause_suffix = ''

@@ -97,8 +111,12 @@ ORDER BY CLUSTER,
        LOGGER.info('SQL for MS SQL Metadata: {}'.format(self.sql_stmt))

        self._alchemy_extractor = SQLAlchemyExtractor()
-        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
-            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
+        sql_alch_conf = Scoped\
+            .get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
+            .with_fallback(
+                ConfigFactory.from_dict({
+                    SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt})
+            )

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter = None  # type: Union[None, Iterator]

@@ -119,7 +137,8 @@ ORDER BY CLUSTER,
    def _get_extract_iter(self):
        # type: () -> Iterator[TableMetadata]
        """
-        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
+        Using itertools.groupby and raw level iterator,
+        it groups to table and yields TableMetadata
        :return:
        """
        for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):

@@ -127,14 +146,21 @@ ORDER BY CLUSTER,
            for row in group:
                last_row = row
-                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
-                                              row['col_type'], row['col_sort_order']))
-
-            yield TableMetadata(self._database, last_row['cluster'],
-                                last_row['schema_name'],
-                                last_row['name'],
-                                last_row['description'],
-                                columns, tags=last_row['schema_name'])
+                columns.append(
+                    ColumnMetadata(
+                        row['col_name'],
+                        row['col_description'],
+                        row['col_type'],
+                        row['col_sort_order']))
+
+            yield TableMetadata(
+                self._database,
+                last_row['cluster'],
+                last_row['schema_name'],
+                last_row['name'],
+                last_row['description'],
+                columns,
+                tags=last_row['schema_name'])

    def _get_raw_extract_iter(self):
        # type: () -> Iterator[Dict[str, Any]]

@@ -155,6 +181,8 @@ ORDER BY CLUSTER,
        :return:
        """
        if row:
-            return TableKey(schema_name=row['schema_name'], table_name=row['name'])
+            return TableKey(
+                schema_name=row['schema_name'],
+                table_name=row['name'])

        return None
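
The _get_extract_iter docstring above describes grouping the raw rows into one TableMetadata per table with itertools.groupby. A minimal sketch of that grouping pattern, with hypothetical rows and plain dicts standing in for the databuilder models (groupby only merges adjacent rows, which is why SQL_STATEMENT orders by cluster, schema, name, and column sort order):

from collections import namedtuple
from itertools import groupby

TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])

# Rows as the extractor would see them, already sorted so that rows of the
# same table are adjacent (the ORDER BY in SQL_STATEMENT guarantees this).
rows = [
    {'schema_name': 'dbo', 'name': 'orders', 'col_name': 'id', 'col_sort_order': 1},
    {'schema_name': 'dbo', 'name': 'orders', 'col_name': 'total', 'col_sort_order': 2},
    {'schema_name': 'dbo', 'name': 'users', 'col_name': 'id', 'col_sort_order': 1},
]

def table_key(row):
    # Same idea as _get_table_key: rows sharing this key belong to one table.
    return TableKey(schema_name=row['schema_name'], table_name=row['name'])

for key, group in groupby(rows, table_key):
    print(key, [row['col_name'] for row in group])
# TableKey(schema_name='dbo', table_name='orders') ['id', 'total']
# TableKey(schema_name='dbo', table_name='users') ['id']
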
"""
This is a example script which demo how to load data
into Neo4j and Elasticsearch from MS SQL Server
without using an Airflow DAG.
"""
import sys
import textwrap
import uuid
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base
from databuilder.extractor.mssql_metadata_extractor import MSSQLMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]
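# Hypothetical invocation (the script name is an assumption; both hosts fall
# back to localhost when omitted):
#
#   python sample_mssql_loader.py <elasticsearch_host> <neo4j_host>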
es = Elasticsearch([
    {'host': es_host if es_host else 'localhost'},
])

DB_FILE = '/tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = 'bolt://{}:7687'.format(neo_host if neo_host else 'localhost')
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'


# todo: connection string needs to change
def connection_string(windows_auth=False):
    """Generates an MSSQL connection string.

    Keyword Arguments:
        windows_auth {bool} -- set to True if connecting to the DB using Windows
            credentials. (default: {False})

    Returns:
        [str] -- [connection string]
    """
    if windows_auth:
        base_string = (
            "mssql+pyodbc://@{host}/{db}" +
            "?driver=ODBC+Driver+17+for+SQL+Server" +
            "&trusted_connection=yes" +
            "&autocommit=true"  # comment to disable autocommit.
        )
        params = {
            "host": "localhost",
            "db": "master"
        }
    else:
        base_string = (
            "mssql+pyodbc://{user}:{pword}@{host}/{db}" +
            "?driver=ODBC+Driver+17+for+SQL+Server" +
            "&autocommit=true"  # comment to disable autocommit.
        )
        params = {
            "user": "username",
            "pword": "password",
            "host": "localhost",
            "db": "master"
        }
    return base_string.format(**params)
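# A minimal usage sketch: the SQLAlchemy URL the helper above builds from its
# placeholder credentials (swap in real values before running the job).
#
#   >>> connection_string()
#   'mssql+pyodbc://username:password@localhost/master?driver=ODBC+Driver+17+for+SQL+Server&autocommit=true'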
def run_mssql_job():
    # Schemas to crawl; the extractor substitutes this into its
    # DEFAULT_WHERE_CLAUSE_VALUE, producing "and tbl.table_schema in ('dbo')".
    where_clause_suffix = textwrap.dedent("""
        ('dbo')
    """)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(
        tmp_folder=tmp_folder)

    job_config = ConfigFactory.from_dict({
        # MSSQL Loader
        'extractor.mssql_metadata.{}'.format(
            MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
        'extractor.mssql_metadata.{}'.format(
            MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
        'extractor.mssql_metadata.extractor.sqlalchemy.{}'.format(
            SQLAlchemyExtractor.CONN_STRING): connection_string(),

        # NEO4J Loader
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            'unique_tag',  # should use a unique tag here, e.g. {ds}
    })

    job = DefaultJob(
        conf=job_config,
        task=DefaultTask(
            extractor=MSSQLMetadataExtractor(),
            loader=FsNeo4jCSVLoader()),
        publisher=Neo4jCsvPublisher())
    return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=None):
    """
    :param elasticsearch_index_alias: alias for the Elasticsearch index, used in
        amundsensearchlibrary/search_service/config.py
    :param elasticsearch_doc_type_key: name the Elasticsearch index is prepended with. Defaults to `table`,
        resulting in `table_search_index`
    :param model_name: the Databuilder model class used to transport data between the Extractor and Loader
    :param cypher_query: query handed to the `Neo4jSearchDataExtractor` class; if None is given (default),
        it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class;
        if None is given (default), it uses the `Table` mapping baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
            elasticsearch_client,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
            elasticsearch_new_index_key,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
            elasticsearch_doc_type_key,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
            elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job
if __name__ == "__main__":
# Uncomment next line to get INFO level logging
# logging.basicConfig(level=logging.INFO)
loading_job = run_mssql_job()
loading_job.launch()
job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()