from datetime import datetime, timedelta import requests import json import uuid import logging from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago # Configure logging logger = logging.getLogger(__name__) def fetch_maximo_data(**context): """ Fetch data from the Maximo endpoint using GET method and process the response """ # Generate request ID for tracking request_id = str(uuid.uuid4()) # Connection configuration base_url = "http://10.47.0.65/envelope" endpoint = "/fetch-maximo-daily" fetch_url = f"{base_url}{endpoint}" # Log before sending request logger.info(f"Sending GET request to {fetch_url} (Request ID: {request_id})") # Request headers headers = { "Content-Type": "application/json", "X-Request-ID": request_id, } try: # Use requests library directly with increased timeout response = requests.get( url=fetch_url, headers=headers, timeout=timedelta(minutes=7), # Increased timeout to 60 seconds ) logger.info( f"Request completed (Request ID: {request_id}), status: {response.status_code}" ) if response.status_code == 200: logger.info(f"Request successful (Request ID: {request_id})") # Parse JSON response response_data = response.json() maximo_message = response_data.get("message", "No message provided") logger.info(f"Maximo response message: {maximo_message}") return { "request_id": request_id, "status_code": response.status_code, "timestamp": datetime.now().isoformat(), "message": f"Fetch Daily Maximo Runs Successfully! {maximo_message}", } else: logger.error( f"Request failed (Request ID: {request_id}), status: {response.status_code}" ) return { "request_id": request_id, "status_code": response.status_code, "timestamp": datetime.now().isoformat(), "message": f"Failed to fetch daily Maximo data. Status code: {response.status_code}", } except requests.exceptions.Timeout: logger.error(f"Request timed out (Request ID: {request_id})") return { "request_id": request_id, "status_code": 504, "timestamp": datetime.now().isoformat(), "message": "Request timed out while fetching Maximo data", } except Exception as e: logger.error(f"Error sending request (Request ID: {request_id}): {str(e)}") return { "request_id": request_id, "status_code": 500, "timestamp": datetime.now().isoformat(), "message": f"Exception in Fetch Daily Maximo: {str(e)}", } # Define default arguments for the DAG default_args = { "owner": "airflow", "depends_on_past": False, "email_on_failure": True, "email_on_retry": False, "retries": 3, "retry_delay": timedelta(minutes=5), } # Define the DAG dag = DAG( "fetch_daily_maximo_data", default_args=default_args, description="A DAG to fetch data from Maximo API endpoint on a schedule daily", # Schedule to run daily at 21:00, 22:00, and 23:00 schedule_interval="0 21-23 * * *", start_date=days_ago(1), tags=["maximo", "api", "fetch"], catchup=False, ) # Define the task to fetch data from Maximo using PythonOperator fetch_task = PythonOperator( task_id="fetch_maximo_data", python_callable=fetch_maximo_data, provide_context=True, dag=dag, )