from datetime import datetime, timedelta import requests import json import logging import time from concurrent.futures import ThreadPoolExecutor import uuid from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.http.operators.http import HttpOperator from airflow.providers.http.hooks.http import HttpHook from airflow.models import Variable # Konfigurasi logging logger = logging.getLogger(__name__) # Default arguments for the DAG default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), } # Create the DAG dag = DAG( 'daily_api_fetch_async', default_args=default_args, description='Feature calculation from API asynchronously every day', schedule_interval='@daily', # Run every day start_date=datetime(2025, 3, 10), catchup=False, tags=['api',"feature-envelope", 'daily', 'async'], ) def fire_and_forget_api_call(**context): """ Fungsi ini mengirim permintaan API dan tidak menunggu respons (fire and forget) """ # Menggunakan HttpHook untuk mendapatkan konfigurasi koneksi http = HttpHook(method='GET', http_conn_id='pfi-api') # Mendapatkan session dan base URL session = http.get_conn() endpoint = '/envelope/calculate-feature' # Generate request ID untuk tracking request_id = str(uuid.uuid4()) # Log sebelum mengirim request logger.info(f"Mengirim permintaan asinkron ke {endpoint} (Request ID: {request_id})") # Membuat callback untuk menangani respons secara asinkron def response_callback(future): try: # Hanya mengecek apakah request terkirim tanpa menunggu proses selesai response = future.result(timeout=5) # Timeout 5 detik hanya untuk konfirmasi request terkirim logger.info(f"Permintaan berhasil dikirim (Request ID: {request_id}), status: {response.status_code}") except Exception as e: logger.error(f"Kesalahan saat mengirim permintaan (Request ID: {request_id}): {str(e)}") # Header API headers = { 'Content-Type': 'application/json', 'X-Request-ID': request_id, # 'Authorization': '{{ var.value.api1_token }}', # Store this in Airflow Variables } # Menggunakan ThreadPoolExecutor untuk operasi asinkron with ThreadPoolExecutor(max_workers=1) as executor: # Submit request ke executor future = executor.submit( session.request, 'GET', endpoint, headers=headers, timeout=10 # Timeout untuk koneksi awal, bukan waktu tunggu keseluruhan ) # Menambahkan callback yang akan dieksekusi ketika future selesai future.add_done_callback(response_callback) # Jangan menunggu future selesai, biarkan berjalan di background logger.info(f"Permintaan asinkron telah dikirim (Request ID: {request_id})") # Kembalikan request_id untuk referensi return {"request_id": request_id, "timestamp": datetime.now().isoformat()} # API task yang tidak menunggu respons feature_envelope_async = PythonOperator( task_id='feature_envelope_async', python_callable=fire_and_forget_api_call, provide_context=True, dag=dag, ) # Alternatif: gunakan HttpOperator dengan request_timeout yang sangat pendek # Ini hanya untuk memulai request tanpa menunggu respons lengkap feature_envelope_alt = HttpOperator( task_id='feature_envelope_alt', http_conn_id='pfi-api', endpoint='/envelope/calculate-feature', method='GET', headers={ 'Content-Type': 'application/json', }, log_response=False, # Jangan log respons karena kita tidak menunggunya response_check=lambda response: True, # Selalu mengembalikan sukses extra_options={"timeout": 1.0}, # Timeout sangat pendek, hanya untuk memulai request dag=dag, ) # Set task dependencies jika diperlukan # (empty for now, since there's only one task)