fix start end date

feature/reliability_stat
Cizz22 8 months ago
parent c4ce88a921
commit 0a66bd0c76

@ -10,7 +10,7 @@ from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.overhaul_scope.service import get as get_scope
from src.overhaul_scope.service import get as get_scope, get_prev_oh
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder
@ -138,6 +138,16 @@ async def get_corrective_cost_time_chart(
raise
# async def get_corrective_cost_time_chart(
# material_cost: float,
# service_cost: float,
# location_tag: str,
# token,
# start_date: datetime.datetime,
# end_date: datetime.datetime
# ) -> Tuple[np.ndarray, np.ndarray]:
async def get_corrective_cost_time_chart(
material_cost: float,
service_cost: float,
@ -148,6 +158,159 @@ async def get_corrective_cost_time_chart(
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow = today + datetime.timedelta(days=1)
# Initialize monthly data dictionary
monthly_data = {}
latest_num = 1
# Handle historical data (any portion before or including today)
historical_start = start_date if start_date <= today else None
historical_end = min(today, end_date)
if historical_start and historical_start <= historical_end:
url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{historical_start.strftime('%Y-%m-%d')}/{historical_end.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url_history,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
history_data = response.json()
# Process historical data - accumulate failures by month
history_dict = {}
monthly_failures = {}
for item in history_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
# Initialize if first occurrence of this month
if month_key not in history_dict:
history_dict[month_key] = 0
# Accumulate failures for this month
if item["num_fail"] is not None:
history_dict[month_key] += item["num_fail"]
# Sort months chronologically
sorted_months = sorted(history_dict.keys())
if sorted_months:
failures = np.array([history_dict[month] for month in sorted_months])
cum_failure = np.cumsum(failures)
for month_key in sorted_months:
monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)])
# Update monthly_data with cumulative historical data
monthly_data.update(monthly_failures)
# Get the latest number for predictions if we have historical data
if failures.size > 0:
latest_num = max(1, failures[-1]) # Use the last month's failures, minimum 1
except Exception as e:
raise Exception(f"Error fetching historical data: {e}")
# Handle prediction data (any portion after today)
prediction_start = max(tomorrow, start_date)
if end_date >= tomorrow:
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{prediction_start.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
prediction_data = response.json()
# Process prediction data
if prediction_data["data"]:
# Use the last prediction value for future months
latest_prediction = prediction_data["data"][-1]["num_fail"]
# Ensure the value is at least 1 and rounded
latest_num = max(1, round(latest_prediction)) if latest_prediction is not None else 1
# Create prediction dictionary
prediction_dict = {}
for item in prediction_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
prediction_dict[month_key] = round(item["num_fail"]) if item["num_fail"] is not None else latest_num
# Update monthly_data with prediction data
for key in prediction_dict:
monthly_data[key] = prediction_dict[key]
except Exception as e:
print(f"Error fetching prediction data: {e}")
# If we can't get prediction data but the range is in the future,
# we need to at least populate with our best estimate
if not monthly_data:
# Create a default entry for the start month
start_month_key = datetime.datetime(start_date.year, start_date.month, 1)
monthly_data[start_month_key] = latest_num
# Fill in any missing months in the range
current_date = datetime.datetime(start_date.year, start_date.month, 1)
end_month = datetime.datetime(end_date.year, end_date.month, 1)
while current_date <= end_month:
if current_date not in monthly_data:
# Try to find the most recent month with data
prev_months = [m for m in monthly_data.keys() if m < current_date]
if prev_months:
# Use the most recent previous month's data
latest_month = max(prev_months)
monthly_data[current_date] = monthly_data[latest_month]
else:
# If no previous months exist, look for future months
future_months = [m for m in monthly_data.keys() if m > current_date]
if future_months:
# Use the earliest future month's data
earliest_future = min(future_months)
monthly_data[current_date] = monthly_data[earliest_future]
else:
# No data available at all, use default
monthly_data[current_date] = latest_num
# Move to next month
if current_date.month == 12:
current_date = datetime.datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# Convert to numpy array
monthly_failure = np.array(complete_data)
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, monthly_failure
days_difference = (end_date - start_date).days
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow = today + datetime.timedelta(days=1)
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
@ -389,8 +552,19 @@ async def get_calculation_result(db_session: DbSession, calculation_id: str):
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
start_date = datetime.datetime.combine(scope_overhaul.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope_overhaul.end_date, datetime.time.min)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope_overhaul)
# Set the date range for the calculation
if prev_oh_scope:
# Start date is the day after the previous scope's end date
start_date = datetime.datetime.combine(prev_oh_scope.end_date + datetime.timedelta(days=1), datetime.time.min)
# End date is the start date of the current scope
end_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
else:
# If there's no previous scope, use the start and end dates from the current scope
start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)
months_num = get_months_between(start_date, end_date)
@ -517,11 +691,19 @@ async def create_calculation_result_service(
db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)
prev_oh_scope = await get_prev_oh(db_session=db_session, overhaul_session=scope)
# Set the date range for the calculation
if prev_oh_scope:
# Start date is the day after the previous scope's end date
start_date = datetime.datetime.combine(prev_oh_scope.end_date + datetime.timedelta(days=1), datetime.time.min)
# End date is the start date of the current scope
end_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
else:
# If there's no previous scope, use the start and end dates from the current scope
start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)
months_num = get_months_between(start_date, end_date)
@ -561,6 +743,7 @@ async def create_calculation_result_service(
)
# Calculate individual equipment optimum points
equipment_total_cost = corrective_costs + overhaul_cost_points
equipment_optimum_index = np.argmin(equipment_total_cost)

@ -22,6 +22,12 @@ async def get(
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_prev_oh(*, db_session: DbSession, overhaul_session: OverhaulScope) -> Optional[OverhaulScope]:
"""Returns a document based on the given document id."""
result = Select(OverhaulScope).where(OverhaulScope.end_date < overhaul_session.end_date).order_by(OverhaulScope.end_date.desc()).limit(1)
result = await db_session.execute(result)
return result.scalars().one_or_none()
async def get_all(*, common, scope_name: Optional[str] = None):
"""Returns all documents."""

Loading…
Cancel
Save