From 9ca17db2ab22aa1f3c563b1c0c4d49845e2b593e Mon Sep 17 00:00:00 2001
From: Cizz22
Date: Fri, 29 Nov 2024 14:35:49 +0700
Subject: [PATCH] update scope equipment

---
 Dockerfile                                  |   2 +-
 src/database/service.py                     |   9 +-
 src/models.py                               |   1 +
 src/scope_equipment/model.py                |  28 ++++-
 src/scope_equipment/router.py               |  10 +-
 src/scope_equipment/schema.py               |  12 +-
 src/scope_equipment/service.py              |  24 +++-
 {temporal => src/workorder}/__init__.py     |   0
 src/workorder/model.py                      |  21 ++++
 temporal/config.py                          |  40 -------
 temporal/connectors/__init__.py             |   0
 temporal/connectors/database.py             | 104 -----------------
 temporal/run.py                             |  39 -------
 temporal/run_migration.py                   |  20 ----
 temporal/transformation/__init__.py         |   0
 temporal/transformation/wo_transform.py     |  48 --------
 temporal/workflows/__init__.py              |   0
 temporal/workflows/historical_migration.py  | 123 ---------------------
 18 files changed, 92 insertions(+), 389 deletions(-)
 rename {temporal => src/workorder}/__init__.py (100%)
 create mode 100644 src/workorder/model.py
 delete mode 100644 temporal/config.py
 delete mode 100644 temporal/connectors/__init__.py
 delete mode 100644 temporal/connectors/database.py
 delete mode 100644 temporal/run.py
 delete mode 100644 temporal/run_migration.py
 delete mode 100644 temporal/transformation/__init__.py
 delete mode 100644 temporal/transformation/wo_transform.py
 delete mode 100644 temporal/workflows/__init__.py
 delete mode 100644 temporal/workflows/historical_migration.py

diff --git a/Dockerfile b/Dockerfile
index 429288e..750c014 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -41,7 +41,7 @@ COPY . /app/
 RUN rm -rf /app/tests/
 
 # Expose port for the application
-EXPOSE 3005
+EXPOSE 3000
 
 # Set the working directory
 WORKDIR /app
diff --git a/src/database/service.py b/src/database/service.py
index f54d438..81490fa 100644
--- a/src/database/service.py
+++ b/src/database/service.py
@@ -93,12 +93,17 @@ async def search_filter_sort_paginate(
 ):
     """Common functionality for searching, filtering, sorting, and pagination."""
     # try:
-    query = Select(model)
+    ## Check if model is Select
+    if not isinstance(model, Select):
+        query = Select(model)
+    else:
+        query = model
 
     if query_str:
         sort = False if sort_by else True
         query = search(query_str=query_str, query=query, model=model, sort=sort)
 
+
     # Get total count
     count_query = Select(func.count()).select_from(query.subquery())
 
@@ -109,6 +114,7 @@ async def search_filter_sort_paginate(
         .offset((page - 1) * items_per_page)
         .limit(items_per_page)
     )
+
 
     result = await db_session.execute(query)
     items = result.scalars().all()
@@ -130,4 +136,5 @@ async def search_filter_sort_paginate(
         "itemsPerPage": items_per_page,
         "page": page,
         "total": total,
+        "totalPages": (total + items_per_page - 1) // items_per_page,
     }
diff --git a/src/models.py b/src/models.py
index 68ae20e..1e34428 100644
--- a/src/models.py
+++ b/src/models.py
@@ -67,6 +67,7 @@ class DefultBase(BaseModel):
 
 class Pagination(DefultBase):
     itemsPerPage: int
+    totalPages: int
     page: int
     total: int
 
diff --git a/src/scope_equipment/model.py b/src/scope_equipment/model.py
index d636246..89c68b8 100644
--- a/src/scope_equipment/model.py
+++ b/src/scope_equipment/model.py
@@ -3,12 +3,36 @@ from sqlalchemy import UUID, Column, Float, Integer, String, ForeignKey
 from src.database.core import Base
 from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
 from sqlalchemy.orm import relationship
+from src.workorder.model import MasterWorkOrder
+from sqlalchemy.ext.hybrid import hybrid_property
 
 
 class ScopeEquipment(Base, DefaultMixin):
     __tablename__ = "oh_scope_equip"
 
     assetnum = Column(String, nullable=True)
-    scope_id = Column(UUID(as_uuid=True), ForeignKey('oh_scope.id'), nullable=False)
+    scope_id = Column(UUID(as_uuid=True), ForeignKey(
+        'oh_scope.id'), nullable=False)
+
+    scope = relationship("Scope", backref="scope_equipments", lazy="raise")
+    master_equipment = relationship(
+        "MasterEquipment",
+        lazy="raise",
+        primaryjoin="and_(ScopeEquipment.assetnum == foreign(MasterEquipment.assetnum))",
+        uselist=False  # Add this if it's a one-to-one relationship
+    )
+    work_orders = relationship("MasterWorkOrder", lazy="selectin",
+                               primaryjoin="and_(ScopeEquipment.assetnum == foreign(MasterWorkOrder.assetnum))")
 
-    scope = relationship("Scope", backref="scope_equipments", lazy="selectin")
+    @hybrid_property
+    def total_cost(self):
+        return sum(wo.total_cost_max for wo in self.work_orders if wo.total_cost_max)
+
+
+class MasterEquipment(Base, DefaultMixin):
+    __tablename__ = "ms_equipment_master"
+
+    assetnum = Column(String, nullable=True)
+    system_tag = Column(String, nullable=True)
+    location_tag = Column(String, nullable=True)
+    name = Column(String, nullable=True)
diff --git a/src/scope_equipment/router.py b/src/scope_equipment/router.py
index f20820b..9b909d3 100644
--- a/src/scope_equipment/router.py
+++ b/src/scope_equipment/router.py
@@ -16,14 +16,18 @@ router = APIRouter()
 
 
 @router.get("", response_model=StandardResponse[ScopeEquipmentPagination])
-async def get_scope_equipments(common: CommonParameters):
+async def get_scope_equipments(common: CommonParameters, scope_name:str = Query(None), exclude: bool = Query(False)):
     """Get all scope pagination."""
     # return
+    data = await get_all(db_session=common["db_session"], common=common, scope_name=scope_name, exclude=exclude)
+
+
     return StandardResponse(
-        data=await search_filter_sort_paginate(model=ScopeEquipment, **common),
+        data=data,
         message="Data retrieved successfully",
     )
 
+
 @router.get("/scope/{scope_name}", response_model=StandardResponse[List[ScopeEquipmentRead]])
 async def get_scope_name(db_session: DbSession, scope_name: str, exclude: bool = Query(False)):
     if exclude:
@@ -77,5 +81,3 @@ async def delete_scope_equipment(db_session: DbSession, scope_equipment_id: str)
     await delete(db_session=db_session, scope_equipment_id=scope_equipment_id)
 
     return StandardResponse(message="Data deleted successfully", data=scope_equipment)
-
-
diff --git a/src/scope_equipment/schema.py b/src/scope_equipment/schema.py
index 052e2c8..c3d4c39 100644
--- a/src/scope_equipment/schema.py
+++ b/src/scope_equipment/schema.py
@@ -3,10 +3,14 @@ from datetime import datetime
 from typing import List, Optional
 from uuid import UUID
 
-from pydantic import Field
+from pydantic import Field, computed_field, field_validator, validator
 
 from src.models import DefultBase, Pagination
+from src.scope.schema import ScopeRead
-
+class MasterEquipmentBase(DefultBase):
+    name: Optional[str] = Field(None, title="Name")
+    location_tag: Optional[str] = Field(None, title="Location Tag")
+
 
 class ScopeEquipmentBase(DefultBase):
     scope_id: Optional[UUID] = Field(None, title="Scope ID")
@@ -22,7 +26,11 @@ class ScopeEquipmentUpdate(ScopeEquipmentBase):
 class ScopeEquipmentRead(ScopeEquipmentBase):
     id: UUID
     assetnum: str
+    scope: ScopeRead
+    master_equipment: MasterEquipmentBase
+    total_cost: float
+
 
 
 class ScopeEquipmentPagination(Pagination):
     items: List[ScopeEquipmentRead] = []
diff --git a/src/scope_equipment/service.py b/src/scope_equipment/service.py
index c15919e..be5e063 100644
--- a/src/scope_equipment/service.py
+++ b/src/scope_equipment/service.py
@@ -1,10 +1,14 @@
-from sqlalchemy import Select, Delete
+from sqlalchemy import Select, Delete, desc, func
+
+from src.workorder.model import MasterWorkOrder
 
 from .model import ScopeEquipment
 from src.scope.service import get_by_scope_name as get_scope_by_name_service
 from .schema import ScopeEquipmentCreate, ScopeEquipmentUpdate
 from typing import Optional, Union
+from sqlalchemy.orm import selectinload
 
+from src.database.service import CommonParameters, search_filter_sort_paginate
 from src.database.core import DbSession
 from src.auth.service import CurrentUser
 
@@ -18,11 +22,21 @@ async def get(*, db_session: DbSession, scope_equipment_id: str) -> Optional[Sco
     return result.scalars().one_or_none()
 
 
-async def get_all(*, db_session: DbSession):
+async def get_all(*, db_session: DbSession, common, scope_name: str = None, exclude: bool = False):
     """Returns all documents."""
-    query = Select(ScopeEquipment)
-    result = await db_session.execute(query)
-    return result.scalars().all()
+    query = Select(ScopeEquipment).options(selectinload(
+        ScopeEquipment.scope), selectinload(ScopeEquipment.master_equipment))
+
+
+    query = query.order_by(desc(ScopeEquipment.created_at))
+
+    if scope_name:
+        scope = await get_scope_by_name_service(db_session=db_session, scope_name=scope_name)
+        query = query.filter(ScopeEquipment.scope_id == scope.id) if not exclude else query.filter(
+            ScopeEquipment.scope_id != scope.id)
+
+    results = await search_filter_sort_paginate(model=query, **common)
+    return results
 
 
 async def create(*, db_session: DbSession, scope_equipment_in: ScopeEquipmentCreate):
diff --git a/temporal/__init__.py b/src/workorder/__init__.py
similarity index 100%
rename from temporal/__init__.py
rename to src/workorder/__init__.py
diff --git a/src/workorder/model.py b/src/workorder/model.py
new file mode 100644
index 0000000..148add9
--- /dev/null
+++ b/src/workorder/model.py
@@ -0,0 +1,21 @@
+
+
+from sqlalchemy import UUID, Column, Float, Integer, String, ForeignKey
+from src.database.core import Base
+from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
+from sqlalchemy.orm import relationship
+
+
+from src.models import DefaultMixin
+
+
+class MasterWorkOrder(Base, DefaultMixin):
+    __tablename__ = "oh_wo_master"
+
+    assetnum = Column(String, nullable=True)
+    worktype = Column(String, nullable=True)
+    workgroup = Column(String, nullable=True)
+    total_cost_max = Column(Float, nullable=True)
+
+
+    scope_equipments = relationship("ScopeEquipment", lazy="raise", primaryjoin="and_(MasterWorkOrder.assetnum == foreign(ScopeEquipment.assetnum))")
diff --git a/temporal/config.py b/temporal/config.py
deleted file mode 100644
index a3e9a01..0000000
--- a/temporal/config.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from connectors.database import DBConfig
-from starlette.config import Config
-import os
-
-
-def get_config():
-    try:
-        # Try to load from .env file first
-        config = Config(".env")
-    except FileNotFoundError:
-        # If .env doesn't exist, use environment variables
-        config = Config(environ=os.environ)
-
-    return config
-
-
-env = get_config()
-
-config = {
-    'batch_size': 1000,
-    'target_table': 'oh_wo_master',
-    'columns': ['assetnum', 'worktype', 'workgroup', 'total_cost_max', 'created_at'],
-}
-
-target_config = DBConfig(
-    host=env("DATABASE_HOSTNAME"),
-    port=env("DATABASE_PORT"),
-    database=env("DATABASE_NAME"),
-    user=env("DATABASE_CREDENTIAL_USER"),
-    password=env("DATABASE_CREDENTIAL_PASSWORD")
-)
-
-
-source_config = DBConfig(
-    host=env("COLLECTOR_HOSTNAME"),
-    port=env("COLLECTOR_PORT"),
-    database=env("COLLECTOR_NAME"),
-    user=env("COLLECTOR_CREDENTIAL_USER"),
-    password=env("COLLECTOR_CREDENTIAL_PASSWORD")
-)
diff --git a/temporal/connectors/__init__.py b/temporal/connectors/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/temporal/connectors/database.py b/temporal/connectors/database.py
deleted file mode 100644
index 6039a25..0000000
--- a/temporal/connectors/database.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# src/connectors/database.py
-from dataclasses import dataclass
-from typing import List, Dict, Any, Generator
-import pandas as pd
-from sqlalchemy import create_engine, text
-from sqlalchemy.engine import Engine
-import logging
-import os
-from starlette.config import Config
-
-logger = logging.getLogger(__name__)
-
-
-
-@dataclass
-class DBConfig:
-    host: str
-    port: int
-    database: str
-    user: str
-    password: str
-
-    def get_connection_string(self) -> str:
-        return f"postgresql+psycopg2://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
-
-
-class DatabaseConnector:
-    def __init__(self, source_config: DBConfig, target_config: DBConfig):
-        self.source_engine = create_engine(
-            source_config.get_connection_string())
-        self.target_engine = create_engine(
-            target_config.get_connection_string())
-
-    def fetch_batch(
-        self,
-        batch_size: int,
-        last_id: int = 0,
-        columns: List[str] = None
-    ) -> pd.DataFrame:
-        """
-        Fetch a batch of data from source database
-        """
-        try:
-            query = """
-                SELECT {}
-                FROM dl_wo_staging
-                WHERE id > :last_id
-                AND worktype IN ('PM', 'CM', 'EM', 'PROACTIVE')
-                ORDER BY id
-                LIMIT :batch_size
-            """.format(', '.join(columns) if columns else '*')
-
-            # Execute query
-            params = {
-                "last_id": last_id,
-                "batch_size": batch_size
-            }
-
-            df = pd.read_sql(
-                text(query),
-                self.source_engine,
-                params=params
-            )
-
-            return df
-
-        except Exception as e:
-            logger.error(f"Error fetching batch: {str(e)}")
-            raise
-
-    def load_batch(
-        self,
-        df: pd.DataFrame,
-        target_table: str,
-        if_exists: str = 'append'
-    ) -> bool:
-        """
-        Load a batch of data to target database
-        """
-        try:
-            df.to_sql(
-                target_table,
-                self.target_engine,
-                if_exists=if_exists,
-                index=False,
-                method='multi',
-                chunksize=1000
-            )
-            return True
-        except Exception as e:
-            logger.error(f"Error loading batch: {str(e)}")
-            raise
-
-    def get_total_records(self) -> int:
-        """Get total number of records to migrate"""
-        try:
-            with self.source_engine.connect() as conn:
-                result = conn.execute(text("SELECT COUNT(*) FROM sensor_data"))
-                return result.scalar()
-        except Exception as e:
-            logger.error(f"Error getting total records: {str(e)}")
-            raise
-
-
diff --git a/temporal/run.py b/temporal/run.py
deleted file mode 100644
index 85c4cec..0000000
--- a/temporal/run.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# src/scripts/run_migration.py
-import asyncio
-from temporalio.client import Client
-from temporalio.worker import Worker
-from workflows.historical_migration import DataMigrationWorkflow, fetch_data_activity, transform_data_activity, validate_data_activity, load_data_activity
-
-
-async def main():
-    # Create Temporal client
-    client = await Client.connect("192.168.1.82:7233")
-
-    # Create worker
-    worker = Worker(
-        client,
-        task_queue="migration-queue",
-        workflows=[DataMigrationWorkflow],
-        activities=[
-            fetch_data_activity,
-            transform_data_activity,
-            validate_data_activity,
-            load_data_activity
-        ]
-    )
-
-    # Start worker
-    await worker.run()
-
-    # # Start workflow
-    # handle = await client.start_workflow(
-    #     DataMigrationWorkflow.run,
-    #     id="data-migration",
-    #     task_queue="migration-queue"
-    # )
-
-    # result = await handle.result()
-    # print(f"Migration completed: {result}")
-
-if __name__ == "__main__":
-    asyncio.run(main())
diff --git a/temporal/run_migration.py b/temporal/run_migration.py
deleted file mode 100644
index 317789c..0000000
--- a/temporal/run_migration.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from temporalio.client import Client
-from workflows.historical_migration import DataMigrationWorkflow
-
-
-async def run():
-    # Start workflow
-
-    client = await Client.connect("192.168.1.82:7233")
-
-    handle = await client.start_workflow(
-        DataMigrationWorkflow.run,
-        id="data-migration",
-        task_queue="migration-queue"
-    )
-
-
-
-if __name__ == "__main__":
-    import asyncio
-    asyncio.run(run())
\ No newline at end of file
diff --git a/temporal/transformation/__init__.py b/temporal/transformation/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/temporal/transformation/wo_transform.py b/temporal/transformation/wo_transform.py
deleted file mode 100644
index e4d05d5..0000000
--- a/temporal/transformation/wo_transform.py
+++ /dev/null
@@ -1,48 +0,0 @@
-
-
-# src/transformers/sensor_data.py
-from typing import Dict, Any
-import pandas as pd
-import numpy as np
-from datetime import datetime
-from uuid import UUID, uuid4
-from config import config
-
-
-class WoDataTransformer:
-    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
-        """
-        Transform sensor data according to business rules
-        """
-        # Create a copy to avoid modifying original data
-        transformed = df.copy()
-
-        # 1. Add UUID
-        transformed['id'] = uuid4()
-
-        # # 5. Drop unnecessary columns
-        # columns_to_drop = self.config.get('columns_to_drop', [])
-        # if columns_to_drop:
-        #     transformed = transformed.drop(columns=columns_to_drop, errors='ignore')
-
-        return transformed
-
-    def validate(self, df: pd.DataFrame) -> bool:
-        """
-        Validate transformed data
-        """
-        if df.empty:
-            return False
-
-        # Check required columns
-        if not all(col in df.columns for col in config.get('columns')):
-            return False
-
-        # check id column and id is UUID
-        if 'id' not in df.columns:
-            return False
-
-        if not all(isinstance(val, UUID) for val in df['id']):
-            return False
-
-        return True
diff --git a/temporal/workflows/__init__.py b/temporal/workflows/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/temporal/workflows/historical_migration.py b/temporal/workflows/historical_migration.py
deleted file mode 100644
index 3a9b462..0000000
--- a/temporal/workflows/historical_migration.py
+++ /dev/null
@@ -1,123 +0,0 @@
-from datetime import timedelta
-import pandas as pd
-from connectors.database import DatabaseConnector
-from temporalio import workflow, activity
-from temporalio.common import RetryPolicy
-from typing import Dict, List
-from config import source_config, target_config, config
-from transformation.wo_transform import WoDataTransformer
-# Activities
-
-
-@activity.defn
-async def fetch_data_activity(batch_size: int, last_id: int, columns) -> Dict:
-    db_connector = DatabaseConnector(source_config, target_config)
-    df = db_connector.fetch_batch(batch_size, last_id, columns)
-    return df.to_dict(orient='records')
-
-
-@activity.defn
-async def transform_data_activity(data: List[Dict]) -> List[Dict]:
-    transformer = WoDataTransformer()
-    df = pd.DataFrame(data)
-    transformed_df = transformer.transform(df)
-    return transformed_df.to_dict(orient='records')
-
-
-@activity.defn
-async def validate_data_activity(data: List[Dict]) -> bool:
-    transformer = WoDataTransformer()
-    df = pd.DataFrame(data)
-    return transformer.validate(df)
-
-
-@activity.defn
-async def load_data_activity(data: List[Dict], target_table: str) -> bool:
-    db_connector = DatabaseConnector(source_config, target_config)
-    df = pd.DataFrame(data)
-    return db_connector.load_batch(df, target_table)
-
-# Workflow
-
-
-@workflow.defn
-class DataMigrationWorkflow:
-    def __init__(self):
-        self._total_processed = 0
-        self._last_id = 0
-
-    @workflow.run
-    async def run(self) -> Dict:
-        retry_policy = RetryPolicy(
-            initial_interval=timedelta(seconds=1),
-            maximum_interval=timedelta(minutes=10),
-            maximum_attempts=3
-        )
-
-        batch_size = config.get('batch_size', 1000)
-        target_table = config.get('target_table')
-        columns = config.get('columns')
-
-        while True:
-            # 1. Fetch batch
-            data = await workflow.execute_activity(
-                fetch_data_activity,
-                args=[batch_size, self._last_id, columns],
-                retry_policy=retry_policy,
-                start_to_close_timeout=timedelta(minutes=5)
-            )
-
-            if not data:
-                break
-
-            # 2. Transform data
-            transformed_data = await workflow.execute_activity(
-                transform_data_activity,
-                args=[data, config],
-                retry_policy=retry_policy,
-                start_to_close_timeout=timedelta(minutes=10)
-            )
-
-            # 3. Validate data
-            is_valid = await workflow.execute_activity(
-                validate_data_activity,
-                args=[transformed_data],
-                retry_policy=retry_policy,
-                start_to_close_timeout=timedelta(minutes=5)
-            )
-
-            if not is_valid:
-                raise ValueError(
-                    f"Data validation failed for batch after ID {self._last_id}")
-
-            # 4. Load data
-            success = await workflow.execute_activity(
-                load_data_activity,
-                args=[transformed_data, target_table],
-                retry_policy=retry_policy,
-                start_to_close_timeout=timedelta(minutes=10)
-            )
-
-            if not success:
-                raise Exception(
-                    f"Failed to load batch after ID {self._last_id}")
-
-            # Update progress
-            self._total_processed += len(data)
-            self._last_id = data[-1]['id']
-
-            # Record progress
-            # await workflow.execute_activity(
-            #     record_progress_activity,
-            #     args=[{
-            #         'last_id': self._last_id,
-            #         'total_processed': self._total_processed
-            #     }],
-            #     retry_policy=retry_policy,
-            #     start_to_close_timeout=timedelta(minutes=1)
-            # )
-
-        return {
-            'total_processed': self._total_processed,
-            'last_id': self._last_id
-        }
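
Note for reviewers, not part of the patch: the new "totalPages" value that search_filter_sort_paginate now returns, and that the Pagination model now exposes, is plain ceiling division over the total and itemsPerPage fields that were already in the response. A minimal standalone sketch of that arithmetic, with a hypothetical helper name and sample numbers chosen purely for illustration:

    # Ceiling division without math.ceil: (total + per_page - 1) // per_page
    def total_pages(total: int, items_per_page: int) -> int:
        return (total + items_per_page - 1) // items_per_page

    assert total_pages(92, 10) == 10  # nine full pages plus a partial tenth page
    assert total_pages(90, 10) == 9   # exact multiple adds no extra page
    assert total_pages(0, 10) == 0    # empty result set yields zero pages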