"""GUI tool that hex-dumps files, disassembles ELF binaries and scores both with pickled models.

Workflow (all paths are relative to the current working directory):

* "Hex Conversion" tab: every file under the chosen directories is written as a
  ``.bytes`` hex dump into ``outputs/bytes_output`` and scored by every ``.pkl``
  model found in ``bytes_models``.
* "ELF Disassembly" tab: ELF files under the chosen directories are disassembled
  with ``objdump -d`` into ``outputs/asm_output`` and scored by every ``.pkl``
  model found in ``asm_models``.
* After the first run the chosen directories are watched with ``watchdog`` and
  newly created files go through the same pipeline.

Result CSVs are written under ``results/`` and uploaded to the prediction API.
"""
import codecs
import logging
import os
import pickle
import subprocess
import sys
import threading
import time
import tkinter as tk
from tkinter import filedialog, messagebox, ttk

import numpy as np
import pandas as pd
import requests
from sklearn.preprocessing import MinMaxScaler
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

# True once directory monitoring has been started; guarded by _monitoring_lock
# because both GUI worker threads may try to start monitoring.
isMonitoring = False
_monitoring_lock = threading.Lock()

output_directory = "outputs"
bytes_output_directory = "outputs/bytes_output"
asm_output_directory = "outputs/asm_output"
result_folder = "results"
bytes_result_directory = "results/bytes_result"
asm_result_directory = "results/asm_result"
bytes_model_directory = "bytes_models"
asm_model_directory = "asm_models"

PREDICTION_API_URL = "http://142.93.221.85:8000/predict-malware/"
# Without a timeout a dead API host would block the worker thread forever.
API_TIMEOUT_SECONDS = 30
LOG_FILE = "/home/tech4biz/Desktop/malware.logs"
# MIME types reported by `file --mime-type` for ELF binaries; recent versions of
# `file` report position-independent executables as x-pie-executable.
ELF_MIME_TYPES = (
    "application/x-executable",
    "application/x-pie-executable",
    "application/x-sharedlib",
)

if not os.path.exists(asm_model_directory) or not os.path.exists(bytes_model_directory):
    messagebox.showinfo("Error", "Models Not Found for Prediction")
    sys.exit(-1)

for _directory in (
    output_directory,
    asm_output_directory,
    bytes_output_directory,
    result_folder,
    asm_result_directory,
    bytes_result_directory,
):
    os.makedirs(_directory, exist_ok=True)

try:
    logging.basicConfig(filename=LOG_FILE, level=logging.INFO)
except OSError:
    # The configured log directory does not exist on this machine; log next to
    # the working directory instead of crashing at startup.
    logging.basicConfig(filename=os.path.basename(LOG_FILE), level=logging.INFO)


def send_predictions_to_api(file_path):
    """Upload the CSV at *file_path* to the prediction API and print the outcome.

    Raises:
        OSError: if *file_path* cannot be opened.
        requests.RequestException: on connection failure or timeout.
    """
    with open(file_path, 'rb') as f:
        response = requests.post(
            PREDICTION_API_URL, files={'file': f}, timeout=API_TIMEOUT_SECONDS
        )
    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")


def send_asm_predictions_to_api(file_path):
    """Upload an asm prediction CSV; same endpoint and behaviour as the bytes upload."""
    send_predictions_to_api(file_path)


def _upload_results(csv_path, sender):
    """Send *csv_path* with *sender* if the file exists, reporting failures without raising."""
    if not os.path.exists(csv_path):
        # Nothing was predicted for this model yet, so there is nothing to upload.
        return
    try:
        sender(csv_path)
    except (requests.RequestException, OSError) as exc:
        print("Connection Failed")
        logging.error("Could not upload '%s': %s", csv_path, exc)


def format_bytes_to_hex(data):
    """Return *data* as a hex dump: one line per 16 bytes, ``ADDRESS HH HH ...``.

    The address is the 8-digit upper-case hex offset of the line's first byte.
    Every line, including the last, ends with a newline; empty input gives "".
    """
    lines = []
    for offset in range(0, len(data), 16):
        hex_values = " ".join(f"{byte:02X}" for byte in data[offset:offset + 16])
        lines.append(f"{offset:08X} {hex_values}\n")
    return "".join(lines)


def convert_file_to_hex(input_file, output_file):
    """Write the hex dump of *input_file* to *output_file*; I/O errors are logged, not raised."""
    try:
        with open(input_file, 'rb') as f:
            data = f.read()
        with open(output_file, 'w') as f:
            f.write(format_bytes_to_hex(data))
        logging.info(f"Converted '{input_file}' to hex dump and saved to '{output_file}'")
    except OSError as e:
        logging.error(f"Error converting '{input_file}': {e}")


def scan_and_convert_directory(directory, output_dir):
    """Hex-dump every non-``.bytes`` file under *directory* (following symlinks) into *output_dir*.

    Outputs are named ``<basename>.bytes``; a file whose output already exists is skipped,
    so two inputs with the same basename share one output.
    """
    for root, _, files in os.walk(directory, followlinks=True):
        for filename in files:
            if filename.endswith(".bytes"):
                continue
            output_file = os.path.join(output_dir, f"{filename}.bytes")
            if not os.path.exists(output_file):
                convert_file_to_hex(os.path.join(root, filename), output_file)


def _is_within(path, parent):
    """Return True if *path* is *parent* or lies somewhere below it."""
    parent_abs = os.path.abspath(parent)
    try:
        return os.path.commonpath([os.path.abspath(path), parent_abs]) == parent_abs
    except ValueError:
        # Raised e.g. for paths on different drives: they cannot be nested.
        return False


class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that hex-dumps, disassembles and scores newly created files."""

    def __init__(self, output_dir, hex_dirs, disasm_dirs):
        self.output_dir = output_dir
        self.hex_dirs = hex_dirs
        self.disasm_dirs = disasm_dirs
        super().__init__()

    def on_created(self, event):
        """Start hex conversion and disassembly of the created file in background threads."""
        if event.is_directory:
            return
        input_file = event.src_path
        # Files this tool writes itself must not be re-processed, otherwise a
        # watched directory that contains the output folders loops forever
        # (x.bytes -> x.bytes.bytes -> ...).
        if _is_within(input_file, output_directory) or _is_within(input_file, result_folder):
            return
        output_file_hex = os.path.join(
            bytes_output_directory, f"{os.path.basename(input_file)}.bytes"
        )
        if not os.path.exists(output_file_hex):
            threading.Thread(
                target=self.run_hex_conversion, args=(input_file, output_file_hex)
            ).start()
        threading.Thread(target=self.run_disassembly, args=(input_file,)).start()

    def run_hex_conversion(self, input_file, output_file):
        """Hex-dump *input_file* and re-run the bytes model analysis."""
        convert_file_to_hex(input_file, output_file)
        run_malware_ai_analysis_bytes()

    def run_disassembly(self, file_path):
        """Disassemble *file_path* and, only if that succeeded, re-run the asm model analysis."""
        if disassemble_elf(file_path, asm_output_directory) is not None:
            run_malware_analysis_asm()


def monitor_directories(directories, output_dir):
    """Watch *directories* recursively and block until interrupted."""
    observer = start_observer(directories, output_dir)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()


def start_observer(directories, output_dir):
    """Create, start and return an Observer watching each existing entry of *directories*."""
    observer = Observer()
    event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
    for directory in directories:
        if not os.path.isdir(directory):
            # Scheduling a missing path raises and would abort monitoring entirely.
            logging.warning(f"Not monitoring '{directory}': not a directory")
            continue
        observer.schedule(event_handler, path=directory, recursive=True)
        logging.info(f"Monitoring directory: {directory}")
    observer.start()
    return observer


def disassemble_elf(file_path, output_dir):
    """Disassemble *file_path* with ``objdump -d`` into ``<output_dir>/<basename>.asm``.

    Returns:
        The path of the written ``.asm`` file, or ``None`` when *file_path* is itself
        an ``.asm`` file or disassembly/writing failed (the error goes to stderr).
    """
    if file_path.endswith(".asm"):
        return None
    try:
        print(f"Disassembling {file_path}")
        result = subprocess.run(
            ['objdump', '-d', file_path], capture_output=True, text=True, check=True
        )
        asm_file_path = os.path.join(output_dir, f"{os.path.basename(file_path)}.asm")
        with open(asm_file_path, "w") as asm_file:
            asm_file.write(result.stdout)
        print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
        return asm_file_path
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing objdump binary and an unwritable output file.
        print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
        return None


def find_elf_files(start_dirs):
    """Return the paths of all ELF executables/shared objects under *start_dirs*.

    Uses the external ``find`` (pruning /proc, /sys and /run) and ``file`` commands;
    entries of *start_dirs* that are not directories are skipped.
    """
    elf_files = []
    for start_dir in start_dirs:
        if not os.path.isdir(start_dir):
            continue
        try:
            find_command = [
                'find', start_dir,
                '-path', '/proc', '-prune', '-o',
                '-path', '/sys', '-prune', '-o',
                '-path', '/run', '-prune', '-o',
                '-type', 'f', '-print',
            ]
            find_result = subprocess.run(
                find_command, capture_output=True, text=True, check=False
            )
            if find_result.returncode != 0:
                print(f"Error running find command: {find_result.stderr}", file=sys.stderr)
                continue
            print(f"Found files in {start_dir}:")
            for file_path in find_result.stdout.splitlines():
                try:
                    # --brief keeps the path out of the output so a path that
                    # merely contains a MIME string cannot match.
                    file_result = subprocess.run(
                        ['file', '--brief', '--mime-type', file_path],
                        capture_output=True, text=True, check=True,
                    )
                except subprocess.CalledProcessError as e:
                    print(f"Error running file command on {file_path}: {e}", file=sys.stderr)
                    continue
                if any(mime in file_result.stdout for mime in ELF_MIME_TYPES):
                    elf_files.append(file_path)
        except OSError as e:
            print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)
    print(f"Found ELF files: {elf_files} ")
    return elf_files


def process_files(output_dir, start_dirs):
    """Disassemble every ELF file found under *start_dirs* into *output_dir*."""
    os.makedirs(output_dir, exist_ok=True)
    elf_files = find_elf_files(start_dirs)
    if not elf_files:
        print("No ELF files found.")
        return
    for elf_file in elf_files:
        disassemble_elf(elf_file, output_dir)
    print("Disassembly complete. Assembly files are saved in the output directory.")


def process_files_malware(folder_path, files_to_process):
    """Build the min-max-scaled token-count matrix for the given hex-dump files.

    Row k belongs to ``files_to_process[k]``. Columns 0-256 count tokens whose hex
    value equals the column index; column 257 counts every larger value. ``??``
    tokens are ignored. Files not ending in ``bytes`` and unreadable files keep
    an all-zero row.
    """
    feature_matrix = np.zeros((len(files_to_process), 258), dtype=int)
    for k, file in enumerate(files_to_process):
        if not file.endswith("bytes"):
            continue
        try:
            with open(os.path.join(folder_path, file), "r") as byte_file:
                for raw_line in byte_file:
                    # The leading address token is counted as well (small
                    # addresses land in columns 0-256, the rest in 257); this
                    # is kept as-is because it changes the feature values.
                    for hex_code in raw_line.split():
                        if hex_code == '??':
                            continue
                        try:
                            index = int(hex_code, 16)
                        except ValueError:
                            # Skip only the bad token instead of silently
                            # dropping the remainder of the file.
                            continue
                        feature_matrix[k][index if index < 257 else 257] += 1
        except (OSError, UnicodeDecodeError) as exc:
            logging.error(f"Could not read '{file}': {exc}")
    # NOTE(review): the scaler is fitted on the batch being scored, so a file's
    # features depend on which other files are in the batch - confirm this
    # matches how the models were trained.
    scaler = MinMaxScaler()
    return scaler.fit_transform(feature_matrix)


def _load_pickled_model(model_path):
    """Load a pickled model from *model_path*."""
    # SECURITY: unpickling executes arbitrary code; only load model files from
    # a trusted source.
    with open(model_path, 'rb') as model_file:
        return pickle.load(model_file)


def test_files(folder_path, model_path, output_csv):
    """Score the not-yet-scanned files of *folder_path* with one model and append to *output_csv*.

    Files already listed in the CSV's ``File`` column are skipped. The CSV columns
    are ``File``, ``Predicted Class`` and ``Prediction Probability`` (highest class
    probability).
    """
    files = os.listdir(folder_path)
    csv_exists = os.path.exists(output_csv)
    if csv_exists:
        already_scanned_files = set(pd.read_csv(output_csv)['File'].tolist())
    else:
        already_scanned_files = set()

    files_to_process = [file for file in files if file not in already_scanned_files]
    if not files_to_process:
        print("All files have already been scanned.")
        return

    feature_matrix = process_files_malware(folder_path, files_to_process)
    model = _load_pickled_model(model_path)
    predictions = model.predict(feature_matrix)
    prediction_probs = model.predict_proba(feature_matrix)

    new_results = pd.DataFrame({
        'File': files_to_process,
        'Predicted Class': predictions,
        'Prediction Probability': [max(probs) for probs in prediction_probs],
    })
    if csv_exists:
        new_results.to_csv(output_csv, mode='a', header=False, index=False)
    else:
        new_results.to_csv(output_csv, index=False)
    print(f"New predictions appended to {output_csv}")


def run_malware_ai_analysis_bytes():
    """Score all hex dumps with every bytes model and upload each model's result CSV."""
    print("bytes malware analysis started")
    model_folder = bytes_model_directory
    for model_file in os.listdir(model_folder):
        if not model_file.endswith('.pkl'):
            continue
        model_path = os.path.join(model_folder, model_file)
        output_csv = os.path.join(
            bytes_result_directory,
            f"bytes_predictions_{os.path.splitext(model_file)[0]}.csv",
        )
        test_files(bytes_output_directory, model_path, output_csv)
        _upload_results(output_csv, send_predictions_to_api)


def preprocess_asm_file(file_path):
    """Return the per-file count vector used by the asm models.

    The vector is the concatenation of, in this order: section-prefix counts,
    opcode counts, register counts and keyword counts. Each count is the number
    of lines on which the item occurs (at most one increment per line).
    Registers are only counted on lines whose first token contains ``text`` or
    ``CODE``.
    """
    prefixes = ['.text:', '.Pav:', '.idata:', '.data:', '.bss:', '.rdata:', '.edata:',
                '.rsrc:', '.tls:', '.reloc:', '.BSS:', '.CODE']
    opcodes = ['jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub', 'inc',
               'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call', 'shl', 'ror',
               'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx']
    keywords = ['.dll', 'std::', ':dword']
    registers = ['edx', 'esi', 'eax', 'ebx', 'ecx', 'edi', 'ebp', 'esp', 'eip']

    prefix_counts = np.zeros(len(prefixes), dtype=int)
    opcode_counts = np.zeros(len(opcodes), dtype=int)
    keyword_counts = np.zeros(len(keywords), dtype=int)
    register_counts = np.zeros(len(registers), dtype=int)

    with open(file_path, 'r', encoding='cp1252', errors='replace') as f:
        for raw_line in f:
            tokens = raw_line.split()
            if not tokens:
                continue
            first_token, operands = tokens[0], tokens[1:]
            in_code_section = 'text' in first_token or 'CODE' in first_token

            for i, prefix in enumerate(prefixes):
                if prefix in first_token:
                    prefix_counts[i] += 1
            for i, opcode in enumerate(opcodes):
                if opcode in operands:  # exact token match
                    opcode_counts[i] += 1
            if in_code_section:
                for i, register in enumerate(registers):
                    if any(register in token for token in operands):
                        register_counts[i] += 1
            for i, keyword in enumerate(keywords):
                if any(keyword in token for token in operands):
                    keyword_counts[i] += 1

    return np.concatenate([prefix_counts, opcode_counts, register_counts, keyword_counts])


def _asm_reported_class(model_name, raw_prediction):
    """Map a model's raw class label 0-8 to the reported class 1-9.

    XGB models are mapped through the identity and then shifted by one; every
    other model is shifted by the mapping itself. The XGB test is the same
    substring check in both steps, so an XGB model whose name is not exactly
    'XGBClassifier' is no longer shifted twice (which produced labels 2-10).

    Raises:
        KeyError: if *raw_prediction* is not in 0-8.
    """
    is_xgb = "XGB" in model_name.upper()
    if is_xgb:
        class_mapping = {i: i for i in range(9)}
    else:
        class_mapping = {i: i + 1 for i in range(9)}
    mapped_prediction = class_mapping[raw_prediction]
    return mapped_prediction + 1 if is_xgb else mapped_prediction


# Main function to load models and make predictions
def run_malware_analysis_asm(asm_folder_path=asm_output_directory, models_folder=asm_model_directory):
    """Score every new ``.asm`` file with every asm model and upload the result CSVs.

    Results are kept per model in ``results/asm_result/asm_prediction_<model>.csv``
    with columns ``file_name``, ``prediction`` and ``probability``; files already
    listed there are not scored again.
    """
    print("Starting analysis...")
    asm_files = [f for f in os.listdir(asm_folder_path) if f.endswith('.asm')]

    models = {
        os.path.splitext(model_file)[0]: _load_pickled_model(
            os.path.join(models_folder, model_file)
        )
        for model_file in os.listdir(models_folder)
        if model_file.endswith('.pkl')
    }

    for model_name, model_clf in models.items():
        print(f"Making asm predictions with {model_name}...")
        results_file_path = f'{asm_result_directory}/asm_prediction_{model_name}.csv'
        if os.path.exists(results_file_path):
            results_df = pd.read_csv(results_file_path)
        else:
            results_df = pd.DataFrame(columns=['file_name', 'prediction', 'probability'])
        already_predicted = set(results_df['file_name'].values)

        new_predictions = []
        for asm_file in asm_files:
            if asm_file in already_predicted:
                continue
            feature_vector = preprocess_asm_file(
                os.path.join(asm_folder_path, asm_file)
            ).reshape(1, -1)
            prediction = model_clf.predict(feature_vector)
            probability = model_clf.predict_proba(feature_vector)
            # NOTE(review): indexing the probabilities by the raw label assumes
            # the model's classes are exactly 0..8 in order - confirm for the
            # non-XGB models.
            new_predictions.append({
                'file_name': asm_file,
                'prediction': _asm_reported_class(model_name, prediction[0]),
                'probability': probability[0][prediction[0]],
            })

        if new_predictions:
            new_predictions_df = pd.DataFrame(new_predictions)
            if results_df.empty:
                results_df = new_predictions_df
            else:
                results_df = pd.concat([results_df, new_predictions_df], ignore_index=True)
            results_df.to_csv(results_file_path, index=False)
            print(f"Predictions saved to {results_file_path}.")

        _upload_results(results_file_path, send_asm_predictions_to_api)


def _parse_directory_list(text):
    """Split a comma-separated entry value into stripped, non-empty paths."""
    return [part.strip() for part in text.split(',') if part.strip()]


def _directories_to_monitor():
    """Return the de-duplicated directories from both GUI entries (main thread only)."""
    combined = _parse_directory_list(hex_files_entry.get())
    combined += _parse_directory_list(start_dirs_entry.get())
    return list(dict.fromkeys(combined))


def _ensure_monitoring(directories=None):
    """Start directory monitoring unless it is already running."""
    global isMonitoring
    with _monitoring_lock:
        if isMonitoring:
            return
        isMonitoring = True
    start_monitoring(directories)


def run_hex_conversion():
    """Button handler: hex-dump the chosen directories, score them, then start monitoring."""
    hex_dirs = _parse_directory_list(hex_files_entry.get())
    hex_output_dir = bytes_output_directory
    if not hex_dirs or not hex_output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return

    # Validate and read widget state here, on the GUI thread: tkinter widgets
    # and message boxes must not be used from the worker thread.
    valid_dirs = []
    for hex_dir in hex_dirs:
        if os.path.isdir(hex_dir):
            valid_dirs.append(hex_dir)
        else:
            messagebox.showwarning("Warning", f"{hex_dir} is not a directory.")
    monitor_dirs = _directories_to_monitor()

    def hex_conversion_task():
        for hex_dir in valid_dirs:
            scan_and_convert_directory(hex_dir, hex_output_dir)
        print("Hex conversion complete.")
        run_malware_ai_analysis_bytes()
        _ensure_monitoring(monitor_dirs)

    threading.Thread(target=hex_conversion_task).start()


def run_disassembly():
    """Button handler: disassemble ELF files in the chosen directories, score them, then monitor."""
    start_dirs = _parse_directory_list(start_dirs_entry.get())
    output_dir = asm_output_directory
    if not start_dirs or not output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return
    # Read widget state on the GUI thread, not in the worker.
    monitor_dirs = _directories_to_monitor()

    def disassembly_task():
        process_files(output_dir, start_dirs)
        run_malware_analysis_asm()
        _ensure_monitoring(monitor_dirs)

    threading.Thread(target=disassembly_task).start()


def start_monitoring(directories=None):
    """Watch *directories* in a daemon thread; defaults to the directories in both GUI entries."""
    if directories is None:
        directories = _directories_to_monitor()
    output_dir = output_directory
    threading.Thread(
        target=monitor_directories, args=(directories, output_dir), daemon=True
    ).start()
    print("Started monitoring directories.")


def on_closing():
    """Window-close handler: destroy the main window."""
    root.destroy()


def _browse_into(entry):
    """Ask for directories until the dialog is cancelled, then put them into *entry*."""
    directories = []
    while True:
        directory = filedialog.askdirectory(title="Select a Directory")
        if not directory:
            break  # Cancelling the dialog ends the selection.
        directories.append(directory)
    if directories:
        entry.delete(0, tk.END)
        entry.insert(0, ', '.join(directories))


def browse_hex_directories():
    """Let the user pick the directories for hex conversion."""
    _browse_into(hex_files_entry)


def browse_start_dirs():
    """Let the user pick the directories to scan for ELF files."""
    _browse_into(start_dirs_entry)


def show_frame(frame):
    """Raise *frame* to the top of the stacking order."""
    frame.tkraise()


# Create the main window
root = tk.Tk()
root.title("File Conversion and Disassembly Wizard")
root.protocol("WM_DELETE_WINDOW", on_closing)

notebook = ttk.Notebook(root)
notebook.pack(fill='both', expand=True)

hex_frame = ttk.Frame(notebook)
asm_frame = ttk.Frame(notebook)
malware_frame = ttk.Frame(notebook)  # created but not added to the notebook
notebook.add(hex_frame, text='Hex Conversion')
notebook.add(asm_frame, text='ELF Disassembly')

tk.Label(hex_frame, text="Select Directories to Convert to Hex:").pack(pady=5)
hex_files_entry = tk.Entry(hex_frame, width=80)
hex_files_entry.pack(pady=5)
tk.Button(hex_frame, text="Browse...", command=browse_hex_directories).pack(pady=5)
tk.Button(hex_frame, text="Convert to Hex", command=run_hex_conversion).pack(pady=10)

tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:").pack(pady=5)
start_dirs_entry = tk.Entry(asm_frame, width=80)
start_dirs_entry.pack(pady=5)
tk.Button(asm_frame, text="Browse...", command=browse_start_dirs).pack(pady=5)
tk.Button(asm_frame, text="Disassemble ELF Files", command=run_disassembly).pack(pady=10)

show_frame(hex_frame)
root.mainloop()