import logging
import uuid
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2.extras import execute_values
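
# The upsert below relies on a UNIQUE constraint on oh_wo_master.source_id.
# A minimal sketch of the assumed target schema (not taken from this repo;
# column types inferred from the temp table below):
#
#   CREATE TABLE IF NOT EXISTS oh_wo_master (
#       id             UUID PRIMARY KEY,
#       source_id      BIGINT UNIQUE NOT NULL,
#       assetnum       VARCHAR(30),
#       worktype       VARCHAR(5),
#       workgroup      VARCHAR(30),
#       total_cost_max FLOAT4,
#       created_at     TIMESTAMP,
#       updated_at     TIMESTAMP
#   );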

def get_last_processed_id():
    """Get the maximum source_id from the target table (0 if empty)."""
    target_hook = PostgresHook(postgres_conn_id='target_postgres')
    result = target_hook.get_first("""
        SELECT MAX(source_id)
        FROM oh_wo_master
    """)
    # MAX() yields a single row containing NULL when the table is empty.
    return result[0] if result and result[0] is not None else 0
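
# Note: this strategy uses the target table itself as the watermark, so it
# only ever picks up rows INSERTED into dl_wo_staging with a higher id.
# Rows updated in place (id <= the watermark) are never re-read; an
# updated_at-based watermark would be needed to capture those.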
def incremental_load(**context):
    """Load incremental data by comparing IDs."""
    source_hook = PostgresHook(postgres_conn_id='source_postgres')
    target_hook = PostgresHook(postgres_conn_id='target_postgres')

    # Get the last processed ID from the target table.
    last_id = get_last_processed_id()
    logging.info("Last processed ID: %s", last_id)

    # Fetch a bounded batch of new records from the source.
    records = source_hook.get_records("""
        SELECT id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at
        FROM dl_wo_staging
        WHERE id > %s
        ORDER BY id
        LIMIT 1000
    """, parameters=(last_id,))

    if not records:
        logging.info("No new records to process")
        return
    # Transform: assign a fresh UUID per row; the source id becomes source_id.
    rows = [(str(uuid.uuid4()),) + tuple(record) for record in records]

    # A temp table is session-scoped, so the CREATE, the bulk insert and the
    # upsert must all share one connection; separate hook.run() calls would
    # each open a new session and never see the table.
    conn = target_hook.get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TEMP TABLE temp_records (
                    id UUID,
                    source_id BIGINT,
                    assetnum VARCHAR(30),  -- type/length of assetnum assumed
                    worktype VARCHAR(5),
                    workgroup VARCHAR(30),
                    total_cost_max FLOAT4,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP
                ) ON COMMIT DROP
            """)
            # Parameterised bulk insert; avoids the quoting and SQL-injection
            # problems of hand-built VALUES strings.
            execute_values(cur, """
                INSERT INTO temp_records
                    (id, source_id, assetnum, worktype, workgroup,
                     total_cost_max, created_at, updated_at)
                VALUES %s
            """, rows)
            # Upsert from the temp table into the target table.
            cur.execute("""
                INSERT INTO oh_wo_master
                    (id, source_id, assetnum, worktype, workgroup,
                     total_cost_max, created_at, updated_at)
                SELECT id, source_id, assetnum, worktype, workgroup,
                       total_cost_max, created_at, updated_at
                FROM temp_records
                ON CONFLICT (source_id) DO UPDATE
                SET assetnum = EXCLUDED.assetnum,
                    worktype = EXCLUDED.worktype,
                    workgroup = EXCLUDED.workgroup,
                    total_cost_max = EXCLUDED.total_cost_max,
                    updated_at = EXCLUDED.updated_at
            """)
        # COMMIT also drops the temp table (ON COMMIT DROP).
        conn.commit()
    finally:
        conn.close()

    logging.info("Processed %d records", len(records))

# Create DAG
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'oh_wo_incremental_etl',
    default_args=default_args,
    description='Simple incremental ETL using target table as state',
    schedule_interval='*/15 * * * *',  # Run every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

load_task = PythonOperator(
    task_id='incremental_load',
    python_callable=incremental_load,
    dag=dag,
)
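
# Optional local debug run (DAG.test() requires Airflow 2.5+); scheduled
# execution does not need this block. Alternatively, test a single task with:
#   airflow tasks test oh_wo_incremental_etl incremental_load 2024-01-01
if __name__ == "__main__":
    dag.test()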