modify target reliability endpoint

main
Cizz22 11 months ago
parent 9ced34bf16
commit adae35bf29

@ -6,20 +6,43 @@ from fastapi.params import Query
from src.database.core import DbSession
from src.models import StandardResponse
from .service import get_all_target_reliability
from .service import get_eaf_timeline
router = APIRouter()
# @router.get("", response_model=StandardResponse[List[Dict]])
# async def get_target_reliability(
# db_session: DbSession,
# scope_name: Optional[str] = Query(None),
# eaf_threshold: float = Query(100),
# ):
# """Get all scope pagination."""
# results = await get_all_target_reliability(
# db_session=db_session, scope_name=scope_name, eaf_threshold=eaf_threshold
# )
# return StandardResponse(
# data=results,
# message="Data retrieved successfully",
# )
@router.get("", response_model=StandardResponse[List[Dict]])
async def get_target_reliability(
db_session: DbSession,
scope_name: Optional[str] = Query(None),
eaf_threshold: float = Query(100),
oh_session_id: Optional[str] = Query(None),
eaf_input: float = Query(0.5),
):
"""Get all scope pagination."""
results = await get_all_target_reliability(
db_session=db_session, scope_name=scope_name, eaf_threshold=eaf_threshold
if not oh_session_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="oh_session_id is required",
)
results = get_eaf_timeline(
oh_session_id=oh_session_id, eaf_input=eaf_input
)
return StandardResponse(

@ -7,87 +7,153 @@ from src.database.core import DbSession
from src.scope_equipment.model import ScopeEquipment
from src.scope_equipment.service import get_by_scope_name
from src.scope_equipment_job.service import get_equipment_level_by_no
async def get_all_target_reliability(
*, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
):
"""Get all overhaul overview with EAF values that sum to 100%, aggregated by system."""
equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
equipment_system = await get_equipment_level_by_no(db_session=db_session, level=1)
equipment_subsystem = await get_equipment_level_by_no(
db_session=db_session, level=2
)
# If no equipments found, return empty list
if not equipments:
return []
import random
n = len(equipments)
base_value = 100 / n # Even distribution as base
# Generate EAF values with ±30% variation from base
eaf_values = [
base_value + random.uniform(-0.3 * base_value, 0.3 * base_value)
for _ in range(n)
]
# Normalize to ensure sum is 100
total = sum(eaf_values)
eaf_values = [(v * 100 / total) for v in eaf_values]
# Create result array of dictionaries
result = [
{
"id": equipment.id,
"assetnum": equipment.assetnum,
"location_tag": equipment.master_equipment.location_tag,
"name": equipment.master_equipment.name,
"parent_id": equipment.master_equipment.parent_id, # Add parent_id to identify the system
"eaf": round(eaf, 4), # Add EAF value
}
for equipment, eaf in zip(equipments, eaf_values)
]
# Group equipment by system
sub_system = {
subsystem.id: subsystem.parent_id for subsystem in equipment_subsystem
}
systems = {
system.id: {"name": system.name, "total_eaf": 0, "equipments": []}
for system in equipment_system
}
for equipment in result:
if equipment["parent_id"] in sub_system:
systems[sub_system[equipment["parent_id"]]]["equipments"].append(equipment)
systems[sub_system[equipment["parent_id"]]]["total_eaf"] += equipment["eaf"]
# Convert the systems dictionary to a list of aggregated results
aggregated_result = [
{
"system_id": system_id,
"system_name": system_data["name"],
"total_eaf": round(system_data["total_eaf"], 4),
"equipments": system_data["equipments"],
}
for system_id, system_data in systems.items()
]
# Sort the aggregated result by total_eaf in descending order
aggregated_result.sort(key=lambda x: x["total_eaf"], reverse=True)
# Filter systems up to the threshold
cumulative_eaf = 0
filtered_aggregated_result = []
for system in aggregated_result:
cumulative_eaf += system["total_eaf"]
filtered_aggregated_result.append(system)
if cumulative_eaf >= eaf_threshold:
break
return filtered_aggregated_result
from datetime import datetime, timedelta
import random
from typing import List
from .utils import generate_down_periods
# async def get_all_target_reliability(
# *, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
# ):
# """Get all overhaul overview with EAF values that sum to 100%, aggregated by system."""
# equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
# equipment_system = await get_equipment_level_by_no(db_session=db_session, level=1)
# equipment_subsystem = await get_equipment_level_by_no(
# db_session=db_session, level=2
# )
# # If no equipments found, return empty list
# if not equipments:
# return []
# import random
# n = len(equipments)
# base_value = 100 / n # Even distribution as base
# # Generate EAF values with ±30% variation from base
# eaf_values = [
# base_value + random.uniform(-0.3 * base_value, 0.3 * base_value)
# for _ in range(n)
# ]
# # Normalize to ensure sum is 100
# total = sum(eaf_values)
# eaf_values = [(v * 100 / total) for v in eaf_values]
# # Create result array of dictionaries
# result = [
# {
# "id": equipment.id,
# "assetnum": equipment.assetnum,
# "location_tag": equipment.master_equipment.location_tag,
# "name": equipment.master_equipment.name,
# "parent_id": equipment.master_equipment.parent_id, # Add parent_id to identify the system
# "eaf": round(eaf, 4), # Add EAF value
# }
# for equipment, eaf in zip(equipments, eaf_values)
# ]
# # Group equipment by system
# sub_system = {
# subsystem.id: subsystem.parent_id for subsystem in equipment_subsystem
# }
# systems = {
# system.id: {"name": system.name, "total_eaf": 0, "equipments": []}
# for system in equipment_system
# }
# for equipment in result:
# if equipment["parent_id"] in sub_system:
# systems[sub_system[equipment["parent_id"]]]["equipments"].append(equipment)
# systems[sub_system[equipment["parent_id"]]]["total_eaf"] += equipment["eaf"]
# # Convert the systems dictionary to a list of aggregated results
# aggregated_result = [
# {
# "system_id": system_id,
# "system_name": system_data["name"],
# "total_eaf": round(system_data["total_eaf"], 4),
# "equipments": system_data["equipments"],
# }
# for system_id, system_data in systems.items()
# ]
# # Sort the aggregated result by total_eaf in descending order
# aggregated_result.sort(key=lambda x: x["total_eaf"], reverse=True)
# # Filter systems up to the threshold
# cumulative_eaf = 0
# filtered_aggregated_result = []
# for system in aggregated_result:
# cumulative_eaf += system["total_eaf"]
# filtered_aggregated_result.append(system)
# if cumulative_eaf >= eaf_threshold:
# break
# return filtered_aggregated_result
def get_eaf_timeline(eaf_input: float, oh_session_id: str) -> List[dict]:
"""
Generate a timeline of EAF values based on input parameters.
Args:
eaf_input (float): EAF value to check against thresholds
oh_session_id (str): OH session identifier
Returns:
set[dict]: Set of dictionaries containing dates and their EAF values
"""
# Define EAF thresholds
MIN_EAF = 0.3
MAX_EAF = 0.8
# Dummy OH session dates
oh_session_start = datetime(2024, 1, 1)
oh_session_end = datetime(2026, 7, 30)
# Initialize result set
results = []
# Determine date range based on EAF input
if MIN_EAF <= eaf_input <= MAX_EAF:
start_date = oh_session_start
end_date = oh_session_end
elif eaf_input < MIN_EAF:
# If below minimum, extend end date by 2 months weeks
start_date = oh_session_start
end_date = oh_session_end + timedelta(days=360)
else: # eaf_input > MAX_EAF
# If above maximum, reduce end date by 1 month
start_date = oh_session_start
end_date = oh_session_end - timedelta(days=180)
# Generate random down periods
down_periods = generate_down_periods(start_date, end_date, 10)
# Generate daily entries
current_date = start_date
while current_date <= end_date:
# Convert date to string format
date_str = current_date.strftime('%Y-%m-%d')
# Set default EAF value to 1 (system up)
eaf_value = 1.0
# Check if current date is in any down period
for period_start, period_end in down_periods:
if period_start <= current_date <= period_end:
eaf_value = 0.2
break
# Add entry to timeline
results.append({
'date': date_str,
'eaf_value': eaf_value
})
current_date += timedelta(days=1)
return results

@ -0,0 +1,54 @@
from datetime import datetime, timedelta
import random
from typing import List, Optional
def generate_down_periods(start_date: datetime, end_date: datetime,
num_periods: Optional[int] = None, min_duration: int = 3,
max_duration: int = 7) -> list[tuple[datetime, datetime]]:
"""
Generate random system down periods within a date range.
Args:
start_date (datetime): Start date of the overall period
end_date (datetime): End date of the overall period
num_periods (int, optional): Number of down periods to generate.
If None, generates 1-3 periods randomly
min_duration (int): Minimum duration of each down period in days
max_duration (int): Maximum duration of each down period in days
Returns:
list[tuple[datetime, datetime]]: List of (start_date, end_date) tuples
for each down period
"""
if num_periods is None:
num_periods = random.randint(1, 3)
total_days = (end_date - start_date).days
down_periods = []
# Generate random down periods
for _ in range(num_periods):
# Random duration for this period
duration = random.randint(min_duration, max_duration)
# Ensure we don't exceed the total date range
latest_possible_start = total_days - duration
if latest_possible_start < 0:
continue
# Random start day within available range
start_day = random.randint(0, latest_possible_start)
period_start = start_date + timedelta(days=start_day)
period_end = period_start + timedelta(days=duration)
# Check for overlaps with existing periods
overlaps = any(
(p_start <= period_end and period_start <= p_end)
for p_start, p_end in down_periods
)
if not overlaps:
down_periods.append((period_start, period_end))
return sorted(down_periods)

@ -199,6 +199,8 @@ async def get_corrective_cost_time_chart(
corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, monthly_failure
except Exception as e:
@ -306,6 +308,8 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
"num_failures": 0,
"day": i + 1,
}
## Add risk Cost
# risk cost = ((Down Time1 * MW Loss 1) + (Downtime2 * Mw 2) + .... (DowntimeN * MwN) ) * Harga listrik (Efficicency HL App)
for eq in scope_calculation.equipment_results:
if not eq.is_included:
@ -486,6 +490,8 @@ async def create_calculation_result_service(
# Calculate optimum points using total costs
total_cost = total_corrective_costs + overhaul_cost_points
optimum_oh_index = np.argmin(total_cost)
raise Exception(optimum_oh_index)
numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])
optimum = OptimumResult(

Loading…
Cancel
Save