You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

122 lines
3.6 KiB
Python

from datetime import datetime, timedelta
import requests
import json
import uuid
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Module-level logger, keyed by this module's import name (__name__).
logger = logging.getLogger(name=__name__)
def _build_fetch_result(request_id, status_code, message):
    """Build the summary dict returned by ``fetch_maximo_data``.

    Every outcome (success, HTTP error, timeout, exception) uses the same four
    keys; ``timestamp`` is the local time of the call in ISO-8601 format.
    """
    return {
        "request_id": request_id,
        "status_code": status_code,
        "timestamp": datetime.now().isoformat(),
        "message": message,
    }


def _extract_maximo_message(response):
    """Return the ``message`` field of a JSON-object response body.

    Falls back to ``"No message provided"`` when the body is not valid JSON or
    is JSON but not an object, so a successful (HTTP 200) fetch is not
    misreported as an exception just because the body has an unexpected shape.
    """
    default = "No message provided"
    try:
        payload = response.json()
    except ValueError:
        # requests raises a ValueError subclass for undecodable bodies.
        logger.warning("Maximo response body is not valid JSON; using default message")
        return default
    if not isinstance(payload, dict):
        logger.warning("Maximo response body is JSON but not an object; using default message")
        return default
    return payload.get("message", default)


def fetch_maximo_data(
    *,
    base_url="http://10.47.0.65",
    endpoint="/fetch-maximo-daily",
    timeout=60,
    raise_on_failure=False,
    **context,
):
    """
    Fetch data from the Maximo endpoint using GET method and process the response.

    Sends one GET request to ``base_url + endpoint``, tagged with a freshly
    generated ``X-Request-ID`` header, and summarises the outcome.

    Keyword Args:
        base_url: Scheme and host of the Maximo service.
        endpoint: Path appended to ``base_url``.
        timeout: Seconds passed to ``requests.get`` as its timeout.
        raise_on_failure: When True, raise ``RuntimeError`` instead of
            returning a failure dict, so the task itself fails (letting
            task-level retry/alert settings apply). Defaults to False, which
            keeps the original "always return a dict" behaviour.
        **context: Extra keyword arguments from the caller; not used.

    Returns:
        dict with keys ``request_id``, ``status_code``, ``timestamp`` and
        ``message``. ``status_code`` is the HTTP status when a response was
        received, 504 on a request timeout and 500 on any other exception.

    Raises:
        RuntimeError: only when ``raise_on_failure`` is True and the request
            did not complete with HTTP 200.
    """
    # Per-call ID sent as X-Request-ID and included in every log line for tracking.
    request_id = str(uuid.uuid4())
    fetch_url = f"{base_url}{endpoint}"
    headers = {
        "Accept": "application/json",  # the success path parses a JSON body
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
    }

    logger.info("Sending GET request to %s (Request ID: %s)", fetch_url, request_id)

    cause = None
    try:
        response = requests.get(url=fetch_url, headers=headers, timeout=timeout)
    except requests.exceptions.Timeout as exc:
        logger.error("Request timed out (Request ID: %s)", request_id)
        cause = exc
        result = _build_fetch_result(
            request_id, 504, "Request timed out while fetching Maximo data"
        )
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception("Error sending request (Request ID: %s): %s", request_id, exc)
        cause = exc
        result = _build_fetch_result(
            request_id, 500, f"Exception in Fetch Daily Maximo: {exc}"
        )
    else:
        status_code = response.status_code
        logger.info(
            "Request completed (Request ID: %s), status: %s", request_id, status_code
        )
        if status_code == 200:
            logger.info("Request successful (Request ID: %s)", request_id)
            maximo_message = _extract_maximo_message(response)
            logger.info("Maximo response message: %s", maximo_message)
            return _build_fetch_result(
                request_id,
                status_code,
                f"Fetch Daily Maximo Runs Successfully! {maximo_message}",
            )
        logger.error("Request failed (Request ID: %s), status: %s", request_id, status_code)
        result = _build_fetch_result(
            request_id,
            status_code,
            f"Failed to fetch daily Maximo data. Status code: {status_code}",
        )

    if raise_on_failure:
        raise RuntimeError(result["message"]) from cause
    return result
# Defaults applied to every task of the DAG defined below.
# NOTE(review): fetch_maximo_data reports HTTP errors, timeouts and exceptions
# through its return value by default instead of raising, so the retry and
# failure-email settings here only apply if the task itself raises.
# NOTE(review): Airflow sends failure emails only to configured recipients
# (an `email` entry); none is set here — confirm it is configured elsewhere.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email_on_failure=True,
    email_on_retry=False,
    retries=3,
    retry_delay=timedelta(minutes=5),
)
# DAG that triggers the Maximo fetch three times each evening: cron
# "0 21-23 * * *" fires at minute 0 of hours 21, 22 and 23.
# NOTE(review): start_date comes from days_ago(1), a value that moves with
# every parse; Airflow's guidance is to prefer a fixed start_date — confirm
# the timezone the cron hours should be interpreted in before changing it.
dag = DAG(
    dag_id="fetch_daily_maximo_data",
    description="A DAG to fetch data from Maximo API endpoint on a schedule daily",
    default_args=default_args,
    schedule_interval="0 21-23 * * *",
    start_date=days_ago(1),
    catchup=False,
    tags=["maximo", "api", "fetch"],
)
# The DAG's single task: run fetch_maximo_data via PythonOperator.
# provide_context is deliberately not passed: Airflow 2 removed that argument
# (context is supplied automatically to callables that accept **kwargs), and
# fetch_maximo_data never reads its context, so older Airflow behaves the same.
fetch_task = PythonOperator(
    task_id="fetch_maximo_data",
    python_callable=fetch_maximo_data,
    dag=dag,
)