initial commit

main
Cizz22 9 months ago
parent c1520712b8
commit bef5e4f0c0

@ -5,31 +5,6 @@ from fastapi.responses import JSONResponse
from pydantic import BaseModel
from src.auth.service import JWTBearer
from src.calculation_budget_constrains.router import \
router as calculation_budget_constraint
from src.calculation_target_reliability.router import \
router as calculation_target_reliability
from src.calculation_time_constrains.router import \
router as calculation_time_constrains_router
from src.job.router import router as job_router
from src.overhaul.router import router as overhaul_router
from src.overhaul_activity.router import router as overhaul_activity_router
from src.overhaul_job.router import router as job_overhaul_router
from src.overhaul_scope.router import router as scope_router
from src.scope_equipment.router import router as scope_equipment_router
from src.scope_equipment_job.router import router as scope_equipment_job_router
from src.overhaul_schedule.router import router as overhaul_schedule_router
# from src.overhaul_scope.router import router as scope_router
# from src.scope_equipment.router import router as scope_equipment_router
# from src.overhaul.router import router as overhaul_router
# from src.overhaul_history.router import router as overhaul_history_router
# from src.overhaul_activity.router import router as scope_equipment_activity_router
# # from src.overhaul_schedule.router import router as ovehaul_schedule_router
# from src.scope_equipment_part.router import router as scope_equipment_part_router
# from src.calculation_target_reliability.router import router as calculation_target_reliability
#
# from src.master_activity.router import router as activity_router
class ErrorMessage(BaseModel):
msg: str
@ -49,7 +24,9 @@ api_router = APIRouter(
500: {"model": ErrorResponse},
},
)
from src.monitoring.router import router as monitoring_router
api_router.include_router(monitoring_router, prefix="/analytics", tags=["analytics"])
@api_router.get("/healthcheck", include_in_schema=False)
def healthcheck():
@ -59,86 +36,5 @@ def healthcheck():
authenticated_api_router = APIRouter(
dependencies=[Depends(JWTBearer())],
)
# overhaul data
authenticated_api_router.include_router(
overhaul_router, prefix="/overhauls", tags=["overhaul"]
)
authenticated_api_router.include_router(job_router, prefix="/jobs", tags=["job"])
# # Overhaul session data
authenticated_api_router.include_router(
scope_router, prefix="/overhaul-session", tags=["overhaul-session"]
)
authenticated_api_router.include_router(
scope_equipment_router, prefix="/scope-equipments", tags=["scope_equipment"]
)
authenticated_api_router.include_router(
overhaul_activity_router, prefix="/overhaul-activity", tags=["activity"]
)
authenticated_api_router.include_router(
scope_equipment_job_router,
prefix="/scope-equipment-jobs",
tags=["scope_equipment", "job"],
)
authenticated_api_router.include_router(
overhaul_schedule_router,
prefix="/overhaul-schedules",
tags=["overhaul_schedule"],
)
authenticated_api_router.include_router(
job_overhaul_router, prefix="/overhaul-jobs", tags=["job", "overhaul"]
)
# authenticated_api_router.include_router(
# overhaul_history_router, prefix="/overhaul-history", tags=["overhaul_history"]
# )
# authenticated_api_router.include_router(
# scope_equipment_activity_router, prefix="/equipment-activities", tags=["scope_equipment_activities"]
# )
# authenticated_api_router.include_router(
# activity_router, prefix="/activities", tags=["activities"]
# )
# authenticated_api_router.include_router(
# scope_equipment_part_router, prefix="/equipment-parts", tags=["scope_equipment_parts"]
# )
# authenticated_api_router.include_router(
# ovehaul_schedule_router, prefix="/overhaul-schedules", tags=["overhaul_schedules"]
# )
# calculation
calculation_router = APIRouter(prefix="/calculation", tags=["calculations"])
# Time constrains
calculation_router.include_router(
calculation_time_constrains_router,
prefix="/time-constraint",
tags=["calculation", "time_constraint"],
)
# Target reliability
calculation_router.include_router(
calculation_target_reliability,
prefix="/target-reliability",
tags=["calculation", "target_reliability"],
)
# # Budget Constrain
calculation_router.include_router(
calculation_budget_constraint,
prefix="/budget-constraint",
tags=["calculation", "budget_constraint"],
)
authenticated_api_router.include_router(calculation_router)
api_router.include_router(authenticated_api_router)

@ -1,31 +0,0 @@
from typing import Dict, List, Optional
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Query
from src.database.core import DbSession
from src.models import StandardResponse
from .service import get_all_budget_constrains
router = APIRouter()
@router.get("/{session_id}", response_model=StandardResponse[Dict])
async def get_target_reliability(
db_session: DbSession,
session_id: str,
cost_threshold: float = Query(100),
):
"""Get all scope pagination."""
results, consequesce = await get_all_budget_constrains(
db_session=db_session, session_id=session_id, cost_threshold=cost_threshold
)
return StandardResponse(
data={
"results": results,
"consequence": consequesce
},
message="Data retrieved successfully",
)

@ -1,71 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
class OverhaulBase(BaseModel):
pass
class OverhaulCriticalParts(OverhaulBase):
criticalParts: List[str] = Field(..., description="List of critical parts")
class OverhaulSchedules(OverhaulBase):
schedules: List[Dict[str, Any]] = Field(..., description="List of schedules")
class OverhaulSystemComponents(OverhaulBase):
systemComponents: Dict[str, Any] = Field(
..., description="List of system components"
)
class OverhaulRead(OverhaulBase):
overview: Dict[str, Any]
criticalParts: List[str]
schedules: List[Dict[str, Any]]
systemComponents: Dict[str, Any]
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -1,95 +0,0 @@
import random
from typing import Optional
from sqlalchemy import Delete, Select
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.scope_equipment.model import ScopeEquipment
from src.scope_equipment.service import get_by_scope_name
from src.overhaul_activity.service import get_all_by_session_id
# async def get_all_budget_constrains(
# *, db_session: DbSession, session_id: str, cost_threshold: float = 100000000
# ):
# At the module level, add this dictionary to store persistent EAF values
_equipment_eaf_cache = {}
import random
async def get_all_budget_constrains(
*, db_session: DbSession, session_id: str, cost_threshold: float = 100000000
):
"""Get all overhaul overview with EAF values that sum to 100%."""
equipments = await get_all_by_session_id(db_session=db_session, overhaul_session_id=session_id)
# If no equipments found, return empty list
if not equipments:
return [], []
# Create or retrieve persistent EAF values
global _equipment_eaf_cache
# Generate EAF values for new equipment IDs
equipment_ids = [equipment.id for equipment in equipments]
# Generate new random EAF values if they don't exist
if not _equipment_eaf_cache or set(equipment_ids) != set(_equipment_eaf_cache.keys()):
total_eaf = 100.0
remaining_items = len(equipment_ids)
_equipment_eaf_cache.clear()
# Ensure minimum EAF value for each equipment
min_eaf = 1.0 # Minimum 1% for each equipment
reserved_eaf = min_eaf * remaining_items
distributable_eaf = total_eaf - reserved_eaf
for eq_id in equipment_ids[:-1]: # All except last item
if remaining_items > 1:
# Generate a random value between min_eaf and the remaining distributable EAF
max_allowed = distributable_eaf / (remaining_items - 1)
eaf = round(min_eaf + random.uniform(0, max_allowed), 2)
_equipment_eaf_cache[eq_id] = eaf
distributable_eaf -= (eaf - min_eaf)
remaining_items -= 1
# Assign remaining EAF to last item, ensuring it's at least min_eaf
_equipment_eaf_cache[equipment_ids[-1]] = round(distributable_eaf + min_eaf, 2)
# Create result array of dictionaries
result = [
{
"id": equipment.id,
"assetnum": equipment.assetnum,
"location_tag": equipment.equipment.location_tag,
"name": equipment.equipment.name,
"total_cost": equipment.material_cost + equipment.service_cost,
"eaf_contribution": _equipment_eaf_cache[equipment.id]
}
for equipment in equipments
]
# Sort by EAF contribution (highest to lowest)
result.sort(key=lambda x: x["eaf_contribution"], reverse=True)
# Filter equipment up to threshold
cumulative_cost = 0
included_results = []
for equipment in result:
cumulative_cost += equipment["total_cost"]
if cumulative_cost >= cost_threshold:
break
included_results.append(equipment)
# Rest equipment is consequence list
consequence_results = result[len(included_results):]
#Sort
consequence_results.sort(key=lambda x: x["eaf_contribution"], reverse=True)
included_results.sort(key=lambda x: x["eaf_contribution"], reverse=True)
return included_results, consequence_results
#

@ -1,56 +0,0 @@
from typing import Dict, List, Optional
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Query
from src.database.core import DbSession
from src.models import StandardResponse
from .service import get_eaf_timeline
router = APIRouter()
# @router.get("", response_model=StandardResponse[List[Dict]])
# async def get_target_reliability(
# db_session: DbSession,
# scope_name: Optional[str] = Query(None),
# eaf_threshold: float = Query(100),
# ):
# """Get all scope pagination."""
# results = await get_all_target_reliability(
# db_session=db_session, scope_name=scope_name, eaf_threshold=eaf_threshold
# )
# return StandardResponse(
# data=results,
# message="Data retrieved successfully",
# )
@router.get("", response_model=StandardResponse[List[Dict]])
async def get_target_reliability(
db_session: DbSession,
oh_session_id: Optional[str] = Query(None),
eaf_input: float = Query(0.5),
duration: int = Query(8000),
):
"""Get all scope pagination."""
if not oh_session_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="oh_session_id is required",
)
results = await get_eaf_timeline(
db_session=db_session,
oh_session_id=oh_session_id,
eaf_input=eaf_input,
oh_duration=duration
)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)

@ -1,71 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
class OverhaulBase(BaseModel):
pass
class OverhaulCriticalParts(OverhaulBase):
criticalParts: List[str] = Field(..., description="List of critical parts")
class OverhaulSchedules(OverhaulBase):
schedules: List[Dict[str, Any]] = Field(..., description="List of schedules")
class OverhaulSystemComponents(OverhaulBase):
systemComponents: Dict[str, Any] = Field(
..., description="List of system components"
)
class OverhaulRead(OverhaulBase):
overview: Dict[str, Any]
criticalParts: List[str]
schedules: List[Dict[str, Any]]
systemComponents: Dict[str, Any]
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -1,275 +0,0 @@
from typing import Optional
from sqlalchemy import Delete, Select
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.scope_equipment.model import ScopeEquipment
from src.scope_equipment.service import get_by_scope_name
from src.scope_equipment_job.service import get_equipment_level_by_no
from datetime import datetime, timedelta
import random
from typing import List
from .utils import generate_down_periods
from src.overhaul_scope.service import get as get_overhaul
from bisect import bisect_left
from collections import defaultdict
# async def get_all_target_reliability(
# *, db_session: DbSession, scope_name: str, eaf_threshold: float = 100.0
# ):
# """Get all overhaul overview with EAF values that sum to 100%, aggregated by system."""
# equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
# equipment_system = await get_equipment_level_by_no(db_session=db_session, level=1)
# equipment_subsystem = await get_equipment_level_by_no(
# db_session=db_session, level=2
# )
# # If no equipments found, return empty list
# if not equipments:
# return []
# import random
# n = len(equipments)
# base_value = 100 / n # Even distribution as base
# # Generate EAF values with ±30% variation from base
# eaf_values = [
# base_value + random.uniform(-0.3 * base_value, 0.3 * base_value)
# for _ in range(n)
# ]
# # Normalize to ensure sum is 100
# total = sum(eaf_values)
# eaf_values = [(v * 100 / total) for v in eaf_values]
# # Create result array of dictionaries
# result = [
# {
# "id": equipment.id,
# "assetnum": equipment.assetnum,
# "location_tag": equipment.master_equipment.location_tag,
# "name": equipment.master_equipment.name,
# "parent_id": equipment.master_equipment.parent_id, # Add parent_id to identify the system
# "eaf": round(eaf, 4), # Add EAF value
# }
# for equipment, eaf in zip(equipments, eaf_values)
# ]
# # Group equipment by system
# sub_system = {
# subsystem.id: subsystem.parent_id for subsystem in equipment_subsystem
# }
# systems = {
# system.id: {"name": system.name, "total_eaf": 0, "equipments": []}
# for system in equipment_system
# }
# for equipment in result:
# if equipment["parent_id"] in sub_system:
# systems[sub_system[equipment["parent_id"]]]["equipments"].append(equipment)
# systems[sub_system[equipment["parent_id"]]]["total_eaf"] += equipment["eaf"]
# # Convert the systems dictionary to a list of aggregated results
# aggregated_result = [
# {
# "system_id": system_id,
# "system_name": system_data["name"],
# "total_eaf": round(system_data["total_eaf"], 4),
# "equipments": system_data["equipments"],
# }
# for system_id, system_data in systems.items()
# ]
# # Sort the aggregated result by total_eaf in descending order
# aggregated_result.sort(key=lambda x: x["total_eaf"], reverse=True)
# # Filter systems up to the threshold
# cumulative_eaf = 0
# filtered_aggregated_result = []
# for system in aggregated_result:
# cumulative_eaf += system["total_eaf"]
# filtered_aggregated_result.append(system)
# if cumulative_eaf >= eaf_threshold:
# break
# return filtered_aggregated_result
# async def get_eaf_timeline(*, db_session, eaf_input: float, oh_session_id: str, oh_duration = 8000) -> List[dict]:
# """
# Generate a timeline of EAF values based on input parameters.
# Args:
# eaf_input (float): EAF value to check against thresholds
# oh_session_id (str): OH session identifier
# Returns:
# set[dict]: Set of dictionaries containing dates and their EAF values
# """
# # Define EAF thresholds
# MIN_EAF = 30
# MAX_EAF = 80
# #Get OH session
# oh_session = await get_overhaul(db_session=db_session, overhaul_session_id=oh_session_id)
# # Dummy OH session dates
# oh_session_start = oh_session.start_date
# oh_session_end = oh_session_start + timedelta(hours=oh_duration)
# # Initialize result set
# results = []
# # Determine date range based on EAF input
# if MIN_EAF <= eaf_input <= MAX_EAF:
# start_date = oh_session_start
# end_date = oh_session_end
# elif eaf_input < MIN_EAF:
# # If below minimum, extend end date by 2 months weeks
# start_date = oh_session_start
# end_date = oh_session_end + timedelta(days=360)
# else: # eaf_input > MAX_EAF
# # If above maximum, reduce end date by 1 month
# start_date = oh_session_start
# end_date = oh_session_end - timedelta(days=180)
# total_hours = (end_date - start_date).total_seconds() / 3600
# # Generate random down periods
# results = []
# # Generate down periods for each EAF scenario
# down_periods = {
# 'eaf1': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
# 'eaf2': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
# 'eaf3': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90),
# 'eaf4': generate_down_periods(start_date, end_date, 5, min_duration=30, max_duration=90)
# }
# # Define EAF values for downtime periods
# eaf_downtime_values = {
# 'eaf1': 0.8,
# 'eaf2': 0.65,
# 'eaf3': 0.35,
# 'eaf4': 0
# }
# # Generate daily entries
# current_time = start_date
# while current_time <= end_date:
# time_str = current_time.strftime('%Y-%m-%d %H:00:00')
# # Initialize dictionary for this hour with default values (system up)
# hourly_entry = {
# 'date': time_str,
# 'eaf1_value': 1.0,
# 'eaf2_value': 0.75,
# 'eaf3_value': 0.6,
# 'eaf4_value': 0.3
# }
# # Check each EAF scenario
# for eaf_key in down_periods:
# # Check if current hour is in any down period for this EAF
# for period_start, period_end in down_periods[eaf_key]:
# if period_start <= current_time <= period_end:
# hourly_entry[f'{eaf_key}_value'] = eaf_downtime_values[eaf_key]
# break
# results.append(hourly_entry)
# current_time += timedelta(hours=1)
# return results
async def get_eaf_timeline(*, db_session, eaf_input: float, oh_session_id: str, oh_duration = 8000) -> List[dict]:
"""
Generate a timeline of EAF values based on input parameters.
Optimized version with reduced time complexity.
Args:
eaf_input (float): EAF value to check against thresholds
oh_session_id (str): OH session identifier
oh_duration (int): Duration in hours
Returns:
List[dict]: List of dictionaries containing dates and their EAF values
"""
MIN_EAF = 30
MAX_EAF = 80
oh_session = await get_overhaul(db_session=db_session, overhaul_session_id=oh_session_id)
oh_session_start = datetime.fromisoformat(oh_session.start_date.isoformat())
# Determine date range
if MIN_EAF <= eaf_input <= MAX_EAF:
end_date = oh_session_start + timedelta(hours=oh_duration)
elif eaf_input < MIN_EAF:
end_date = oh_session_start + timedelta(hours=oh_duration, days=360)
else: # eaf_input > MAX_EAF
end_date = oh_session_start + timedelta(hours=oh_duration) - timedelta(days=180)
# Default EAF values when system is up
default_values = {
'eaf1_value': 1.0,
'eaf2_value': 0.75,
'eaf3_value': 0.6,
'eaf4_value': 0.3
}
# EAF values during downtime
downtime_values = {
'eaf1': 0.8,
'eaf2': 0.65,
'eaf3': 0.35,
'eaf4': 0
}
# Generate down periods for all EAF scenarios at once
all_down_periods = {}
for eaf_key in ['eaf1', 'eaf2', 'eaf3', 'eaf4']:
periods = generate_down_periods(oh_session_start, end_date, 5, min_duration=30, max_duration=90)
# Sort periods by start time for binary search
all_down_periods[eaf_key] = sorted(periods, key=lambda x: x[0])
# Create a list of all state change times
state_changes = defaultdict(dict)
for eaf_key, periods in all_down_periods.items():
for start, end in periods:
# Record state changes at period boundaries
state_changes[start][eaf_key] = downtime_values[eaf_key]
state_changes[end + timedelta(hours=1)][eaf_key] = default_values[f'{eaf_key}_value']
# Convert state_changes to sorted list of times
change_times = sorted(state_changes.keys())
results = []
current_values = default_values.copy()
# Process changes between state change points
current_time = oh_session_start
idx = 0
while current_time <= end_date:
# Update values if we've hit a state change point
if idx < len(change_times) and current_time >= change_times[idx]:
changes = state_changes[change_times[idx]]
for eaf_key, value in changes.items():
current_values[f'{eaf_key}_value'] = value
idx += 1
results.append({
'date': current_time.strftime('%Y-%m-%d %H:00:00'),
**current_values
})
current_time += timedelta(hours=1)
return results

@ -1,54 +0,0 @@
from datetime import datetime, timedelta
import random
from typing import List, Optional
def generate_down_periods(start_date: datetime, end_date: datetime,
num_periods: Optional[int] = None, min_duration: int = 3,
max_duration: int = 7) -> list[tuple[datetime, datetime]]:
"""
Generate random system down periods within a date range.
Args:
start_date (datetime): Start date of the overall period
end_date (datetime): End date of the overall period
num_periods (int, optional): Number of down periods to generate.
If None, generates 1-3 periods randomly
min_duration (int): Minimum duration of each down period in days
max_duration (int): Maximum duration of each down period in days
Returns:
list[tuple[datetime, datetime]]: List of (start_date, end_date) tuples
for each down period
"""
if num_periods is None:
num_periods = random.randint(1, 3)
total_days = (end_date - start_date).days
down_periods = []
# Generate random down periods
for _ in range(num_periods):
# Random duration for this period
duration = random.randint(min_duration, max_duration)
# Ensure we don't exceed the total date range
latest_possible_start = total_days - duration
if latest_possible_start < 0:
continue
# Random start day within available range
start_day = random.randint(0, latest_possible_start)
period_start = start_date + timedelta(days=start_day)
period_end = period_start + timedelta(days=duration)
# Check for overlaps with existing periods
overlaps = any(
(p_start <= period_end and period_start <= p_end)
for p_start, p_end in down_periods
)
if not overlaps:
down_periods.append((period_start, period_end))
return sorted(down_periods)

@ -1,125 +0,0 @@
from typing import Optional
from uuid import UUID
import numpy as np
from fastapi import HTTPException, status
from sqlalchemy import Select, func, select
from sqlalchemy.orm import joinedload
from src.auth.service import Token
from src.database.core import DbSession
from src.overhaul_scope.service import get_all
from src.scope_equipment.model import ScopeEquipment
from src.scope_equipment.service import get_by_assetnum
from src.workorder.model import MasterWorkOrder
from .schema import (CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsParametersRead,
CalculationTimeConstrainsParametersRetrive,
CalculationTimeConstrainsRead)
from .service import (create_calculation_result_service, create_param_and_data,
get_avg_cost_by_asset,
get_calculation_by_reference_and_parameter,
get_calculation_data_by_id, get_calculation_result,
get_corrective_cost_time_chart,
get_overhaul_cost_by_time_chart)
async def get_create_calculation_parameters(
*, db_session: DbSession, calculation_id: Optional[str] = None
):
if calculation_id is not None:
calculation = await get_calculation_data_by_id(
calculation_id=calculation_id, db_session=db_session
)
if not calculation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return CalculationTimeConstrainsParametersRead(
costPerFailure=calculation.parameter.avg_failure_cost,
overhaulCost=calculation.parameter.overhaul_cost,
reference=calculation,
)
stmt = (
select(
ScopeEquipment.scope_id,
func.avg(MasterWorkOrder.total_cost_max).label("average_cost"),
)
.outerjoin(MasterWorkOrder, ScopeEquipment.assetnum == MasterWorkOrder.assetnum)
.group_by(ScopeEquipment.scope_id)
.order_by(ScopeEquipment.scope_id)
)
results = await db_session.execute(stmt)
costFailure = results.all()
scopes = await get_all(db_session=db_session)
avaiableScopes = {scope.id: scope.scope_name for scope in scopes}
costFailurePerScope = {
avaiableScopes.get(costPerFailure[0]): costPerFailure[1]
for costPerFailure in costFailure
}
return CalculationTimeConstrainsParametersRetrive(
costPerFailure=costFailurePerScope,
availableScopes=avaiableScopes.values(),
recommendedScope="A",
# historicalData={
# "averageOverhaulCost": 10000000,
# "lastCalculation": {
# "id": "calc_122",
# "date": "2024-10-15",
# "scope": "B",
# },
# },
)
async def create_calculation(
*,
token: str,
db_session: DbSession,
calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate,
created_by: str
):
calculation_data = await create_param_and_data(
db_session=db_session,
calculation_param_in=calculation_time_constrains_in,
created_by=created_by,
)
results = await create_calculation_result_service(
db_session=db_session, calculation=calculation_data, token=token
)
return results
async def get_or_create_scope_equipment_calculation(
*,
db_session: DbSession,
scope_calculation_id,
calculation_time_constrains_in: Optional[CalculationTimeConstrainsParametersCreate]
):
scope_calculation = await get_calculation_data_by_id(
db_session=db_session, calculation_id=scope_calculation_id
)
if not scope_calculation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
# Check if calculation already exist
return CalculationTimeConstrainsRead(
id=scope_calculation.id,
reference=scope_calculation.overhaul_session_id,
results=scope_calculation.results,
optimum_oh=scope_calculation.optimum_oh_day,
equipment_results=scope_calculation.equipment_results,
)

@ -1,157 +0,0 @@
from enum import Enum
from typing import List, Optional, Union
from sqlalchemy import (JSON, UUID, Boolean, Column, Float, ForeignKey,
Integer, Numeric, String)
from sqlalchemy.orm import relationship
from src.database.core import Base, DbSession
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin, UUIDMixin
class OverhaulReferenceType(str, Enum):
SCOPE = "SCOPE"
ASSET = "ASSET"
class CalculationParam(Base, DefaultMixin, IdentityMixin):
__tablename__ = "oh_ms_calculation_param"
avg_failure_cost = Column(Float, nullable=False)
overhaul_cost = Column(Float, nullable=False)
# Relationships
calculation_data = relationship("CalculationData", back_populates="parameter")
results = relationship("CalculationResult", back_populates="parameter")
# @classmethod
# async def create_with_references(
# cls,
# db: DbSession,
# avg_failure_cost: float,
# overhaul_cost: float,
# created_by: str,
# # list of {"reference_type": OverhaulReferenceType, "reference_id": str}
# references: List[dict]
# ):
# # Create parameter
# param = cls(
# avg_failure_cost=avg_failure_cost,
# overhaul_cost=overhaul_cost,
# created_by=created_by
# )
# db.add(param)
# await db.flush() # Flush to get the param.id
# # Create reference links
# for ref in references:
# reference_link = ReferenceLink(
# parameter_id=param.id,
# overhaul_reference_type=ref["reference_type"],
# reference_id=ref["reference_id"]
# )
# db.add(reference_link)
# await db.commit()
# await db.refresh(param)
# return param
class CalculationData(Base, DefaultMixin, IdentityMixin):
__tablename__ = "oh_tr_calculation_data"
parameter_id = Column(
UUID(as_uuid=True), ForeignKey("oh_ms_calculation_param.id"), nullable=True
)
overhaul_session_id = Column(
UUID(as_uuid=True), ForeignKey("oh_ms_overhaul_scope.id")
)
optimum_oh_day = Column(Integer, nullable=True)
session = relationship("OverhaulScope", lazy="raise")
parameter = relationship("CalculationParam", back_populates="calculation_data")
equipment_results = relationship(
"CalculationEquipmentResult", lazy="raise", viewonly=True
)
results = relationship("CalculationResult", lazy="raise", viewonly=True)
@classmethod
async def create_with_param(
cls,
overhaul_session_id: str,
db: DbSession,
avg_failure_cost: Optional[float],
overhaul_cost: Optional[float],
created_by: str,
params_id: Optional[UUID],
):
if not params_id:
# Create Params
params = CalculationParam(
avg_failure_cost=avg_failure_cost,
overhaul_cost=overhaul_cost,
created_by=created_by,
)
db.add(params)
await db.flush()
params_id = params.id
calculation_data = cls(
overhaul_session_id=overhaul_session_id,
created_by=created_by,
parameter_id=params_id,
)
db.add(calculation_data)
await db.commit()
await db.refresh(calculation_data)
return calculation_data
class CalculationResult(Base, DefaultMixin):
__tablename__ = "oh_tr_calculation_result"
parameter_id = Column(
UUID(as_uuid=True), ForeignKey("oh_ms_calculation_param.id"), nullable=False
)
calculation_data_id = Column(
UUID(as_uuid=True), ForeignKey("oh_tr_calculation_data.id"), nullable=False
)
day = Column(Integer, nullable=False)
corrective_cost = Column(Float, nullable=False)
overhaul_cost = Column(Float, nullable=False)
num_failures = Column(Integer, nullable=False)
parameter = relationship("CalculationParam", back_populates="results")
reference_link = relationship("CalculationData")
class CalculationEquipmentResult(Base, DefaultMixin):
__tablename__ = "oh_tr_calculation_equipment_result"
corrective_costs = Column(JSON, nullable=False)
overhaul_costs = Column(JSON, nullable=False)
daily_failures = Column(JSON, nullable=False)
assetnum = Column(String(255), nullable=False)
material_cost = Column(Float, nullable=False)
service_cost = Column(Float, nullable=False)
calculation_data_id = Column(
UUID(as_uuid=True), ForeignKey("oh_tr_calculation_data.id"), nullable=True
)
optimum_day = Column(Integer, default=1)
is_included = Column(Boolean, default=True)
master_equipment = relationship(
"MasterEquipment",
lazy="joined",
primaryjoin="and_(CalculationEquipmentResult.assetnum == foreign(MasterEquipment.assetnum))",
uselist=False, # Add this if it's a one-to-one relationship
)

@ -1,146 +0,0 @@
from typing import List, Optional, Union
from fastapi import APIRouter
from fastapi.params import Query
from src.auth.service import CurrentUser, Token
from src.database.core import DbSession
from src.models import StandardResponse
from .flows import (create_calculation, get_create_calculation_parameters,
get_or_create_scope_equipment_calculation)
from .schema import (CalculationResultsRead,
CalculationSelectedEquipmentUpdate,
CalculationTimeConstrainsCreate,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsParametersRead,
CalculationTimeConstrainsParametersRetrive,
CalculationTimeConstrainsRead, EquipmentResult)
from .service import (bulk_update_equipment, get_calculation_result,
get_calculation_result_by_day, get_calculation_by_assetnum)
router = APIRouter()
@router.post(
"", response_model=StandardResponse[Union[str, CalculationTimeConstrainsRead]]
)
async def create_calculation_time_constrains(
token: Token,
db_session: DbSession,
current_user: CurrentUser,
calculation_time_constrains_in: CalculationTimeConstrainsParametersCreate,
scope_calculation_id: Optional[str] = Query(None),
with_results: Optional[int] = Query(0),
):
"""Save calculation time constrains Here"""
if scope_calculation_id:
results = await get_or_create_scope_equipment_calculation(
db_session=db_session,
scope_calculation_id=scope_calculation_id,
calculation_time_constrains_in=calculation_time_constrains_in,
)
else:
results = await create_calculation(
token=token,
db_session=db_session,
calculation_time_constrains_in=calculation_time_constrains_in,
created_by=current_user.name,
)
if not with_results:
results = str(results.id)
return StandardResponse(data=results, message="Data created successfully")
@router.get(
"/parameters",
response_model=StandardResponse[
Union[
CalculationTimeConstrainsParametersRetrive,
CalculationTimeConstrainsParametersRead,
]
],
)
async def get_calculation_parameters(
db_session: DbSession, calculation_id: Optional[str] = Query(default=None)
):
"""Get all calculation parameter."""
parameters = await get_create_calculation_parameters(
db_session=db_session, calculation_id=calculation_id
)
return StandardResponse(
data=parameters,
message="Data retrieved successfully",
)
@router.get(
"/{calculation_id}", response_model=StandardResponse[CalculationTimeConstrainsRead]
)
async def get_calculation_results(db_session: DbSession, calculation_id):
results = await get_calculation_result(
db_session=db_session, calculation_id=calculation_id
)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.get(
"/{calculation_id}/{assetnum}", response_model=StandardResponse[EquipmentResult]
)
async def get_calculation_per_equipment(db_session: DbSession, calculation_id, assetnum):
results = await get_calculation_by_assetnum(
db_session=db_session, assetnum=assetnum, calculation_id=calculation_id
)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.post(
"/{calculation_id}/simulation",
response_model=StandardResponse[CalculationResultsRead],
)
async def get_simulation_result(
db_session: DbSession,
calculation_id,
calculation_simuation_in: CalculationTimeConstrainsCreate,
):
simulation_result = await get_calculation_result_by_day(
db_session=db_session,
calculation_id=calculation_id,
simulation_day=calculation_simuation_in.intervalDays,
)
return StandardResponse(
data=simulation_result, message="Data retrieved successfully"
)
@router.put("/{calculation_id}", response_model=StandardResponse[List[str]])
async def update_selected_equipment(
db_session: DbSession,
calculation_id,
calculation_time_constrains_in: List[CalculationSelectedEquipmentUpdate],
):
results = await bulk_update_equipment(
db=db_session,
selected_equipments=calculation_time_constrains_in,
calculation_data_id=calculation_id,
)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)

@ -1,94 +0,0 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import Field
from src.models import DefultBase
from src.scope_equipment.schema import MasterEquipmentBase
class CalculationTimeConstrainsBase(DefultBase):
pass
class ReferenceLinkBase(DefultBase):
reference_id: str = Field(..., description="Reference ID")
overhaul_reference_type: str = Field(..., description="Overhaul reference type")
class CalculationTimeConstrainsParametersRetrive(CalculationTimeConstrainsBase):
# type: ignore
costPerFailure: Union[dict, float] = Field(..., description="Cost per failure")
availableScopes: List[str] = Field(..., description="Available scopes")
recommendedScope: str = Field(..., description="Recommended scope")
# historicalData: Dict[str, Any] = Field(..., description="Historical data")
class CalculationTimeConstrainsParametersRead(CalculationTimeConstrainsBase):
costPerFailure: Union[dict, float] = Field(..., description="Cost per failure")
overhaulCost: Optional[float] = Field(None, description="Overhaul cost")
reference: Optional[List[ReferenceLinkBase]] = Field(None, description="Reference")
class CalculationTimeConstrainsParametersCreate(CalculationTimeConstrainsBase):
overhaulCost: Optional[float] = Field(0, description="Overhaul cost")
ohSessionId: Optional[UUID] = Field(None, description="Scope OH")
costPerFailure: Optional[float] = Field(0, description="Cost per failure")
# class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase):
# overhaulCost: float = Field(..., description="Overhaul cost")
# scopeOH: str = Field(..., description="Scope OH")
# costPerFailure: float = Field(..., description="Cost per failure")
# metadata: Dict[str, Any] = Field(..., description="Metadata")
class CalculationResultsRead(CalculationTimeConstrainsBase):
day: int
corrective_cost: float
overhaul_cost: float
num_failures: int
class OptimumResult(CalculationTimeConstrainsBase):
overhaul_cost: float
corrective_cost: float
num_failures: int
days: int
class EquipmentResult(CalculationTimeConstrainsBase):
id: UUID
corrective_costs: List[float]
overhaul_costs: List[float]
daily_failures: List[float]
assetnum: str
material_cost: float
service_cost: float
optimum_day: int # Added optimum result for each equipment
is_included: bool
master_equipment: Optional[MasterEquipmentBase] = Field(None)
class CalculationTimeConstrainsRead(CalculationTimeConstrainsBase):
id: UUID
reference: UUID
scope: str
results: List[CalculationResultsRead]
equipment_results: List[EquipmentResult]
optimum_oh: Any
class CalculationTimeConstrainsCreate(CalculationTimeConstrainsBase):
intervalDays: int
class CalculationTimeConstrainsSimulationRead(CalculationTimeConstrainsBase):
simulation: CalculationResultsRead
class CalculationSelectedEquipmentUpdate(CalculationTimeConstrainsBase):
is_included: bool
assetnum: str

@ -1,707 +0,0 @@
import datetime
from typing import Coroutine, List, Optional, Tuple
from uuid import UUID
import numpy as np
import requests
from fastapi import HTTPException, status
from sqlalchemy import and_, case, func, select, update
from sqlalchemy.orm import joinedload
from src.database.core import DbSession
from src.overhaul_activity.service import get_all_by_session_id
from src.overhaul_scope.service import get as get_scope
from src.utils import get_latest_numOfFail
from src.workorder.model import MasterWorkOrder
from .model import (CalculationData, CalculationEquipmentResult,
CalculationResult)
from .schema import (CalculationResultsRead,
CalculationSelectedEquipmentUpdate,
CalculationTimeConstrainsParametersCreate,
CalculationTimeConstrainsRead, OptimumResult)
from .utils import get_months_between
# def get_overhaul_cost_by_time_chart(
# overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.01
# ) -> np.ndarray:
# if overhaul_cost < 0:
# raise ValueError("Overhaul cost cannot be negative")
# if days <= 0:
# raise ValueError("Days must be positive")
# hours = days * 24
# rate = np.arange(1, hours + 1)
# cost_per_equipment = overhaul_cost / numEquipments
# results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
# return results
# def get_overhaul_cost_by_time_chart(overhaul_cost: float, days: int, numEquipments: int, decay_base: float = 1.1) -> np.ndarray:
# if overhaul_cost < 0:
# raise ValueError("Overhaul cost cannot be negative")
# if days <= 0:
# raise ValueError("Days must be positive")
# exponents = np.arange(0, days)
# cost_per_equipment = overhaul_cost / numEquipments
# # Introduce randomness by multiplying with a random factor
# random_factors = np.random.normal(1.0, 0.1, numEquipments) # Mean 1.0, Std Dev 0.1
# results = np.array([cost_per_equipment * factor / (decay_base ** exponents) for factor in random_factors])
# results = np.where(np.isfinite(results), results, 0)
# return results
async def get_corrective_cost_time_chart(
material_cost: float,
service_cost: float,
location_tag: str,
token,
start_date: datetime.datetime,
end_date: datetime.datetime
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
url = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
try:
response = requests.get(
url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
data = response.json()
latest_num = data["data"][-1]["num_fail"]
if not latest_num:
latest_num = 1
# Create a complete date range for 2025
# start_date = datetime.datetime(2025, 1, 1)
# date_range = [start_date + datetime.timedelta(days=x) for x in range(days_difference)]
# Create a dictionary of existing data
data_dict = {
datetime.datetime.strptime(item["date"], "%d %b %Y"): item["num_fail"]
for item in data["data"]
}
# Initialize all months in the range with 0
monthly_data = {}
current_date = start_date.replace(day=1)
while current_date <= end_date:
monthly_data[current_date] = 0
# Move to next month
if current_date.month == 12:
current_date = datetime.datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# Get the last day's value for each month
for date in data_dict.keys():
month_key = datetime.datetime(date.year, date.month, 1)
if month_key in monthly_data and data_dict[date] is not None:
# Update only if the value is higher (to get the last day's value)
monthly_data[month_key] = max(monthly_data[month_key], data_dict[date])
# Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# Convert to numpy array
monthly_failure = np.array(complete_data)
# Calculate corrective costs
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, monthly_failure
except Exception as e:
print(f"Error fetching or processing data: {str(e)}")
raise
async def get_corrective_cost_time_chart(
material_cost: float,
service_cost: float,
location_tag: str,
token,
start_date: datetime.datetime,
end_date: datetime.datetime
) -> Tuple[np.ndarray, np.ndarray]:
days_difference = (end_date - start_date).days
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow = today + datetime.timedelta(days=1)
url_prediction = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/{tomorrow.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
url_history = f"http://192.168.1.82:8000/reliability/main/failures/{location_tag}/{start_date.strftime('%Y-%m-%d')}/{today.strftime('%Y-%m-%d')}"
# Initialize monthly data dictionary
monthly_data = {}
# Get historical data (start_date to today)
if start_date <= today:
try:
response = requests.get(
url_history,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
history_data = response.json()
# Process historical data - accumulate failures by month
history_dict = {}
monthly_failures = {}
for item in history_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
# Initialize if first occurrence of this month
if month_key not in history_dict:
history_dict[month_key] = 0
# Accumulate failures for this month
if item["num_fail"] is not None:
history_dict[month_key] += item["num_fail"]
# Sort months chronologically
sorted_months = sorted(history_dict.keys())
failures = np.array([history_dict[month] for month in sorted_months])
cum_failure = np.cumsum(failures)
for month_key in sorted_months:
monthly_failures[month_key] = int(cum_failure[sorted_months.index(month_key)])
# Update monthly_data with cumulative historical data
monthly_data.update(monthly_failures)
except Exception as e:
# print(f"Error fetching historical data: {e}")
raise Exception(e)
latest_num = 1
# Get prediction data (today+1 to end_date)
if end_date > today:
try:
response = requests.get(
url_prediction,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
prediction_data = response.json()
# Use the last prediction value for future months
# Get the latest number from prediction data
latest_num = prediction_data["data"][-1]["num_fail"]
# Ensure the value is at least 1
if not latest_num or latest_num < 1:
latest_num = 1
else:
# Round the number to the nearest integer
latest_num = round(latest_num)
# Create prediction dictionary
prediction_dict = {}
for item in prediction_data["data"]:
date = datetime.datetime.strptime(item["date"], "%d %b %Y")
month_key = datetime.datetime(date.year, date.month, 1)
prediction_dict[month_key] = round(item["num_fail"])
# Update monthly_data with prediction data
for key in prediction_dict:
if key not in monthly_data: # Don't overwrite historical data
monthly_data[key] = prediction_dict[key]
except Exception as e:
print(f"Error fetching prediction data: {e}")
# Create a complete date range covering all months from start to end
current_date = datetime.datetime(start_date.year, start_date.month, 1)
while current_date <= end_date:
if current_date not in monthly_data:
# Initialize to check previous months
previous_month = current_date.replace(day=1) - datetime.timedelta(days=1)
# Now previous_month is the last day of the previous month
# Convert back to first day of previous month for consistency
previous_month = previous_month.replace(day=1)
# Keep going back until we find data or run out of months to check
month_diff = (current_date.year - start_date.year) * 12 + (current_date.month - start_date.month)
max_attempts = max(1, month_diff) # Ensure at least 1 attempt
attempts = 0
while previous_month not in monthly_data and attempts < max_attempts:
# Move to the previous month (last day of the month before)
previous_month = previous_month.replace(day=1) - datetime.timedelta(days=1)
# Convert to first day of month
previous_month = previous_month.replace(day=1)
attempts += 1
# Use the found value or default to 0 if no previous month with data exists
if previous_month in monthly_data:
monthly_data[current_date] = monthly_data[previous_month]
else:
monthly_data[current_date] = 0
# Move to next month
if current_date.month == 12:
current_date = datetime.datetime(current_date.year + 1, 1, 1)
else:
current_date = datetime.datetime(current_date.year, current_date.month + 1, 1)
# # Convert to list maintaining chronological order
complete_data = []
for month in sorted(monthly_data.keys()):
complete_data.append(monthly_data[month])
# Convert to numpy array
monthly_failure = np.array(complete_data)
cost_per_failure = (material_cost + service_cost) / latest_num
if cost_per_failure == 0:
raise ValueError("Cost per failure cannot be zero")
# if location_tag == "3TR-TF005":
# raise Exception(cost_per_failure, latest_num)
corrective_costs = monthly_failure * cost_per_failure
return corrective_costs, monthly_failure
# except Exception as e:
# print(f"Error fetching or processing data: {str(e)}")
# raise
def get_overhaul_cost_by_time_chart(
overhaul_cost: float, months_num: int, numEquipments: int, decay_base: float = 1.01
) -> np.ndarray:
if overhaul_cost < 0:
raise ValueError("Overhaul cost cannot be negative")
if months_num <= 0:
raise ValueError("months_num must be positive")
rate = np.arange(1, months_num + 1)
cost_per_equipment = overhaul_cost / numEquipments
# results = cost_per_equipment - ((cost_per_equipment / hours) * rate)
results = cost_per_equipment / rate
return results
# def get_corrective_cost_time_chart(material_cost: float, service_cost: float, days: int, numEquipments: int) -> Tuple[np.ndarray, np.ndarray]:
# day_points = np.arange(0, days)
# # Parameters for failure rate
# base_rate = 0.04 # Base failure rate per day
# acceleration = 0.7 # How quickly failure rate increases
# grace_period = 49 # Days before failures start increasing significantly
# # Calculate daily failure rate using sigmoid function
# daily_failure_rate = base_rate / (1 + np.exp(-acceleration * (day_points - grace_period)/days))
# # Introduce randomness in the failure rate
# random_noise = np.random.normal(0.0, 0.05, (numEquipments, days)) # Mean 0.0, Std Dev 0.05
# daily_failure_rate = daily_failure_rate + random_noise
# daily_failure_rate = np.clip(daily_failure_rate, 0, None) # Ensure failure rate is non-negative
# # Calculate cumulative failures
# failure_counts = np.cumsum(daily_failure_rate)
# # Calculate corrective costs based on cumulative failures and combined costs
# cost_per_failure = material_cost + service_cost
# corrective_costs = failure_counts * cost_per_failure
# return corrective_costs, daily_failure_rate
async def create_param_and_data(
*,
db_session: DbSession,
calculation_param_in: CalculationTimeConstrainsParametersCreate,
created_by: str,
parameter_id: Optional[UUID] = None,
):
"""Creates a new document."""
if calculation_param_in.ohSessionId is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="overhaul_session_id is required",
)
calculationData = await CalculationData.create_with_param(
db=db_session,
overhaul_session_id=calculation_param_in.ohSessionId,
avg_failure_cost=calculation_param_in.costPerFailure,
overhaul_cost=calculation_param_in.overhaulCost,
created_by=created_by,
params_id=parameter_id,
)
return calculationData
async def get_calculation_result(db_session: DbSession, calculation_id: str):
scope_calculation = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation_id
)
if not scope_calculation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
scope_overhaul = await get_scope(
db_session=db_session, overhaul_session_id=scope_calculation.overhaul_session_id
)
if not scope_overhaul:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
start_date = datetime.datetime.combine(scope_overhaul.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope_overhaul.end_date, datetime.time.min)
months_num = get_months_between(start_date, end_date)
calculation_results = []
for i in range(months_num):
result = {
"overhaul_cost": 0,
"corrective_cost": 0,
"num_failures": 0,
"day": i + 1,
}
## Add risk Cost
# risk cost = ((Down Time1 * MW Loss 1) + (Downtime2 * Mw 2) + .... (DowntimeN * MwN) ) * Harga listrik (Efficicency HL App)
for eq in scope_calculation.equipment_results:
if not eq.is_included:
continue
result["corrective_cost"] += float(eq.corrective_costs[i])
result["overhaul_cost"] += float(eq.overhaul_costs[i])
result["num_failures"] += int(eq.daily_failures[i])
calculation_results.append(CalculationResultsRead(**result))
# Check if calculation already exist
return CalculationTimeConstrainsRead(
id=scope_calculation.id,
reference=scope_calculation.overhaul_session_id,
scope=scope_overhaul.type,
results=calculation_results,
optimum_oh=scope_calculation.optimum_oh_day,
equipment_results=scope_calculation.equipment_results,
)
async def get_calculation_data_by_id(
db_session: DbSession, calculation_id
) -> CalculationData:
stmt = (
select(CalculationData)
.filter(CalculationData.id == calculation_id)
.options(
joinedload(CalculationData.equipment_results),
joinedload(CalculationData.parameter),
)
)
result = await db_session.execute(stmt)
return result.unique().scalar()
async def get_calculation_by_assetnum(
*, db_session: DbSession, assetnum: str, calculation_id: str
):
stmt = (
select(CalculationEquipmentResult)
.where(CalculationEquipmentResult.assetnum == assetnum)
.where(CalculationEquipmentResult.calculation_data_id == calculation_id)
)
result = await db_session.execute(stmt)
return result.scalar()
# async def create_calculation_result_service(db_session: DbSession, calculation_id: UUID, costPerFailure: Optional[float] = None):
# days = 360
# calculation = await get_calculation_data_by_id(db_session=db_session, calculation_id=calculation_id)
# # reference = await get_by_assetnum(db_session=db_session, assetnum=calculation.reference_id) if calculation.overhaul_reference_type == OverhaulReferenceType.ASSET else await get(db_session=db_session, scope_id=calculation.reference_id)
# # Multiple Eequipment
# equipments_scope = get_all_by_session_id(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
# # Parameter
# overhaulCost = calculation.parameter.overhaul_cost
# costPerFailure = costPerFailure if costPerFailure else calculation.parameter.avg_failure_cost
# overhaul_cost_points = get_overhaul_cost_by_time_chart(
# overhaulCost, days=days)
# for eq in equipments_scope:
# corrective_cost_points, dailyNumberOfFailure = get_corrective_cost_time_chart(
# costPerFailure, days)
# total_cost = overhaul_cost_points + corrective_cost_points
# optimumOHIndex = np.argmin(total_cost)
# numbersOfFailure = sum(dailyNumberOfFailure[:optimumOHIndex])
# optimum = {
# "overhaulCost": float(overhaul_cost_points[optimumOHIndex]),
# "correctiveCost": float(corrective_cost_points[optimumOHIndex]),
# "numOfFailures": int(numbersOfFailure),
# "days": int(optimumOHIndex+1)
# }
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(corrective_cost_points[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(dailyNumberOfFailure[i]),
# )
# calculation_results.append(result)
# calculation.optimum_oh_day = int(optimumOHIndex+1)
# db_session.add_all(calculation_results)
# await db_session.commit()
# return CalculationTimeConstrainsRead(
# id=calculation.id,
# name=reference.scope_name if hasattr(
# reference, "scope_name") else reference.master_equipment.name,
# reference=reference.assetnum if hasattr(
# reference, "assetnum") else reference.scope_name,
# results=calculation_results,
# optimumOh=optimum
# )
async def create_calculation_result_service(
db_session: DbSession, calculation: CalculationData, token: str
) -> CalculationTimeConstrainsRead:
scope = await get_scope(db_session=db_session, overhaul_session_id=calculation.overhaul_session_id)
start_date = datetime.datetime.combine(scope.start_date, datetime.time.min)
end_date = datetime.datetime.combine(scope.end_date, datetime.time.min)
months_num = get_months_between(start_date, end_date)
# Get all equipment for this calculation session
equipments = await get_all_by_session_id(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
scope = await get_scope(
db_session=db_session, overhaul_session_id=calculation.overhaul_session_id
)
calculation_data = await get_calculation_data_by_id(
db_session=db_session, calculation_id=calculation.id
)
# Store results for each equipment
equipment_results: List[CalculationEquipmentResult] = []
total_corrective_costs = np.zeros(months_num)
total_overhaul_cost = np.zeros(months_num)
total_daily_failures = np.zeros(months_num)
# Calculate for each equipment
for eq in equipments:
corrective_costs, daily_failures = await get_corrective_cost_time_chart(
material_cost=eq.material_cost,
service_cost=eq.service_cost,
token=token,
location_tag=eq.equipment.location_tag,
start_date=start_date,
end_date=end_date
)
overhaul_cost_points = get_overhaul_cost_by_time_chart(
calculation_data.parameter.overhaul_cost,
months_num=months_num,
numEquipments=len(equipments),
)
# Calculate individual equipment optimum points
equipment_total_cost = corrective_costs + overhaul_cost_points
equipment_optimum_index = np.argmin(equipment_total_cost)
equipment_failure_sum = sum(daily_failures[:equipment_optimum_index])
equipment_results.append(
CalculationEquipmentResult(
corrective_costs=corrective_costs.tolist(),
overhaul_costs=overhaul_cost_points.tolist(),
daily_failures=daily_failures.tolist(),
assetnum=eq.assetnum,
material_cost=eq.material_cost,
service_cost=eq.service_cost,
optimum_day=int(equipment_optimum_index + 1),
calculation_data_id=calculation.id,
master_equipment=eq.equipment,
)
)
# Add to totals
total_corrective_costs += corrective_costs
total_overhaul_cost += overhaul_cost_points
total_daily_failures += daily_failures
db_session.add_all(equipment_results)
# Calculate optimum points using total costs
total_cost = total_corrective_costs + total_overhaul_cost
optimum_oh_index = np.argmin(total_cost)
numbers_of_failure = sum(total_daily_failures[:optimum_oh_index])
optimum = OptimumResult(
overhaul_cost=float(overhaul_cost_points[optimum_oh_index]),
corrective_cost=float(total_corrective_costs[optimum_oh_index]),
num_failures=int(numbers_of_failure),
days=int(optimum_oh_index + 1),
)
# # Create calculation results for database
# calculation_results = []
# for i in range(days):
# result = CalculationResult(
# parameter_id=calculation.parameter_id,
# calculation_data_id=calculation.id,
# day=(i + 1),
# corrective_cost=float(total_corrective_costs[i]),
# overhaul_cost=float(overhaul_cost_points[i]),
# num_failures=int(total_daily_failures[i]),
# )
# calculation_results.append(result)
# Update calculation with optimum day
calculation.optimum_oh_day = optimum.days
await db_session.commit()
# Return results including individual equipment data
return CalculationTimeConstrainsRead(
id=calculation.id,
reference=calculation.overhaul_session_id,
scope=scope.type,
results=[],
optimum_oh=optimum,
equipment_results=equipment_results,
)
async def get_calculation_by_reference_and_parameter(
*, db_session: DbSession, calculation_reference_id, parameter_id
):
stmt = select(CalculationData).filter(
and_(
CalculationData.reference_id == calculation_reference_id,
CalculationData.parameter_id == parameter_id,
)
)
result = await db_session.execute(stmt)
return result.scalar()
async def get_calculation_result_by_day(
*, db_session: DbSession, calculation_id, simulation_day
):
stmt = select(CalculationResult).filter(
and_(
CalculationResult.day == simulation_day,
CalculationResult.calculation_data_id == calculation_id,
)
)
result = await db_session.execute(stmt)
return result.scalar()
async def get_avg_cost_by_asset(*, db_session: DbSession, assetnum: str):
stmt = select(func.avg(MasterWorkOrder.total_cost_max).label("average_cost")).where(
MasterWorkOrder.assetnum == assetnum
)
result = await db_session.execute(stmt)
return result.scalar_one_or_none()
async def bulk_update_equipment(
*,
db: DbSession,
selected_equipments: List[CalculationSelectedEquipmentUpdate],
calculation_data_id: UUID,
):
# Create a dictionary mapping assetnum to is_included status
case_mappings = {asset.assetnum: asset.is_included for asset in selected_equipments}
# Get all assetnums that need to be updated
assetnums = list(case_mappings.keys())
# Create a list of when clauses for the case statement
when_clauses = [
(CalculationEquipmentResult.assetnum == assetnum, is_included)
for assetnum, is_included in case_mappings.items()
]
# Build the update statement
stmt = (
update(CalculationEquipmentResult)
.where(CalculationEquipmentResult.calculation_data_id == calculation_data_id)
.where(CalculationEquipmentResult.assetnum.in_(assetnums))
.values(
{
"is_included": case(
*when_clauses
) # Unpack the when clauses as separate arguments
}
)
)
await db.execute(stmt)
await db.commit()
return assetnums

@ -1,9 +0,0 @@
import datetime
def get_months_between(start_date: datetime.datetime, end_date: datetime.datetime) -> int:
"""
Calculate number of months between two dates.
"""
months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
# Add 1 to include both start and end months
return months + 1

@ -1,45 +0,0 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from src.workorder.model import MasterWorkOrder
class MasterActivitytask(Base, DefaultMixin):
__tablename__ = "oh_ms_job_task"
description = Column(String, nullable=False)
oh_type = Column(String, nullable=False)
job_id = Column(
UUID(as_uuid=True),
ForeignKey("oh_ms_job.id", ondelete="cascade"),
nullable=False,
)
class MasterActivity(Base, DefaultMixin):
__tablename__ = "oh_ms_job"
workscope = Column(String, nullable=True)
system = Column(String, nullable=True)
subsystem = Column(String, nullable=True)
tasks = relationship(
"MasterActivitytask",
lazy="selectin",
)
# details = relationship(
# "MasterActivityDetail",
# lazy="raise",
# primaryjoin="and_(MasterActivity.id == foreign(MasterActivityDetail.activity_id))",
# )
# class MasterActivityDetail(Base, DefaultMixin):
# __tablename__ = "oh_ms_activity_detail"
# name = Column(String, nullable=False)
# activity_id = Column(UUID(as_uuid=True))

@ -1,84 +0,0 @@
from fastapi import APIRouter, HTTPException, Query, status
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
from .schema import (ActivityMaster, ActivityMasterCreate,
ActivityMasterPagination)
from .service import create, delete, get, get_all, update
router = APIRouter()
@router.get("", response_model=StandardResponse[ActivityMasterPagination])
async def get_activities(common: CommonParameters):
"""Get all scope activity pagination."""
# return
data = await get_all(common=common)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)
@router.post("", response_model=StandardResponse[ActivityMasterCreate])
async def create_activity(db_session: DbSession, activity_in: ActivityMasterCreate):
activity = await create(db_session=db_session, activty_in=activity_in)
return StandardResponse(data=activity, message="Data created successfully")
@router.get(
"/{scope_equipment_activity_id}", response_model=StandardResponse[ActivityMaster]
)
async def get_activity(db_session: DbSession, activity_id: str):
activity = await get(db_session=db_session, activity_id=activity_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=activity, message="Data retrieved successfully")
@router.put(
"/{scope_equipment_activity_id}", response_model=StandardResponse[ActivityMaster]
)
async def update_scope(
db_session: DbSession, activity_in: ActivityMasterCreate, activity_id
):
activity = await get(db_session=db_session, activity_id=activity_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(
data=await update(
db_session=db_session, activity=activity, activity_in=activity_in
),
message="Data updated successfully",
)
@router.delete(
"/{scope_equipment_activity_id}", response_model=StandardResponse[ActivityMaster]
)
async def delete_scope(db_session: DbSession, activity_id: str):
activity = await get(db_session=db_session, activity_id=activity_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, activity_id=activity_id)
return StandardResponse(message="Data deleted successfully", data=activity)

@ -1,75 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
class ActivityMaster(DefultBase):
pass
class ActivityMasterDetail(DefultBase):
name: str
class ActivityMasterCreate(ActivityMaster):
description: str
class ActivityMasterTasks(DefultBase):
description: str
oh_type: str
class ActivityMasterRead(ActivityMaster):
id: UUID
workscope: str
system: str
subsystem: str
tasks: List[ActivityMasterTasks]
class ActivityMasterPagination(Pagination):
items: List[ActivityMasterRead] = []
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -1,154 +0,0 @@
from datetime import datetime, timedelta
from typing import Any, Dict
import httpx
from fastapi import HTTPException
from starlette.config import Config
from src.config import MAXIMO_API_KEY, MAXIMO_BASE_URL
class MaximoDataMapper:
"""
Helper class to map MAXIMO API response to our data structure.
Update these mappings according to actual MAXIMO API documentation.
"""
def __init__(self, maximo_data: Dict[Any, Any]):
self.data = maximo_data
def get_start_date(self) -> datetime:
"""
Extract start date from MAXIMO data.
TODO: Update this based on actual MAXIMO API response structure
Example: might be data['startDate'] or data['SCHEDSTART'] etc.
"""
# This is a placeholder - update with actual MAXIMO field name
start_date_str = self.data.get("scheduleStart")
if not start_date_str:
raise ValueError("Start date not found in MAXIMO data")
return datetime.fromisoformat(start_date_str)
def get_end_date(self) -> datetime:
"""
Extract end date from MAXIMO data.
TODO: Update this based on actual MAXIMO API response structure
"""
# This is a placeholder - update with actual MAXIMO field name
end_date_str = self.data.get("scheduleEnd")
if not end_date_str:
raise ValueError("End date not found in MAXIMO data")
return datetime.fromisoformat(end_date_str)
def get_maximo_id(self) -> str:
"""
Extract MAXIMO ID from response.
TODO: Update this based on actual MAXIMO API response structure
"""
# This is a placeholder - update with actual MAXIMO field name
maximo_id = self.data.get("workOrderId")
if not maximo_id:
raise ValueError("MAXIMO ID not found in response")
return str(maximo_id)
def get_status(self) -> str:
"""
Extract status from MAXIMO data.
TODO: Update this based on actual MAXIMO API response structure
"""
# This is a placeholder - update with actual MAXIMO status field and values
status = self.data.get("status", "").upper()
return status
def get_total_cost(self) -> float:
"""
Extract total cost from MAXIMO data.
TODO: Update this based on actual MAXIMO API response structure
"""
# This is a placeholder - update with actual MAXIMO field name
cost = self.data.get("totalCost", 0)
return float(cost)
def get_scope_name(self) -> str:
scope_name = self.data.get("location", "A")
return scope_name
class MaximoService:
def __init__(self):
# TODO: Update these settings based on actual MAXIMO API configuration
self.base_url = MAXIMO_BASE_URL
self.api_key = MAXIMO_API_KEY
async def get_recent_overhaul(self) -> dict:
"""
Fetch most recent overhaul from MAXIMO.
TODO: Update this method based on actual MAXIMO API endpoints and parameters
"""
current_date = datetime.now()
schedule_start = current_date + timedelta(days=30) # Starting in 30 days
schedule_end = schedule_start + timedelta(days=90) # 90 day overhaul period
return {
"scheduleStart": schedule_start.isoformat(),
"scheduleEnd": schedule_end.isoformat(),
"workOrderId": "WO-2024-12345",
"status": "PLAN", # Common Maximo statuses: SCHEDULED, INPRG, COMP, CLOSE
"totalCost": 10000000.00,
"description": "Annual Turbine Overhaul",
"priority": 1,
"location": "A",
"assetDetails": [
{
"assetnum": "ASSET001",
"description": "Gas Turbine",
"status": "OPERATING",
},
{
"assetnum": "ASSET002",
"description": "Steam Turbine",
"status": "OPERATING",
},
],
"workType": "OH", # OH for Overhaul
"createdBy": "MAXADMIN",
"createdDate": (current_date - timedelta(days=10)).isoformat(),
"lastModifiedBy": "MAXADMIN",
"lastModifiedDate": current_date.isoformat(),
}
async with httpx.AsyncClient() as client:
try:
# TODO: Update endpoint and parameters based on actual MAXIMO API
response = await client.get(
f"{self.base_url}/your-endpoint-here",
headers={
"Authorization": f"Bearer {self.api_key}",
# Add any other required headers
},
params={
# Update these parameters based on actual MAXIMO API
"orderBy": "-scheduleEnd", # Example parameter
"limit": 1,
},
)
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"MAXIMO API error: {response.text}",
)
data = response.json()
if not data:
raise HTTPException(
status_code=404, detail="No recent overhaul found"
)
# TODO: Update this based on actual MAXIMO response structure
return data[0] if isinstance(data, list) else data
except httpx.RequestError as e:
raise HTTPException(
status_code=503, detail=f"Failed to connect to MAXIMO: {str(e)}"
)

@ -0,0 +1,19 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String, TIMESTAMP, Enum, Interval
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
import enum
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
class MonitoringActivity(Base, DefaultMixin):
__tablename__ = "monitoring_tr_activity"
ip_address = Column(String(45), nullable=True)
user_id = Column(UUID(as_uuid=True), nullable=True)
user_name = Column(String(100), nullable=True)
activity_type = Column(String(100), nullable=True)
app_name = Column(String(100), nullable=True)
start_time = Column(TIMESTAMP(timezone=True), nullable=True)
end_time = Column(TIMESTAMP(timezone=True), nullable=True)
duration = Column(Interval, nullable=True)

@ -0,0 +1,69 @@
from fastapi import APIRouter, HTTPException, Query, status
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
from .service import create, get, get_all, update
from .schema import MonitoringActivityPagination, MonitoringActivityCreate, MonitorActivityUpdate, MonitoringActivityRead
from .model import MonitoringActivity
router = APIRouter()
@router.get("", response_model=StandardResponse[MonitoringActivityPagination])
async def get_monitoring(common: CommonParameters):
"""Get all scope activity pagination."""
# return
data = await get_all(common=common)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)
@router.post("", response_model=StandardResponse[MonitoringActivityRead])
async def create_monitoring(db_session: DbSession, activity_in: MonitoringActivityCreate):
activity = await create(db_session=db_session, activty_in=activity_in)
return StandardResponse(data=activity, message="Data created successfully")
# @router.get(
# "/{scope_equipment_activity_id}", response_model=StandardResponse[MonitoringActivityRead]
# )
# async def get_activity(db_session: DbSession, activity_id: str):
# activity = await get(db_session=db_session, activity_id=activity_id)
# if not activity:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND,
# detail="A data with this id does not exist.",
# )
# return StandardResponse(data=activity, message="Data retrieved successfully")
@router.put(
"/{monitoring_id}", response_model=StandardResponse[MonitoringActivityRead]
)
async def update_scope(
db_session: DbSession, activity_in: MonitorActivityUpdate, monitoring_id
):
activity = await get(db_session=db_session, activity_id=monitoring_id)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(
data=await update(
db_session=db_session, activity=activity, activity_in=activity_in
),
message="Data updated successfully",
)

@ -0,0 +1,44 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
class MonitoringActivity(DefultBase):
pass
class MonitoringActivityCreate(MonitoringActivity):
ip_address: str
user_id: UUID
user_name: str
activity_type: str
app_name: Optional[str] = Field(None)
start_time: datetime
end_time: Optional[datetime] = Field(None)
class MonitorActivityUpdate(MonitoringActivity):
ip_address: Optional[str] = Field(None)
user_id: Optional[UUID] = Field(None)
user_name: Optional[str] = Field(None)
activity_type: Optional[str] = Field(None)
app_name: Optional[str] = Field(None)
start_time: Optional[datetime] = Field(None)
end_time: Optional[datetime] = Field(None)
class MonitoringActivityRead(MonitoringActivity):
id: UUID
ip_address: str
user_id: UUID
user_name: str
activity_type: str
app_name: Optional[str] = Field(None)
start_time: datetime
end_time: Optional[datetime] = Field(None)
class MonitoringActivityPagination(Pagination):
items: List[MonitoringActivityRead] = []

@ -7,26 +7,27 @@ from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from .model import MasterActivity
from .schema import ActivityMaster, ActivityMasterCreate
from .model import MonitoringActivity
from .schema import MonitoringActivityCreate, MonitorActivityUpdate, MonitoringActivityRead
async def get(*, db_session: DbSession, activity_id: str) -> Optional[ActivityMaster]:
async def get(*, db_session: DbSession, activity_id: str) -> Optional[MonitoringActivity]:
"""Returns a document based on the given document id."""
result = await db_session.get(MasterActivity, activity_id)
result = await db_session.get(MonitoringActivity, activity_id)
return result
async def get_all(common: CommonParameters):
query = Select(MasterActivity)
query = Select(MonitoringActivity)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(*, db_session: DbSession, activty_in: ActivityMasterCreate):
activity = MasterActivity(**activty_in.model_dump())
async def create(*, db_session: DbSession, activty_in: MonitoringActivityCreate):
activity = MonitoringActivity(**activty_in.model_dump())
db_session.add(activity)
await db_session.commit()
return activity
@ -35,8 +36,8 @@ async def create(*, db_session: DbSession, activty_in: ActivityMasterCreate):
async def update(
*,
db_session: DbSession,
activity: MasterActivity,
activity_in: ActivityMasterCreate
activity: MonitoringActivity,
activity_in: MonitorActivityUpdate
):
"""Updates a document."""
data = activity_in.model_dump()
@ -50,10 +51,3 @@ async def update(
await db_session.commit()
return activity
async def delete(*, db_session: DbSession, activity_id: str):
"""Deletes a document."""
activity = await db_session.get(MasterActivity, activity_id)
await db_session.delete(activity)
await db_session.commit()

@ -1,67 +0,0 @@
from typing import List
from fastapi import APIRouter, HTTPException, status
from src.database.core import DbSession
from src.models import StandardResponse
from src.overhaul.service import (get_overhaul_critical_parts,
get_overhaul_overview,
get_overhaul_schedules,
get_overhaul_system_components)
from src.overhaul_scope.schema import ScopeRead
from .schema import (OverhaulCriticalParts, OverhaulRead,
OverhaulSystemComponents)
router = APIRouter()
@router.get("", response_model=StandardResponse[OverhaulRead])
async def get_overhaul(db_session: DbSession):
"""Get all scope pagination."""
overview = await get_overhaul_overview(db_session=db_session)
schedules = await get_overhaul_schedules(db_session=db_session)
criticalParts = get_overhaul_critical_parts()
systemComponents = get_overhaul_system_components()
return StandardResponse(
data=OverhaulRead(
overview=overview,
schedules=schedules,
criticalParts=criticalParts,
systemComponents=systemComponents,
),
message="Data retrieved successfully",
)
@router.get("/schedules", response_model=StandardResponse[List[ScopeRead]])
async def get_schedules():
"""Get all overhaul schedules."""
schedules = get_overhaul_schedules()
return StandardResponse(
data=schedules,
message="Data retrieved successfully",
)
@router.get("/critical-parts", response_model=StandardResponse[OverhaulCriticalParts])
async def get_critical_parts():
"""Get all overhaul critical parts."""
criticalParts = get_overhaul_critical_parts()
return StandardResponse(
data=OverhaulCriticalParts(criticalParts=criticalParts),
message="Data retrieved successfully",
)
@router.get(
"/system-components", response_model=StandardResponse[OverhaulSystemComponents]
)
async def get_system_components():
"""Get all overhaul system components."""
systemComponents = get_overhaul_system_components()
return StandardResponse(
data=OverhaulSystemComponents(systemComponents=systemComponents),
message="Data retrieved successfully",
)

@ -1,72 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
from src.overhaul_scope.schema import ScopeRead
class OverhaulBase(BaseModel):
pass
class OverhaulCriticalParts(OverhaulBase):
criticalParts: List[str] = Field(..., description="List of critical parts")
class OverhaulSchedules(OverhaulBase):
schedules: List[Dict[str, Any]] = Field(..., description="List of schedules")
class OverhaulSystemComponents(OverhaulBase):
systemComponents: Dict[str, Any] = Field(
..., description="List of system components"
)
class OverhaulRead(OverhaulBase):
overview: Dict[str, Any]
criticalParts: List[str]
schedules: List[ScopeRead]
systemComponents: Dict[str, Any]
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -1,154 +0,0 @@
from typing import Optional
from sqlalchemy import Delete, Select
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.overhaul_scope.model import OverhaulScope
from src.overhaul_scope.service import get_all as get_all_session
from src.overhaul_scope.service import get_overview_overhaul
async def get_overhaul_overview(db_session: DbSession):
"""Get all overhaul overview."""
results = await get_overview_overhaul(db_session=db_session)
return results
def get_overhaul_critical_parts():
"""Get all overhaul critical parts."""
return [
"Boiler feed pump",
"Boiler reheater system",
"Drum Level (Right) Root Valve A",
"BCP A Discharge Valve",
"BFPT A EXH Press HI Root VLV",
]
async def get_overhaul_schedules(*, db_session: DbSession):
"""Get all overhaul schedules."""
query = Select(OverhaulScope)
results = await db_session.execute(query)
return results.scalars().all()
def get_overhaul_system_components():
"""Get all overhaul system components with dummy data."""
return {
"HPT": {
"efficiency": "92%",
"work_hours": "1200",
"reliability": "96%",
},
"IPT": {
"efficiency": "91%",
"work_hours": "1100",
"reliability": "95%",
},
"LPT": {
"efficiency": "90%",
"work_hours": "1000",
"reliability": "94%",
},
"EG": {
"efficiency": "88%",
"work_hours": "950",
"reliability": "93%",
},
"boiler": {
"efficiency": "90%",
"work_hours": "1000",
"reliability": "95%",
},
"HPH1": {
"efficiency": "89%",
"work_hours": "1050",
"reliability": "94%",
},
"HPH2": {
"efficiency": "88%",
"work_hours": "1020",
"reliability": "93%",
},
"HPH3": {
"efficiency": "87%",
"work_hours": "1010",
"reliability": "92%",
},
"HPH5": {
"efficiency": "86%",
"work_hours": "980",
"reliability": "91%",
},
"HPH6": {
"efficiency": "85%",
"work_hours": "970",
"reliability": "90%",
},
"HPH7": {
"efficiency": "84%",
"work_hours": "960",
"reliability": "89%",
},
"Condensor": {
"efficiency": "83%",
"work_hours": "940",
"reliability": "88%",
},
"Deaerator": {
"efficiency": "82%",
"work_hours": "930",
"reliability": "87%",
},
}
# async def get(*, db_session: DbSession, scope_id: str) -> Optional[Scope]:
# """Returns a document based on the given document id."""
# query = Select(Scope).filter(Scope.id == scope_id)
# result = await db_session.execute(query)
# return result.scalars().one_or_none()
# async def get_all(*, db_session: DbSession):
# """Returns all documents."""
# query = Select(Scope)
# result = await db_session.execute(query)
# return result.scalars().all()
# async def create(*, db_session: DbSession, scope_id: ScopeCreate):
# """Creates a new document."""
# scope = Scope(**scope_id.model_dump())
# db_session.add(scope)
# await db_session.commit()
# return scope
# async def update(*, db_session: DbSession, scope: Scope, scope_id: ScopeUpdate):
# """Updates a document."""
# data = scope_id.model_dump()
# update_data = scope_id.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(scope, field, update_data[field])
# await db_session.commit()
# return scope
# async def delete(*, db_session: DbSession, scope_id: str):
# """Deletes a document."""
# query = Delete(Scope).where(Scope.id == scope_id)
# await db_session.execute(query)
# await db_session.commit()

@ -1,35 +0,0 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from src.workorder.model import MasterWorkOrder
class OverhaulActivity(Base, DefaultMixin):
__tablename__ = "oh_tr_overhaul_activity"
assetnum = Column(String, nullable=True)
overhaul_scope_id = Column(
UUID(as_uuid=True), ForeignKey("oh_ms_overhaul_scope.id"), nullable=False
)
material_cost = Column(Float, nullable=False, default=0)
service_cost = Column(Float, nullable=False, default=0)
status = Column(String, nullable=False, default="pending")
equipment = relationship(
"MasterEquipment",
lazy="raise",
primaryjoin="and_(OverhaulActivity.assetnum == foreign(MasterEquipment.assetnum))",
uselist=False, # Add this if it's a one-to-one relationship
)
overhaul_scope = relationship(
"OverhaulScope",
lazy="raise",
)
overhaul_jobs = relationship(
"OverhaulJob", back_populates="overhaul_activity", lazy="raise"
)

@ -1,118 +0,0 @@
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, HTTPException, Query, status
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
from .schema import (OverhaulActivityCreate, OverhaulActivityPagination,
OverhaulActivityRead, OverhaulActivityUpdate)
from .service import create, delete, get, get_all, update
router = APIRouter()
@router.get(
"/{overhaul_session}", response_model=StandardResponse[OverhaulActivityPagination]
)
async def get_scope_equipments(
common: CommonParameters,
overhaul_session: str,
assetnum: Optional[str] = Query(None),
scope_name: Optional[str] = Query(None),
):
"""Get all scope activity pagination."""
# return
data = await get_all(
common=common,
assetnum=assetnum,
scope_name=scope_name,
overhaul_session_id=overhaul_session,
)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)
@router.post("/{overhaul_session}", response_model=StandardResponse[List[str]])
async def create_overhaul_equipment(
db_session: DbSession,
overhaul_activty_in: OverhaulActivityCreate,
overhaul_session: str,
):
activity = await create(
db_session=db_session,
overhaul_activty_in=overhaul_activty_in,
overhaul_session_id=overhaul_session,
)
return StandardResponse(data=activity, message="Data created successfully")
@router.get(
"/{overhaul_session}/{assetnum}",
response_model=StandardResponse[OverhaulActivityRead],
)
async def get_overhaul_equipment(
db_session: DbSession, assetnum: str, overhaul_session
):
equipment = await get(
db_session=db_session, assetnum=assetnum, overhaul_session_id=overhaul_session
)
if not equipment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=equipment, message="Data retrieved successfully")
@router.put(
"/{overhaul_session}/{assetnum}",
response_model=StandardResponse[OverhaulActivityRead],
)
async def update_scope(
db_session: DbSession,
scope_equipment_activity_in: OverhaulActivityUpdate,
assetnum: str,
):
activity = await get(db_session=db_session, assetnum=assetnum)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(
data=await update(
db_session=db_session,
activity=activity,
scope_equipment_activity_in=scope_equipment_activity_in,
),
message="Data updated successfully",
)
@router.delete(
"/{overhaul_session}/{assetnum}",
response_model=StandardResponse[OverhaulActivityRead],
)
async def delete_scope(db_session: DbSession, assetnum: str):
activity = await get(db_session=db_session, assetnum=assetnum)
if not activity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, assetnum=assetnum)
return StandardResponse(message="Data deleted successfully", data=activity)

@ -1,49 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
from src.scope_equipment.schema import MasterEquipmentTree
from src.job.schema import ActivityMasterRead
class OverhaulActivityBase(DefultBase):
pass
class OverhaulActivityCreate(OverhaulActivityBase):
assetnums: List[str]
scope_name: str
class OverhaulActivityUpdate(OverhaulActivityBase):
material_cost: Optional[float] = Field(0)
service_cost: Optional[float] = Field(0)
class OverhaulScope(DefultBase):
type: str
start_date: datetime
end_date: datetime
duration_oh: int
class ScopeEquipmentJob(DefultBase):
job: ActivityMasterRead
class OverhaulJob(DefultBase):
scope_equipment_job: ScopeEquipmentJob
class OverhaulActivityRead(OverhaulActivityBase):
id: UUID
material_cost: Optional[float] = Field(0)
service_cost: Optional[float] = Field(0)
assetnum: str = Field(..., description="Assetnum is required")
status: str
equipment: MasterEquipmentTree
overhaul_scope: OverhaulScope
overhaul_jobs: Optional[List[OverhaulJob]] = Field([])
class OverhaulActivityPagination(Pagination):
items: List[OverhaulActivityRead] = []

@ -1,203 +0,0 @@
import asyncio
from typing import List, Optional
from uuid import UUID
from sqlalchemy import Delete, Select, func, select
from sqlalchemy import update as sqlUpdate
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import joinedload, selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.overhaul_activity.utils import get_material_cost, get_service_cost
from src.overhaul_scope.model import OverhaulScope
from src.overhaul_scope.service import get as get_session
from src.scope_equipment.model import MasterEquipment
from src.job.model import MasterActivity
from src.scope_equipment_job.model import ScopeEquipmentJob
from src.overhaul_job.model import OverhaulJob
from .model import OverhaulActivity
from .schema import (OverhaulActivityCreate, OverhaulActivityRead,
OverhaulActivityUpdate)
async def get(
*, db_session: DbSession, assetnum: str, overhaul_session_id: Optional[UUID] = None
) -> Optional[OverhaulActivityRead]:
"""Returns a document based on the given document id."""
query = (
Select(OverhaulActivity)
.where(OverhaulActivity.assetnum == assetnum)
.options(joinedload(OverhaulActivity.equipment))
)
if overhaul_session_id:
query = query.filter(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
result = await db_session.execute(query)
return result.scalar()
async def get_all(
*,
common: CommonParameters,
overhaul_session_id: UUID,
assetnum: Optional[str] = None,
scope_name: Optional[str] = None
):
query = (
Select(OverhaulActivity)
.where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
.options(joinedload(OverhaulActivity.equipment).options(joinedload(MasterEquipment.parent).options(joinedload(MasterEquipment.parent))))
.options(selectinload(OverhaulActivity.overhaul_scope))
.options(selectinload(OverhaulActivity.overhaul_jobs).options(joinedload(OverhaulJob.scope_equipment_job).options(joinedload(ScopeEquipmentJob.job))))
)
if assetnum:
query = query.filter(OverhaulActivity.assetnum == assetnum).options(
joinedload(OverhaulActivity.overhaul_scope)
)
if scope_name:
query = query.filter(OverhaulActivity.scope_name == scope_name).options(
joinedload(OverhaulActivity.overhaul_scope)
)
results = await search_filter_sort_paginate(model=query, **common)
##raise Exception(results['items'][0].equipment.parent.__dict__)
return results
async def get_all_by_session_id(*, db_session: DbSession, overhaul_session_id):
query = (
Select(OverhaulActivity)
.where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
.options(joinedload(OverhaulActivity.equipment).options(joinedload(MasterEquipment.parent).options(joinedload(MasterEquipment.parent))))
.options(selectinload(OverhaulActivity.overhaul_scope))
)
results = await db_session.execute(query)
return results.scalars().all()
# async def create(*, db_session: DbSession, overhaul_activty_in: OverhaulActivityCreate, overhaul_session_id: UUID):
# # Check if the combination of assetnum and activity_id already exists
# existing_equipment_query = (
# Select(OverhaulActivity)
# .where(
# OverhaulActivity.assetnum == overhaul_activty_in.assetnum,
# OverhaulActivity.overhaul_scope_id == overhaul_session_id
# )
# )
# result = await db_session.execute(existing_equipment_query)
# existing_activity = result.scalar_one_or_none()
# # If the combination exists, raise an exception or return the existing activity
# if existing_activity:
# raise ValueError("This assetnum already exist.")
# activity = OverhaulActivity(
# **overhaul_activty_in.model_dump(),
# overhaul_scope_id=overhaul_session_id)
# db_session.add(activity)
# await db_session.commit()
# # Refresh and load relationships using joinedload
# query = (
# Select(OverhaulActivity)
# .options(joinedload(OverhaulActivity.equipment))
# .where(OverhaulActivity.id == activity.id)
# )
# result = await db_session.execute(query)
# activity_with_relationship = result.scalar_one()
# return activity_with_relationship
async def create(
*,
db_session: DbSession,
overhaul_activty_in: OverhaulActivityCreate,
overhaul_session_id: UUID
):
"""Creates a new document."""
assetnums = overhaul_activty_in.assetnums
if not assetnums:
return []
# Get session and count in parallel
session = await get_session(
db_session=db_session, overhaul_session_id=overhaul_session_id
)
equipment_count = await db_session.scalar(
select(func.count())
.select_from(OverhaulActivity)
.where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
)
# Calculate costs for all records
total_equipment = equipment_count + len(assetnums)
material_cost = get_material_cost(
scope=session.type, total_equipment=total_equipment
)
service_cost = get_service_cost(scope=session.type, total_equipment=total_equipment)
# Create the insert statement
stmt = insert(OverhaulActivity).values(
[
{
"assetnum": assetnum,
"overhaul_scope_id": overhaul_session_id,
"material_cost": material_cost,
"service_cost": service_cost,
}
for assetnum in assetnums
]
)
# Add the ON CONFLICT DO NOTHING clause
stmt = stmt.on_conflict_do_nothing(index_elements=["assetnum", "overhaul_scope_id"])
# Execute the statement
await db_session.execute(stmt)
await db_session.execute(
sqlUpdate(OverhaulActivity)
.where(OverhaulActivity.overhaul_scope_id == overhaul_session_id)
.values(material_cost=material_cost, service_cost=service_cost)
)
await db_session.commit()
return assetnums
async def update(
*,
db_session: DbSession,
activity: OverhaulActivity,
overhaul_activity_in: OverhaulActivityUpdate
):
"""Updates a document."""
data = overhaul_activity_in.model_dump()
update_data = overhaul_activity_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(activity, field, update_data[field])
await db_session.commit()
return activity
async def delete(*, db_session: DbSession, overhaul_activity_id: str):
"""Deletes a document."""
activity = await db_session.get(OverhaulActivity, overhaul_activity_id)
await db_session.delete(activity)
await db_session.commit()

@ -1,35 +0,0 @@
from decimal import Decimal, getcontext
def get_material_cost(scope, total_equipment):
# Set precision to 28 digits (maximum precision for Decimal)
getcontext().prec = 28
if not total_equipment: # Guard against division by zero
return float(0)
if scope == "B":
result = Decimal("365539731101") / Decimal(str(total_equipment))
return float(result)
else:
result = Decimal("8565468127") / Decimal(str(total_equipment))
return float(result)
return float(0)
def get_service_cost(scope, total_equipment):
# Set precision to 28 digits (maximum precision for Decimal)
getcontext().prec = 28
if not total_equipment: # Guard against division by zero
return float(0)
if scope == "B":
result = Decimal("36405830225") / Decimal(str(total_equipment))
return float(result)
else:
result = Decimal("36000000000") / Decimal(str(total_equipment))
return float(result)
return float(0)

@ -1,29 +0,0 @@
from sqlalchemy import (UUID, Column, DateTime, Float, ForeignKey, Integer,
String)
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
class OverhaulJob(Base, DefaultMixin):
__tablename__ = "oh_tr_overhaul_job"
overhaul_activity_id = Column(
UUID(as_uuid=True), ForeignKey("oh_tr_overhaul_activity.id"), nullable=False
)
scope_equipment_job_id = Column(
UUID(as_uuid=True),
ForeignKey("oh_ms_scope_equipment_job.id", ondelete="cascade"),
nullable=False,
)
notes = Column(String, nullable=True)
status = Column(String, nullable=True, default="pending")
scope_equipment_job = relationship(
"ScopeEquipmentJob", lazy="raise", back_populates="overhaul_jobs"
)
overhaul_activity = relationship("OverhaulActivity", lazy="raise", back_populates="overhaul_jobs")

@ -1,91 +0,0 @@
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from .schema import (OverhaulJobBase, OverhaulJobCreate, OverhaulJobPagination,
OverhaulJobRead)
from .service import create, delete, get_all
router = APIRouter()
@router.get(
"/{overhaul_equipment_id}", response_model=StandardResponse[OverhaulJobPagination]
)
async def get_jobs(common: CommonParameters, overhaul_equipment_id: str):
"""Get all scope pagination."""
# return
results = await get_all(common=common, overhaul_equipment_id=overhaul_equipment_id)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.post("/{overhaul_equipment_id}", response_model=StandardResponse[None])
async def create_overhaul_equipment_jobs(
db_session: DbSession, overhaul_equipment_id, overhaul_job_in: OverhaulJobCreate
):
"""Get all scope activity pagination."""
# return
await create(
db_session=db_session,
overhaul_equipment_id=overhaul_equipment_id,
overhaul_job_in=overhaul_job_in,
)
return StandardResponse(
data=None,
message="Data created successfully",
)
@router.delete("/{overhaul_job_id}", response_model=StandardResponse[None])
async def delete_overhaul_equipment_job(db_session: DbSession, overhaul_job_id):
await delete(db_session=db_session, overhaul_job_id=overhaul_job_id)
return StandardResponse(
data=None,
message="Data deleted successfully",
)
# @router.post("", response_model=StandardResponse[List[str]])
# async def create_scope(db_session: DbSession, scope_in: OverhaulJobCreate):
# overhaul_job = await create(db_session=db_session, scope_in=scope_in)
# return StandardResponse(data=overhaul_job, message="Data created successfully")
# @router.put("/{scope_id}", response_model=StandardResponse[ScopeRead])
# async def update_scope(db_session: DbSession, scope_id: str, scope_in: ScopeUpdate, current_user: CurrentUser):
# scope = await get(db_session=db_session, scope_id=scope_id)
# if not scope:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND,
# detail="A data with this id does not exist.",
# )
# return StandardResponse(data=await update(db_session=db_session, scope=scope, scope_in=scope_in), message="Data updated successfully")
# @router.delete("/{scope_id}", response_model=StandardResponse[ScopeRead])
# async def delete_scope(db_session: DbSession, scope_id: str):
# scope = await get(db_session=db_session, scope_id=scope_id)
# if not scope:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND,
# detail=[{"msg": "A data with this id does not exist."}],
# )
# await delete(db_session=db_session, scope_id=scope_id)
# return StandardResponse(message="Data deleted successfully", data=scope)

@ -1,40 +0,0 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
from src.overhaul_scope.schema import ScopeRead
from src.scope_equipment_job.schema import ScopeEquipmentJobRead
from src.job.schema import ActivityMasterRead
class OverhaulJobBase(DefultBase):
pass
class OverhaulJobCreate(OverhaulJobBase):
job_ids: Optional[List[UUID]] = []
class OverhaulJobUpdate(OverhaulJobBase):
pass
class OverhaulActivity(DefultBase):
id: UUID
overhaul_scope_id: UUID
overhaul_scope: ScopeRead
class ScopeEquipment(DefultBase):
job: ActivityMasterRead
class OverhaulJobRead(OverhaulJobBase):
id: UUID
scope_equipment_job: ScopeEquipment
overhaul_activity: OverhaulActivity
class OverhaulJobPagination(Pagination):
items: List[OverhaulJobRead] = []

@ -1,119 +0,0 @@
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, func
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.scope_equipment_job.model import ScopeEquipmentJob
from src.overhaul_activity.model import OverhaulActivity
from .model import OverhaulJob
from .schema import OverhaulJobCreate
async def get_all(*, common, overhaul_equipment_id: str):
"""Returns all documents."""
query = (
Select(OverhaulJob)
.where(OverhaulJob.overhaul_activity_id == overhaul_equipment_id)
.options(
selectinload(OverhaulJob.scope_equipment_job).options(
selectinload(ScopeEquipmentJob.job)),
selectinload(OverhaulJob.overhaul_activity).options(
selectinload(OverhaulActivity.overhaul_scope)),
)
)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(
*, db_session: DbSession, overhaul_equipment_id, overhaul_job_in: OverhaulJobCreate
):
overhaul_jobs = []
if not overhaul_equipment_id:
raise ValueError("assetnum parameter is required")
equipment_stmt = Select(OverhaulJob).where(
OverhaulJob.overhaul_activity_id == overhaul_equipment_id
)
equipment = await db_session.scalar(equipment_stmt)
for job_id in overhaul_job_in.job_ids:
overhaul_equipment_job = OverhaulJob(
overhaul_activity_id=overhaul_equipment_id, scope_equipment_job_id=job_id
)
overhaul_jobs.append(overhaul_equipment_job)
db_session.add_all(overhaul_jobs)
await db_session.commit()
return overhaul_job_in.job_ids
async def delete(
*,
db_session: DbSession,
overhaul_job_id: str,
) -> bool:
"""
Deletes a scope job and returns success status.
Args:
db_session: Database session
scope_job_id: ID of the scope job to delete
user_id: ID of user performing the deletion
Returns:
bool: True if deletion was successful, False otherwise
Raises:
NotFoundException: If scope job doesn't exist
AuthorizationError: If user lacks delete permission
"""
try:
# Check if job exists
scope_job = await db_session.get(OverhaulJob, overhaul_job_id)
if not scope_job:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
# Perform deletion
await db_session.delete(scope_job)
await db_session.commit()
return True
except Exception as e:
await db_session.rollback()
raise
# async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
# """Updates a document."""
# data = scope_in.model_dump()
# update_data = scope_in.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(scope, field, update_data[field])
# await db_session.commit()
# return scope
# async def delete(*, db_session: DbSession, scope_id: str):
# """Deletes a document."""
# query = Delete(OverhaulScope).where(OverhaulScope.id == scope_id)
# await db_session.execute(query)
# await db_session.commit()

@ -1,18 +0,0 @@
from sqlalchemy import (UUID, Column, DateTime, Float, ForeignKey, Integer,
String)
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
class OverhaulSchedule(Base, DefaultMixin):
__tablename__ = "rp_oh_schedule"
year = Column(Integer, nullable=False)
plan_duration = Column(Integer, nullable=True)
planned_outage = Column(Integer, nullable=True)
actual_shutdown = Column(Integer, nullable=True)
start = Column(DateTime(timezone=True)) # This will be TIMESTAMP WITH TIME ZONE
finish = Column(DateTime(timezone=True))
remark = Column(String, nullable=True)

@ -1,63 +0,0 @@
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from .schema import (OverhaulScheduleCreate, OverhaulSchedulePagination, OverhaulScheduleUpdate)
from .service import create, get_all, delete, update
router = APIRouter()
@router.get(
"", response_model=StandardResponse[OverhaulSchedulePagination]
)
async def get_schedules(common: CommonParameters):
"""Get all scope pagination."""
# return
results = await get_all(common=common)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.post("", response_model=StandardResponse[None])
async def create_overhaul_equipment_jobs(
db_session: DbSession, overhaul_job_in: OverhaulScheduleCreate
):
await create(
db_session=db_session,
overhaul_job_in=overhaul_job_in,
)
return StandardResponse(
data=None,
message="Data created successfully",
)
@router.put("/{overhaul_job_id}", response_model=StandardResponse[None])
async def update_overhaul_schedule(
db_session: DbSession, overhaul_job_id: str, overhaul_job_in: OverhaulScheduleUpdate
):
await update(db_session=db_session, overhaul_schedule_id=overhaul_job_id, overhaul_job_in=overhaul_job_in)
return StandardResponse(
data=None,
message="Data updated successfully",
)
@router.delete("/{overhaul_job_id}", response_model=StandardResponse[None])
async def delete_overhaul_equipment_job(db_session: DbSession, overhaul_job_id):
await delete(db_session=db_session, overhaul_schedule_id=overhaul_job_id)
return StandardResponse(
data=None,
message="Data deleted successfully",
)

@ -1,44 +0,0 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
from src.overhaul_scope.schema import ScopeRead
from src.scope_equipment_job.schema import ScopeEquipmentJobRead
from src.job.schema import ActivityMasterRead
class OverhaulScheduleBase(DefultBase):
pass
class OverhaulScheduleCreate(OverhaulScheduleBase):
year: int
plan_duration: Optional[int] = Field(None)
planned_outage: Optional[int] = Field(None)
actual_shutdown: Optional[int] = Field(None)
start: datetime
finish: datetime
remark: Optional[str] = Field(None)
class OverhaulScheduleUpdate(OverhaulScheduleBase):
start: datetime
finish: datetime
class OverhaulScheduleRead(OverhaulScheduleBase):
id: UUID
year: int
plan_duration: Optional[int]
planned_outage: Optional[int]
actual_shutdown: Optional[int]
start: datetime
finish: datetime
remark: Optional[str]
class OverhaulSchedulePagination(Pagination):
items: List[OverhaulScheduleRead] = []

@ -1,57 +0,0 @@
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, func
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.scope_equipment_job.model import ScopeEquipmentJob
from src.overhaul_activity.model import OverhaulActivity
from .model import OverhaulSchedule
from .schema import OverhaulScheduleCreate, OverhaulScheduleUpdate
async def get_all(*, common):
"""Returns all documents."""
query = Select(OverhaulSchedule).order_by(OverhaulSchedule.start.desc())
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(
*, db_session: DbSession, overhaul_job_in: OverhaulScheduleCreate
):
schedule = OverhaulSchedule(**overhaul_job_in.model_dump())
db_session.add(schedule)
await db_session.commit()
return schedule
async def update(*, db_session: DbSession, overhaul_schedule_id: str, overhaul_job_in: OverhaulScheduleUpdate):
"""Updates a document."""
data = overhaul_job_in.model_dump()
overhaul_schedule = await db_session.get(OverhaulSchedule, overhaul_schedule_id)
update_data = overhaul_job_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(overhaul_schedule, field, update_data[field])
await db_session.commit()
return overhaul_schedule
async def delete(*, db_session: DbSession, overhaul_schedule_id: str):
"""Deletes a document."""
query = Delete(OverhaulSchedule).where(OverhaulSchedule.id == overhaul_schedule_id)
await db_session.execute(query)
await db_session.commit()

@ -1,17 +0,0 @@
from sqlalchemy import Column, DateTime, Float, Integer, String
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
class OverhaulScope(Base, DefaultMixin):
__tablename__ = "oh_ms_overhaul_scope"
type = Column(String, nullable=False) # Changed to non-nullable to match the model
start_date = Column(DateTime(timezone=True), nullable=False) # Made non-nullable to match model
end_date = Column(DateTime(timezone=True), nullable=True) # Already nullable
duration_oh = Column(Integer, nullable=True)
crew_number = Column(Integer, nullable=True, default=1)
status = Column(String, nullable=False, default="Upcoming")
activity_equipments = relationship("OverhaulActivity", lazy="selectin")

@ -1,81 +0,0 @@
from typing import Optional
from fastapi import APIRouter, HTTPException, status
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.models import StandardResponse
from .model import OverhaulScope
from .schema import ScopeCreate, ScopePagination, ScopeRead, ScopeUpdate
from .service import create, delete, get, get_all, update
router = APIRouter()
@router.get("", response_model=StandardResponse[ScopePagination])
async def get_scopes(common: CommonParameters, scope_name: Optional[str] = None):
"""Get all scope pagination."""
# return
results = await get_all(common=common, scope_name=scope_name)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.get("/{overhaul_session_id}", response_model=StandardResponse[ScopeRead])
async def get_scope(db_session: DbSession, overhaul_session_id: str):
scope = await get(db_session=db_session, overhaul_session_id=overhaul_session_id)
if not scope:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=scope, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[ScopeRead])
async def create_scope(db_session: DbSession, scope_in: ScopeCreate):
scope = await create(db_session=db_session, scope_in=scope_in)
return StandardResponse(data=scope, message="Data created successfully")
@router.put("/{scope_id}", response_model=StandardResponse[ScopeRead])
async def update_scope(
db_session: DbSession,
scope_id: str,
scope_in: ScopeUpdate,
current_user: CurrentUser,
):
scope = await get(db_session=db_session, scope_id=scope_id)
if not scope:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(
data=await update(db_session=db_session, scope=scope, scope_in=scope_in),
message="Data updated successfully",
)
@router.delete("/{scope_id}", response_model=StandardResponse[ScopeRead])
async def delete_scope(db_session: DbSession, scope_id: str):
scope = await get(db_session=db_session, scope_id=scope_id)
if not scope:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, scope_id=scope_id)
return StandardResponse(message="Data deleted successfully", data=scope)

@ -1,33 +0,0 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
class ScopeBase(DefultBase):
duration_oh: Optional[int] = Field(720, title="Duration OH")
crew_number: Optional[int] = Field(10, title="Crew")
status: Optional[str] = Field("Upcoming")
type: str = Field(..., title="Type") # Added title
class ScopeCreate(ScopeBase):
start_date: datetime
end_date: Optional[datetime] = Field(None)
class ScopeUpdate(ScopeBase):
pass
class ScopeRead(ScopeBase):
id: UUID
start_date: datetime
end_date: Optional[datetime]
class ScopePagination(Pagination):
items: List[ScopeRead] = []

@ -1,199 +0,0 @@
from typing import Optional
from sqlalchemy import Delete, Select, func
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.overhaul_activity.model import OverhaulActivity
from src.scope_equipment.service import get_by_scope_name
from src.utils import time_now
from .model import OverhaulScope
from .schema import ScopeCreate, ScopeUpdate
from .utils import get_material_cost, get_service_cost
from datetime import datetime
async def get(
*, db_session: DbSession, overhaul_session_id: str
) -> Optional[OverhaulScope]:
"""Returns a document based on the given document id."""
query = Select(OverhaulScope).filter(OverhaulScope.id == overhaul_session_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(*, common, scope_name: Optional[str] = None):
"""Returns all documents."""
query = Select(OverhaulScope)
if scope_name:
query = query.filter(OverhaulScope.type == scope_name)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(*, db_session: DbSession, scope_in: ScopeCreate):
# Ensure dates are datetime objects
if isinstance(scope_in.start_date, str):
try:
start_date = datetime.fromisoformat(scope_in.start_date.replace('Z', '+00:00'))
except ValueError:
start_date = datetime.strptime(scope_in.start_date, "%Y-%m-%d %H:%M:%S")
else:
start_date = scope_in.start_date
# Handle end_date (which could be None)
end_date = None
if scope_in.end_date:
if isinstance(scope_in.end_date, str):
try:
end_date = datetime.fromisoformat(scope_in.end_date.replace('Z', '+00:00'))
except ValueError:
end_date = datetime.strptime(scope_in.end_date, "%Y-%m-%d %H:%M:%S")
else:
end_date = scope_in.end_date
# Calculate duration in days if both dates are available
duration_days = None
if start_date and end_date:
duration_days = (end_date - start_date).days
# Create the OverhaulScope object with all hardcoded values
overhaul_session = OverhaulScope(
start_date=scope_in.start_date,
end_date=scope_in.end_date,
type=scope_in.type, # Hardcoded type
duration_oh=duration_days, # Hardcoded duration (30 days)
crew_number=scope_in.crew_number, # Hardcoded crew number
status=scope_in.status # Hardcoded status
)
# raise Exception(overhaul_session.start_date)
db_session.add(overhaul_session)
# Need to flush to get the id
await db_session.flush()
scope_name = scope_in.type
# Fix the function call - parameters were in wrong order
equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
material_cost = get_material_cost(
scope=overhaul_session.type, total_equipment=len(equipments)
)
service_cost = get_service_cost(
scope=overhaul_session.type, total_equipment=len(equipments)
)
scope_equipments = [
OverhaulActivity(
assetnum=equipment.assetnum,
overhaul_scope_id=overhaul_session.id,
material_cost=material_cost,
service_cost=service_cost,
)
for equipment in equipments
]
if scope_equipments: # Only add if there are items
db_session.add_all(scope_equipments)
await db_session.commit()
return overhaul_session
async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
"""Updates a document."""
data = scope_in.model_dump()
update_data = scope_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(scope, field, update_data[field])
await db_session.commit()
return scope
async def delete(*, db_session: DbSession, scope_id: str):
"""Deletes a document."""
query = Delete(OverhaulScope).where(OverhaulScope.id == scope_id)
await db_session.execute(query)
await db_session.commit()
async def get_overview_overhaul(*, db_session: DbSession):
current_date = time_now().date()
# For ongoing overhaul with count
ongoing_query = (
Select(OverhaulScope, func.count(OverhaulActivity.id).label("equipment_count"))
.outerjoin(OverhaulScope.activity_equipments)
.where(
OverhaulScope.start_date <= current_date,
OverhaulScope.end_date >= current_date,
)
.group_by(OverhaulScope.id)
)
ongoing_result = await db_session.execute(ongoing_query)
# Use first() instead of scalar_one_or_none()
ongoing_result = ongoing_result.first()
if ongoing_result:
ongoing_overhaul, equipment_count = ongoing_result # Unpack the result tuple
return {
"status": "Ongoing",
"overhaul": {
"id": ongoing_overhaul.id,
"type": ongoing_overhaul.type,
"start_date": ongoing_overhaul.start_date,
"end_date": ongoing_overhaul.end_date,
"duration_oh": ongoing_overhaul.duration_oh,
"crew_number": ongoing_overhaul.crew_number,
"remaining_days": (ongoing_overhaul.end_date - current_date).days,
"equipment_count": equipment_count,
},
}
# For upcoming overhaul with count
upcoming_query = (
Select(OverhaulScope, func.count(OverhaulActivity.id).label("equipment_count"))
.outerjoin(OverhaulScope.activity_equipments)
.where(
OverhaulScope.start_date > current_date,
)
.group_by(OverhaulScope.id)
.order_by(OverhaulScope.start_date)
)
upcoming_result = await db_session.execute(upcoming_query)
upcoming_result = upcoming_result.first()
if upcoming_result:
upcoming_overhaul, equipment_count = upcoming_result # Unpack the result tuple
days_until = (upcoming_overhaul.start_date - current_date).days
return {
"status": "Upcoming",
"overhaul": {
"id": upcoming_overhaul.id,
"type": upcoming_overhaul.type,
"start_date": upcoming_overhaul.start_date,
"end_date": upcoming_overhaul.end_date,
"duration_oh": upcoming_overhaul.duration_oh,
"crew_number": upcoming_overhaul.crew_number,
"remaining_days": days_until,
"equipment_count": equipment_count,
},
}
return {"status": "no_overhaul", "overhaul": None}

@ -1,35 +0,0 @@
from decimal import Decimal, getcontext
def get_material_cost(scope, total_equipment):
# Set precision to 28 digits (maximum precision for Decimal)
getcontext().prec = 28
if not total_equipment: # Guard against division by zero
return float(0)
if scope == "B":
result = Decimal("365539731101") / Decimal(str(total_equipment))
return float(result)
else:
result = Decimal("8565468127") / Decimal(str(total_equipment))
return float(result)
return float(0)
def get_service_cost(scope, total_equipment):
# Set precision to 28 digits (maximum precision for Decimal)
getcontext().prec = 28
if not total_equipment: # Guard against division by zero
return float(0)
if scope == "B":
result = Decimal("36405830225") / Decimal(str(total_equipment))
return float(result)
else:
result = Decimal("36000000000") / Decimal(str(total_equipment))
return float(result)
return float(0)

@ -1,6 +0,0 @@
from src.enums import OptimumOHEnum
class ScopeEquipmentType(OptimumOHEnum):
TEMP = "Temporary"
PERM = "Permanent"

@ -1,52 +0,0 @@
from sqlalchemy import UUID, Column, Date, Float, ForeignKey, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from src.workorder.model import MasterWorkOrder
class ScopeEquipment(Base, DefaultMixin):
__tablename__ = "oh_ms_scope_equipment"
assetnum = Column(String, nullable=True)
scope_overhaul = Column(String, nullable=False)
type = Column(String, nullable=False, default="Permanent")
removal_date = Column(Date, nullable=True)
assigned_date = Column(Date, nullable=True)
master_equipment = relationship(
"MasterEquipment",
lazy="raise",
primaryjoin="and_(ScopeEquipment.assetnum == foreign(MasterEquipment.assetnum))",
uselist=False, # Add this if it's a one-to-one relationship
)
class MasterEquipment(Base, DefaultMixin):
__tablename__ = "ms_equipment_master"
id = Column(UUID(as_uuid=True), primary_key=True, index=True)
parent_id = Column(
UUID(as_uuid=True),
ForeignKey("ms_equipment_master.id", ondelete="CASCADE"),
nullable=True,
)
assetnum = Column(String, nullable=True)
system_tag = Column(String, nullable=True)
location_tag = Column(String, nullable=True)
name = Column(String, nullable=True)
equipment_tree_id = Column(
UUID(as_uuid=True), ForeignKey("ms_equipment_tree.id"), nullable=True
)
equipment_tree = relationship("MasterEquipmentTree", backref="master_equipments")
parent = relationship("MasterEquipment", remote_side=[id], lazy="selectin")
class MasterEquipmentTree(Base, DefaultMixin):
__tablename__ = "ms_equipment_tree"
level_no = Column(Integer)

@ -1,86 +0,0 @@
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status
from fastapi.params import Query
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.models import StandardResponse
from .model import ScopeEquipment
from .schema import (MasterEquipmentPagination, ScopeEquipmentCreate,
ScopeEquipmentPagination, ScopeEquipmentRead,
ScopeEquipmentUpdate)
from .service import (create, delete, get_all, get_all_master_equipment,
get_by_assetnum, update)
router = APIRouter()
@router.get("", response_model=StandardResponse[ScopeEquipmentPagination])
async def get_scope_equipments(common: CommonParameters, scope_name: str = Query(None)):
"""Get all scope pagination."""
# return
data = await get_all(common=common, scope_name=scope_name)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)
@router.get(
"/available/{scope_name}",
response_model=StandardResponse[MasterEquipmentPagination],
)
async def get_master_equipment(common: CommonParameters, scope_name: str):
results = await get_all_master_equipment(common=common, scope_name=scope_name)
return StandardResponse(data=results, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[List[str]])
async def create_scope_equipment(
db_session: DbSession, scope_equipment_in: ScopeEquipmentCreate
):
scope = await create(db_session=db_session, scope_equipment_in=scope_equipment_in)
return StandardResponse(data=scope, message="Data created successfully")
@router.put("/{assetnum}", response_model=StandardResponse[ScopeEquipmentRead])
async def update_scope_equipment(
db_session: DbSession, assetnum: str, scope__equipment_in: ScopeEquipmentUpdate
):
scope_equipment = await get_by_assetnum(db_session=db_session, assetnum=assetnum)
if not scope_equipment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(
data=await update(
db_session=db_session,
scope_equipment=scope_equipment,
scope__equipment_in=scope__equipment_in,
),
message="Data updated successfully",
)
@router.delete("/{assetnum}", response_model=StandardResponse[None])
async def delete_scope_equipment(db_session: DbSession, assetnum: str):
scope_equipment = await get_by_assetnum(db_session=db_session, assetnum=assetnum)
if not scope_equipment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, assetnum=assetnum)
return StandardResponse(message="Data deleted successfully", data=None)

@ -1,58 +0,0 @@
from datetime import datetime
from typing import List, Optional, ForwardRef
from uuid import UUID
from pydantic import Field, computed_field, field_validator, validator
from src.models import DefultBase, Pagination
from src.overhaul_scope.schema import ScopeRead
from .enum import ScopeEquipmentType
class MasterEquipmentBase(DefultBase):
name: Optional[str] = Field(None, title="Name")
location_tag: Optional[str] = Field(None, title="Location Tag")
class ScopeEquipmentBase(DefultBase):
scope_overhaul: Optional[str] = Field(None, title="Scope ID")
class ScopeEquipmentCreate(DefultBase):
assetnums: List[str]
scope_name: str
removal_date: Optional[datetime] = Field(None)
type: Optional[str] = Field(ScopeEquipmentType.PERM)
class ScopeEquipmentUpdate(ScopeEquipmentBase):
assetnum: Optional[str] = Field(None, title="Asset Number")
class ScopeEquipmentRead(ScopeEquipmentBase):
id: UUID
assetnum: str
assigned_date: datetime
master_equipment: Optional[MasterEquipmentBase] = Field(None)
class ScopeEquipmentPagination(Pagination):
items: List[ScopeEquipmentRead] = []
class MasterEquipmentRead(DefultBase):
assetnum: Optional[str] = Field(None, title="Asset Number")
location_tag: Optional[str] = Field(None, title="Location Tag")
name: str
EquipmentMasterTreeRef = ForwardRef("MasterEquipmentTree")
class MasterEquipmentTree(MasterEquipmentRead):
parent_id: Optional[UUID]
parent: Optional[EquipmentMasterTreeRef] = Field(None) # type: ignore
class MasterEquipmentPagination(Pagination):
items: List[MasterEquipmentRead] = []

@ -1,202 +0,0 @@
from datetime import datetime, timedelta
from typing import Optional, Union
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, and_, desc, func, not_, or_
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.overhaul_scope.model import OverhaulScope
from src.scope_equipment.enum import ScopeEquipmentType
from src.workorder.model import MasterWorkOrder
from .model import MasterEquipment, MasterEquipmentTree, ScopeEquipment
from .schema import ScopeEquipmentCreate, ScopeEquipmentUpdate
async def get_by_assetnum(*, db_session: DbSession, assetnum: str):
query = (
Select(ScopeEquipment)
.filter(ScopeEquipment.assetnum == assetnum)
.options(selectinload(ScopeEquipment.master_equipment))
)
result = await db_session.execute(query)
return result.unique().scalars().one_or_none()
async def get_all(*, common, scope_name: str = None):
"""Returns all documents."""
query = Select(ScopeEquipment).options(
selectinload(ScopeEquipment.master_equipment)
)
query = query.order_by(desc(ScopeEquipment.created_at))
if scope_name:
query = query.where(ScopeEquipment.scope_overhaul == scope_name)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(*, db_session: DbSession, scope_equipment_in: ScopeEquipmentCreate):
"""Creates a new document."""
# scope_equipment = ScopeEquipment(**scope_equipment_in.model_dump())
assetnums = scope_equipment_in.assetnums
results = []
removal_date = scope_equipment_in.removal_date
if scope_equipment_in.type == ScopeEquipmentType.TEMP:
# Search for the next or ongoing overhaul session for the given scope
stmt = (
Select(OverhaulScope.end_date)
.where(
OverhaulScope.type == scope_equipment_in.scope_name,
(OverhaulScope.start_date <= datetime.now())
& (OverhaulScope.end_date >= datetime.now()) # Ongoing
| (OverhaulScope.start_date > datetime.now()), # Upcoming
)
.order_by(OverhaulScope.start_date.asc())
.limit(1)
)
result = await db_session.execute(stmt)
removal_date = result.scalar_one_or_none()
# If no overhaul found, set a default removal date or handle the error
if removal_date is None:
# Handle if no overhaul session is found, set default or raise an error
removal_date = datetime.now() + timedelta(
days=30
) # Example: 30 days from now
for assetnum in assetnums:
stmt = insert(ScopeEquipment).values(
assetnum=assetnum,
scope_overhaul=scope_equipment_in.scope_name,
type=scope_equipment_in.type,
removal_date=removal_date,
)
stmt = stmt.on_conflict_do_nothing(
index_elements=["assetnum", "scope_overhaul"]
)
await db_session.execute(stmt)
results.append(assetnum)
await db_session.commit()
return results
async def update(
*,
db_session: DbSession,
scope_equipment: ScopeEquipment,
scope_equipment_in: ScopeEquipmentUpdate
):
"""Updates a document."""
data = scope_equipment_in.model_dump()
update_data = scope_equipment_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(scope_equipment, field, update_data[field])
await db_session.commit()
return scope_equipment
async def delete(*, db_session: DbSession, assetnum: str):
"""Deletes a document."""
query = Delete(ScopeEquipment).where(ScopeEquipment.assetnum == assetnum)
await db_session.execute(query)
await db_session.commit()
return assetnum
# query = Select(ScopeEquipment).filter(
# ScopeEquipment.id == scope_equipment_id)
# scope_equipment = await db_session.execute(query)
# scope_equipment: ScopeEquipment = scope_equipment.scalars().one_or_none()
# if not scope_equipment:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND,
# detail="A data with this id does not exist.",
# )
# if not scope_equipment.scope_id:
# await db_session.delete(scope_equipment)
# else:
# if scope_equipment.current_scope_id == scope_equipment.scope_id:
# await db_session.delete(scope_equipment)
# else:
# scope_equipment.current_scope_id = scope_equipment.scope_id
# await db_session.commit()
async def get_by_scope_name(
*, db_session: DbSession, scope_name: Optional[str]
) -> Optional[ScopeEquipment]:
"""Returns a document based on the given document id."""
query = Select(ScopeEquipment).options(
selectinload(ScopeEquipment.master_equipment)
)
if scope_name:
query = query.filter(ScopeEquipment.scope_overhaul == scope_name)
result = await db_session.execute(query)
return result.scalars().all()
# async def get_exculed_scope_name(*, db_session: DbSession, scope_name: Union[str, list]) -> Optional[ScopeEquipment]:
# scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)
# query = Select(ScopeEquipment)
# if scope:
# query = query.filter(ScopeEquipment.current_scope_id != scope.id)
# else:
# query = query.filter(ScopeEquipment.current_scope_id != None)
# result = await db_session.execute(query)
# return result.scalars().all()
async def get_all_master_equipment(*, common: CommonParameters, scope_name):
equipments_scope = [
equip.assetnum
for equip in await get_by_scope_name(
db_session=common.get("db_session"), scope_name=scope_name
)
]
query = Select(MasterEquipment).filter(MasterEquipment.assetnum.is_not(None))
# Only add not_in filter if there are items in equipments_scope
if equipments_scope:
query = query.filter(MasterEquipment.assetnum.not_in(equipments_scope))
results = await search_filter_sort_paginate(model=query, **common)
return results
async def get_equipment_level_by_no(*, db_session: DbSession, level: int):
query = (
Select(MasterEquipment)
.join(MasterEquipment.equipment_tree)
.where(MasterEquipmentTree.level_no == level)
)
result = await db_session.execute(query)
return result.scalars().all()

@ -1,20 +0,0 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from src.workorder.model import MasterWorkOrder
class ScopeEquipmentJob(Base, DefaultMixin):
__tablename__ = "oh_ms_scope_equipment_job"
assetnum = Column(String, nullable=False)
job_id = Column(UUID(as_uuid=True), ForeignKey("oh_ms_job.id", ondelete="cascade"))
job = relationship("MasterActivity", lazy="selectin")
overhaul_jobs = relationship(
"OverhaulJob", back_populates="scope_equipment_job", lazy="selectin"
)

@ -1,51 +0,0 @@
from typing import Dict, List
from fastapi import APIRouter, HTTPException, Query, status
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
from .schema import ScopeEquipmentJobCreate, ScopeEquipmentJobPagination
from .service import create, delete, get_all
router = APIRouter()
@router.get("/{assetnum}", response_model=StandardResponse[ScopeEquipmentJobPagination])
async def get_scope_equipment_jobs(
db_session: DbSession, assetnum, common: CommonParameters
):
"""Get all scope activity pagination."""
# return
data = await get_all(db_session=db_session, assetnum=assetnum, common=common)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)
@router.post("/{assetnum}", response_model=StandardResponse[None])
async def create_scope_equipment_jobs(
db_session: DbSession, assetnum, scope_job_in: ScopeEquipmentJobCreate
):
"""Get all scope activity pagination."""
# return
await create(db_session=db_session, assetnum=assetnum, scope_job_in=scope_job_in)
return StandardResponse(
data=None,
message="Data created successfully",
)
@router.delete("/{scope_job_id}", response_model=StandardResponse[None])
async def delete_scope_equipment_job(db_session: DbSession, scope_job_id):
await delete(db_session=db_session, scope_job_id=scope_job_id)
return StandardResponse(
data=None,
message="Data deleted successfully",
)

@ -1,81 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.job.schema import ActivityMasterRead
from src.models import DefultBase, Pagination
from src.overhaul_scope.schema import ScopeRead
class ScopeEquipmentJobBase(DefultBase):
assetnum: Optional[str] = Field(None, description="Assetnum is required")
class ScopeEquipmentJobCreate(ScopeEquipmentJobBase):
job_ids: Optional[List[UUID]] = []
class ScopeEquipmentJobUpdate(ScopeEquipmentJobBase):
name: Optional[str] = Field(None)
cost: Optional[str] = Field(0)
class OverhaulActivity(DefultBase):
id: UUID
overhaul_scope: ScopeRead
class OverhaulJob(DefultBase):
id: UUID
overhaul_activity: OverhaulActivity
class ScopeEquipmentJobRead(ScopeEquipmentJobBase):
id: UUID
job: ActivityMasterRead
overhaul_jobs: List[OverhaulJob] = []
class ScopeEquipmentJobPagination(Pagination):
items: List[ScopeEquipmentJobRead] = []
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -1,130 +0,0 @@
import random
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, and_
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.overhaul_activity.model import OverhaulActivity
from src.overhaul_job.model import OverhaulJob
from src.scope_equipment.model import MasterEquipment, MasterEquipmentTree
from src.scope_equipment.service import get_equipment_level_by_no
from .model import ScopeEquipmentJob
from .schema import ScopeEquipmentJobCreate
# async def get(*, db_session: DbSession, scope_equipment_activity_id: str) -> Optional[ScopeEquipmentActivity]:
# """Returns a document based on the given document id."""
# result = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
# return result
async def get_all(db_session: DbSession, assetnum: Optional[str], common):
# Example usage
if not assetnum:
raise ValueError("assetnum parameter is required")
# First get the parent equipment
equipment_stmt = Select(MasterEquipment).where(MasterEquipment.assetnum == assetnum)
equipment: MasterEquipment = await db_session.scalar(equipment_stmt)
if not equipment:
raise ValueError(f"No equipment found with assetnum: {assetnum}")
# Build query for parts
stmt = (
Select(ScopeEquipmentJob)
.where(ScopeEquipmentJob.assetnum == assetnum)
.options(
selectinload(ScopeEquipmentJob.job),
selectinload(ScopeEquipmentJob.overhaul_jobs)
.selectinload(OverhaulJob.overhaul_activity)
.selectinload(OverhaulActivity.overhaul_scope),
)
)
results = await search_filter_sort_paginate(model=stmt, **common)
return results
async def create(
*, db_session: DbSession, assetnum, scope_job_in: ScopeEquipmentJobCreate
):
scope_jobs = []
if not assetnum:
raise ValueError("assetnum parameter is required")
# First get the parent equipment
equipment_stmt = Select(MasterEquipment).where(MasterEquipment.assetnum == assetnum)
equipment: MasterEquipment = await db_session.scalar(equipment_stmt)
if not equipment:
raise ValueError(f"No equipment found with assetnum: {assetnum}")
for job_id in scope_job_in.job_ids:
scope_equipment_job = ScopeEquipmentJob(assetnum=assetnum, job_id=job_id)
scope_jobs.append(scope_equipment_job)
db_session.add_all(scope_jobs)
await db_session.commit()
return
# async def update(*, db_session: DbSession, activity: ScopeEquipmentActivity, scope_equipment_activty_in: ScopeEquipmentActivityUpdate):
# """Updates a document."""
# data = scope_equipment_activty_in.model_dump()
# update_data = scope_equipment_activty_in.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(activity, field, update_data[field])
# await db_session.commit()
# return activity
async def delete(
*,
db_session: DbSession,
scope_job_id: int,
) -> bool:
"""
Deletes a scope job and returns success status.
Args:
db_session: Database session
scope_job_id: ID of the scope job to delete
user_id: ID of user performing the deletion
Returns:
bool: True if deletion was successful, False otherwise
Raises:
NotFoundException: If scope job doesn't exist
AuthorizationError: If user lacks delete permission
"""
try:
# Check if job exists
scope_job = await db_session.get(ScopeEquipmentJob, scope_job_id)
if not scope_job:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
# Perform deletion
await db_session.delete(scope_job)
await db_session.commit()
return True
except Exception as e:
await db_session.rollback()
raise

@ -1,21 +0,0 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from src.workorder.model import MasterWorkOrder
class ScopeEquipmentPart(Base, DefaultMixin):
__tablename__ = "oh_tr_scope_equipment_part"
assetnum = Column(String, nullable=False)
stock = Column(Integer, nullable=False, default=0)
master_equipments = relationship(
"MasterEquipment",
lazy="raise",
primaryjoin="and_(ScopeEquipmentPart.assetnum == foreign(MasterEquipment.assetnum))",
uselist=False,
)

@ -1,26 +0,0 @@
from typing import Dict, List
from fastapi import APIRouter, HTTPException, Query, status
from src.database.service import (CommonParameters, DbSession,
search_filter_sort_paginate)
from src.models import StandardResponse
from .schema import (ScopeEquipmentActivityCreate,
ScopeEquipmentActivityPagination,
ScopeEquipmentActivityRead, ScopeEquipmentActivityUpdate)
from .service import get_all
router = APIRouter()
@router.get("/{assetnum}", response_model=StandardResponse[List[Dict]])
async def get_scope_equipment_parts(db_session: DbSession, assetnum):
"""Get all scope activity pagination."""
# return
data = await get_all(db_session=db_session, assetnum=assetnum)
return StandardResponse(
data=data,
message="Data retrieved successfully",
)

@ -1,69 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
from src.models import DefultBase, Pagination
class ScopeEquipmentActivityBase(DefultBase):
assetnum: str = Field(..., description="Assetnum is required")
class ScopeEquipmentActivityCreate(ScopeEquipmentActivityBase):
name: str
cost: Optional[float] = Field(0)
class ScopeEquipmentActivityUpdate(ScopeEquipmentActivityBase):
name: Optional[str] = Field(None)
cost: Optional[str] = Field(0)
class ScopeEquipmentActivityRead(ScopeEquipmentActivityBase):
name: str
cost: float
class ScopeEquipmentActivityPagination(Pagination):
items: List[ScopeEquipmentActivityRead] = []
# {
# "overview": {
# "totalEquipment": 30,
# "nextSchedule": {
# "date": "2025-01-12",
# "Overhaul": "B",
# "equipmentCount": 30
# }
# },
# "criticalParts": [
# "Boiler feed pump",
# "Boiler reheater system",
# "Drum Level (Right) Root Valve A",
# "BCP A Discharge Valve",
# "BFPT A EXH Press HI Root VLV"
# ],
# "schedules": [
# {
# "date": "2025-01-12",
# "Overhaul": "B",
# "status": "upcoming"
# }
# // ... other scheduled overhauls
# ],
# "systemComponents": {
# "boiler": {
# "status": "operational",
# "lastOverhaul": "2024-06-15"
# },
# "turbine": {
# "hpt": { "status": "operational" },
# "ipt": { "status": "operational" },
# "lpt": { "status": "operational" }
# }
# // ... other major components
# }
# }

@ -1,104 +0,0 @@
import random
from typing import Optional
from sqlalchemy import Delete, Select, and_
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.scope_equipment.model import MasterEquipment, MasterEquipmentTree
from src.scope_equipment.service import get_equipment_level_by_no
from .model import ScopeEquipmentPart
from .schema import ScopeEquipmentActivityCreate, ScopeEquipmentActivityUpdate
# async def get(*, db_session: DbSession, scope_equipment_activity_id: str) -> Optional[ScopeEquipmentActivity]:
# """Returns a document based on the given document id."""
# result = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
# return result
def create_dummy_parts(assetnum: str, count: int = 5):
"""
Create a list of dummy ScopeEquipmentPart objects with random stock values.
Args:
assetnum (str): The base asset number to generate dummy parts for.
count (int): The number of parts to create. Default is 5.
Returns:
List[ScopeEquipmentPart]: A list of dummy ScopeEquipmentPart objects.
"""
parts = []
for i in range(1, count + 1):
# Generate a unique part asset number
part_assetnum = f"{assetnum}_PART_{i}"
stock = random.randint(1, 100) # Random stock value between 1 and 100
parts.append({"assetnum": part_assetnum, "stock": stock})
return parts
async def get_all(db_session: DbSession, assetnum: Optional[str]):
# Example usage
dummy_parts = create_dummy_parts(assetnum, count=10)
# if not assetnum:
# raise ValueError("assetnum parameter is required")
# db_session: DbSession = common.get("db_session")
# # First get the parent equipment
# equipment_stmt = Select(MasterEquipment).where(
# MasterEquipment.assetnum == assetnum)
# equipment: MasterEquipment = await db_session.scalar(equipment_stmt)
# if not equipment:
# raise ValueError(f"No equipment found with assetnum: {assetnum}")
# # Build query for parts
# stmt = (
# Select(ScopeEquipmentPart)
# .join(ScopeEquipmentPart.master_equipments)
# .join(MasterEquipment.equipment_tree)
# .where(
# and_(
# MasterEquipment.parent_id == equipment.id,
# MasterEquipmentTree.level_no == 4
# )
# ).options(selectinload(ScopeEquipmentPart.master_equipments))
# )
# results = await search_filter_sort_paginate(model=stmt, **common)
return dummy_parts
# async def create(*, db_session: DbSession, scope_equipment_activty_in: ScopeEquipmentActivityCreate):
# activity = ScopeEquipmentActivity(
# **scope_equipment_activty_in.model_dump())
# db_session.add(activity)
# await db_session.commit()
# return activity
# async def update(*, db_session: DbSession, activity: ScopeEquipmentActivity, scope_equipment_activty_in: ScopeEquipmentActivityUpdate):
# """Updates a document."""
# data = scope_equipment_activty_in.model_dump()
# update_data = scope_equipment_activty_in.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(activity, field, update_data[field])
# await db_session.commit()
# return activity
# async def delete(*, db_session: DbSession, scope_equipment_activity_id: str):
# """Deletes a document."""
# activity = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
# await db_session.delete(activity)
# await db_session.commit()

@ -1,122 +0,0 @@
import re
from datetime import datetime, timedelta, timezone
from typing import Optional
import pytz
from dateutil.relativedelta import relativedelta
from src.config import TIMEZONE
def parse_relative_expression(date_str: str) -> Optional[datetime]:
"""
Parse relative date expressions using T (days), M (months), and Y (years)
Returns tuple of (datetime, type_description) or None if not a relative date
"""
pattern = r"^([HTMY])([+-]\d+)?$"
match = re.match(pattern, date_str)
if not match:
return None
unit, offset = match.groups()
offset = int(offset) if offset else 0
# Use UTC timezone for consistency
today = datetime.now(timezone.tzname("Asia/Jakarta"))
if unit == "H":
# For hours, keep minutes and seconds
result_time = today + timedelta(hours=offset)
return result_time
elif unit == "T":
return today + timedelta(days=offset)
elif unit == "M":
return today + relativedelta(months=offset)
elif unit == "Y":
return today + relativedelta(years=offset)
def parse_date_string(date_str: str) -> Optional[datetime]:
"""
Parse date strings in various formats including relative expressions
Returns tuple of (datetime, type)
"""
# Try parsing as relative expression first
relative_result = parse_relative_expression(date_str)
if relative_result:
return relative_result
# Try different date formats
date_formats = [
("%Y-%m-%d", "iso"), # 2024-11-08
("%Y/%m/%d", "slash"), # 2024/11/08
("%d-%m-%Y", "european"), # 08-11-2024
("%d/%m/%Y", "european_slash"), # 08/11/2024
("%Y.%m.%d", "dot"), # 2024.11.08
("%d.%m.%Y", "european_dot"), # 08.11.2024
]
for fmt, type_name in date_formats:
try:
# Parse the date and set it to start of day in UTC
dt = datetime.strptime(date_str, fmt)
dt = dt.replace(
hour=0,
minute=0,
second=0,
microsecond=0,
tzinfo=timezone.tzname("Asia/Jakarta"),
)
return dt
except ValueError:
continue
raise ValueError(
"Invalid date format. Supported formats:\n"
"Relative formats:\n"
"- T (days): T, T-n, T+n\n"
"- M (months): M, M-1, M+2\n"
"- Y (years): Y, Y-1, Y+1\n"
"Regular formats:\n"
"- YYYY-MM-DD\n"
"- YYYY/MM/DD\n"
"- DD-MM-YYYY\n"
"- DD/MM/YYYY\n"
"- YYYY.MM.DD\n"
"- DD.MM.YYYY"
)
def time_now():
return datetime.now(pytz.timezone(TIMEZONE))
import requests
def get_latest_numOfFail(location_tag, token) -> float:
today = datetime.today().strftime("%Y-%m-%d")
url_today = f"http://192.168.1.82:8000/reliability/main/number-of-failures/{location_tag}/2016-01-01/{today}"
try:
response = requests.get(
url_today,
{
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
},
)
data = response.json()
raise Exception(data)
latest_num = data["data"][-1]["num_fail"]
raise Exception(latest_num)
if not latest_num:
latest_num = 0
return latest_num
except requests.exceptions.RequestException as e:
print(f"Error fetching data: {e}")
return 0

@ -1,20 +0,0 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
class MasterWorkOrder(Base, DefaultMixin):
__tablename__ = "oh_wo_master"
assetnum = Column(String, nullable=True)
worktype = Column(String, nullable=True)
workgroup = Column(String, nullable=True)
total_cost_max = Column(Float, nullable=True)
scope_equipments = relationship(
"ScopeEquipment",
lazy="raise",
primaryjoin="and_(MasterWorkOrder.assetnum == foreign(ScopeEquipment.assetnum))",
)
Loading…
Cancel
Save