@@ -71,19 +71,21 @@ def fetch_daily_maximo_data(**context):
     # Create a response callback function
     def response_callback(future):
         try:
             # Get the response from the future, with a short timeout just to confirm
             # the request was properly initiated
             response = future.result(timeout=10)
             logger.info(
                 f"Request initiated successfully (Request ID: {request_id}), status: {response.status_code}"
             )
 
             # We don't wait for the full response processing, as it may take longer than Airflow's task timeout
 
         except requests.exceptions.Timeout:
             logger.error(f"Request connection timed out (Request ID: {request_id})")
         except Exception as e:
-            logger.error(f"Error initiating request (Request ID: {request_id}): {str(e)}")
+            logger.error(
+                f"Error initiating request (Request ID: {request_id}): {str(e)}"
+            )
 
     # Using ThreadPoolExecutor for async operation
     with ThreadPoolExecutor(max_workers=1) as executor:
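The hunk above relies on the standard concurrent.futures submit/add_done_callback idiom. For reference, here is a minimal, self-contained sketch of the same pattern with a placeholder URL (not the Maximo endpoint used in this DAG):

# Minimal sketch of the submit / add_done_callback pattern (placeholder URL,
# not part of the diff above).
from concurrent.futures import ThreadPoolExecutor

import requests


def on_done(future):
    # The callback receives the finished Future; calling result() here re-raises
    # any exception the request raised, so failures are handled in the callback.
    try:
        response = future.result()
        print(f"status: {response.status_code}")
    except Exception as exc:
        print(f"request failed: {exc}")


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(requests.get, "https://example.com", timeout=30)
    future.add_done_callback(on_done)
    print("request dispatched")  # reached immediately, before the request completes

One caveat to keep in mind when reading the "don't wait" comments in this diff: using ThreadPoolExecutor as a context manager calls shutdown(wait=True) on exit, so the lines inside the with block do run before the request completes, but the block itself is not left until the submitted request has finished.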
@@ -94,13 +96,13 @@ def fetch_daily_maximo_data(**context):
             headers=headers,
             timeout=600,  # Increased timeout to 10 minutes for the actual API call
         )
 
         # Add callback that will execute when future completes
         future.add_done_callback(response_callback)
 
         # Don't wait for future to complete, let it run in background
         logger.info(f"Async request has been dispatched (Request ID: {request_id})")
 
         # Push the request details to XCom for tracking
         result_dict = {
             "request_id": request_id,
@@ -108,10 +110,10 @@ def fetch_daily_maximo_data(**context):
             "timestamp": datetime.now().isoformat(),
             "message": "Fetch Daily Maximo request initiated asynchronously",
         }
 
         ti = context["ti"]
         ti.xcom_push(key="fetch_result", value=result_dict)
 
         return result_dict
 
 
@@ -126,8 +128,8 @@ def process_response(**context):
     if result:
         logger.info(f"Processing async request result: {result}")
         # Since we're using fire-and-forget pattern, we just acknowledge the request was made
 
         # In production, you might want to implement a separate DAG or task
         # to check the status of the asynchronous job later
 
     return True
@@ -146,7 +148,7 @@ default_args = {
 
 # Define the DAG
 dag = DAG(
-    "fetch_daily_maximo_data_async",
+    "fetch_daily_maximo_data",
     default_args=default_args,
     description="A DAG to fetch data from Maximo API endpoint asynchronously on a daily schedule",
     # Schedule to run daily at 21:00, 22:00, and 23:00
@@ -188,4 +190,4 @@ process_task = PythonOperator(
 
 # Set task dependencies
 check_running >> [skip_execution, fetch_task]
 fetch_task >> process_task
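The comment in process_response about implementing a separate DAG or task to check the status of the asynchronous job is left open by this change. A minimal sketch of what such a follow-up task could look like is below; it assumes it is added to the same DAG file (so dag and process_task are in scope), and the status URL, its query parameters, and the "fetch_daily_maximo" task_id are placeholders, since none of them appear in this diff:

import logging

import requests
from airflow.operators.python import PythonOperator

logger = logging.getLogger(__name__)

# Placeholder endpoint; the real Maximo status URL is not part of this change.
STATUS_URL = "https://maximo.example.com/api/jobstatus"


def check_fetch_status(**context):
    # Pull the request details the fetch task pushed to XCom under "fetch_result".
    ti = context["ti"]
    # "fetch_daily_maximo" is a placeholder; use the actual task_id of fetch_task.
    result = ti.xcom_pull(task_ids="fetch_daily_maximo", key="fetch_result")
    if not result:
        logger.warning("No fetch_result found in XCom; nothing to check")
        return False

    request_id = result["request_id"]
    response = requests.get(STATUS_URL, params={"request_id": request_id}, timeout=30)
    response.raise_for_status()
    status = response.json().get("status", "unknown")
    logger.info(f"Maximo job {request_id} reported status: {status}")
    return status == "complete"


# Attach the checker to the existing DAG object defined in this file.
check_status_task = PythonOperator(
    task_id="check_maximo_fetch_status",
    python_callable=check_fetch_status,
    dag=dag,
)
process_task >> check_status_task

Alternatively, the status check could live in a separate DAG on its own schedule, or be written as a sensor that re-polls until the Maximo job reports completion.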