add daily maximo pull

main
MrWaradana 9 months ago
parent 53a3030146
commit 8c5ab14c54

@ -0,0 +1,121 @@
from datetime import datetime, timedelta
import requests
import json
import uuid
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Configure logging
logger = logging.getLogger(__name__)
def fetch_maximo_data(**context):
"""
Fetch data from the Maximo endpoint using GET method and process the response
"""
# Generate request ID for tracking
request_id = str(uuid.uuid4())
# Connection configuration
base_url = "http://192.168.1.84:5002"
endpoint = "/fetch-maximo-daily"
fetch_url = f"{base_url}{endpoint}"
# Log before sending request
logger.info(f"Sending GET request to {fetch_url} (Request ID: {request_id})")
# Request headers
headers = {
"Content-Type": "application/json",
"X-Request-ID": request_id,
}
try:
# Use requests library directly with increased timeout
response = requests.get(
url=fetch_url,
headers=headers,
timeout=60, # Increased timeout to 60 seconds
)
logger.info(
f"Request completed (Request ID: {request_id}), status: {response.status_code}"
)
if response.status_code == 200:
logger.info(f"Request successful (Request ID: {request_id})")
# Parse JSON response
response_data = response.json()
maximo_message = response_data.get("message", "No message provided")
logger.info(f"Maximo response message: {maximo_message}")
return {
"request_id": request_id,
"status_code": response.status_code,
"timestamp": datetime.now().isoformat(),
"message": f"Fetch Daily Maximo Runs Successfully! {maximo_message}",
}
else:
logger.error(
f"Request failed (Request ID: {request_id}), status: {response.status_code}"
)
return {
"request_id": request_id,
"status_code": response.status_code,
"timestamp": datetime.now().isoformat(),
"message": f"Failed to fetch daily Maximo data. Status code: {response.status_code}",
}
except requests.exceptions.Timeout:
logger.error(f"Request timed out (Request ID: {request_id})")
return {
"request_id": request_id,
"status_code": 504,
"timestamp": datetime.now().isoformat(),
"message": "Request timed out while fetching Maximo data",
}
except Exception as e:
logger.error(f"Error sending request (Request ID: {request_id}): {str(e)}")
return {
"request_id": request_id,
"status_code": 500,
"timestamp": datetime.now().isoformat(),
"message": f"Exception in Fetch Daily Maximo: {str(e)}",
}
# Define default arguments for the DAG
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
# Define the DAG
dag = DAG(
"fetch_daily_maximo_data",
default_args=default_args,
description="A DAG to fetch data from Maximo API endpoint on a schedule daily",
# Schedule to run daily at 21:00, 22:00, and 23:00
schedule_interval="0 21-23 * * *",
start_date=days_ago(1),
tags=["maximo", "api", "fetch"],
catchup=False,
)
# Define the task to fetch data from Maximo using PythonOperator
fetch_task = PythonOperator(
task_id="fetch_maximo_data",
python_callable=fetch_maximo_data,
provide_context=True,
dag=dag,
)
Loading…
Cancel
Save