diff --git a/dags/fetch_maximo_daily.py b/dags/fetch_maximo_daily.py
index 0339edd..fd6106e 100644
--- a/dags/fetch_maximo_daily.py
+++ b/dags/fetch_maximo_daily.py
@@ -4,10 +4,10 @@ import json
 import uuid
 import logging
 import time
+from concurrent.futures import ThreadPoolExecutor
 
 from airflow import DAG
 from airflow.operators.python import PythonOperator, BranchPythonOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
 from airflow.models import Variable, DagRun
@@ -49,7 +49,7 @@ def is_dag_already_running(**context):
 
 def fetch_daily_maximo_data(**context):
     """
-    Fetch data from the Maximo endpoint using GET method and process the response
+    Fetch data from the Maximo endpoint using GET method with async pattern
     """
     # Generate request ID for tracking
     request_id = str(uuid.uuid4())
@@ -60,7 +60,7 @@ def fetch_daily_maximo_data(**context):
     fetch_url = f"{base_url}{endpoint}"
 
     # Log before sending request
-    logger.info(f"Sending GET request to {fetch_url} (Request ID: {request_id})")
+    logger.info(f"Sending async GET request to {fetch_url} (Request ID: {request_id})")
 
     # Request headers
     headers = {
@@ -68,76 +68,67 @@ def fetch_daily_maximo_data(**context):
         "X-Request-ID": request_id,
     }
 
-    try:
-        # Use requests library directly with increased timeout
-        response = requests.get(
+    # Create a response callback function
+    def response_callback(future):
+        try:
+            # Get the response from the future, with a short timeout just to confirm
+            # the request was properly initiated
+            response = future.result(timeout=10)
+            logger.info(
+                f"Request initiated successfully (Request ID: {request_id}), status: {response.status_code}"
+            )
+
+            # We don't wait for the full response processing, as it may take longer than Airflow's task timeout
+
+        except requests.exceptions.Timeout:
+            logger.error(f"Request connection timed out (Request ID: {request_id})")
+        except Exception as e:
+            logger.error(f"Error initiating request (Request ID: {request_id}): {str(e)}")
+
+    # Using ThreadPoolExecutor for async operation
+    with ThreadPoolExecutor(max_workers=1) as executor:
+        # Submit request to executor with a longer timeout for the overall operation
+        future = executor.submit(
+            requests.get,
             url=fetch_url,
             headers=headers,
-            timeout=60,  # Increased timeout to 60 seconds
-        )
-
-        logger.info(
-            f"Request completed (Request ID: {request_id}), status: {response.status_code}"
+            timeout=600,  # Increased timeout to 10 minutes for the actual API call
         )
-
-        if response.status_code == 200:
-            logger.info(f"Request successful (Request ID: {request_id})")
-
-            # Parse JSON response
-            response_data = response.json()
-            maximo_message = response_data.get("message", "No message provided")
-
-            logger.info(f"Maximo response message: {maximo_message}")
-
-            return {
-                "request_id": request_id,
-                "status_code": response.status_code,
-                "timestamp": datetime.now().isoformat(),
-                "message": f"Fetch Daily Maximo Runs Successfully! {maximo_message}",
-            }
-        else:
-            logger.error(
-                f"Request failed (Request ID: {request_id}), status: {response.status_code}"
-            )
-            return {
-                "request_id": request_id,
-                "status_code": response.status_code,
-                "timestamp": datetime.now().isoformat(),
-                "message": f"Failed to fetch daily Maximo data. Status code: {response.status_code}",
-            }
-
-    except requests.exceptions.Timeout:
-        logger.error(f"Request timed out (Request ID: {request_id})")
-        return {
-            "request_id": request_id,
-            "status_code": 504,
-            "timestamp": datetime.now().isoformat(),
-            "message": "Request timed out while fetching Maximo data",
-        }
-
-    except Exception as e:
-        logger.error(f"Error sending request (Request ID: {request_id}): {str(e)}")
-        return {
-            "request_id": request_id,
-            "status_code": 500,
-            "timestamp": datetime.now().isoformat(),
-            "message": f"Exception in Fetch Daily Maximo: {str(e)}",
-        }
+
+        # Add callback that will execute when future completes
+        future.add_done_callback(response_callback)
+
+        # Don't wait for future to complete, let it run in background
+        logger.info(f"Async request has been dispatched (Request ID: {request_id})")
+
+    # Push the request details to XCom for tracking
+    result_dict = {
+        "request_id": request_id,
+        "status": "initiated",
+        "timestamp": datetime.now().isoformat(),
+        "message": "Fetch Daily Maximo request initiated asynchronously",
+    }
+
+    ti = context["ti"]
+    ti.xcom_push(key="fetch_result", value=result_dict)
+
+    return result_dict
 
 
 def process_response(**context):
     """
     Process the response from the Maximo API
+    In async mode, this simply acknowledges the request was made
     """
     ti = context["ti"]
     result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
 
     if result:
-        logger.info(f"Processing result: {result}")
-        # Add any additional processing logic here
-
-        # Sleep for a short time to simulate processing
-        time.sleep(2)
+        logger.info(f"Processing async request result: {result}")
+        # Since we're using fire-and-forget pattern, we just acknowledge the request was made
+
+        # In production, you might want to implement a separate DAG or task
+        # to check the status of the asynchronous job later
         return True
 
     return False
@@ -155,13 +146,13 @@ default_args = {
 
 # Define the DAG
 dag = DAG(
-    "fetch_daily_maximo_data",
+    "fetch_daily_maximo_data_async",
     default_args=default_args,
-    description="A DAG to fetch data from Maximo API endpoint on a schedule daily",
+    description="A DAG to fetch data from Maximo API endpoint asynchronously on a daily schedule",
     # Schedule to run daily at 21:00, 22:00, and 23:00
     schedule_interval="0 21-23 * * *",
     start_date=days_ago(1),
-    tags=["maximo", "api", "fetch", "continuous", "daily"],
+    tags=["maximo", "api", "fetch", "continuous", "daily", "async"],
     catchup=False,
 )
 
@@ -197,4 +188,4 @@ process_task = PythonOperator(
 
 # Set task dependencies
 check_running >> [skip_execution, fetch_task]
-fetch_task >> process_task
+fetch_task >> process_task
\ No newline at end of file
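
Reviewer note (not part of the patch): as written, the task is not fully fire-and-forget. Leaving the "with ThreadPoolExecutor(...)" block calls executor.shutdown(wait=True), so the task still blocks until the submitted requests.get call returns or hits its 600-second timeout. A minimal, self-contained sketch of that behaviour, using time.sleep as a stand-in for the long Maximo call:

import time
from concurrent.futures import ThreadPoolExecutor

start = time.monotonic()
with ThreadPoolExecutor(max_workers=1) as executor:
    # Stand-in for executor.submit(requests.get, ...): a 5-second job
    executor.submit(time.sleep, 5)
    # No explicit wait here, mirroring the DAG code
# __exit__ calls shutdown(wait=True), so this line runs only after ~5 seconds
print(f"left the with-block after {time.monotonic() - start:.1f}s")

If the intent is for the callable to return before the HTTP call completes, one option (an assumption about intent, not the author's code) is to create the executor without a with-block and call executor.shutdown(wait=False) after add_done_callback. Even then, CPython waits for already-submitted work when the interpreter exits, so having the endpoint acknowledge immediately (e.g. 202 Accepted) and checking status from a separate task or DAG, as the comments in process_response already suggest, remains the more robust pattern.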