"""Anomaly detection on temperature sensor data using K-Means clustering.

The script loads per-class CSV files (class 0 = normal, classes 1..N = failure
types), resamples them to a 5-minute grid, scales them, builds sliding-window
sequences augmented with their rate of change, fits K-Means on the training
sequences and evaluates how well the clusters separate the known classes of
the test data. Optional flags enable several diagnostic plots.
"""

# Import necessary libraries
import argparse  # For parsing command line arguments
import os  # For path manipulation

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns  # For enhanced data visualization (like confusion matrix)
from sklearn.cluster import KMeans
# Import scalers and metrics from scikit-learn
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
    silhouette_score,
)
from sklearn.preprocessing import LabelEncoder, RobustScaler, StandardScaler

#####################################################################################################
# Static configuration
#####################################################################################################

# Default window length; mirrors the default of the --timesteps option.
DEFAULT_TIMESTEPS = 20

# Number of distinct failure types we have data for (excluding the normal state)
NumberOfFailures = 4  # So far, we have only data for the first 4 types of failures

# Features (columns) to be used from the data files
features = ['r1 s1', 'r1 s4', 'r1 s5']

# Count of raw features, before the derived rate-of-change features are added
n_original_features = len(features)

# Display names for the features (matplotlib mathtext, used in plot labels)
featureNames = {
    'r1 s1': r'$T_{evap}$',  # Evaporator Temperature
    'r1 s4': r'$T_{cond}$',  # Condenser Temperature
    'r1 s5': r'$T_{air}$',  # Air Temperature
}

# Units of the features (used in plot labels)
unitNames = {
    'r1 s1': r'($^o$C)',
    'r1 s4': r'($^o$C)',
    'r1 s5': r'($^o$C)',
}

# Redundant variable, but kept from original code
NumFeatures = len(features)


#####################################################################################################
# Command line interface
#####################################################################################################

def parse_arguments(argv=None):
    """Parse the command line options of the script.

    Args:
        argv: Optional list of argument strings; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description='Anomaly detection using K-Means clustering with visualization.')
    # --timesteps: Number of data points in each sequence (time window)
    parser.add_argument('--timesteps', type=int, default=DEFAULT_TIMESTEPS,
                        help='Number of timesteps for sequences.')
    # --n_clusters: Expected to be number of failure types + normal state.
    parser.add_argument('--n_clusters', type=int, default=5,
                        help='Number of clusters for K-Means (should match the number of failure types + normal).')
    # --n_init: Number of K-Means runs with different initial centroids.
    parser.add_argument('--n_init', type=int, default=10,
                        help='Number of initializations for K-Means.')
    # --transition: Use the test file lists that include transition periods.
    parser.add_argument('--transition', action='store_true',
                        help='Use transition data for testing.')
    # Plotting flags: control which plots are displayed.
    parser.add_argument('--plot_raw', action='store_true', help='Plot raw data.')
    parser.add_argument('--plot_clustered', action='store_true', help='Plot clustered data.')
    parser.add_argument('--plot_anomalies', action='store_true',
                        help='Plot detected anomalies (based on clusters).')
    parser.add_argument('--plot_misclassified', action='store_true',
                        help='Plot misclassified instances (based on clusters).')

    options = parser.parse_args(argv)
    # A window needs at least one row and K-Means at least one cluster.
    if options.timesteps < 1:
        parser.error('--timesteps must be at least 1.')
    if options.n_clusters < 1:
        parser.error('--n_clusters must be at least 1.')
    return options


#####################################################################################################
# Data File Configuration
#####################################################################################################

def build_datafiles(use_transition):
    """Return the base filenames of the training and test data.

    Args:
        use_transition: When true, the test lists that include the files
            marked "with TRANSITION" are used.

    Returns:
        ``[train, test]`` where each element is a list indexed by class
        (0: Normal, 1-4: Failure Types) of lists of base filenames.
    """
    train = [
        ['2024-08-07_5_', '2024-08-08_5_', '2025-01-25_5_', '2025-01-26_5_'],  # Normal
        ['2024-12-11_5_', '2024-12-12_5_', '2024-12-13_5_'],  # Failure Type 1
        ['2024-12-18_5_', '2024-12-21_5_', '2024-12-22_5_', '2024-12-23_5_', '2024-12-24_5_'],
        ['2024-12-28_5_', '2024-12-29_5_', '2024-12-30_5_'],
        ['2025-02-13_5_', '2025-02-14_5_'],
    ]
    if use_transition:
        # Test files including transition data
        test = [
            ['2025-01-27_5_', '2025-01-28_5_'],
            ['2024-12-14_5_', '2024-12-15_5_', '2024-12-16_5_'],  # with TRANSITION
            ['2024-12-17_5_', '2024-12-19_5_', '2024-12-25_5_', '2024-12-26_5_'],  # with TRANSITION
            ['2024-12-27_5_', '2024-12-31_5_', '2025-01-01_5_'],  # with TRANSITION
            ['2025-02-12_5_', '2025-02-15_5_', '2025-02-16_5_'],
        ]
    else:
        # Test files without explicit transition data
        test = [
            ['2025-01-27_5_', '2025-01-28_5_'],
            ['2024-12-14_5_', '2024-12-15_5_'],
            ['2024-12-19_5_', '2024-12-25_5_', '2024-12-26_5_'],
            ['2024-12-31_5_', '2025-01-01_5_'],
            ['2025-02-15_5_', '2025-02-16_5_'],
        ]
    return [train, test]


#####################################################################################################
# Data Loading and Preprocessing
#####################################################################################################

def preprocess_frame(df):
    """Turn one raw CSV frame into a 5-minute, gap-filled feature frame.

    Args:
        df: DataFrame with a ``datetime`` column and the columns listed in
            ``features``.

    Returns:
        DataFrame indexed by timestamp, holding only ``features``, resampled
        to 5-minute means with missing values linearly interpolated.
    """
    raw_times = df['datetime']
    # Two timestamp layouts occur in the files; try one, fall back to the other.
    timestamps = pd.to_datetime(raw_times, format='%m/%d/%Y %H:%M', errors='coerce')
    timestamps = timestamps.fillna(
        pd.to_datetime(raw_times, format='%d-%m-%Y %H:%M:%S', errors='coerce'))

    df = df.assign(timestamp=timestamps)
    # Convert feature columns to numeric, coercing unparsable cells to NaN
    for col in features:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Rows whose timestamp matched neither layout cannot be placed on the
    # time axis, so they are dropped explicitly before resampling.
    df = df.dropna(subset=['timestamp'])
    resampled = df.set_index('timestamp')[features].resample('5min').mean()
    return resampled.interpolate()


def load_class_data(class_file_groups, data_dir):
    """Load and preprocess the CSV files of every class.

    Args:
        class_file_groups: List indexed by class id of lists of base
            filenames (without the ``.csv`` extension).
        data_dir: Directory that contains the CSV files.

    Returns:
        ``(frames, class_ids)``: one concatenated DataFrame per class that had
        at least one readable file, and the class id of each of those frames.
        Returning the ids keeps labels correct when a whole class is missing.
    """
    frames = []
    class_ids = []
    for class_id, class_files in enumerate(class_file_groups):
        class_dfs = []
        for base_filename in class_files:
            filepath = os.path.join(data_dir, f'{base_filename}.csv')
            try:
                raw = pd.read_csv(filepath)
            except FileNotFoundError:
                # A missing file is reported and skipped, not fatal.
                print(f"Warning: File {filepath} not found and skipped.")
                continue
            class_dfs.append(preprocess_frame(raw))
        if class_dfs:
            frames.append(pd.concat(class_dfs))
            class_ids.append(class_id)
    return frames, class_ids


#####################################################################################################
# Raw Data Plotting (Optional)
#####################################################################################################

def plot_raw_data(test_frames, class_ids):
    """Plot every feature of every test class over time, one subplot per feature."""
    num_features = len(features)
    # squeeze=False keeps `axes` two-dimensional even for a single feature.
    fig, axes = plt.subplots(num_features, 1, figsize=(15, 5 * num_features),
                             sharex=True, squeeze=False)
    for ax, feature in zip(axes[:, 0], features):
        for class_id, df in zip(class_ids, test_frames):
            ax.plot(df.index, df[feature], label=f'Class {class_id}')
        ax.set_ylabel(f'{featureNames[feature]} {unitNames[feature]}')
        ax.set_title(featureNames[feature])
        ax.legend()
    plt.tight_layout()
    plt.show()


############################################################################################################
# Sequence Creation with Rate of Change Feature Engineering
############################################################################################################

def create_sequences_with_rate_of_change(data, timesteps, original_features_count):
    """Build overlapping windows and append each window's rate of change.

    Args:
        data: 2-D array-like of shape ``(n_rows, original_features_count)``.
        timesteps: Number of consecutive rows per window (>= 1).
        original_features_count: Number of columns in ``data``.

    Returns:
        Float array of shape ``(n_rows - timesteps + 1, timesteps,
        2 * original_features_count)``. The first half of the last axis is the
        window itself, the second half the row-to-row difference inside the
        window, with a leading row of zeros so both halves have ``timesteps``
        rows. When ``data`` has fewer than ``timesteps`` rows the result has
        zero windows but keeps the three-dimensional shape.

    Raises:
        ValueError: If ``timesteps`` is smaller than 1 or ``data`` does not
            have ``original_features_count`` columns.
    """
    if timesteps < 1:
        raise ValueError("timesteps must be at least 1.")

    data = np.asarray(data, dtype=float)
    n_windows = len(data) - timesteps + 1
    if n_windows <= 0:
        # Keep the rank so callers can still unpack `.shape` into 3 values.
        return np.empty((0, timesteps, 2 * original_features_count))
    if data.ndim != 2 or data.shape[1] != original_features_count:
        raise ValueError(
            f"data must have shape (n_rows, {original_features_count}); got {data.shape}.")

    # sliding_window_view puts the window axis last: (n_windows, features, timesteps).
    windows = np.lib.stride_tricks.sliding_window_view(data, timesteps, axis=0)
    windows = windows.transpose(0, 2, 1)

    # np.diff yields timesteps - 1 rows; row 0 stays zero as padding.
    rate_of_change = np.zeros_like(windows)
    rate_of_change[:, 1:, :] = np.diff(windows, axis=1)

    return np.concatenate((windows, rate_of_change), axis=2)


def _flatten_sequences(sequences):
    """Flatten ``(samples, timesteps, features)`` to ``(samples, timesteps * features)``."""
    n_samples, n_steps, n_feats = sequences.shape
    # Explicit sizes: reshape(n, -1) cannot infer the width when n == 0.
    return sequences.reshape(n_samples, n_steps * n_feats)


def _silhouette_or_nan(samples, labels):
    """Return the silhouette score, or NaN when it is undefined.

    scikit-learn requires ``2 <= n_labels <= n_samples - 1``.
    """
    n_unique = len(np.unique(labels))
    if 1 < n_unique < len(labels):
        return silhouette_score(samples, labels)
    return np.nan


############################################################################################################
# Plotting Clustered Data (Optional)
############################################################################################################

def plot_clustered_data(original_data_list, cluster_labels_list, n_clusters, features,
                        featureNames, unitNames, timesteps=DEFAULT_TIMESTEPS):
    """Scatter the original feature values coloured by assigned cluster.

    Args:
        original_data_list: Unscaled test DataFrames, one per class.
        cluster_labels_list: Predicted cluster ids per class; entry ``k``
            holds one label per sequence of ``original_data_list[k]``.
        n_clusters: Number of clusters (sizes the colour map).
        features: Feature column names to plot, one subplot each.
        featureNames: Mapping from feature name to display name.
        unitNames: Mapping from feature name to unit string.
        timesteps: Window length used to build the sequences; sequence ``j``
            is drawn at the timestamp of its last row.
    """
    num_features = len(features)
    fig, axes = plt.subplots(num_features, 1, figsize=(15, 5 * num_features),
                             sharex=True, squeeze=False)
    axes = axes[:, 0]

    # One colour per cluster
    colors = plt.cm.viridis(np.linspace(0, 1, n_clusters))

    # Label each cluster once per subplot, whichever class first shows it,
    # so clusters absent from the first class still get a legend entry.
    labelled = set()

    for df, labels in zip(original_data_list, cluster_labels_list):
        labels = np.asarray(labels)
        # Sequence j ends at row j + timesteps - 1 of the original frame.
        time_index = df.index[timesteps - 1:]
        for i, feature in enumerate(features):
            values = df[feature].to_numpy()
            for cluster_id in range(n_clusters):
                positions = np.where(labels == cluster_id)[0]
                if len(positions) == 0:
                    continue
                key = (i, cluster_id)
                label = "" if key in labelled else f'Cluster {cluster_id}'
                labelled.add(key)
                axes[i].scatter(time_index[positions],
                                values[positions + timesteps - 1],
                                color=colors[cluster_id], label=label, s=10)

    for ax, feature in zip(axes, features):
        ax.set_ylabel(f'{featureNames[feature]} {unitNames[feature]}')
        ax.set_title(featureNames[feature])

    # Place legend on the last subplot
    axes[num_features - 1].legend(loc='upper right')
    plt.tight_layout()
    plt.show()


#####################################################################################################
# Evaluation and plotting of anomalies and misclassified instances (based on cluster labels)
#####################################################################################################

def _plot_sample_sequences(sequences, candidate_indices, timesteps, features, make_title,
                           max_plots=5):
    """Plot up to ``max_plots`` randomly chosen sequences, one figure each."""
    colors = ['red', 'green', 'blue']  # Cycled over the features
    n_plots = min(max_plots, len(candidate_indices))
    for i in np.random.choice(candidate_indices, n_plots, replace=False):
        print(f"Shape of all_original_test_sequences[{i}]: {sequences[i].shape}")
        print(f"First few values of all_original_test_sequences[{i}]:\n{sequences[i][:5]}")
        plt.figure(figsize=(12, 6))
        for j, feature in enumerate(features):
            plt.plot(np.arange(timesteps), sequences[i][:, j], label=feature,
                     color=colors[j % len(colors)])
        plt.title(make_title(i))
        plt.xlabel('Time Step')
        plt.ylabel('Value')
        plt.legend()
        plt.show()


def evaluate_and_plot_anomalies(kmeans_model, scaled_test_data_list, n_clusters,
                                original_test_data_list, true_labels_list, features,
                                featureNames, unitNames, plot_anomalies=False,
                                plot_misclassified=False, timesteps=DEFAULT_TIMESTEPS):
    """Evaluate the clustering against the known classes and plot diagnostics.

    Each cluster is mapped to the most frequent true label among the test
    sequences assigned to it; that mapping turns cluster ids into predicted
    class labels for the classification report and confusion matrix.

    Args:
        kmeans_model: Fitted K-Means model.
        scaled_test_data_list: Scaled test DataFrames, one per class.
        n_clusters: Number of cluster ids to analyse (``0 .. n_clusters - 1``).
        original_test_data_list: Unscaled test DataFrames aligned with
            ``scaled_test_data_list``; used for plotting raw values.
        true_labels_list: Per class, one integer label per DataFrame row.
        features: Feature column names.
        featureNames: Unused; kept for interface compatibility.
        unitNames: Unused; kept for interface compatibility.
        plot_anomalies: Plot sample sequences assigned to clusters whose
            dominant true label is > 0.
        plot_misclassified: Plot sample sequences whose true label differs
            from the dominant label of their cluster.
        timesteps: Window length used to build the sequences.

    Returns:
        ``(y_true, y_pred)``: true label and mapped predicted label of every
        evaluated sequence (``-1`` where no dominant label exists).

    Raises:
        ValueError: If a class's label array does not line up with its
            sequences.
    """
    n_feature_columns = len(features)

    true_parts = []  # True label per sequence, per class
    cluster_parts = []  # Predicted cluster id per sequence, per class
    window_parts = []  # Unscaled feature windows per class (for plotting)
    inertia_values = []
    silhouette_scores = []

    for i, (scaled_test_df, original_test_df, y_true_categorical) in enumerate(
            zip(scaled_test_data_list, original_test_data_list, true_labels_list)):
        X_test_sequences = create_sequences_with_rate_of_change(
            scaled_test_df.values, timesteps, n_feature_columns)

        # Skip classes whose data is shorter than one window.
        if X_test_sequences.size == 0:
            print(f"Warning: No test sequences generated for class {i}. Skipping evaluation for this class.")
            continue

        X_test_reshaped = _flatten_sequences(X_test_sequences)
        cluster_labels_predicted = kmeans_model.predict(X_test_reshaped)

        # NOTE: `inertia_` is the inertia of the fitted model on its training
        # data, not a per-class test inertia; original reporting is kept.
        inertia_values.append(kmeans_model.inertia_)
        silhouette_scores.append(
            _silhouette_or_nan(X_test_reshaped, cluster_labels_predicted))

        # The label of a sequence is the label of its last row.
        window_labels = np.asarray(y_true_categorical)[timesteps - 1:]
        if len(window_labels) != len(cluster_labels_predicted):
            raise ValueError(
                f"Class {i}: {len(window_labels)} labels do not match "
                f"{len(cluster_labels_predicted)} sequences.")

        # Window j covers rows j .. j + timesteps - 1 of the unscaled frame.
        original_values = original_test_df[features].to_numpy()
        original_windows = np.lib.stride_tricks.sliding_window_view(
            original_values, timesteps, axis=0).transpose(0, 2, 1)

        true_parts.append(window_labels)
        cluster_parts.append(cluster_labels_predicted)
        window_parts.append(original_windows)

    if true_parts:
        all_y_true_categorical = np.concatenate(true_parts).astype(int)
        all_predicted_cluster_labels = np.concatenate(cluster_parts)
        all_original_test_sequences = np.concatenate(window_parts)
    else:
        all_y_true_categorical = np.empty(0, dtype=int)
        all_predicted_cluster_labels = np.empty(0, dtype=int)
        all_original_test_sequences = np.empty((0, timesteps, n_feature_columns))

    print("\nEvaluation Metrics:")
    mean_inertia = np.mean(inertia_values) if inertia_values else np.nan
    print(f"Inertia (final): {mean_inertia:.4f}")
    valid_silhouettes = [s for s in silhouette_scores if not np.isnan(s)]
    mean_silhouette = np.mean(valid_silhouettes) if valid_silhouettes else np.nan
    print(f"Average Silhouette Score (valid cases): {mean_silhouette:.4f}")

    # Map every cluster id to the most frequent true label inside it
    # (-1 for clusters that received no test sequence).
    cluster_dominant_label = {}
    for cluster_id in range(n_clusters):
        labels_in_cluster = all_y_true_categorical[all_predicted_cluster_labels == cluster_id]
        if len(labels_in_cluster) > 0:
            cluster_dominant_label[cluster_id] = np.argmax(np.bincount(labels_in_cluster))
        else:
            cluster_dominant_label[cluster_id] = -1

    predicted_labels_numeric = np.array(
        [cluster_dominant_label.get(cluster_id, -1)
         for cluster_id in all_predicted_cluster_labels], dtype=int)

    # Only sequences whose cluster has a dominant label can be scored.
    valid_indices = predicted_labels_numeric != -1
    y_true_valid = all_y_true_categorical[valid_indices]
    y_pred_valid = predicted_labels_numeric[valid_indices]

    if len(y_true_valid) > 0 and len(np.unique(y_true_valid)) > 1:
        print("\nEvaluation Results (Clusters vs True Labels):")
        print(classification_report(y_true_valid, y_pred_valid))

        # Fix the label order so the heatmap ticks show real class ids
        # rather than matrix positions.
        present_labels = np.unique(np.concatenate((y_true_valid, y_pred_valid)))
        cm = confusion_matrix(y_true_valid, y_pred_valid, labels=present_labels)

        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=present_labels, yticklabels=present_labels)
        plt.xlabel('Predicted Cluster (Dominant True Label)')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix (Clusters vs True Labels)')
        plt.show()
    else:
        print("\nCould not perform detailed evaluation (not enough data or classes).")

    #################################################################################################
    # Plotting Anomalies (Optional)
    #################################################################################################
    # Anomalies: sequences assigned to clusters whose dominant true label is > 0.
    if plot_anomalies:
        print("\nChecking anomaly data:")
        anomaly_clusters = [cluster_id
                            for cluster_id, label in cluster_dominant_label.items()
                            if label > 0]
        anomaly_indices = np.where(
            np.isin(all_predicted_cluster_labels, anomaly_clusters))[0]
        if len(anomaly_indices) > 0:
            _plot_sample_sequences(
                all_original_test_sequences, anomaly_indices, timesteps, features,
                lambda i: (f'Detected Anomaly (True: {all_y_true_categorical[i]}, '
                           f'Cluster: {all_predicted_cluster_labels[i]})'))
        else:
            print("No anomalies detected based on cluster dominance.")

    #################################################################################################
    # Plotting Misclassified Instances (Optional)
    #################################################################################################
    # Misclassified: true label differs from the dominant label of the cluster.
    if plot_misclassified:
        print("\nChecking misclassified data:")
        misclassified_indices = np.where(
            all_y_true_categorical != predicted_labels_numeric)[0]
        if len(misclassified_indices) > 0:
            _plot_sample_sequences(
                all_original_test_sequences, misclassified_indices, timesteps, features,
                lambda i: (f'Misclassified Instance (True: {all_y_true_categorical[i]}, '
                           f'Predicted Cluster Dominant Label: {predicted_labels_numeric[i]})'))
        else:
            print("No misclassified instances found based on cluster dominance.")

    return all_y_true_categorical, predicted_labels_numeric


#####################################################################################################
# Main Execution
#####################################################################################################

def main(argv=None):
    """Run the full pipeline: load, scale, cluster, evaluate and plot."""
    options = parse_arguments(argv)
    n_clusters = options.n_clusters
    timesteps = options.timesteps
    n_init = options.n_init

    # datafiles[0]: training files, datafiles[1]: testing files
    datafiles = build_datafiles(options.transition)

    # Data is read from a 'data' directory next to this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(script_dir, 'data')

    dataTrain, _ = load_class_data(datafiles[0], data_dir)
    if not dataTrain:
        raise SystemExit(f"Error: no training data could be loaded from {data_dir}.")
    combined_train_data = pd.concat(dataTrain)

    dataTest, test_class_ids = load_class_data(datafiles[1], data_dir)

    if options.plot_raw:
        plot_raw_data(dataTest, test_class_ids)

    ########################################################################################################
    # Data Scaling
    ########################################################################################################
    # RobustScaler is fitted on the training data only and reused for the test data.
    scaler = RobustScaler()
    scaled_train_data = scaler.fit_transform(combined_train_data[features])
    scaled_train_df = pd.DataFrame(scaled_train_data, columns=features,
                                   index=combined_train_data.index)
    scaled_test_df_list = [
        pd.DataFrame(scaler.transform(df[features]), columns=features, index=df.index)
        for df in dataTest
    ]

    ############################################################################################################
    # Sequences and K-Means model
    ############################################################################################################
    # NOTE(review): the training frames are concatenated before windowing, so
    # some windows span the boundary between two files/classes; kept as in
    # the original pipeline.
    X_train_sequences = create_sequences_with_rate_of_change(
        scaled_train_df.values, timesteps, n_original_features)
    X_test_sequences_list = [
        create_sequences_with_rate_of_change(df.values, timesteps, n_original_features)
        for df in scaled_test_df_list
    ]

    # K-Means expects a 2D array (samples, features): flatten each sequence.
    X_train_reshaped = _flatten_sequences(X_train_sequences)
    if len(X_train_reshaped) == 0:
        raise SystemExit("Error: training data is shorter than one sequence of "
                         f"{timesteps} timesteps.")

    # random_state fixes the centroid seeds; the best of n_init runs is kept.
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=n_init)
    kmeans.fit(X_train_reshaped)

    ############################################################################################################
    # Predict Clusters for Test Data
    ############################################################################################################
    cluster_labels_test_list = []
    X_test_reshaped_list = []
    for X_test_seq in X_test_sequences_list:
        X_test_reshaped = _flatten_sequences(X_test_seq)
        if len(X_test_reshaped) > 0:
            labels = kmeans.predict(X_test_reshaped)
        else:
            # Class too short for a single window: keep an empty entry so the
            # per-class lists stay aligned with dataTest.
            labels = np.empty(0, dtype=int)
        cluster_labels_test_list.append(labels)
        X_test_reshaped_list.append(X_test_reshaped)

    if options.plot_clustered:
        plot_clustered_data(dataTest, cluster_labels_test_list, n_clusters, features,
                            featureNames, unitNames, timesteps=timesteps)

    # True label of every test row is the class id of the file group it came from.
    true_labels_list = [np.full(len(df), class_id)
                        for class_id, df in zip(test_class_ids, dataTest)]

    evaluate_and_plot_anomalies(
        kmeans, scaled_test_df_list, n_clusters, dataTest, true_labels_list, features,
        featureNames, unitNames, plot_anomalies=options.plot_anomalies,
        plot_misclassified=options.plot_misclassified, timesteps=timesteps)

    #####################################################################################################
    # Final Evaluation Metrics (on combined test data)
    #####################################################################################################
    if X_test_reshaped_list and sum(len(x) for x in X_test_reshaped_list) > 0:
        X_test_combined_reshaped = np.vstack(X_test_reshaped_list)
        all_cluster_labels_test = np.concatenate(cluster_labels_test_list)

        print("\nK-Means Model Evaluation on Combined Test Data:")
        # NOTE: this is the inertia of the fitted model on its training data.
        print(f"Inertia: {kmeans.inertia_:.4f}")

        silhouette = _silhouette_or_nan(X_test_combined_reshaped, all_cluster_labels_test)
        if np.isnan(silhouette):
            print("Silhouette Score: Not applicable for single cluster.")
        else:
            print(f"Silhouette Score: {silhouette:.4f}")
    else:
        print("\nNo test data sequences available to evaluate Inertia and Silhouette Score.")


if __name__ == '__main__':
    main()