# Malware_Detection_Ubuntu/Final_Malware.py
import os
import sys
import time
import pickle
import logging
import threading
import subprocess
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import requests
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
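# Overview: this script converts files to hex dumps (.bytes), disassembles ELF
# binaries with objdump into .asm listings, extracts simple count features from
# both representations, runs the pickled classifiers found in bytes_models/ and
# asm_models/ over them, writes per-model prediction CSVs under results/, and
# uploads those CSVs to the remote /predict-malware/ API. A Tkinter GUI drives
# the hex-conversion and disassembly steps and starts watchdog-based directory
# monitoring so newly created files are processed automatically.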
isMonitoring = False
output_directory = "outputs"
bytes_output_directory = "outputs/bytes_output"
asm_output_directory = "outputs/asm_output"
result_folder = "results"
bytes_result_directory = "results/bytes_result"
asm_result_directory = "results/asm_result"
bytes_model_directory = "bytes_models"
asm_model_directory = "asm_models"
# The pickled models must be present before any prediction can run.
if not os.path.exists(asm_model_directory) or not os.path.exists(bytes_model_directory):
    messagebox.showinfo("Error", "Models Not Found for Prediction")
    sys.exit(-1)
# Create the output and result directory tree if it does not exist yet.
for _directory in (output_directory, asm_output_directory, bytes_output_directory,
                   result_folder, asm_result_directory, bytes_result_directory):
    os.makedirs(_directory, exist_ok=True)
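# Expected on-disk layout after start-up (the model folders must already exist):
#   outputs/bytes_output/        hex dumps (<name>.bytes)
#   outputs/asm_output/          objdump listings (<name>.asm)
#   results/bytes_result/        bytes_predictions_<model>.csv
#   results/asm_result/          asm_prediction_<model>.csv
#   bytes_models/, asm_models/   pickled classifiers (*.pkl)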
logging.basicConfig(filename="/home/tech4biz/Desktop/malware.logs", level=logging.INFO)
def send_predictions_to_api(file_path):
    url = "http://142.93.221.85:8000/predict-malware/"
    with open(file_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, files=files)
    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
def send_asm_predictions_to_api(file_path):
    # Same upload path as the bytes predictions; kept as a separate name so call
    # sites remain explicit about which result type they are sending.
    send_predictions_to_api(file_path)
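# Illustrative call (the file name is hypothetical):
#   send_predictions_to_api("results/bytes_result/bytes_predictions_RandomForest.csv")
# posts the CSV as a multipart/form-data upload to the /predict-malware/ endpoint.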
def format_bytes_to_hex(data):
    hex_dump = ""
    for i in range(0, len(data), 16):
        chunk = data[i:i+16]
        hex_values = " ".join(f"{byte:02X}" for byte in chunk)
        address = f"{i:08X}"
        hex_dump += f"{address} {hex_values}\n"
    return hex_dump
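# Example: format_bytes_to_hex(b"\x4d\x5a\x90") returns the single line
#   "00000000 4D 5A 90\n"
# i.e. an 8-digit offset followed by up to 16 space-separated hex bytes per line.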
def convert_file_to_hex(input_file, output_file):
    try:
        with open(input_file, 'rb') as f:
            data = f.read()
        hex_dump = format_bytes_to_hex(data)
        with open(output_file, 'w') as f:
            f.write(hex_dump)
        logging.info(f"Converted '{input_file}' to hex dump and saved to '{output_file}'")
    except Exception as e:
        logging.error(f"Error converting '{input_file}': {e}")
def scan_and_convert_directory(directory, output_dir):
    for root, _, files in os.walk(directory, followlinks=True):
        for filename in files:
            input_file = os.path.join(root, filename)
            if not filename.endswith(".bytes"):
                output_file = os.path.join(output_dir, f"{filename}.bytes")
                if not os.path.exists(output_file):
                    convert_file_to_hex(input_file, output_file)
class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, output_dir, hex_dirs, disasm_dirs):
        self.output_dir = output_dir
        self.hex_dirs = hex_dirs
        self.disasm_dirs = disasm_dirs
        super().__init__()
    def on_created(self, event):
        if not event.is_directory:
            input_file = event.src_path
            output_file_hex = os.path.join(bytes_output_directory, f"{os.path.basename(input_file)}.bytes")
            if not os.path.exists(output_file_hex):
                # Convert to hex in one thread and disassemble in another
                threading.Thread(target=self.run_hex_conversion, args=(input_file, output_file_hex)).start()
                threading.Thread(target=self.run_disassembly, args=(input_file,)).start()
    def run_hex_conversion(self, input_file, output_file):
        convert_file_to_hex(input_file, output_file)
        run_malware_ai_analysis_bytes()
    def run_disassembly(self, file_path):
        try:
            print(f"Disassembling {file_path}")
            result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True)
            assembly_code = result.stdout
            base_name = os.path.basename(file_path)
            if not file_path.endswith(".asm"):
                asm_file_name = f"{base_name}.asm"
                asm_file_path = os.path.join(asm_output_directory, asm_file_name)
                with open(asm_file_path, "w") as asm_file:
                    asm_file.write(assembly_code)
                print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
                run_malware_analysis_asm()
        except subprocess.CalledProcessError as e:
            print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
def monitor_directories(directories, output_dir):
    event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
    observer = Observer()
    for directory in directories:
        observer.schedule(event_handler, path=directory, recursive=True)
        logging.info(f"Monitoring directory: {directory}")
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
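# Illustrative call: monitor_directories(["/home/user/Downloads"], output_directory)
# blocks the calling thread, which is why start_monitoring() below runs it on a
# daemon thread. The watched path here is only an example.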
def start_observer(directories, output_dir):
    observer = Observer()
    event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
    for directory in directories:
        observer.schedule(event_handler, path=directory, recursive=True)
        logging.info(f"Monitoring directory: {directory}")
    observer.start()
    return observer
def disassemble_elf(file_path, output_dir):
    try:
        print(f"Disassembling {file_path}")
        result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True)
        assembly_code = result.stdout
        base_name = os.path.basename(file_path)
        if not file_path.endswith(".asm"):
            asm_file_name = f"{base_name}.asm"
            asm_file_path = os.path.join(output_dir, asm_file_name)
            with open(asm_file_path, "w") as asm_file:
                asm_file.write(assembly_code)
            print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
    except subprocess.CalledProcessError as e:
        print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
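# Roughly equivalent shell command (the input path is only an example):
#   objdump -d /bin/ls > outputs/asm_output/ls.asm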
def find_elf_files(start_dirs):
    elf_files = []
    for start_dir in start_dirs:
        if not os.path.isdir(start_dir):
            continue
        try:
            find_command = ['find', start_dir, '-path', '/proc', '-prune', '-o', '-path', '/sys', '-prune', '-o', '-path', '/run', '-prune', '-o', '-type', 'f', '-print']
            find_result = subprocess.run(find_command, capture_output=True, text=True, check=False)
            # print("Result: ", find_result)
            if find_result.returncode != 0:
                print(f"Error running find command: {find_result.stderr}", file=sys.stderr)
                continue
            file_paths = find_result.stdout.splitlines()
            print(f"Found files in {start_dir}:")
            for file_path in file_paths:
                try:
                    file_command = ['file', '--mime-type', file_path]
                    file_result = subprocess.run(file_command, capture_output=True, text=True, check=True)
                    if 'application/x-executable' in file_result.stdout or 'application/x-sharedlib' in file_result.stdout:
                        elf_files.append(file_path)
                except subprocess.CalledProcessError as e:
                    print(f"Error running file command on {file_path}: {e}", file=sys.stderr)
        except Exception as e:
            print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)
    print(f"Found ELF files: {elf_files}")
    return elf_files
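# Roughly equivalent shell pipeline (illustrative only; /usr is an example root):
#   find /usr -path /proc -prune -o -path /sys -prune -o -path /run -prune -o -type f -print \
#     | while read f; do file --mime-type "$f"; done \
#     | grep -E 'application/x-(executable|sharedlib)'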
def process_files(output_dir, start_dirs):
    os.makedirs(output_dir, exist_ok=True)
    elf_files = find_elf_files(start_dirs)
    if not elf_files:
        print("No ELF files found.")
        return
    for elf_file in elf_files:
        disassemble_elf(elf_file, output_dir)
    print("Disassembly complete. Assembly files are saved in the output directory.")
def process_files_malware(folder_path, files_to_process):
    # 258 columns: 0-255 hold byte-value counts, 257 is an overflow bucket.
    feature_matrix = np.zeros((len(files_to_process), 258), dtype=int)
    for k, file in enumerate(files_to_process):
        if file.endswith("bytes"):
            try:
                with open(os.path.join(folder_path, file), "r") as byte_file:
                    for lines in byte_file:
                        line = lines.rstrip().split(" ")
                        for hex_code in line:
                            if hex_code != '??':
                                index = int(hex_code, 16)
                                if index < 257:  # Keep the bounds check for 257
                                    feature_matrix[k][index] += 1
                                else:
                                    feature_matrix[k][257] += 1  # Overflow bucket (258th feature)
            except Exception:
                continue
    # Normalize the features
    scaler = MinMaxScaler()
    feature_matrix = scaler.fit_transform(feature_matrix)
    return feature_matrix
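# Note on the layout: each two-digit hex byte increments one of columns 0-255;
# any token that parses to a value of 257 or more (for example most of the
# 8-digit line offsets written by format_bytes_to_hex()) is folded into the
# overflow column 257, and tokens such as '??' are skipped entirely.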
def test_files(folder_path, model_path, output_csv):
    files = os.listdir(folder_path)
    # Check if the CSV file already exists
    if os.path.exists(output_csv):
        existing_results = pd.read_csv(output_csv)
        already_scanned_files = set(existing_results['File'].tolist())
    else:
        already_scanned_files = set()
    # Filter out files that have already been scanned
    files_to_process = [file for file in files if file not in already_scanned_files]
    if not files_to_process:
        print("All files have already been scanned.")
        return
    # Process only the files that haven't been scanned yet
    feature_matrix = process_files_malware(folder_path, files_to_process)
    # Load the trained model
    with open(model_path, 'rb') as model_file:
        model = pickle.load(model_file)
    # Make predictions
    predictions = model.predict(feature_matrix)
    prediction_probs = model.predict_proba(feature_matrix)
    # Create a DataFrame for the new results
    new_results = pd.DataFrame({
        'File': files_to_process,
        'Predicted Class': predictions,
        'Prediction Probability': [max(probs) for probs in prediction_probs]
    })
    # Append new results to the existing CSV file or create a new one
    if os.path.exists(output_csv):
        new_results.to_csv(output_csv, mode='a', header=False, index=False)
    else:
        new_results.to_csv(output_csv, index=False)
    print(f"New predictions appended to {output_csv}")
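# Illustrative call (the model file name is hypothetical):
#   test_files("outputs/bytes_output", "bytes_models/RandomForest.pkl",
#              "results/bytes_result/bytes_predictions_RandomForest.csv")
# The CSV gains one row per newly scanned .bytes file with the columns
# File, Predicted Class, Prediction Probability.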
def run_malware_ai_analysis_bytes():
    print("bytes malware analysis started")
    directory = bytes_output_directory
    model_folder = bytes_model_directory  # Folder containing the .pkl files
    model_files = [f for f in os.listdir(model_folder) if f.endswith('.pkl')]
    for model_file in model_files:
        model_path = os.path.join(model_folder, model_file)
        output_csv = os.path.join(bytes_result_directory, f"bytes_predictions_{os.path.splitext(model_file)[0]}.csv")
        test_files(directory, model_path, output_csv)
        try:
            send_predictions_to_api(output_csv)
        except Exception:
            print("Connection Failed")
def preprocess_asm_file(file_path):
    prefixes = ['.text:', '.Pav:', '.idata:', '.data:', '.bss:', '.rdata:', '.edata:', '.rsrc:', '.tls:', '.reloc:', '.BSS:', '.CODE']
    opcodes = ['jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub', 'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call', 'shl', 'ror', 'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx']
    keywords = ['.dll', 'std::', ':dword']
    registers = ['edx', 'esi', 'eax', 'ebx', 'ecx', 'edi', 'ebp', 'esp', 'eip']
    # Initialize counts
    prefix_counts = np.zeros(len(prefixes), dtype=int)
    opcode_counts = np.zeros(len(opcodes), dtype=int)
    keyword_counts = np.zeros(len(keywords), dtype=int)
    register_counts = np.zeros(len(registers), dtype=int)
    # Process file
    with open(file_path, 'r', encoding='cp1252', errors='replace') as f:
        for line in f:
            line = line.rstrip().split()
            if not line:
                continue
            l = line[0]
            for i, prefix in enumerate(prefixes):
                if prefix in l:
                    prefix_counts[i] += 1
            line = line[1:]
            for i, opcode in enumerate(opcodes):
                if any(opcode == li for li in line):
                    opcode_counts[i] += 1
            for i, register in enumerate(registers):
                if any(register in li and ('text' in l or 'CODE' in l) for li in line):
                    register_counts[i] += 1
            for i, keyword in enumerate(keywords):
                if any(keyword in li for li in line):
                    keyword_counts[i] += 1
    # Create feature vector
    feature_vector = np.concatenate([prefix_counts, opcode_counts, register_counts, keyword_counts])
    return feature_vector
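# The vector concatenates the counts as [12 prefixes, 26 opcodes, 9 registers,
# 3 keywords] = 50 features per .asm file; the pickled ASM models are expected
# to have been trained on this same 50-column layout.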
# Main function to load models and make predictions
def run_malware_analysis_asm(asm_folder_path=asm_output_directory, models_folder=asm_model_directory):
    print("Starting analysis...")
    # Get all .asm files in the folder
    asm_files = [f for f in os.listdir(asm_folder_path) if f.endswith('.asm')]
    # Load all .pkl models from the models folder
    model_files = [f for f in os.listdir(models_folder) if f.endswith('.pkl')]
    models = {}
    for model_file in model_files:
        model_name = os.path.splitext(model_file)[0]
        with open(os.path.join(models_folder, model_file), 'rb') as f:
            model_clf = pickle.load(f)
        models[model_name] = model_clf
    # Prediction and saving results
    for model_name, model_clf in models.items():
        print(f"Making asm predictions with {model_name}...")
        # Generate the correct class mapping
        def get_class_mapping(model_name):
            if model_name == 'XGBClassifier':
                return {i: i for i in range(9)}  # XGB uses 0-8
            else:
                return {i: i + 1 for i in range(9)}  # Other models use 1-9
        class_mapping = get_class_mapping(model_name)
        # Check if result file for the model already exists
        results_file_path = f'{asm_result_directory}/asm_prediction_{model_name}.csv'
        if os.path.exists(results_file_path):
            results_df = pd.read_csv(results_file_path)
        else:
            results_df = pd.DataFrame(columns=['file_name', 'prediction', 'probability'])
        new_predictions = []
        for asm_file in asm_files:
            if asm_file not in results_df['file_name'].values:
                file_path = os.path.join(asm_folder_path, asm_file)
                feature_vector = preprocess_asm_file(file_path)
                feature_vector = feature_vector.reshape(1, -1)
                # Predict using the current model
                prediction = model_clf.predict(feature_vector)
                probability = model_clf.predict_proba(feature_vector)
                mapped_prediction = class_mapping[prediction[0]]
                predicted_prob = probability[0][prediction[0]]
                if "XGB" in model_name.upper():
                    new_predictions.append({
                        'file_name': asm_file,
                        'prediction': mapped_prediction + 1,
                        'probability': predicted_prob
                    })
                else:
                    new_predictions.append({
                        'file_name': asm_file,
                        'prediction': mapped_prediction,
                        'probability': predicted_prob
                    })
        # Append new predictions to results DataFrame
        if new_predictions:
            new_predictions_df = pd.DataFrame(new_predictions)
            results_df = pd.concat([results_df, new_predictions_df], ignore_index=True)
            results_df.to_csv(results_file_path, index=False)
            print(f"Predictions saved to {results_file_path}.")
            try:
                send_asm_predictions_to_api(results_file_path)
            except Exception:
                print("Connection Failed")
def run_hex_conversion():
    hex_dirs = [d.strip() for d in hex_files_entry.get().split(',')]
    hex_output_dir = bytes_output_directory
    if not hex_dirs or not hex_output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return
    def hex_conversion_task():
        for hex_dir in hex_dirs:
            hex_dir = hex_dir.strip()
            if os.path.isdir(hex_dir):
                scan_and_convert_directory(hex_dir, hex_output_dir)
            else:
                messagebox.showwarning("Warning", f"{hex_dir} is not a directory.")
        print("Hex conversion complete.")
        run_malware_ai_analysis_bytes()
    global isMonitoring
    if not isMonitoring:
        isMonitoring = True
        start_monitoring()
    # hex_conversion_task()
    threading.Thread(target=hex_conversion_task).start()
def run_disassembly():
    start_dirs = [d.strip() for d in start_dirs_entry.get().split(',')]
    output_dir = asm_output_directory
    if not start_dirs or not output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return
    def disassembly_task():
        process_files(output_dir, start_dirs)
        run_malware_analysis_asm()
    global isMonitoring
    if not isMonitoring:
        isMonitoring = True
        start_monitoring()
    # disassembly_task()
    threading.Thread(target=disassembly_task).start()
def start_monitoring():
    directories = [d.strip() for d in hex_files_entry.get().split(',')]
    directories += [d.strip() for d in start_dirs_entry.get().split(',')]
    output_dir = output_directory
    def monitoring_task():
        monitor_directories(directories, output_dir)
    # Start monitoring in a new thread
    threading.Thread(target=monitoring_task, daemon=True).start()
    print("Started monitoring directories.")
def on_closing():
    root.destroy()
def browse_hex_directories():
    directories = []
    while True:
        directory = filedialog.askdirectory(title="Select a Directory")
        if not directory:
            break  # Stop if no more directories are selected
        directories.append(directory)
    if directories:
        hex_files_entry.delete(0, tk.END)
        hex_files_entry.insert(0, ', '.join(directories))
def browse_start_dirs():
    directories = []
    while True:
        directory = filedialog.askdirectory(title="Select a Directory")
        if not directory:
            break  # Stop if no more directories are selected
        directories.append(directory)
    if directories:
        start_dirs_entry.delete(0, tk.END)
        start_dirs_entry.insert(0, ', '.join(directories))
def show_frame(frame):
    frame.tkraise()
# Create the main window
root = tk.Tk()
root.title("File Conversion and Disassembly Wizard")
root.protocol("WM_DELETE_WINDOW", on_closing)
notebook = ttk.Notebook(root)
notebook.pack(fill='both', expand=True)
hex_frame = ttk.Frame(notebook)
asm_frame = ttk.Frame(notebook)
malware_frame = ttk.Frame(notebook)  # Created but never added to the notebook in this version.
notebook.add(hex_frame, text='Hex Conversion')
notebook.add(asm_frame, text='ELF Disassembly')
tk.Label(hex_frame, text="Select Directories to Convert to Hex:").pack(pady=5)
hex_files_entry = tk.Entry(hex_frame, width=80)
hex_files_entry.pack(pady=5)
tk.Button(hex_frame, text="Browse...", command=browse_hex_directories).pack(pady=5)
tk.Button(hex_frame, text="Convert to Hex", command=run_hex_conversion).pack(pady=10)
tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:").pack(pady=5)
start_dirs_entry = tk.Entry(asm_frame, width=80)
start_dirs_entry.pack(pady=5)
tk.Button(asm_frame, text="Browse...", command=browse_start_dirs).pack(pady=5)
tk.Button(asm_frame, text="Disassemble ELF Files", command=run_disassembly).pack(pady=10)
show_frame(hex_frame)
root.mainloop()