add new schedule api

feature/reliability_stat
Cizz22 9 months ago
parent 00e3e4f4ca
commit 1f918ad2c6

@ -18,6 +18,7 @@ from src.overhaul_job.router import router as job_overhaul_router
from src.overhaul_scope.router import router as scope_router
from src.scope_equipment.router import router as scope_equipment_router
from src.scope_equipment_job.router import router as scope_equipment_job_router
from src.overhaul_schedule.router import router as ovehaul_schedule_router
# from src.overhaul_scope.router import router as scope_router
# from src.scope_equipment.router import router as scope_equipment_router
# from src.overhaul.router import router as overhaul_router
@ -84,6 +85,12 @@ authenticated_api_router.include_router(
tags=["scope_equipment", "job"],
)
authenticated_api_router.include_router(
ovehaul_schedule_router,
prefix="/overhaul-schedules",
tags=["overhaul_schedule"],
)
authenticated_api_router.include_router(
job_overhaul_router, prefix="/overhaul-jobs", tags=["job", "overhaul"]
)

@ -92,42 +92,4 @@ async def get_all_budget_constrains(
included_results.sort(key=lambda x: x["eaf_contribution"], reverse=True)
return included_results, consequence_results
# """Get all overhaul overview with EAF values that sum to 100%."""
# # equipments = await get_by_scope_name(db_session=db_session, scope_name=scope_name)
# equipments = await get_all_by_session_id(db_session=db_session, overhaul_session_id=session_id)
# # If no equipments found, return empty list
# if not equipments:
# return []
# # Create result array of dictionaries
# result = [
# {
# "id": equipment.id,
# "assetnum": equipment.assetnum,
# "location_tag": equipment.equipment.location_tag,
# "name": equipment.equipment.name,
# "total_cost": equipment.material_cost + equipment.service_cost
# "eaf_contribution": ## Create Dummy % number, each equipment has different value
# }
# for equipment in equipments
# ]
# result.sort(key=lambda x: x["eaf_contribution"], reverse=True) #Sort from biggest contribution
# # Filter equipment up to threshold
# cumulative_cost = 0
# included_results = []
# for equipment in result:
# cumulative_cost += equipment["total_cost"]
# if cumulative_cost >= cost_threshold:
# break
# included_results.append(equipment)
# # rest equipemnt is consequence list
# consequence_results = result[len(included_results):]
# return included_results ,consequence_results
#

@ -0,0 +1,18 @@
from sqlalchemy import (UUID, Column, DateTime, Float, ForeignKey, Integer,
String)
from sqlalchemy.orm import relationship
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
class OverhaulSchedule(Base, DefaultMixin):
__tablename__ = "rp_oh_schedule"
year = Column(Integer, nullable=False)
plan_duration = Column(Integer, nullable=True)
planned_outage = Column(Integer, nullable=True)
actual_outage = Column(Integer, nullable=True)
start = Column(DateTime, nullable=True)
finish = Column(DateTime, nullable=True)
remark = Column(String, nullable=True)

@ -0,0 +1,52 @@
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import CommonParameters
from src.models import StandardResponse
from .schema import (OverhaulScheduleCreate, OverhaulSchedulePagination)
from .service import create, get_all
router = APIRouter()
@router.get(
"/", response_model=StandardResponse[OverhaulSchedulePagination]
)
async def get_schedules(common: CommonParameters):
"""Get all scope pagination."""
# return
results = await get_all(common=common)
return StandardResponse(
data=results,
message="Data retrieved successfully",
)
@router.post("/", response_model=StandardResponse[None])
async def create_overhaul_equipment_jobs(
db_session: DbSession, overhaul_job_in: OverhaulScheduleCreate
):
await create(
db_session=db_session,
overhaul_job_in=overhaul_job_in,
)
return StandardResponse(
data=None,
message="Data created successfully",
)
# @router.delete("/{overhaul_job_id}", response_model=StandardResponse[None])
# async def delete_overhaul_equipment_job(db_session: DbSession, overhaul_job_id):
# await delete(db_session=db_session, overhaul_job_id=overhaul_job_id)
# return StandardResponse(
# data=None,
# message="Data deleted successfully",
# )

@ -0,0 +1,43 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
from src.overhaul_scope.schema import ScopeRead
from src.scope_equipment_job.schema import ScopeEquipmentJobRead
from src.job.schema import ActivityMasterRead
class OverhaulScheduleBase(DefultBase):
pass
class OverhaulScheduleCreate(OverhaulScheduleBase):
year: int
plan_duration: Optional[int] = None
planned_outage: Optional[int] = None
actual_outage: Optional[int] = None
start: datetime
finish: datetime
remark: Optional[str]
class OverhaulScheduleUpdate(OverhaulScheduleBase):
pass
class OverhaulScheduleRead(OverhaulScheduleBase):
id: UUID
year: int
plan_duration: Optional[int] = None
planned_outage: Optional[int] = None
actual_outage: Optional[int] = None
start: datetime
finish: datetime
remark: Optional[str]
class OverhaulSchedulePagination(Pagination):
items: List[OverhaulScheduleRead] = []

@ -0,0 +1,93 @@
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy import Delete, Select, func
from sqlalchemy.orm import selectinload
from src.auth.service import CurrentUser
from src.database.core import DbSession
from src.database.service import search_filter_sort_paginate
from src.scope_equipment_job.model import ScopeEquipmentJob
from src.overhaul_activity.model import OverhaulActivity
from .model import OverhaulSchedule
from .schema import OverhaulScheduleCreate
async def get_all(*, common):
"""Returns all documents."""
query = Select(OverhaulSchedule).order_by(OverhaulSchedule.start)
results = await search_filter_sort_paginate(model=query, **common)
return results
async def create(
*, db_session: DbSession, overhaul_job_in: OverhaulScheduleCreate
):
schedule = OverhaulSchedule(**overhaul_job_in.model_dump())
db_session.add(schedule)
await db_session.commit()
return schedule
# async def delete(
# *,
# db_session: DbSession,
# overhaul_job_id: str,
# ) -> bool:
# """
# Deletes a scope job and returns success status.
# Args:
# db_session: Database session
# scope_job_id: ID of the scope job to delete
# user_id: ID of user performing the deletion
# Returns:
# bool: True if deletion was successful, False otherwise
# Raises:
# NotFoundException: If scope job doesn't exist
# AuthorizationError: If user lacks delete permission
# """
# try:
# # Check if job exists
# scope_job = await db_session.get(OverhaulJob, overhaul_job_id)
# if not scope_job:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND,
# detail="A data with this id does not exist.",
# )
# # Perform deletion
# await db_session.delete(scope_job)
# await db_session.commit()
# return True
# except Exception as e:
# await db_session.rollback()
# raise
# async def update(*, db_session: DbSession, scope: OverhaulScope, scope_in: ScopeUpdate):
# """Updates a document."""
# data = scope_in.model_dump()
# update_data = scope_in.model_dump(exclude_defaults=True)
# for field in data:
# if field in update_data:
# setattr(scope, field, update_data[field])
# await db_session.commit()
# return scope
# async def delete(*, db_session: DbSession, scope_id: str):
# """Deletes a document."""
# query = Delete(OverhaulScope).where(OverhaulScope.id == scope_id)
# await db_session.execute(query)
# await db_session.commit()
Loading…
Cancel
Save