import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Import both scalers
from sklearn.preprocessing import StandardScaler, RobustScaler
# Removed LabelEncoder as it's not used in this version
from sklearn.cluster import KMeans
# Added silhouette_score back; keep classification_report, confusion_matrix
from sklearn.metrics import classification_report, confusion_matrix, silhouette_score
import argparse
import os
import seaborn as sns

# Command line arguments setup
parser = argparse.ArgumentParser(description='Anomaly detection using K-Means clustering with Rate of Change, Change Point Detection, and Delayed Evaluation.')
parser.add_argument('--timesteps', type=int, default=20, help='Number of timesteps for sequences.')
parser.add_argument('--n_clusters', type=int, default=5, help='Number of clusters for K-Means.')
parser.add_argument('--n_init', type=int, default=10, help='Number of initializations for K-Means.')  # Using n_init from options (FIXED)
parser.add_argument('--transition', action='store_true', help='Use transition data for testing.')
parser.add_argument('--plot_raw', action='store_true', help='Plot raw data.')
parser.add_argument('--plot_clustered', action='store_true', help='Plot clustered data.')
# Removed plot_anomalies and plot_misclassified flags as they are not implemented in this version
# parser.add_argument('--plot_anomalies', action='store_true', help='Plot detected anomalies.')
# parser.add_argument('--plot_misclassified', action='store_true', help='Plot misclassified instances.')
parser.add_argument('--delay', type=int, default=10, help='Number of timesteps to delay evaluation after a change point.')
parser.add_argument('--show_change_points', action='store_true', help='Show change points on clustered plots.')
parser.add_argument('--use_standard_scaler', action='store_true', help='Use StandardScaler instead of RobustScaler.')  # Added scaler choice flag
options = parser.parse_args()

# Parameters
n_clusters = options.n_clusters
timesteps = options.timesteps
n_init = options.n_init  # Use n_init from options (FIXED)
delay_steps = options.delay
show_change_points = options.show_change_points
use_standard_scaler = options.use_standard_scaler  # Get scaler choice

# Data loading (same as previous code)
NumberOfFailures = 4
datafiles = [[], []]
for i in range(NumberOfFailures + 1):
    datafiles[0].append([])
    datafiles[1].append([])

datafiles[0][0] = ['2024-08-07_5_', '2024-08-08_5_', '2025-01-25_5_', '2025-01-26_5_']
datafiles[0][1] = ['2024-12-11_5_', '2024-12-12_5_', '2024-12-13_5_']
datafiles[0][2] = ['2024-12-18_5_', '2024-12-21_5_', '2024-12-22_5_', '2024-12-23_5_', '2024-12-24_5_']
datafiles[0][3] = ['2024-12-28_5_', '2024-12-29_5_', '2024-12-30_5_']
datafiles[0][4] = ['2025-02-13_5_', '2025-02-14_5_']

if options.transition:
    datafiles[1][0] = ['2025-01-27_5_', '2025-01-28_5_']
    datafiles[1][1] = ['2024-12-14_5_', '2024-12-15_5_', '2024-12-16_5_']
    datafiles[1][2] = ['2024-12-17_5_', '2024-12-19_5_', '2024-12-25_5_', '2024-12-26_5_']
    datafiles[1][3] = ['2024-12-27_5_', '2024-12-31_5_', '2025-01-01_5_']
    datafiles[1][4] = ['2025-02-12_5_', '2025-02-15_5_', '2025-02-16_5_']
else:
    datafiles[1][0] = ['2025-01-27_5_', '2025-01-28_5_']
    datafiles[1][1] = ['2024-12-14_5_', '2024-12-15_5_']
    datafiles[1][2] = ['2024-12-19_5_', '2024-12-25_5_', '2024-12-26_5_']
    datafiles[1][3] = ['2024-12-31_5_', '2025-01-01_5_']
    datafiles[1][4] = ['2025-02-15_5_', '2025-02-16_5_']

features = ['r1 s1', 'r1 s4', 'r1 s5']
n_original_features = len(features)  # Number of original features, before the rate-of-change columns are added
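# ---------------------------------------------------------------------------
# Illustrative invocation (the script filename below is assumed; adjust flags
# and name to your setup):
#   python kmeans_rate_of_change.py --timesteps 20 --n_clusters 5 --n_init 10 \
#       --plot_clustered --show_change_points --delay 10
# Each entry in `datafiles` is the stem of a CSV expected under ./data/
# (e.g. data/2024-08-07_5_.csv) with a 'datetime' column plus the sensor
# columns listed in `features`.
# ---------------------------------------------------------------------------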
# Display names use standard LaTeX formatting
featureNames = {'r1 s1': r'$T_{evap}$', 'r1 s4': r'$T_{cond}$', 'r1 s5': r'$T_{air}$'}
unitNames = {'r1 s1': r'($^o$C)', 'r1 s4': r'($^o$C)', 'r1 s5': r'($^o$C)'}
NumFeatures = len(features)  # Used for indexing features[:NumFeatures]

# Load and preprocess training data (same as previous code)
dataTrain = []
for class_files in datafiles[0]:
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(script_dir, 'data')
    class_dfs = []
    for base_filename in class_files:
        filepath = os.path.join(data_dir, f'{base_filename}.csv')
        try:
            df = pd.read_csv(filepath)
            df['timestamp'] = pd.to_datetime(df['datetime'], format='%m/%d/%Y %H:%M', errors='coerce')
            df['timestamp'] = df['timestamp'].fillna(pd.to_datetime(df['datetime'], format='%d-%m-%Y %H:%M:%S', errors='coerce'))
            for col in features:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            df = df.set_index('timestamp').resample('5Min')[features].mean()
            df = df[features].interpolate()
            class_dfs.append(df)
        except FileNotFoundError:
            print(f"Warning: File {filepath} not found and skipped.")
    if class_dfs:
        dataTrain.append(pd.concat(class_dfs))
combined_train_data = pd.concat(dataTrain)

# Load and preprocess test data (same structure as the training loop)
dataTest = []
for class_files in datafiles[1]:
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(script_dir, 'data')
    class_dfs = []
    for base_filename in class_files:
        filepath = os.path.join(data_dir, f'{base_filename}.csv')
        try:
            df = pd.read_csv(filepath)
            df['timestamp'] = pd.to_datetime(df['datetime'], format='%m/%d/%Y %H:%M', errors='coerce')
            df['timestamp'] = df['timestamp'].fillna(pd.to_datetime(df['datetime'], format='%d-%m-%Y %H:%M:%S', errors='coerce'))
            for col in features:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            df = df.set_index('timestamp').resample('5Min')[features].mean()
            df = df[features].interpolate()
            class_dfs.append(df)
        except FileNotFoundError:
            print(f"Warning: File {filepath} not found and skipped.")
    if class_dfs:
        dataTest.append(pd.concat(class_dfs))

# Normalize data (RobustScaler by default, StandardScaler if --use_standard_scaler is set).
# The scaler is fitted on the training data only and then applied to the test data.
scaler = StandardScaler() if use_standard_scaler else RobustScaler()
scaled_train_data = scaler.fit_transform(combined_train_data[features])
scaled_test_data_list = []  # This list stores NumPy arrays initially
for df in dataTest:
    scaled_test_data_list.append(scaler.transform(df[features]))

# Convert scaled data to DataFrames for easier handling and plotting
scaled_train_df = pd.DataFrame(scaled_train_data, columns=features, index=combined_train_data.index)
scaled_test_df_list = [pd.DataFrame(data, columns=features, index=df.index) for data, df in zip(scaled_test_data_list, dataTest)]  # This list stores DataFrames


# Create time sequences WITH rate of change (from the previous version)
def create_sequences_with_rate_of_change(data, timesteps, original_features_count):
    # (original_features_count is not used directly here; it is kept for a uniform call signature)
    sequences = []
    # Calculate the rate of change for the whole array first.
    # The result has shape (len(data), original_features_count), with NaN in the first row.
    rate_of_change_full = np.diff(data, axis=0, prepend=np.nan)
    # Horizontally stack the original data and its rate of change
    combined_data = np.hstack((data, rate_of_change_full))  # Shape (len(data), 2 * original_features_count)
    # Create sequences from the combined data; start at 1 so no sequence includes
    # the NaN rate-of-change row at index 0
    for i in range(1, len(combined_data) - timesteps + 1):
        sequence = combined_data[i:i + timesteps]  # Shape (timesteps, 2 * original_features_count)
        sequences.append(sequence)
    return np.array(sequences)


X_train_sequences = create_sequences_with_rate_of_change(scaled_train_df.values, timesteps, n_original_features)  # Use scaled_train_df.values
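# Illustrative sanity check (assumes the training data yields at least one
# sequence): each sequence stacks the three scaled sensor readings with their
# per-step differences, so with the default --timesteps 20 a sequence is
# 20 x 6 and flattens to a 120-dimensional vector for K-Means.
assert X_train_sequences.shape[1:] == (timesteps, 2 * n_original_features)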
# Train the K-Means model on all training data
n_samples_train, n_timesteps_train, n_total_features_train = X_train_sequences.shape  # Total features is now 2 * original features
X_train_reshaped = X_train_sequences.reshape(n_samples_train, n_timesteps_train * n_total_features_train)  # Flatten sequences
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=n_init)  # Using n_init from options (FIXED)
kmeans.fit(X_train_reshaped)


# Detect change points (sharp jumps or drops between consecutive samples).
# The function works on a single 2D data array (samples, features).
# It is typically better to detect change points on the original scaled features, BEFORE adding the rate of change.
def detect_change_points(data, threshold=0.8):
    change_points = []
    # Iterate through the data points starting from the second one
    for i in range(1, len(data)):
        # Absolute difference between the current point and the previous point
        difference = np.abs(data[i] - data[i - 1])
        # If the difference for ANY feature exceeds the threshold, mark this point as a change point
        if np.any(difference > threshold):
            change_points.append(i)
    return np.array(change_points)
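# Worked example (illustrative values): with the default threshold of 0.8,
#   detect_change_points(np.array([[0.0], [0.1], [1.2], [1.3]]))
# returns array([2]), because only the step from 0.1 to 1.2 exceeds 0.8.
# The threshold is expressed in scaled units, so its effect depends on the
# scaler chosen above.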
# Plot clustered data (adapted to accept sequence indices and show change points)
def plot_clustered_data(df, predicted_clusters, time_index, n_clusters, features, featureNames, unitNames, show_cp=False, change_point_indices=None):
    # Note: 'features' here is the list of original feature names ('r1 s1', 'r1 s4', 'r1 s5');
    # only the original features are plotted, not the rate-of-change columns.
    num_original_features = len(features)
    fig, axes = plt.subplots(num_original_features, 1, figsize=(15, 5 * num_original_features), sharex=True)
    if num_original_features == 1:
        axes = [axes]  # Ensure axes is always indexable
    colors = plt.cm.viridis(np.linspace(0, 1, n_clusters))

    for i, feature in enumerate(features):
        # Plot data points colored by their assigned cluster
        for cluster_id in range(n_clusters):
            cluster_indices_kmeans = np.where(predicted_clusters == cluster_id)[0]
            if len(cluster_indices_kmeans) > 0:
                # Use time_index for the x-axis and the original df for the y-axis values
                axes[i].scatter(time_index[cluster_indices_kmeans], df[feature].loc[time_index[cluster_indices_kmeans]],
                                color=colors[cluster_id], label=f'Cluster {cluster_id}', s=10, alpha=0.6)  # alpha helps visualize overlaps
        axes[i].set_ylabel(f'{featureNames[feature]} {unitNames[feature]}')  # Display names/units from the dicts
        axes[i].set_title(featureNames[feature])
        axes[i].grid(True, linestyle='--', alpha=0.6)  # Grid for readability
        # Plot change points if enabled
        if show_cp and change_point_indices is not None:
            # change_point_indices must contain datetime objects matching time_index
            for cp_time in change_point_indices:
                axes[i].axvline(x=cp_time, color='red', linestyle='--', linewidth=1.5,
                                label='Change Point' if i == 0 else '', alpha=0.8)  # Only one legend entry for change points

    # Add a legend to the last subplot, including cluster labels and the change point marker if plotted.
    # Collect handles and labels from all axes and drop duplicates.
    handles, labels = [], []
    for ax in axes:
        for handle, label in zip(*ax.get_legend_handles_labels()):
            if label not in labels:
                handles.append(handle)
                labels.append(label)
    if handles:
        axes[-1].legend(handles, labels, loc='upper right')
    plt.tight_layout()
    plt.show()
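# Sequence/time alignment used below: create_sequences_with_rate_of_change starts
# at row 1, so the j-th sequence covers original rows j+1 .. j+timesteps and ends
# at row j + timesteps.  Sequence endpoints therefore line up with
# original_indices[timesteps:], and a change point detected at original row cp
# maps to sequence index cp - timesteps.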
# Combined evaluation function (full and delayed)
def evaluate_and_report(kmeans_model, scaled_test_df_list, original_test_data_list, true_labels_list, timesteps, delay_steps, features, options, n_original_features):
    all_y_true_full = []  # True labels for all test sequences
    all_predicted_cluster_labels_full = []  # Predicted clusters for all test sequences
    # all_original_test_sequences_full = []  # Not directly needed in the evaluation logic itself
    all_change_points_detected_list = []  # Detected change-point time indices for each test file

    # --- 1. Collect data and predict clusters for ALL test sequences ---
    # Iterate over the scaled DataFrames (scaled_test_df_list) and the original DataFrames (original_test_data_list)
    for k, (scaled_df, original_df, y_true_categorical) in enumerate(zip(scaled_test_df_list, original_test_data_list, true_labels_list)):
        original_indices = original_df.index
        # time_index corresponds to the end of each sequence (see the alignment note above)
        time_index = original_indices[timesteps:]
        # Create sequences (scaled_df is a DataFrame, so pass its .values)
        sequences = create_sequences_with_rate_of_change(scaled_df.values, timesteps, n_original_features)
        if sequences.size == 0:
            print(f"Warning: No sequences generated for test file {k}. Skipping.")
            all_change_points_detected_list.append([])  # Append an empty list for consistency
            continue  # Skip to the next file
        n_sequences = sequences.shape[0]
        reshaped_sequences = sequences.reshape(n_sequences, -1)
        predicted_clusters = kmeans_model.predict(reshaped_sequences)
        # Collect true labels aligned with the sequences that were actually created:
        # the first sequence ends at original row `timesteps`, so labels start there.
        all_y_true_full.extend(y_true_categorical[timesteps:])
        all_predicted_cluster_labels_full.extend(predicted_clusters)

        # Detect change points for this test file (on the original scaled features, before adding the rate of change)
        change_points = detect_change_points(scaled_df.values, threshold=0.8)  # Adjust the threshold as needed
        # Map change-point row indices to sequence indices (see the alignment note above)
        change_point_sequence_indices = change_points - timesteps
        # Keep only valid sequence indices (>= 0 and < n_sequences)
        valid_change_point_sequence_indices = change_point_sequence_indices[(change_point_sequence_indices >= 0) & (change_point_sequence_indices < n_sequences)]
        # Store the time index of the valid change-point sequences
        cp_time_indices = []
        if valid_change_point_sequence_indices.size > 0:
            cp_time_indices = time_index[valid_change_point_sequence_indices].tolist()
        all_change_points_detected_list.append(cp_time_indices)

        # Plot clustered data for the current test file if requested and transition=False
        # (the transition case is plotted in the main block instead, to avoid duplicate plots)
        if options.plot_clustered and not options.transition:
            print(f"\nClustered Data for Test File {k}:")
            plot_clustered_data(original_df, predicted_clusters, time_index, kmeans_model.n_clusters, features, featureNames, unitNames,
                                show_cp=show_change_points, change_point_indices=cp_time_indices if show_change_points else None)

    # Convert the collected lists to numpy arrays for evaluation
    all_y_true_full = np.array(all_y_true_full)
    all_predicted_cluster_labels_full = np.array(all_predicted_cluster_labels_full)

    # --- 2. Perform FULL evaluation (on all test sequences) ---
    print("\n--- Full Evaluation Results (All Test Sequences) ---")
    # Assign a dominant true label to each cluster based on ALL test sequences
    cluster_dominant_label_full = {}
    for cluster_id in range(kmeans_model.n_clusters):
        indices_in_cluster = np.where(all_predicted_cluster_labels_full == cluster_id)[0]
        if len(indices_in_cluster) > 0:
            labels_in_cluster = all_y_true_full[indices_in_cluster]
            # np.argmax over the label counts gives the most frequent (dominant) label
            dominant_label = np.argmax(np.bincount(labels_in_cluster))
            cluster_dominant_label_full[cluster_id] = dominant_label
        else:
            cluster_dominant_label_full[cluster_id] = -1  # Empty cluster

    # Predicted labels for the full evaluation are the dominant label of the assigned cluster
    predicted_labels_numeric_full = np.array([cluster_dominant_label_full.get(cluster_id, -1) for cluster_id in all_predicted_cluster_labels_full])

    # Evaluate (using numeric labels for the full set)
    valid_indices_full = predicted_labels_numeric_full != -1
    if np.sum(valid_indices_full) > 0 and len(np.unique(all_y_true_full[valid_indices_full])) > 1:
        print("Classification Report (Full Test Set):")
        print(classification_report(all_y_true_full[valid_indices_full], predicted_labels_numeric_full[valid_indices_full]))
        cm_full = confusion_matrix(all_y_true_full[valid_indices_full], predicted_labels_numeric_full[valid_indices_full])
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm_full, annot=True, fmt='d', cmap='Blues')
        plt.xlabel('Predicted Label (Dominant True Label of Cluster)')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix (Full Test Set)')
        plt.show()
    else:
        print("Could not perform full evaluation (not enough data or classes after mapping).")
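    # Illustrative example of the dominant-label mapping above: if cluster 3
    # contains sequences with true labels [2, 2, 0, 2], then
    # np.bincount([2, 2, 0, 2]) -> array([1, 0, 3]) and np.argmax(...) -> 2,
    # so every sequence assigned to cluster 3 is scored as a prediction of
    # class 2 in the reports.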
    # --- 3. Perform DELAYED evaluation (on the subset of sequences left after the delay) ---
    print("\n--- Delayed Evaluation Results (Subset after Delay) ---")
    all_y_true_delayed = []
    all_predicted_cluster_labels_delayed = []

    # Apply the delay logic per file, using that file's sequence indices, and collect the results
    sequence_count_so_far = 0
    for k, (scaled_df, original_df, y_true_categorical) in enumerate(zip(scaled_test_df_list, original_test_data_list, true_labels_list)):
        # Use the same sequence creation function as above (with rate of change)
        sequences = create_sequences_with_rate_of_change(scaled_df.values, timesteps, n_original_features)
        if sequences.size == 0:
            continue  # No sequences were added for this file above either
        n_sequences_file = sequences.shape[0]

        # Detect change points for THIS file (on the original scaled features)
        change_points = detect_change_points(scaled_df.values, threshold=0.8)  # Adjust the threshold as needed

        # Apply the delay logic to this file's sequence indices
        evaluation_allowed_file = np.ones(n_sequences_file, dtype=bool)
        # Map change-point row indices to sequence indices for the delay logic
        change_point_sequence_indices_file = change_points - timesteps
        # Keep only valid sequence indices for the delay
        valid_change_point_sequence_indices_file = change_point_sequence_indices_file[(change_point_sequence_indices_file >= 0) & (change_point_sequence_indices_file < n_sequences_file)]
        for cp_seq_index in valid_change_point_sequence_indices_file:
            start_delay = max(0, cp_seq_index)
            end_delay = min(n_sequences_file, cp_seq_index + delay_steps)
            evaluation_allowed_file[start_delay:end_delay] = False

        # Collect data for the DELAYED evaluation (only where evaluation_allowed_file is True),
        # reusing the dominant-label mapping computed on the FULL test set for consistency.
        # Predicted clusters for the sequences in THIS file (sliced from the full prediction list)
        predicted_clusters_file = all_predicted_cluster_labels_full[sequence_count_so_far:sequence_count_so_far + n_sequences_file]
        predicted_labels_numeric_file = np.array([cluster_dominant_label_full.get(cluster, -1) for cluster in predicted_clusters_file])
        # True labels for the sequences in THIS file (aligned with the sequences)
        true_labels_file = all_y_true_full[sequence_count_so_far:sequence_count_so_far + n_sequences_file]

        all_y_true_delayed.extend(true_labels_file[evaluation_allowed_file])
        all_predicted_cluster_labels_delayed.extend(predicted_labels_numeric_file[evaluation_allowed_file])
        sequence_count_so_far += n_sequences_file  # Advance the offset used for slicing

    all_y_true_delayed = np.array(all_y_true_delayed)
    all_predicted_cluster_labels_delayed = np.array(all_predicted_cluster_labels_delayed)

    # Perform the delayed evaluation
    valid_indices_delayed = all_predicted_cluster_labels_delayed != -1
    if np.sum(valid_indices_delayed) > 0 and len(np.unique(all_y_true_delayed[valid_indices_delayed])) > 1:
        print("Classification Report (Subset after Delay):")
        print(classification_report(all_y_true_delayed[valid_indices_delayed], all_predicted_cluster_labels_delayed[valid_indices_delayed]))
        cm_delayed = confusion_matrix(all_y_true_delayed[valid_indices_delayed], all_predicted_cluster_labels_delayed[valid_indices_delayed])
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm_delayed, annot=True, fmt='d', cmap='Blues')
        plt.xlabel('Predicted Label (Delayed)')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix (Subset after Delay)')
        plt.show()
    else:
        print("Could not perform delayed evaluation (not enough data or classes after the delay).")
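    # Illustrative delay masking: with the default --delay 10, a change point that
    # maps to sequence index 57 clears evaluation_allowed_file[57:67], so the ten
    # sequences ending at and just after the change are excluded from the delayed
    # report above.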
    # --- 4. Report detected change points ---
    print("\nDetected Change Points (timestamps, per test file):")
    for i, cp_list in enumerate(all_change_points_detected_list):
        print(f"File {i}: {cp_list}")

    # Note: anomaly and misclassified-instance plotting is not implemented in this version
    # because of the added complexity of handling the delayed-evaluation subset.

    # Return the collected data in case it is needed for further processing (e.g., plotting).
    # (original_test_sequences_full is no longer returned.)
    return all_y_true_full, all_predicted_cluster_labels_full, all_change_points_detected_list


# Main execution
if __name__ == "__main__":
    # Training and test data are loaded and preprocessed at module level above.

    # Create the true labels list for the test data (one label per row, per file)
    true_labels_list = []
    for i, df in enumerate(dataTest):
        true_labels_list.append(np.full(len(df), i))

    # Plot raw data if requested
    if options.plot_raw:
        print("\nPlotting Raw Data:")
        num_features = len(features)
        fig, axes = plt.subplots(num_features, 1, figsize=(15, 5 * num_features), sharex=True)
        if num_features == 1:
            axes = [axes]
        for i, feature in enumerate(features):
            for k, df in enumerate(dataTest):
                axes[i].plot(df.index, df[feature], label=f'Class {k}', alpha=0.7)
            axes[i].set_ylabel(f'{featureNames[feature]} {unitNames[feature]}')
            axes[i].set_title(featureNames[feature])
        axes[-1].legend(loc='upper right')  # Legend on the last subplot
        plt.tight_layout()
        plt.show()

    # Cluster the training sequences (used for the training-set plot below)
    train_sequences = create_sequences_with_rate_of_change(scaled_train_df.values, timesteps, n_original_features)
    train_reshaped_sequences = train_sequences.reshape(train_sequences.shape[0], -1)
    train_predicted_clusters = kmeans.predict(train_reshaped_sequences)
    train_time_index = combined_train_data.index[timesteps:]

    # Plot clustered data for the training set if requested
    if options.plot_clustered:
        print("\nClustered Data for Training Set:")
        # Change-point detection for the training data (optional) - on the original scaled features
        train_change_points = detect_change_points(scaled_train_df.values, threshold=0.8)
        train_change_point_sequence_indices = train_change_points - timesteps
        valid_train_change_point_sequence_indices = train_change_point_sequence_indices[(train_change_point_sequence_indices >= 0) & (train_change_point_sequence_indices < train_sequences.shape[0])]
        train_cp_time_indices = train_time_index[valid_train_change_point_sequence_indices].tolist() if valid_train_change_point_sequence_indices.size > 0 else None
        # Use combined_train_data for plotting the original (unscaled) values
        plot_clustered_data(combined_train_data.loc[train_time_index], train_predicted_clusters, train_time_index, n_clusters,
                            features, featureNames, unitNames, show_cp=show_change_points, change_point_indices=train_cp_time_indices)

    # Plot clustered data for the TEST set (per file) if requested.
    # This iterates through the test files, plotting each file's clustered data
    # and its detected change points.
    if options.plot_clustered:
        print("\nClustered Data for Test Sets (per file):")
        # Iterate over the scaled DataFrames (scaled_test_df_list) and the original DataFrames (dataTest)
        for k, (scaled_df, original_df) in enumerate(zip(scaled_test_df_list, dataTest)):
            original_indices = original_df.index
            time_index = original_indices[timesteps:]
            # Use scaled_df.values for sequence creation
            sequences = create_sequences_with_rate_of_change(scaled_df.values, timesteps, n_original_features)
            if sequences.size == 0:
                continue
            reshaped_sequences = sequences.reshape(sequences.shape[0], -1)
            predicted_clusters = kmeans.predict(reshaped_sequences)
            # Change-point detection for this test file - on the original scaled features
            change_points = detect_change_points(scaled_df.values, threshold=0.8)
            change_point_sequence_indices = change_points - timesteps
            valid_change_point_sequence_indices = change_point_sequence_indices[(change_point_sequence_indices >= 0) & (change_point_sequence_indices < sequences.shape[0])]
            cp_time_indices = time_index[valid_change_point_sequence_indices].tolist() if valid_change_point_sequence_indices.size > 0 else None
            print(f" Plotting Test File {k}")
            # Use original_df for plotting the original (unscaled) values
            plot_clustered_data(original_df, predicted_clusters, time_index, n_clusters, features, featureNames, unitNames,
                                show_cp=show_change_points, change_point_indices=cp_time_indices)

    # Perform the evaluation (full and delayed).
    # This function handles all evaluation reporting and confusion-matrix plotting.
    all_y_true_full, all_predicted_cluster_labels_full, all_change_points_detected_list = evaluate_and_report(
        kmeans, scaled_test_df_list, dataTest, true_labels_list, timesteps, delay_steps,
        features, options, n_original_features)  # Pass scaled_test_df_list (list of DataFrames)

    # Calculate and print inertia and the silhouette score for the combined test data.
    # The evaluation function returns the full prediction lists, so they can be reused here.
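    # Interpretation note: silhouette scores range from -1 to 1, with higher values
    # indicating sequences that sit well inside their own cluster and far from
    # neighbouring clusters; inertia is the within-cluster sum of squared distances
    # obtained from the training fit.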
    # Build the combined reshaped test sequences needed for the silhouette calculation,
    # using scaled_test_df_list so the sequences mirror those used in the evaluation.
    test_sequences_per_file = [create_sequences_with_rate_of_change(df.values, timesteps, n_original_features) for df in scaled_test_df_list]
    non_empty_test_sequences = [seq for seq in test_sequences_per_file if seq.size > 0]
    if non_empty_test_sequences and all_predicted_cluster_labels_full.size > 0:  # Check that sequences were processed for evaluation
        X_test_sequences_combined = np.vstack(non_empty_test_sequences)
        X_test_combined_reshaped = X_test_sequences_combined.reshape(X_test_sequences_combined.shape[0], -1)
        print("\n--- K-Means Model Evaluation (Overall Metrics on Combined Test Data) ---")
        print(f"Inertia: {kmeans.inertia_:.4f}")  # Inertia comes from the training fit
        # Silhouette score on the combined test data, using the cluster labels predicted for the full test set
        if len(np.unique(all_predicted_cluster_labels_full)) > 1:
            try:
                silhouette = silhouette_score(X_test_combined_reshaped, all_predicted_cluster_labels_full)
                print(f"Silhouette Score: {silhouette:.4f}")
            except ValueError as e:
                print(f"Silhouette Score: Could not calculate ({e})")
        else:
            print("Silhouette Score: Not applicable for a single cluster or empty combined test data.")
    else:
        print("\n--- K-Means Model Evaluation (Overall Metrics) ---")
        print("No test data sequences available or processed for overall evaluation metrics.")

    # Note: anomaly and misclassified-instance plotting is not implemented in this version.