You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
2.8 KiB
Python
117 lines
2.8 KiB
Python
from datetime import datetime, timedelta
|
|
from airflow import DAG
|
|
from airflow.providers.http.operators.http import HttpOperator
|
|
|
|
# Default arguments for the DAG
|
|
default_args = {
|
|
'owner': 'airflow',
|
|
'depends_on_past': False,
|
|
'email_on_failure': False,
|
|
'email_on_retry': False,
|
|
'retries': 3,
|
|
'retry_delay': timedelta(minutes=5),
|
|
}
|
|
|
|
# Create the DAG
|
|
dag = DAG(
|
|
'hourly_api_fetch',
|
|
default_args=default_args,
|
|
description='Fetch data from multiple APIs every hour',
|
|
schedule_interval='@hourly', # Run every hour
|
|
start_date=datetime(2025, 3, 7),
|
|
catchup=False,
|
|
tags=['api', 'hourly'],
|
|
)
|
|
|
|
# API 1 configuration
|
|
api1_endpoint = '/risk-matrix/api/pof-predict/update/latest/'
|
|
api1_headers = {
|
|
'Content-Type': 'application/json',
|
|
# 'Authorization': '{{ var.value.api1_token }}', # Store this in Airflow Variables
|
|
}
|
|
|
|
# API 2 configuration
|
|
api2_endpoint = '/risk-matrix/api/cof-predict/citra/update/latest/'
|
|
api2_headers = {
|
|
'Content-Type': 'application/json',
|
|
# 'Authorization': '{{ var.value.api2_token }}',
|
|
}
|
|
|
|
# API 3 configuration
|
|
api3_endpoint = '/risk-matrix/api/cof-predict/critical/mttr-update/'
|
|
api3_headers = {
|
|
'Content-Type': 'application/json',
|
|
# 'Authorization': '{{ var.value.api3_token }}',
|
|
}
|
|
|
|
# API 4 configuration
|
|
api4_endpoint = '/risk-matrix/api/cof-predict/losses/mdt-update/latest/'
|
|
api4_headers = {
|
|
'Content-Type': 'application/json',
|
|
# 'Authorization': '{{ var.value.api4_token }}',
|
|
}
|
|
|
|
# API 5 configuration
|
|
api5_endpoint = '/risk-matrix/api/nof-history/update/latest/'
|
|
api5_headers = {
|
|
'Content-Type': 'application/json',
|
|
# 'Authorization': '{{ var.value.api5_token }}',
|
|
}
|
|
|
|
# API 1 task
|
|
fetch_api1 = HttpOperator(
|
|
task_id='fetch_api1',
|
|
http_conn_id='risk_matrix_app', # Set up this connection in Airflow
|
|
endpoint=api1_endpoint,
|
|
method='PATCH',
|
|
headers=api1_headers,
|
|
log_response=True,
|
|
dag=dag,
|
|
)
|
|
|
|
# API 2 task
|
|
fetch_api2 = HttpOperator(
|
|
task_id='fetch_api2',
|
|
http_conn_id='risk_matrix_app',
|
|
endpoint=api2_endpoint,
|
|
method='PATCH',
|
|
headers=api2_headers,
|
|
log_response=True,
|
|
dag=dag,
|
|
)
|
|
|
|
# API 3 task
|
|
fetch_api3 = HttpOperator(
|
|
task_id='fetch_api3',
|
|
http_conn_id='risk_matrix_app',
|
|
endpoint=api3_endpoint,
|
|
method='PATCH',
|
|
headers=api3_headers,
|
|
log_response=True,
|
|
dag=dag,
|
|
)
|
|
|
|
# API 4 task
|
|
fetch_api4 = HttpOperator(
|
|
task_id='fetch_api4',
|
|
http_conn_id='risk_matrix_app',
|
|
endpoint=api4_endpoint,
|
|
method='PATCH',
|
|
headers=api4_headers,
|
|
log_response=True,
|
|
dag=dag,
|
|
)
|
|
|
|
# API 5 task
|
|
fetch_api5 = HttpOperator(
|
|
task_id='fetch_api5',
|
|
http_conn_id='risk_matrix_app',
|
|
endpoint=api5_endpoint,
|
|
method='PATCH',
|
|
headers=api5_headers,
|
|
log_response=True,
|
|
dag=dag,
|
|
)
|
|
|
|
# These tasks can run in parallel since they don't depend on each other
|