You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

288 lines
11 KiB
Python

from collections import defaultdict
import json
import logging
import os
import tempfile
from datetime import datetime
from decimal import Decimal
from typing import Optional
from uuid import UUID
import httpx
import ijson
from fastapi import HTTPException, status
from src.aeros_simulation.model import AerosSimulationCalcResult, AerosSimulationPlotResult
from src.aeros_simulation.service import get_all_aeros_node, get_or_save_node, get_plant_calc_result, get_simulation_by_id, get_simulation_with_plot_result
from src.aeros_simulation.utils import calculate_eaf, calculate_eaf_konkin
from src.config import AEROS_BASE_URL
from src.database.core import DbSession
from src.logging import setup_logging
# Module-level logger named after this module; it is handed to the project's
# setup_logging() helper, which configures it (details live in src.logging).
log = logging.getLogger(__name__)
setup_logging(logger=log)
async def execute_simulation(
    *,
    db_session: DbSession,
    simulation_id: Optional[UUID] = None,
    sim_data: dict,
    is_saved: bool = False,
    eq_update: Optional[dict] = None,
):
    """Run a simulation on the Aeros service and optionally persist the result.

    The request body ``sim_data`` is POSTed to
    ``{AEROS_BASE_URL}/api/Simulation/RunSimulation`` and the response is
    streamed to a temporary file so the whole payload never has to be held
    in memory while downloading.

    Args:
        db_session: Database session used to update the simulation row and to
            store the parsed results.
        simulation_id: Id of the simulation row to update. Required when
            ``is_saved`` is true.
        sim_data: JSON-serialisable request body; ``sim_data["SchematicName"]``
            is read when results are saved.
        is_saved: When false the parsed response is returned and nothing is
            written to the database on success.
        eq_update: Optional mapping passed through to
            ``process_large_json_streaming``; ``None`` is treated as ``{}``.

    Returns:
        The parsed response JSON when ``is_saved`` is false, otherwise ``True``
        once the results have been stored and the simulation is marked
        ``"completed"``.

    Raises:
        HTTPException: 500, wrapping any error raised while calling the
            service or storing the result. When ``simulation_id`` is set, the
            simulation row is marked ``"failed"`` first (best effort).
    """
    eq_update = eq_update or {}
    log.info(
        "Executing simulation with id=%s, schematic=%s",
        simulation_id,
        sim_data.get("SchematicName"),
    )

    tmpfile = None
    try:
        # Use a unique file per call. A fixed "simulation_<id>.json" name is
        # shared by every call without an id (and by re-runs of the same id),
        # so concurrent calls would overwrite or delete each other's download.
        fd, tmpfile = tempfile.mkstemp(
            prefix=f"simulation_{simulation_id}_", suffix=".json"
        )
        os.close(fd)

        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST",
                f"{AEROS_BASE_URL}/api/Simulation/RunSimulation",
                json=sim_data,
                headers={"Content-Type": "application/json"},
            ) as response:
                response.raise_for_status()
                with open(tmpfile, "wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)

        if not is_saved:
            # Not saving to the DB: just return the parsed JSON. Reading in
            # binary mode lets the json module detect the encoding itself
            # instead of relying on the platform's default text encoding.
            with open(tmpfile, "rb") as f:
                return json.load(f)

        # Update simulation status (note: this happens after the remote run
        # has already finished and its output has been downloaded).
        simulation = await get_simulation_by_id(
            db_session=db_session, simulation_id=simulation_id
        )
        simulation.status = "running"
        await db_session.commit()

        await process_large_json_streaming(
            db_session=db_session,
            file_path=tmpfile,
            simulation_id=simulation.id,
            eq_update=eq_update,
            schematic_name=sim_data["SchematicName"],
        )

        simulation.status = "completed"
        simulation.completed_at = datetime.now()
        await db_session.commit()
        log.info("Simulation result saved for simulation id: %s", simulation.id)
        return True
    except Exception as e:
        log.exception("Simulation failed: %s", e)
        if simulation_id:
            # Recording the failure is best effort: an error here must not
            # replace the original exception or skip the HTTP 500 below.
            try:
                # Discard whatever the failed unit of work left pending so the
                # session can be used again for the status update.
                await db_session.rollback()
                simulation = await get_simulation_by_id(
                    db_session=db_session, simulation_id=simulation_id
                )
                simulation.status = "failed"
                simulation.error = str(e)
                await db_session.commit()
            except Exception:
                log.exception(
                    "Could not record failure for simulation id: %s", simulation_id
                )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
    finally:
        if tmpfile is not None and os.path.exists(tmpfile):
            os.remove(tmpfile)
async def process_large_json_streaming(
    *, db_session: DbSession, file_path: str, simulation_id, eq_update, schematic_name
):
    """Stream a simulation result file from disk into the database.

    The file is parsed incrementally with ijson in two passes: first the
    ``plotNodeOuts`` array, then the ``nodeResultOuts`` array. Each element is
    turned into a model object by ``create_plot_result_object`` /
    ``create_calc_result_object`` and committed in batches, so only one batch
    of rows is held in memory at a time.

    Args:
        db_session: Database session used for node lookups and inserts.
        file_path: Path of the JSON file to read.
        simulation_id: Id stored on every created result row.
        eq_update: Mapping forwarded unchanged to the row factories.
        schematic_name: Schematic whose known nodes are pre-loaded so most
            items do not need a per-item node lookup.

    Raises:
        Exception: Any error raised while reading, parsing or committing is
            logged and re-raised unchanged.
    """
    batch_size = 200  # rows per commit

    def convert_item(item):
        # ijson yields decimal.Decimal for non-integer numbers; convert them
        # (recursively) to float before they reach the model constructors.
        if isinstance(item, dict):
            return {k: convert_item(v) for k, v in item.items()}
        if isinstance(item, list):
            return [convert_item(i) for i in item]
        if isinstance(item, Decimal):
            return float(item)
        return item

    # Known nodes keyed as "<node_type>:<node_name>"; the row factories add to
    # this mapping when they have to create a node on the fly.
    available_nodes = {
        f"{node.node_type}:{node.node_name}": node
        for node in await get_all_aeros_node(
            db_session=db_session, schematic_name=schematic_name
        )
    }

    async def persist_section(file, prefix, build_row):
        # Parse every element under `prefix`, build a row for it and commit
        # the rows in batches of `batch_size`, including the final partial one.
        pending = []
        for raw_item in ijson.items(file, prefix):
            row = await build_row(
                simulation_id,
                convert_item(raw_item),
                available_nodes,
                eq_update,
                db_session,
            )
            if row:
                pending.append(row)
            if len(pending) >= batch_size:
                db_session.add_all(pending)
                await db_session.commit()
                pending.clear()
        if pending:
            db_session.add_all(pending)
            await db_session.commit()

    try:
        # Binary mode: ijson parses bytes; a text-mode handle makes it emit a
        # warning and re-encode every chunk it reads.
        with open(file_path, "rb") as file:
            log.info("Processing plot array...")
            await persist_section(file, "plotNodeOuts.item", create_plot_result_object)

            # Rewind so the second pass can scan the file from the start.
            file.seek(0)

            log.info("Processing calculation array...")
            await persist_section(file, "nodeResultOuts.item", create_calc_result_object)
    except Exception as e:
        log.error("Error processing JSON stream: %s", str(e))
        raise
async def create_plot_result_object(
    simulation_id, result, available_nodes, eq_update, db_session
):
    """Build the ``AerosSimulationPlotResult`` row for one plot entry.

    The node is looked up in ``available_nodes`` under the key
    ``"<type>:<nodeName>"``, where ``<type>`` is ``"RegularNode"`` for regular
    nodes and ``"SchematicNode"`` for everything else. A node that is not
    cached is created/fetched through ``get_or_save_node`` and added to the
    cache, but only when ``result["nodeType"]`` is ``"RegularNode"`` or
    ``"Schematic"``; any other type yields ``None``.

    ``eq_update`` is accepted for signature parity and is not used here.

    Returns:
        A new, not yet persisted ``AerosSimulationPlotResult``, or ``None``
        when the entry refers to an unknown node of an unsupported type.
    """
    kind = "SchematicNode" if result["nodeType"] != "RegularNode" else "RegularNode"
    cache_key = f"{kind}:{result['nodeName']}"
    matched = available_nodes.get(cache_key)

    if not matched:
        if result["nodeType"] not in ("RegularNode", "Schematic"):
            return None
        matched = await get_or_save_node(
            db_session=db_session, node_data=result, type="plot"
        )
        # Remember the node so later entries for it skip the lookup.
        available_nodes[cache_key] = matched

    return AerosSimulationPlotResult(
        aeros_simulation_id=simulation_id,
        aeros_node_id=matched.id,
        max_flow_rate=float(result["maxFlowrate"]),
        storage_capacity=float(result["storageCapacity"]),
        point_availabilities=result["pointAvailabilities"],
        point_flowrates=result["pointFlowrates"],
        timestamp_outs=result["timeStampOuts"],
    )
async def create_calc_result_object(
    simulation_id, result, available_nodes, eq_update, db_session
):
    """Build the ``AerosSimulationCalcResult`` row for one calculation entry.

    Node resolution: the node is looked up in ``available_nodes`` under
    ``"<type>:<nodeName>"`` (``"RegularNode"`` for regular nodes,
    ``"SchematicNode"`` otherwise). A missing node is created/fetched with
    ``get_or_save_node`` and cached when ``result["nodeType"]`` is
    ``"RegularNode"`` or ``"Schematic"``; for any other type ``None`` is
    returned.

    Derived values:
        * ``efor`` is ``totalDowntime / (totalDowntime + totalUpTime) * 100``,
          or ``0`` when that sum is not positive.
        * ``eaf`` and ``derating_hours`` are stored as ``0``.
        * ``beta``, ``eta``, ``mttr`` and ``parameters`` come from
          ``eq_update[nodeName]`` (zeros / empty dict when absent) for regular
          nodes and are ``None`` for all other nodes.

    Returns:
        A new, not yet persisted ``AerosSimulationCalcResult``, or ``None``.
    """
    is_regular = result["nodeType"] == "RegularNode"
    node_type = "RegularNode" if is_regular else "SchematicNode"
    cache_key = f"{node_type}:{result['nodeName']}"

    node = available_nodes.get(cache_key)
    if not node:
        if result["nodeType"] not in ("RegularNode", "Schematic"):
            return None
        node = await get_or_save_node(
            db_session=db_session, node_data=result, type="calc"
        )
        # Cache the node for the remaining entries.
        available_nodes[cache_key] = node

    no_reliability = {"eta": 0, "beta": 0, "mttr": 0, "parameters": {}}
    eq_reliability = eq_update.get(result["nodeName"], no_reliability)

    # Forced-outage rate as a percentage of the observed period.
    observed_hours = result["totalDowntime"] + result["totalUpTime"]
    if observed_hours > 0:
        efor = (result["totalDowntime"] / observed_hours) * 100
    else:
        efor = 0

    # (model column, key in the simulation output), in constructor order.
    copied_columns = (
        ("total_downtime", "totalDowntime"),
        ("total_uptime", "totalUpTime"),
        ("num_events", "numEvents"),
        ("production", "production"),
        ("production_std", "productionStd"),
        ("ideal_production", "idealProduction"),
        ("availability", "availability"),
        ("efficiency", "efficiency"),
        ("effective_loss", "effectiveLoss"),
        ("num_cm", "numCM"),
        ("cm_waiting_time", "cmWaitingTime"),
        ("total_cm_downtime", "totalCMDowntime"),
        ("num_pm", "numPM"),
        ("total_pm_downtime", "totalPMDowntime"),
        ("num_ip", "numIP"),
        ("total_ip_downtime", "totalIPDowntime"),
        ("num_oh", "numOH"),
        ("total_oh_downtime", "totalOHDowntime"),
        ("t_wait_for_crew", "tWaitForCrew"),
        ("t_wait_for_spare", "tWaitForSpare"),
        ("duration_at_full", "durationAtFull"),
        ("duration_above_hh", "durationAboveHH"),
        ("duration_above_h", "durationAboveH"),
        ("duration_below_l", "durationBelowL"),
        ("duration_below_ll", "durationBelowLL"),
        ("duration_at_empty", "durationAtEmpty"),
        ("stg_input", "stgInput"),
        ("stg_output", "stgOutput"),
        ("average_level", "averageLevel"),
        ("potential_production", "potentialProduction"),
    )

    values = {"aeros_simulation_id": simulation_id, "aeros_node_id": node.id}
    for column, source_key in copied_columns:
        values[column] = result[source_key]

    # Per-node EAF and derating hours are not computed here.
    values["eaf"] = 0
    values["efor"] = efor
    values["derating_hours"] = 0

    # Reliability parameters only apply to regular nodes.
    for name in ("beta", "eta", "mttr", "parameters"):
        values[name] = eq_reliability[name] if is_regular else None

    return AerosSimulationCalcResult(**values)
async def calculate_plant_eaf(
    db_session: DbSession, simulation_id: UUID, mo_downtime: int, po_downtime:int, oh_interval:int
):
    """Compute plant-level EAF figures for a simulation and store them.

    Loads the plant calculation row and the plot result for node ``"plant"``,
    splits the time into seasonal and forced outage, calls ``calculate_eaf``
    and writes ``eaf``, ``efor``, ``sof`` and ``derating_hours`` (plus the
    given ``mo_downtime`` / ``po_downtime`` values) back onto the plant row
    before committing.

    When the simulated span (uptime + downtime) exceeds ``oh_interval`` the
    ``po_downtime`` share is counted as seasonal outage and subtracted from
    the simulated downtime; otherwise only ``mo_downtime`` is seasonal and
    all simulated downtime counts as forced outage.

    NOTE(review): ``mo_downtime`` and ``po_downtime`` are multiplied by 24
    here, presumably days converted to hours; confirm with the caller.

    Returns:
        The ``eaf`` value returned by ``calculate_eaf``.
    """
    plant_calc_data = await get_plant_calc_result(
        db_session=db_session, simulation_id=simulation_id
    )
    plant_plot_data = await get_simulation_with_plot_result(
        db_session=db_session, simulation_id=simulation_id, node_id="plant"
    )

    simulated_span = plant_calc_data.total_uptime + plant_calc_data.total_downtime
    is_oh_from_aeros = simulated_span > oh_interval

    if is_oh_from_aeros:
        seasonal_outage = mo_downtime*24 + po_downtime*24
        forced_outage = plant_calc_data.total_downtime - po_downtime*24
    else:
        seasonal_outage = mo_downtime*24
        forced_outage = plant_calc_data.total_downtime

    total_period_time = plant_calc_data.total_uptime + seasonal_outage + forced_outage

    eaf, efor, sof, edh = calculate_eaf(
        available_hours=plant_calc_data.total_uptime,
        period_hours=total_period_time,
        forced_outage_hours=forced_outage,
        seasonal_outage_hours=seasonal_outage,
        plot_data=plant_plot_data.timestamp_outs,
    )

    # Persist inputs and results on the plant row.
    plant_calc_data.total_mo_downtime = mo_downtime
    plant_calc_data.total_po_downtime = po_downtime
    plant_calc_data.eaf = eaf
    plant_calc_data.efor = efor
    plant_calc_data.sof = sof
    plant_calc_data.derating_hours = edh
    await db_session.commit()

    return eaf
# async def calculate_eaf_konkin_pnat(
# db_session
# )