update activities
parent 415e3c0a0f
commit 7cd2c837df
@@ -0,0 +1,120 @@
from datetime import datetime
from typing import Any, Dict

from fastapi import HTTPException
import httpx
from starlette.config import Config

from src.config import config


class MaximoDataMapper:
    """
    Helper class to map MAXIMO API response to our data structure.
    Update these mappings according to actual MAXIMO API documentation.
    """

    def __init__(self, maximo_data: Dict[Any, Any]):
        self.data = maximo_data

    def get_start_date(self) -> datetime:
        """
        Extract start date from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure
        Example: might be data['startDate'] or data['SCHEDSTART'] etc.
        """
        # This is a placeholder - update with actual MAXIMO field name
        start_date_str = self.data.get('scheduleStart')
        if not start_date_str:
            raise ValueError("Start date not found in MAXIMO data")
        return datetime.fromisoformat(start_date_str)

    def get_end_date(self) -> datetime:
        """
        Extract end date from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure
        """
        # This is a placeholder - update with actual MAXIMO field name
        end_date_str = self.data.get('scheduleEnd')
        if not end_date_str:
            raise ValueError("End date not found in MAXIMO data")
        return datetime.fromisoformat(end_date_str)

    def get_maximo_id(self) -> str:
        """
        Extract MAXIMO ID from response.
        TODO: Update this based on actual MAXIMO API response structure
        """
        # This is a placeholder - update with actual MAXIMO field name
        maximo_id = self.data.get('workOrderId')
        if not maximo_id:
            raise ValueError("MAXIMO ID not found in response")
        return str(maximo_id)

    def get_status(self) -> str:
        """
        Extract status from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure
        """
        # This is a placeholder - update with actual MAXIMO status field and values
        status = self.data.get('status', '').upper()
        return status

    def get_total_cost(self) -> float:
        """
        Extract total cost from MAXIMO data.
        TODO: Update this based on actual MAXIMO API response structure
        """
        # This is a placeholder - update with actual MAXIMO field name
        cost = self.data.get('totalCost', 0)
        return float(cost)


class MaximoService:
    def __init__(self):
        # TODO: Update these settings based on actual MAXIMO API configuration
        self.base_url = config.get("MAXIMO_BASE_URL")
        self.api_key = config.get("MAXIMO_API_KEY")

    async def get_recent_overhaul(self) -> dict:
        """
        Fetch most recent overhaul from MAXIMO.
        TODO: Update this method based on actual MAXIMO API endpoints and parameters
        """
        async with httpx.AsyncClient() as client:
            try:
                # TODO: Update endpoint and parameters based on actual MAXIMO API
                response = await client.get(
                    f"{self.base_url}/your-endpoint-here",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        # Add any other required headers
                    },
                    params={
                        # Update these parameters based on actual MAXIMO API
                        "orderBy": "-scheduleEnd",  # Example parameter
                        "limit": 1
                    }
                )

                if response.status_code != 200:
                    raise HTTPException(
                        status_code=response.status_code,
                        detail=f"MAXIMO API error: {response.text}"
                    )

                data = response.json()
                if not data:
                    raise HTTPException(
                        status_code=404,
                        detail="No recent overhaul found"
                    )

                # TODO: Update this based on actual MAXIMO response structure
                return data[0] if isinstance(data, list) else data

            except httpx.RequestError as e:
                raise HTTPException(
                    status_code=503,
                    detail=f"Failed to connect to MAXIMO: {str(e)}"
                )
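# Usage sketch with a made-up payload that uses the placeholder field names the mapper
# currently reads ('workOrderId', 'scheduleStart', 'scheduleEnd', 'status', 'totalCost');
# the real MAXIMO response may use different keys.
if __name__ == "__main__":
    sample = {
        "workOrderId": 12345,
        "scheduleStart": "2025-01-12T08:00:00",
        "scheduleEnd": "2025-02-20T17:00:00",
        "status": "inprg",
        "totalCost": 150000.0,
    }
    mapper = MaximoDataMapper(sample)
    print(mapper.get_maximo_id())    # "12345"
    print(mapper.get_start_date())   # 2025-01-12 08:00:00
    print(mapper.get_status())       # "INPRG"
    print(mapper.get_total_cost())   # 150000.0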
@@ -0,0 +1,9 @@
from src.enums import OptimumOHEnum


class OverhaulStatus(OptimumOHEnum):
    PLANNED = "PLANNED"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    DELAYED = "DELAYED"
    CANCELLED = "CANCELLED"
    ON_HOLD = "ON_HOLD"
@@ -0,0 +1,17 @@
from sqlalchemy import UUID, Column, DateTime, Float, ForeignKey, Integer, String
from src.database.core import Base
from src.models import DefaultMixin
from .enums import OverhaulStatus


class OverhaulHistory(Base, DefaultMixin):
    __tablename__ = "oh_tr_overhaul_history"

    scope_id = Column(UUID(as_uuid=True), ForeignKey("oh_scope.id"), nullable=True)
    schedule_start_date = Column(DateTime(timezone=True))
    schedule_end_date = Column(DateTime(timezone=True))
    total_cost = Column(Float, nullable=False, default=0)
    status = Column(String, nullable=False, default=OverhaulStatus.PLANNED)
    maximo_id = Column(String, nullable=True,
                       comment="Id From MAXIMO regarding overhaul schedule")
@@ -0,0 +1,51 @@
from fastapi import APIRouter, HTTPException, status

from src.maximo.service import MaximoService

from .model import OverhaulHistory
from .schema import OverhaulHistoryCreate, OverhaulHistoryRead, OverhaulHistoryUpdate, OverhaulHistoryPagination
from .service import get, get_all, start_overhaul

from src.database.service import CommonParameters, search_filter_sort_paginate
from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.models import StandardResponse

router = APIRouter()


@router.get("", response_model=StandardResponse[OverhaulHistoryPagination])
async def get_histories(common: CommonParameters):
    """Get all overhaul histories, paginated."""
    return StandardResponse(
        data=await search_filter_sort_paginate(model=OverhaulHistory, **common),
        message="Data retrieved successfully",
    )


@router.get("/{overhaul_history_id}", response_model=StandardResponse[OverhaulHistoryRead])
async def get_history(db_session: DbSession, overhaul_history_id: str):
    """Get a single overhaul history by id."""
    overhaul_history = await get(db_session=db_session, overhaul_history_id=overhaul_history_id)
    if not overhaul_history:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    return StandardResponse(data=overhaul_history, message="Data retrieved successfully")


@router.post("", response_model=StandardResponse[OverhaulHistoryRead])
async def create_history(db_session: DbSession, scope_in: OverhaulHistoryRead):
    """Start a new overhaul history record from the most recent MAXIMO overhaul."""
    try:
        maximo_service = MaximoService()
        maximo_data = await maximo_service.get_recent_overhaul()
        overhaul = await start_overhaul(db_session=db_session, maximo_data=maximo_data)
    except HTTPException as he:
        raise he

    return StandardResponse(data=overhaul, message="Data created successfully")
@@ -0,0 +1,33 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from pydantic import Field

from src.models import DefultBase, Pagination
from src.scope.schema import ScopeRead


class OverhaulHistoryBase(DefultBase):
    pass


class OverhaulHistoryCreate(OverhaulHistoryBase):
    pass


class OverhaulHistoryUpdate(OverhaulHistoryBase):
    pass


class OverhaulHistoryRead(OverhaulHistoryBase):
    id: UUID
    scope: ScopeRead
    schedule_start_date: datetime
    schedule_end_date: Optional[datetime]
    total_cost: Optional[float] = Field(0)
    maximo_id: Optional[str]


class OverhaulHistoryPagination(Pagination):
    items: List[OverhaulHistoryRead] = []
@@ -0,0 +1,64 @@
from fastapi import HTTPException
from sqlalchemy import Select, Delete, and_

from src.maximo.service import MaximoDataMapper
from src.overhaul_history.enums import OverhaulStatus
from src.overhaul_history.utils import determine_overhaul_status
from .model import OverhaulHistory
from .schema import OverhaulHistoryRead, OverhaulHistoryCreate
from typing import Optional

from src.database.core import DbSession
from src.auth.service import CurrentUser
from src.scope.service import get_by_scope_name


async def get(*, db_session: DbSession, overhaul_history_id: str) -> Optional[OverhaulHistory]:
    """Returns an overhaul history record based on the given id."""
    # AsyncSession.get() already returns the instance (or None), not a Result
    return await db_session.get(OverhaulHistory, overhaul_history_id)


async def get_all(*, db_session: DbSession):
    """Returns all overhaul history records."""
    query = Select(OverhaulHistory)
    result = await db_session.execute(query)
    return result.scalars().all()


async def start_overhaul(*, db_session: DbSession, maximo_data: dict):
    mapper = MaximoDataMapper(maximo_data)
    maximo_id = mapper.get_maximo_id()

    # Check for an existing in-progress overhaul with the same MAXIMO id
    # (the async session has no .query(); use a select statement instead)
    query = Select(OverhaulHistory).where(
        and_(
            OverhaulHistory.maximo_id == maximo_id,
            OverhaulHistory.status == OverhaulStatus.IN_PROGRESS,
        )
    )
    result = await db_session.execute(query)
    existing_overhaul = result.scalars().first()

    if existing_overhaul:
        raise HTTPException(
            status_code=409,
            detail=f"Overhaul with MAXIMO ID {maximo_id} already started"
        )

    status, status_reason = await determine_overhaul_status(maximo_data)
    scope = await get_by_scope_name("A")

    overhaul = OverhaulHistory(
        scope_id=scope.id,
        schedule_start_date=mapper.get_start_date(),
        schedule_end_date=mapper.get_end_date(),
        total_cost=mapper.get_total_cost(),
        maximo_id=maximo_id,
        status=status
    )

    db_session.add(overhaul)
    await db_session.commit()
    await db_session.refresh(overhaul)
    return overhaul
@@ -0,0 +1,20 @@
from typing import Any, Dict, Optional

from .enums import OverhaulStatus
from src.maximo.service import MaximoDataMapper


async def determine_overhaul_status(maximo_data: Dict[Any, Any]) -> tuple[str, Optional[str]]:
    """Map MAXIMO status to our status enum."""
    mapper = MaximoDataMapper(maximo_data)
    maximo_status = mapper.get_status()

    # TODO: Update these mappings based on actual MAXIMO status values
    status_mapping = {
        'COMP': OverhaulStatus.COMPLETED,
        'INPRG': OverhaulStatus.IN_PROGRESS,
        'PLAN': OverhaulStatus.PLANNED,
        'HOLD': OverhaulStatus.ON_HOLD,
        # Add other status mappings based on actual MAXIMO statuses
    }

    return status_mapping.get(maximo_status, OverhaulStatus.PLANNED), None
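# Usage sketch: MaximoDataMapper.get_status() upper-cases whatever the placeholder 'status'
# key holds, so lowercase MAXIMO values also resolve; anything outside the mapping (e.g. a
# hypothetical 'WAPPR') falls back to PLANNED, and status_reason is always None for now.
if __name__ == "__main__":
    import asyncio

    print(asyncio.run(determine_overhaul_status({"status": "inprg"})))
    # (OverhaulStatus.IN_PROGRESS, None)
    print(asyncio.run(determine_overhaul_status({"status": "WAPPR"})))
    # (OverhaulStatus.PLANNED, None)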
@@ -0,0 +1,18 @@
from sqlalchemy import UUID, Column, Float, Integer, String, ForeignKey
from src.database.core import Base
from src.models import DefaultMixin, IdentityMixin, TimeStampMixin
from sqlalchemy.orm import relationship
from src.workorder.model import MasterWorkOrder
from sqlalchemy.ext.hybrid import hybrid_property


class ScopeEquipmentActivity(Base, DefaultMixin):
    __tablename__ = "oh_tr_overhaul_activity"

    assetnum = Column(String, nullable=True)
    name = Column(String, nullable=False)
    cost = Column(Float, nullable=False, default=0)

    scope_equipments = relationship(
        "ScopeEquipment",
        lazy="raise",
        primaryjoin="and_(ScopeEquipmentActivity.assetnum == foreign(ScopeEquipment.assetnum))",
        uselist=False,
    )
@@ -0,0 +1,71 @@
from fastapi import APIRouter, HTTPException, Query, status

from .service import get_all, create, get, update, delete
from .schema import ScopeEquipmentActivityCreate, ScopeEquipmentActivityPagination, ScopeEquipmentActivityRead, ScopeEquipmentActivityUpdate

from src.models import StandardResponse
from src.database.service import CommonParameters, search_filter_sort_paginate, DbSession

router = APIRouter()


@router.get("", response_model=StandardResponse[ScopeEquipmentActivityPagination])
async def get_scope_equipment_activities(common: CommonParameters, assetnum: str = Query(None)):
    """Get all scope equipment activities, paginated and optionally filtered by assetnum."""
    data = await get_all(common=common, assetnum=assetnum)

    return StandardResponse(
        data=data,
        message="Data retrieved successfully",
    )


@router.post("", response_model=StandardResponse[ScopeEquipmentActivityRead])
async def create_activity(db_session: DbSession, scope_equipment_activity_in: ScopeEquipmentActivityCreate):
    """Create a new scope equipment activity."""
    activity = await create(db_session=db_session, scope_equipment_activity_in=scope_equipment_activity_in)

    return StandardResponse(data=activity, message="Data created successfully")


@router.get("/{scope_equipment_activity_id}", response_model=StandardResponse[ScopeEquipmentActivityRead])
async def get_activity(db_session: DbSession, scope_equipment_activity_id: str):
    """Get a single scope equipment activity by id."""
    activity = await get(db_session=db_session, scope_equipment_activity_id=scope_equipment_activity_id)
    if not activity:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    return StandardResponse(data=activity, message="Data retrieved successfully")


@router.put("/{scope_equipment_activity_id}", response_model=StandardResponse[ScopeEquipmentActivityRead])
async def update_activity(db_session: DbSession, scope_equipment_activity_in: ScopeEquipmentActivityUpdate, scope_equipment_activity_id: str):
    """Update an existing scope equipment activity."""
    activity = await get(db_session=db_session, scope_equipment_activity_id=scope_equipment_activity_id)

    if not activity:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="A data with this id does not exist.",
        )

    return StandardResponse(
        data=await update(
            db_session=db_session,
            activity=activity,
            scope_equipment_activity_in=scope_equipment_activity_in,
        ),
        message="Data updated successfully",
    )


@router.delete("/{scope_equipment_activity_id}", response_model=StandardResponse[ScopeEquipmentActivityRead])
async def delete_activity(db_session: DbSession, scope_equipment_activity_id: str):
    """Delete a scope equipment activity."""
    activity = await get(db_session=db_session, scope_equipment_activity_id=scope_equipment_activity_id)

    if not activity:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=[{"msg": "A data with this id does not exist."}],
        )

    await delete(db_session=db_session, scope_equipment_activity_id=scope_equipment_activity_id)

    return StandardResponse(message="Data deleted successfully", data=activity)
@@ -0,0 +1,69 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID

from pydantic import Field, BaseModel
from src.models import DefultBase, Pagination


class ScopeEquipmentActivityBase(DefultBase):
    assetnum: str = Field(..., description="Assetnum is required")


class ScopeEquipmentActivityCreate(ScopeEquipmentActivityBase):
    name: str
    cost: Optional[float] = Field(0)


class ScopeEquipmentActivityUpdate(ScopeEquipmentActivityBase):
    name: Optional[str] = Field(None)
    cost: Optional[float] = Field(0)


class ScopeEquipmentActivityRead(ScopeEquipmentActivityBase):
    name: str
    cost: float


class ScopeEquipmentActivityPagination(Pagination):
    items: List[ScopeEquipmentActivityRead] = []


# {
#     "overview": {
#         "totalEquipment": 30,
#         "nextSchedule": {
#             "date": "2025-01-12",
#             "Overhaul": "B",
#             "equipmentCount": 30
#         }
#     },
#     "criticalParts": [
#         "Boiler feed pump",
#         "Boiler reheater system",
#         "Drum Level (Right) Root Valve A",
#         "BCP A Discharge Valve",
#         "BFPT A EXH Press HI Root VLV"
#     ],
#     "schedules": [
#         {
#             "date": "2025-01-12",
#             "Overhaul": "B",
#             "status": "upcoming"
#         }
#         // ... other scheduled overhauls
#     ],
#     "systemComponents": {
#         "boiler": {
#             "status": "operational",
#             "lastOverhaul": "2024-06-15"
#         },
#         "turbine": {
#             "hpt": { "status": "operational" },
#             "ipt": { "status": "operational" },
#             "lpt": { "status": "operational" }
#         }
#         // ... other major components
#     }
# }
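# Illustrative payloads (field values are made up), assuming DefultBase behaves like a
# standard pydantic v2 BaseModel: assetnum is required on every schema, and cost falls
# back to 0 when omitted on create.
#
#   ScopeEquipmentActivityCreate(assetnum="BO-1234", name="Inspect boiler feed pump")
#   # -> assetnum='BO-1234' name='Inspect boiler feed pump' cost=0
#   ScopeEquipmentActivityCreate(name="Inspect boiler feed pump")
#   # -> raises pydantic.ValidationError because assetnum is missing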
@@ -0,0 +1,58 @@
from sqlalchemy import Select, Delete
from typing import Optional

from .model import ScopeEquipmentActivity
from .schema import ScopeEquipmentActivityCreate, ScopeEquipmentActivityUpdate

from src.database.core import DbSession
from src.database.service import CommonParameters, search_filter_sort_paginate
from src.auth.service import CurrentUser


async def get(*, db_session: DbSession, scope_equipment_activity_id: str) -> Optional[ScopeEquipmentActivity]:
    """Returns an activity based on the given id."""
    result = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
    return result


async def get_all(common: CommonParameters, assetnum: Optional[str]):
    """Returns all activities, optionally filtered by assetnum."""
    query = Select(ScopeEquipmentActivity)

    if assetnum:
        query = query.filter(ScopeEquipmentActivity.assetnum == assetnum)

    results = await search_filter_sort_paginate(model=query, **common)

    return results


async def create(*, db_session: DbSession, scope_equipment_activity_in: ScopeEquipmentActivityCreate):
    """Creates a new activity."""
    activity = ScopeEquipmentActivity(**scope_equipment_activity_in.model_dump())
    db_session.add(activity)
    await db_session.commit()
    return activity


async def update(*, db_session: DbSession, activity: ScopeEquipmentActivity, scope_equipment_activity_in: ScopeEquipmentActivityUpdate):
    """Updates an activity with only the fields explicitly provided."""
    data = scope_equipment_activity_in.model_dump()
    update_data = scope_equipment_activity_in.model_dump(exclude_defaults=True)

    for field in data:
        if field in update_data:
            setattr(activity, field, update_data[field])

    await db_session.commit()

    return activity


async def delete(*, db_session: DbSession, scope_equipment_activity_id: str):
    """Deletes an activity."""
    activity = await db_session.get(ScopeEquipmentActivity, scope_equipment_activity_id)
    await db_session.delete(activity)
    await db_session.commit()
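# Sketch of the partial-update behaviour update() relies on, assuming DefultBase is a
# standard pydantic v2 base and using made-up values: model_dump(exclude_defaults=True)
# drops fields the caller left at their defaults, so only explicitly supplied values are
# copied onto the row.
#
#   payload = ScopeEquipmentActivityUpdate(assetnum="BO-1234", name="Repack valve")
#   payload.model_dump()                        # {'assetnum': 'BO-1234', 'name': 'Repack valve', 'cost': 0}
#   payload.model_dump(exclude_defaults=True)   # {'assetnum': 'BO-1234', 'name': 'Repack valve'}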
@@ -0,0 +1,80 @@
import re
from datetime import datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo

from dateutil.relativedelta import relativedelta


def parse_relative_expression(date_str: str) -> Optional[datetime]:
    """
    Parse relative date expressions using H (hours), T (days), M (months), and Y (years).
    Returns the resolved datetime, or None if the string is not a relative expression.
    """
    pattern = r"^([HTMY])([+-]\d+)?$"
    match = re.match(pattern, date_str)

    if not match:
        return None

    unit, offset = match.groups()
    offset = int(offset) if offset else 0
    # Anchor relative expressions to Asia/Jakarta local time
    today = datetime.now(ZoneInfo("Asia/Jakarta"))

    if unit == "H":
        # For hours, keep minutes and seconds
        result_time = today + timedelta(hours=offset)
        return result_time
    elif unit == "T":
        return today + timedelta(days=offset)
    elif unit == "M":
        return today + relativedelta(months=offset)
    elif unit == "Y":
        return today + relativedelta(years=offset)


def parse_date_string(date_str: str) -> Optional[datetime]:
    """
    Parse date strings in various formats, including relative expressions.
    Returns the parsed datetime.
    """
    # Try parsing as a relative expression first
    relative_result = parse_relative_expression(date_str)
    if relative_result:
        return relative_result

    # Try the supported absolute date formats
    date_formats = [
        ("%Y-%m-%d", "iso"),             # 2024-11-08
        ("%Y/%m/%d", "slash"),           # 2024/11/08
        ("%d-%m-%Y", "european"),        # 08-11-2024
        ("%d/%m/%Y", "european_slash"),  # 08/11/2024
        ("%Y.%m.%d", "dot"),             # 2024.11.08
        ("%d.%m.%Y", "european_dot"),    # 08.11.2024
    ]

    for fmt, type_name in date_formats:
        try:
            # Parse the date and set it to start of day in UTC
            dt = datetime.strptime(date_str, fmt)
            dt = dt.replace(
                hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc
            )
            return dt
        except ValueError:
            continue

    raise ValueError(
        "Invalid date format. Supported formats:\n"
        "Relative formats:\n"
        "- T (days): T, T-n, T+n\n"
        "- M (months): M, M-1, M+2\n"
        "- Y (years): Y, Y-1, Y+1\n"
        "Regular formats:\n"
        "- YYYY-MM-DD\n"
        "- YYYY/MM/DD\n"
        "- DD-MM-YYYY\n"
        "- DD/MM/YYYY\n"
        "- YYYY.MM.DD\n"
        "- DD.MM.YYYY"
    )
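# Usage sketch: relative expressions resolve against "now" in Asia/Jakarta, absolute dates
# are normalised to midnight UTC, and unrecognised strings raise ValueError.
if __name__ == "__main__":
    print(parse_date_string("T-7"))         # seven days ago
    print(parse_date_string("M+2"))         # two months from now
    print(parse_date_string("2024-11-08"))  # 2024-11-08 00:00:00+00:00
    try:
        parse_date_string("next tuesday")
    except ValueError as exc:
        print(exc)                          # lists the supported formats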