diff --git a/src/calculation_target_reliability/router.py b/src/calculation_target_reliability/router.py
index 4e4bd72..f95c0cc 100644
--- a/src/calculation_target_reliability/router.py
+++ b/src/calculation_target_reliability/router.py
@@ -6,20 +6,43 @@ from fastapi.params import Query
 
 from src.database.core import DbSession
 from src.models import StandardResponse
-from .service import get_all_target_reliability
+from .service import get_eaf_timeline
 
 router = APIRouter()
 
+# @router.get("", response_model=StandardResponse[List[Dict]])
+# async def get_target_reliability(
+#     db_session: DbSession,
+#     scope_name: Optional[str] = Query(None),
+#     eaf_threshold: float = Query(100),
+# ):
+#     """Get all scope pagination."""
+#     results = await get_all_target_reliability(
+#         db_session=db_session, scope_name=scope_name, eaf_threshold=eaf_threshold
+#     )
+
+#     return StandardResponse(
+#         data=results,
+#         message="Data retrieved successfully",
+#     )
+
+
 @router.get("", response_model=StandardResponse[List[Dict]])
 async def get_target_reliability(
     db_session: DbSession,
-    scope_name: Optional[str] = Query(None),
-    eaf_threshold: float = Query(100),
+    oh_session_id: Optional[str] = Query(None),
+    eaf_input: float = Query(0.5),
 ):
-    """Get all scope pagination."""
-    results = await get_all_target_reliability(
-        db_session=db_session, scope_name=scope_name, eaf_threshold=eaf_threshold
+    """Return the EAF timeline for the given overhaul (OH) session."""
+    if not oh_session_id:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="oh_session_id is required",
+        )
+
+    results = get_eaf_timeline(
+        oh_session_id=oh_session_id, eaf_input=eaf_input
     )
 
     return StandardResponse(
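For orientation before the service changes below, a minimal client-side sketch of the reworked endpoint. The base URL and mount prefix are assumptions for illustration only (the router's prefix is set wherever it is included in the app, not in this diff); the query parameters and the required-parameter guard come from the handler above.

    import httpx

    resp = httpx.get(
        "http://localhost:8000/calculation-target-reliability",  # assumed prefix
        params={"oh_session_id": "OH-2024-001", "eaf_input": 0.5},
    )
    resp.raise_for_status()
    timeline = resp.json()["data"]  # list of {"date": ..., "eaf_value": ...} entries

    # Omitting oh_session_id triggers the explicit HTTP 400 guard in the handler.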
diff --git a/src/calculation_target_reliability/service.py b/src/calculation_target_reliability/service.py
index 70737a4..df46b06 100644
--- a/src/calculation_target_reliability/service.py
+++ b/src/calculation_target_reliability/service.py
@@ -7,87 +7,153 @@ from src.database.core import DbSession
 from src.scope_equipment.model import ScopeEquipment
 from src.scope_equipment.service import get_by_scope_name
 from src.scope_equipment_job.service import get_equipment_level_by_no
-
-
-async def get_all_target_reliability(
-    *, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
-):
-    """Get all overhaul overview with EAF values that sum to 100%, aggregated by system."""
-    equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
-    equipment_system = await get_equipment_level_by_no(db_session=db_session, level=1)
-    equipment_subsystem = await get_equipment_level_by_no(
-        db_session=db_session, level=2
-    )
-
-    # If no equipments found, return empty list
-    if not equipments:
-        return []
-
-    import random
-
-    n = len(equipments)
-    base_value = 100 / n  # Even distribution as base
-
-    # Generate EAF values with ±30% variation from base
-    eaf_values = [
-        base_value + random.uniform(-0.3 * base_value, 0.3 * base_value)
-        for _ in range(n)
-    ]
-
-    # Normalize to ensure sum is 100
-    total = sum(eaf_values)
-    eaf_values = [(v * 100 / total) for v in eaf_values]
-
-    # Create result array of dictionaries
-    result = [
-        {
-            "id": equipment.id,
-            "assetnum": equipment.assetnum,
-            "location_tag": equipment.master_equipment.location_tag,
-            "name": equipment.master_equipment.name,
-            "parent_id": equipment.master_equipment.parent_id,  # Add parent_id to identify the system
-            "eaf": round(eaf, 4),  # Add EAF value
-        }
-        for equipment, eaf in zip(equipments, eaf_values)
-    ]
-
-    # Group equipment by system
-    sub_system = {
-        subsystem.id: subsystem.parent_id for subsystem in equipment_subsystem
-    }
-    systems = {
-        system.id: {"name": system.name, "total_eaf": 0, "equipments": []}
-        for system in equipment_system
-    }
-
-    for equipment in result:
-        if equipment["parent_id"] in sub_system:
-            systems[sub_system[equipment["parent_id"]]]["equipments"].append(equipment)
-            systems[sub_system[equipment["parent_id"]]]["total_eaf"] += equipment["eaf"]
-
-    # Convert the systems dictionary to a list of aggregated results
-    aggregated_result = [
-        {
-            "system_id": system_id,
-            "system_name": system_data["name"],
-            "total_eaf": round(system_data["total_eaf"], 4),
-            "equipments": system_data["equipments"],
-        }
-        for system_id, system_data in systems.items()
-    ]
-
-    # Sort the aggregated result by total_eaf in descending order
-    aggregated_result.sort(key=lambda x: x["total_eaf"], reverse=True)
-
-    # Filter systems up to the threshold
-    cumulative_eaf = 0
-    filtered_aggregated_result = []
-
-    for system in aggregated_result:
-        cumulative_eaf += system["total_eaf"]
-        filtered_aggregated_result.append(system)
-
-        if cumulative_eaf >= eaf_threshold:
-            break
-
-    return filtered_aggregated_result
+from datetime import datetime, timedelta
+import random
+from typing import List
+from .utils import generate_down_periods
+
+# async def get_all_target_reliability(
+#     *, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
+# ):
+#     """Get all overhaul overview with EAF values that sum to 100%, aggregated by system."""
+#     equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
+#     equipment_system = await get_equipment_level_by_no(db_session=db_session, level=1)
+#     equipment_subsystem = await get_equipment_level_by_no(
+#         db_session=db_session, level=2
+#     )
+
+#     # If no equipments found, return empty list
+#     if not equipments:
+#         return []
+
+#     import random
+
+#     n = len(equipments)
+#     base_value = 100 / n  # Even distribution as base
+
+#     # Generate EAF values with ±30% variation from base
+#     eaf_values = [
+#         base_value + random.uniform(-0.3 * base_value, 0.3 * base_value)
+#         for _ in range(n)
+#     ]
+
+#     # Normalize to ensure sum is 100
+#     total = sum(eaf_values)
+#     eaf_values = [(v * 100 / total) for v in eaf_values]
+
+#     # Create result array of dictionaries
+#     result = [
+#         {
+#             "id": equipment.id,
+#             "assetnum": equipment.assetnum,
+#             "location_tag": equipment.master_equipment.location_tag,
+#             "name": equipment.master_equipment.name,
+#             "parent_id": equipment.master_equipment.parent_id,  # Add parent_id to identify the system
+#             "eaf": round(eaf, 4),  # Add EAF value
+#         }
+#         for equipment, eaf in zip(equipments, eaf_values)
+#     ]
+
+#     # Group equipment by system
+#     sub_system = {
+#         subsystem.id: subsystem.parent_id for subsystem in equipment_subsystem
+#     }
+#     systems = {
+#         system.id: {"name": system.name, "total_eaf": 0, "equipments": []}
+#         for system in equipment_system
+#     }
+
+#     for equipment in result:
+#         if equipment["parent_id"] in sub_system:
+#             systems[sub_system[equipment["parent_id"]]]["equipments"].append(equipment)
+#             systems[sub_system[equipment["parent_id"]]]["total_eaf"] += equipment["eaf"]
+
+#     # Convert the systems dictionary to a list of aggregated results
+#     aggregated_result = [
+#         {
+#             "system_id": system_id,
+#             "system_name": system_data["name"],
+#             "total_eaf": round(system_data["total_eaf"], 4),
+#             "equipments": system_data["equipments"],
+#         }
+#         for system_id, system_data in systems.items()
+#     ]
+
+#     # Sort the aggregated result by total_eaf in descending order
+#     aggregated_result.sort(key=lambda x: x["total_eaf"], reverse=True)
+
+#     # Filter systems up to the threshold
+#     cumulative_eaf = 0
+#     filtered_aggregated_result = []
+
+#     for system in aggregated_result:
+#         cumulative_eaf += system["total_eaf"]
+#         filtered_aggregated_result.append(system)
+
+#         if cumulative_eaf >= eaf_threshold:
+#             break
+
+#     return filtered_aggregated_result
+
+
+def get_eaf_timeline(eaf_input: float, oh_session_id: str) -> List[dict]:
+    """
+    Generate a timeline of EAF values based on input parameters.
+
+    Args:
+        eaf_input (float): EAF value to check against thresholds
+        oh_session_id (str): OH session identifier
+
+    Returns:
+        List[dict]: List of dictionaries containing dates and their EAF values
+    """
+    # Define EAF thresholds
+    MIN_EAF = 0.3
+    MAX_EAF = 0.8
+
+    # Dummy OH session dates
+    oh_session_start = datetime(2024, 1, 1)
+    oh_session_end = datetime(2026, 7, 30)
+
+    # Initialize result list
+    results = []
+
+    # Determine date range based on EAF input
+    if MIN_EAF <= eaf_input <= MAX_EAF:
+        start_date = oh_session_start
+        end_date = oh_session_end
+    elif eaf_input < MIN_EAF:
+        # If below minimum, extend the end date by roughly 12 months (360 days)
+        start_date = oh_session_start
+        end_date = oh_session_end + timedelta(days=360)
+    else:  # eaf_input > MAX_EAF
+        # If above maximum, pull the end date in by roughly 6 months (180 days)
+        start_date = oh_session_start
+        end_date = oh_session_end - timedelta(days=180)
+
+    # Generate random down periods
+    down_periods = generate_down_periods(start_date, end_date, 10)
+
+    # Generate daily entries
+    current_date = start_date
+    while current_date <= end_date:
+        # Convert date to string format
+        date_str = current_date.strftime('%Y-%m-%d')
+
+        # Set default EAF value to 1 (system up)
+        eaf_value = 1.0
+
+        # Check if current date is in any down period
+        for period_start, period_end in down_periods:
+            if period_start <= current_date <= period_end:
+                eaf_value = 0.2
+                break
+
+        # Add entry to timeline
+        results.append({
+            'date': date_str,
+            'eaf_value': eaf_value
+        })
+
+        current_date += timedelta(days=1)
+
+    return results
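Taken on its own, the new service helper can be exercised directly. A short sketch of its contract for illustration; the session id is a placeholder, since the current implementation uses the hard-coded dummy OH dates shown above.

    from src.calculation_target_reliability.service import get_eaf_timeline

    timeline = get_eaf_timeline(eaf_input=0.5, oh_session_id="dummy-session")
    print(timeline[0])    # e.g. {'date': '2024-01-01', 'eaf_value': 1.0}
    print(len(timeline))  # one entry per day across the (dummy) session window

    # Values outside the 0.3-0.8 band only change the window length:
    longer = get_eaf_timeline(eaf_input=0.2, oh_session_id="dummy-session")   # end date +360 days
    shorter = get_eaf_timeline(eaf_input=0.9, oh_session_id="dummy-session")  # end date -180 days
    assert len(longer) > len(timeline) > len(shorter)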
diff --git a/src/calculation_target_reliability/utils.py b/src/calculation_target_reliability/utils.py
new file mode 100644
index 0000000..91befc2
--- /dev/null
+++ b/src/calculation_target_reliability/utils.py
@@ -0,0 +1,54 @@
+from datetime import datetime, timedelta
+import random
+from typing import List, Optional
+
+
+def generate_down_periods(start_date: datetime, end_date: datetime,
+                          num_periods: Optional[int] = None, min_duration: int = 3,
+                          max_duration: int = 7) -> list[tuple[datetime, datetime]]:
+    """
+    Generate random system down periods within a date range.
+
+    Args:
+        start_date (datetime): Start date of the overall period
+        end_date (datetime): End date of the overall period
+        num_periods (int, optional): Number of down periods to generate.
+            If None, generates 1-3 periods randomly
+        min_duration (int): Minimum duration of each down period in days
+        max_duration (int): Maximum duration of each down period in days
+
+    Returns:
+        list[tuple[datetime, datetime]]: List of (start_date, end_date) tuples
+            for each down period
+    """
+    if num_periods is None:
+        num_periods = random.randint(1, 3)
+
+    total_days = (end_date - start_date).days
+    down_periods = []
+
+    # Generate random down periods
+    for _ in range(num_periods):
+        # Random duration for this period
+        duration = random.randint(min_duration, max_duration)
+
+        # Ensure we don't exceed the total date range
+        latest_possible_start = total_days - duration
+
+        if latest_possible_start < 0:
+            continue
+
+        # Random start day within available range
+        start_day = random.randint(0, latest_possible_start)
+        period_start = start_date + timedelta(days=start_day)
+        period_end = period_start + timedelta(days=duration)
+
+        # Check for overlaps with existing periods
+        overlaps = any(
+            (p_start <= period_end and period_start <= p_end)
+            for p_start, p_end in down_periods
+        )
+
+        if not overlaps:
+            down_periods.append((period_start, period_end))
+
+    return sorted(down_periods)
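A quick usage sketch for the new helper; the date range and period count are arbitrary example values, everything else follows the signature above.

    from datetime import datetime
    from src.calculation_target_reliability.utils import generate_down_periods

    periods = generate_down_periods(datetime(2024, 1, 1), datetime(2024, 12, 31), num_periods=3)
    for period_start, period_end in periods:
        # Each tuple spans 3-7 days; candidates that would overlap an earlier
        # period are skipped, so fewer than num_periods tuples may come back.
        print(period_start.date(), "->", period_end.date())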
diff --git a/src/calculation_time_constrains/service.py b/src/calculation_time_constrains/service.py
index 54f2c33..4473c77 100644
--- a/src/calculation_time_constrains/service.py
+++ b/src/calculation_time_constrains/service.py
@@ -199,6 +199,8 @@ async def get_corrective_cost_time_chart(
 
         corrective_costs = monthly_failure * cost_per_failure
 
+
+
         return corrective_costs, monthly_failure
 
     except Exception as e:
@@ -306,6 +308,8 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
                 "num_failures": 0,
                 "day": i + 1,
             }
+            ## Add risk cost
+            # risk cost = ((downtime_1 * MW_loss_1) + (downtime_2 * MW_loss_2) + ... + (downtime_N * MW_loss_N)) * electricity price (Efficiency HL App)
 
             for eq in scope_calculation.equipment_results:
                 if not eq.is_included:
@@ -486,6 +490,8 @@ async def create_calculation_result_service(
     # Calculate optimum points using total costs
     total_cost = total_corrective_costs + overhaul_cost_points
     optimum_oh_index = np.argmin(total_cost)
+
+    # raise Exception(optimum_oh_index)  # debug check, kept disabled so the calculation completes
     numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])
 
     optimum = OptimumResult(
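The risk-cost comment added in the middle hunk above only sketches a formula. A hedged, self-contained restatement of that arithmetic; all names and numbers here are illustrative placeholders, not identifiers from this codebase.

    def risk_cost(downtimes_hours, mw_losses, electricity_price_per_mwh):
        # risk cost = ((downtime_1 * MW_loss_1) + ... + (downtime_N * MW_loss_N)) * electricity price
        lost_energy_mwh = sum(dt * mw for dt, mw in zip(downtimes_hours, mw_losses))
        return lost_energy_mwh * electricity_price_per_mwh

    # Two outages: 24 h at 50 MW and 12 h at 30 MW, priced at 100 per MWh
    print(risk_cost([24, 12], [50, 30], 100.0))  # (1200 + 360) * 100 = 156000.0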