# File listing: 577 lines, 21 KiB, Python
import os
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import tkinter as tk
|
|
from tkinter import filedialog, messagebox, ttk
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
import threading
|
|
import pandas as pd
|
|
import pickle
|
|
import numpy as np
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
import sys
|
|
import os
|
|
import pandas as pd
|
|
import numpy as np
|
|
import codecs
|
|
import pickle
|
|
import requests
|
|
|
|
|
|
|
|
# Set to True the first time directory monitoring is started so that the
# watchdog observer is only ever launched once per run.
isMonitoring = False

# Working folders, all relative to the current working directory.
output_directory = "outputs"
bytes_output_directory = "outputs/bytes_output"
asm_output_directory = "outputs/asm_output"
result_folder = "results"
bytes_result_directory = "results/bytes_result"
asm_result_directory = "results/asm_result"
bytes_model_directory = "bytes_models"
asm_model_directory = "asm_models"

# Both model folders must already exist; without them nothing can be predicted.
if not os.path.exists(asm_model_directory) or not os.path.exists(bytes_model_directory):
    messagebox.showinfo("Error", "Models Not Found for Prediction")
    # sys.exit rather than exit(): the latter is injected by the `site` module
    # and is not guaranteed to exist (python -S, frozen executables).
    sys.exit(-1)

# exist_ok=True creates whatever is missing without a check-then-create race.
for _directory in (
    output_directory,
    asm_output_directory,
    bytes_output_directory,
    result_folder,
    asm_result_directory,
    bytes_result_directory,
):
    os.makedirs(_directory, exist_ok=True)

# NOTE(review): absolute, machine-specific log path. Logging set-up is likely
# to fail on hosts where this folder does not exist - consider making it
# configurable.
logging.basicConfig(filename= "/home/tech4biz/Desktop/malware.logs", level=logging.INFO)
|
|
|
|
|
|
|
|
|
|
def send_predictions_to_api(file_path, timeout=30):
    """Upload a predictions file to the remote malware-prediction endpoint.

    Args:
        file_path: Path of the file to send as the multipart field ``file``.
        timeout: Seconds to wait for the server. Without a timeout the POST
            can block the calling worker thread indefinitely.

    Raises:
        OSError: If ``file_path`` cannot be opened.
        requests.RequestException: On connection errors or timeouts.
    """
    # NOTE(review): hard-coded plain-HTTP endpoint; confirm this is intended.
    url = "http://142.93.221.85:8000/predict-malware/"
    with open(file_path, 'rb') as f:
        response = requests.post(url, files={'file': f}, timeout=timeout)

    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
|
|
|
|
|
|
def send_asm_predictions_to_api(file_path):
    """Upload an ASM predictions file to the malware-prediction endpoint.

    ASM results are posted to the same URL, in the same way, as the bytes
    results, so this delegates to ``send_predictions_to_api`` instead of
    carrying a second copy of the upload code.

    Args:
        file_path: Path of the file to send.

    Raises:
        Whatever ``send_predictions_to_api`` raises (file or network errors).
    """
    send_predictions_to_api(file_path)
|
|
|
|
|
|
|
|
def format_bytes_to_hex(data):
    """Render binary data as a hex dump with 16 bytes per line.

    Each line holds the chunk offset as 8 uppercase hex digits, one space,
    the chunk's bytes as space-separated two-digit uppercase hex values, and
    a trailing newline.

    Args:
        data: A sliceable bytes-like object whose items are integers.

    Returns:
        The complete dump as a single string; empty when ``data`` is empty.
    """
    # Gather the lines and join once: repeated ``+=`` on an ever-growing
    # string can degrade to quadratic time on large binaries.
    lines = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        hex_values = " ".join(f"{byte:02X}" for byte in chunk)
        lines.append(f"{offset:08X} {hex_values}\n")
    return "".join(lines)
|
|
|
|
def convert_file_to_hex(input_file, output_file):
    """Write a hex dump of ``input_file`` to ``output_file``.

    The whole input is read into memory, formatted with
    ``format_bytes_to_hex`` and written as text. This is best-effort: any
    exception is logged as an error and not re-raised.

    Args:
        input_file: Path of the binary file to read.
        output_file: Path of the text file to create or overwrite.
    """
    try:
        with open(input_file, 'rb') as source:
            raw = source.read()
        with open(output_file, 'w') as target:
            target.write(format_bytes_to_hex(raw))
    except Exception as e:
        logging.error(f"Error converting '{input_file}': {e}")
    else:
        logging.info(f"Converted '{input_file}' to hex dump and saved to '{output_file}'")
|
|
|
|
def scan_and_convert_directory(directory, output_dir):
    """Hex-dump every file found under ``directory`` into ``output_dir``.

    Files already ending in ``.bytes`` are skipped, as are files whose dump
    (``<filename>.bytes`` directly inside ``output_dir``) already exists.
    Symbolic links to directories are followed.

    Args:
        directory: Root of the tree to scan.
        output_dir: Folder that receives the ``.bytes`` dumps.
    """
    visited = set()
    for current_dir, subdirs, filenames in os.walk(directory, followlinks=True):
        # os.walk does not detect symlink cycles when followlinks=True, so
        # stop descending once a directory's real path has been seen.
        real_dir = os.path.realpath(current_dir)
        if real_dir in visited:
            subdirs[:] = []
            continue
        visited.add(real_dir)

        for filename in filenames:
            if filename.endswith(".bytes"):
                continue
            output_file = os.path.join(output_dir, f"{filename}.bytes")
            if not os.path.exists(output_file):
                convert_file_to_hex(os.path.join(current_dir, filename), output_file)
|
|
|
|
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that hex-dumps, disassembles and scores new files.

    The constructor arguments are only stored on the instance; the methods
    write to the module-level ``bytes_output_directory`` and
    ``asm_output_directory``.
    """

    def __init__(self, output_dir, hex_dirs, disasm_dirs):
        self.output_dir = output_dir
        self.hex_dirs = hex_dirs
        self.disasm_dirs = disasm_dirs
        super().__init__()

    def on_created(self, event):
        """Start background conversion and disassembly for a new file.

        Directory events are ignored, and so are files whose ``.bytes`` dump
        already exists in ``bytes_output_directory``.
        """
        if event.is_directory:
            return

        input_file = event.src_path
        output_file_hex = os.path.join(
            bytes_output_directory, f"{os.path.basename(input_file)}.bytes"
        )
        if os.path.exists(output_file_hex):
            return

        # Separate threads keep the conversion, objdump and model work off
        # the thread that delivers watchdog events.
        threading.Thread(
            target=self.run_hex_conversion, args=(input_file, output_file_hex)
        ).start()
        threading.Thread(target=self.run_disassembly, args=(input_file,)).start()

    def run_hex_conversion(self, input_file, output_file):
        """Hex-dump ``input_file`` and then run the bytes-model analysis."""
        convert_file_to_hex(input_file, output_file)
        run_malware_ai_analysis_bytes()

    def run_disassembly(self, file_path):
        """Disassemble ``file_path`` with objdump and run the ASM analysis.

        The listing is written to ``<basename>.asm`` inside
        ``asm_output_directory``. Files that already end in ``.asm`` are
        skipped before objdump is invoked. Failures are reported on stderr
        and the analysis is not run for that file.
        """
        if file_path.endswith(".asm"):
            return

        asm_file_path = os.path.join(
            asm_output_directory, f"{os.path.basename(file_path)}.asm"
        )
        try:
            print(f"Disassembling {file_path}")
            result = subprocess.run(
                ['objdump', '-d', file_path], capture_output=True, text=True, check=True
            )
            with open(asm_file_path, "w") as asm_file:
                asm_file.write(result.stdout)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing objdump binary and an unwritable
            # output file, either of which would otherwise kill this thread.
            print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
            return

        print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
        run_malware_analysis_asm()
|
|
|
|
def monitor_directories(directories, output_dir):
    """Watch ``directories`` recursively and block until interrupted.

    Entries that are not existing directories (for example the empty string
    produced by an empty GUI field) are skipped with a warning, because the
    observer cannot watch a path that does not exist. If nothing valid
    remains the function returns without starting an observer.

    Args:
        directories: Iterable of directory paths to watch.
        output_dir: Passed through to the ``FileChangeHandler``.
    """
    watchable = []
    for directory in directories:
        if os.path.isdir(directory):
            watchable.append(directory)
        else:
            logging.warning(f"Not monitoring {directory!r}: not a directory")

    if not watchable:
        logging.warning("No valid directories to monitor.")
        return

    observer = start_observer(watchable, output_dir)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the observer however the wait loop ends, not only on Ctrl+C.
        observer.stop()
        observer.join()
|
|
|
|
|
|
def start_observer(directories, output_dir):
    """Create, configure and start a watchdog observer without blocking.

    One ``FileChangeHandler`` is scheduled recursively on every entry of
    ``directories`` that is an existing directory; other entries are skipped
    with a warning because the observer cannot watch a missing path.

    Args:
        directories: Iterable of directory paths to watch.
        output_dir: Passed through to the ``FileChangeHandler``.

    Returns:
        The started ``Observer``; the caller is responsible for stopping it.
    """
    observer = Observer()
    event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
    for directory in directories:
        if not os.path.isdir(directory):
            logging.warning(f"Not monitoring {directory!r}: not a directory")
            continue
        observer.schedule(event_handler, path=directory, recursive=True)
        logging.info(f"Monitoring directory: {directory}")

    observer.start()
    return observer
|
|
|
|
|
|
|
|
def disassemble_elf(file_path, output_dir):
    """Disassemble ``file_path`` with ``objdump -d`` into ``output_dir``.

    The listing is written to ``<basename>.asm``. Paths that already end in
    ``.asm`` are skipped before objdump is invoked. Failures are reported on
    stderr and never raised.

    Args:
        file_path: Binary to disassemble.
        output_dir: Existing folder that receives the ``.asm`` file.
    """
    if file_path.endswith(".asm"):
        return

    asm_file_path = os.path.join(output_dir, f"{os.path.basename(file_path)}.asm")
    try:
        print(f"Disassembling {file_path}")
        result = subprocess.run(
            ['objdump', '-d', file_path], capture_output=True, text=True, check=True
        )
        with open(asm_file_path, "w") as asm_file:
            asm_file.write(result.stdout)
        print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing objdump binary and an unwritable output
        # file; previously either one escaped this function.
        print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
|
|
|
|
def find_elf_files(start_dirs):
    """Return the paths of ELF executables and shared objects under ``start_dirs``.

    Regular files are listed with ``find`` (pruning ``/proc``, ``/sys`` and
    ``/run``) and classified one by one with ``file --mime-type``. Entries of
    ``start_dirs`` that are not directories are ignored. Errors are printed
    to stderr and never raised.

    Args:
        start_dirs: Iterable of directory paths to scan.

    Returns:
        A list of matching file paths, in discovery order.
    """
    # Recent file(1) releases appear to label position-independent
    # executables with the "pie" variant - TODO confirm against the deployed
    # file(1) version.
    elf_mime_types = frozenset({
        'application/x-executable',
        'application/x-pie-executable',
        'application/x-sharedlib',
    })

    elf_files = []
    for start_dir in start_dirs:
        if not os.path.isdir(start_dir):
            continue

        try:
            find_command = ['find', start_dir, '-path', '/proc', '-prune', '-o', '-path', '/sys', '-prune', '-o', '-path', '/run', '-prune', '-o', '-type', 'f', '-print']
            find_result = subprocess.run(find_command, capture_output=True, text=True, check=False)
            if find_result.returncode != 0:
                # find also exits non-zero when it merely could not read some
                # sub-directory, so report the problem but keep what it listed.
                print(f"Error running find command: {find_result.stderr}", file=sys.stderr)

            file_paths = find_result.stdout.splitlines()
            print(f"Found files in {start_dir}:")

            for file_path in file_paths:
                try:
                    # --brief drops the echoed file name, so a path that
                    # happens to contain a MIME string cannot match.
                    file_command = ['file', '--brief', '--mime-type', file_path]
                    file_result = subprocess.run(file_command, capture_output=True, text=True, check=True)
                    if file_result.stdout.strip() in elf_mime_types:
                        elf_files.append(file_path)
                except subprocess.CalledProcessError as e:
                    print(f"Error running file command on {file_path}: {e}", file=sys.stderr)

        except Exception as e:
            print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)

    print(f"Found ELF files: {elf_files} ")
    return elf_files
|
|
|
|
def process_files(output_dir, start_dirs):
    """Disassemble every ELF file found under ``start_dirs``.

    ``output_dir`` is created if needed, the candidates come from
    ``find_elf_files`` and each one is handed to ``disassemble_elf``.

    Args:
        output_dir: Folder that receives the ``.asm`` listings.
        start_dirs: Directories to search for ELF files.
    """
    os.makedirs(output_dir, exist_ok=True)

    discovered = find_elf_files(start_dirs)
    if discovered:
        for binary_path in discovered:
            disassemble_elf(binary_path, output_dir)
        print("Disassembly complete. Assembly files are saved in the output directory.")
    else:
        print("No ELF files found.")
|
|
|
|
def process_files_malware(folder_path, files_to_process):
    """Build a min-max scaled token-count matrix for ``.bytes`` hex dumps.

    Row ``k`` belongs to ``files_to_process[k]``. Every space-separated token
    of the file is parsed as hexadecimal; values below 257 increment the
    column of that value and ``'??'`` tokens increment column 257. Files not
    ending in ``bytes`` keep an all-zero row. A file that cannot be read or
    parsed is logged and left with whatever was counted before the failure.

    Args:
        folder_path: Folder containing the files.
        files_to_process: File names relative to ``folder_path``.

    Returns:
        The result of ``MinMaxScaler().fit_transform`` on the count matrix,
        which has one row per file and 258 columns.
    """
    feature_matrix = np.zeros((len(files_to_process), 258), dtype=int)

    for row, file_name in enumerate(files_to_process):
        if not file_name.endswith("bytes"):
            continue
        try:
            with open(os.path.join(folder_path, file_name), "r") as byte_file:
                for raw_line in byte_file:
                    for hex_code in raw_line.rstrip().split(" "):
                        # NOTE(review): '??' is assumed to feed column 257;
                        # the source formatting left the attachment of this
                        # branch ambiguous - confirm.
                        if hex_code == '??':
                            feature_matrix[row][257] += 1
                            continue
                        index = int(hex_code, 16)
                        if index < 257:
                            feature_matrix[row][index] += 1
        except (OSError, ValueError) as e:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit are
            # no longer swallowed; ValueError covers non-hex tokens and
            # decoding errors.
            logging.warning(f"Skipping rest of '{file_name}': {e}")
            continue

    # NOTE(review): the scaler is fitted on this batch alone, so a batch with
    # a single file scales to all zeros. The scaler fitted at training time
    # should probably be reused here - confirm.
    scaler = MinMaxScaler()
    return scaler.fit_transform(feature_matrix)
|
|
|
|
def test_files(folder_path, model_path, output_csv):
    """Score the not-yet-scored files of ``folder_path`` with one model.

    File names already listed in the ``File`` column of ``output_csv`` are
    skipped. New rows (file name, predicted class, highest class
    probability) are appended to ``output_csv``, which is created with a
    header if it does not exist yet.

    Args:
        folder_path: Folder whose entries are scored.
        model_path: Pickled model offering ``predict`` and ``predict_proba``.
        output_csv: CSV file that accumulates the results.
    """
    folder_entries = os.listdir(folder_path)

    previously_scored = set()
    if os.path.exists(output_csv):
        previously_scored = set(pd.read_csv(output_csv)['File'].tolist())

    pending = [name for name in folder_entries if name not in previously_scored]
    if not pending:
        print("All files have already been scanned.")
        return

    features = process_files_malware(folder_path, pending)

    # NOTE: unpickling executes arbitrary code; model files must be trusted.
    with open(model_path, 'rb') as handle:
        classifier = pickle.load(handle)

    labels = classifier.predict(features)
    class_probabilities = classifier.predict_proba(features)

    batch = pd.DataFrame({
        'File': pending,
        'Predicted Class': labels,
        'Prediction Probability': [max(row) for row in class_probabilities],
    })

    # Append without a header to an existing file, otherwise start a new one.
    appending = os.path.exists(output_csv)
    batch.to_csv(
        output_csv,
        mode='a' if appending else 'w',
        header=not appending,
        index=False,
    )

    print(f"New predictions appended to {output_csv}")
|
|
|
|
def run_malware_ai_analysis_bytes():
    """Score the hex dumps with every bytes model and upload each result.

    For each ``.pkl`` file in ``bytes_model_directory`` the dumps in
    ``bytes_output_directory`` are scored via ``test_files`` into
    ``bytes_predictions_<model>.csv`` under ``bytes_result_directory``, and
    that CSV is then sent to the API. Upload failures are reported and do
    not stop the remaining models.
    """
    print("bytes malware analysis started")

    model_files = [f for f in os.listdir(bytes_model_directory) if f.endswith('.pkl')]

    for model_file in model_files:
        model_path = os.path.join(bytes_model_directory, model_file)
        output_csv = os.path.join(bytes_result_directory, f"bytes_predictions_{os.path.splitext(model_file)[0]}.csv")
        test_files(bytes_output_directory, model_path, output_csv)
        try:
            send_predictions_to_api(output_csv)
        except Exception as e:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # propagate; the cause is logged because this branch is also
            # reached when the CSV does not exist, not only on network errors.
            print("Connection Failed")
            logging.warning(f"Could not send '{output_csv}' to API: {e}")
|
|
|
|
|
|
|
|
|
|
def preprocess_asm_file(file_path):
    """Count segment prefixes, opcodes, registers and keywords in an ASM file.

    Each non-blank line is split on whitespace. The first token is searched
    for the segment prefixes; the remaining tokens are searched for opcodes
    (exact token match), registers and keywords (substring match). Every
    counter goes up by at most one per line, and registers are only counted
    on lines whose first token contains ``text`` or ``CODE``.

    Args:
        file_path: Path of the assembly listing (read as cp1252, undecodable
            bytes replaced).

    Returns:
        A 1-D integer array: prefix counts, then opcode counts, then
        register counts, then keyword counts (12 + 26 + 9 + 3 = 50 values).
    """
    # NOTE(review): these prefixes look like IDA-style segment labels;
    # confirm that the listings fed in here actually carry them.
    prefixes = ['.text:', '.Pav:', '.idata:', '.data:', '.bss:', '.rdata:', '.edata:', '.rsrc:', '.tls:', '.reloc:', '.BSS:', '.CODE']
    opcodes = ['jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub', 'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call', 'shl', 'ror', 'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx']
    keywords = ['.dll', 'std::', ':dword']
    registers = ['edx', 'esi', 'eax', 'ebx', 'ecx', 'edi', 'ebp', 'esp', 'eip']

    prefix_counts = np.zeros(len(prefixes), dtype=int)
    opcode_counts = np.zeros(len(opcodes), dtype=int)
    keyword_counts = np.zeros(len(keywords), dtype=int)
    register_counts = np.zeros(len(registers), dtype=int)

    with open(file_path, 'r', encoding='cp1252', errors='replace') as f:
        for raw_line in f:
            tokens = raw_line.rstrip().split()
            if not tokens:
                continue
            first = tokens[0]
            operands = tokens[1:]

            for i, prefix in enumerate(prefixes):
                if prefix in first:
                    prefix_counts[i] += 1

            # Opcodes need an exact token match, so one set lookup per opcode
            # replaces a scan of the whole line.
            operand_set = set(operands)
            for i, opcode in enumerate(opcodes):
                if opcode in operand_set:
                    opcode_counts[i] += 1

            # The segment test does not depend on the register, so evaluate
            # it once per line instead of once per register and token.
            if 'text' in first or 'CODE' in first:
                for i, register in enumerate(registers):
                    if any(register in token for token in operands):
                        register_counts[i] += 1

            for i, keyword in enumerate(keywords):
                if any(keyword in token for token in operands):
                    keyword_counts[i] += 1

    return np.concatenate([prefix_counts, opcode_counts, register_counts, keyword_counts])
|
|
|
|
|
|
# Main function to load models and make predictions
|
|
def run_malware_analysis_asm(asm_folder_path=asm_output_directory, models_folder=asm_model_directory):
    """Score every new ``.asm`` file with every ASM model and upload results.

    For each ``.pkl`` model in ``models_folder`` the files of
    ``asm_folder_path`` that are not yet listed in
    ``asm_prediction_<model>.csv`` (under ``asm_result_directory``) are
    featurised with ``preprocess_asm_file`` and scored. The CSV is rewritten
    when there are new rows and is then sent to the API; upload failures are
    reported and do not stop the remaining models.

    Args:
        asm_folder_path: Folder containing the ``.asm`` listings.
        models_folder: Folder containing the pickled models.
    """
    print("Starting analysis...")

    asm_files = [f for f in os.listdir(asm_folder_path) if f.endswith('.asm')]
    model_files = [f for f in os.listdir(models_folder) if f.endswith('.pkl')]

    # NOTE: unpickling executes arbitrary code; model files must be trusted.
    models = {}
    for model_file in model_files:
        with open(os.path.join(models_folder, model_file), 'rb') as f:
            models[os.path.splitext(model_file)[0]] = pickle.load(f)

    for model_name, model_clf in models.items():
        print(f"Making asm predictions with {model_name}...")

        results_file_path = f'{asm_result_directory}/asm_prediction_{model_name}.csv'
        if os.path.exists(results_file_path):
            results_df = pd.read_csv(results_file_path)
        else:
            results_df = pd.DataFrame(columns=['file_name', 'prediction', 'probability'])

        # Build the lookup once instead of scanning the column for each file.
        already_scored = set(results_df['file_name'])

        new_predictions = []
        for asm_file in asm_files:
            if asm_file in already_scored:
                continue

            file_path = os.path.join(asm_folder_path, asm_file)
            feature_vector = preprocess_asm_file(file_path).reshape(1, -1)

            prediction = model_clf.predict(feature_vector)
            probability = model_clf.predict_proba(feature_vector)

            # Assumes predict() yields a 0-based class index, as the previous
            # 0-8 lookup table required - TODO confirm for non-XGB models.
            class_index = int(prediction[0])

            # Reported classes are 1-based. The earlier code added 1 twice
            # for models whose name contained "XGB" without being exactly
            # "XGBClassifier"; the label is now index + 1 for every model.
            new_predictions.append({
                'file_name': asm_file,
                'prediction': class_index + 1,
                'probability': probability[0][class_index],
            })

        if new_predictions:
            new_predictions_df = pd.DataFrame(new_predictions)
            if results_df.empty:
                # Avoids concatenating with an empty frame.
                results_df = new_predictions_df
            else:
                results_df = pd.concat([results_df, new_predictions_df], ignore_index=True)
            results_df.to_csv(results_file_path, index=False)

        print(f"Predictions saved to {results_file_path}.")
        try:
            send_asm_predictions_to_api(results_file_path)
        except Exception as e:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # propagate; the cause is logged because a missing CSV also
            # lands here, not only network errors.
            print("Connection Failed")
            logging.warning(f"Could not send '{results_file_path}' to API: {e}")
|
|
|
|
|
|
def run_hex_conversion():
    """Handle the "Convert to Hex" button.

    Reads the comma-separated directories from ``hex_files_entry``, starts
    directory monitoring if it is not running yet, and hex-dumps and scores
    the directories on a background thread.
    """
    global isMonitoring

    # ''.split(',') yields [''], so blank items must be dropped; otherwise
    # the emptiness check below can never trigger.
    hex_dirs = [d.strip() for d in hex_files_entry.get().split(',') if d.strip()]
    hex_output_dir = bytes_output_directory

    if not hex_dirs or not hex_output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return

    def hex_conversion_task():
        for hex_dir in hex_dirs:
            if os.path.isdir(hex_dir):
                scan_and_convert_directory(hex_dir, hex_output_dir)
            else:
                # NOTE(review): this dialog is opened from a worker thread;
                # confirm that is safe with the Tk build in use.
                messagebox.showwarning("Warning", f"{hex_dir} is not a directory.")

        print("Hex conversion complete.")
        run_malware_ai_analysis_bytes()

    # Started from the calling (GUI) thread because start_monitoring reads
    # the Tk entry widgets.
    if not isMonitoring:
        isMonitoring = True
        start_monitoring()

    threading.Thread(target=hex_conversion_task).start()
|
|
|
|
def run_disassembly():
    """Handle the "Disassemble ELF Files" button.

    Reads the comma-separated directories from ``start_dirs_entry``, starts
    directory monitoring if it is not running yet, and disassembles and
    scores the ELF files found there on a background thread.
    """
    global isMonitoring

    # ''.split(',') yields [''], so blank items must be dropped; otherwise
    # the emptiness check below can never trigger.
    start_dirs = [d.strip() for d in start_dirs_entry.get().split(',') if d.strip()]
    output_dir = asm_output_directory

    if not start_dirs or not output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return

    def disassembly_task():
        process_files(output_dir, start_dirs)
        run_malware_analysis_asm()

    # Started from the calling (GUI) thread because start_monitoring reads
    # the Tk entry widgets.
    if not isMonitoring:
        isMonitoring = True
        start_monitoring()

    threading.Thread(target=disassembly_task).start()
|
|
|
|
def start_monitoring():
    """Start watching the directories named in both GUI entries.

    The comma-separated values of ``hex_files_entry`` and
    ``start_dirs_entry`` are combined and handed to ``monitor_directories``
    on a daemon thread.
    """
    raw_entries = hex_files_entry.get().split(',') + start_dirs_entry.get().split(',')

    # An empty entry splits to [''], and the same folder may appear in both
    # fields: drop blanks and duplicates (keeping order) so the observer only
    # receives real, distinct paths.
    directories = list(dict.fromkeys(d.strip() for d in raw_entries if d.strip()))
    output_dir = output_directory

    def monitoring_task():
        monitor_directories(directories, output_dir)

    # Daemon thread: monitoring must not keep the process alive on its own.
    threading.Thread(target=monitoring_task, daemon=True).start()
    print("Started monitoring directories.")
|
|
|
|
def on_closing():
    """Window-close callback: destroy the module-level ``root`` window."""
    root.destroy()
|
|
|
|
def browse_hex_directories():
    """Let the user pick directories one by one for the hex-conversion entry.

    The directory dialog is shown repeatedly until it returns nothing. If at
    least one directory was picked, ``hex_files_entry`` is replaced with the
    picks joined by ``', '``; otherwise the entry is left untouched.
    """
    chosen = []
    directory = filedialog.askdirectory(title="Select a Directory")
    while directory:
        chosen.append(directory)
        directory = filedialog.askdirectory(title="Select a Directory")

    if not chosen:
        return

    hex_files_entry.delete(0, tk.END)
    hex_files_entry.insert(0, ', '.join(chosen))
|
|
|
|
def browse_start_dirs():
    """Let the user pick directories one by one for the ELF-scan entry.

    The directory dialog is shown repeatedly until it returns nothing. If at
    least one directory was picked, ``start_dirs_entry`` is replaced with the
    picks joined by ``', '``; otherwise the entry is left untouched.
    """
    chosen = []
    directory = filedialog.askdirectory(title="Select a Directory")
    while directory:
        chosen.append(directory)
        directory = filedialog.askdirectory(title="Select a Directory")

    if not chosen:
        return

    start_dirs_entry.delete(0, tk.END)
    start_dirs_entry.insert(0, ', '.join(chosen))
|
|
|
|
|
|
def show_frame(frame):
    """Bring ``frame`` to the front by calling its ``tkraise`` method."""
    frame.tkraise()
|
|
|
|
|
|
# Build the main application window and route the close button to on_closing.
root = tk.Tk()
root.title("File Conversion and Disassembly Wizard")
root.protocol("WM_DELETE_WINDOW", on_closing)

# One notebook tab per workflow.
notebook = ttk.Notebook(root)
notebook.pack(fill='both', expand=True)

hex_frame = ttk.Frame(notebook)
asm_frame = ttk.Frame(notebook)
malware_frame = ttk.Frame(notebook)  # created but never added as a tab
notebook.add(hex_frame, text='Hex Conversion')
notebook.add(asm_frame, text='ELF Disassembly')

# "Hex Conversion" tab: label, directory entry, then the two buttons.
tk.Label(hex_frame, text="Select Directories to Convert to Hex:").pack(pady=5)
hex_files_entry = tk.Entry(hex_frame, width=80)
hex_files_entry.pack(pady=5)
for _caption, _callback, _padding in (
    ("Browse...", browse_hex_directories, 5),
    ("Convert to Hex", run_hex_conversion, 10),
):
    tk.Button(hex_frame, text=_caption, command=_callback).pack(pady=_padding)

# "ELF Disassembly" tab: same layout.
tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:").pack(pady=5)
start_dirs_entry = tk.Entry(asm_frame, width=80)
start_dirs_entry.pack(pady=5)
for _caption, _callback, _padding in (
    ("Browse...", browse_start_dirs, 5),
    ("Disassemble ELF Files", run_disassembly, 10),
):
    tk.Button(asm_frame, text=_caption, command=_callback).pack(pady=_padding)

show_frame(hex_frame)

# Hand control to Tk; this call returns once the window has been destroyed.
root.mainloop()