Commit d302e673 authored by jornh's avatar jornh Committed by Tao Feng

Add Quicktart sample data to demo Airflow DAG (#94)

This is showing Airflow as the application because Amundsen already has
the Airflow icon but it could basically be any data transformation job
e.g SQL transforming from source dataset(s) to a resulting dataset
(Table).
parent e29e11f1
...@@ -12,6 +12,7 @@ venv/ ...@@ -12,6 +12,7 @@ venv/
venv3/ venv3/
.cache/ .cache/
.idea/ .idea/
.vscode/
.coverage .coverage
.mypy_cache .mypy_cache
.pytest_cache .pytest_cache
......
...@@ -98,7 +98,7 @@ class Application(Neo4jCsvSerializable): ...@@ -98,7 +98,7 @@ class Application(Neo4jCsvSerializable):
def create_relation(self): def create_relation(self):
# type: () -> List[Dict[str, Any]] # type: () -> List[Dict[str, Any]]
""" """
Create a list of relation map between watermark record with original hive table Create a list of relations between application and table nodes
:return: :return:
""" """
results = [{ results = [{
......
task_id,dag_id,exec_date,application_url_template
hive.test_schema.test_table1,event_test,"2018-05-31T00:00:00","https://airflow_host.net/admin/airflow/tree?dag_id={dag_id}"
...@@ -149,8 +149,31 @@ def load_user_data_from_csv(file_name): ...@@ -149,8 +149,31 @@ def load_user_data_from_csv(file_name):
conn.commit() conn.commit()
def load_application_data_from_csv(file_name):
conn = create_connection(DB_FILE)
if conn:
cur = conn.cursor()
cur.execute('drop table if exists test_application_metadata')
cur.execute('create table if not exists test_application_metadata '
'(task_id VARCHAR(64) NOT NULL , '
'dag_id VARCHAR(64) NOT NULL , '
'exec_date VARCHAR(64) NOT NULL, '
'application_url_template VARCHAR(128) NOT NULL)')
file_loc = 'example/sample_data/' + file_name
with open(file_loc, 'r') as fin:
dr = csv.DictReader(fin)
to_db = [(i['task_id'],
i['dag_id'],
i['exec_date'],
i['application_url_template']) for i in dr]
cur.executemany("INSERT INTO test_application_metadata (task_id, dag_id, "
"exec_date, application_url_template) VALUES (?, ?, ?, ?);", to_db)
conn.commit()
# todo: Add a second model # todo: Add a second model
def create_sample_job(table_name, model_name): def create_sample_job(table_name, model_name, transformer=NoopTransformer()):
sql = textwrap.dedent(""" sql = textwrap.dedent("""
select * from {table_name}; select * from {table_name};
""").format(table_name=table_name) """).format(table_name=table_name)
...@@ -164,7 +187,7 @@ def create_sample_job(table_name, model_name): ...@@ -164,7 +187,7 @@ def create_sample_job(table_name, model_name):
task = DefaultTask(extractor=sql_extractor, task = DefaultTask(extractor=sql_extractor,
loader=csv_loader, loader=csv_loader,
transformer=NoopTransformer()) transformer=transformer)
job_config = ConfigFactory.from_dict({ job_config = ConfigFactory.from_dict({
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): SQLITE_CONN_STRING, 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): SQLITE_CONN_STRING,
...@@ -279,9 +302,13 @@ def create_es_publisher_sample_job(): ...@@ -279,9 +302,13 @@ def create_es_publisher_sample_job():
if __name__ == "__main__": if __name__ == "__main__":
# Uncomment next line to get INFO level logging
# logging.basicConfig(level=logging.INFO)
load_table_data_from_csv('sample_table.csv') load_table_data_from_csv('sample_table.csv')
load_col_data_from_csv('sample_col.csv') load_col_data_from_csv('sample_col.csv')
load_user_data_from_csv('sample_user.csv') load_user_data_from_csv('sample_user.csv')
load_application_data_from_csv('sample_application.csv')
if create_connection(DB_FILE): if create_connection(DB_FILE):
# start table job # start table job
job1 = create_sample_job('test_table_metadata', job1 = create_sample_job('test_table_metadata',
...@@ -298,6 +325,11 @@ if __name__ == "__main__": ...@@ -298,6 +325,11 @@ if __name__ == "__main__":
'databuilder.models.user.User') 'databuilder.models.user.User')
job_user.launch() job_user.launch()
# start application job
job_app = create_sample_job('test_application_metadata',
'databuilder.models.application.Application')
job_app.launch()
# start last updated job # start last updated job
job_lastupdated = create_last_updated_job() job_lastupdated = create_last_updated_job()
job_lastupdated.launch() job_lastupdated.launch()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment