| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import argparse
- import torch
- import sys
- sys.path.append("../")
- from boolean_classifier.datasets.boolean_ngram_dataset import BooleanNGramDataset
- from boolean_classifier.datasets.ngram_dataset import NGramDataset
- from boolean_classifier.architectures.ffnn import FFNN
- from torch.utils.data import DataLoader
- import multiprocessing
- import json
- import os
- import torch.nn
- from torch.optim.lr_scheduler import _LRScheduler
- from torch.utils.data import DataLoader
- from tqdm import tqdm
- from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
- import joblib
-
-
def load_configuration(configuration_filepath: str) -> dict:
    """Load the evaluation configuration from a JSON file.

    Args:
        configuration_filepath: Path to the JSON configuration file.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # JSON is UTF-8 by specification, so do not depend on the platform's
    # locale encoding when decoding the file.
    with open(configuration_filepath, "r", encoding="utf-8") as configuration_file:
        return json.load(configuration_file)
-
def evaluate(model: torch.nn.Module, dataloader: DataLoader, feature_selector=None) -> tuple[list, list]:
    """Run the model over every batch and collect labels and predictions.

    Args:
        model: Network exposing a ``predict(x)`` method. The predicted class
            of each sample is taken as ``argmax(dim=1)`` of its output.
        dataloader: Iterable yielding ``(x, y)`` batches.
        feature_selector: Optional object with a ``transform`` method that is
            applied to each input batch before inference. When omitted (or
            ``None``), a module-level ``feature_selector`` is used if one has
            been defined, which is how the script entry point provides it.

    Returns:
        A ``(y_trues, y_preds)`` tuple of lists holding the ground-truth
        labels and the predicted classes as plain Python values, in the
        order the samples were yielded.
    """
    if feature_selector is None:
        # Backward compatibility: the script entry point publishes the
        # selector as a module-level name instead of passing it in. Looking
        # it up with .get() avoids a NameError when this function is
        # imported and called without that global being defined.
        feature_selector = globals().get("feature_selector")

    y_trues = []
    y_preds = []
    # Inputs are moved to wherever the model's parameters live.
    device = next(model.parameters()).device
    model.eval()
    with torch.no_grad():
        for x, y in tqdm(dataloader):
            if feature_selector is not None:
                # NOTE(review): assumes transform() accepts a CPU tensor and
                # returns something torch.Tensor() can wrap - confirm.
                x = torch.Tensor(feature_selector.transform(x))
            x = x.to(device)
            outputs = model.predict(x)
            y_pred = outputs.argmax(dim=1)
            # tolist() yields plain Python values rather than 0-d tensors,
            # so the labels never need a device round-trip.
            y_trues.extend(y.tolist())
            y_preds.extend(y_pred.tolist())
    return y_trues, y_preds
-
def save_results(y_trues: list, y_preds: list, output_filepath: str):
    """Compute classification metrics and write them to a text report.

    The report contains, one entry per ``write``: accuracy, precision,
    recall, F1 and the confusion matrix, each computed from ``y_trues`` and
    ``y_preds`` with the scikit-learn functions' default arguments.

    Args:
        y_trues: Ground-truth labels.
        y_preds: Predicted labels, aligned with ``y_trues``.
        output_filepath: Destination file; it is overwritten if it exists.
    """
    # NOTE(review): precision/recall/F1 are called with their defaults, which
    # presumably means binary labels are expected - confirm against the data.
    report_lines = [
        "Accuracy: {}\n".format(accuracy_score(y_trues, y_preds)),
        "Precision: {}\n".format(precision_score(y_trues, y_preds)),
        "Recall: {}\n".format(recall_score(y_trues, y_preds)),
        "F1: {}\n".format(f1_score(y_trues, y_preds)),
        "Confusion Matrix: {}\n".format(confusion_matrix(y_trues, y_preds)),
    ]

    with open(output_filepath, "w") as output_file:
        output_file.writelines(report_lines)
-
-
-
-
if __name__ == "__main__":
    # Script entry point: parse the command line, build the dataset and the
    # model, load the trained weights, run the evaluation and write the
    # metrics report.
    parser = argparse.ArgumentParser(description='Evaluate malware detector')
    parser.add_argument(
        "evaluation_file",
        type=str,
        help="Evaluation file containing the hashes and labels of the benign and malicious samples",
    )
    parser.add_argument(
        "dataset_type",
        type=str,
        # Lists the values the dispatch below actually accepts.
        help="Type of dataset: {BooleanBigrams, Bigrams}",
    )
    parser.add_argument(
        "configuration_file",
        type=str,
        help="Configuration file containing the hyperparameters of the model",
    )
    parser.add_argument(
        "output_file",
        type=str,
        help="File to where to store the results",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        help="Batch size for evaluation",
        default=32,
    )
    args = parser.parse_args()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Device: ", device)

    # Leave four cores free on large machines, but never drop below
    # half of the cores plus one.
    cpu_total = multiprocessing.cpu_count()
    num_workers = max(cpu_total - 4, cpu_total // 2 + 1)

    configuration = load_configuration(args.configuration_file)
    if args.dataset_type == "BooleanBigrams":
        dataset = BooleanNGramDataset(args.evaluation_file)
    elif args.dataset_type == "Bigrams":
        dataset = NGramDataset(args.evaluation_file)
    else:
        raise NotImplementedError(
            f"Unsupported dataset type {args.dataset_type!r}; "
            "expected 'BooleanBigrams' or 'Bigrams'"
        )
    dataloader = DataLoader(
        dataset,
        batch_size=args.batch_size,
        num_workers=num_workers,
    )

    model = FFNN(configuration)
    model = model.to(device)
    # map_location remaps the stored tensors onto the device selected above,
    # so a checkpoint saved on a GPU still loads on a CPU-only host.
    checkpoint_path = os.path.join(configuration["model_path"], "model.pth")
    model.load_state_dict(torch.load(checkpoint_path, map_location=device, weights_only=True))
    model.eval()

    # `feature_selector` must stay a module-level name: evaluate() reads it
    # from module scope. A missing configuration key is treated the same as
    # an explicit null.
    feature_selector_path = configuration.get("feature_selector")
    if feature_selector_path is not None:
        # SECURITY: joblib.load unpickles arbitrary objects; only point the
        # configuration at selector files from a trusted source.
        feature_selector = joblib.load(feature_selector_path)
    else:
        feature_selector = None

    y_trues, y_preds = evaluate(model, dataloader)
    save_results(y_trues, y_preds, args.output_file)
-
-
|