add file read for big json
parent
e7cb0a0e76
commit
7b5d9f55e6
@ -0,0 +1,242 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from typing import Optional
|
||||
from uuid import UUID
|
||||
|
||||
import httpx
|
||||
import ijson
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
from src.aeros_simulation.model import AerosSimulationCalcResult, AerosSimulationPlotResult
|
||||
from src.aeros_simulation.service import get_all_aeros_node, get_or_save_node, get_simulation_by_id
|
||||
from src.aeros_simulation.utils import calculate_eaf
|
||||
from src.config import AEROS_BASE_URL
|
||||
from src.database.core import DbSession
|
||||
from src.logging import setup_logging
|
||||
|
||||
# Module-level logger, routed through the project's central logging setup.
log = logging.getLogger(__name__)
setup_logging(logger=log)
|
||||
|
||||
|
||||
async def execute_simulation(
    *, db_session: DbSession, simulation_id: Optional[UUID] = None,
    sim_data: dict, is_saved: bool = False, eq_update: Optional[dict] = None
):
    """Execute the actual simulation call against the AerOS API.

    The (potentially very large) response body is streamed to a temporary
    file on disk so it never has to fit in memory at once.

    Args:
        db_session: Async database session.
        simulation_id: Id of an existing simulation row to update; used when
            ``is_saved`` is True and for the temp-file name.
        sim_data: Request payload forwarded to the ``RunSimulation`` endpoint.
        is_saved: When False, the parsed JSON response is returned directly
            and nothing is persisted to the database.
        eq_update: Optional per-node reliability overrides forwarded to
            result processing; defaults to an empty mapping.

    Returns:
        The parsed response dict when ``is_saved`` is False, otherwise True.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    eq_update = eq_update or {}
    log.info("Executing simulation with id=%s, schematic=%s", simulation_id, sim_data.get("SchematicName"))

    tmpfile = os.path.join(tempfile.gettempdir(), f"simulation_{simulation_id}.json")

    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            async with client.stream(
                "POST",
                f"{AEROS_BASE_URL}/api/Simulation/RunSimulation",
                json=sim_data,
                headers={"Content-Type": "application/json"},
            ) as response:
                response.raise_for_status()
                # Stream the body to disk chunk-by-chunk to keep memory flat.
                with open(tmpfile, "wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)

        if not is_saved:
            # If not saving to DB, just return parsed JSON
            with open(tmpfile, "r") as f:
                return json.load(f)

        # Update simulation status
        simulation = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)
        simulation.status = "processing"
        await db_session.commit()

        await process_large_json_streaming(
            db_session=db_session,
            file_path=tmpfile,
            simulation_id=simulation.id,
            eq_update=eq_update,
            schematic_name=sim_data["SchematicName"],
        )

        simulation.status = "completed"
        simulation.completed_at = datetime.now()
        await db_session.commit()
        log.info("Simulation result saved for simulation id: %s", simulation.id)
        return True

    except Exception as e:
        if simulation_id:
            # The failure may have happened mid-transaction; roll the session
            # back so it is usable again before recording the failed status.
            await db_session.rollback()
            simulation = await get_simulation_by_id(db_session=db_session, simulation_id=simulation_id)
            simulation.status = "failed"
            simulation.error = str(e)
            await db_session.commit()
        # log.exception keeps the traceback, unlike log.error.
        log.exception("Simulation failed: %s", str(e))
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) from e
    finally:
        # Always remove the temp file, on success and on failure alike.
        if os.path.exists(tmpfile):
            os.remove(tmpfile)
|
||||
|
||||
|
||||
async def process_large_json_streaming(
    *, db_session: DbSession, file_path: str, simulation_id, eq_update, schematic_name
):
    """Stream JSON parsing from disk with minimal memory usage.

    Parses the result file twice with ijson — once for ``plotNodeOuts`` and
    once for ``nodeResultOuts`` — persisting ORM rows in batches so at most
    ``batch_size`` objects are held in memory at a time.
    """
    batch_size = 200  # rows persisted per commit; larger batches = fewer round-trips

    def convert_item(item):
        # ijson yields Decimal for JSON numbers; recursively coerce to float
        # so values are ORM/JSON friendly.
        if isinstance(item, dict):
            return {k: convert_item(v) for k, v in item.items()}
        if isinstance(item, list):
            return [convert_item(i) for i in item]
        if isinstance(item, Decimal):
            return float(item)
        return item

    # Pre-load known nodes so the per-item path is a dict lookup, not a query.
    available_nodes = {
        f"{node.node_type}:{node.node_name}": node
        for node in await get_all_aeros_node(db_session=db_session, schematic_name=schematic_name)
    }

    async def flush(batch):
        # Persist and clear the pending batch (no-op when empty).
        if batch:
            db_session.add_all(batch)
            await db_session.commit()
            batch.clear()

    async def consume(file, json_prefix, create_obj):
        # Shared streaming loop for both result arrays (was duplicated).
        batch = []
        for raw_item in ijson.items(file, json_prefix):
            item = convert_item(raw_item)
            obj = await create_obj(simulation_id, item, available_nodes, eq_update, db_session)
            if obj:
                batch.append(obj)
                if len(batch) >= batch_size:
                    await flush(batch)
        # Final flush of any remainder.
        await flush(batch)

    try:
        with open(file_path, "r") as file:
            # Process 'plotNodeOuts'
            log.info("Processing plot array...")
            await consume(file, "plotNodeOuts.item", create_plot_result_object)

            # Reset file pointer and re-parse for the second array.
            file.seek(0)

            # Process 'nodeResultOuts'
            log.info("Processing calculation array...")
            await consume(file, "nodeResultOuts.item", create_calc_result_object)

    except Exception as e:
        log.error("Error processing JSON stream: %s", str(e))
        raise
|
||||
|
||||
|
||||
async def create_plot_result_object(
    simulation_id, result, available_nodes, eq_update, db_session
):
    """Build an AerosSimulationPlotResult row for one streamed plot item.

    Looks the node up in the ``available_nodes`` cache; unknown RegularNode/
    Schematic nodes are created on the fly and cached. Returns None for any
    other node type.
    """
    raw_type = result["nodeType"]
    node_type = "RegularNode" if raw_type == "RegularNode" else "SchematicNode"
    cache_key = f"{node_type}:{result['nodeName']}"

    node = available_nodes.get(cache_key)
    if not node:
        # Only RegularNode / Schematic items get persisted; skip the rest.
        if raw_type not in ["RegularNode", "Schematic"]:
            return None
        node = await get_or_save_node(
            db_session=db_session, node_data=result, type="plot"
        )
        # Cache for subsequent items referencing the same node.
        available_nodes[cache_key] = node

    plot_row = AerosSimulationPlotResult(
        aeros_simulation_id=simulation_id,
        aeros_node_id=node.id,
        max_flow_rate=float(result["maxFlowrate"]),
        storage_capacity=float(result["storageCapacity"]),
        point_availabilities=result["pointAvailabilities"],
        point_flowrates=result["pointFlowrates"],
        timestamp_outs=result["timeStampOuts"],
    )
    return plot_row
|
||||
|
||||
async def create_calc_result_object(
    simulation_id, result, available_nodes, eq_update, db_session
):
    """Create a single calc result object.

    Maps one streamed ``nodeResultOuts`` item onto an
    AerosSimulationCalcResult row, deriving EAF/EFOR/derating metrics.
    Returns None for node types other than RegularNode/Schematic; unknown
    nodes are created on the fly and cached in ``available_nodes``.
    """
    node_type = "RegularNode" if result["nodeType"] == "RegularNode" else "SchematicNode"
    node = available_nodes.get(f"{node_type}:{result['nodeName']}")

    if not node:
        if result["nodeType"] not in ["RegularNode", "Schematic"]:
            return None
        node = await get_or_save_node(db_session=db_session, node_data=result, type="calc")
        # Add to available_nodes for future use
        available_nodes[f"{node_type}:{result['nodeName']}"] = node

    # Reliability overrides keyed by node name; zeros when not supplied.
    eq_reliability = eq_update.get(result["nodeName"], {
        "eta": 0, "beta": 0, "mttr": 0, "parameters": {}
    })

    total_uptime = result["totalUpTime"]
    total_downtime = result["totalDowntime"]
    # Hoisted: the period total was previously recomputed three times.
    period_hours = total_uptime + total_downtime

    eaf, derating_hours = calculate_eaf(
        available_hours=total_uptime,
        period_hours=period_hours,
        actual_production=result["production"],
        ideal_production=result["idealProduction"],
        downtime_hours=total_downtime
    )

    # EFOR: downtime share of the total period as a percentage; guard /0.
    efor = (total_downtime / period_hours) * 100 if period_hours > 0 else 0

    # Reliability fields only apply to equipment (RegularNode) entries.
    is_regular = node_type == "RegularNode"

    return AerosSimulationCalcResult(
        aeros_simulation_id=simulation_id,
        aeros_node_id=node.id,
        total_downtime=total_downtime,
        total_uptime=total_uptime,
        num_events=result["numEvents"],
        production=result["production"],
        production_std=result["productionStd"],
        ideal_production=result["idealProduction"],
        availability=result["availability"],
        efficiency=result["efficiency"],
        effective_loss=result["effectiveLoss"],
        num_cm=result["numCM"],
        cm_waiting_time=result["cmWaitingTime"],
        total_cm_downtime=result["totalCMDowntime"],
        num_pm=result["numPM"],
        total_pm_downtime=result["totalPMDowntime"],
        num_ip=result["numIP"],
        total_ip_downtime=result["totalIPDowntime"],
        num_oh=result["numOH"],
        total_oh_downtime=result["totalOHDowntime"],
        t_wait_for_crew=result["tWaitForCrew"],
        t_wait_for_spare=result["tWaitForSpare"],
        duration_at_full=result["durationAtFull"],
        duration_above_hh=result["durationAboveHH"],
        duration_above_h=result["durationAboveH"],
        duration_below_l=result["durationBelowL"],
        duration_below_ll=result["durationBelowLL"],
        duration_at_empty=result["durationAtEmpty"],
        stg_input=result["stgInput"],
        stg_output=result["stgOutput"],
        average_level=result["averageLevel"],
        potential_production=result["potentialProduction"],
        eaf=eaf,
        efor=efor,
        derating_hours=derating_hours,
        beta=eq_reliability["beta"] if is_regular else None,
        eta=eq_reliability["eta"] if is_regular else None,
        mttr=eq_reliability["mttr"] if is_regular else None,
        parameters=eq_reliability["parameters"] if is_regular else None
    )
|
||||
Loading…
Reference in New Issue