You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
3.9 KiB
Python
117 lines
3.9 KiB
Python
from datetime import datetime, timedelta
|
|
import requests
|
|
import json
|
|
import logging
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import uuid
|
|
|
|
from airflow import DAG
|
|
from airflow.operators.python import PythonOperator
|
|
from airflow.providers.http.operators.http import HttpOperator
|
|
from airflow.providers.http.hooks.http import HttpHook
|
|
from airflow.models import Variable
|
|
|
|
# Logging configuration
logger = logging.getLogger(__name__)

# Default arguments for the DAG
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=3,
    retry_delay=timedelta(minutes=5),
)
|
|
|
|
# Create the DAG
dag = DAG(
    'daily_train_api_fetch_async',
    default_args=default_args,
    description='Machine learning from API asynchronously every day',
    # `schedule` supersedes the deprecated `schedule_interval` argument.
    schedule='@daily',  # Run every day
    start_date=datetime(2025, 3, 10),
    catchup=False,
    tags=['api', 'train', 'daily', 'async'],
)
|
|
|
|
def fire_and_forget_api_call(**context):
    """Trigger the training endpoint once and return tracking metadata.

    The request is sent through ``HttpHook.run`` so that the endpoint path is
    resolved against the ``pfi-api`` connection. Handing the bare path
    ``/train/train`` to the raw session does not work: the session has no
    base URL, so the request is rejected before it is ever sent.

    The call is synchronous with a 10-second timeout. Wrapping it in a
    ``ThreadPoolExecutor`` context manager added no asynchrony, because
    leaving the ``with`` block waits for the worker thread to finish.

    Request-level failures (timeouts included) are logged and swallowed, so
    the task stays best-effort; a non-2xx status is logged, not raised.

    Args:
        **context: Airflow task context. Accepted for compatibility; unused.

    Returns:
        dict: ``request_id`` (the UUID4 string sent as ``X-Request-ID``) and
        ``timestamp`` (ISO-8601 string taken from ``datetime.now()``).
    """
    # HttpHook supplies the connection configuration (host, auth, headers).
    http = HttpHook(method='GET', http_conn_id='pfi-api')
    endpoint = '/train/train'

    # Request ID used to correlate log lines with the outgoing call.
    request_id = str(uuid.uuid4())

    # API headers
    headers = {
        'Content-Type': 'application/json',
        'X-Request-ID': request_id,
        # 'Authorization': '{{ var.value.api1_token }}',  # Store this in Airflow Variables
    }

    logger.info(f"Mengirim permintaan asinkron ke {endpoint} (Request ID: {request_id})")

    try:
        response = http.run(
            endpoint=endpoint,
            headers=headers,
            # check_response=False: report the status code instead of raising
            # on non-2xx responses.
            extra_options={"timeout": 10, "check_response": False},
        )
    except requests.exceptions.RequestException as e:
        # Best-effort by design: a failure here must not fail the task.
        # NOTE(review): a read timeout also lands here; the server may have
        # started the job even though no response arrived in time.
        logger.error(f"Kesalahan saat mengirim permintaan (Request ID: {request_id}): {str(e)}")
    else:
        logger.info(f"Permintaan berhasil dikirim (Request ID: {request_id}), status: {response.status_code}")

    logger.info(f"Permintaan asinkron telah dikirim (Request ID: {request_id})")

    # Return the request ID for later reference.
    return {"request_id": request_id, "timestamp": datetime.now().isoformat()}
|
|
|
|
# Task that triggers the training endpoint via fire_and_forget_api_call.
# `provide_context` is omitted on purpose: it is deprecated since Airflow 2.0,
# where the task context is passed to the callable automatically.
train_async = PythonOperator(
    task_id='train_async',
    python_callable=fire_and_forget_api_call,
    dag=dag,
)
|
|
|
|
# Alternative: use HttpOperator with a very short request timeout.
# This is only meant to start the request without waiting for the full response.
# NOTE(review): a 1-second timeout presumably raises when the endpoint is
# slower than that, which would fail the task and re-send the request on each
# retry -- confirm this is intended.
# NOTE(review): this task targets the same endpoint as train_async; verify
# that both are meant to run in the same DAG.
train_alt = HttpOperator(
    dag=dag,
    task_id='train_alt',
    method='GET',
    http_conn_id='pfi-api',
    endpoint='/train/train',
    headers={'Content-Type': 'application/json'},
    extra_options={"timeout": 1.0},  # Very short timeout, only to start the request
    response_check=lambda response: True,  # Always reports success
    log_response=False,  # Do not log the response since it is not awaited
)
|
|
|
|
# Set task dependencies if needed.
# (None for now: train_async and train_alt are independent of each other.)