You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
211 lines
6.8 KiB
Python
211 lines
6.8 KiB
Python
import json
|
|
import os
|
|
from typing import Dict, List, Union, Any
|
|
|
|
def load_and_merge_models(main_file: str, models_folder: str = ".") -> Dict[str, Any]:
    """
    Recursively load and merge JSON models, replacing file references with actual content.

    A string found inside a list is treated as a file reference when
    ``<models_folder>/<string>.json`` is an existing regular file; it is then
    replaced by that file's content, itself resolved recursively. Any other
    string in a list is kept unchanged as a literal equipment tag. Strings
    that are direct dictionary values are never treated as references.

    Args:
        main_file: Name of the main JSON file, without the ".json" extension
            (e.g., "- TJB - Unit 3 -")
        models_folder: Folder containing the JSON files (default: current directory)

    Returns:
        Merged data with all file references resolved

    Raises:
        FileNotFoundError: If the main file or a referenced file cannot be opened.
        ValueError: If a file contains invalid JSON, or if a file references
            itself directly or through other files (circular reference).
    """

    def load_json_file(filename: str) -> Dict[str, Any]:
        """Load ``<models_folder>/<filename>.json`` and return its parsed content."""
        filepath = os.path.join(models_folder, f"{filename}.json")
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError as e:
            # Chain the original error so the underlying OS details stay visible.
            raise FileNotFoundError(f"JSON file not found: {filepath}") from e
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in file {filepath}: {e}") from e

    def resolve_references(data: Any, visited: Union[set, None] = None) -> Any:
        """
        Recursively resolve file references in the data structure.

        Args:
            data: The data to process (can be dict, list, or primitive)
            visited: Names of the files on the current resolution path, used to
                detect circular references. It is never mutated; each nested
                file gets its own extended copy.

        Returns:
            Data with all file references resolved
        """
        if visited is None:
            visited = set()

        if isinstance(data, dict):
            return {key: resolve_references(value, visited) for key, value in data.items()}

        if isinstance(data, list):
            result = []
            for item in data:
                if not isinstance(item, str):
                    # Nested dict/list/primitive: process recursively
                    result.append(resolve_references(item, visited))
                    continue

                json_filepath = os.path.join(models_folder, f"{item}.json")
                # isfile (not exists): a directory that happens to be named
                # "<tag>.json" is not a model file and must not be opened.
                if not os.path.isfile(json_filepath):
                    # This is a literal equipment tag
                    result.append(item)
                    continue

                if item in visited:
                    raise ValueError(f"Circular reference detected: {item}")

                # Extend the path with a fresh set so sibling references to the
                # same file (shared sub-models) are not mistaken for cycles.
                file_content = load_json_file(item)
                result.append(resolve_references(file_content, visited | {item}))
            return result

        # Return primitive values as-is
        return data

    # Load the main file
    main_content = load_json_file(main_file)

    # Seed the path with the main file so a reference back to it is reported
    # immediately instead of after expanding the main file a second time.
    return resolve_references(main_content, {main_file})
|
|
|
|
def save_merged_model(merged_data: Dict[str, Any], output_file: str = "result.json") -> None:
    """
    Write the merged model to disk as indented JSON and report the destination.

    The file is written as UTF-8 with a 2-space indent, and non-ASCII
    characters are stored as-is rather than escaped.

    Args:
        merged_data: The merged data dictionary
        output_file: Output filename (default: "result.json")
    """
    with open(output_file, mode='w', encoding='utf-8') as destination:
        serialized = json.dumps(merged_data, indent=2, ensure_ascii=False)
        destination.write(serialized)

    print(f"Merged model saved to: {output_file}")
|
|
|
|
def print_structure(data: Any, indent: int = 0) -> None:
    """
    Print an outline of the merged model to stdout.

    Dictionary keys are printed as "key:" with their value one level deeper.
    List entries are printed with their index: strings on the same line,
    anything else on the following lines one level deeper. Any other value
    is printed directly. Each level is indented by one space.

    Args:
        data: The data to print
        indent: Current indentation level
    """
    pad = " " * indent
    deeper = indent + 1

    if isinstance(data, dict):
        for name, child in data.items():
            print(f"{pad}{name}:")
            print_structure(child, deeper)
        return

    if isinstance(data, list):
        for position, entry in enumerate(data):
            if not isinstance(entry, str):
                print(f"{pad}[{position}]:")
                print_structure(entry, deeper)
            else:
                print(f"{pad}[{position}] {entry}")
        return

    print(f"{pad}{data}")
|
|
|
|
# Example usage
|
|
if __name__ == "__main__":
    # Local import: only the script entry point needs sys (stderr and exit status).
    import sys

    try:
        # Set your main file name here (without the ".json" extension)
        main_file = "- TJB - Unit 3 -"

        # Set the folder containing your JSON files
        models_folder = "RBD Model"

        print(f"Loading and merging models starting from: {main_file}.json")
        print(f"Looking for JSON files in: {os.path.abspath(models_folder)}")
        print("-" * 50)

        # Load and merge all models
        merged_model = load_and_merge_models(main_file, models_folder)

        # Print the structure
        print("Merged model structure:")
        print_structure(merged_model)
        print("-" * 50)

        # Save to result.json
        save_merged_model(merged_model, "result.json")

        # Also print the merged JSON for inspection
        print("\nMerged model JSON:")
        print(json.dumps(merged_model, indent=2, ensure_ascii=False))

    except Exception as e:
        # Top-level boundary: report the problem on stderr and exit with a
        # non-zero status so shells and calling scripts can detect the failure.
        print(f"Error: {e}", file=sys.stderr)
        print(
            "\nMake sure your JSON files are in the correct format and the main file exists.",
            file=sys.stderr,
        )
        sys.exit(1)
|
|
|
|
# Additional utility functions
|
|
def validate_model_structure(data: Any) -> bool:
    """
    Check that a model follows the expected nested block format.

    A valid model is a dictionary with exactly one key, which must be
    "series", "parallel" or "parallel_no_redundancy", mapped to a list.
    Every list member must be either a string or a dictionary that is
    itself a valid model. An empty list is accepted.

    Returns:
        True if valid, False otherwise
    """
    if not isinstance(data, dict) or len(data) != 1:
        return False

    # Exactly one entry is guaranteed here, so unpack it directly.
    ((block_type, members),) = data.items()

    if block_type not in {"series", "parallel", "parallel_no_redundancy"}:
        return False
    if not isinstance(members, list):
        return False

    # Nested dictionaries are validated recursively; everything else must be a string.
    return all(
        validate_model_structure(member) if isinstance(member, dict) else isinstance(member, str)
        for member in members
    )
|
|
|
|
def get_all_equipment_tags(data: Any) -> List[str]:
    """
    Collect every equipment tag string contained in the merged model.

    Only strings that are members of a list count as tags. Dictionary
    values and nested lists are searched recursively, in order; a string
    that is a direct dictionary value (or the top-level argument itself)
    is not collected.

    Args:
        data: The merged model data

    Returns:
        List of all equipment tag strings found in the model
    """
    if isinstance(data, dict):
        return [tag for child in data.values() for tag in get_all_equipment_tags(child)]

    if isinstance(data, list):
        found = []
        for entry in data:
            found += [entry] if isinstance(entry, str) else get_all_equipment_tags(entry)
        return found

    # Primitives (including bare strings outside a list) contribute nothing.
    return []