You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
6.1 KiB
Python

from typing import Optional
from sqlalchemy import Delete, Select
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.scope_equipment.model import ScopeEquipment
from src.scope_equipment.service import get_by_scope_name
from src.scope_equipment_job.service import get_equipment_level_by_no
from datetime import datetime, timedelta
import random
from typing import List
from .utils import generate_down_periods
# async def get_all_target_reliability(
# *, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
# ):
# """Get all overhaul overview with EAF values that sum to 100%, aggregated by system."""
# equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
# equipment_system = await get_equipment_level_by_no(db_session=db_session, level=1)
# equipment_subsystem = await get_equipment_level_by_no(
# db_session=db_session, level=2
# )
# # If no equipments found, return empty list
# if not equipments:
# return []
# import random
# n = len(equipments)
# base_value = 100 / n # Even distribution as base
# # Generate EAF values with ±30% variation from base
# eaf_values = [
# base_value + random.uniform(-0.3 * base_value, 0.3 * base_value)
# for _ in range(n)
# ]
# # Normalize to ensure sum is 100
# total = sum(eaf_values)
# eaf_values = [(v * 100 / total) for v in eaf_values]
# # Create result array of dictionaries
# result = [
# {
# "id": equipment.id,
# "assetnum": equipment.assetnum,
# "location_tag": equipment.master_equipment.location_tag,
# "name": equipment.master_equipment.name,
# "parent_id": equipment.master_equipment.parent_id, # Add parent_id to identify the system
# "eaf": round(eaf, 4), # Add EAF value
# }
# for equipment, eaf in zip(equipments, eaf_values)
# ]
# # Group equipment by system
# sub_system = {
# subsystem.id: subsystem.parent_id for subsystem in equipment_subsystem
# }
# systems = {
# system.id: {"name": system.name, "total_eaf": 0, "equipments": []}
# for system in equipment_system
# }
# for equipment in result:
# if equipment["parent_id"] in sub_system:
# systems[sub_system[equipment["parent_id"]]]["equipments"].append(equipment)
# systems[sub_system[equipment["parent_id"]]]["total_eaf"] += equipment["eaf"]
# # Convert the systems dictionary to a list of aggregated results
# aggregated_result = [
# {
# "system_id": system_id,
# "system_name": system_data["name"],
# "total_eaf": round(system_data["total_eaf"], 4),
# "equipments": system_data["equipments"],
# }
# for system_id, system_data in systems.items()
# ]
# # Sort the aggregated result by total_eaf in descending order
# aggregated_result.sort(key=lambda x: x["total_eaf"], reverse=True)
# # Filter systems up to the threshold
# cumulative_eaf = 0
# filtered_aggregated_result = []
# for system in aggregated_result:
# cumulative_eaf += system["total_eaf"]
# filtered_aggregated_result.append(system)
# if cumulative_eaf >= eaf_threshold:
# break
# return filtered_aggregated_result
def get_eaf_timeline(eaf_input: float, oh_session_id: str, oh_duration: float = 8000) -> List[dict]:
    """
    Generate an hourly timeline of EAF values for four EAF scenarios.

    The overhaul (OH) session window currently starts at a hard-coded
    placeholder date (2024-01-01 00:00) and nominally lasts ``oh_duration``
    hours. The end of the window is then adjusted by ``eaf_input``:

    * between 0.3 and 0.8 (inclusive): the nominal window is used;
    * below 0.3: the window is extended by 360 days;
    * otherwise: the window is shortened by 180 days.

    Args:
        eaf_input (float): EAF value compared against the 0.3 / 0.8 thresholds.
        oh_session_id (str): OH session identifier. Accepted for interface
            compatibility; the placeholder session dates do not depend on it.
        oh_duration (float): Nominal session length in hours.

    Returns:
        List[dict]: One dictionary per hour from the window start to the
        window end (both inclusive). Each has a ``'date'`` string formatted
        as ``'%Y-%m-%d %H:00:00'`` and the keys ``'eaf1_value'`` ...
        ``'eaf4_value'``. The list is empty when the adjusted end falls
        before the start.
    """
    # EAF thresholds that select how the session window is adjusted.
    min_eaf = 0.3
    max_eaf = 0.8

    # Placeholder OH session dates (not yet looked up from oh_session_id).
    start_date = datetime(2024, 1, 1)
    oh_session_end = start_date + timedelta(hours=oh_duration)

    # Branch order is kept deliberately: any input that is neither inside the
    # band nor below it (including NaN) falls through to the shortened window.
    if min_eaf <= eaf_input <= max_eaf:
        end_date = oh_session_end
    elif eaf_input < min_eaf:
        end_date = oh_session_end + timedelta(days=360)
    else:
        end_date = oh_session_end - timedelta(days=180)

    # A short session combined with the 180-day reduction can put the end
    # before the start. The hourly loop below would yield nothing in that
    # case, so return early instead of handing an inverted range to
    # generate_down_periods.
    if end_date < start_date:
        return []

    # Per scenario: (value while up, value while inside a down period).
    scenarios = {
        'eaf1': (1.0, 0.8),
        'eaf2': (0.75, 0.65),
        'eaf3': (0.6, 0.35),
        'eaf4': (0.3, 0),
    }

    # Down periods are generated independently for each scenario, in key order.
    # Each item is unpacked below as a (period_start, period_end) pair that is
    # compared against datetimes.
    # NOTE(review): the positional 5 is presumably the number of periods and
    # the min/max duration units are defined by generate_down_periods - confirm.
    down_periods = {
        eaf_key: generate_down_periods(
            start_date, end_date, 5, min_duration=30, max_duration=90
        )
        for eaf_key in scenarios
    }

    results = []
    one_hour = timedelta(hours=1)
    current_time = start_date
    while current_time <= end_date:
        hourly_entry = {'date': current_time.strftime('%Y-%m-%d %H:00:00')}
        for eaf_key, (up_value, down_value) in scenarios.items():
            # The scenario is down for this hour if any of its periods
            # contains the current timestamp (bounds inclusive).
            is_down = any(
                period_start <= current_time <= period_end
                for period_start, period_end in down_periods[eaf_key]
            )
            hourly_entry[f'{eaf_key}_value'] = down_value if is_down else up_value
        results.append(hourly_entry)
        current_time += one_hour

    return results