diff --git a/train_pfi_scheduler.py b/train_pfi_scheduler.py new file mode 100644 index 0000000..c332946 --- /dev/null +++ b/train_pfi_scheduler.py @@ -0,0 +1,117 @@ +from datetime import datetime, timedelta +import requests +import json +import logging +import time +from concurrent.futures import ThreadPoolExecutor +import uuid + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.http.operators.http import HttpOperator +from airflow.providers.http.hooks.http import HttpHook +from airflow.models import Variable + +# Konfigurasi logging +logger = logging.getLogger(__name__) + +# Default arguments for the DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 3, + 'retry_delay': timedelta(minutes=5), +} + +# Create the DAG +dag = DAG( + 'daily_train_api_fetch_async', + default_args=default_args, + description='Machine learning from API asynchronously every day', + schedule_interval='@daily', # Run every day + start_date=datetime(2025, 3, 10), + catchup=False, + tags=['api',"train", 'daily', 'async'], +) + +def fire_and_forget_api_call(**context): + """ + Fungsi ini mengirim permintaan API dan tidak menunggu respons (fire and forget) + """ + # Menggunakan HttpHook untuk mendapatkan konfigurasi koneksi + http = HttpHook(method='GET', http_conn_id='train-pfi-app') + + # Mendapatkan session dan base URL + session = http.get_conn() + endpoint = '/train' + + # Generate request ID untuk tracking + request_id = str(uuid.uuid4()) + + # Log sebelum mengirim request + logger.info(f"Mengirim permintaan asinkron ke {endpoint} (Request ID: {request_id})") + + # Membuat callback untuk menangani respons secara asinkron + def response_callback(future): + try: + # Hanya mengecek apakah request terkirim tanpa menunggu proses selesai + response = future.result(timeout=5) # Timeout 5 detik hanya untuk konfirmasi request terkirim + logger.info(f"Permintaan berhasil dikirim (Request ID: {request_id}), status: {response.status_code}") + except Exception as e: + logger.error(f"Kesalahan saat mengirim permintaan (Request ID: {request_id}): {str(e)}") + + # Header API + headers = { + 'Content-Type': 'application/json', + 'X-Request-ID': request_id, + # 'Authorization': '{{ var.value.api1_token }}', # Store this in Airflow Variables + } + + # Menggunakan ThreadPoolExecutor untuk operasi asinkron + with ThreadPoolExecutor(max_workers=1) as executor: + # Submit request ke executor + future = executor.submit( + session.request, + 'GET', + endpoint, + headers=headers, + timeout=10 # Timeout untuk koneksi awal, bukan waktu tunggu keseluruhan + ) + + # Menambahkan callback yang akan dieksekusi ketika future selesai + future.add_done_callback(response_callback) + + # Jangan menunggu future selesai, biarkan berjalan di background + logger.info(f"Permintaan asinkron telah dikirim (Request ID: {request_id})") + + # Kembalikan request_id untuk referensi + return {"request_id": request_id, "timestamp": datetime.now().isoformat()} + +# API task yang tidak menunggu respons +train_async = PythonOperator( + task_id='train_async', + python_callable=fire_and_forget_api_call, + provide_context=True, + dag=dag, +) + +# Alternatif: gunakan HttpOperator dengan request_timeout yang sangat pendek +# Ini hanya untuk memulai request tanpa menunggu respons lengkap +train_alt = HttpOperator( + task_id='train_alt', + http_conn_id='train-pfi-app', + endpoint='/train', + method='GET', + headers={ + 'Content-Type': 'application/json', + }, + log_response=False, # Jangan log respons karena kita tidak menunggunya + response_check=lambda response: True, # Selalu mengembalikan sukses + extra_options={"timeout": 1.0}, # Timeout sangat pendek, hanya untuk memulai request + dag=dag, +) + +# Set task dependencies jika diperlukan +# (empty for now, since there's only one task) \ No newline at end of file