You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
3.9 KiB
Python

from datetime import datetime, timedelta
import requests
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor
import uuid
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import HttpOperator
from airflow.providers.http.hooks.http import HttpHook
from airflow.models import Variable
# Konfigurasi logging
logger = logging.getLogger(__name__)
# Default arguments for the DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# Create the DAG
dag = DAG(
'daily_train_api_fetch_async',
default_args=default_args,
description='Machine learning from API asynchronously every day',
schedule_interval='@daily', # Run every day
start_date=datetime(2025, 3, 10),
catchup=False,
tags=['api',"train", 'daily', 'async'],
)
def fire_and_forget_api_call(**context):
"""
Fungsi ini mengirim permintaan API dan tidak menunggu respons (fire and forget)
"""
# Menggunakan HttpHook untuk mendapatkan konfigurasi koneksi
http = HttpHook(method='GET', http_conn_id='pfi-api')
# Mendapatkan session dan base URL
session = http.get_conn()
endpoint = '/train/train'
# Generate request ID untuk tracking
request_id = str(uuid.uuid4())
# Log sebelum mengirim request
logger.info(f"Mengirim permintaan asinkron ke {endpoint} (Request ID: {request_id})")
# Membuat callback untuk menangani respons secara asinkron
def response_callback(future):
try:
# Hanya mengecek apakah request terkirim tanpa menunggu proses selesai
response = future.result(timeout=5) # Timeout 5 detik hanya untuk konfirmasi request terkirim
logger.info(f"Permintaan berhasil dikirim (Request ID: {request_id}), status: {response.status_code}")
except Exception as e:
logger.error(f"Kesalahan saat mengirim permintaan (Request ID: {request_id}): {str(e)}")
# Header API
headers = {
'Content-Type': 'application/json',
'X-Request-ID': request_id,
# 'Authorization': '{{ var.value.api1_token }}', # Store this in Airflow Variables
}
# Menggunakan ThreadPoolExecutor untuk operasi asinkron
with ThreadPoolExecutor(max_workers=1) as executor:
# Submit request ke executor
future = executor.submit(
session.request,
'GET',
endpoint,
headers=headers,
timeout=10 # Timeout untuk koneksi awal, bukan waktu tunggu keseluruhan
)
# Menambahkan callback yang akan dieksekusi ketika future selesai
future.add_done_callback(response_callback)
# Jangan menunggu future selesai, biarkan berjalan di background
logger.info(f"Permintaan asinkron telah dikirim (Request ID: {request_id})")
# Kembalikan request_id untuk referensi
return {"request_id": request_id, "timestamp": datetime.now().isoformat()}
# API task yang tidak menunggu respons
train_async = PythonOperator(
task_id='train_async',
python_callable=fire_and_forget_api_call,
provide_context=True,
dag=dag,
)
# Alternatif: gunakan HttpOperator dengan request_timeout yang sangat pendek
# Ini hanya untuk memulai request tanpa menunggu respons lengkap
train_alt = HttpOperator(
task_id='train_alt',
http_conn_id='pfi-api',
endpoint='/train/train',
method='GET',
headers={
'Content-Type': 'application/json',
},
log_response=False, # Jangan log respons karena kita tidak menunggunya
response_check=lambda response: True, # Selalu mengembalikan sukses
extra_options={"timeout": 1.0}, # Timeout sangat pendek, hanya untuk memulai request
dag=dag,
)
# Set task dependencies jika diperlukan
# (empty for now, since there's only one task)