- # Csar Fdez, UdL, 2025
- # Changes from v1: Normalization
- # In v1, each failure type has its own normalization parameters (mean and standard deviation)
- # In v2, the mean and standard deviation are the same for all data
- # v3.py trains the models looping over TIME_STEPS (4,8,12,16,20,24,...) to find the optimal threshold factor
-
- # Derived from v3_class, derived from v3.py with code from v1_multifailure.py
- # This code does not train for multiple time steps!
-
- # Partial and total blocked condenser are merged into one class.
- # Construction of the train and test sets has changed: it is now done by days.
-
-
-
-
- # This script implements time-series fault detection and classification with Keras autoencoders.
-
- ##################################### Section 1: Libraries ################################
-
- import pandas as pd # Data analysis, working with tabular data
- import matplotlib.pyplot as plt # To draw and visualize
- import datetime # For working with dates and times; useful for time series data (not used directly in this script)
- import numpy as np # For numerical calculations, especially with arrays
- import keras # Deep learning library for building and training neural networks
- import os.path # To work with file paths (checking for file existence, etc.)
- from keras import layers # Imports the layers module, which provides the neural network layer classes
- from optparse import OptionParser # To manage command line options. This allows you to run the script with different settings.
- import copy # To create copies of objects (potentially for data or models)
- import pickle # To save and load Python objects (although not used directly in this section of code)
-
-
- parser = OptionParser() # Creates an OptionParser object to handle command line arguments.
-
- parser.add_option("-t", "--train", dest="train", help="Trains the models (false)", default=False, action="store_true")
- # Adds a -t or --train option. When this option is used (e.g., python your_script.py -t),
- # the options.train variable is set to True, otherwise it defaults to False.
- # Used to control whether the script should train a new model or load a pre-trained model.
-
- parser.add_option("-n", "--timesteps", dest="timesteps", help="TIME STEPS ", default=12)
- # Adds the -n or --timesteps option to specify the number of time steps.
- # The default is 12. This is very important for time series tasks, as it determines the length of the input sequence for the model.
-
- parser.add_option("-r", "--transition", dest="transition", help="Includes transition data (false)", default=False, action="store_true")
- # If specified, data from the transition period between normal and failure is considered (default: False).
-
- parser.add_option("-p", "--plot", dest="plot", help="Only plot data (false)", default=False, action="store_true")
- # If specified, the script only plots the data and exits (default: False).
-
- #parser.add_option("-f", "--thresholdfactor", dest="TF", help="Threshold Factor ", default=1.4)
- # A threshold makes no sense when classifying, because we apply all the models and choose the class with the lowest MSE
-
- # In previous versions, a threshold factor was used.
- # Why is the threshold factor not used in this version?
- # Because every class model is applied and the class with the lowest reconstruction error (MSE) is chosen, a fixed threshold is no longer needed.
-
- (options, args) = parser.parse_args() # Parse command line arguments and store them in the options object
-
- ##################################### Section 2: Defining Data and Files ################################
-
- # data files arrays. Index:
- # 0. No failure
- # 1. Blocked evaporator
- # 2. Full Blocked condenser
- # 3. Partial Blocked condenser
- # 4. Condenser fan not working
- # 5. Open door
-
-
- NumberOfFailures=4 # So far, we only have data for the first 4 failure types
- # Number of failure types considered here (4)
- datafiles=[[],[]] # 0 for train, 1 for test
- # Nested lists of data-file names; index 0 is for training data and index 1 is for test data
-
- for i in range(NumberOfFailures+1): # Initialize internal lists for each class (class 0 for no failure mode and classes 1 to 4 for failure types)
- datafiles[0].append([])
- datafiles[1].append([])
-
- # The next set of data corresponds to a freezer with set point SP = -26 °C
- datafiles[0][0]=['2024-08-07_5_','2024-08-08_5_','2025-01-25_5_','2025-01-26_5_'] # Data files for class 0 (no failure) in the training set; file names include the date
- datafiles[0][1]=['2024-12-11_5_', '2024-12-12_5_','2024-12-13_5_'] # Data files for class 1 (blocked evaporator) in the training set
- datafiles[0][2]=['2024-12-18_5_','2024-12-21_5_','2024-12-22_5_','2024-12-23_5_','2024-12-24_5_'] # Data files for class 2 (fully blocked condenser) in the training set
- datafiles[0][3]=['2024-12-28_5_','2024-12-29_5_','2024-12-30_5_'] # Data files for class 3 (partially blocked condenser) in the training set
- datafiles[0][4]=['2025-02-13_5_','2025-02-14_5_'] # Data files for class 4 (condenser fan not working) in the training set
-
- if options.transition: # Check whether the --transition option was given on the command line
-     datafiles[1][0]=['2025-01-27_5_','2025-01-28_5_'] # Data files for class 0 in the test set (including transition data)
-     datafiles[1][1]=['2024-12-14_5_','2024-12-15_5_','2024-12-16_5_'] # with TRANSITION # Data files for class 1 in the test set (including transition data)
-     datafiles[1][2]=['2024-12-17_5_','2024-12-19_5_','2024-12-25_5_','2024-12-26_5_'] # with TRANSITION # Data files for class 2 in the test set (including transition data)
-     datafiles[1][3]=['2024-12-27_5_','2024-12-31_5_','2025-01-01_5_'] # with TRANSITION # Data files for class 3 in the test set (including transition data)
-     datafiles[1][4]=['2025-02-12_5_','2025-02-15_5_','2025-02-16_5_'] # Data files for class 4 in the test set (including transition data)
- 
- else: # If --transition is not given
-     datafiles[1][0]=['2025-01-27_5_','2025-01-28_5_'] # Data files for class 0 in the test set (without transition data)
-     datafiles[1][1]=['2024-12-14_5_','2024-12-15_5_'] # Data files for class 1 in the test set (without transition data)
-     datafiles[1][2]=['2024-12-19_5_','2024-12-25_5_','2024-12-26_5_'] # Data files for class 2 in the test set (without transition data)
-     datafiles[1][3]=['2024-12-31_5_','2025-01-01_5_'] # Data files for class 3 in the test set (without transition data)
-     datafiles[1][4]=['2025-02-15_5_','2025-02-16_5_'] # Data files for class 4 in the test set (without transition data)
-
-
- #datafiles[0][4]=['2025-02-05_5_']
- #datafiles[1][4]=['2025-02-05_5_']
-
- ##################################### Section 3: Features ################################
-
-
- # r1 s5: supply air flow temperature
- # r1 s1: inlet evaporator temperature
- # r1 s4: condenser outlet temperature
- 
- # Variables r1 s4 and pa1 apiii may not exist in cloud controllers
-
-
- features=['r1 s1','r1 s4','r1 s5','pa1 apiii']
- features=['r1 s1','r1 s4','r1 s5']
- features=['r1 s5']
- # Feature combination suggested by AKO
- #features=['r1 s1','r1 s4','r1 s5','pa1 apiii']
- features=['r1 s1','r1 s4','r1 s5'] # Final feature list actually used (the assignments above and the commented alternatives below are overridden)
- #features=['r1 s1','r1 s5','pa1 apiii']
- #features=['r1 s5','pa1 apiii']
- #features=['r1 s1','r1 s5']
- #features=['r1 s5']
-
-
-
- featureNames={} # Dictionary mapping each feature name to a display label (LaTeX) for the plots
- featureNames['r1 s1']='$T_{evap}$'
- featureNames['r1 s4']='$T_{cond}$'
- featureNames['r1 s5']='$T_{air}$'
- featureNames['pa1 apiii']='$P_{elec}$'
-
- unitNames={} # Dictionary mapping each feature name to its unit (for axis labels)
- unitNames['r1 s1']='$(^{o}C)$'
- unitNames['r1 s4']='$(^{o}C)$'
- unitNames['r1 s5']='$(^{o}C)$'
- unitNames['pa1 apiii']='$(W)$'
-
- NumFeatures=len(features) # Number of features
-
-
- ##################################### Section 4: Data Loading and Preprocessing ################################
-
-
- df_list=[[],[]] # Create a nested list to store Pandas DataFrames. Index 0 is for training data and index 1 is for test data.
- for i in range(NumberOfFailures+1): # Initializing internal lists for each class
- df_list[0].append([])
- df_list[1].append([])
-
- for i in range(NumberOfFailures+1): # Loop to process data for each class in the training set
- dftemp=[] # Create a temporary list to store DataFrames from different files of a class
- for f in datafiles[0][i]: # The loop is for reading each file corresponding to a specific class in the training set.
- print(" ", f) # Print the name of the file being processed.
-
-
-
-         # Build the full path to the CSV file relative to the script location
-         script_dir = os.path.dirname(os.path.abspath(__file__)) # Directory containing the current script
-         data_dir = os.path.join(script_dir, 'data') # Path to the 'data' directory where the CSV files are located
-         file_path = os.path.join(data_dir, f + '.csv') # Full path to the CSV file
- 
-         print(f"Currently attempting to open: {file_path}") # Print the path being opened (for debugging)
- 
-         df1 = pd.read_csv(file_path) # Read the CSV file into a Pandas DataFrame
-         dftemp.append(df1) # Append the DataFrame to the temporary list dftemp
-     df_list[0][i]=pd.concat(dftemp) # Concatenate all DataFrames of this class and store them in df_list[0][i]
-
- ########## A similar block for loading the test data
-
- for i in range(NumberOfFailures+1):
- dftemp=[]
- for f in datafiles[1][i]:
- print(" ", f)
-
-         # Build the full path to the CSV file relative to the script location
-         script_dir = os.path.dirname(os.path.abspath(__file__))
-         data_dir = os.path.join(script_dir, 'data')
-         file_path = os.path.join(data_dir, f + '.csv')
- 
-         print(f"Currently attempting to open: {file_path}") # Print the path being opened (for debugging)
- 
-         df1 = pd.read_csv(file_path)
-         dftemp.append(df1)
-     df_list[1][i]=pd.concat(dftemp)
-
-
- # The raw data is sampled every 10 seconds and is subsampled to one sample every 5 minutes
- # subsampled to 5' = 30 * 10"
- # We consider samples every 5' because in production we will only have data at this frequency
- subsamplingrate=30 # Subsampling rate: keep one row out of every 30
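- 
- # Quick illustrative check (an aside, not part of the original pipeline): assuming the 10-second raw sampling
- # period stated above, keeping one row out of every 30 yields one sample every 300 s = 5 minutes.
- _demo_period_seconds = 10 * subsamplingrate   # hypothetical helper variable: 10 s * 30 = 300 s = 5 min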
-
- dataframe=[[],[]] # Create a nested list to store the sampled DataFrames. Index 0 for training data and index 1 for testing data.
- for i in range(NumberOfFailures+1): # Initializing internal lists for each class
- dataframe[0].append([])
- dataframe[1].append([])
-
- for i in range(NumberOfFailures+1): # Loop to process data for each class in the training set
- datalength=df_list[0][i].shape[0] # Getting the number of rows in the original DataFrame
- dataframe[0][i]=df_list[0][i].iloc[range(0,datalength,subsamplingrate)][features]
-     # Subsample the rows at the given rate and keep only the selected feature columns
- dataframe[0][i].reset_index(inplace=True,drop=True) # Reset DataFrame index and remove old index
- dataframe[0][i].dropna(inplace=True) # Delete rows that have NaN values
-
- for i in range(NumberOfFailures+1): # Similar section for sampling and preprocessing of test data
- datalength=df_list[1][i].shape[0]
- dataframe[1][i]=df_list[1][i].iloc[range(0,datalength,subsamplingrate)][features]
- dataframe[1][i].reset_index(inplace=True,drop=True)
- dataframe[1][i].dropna(inplace=True)
-
-
- # Train data is [0] and test data is [1]
- dataTrain=[] # Creating a List to Store Training DataFrames
- dataTest=[] # Creating a list to store test DataFrames
- for i in range(NumberOfFailures+1): # Copying the processed DataFrames into the dataTrain and dataTest lists
- dataTrain.append(dataframe[0][i])
- dataTest.append(dataframe[1][i])
-
- ##################################### Section 5: Data Normalization ################################
-
- # Calculate means and stdev
- a=dataTrain[0] # Start with the class-0 training DataFrame
- for i in range(1,NumberOfFailures+1): # Stack the training data of all classes vertically to compute the global statistics
- a=np.vstack((a,dataTrain[i]))
-
- means=a.mean(axis=0) # Calculate the average for each feature across the entire training data.
- stdevs=a.std(axis=0) # Calculating the standard deviation for each feature across the entire training data
- def normalize2(train,test): # Define a function to normalize data using the mean and standard deviation
- return( (train-means)/stdevs, (test-means)/stdevs )
-
- dataTrainNorm=[] # Create a list to store normalized training data
- dataTestNorm=[] # Create a list to store normalized test data
- for i in range(NumberOfFailures+1): # Initializing internal lists
- dataTrainNorm.append([])
- dataTestNorm.append([])
-
- for i in range(NumberOfFailures+1): # Normalization of training and test data for each class
- (dataTrainNorm[i],dataTestNorm[i])=normalize2(dataTrain[i],dataTest[i])
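- 
- # Illustrative sketch (an aside, not part of the original pipeline; the name below is hypothetical):
- # normalize2 applies a z-score with the global training statistics, so a normalized value v is mapped back
- # to physical units as v*stdevs + means. This inverse transform is what the plotting functions below use.
- _demo_denormalized_row = dataTrainNorm[0].iloc[0] * stdevs + means   # first class-0 training sample, back in original units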
-
- ##################################### Section 6: Creating Time Sequences ################################
-
- # Hyperparameters and helper functions for the per-class autoencoder models
-
- NumFilters=64 # Determine the number of filters in the convolutional layers
- KernelSize=7 # Determine the size of the kernel in the convolutional layers
- DropOut=0.2 # Determine the dropout rate to prevent overfitting
- def create_sequences(values, time_steps): # Define a function to convert time series data into input sequences for the model. This function creates sequences of length time_steps
- output = []
- for i in range(len(values) - time_steps + 1):
- output.append(values[i : (i + time_steps)])
- return np.stack(output)
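- 
- # Illustrative usage of create_sequences (toy data, assumed purely for exposition): a series of 10 samples
- # with 1 feature and time_steps=4 produces 10-4+1 = 7 overlapping windows of shape (4, 1).
- _demo_values = np.arange(10).reshape(-1, 1)          # toy array: 10 samples, 1 feature
- _demo_windows = create_sequences(_demo_values, 4)    # _demo_windows.shape == (7, 4, 1)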
-
- def listToString(l): # Define a function to convert a list to a string without spaces (for naming files)
- r=''
- for i in l:
- r+=str(i)
- return(r.replace(' ',''))
-
- ##################################### Section 7: Model Definition ################################
-
- model=[] # Create a list to store the per-class autoencoder models
- modelckpt_callback =[] # A list to hold ModelCheckpoint callbacks used to store the best model weights during training.
- es_callback =[]
- # An empty list that is later filled with EarlyStopping callbacks.
- # These callbacks act like a supervisor: they monitor the model's
- # performance on the validation data and, when the validation loss
- # stops improving, they stop training early to avoid overfitting
- # (learning the training data too closely and performing poorly on new data).
- path_checkpoint=[] # Create a list to store the checkpoint file paths
- timesteps=int(options.timesteps) # The number of time steps taken from the command line argument ( options.timesteps ).
- x_train=[] # A list to hold the training sequences for each class.
-
- # A loop is run over each failure class:
- #   - The normalized data is converted into sequences using create_sequences.
- #   - A Sequential Keras model is created. It is an autoencoder that uses Conv1D layers to encode into a
- #     latent space and Conv1DTranspose layers to decode back to the input space.
- #   - The model is compiled with the Adam optimizer and the MSE loss function.
- #   - A summary of the model is printed.
- #   - The checkpoint file path where the best weights will be stored is set.
- #   - The EarlyStopping and ModelCheckpoint callbacks are defined.
-
- for i in range(NumberOfFailures+1): # Loop to create the model and prepare the training data for each class
- x_train.append(create_sequences(dataTrainNorm[i],timesteps)) # Create input sequences from normalized training data for the current class
- model.append([]) # Add an empty list to the model list
-
- # Define a Keras Sequential model for the current class. The model includes Conv1D and Conv1DTranspose layers to build an autoencoder
- model[i] = keras.Sequential(
- [
-             layers.Input(shape=(x_train[i].shape[1], x_train[i].shape[2])), # Input layer with shape (time_steps, num_features)
-             layers.Conv1D( # 1D convolutional layers that encode the input (feature extraction)
- filters=NumFilters,
- kernel_size=KernelSize,
- padding="same",
- strides=2,
- activation="relu",
- ),
- layers.Dropout(rate=DropOut), # Dropout layers to prevent overfitting
- layers.Conv1D(
- filters=int(NumFilters/2),
- kernel_size=KernelSize,
- padding="same",
- strides=2,
- activation="relu",
- ),
-             layers.Conv1DTranspose( # Transposed convolution layers that decode back to the input shape
- filters=int(NumFilters/2),
- kernel_size=KernelSize,
- padding="same",
- strides=2,
- activation="relu",
- ),
- layers.Dropout(rate=DropOut),
- layers.Conv1DTranspose(
- filters=NumFilters,
- kernel_size=KernelSize,
- padding="same",
- strides=2,
- activation="relu",
- ),
- layers.Conv1DTranspose(filters=x_train[i].shape[2], kernel_size=KernelSize, padding="same"),
- ]
- )
-     model[i].compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss="mse") # Compile the model with the Adam optimizer and the mean squared error (MSE) loss
- model[i].summary() # Print model summary
- path_checkpoint.append("model_class_v5_"+str(i)+"_"+str(timesteps)+listToString(features)+"_checkpoint.weights.h5") # Create a file path for the saved checkpoint model
- es_callback.append(keras.callbacks.EarlyStopping(monitor="val_loss", min_delta=0, patience=15)) # Define a callback for early stopping, stopping training if no improvement is observed in val_loss
- # Define a callback to store the best model weights based on val_loss
- modelckpt_callback.append(keras.callbacks.ModelCheckpoint( monitor="val_loss", filepath=path_checkpoint[i], verbose=1, save_weights_only=True, save_best_only=True,))
-
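- # Illustrative shape check (an aside; the dummy input below is an assumption, not part of the original pipeline):
- # the encoder halves the sequence length twice (stride 2) and the decoder doubles it twice, so the reconstruction
- # matches the input length only when timesteps is a multiple of 4 (e.g. the default of 12).
- _demo_reconstruction = model[0](np.zeros((1, timesteps, NumFeatures), dtype="float32"))
- # _demo_reconstruction.shape == (1, 4 * int(np.ceil(timesteps / 4)), NumFeatures), i.e. (1, timesteps, NumFeatures) for timesteps = 12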
-
- ##################################### Section 8: Training or Loading the Model ################################
-
- # If options.train is True:
- # A history list is created to keep track of the training history of each model.
- # A loop is executed over each class and the corresponding model is trained with the training data of that class. Callbacks are used to stop early and save the best weights.
- # The model predictions are made on the training data.
- # Otherwise (if options.train is False):
- # Pre-trained weights are loaded from checkpoint files.
-
- if options.train: # If the --train option is enabled on the command line
- history=[] # Create a list to store training history
- for i in range(NumberOfFailures+1): # Loop to train the model for each class
- # Train the model with the current class training data
- history.append(model[i].fit( x_train[i], x_train[i], epochs=400, batch_size=128, validation_split=0.3, callbacks=[ es_callback[i], modelckpt_callback[i] ],))
-
- x_train_pred=model[i].predict(x_train[i]) # Predict the model output for training data (to check performance)
- else: # If --train is not specified
- for i in range(NumberOfFailures+1): # Load pre-trained weights from checkpoint files
- model[i].load_weights(path_checkpoint[i])
-
- ##################################### Section 9: Data Plotting Function (Primary) ################################
-
- # Let's plot some features
-
- colorline=['black','violet','lightcoral','cyan','lime','grey'] # A list of colors to draw lines
- colordot=['grey','darkviolet','red','blue','green','black'] # A list of colors to draw points
-
- #featuresToPlot=['r1 s1','r1 s2','r1 s3','pa1 apiii']
- featuresToPlot=features # Plot all the features in the feature list
-
- indexesToPlot=[] # Create a list to store the indices of the features you want to plot.
- for i in featuresToPlot: # Find the index of each feature in the original list
- indexesToPlot.append(features.index(i))
-
-
- def plotData(): # Define a function to plot the experimental data for each class
-     NumFeaturesToPlot=len(indexesToPlot) # Number of features to plot
- plt.rcParams.update({'font.size': 16}) # Set font size for chart
-     fig, axes = plt.subplots( # Create the figure and the subplot axes
- nrows=NumFeaturesToPlot, ncols=1, figsize=(15, 10), dpi=80, facecolor="w", edgecolor="k",sharex=True
- )
-     for i in range(NumFeaturesToPlot): # For each feature, plot the data of every class, de-normalized back to original units
- init=0
- end=testRanges[0][1]
- for j in range(NumberOfFailures+1):
- if NumFeaturesToPlot==1:
- axes.plot(range(init,end),x_test[testRanges[j][0]:testRanges[j][1],0,indexesToPlot[i]]*stdevs[i]+means[i],label="Class "+str(j), color=colorline[j],linewidth=1)
- else:
- axes[i].plot(range(init,end),x_test[testRanges[j][0]:testRanges[j][1],0,indexesToPlot[i]]*stdevs[i]+means[i],label="Class "+str(j), color=colorline[j],linewidth=1)
- if j<NumberOfFailures:
- init=end
- end+=(testRanges[j+1][1]-testRanges[j+1][0])
-
-
- s=''
- s+=featureNames[features[indexesToPlot[i]]]
- s+=' '+unitNames[features[indexesToPlot[i]]]
-         if NumFeaturesToPlot==1:
-             axes.set_ylabel(s)
-             axes.grid()
- else:
- axes[i].set_ylabel(s)
- axes[i].grid()
-
- if NumFeaturesToPlot==1:
- axes.legend(ncol=4,loc=(0.1,0.98))
- else:
- axes[0].legend(ncol=4,loc=(0.1,0.98))
- plt.show()
-
- ##################################### Section 10: Preparing Test Data ################################
-
- # Second scenario: go over the anomalies and classify each one by the lowest reconstruction error
- #datalist=[dataTestNorm[0],dataTestNorm[1],dataTestNorm[2],dataTestNorm[3]]
-
-
- # Anomaly classification:
- # datalist: a list of the normalized test data for each class.
- # x_test: the test data converted into sequences with create_sequences.
- # testRanges: the index ranges of x_test that correspond to each class (used to plot each class in a different color).
- # If options.plot is True, the plotData function is called and the program exits.
- # testClasses: the true class corresponding to each range in testRanges.
- # A check ensures that testClasses and testRanges have the same length.
- # x_test_predict: a list holding the predictions of each autoencoder model on the test data.
- # A loop runs over each model and computes its predictions on x_test.
- # test_mae_loss: a list holding the mean absolute error (MAE) between the predictions and the actual data for each model.
- # test_mae_loss_average: the MAE averaged over the features.
- # classes: an array with the predicted class for each test sequence, i.e. the class whose model gives the lowest MAE.
-
- datalist=[dataTestNorm[0],dataTestNorm[1],dataTestNorm[2],dataTestNorm[3],dataTestNorm[4]] # Create a list of test data for all classes
- x_test=create_sequences(datalist[0],int(options.timesteps)) # Create input sequences from test data for class 0
- for i in range(1,len(datalist)): # Merge input sequences for all classes into one array
- x_test=np.vstack((x_test,create_sequences(datalist[i],int(options.timesteps))))
-
- # Define ranges for plotting in different colors
- testRanges=[] # Create a list to store the index ranges for each class in the array
- r=0 # Initialize the starting index
- for i in range(len(datalist)): # Calculate index intervals for each class
- testRanges.append([r,r+datalist[i].shape[0]-int(options.timesteps)+1])
- r+=datalist[i].shape[0]-int(options.timesteps)+1
-
- if options.plot: # If the --plot option is enabled on the command line
- # Only plot data and exit
- plotData() # Call the function to plot the data
- exit(0) # Exit the program
-
- testClasses=[0,1,2,3,4] # List of actual classes for the test data
-
- if not len(testClasses)==len(testRanges): # Check that the length of the class and range lists is equal
- print("ERROR: testClasses and testRanges must have same length")
- exit(0)
- ##################################### Section 11: Prediction and Classification ################################
-
- x_test_predict=[] # Create a list to store the model predictions for the test data
- for m in range(NumberOfFailures+1): # Predict the output of each model for the entire test data
- x_test_predict.append(model[m].predict(x_test))
-
- x_test_predict=np.array((x_test_predict)) # Convert the list of predictions to a NumPy array
- test_mae_loss =[] # Create a list to store the mean absolute error (MAE) for each model
- for m in range(NumberOfFailures+1): # Calculate the MAE between each model's predictions and the actual data
- test_mae_loss.append(np.mean(np.abs(x_test_predict[m,:,:,:] - x_test), axis=1))
-
- test_mae_loss=np.array((test_mae_loss)) # Convert the MAE list to a NumPy array
- test_mae_loss_average=np.mean(test_mae_loss,axis=2) # average over features # Calculate the average MAE for each sample across features
- classes=np.argmin(test_mae_loss_average,axis=0) # Choose the minimum loss # Determine the predicted class for each sample based on the model that has the lowest MAE
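- 
- # Illustrative sketch of the decision rule (toy numbers, assumed purely for exposition): with 3 models and
- # 4 test windows, each window is assigned to the class whose autoencoder reconstructs it with the lowest error.
- _demo_mae = np.array([[0.2, 0.9, 0.8, 0.1],
-                       [0.7, 0.1, 0.6, 0.5],
-                       [0.9, 0.8, 0.2, 0.4]])        # shape (num_models, num_windows)
- _demo_classes = np.argmin(_demo_mae, axis=0)         # -> array([0, 1, 2, 0])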
-
- # Plot the classification results:
- # x and y: lists holding the indices and the (de-normalized) feature values of the misclassified samples.
- # A loop runs over each class and over each sample in that class's range; if the predicted class differs from the true class, the sample's index and value are appended to the lists of the predicted class.
- # plotData4: a function similar to plotData that additionally marks the misclassified points on the plot.
-
- x=[] # Per-class lists of indices of misclassified samples
- y=[] # Per-class lists of the de-normalized value of the first plotted feature for those samples
- for j in range(NumberOfFailures+1): # Loop to find misclassified samples and store their indices and features
- x.append([])
- y.append([])
- for j in range(NumberOfFailures+1):
- for k in range(testRanges[j][0],testRanges[j][1]):
- if not classes[k]==testClasses[j]:
- x[classes[k]].append(k)
- y[classes[k]].append(x_test[k,0,indexesToPlot[0]]*stdevs[0]+means[0])
-
- ##################################### Section 12: Data plotting function with anomaly detection ################################
-
- # Similar function to plotData with the addition of plotting points for misclassified samples
- # Define a function to plot the test data and show the misclassified samples
-
- def plotData4():
- NumFeaturesToPlot=len(indexesToPlot)
- plt.rcParams.update({'font.size': 16})
- fig, axes = plt.subplots(
- nrows=NumFeaturesToPlot, ncols=1, figsize=(15, 10), dpi=80, facecolor="w", edgecolor="k",sharex=True
- )
- for i in range(NumFeaturesToPlot):
- init=0
- end=testRanges[0][1]
- for j in range(NumberOfFailures+1):
- if NumFeaturesToPlot==1:
- axes.plot(range(init,end),x_test[testRanges[j][0]:testRanges[j][1],0,indexesToPlot[i]]*stdevs[i]+means[i],label="Class "+str(j), color=colorline[j],linewidth=1)
- else:
- axes[i].plot(range(init,end),x_test[testRanges[j][0]:testRanges[j][1],0,indexesToPlot[i]]*stdevs[i]+means[i],label="Class "+str(j), color=colorline[j],linewidth=1)
- if j<NumberOfFailures:
- init=end
- end+=(testRanges[j+1][1]-testRanges[j+1][0])
-
- #if i==0:
- # axes[0].plot(x[j],y[j] ,color=colordot[j],marker='.',markersize=10,linewidth=0,label="Fail detect class "+str(j) )
-
-
-
- s=''
- s+=featureNames[features[indexesToPlot[i]]]
- s+=' '+unitNames[features[indexesToPlot[i]]]
- if NumFeaturesToPlot==1:
- axes.set_ylabel(s)
- axes.set_xlabel('Sample number')
- axes.grid()
- else:
- axes[i].set_ylabel(s)
- axes[NumFeaturesToPlot-1].set_xlabel('Sample number')
- axes[i].grid()
-
- for j in range(NumberOfFailures+1):
- if NumFeaturesToPlot==1:
- axes.plot(x[j],y[j] ,color=colordot[j],marker='.',markersize=10,linewidth=0,label="Fail detect class "+str(j) )
- else:
- axes[0].plot(x[j],y[j] ,color=colordot[j],marker='.',markersize=10,linewidth=0,label="Fail detect class "+str(j) )
-
- if NumFeaturesToPlot==1:
- axes.legend(ncol=4,loc=(0.1,0.98))
- else:
- axes[0].legend(ncol=4,loc=(0.1,0.98))
-
-
- #axes[NumFeaturesToPlot-1].set_xlabel("Sample number")
- plt.show()
- ##################################### Section 13: Helper function for determining class ################################
-
- # Define a function to find the actual class of an instance based on its index.
-
- def whichClass(k,ranges):
- for i in range(NumberOfFailures+1):
- if k in range(ranges[i][0],ranges[i][1]):
- return(i)
-     print("Error: class does not exist")
- exit(0)
-
- ##################################### Section 14: Calculating the anomaly metric ################################
-
-
- # Classification performance evaluation:
- # whichClass: a helper that returns the true class of a sample, given its index and the ranges in testRanges.
- # anomalyMetric: a function that computes the classification performance metrics (sensitivity, precision and F1-score) per class.
- # TP, FP, FN: arrays holding the number of true positives, false positives and false negatives for each class.
- # Loops over the test data compute TP, FP and FN.
- # Sensitivity and precision are computed and printed for each class, together with their averages and the F1-score.
- # The anomalyMetric function is called with the classification results and the test ranges.
- # computeDelay: a function that computes the detection delay (how long it takes to correctly detect a failure) for a list of classes, relevant when transition periods are included.
- # d: an array holding the delay of each requested class.
- # NoFailsInARow: the number of consecutive samples that must be correctly classified for the failure to be considered detected.
- # The delay of a class is the number of samples from the start of its range until NoFailsInARow consecutive samples are correctly classified.
- # The average delay is computed and printed.
- # The computeDelay function is called for classes 2, 3 and 4.
- # Finally, the plotData4 function is called to display the classification results.
-
-
- ## Implement anomaly metrics for each failure class
- def anomalyMetric(classes,testranges,testclasses):
-     ########## Define the classification performance metrics (sensitivity, precision, F1-score)
-     # Compute TP, FP, FN for each class.
-     # Compute sensitivity and precision for each class.
-     # Compute the average sensitivity and precision, and the F1-score.
-     # Print the results.
-
- # FP, TP: false/true positive
- # TN, FN: true/false negative
-     # Sensitivity (recall): probability of detecting a failure when the data is faulty: TP/(TP+FN)
-     # Precision: proportion of positive results that are correct: TP/(TP+FP)
-     # F1-score: predictive performance measure: 2*Precision*Sensitivity/(Precision+Sensitivity)
- TP=np.zeros(NumberOfFailures+1)
- FP=np.zeros(NumberOfFailures+1)
- FN=np.zeros(NumberOfFailures+1)
- Sensitivity=np.zeros(NumberOfFailures+1)
- Precision=np.zeros(NumberOfFailures+1)
- for i in range(len(testranges)):
- for k in range(testranges[i][0],testranges[i][1]):
- if classes[k]==testclasses[i]:
- TP[i]+=1
- else:
- FP[i]+=1
- for k in range(testranges[NumberOfFailures][1]):
- for i in range(len(testranges)):
- classK=whichClass(k,testranges)
- if not classK==testClasses[i]:
- if not classes[k]==classK:
- FN[classes[k]]+=1
-
- for i in range(NumberOfFailures+1):
- if (TP[i]+FN[i])>0:
- Sensitivity[i]=TP[i]/(TP[i]+FN[i])
- else:
- Sensitivity[i]=0
- Precision[i]=TP[i]/(TP[i]+FP[i])
- S=Sensitivity.mean()
- P=Precision.mean()
- F1=2*S*P/(S+P)
- print("Sensitivity: ",Sensitivity)
- print("S: ",S)
- print("Precision: ",Precision)
- print("P: ",P)
- print("F1-Score: ",F1)
-
- anomalyMetric(classes,testRanges,testClasses) # Call the function to calculate and print the evaluation criteria
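- 
- # Illustrative arithmetic (toy counts, assumed purely for exposition): how sensitivity, precision and F1
- # follow from the TP/FP/FN counts computed above.
- _demo_TP, _demo_FP, _demo_FN = 90.0, 10.0, 30.0
- _demo_sensitivity = _demo_TP / (_demo_TP + _demo_FN)   # 90 / 120 = 0.75
- _demo_precision = _demo_TP / (_demo_TP + _demo_FP)     # 90 / 100 = 0.90
- _demo_f1 = 2 * _demo_precision * _demo_sensitivity / (_demo_precision + _demo_sensitivity)   # ~0.818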
-
- ##################################### Section 15: Detection Delay ################################
-
- # Compute the delay until correct detection for a list of ranges (when transition data exists)
- 
- # Define a function to compute the delay of correct failure detection (when transition data is present)
- # NoFailsInARow: number of consecutive correctly classified samples required to consider the failure detected
- # The delay is measured from the start of each class range until that condition is met
- # The per-class delays are printed and their average is returned
-
- def computeDelay(l,classes,testRanges,testClasses):
- d=np.zeros(len(l))
- NoFailsInARow=4
- ind=0
- for i in l:
- start=testRanges[i][0]
- count=0
- while start<testRanges[i][1]:
- if classes[start]==testClasses[i]:
- count+=1
- if count==NoFailsInARow or start==(testRanges[i][1]-1):
- count=0
- #print(start,start-testRanges[i][0]-NoFailsInARow+timesteps)
- d[ind]=start-testRanges[i][0]-NoFailsInARow+timesteps
- break
- start+=1
- ind+=1
- print(d)
- return(d.mean())
-
- d=computeDelay([2,3,4],classes,testRanges,testClasses) # Compute the average detection delay for classes 2, 3 and 4
- print("Delay: ",d)
- ##################################### Section 16: Plotting Data with Final Results ################################
-
- plotData4() # Call the function to plot the classification results.
-
- # In short, this code is a fault detection and classification system based on autoencoders.
- # For each fault class (and the no-fault class), an autoencoder is trained.
- # In the test phase, each test sample is passed through all the autoencoders,
- # and the class whose model gives the lowest reconstruction error is selected as the predicted class. The performance is then evaluated with several metrics.