from datetime import datetime, timedelta
import requests
import json
import uuid
import logging
import time
from concurrent.futures import ThreadPoolExecutor

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable, DagRun
from airflow.utils.session import create_session
from airflow.utils.state import State

# Configure logging
logger = logging.getLogger(__name__)


def is_dag_already_running(**context):
    """
    Check if there are other active DAG runs to prevent duplicate execution
    """
    dag_id = context["dag"].dag_id
    run_id = context["run_id"]

    with create_session() as session:
        # Count running instances of this DAG excluding the current run
        running_dags = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.run_id != run_id,
                DagRun.state == State.RUNNING,
            )
            .count()
        )

    if running_dags > 0:
        logger.info(
            f"Found {running_dags} other running instance(s) of this DAG. Skipping execution."
        )
        return "skip_execution"
    else:
        logger.info("No other running instances found. Proceeding with execution.")
        return "fetch_daily_maximo_data"


def fetch_daily_maximo_data(**context):
    """
    Fetch data from the Maximo endpoint using GET method with async pattern
    """
    # Generate request ID for tracking
    request_id = str(uuid.uuid4())

    # Connection configuration
    base_url = "http://10.47.0.65/envelope"
    endpoint = "/fetch-maximo-daily"
    fetch_url = f"{base_url}{endpoint}"

    # Log before sending request
    logger.info(f"Sending async GET request to {fetch_url} (Request ID: {request_id})")

    # Request headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
    }

    # Create a response callback function
    def response_callback(future):
        try:
            # The callback only fires once the future is done, so result() returns immediately
            response = future.result()
            logger.info(
                f"Request initiated successfully (Request ID: {request_id}), status: {response.status_code}"
            )
            # We don't process the full response here, as that may take longer than Airflow's task timeout
        except requests.exceptions.Timeout:
            logger.error(f"Request connection timed out (Request ID: {request_id})")
        except Exception as e:
            logger.error(
                f"Error initiating request (Request ID: {request_id}): {str(e)}"
            )

    # Using ThreadPoolExecutor for async operation.
    # Note: a `with` block would call shutdown(wait=True) on exit and block until the
    # request finishes, so the executor is shut down explicitly with wait=False instead
    # (the worker thread is still joined when the task process exits).
    executor = ThreadPoolExecutor(max_workers=1)

    # Submit request to executor with a longer timeout for the overall operation
    future = executor.submit(
        requests.get,
        url=fetch_url,
        headers=headers,
        timeout=600,  # Increased timeout to 10 minutes for the actual API call
    )

    # Add callback that will execute when the future completes
    future.add_done_callback(response_callback)

    # Don't wait for the future to complete, let it run in the background
    executor.shutdown(wait=False)
    logger.info(f"Async request has been dispatched (Request ID: {request_id})")

    # Push the request details to XCom for tracking
    result_dict = {
        "request_id": request_id,
        "status": "initiated",
        "timestamp": datetime.now().isoformat(),
        "message": "Fetch Daily Maximo request initiated asynchronously",
    }
    ti = context["ti"]
    ti.xcom_push(key="fetch_result", value=result_dict)

    return result_dict


def process_response(**context):
    """
    Process the response from the Maximo API
    In async mode, this simply acknowledges the request was made
    """
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")

    if result:
        logger.info(f"Processing async request result: {result}")
        # Since we're using a fire-and-forget pattern, we just acknowledge the request was made
        # In production, you might want to implement a separate DAG or task
        # to check the status of the asynchronous job later
        return True

    return False


# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "fetch_daily_maximo_data",
    default_args=default_args,
    description="A DAG to fetch data from the Maximo API endpoint asynchronously on a daily schedule",
    # Schedule to run daily at 14:00 UTC (21:00 WIB)
    schedule_interval="0 14 * * *",
    start_date=days_ago(1),
    tags=["maximo", "api", "fetch", "continuous", "daily", "async"],
    catchup=False,
)

# Branch operator to check if we should proceed or skip
check_running = BranchPythonOperator(
    task_id="check_if_already_running",
    python_callable=is_dag_already_running,
    provide_context=True,
    dag=dag,
)

# Skip execution dummy task
skip_execution = DummyOperator(
    task_id="skip_execution",
    dag=dag,
)

# Define the task to fetch data from Maximo using PythonOperator
fetch_task = PythonOperator(
    task_id="fetch_daily_maximo_data",
    python_callable=fetch_daily_maximo_data,
    provide_context=True,
    dag=dag,
)

# Define task to process the response
process_task = PythonOperator(
    task_id="process_response",
    python_callable=process_response,
    provide_context=True,
    dag=dag,
)

# Set task dependencies
check_running >> [skip_execution, fetch_task]
fetch_task >> process_task