main
Cizz22 1 year ago
commit fbb9dc2031

@ -0,0 +1,2 @@
env/
.env

2
.gitignore vendored

@ -0,0 +1,2 @@
env/
.env

@ -0,0 +1,44 @@
# Quick Start:
#
# pip install pre-commit
# pre-commit install && pre-commit install -t pre-push
# pre-commit run --all-files
#
# To Skip Checks:
#
# git commit --no-verify
fail_fast: false
default_language_version:
python: python3.11.2
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# ruff version.
rev: v0.7.0
hooks:
# Run the linter.
#
# When running with --fix, Ruff's lint hook should be placed before Ruff's formatter hook,
# and before Black, isort, and other formatting tools, as Ruff's fix behavior can output code changes that require reformatting.
- id: ruff
args: [--fix]
# Run the formatter.
- id: ruff-format
# Typos
- repo: https://github.com/crate-ci/typos
rev: v1.26.1
hooks:
- id: typos
exclude: ^(data/dispatch-sample-data.dump|src/dispatch/static/dispatch/src/|src/dispatch/database/revisions/)
# Pytest
- repo: local
hooks:
- id: tests
name: run tests
entry: pytest -v tests/
language: system
types: [python]
stages: [push]

@ -0,0 +1,14 @@
# Define variables
POETRY = poetry
PYTHON = $(POETRY) run python
APP = src/server.py
# Targets and their rules
# Install dependencies
install:
$(POETRY) install
# Run the application
run:
python bin/run.py

1614
poetry.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,28 @@
[tool.poetry]
name = "lccaappservice"
version = "0.1.0"
description = ""
authors = ["Cizz22 <cisatraa@gmail.com>"]
license = "MIT"
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
fastapi = {extras = ["standard"], version = "^0.115.4"}
sqlalchemy = "^2.0.36"
httpx = "^0.27.2"
pytest = "^8.3.3"
faker = "^30.8.2"
factory-boy = "^3.3.1"
sqlalchemy-utils = "^0.41.2"
slowapi = "^0.1.9"
uvicorn = "^0.32.0"
pytz = "^2024.2"
sqlalchemy-filters = "^0.13.0"
psycopg2-binary = "^2.9.10"
asyncpg = "^0.30.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

@ -0,0 +1,4 @@
import uvicorn
if __name__ == "__main__":
uvicorn.run("src.main:app", host="0.0.0.0", port=8081, reload=True)

@ -0,0 +1,58 @@
from typing import List, Optional
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from src.auth.service import JWTBearer
from src.masterdata.router import router as masterdata_router
from src.equipment.router import router as equipment_router
from src.yeardata.router import router as yeardata_router
from src.equipment_master.router import router as equipment_master_router
class ErrorMessage(BaseModel):
msg: str
class ErrorResponse(BaseModel):
detail: Optional[List[ErrorMessage]]
api_router = APIRouter(
default_response_class=JSONResponse,
responses={
400: {"model": ErrorResponse},
401: {"model": ErrorResponse},
403: {"model": ErrorResponse},
404: {"model": ErrorResponse},
500: {"model": ErrorResponse},
},
)
@api_router.get("/healthcheck", include_in_schema=False)
def healthcheck():
return {"status": "ok"}
authenticated_api_router = APIRouter(dependencies=[Depends(JWTBearer())],
)
# Master Data
authenticated_api_router.include_router(
masterdata_router, prefix="/masterdata", tags=["masterdata"])
authenticated_api_router.include_router(
equipment_router, prefix="/equipment", tags=["equipment"])
authenticated_api_router.include_router(
equipment_master_router, prefix="/equipment-master", tags=["equipment_master"])
authenticated_api_router.include_router(
yeardata_router, prefix="/yeardata", tags=["yeardata"])
api_router.include_router(authenticated_api_router)

@ -0,0 +1,9 @@
from pydantic import BaseModel
class UserBase(BaseModel):
name: str
role: str
user_id: str

@ -0,0 +1,55 @@
# app/auth/auth_bearer.py
from typing import Annotated, Optional
from fastapi import Depends, Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import requests
import src.config as config
from .model import UserBase
class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(
status_code=403, detail="Invalid authentication scheme.")
user_info = self.verify_jwt(credentials.credentials)
if not user_info:
raise HTTPException(
status_code=403, detail="Invalid token or expired token.")
request.state.user = user_info
return user_info
else:
raise HTTPException(
status_code=403, detail="Invalid authorization code.")
def verify_jwt(self, jwtoken: str) -> Optional[UserBase]:
try:
response = requests.get(
f"{config.AUTH_SERVICE_API}/verify-token",
headers={"Authorization": f"Bearer {jwtoken}"},
)
if not response.ok:
return None
user_data = response.json()
return UserBase(**user_data['data'])
except Exception as e:
print(f"Token verification error: {str(e)}")
return None
# Create dependency to get current user from request state
async def get_current_user(request: Request) -> UserBase:
return request.state.user
CurrentUser = Annotated[UserBase, Depends(get_current_user)]

@ -0,0 +1,63 @@
import logging
import os
import base64
from urllib import parse
from typing import List
from pydantic import BaseModel
from starlette.config import Config
from starlette.datastructures import CommaSeparatedStrings
log = logging.getLogger(__name__)
class BaseConfigurationModel(BaseModel):
"""Base configuration model used by all config options."""
pass
def get_env_tags(tag_list: List[str]) -> dict:
"""Create dictionary of available env tags."""
tags = {}
for t in tag_list:
tag_key, env_key = t.split(":")
env_value = os.environ.get(env_key)
if env_value:
tags.update({tag_key: env_value})
return tags
config = Config(".env")
LOG_LEVEL = config("LOG_LEVEL", default=logging.WARNING)
ENV = config("ENV", default="local")
# database
DATABASE_HOSTNAME = config("DATABASE_HOSTNAME")
_DATABASE_CREDENTIAL_USER = config("DATABASE_CREDENTIAL_USER")
_DATABASE_CREDENTIAL_PASSWORD = config("DATABASE_CREDENTIAL_PASSWORD")
_QUOTED_DATABASE_PASSWORD = parse.quote(str(_DATABASE_CREDENTIAL_PASSWORD))
DATABASE_NAME = config("DATABASE_NAME", default="digital_twin")
DATABASE_PORT = config("DATABASE_PORT", default="5432")
DATABASE_ENGINE_POOL_SIZE = config(
"DATABASE_ENGINE_POOL_SIZE", cast=int, default=20)
DATABASE_ENGINE_MAX_OVERFLOW = config(
"DATABASE_ENGINE_MAX_OVERFLOW", cast=int, default=0)
# Deal with DB disconnects
# https://docs.sqlalchemy.org/en/20/core/pooling.html#pool-disconnects
DATABASE_ENGINE_POOL_PING = config("DATABASE_ENGINE_POOL_PING", default=False)
SQLALCHEMY_DATABASE_URI = f"postgresql+asyncpg://{_DATABASE_CREDENTIAL_USER}:{_QUOTED_DATABASE_PASSWORD}@{DATABASE_HOSTNAME}:{DATABASE_PORT}/{DATABASE_NAME}"
TIMEZONE = "Asia/Jakarta"
AUTH_SERVICE_API = config(
"AUTH_SERVICE_API", default="http://192.168.1.82:8000/auth")

@ -0,0 +1,156 @@
# src/database.py
from starlette.requests import Request
from sqlalchemy_utils import get_mapper
from sqlalchemy.sql.expression import true
from sqlalchemy.orm import object_session, sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy import create_engine, inspect
from pydantic import BaseModel
from fastapi import Depends
from typing import Annotated, Any
from contextlib import contextmanager
import re
import functools
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from src.config import SQLALCHEMY_DATABASE_URI
engine = create_async_engine(
SQLALCHEMY_DATABASE_URI,
echo=False,
future=True
)
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
def get_db(request: Request):
return request.state.db
DbSession = Annotated[AsyncSession, Depends(get_db)]
class CustomBase:
__repr_attrs__ = []
__repr_max_length__ = 15
@declared_attr
def __tablename__(self):
return resolve_table_name(self.__name__)
def dict(self):
"""Returns a dict representation of a model."""
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
@property
def _id_str(self):
ids = inspect(self).identity
if ids:
return "-".join([str(x) for x in ids]) if len(ids) > 1 else str(ids[0])
else:
return "None"
@property
def _repr_attrs_str(self):
max_length = self.__repr_max_length__
values = []
single = len(self.__repr_attrs__) == 1
for key in self.__repr_attrs__:
if not hasattr(self, key):
raise KeyError(
"{} has incorrect attribute '{}' in " "__repr__attrs__".format(
self.__class__, key
)
)
value = getattr(self, key)
wrap_in_quote = isinstance(value, str)
value = str(value)
if len(value) > max_length:
value = value[:max_length] + "..."
if wrap_in_quote:
value = "'{}'".format(value)
values.append(value if single else "{}:{}".format(key, value))
return " ".join(values)
def __repr__(self):
# get id like '#123'
id_str = ("#" + self._id_str) if self._id_str else ""
# join class name, id and repr_attrs
return "<{} {}{}>".format(
self.__class__.__name__,
id_str,
" " + self._repr_attrs_str if self._repr_attrs_str else "",
)
Base = declarative_base(cls=CustomBase)
# make_searchable(Base.metadata)
@contextmanager
async def get_session():
"""Context manager to ensure the session is closed after use."""
session = async_session()
try:
yield session
await session.commit()
except:
await session.rollback()
raise
finally:
await session.close()
def resolve_table_name(name):
"""Resolves table names to their mapped names."""
names = re.split("(?=[A-Z])", name) # noqa
return "_".join([x.lower() for x in names if x])
raise_attribute_error = object()
# def resolve_attr(obj, attr, default=None):
# """Attempts to access attr via dotted notation, returns none if attr does not exist."""
# try:
# return functools.reduce(getattr, attr.split("."), obj)
# except AttributeError:
# return default
# def get_model_name_by_tablename(table_fullname: str) -> str:
# """Returns the model name of a given table."""
# return get_class_by_tablename(table_fullname=table_fullname).__name__
def get_class_by_tablename(table_fullname: str) -> Any:
"""Return class reference mapped to table."""
def _find_class(name):
for c in Base._decl_class_registry.values():
if hasattr(c, "__table__"):
if c.__table__.fullname.lower() == name.lower():
return c
mapped_name = resolve_table_name(table_fullname)
mapped_class = _find_class(mapped_name)
return mapped_class
# def get_table_name_by_class_instance(class_instance: Base) -> str:
# """Returns the name of the table for a given class instance."""
# return class_instance._sa_instance_state.mapper.mapped_table.name

@ -0,0 +1,131 @@
import logging
from typing import Annotated, List
from sqlalchemy import desc, func, or_, Select
from sqlalchemy_filters import apply_pagination
from sqlalchemy.exc import ProgrammingError
from .core import DbSession
from fastapi import Query, Depends
from pydantic.types import Json, constr
log = logging.getLogger(__name__)
# allows only printable characters
QueryStr = constr(pattern=r"^[ -~]+$", min_length=1)
def common_parameters(
db_session: DbSession, # type: ignore
current_user: QueryStr = Query(None, alias="currentUser"), # type: ignore
page: int = Query(1, gt=0, lt=2147483647),
items_per_page: int = Query(5, alias="itemsPerPage", gt=-2, lt=2147483647),
query_str: QueryStr = Query(None, alias="q"), # type: ignore
filter_spec: QueryStr = Query(None, alias="filter"), # type: ignore
sort_by: List[str] = Query([], alias="sortBy[]"),
descending: List[bool] = Query([], alias="descending[]"),
# role: QueryStr = Depends(get_current_role),
):
return {
"db_session": db_session,
"page": page,
"items_per_page": items_per_page,
"query_str": query_str,
"filter_spec": filter_spec,
"sort_by": sort_by,
"descending": descending,
"current_user": current_user,
# "role": role,
}
CommonParameters = Annotated[
dict[str, int | str | DbSession | QueryStr |
Json | List[str] | List[bool]],
Depends(common_parameters),
]
def search(*, query_str: str, query: Query, model, sort=False):
"""Perform a search based on the query."""
search_model = model
if not query_str.strip():
return query
search = []
if hasattr(search_model, "search_vector"):
vector = search_model.search_vector
search.append(vector.op("@@")(func.tsq_parse(query_str)))
if hasattr(search_model, "name"):
search.append(
search_model.name.ilike(f"%{query_str}%"),
)
search.append(search_model.name == query_str)
if not search:
raise Exception(f"Search not supported for model: {model}")
query = query.filter(or_(*search))
if sort:
query = query.order_by(
desc(func.ts_rank_cd(vector, func.tsq_parse(query_str))))
return query.params(term=query_str)
async def search_filter_sort_paginate(
db_session: DbSession,
model,
query_str: str = None,
filter_spec: str | dict | None = None,
page: int = 1,
items_per_page: int = 5,
sort_by: List[str] = None,
descending: List[bool] = None,
current_user: str = None,
):
"""Common functionality for searching, filtering, sorting, and pagination."""
# try:
query = Select(model)
if query_str:
sort = False if sort_by else True
query = search(query_str=query_str, query=query,
model=model, sort=sort)
# Get total count
count_query = Select(func.count()).select_from(query.subquery())
total = await db_session.scalar(count_query)
query = (
query
.offset((page - 1) * items_per_page)
.limit(items_per_page)
)
result = await db_session.execute(query)
items = result.scalars().all()
# try:
# query, pagination = apply_pagination(
# query=query, page_number=page, page_size=items_per_page)
# except ProgrammingError as e:
# log.debug(e)
# return {
# "items": [],
# "itemsPerPage": items_per_page,
# "page": page,
# "total": 0,
# }
return {
"items": items,
"itemsPerPage": items_per_page,
"page": page,
"total": total,
}

@ -0,0 +1,24 @@
from enum import StrEnum
class OptimumOHEnum(StrEnum):
"""
A custom Enum class that extends StrEnum.
This class inherits all functionality from StrEnum, including
string representation and automatic value conversion to strings.
Example:
class Visibility(DispatchEnum):
OPEN = "Open"
RESTRICTED = "Restricted"
assert str(Visibility.OPEN) == "Open"
"""
pass # No additional implementation needed
class ResponseStatus(OptimumOHEnum):
SUCCESS = "success"
ERROR = "error"

@ -0,0 +1,17 @@
from sqlalchemy import Column, Float, Integer, String
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class Equipment(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_ms_equipment_data"
equipment_id = Column(String, nullable=False)
acquisition_year = Column(Integer, nullable=False)
acquisition_cost = Column(Float, nullable=False)
capital_cost_record_time = Column(Integer, nullable=False)
design_life = Column(Integer, nullable=False)
forecasting_target_year = Column(Integer, nullable=False)
manhours_rate = Column(Float, nullable=False)

@ -0,0 +1,78 @@
from fastapi import APIRouter, HTTPException, status
from .model import Equipment
from .schema import EquipmentPagination, EquipmentRead, EquipmentCreate, EquipmentUpdate
from .service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[EquipmentPagination])
async def get_equipments(common: CommonParameters):
"""Get all equipment pagination."""
# return
return StandardResponse(
data=await search_filter_sort_paginate(model=Equipment, **common),
message="Data retrieved successfully",
)
@router.get("/tree", response_model=StandardResponse[EquipmentRead])
async def get_equipment_tree():
"""Get as tree structure."""
pass
@router.get("/{equipment_id}", response_model=StandardResponse[EquipmentRead])
async def get_equipment(db_session: DbSession, equipment_id: str):
equipment = await get(db_session=db_session, equipment_id=equipment_id)
if not equipment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=equipment, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[EquipmentRead])
async def create_equipment(db_session: DbSession, equipment_in: EquipmentCreate, current_user: CurrentUser):
equipment_in.created_by = current_user.name
equipment = await create(db_session=db_session, equipment_in=equipment_in)
return StandardResponse(data=equipment, message="Data created successfully")
@router.put("/{equipment_id}", response_model=StandardResponse[EquipmentRead])
async def update_equipment(db_session: DbSession, equipment_id: str, equipment_in: EquipmentUpdate, current_user: CurrentUser):
equipment = await get(db_session=db_session, equipment_id=equipment_id)
if not equipment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
equipment_in.updated_by = current_user.name
return StandardResponse(data=await update(db_session=db_session, equipment=equipment, equipment_in=equipment_in), message="Data updated successfully")
@router.delete("/{equipment_id}", response_model=StandardResponse[EquipmentRead])
async def delete_equipment(db_session: DbSession, equipment_id: str):
equipment = await get(db_session=db_session, equipment_id=equipment_id)
if not equipment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, equipment_id=equipment_id)
return StandardResponse(message="Data deleted successfully", data=equipment)

@ -0,0 +1,35 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
class EquipmentBase(DefultBase):
acquisition_year: Optional[int] = Field(None, nullable=True)
acquisition_cost: Optional[float] = Field(None, nullable=True)
capital_cost_record_time: Optional[int] = Field(None, nullable=True)
design_life: Optional[int] = Field(None, nullable=True)
forecasting_target_year: Optional[int] = Field(None, nullable=True)
manhours_rate: Optional[float] = Field(None, nullable=True)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class EquipmentCreate(EquipmentBase):
pass
class EquipmentUpdate(EquipmentBase):
pass
class EquipmentRead(EquipmentBase):
id: UUID
class EquipmentPagination(Pagination):
items: List[EquipmentRead] = []

@ -0,0 +1,53 @@
from sqlalchemy import Select, Delete
from .model import Equipment
from .schema import EquipmentCreate, EquipmentUpdate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
async def get(*, db_session: DbSession, equipment_id: str) -> Optional[Equipment]:
"""Returns a document based on the given document id."""
query = Select(Equipment).filter(Equipment.id == equipment_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(*, db_session: DbSession):
"""Returns all documents."""
query = Select(Equipment)
result = await db_session.execute(query)
return result.scalars().all()
async def create(*, db_session: DbSession, equipment_in: EquipmentCreate):
"""Creates a new document."""
equipment = Equipment(**equipment_in.model_dump())
db_session.add(equipment)
await db_session.commit()
return equipment
async def update(*, db_session: DbSession, equipment: Equipment, equipment_in: EquipmentUpdate):
"""Updates a document."""
data = equipment_in.model_dump()
update_data = equipment_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(equipment, field, update_data[field])
await db_session.commit()
return equipment
async def delete(*, db_session: DbSession, equipment_id: str):
"""Deletes a document."""
query = Delete(Equipment).where(Equipment.id == equipment_id)
await db_session.execute(query)
await db_session.commit()

@ -0,0 +1,53 @@
from sqlalchemy import UUID, Column, Float, ForeignKey, Integer, String
from src.database.core import Base
from src.models import DefaultMixin
from sqlalchemy.orm import relationship, declarative_base, declared_attr, remote, foreign, backref
class EquipmentMaster(Base, DefaultMixin):
__tablename__ = "ms_equipment_master"
id = Column(UUID(as_uuid=True), primary_key=True, index=True)
name = Column(String, nullable=False)
parent_id = Column(UUID(as_uuid=True), ForeignKey(
'ms_equipment_master.id', ondelete='CASCADE'), nullable=True)
equipment_tree_id = Column(UUID(as_uuid=True), ForeignKey(
'ms_equipment_tree.id', ondelete='CASCADE'), nullable=True)
category_id = Column(UUID(as_uuid=True), nullable=True)
system_tag = Column(String, nullable=True)
assetnum = Column(String, nullable=True)
location_tag = Column(String, nullable=True)
# Relationship definitions
# Define both sides of the relationship
# parent = relationship(
# "EquipmentMaster",
# back_populates="children",
# remote_side=[id]
# )
children = relationship(
"EquipmentMaster",
backref=backref("parent", remote_side=[id]),
lazy="selectin"
)
# equipment_tree = relationship(
# "EquipmentTree",
# back_populates="equipment_master"
# )
class EquipmentTree(Base, DefaultMixin):
__tablename__ = "ms_equipment_tree"
level_no = Column(Integer, nullable=False)
name = Column(String, nullable=False)
# equipment_master = relationship(
# "EquipmentMaster",
# back_populates="equipment_tree",
# cascade="all, delete-orphan",
# lazy="selectin" # Using selectin for better performance with recursive loads
# )

@ -0,0 +1,21 @@
from typing import Annotated, List, Optional
from src.models import StandardResponse
from .service import get_all_master
from fastapi import APIRouter, Query
from .schema import EquipmentMasterTree
from src.database.core import DbSession
router = APIRouter()
@router.get("", response_model=StandardResponse[List[EquipmentMasterTree]])
async def get_all_equipment_master_tree(
db_session: DbSession,
parent_id: Annotated[Optional[str], Query(description="Parent ID")] = None,
):
equipment_masters = await get_all_master(parent_id=parent_id, db_session=db_session)
return StandardResponse(data=equipment_masters, message="Data retrieved successfully")

@ -0,0 +1,35 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
class EquipmentMasterBase(DefultBase):
parent_id: Optional[UUID] = Field(None, nullable=True)
name: Optional[str] = Field(None, nullable=True)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
class EquipmentMasterCreate(EquipmentMasterBase):
pass
class EquipmentMasterUpdate(EquipmentMasterBase):
pass
class EquipmentMasterRead(EquipmentMasterBase):
id: UUID
equipment_tree_id: Optional[UUID] = Field(None, nullable=True)
category_id: Optional[UUID] = Field(None, nullable=True)
system_tag: Optional[str] = Field(None, nullable=True)
assetnum: Optional[str] = Field(None, nullable=True)
location_tag: Optional[str] = Field(None, nullable=True)
class EquipmentMasterTree(EquipmentMasterRead):
children: List[EquipmentMasterRead] = []

@ -0,0 +1,26 @@
from sqlalchemy import Select, select
from src.database.core import DbSession
from sqlalchemy.orm import selectinload
from typing import Optional
from .model import EquipmentMaster
async def get_all_master(*, db_session: DbSession, parent_id: Optional[str] = None):
query = Select(EquipmentMaster).options(selectinload(EquipmentMaster.children))
results = await db_session.execute(query)
results = results.unique().scalars().all()
return results
async def get_master(*, db_session: DbSession, equipment_master_id: str):
query = Select(EquipmentMaster).filter(
EquipmentMaster.id == equipment_master_id)
result = await db_session.execute(query)
result = result.scalars().one_or_none()
if result:
await db_session.refresh(result)
return result

@ -0,0 +1,164 @@
# Define base error model
import logging
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from src.enums import ResponseStatus
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, DataError, DBAPIError
from asyncpg.exceptions import DataError as AsyncPGDataError, PostgresError
class ErrorDetail(BaseModel):
field: Optional[str] = None
message: str
code: Optional[str] = None
params: Optional[Dict[str, Any]] = None
class ErrorResponse(BaseModel):
data: Optional[Any] = None
message: str
status: ResponseStatus = ResponseStatus.ERROR
errors: Optional[List[ErrorDetail]] = None
# Custom exception handler setup
def get_request_context(request: Request):
"""
Get detailed request context for logging.
"""
def get_client_ip():
"""
Get the real client IP address from Kong Gateway headers.
Kong sets X-Real-IP and X-Forwarded-For headers by default.
"""
# Kong specific headers
if "X-Real-IP" in request.headers:
return request.headers["X-Real-IP"]
# Fallback to X-Forwarded-For
if "X-Forwarded-For" in request.headers:
# Get the first IP (original client)
return request.headers["X-Forwarded-For"].split(",")[0].strip()
# Last resort
return request.client.host
return {
"endpoint": request.url.path,
"url": request.url,
"method": request.method,
"remote_addr": get_client_ip(),
}
def handle_sqlalchemy_error(error: SQLAlchemyError):
"""
Handle SQLAlchemy errors and return user-friendly error messages.
"""
original_error = getattr(error, 'orig', None)
print(original_error)
if isinstance(error, IntegrityError):
if "unique constraint" in str(error).lower():
return "This record already exists.", 409
elif "foreign key constraint" in str(error).lower():
return "Related record not found.", 400
else:
return "Data integrity error.", 400
elif isinstance(error, DataError) or isinstance(original_error, AsyncPGDataError):
return "Invalid data provided.", 400
elif isinstance(error, DBAPIError):
if "unique constraint" in str(error).lower():
return "This record already exists.", 409
elif "foreign key constraint" in str(error).lower():
return "Related record not found.", 400
elif "null value in column" in str(error).lower():
return "Required data missing.", 400
elif "invalid input for query argument" in str(error).lower():
return "Invalid data provided.", 400
else:
return "Database error.", 500
else:
# Log the full error for debugging purposes
logging.error(f"Unexpected database error: {str(error)}")
return "An unexpected database error occurred.", 500
def handle_exception(request: Request, exc: Exception):
"""
Global exception handler for Fastapi application.
"""
request_info = get_request_context(request)
if isinstance(exc, RateLimitExceeded):
_rate_limit_exceeded_handler(request, exc)
if isinstance(exc, HTTPException):
logging.error(
f"HTTP exception | Code: {exc.status_code} | Error: {exc.detail} | Request: {request_info}",
extra={"error_category": "http"},
)
return JSONResponse(
status_code=exc.status_code,
content={
"data": None,
"message": str(exc.detail),
"status": ResponseStatus.ERROR,
"errors": [
ErrorDetail(
message=str(exc.detail)
).model_dump()
]
}
)
if isinstance(exc, SQLAlchemyError):
error_message, status_code = handle_sqlalchemy_error(exc)
logging.error(
f"Database Error | Error: {str(error_message)} | Request: {request_info}",
extra={"error_category": "database"},
)
return JSONResponse(
status_code=status_code,
content={
"data": None,
"message": error_message,
"status": ResponseStatus.ERROR,
"errors": [
ErrorDetail(
message=error_message
).model_dump()
]
}
)
# Log unexpected errors
logging.error(
f"Unexpected Error | Error: {str(exc)} | Request: {request_info}",
extra={"error_category": "unexpected"},
)
return JSONResponse(
status_code=500,
content={
"data": None,
"message": exc.__class__.__name__,
"status": ResponseStatus.ERROR,
"errors": [
ErrorDetail(
message="An unexpected error occurred."
).model_dump()
]
}
)

@ -0,0 +1,33 @@
import logging
from src.config import LOG_LEVEL
from src.enums import OptimumOHEnum
LOG_FORMAT_DEBUG = "%(levelname)s:%(message)s:%(pathname)s:%(funcName)s:%(lineno)d"
class LogLevels(OptimumOHEnum):
info = "INFO"
warn = "WARN"
error = "ERROR"
debug = "DEBUG"
def configure_logging():
log_level = str(LOG_LEVEL).upper() # cast to string
log_levels = list(LogLevels)
if log_level not in log_levels:
# we use error as the default log level
logging.basicConfig(level=LogLevels.error)
return
if log_level == LogLevels.debug:
logging.basicConfig(level=log_level, format=LOG_FORMAT_DEBUG)
return
logging.basicConfig(level=log_level)
# sometimes the slack client can be too verbose
logging.getLogger("slack_sdk.web.base_client").setLevel(logging.CRITICAL)

@ -0,0 +1,111 @@
import time
import logging
from os import path
from uuid import uuid1
from typing import Optional, Final
from contextvars import ContextVar
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import JSONResponse
from pydantic import ValidationError
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from sqlalchemy import inspect
from sqlalchemy.orm import scoped_session
from sqlalchemy.ext.asyncio import async_scoped_session
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.routing import compile_path
from starlette.middleware.gzip import GZipMiddleware
from starlette.responses import Response, StreamingResponse, FileResponse
from starlette.staticfiles import StaticFiles
import logging
from src.enums import ResponseStatus
from src.logging import configure_logging
from src.rate_limiter import limiter
from src.api import api_router
from src.database.core import engine, async_session
from src.exceptions import handle_exception
log = logging.getLogger(__name__)
# we configure the logging level and format
configure_logging()
# we define the exception handlers
exception_handlers = {Exception: handle_exception}
# we create the ASGI for the app
app = FastAPI(exception_handlers=exception_handlers, openapi_url="", title="LCCA API",
description="Welcome to LCCA's API documentation!",
version="0.1.0")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(GZipMiddleware, minimum_size=2000)
REQUEST_ID_CTX_KEY: Final[str] = "request_id"
_request_id_ctx_var: ContextVar[Optional[str]] = ContextVar(
REQUEST_ID_CTX_KEY, default=None)
def get_request_id() -> Optional[str]:
return _request_id_ctx_var.get()
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
request_id = str(uuid1())
# we create a per-request id such that we can ensure that our session is scoped for a particular request.
# see: https://github.com/tiangolo/fastapi/issues/726
ctx_token = _request_id_ctx_var.set(request_id)
try:
session = async_scoped_session(async_session, scopefunc=get_request_id)
request.state.db = session()
response = await call_next(request)
except Exception as e:
raise e from None
finally:
await request.state.db.close()
_request_id_ctx_var.reset(ctx_token)
return response
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["Strict-Transport-Security"] = "max-age=31536000 ; includeSubDomains"
return response
# class MetricsMiddleware(BaseHTTPMiddleware):
# async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
# method = request.method
# endpoint = request.url.path
# tags = {"method": method, "endpoint": endpoint}
# try:
# start = time.perf_counter()
# response = await call_next(request)
# elapsed_time = time.perf_counter() - start
# tags.update({"status_code": response.status_code})
# metric_provider.counter("server.call.counter", tags=tags)
# metric_provider.timer("server.call.elapsed", value=elapsed_time, tags=tags)
# log.debug(f"server.call.elapsed.{endpoint}: {elapsed_time}")
# except Exception as e:
# metric_provider.counter("server.call.exception.counter", tags=tags)
# raise e from None
# return response
# app.add_middleware(ExceptionMiddleware)
app.include_router(api_router)

@ -0,0 +1,13 @@
from sqlalchemy import Column, Float
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class MasterData(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_master"
discount_rate = Column(Float, nullable=True)
inflation_rate = Column(Float, nullable=True)
manhours_rate = Column(Float, nullable=True)

@ -0,0 +1,72 @@
from fastapi import APIRouter, HTTPException, status
from .model import MasterData
from .schema import MasterDataPagination, MasterDataRead, MasterDataCreate, MasterDataUpdate
from .service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[MasterDataPagination])
async def get_masterdatas(common: CommonParameters):
"""Get all documents."""
# return
return StandardResponse(
data=await search_filter_sort_paginate(model=MasterData, **common),
message="Data retrieved successfully",
)
@router.get("/{masterdata_id}", response_model=StandardResponse[MasterDataRead])
async def get_masterdata(db_session: DbSession, masterdata_id: str):
masterdata = await get(db_session=db_session, masterdata_id=masterdata_id)
if not masterdata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=masterdata, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[MasterDataRead])
async def create_masterdata(db_session: DbSession, masterdata_in: MasterDataCreate, current_user: CurrentUser):
masterdata_in.created_by = current_user.name
masterdata = await create(db_session=db_session, masterdata_in=masterdata_in)
return StandardResponse(data=masterdata, message="Data created successfully")
@router.put("/{masterdata_id}", response_model=StandardResponse[MasterDataRead])
async def update_masterdata(db_session: DbSession, masterdata_id: str, masterdata_in: MasterDataUpdate, current_user: CurrentUser):
masterdata = await get(db_session=db_session, masterdata_id=masterdata_id)
if not masterdata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
masterdata_in.updated_by = current_user.name
return StandardResponse(data=await update(db_session=db_session, masterdata=masterdata, masterdata_in=masterdata_in), message="Data updated successfully")
@router.delete("/{masterdata_id}", response_model=StandardResponse[MasterDataRead])
async def delete_masterdata(db_session: DbSession, masterdata_id: str):
masterdata = await get(db_session=db_session, masterdata_id=masterdata_id)
if not masterdata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, masterdata_id=masterdata_id)
return StandardResponse(message="Data deleted successfully", data=masterdata)

@ -0,0 +1,34 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
from src.auth.service import CurrentUser
class MasterdataBase(DefultBase):
discount_rate: Optional[float]
inflation_rate: Optional[float]
manhours_rate: Optional[float]
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class MasterDataCreate(MasterdataBase):
pass
class MasterDataUpdate(MasterdataBase):
pass
class MasterDataRead(MasterdataBase):
id: UUID
class MasterDataPagination(Pagination):
items: List[MasterDataRead] = []

@ -0,0 +1,53 @@
from sqlalchemy import Select, Delete
from .model import MasterData
from .schema import MasterDataCreate, MasterDataUpdate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
async def get(*, db_session: DbSession, masterdata_id: str) -> Optional[MasterData]:
"""Returns a document based on the given document id."""
query = Select(MasterData).filter(MasterData.id == masterdata_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(*, db_session: DbSession):
"""Returns all documents."""
query = Select(MasterData)
result = await db_session.execute(query)
return result.scalars().all()
async def create(*, db_session: DbSession, masterdata_in: MasterDataCreate):
"""Creates a new document."""
masterdata = MasterData(**masterdata_in.model_dump())
db_session.add(masterdata)
await db_session.commit()
return masterdata
async def update(*, db_session: DbSession, masterdata: MasterData, masterdata_in: MasterDataUpdate):
"""Updates a document."""
data = masterdata_in.model_dump()
update_data = masterdata_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(masterdata, field, update_data[field])
await db_session.commit()
return masterdata
async def delete(*, db_session: DbSession, masterdata_id: str):
"""Deletes a document."""
query = Delete(MasterData).where(MasterData.id == masterdata_id)
await db_session.execute(query)
await db_session.commit()

@ -0,0 +1,46 @@
# import logging
# from dispatch.plugins.base import plugins
# from .config import METRIC_PROVIDERS
# log = logging.getLogger(__file__)
# class Metrics(object):
# _providers = []
# def __init__(self):
# if not METRIC_PROVIDERS:
# log.info(
# "No metric providers defined via METRIC_PROVIDERS env var. Metrics will not be sent."
# )
# else:
# self._providers = METRIC_PROVIDERS
# def gauge(self, name, value, tags=None):
# for provider in self._providers:
# log.debug(
# f"Sending gauge metric {name} to provider {provider}. Value: {value} Tags: {tags}"
# )
# p = plugins.get(provider)
# p.gauge(name, value, tags=tags)
# def counter(self, name, value=None, tags=None):
# for provider in self._providers:
# log.debug(
# f"Sending counter metric {name} to provider {provider}. Value: {value} Tags: {tags}"
# )
# p = plugins.get(provider)
# p.counter(name, value=value, tags=tags)
# def timer(self, name, value, tags=None):
# for provider in self._providers:
# log.debug(
# f"Sending timer metric {name} to provider {provider}. Value: {value} Tags: {tags}"
# )
# p = plugins.get(provider)
# p.timer(name, value, tags=tags)
# provider = Metrics()

@ -0,0 +1,85 @@
# src/common/models.py
from datetime import datetime
from typing import Generic, Optional, TypeVar
import uuid
from pydantic import BaseModel, Field, SecretStr
from sqlalchemy import Column, DateTime, String, func, event
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from src.config import TIMEZONE
import pytz
from src.auth.service import CurrentUser
from src.enums import ResponseStatus
# SQLAlchemy Mixins
class TimeStampMixin(object):
"""Timestamping mixin"""
created_at = Column(
DateTime(timezone=True), default=datetime.now(pytz.timezone(TIMEZONE)))
created_at._creation_order = 9998
updated_at = Column(
DateTime(timezone=True), default=datetime.now(pytz.timezone(TIMEZONE)))
updated_at._creation_order = 9998
@staticmethod
def _updated_at(mapper, connection, target):
target.updated_at = datetime.now(pytz.timezone(TIMEZONE))
@classmethod
def __declare_last__(cls):
event.listen(cls, "before_update", cls._updated_at)
class UUIDMixin:
"""UUID mixin"""
id = Column(UUID(as_uuid=True), primary_key=True,
default=uuid.uuid4, unique=True, nullable=False)
class IdentityMixin:
"""Identity mixin"""
created_by = Column(String(100), nullable=True)
updated_by = Column(String(100), nullable=True)
class DefaultMixin(TimeStampMixin, UUIDMixin):
"""Default mixin"""
pass
# Pydantic Models
class DefultBase(BaseModel):
class Config:
from_attributes = True
validate_assignment = True
arbitrary_types_allowed = True
str_strip_whitespace = True
json_encoders = {
# custom output conversion for datetime
datetime: lambda v: v.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if v else None,
SecretStr: lambda v: v.get_secret_value() if v else None,
}
class Pagination(DefultBase):
itemsPerPage: int
page: int
total: int
class PrimaryKeyModel(BaseModel):
id: uuid.UUID
# Define data type variable for generic response
T = TypeVar('T')
class StandardResponse(BaseModel, Generic[T]):
data: Optional[T] = None
message: str = "Success"
status: ResponseStatus = ResponseStatus.SUCCESS

@ -0,0 +1,5 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)

@ -0,0 +1,11 @@
from sqlalchemy import Column, Float, Integer, String
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin
class Yeardata(Base, DefaultMixin, IdentityMixin):
__tablename__ = "lcc_ms_year_data"
year = Column(Integer, nullable=False)
rp_per_kwh = Column(Float, nullable=False)

@ -0,0 +1,72 @@
from fastapi import APIRouter, HTTPException, status
from .model import Yeardata
from .schema import YeardataPagination, YeardataRead, YeardataCreate, YeardataUpdate
from .service import get, get_all, create, update, delete
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse
router = APIRouter()
@router.get("", response_model=StandardResponse[YeardataPagination])
async def get_yeardatas(common: CommonParameters):
"""Get all yeardata pagination."""
# return
return StandardResponse(
data=await search_filter_sort_paginate(model=Yeardata, **common),
message="Data retrieved successfully",
)
@router.get("/{yeardata_id}", response_model=StandardResponse[YeardataRead])
async def get_yeardata(db_session: DbSession, yeardata_id: str):
yeardata = await get(db_session=db_session, yeardata_id=yeardata_id)
if not yeardata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
return StandardResponse(data=yeardata, message="Data retrieved successfully")
@router.post("", response_model=StandardResponse[YeardataRead])
async def create_yeardata(db_session: DbSession, yeardata_in: YeardataCreate, current_user: CurrentUser):
yeardata_in.created_by = current_user.name
yeardata = await create(db_session=db_session, yeardata_in=yeardata_in)
return StandardResponse(data=yeardata, message="Data created successfully")
@router.put("/{yeardata_id}", response_model=StandardResponse[YeardataRead])
async def update_yeardata(db_session: DbSession, yeardata_id: str, yeardata_in: YeardataUpdate, current_user: CurrentUser):
yeardata = await get(db_session=db_session, yeardata_id=yeardata_id)
if not yeardata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A data with this id does not exist.",
)
yeardata_in.updated_by = current_user.name
return StandardResponse(data=await update(db_session=db_session, yeardata=yeardata, yeardata_in=yeardata_in), message="Data updated successfully")
@router.delete("/{yeardata_id}", response_model=StandardResponse[YeardataRead])
async def delete_yeardata(db_session: DbSession, yeardata_id: str):
yeardata = await get(db_session=db_session, yeardata_id=yeardata_id)
if not yeardata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": "A data with this id does not exist."}],
)
await delete(db_session=db_session, yeardata_id=yeardata_id)
return StandardResponse(message="Data deleted successfully", data=yeardata)

@ -0,0 +1,32 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import Field
from src.models import DefultBase, Pagination
class YeardataBase(DefultBase):
year: Optional[int] = Field(None, nullable=True)
rp_per_kwh: Optional[float] = Field(None, nullable=True)
created_at: Optional[datetime] = Field(None, nullable=True)
updated_at: Optional[datetime] = Field(None, nullable=True)
created_by: Optional[str] = Field(None, nullable=True)
updated_by: Optional[str] = Field(None, nullable=True)
class YeardataCreate(YeardataBase):
pass
class YeardataUpdate(YeardataBase):
pass
class YeardataRead(YeardataBase):
id: UUID
class YeardataPagination(Pagination):
items: List[YeardataRead] = []

@ -0,0 +1,53 @@
from sqlalchemy import Select, Delete
from .model import Yeardata
from .schema import YeardataCreate, YeardataUpdate
from typing import Optional
from src.database.core import DbSession
from src.auth.service import CurrentUser
async def get(*, db_session: DbSession, yeardata_id: str) -> Optional[Yeardata]:
"""Returns a document based on the given document id."""
query = Select(Yeardata).filter(Yeardata.id == yeardata_id)
result = await db_session.execute(query)
return result.scalars().one_or_none()
async def get_all(*, db_session: DbSession):
"""Returns all documents."""
query = Select(Yeardata)
result = await db_session.execute(query)
return result.scalars().all()
async def create(*, db_session: DbSession, yeardata_in: YeardataCreate):
"""Creates a new document."""
yeardata = Yeardata(**yeardata_in.model_dump())
db_session.add(yeardata)
await db_session.commit()
return yeardata
async def update(*, db_session: DbSession, yeardata: Yeardata, yeardata_in: YeardataUpdate):
"""Updates a document."""
data = yeardata_in.model_dump()
update_data = yeardata_in.model_dump(exclude_defaults=True)
for field in data:
if field in update_data:
setattr(yeardata, field, update_data[field])
await db_session.commit()
return yeardata
async def delete(*, db_session: DbSession, yeardata_id: str):
"""Deletes a document."""
query = Delete(Yeardata).where(Yeardata.id == yeardata_id)
await db_session.execute(query)
await db_session.commit()

@ -0,0 +1,69 @@
import asyncio
from typing import AsyncGenerator, Generator
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import pytest
from sqlalchemy_utils import drop_database, database_exists
from starlette.config import environ
from starlette.testclient import TestClient
# from src.database import Base, get_db
# from src.main import app
# Test database URL
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
engine = create_async_engine(
TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
async def override_get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
app.dependency_overrides[get_db] = override_get_db
@pytest.fixture(scope="session")
def event_loop() -> Generator:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(autouse=True)
async def setup_db() -> AsyncGenerator[None, None]:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def client() -> AsyncGenerator[AsyncClient, None]:
async with AsyncClient(app=app, base_url="http://test") as client:
yield client

@ -0,0 +1,3 @@
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

@ -0,0 +1,33 @@
import uuid
from datetime import datetime
from factory import (
LazyAttribute,
LazyFunction,
Sequence,
SubFactory,
post_generation,
SelfAttribute,
)
from factory.alchemy import SQLAlchemyModelFactory
from factory.fuzzy import FuzzyChoice, FuzzyDateTime, FuzzyInteger, FuzzyText
from faker import Faker
from faker.providers import misc
# from pytz import UTC
from .database import Session
fake = Faker()
fake.add_provider(misc)
class BaseFactory(SQLAlchemyModelFactory):
"""Base Factory."""
class Meta:
"""Factory configuration."""
abstract = True
sqlalchemy_session = Session
sqlalchemy_session_persistence = "commit"
Loading…
Cancel
Save