import logging
import uuid
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# Import PostgresHook from the provider package; the old
# airflow.hooks.postgres_hook path is deprecated in Airflow 2.x.
from airflow.providers.postgres.hooks.postgres import PostgresHook

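# The two connection ids used below ('source_postgres', 'target_postgres') are
# assumed to already exist in the Airflow metadata DB or the environment. One
# hedged way to provide them locally is Airflow's connection-URI environment
# variables, for example:
#
#   export AIRFLOW_CONN_SOURCE_POSTGRES='postgresql://user:pass@source-host:5432/staging'
#   export AIRFLOW_CONN_TARGET_POSTGRES='postgresql://user:pass@target-host:5432/warehouse'
#
# Host names and credentials above are placeholders, not values from this repo.
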

def get_last_processed_id():
    """Get the maximum source_id from the target table."""
    target_hook = PostgresHook(postgres_conn_id='target_postgres')
    result = target_hook.get_first("""
        SELECT MAX(source_id)
        FROM oh_wo_master
    """)
    # MAX() returns NULL on an empty table, so fall back to 0 so that the
    # first incremental run picks up every staged row.
    return result[0] if result and result[0] else 0

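# The upsert in incremental_load() relies on ON CONFLICT (source_id), which
# requires a unique constraint or unique index on oh_wo_master.source_id.
# A sketch of the assumed target DDL (illustrative, not taken from this repo):
#
#   CREATE TABLE IF NOT EXISTS oh_wo_master (
#       id              UUID PRIMARY KEY,
#       source_id       BIGINT UNIQUE NOT NULL,
#       worktype        VARCHAR(5),
#       workgroup       VARCHAR(30),
#       total_cost_max  FLOAT4,
#       created_at      TIMESTAMP,
#       updated_at      TIMESTAMP
#   );
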

def incremental_load(**context):
    """Load incremental data by comparing IDs."""
    source_hook = PostgresHook(postgres_conn_id='source_postgres')
    target_hook = PostgresHook(postgres_conn_id='target_postgres')

    # Get last processed ID
    last_id = get_last_processed_id()
    logging.info(f"Last processed ID: {last_id}")

    # Fetch new records
    records = source_hook.get_records("""
        SELECT id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at
        FROM dl_wo_staging
        WHERE id > %s
        ORDER BY id
        LIMIT 1000
    """, parameters=(last_id,))

    if not records:
        logging.info("No new records to process")
        return

    # Transform and load records.
    # NOTE: a temp table is only visible to the connection that created it,
    # so the CREATE / INSERT / upsert statements are passed to a single
    # target_hook.run([...]) call below and share one connection and one
    # transaction; ON COMMIT DROP then removes the temp table at commit.
    create_temp_sql = """
        CREATE TEMP TABLE temp_records (
            id UUID,
            source_id BIGINT,
            worktype VARCHAR(5),
            workgroup VARCHAR(30),
            total_cost_max FLOAT4,
            created_at TIMESTAMP,
            updated_at TIMESTAMP
        ) ON COMMIT DROP
    """

    # Prepare batch insert
    insert_values = []
    for record in records:
        old_id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at = record
        new_uuid = uuid.uuid4()
        # assetnum is staged but has no column in temp_records / oh_wo_master,
        # so it is not part of the VALUES tuple.
        insert_values.append(
            f"('{new_uuid}', {old_id}, '{worktype}', '{workgroup}', '{total_cost_max}', '{created_at}', '{updated_at}')"
        )

    if insert_values:
        # Bulk insert into temp table
        insert_sql = f"""
            INSERT INTO temp_records (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at)
            VALUES {','.join(insert_values)}
        """

        # Upsert from temp table to target table
        upsert_sql = """
            INSERT INTO oh_wo_master (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at)
            SELECT id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at
            FROM temp_records
            ON CONFLICT (source_id) DO UPDATE
            SET worktype = EXCLUDED.worktype,
                workgroup = EXCLUDED.workgroup,
                total_cost_max = EXCLUDED.total_cost_max,
                updated_at = EXCLUDED.updated_at
        """

        target_hook.run([create_temp_sql, insert_sql, upsert_sql])

    processed_count = len(records)
    logging.info(f"Processed {processed_count} records")

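
# A hedged alternative sketch for the load step, not wired into the DAG:
# the string-built VALUES clause above assumes worktype, workgroup, and the
# timestamps never contain single quotes. The helper below instead lets
# psycopg2 escape every value via execute_values, still on a single
# connection so the temp table stays visible. The helper name and its
# (target_hook, records) interface are illustrative assumptions.
def _load_records_parameterized(target_hook, records):
    from psycopg2.extras import execute_values  # psycopg2 ships with the Postgres provider

    rows = [
        (str(uuid.uuid4()), old_id, worktype, workgroup, total_cost_max, created_at, updated_at)
        for old_id, _assetnum, worktype, workgroup, total_cost_max, created_at, updated_at in records
    ]

    conn = target_hook.get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TEMP TABLE temp_records (
                    id UUID,
                    source_id BIGINT,
                    worktype VARCHAR(5),
                    workgroup VARCHAR(30),
                    total_cost_max FLOAT4,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP
                ) ON COMMIT DROP
            """)
            execute_values(
                cur,
                "INSERT INTO temp_records (id, source_id, worktype, workgroup, "
                "total_cost_max, created_at, updated_at) VALUES %s",
                rows,
            )
            cur.execute("""
                INSERT INTO oh_wo_master (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at)
                SELECT id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at
                FROM temp_records
                ON CONFLICT (source_id) DO UPDATE
                SET worktype = EXCLUDED.worktype,
                    workgroup = EXCLUDED.workgroup,
                    total_cost_max = EXCLUDED.total_cost_max,
                    updated_at = EXCLUDED.updated_at
            """)
        conn.commit()  # commit also drops the ON COMMIT DROP temp table
    finally:
        conn.close()
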

# Create DAG
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'oh_wo_incremental_etl',
    default_args=default_args,
    description='Simple incremental ETL using target table as state',
    schedule_interval='*/15 * * * *',  # Run every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False
)

load_task = PythonOperator(
    task_id='incremental_load',
    python_callable=incremental_load,
    dag=dag
)
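
# A minimal local-debugging sketch, assuming Airflow >= 2.5 and that the
# 'source_postgres' / 'target_postgres' connections are reachable from the
# local environment; dag.test() runs the whole DAG in-process without a
# scheduler.
if __name__ == "__main__":
    dag.test()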