from datetime import datetime, timedelta

import logging
import uuid

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2.extras import execute_values


def get_last_processed_id():
    """Get the maximum source_id from the target table, or 0 if it is empty."""
    target_hook = PostgresHook(postgres_conn_id='target_postgres')
    result = target_hook.get_first("SELECT MAX(source_id) FROM oh_wo_master")
    return result[0] if result and result[0] is not None else 0


def incremental_load(**context):
    """Load incremental data by comparing IDs."""
    source_hook = PostgresHook(postgres_conn_id='source_postgres')
    target_hook = PostgresHook(postgres_conn_id='target_postgres')

    # Get last processed ID
    last_id = get_last_processed_id()
    logging.info(f"Last processed ID: {last_id}")

    # Fetch new records in ID order, capped at 1000 per run
    records = source_hook.get_records("""
        SELECT id, assetnum, worktype, workgroup, total_cost_max,
               created_at, updated_at
        FROM dl_wo_staging
        WHERE id > %s
        ORDER BY id
        LIMIT 1000
    """, parameters=(last_id,))

    if not records:
        logging.info("No new records to process")
        return

    # Assign a fresh UUID to each row; keep the source ID so repeated runs
    # upsert instead of duplicating. UUIDs are passed as strings so psycopg2
    # can adapt them without register_uuid().
    rows = [
        (str(uuid.uuid4()), old_id, assetnum, worktype, workgroup,
         total_cost_max, created_at, updated_at)
        for old_id, assetnum, worktype, workgroup,
            total_cost_max, created_at, updated_at in records
    ]

    # Run the temp-table create, bulk insert, and upsert on one connection in
    # one transaction: a TEMP table with ON COMMIT DROP only lives until the
    # transaction commits, so issuing these as separate hook.run() calls would
    # each get a fresh session and never see the table.
    conn = target_hook.get_conn()
    try:
        conn.autocommit = False  # all three statements must share a transaction
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TEMP TABLE temp_records (
                    id UUID,
                    source_id BIGINT,
                    assetnum TEXT,
                    worktype VARCHAR(5),
                    workgroup VARCHAR(30),
                    total_cost_max FLOAT4,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP
                ) ON COMMIT DROP
            """)

            # Parameterized bulk insert; avoids the quoting and injection
            # problems of hand-built VALUES strings
            execute_values(cur, """
                INSERT INTO temp_records
                    (id, source_id, assetnum, worktype, workgroup,
                     total_cost_max, created_at, updated_at)
                VALUES %s
            """, rows)

            # Upsert from the temp table into the target table. assetnum is
            # carried through to the target here; this assumes oh_wo_master
            # has an assetnum column.
            cur.execute("""
                INSERT INTO oh_wo_master
                    (id, source_id, assetnum, worktype, workgroup,
                     total_cost_max, created_at, updated_at)
                SELECT id, source_id, assetnum, worktype, workgroup,
                       total_cost_max, created_at, updated_at
                FROM temp_records
                ON CONFLICT (source_id) DO UPDATE SET
                    assetnum = EXCLUDED.assetnum,
                    worktype = EXCLUDED.worktype,
                    workgroup = EXCLUDED.workgroup,
                    total_cost_max = EXCLUDED.total_cost_max,
                    updated_at = EXCLUDED.updated_at
            """)
        conn.commit()
    finally:
        conn.close()

    logging.info(f"Processed {len(records)} records")


# Create DAG
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'oh_wo_incremental_etl',
    default_args=default_args,
    description='Simple incremental ETL using target table as state',
    schedule_interval='*/15 * * * *',  # Run every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

load_task = PythonOperator(
    task_id='incremental_load',
    python_callable=incremental_load,
    dag=dag,
)
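
# --- Reference: assumed target table DDL (a sketch, not from the source) ---
# The ON CONFLICT (source_id) upsert above only works if oh_wo_master has a
# unique constraint or unique index on source_id. A minimal definition
# consistent with the columns this DAG writes might look like the following;
# the assetnum column and all column types are assumptions, not a confirmed
# schema:
#
#   CREATE TABLE IF NOT EXISTS oh_wo_master (
#       id             UUID PRIMARY KEY,
#       source_id      BIGINT NOT NULL UNIQUE,  -- required by ON CONFLICT (source_id)
#       assetnum       TEXT,
#       worktype       VARCHAR(5),
#       workgroup      VARCHAR(30),
#       total_cost_max FLOAT4,
#       created_at     TIMESTAMP,
#       updated_at     TIMESTAMP
#   );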