fix: minor bug

main
Cizz22 11 months ago
parent a2ebe93406
commit e8c7762aff

@ -33,6 +33,7 @@ async def get_target_reliability(
db_session: DbSession,
oh_session_id: Optional[str] = Query(None),
eaf_input: float = Query(0.5),
duration: int = Query(8000),
):
"""Get all scope pagination."""
if not oh_session_id:
@ -41,10 +42,14 @@ async def get_target_reliability(
detail="oh_session_id is required",
)
results = get_eaf_timeline(
oh_session_id=oh_session_id, eaf_input=eaf_input
results = await get_eaf_timeline(
db_session=db_session,
oh_session_id=oh_session_id,
eaf_input=eaf_input,
oh_duration=duration
)
return StandardResponse(
data=results,
message="Data retrieved successfully",

@ -11,7 +11,9 @@ from datetime import datetime, timedelta
import random
from typing import List
from .utils import generate_down_periods
from src.overhaul_scope.service import get as get_overhaul
from bisect import bisect_left
from collections import defaultdict
# async def get_all_target_reliability(
# *, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
# ):
@ -95,86 +97,179 @@ from .utils import generate_down_periods
# return filtered_aggregated_result
def get_eaf_timeline(eaf_input: float, oh_session_id: str, oh_duration = 8000) -> List[dict]:
# async def get_eaf_timeline(*, db_session, eaf_input: float, oh_session_id: str, oh_duration = 8000) -> List[dict]:
# """
# Generate a timeline of EAF values based on input parameters.
# Args:
# eaf_input (float): EAF value to check against thresholds
# oh_session_id (str): OH session identifier
# Returns:
# set[dict]: Set of dictionaries containing dates and their EAF values
# """
# # Define EAF thresholds
# MIN_EAF = 30
# MAX_EAF = 80
# #Get OH session
# oh_session = await get_overhaul(db_session=db_session, overhaul_session_id=oh_session_id)
# # Dummy OH session dates
# oh_session_start = oh_session.start_date
# oh_session_end = oh_session_start + timedelta(hours=oh_duration)
# # Initialize result set
# results = []
# # Determine date range based on EAF input
# if MIN_EAF <= eaf_input <= MAX_EAF:
# start_date = oh_session_start
# end_date = oh_session_end
# elif eaf_input < MIN_EAF:
# # If below minimum, extend end date by 2 months weeks
# start_date = oh_session_start
# end_date = oh_session_end + timedelta(days=360)
# else: # eaf_input > MAX_EAF
# # If above maximum, reduce end date by 1 month
# start_date = oh_session_start
# end_date = oh_session_end - timedelta(days=180)
# total_hours = (end_date - start_date).total_seconds() / 3600
# # Generate random down periods
# results = []
# # Generate down periods for each EAF scenario
# down_periods = {
# 'eaf1': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
# 'eaf2': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
# 'eaf3': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
# 'eaf4': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90)
# }
# # Define EAF values for downtime periods
# eaf_downtime_values = {
# 'eaf1': 0.8,
# 'eaf2': 0.65,
# 'eaf3': 0.35,
# 'eaf4': 0
# }
# # Generate daily entries
# current_time = start_date
# while current_time <= end_date:
# time_str = current_time.strftime('%Y-%m-%d %H:00:00')
# # Initialize dictionary for this hour with default values (system up)
# hourly_entry = {
# 'date': time_str,
# 'eaf1_value': 1.0,
# 'eaf2_value': 0.75,
# 'eaf3_value': 0.6,
# 'eaf4_value': 0.3
# }
# # Check each EAF scenario
# for eaf_key in down_periods:
# # Check if current hour is in any down period for this EAF
# for period_start, period_end in down_periods[eaf_key]:
# if period_start <= current_time <= period_end:
# hourly_entry[f'{eaf_key}_value'] = eaf_downtime_values[eaf_key]
# break
# results.append(hourly_entry)
# current_time += timedelta(hours=1)
# return results
async def get_eaf_timeline(*, db_session, eaf_input: float, oh_session_id: str, oh_duration = 8000) -> List[dict]:
"""
Generate a timeline of EAF values based on input parameters.
Optimized version with reduced time complexity.
Args:
eaf_input (float): EAF value to check against thresholds
oh_session_id (str): OH session identifier
oh_duration (int): Duration in hours
Returns:
set[dict]: Set of dictionaries containing dates and their EAF values
List[dict]: List of dictionaries containing dates and their EAF values
"""
# Define EAF thresholds
MIN_EAF = 0.3
MAX_EAF = 0.8
MIN_EAF = 30
MAX_EAF = 80
# Dummy OH session dates
oh_session_start = datetime(2024, 1, 1)
oh_session_end = oh_session_start + timedelta(hours=oh_duration)
oh_session = await get_overhaul(db_session=db_session, overhaul_session_id=oh_session_id)
oh_session_start = datetime.fromisoformat(oh_session.start_date.isoformat())
# Initialize result set
results = []
# Determine date range based on EAF input
# Determine date range
if MIN_EAF <= eaf_input <= MAX_EAF:
start_date = oh_session_start
end_date = oh_session_end
end_date = oh_session_start + timedelta(hours=oh_duration)
elif eaf_input < MIN_EAF:
# If below minimum, extend end date by 2 months weeks
start_date = oh_session_start
end_date = oh_session_end + timedelta(days=360)
end_date = oh_session_start + timedelta(hours=oh_duration, days=360)
else: # eaf_input > MAX_EAF
# If above maximum, reduce end date by 1 month
start_date = oh_session_start
end_date = oh_session_end - timedelta(days=180)
total_hours = (end_date - start_date).total_seconds() / 3600
end_date = oh_session_start + timedelta(hours=oh_duration) - timedelta(days=180)
# Generate random down periods
results = []
# Generate down periods for each EAF scenario
down_periods = {
'eaf1': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
'eaf2': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
'eaf3': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
'eaf4': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90)
# Default EAF values when system is up
default_values = {
'eaf1_value': 1.0,
'eaf2_value': 0.75,
'eaf3_value': 0.6,
'eaf4_value': 0.3
}
# Define EAF values for downtime periods
eaf_downtime_values = {
# EAF values during downtime
downtime_values = {
'eaf1': 0.8,
'eaf2': 0.65,
'eaf3': 0.35,
'eaf4': 0
}
# Generate daily entries
current_time = start_date
while current_time <= end_date:
time_str = current_time.strftime('%Y-%m-%d %H:00:00')
# Generate down periods for all EAF scenarios at once
all_down_periods = {}
for eaf_key in ['eaf1', 'eaf2', 'eaf3', 'eaf4']:
periods = generate_down_periods(oh_session_start, end_date, 5, min_duration=30, max_duration=90)
# Sort periods by start time for binary search
all_down_periods[eaf_key] = sorted(periods, key=lambda x: x[0])
# Initialize dictionary for this hour with default values (system up)
hourly_entry = {
'date': time_str,
'eaf1_value': 1.0,
'eaf2_value': 0.75,
'eaf3_value': 0.6,
'eaf4_value': 0.3
}
# Check each EAF scenario
for eaf_key in down_periods:
# Check if current hour is in any down period for this EAF
for period_start, period_end in down_periods[eaf_key]:
if period_start <= current_time <= period_end:
hourly_entry[f'{eaf_key}_value'] = eaf_downtime_values[eaf_key]
break
# Create a list of all state change times
state_changes = defaultdict(dict)
for eaf_key, periods in all_down_periods.items():
for start, end in periods:
# Record state changes at period boundaries
state_changes[start][eaf_key] = downtime_values[eaf_key]
state_changes[end + timedelta(hours=1)][eaf_key] = default_values[f'{eaf_key}_value']
results.append(hourly_entry)
current_time += timedelta(hours=1)
# Convert state_changes to sorted list of times
change_times = sorted(state_changes.keys())
results = []
current_values = default_values.copy()
# Process changes between state change points
current_time = oh_session_start
idx = 0
while current_time <= end_date:
# Update values if we've hit a state change point
if idx < len(change_times) and current_time >= change_times[idx]:
changes = state_changes[change_times[idx]]
for eaf_key, value in changes.items():
current_values[f'{eaf_key}_value'] = value
idx += 1
results.append({
'date': current_time.strftime('%Y-%m-%d %H:00:00'),
**current_values
})
current_time += timedelta(hours=1)
return results

Loading…
Cancel
Save