from datetime import datetime, timedelta
import requests
import json
import uuid
import logging
import time

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable, DagRun
from airflow.utils.session import create_session
from airflow.utils.state import State

# Configure logging
logger = logging.getLogger(__name__)


def is_dag_already_running(**context):
    """
    Check if there are other active DAG runs to prevent duplicate execution.
    """
    dag_id = context["dag"].dag_id
    run_id = context["run_id"]

    with create_session() as session:
        # Count running instances of this DAG, excluding the current run
        running_dags = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.run_id != run_id,
                DagRun.state == State.RUNNING,
            )
            .count()
        )

    if running_dags > 0:
        logger.info(
            f"Found {running_dags} other running instance(s) of this DAG. Skipping execution."
        )
        return "skip_execution"
    else:
        logger.info("No other running instances found. Proceeding with execution.")
        return "fetch_daily_maximo_data"


def fetch_daily_maximo_data(**context):
    """
    Fetch data from the Maximo endpoint using a GET request and process the response.
    """
    # Generate a request ID for tracking
    request_id = str(uuid.uuid4())

    # Connection configuration
    base_url = "http://10.47.0.65/envelope"
    endpoint = "/fetch-maximo-daily"
    fetch_url = f"{base_url}{endpoint}"

    # Log before sending the request
    logger.info(f"Sending GET request to {fetch_url} (Request ID: {request_id})")

    # Request headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
    }

    try:
        # Use the requests library directly with an increased timeout
        response = requests.get(
            url=fetch_url,
            headers=headers,
            timeout=60,  # Increased timeout to 60 seconds
        )

        logger.info(
            f"Request completed (Request ID: {request_id}), status: {response.status_code}"
        )

        if response.status_code == 200:
            logger.info(f"Request successful (Request ID: {request_id})")

            # Parse the JSON response
            response_data = response.json()
            maximo_message = response_data.get("message", "No message provided")
            logger.info(f"Maximo response message: {maximo_message}")

            return {
                "request_id": request_id,
                "status_code": response.status_code,
                "timestamp": datetime.now().isoformat(),
                "message": f"Fetch Daily Maximo Runs Successfully! {maximo_message}",
            }
        else:
            logger.error(
                f"Request failed (Request ID: {request_id}), status: {response.status_code}"
            )
            return {
                "request_id": request_id,
                "status_code": response.status_code,
                "timestamp": datetime.now().isoformat(),
                "message": f"Failed to fetch daily Maximo data. Status code: {response.status_code}",
            }

    except requests.exceptions.Timeout:
        logger.error(f"Request timed out (Request ID: {request_id})")
        return {
            "request_id": request_id,
            "status_code": 504,
            "timestamp": datetime.now().isoformat(),
            "message": "Request timed out while fetching Maximo data",
        }
    except Exception as e:
        logger.error(f"Error sending request (Request ID: {request_id}): {str(e)}")
        return {
            "request_id": request_id,
            "status_code": 500,
            "timestamp": datetime.now().isoformat(),
            "message": f"Exception in Fetch Daily Maximo: {str(e)}",
        }


def process_response(**context):
    """
    Process the response from the Maximo API.
    """
    ti = context["ti"]
    # The fetch task returns its result dict, which is stored in XCom under the
    # default "return_value" key, so pull it without specifying a custom key.
    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data")

    if result:
        logger.info(f"Processing result: {result}")
        # Add any additional processing logic here

        # Sleep for a short time to simulate processing
        time.sleep(2)
        return True

    return False


# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "fetch_daily_maximo_data",
    default_args=default_args,
    description="A DAG to fetch data from the Maximo API endpoint on a daily schedule",
    # Schedule to run daily at 21:00, 22:00, and 23:00
    schedule_interval="0 21-23 * * *",
    start_date=days_ago(1),
    tags=["maximo", "api", "fetch", "continuous", "daily"],
    catchup=False,
)

# Branch operator to check if we should proceed or skip
check_running = BranchPythonOperator(
    task_id="check_if_already_running",
    python_callable=is_dag_already_running,
    dag=dag,
)

# Skip-execution dummy task
skip_execution = DummyOperator(
    task_id="skip_execution",
    dag=dag,
)

# Define the task to fetch data from Maximo using PythonOperator
fetch_task = PythonOperator(
    task_id="fetch_daily_maximo_data",
    python_callable=fetch_daily_maximo_data,
    dag=dag,
)

# Define the task to process the response
process_task = PythonOperator(
    task_id="process_response",
    python_callable=process_response,
    dag=dag,
)

# Set task dependencies
check_running >> [skip_execution, fetch_task]
fetch_task >> process_task