@ -7,7 +7,7 @@ import os
import httpx
import httpx
sys . path . append ( os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , " .. " ) ) )
sys . path . append ( os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , " .. " ) ) )
from config import get_connection
from config import get_connection , get_production_connection
async def fetch_api_data (
async def fetch_api_data (
@ -37,43 +37,96 @@ def get_recursive_query(cursor, assetnum, worktype="CM"):
Fungsi untuk menjalankan query rekursif berdasarkan assetnum dan worktype .
Fungsi untuk menjalankan query rekursif berdasarkan assetnum dan worktype .
worktype memiliki nilai default ' CM ' .
worktype memiliki nilai default ' CM ' .
"""
"""
# query = f"""
# SELECT
# ROW_NUMBER() OVER (ORDER BY tbl.assetnum, tbl.year, tbl.worktype) AS seq,
# *
# FROM (
# SELECT
# a.worktype,
# a.assetnum,
# EXTRACT(YEAR FROM a.reportdate) AS year,
# COUNT(a.wonum) AS raw_corrective_failure_interval,
# SUM(a.total_cost_max) AS raw_corrective_material_cost,
# ROUND(
# SUM(
# EXTRACT(EPOCH FROM (
# a.actfinish -
# a.actstart
# ))
# ) / 3600
# , 2) AS raw_corrective_labor_time_jam,
# SUM(a.jumlah_labor) AS raw_corrective_labor_technician
# FROM
# public.wo_staging_3 AS a
# WHERE
# a.unit = '3'
# GROUP BY
# a.worktype,
# a.assetnum,
# EXTRACT(YEAR FROM a.reportdate)
# ) AS tbl
# WHERE
# tbl.worktype = '{worktype}'
# AND tbl.assetnum = '{assetnum}'
# ORDER BY
# tbl.assetnum,
# tbl.year,
# tbl.worktype
# """
# query = f"""
# select d.tahun, SUM(d.actmatcost) AS raw_corrective_material_cost, sum(d.man_hour) as man_hour_peryear from
# (
# SELECT
# a.wonum,
# a.actmatcost,
# DATE_PART('year', a.reportdate) AS tahun,
# (
# ROUND(SUM(EXTRACT(EPOCH FROM (a.actfinish - a.actstart)) / 3600), 2)
# ) AS man_hour,
# CASE
# WHEN COUNT(b.laborcode) = 0 THEN 3
# ELSE COUNT(b.laborcode)
# END AS man_count
# FROM public.wo_maximo AS a
# LEFT JOIN public.wo_maximo_labtrans AS b
# ON b.wonum = a.wonum
# WHERE
# a.asset_unit = '3'
# AND a.worktype = '{worktype}'
# AND a.asset_assetnum = '{assetnum}'
# and a.wonum not like 'T%'
# GROUP BY
# a.wonum,
# a.actmatcost,
# DATE_PART('year', a.reportdate)
# ) as d group by d.tahun
# ;
# """
query = f """
query = f """
SELECT
select
ROW_NUMBER ( ) OVER ( ORDER BY tbl . assetnum , tbl . year , tbl . worktype ) AS seq ,
DATE_PART ( ' year ' , a . reportdate ) as tahun ,
*
COUNT ( a . wonum ) as raw_ { worktype . lower ( ) } _interval ,
FROM (
sum ( a . actmatcost ) as raw_ { worktype . lower ( ) } _material_cost ,
SELECT
(
a . worktype ,
ROUND ( SUM ( EXTRACT ( EPOCH FROM ( a . actfinish - a . actstart ) ) / 3600 ) , 2 )
a . assetnum ,
) AS raw_ { worktype . lower ( ) } _labor_time ,
EXTRACT ( YEAR FROM a . reportdate ) AS year ,
CASE
COUNT ( a . wonum ) AS raw_corrective_failure_interval ,
WHEN COUNT ( b . laborcode ) = 0 THEN 3
SUM ( a . total_cost_max ) AS raw_corrective_material_cost ,
ELSE COUNT ( b . laborcode )
ROUND (
END AS raw_ { worktype . lower ( ) } _labor_human
SUM (
from public . wo_maximo as a
EXTRACT ( EPOCH FROM (
LEFT JOIN public . wo_maximo_labtrans AS b
a . actfinish -
ON b . wonum = a . wonum
a . actstart
where
) )
a . asset_unit = ' 3 '
) / 3600
{ f " AND a.worktype = ' { worktype } ' " if worktype != ' CM ' else " AND a.worktype in ( ' CM ' , ' PROACTIVE ' , ' WA ' ) " }
, 2 ) AS raw_corrective_labor_time_jam ,
AND a . asset_assetnum = ' {assetnum} '
SUM ( a . jumlah_labor ) AS raw_corrective_labor_technician
and a . wonum not like ' T % '
FROM
{ f " AND a.wojp8 != ' S1 ' " if worktype == ' CM ' else " " }
public . wo_staging_3 AS a
group by DATE_PART ( ' year ' , a . reportdate ) ;
WHERE
"""
a . unit = ' 3 '
GROUP BY
a . worktype ,
a . assetnum ,
EXTRACT ( YEAR FROM a . reportdate )
) AS tbl
WHERE
tbl . worktype = ' {worktype} '
AND tbl . assetnum = ' {assetnum} '
ORDER BY
tbl . assetnum ,
tbl . year ,
tbl . worktype
"""
# Eksekusi query dan fetch hasil
# Eksekusi query dan fetch hasil
cursor . execute ( query )
cursor . execute ( query )
return cursor . fetchall ( )
return cursor . fetchall ( )
@ -87,25 +140,247 @@ def get_data_tahun(cursor):
cursor . execute ( query )
cursor . execute ( query )
return cursor . fetchall ( )
return cursor . fetchall ( )
async def insert_ms_equipment_data ( ) :
connection = None
try :
connection , connection_wo_db = get_connection ( )
cursor_db_app = connection . cursor ( cursor_factory = DictCursor )
query_main = " SELECT DISTINCT(assetnum) FROM ms_equipment_master "
cursor_db_app . execute ( query_main )
results = cursor_db_app . fetchall ( )
inserted = 0
processed = 0
total = len ( results )
if total == 0 :
print ( " No assetnum to insert. " )
else :
start_time = datetime . now ( )
print ( f " Starting insert of { total } assetnum into lcc_ms_equipment_data " )
for idx , row in enumerate ( results , start = 1 ) :
assetnum = row . get ( " assetnum " )
try :
# skip null/empty assetnum
if not assetnum :
print ( f " [ { idx } / { total } ] Skipping empty assetnum " )
else :
# check existing
cursor_db_app . execute (
" SELECT 1 FROM lcc_ms_equipment_data WHERE assetnum = %s LIMIT 1 " ,
( assetnum , ) ,
)
if cursor_db_app . fetchone ( ) :
print ( f " [ { idx } / { total } ] Already exists: { assetnum } " )
else :
# provide an id since the table enforces NOT NULL on id
cursor_db_app . execute (
" INSERT INTO lcc_ms_equipment_data (id, assetnum) VALUES ( %s , %s ) " ,
( str ( uuid4 ( ) ) , assetnum ) ,
)
connection . commit ( )
inserted + = 1
print ( f " [ { idx } / { total } ] Inserted: { assetnum } " )
except Exception as e :
try :
connection . rollback ( )
except Exception :
pass
print ( f " [ { idx } / { total } ] Error inserting { assetnum } : { e } " )
processed + = 1
# progress monitoring every 10 items and at end
if idx % 10 == 0 or idx == total :
elapsed = datetime . now ( ) - start_time
pct = ( idx / total ) * 100 if total else 100
print (
f " Progress: { idx } / { total } ( { pct : .1f } %) - processed { processed } , inserted { inserted } - elapsed { elapsed . total_seconds ( ) : .1f } s "
)
print ( f " Finished. Total processed: { processed } , inserted: { inserted } " )
except Exception as e :
print ( " Error saat menjalankan insert_ms_equipment_data: " , e )
try :
connection . rollback ( )
except Exception :
pass
pass
async def insert_lcca_maximo_corrective_data ( ) :
connection = None
connection_wo_db = None
production_connection = None
finished_data = [ ]
errors = [ ]
inserted_count = 0
error_count = 0
try :
connection , connection_wo_db = get_connection ( )
production_connection = get_production_connection ( )
if connection is None or connection_wo_db is None or production_connection is None :
print ( " Database connection failed. " )
return
# start total timer
start_time = datetime . now ( )
print ( f " Start insert_lcca_maximo_corrective_data at { start_time . isoformat ( ) } " )
cursor_db_app = connection . cursor ( cursor_factory = DictCursor )
cursor_wo = connection_wo_db . cursor ( cursor_factory = DictCursor )
cursor_production = production_connection . cursor ( cursor_factory = DictCursor )
check_data_query = " SELECT COUNT(*) FROM lcc_equipment_tr_data LIMIT 1 "
cursor_db_app . execute ( check_data_query )
data_count = cursor_db_app . fetchone ( ) [ 0 ]
if data_count > 0 :
truncate_query = " TRUNCATE TABLE lcc_equipment_tr_data "
cursor_db_app . execute ( truncate_query )
query_main = " SELECT DISTINCT(assetnum) FROM ms_equipment_master "
cursor_db_app . execute ( query_main )
results = cursor_db_app . fetchall ( )
if not results :
print ( " No assetnum found in ms_equipment_master " )
return
print ( f " Found { len ( results ) } assetnum entries to process. " )
current_year = datetime . now ( ) . year
for row in results :
asset_start = datetime . now ( )
assetnum = row [ " assetnum " ]
data_corrective_maintenance = get_recursive_query (
cursor_production , assetnum , worktype = " CM "
)
print ( data_corrective_maintenance )
start_year = 2015
end_year = 2056
seq = 0
for year in range ( start_year , end_year ) :
# corrective_row = next(
# (r for r in data_corrective_maintenance if r["tahun"] == year), None
# )
corrective_row = next (
( r for r in data_corrective_maintenance ) , None
)
# if corrective_row:
insert_query = """
INSERT INTO lcc_equipment_tr_data (
id , assetnum , tahun , seq , is_actual ,
raw_cm_material_cost ,
raw_cm_labor_time , rc_cm_material_cost
) VALUES ( % s , % s , % s , % s , % s , % s , % s , % s )
"""
try :
# find corrective_row for the specific year (may be None)
# don't filter by tahun — take rows sequentially for each year in the range
corrective_row = (
data_corrective_maintenance [ seq ]
if isinstance ( data_corrective_maintenance , ( list , tuple ) ) and seq < len ( data_corrective_maintenance )
else None
)
raw_cm_material_cost = (
corrective_row [ " raw_corrective_material_cost " ]
if corrective_row and corrective_row . get ( " raw_corrective_material_cost " ) is not None
else 0
)
raw_cm_labor_time = (
corrective_row [ " man_hour_peryear " ]
if corrective_row and corrective_row . get ( " man_hour_peryear " ) is not None
else 0
)
rc_cm_material_cost = raw_cm_material_cost
cursor_db_app . execute (
insert_query ,
(
str ( uuid4 ( ) ) , # id
assetnum , # assetnum
year , # tahun
seq , # seq
( 0 if year > current_year + 1 else 1 ) , # is_actual
raw_cm_material_cost , # raw_cm_material_cost
raw_cm_labor_time , # raw_cm_labor_time
rc_cm_material_cost , # rc_cm_material_cost
) ,
)
# commit per successful insert to allow continuing on later errors
connection . commit ( )
inserted_count + = 1
finished_data . append ( { " assetnum " : assetnum , " year " : year } )
print ( f " Corrective data inserted for { assetnum } in year { year } " )
except Exception as e :
# rollback the failed statement so the transaction is usable again
try :
connection . rollback ( )
except Exception :
pass
error_count + = 1
errors . append ( { " assetnum " : assetnum , " year " : year , " error " : str ( e ) } )
print ( f " Error inserting { assetnum } year { year } : { e } " )
seq + = 1
asset_elapsed = datetime . now ( ) - asset_start
print ( f " Processed asset { assetnum } in { asset_elapsed . total_seconds ( ) : .2f } s " )
# final commit for safety (no-op if nothing pending)
try :
connection . commit ( )
except Exception :
pass
except Exception as e :
print ( " Error saat menjalankan insert_lcca_maximo_corrective_data: " , e )
try :
connection . rollback ( )
except Exception :
pass
finally :
total_elapsed = None
try :
total_elapsed = datetime . now ( ) - start_time
except Exception :
pass
print ( " ========Process finished and connection closed.======== " )
print ( f " Inserted rows: { inserted_count } , Errors: { error_count } " )
if total_elapsed is not None :
print ( f " Total elapsed time: { total_elapsed . total_seconds ( ) : .2f } s " )
if errors :
print ( f " Sample error: { errors [ 0 ] } " )
if connection or connection_wo_db or production_connection :
cursor_db_app . close ( )
cursor_wo . close ( )
cursor_production . close ( )
connection . close ( )
connection_wo_db . close ( )
production_connection . close ( )
async def query_data ( RELIABILITY_APP_URL : str , token : str ) :
async def query_data ( ) :
connection = None
connection = None
connection_wo_db = None
connection_production_wo = None
try :
try :
# Mendapatkan koneksi dari config.py
# Mendapatkan koneksi dari config.py
connection , connection_wo_db = get_connection ( )
connection , connection_wo_db = get_connection ( )
if connection is None or connection_wo_db is None :
connection_production_wo = get_production_connection ( )
if connection is None or connection_wo_db is None or connection_production_wo is None :
print ( " Database connection failed. " )
print ( " Database connection failed. " )
return
return
# Membuat cursor menggunakan DictCursor
# Membuat cursor menggunakan DictCursor
cursor = connection . cursor ( cursor_factory = DictCursor )
cursor = connection . cursor ( cursor_factory = DictCursor )
cursor_wo = connection_wo_db . cursor ( cursor_factory = DictCursor )
cursor_wo = connection_ production_wo . cursor ( cursor_factory = DictCursor )
# TRUNCATE DATA
# TRUNCATE DATA
# truncate_query = "TRUNCATE TABLE lcc_equipment_tr_data"
truncate_query = " TRUNCATE TABLE lcc_equipment_tr_data RESTART IDENTITY "
# cursor.execute(truncate_query)
cursor . execute ( truncate_query )
# Query untuk mendapatkan semua data dari tabel `lcc_ms_equipment_data`
# Query untuk mendapatkan semua data dari tabel `lcc_ms_equipment_data`
query_main = " SELECT * FROM lcc_ms_equipment_data "
# query_main = "SELECT * FROM lcc_ms_equipment_data"
query_main = " SELECT DISTINCT(assetnum) FROM ms_equipment_master "
cursor . execute ( query_main )
cursor . execute ( query_main )
# Fetch semua hasil query
# Fetch semua hasil query
@ -114,17 +389,32 @@ async def query_data(RELIABILITY_APP_URL: str, token: str):
# Tahun sekarang
# Tahun sekarang
current_year = datetime . now ( ) . year
current_year = datetime . now ( ) . year
total_assets = len ( results )
processed_assets = 0
total_inserted = 0
overall_start = datetime . now ( )
print ( f " Starting processing { total_assets } assets at { overall_start . isoformat ( ) } " )
# Looping untuk setiap assetnum
# Looping untuk setiap assetnum
for row in results :
for idx, row in enumerate ( results , start = 1 ) :
assetnum = row [ " assetnum " ] # Mengambil assetnum dari hasil query
assetnum = row [ " assetnum " ] # Mengambil assetnum dari hasil query
forecasting_start_year = row [ " forecasting_start_year " ] - 1
if not assetnum or str ( assetnum ) . strip ( ) == " " :
print ( f " [ { idx } / { total_assets } ] Skipping empty assetnum " )
continue
# forecasting_start_year = row["forecasting_start_year"] - 1
forecasting_start_year = 2014
asset_start = datetime . now ( )
processed_assets + = 1
years_processed = 0
inserted_this_asset = 0
# CM
# CM
recursive_results = get_recursive_query (
data_cm = get_recursive_query ( cursor_wo , assetnum , worktype = " CM " )
cursor_wo , assetnum , worktype = " CM "
)
# PM
# PM
data_pm = get_recursive_query ( cursor_wo , assetnum , worktype = " PM " )
data_pm = get_recursive_query ( cursor_wo , assetnum , worktype = " PM " )
# PDM = Predictive Maintenance
data_predictive = get_recursive_query ( cursor_wo , assetnum , worktype = " PDM " )
# OH
# OH
data_oh = get_recursive_query ( cursor_wo , assetnum , worktype = " OH " )
data_oh = get_recursive_query ( cursor_wo , assetnum , worktype = " OH " )
# Data Tahun
# Data Tahun
@ -133,15 +423,28 @@ async def query_data(RELIABILITY_APP_URL: str, token: str):
seq = 0
seq = 0
# Looping untuk setiap tahun
# Looping untuk setiap tahun
for year in range ( forecasting_start_year , current_year + 1 ) :
for year in range ( forecasting_start_year , current_year + 1 ) :
years_processed + = 1
# print(f"Processing assetnum {assetnum} in year {year}")
# print(f"Processing assetnum {assetnum} in year {year}")
# Filter data berdasarkan tahun
# Filter data berdasarkan tahun (support both 'tahun' and 'year' column names)
recursive_row = next (
data_cm_row = next (
( r for r in recursive_results if r [ " year " ] == year ) , None
( r for r in data_cm if ( r . get ( " tahun " ) == year or r . get ( " year " ) == year ) ) ,
None ,
) # CM Corrective Maintenance
) # CM Corrective Maintenance
data_pm_row = next ( ( r for r in data_pm if r [ " year " ] == year ) , None )
data_pm_row = next (
data_oh_row = next ( ( r for r in data_oh if r [ " year " ] == year ) , None )
( r for r in data_pm if ( r . get ( " tahun " ) == year or r . get ( " year " ) == year ) ) ,
None ,
)
data_oh_row = next (
( r for r in data_oh if ( r . get ( " tahun " ) == year or r . get ( " year " ) == year ) ) ,
None ,
)
data_predictive_row = next (
( r for r in data_predictive if ( r . get ( " tahun " ) == year or r . get ( " year " ) == year ) ) ,
None ,
)
data_tahunan_row = next (
data_tahunan_row = next (
( r for r in data_tahunan if r [ " year " ] == year ) , None
( r for r in data_tahunan if ( r . get ( " tahun " ) == year or r . get ( " year " ) == year ) ) ,
None ,
)
)
# Cek apakah data sudah ada
# Cek apakah data sudah ada
@ -159,28 +462,36 @@ async def query_data(RELIABILITY_APP_URL: str, token: str):
continue
continue
if not data_exists :
if not data_exists :
print ( " Data not exists for assetnum " , assetnum )
# Insert data jika belum ada
# Insert data jika belum ada
if not recursive _row and not data_pm_row and not data_oh _row:
if not data_cm _row and not data_pm_row and not data_oh _row and not data_predictive _row:
# Jika data recursive_row tidak ada
# Jika data recursive_row tidak ada
insert_query = """
insert_query = """
INSERT INTO lcc_equipment_tr_data (
INSERT INTO lcc_equipment_tr_data (
id , assetnum , tahun , seq , is_actual ,
id , assetnum , tahun , seq , is_actual ,
raw_cm_interval , raw_cm_material_cost ,
raw_cm_interval , raw_cm_material_cost , raw_cm_labor_time , raw_cm_labor_human
raw_cm_labor_time , raw_cm_labor_human
, raw_pm_interval , raw_pm_material_cost , raw_pm_labor_time , raw_pm_labor_human
, raw_pm_interval , raw_pm_material_cost , raw_pm_labor_time , raw_pm_labor_human
, raw_oh_material_cost , raw_oh_labor_time , raw_oh_labor_human
, raw_oh_interval , raw_oh_material_cost , raw_oh_labor_time , raw_oh_labor_human
, raw_predictive_interval , raw_predictive_material_cost , raw_predictive_labor_time , raw_predictive_labor_human
, " raw_loss_output_MW " , raw_loss_output_price
, " raw_loss_output_MW " , raw_loss_output_price
) VALUES ( % s , % s , % s , % s , % s , % s , % s , % s , % s
, rc_cm_material_cost , rc_cm_labor_cost
, % s , % s , % s , % s
, rc_pm_material_cost , rc_pm_labor_cost
, % s , % s , % s
, rc_oh_material_cost , rc_oh_labor_cost
, % s , % s
, rc_predictive_labor_cost
, created_by , created_at
) VALUES (
% s , % s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s
, % s , % s
, % s , % s
, % s , % s
, % s
, ' Sys ' , NOW ( )
)
)
"""
"""
api_data = await fetch_api_data (
assetnum , year , RELIABILITY_APP_URL , token
)
print ( api_data )
cursor . execute (
cursor . execute (
insert_query ,
insert_query ,
(
(
@ -189,62 +500,112 @@ async def query_data(RELIABILITY_APP_URL: str, token: str):
year , # tahun
year , # tahun
seq , # seq
seq , # seq
1 , # is_actual
1 , # is_actual
(
1 , # raw_cm_interval (minimal 1 karena minimal 1x OH)
api_data [ " data " ] [ 0 ] [ " actual_fail " ]
if api_data
else 1
) , # raw_cm_interval (minimal 1 karena minimal 1x OH)
0 , # raw_cm_material_cost
0 , # raw_cm_material_cost
0 , # raw_cm_labor_time
0 , # raw_cm_labor_time
0 , # raw_cm_labor_human
0 , # raw_cm_labor_human
1 , # pm interval set default 1
1 , # pm interval set default 1
0 ,
0 , # raw_pm_material_cost
0 ,
0 , # raw_pm_labor_time
0 ,
0 , # raw_pm_labor_human
0 ,
0 , # raw_oh_interval set default 1
0 ,
0 , # raw_oh_material_cost
0 ,
0 , # raw_oh_labor_time
(
0 , # raw_oh_labor_human
data_tahunan_row [ " total_lost " ]
0 , # raw_predictive_interval set default 1
0 , # raw_predictive_material_cost
0 , # raw_predictive_labor_time
0 , # raw_predictive_labor_human
( # "raw_loss_output_MW"
# data_tahunan_row["total_lost"]
0
if data_tahunan_row
if data_tahunan_row
else 0
else 0
) ,
) ,
(
( # raw_loss_output_price
data_tahunan_row [ " rp_per_kwh " ]
# data_tahunan_row["rp_per_kwh"]
0
if data_tahunan_row
if data_tahunan_row
else 0
else 0
) ,
) ,
0 , # rc_cm_material_cost
0 , # rc_cm_labor_cost
0 , # rc_pm_material_cost
0 , # rc_pm_labor_cost
0 , # rc_oh_material_cost
0 , # rc_oh_labor_cost
0 , # rc_predictive_labor_cost
) ,
) ,
)
)
print ( f " Data inserted for { assetnum } in year { year } " )
inserted_this_asset + = 1
total_inserted + = 1
# print minimal per-year insert log
# print(f"Inserted default data for {assetnum} year {year}")
else :
else :
print ( " Data exists for assetnum " , assetnum )
# Jika data recursive_row ada
# Jika data recursive_row ada
# raw_cm_interval ambil dari reliability predict
insert_query = """
insert_query = """
INSERT INTO lcc_equipment_tr_data (
INSERT INTO lcc_equipment_tr_data (
id , assetnum , tahun , seq , is_actual ,
id , assetnum , tahun , seq , is_actual ,
raw_cm_interval , raw_cm_material_cost ,
raw_cm_interval , raw_cm_material_cost , raw_cm_labor_time , raw_cm_labor_human
raw_cm_labor_time , raw_cm_labor_human
, raw_pm_interval , raw_pm_material_cost , raw_pm_labor_time , raw_pm_labor_human
, raw_pm_interval , raw_pm_material_cost , raw_pm_labor_time , raw_pm_labor_human
, raw_oh_material_cost , raw_oh_labor_time , raw_oh_labor_human
, raw_oh_interval , raw_oh_material_cost , raw_oh_labor_time , raw_oh_labor_human
, raw_predictive_interval , raw_predictive_material_cost , raw_predictive_labor_time , raw_predictive_labor_human
, " raw_loss_output_MW " , raw_loss_output_price
, " raw_loss_output_MW " , raw_loss_output_price
) VALUES ( % s , % s , % s , % s , % s , % s , % s , % s , % s
, " rc_cm_material_cost " , " rc_cm_labor_cost "
, % s , % s , % s , % s
, " rc_pm_material_cost " , " rc_pm_labor_cost "
, % s , % s , % s
, " rc_oh_material_cost " , " rc_oh_labor_cost "
, % s , % s
, " rc_predictive_labor_cost "
, created_by , created_at
) VALUES (
% s , % s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s , % s , % s ,
% s , % s ,
% s , % s ,
% s , % s ,
% s , % s ,
% s ,
' Sys ' , NOW ( )
)
)
"""
"""
api_data = await fetch_api_data (
# Normalize row values to avoid inserting NULL and avoid division by zero
assetnum , year , RELIABILITY_APP_URL , token
raw_cm_interval = data_cm_row . get ( " raw_cm_interval " ) if data_cm_row and data_cm_row . get ( " raw_cm_interval " ) is not None else 0
)
raw_cm_material_cost = data_cm_row . get ( " raw_cm_material_cost " ) if data_cm_row and data_cm_row . get ( " raw_cm_material_cost " ) is not None else 0
print ( api_data )
avg_cm_material_cost = ( raw_cm_material_cost / raw_cm_interval ) if raw_cm_interval else 0
if api_data and " data " in api_data and api_data [ " data " ] :
raw_cm_labor_time = data_cm_row . get ( " raw_cm_labor_time " ) if data_cm_row and data_cm_row . get ( " raw_cm_labor_time " ) is not None else 0
print ( " API data: " , api_data [ " data " ] [ 0 ] [ " actual_fail " ] )
raw_cm_labor_human = data_cm_row . get ( " raw_cm_labor_human " ) if data_cm_row and data_cm_row . get ( " raw_cm_labor_human " ) is not None else 0
else :
print (
raw_pm_interval = data_pm_row . get ( " raw_pm_interval " ) if data_pm_row and data_pm_row . get ( " raw_pm_interval " ) is not None else 0
f " No API data available for { assetnum } in year { year } "
raw_pm_material_cost = data_pm_row . get ( " raw_pm_material_cost " ) if data_pm_row and data_pm_row . get ( " raw_pm_material_cost " ) is not None else 0
)
raw_pm_labor_time = data_pm_row . get ( " raw_pm_labor_time " ) if data_pm_row and data_pm_row . get ( " raw_pm_labor_time " ) is not None else 0
raw_pm_labor_human = data_pm_row . get ( " raw_pm_labor_human " ) if data_pm_row and data_pm_row . get ( " raw_pm_labor_human " ) is not None else 0
raw_oh_interval = data_oh_row . get ( " raw_oh_interval " ) if data_oh_row and data_oh_row . get ( " raw_oh_interval " ) is not None else 0
raw_oh_material_cost = data_oh_row . get ( " raw_oh_material_cost " ) if data_oh_row and data_oh_row . get ( " raw_oh_material_cost " ) is not None else 0
raw_oh_labor_time = data_oh_row . get ( " raw_oh_labor_time " ) if data_oh_row and data_oh_row . get ( " raw_oh_labor_time " ) is not None else 0
raw_oh_labor_human = data_oh_row . get ( " raw_oh_labor_human " ) if data_oh_row and data_oh_row . get ( " raw_oh_labor_human " ) is not None else 0
raw_pdm_interval = data_predictive_row . get ( " raw_predictive_interval " ) if data_predictive_row and data_predictive_row . get ( " raw_predictive_interval " ) is not None else 0
raw_pdm_material_cost = data_predictive_row . get ( " raw_predictive_material_cost " ) if data_predictive_row and data_predictive_row . get ( " raw_predictive_material_cost " ) is not None else 0
raw_pdm_labor_time = data_predictive_row . get ( " raw_predictive_labor_time " ) if data_predictive_row and data_predictive_row . get ( " raw_predictive_labor_time " ) is not None else 0
raw_pdm_labor_human = data_predictive_row . get ( " raw_predictive_labor_human " ) if data_predictive_row and data_predictive_row . get ( " raw_predictive_labor_human " ) is not None else 0
raw_loss_output_MW = data_tahunan_row . get ( " total_lost " ) if data_tahunan_row and data_tahunan_row . get ( " total_lost " ) is not None else 0
raw_loss_output_price = data_tahunan_row . get ( " rp_per_kwh " ) if data_tahunan_row and data_tahunan_row . get ( " rp_per_kwh " ) is not None else 0
rc_cm_material_cost = data_cm_row . get ( " raw_cm_material_cost " ) if data_cm_row and data_cm_row . get ( " raw_cm_material_cost " ) is not None else 0
rc_cm_labor_cost = data_cm_row . get ( " raw_cm_labor_time " ) * data_cm_row . get ( " rc_cm_labor_human " ) * data_tahunan_row . get ( " man_hour " ) if data_cm_row and data_cm_row . get ( " rc_cm_labor_cost " ) and data_cm_row . get ( " rc_cm_labor_human " ) and data_tahunan_row . get ( " man_hour " ) is not None else 0
rc_pm_material_cost = data_pm_row . get ( " raw_pm_material_cost " ) if data_pm_row and data_pm_row . get ( " raw_pm_material_cost " ) is not None else 0
rc_pm_labor_cost = data_pm_row . get ( " raw_pm_labor_time " ) * data_pm_row . get ( " rc_pm_labor_human " ) * data_tahunan_row . get ( " man_hour " ) if data_pm_row and data_pm_row . get ( " rc_pm_labor_cost " ) and data_pm_row . get ( " rc_pm_labor_human " ) and data_tahunan_row . get ( " man_hour " ) is not None else 0
rc_oh_material_cost = data_oh_row . get ( " raw_oh_material_cost " ) if data_oh_row and data_oh_row . get ( " raw_oh_material_cost " ) is not None else 0
rc_oh_labor_cost = data_oh_row . get ( " raw_oh_labor_time " ) * data_oh_row . get ( " rc_oh_labor_human " ) * data_tahunan_row . get ( " man_hour " ) if data_oh_row and data_oh_row . get ( " rc_oh_labor_cost " ) and data_oh_row . get ( " rc_oh_labor_human " ) and data_tahunan_row . get ( " man_hour " ) is not None else 0
rc_predictive_labor_cost = data_predictive_row . get ( " raw_predictive_labor_human " ) * data_tahunan_row . get ( " man_hour " ) if data_predictive_row and data_predictive_row . get ( " rc_predictive_labor_cost " ) and data_tahunan_row . get ( " man_hour " ) is not None else 0
cursor . execute (
cursor . execute (
insert_query ,
insert_query ,
(
(
@ -253,117 +614,74 @@ async def query_data(RELIABILITY_APP_URL: str, token: str):
year , # tahun
year , # tahun
seq , # seq
seq , # seq
1 , # is_actual
1 , # is_actual
(
raw_cm_interval , # raw_cm_interval
api_data [ " data " ] [ 0 ] [ " actual_fail " ]
avg_cm_material_cost , # avg raw_cm_material_cost per interval
if api_data
raw_cm_labor_time , # raw_cm_labor_time
else (
raw_cm_labor_human , # raw_cm_labor_human
recursive_row [ " raw_corrective_failure_interval " ]
raw_pm_interval , # raw_pm_interval
+ 1
raw_pm_material_cost , # raw_pm_material_cost
if recursive_row
raw_pm_labor_time , # raw_pm_labor_time
else 1
raw_pm_labor_human ,
)
raw_oh_interval ,
) , # raw_cm_interval nanti ambil dari API reliability predict
raw_oh_material_cost ,
(
raw_oh_labor_time ,
recursive_row [ " raw_corrective_material_cost " ]
raw_oh_labor_human ,
if recursive_row
raw_pdm_interval ,
else 0
raw_pdm_material_cost ,
) , # raw_cm_material_cost
raw_pdm_labor_time ,
(
raw_pdm_labor_human ,
(
raw_loss_output_MW ,
recursive_row [ " raw_corrective_labor_time_jam " ]
raw_loss_output_price ,
or 0
rc_cm_material_cost ,
)
rc_cm_labor_cost ,
if recursive_row
rc_pm_material_cost ,
else 0
rc_pm_labor_cost ,
) , # raw_cm_labor_time
rc_oh_material_cost ,
(
rc_oh_labor_cost ,
(
rc_predictive_labor_cost ,
max (
recursive_row [
" raw_corrective_labor_technician "
] ,
1 ,
)
if recursive_row [
" raw_corrective_labor_time_jam "
]
else 0
)
if recursive_row
else 0
) , # raw_cm_labor_human
1 , # raw_pm_interval
(
data_pm_row [ " raw_corrective_material_cost " ]
if data_pm_row
else 0
) , # raw_pm_material_cost
(
( data_pm_row [ " raw_corrective_labor_time_jam " ] or 0 )
if data_pm_row
else 0
) , # raw_pm_labor_time
(
(
max (
data_pm_row [
" raw_corrective_labor_technician "
] ,
1 ,
)
if data_pm_row [ " raw_corrective_labor_time_jam " ]
else 0
)
if data_pm_row
else 0
) , # raw_pm_labor_human
(
data_oh_row [ " raw_corrective_material_cost " ]
if data_oh_row
else 0
) , # raw_oh_material_cost
(
( data_oh_row [ " raw_corrective_labor_time_jam " ] or 0 )
if data_oh_row
else 0
) , # raw_oh_labor_time
(
(
max (
data_oh_row [
" raw_corrective_labor_technician "
] ,
1 ,
)
if data_oh_row [ " raw_corrective_labor_time_jam " ]
else 0
)
if data_oh_row
else 0
) , # raw_oh_labor_human
(
data_tahunan_row [ " total_lost " ]
if data_tahunan_row
else 0
)
/ (
recursive_row [ " raw_corrective_failure_interval " ] + 1
if recursive_row
else 1
) , # raw_loss_output_MW
(
data_tahunan_row [ " rp_per_kwh " ]
if data_tahunan_row
else 0
) ,
) ,
) ,
)
)
print ( f " Data inserted for { assetnum } in year { year } " )
inserted_this_asset + = 1
total_inserted + = 1
seq = seq + 1
seq = seq + 1
# commit per asset to persist progress and free transaction
try :
connection . commit ( )
except Exception :
try :
connection . rollback ( )
except Exception :
pass
asset_elapsed = datetime . now ( ) - asset_start
total_elapsed = datetime . now ( ) - overall_start
pct_assets = ( idx / total_assets ) * 100 if total_assets else 100
# progress per asset
print (
f " [ { idx } / { total_assets } ] Asset { assetnum } processed. "
f " Inserted this asset: { inserted_this_asset } . "
f " Asset time: { asset_elapsed . total_seconds ( ) : .2f } s. "
f " Total inserted: { total_inserted } . "
f " Overall elapsed: { total_elapsed . total_seconds ( ) : .2f } s. "
f " Progress: { pct_assets : .1f } % "
)
# periodic summary every 10 assets
if idx % 10 == 0 or idx == total_assets :
print ( f " SUMMARY: { idx } / { total_assets } assets processed, { total_inserted } rows inserted, elapsed { total_elapsed . total_seconds ( ) : .1f } s " )
# Commit perubahan
# Commit perubahan
connection . commit ( )
try :
connection . commit ( )
except Exception :
try :
connection . rollback ( )
except Exception :
pass
print ( f " Finished processing all assets. Total assets: { total_assets } , total inserted: { total_inserted } , total time: { ( datetime . now ( ) - overall_start ) . total_seconds ( ) : .2f } s " )
except Exception as e :
except Exception as e :
print ( " Error saat menjalankan query: " , e )
print ( " Error saat menjalankan query: " , e )
@ -372,8 +690,20 @@ async def query_data(RELIABILITY_APP_URL: str, token: str):
# Menutup koneksi
# Menutup koneksi
print ( " ========Process finished and connection closed.======== " )
print ( " ========Process finished and connection closed.======== " )
if connection or connection_wo_db :
if connection or connection_wo_db :
cursor . close ( )
try :
cursor_wo . close ( )
cursor . close ( )
connection . close ( )
except Exception :
connection_wo_db . close ( )
pass
try :
cursor_wo . close ( )
except Exception :
pass
try :
connection . close ( )
except Exception :
pass
try :
connection_wo_db . close ( )
except Exception :
pass
# print("========Process finished and connection closed.========")
# print("========Process finished and connection closed.========")