"""Airflow DAG that calls the daily Maximo fetch endpoint.

File: dags/fetch_maximo_daily.py
"""
from datetime import datetime, timedelta
import json
import logging
import os
import uuid

import requests

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Configure logging
logger = logging.getLogger(__name__)

# Endpoint location. The defaults are the values that used to be hard-coded
# in the task; the base URL can be overridden through the environment so the
# DAG file does not need editing per deployment.
MAXIMO_BASE_URL = os.environ.get("MAXIMO_BASE_URL", "http://192.168.1.84:5002")
MAXIMO_FETCH_ENDPOINT = "/fetch-maximo-daily"
REQUEST_TIMEOUT_SECONDS = 60

_DEFAULT_MESSAGE = "No message provided"


def _extract_message(response):
    """Return the ``message`` field of a JSON response body.

    Falls back to a placeholder when the body is not valid JSON, is not a
    JSON object, or has no ``message`` key.
    """
    try:
        payload = response.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        logger.warning("Maximo response body is not valid JSON")
        return _DEFAULT_MESSAGE

    if not isinstance(payload, dict):
        return _DEFAULT_MESSAGE
    return payload.get("message", _DEFAULT_MESSAGE)


def fetch_maximo_data(**context):
    """Send a GET request to the Maximo fetch endpoint and report the outcome.

    A fresh UUID is generated per call and sent as the ``X-Request-ID``
    header so the call can be correlated in the logs.

    Args:
        **context: Airflow task context keyword arguments (unused).

    Returns:
        dict: ``request_id``, ``status_code``, ``timestamp`` (ISO format) and
        ``message`` for a successful (HTTP 200) call.

    Raises:
        AirflowException: If the request times out, cannot be sent, or the
            endpoint answers with a status other than 200. Raising (instead
            of returning an error dict) marks the task as failed, which is
            what allows the DAG's retry and failure-email settings to apply.
    """
    # Generate request ID for tracking
    request_id = str(uuid.uuid4())
    fetch_url = f"{MAXIMO_BASE_URL}{MAXIMO_FETCH_ENDPOINT}"

    logger.info("Sending GET request to %s (Request ID: %s)", fetch_url, request_id)

    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
    }

    # Keep the try body limited to the network call so unrelated errors are
    # not misreported as request failures.
    try:
        response = requests.get(
            url=fetch_url,
            headers=headers,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.exceptions.Timeout as exc:
        logger.error("Request timed out (Request ID: %s)", request_id)
        raise AirflowException(
            "Request timed out while fetching Maximo data"
        ) from exc
    except requests.exceptions.RequestException as exc:
        logger.error(
            "Error sending request (Request ID: %s): %s", request_id, exc
        )
        raise AirflowException(
            f"Exception in Fetch Daily Maximo: {exc}"
        ) from exc

    logger.info(
        "Request completed (Request ID: %s), status: %s",
        request_id,
        response.status_code,
    )

    # Only an exact 200 counts as success.
    if response.status_code != 200:
        logger.error(
            "Request failed (Request ID: %s), status: %s",
            request_id,
            response.status_code,
        )
        raise AirflowException(
            f"Failed to fetch daily Maximo data. Status code: {response.status_code}"
        )

    logger.info("Request successful (Request ID: %s)", request_id)

    maximo_message = _extract_message(response)
    logger.info("Maximo response message: %s", maximo_message)

    return {
        "request_id": request_id,
        "status_code": response.status_code,
        "timestamp": datetime.now().isoformat(),
        "message": f"Fetch Daily Maximo Runs Successfully! \n{maximo_message}",
    }


# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "fetch_daily_maximo_data",
    default_args=default_args,
    description="A DAG to fetch data from Maximo API endpoint on a schedule daily",
    # Schedule to run daily at 21:00, 22:00, and 23:00
    schedule_interval="0 21-23 * * *",
    # NOTE(review): days_ago appears to be deprecated in newer Airflow
    # releases; consider a fixed start_date -- confirm against the deployed
    # Airflow version before changing.
    start_date=days_ago(1),
    tags=["maximo", "api", "fetch"],
    catchup=False,
)

# Define the task to fetch data from Maximo using PythonOperator.
# provide_context is intentionally not passed: with the
# airflow.operators.python import path the context is supplied to
# **kwargs callables automatically.
fetch_task = PythonOperator(
    task_id="fetch_maximo_data",
    python_callable=fetch_maximo_data,
    dag=dag,
)