update scope equipment
parent a8161e7edb
commit 9ca17db2ab
@@ -0,0 +1,21 @@
from sqlalchemy import UUID, Column, Float, Integer, String, ForeignKey
from sqlalchemy.orm import relationship

from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin


class MasterWorkOrder(Base, DefaultMixin):
    __tablename__ = "oh_wo_master"

    assetnum = Column(String, nullable=True)
    worktype = Column(String, nullable=True)
    workgroup = Column(String, nullable=True)
    total_cost_max = Column(Float, nullable=True)

    # Scope equipment rows linked purely by matching asset number;
    # lazy="raise" forces callers to eager-load this relationship explicitly.
    scope_equipments = relationship(
        "ScopeEquipment",
        lazy="raise",
        primaryjoin="and_(MasterWorkOrder.assetnum == foreign(ScopeEquipment.assetnum))",
    )
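Because the relationship is declared with lazy="raise", touching master.scope_equipments on a plainly loaded instance raises instead of issuing a lazy query, so callers must eager-load it. A minimal query sketch, assuming DefaultMixin supplies the primary-key id column and a SQLAlchemy Session is available; the helper name is hypothetical:

from sqlalchemy.orm import Session, selectinload

def get_master_with_scope(session: Session, wo_id):
    # Eager-load scope_equipments up front; lazy="raise" forbids lazy access later.
    return (
        session.query(MasterWorkOrder)
        .options(selectinload(MasterWorkOrder.scope_equipments))
        .filter(MasterWorkOrder.id == wo_id)
        .one_or_none()
    )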
@@ -1,40 +0,0 @@
from connectors.database import DBConfig
from starlette.config import Config
import os


def get_config():
    try:
        # Try to load from .env file first
        config = Config(".env")
    except FileNotFoundError:
        # If .env doesn't exist, use environment variables
        config = Config(environ=os.environ)

    return config


env = get_config()

config = {
    'batch_size': 1000,
    'target_table': 'oh_wo_master',
    'columns': ['assetnum', 'worktype', 'workgroup', 'total_cost_max', 'created_at'],
}

target_config = DBConfig(
    host=env("DATABASE_HOSTNAME"),
    port=env("DATABASE_PORT"),
    database=env("DATABASE_NAME"),
    user=env("DATABASE_CREDENTIAL_USER"),
    password=env("DATABASE_CREDENTIAL_PASSWORD")
)

source_config = DBConfig(
    host=env("COLLECTOR_HOSTNAME"),
    port=env("COLLECTOR_PORT"),
    database=env("COLLECTOR_NAME"),
    user=env("COLLECTOR_CREDENTIAL_USER"),
    password=env("COLLECTOR_CREDENTIAL_PASSWORD")
)
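The Config lookups above expect the following variables. This is an illustrative .env only; the values are placeholders, and just the variable names come from the config code:

# .env (placeholder values)
DATABASE_HOSTNAME=localhost
DATABASE_PORT=5432
DATABASE_NAME=asset_mgmt
DATABASE_CREDENTIAL_USER=app_user
DATABASE_CREDENTIAL_PASSWORD=change-me
COLLECTOR_HOSTNAME=localhost
COLLECTOR_PORT=5433
COLLECTOR_NAME=collector
COLLECTOR_CREDENTIAL_USER=collector_user
COLLECTOR_CREDENTIAL_PASSWORD=change-me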
@@ -1,104 +0,0 @@
# src/connectors/database.py
from dataclasses import dataclass
from typing import List, Dict, Any, Generator
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
import logging
import os
from starlette.config import Config

logger = logging.getLogger(__name__)


@dataclass
class DBConfig:
    host: str
    port: int
    database: str
    user: str
    password: str

    def get_connection_string(self) -> str:
        return f"postgresql+psycopg2://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"


class DatabaseConnector:
    def __init__(self, source_config: DBConfig, target_config: DBConfig):
        self.source_engine = create_engine(
            source_config.get_connection_string())
        self.target_engine = create_engine(
            target_config.get_connection_string())

    def fetch_batch(
        self,
        batch_size: int,
        last_id: int = 0,
        columns: List[str] = None
    ) -> pd.DataFrame:
        """
        Fetch a batch of data from the source database.
        """
        try:
            query = """
                SELECT {}
                FROM dl_wo_staging
                WHERE id > :last_id
                  AND worktype IN ('PM', 'CM', 'EM', 'PROACTIVE')
                ORDER BY id
                LIMIT :batch_size
            """.format(', '.join(columns) if columns else '*')

            params = {
                "last_id": last_id,
                "batch_size": batch_size
            }

            df = pd.read_sql(
                text(query),
                self.source_engine,
                params=params
            )

            return df

        except Exception as e:
            logger.error(f"Error fetching batch: {str(e)}")
            raise

    def load_batch(
        self,
        df: pd.DataFrame,
        target_table: str,
        if_exists: str = 'append'
    ) -> bool:
        """
        Load a batch of data into the target database.
        """
        try:
            df.to_sql(
                target_table,
                self.target_engine,
                if_exists=if_exists,
                index=False,
                method='multi',
                chunksize=1000
            )
            return True
        except Exception as e:
            logger.error(f"Error loading batch: {str(e)}")
            raise

    def get_total_records(self) -> int:
        """Get the total number of records to migrate."""
        # NOTE: counts rows in sensor_data, while fetch_batch reads from dl_wo_staging.
        try:
            with self.source_engine.connect() as conn:
                result = conn.execute(text("SELECT COUNT(*) FROM sensor_data"))
                return result.scalar()
        except Exception as e:
            logger.error(f"Error getting total records: {str(e)}")
            raise
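Outside Temporal, the connector can drive a plain batch loop. A minimal sketch, assuming the config module shown above; note that the id cursor column must be included in the selected columns for the loop to advance (the transformation step is omitted here):

# Sketch: copy the staging table into oh_wo_master in batches.
from connectors.database import DatabaseConnector
from config import source_config, target_config, config

connector = DatabaseConnector(source_config, target_config)
last_id = 0

while True:
    # 'id' must be selected so the cursor can advance after each batch.
    batch = connector.fetch_batch(
        batch_size=config['batch_size'],
        last_id=last_id,
        columns=['id'] + config['columns'],
    )
    if batch.empty:
        break
    # Drop the staging integer id before loading; the target table keys on its own id.
    connector.load_batch(batch.drop(columns=['id']), config['target_table'])
    last_id = int(batch['id'].iloc[-1])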
@@ -1,39 +0,0 @@
# src/scripts/run_migration.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows.historical_migration import (
    DataMigrationWorkflow,
    fetch_data_activity,
    transform_data_activity,
    validate_data_activity,
    load_data_activity,
)


async def main():
    # Create Temporal client
    client = await Client.connect("192.168.1.82:7233")

    # Create worker
    worker = Worker(
        client,
        task_queue="migration-queue",
        workflows=[DataMigrationWorkflow],
        activities=[
            fetch_data_activity,
            transform_data_activity,
            validate_data_activity,
            load_data_activity
        ]
    )

    # Start worker
    await worker.run()

    # # Start workflow
    # handle = await client.start_workflow(
    #     DataMigrationWorkflow.run,
    #     id="data-migration",
    #     task_queue="migration-queue"
    # )

    # result = await handle.result()
    # print(f"Migration completed: {result}")


if __name__ == "__main__":
    asyncio.run(main())
@@ -1,20 +0,0 @@
from temporalio.client import Client
from workflows.historical_migration import DataMigrationWorkflow


async def run():
    # Connect to Temporal and start the migration workflow
    client = await Client.connect("192.168.1.82:7233")

    handle = await client.start_workflow(
        DataMigrationWorkflow.run,
        id="data-migration",
        task_queue="migration-queue"
    )


if __name__ == "__main__":
    import asyncio
    asyncio.run(run())
@@ -1,48 +0,0 @@
# src/transformers/sensor_data.py
from typing import Dict, Any
import pandas as pd
import numpy as np
from datetime import datetime
from uuid import UUID, uuid4
from config import config


class WoDataTransformer:
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform work order data according to business rules.
        """
        # Create a copy to avoid modifying the original data
        transformed = df.copy()

        # 1. Add a UUID per row (assigning uuid4() once would broadcast the
        #    same value to every row)
        transformed['id'] = [uuid4() for _ in range(len(transformed))]

        # # 5. Drop unnecessary columns
        # columns_to_drop = self.config.get('columns_to_drop', [])
        # if columns_to_drop:
        #     transformed = transformed.drop(columns=columns_to_drop, errors='ignore')

        return transformed

    def validate(self, df: pd.DataFrame) -> bool:
        """
        Validate transformed data.
        """
        if df.empty:
            return False

        # Check required columns
        if not all(col in df.columns for col in config.get('columns')):
            return False

        # Check that the id column exists and every value is a UUID
        if 'id' not in df.columns:
            return False

        if not all(isinstance(val, UUID) for val in df['id']):
            return False

        return True
@@ -1,123 +0,0 @@
from datetime import timedelta
import pandas as pd
from connectors.database import DatabaseConnector
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from typing import Dict, List
from config import source_config, target_config, config
from transformation.wo_transform import WoDataTransformer

# Activities


@activity.defn
async def fetch_data_activity(batch_size: int, last_id: int, columns) -> List[Dict]:
    db_connector = DatabaseConnector(source_config, target_config)
    df = db_connector.fetch_batch(batch_size, last_id, columns)
    return df.to_dict(orient='records')


@activity.defn
async def transform_data_activity(data: List[Dict]) -> List[Dict]:
    transformer = WoDataTransformer()
    df = pd.DataFrame(data)
    transformed_df = transformer.transform(df)
    return transformed_df.to_dict(orient='records')


@activity.defn
async def validate_data_activity(data: List[Dict]) -> bool:
    transformer = WoDataTransformer()
    df = pd.DataFrame(data)
    return transformer.validate(df)


@activity.defn
async def load_data_activity(data: List[Dict], target_table: str) -> bool:
    db_connector = DatabaseConnector(source_config, target_config)
    df = pd.DataFrame(data)
    return db_connector.load_batch(df, target_table)


# Workflow


@workflow.defn
class DataMigrationWorkflow:
    def __init__(self):
        self._total_processed = 0
        self._last_id = 0

    @workflow.run
    async def run(self) -> Dict:
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=10),
            maximum_attempts=3
        )

        batch_size = config.get('batch_size', 1000)
        target_table = config.get('target_table')
        columns = config.get('columns')

        while True:
            # 1. Fetch batch
            data = await workflow.execute_activity(
                fetch_data_activity,
                args=[batch_size, self._last_id, columns],
                retry_policy=retry_policy,
                start_to_close_timeout=timedelta(minutes=5)
            )

            if not data:
                break

            # 2. Transform data (the activity takes only the batch)
            transformed_data = await workflow.execute_activity(
                transform_data_activity,
                args=[data],
                retry_policy=retry_policy,
                start_to_close_timeout=timedelta(minutes=10)
            )

            # 3. Validate data
            is_valid = await workflow.execute_activity(
                validate_data_activity,
                args=[transformed_data],
                retry_policy=retry_policy,
                start_to_close_timeout=timedelta(minutes=5)
            )

            if not is_valid:
                raise ValueError(
                    f"Data validation failed for batch after ID {self._last_id}")

            # 4. Load data
            success = await workflow.execute_activity(
                load_data_activity,
                args=[transformed_data, target_table],
                retry_policy=retry_policy,
                start_to_close_timeout=timedelta(minutes=10)
            )

            if not success:
                raise Exception(
                    f"Failed to load batch after ID {self._last_id}")

            # Update progress
            # NOTE: the cursor advances on the source 'id', so 'id' must be
            # included in the fetched columns.
            self._total_processed += len(data)
            self._last_id = data[-1]['id']

            # Record progress
            # await workflow.execute_activity(
            #     record_progress_activity,
            #     args=[{
            #         'last_id': self._last_id,
            #         'total_processed': self._total_processed
            #     }],
            #     retry_policy=retry_policy,
            #     start_to_close_timeout=timedelta(minutes=1)
            # )

        return {
            'total_processed': self._total_processed,
            'last_id': self._last_id
        }
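The workflow keeps _total_processed and _last_id as in-memory state, and the progress-recording activity is commented out. One lightweight way to observe that state while the migration runs would be a Temporal query handler; this is a hedged sketch of a hypothetical addition to DataMigrationWorkflow, not part of the original commit:

    # Hypothetical read-only query handler inside DataMigrationWorkflow
    # exposing migration progress without interrupting the run loop.
    @workflow.query
    def progress(self) -> Dict:
        return {
            'total_processed': self._total_processed,
            'last_id': self._last_id,
        }

# A client could then poll it while the worker is running, assuming the
# workflow id used above:
#   handle = client.get_workflow_handle("data-migration")
#   status = await handle.query(DataMigrationWorkflow.progress)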