import os
import time
import traceback
from datetime import datetime, timedelta

import psycopg2
import pandas as pd
from flask import Flask, render_template, jsonify, request
from dotenv import load_dotenv

from telegram_notifier import check_red_status_and_notify

# Load environment variables from .env file
load_dotenv()

# Base URL configuration from environment variable, default to empty string if not set
BASE_URL = os.getenv("BASE_URL", "")

app = Flask(__name__)

# Database configuration from environment variables
DB_HOST_1 = os.getenv("DB_HOST_1")  # For tables: pf_parts, ms_equipment_master
DB_HOST_2 = os.getenv("DB_HOST_2")  # For table: dl_pi_fetch_last
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_NAME = os.getenv("DB_NAME")

def chunked_iterable(iterable, size):
    """Yield successive chunks of at most `size` items from a sequence."""
    for i in range(0, len(iterable), size):
        yield iterable[i:i + size]
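
# Example (illustrative): list(chunked_iterable([1, 2, 3, 4, 5], 2))
# returns [[1, 2], [3, 4], [5]].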

def get_db_connection(table=None, retries=3, delay=2):
    """Open a PostgreSQL connection with retry and a connect timeout."""
    host = DB_HOST_2 if table == "dl_pi_fetch_last" else DB_HOST_1
    for attempt in range(retries):
        try:
            return psycopg2.connect(
                host=host,
                port=DB_PORT,
                user=DB_USER,
                password=DB_PASS,
                dbname=DB_NAME,
                connect_timeout=5  # timeout so a dead host does not hang the app
            )
        except Exception as e:
            print(f"DB connection error (attempt {attempt + 1}/{retries}): {e}")
            if attempt < retries - 1:
                time.sleep(delay)
            else:
                raise
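
# Example (illustrative): get_db_connection("dl_pi_fetch_last") targets DB_HOST_2;
# any other table name, or no argument, targets DB_HOST_1.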

def fetch_part_info():
    """Fetch part information from pf_parts and ms_equipment_master tables"""
    try:
        conn = get_db_connection("pf_parts")  # Using DB_HOST_1 (192.168.1.85)
        cursor = conn.cursor()

        # Query to fetch part information with equipment grouping
        query = """
            SELECT p.id as part_id, p.location_tag, p.part_name, e.id as equipment_id, e.name as equipment_name
            FROM pf_parts p
            LEFT JOIN ms_equipment_master e ON p.equipment_id = e.id
            ORDER BY e.id, p.id
        """

        cursor.execute(query)
        rows = cursor.fetchall()

        # Convert to DataFrame for easier processing
        df = pd.DataFrame(rows, columns=['part_id', 'location_tag', 'part_name', 'equipment_id', 'equipment_name'])

        cursor.close()
        conn.close()

        return df
    except Exception as e:
        print(f"Database error fetching part info: {e}")
        return pd.DataFrame()

def fetch_data(days=3):
    """Fetch data from dl_pi_fetch_last table for the last 'days' days"""
    try:
        conn = get_db_connection("dl_pi_fetch_last")  # Using DB_HOST_2 (192.168.1.86)
        cursor = conn.cursor()

        # Calculate the date range
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # Query to fetch data for the last 'days' days
        query = """
            SELECT part_id, value, created_at
            FROM dl_pi_fetch_last
            WHERE created_at >= %s AND created_at <= %s
            ORDER BY part_id, created_at
        """

        cursor.execute(query, (start_date, end_date))
        rows = cursor.fetchall()

        # Convert to DataFrame for easier analysis; coerce created_at to a
        # datetime64 column so the .dt accessor in analyze_data() is safe
        df = pd.DataFrame(rows, columns=['part_id', 'value', 'created_at'])
        if not df.empty:
            df['created_at'] = pd.to_datetime(df['created_at'])

        cursor.close()
        conn.close()

        return df
    except Exception as e:
        print(f"Database error: {e}")
        return pd.DataFrame()

def analyze_data(df, part_info_df):
    """
    Analyze data based on three criteria:
    1. Same value for 3 consecutive days (error)
    2. Value is 0 or null (error)
    3. Significant increase/decrease compared to the previous 3 days (warning)

    Also adds part information (label, equipment_id, equipment_name) to the results.
    """
    if df.empty:
        return []

    results = []

    # Group by part_id
    for part_id, group in df.groupby('part_id'):
        # Sort by timestamp
        group = group.sort_values('created_at')

        # Initialize status
        status = "green"
        messages = []

        # Check for null or zero values (Criterion 2)
        if group['value'].isnull().any() or (group['value'] == 0).any():
            status = "red"
            messages.append("Error: Contains null or zero values")

        # Check for the same value on 3 consecutive days (Criterion 1):
        # average per calendar day, then look for three identical daily means in a row
        group['date'] = group['created_at'].dt.date
        daily_values = group.groupby('date')['value'].mean().reset_index()

        if len(daily_values) >= 3:
            for i in range(len(daily_values) - 2):
                if (daily_values['value'].iloc[i] == daily_values['value'].iloc[i+1] == daily_values['value'].iloc[i+2]):
                    status = "red"
                    messages.append(f"Error: Same value ({daily_values['value'].iloc[i]}) for 3 consecutive days")
                    break

        # Check for significant changes (Criterion 3)
        if len(daily_values) >= 4:
            for i in range(3, len(daily_values)):
                current_value = daily_values['value'].iloc[i]
                prev_values = daily_values['value'].iloc[i-3:i]
                avg_prev = prev_values.mean()

                # Threshold for a significant change (30%)
                threshold = 0.3

                if avg_prev != 0:  # Avoid division by zero
                    change_ratio = abs((current_value - avg_prev) / avg_prev)

                    if change_ratio > threshold:
                        if status != "red":  # Don't override an error status
                            status = "yellow"
                        messages.append(f"Warning: Significant change detected ({change_ratio:.2%} change from previous average)")

        # Get part information
        part_info = part_info_df[part_info_df['part_id'] == part_id]

        # Create label and get equipment information
        label = ""
        equipment_id = None
        equipment_name = None

        if not part_info.empty:
            location_tag = part_info['location_tag'].iloc[0] if not part_info['location_tag'].isnull().iloc[0] else ""
            part_name = part_info['part_name'].iloc[0] if not part_info['part_name'].isnull().iloc[0] else ""
            label = f"{location_tag} | {part_name}".strip()
            equipment_id = part_info['equipment_id'].iloc[0] if not part_info['equipment_id'].isnull().iloc[0] else None
            equipment_name = part_info['equipment_name'].iloc[0] if not part_info['equipment_name'].isnull().iloc[0] else None

        # Add to results
        results.append({
            'part_id': part_id,
            'label': label,
            'equipment_id': equipment_id,
            'equipment_name': equipment_name,
            'status': status,
            'messages': messages,
            'latest_value': group['value'].iloc[-1] if not group.empty else None,
            'latest_created_at': group['created_at'].iloc[-1] if not group.empty else None
        })

    # Sort results by equipment_id, then part_id; stringify equipment_id so that
    # None and non-string ids compare safely
    results = sorted(results, key=lambda x: (str(x['equipment_id'] or ""), x['part_id']))

    return results
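
# Worked example (illustrative, hypothetical data): three identical daily means
# trip criterion 1 and yield "red"; separately, a jump from a 3-day average of
# 10 to a value of 14 is a 40% change (> 30% threshold) and would yield "yellow".
#
#   df = pd.DataFrame({
#       'part_id': ['P1'] * 3,
#       'value': [5.0, 5.0, 5.0],
#       'created_at': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
#   })
#   empty_info = pd.DataFrame(columns=['part_id', 'location_tag', 'part_name',
#                                      'equipment_id', 'equipment_name'])
#   analyze_data(df, empty_info)[0]['status']  # -> 'red'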

@app.route('/')
def index():
    """Render the main monitoring dashboard"""
    return render_template('index.html', base_url=BASE_URL)

def fetch_part_history(part_id, days=14):
    """Fetch historical data for a specific part_id for the last 'days' days"""
    try:
        # First connection for dl_pi_fetch_last table (DB_HOST_2)
        conn_data = get_db_connection("dl_pi_fetch_last")
        cursor_data = conn_data.cursor()

        # Calculate the date range
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # Query to fetch data for the specific part_id for the last 'days' days
        query = """
            SELECT part_id, value, created_at
            FROM dl_pi_fetch_last
            WHERE part_id = %s AND created_at >= %s AND created_at <= %s
            ORDER BY created_at
        """

        cursor_data.execute(query, (part_id, start_date, end_date))
        rows = cursor_data.fetchall()

        # Second connection for pf_parts and ms_equipment_master tables (DB_HOST_1)
        conn_parts = get_db_connection("pf_parts")
        cursor_parts = conn_parts.cursor()

        # Fetch part information
        part_info_query = """
            SELECT p.id as part_id, p.location_tag, p.part_name, e.id as equipment_id, e.name as equipment_name
            FROM pf_parts p
            LEFT JOIN ms_equipment_master e ON p.equipment_id = e.id
            WHERE p.id = %s
        """

        cursor_parts.execute(part_info_query, (part_id,))
        part_info_row = cursor_parts.fetchone()

        # Create part info dictionary
        part_info = {
            'part_id': part_id,
            'location_tag': part_info_row[1] if part_info_row and part_info_row[1] else "",
            'part_name': part_info_row[2] if part_info_row and part_info_row[2] else "",
            'equipment_id': part_info_row[3] if part_info_row and part_info_row[3] else None,
            'equipment_name': part_info_row[4] if part_info_row and part_info_row[4] else None
        }

        # Create label
        label = f"{part_info['location_tag']} {part_info['part_name']}".strip()

        # Convert to list of dictionaries for JSON serialization
        result = []
        for row in rows:
            result.append({
                'part_id': row[0],
                'label': label,
                'value': row[1],
                'created_at': row[2].isoformat() if row[2] else None
            })

        # Close both connections
        cursor_data.close()
        conn_data.close()
        cursor_parts.close()
        conn_parts.close()

        return {
            'part_info': part_info,
            'label': label,
            'history': result
        }
    except Exception as e:
        print(f"Database error: {e}")
        return {
            'part_info': {'part_id': part_id},
            'label': part_id,
            'history': []
        }
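
# Example (illustrative) return shape from fetch_part_history("P1", days=7);
# label and values here are hypothetical:
#   {'part_info': {'part_id': 'P1', ...},
#    'label': 'TAG-01 Pump A',
#    'history': [{'part_id': 'P1', 'label': 'TAG-01 Pump A', 'value': 5.0,
#                 'created_at': '2024-01-01T00:00:00'}, ...]}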

@app.route('/api/data')
def get_data():
    """API endpoint to fetch data from _log_pi_status_data table with date filter"""
    # Get date parameter from request, default to today's date
    filter_date_str = request.args.get('date')

    try:
        if filter_date_str:
            # Parse the date string from frontend (format: YYYY-MM-DD)
            filter_date = datetime.strptime(filter_date_str, '%Y-%m-%d').date()
        else:
            # Default to today's date
            filter_date = datetime.now().date()

        # Connect to database
        conn = get_db_connection()
        cursor = conn.cursor()

        # Query to fetch data from _log_pi_status_data for the specified date
        query = """
            SELECT timestamp, part_id, label, equipment_id, equipment_name, status,
                   messages, latest_value, latest_created_at
            FROM _log_pi_status_data
            WHERE DATE(timestamp) = %s
            ORDER BY equipment_id, part_id
        """

        cursor.execute(query, (filter_date,))
        rows = cursor.fetchall()

        # Process the results
        results = []
        for row in rows:
            timestamp, part_id, label, equipment_id, equipment_name, status, messages_str, latest_value, latest_created_at = row

            # Convert messages string back to list
            messages = messages_str.split('; ') if messages_str else []

            results.append({
                'part_id': part_id,
                'label': label,
                'equipment_id': equipment_id,
                'equipment_name': equipment_name,
                'status': status,
                'messages': messages,
                'latest_value': latest_value,
                'latest_created_at': latest_created_at
            })

        # Close database connection
        cursor.close()
        conn.close()

        # Group results by equipment_id for frontend display
        grouped_results = {}
        for item in results:
            equipment_id = item['equipment_id'] or 'unknown'
            equipment_name = item['equipment_name'] or 'Unknown Equipment'

            if equipment_id not in grouped_results:
                grouped_results[equipment_id] = {
                    'equipment_id': equipment_id,
                    'equipment_name': equipment_name,
                    'parts': []
                }

            grouped_results[equipment_id]['parts'].append(item)

        # Convert to list for JSON serialization
        final_results = list(grouped_results.values())

        return jsonify(final_results)

    except Exception as e:
        print(f"Error in get_data: {e}")
        return jsonify([]), 500
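
# Example request (illustrative): GET /api/data?date=2024-01-15
# returns [{'equipment_id': ..., 'equipment_name': ..., 'parts': [...]}, ...]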

@app.route('/check-data')
def checkdata():
    """API endpoint to fetch and analyze data"""
    df = fetch_data(days=10)  # Fetch data for the last 10 days
    part_info_df = fetch_part_info()  # Fetch part information
    results = analyze_data(df, part_info_df)

    # Send a Telegram notification if any part has red status. The bot token and
    # chat id are read from the environment so credentials are not hard-coded in
    # source; TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID are suggested variable
    # names, set them in .env alongside the DB settings.
    base_url = request.host_url.rstrip('/')
    check_red_status_and_notify(
        results,
        os.getenv("TELEGRAM_BOT_TOKEN"),
        os.getenv("TELEGRAM_CHAT_ID"),
        base_url
    )

    # Log results to the database
    conn = None
    cursor = None
    try:
        conn = get_db_connection()  # connects to DB_HOST_1 (192.168.1.85)
        cursor = conn.cursor()
        current_timestamp = datetime.now()

        # Delete records from today before inserting new ones
        today_date = current_timestamp.date()
        delete_query = """
            DELETE FROM _log_pi_status_data
            WHERE DATE(latest_created_at) = %s
        """
        cursor.execute(delete_query, (today_date,))
        deleted_count = cursor.rowcount
        conn.commit()
        print(f"Deleted {deleted_count} records from _log_pi_status_data with latest_created_at={today_date}")

        data_to_insert = []
        for result in results:
            # Only insert records with a status other than "green"
            if result['status'] != "green":
                messages_str = '; '.join(result['messages']) if result['messages'] else None
                latest_created_at = result['latest_created_at']
                data_to_insert.append((
                    current_timestamp,
                    result['part_id'],
                    result['label'],
                    result['equipment_id'],
                    result['equipment_name'],
                    result['status'],
                    messages_str,
                    result['latest_value'],
                    latest_created_at
                ))

        query = """
            INSERT INTO _log_pi_status_data
                (timestamp, part_id, label, equipment_id, equipment_name, status,
                 messages, latest_value, latest_created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        # Insert in batches to keep each transaction small
        batch_size = 500
        total = 0
        for batch in chunked_iterable(data_to_insert, batch_size):
            cursor.executemany(query, batch)
            conn.commit()
            total += len(batch)

        print(f"Successfully logged {total} results to _log_pi_status_data table")
    except Exception as e:
        print(f"Error logging results to database: {e}")
        traceback.print_exc()
    finally:
        if cursor: cursor.close()
        if conn: conn.close()

    return jsonify(results)
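
# Note: GET /check-data analyzes the last 10 days of readings, sends Telegram
# alerts for red statuses, rewrites today's non-green rows in
# _log_pi_status_data, and returns the full analysis as JSON.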

@app.route('/api/part-history')
def get_part_history():
    """API endpoint to fetch historical data for a specific part_id"""
    part_id = request.args.get('part_id')
    if not part_id:
        return jsonify({'error': 'part_id parameter is required'}), 400

    days = request.args.get('days', 14, type=int)
    result = fetch_part_history(part_id, days)
    return jsonify(result)
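
# Example request (illustrative): GET /api/part-history?part_id=P1&days=7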

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5003, debug=True)