from datetime import datetime, timedelta
import logging
import uuid
from concurrent.futures import ThreadPoolExecutor

import requests

from airflow import DAG
from airflow.models import DagRun
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.session import create_session
from airflow.utils.state import State

# Configure logging
logger = logging.getLogger(__name__)


def is_dag_already_running(**context):
    """
    Check if there are other active DAG runs to prevent duplicate execution
    """
    dag_id = context["dag"].dag_id
    run_id = context["run_id"]

    with create_session() as session:
        # Count running instances of this DAG excluding the current run
        running_dags = (
            session.query(DagRun)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.run_id != run_id,
                DagRun.state == State.RUNNING,
            )
            .count()
        )

    if running_dags > 0:
        logger.info(
            f"Found {running_dags} other running instance(s) of this DAG. Skipping execution."
        )
        return "skip_execution"
    else:
        logger.info("No other running instances found. Proceeding with execution.")
        return "fetch_daily_maximo_data"


def fetch_daily_maximo_data(**context):
    """
    Fetch data from the Maximo endpoint with a GET request dispatched on a
    background thread (fire-and-forget)
    """
    # Generate a request ID for tracking
    request_id = str(uuid.uuid4())

    # Connection configuration
    base_url = "http://10.47.0.65/envelope"
    endpoint = "/fetch-maximo-daily"
    fetch_url = f"{base_url}{endpoint}"

    # Log before sending the request
    logger.info(f"Sending async GET request to {fetch_url} (Request ID: {request_id})")

    # Request headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
    }

    def response_callback(future):
        # Done-callbacks run only after the future has completed, so
        # future.result() returns (or re-raises) immediately here.
        try:
            response = future.result()
            logger.info(
                f"Request completed (Request ID: {request_id}), status: {response.status_code}"
            )
            # The response body is not processed here; handling it could take
            # longer than Airflow's task timeout.
        except requests.exceptions.Timeout:
            logger.error(f"Request timed out (Request ID: {request_id})")
        except Exception as e:
            logger.error(f"Error during request (Request ID: {request_id}): {str(e)}")

    # Dispatch the request on a background thread. A `with` block is avoided
    # deliberately: ThreadPoolExecutor.__exit__ calls shutdown(wait=True) and
    # would block this task until the request finished, defeating the
    # fire-and-forget intent.
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(
        requests.get,
        url=fetch_url,
        headers=headers,
        timeout=600,  # Generous timeout (10 minutes) for the actual API call
    )

    # Log the outcome whenever the request completes, without blocking here
    future.add_done_callback(response_callback)
    executor.shutdown(wait=False)

    logger.info(f"Async request has been dispatched (Request ID: {request_id})")

    # Push the request details to XCom for tracking
    result_dict = {
        "request_id": request_id,
        "status": "initiated",
        "timestamp": datetime.now().isoformat(),
        "message": "Fetch Daily Maximo request initiated asynchronously",
    }

    ti = context["ti"]
    ti.xcom_push(key="fetch_result", value=result_dict)

    return result_dict
def process_response(**context):
    """
    Process the result of the Maximo fetch task.

    In async mode this simply acknowledges that the request was dispatched;
    the task never sees the API's actual response.
    """
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")

    if result:
        logger.info(f"Processing async request result: {result}")
        # Since we're using a fire-and-forget pattern, we just acknowledge
        # that the request was made.

        # In production, you might want a separate DAG or task that checks the
        # status of the asynchronous job later (see the sketch after this
        # function).
        return True
    return False
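

# Illustrative sketch of the status check mentioned above. It is NOT wired
# into the DAG, and the status endpoint path and response shape are
# assumptions made for illustration, not a documented part of the Maximo
# service.
def check_fetch_status(**context):
    """Poll a hypothetical status endpoint for a previously dispatched request."""
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="fetch_daily_maximo_data", key="fetch_result")
    if not result:
        return False

    # Hypothetical endpoint: GET /envelope/fetch-maximo-daily/status/<request_id>
    status_url = f"http://10.47.0.65/envelope/fetch-maximo-daily/status/{result['request_id']}"
    response = requests.get(status_url, timeout=30)
    response.raise_for_status()
    status = response.json()
    logger.info(f"Status for request {result['request_id']}: {status}")
    return status.get("state") == "done"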


# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "fetch_daily_maximo_data",
    default_args=default_args,
    description="Fetch data from the Maximo API endpoint asynchronously on a daily schedule",
    # Run daily at 21:00
    schedule_interval="0 21 * * *",
    start_date=days_ago(1),
    tags=["maximo", "api", "fetch", "continuous", "daily", "async"],
    catchup=False,
)

# Branch operator: proceed with the fetch or skip this run.
# (provide_context is no longer needed in Airflow 2; the context is always
# passed to the callable.)
check_running = BranchPythonOperator(
    task_id="check_if_already_running",
    python_callable=is_dag_already_running,
    dag=dag,
)

# Dummy task taken when another run is already active
skip_execution = DummyOperator(
    task_id="skip_execution",
    dag=dag,
)

# Task to fetch data from Maximo
fetch_task = PythonOperator(
    task_id="fetch_daily_maximo_data",
    python_callable=fetch_daily_maximo_data,
    dag=dag,
)

# Task to process the response
process_task = PythonOperator(
    task_id="process_response",
    python_callable=process_response,
    dag=dag,
)

# Set task dependencies
check_running >> [skip_execution, fetch_task]
fetch_task >> process_task
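
# To exercise this DAG locally without the scheduler, one option is Airflow's
# built-in test command (assuming an initialized Airflow environment):
#
#     airflow dags test fetch_daily_maximo_data 2024-01-01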