diff --git a/dags/fetch_maximo_daily.py b/dags/fetch_maximo_daily.py
index 6e42dec..0339edd 100644
--- a/dags/fetch_maximo_daily.py
+++ b/dags/fetch_maximo_daily.py
@@ -3,16 +3,50 @@
 import requests
 import json
 import uuid
 import logging
+import time
 from airflow import DAG
-from airflow.operators.python import PythonOperator
+from airflow.operators.python import PythonOperator, BranchPythonOperator
+from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
+from airflow.models import DagRun
+from airflow.utils.session import create_session
+from airflow.utils.state import State
 
 # Configure logging
 logger = logging.getLogger(__name__)
 
 
-def fetch_maximo_data(**context):
+def is_dag_already_running(**context):
+    """
+    Check whether another run of this DAG is already active, to prevent duplicate execution.
+    """
+    dag_id = context["dag"].dag_id
+    run_id = context["run_id"]
+
+    with create_session() as session:
+        # Count running instances of this DAG, excluding the current run
+        running_dags = (
+            session.query(DagRun)
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.run_id != run_id,
+                DagRun.state == State.RUNNING,
+            )
+            .count()
+        )
+
+    if running_dags > 0:
+        logger.info(
+            f"Found {running_dags} other running instance(s) of this DAG. Skipping execution."
+        )
+        return "skip_execution"
+    else:
+        logger.info("No other running instances found. Proceeding with execution.")
+        return "fetch_daily_maximo_data"
+
+
+def fetch_daily_maximo_data(**context):
     """
     Fetch data from the Maximo endpoint using GET method and process the response
     """
@@ -38,7 +72,7 @@ def fetch_maximo_data(**context):
     response = requests.get(
         url=fetch_url,
         headers=headers,
-        timeout=420,  # Increased timeout to 7 minutes (420 seconds)
+        timeout=60,  # Reduced timeout to 60 seconds
     )
 
     logger.info(
@@ -90,6 +124,24 @@ def fetch_maximo_data(**context):
     }
 
 
+def process_response(**context):
+    """
+    Process the response from the Maximo API
+    """
+    ti = context["ti"]
+    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
+
+    if result:
+        logger.info(f"Processing result: {result}")
+        # Add any additional processing logic here
+
+        # Sleep for a short time to simulate processing
+        time.sleep(2)
+
+        return True
+    return False
+
+
 # Define default arguments for the DAG
 default_args = {
     "owner": "airflow",
@@ -108,14 +160,40 @@ dag = DAG(
     # Schedule to run daily at 21:00, 22:00, and 23:00
     schedule_interval="0 21-23 * * *",
     start_date=days_ago(1),
-    tags=["maximo", "api", "fetch"],
+    tags=["maximo", "api", "fetch", "continuous", "daily"],
     catchup=False,
 )
 
+# Branch operator that decides whether to proceed or skip this run
+check_running = BranchPythonOperator(
+    task_id="check_if_already_running",
+    python_callable=is_dag_already_running,
+    provide_context=True,
+    dag=dag,
+)
+
+# No-op branch taken when another run is already active
+skip_execution = DummyOperator(
+    task_id="skip_execution",
+    dag=dag,
+)
+
 # Define the task to fetch data from Maximo using PythonOperator
 fetch_task = PythonOperator(
-    task_id="fetch_maximo_data",
-    python_callable=fetch_maximo_data,
+    task_id="fetch_daily_maximo_data",
+    python_callable=fetch_daily_maximo_data,
     provide_context=True,
     dag=dag,
 )
+
+# Define the task that processes the fetched response
+process_task = PythonOperator(
+    task_id="process_response",
+    python_callable=process_response,
+    provide_context=True,
+    dag=dag,
+)
+
+# Set task dependencies: branch to skip or fetch; processing follows the fetch
+check_running >> [skip_execution, fetch_task]
+fetch_task >> process_task
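
Note (outside the patch): the DagRun query introduced above is a hand-rolled concurrency guard. Airflow also offers this declaratively through the DAG-level max_active_runs argument, which caps how many runs of one DAG the scheduler will keep in flight at a time. A minimal sketch, assuming this file's dag_id is fetch_maximo_daily (the actual DAG(...) call is not shown in the patch):

    from airflow import DAG
    from airflow.utils.dates import days_ago

    dag = DAG(
        dag_id="fetch_maximo_daily",        # assumed; not visible in the patch
        schedule_interval="0 21-23 * * *",  # same schedule as the patched file
        start_date=days_ago(1),
        catchup=False,
        max_active_runs=1,  # scheduler will not start a second concurrent run
    )

One practical difference: max_active_runs=1 queues the surplus run until the active one finishes, whereas the branch in the patch records an explicit skip_execution path in the run history.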