fix minor

main
Cizz22 11 months ago
parent 1ffb7226d9
commit 73f1752a37

@ -17,6 +17,10 @@ class MasterActivity(Base, DefaultMixin):
# lazy="raise", # lazy="raise",
# primaryjoin="and_(MasterActivity.id == foreign(MasterActivityDetail.activity_id))", # primaryjoin="and_(MasterActivity.id == foreign(MasterActivityDetail.activity_id))",
# ) # )
overhaul_jobs = relationship(
"OverhaulJob", lazy="raise", back_populates="job"
)
# class MasterActivityDetail(Base, DefaultMixin): # class MasterActivityDetail(Base, DefaultMixin):

@ -37,21 +37,77 @@ async def get_overhaul_schedules(*, db_session: DbSession):
def get_overhaul_system_components(): def get_overhaul_system_components():
"""Get all overhaul system components.""" """Get all overhaul system components with dummy data."""
return { return {
"boiler": { "HPT": {
"efficiency": "92%",
"work_hours": "1200",
"reliability": "96%",
},
"IPT": {
"efficiency": "91%",
"work_hours": "1100",
"reliability": "95%",
},
"LPT": {
"efficiency": "90%", "efficiency": "90%",
"work_hours": "1000", "work_hours": "1000",
"reliability": "95%", "reliability": "94%",
}, },
"HPT": { "EG": {
"efficiency": "88%",
"work_hours": "950",
"reliability": "93%",
},
"Boiler": {
"efficiency": "90%", "efficiency": "90%",
"work_hours": "1000", "work_hours": "1000",
"reliability": "95%", "reliability": "95%",
} },
"HPH 1": {
"efficiency": "89%",
"work_hours": "1050",
"reliability": "94%",
},
"HPH 2": {
"efficiency": "88%",
"work_hours": "1020",
"reliability": "93%",
},
"HPH 3": {
"efficiency": "87%",
"work_hours": "1010",
"reliability": "92%",
},
"HPH 5": {
"efficiency": "86%",
"work_hours": "980",
"reliability": "91%",
},
"HPH 6": {
"efficiency": "85%",
"work_hours": "970",
"reliability": "90%",
},
"HPH 7": {
"efficiency": "84%",
"work_hours": "960",
"reliability": "89%",
},
"Condenser": {
"efficiency": "83%",
"work_hours": "940",
"reliability": "88%",
},
"Deaerator": {
"efficiency": "82%",
"work_hours": "930",
"reliability": "87%",
},
} }
# async def get(*, db_session: DbSession, scope_id: str) -> Optional[Scope]: # async def get(*, db_session: DbSession, scope_id: str) -> Optional[Scope]:
# """Returns a document based on the given document id.""" # """Returns a document based on the given document id."""
# query = Select(Scope).filter(Scope.id == scope_id) # query = Select(Scope).filter(Scope.id == scope_id)

@ -0,0 +1,20 @@
from sqlalchemy import Column, DateTime, Float, Integer, String, UUID, ForeignKey
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from sqlalchemy.orm import relationship
class OverhaulJob(Base, DefaultMixin):
__tablename__ = "oh_tr_overhaul_job"
overhaul_activity_id = Column(UUID(as_uuid=True), ForeignKey(
"oh_ms_overhaul_activity.id"), nullable=False)
job_id = Column(UUID(as_uuid=True), ForeignKey(
"oh_ms_job.id", ondelete="cascade"))
notes = Column(String, nullable=True)
status = Column(String, nullable=True, default="pending")
job = relationship(
"MasterActivity", lazy="raise", back_populates="overhaul_jobs"
)

@ -0,0 +1,73 @@
from typing import Optional
from fastapi import APIRouter, HTTPException, status
from .model import OverhaulScope
from .schema import ScopeCreate, ScopeRead, ScopeUpdate, ScopePagination
from .service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[ScopePagination])
async def get_scopes(common: CommonParameters, scope_name: Optional[str] = None):
"""Get all scope pagination."""
# return
results = await get_all(common=common, scope_name=scope_name)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.get("/{overhaul_session_id}", response_model=StandardResponse[ScopeRead])
async def get_scope(db_session: DbSession, overhaul_session_id: str):
scope = await get(db_session=db_session, overhaul_session_id=overhaul_session_id)
if not scope:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=scope, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[ScopeRead])
async def create_scope(db_session: DbSession, scope_in: ScopeCreate):
scope = await create(db_session=db_session, scope_in=scope_in)
return StandardResponse(data=scope, message="Data created successfully")
@router.put("/{scope_id}", response_model=StandardResponse[ScopeRead])
async def update_scope(db_session: DbSession, scope_id: str, scope_in: ScopeUpdate, current_user: CurrentUser):
scope = await get(db_session=db_session, scope_id=scope_id)
if not scope:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=await update(db_session=db_session, scope=scope, scope_in=scope_in), message="Data updated successfully")
@router.delete("/{scope_id}", response_model=StandardResponse[ScopeRead])
async def delete_scope(db_session: DbSession, scope_id: str):
scope = await get(db_session=db_session, scope_id=scope_id)
if not scope:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, scope_id=scope_id)
return StandardResponse(message="Data deleted successfully", data=scope)

@ -0,0 +1,33 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
class ScopeBase(DefultBase):
duration_oh: Optional[int] = Field(None, title="Duration OH")
crew_number: Optional[int] = Field(1, title="Crew")
status: Optional[str] = Field("Upcoming")
type: str
class ScopeCreate(ScopeBase):
start_date: datetime
end_date: Optional[datetime] = Field(None)
class ScopeUpdate(ScopeBase):
pass
class ScopeRead(ScopeBase):
id: UUID
start_date: datetime
end_date: Optional[datetime]
class ScopePagination(Pagination):
items: List[ScopeRead] = []

@ -0,0 +1,174 @@
from sqlalchemy import Select, Delete, func
from src.database.service import search_filter_sort_paginate
from src.overhaul_activity.model import OverhaulActivity
from src.scope_equipment.service import get_by_scope_name
from src.utils import time_now
from .model import OverhaulScope
from .schema import ScopeCreate, ScopeUpdate
from .utils import get_material_cost, get_service_cost
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
async def get(*, db_session: DbSession, overhaul_session_id: str) -> Optional[OverhaulScope]:
"""Returns a document based on the given document id."""
query = Select(OverhaulScope).filter(
OverhaulScope.id == overhaul_session_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(*, common, scope_name: Optional[str] = None):
"""Returns all documents."""
query = Select(OverhaulScope)
if scope_name:
query = query.filter(OverhaulScope.type == scope_name)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(*, db_session: DbSession, scope_in: ScopeCreate):
"""Creates a new document."""
overhaul_session = OverhaulScope(**scope_in.model_dump())
db_session.add(overhaul_session)
# Need to flush to get the id
await db_session.flush()
scope_name = scope_in.type
# Fix the function call - parameters were in wrong order
equipments = await get_by_scope_name(
db_session=db_session,
scope_name=scope_name
)
material_cost = get_material_cost(scope=overhaul_session.type, total_equipment=len(equipments))
service_cost = get_service_cost(scope=overhaul_session.type, total_equipment=len(equipments))
scope_equipments = [
OverhaulActivity(
assetnum=equipment.assetnum,
overhaul_scope_id=overhaul_session.id,
material_cost=material_cost,
service_cost=service_cost,
)
for equipment in equipments
]
if scope_equipments: # Only add if there are items
db_session.add_all(scope_equipments)
await db_session.commit()
return overhaul_session
async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
"""Updates a document."""
data = scope_in.model_dump()
update_data = scope_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(scope, field, update_data[field])
await db_session.commit()
return scope
async def delete(*, db_session: DbSession, scope_id: str):
"""Deletes a document."""
query = Delete(OverhaulScope).where(OverhaulScope.id == scope_id)
await db_session.execute(query)
await db_session.commit()
async def get_overview_overhaul(*, db_session: DbSession):
current_date = time_now().date()
# For ongoing overhaul with count
ongoing_query = Select(
OverhaulScope,
func.count(OverhaulActivity.id).label('equipment_count')
).outerjoin(
OverhaulScope.activity_equipments
).where(
OverhaulScope.start_date <= current_date,
OverhaulScope.end_date >= current_date,
).group_by(
OverhaulScope.id
)
ongoing_result = await db_session.execute(ongoing_query)
# Use first() instead of scalar_one_or_none()
ongoing_result = ongoing_result.first()
if ongoing_result:
ongoing_overhaul, equipment_count = ongoing_result # Unpack the result tuple
return {
"status": "Ongoing",
"overhaul": {
"id": ongoing_overhaul.id,
"type": ongoing_overhaul.type,
"start_date": ongoing_overhaul.start_date,
"end_date": ongoing_overhaul.end_date,
"duration_oh": ongoing_overhaul.duration_oh,
"crew_number": ongoing_overhaul.crew_number,
"remaining_days": (ongoing_overhaul.end_date - current_date).days,
"equipment_count": equipment_count
}
}
# For upcoming overhaul with count
upcoming_query = Select(
OverhaulScope,
func.count(OverhaulActivity.id).label('equipment_count')
).outerjoin(
OverhaulScope.activity_equipments
).where(
OverhaulScope.start_date > current_date,
).group_by(
OverhaulScope.id
).order_by(
OverhaulScope.start_date
)
upcoming_result = await db_session.execute(upcoming_query)
upcoming_result = upcoming_result.first()
if upcoming_result:
upcoming_overhaul, equipment_count = upcoming_result # Unpack the result tuple
days_until = (upcoming_overhaul.start_date - current_date).days
return {
"status": "Upcoming",
"overhaul": {
"id": upcoming_overhaul.id,
"type": upcoming_overhaul.type,
"start_date": upcoming_overhaul.start_date,
"end_date": upcoming_overhaul.end_date,
"duration_oh": upcoming_overhaul.duration_oh,
"crew_number": upcoming_overhaul.crew_number,
"remaining_days": days_until,
"equipment_count": equipment_count
}
}
return {
"status": "no_overhaul",
"overhaul": None
}

@ -0,0 +1,33 @@
from decimal import Decimal, getcontext
def get_material_cost(scope, total_equipment):
# Set precision to 28 digits (maximum precision for Decimal)
getcontext().prec = 28
if not total_equipment: # Guard against division by zero
return float(0)
if scope == 'B':
result = Decimal('365539731101') / Decimal(str(total_equipment))
return float(result)
else:
result = Decimal('8565468127') / Decimal(str(total_equipment))
return float(result)
return float(0)
def get_service_cost(scope, total_equipment):
# Set precision to 28 digits (maximum precision for Decimal)
getcontext().prec = 28
if not total_equipment: # Guard against division by zero
return float(0)
if scope == 'B':
result = Decimal('36405830225') / Decimal(str(total_equipment))
return float(result)
else:
result = Decimal('36000000000') / Decimal(str(total_equipment))
return float(result)
return float(0)
Loading…
Cancel
Save