You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
apache-airflow/risk_matrix_scheduler_per_h...

117 lines
2.8 KiB
Python

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import HttpOperator
# Default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# Create the DAG
dag = DAG(
'hourly_api_fetch',
default_args=default_args,
description='Fetch data from multiple APIs every hour',
schedule_interval='@hourly', # Run every hour
start_date=datetime(2025, 3, 7),
catchup=False,
tags=['api', 'hourly'],
)
# API 1 configuration
api1_endpoint = '/risk-matrix/api/pof-predict/update/latest/'
api1_headers = {
'Content-Type': 'application/json',
# 'Authorization': '{{ var.value.api1_token }}', # Store this in Airflow Variables
}
# API 2 configuration
api2_endpoint = '/risk-matrix/api/cof-predict/citra/update/latest/'
api2_headers = {
'Content-Type': 'application/json',
# 'Authorization': '{{ var.value.api2_token }}',
}
# API 3 configuration
api3_endpoint = '/risk-matrix/api/cof-predict/critical/mttr-update/'
api3_headers = {
'Content-Type': 'application/json',
# 'Authorization': '{{ var.value.api3_token }}',
}
# API 4 configuration
api4_endpoint = '/risk-matrix/api/cof-predict/losses/mdt-update/latest/'
api4_headers = {
'Content-Type': 'application/json',
# 'Authorization': '{{ var.value.api4_token }}',
}
# API 5 configuration
api5_endpoint = '/risk-matrix/api/nof-history/update/latest/'
api5_headers = {
'Content-Type': 'application/json',
# 'Authorization': '{{ var.value.api5_token }}',
}
# API 1 task
fetch_api1 = HttpOperator(
task_id='fetch_api1',
http_conn_id='risk_matrix_app', # Set up this connection in Airflow
endpoint=api1_endpoint,
method='PATCH',
headers=api1_headers,
log_response=True,
dag=dag,
)
# API 2 task
fetch_api2 = HttpOperator(
task_id='fetch_api2',
http_conn_id='risk_matrix_app',
endpoint=api2_endpoint,
method='PATCH',
headers=api2_headers,
log_response=True,
dag=dag,
)
# API 3 task
fetch_api3 = HttpOperator(
task_id='fetch_api3',
http_conn_id='risk_matrix_app',
endpoint=api3_endpoint,
method='PATCH',
headers=api3_headers,
log_response=True,
dag=dag,
)
# API 4 task
fetch_api4 = HttpOperator(
task_id='fetch_api4',
http_conn_id='risk_matrix_app',
endpoint=api4_endpoint,
method='PATCH',
headers=api4_headers,
log_response=True,
dag=dag,
)
# API 5 task
fetch_api5 = HttpOperator(
task_id='fetch_api5',
http_conn_id='risk_matrix_app',
endpoint=api5_endpoint,
method='PATCH',
headers=api5_headers,
log_response=True,
dag=dag,
)
# These tasks can run in parallel since they don't depend on each other