feat: add feature_envelope_pfi_scheduler.py
parent
770b8995c5
commit
3109d2cfb5
@ -0,0 +1,117 @@
|
|||||||
|
from datetime import datetime, timedelta
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from airflow import DAG
|
||||||
|
from airflow.operators.python import PythonOperator
|
||||||
|
from airflow.providers.http.operators.http import HttpOperator
|
||||||
|
from airflow.providers.http.hooks.http import HttpHook
|
||||||
|
from airflow.models import Variable
|
||||||
|
|
||||||
|
# Konfigurasi logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Default arguments for the DAG
|
||||||
|
default_args = {
|
||||||
|
'owner': 'airflow',
|
||||||
|
'depends_on_past': False,
|
||||||
|
'email_on_failure': False,
|
||||||
|
'email_on_retry': False,
|
||||||
|
'retries': 3,
|
||||||
|
'retry_delay': timedelta(minutes=5),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Create the DAG
|
||||||
|
dag = DAG(
|
||||||
|
'daily_api_fetch_async',
|
||||||
|
default_args=default_args,
|
||||||
|
description='Feature calculation from API asynchronously every day',
|
||||||
|
schedule_interval='@daily', # Run every day
|
||||||
|
start_date=datetime(2025, 3, 10),
|
||||||
|
catchup=False,
|
||||||
|
tags=['api',"feature-envelope", 'daily', 'async'],
|
||||||
|
)
|
||||||
|
|
||||||
|
def fire_and_forget_api_call(**context):
|
||||||
|
"""
|
||||||
|
Fungsi ini mengirim permintaan API dan tidak menunggu respons (fire and forget)
|
||||||
|
"""
|
||||||
|
# Menggunakan HttpHook untuk mendapatkan konfigurasi koneksi
|
||||||
|
http = HttpHook(method='GET', http_conn_id='envelope-pfi-app')
|
||||||
|
|
||||||
|
# Mendapatkan session dan base URL
|
||||||
|
session = http.get_conn()
|
||||||
|
endpoint = '/calculate-feature'
|
||||||
|
|
||||||
|
# Generate request ID untuk tracking
|
||||||
|
request_id = str(uuid.uuid4())
|
||||||
|
|
||||||
|
# Log sebelum mengirim request
|
||||||
|
logger.info(f"Mengirim permintaan asinkron ke {endpoint} (Request ID: {request_id})")
|
||||||
|
|
||||||
|
# Membuat callback untuk menangani respons secara asinkron
|
||||||
|
def response_callback(future):
|
||||||
|
try:
|
||||||
|
# Hanya mengecek apakah request terkirim tanpa menunggu proses selesai
|
||||||
|
response = future.result(timeout=5) # Timeout 5 detik hanya untuk konfirmasi request terkirim
|
||||||
|
logger.info(f"Permintaan berhasil dikirim (Request ID: {request_id}), status: {response.status_code}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Kesalahan saat mengirim permintaan (Request ID: {request_id}): {str(e)}")
|
||||||
|
|
||||||
|
# Header API
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'X-Request-ID': request_id,
|
||||||
|
# 'Authorization': '{{ var.value.api1_token }}', # Store this in Airflow Variables
|
||||||
|
}
|
||||||
|
|
||||||
|
# Menggunakan ThreadPoolExecutor untuk operasi asinkron
|
||||||
|
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||||
|
# Submit request ke executor
|
||||||
|
future = executor.submit(
|
||||||
|
session.request,
|
||||||
|
'GET',
|
||||||
|
endpoint,
|
||||||
|
headers=headers,
|
||||||
|
timeout=10 # Timeout untuk koneksi awal, bukan waktu tunggu keseluruhan
|
||||||
|
)
|
||||||
|
|
||||||
|
# Menambahkan callback yang akan dieksekusi ketika future selesai
|
||||||
|
future.add_done_callback(response_callback)
|
||||||
|
|
||||||
|
# Jangan menunggu future selesai, biarkan berjalan di background
|
||||||
|
logger.info(f"Permintaan asinkron telah dikirim (Request ID: {request_id})")
|
||||||
|
|
||||||
|
# Kembalikan request_id untuk referensi
|
||||||
|
return {"request_id": request_id, "timestamp": datetime.now().isoformat()}
|
||||||
|
|
||||||
|
# API task yang tidak menunggu respons
|
||||||
|
feature_envelope_async = PythonOperator(
|
||||||
|
task_id='feature_envelope_async',
|
||||||
|
python_callable=fire_and_forget_api_call,
|
||||||
|
provide_context=True,
|
||||||
|
dag=dag,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Alternatif: gunakan HttpOperator dengan request_timeout yang sangat pendek
|
||||||
|
# Ini hanya untuk memulai request tanpa menunggu respons lengkap
|
||||||
|
feature_envelope_alt = HttpOperator(
|
||||||
|
task_id='feature_envelope_alt',
|
||||||
|
http_conn_id='envelope-pfi-app',
|
||||||
|
endpoint='/calculate-feature',
|
||||||
|
method='GET',
|
||||||
|
headers={
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
},
|
||||||
|
log_response=False, # Jangan log respons karena kita tidak menunggunya
|
||||||
|
response_check=lambda response: True, # Selalu mengembalikan sukses
|
||||||
|
extra_options={"timeout": 1.0}, # Timeout sangat pendek, hanya untuk memulai request
|
||||||
|
dag=dag,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set task dependencies jika diperlukan
|
||||||
|
# (empty for now, since there's only one task)
|
||||||
Loading…
Reference in New Issue