import argparse
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, silhouette_score

# Command-line arguments
parser = argparse.ArgumentParser(description='Anomaly detection using K-Means with change point detection and delayed evaluation.')
parser.add_argument('--timesteps', type=int, default=20, help='Number of timesteps per sequence.')
parser.add_argument('--n_clusters', type=int, default=5, help='Number of clusters for K-Means.')
parser.add_argument('--n_init', type=int, default=10, help='Number of K-Means initializations.')
parser.add_argument('--transition', action='store_true', help='Use transition data for testing.')
parser.add_argument('--plot_raw', action='store_true', help='Plot raw data.')
parser.add_argument('--plot_clustered', action='store_true', help='Plot clustered data.')
parser.add_argument('--delay', type=int, default=10, help='Number of timesteps to suppress evaluation after a change point.')
parser.add_argument('--show_change_points', action='store_true', help='Mark detected change points on clustered plots.')
options = parser.parse_args()

# Parameters
n_clusters = options.n_clusters
timesteps = options.timesteps
n_init = options.n_init
delay_steps = options.delay
show_change_points = options.show_change_points

# Data file groups: datafiles[0] holds training files, datafiles[1] test files,
# one list per class (NumberOfFailures + 1 classes in total).
NumberOfFailures = 4
datafiles = [[], []]
for i in range(NumberOfFailures + 1):
    datafiles[0].append([])
    datafiles[1].append([])

datafiles[0][0] = ['2024-08-07_5_', '2024-08-08_5_', '2025-01-25_5_', '2025-01-26_5_']
datafiles[0][1] = ['2024-12-11_5_', '2024-12-12_5_', '2024-12-13_5_']
datafiles[0][2] = ['2024-12-18_5_', '2024-12-21_5_', '2024-12-22_5_', '2024-12-23_5_', '2024-12-24_5_']
datafiles[0][3] = ['2024-12-28_5_', '2024-12-29_5_', '2024-12-30_5_']
datafiles[0][4] = ['2025-02-13_5_', '2025-02-14_5_']

if options.transition:
    datafiles[1][0] = ['2025-01-27_5_', '2025-01-28_5_']
    datafiles[1][1] = ['2024-12-14_5_', '2024-12-15_5_', '2024-12-16_5_']
    datafiles[1][2] = ['2024-12-17_5_', '2024-12-19_5_', '2024-12-25_5_', '2024-12-26_5_']
    datafiles[1][3] = ['2024-12-27_5_', '2024-12-31_5_', '2025-01-01_5_']
    datafiles[1][4] = ['2025-02-12_5_', '2025-02-15_5_', '2025-02-16_5_']
else:
    datafiles[1][0] = ['2025-01-27_5_', '2025-01-28_5_']
    datafiles[1][1] = ['2024-12-14_5_', '2024-12-15_5_']
    datafiles[1][2] = ['2024-12-19_5_', '2024-12-25_5_', '2024-12-26_5_']
    datafiles[1][3] = ['2024-12-31_5_', '2025-01-01_5_']
    datafiles[1][4] = ['2025-02-15_5_', '2025-02-16_5_']

features = ['r1 s1', 'r1 s4', 'r1 s5']
featureNames = {'r1 s1': r'$T_{evap}$', 'r1 s4': r'$T_{cond}$', 'r1 s5': r'$T_{air}$'}
unitNames = {'r1 s1': r'($^\circ$C)', 'r1 s4': r'($^\circ$C)', 'r1 s5': r'($^\circ$C)'}
NumFeatures = len(features)

# Load and preprocess the data: parse timestamps, coerce features to numeric,
# resample to 5-minute means, and interpolate gaps.
script_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(script_dir, 'data')

def load_class_data(file_groups):
    # Returns one concatenated DataFrame per class; missing files are skipped
    # with a warning.
    class_data = []
    for class_files in file_groups:
        class_dfs = []
        for base_filename in class_files:
            filepath = os.path.join(data_dir, f'{base_filename}.csv')
            try:
                df = pd.read_csv(filepath)
                # The CSVs use two different timestamp formats; try both.
                df['timestamp'] = pd.to_datetime(df['datetime'], format='%m/%d/%Y %H:%M', errors='coerce')
                df['timestamp'] = df['timestamp'].fillna(pd.to_datetime(df['datetime'], format='%d-%m-%Y %H:%M:%S', errors='coerce'))
                for col in features:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                df = df.set_index('timestamp').resample('5min')[features].mean()
                df = df.interpolate()
                class_dfs.append(df)
            except FileNotFoundError:
                print(f"Warning: File {filepath} not found and skipped.")
        if class_dfs:
            class_data.append(pd.concat(class_dfs))
    return class_data

dataTrain = load_class_data(datafiles[0])
combined_train_data = pd.concat(dataTrain)
dataTest = load_class_data(datafiles[1])

# Normalize: fit the scaler on training data only, then apply it to the test data.
scaler = StandardScaler()
scaled_train_data = scaler.fit_transform(combined_train_data[features])
scaled_test_data_list = [scaler.transform(df[features]) for df in dataTest]

scaled_train_df = pd.DataFrame(scaled_train_data, columns=features, index=combined_train_data.index)
scaled_test_df_list = [pd.DataFrame(data, columns=features, index=df.index) for data, df in zip(scaled_test_data_list, dataTest)]
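
# Hedged arithmetic sketch of the scaling convention: each test reading is
# expressed as a training-set z-score, z = (x - mean_train) / std_train.
# With an illustrative mean_train = 5.0 and std_train = 2.0, a reading of
# 9.0 maps to z = (9.0 - 5.0) / 2.0 = 2.0.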

# Create overlapping time-window sequences (no rate-of-change features are added).
def create_sequences(data, timesteps):
    # Slide a window of length `timesteps` over the rows of `data`; the result
    # has shape (n_windows, timesteps, n_features), n_windows = len(data) - timesteps + 1.
    sequences = []
    for i in range(len(data) - timesteps + 1):
        sequences.append(data[i:i + timesteps])
    return np.array(sequences)
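
# Minimal usage sketch (illustrative values, not executed here):
#   arr = np.arange(12).reshape(6, 2)          # 6 samples, 2 features
#   create_sequences(arr, timesteps=3).shape   # -> (4, 3, 2): 6 - 3 + 1 = 4 windows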

X_train_sequences = create_sequences(scaled_train_df.values, timesteps)
# Test sequences are created later, per file, in the main execution block.

# Train the K-Means model on all training data, flattening each sequence to one row.
n_samples_train, n_timesteps_train, n_features_train = X_train_sequences.shape
X_train_reshaped = X_train_sequences.reshape(n_samples_train, n_timesteps_train * n_features_train)
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=n_init)
kmeans.fit(X_train_reshaped)
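
# Optional sanity check, sketched under the assumption that a small scan is
# affordable: score candidate cluster counts with the silhouette coefficient
# before settling on --n_clusters (left commented out so behavior is unchanged).
# for k in range(2, 9):
#     trial_labels = KMeans(n_clusters=k, random_state=42, n_init=n_init).fit_predict(X_train_reshaped)
#     print(f'k={k}: silhouette={silhouette_score(X_train_reshaped, trial_labels):.3f}')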

# Detect change points: flag any timestep whose value jumps sharply relative to
# the previous one. Operates on a single 2D array of shape (samples, features).
def detect_change_points(data, threshold=0.8):
    change_points = []
    for i in range(1, len(data)):
        # Absolute first difference between consecutive points, per feature.
        difference = np.abs(data[i] - data[i - 1])
        # If ANY feature jumps by more than the threshold, mark a change point.
        if np.any(difference > threshold):
            change_points.append(i)
    return np.array(change_points)
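
# Illustrative example on toy, already-scaled data:
#   toy = np.array([[0.0], [0.1], [1.5], [1.6]])
#   detect_change_points(toy, threshold=0.8)   # -> array([2]); only the
#                                              # 0.1 -> 1.5 jump exceeds 0.8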

# Plot clustered data, coloring points by assigned cluster; optionally mark
# detected change points as vertical lines.
def plot_clustered_data(df, predicted_clusters, time_index, n_clusters, features, featureNames, unitNames, show_cp=False, change_point_indices=None):
    num_features = len(features)
    fig, axes = plt.subplots(num_features, 1, figsize=(15, 5 * num_features), sharex=True)
    if num_features == 1:
        axes = [axes]  # Ensure axes is always indexable
    colors = plt.cm.viridis(np.linspace(0, 1, n_clusters))

    for i, feature in enumerate(features):
        # Plot data points colored by their assigned cluster.
        for cluster_id in range(n_clusters):
            cluster_indices_kmeans = np.where(predicted_clusters == cluster_id)[0]
            if len(cluster_indices_kmeans) > 0:
                # time_index supplies the x-axis; the original df supplies the y values.
                axes[i].scatter(time_index[cluster_indices_kmeans], df[feature].loc[time_index[cluster_indices_kmeans]],
                                color=colors[cluster_id], label=f'Cluster {cluster_id}', s=10, alpha=0.6)
        axes[i].set_ylabel(f'{featureNames[feature]} {unitNames[feature]}')
        axes[i].set_title(featureNames[feature])
        axes[i].grid(True, linestyle='--', alpha=0.6)

        # Mark change points if enabled; change_point_indices must hold datetimes
        # matching time_index. Label only on the first subplot to keep the legend clean.
        if show_cp and change_point_indices is not None:
            for cp_time in change_point_indices:
                axes[i].axvline(x=cp_time, color='red', linestyle='--', linewidth=1.5,
                                label='Change Point' if i == 0 else '', alpha=0.8)

    # Build a de-duplicated legend on the last subplot.
    handles, labels = [], []
    for ax in axes:
        for handle, label in zip(*ax.get_legend_handles_labels()):
            if label not in labels:
                handles.append(handle)
                labels.append(label)
    if handles:
        axes[-1].legend(handles, labels, loc='upper right')

    plt.tight_layout()
    plt.show()

# Evaluate the model twice: on the FULL test set, and on the DELAYED subset
# that excludes the first `delay_steps` sequences after each change point.
def evaluate_and_report(kmeans_model, scaled_test_data_list, original_test_data_list, true_labels_list, timesteps, delay_steps, features, options):
    all_y_true_full = []                      # True labels for all test sequences
    all_predicted_cluster_labels_full = []    # Predicted clusters for all test sequences
    all_original_test_sequences_full = []     # Original sequences, kept for potential later plotting
    all_change_points_detected_list = []      # Detected change points per test file

    # --- 1. Collect data and predict clusters for ALL test sequences ---
    for k, (scaled_df, original_df, y_true_categorical) in enumerate(zip(scaled_test_data_list, original_test_data_list, true_labels_list)):
        original_indices = original_df.index
        # Each sequence is indexed by the timestamp at which it ends.
        time_index = original_indices[timesteps - 1:]

        sequences = create_sequences(scaled_df.values, timesteps)
        if sequences.size == 0:
            print(f"Warning: No sequences generated for test file {k}. Skipping.")
            all_change_points_detected_list.append([])  # Keep per-file lists aligned
            continue

        n_sequences = sequences.shape[0]
        reshaped_sequences = sequences.reshape(n_sequences, -1)
        predicted_clusters = kmeans_model.predict(reshaped_sequences)

        # Collect true labels and predicted clusters for the FULL evaluation.
        all_y_true_full.extend(y_true_categorical[timesteps - 1:])
        all_predicted_cluster_labels_full.extend(predicted_clusters)

        # Collect the original (unscaled) sequences, aligned with sequence ends.
        for i in range(n_sequences):
            start_index = original_df.index.get_loc(time_index[i]) - (timesteps - 1)
            end_index = start_index + timesteps
            all_original_test_sequences_full.append(original_df[features].iloc[start_index:end_index].values)

        # Detect change points for this test file (on scaled data).
        change_points = detect_change_points(scaled_df.values, threshold=0.8)  # Adjust threshold as needed
        # A change at original index `cp` first appears in the sequence ending
        # at `cp`, i.e. sequence index `cp - (timesteps - 1)`; keep only indices
        # that fall inside [0, n_sequences).
        change_point_sequence_indices = change_points - (timesteps - 1)
        valid_change_point_sequence_indices = change_point_sequence_indices[(change_point_sequence_indices >= 0) & (change_point_sequence_indices < n_sequences)]
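        # Worked example with illustrative numbers: with timesteps = 20, a jump
        # at original index 25 maps to sequence index 25 - 19 = 6, the first
        # window whose end contains the jump.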

        # Record the end times of the sequences at the valid change points.
        # (Per-file clustered plots, with these change points, are produced in
        # the main block; plotting here as well would duplicate them.)
        cp_time_indices = time_index[valid_change_point_sequence_indices].tolist() if valid_change_point_sequence_indices.size > 0 else []
        all_change_points_detected_list.append(cp_time_indices)

    all_y_true_full = np.array(all_y_true_full)
    all_predicted_cluster_labels_full = np.array(all_predicted_cluster_labels_full)
    all_original_test_sequences_full = np.array(all_original_test_sequences_full)

    # --- 2. FULL evaluation (all test sequences) ---
    print("\n--- Full Evaluation Results (All Test Sequences) ---")

    # Assign each cluster the dominant true label among all test sequences it received.
    cluster_dominant_label_full = {}
    for cluster_id in range(kmeans_model.n_clusters):
        indices_in_cluster = np.where(all_predicted_cluster_labels_full == cluster_id)[0]
        if len(indices_in_cluster) > 0:
            labels_in_cluster = all_y_true_full[indices_in_cluster]
            # The most frequent true label wins: np.bincount counts the labels,
            # np.argmax picks the label with the highest count.
            cluster_dominant_label_full[cluster_id] = np.argmax(np.bincount(labels_in_cluster))
        else:
            cluster_dominant_label_full[cluster_id] = -1  # Empty cluster
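
    # Sketch of the mapping rule with illustrative values: if cluster 3 received
    # true labels [2, 2, 4, 2], np.bincount gives [0, 0, 3, 0, 1] and np.argmax
    # selects 2, so cluster 3 is mapped to dominant label 2.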

    # Predicted labels for the full evaluation: each sequence inherits the
    # dominant true label of its assigned cluster.
    predicted_labels_numeric_full = np.array([cluster_dominant_label_full.get(cluster_id, -1) for cluster_id in all_predicted_cluster_labels_full])

    # Evaluate only sequences whose cluster received a valid label mapping.
    valid_indices_full = predicted_labels_numeric_full != -1
    if np.sum(valid_indices_full) > 0 and len(np.unique(all_y_true_full[valid_indices_full])) > 1:
        print("Classification Report (Full Test Set):")
        print(classification_report(all_y_true_full[valid_indices_full], predicted_labels_numeric_full[valid_indices_full]))
        cm_full = confusion_matrix(all_y_true_full[valid_indices_full], predicted_labels_numeric_full[valid_indices_full])
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm_full, annot=True, fmt='d', cmap='Blues')
        plt.xlabel('Predicted Label (Dominant Label of Assigned Cluster)')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix (Full Test Set)')
        plt.show()
    else:
        print("Could not perform full evaluation (not enough data or classes after mapping).")

    # --- 3. DELAYED evaluation (sequences outside the post-change-point window) ---
    print("\n--- Delayed Evaluation Results (Subset after Delay) ---")

    all_y_true_delayed = []
    all_predicted_cluster_labels_delayed = []

    # The delay logic is applied PER FILE: change point indices are local to each
    # test file, so sequences within `delay_steps` of a change point must be masked
    # before the per-file results are combined.
    for k, (scaled_df, original_df, y_true_categorical) in enumerate(zip(scaled_test_data_list, original_test_data_list, true_labels_list)):
        original_indices = original_df.index
        time_index = original_indices[timesteps - 1:]
        sequences = create_sequences(scaled_df.values, timesteps)
        if sequences.size == 0:
            continue  # Skip empty files

        n_sequences = sequences.shape[0]
        reshaped_sequences = sequences.reshape(n_sequences, -1)
        predicted_clusters = kmeans_model.predict(reshaped_sequences)

        # Detect change points for THIS file (on scaled data).
        change_points = detect_change_points(scaled_df.values, threshold=0.8)  # Adjust threshold as needed

        # Map change points to valid sequence indices, as in the full pass.
        evaluation_allowed_file = np.ones(n_sequences, dtype=bool)
        change_point_sequence_indices_file = change_points - (timesteps - 1)
        valid_change_point_sequence_indices_file = change_point_sequence_indices_file[(change_point_sequence_indices_file >= 0) & (change_point_sequence_indices_file < n_sequences)]

        # Disallow evaluation for `delay_steps` sequences after each change point.
        for cp_seq_index in valid_change_point_sequence_indices_file:
            start_delay = max(0, cp_seq_index)
            end_delay = min(n_sequences, cp_seq_index + delay_steps)
            evaluation_allowed_file[start_delay:end_delay] = False
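
        # Worked example (illustrative): with delay_steps = 10, a change point at
        # sequence index 6 masks sequence indices 6..15, so evaluation resumes at 16.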

        # Collect data for the delayed evaluation (only where allowed), reusing the
        # dominant-label mapping computed on the FULL test set for consistency.
        predicted_labels_numeric_file = np.array([cluster_dominant_label_full.get(cluster, -1) for cluster in predicted_clusters])
        true_labels_file = y_true_categorical[timesteps - 1:]  # True labels aligned with sequence ends

        all_y_true_delayed.extend(true_labels_file[evaluation_allowed_file])
        all_predicted_cluster_labels_delayed.extend(predicted_labels_numeric_file[evaluation_allowed_file])

    all_y_true_delayed = np.array(all_y_true_delayed)
    all_predicted_cluster_labels_delayed = np.array(all_predicted_cluster_labels_delayed)

    # Delayed evaluation report
    valid_indices_delayed = all_predicted_cluster_labels_delayed != -1
    if np.sum(valid_indices_delayed) > 0 and len(np.unique(all_y_true_delayed[valid_indices_delayed])) > 1:
        print("Classification Report (Subset after Delay):")
        print(classification_report(all_y_true_delayed[valid_indices_delayed], all_predicted_cluster_labels_delayed[valid_indices_delayed]))
        cm_delayed = confusion_matrix(all_y_true_delayed[valid_indices_delayed], all_predicted_cluster_labels_delayed[valid_indices_delayed])
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm_delayed, annot=True, fmt='d', cmap='Blues')
        plt.xlabel('Predicted Label (Delayed)')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix (Subset after Delay)')
        plt.show()
    else:
        print("Could not perform delayed evaluation (not enough data or classes after the delay).")

    # --- 4. Report detected change points ---
    print("\nDetected Change Points (end time of the first sequence containing each change):")
    for i, cp_list in enumerate(all_change_points_detected_list):
        print(f"File {i}: {cp_list}")

    # Note: anomaly and misclassified-instance plotting is not implemented in this
    # version because of its interaction with the delayed-evaluation subset.

# Main execution
if __name__ == "__main__":
    # Training and test data were loaded and preprocessed at module level above.

    # True labels for the test data: every row of test file i belongs to class i.
    true_labels_list = []
    for i, df in enumerate(dataTest):
        true_labels_list.append(np.full(len(df), i))
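
    # e.g. (illustrative sizes) if dataTest[2] has 500 rows, its label vector is
    # np.full(500, 2), so every sequence drawn from that file carries true class 2.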

    # Plot raw data if requested
    if options.plot_raw:
        print("\nPlotting Raw Data:")
        num_features = len(features)
        fig, axes = plt.subplots(num_features, 1, figsize=(15, 5 * num_features), sharex=True)
        if num_features == 1:
            axes = [axes]
        for i, feature in enumerate(features):
            for k, df in enumerate(dataTest):
                axes[i].plot(df.index, df[feature], label=f'Class {k}', alpha=0.7)
            axes[i].set_ylabel(f'{featureNames[feature]} {unitNames[feature]}')
            axes[i].set_title(featureNames[feature])
        axes[-1].legend(loc='upper right')  # Single legend on the last subplot
        plt.tight_layout()
        plt.show()

    # Cluster the training sequences (used for the optional training-set plot)
    train_sequences = create_sequences(scaled_train_df.values, timesteps)
    train_reshaped_sequences = train_sequences.reshape(train_sequences.shape[0], -1)
    train_predicted_clusters = kmeans.predict(train_reshaped_sequences)
    train_time_index = combined_train_data.index[timesteps - 1:]
    if options.plot_clustered:
        print("\nClustered Data for Training Set:")
        # Optional change point detection on the training data
        train_change_points = detect_change_points(scaled_train_df.values, threshold=0.8)
        train_change_point_sequence_indices = train_change_points - (timesteps - 1)
        valid_train_change_point_sequence_indices = train_change_point_sequence_indices[(train_change_point_sequence_indices >= 0) & (train_change_point_sequence_indices < train_sequences.shape[0])]
        train_cp_time_indices = train_time_index[valid_train_change_point_sequence_indices].tolist() if valid_train_change_point_sequence_indices.size > 0 else None
        plot_clustered_data(combined_train_data.loc[train_time_index], train_predicted_clusters, train_time_index, n_clusters, features, featureNames, unitNames, show_cp=show_change_points, change_point_indices=train_cp_time_indices)

    # Plot clustered data for the TEST set (one plot per file) if requested,
    # including each file's detected change points.
    if options.plot_clustered:
        print("\nClustered Data for Test Sets (per file):")
        for k, (scaled_df, original_df) in enumerate(zip(scaled_test_df_list, dataTest)):
            original_indices = original_df.index
            time_index = original_indices[timesteps - 1:]
            sequences = create_sequences(scaled_df.values, timesteps)
            if sequences.size == 0:
                continue
            reshaped_sequences = sequences.reshape(sequences.shape[0], -1)
            predicted_clusters = kmeans.predict(reshaped_sequences)

            # Change point detection for this test file
            change_points = detect_change_points(scaled_df.values, threshold=0.8)
            change_point_sequence_indices = change_points - (timesteps - 1)
            valid_change_point_sequence_indices = change_point_sequence_indices[(change_point_sequence_indices >= 0) & (change_point_sequence_indices < sequences.shape[0])]
            cp_time_indices = time_index[valid_change_point_sequence_indices].tolist() if valid_change_point_sequence_indices.size > 0 else None

            print(f"  Plotting Test File {k}")
            plot_clustered_data(original_df, predicted_clusters, time_index, n_clusters, features, featureNames, unitNames, show_cp=show_change_points, change_point_indices=cp_time_indices)

    # Full and delayed evaluation, including reports and confusion matrices
    evaluate_and_report(kmeans, scaled_test_df_list, dataTest, true_labels_list, timesteps, delay_steps, features, options)

    # Overall K-Means metrics on the combined test data
    X_test_sequences_list = [create_sequences(df.values, timesteps) for df in scaled_test_df_list]
    X_test_sequences_list = [s for s in X_test_sequences_list if s.size > 0]
    if X_test_sequences_list:
        X_test_sequences_combined = np.vstack(X_test_sequences_list)
        X_test_combined_reshaped = X_test_sequences_combined.reshape(X_test_sequences_combined.shape[0], -1)
        all_cluster_labels_test_combined = kmeans.predict(X_test_combined_reshaped)

        print("\n--- K-Means Model Evaluation (Overall Metrics on Combined Test Data) ---")
        print(f"Inertia (from the training fit): {kmeans.inertia_:.4f}")

        # The silhouette score requires at least two distinct clusters.
        if len(np.unique(all_cluster_labels_test_combined)) > 1:
            silhouette = silhouette_score(X_test_combined_reshaped, all_cluster_labels_test_combined)
            print(f"Silhouette Score: {silhouette:.4f}")
        else:
            print("Silhouette Score: not applicable (all combined test sequences fell into a single cluster).")
    else:
        print("\n--- K-Means Model Evaluation (Overall Metrics) ---")
        print("No test data sequences available to evaluate overall Inertia and Silhouette Score.")

    # Note: anomaly and misclassified-instance plotting is not implemented in this version.