Unverified Commit 0339dd55 authored by Alagappan's avatar Alagappan Committed by GitHub

chore: Sample dags minor cleanup (#344)

Signed-off-by: 's avatarTao Feng <fengtao04@gmail.com>
parent d9f7d9dd
......@@ -199,3 +199,7 @@ with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
'watermark_type': '"low_watermark"',
'part_regex': '{}'.format('{{ ds }}')}
)
# Schedule high and low watermark task after metadata task
amundsen_databuilder_table_metadata_job >> amundsen_hwm_job
amundsen_databuilder_table_metadata_job >> amundsen_lwm_job
......@@ -75,7 +75,7 @@ def connection_string():
return "postgresql://%s:%s@%s:%s/%s" % (user, password, host, port, db)
def create_table_extract_job(**kwargs):
def create_table_extract_job():
where_clause_suffix = textwrap.dedent("""
where table_schema in {schemas}
""").format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE)
......@@ -161,15 +161,15 @@ def create_es_publisher_sample_job():
with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
create_table_extract_job = PythonOperator(
task_id='create_table_extract_job',
postgres_table_extract_job = PythonOperator(
task_id='postgres_table_extract_job',
python_callable=create_table_extract_job
)
create_es_index_job = PythonOperator(
task_id='create_es_publisher_sample_job',
postgres_es_index_job = PythonOperator(
task_id='postgres_es_publisher_sample_job',
python_callable=create_es_publisher_sample_job
)
# elastic search update run after table metadata has been updated
create_table_extract_job >> create_es_index_job
postgres_table_extract_job >> postgres_es_index_job
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment