prevent timeout fetch daily maximo

main
MrWaradana 9 months ago
parent a7dbfede7d
commit 5d9a440715

@@ -4,10 +4,10 @@ import json
 import uuid
 import logging
-import time
+from concurrent.futures import ThreadPoolExecutor
 from airflow import DAG
 from airflow.operators.python import PythonOperator, BranchPythonOperator
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
 from airflow.models import Variable, DagRun
@@ -49,7 +49,7 @@ def is_dag_already_running(**context):
 def fetch_daily_maximo_data(**context):
     """
-    Fetch data from the Maximo endpoint using GET method and process the response
+    Fetch data from the Maximo endpoint using GET method with async pattern
     """
     # Generate request ID for tracking
     request_id = str(uuid.uuid4())
@@ -60,7 +60,7 @@ def fetch_daily_maximo_data(**context):
     fetch_url = f"{base_url}{endpoint}"
     # Log before sending request
-    logger.info(f"Sending GET request to {fetch_url} (Request ID: {request_id})")
+    logger.info(f"Sending async GET request to {fetch_url} (Request ID: {request_id})")
     # Request headers
     headers = {
@@ -68,76 +68,67 @@ def fetch_daily_maximo_data(**context):
         "X-Request-ID": request_id,
     }
-    try:
-        # Use requests library directly with increased timeout
-        response = requests.get(
+    # Create a response callback function
+    def response_callback(future):
+        try:
+            # Get the response from the future, with a short timeout just to confirm
+            # the request was properly initiated
+            response = future.result(timeout=10)
+            logger.info(
+                f"Request initiated successfully (Request ID: {request_id}), status: {response.status_code}"
+            )
+            # We don't wait for the full response processing, as it may take longer than Airflow's task timeout
+        except requests.exceptions.Timeout:
+            logger.error(f"Request connection timed out (Request ID: {request_id})")
+        except Exception as e:
+            logger.error(f"Error initiating request (Request ID: {request_id}): {str(e)}")
+    # Using ThreadPoolExecutor for async operation
+    with ThreadPoolExecutor(max_workers=1) as executor:
+        # Submit request to executor with a longer timeout for the overall operation
+        future = executor.submit(
+            requests.get,
             url=fetch_url,
             headers=headers,
-            timeout=60,  # Increased timeout to 60 seconds
-        )
-        logger.info(
-            f"Request completed (Request ID: {request_id}), status: {response.status_code}"
+            timeout=600,  # Increased timeout to 10 minutes for the actual API call
         )
-        if response.status_code == 200:
-            logger.info(f"Request successful (Request ID: {request_id})")
-            # Parse JSON response
-            response_data = response.json()
-            maximo_message = response_data.get("message", "No message provided")
-            logger.info(f"Maximo response message: {maximo_message}")
-            return {
-                "request_id": request_id,
-                "status_code": response.status_code,
-                "timestamp": datetime.now().isoformat(),
-                "message": f"Fetch Daily Maximo Runs Successfully! {maximo_message}",
-            }
-        else:
-            logger.error(
-                f"Request failed (Request ID: {request_id}), status: {response.status_code}"
-            )
-            return {
-                "request_id": request_id,
-                "status_code": response.status_code,
-                "timestamp": datetime.now().isoformat(),
-                "message": f"Failed to fetch daily Maximo data. Status code: {response.status_code}",
-            }
-    except requests.exceptions.Timeout:
-        logger.error(f"Request timed out (Request ID: {request_id})")
-        return {
-            "request_id": request_id,
-            "status_code": 504,
-            "timestamp": datetime.now().isoformat(),
-            "message": "Request timed out while fetching Maximo data",
-        }
-    except Exception as e:
-        logger.error(f"Error sending request (Request ID: {request_id}): {str(e)}")
-        return {
-            "request_id": request_id,
-            "status_code": 500,
-            "timestamp": datetime.now().isoformat(),
-            "message": f"Exception in Fetch Daily Maximo: {str(e)}",
-        }
+        # Add callback that will execute when future completes
+        future.add_done_callback(response_callback)
+        # Don't wait for future to complete, let it run in background
+        logger.info(f"Async request has been dispatched (Request ID: {request_id})")
+    # Push the request details to XCom for tracking
+    result_dict = {
+        "request_id": request_id,
+        "status": "initiated",
+        "timestamp": datetime.now().isoformat(),
+        "message": "Fetch Daily Maximo request initiated asynchronously",
+    }
+    ti = context["ti"]
+    ti.xcom_push(key="fetch_result", value=result_dict)
+    return result_dict
 
 def process_response(**context):
     """
     Process the response from the Maximo API
+    In async mode, this simply acknowledges the request was made
     """
     ti = context["ti"]
     result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
     if result:
-        logger.info(f"Processing result: {result}")
-        # Add any additional processing logic here
-        # Sleep for a short time to simulate processing
-        time.sleep(2)
+        logger.info(f"Processing async request result: {result}")
+        # Since we're using fire-and-forget pattern, we just acknowledge the request was made
+        # In production, you might want to implement a separate DAG or task
+        # to check the status of the asynchronous job later
         return True
     return False
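
The comments in process_response defer status tracking to "a separate DAG or task". A minimal sketch of what such a poller could look like, assuming a hypothetical /status/{request_id} endpoint on the Maximo side and a base_url resolved the same way as in fetch_daily_maximo_data (neither is part of this commit):

    def check_maximo_job_status(**context):
        # Pull the request details pushed to XCom by fetch_daily_maximo_data
        ti = context["ti"]
        result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
        if not result:
            return False
        # Hypothetical endpoint; the real Maximo status path may differ
        status_url = f"{base_url}/status/{result['request_id']}"
        response = requests.get(status_url, timeout=30)
        logger.info(f"Status for request {result['request_id']}: {response.status_code}")
        return response.status_code == 200
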
@@ -155,13 +146,13 @@ default_args = {
 # Define the DAG
 dag = DAG(
-    "fetch_daily_maximo_data",
+    "fetch_daily_maximo_data_async",
     default_args=default_args,
-    description="A DAG to fetch data from Maximo API endpoint on a schedule daily",
+    description="A DAG to fetch data from Maximo API endpoint asynchronously on a daily schedule",
     # Schedule to run daily at 21:00, 22:00, and 23:00
     schedule_interval="0 21-23 * * *",
     start_date=days_ago(1),
-    tags=["maximo", "api", "fetch", "continuous", "daily"],
+    tags=["maximo", "api", "fetch", "continuous", "daily", "async"],
     catchup=False,
 )
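
The guard wired up in the final hunk (check_running >> [skip_execution, fetch_task]) relies on is_dag_already_running, whose body sits outside this diff. One plausible shape for it, using the DagRun import at the top of the file; the exact logic and the skip_execution task_id are assumptions:

    from airflow.utils.state import State

    def is_dag_already_running(**context):
        # The current run is itself RUNNING, so more than one result means
        # another run of this DAG is still active
        running = DagRun.find(dag_id="fetch_daily_maximo_data_async", state=State.RUNNING)
        if len(running) > 1:
            return "skip_execution"
        return "fetch_daily_maximo_data"  # same task_id the xcom_pull above uses
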
@@ -197,4 +188,4 @@ process_task = PythonOperator(
 # Set task dependencies
 check_running >> [skip_execution, fetch_task]
-fetch_task >> process_task
\ No newline at end of file
+fetch_task >> process_task
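
One caveat with the pattern introduced here: leaving a "with ThreadPoolExecutor(...)" block calls shutdown(wait=True), so despite the "don't wait" comment the task still blocks until requests.get returns or hits its 600-second timeout. A sketch of a variant that returns immediately, with the remaining caveat that CPython still joins executor worker threads at interpreter exit, so the in-flight request only truly outlives the task if the server treats the incoming connection itself as the trigger:

    # Same submit/callback flow as the commit, without the with-block
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(requests.get, url=fetch_url, headers=headers, timeout=600)
    future.add_done_callback(response_callback)
    executor.shutdown(wait=False)  # return immediately instead of joining the worker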