add api to calculation

feature/reliability_stat
Cizz22 11 months ago
parent 7ee77adbb9
commit cb979385cf

@ -51,5 +51,13 @@ class JWTBearer(HTTPBearer):
async def get_current_user(request: Request) -> UserBase:
return request.state.user
async def get_token(request: Request):
token = request.headers.get("Authorization")
if token:
return token.split(" ")[1]
return ""
CurrentUser = Annotated[UserBase, Depends(get_current_user)]
Token = Annotated[str, Depends(get_token)]

@ -10,6 +10,7 @@ from sqlalchemy import select
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.auth.service import Token
from src.overhaul_scope.service import get_all
from src.scope_equipment.model import ScopeEquipment
from src.scope_equipment.service import get_by_assetnum
@ -77,12 +78,12 @@ async def get_create_calculation_parameters(*, db_session: DbSession, calculatio
)
async def create_calculation(*, db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, created_by: str):
async def create_calculation(*,token:str, db_session: DbSession, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, created_by: str):
calculation_data = await create_param_and_data(
db_session=db_session, calculation_param_in=calculation_time_constrains_in, created_by=created_by)
results = await create_calculation_result_service(db_session=db_session, calculation=calculation_data)
results = await create_calculation_result_service(db_session=db_session, calculation=calculation_data, token=token)
return results

@ -6,7 +6,7 @@ from typing import Union
from fastapi import APIRouter
from fastapi.params import Query
from src.auth.service import CurrentUser
from src.auth.service import CurrentUser, Token
from src.database.core import DbSession
from src.models import StandardResponse
@ -28,13 +28,13 @@ router = APIRouter()
@router.post("", response_model=StandardResponse[Union[str, CalculationTimeConstrainsRead]])
async def create_calculation_time_constrains(db_session: DbSession, current_user: CurrentUser, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, scope_calculation_id: Optional[str] = Query(None), with_results: Optional[int] = Query(0)):
async def create_calculation_time_constrains(token:Token ,db_session: DbSession, current_user: CurrentUser, calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate, scope_calculation_id: Optional[str] = Query(None), with_results: Optional[int] = Query(0)):
"""Save calculation time constrains Here"""
if scope_calculation_id:
results = await get_or_create_scope_equipment_calculation(db_session=db_session, scope_calculation_id=scope_calculation_id, calculation_time_constrains_in=calculation_time_constrains_in)
else:
results = await create_calculation(db_session=db_session, calculation_time_constrains_in=calculation_time_constrains_in, created_by=current_user.name)
results = await create_calculation(token=token ,db_session=db_session, calculation_time_constrains_in=calculation_time_constrains_in, created_by=current_user.name)
if not with_results:

@ -27,6 +27,9 @@ from .schema import CalculationTimeConstrainsRead
from .schema import OptimumResult
from .schema import CalculationSelectedEquipmentUpdate
import requests
import datetime
def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int,numEquipments:int ,decay_base: float = 1.01) -> np.ndarray:
if overhaul_cost < 0:
@ -58,25 +61,65 @@ def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int,numEquipment
# results = np.where(np.isfinite(results), results, 0)
# return results
def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int) -> Tuple[np.ndarray, np.ndarray]:
day_points = np.arange(0, days)
async def get_corrective_cost_time_chart(material_cost: float, service_cost: float, location_tag: str, token) -> Tuple[np.ndarray, np.ndarray]:
"""
Fetch failure data from API and calculate corrective costs, ensuring 365 days of data.
Args:
material_cost: Cost of materials per failure
service_cost: Cost of service per failure
location_tag: Location tag of the equipment
token: Authorization token
Returns:
Tuple of (corrective_costs, daily_failure_rate)
"""
url = f'http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2024-01-01/2024-12-31'
try:
response = requests.get(
url,
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {token}'
},
)
data = response.json()
# Create a complete date range for 2024
start_date = datetime.datetime(2024, 1, 1)
date_range = [start_date + datetime.timedelta(days=x) for x in range(365)]
# Create a dictionary of existing data
data_dict = {
datetime.datetime.strptime(item['date'], '%d %b %Y'): item['num_fail']
for item in data['data']
}
# Fill in missing dates with nearest available value
complete_data = []
last_known_value = 0 # Default value if no data is available
# Parameters for failure rate
base_rate = 0.2 # Base failure rate per day
acceleration = 2.4 # How quickly failure rate increases
grace_period = 170 # Days before failures start increasing significantly
for date in date_range:
if date in data_dict:
if data_dict[date] is not None:
last_known_value = data_dict[date]
complete_data.append(last_known_value)
else:
complete_data.append(last_known_value)
# Calculate daily failure rate using sigmoid function
daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
# Convert to numpy array
daily_failure = np.array(complete_data)
# Calculate cumulative failures
failure_counts = np.cumsum(daily_failure_rate)
# Calculate corrective costs
cost_per_failure = material_cost + service_cost
corrective_costs = daily_failure * cost_per_failure
# Calculate corrective costs based on cumulative failures and combined costs
cost_per_failure = material_cost + service_cost
corrective_costs = failure_counts * cost_per_failure
return corrective_costs, daily_failure
return corrective_costs, daily_failure_rate
except Exception as e:
print(f"Error fetching or processing data: {str(e)}")
raise
# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
# day_points = np.arange(0, days)
@ -246,6 +289,7 @@ async def get_calculation_data_by_id(db_session: DbSession, calculation_id) -> C
async def create_calculation_result_service(
db_session: DbSession,
calculation: CalculationData,
token: str
) -> CalculationTimeConstrainsRead:
days = 365 # Changed to 365 days as per requirement
@ -265,15 +309,16 @@ async def create_calculation_result_service(
# Calculate for each equipment
for eq in equipments:
corrective_costs, daily_failures = get_corrective_cost_time_chart(
corrective_costs, daily_failures = await get_corrective_cost_time_chart(
material_cost=eq.material_cost,
service_cost=eq.service_cost,
days=days
token=token,
location_tag=eq.equipment.location_tag
)
overhaul_cost_points = get_overhaul_cost_by_time_chart(
calculation_data.parameter.overhaul_cost,
days=days,
days=len(corrective_costs),
numEquipments=len(equipments)
)

Loading…
Cancel
Save