from datetime import datetime, timedelta
import requests
import json
import uuid
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable, DagRun
from airflow.utils.session import create_session
from airflow.utils.state import State
# Configure logging
logger = logging.getLogger(__name__)


def is_dag_already_running(**context):
    """
    Check if there are other active DAG runs to prevent duplicate execution.
    (A declarative alternative using max_active_runs is noted just after this
    function.)
    """
    dag_id = context["dag"].dag_id
    run_id = context["run_id"]

    with create_session() as session:
        # Count running instances of this DAG excluding the current run
        running_dags = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.run_id != run_id,
                DagRun.state == State.RUNNING,
            )
            .count()
        )

    if running_dags > 0:
        logger.info(
            f"Found {running_dags} other running instance(s) of this DAG. Skipping execution."
        )
        return "skip_execution"
    else:
        logger.info("No other running instances found. Proceeding with execution.")
        return "fetch_daily_maximo_data"


def fetch_daily_maximo_data(**context):
    """
    Fetch data from the Maximo endpoint using GET method with async pattern
    """
    # Generate request ID for tracking
    request_id = str(uuid.uuid4())

    # Connection configuration
    base_url = "http://10.47.0.65/envelope"
    endpoint = "/fetch-maximo-daily"
    fetch_url = f"{base_url}{endpoint}"

    # Log before sending request
    logger.info(f"Sending async GET request to {fetch_url} (Request ID: {request_id})")

    # Request headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
    }

    # Create a response callback function
    def response_callback(future):
        try:
            # Get the response from the future, with a short timeout just to confirm
            # the request was properly initiated
            response = future.result(timeout=10)
            logger.info(
                f"Request initiated successfully (Request ID: {request_id}), status: {response.status_code}"
            )
            # We don't wait for the full response processing, as it may take
            # longer than Airflow's task timeout
        except requests.exceptions.Timeout:
            logger.error(f"Request connection timed out (Request ID: {request_id})")
        except Exception as e:
            logger.error(
                f"Error initiating request (Request ID: {request_id}): {str(e)}"
            )

    # Using ThreadPoolExecutor for async operation.
    # Note: leaving the "with" block calls executor.shutdown(wait=True), so this
    # task does block until the GET request completes or hits its 600-second
    # timeout; the request is dispatched from a worker thread rather than being
    # a true fire-and-forget call.
    with ThreadPoolExecutor(max_workers=1) as executor:
        # Submit request to executor with a longer timeout for the overall operation
        future = executor.submit(
            requests.get,
            url=fetch_url,
            headers=headers,
            timeout=600,  # Increased timeout to 10 minutes for the actual API call
        )
        # Add callback that will execute when the future completes
        future.add_done_callback(response_callback)
        # The callback above logs the outcome once the request finishes
        logger.info(f"Async request has been dispatched (Request ID: {request_id})")

    # Push the request details to XCom for tracking
    result_dict = {
        "request_id": request_id,
        "status": "initiated",
        "timestamp": datetime.now().isoformat(),
        "message": "Fetch Daily Maximo request initiated asynchronously",
    }
    ti = context["ti"]
    ti.xcom_push(key="fetch_result", value=result_dict)
    return result_dict
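

# A minimal non-blocking variant (a sketch; not used by the DAG below). Because
# the "with" block in fetch_daily_maximo_data waits for its worker thread, that
# task runs for as long as the GET request does. If the task should instead
# return as soon as the request is handed off, the executor can be created
# without a context manager and shut down with wait=False. Whether the Airflow
# worker process lives long enough for the background call to finish is an
# assumption that depends on the executor and task runner in use.
def fetch_daily_maximo_data_nowait(**context):
    request_id = str(uuid.uuid4())
    fetch_url = "http://10.47.0.65/envelope/fetch-maximo-daily"
    headers = {"Content-Type": "application/json", "X-Request-ID": request_id}

    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(requests.get, fetch_url, headers=headers, timeout=600)
    future.add_done_callback(
        lambda f: logger.info(
            f"Request {request_id} finished: {f.exception() or f.result().status_code}"
        )
    )
    # Return without waiting for pending work; the worker thread keeps running
    executor.shutdown(wait=False)

    return {"request_id": request_id, "status": "initiated"}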


def process_response(**context):
    """
    Process the response from the Maximo API.
    In async mode, this simply acknowledges the request was made.
    """
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
    if result:
        logger.info(f"Processing async request result: {result}")
        # Since we're using a fire-and-forget pattern, we just acknowledge the
        # request was made. In production, you might want to implement a separate
        # DAG or task to check the status of the asynchronous job later; a sketch
        # of such a check follows this function.
        return True
    return False
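

# Sketch of the status check mentioned above (not wired into this DAG). It
# assumes the envelope service exposes a status endpoint keyed by the request ID
# sent in the X-Request-ID header; that endpoint and its response shape are
# hypothetical, not confirmed by the Maximo integration.
def check_maximo_job_status(**context):
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
    if not result:
        logger.info("No fetch result found in XCom; nothing to check.")
        return False

    # Hypothetical endpoint: GET /envelope/fetch-maximo-daily/status/<request_id>
    status_url = (
        f"http://10.47.0.65/envelope/fetch-maximo-daily/status/{result['request_id']}"
    )
    response = requests.get(status_url, timeout=30)
    response.raise_for_status()

    logger.info(f"Maximo job {result['request_id']} reported status: {response.text}")
    return True


# The callable above could back a PythonOperator in a follow-up DAG or be polled
# from a sensor, depending on how the envelope service reports completion.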


# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "fetch_daily_maximo_data",
    default_args=default_args,
    description="A DAG to fetch data from Maximo API endpoint asynchronously on a daily schedule",
    # Schedule to run daily at 14:00 UTC (21:00 WIB)
    schedule_interval="0 14 * * *",
    start_date=days_ago(1),
    tags=["maximo", "api", "fetch", "continuous", "daily", "async"],
    catchup=False,
)

# Branch operator to check if we should proceed or skip
check_running = BranchPythonOperator(
    task_id="check_if_already_running",
    python_callable=is_dag_already_running,
    provide_context=True,
    dag=dag,
)

# Skip execution dummy task
skip_execution = DummyOperator(
    task_id="skip_execution",
    dag=dag,
)

# Define the task to fetch data from Maximo using PythonOperator
fetch_task = PythonOperator(
    task_id="fetch_daily_maximo_data",
    python_callable=fetch_daily_maximo_data,
    provide_context=True,
    dag=dag,
)

# Define task to process the response
process_task = PythonOperator(
    task_id="process_response",
    python_callable=process_response,
    provide_context=True,
    dag=dag,
)

# Set task dependencies
check_running >> [skip_execution, fetch_task]
fetch_task >> process_task