You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

448 lines
17 KiB
Python

import os
import psycopg2
import pandas as pd
import numpy as np
from flask import Flask, render_template, jsonify, request
from datetime import datetime, timedelta
import time
from dotenv import load_dotenv
from telegram_notifier import check_red_status_and_notify
import traceback
# Load environment variables from .env file
load_dotenv()
# Base URL configuration from environment variable, default to empty string if not set
BASE_URL = os.getenv("BASE_URL", "")
app = Flask(__name__)
# Database configuration from environment variables
DB_HOST_1 = os.getenv("DB_HOST_1") # For tables: pf_parts, ms_equipment_master
DB_HOST_2 = os.getenv("DB_HOST_2") # For table: dl_pi_fetch_last
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_NAME = os.getenv("DB_NAME")
def chunked_iterable(iterable, size):
"""Helper to split list into chunks"""
for i in range(0, len(iterable), size):
yield iterable[i:i + size]
def get_db_connection(table=None, retries=3, delay=2):
"""Koneksi PostgreSQL dengan retry dan timeout"""
host = DB_HOST_2 if table == "dl_pi_fetch_last" else DB_HOST_1
for attempt in range(retries):
try:
return psycopg2.connect(
host=host,
port=DB_PORT,
user=DB_USER,
password=DB_PASS,
dbname=DB_NAME,
connect_timeout=5 # timeout agar tidak hang lama
)
except Exception as e:
print(f"DB connection error (attempt {attempt + 1}/{retries}): {e}")
if attempt < retries - 1:
time.sleep(delay)
else:
raise
def fetch_part_info():
"""Fetch part information from pf_parts and ms_equipment_master tables"""
try:
conn = get_db_connection("pf_parts") # Using DB_HOST_1 (192.168.1.85)
cursor = conn.cursor()
# Query to fetch part information with equipment grouping
query = """
SELECT p.id as part_id, p.location_tag, p.part_name, e.id as equipment_id, e.name as equipment_name
FROM pf_parts p
LEFT JOIN ms_equipment_master e ON p.equipment_id = e.id
ORDER BY e.id, p.id
"""
cursor.execute(query)
rows = cursor.fetchall()
# Convert to DataFrame for easier processing
df = pd.DataFrame(rows, columns=['part_id', 'location_tag', 'part_name', 'equipment_id', 'equipment_name'])
cursor.close()
conn.close()
return df
except Exception as e:
print(f"Database error fetching part info: {e}")
return pd.DataFrame()
def fetch_data(days=3):
"""Fetch data from dl_pi_fetch_last table for the last 'days' days"""
try:
conn = get_db_connection("dl_pi_fetch_last") # Using DB_HOST_2 (192.168.1.86)
cursor = conn.cursor()
# Calculate the date range
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# Query to fetch data for the last 'days' days
query = """
SELECT part_id, value, created_at
FROM dl_pi_fetch_last
WHERE created_at >= %s AND created_at <= %s
ORDER BY part_id, created_at
"""
cursor.execute(query, (start_date, end_date))
rows = cursor.fetchall()
# Convert to DataFrame for easier analysis
df = pd.DataFrame(rows, columns=['part_id', 'value', 'created_at'])
cursor.close()
conn.close()
return df
except Exception as e:
print(f"Database error: {e}")
return pd.DataFrame()
def analyze_data(df, part_info_df):
"""
Analyze data based on the three criteria:
1. Same value for 3 consecutive days (error)
2. Value is 0 or null (error)
3. Significant increase/decrease compared to previous 3 days (warning)
Also adds part information (label, equipment_id, equipment_name) to the results
"""
if df.empty:
return []
results = []
# Group by part_id
for part_id, group in df.groupby('part_id'):
# Sort by timestamp
group = group.sort_values('created_at')
# Initialize status
status = "green"
messages = []
# Check for null or zero values (Criterion 2)
if group['value'].isnull().any() or (group['value'] == 0).any():
status = "red"
messages.append("Error: Contains null or zero values")
# Check for same value for 3 consecutive days (Criterion 1)
# Group by date and check for consecutive identical values
group['date'] = group['created_at'].dt.date
daily_values = group.groupby('date')['value'].mean().reset_index()
if len(daily_values) >= 3:
for i in range(len(daily_values) - 2):
if (daily_values['value'].iloc[i] == daily_values['value'].iloc[i+1] == daily_values['value'].iloc[i+2]):
status = "red"
messages.append(f"Error: Same value ({daily_values['value'].iloc[i]}) for 3 consecutive days")
break
# Check for significant changes (Criterion 3)
if len(daily_values) >= 4:
for i in range(3, len(daily_values)):
current_value = daily_values['value'].iloc[i]
prev_values = daily_values['value'].iloc[i-3:i]
avg_prev = prev_values.mean()
# Define threshold for significant change (e.g., 30%)
threshold = 0.3
if avg_prev != 0: # Avoid division by zero
change_ratio = abs((current_value - avg_prev) / avg_prev)
if change_ratio > threshold:
if status != "red": # Don't override error status
status = "yellow"
messages.append(f"Warning: Significant change detected ({change_ratio:.2%} change from previous average)")
# Get part information
part_info = part_info_df[part_info_df['part_id'] == part_id]
# Create label and get equipment information
label = ""
equipment_id = None
equipment_name = None
if not part_info.empty:
location_tag = part_info['location_tag'].iloc[0] if not part_info['location_tag'].isnull().iloc[0] else ""
part_name = part_info['part_name'].iloc[0] if not part_info['part_name'].isnull().iloc[0] else ""
label = f"{location_tag} | {part_name}".strip()
equipment_id = part_info['equipment_id'].iloc[0] if not part_info['equipment_id'].isnull().iloc[0] else None
equipment_name = part_info['equipment_name'].iloc[0] if not part_info['equipment_name'].isnull().iloc[0] else None
# Add to results
results.append({
'part_id': part_id,
'label': label,
'equipment_id': equipment_id,
'equipment_name': equipment_name,
'status': status,
'messages': messages,
'latest_value': group['value'].iloc[-1] if not group.empty else None,
'latest_created_at': group['created_at'].iloc[-1] if not group.empty else None
})
# Sort results by equipment_id, then part_id
results = sorted(results, key=lambda x: (x['equipment_id'] or "", x['part_id']))
return results
@app.route('/')
def index():
"""Render the main monitoring dashboard"""
return render_template('index.html', base_url=BASE_URL)
def fetch_part_history(part_id, days=14):
"""Fetch historical data for a specific part_id for the last 'days' days"""
try:
# First connection for dl_pi_fetch_last table (DB_HOST_2)
conn_data = get_db_connection("dl_pi_fetch_last")
cursor_data = conn_data.cursor()
# Calculate the date range
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# Query to fetch data for the specific part_id for the last 'days' days
query = """
SELECT part_id, value, created_at
FROM dl_pi_fetch_last
WHERE part_id = %s AND created_at >= %s AND created_at <= %s
ORDER BY created_at
"""
cursor_data.execute(query, (part_id, start_date, end_date))
rows = cursor_data.fetchall()
# Second connection for pf_parts and ms_equipment_master tables (DB_HOST_1)
conn_parts = get_db_connection("pf_parts")
cursor_parts = conn_parts.cursor()
# Fetch part information
part_info_query = """
SELECT p.id as part_id, p.location_tag, p.part_name, e.id as equipment_id, e.name as equipment_name
FROM pf_parts p
LEFT JOIN ms_equipment_master e ON p.equipment_id = e.id
WHERE p.id = %s
"""
cursor_parts.execute(part_info_query, (part_id,))
part_info_row = cursor_parts.fetchone()
# Create part info dictionary
part_info = {
'part_id': part_id,
'location_tag': part_info_row[1] if part_info_row and part_info_row[1] else "",
'part_name': part_info_row[2] if part_info_row and part_info_row[2] else "",
'equipment_id': part_info_row[3] if part_info_row and part_info_row[3] else None,
'equipment_name': part_info_row[4] if part_info_row and part_info_row[4] else None
}
# Create label
label = f"{part_info['location_tag']} {part_info['part_name']}".strip()
# Convert to list of dictionaries for JSON serialization
result = []
for row in rows:
result.append({
'part_id': row[0],
'label': label,
'value': row[1],
'created_at': row[2].isoformat() if row[2] else None
})
# Close both connections
cursor_data.close()
conn_data.close()
cursor_parts.close()
conn_parts.close()
return {
'part_info': part_info,
'label': label,
'history': result
}
except Exception as e:
print(f"Database error: {e}")
return {
'part_info': {'part_id': part_id},
'label': part_id,
'history': []
}
@app.route('/api/data')
def get_data():
"""API endpoint to fetch data from _log_pi_status_data table with date filter"""
# Get date parameter from request, default to today's date
filter_date_str = request.args.get('date')
try:
if filter_date_str:
# Parse the date string from frontend (format: YYYY-MM-DD)
filter_date = datetime.strptime(filter_date_str, '%Y-%m-%d').date()
else:
# Default to today's date
filter_date = datetime.now().date()
# Connect to database
conn = get_db_connection()
cursor = conn.cursor()
# Query to fetch data from _log_pi_status_data for the specified date
query = """
SELECT timestamp, part_id, label, equipment_id, equipment_name, status,
messages, latest_value, latest_created_at
FROM _log_pi_status_data
WHERE DATE(timestamp) = %s
ORDER BY equipment_id, part_id
"""
cursor.execute(query, (filter_date,))
rows = cursor.fetchall()
# Process the results
results = []
for row in rows:
timestamp, part_id, label, equipment_id, equipment_name, status, messages_str, latest_value, latest_created_at = row
# Convert messages string back to list
messages = messages_str.split('; ') if messages_str else []
results.append({
'part_id': part_id,
'label': label,
'equipment_id': equipment_id,
'equipment_name': equipment_name,
'status': status,
'messages': messages,
'latest_value': latest_value,
'latest_created_at': latest_created_at
})
# Close database connection
cursor.close()
conn.close()
# Group results by equipment_id for frontend display
grouped_results = {}
for item in results:
equipment_id = item['equipment_id'] or 'unknown'
equipment_name = item['equipment_name'] or 'Unknown Equipment'
if equipment_id not in grouped_results:
grouped_results[equipment_id] = {
'equipment_id': equipment_id,
'equipment_name': equipment_name,
'parts': []
}
grouped_results[equipment_id]['parts'].append(item)
# Convert to list for JSON serialization
final_results = list(grouped_results.values())
return jsonify(final_results)
except Exception as e:
print(f"Error in get_data: {e}")
return jsonify([]), 500
@app.route('/check-data')
def checkdata():
"""API endpoint to fetch and analyze data"""
df = fetch_data(days=10) # Fetch data for the last 3 days
part_info_df = fetch_part_info() # Fetch part information
results = analyze_data(df, part_info_df)
# Telegram notification jika ada status merah
base_url = request.host_url.rstrip('/')
check_red_status_and_notify(results, "8201929832:AAFhDu7LD4xbNyDQ9Cc2JSuTDMhqrLaDDdc", "-1002721738007", base_url)
# Logging ke database
conn = None
cursor = None
try:
conn = get_db_connection() # koneksi ke 192.168.1.85
cursor = conn.cursor()
current_timestamp = datetime.now()
# Delete records from today before inserting new ones
today_date = current_timestamp.date()
delete_query = """
DELETE FROM _log_pi_status_data
WHERE DATE(latest_created_at) = %s
"""
cursor.execute(delete_query, (today_date,))
deleted_count = cursor.rowcount
conn.commit()
print(f"Deleted {deleted_count} records from _log_pi_status_data with latest_created_at={today_date}")
data_to_insert = []
for result in results:
# Only insert records with status other than "green"
if result['status'] != "green":
messages_str = '; '.join(result['messages']) if result['messages'] else None
latest_created_at = result['latest_created_at']
data_to_insert.append((
current_timestamp,
result['part_id'],
result['label'],
result['equipment_id'],
result['equipment_name'],
result['status'],
messages_str,
result['latest_value'],
latest_created_at
))
query = """
INSERT INTO _log_pi_status_data
(timestamp, part_id, label, equipment_id, equipment_name, status, messages, latest_value, \
latest_created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) \
"""
batch_size = 500
total = 0
for batch in chunked_iterable(data_to_insert, batch_size):
cursor.executemany(query, batch)
conn.commit()
total += len(batch)
print(f"Successfully logged {total} results to _log_pi_status_data table")
except Exception as e:
print(f"Error logging results to database: {e}")
traceback.print_exc()
finally:
if cursor: cursor.close()
if conn: conn.close()
return jsonify(results)
@app.route('/api/part-history')
def get_part_history():
"""API endpoint to fetch historical data for a specific part_id"""
part_id = request.args.get('part_id')
if not part_id:
return jsonify({'error': 'part_id parameter is required'}), 400
days = request.args.get('days', 14, type=int)
result = fetch_part_history(part_id, days)
return jsonify(result)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5003, debug=True)