diff --git a/dags/fetch_maximo_daily.py b/dags/fetch_maximo_daily.py index fd6106e..c41376f 100644 --- a/dags/fetch_maximo_daily.py +++ b/dags/fetch_maximo_daily.py @@ -71,19 +71,21 @@ def fetch_daily_maximo_data(**context): # Create a response callback function def response_callback(future): try: - # Get the response from the future, with a short timeout just to confirm + # Get the response from the future, with a short timeout just to confirm # the request was properly initiated response = future.result(timeout=10) logger.info( f"Request initiated successfully (Request ID: {request_id}), status: {response.status_code}" ) - + # We don't wait for the full response processing, as it may take longer than Airflow's task timeout - + except requests.exceptions.Timeout: logger.error(f"Request connection timed out (Request ID: {request_id})") except Exception as e: - logger.error(f"Error initiating request (Request ID: {request_id}): {str(e)}") + logger.error( + f"Error initiating request (Request ID: {request_id}): {str(e)}" + ) # Using ThreadPoolExecutor for async operation with ThreadPoolExecutor(max_workers=1) as executor: @@ -94,13 +96,13 @@ def fetch_daily_maximo_data(**context): headers=headers, timeout=600, # Increased timeout to 10 minutes for the actual API call ) - + # Add callback that will execute when future completes future.add_done_callback(response_callback) - + # Don't wait for future to complete, let it run in background logger.info(f"Async request has been dispatched (Request ID: {request_id})") - + # Push the request details to XCom for tracking result_dict = { "request_id": request_id, @@ -108,10 +110,10 @@ def fetch_daily_maximo_data(**context): "timestamp": datetime.now().isoformat(), "message": "Fetch Daily Maximo request initiated asynchronously", } - + ti = context["ti"] ti.xcom_push(key="fetch_result", value=result_dict) - + return result_dict @@ -126,8 +128,8 @@ def process_response(**context): if result: logger.info(f"Processing async request result: {result}") # Since we're using fire-and-forget pattern, we just acknowledge the request was made - - # In production, you might want to implement a separate DAG or task + + # In production, you might want to implement a separate DAG or task # to check the status of the asynchronous job later return True @@ -146,7 +148,7 @@ default_args = { # Define the DAG dag = DAG( - "fetch_daily_maximo_data_async", + "fetch_daily_maximo_data", default_args=default_args, description="A DAG to fetch data from Maximo API endpoint asynchronously on a daily schedule", # Schedule to run daily at 21:00, 22:00, and 23:00 @@ -188,4 +190,4 @@ process_task = PythonOperator( # Set task dependencies check_running >> [skip_execution, fetch_task] -fetch_task >> process_task \ No newline at end of file +fetch_task >> process_task