import os import psycopg2 import pandas as pd import numpy as np from flask import Flask, render_template, jsonify, request from datetime import datetime, timedelta import time from dotenv import load_dotenv from telegram_notifier import check_red_status_and_notify import traceback # Load environment variables from .env file load_dotenv() # Base URL configuration from environment variable, default to empty string if not set BASE_URL = os.getenv("BASE_URL", "") app = Flask(__name__) # Database configuration from environment variables DB_HOST_1 = os.getenv("DB_HOST_1") # For tables: pf_parts, ms_equipment_master DB_HOST_2 = os.getenv("DB_HOST_2") # For table: dl_pi_fetch_last DB_PORT = os.getenv("DB_PORT") DB_USER = os.getenv("DB_USER") DB_PASS = os.getenv("DB_PASS") DB_NAME = os.getenv("DB_NAME") def chunked_iterable(iterable, size): """Helper to split list into chunks""" for i in range(0, len(iterable), size): yield iterable[i:i + size] def get_db_connection(table=None, retries=3, delay=2): """Koneksi PostgreSQL dengan retry dan timeout""" host = DB_HOST_2 if table == "dl_pi_fetch_last" else DB_HOST_1 for attempt in range(retries): try: return psycopg2.connect( host=host, port=DB_PORT, user=DB_USER, password=DB_PASS, dbname=DB_NAME, connect_timeout=5 # timeout agar tidak hang lama ) except Exception as e: print(f"DB connection error (attempt {attempt + 1}/{retries}): {e}") if attempt < retries - 1: time.sleep(delay) else: raise def fetch_part_info(): """Fetch part information from pf_parts and ms_equipment_master tables""" try: conn = get_db_connection("pf_parts") # Using DB_HOST_1 (192.168.1.85) cursor = conn.cursor() # Query to fetch part information with equipment grouping query = """ SELECT p.id as part_id, p.location_tag, p.part_name, e.id as equipment_id, e.name as equipment_name FROM pf_parts p LEFT JOIN ms_equipment_master e ON p.equipment_id = e.id ORDER BY e.id, p.id """ cursor.execute(query) rows = cursor.fetchall() # Convert to DataFrame for easier processing df = pd.DataFrame(rows, columns=['part_id', 'location_tag', 'part_name', 'equipment_id', 'equipment_name']) cursor.close() conn.close() return df except Exception as e: print(f"Database error fetching part info: {e}") return pd.DataFrame() def fetch_data(days=3): """Fetch data from dl_pi_fetch_last table for the last 'days' days""" try: conn = get_db_connection("dl_pi_fetch_last") # Using DB_HOST_2 (192.168.1.86) cursor = conn.cursor() # Calculate the date range end_date = datetime.now() start_date = end_date - timedelta(days=days) # Query to fetch data for the last 'days' days query = """ SELECT part_id, value, created_at FROM dl_pi_fetch_last WHERE created_at >= %s AND created_at <= %s ORDER BY part_id, created_at """ cursor.execute(query, (start_date, end_date)) rows = cursor.fetchall() # Convert to DataFrame for easier analysis df = pd.DataFrame(rows, columns=['part_id', 'value', 'created_at']) cursor.close() conn.close() return df except Exception as e: print(f"Database error: {e}") return pd.DataFrame() def analyze_data(df, part_info_df): """ Analyze data based on the three criteria: 1. Same value for 3 consecutive days (error) 2. Value is 0 or null (error) 3. Significant increase/decrease compared to previous 3 days (warning) Also adds part information (label, equipment_id, equipment_name) to the results """ if df.empty: return [] results = [] # Group by part_id for part_id, group in df.groupby('part_id'): # Sort by timestamp group = group.sort_values('created_at') # Initialize status status = "green" messages = [] # Check for null or zero values (Criterion 2) if group['value'].isnull().any() or (group['value'] == 0).any(): status = "red" messages.append("Error: Contains null or zero values") # Check for same value for 3 consecutive days (Criterion 1) # Group by date and check for consecutive identical values group['date'] = group['created_at'].dt.date daily_values = group.groupby('date')['value'].mean().reset_index() if len(daily_values) >= 3: for i in range(len(daily_values) - 2): if (daily_values['value'].iloc[i] == daily_values['value'].iloc[i+1] == daily_values['value'].iloc[i+2]): status = "red" messages.append(f"Error: Same value ({daily_values['value'].iloc[i]}) for 3 consecutive days") break # Check for significant changes (Criterion 3) if len(daily_values) >= 4: for i in range(3, len(daily_values)): current_value = daily_values['value'].iloc[i] prev_values = daily_values['value'].iloc[i-3:i] avg_prev = prev_values.mean() # Define threshold for significant change (e.g., 30%) threshold = 0.3 if avg_prev != 0: # Avoid division by zero change_ratio = abs((current_value - avg_prev) / avg_prev) if change_ratio > threshold: if status != "red": # Don't override error status status = "yellow" messages.append(f"Warning: Significant change detected ({change_ratio:.2%} change from previous average)") # Get part information part_info = part_info_df[part_info_df['part_id'] == part_id] # Create label and get equipment information label = "" equipment_id = None equipment_name = None if not part_info.empty: location_tag = part_info['location_tag'].iloc[0] if not part_info['location_tag'].isnull().iloc[0] else "" part_name = part_info['part_name'].iloc[0] if not part_info['part_name'].isnull().iloc[0] else "" label = f"{location_tag} | {part_name}".strip() equipment_id = part_info['equipment_id'].iloc[0] if not part_info['equipment_id'].isnull().iloc[0] else None equipment_name = part_info['equipment_name'].iloc[0] if not part_info['equipment_name'].isnull().iloc[0] else None # Add to results results.append({ 'part_id': part_id, 'label': label, 'equipment_id': equipment_id, 'equipment_name': equipment_name, 'status': status, 'messages': messages, 'latest_value': group['value'].iloc[-1] if not group.empty else None, 'latest_created_at': group['created_at'].iloc[-1] if not group.empty else None }) # Sort results by equipment_id, then part_id results = sorted(results, key=lambda x: (x['equipment_id'] or "", x['part_id'])) return results @app.route('/') def index(): """Render the main monitoring dashboard""" return render_template('index.html', base_url=BASE_URL) def fetch_part_history(part_id, days=14): """Fetch historical data for a specific part_id for the last 'days' days""" try: # First connection for dl_pi_fetch_last table (DB_HOST_2) conn_data = get_db_connection("dl_pi_fetch_last") cursor_data = conn_data.cursor() # Calculate the date range end_date = datetime.now() start_date = end_date - timedelta(days=days) # Query to fetch data for the specific part_id for the last 'days' days query = """ SELECT part_id, value, created_at FROM dl_pi_fetch_last WHERE part_id = %s AND created_at >= %s AND created_at <= %s ORDER BY created_at """ cursor_data.execute(query, (part_id, start_date, end_date)) rows = cursor_data.fetchall() # Second connection for pf_parts and ms_equipment_master tables (DB_HOST_1) conn_parts = get_db_connection("pf_parts") cursor_parts = conn_parts.cursor() # Fetch part information part_info_query = """ SELECT p.id as part_id, p.location_tag, p.part_name, e.id as equipment_id, e.name as equipment_name FROM pf_parts p LEFT JOIN ms_equipment_master e ON p.equipment_id = e.id WHERE p.id = %s """ cursor_parts.execute(part_info_query, (part_id,)) part_info_row = cursor_parts.fetchone() # Create part info dictionary part_info = { 'part_id': part_id, 'location_tag': part_info_row[1] if part_info_row and part_info_row[1] else "", 'part_name': part_info_row[2] if part_info_row and part_info_row[2] else "", 'equipment_id': part_info_row[3] if part_info_row and part_info_row[3] else None, 'equipment_name': part_info_row[4] if part_info_row and part_info_row[4] else None } # Create label label = f"{part_info['location_tag']} {part_info['part_name']}".strip() # Convert to list of dictionaries for JSON serialization result = [] for row in rows: result.append({ 'part_id': row[0], 'label': label, 'value': row[1], 'created_at': row[2].isoformat() if row[2] else None }) # Close both connections cursor_data.close() conn_data.close() cursor_parts.close() conn_parts.close() return { 'part_info': part_info, 'label': label, 'history': result } except Exception as e: print(f"Database error: {e}") return { 'part_info': {'part_id': part_id}, 'label': part_id, 'history': [] } @app.route('/api/data') def get_data(): """API endpoint to fetch data from _log_pi_status_data table with date filter""" # Get date parameter from request, default to today's date filter_date_str = request.args.get('date') try: if filter_date_str: # Parse the date string from frontend (format: YYYY-MM-DD) filter_date = datetime.strptime(filter_date_str, '%Y-%m-%d').date() else: # Default to today's date filter_date = datetime.now().date() # Connect to database conn = get_db_connection() cursor = conn.cursor() # Query to fetch data from _log_pi_status_data for the specified date query = """ SELECT timestamp, part_id, label, equipment_id, equipment_name, status, messages, latest_value, latest_created_at FROM _log_pi_status_data WHERE DATE(timestamp) = %s ORDER BY equipment_id, part_id """ cursor.execute(query, (filter_date,)) rows = cursor.fetchall() # Process the results results = [] for row in rows: timestamp, part_id, label, equipment_id, equipment_name, status, messages_str, latest_value, latest_created_at = row # Convert messages string back to list messages = messages_str.split('; ') if messages_str else [] results.append({ 'part_id': part_id, 'label': label, 'equipment_id': equipment_id, 'equipment_name': equipment_name, 'status': status, 'messages': messages, 'latest_value': latest_value, 'latest_created_at': latest_created_at }) # Close database connection cursor.close() conn.close() # Group results by equipment_id for frontend display grouped_results = {} for item in results: equipment_id = item['equipment_id'] or 'unknown' equipment_name = item['equipment_name'] or 'Unknown Equipment' if equipment_id not in grouped_results: grouped_results[equipment_id] = { 'equipment_id': equipment_id, 'equipment_name': equipment_name, 'parts': [] } grouped_results[equipment_id]['parts'].append(item) # Convert to list for JSON serialization final_results = list(grouped_results.values()) return jsonify(final_results) except Exception as e: print(f"Error in get_data: {e}") return jsonify([]), 500 @app.route('/check-data') def checkdata(): """API endpoint to fetch and analyze data""" df = fetch_data(days=10) # Fetch data for the last 3 days part_info_df = fetch_part_info() # Fetch part information results = analyze_data(df, part_info_df) # Telegram notification jika ada status merah base_url = request.host_url.rstrip('/') check_red_status_and_notify(results, "8201929832:AAFhDu7LD4xbNyDQ9Cc2JSuTDMhqrLaDDdc", "-1002721738007", base_url) # Logging ke database conn = None cursor = None try: conn = get_db_connection() # koneksi ke 192.168.1.85 cursor = conn.cursor() current_timestamp = datetime.now() # Delete records from today before inserting new ones today_date = current_timestamp.date() delete_query = """ DELETE FROM _log_pi_status_data WHERE DATE(latest_created_at) = %s """ cursor.execute(delete_query, (today_date,)) deleted_count = cursor.rowcount conn.commit() print(f"Deleted {deleted_count} records from _log_pi_status_data with latest_created_at={today_date}") data_to_insert = [] for result in results: # Only insert records with status other than "green" if result['status'] != "green": messages_str = '; '.join(result['messages']) if result['messages'] else None latest_created_at = result['latest_created_at'] data_to_insert.append(( current_timestamp, result['part_id'], result['label'], result['equipment_id'], result['equipment_name'], result['status'], messages_str, result['latest_value'], latest_created_at )) query = """ INSERT INTO _log_pi_status_data (timestamp, part_id, label, equipment_id, equipment_name, status, messages, latest_value, \ latest_created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) \ """ batch_size = 500 total = 0 for batch in chunked_iterable(data_to_insert, batch_size): cursor.executemany(query, batch) conn.commit() total += len(batch) print(f"Successfully logged {total} results to _log_pi_status_data table") except Exception as e: print(f"Error logging results to database: {e}") traceback.print_exc() finally: if cursor: cursor.close() if conn: conn.close() return jsonify(results) @app.route('/api/part-history') def get_part_history(): """API endpoint to fetch historical data for a specific part_id""" part_id = request.args.get('part_id') if not part_id: return jsonify({'error': 'part_id parameter is required'}), 400 days = request.args.get('days', 14, type=int) result = fetch_part_history(part_id, days) return jsonify(result) if __name__ == '__main__': app.run(host='0.0.0.0', port=5003, debug=True)