Final_Installer_Merged/Final_Marged.py
2024-10-25 11:19:11 +05:30

1734 lines
62 KiB
Python

import os
import time
import logging
import subprocess
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
import pandas as pd
import pickle
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import sys
import codecs
import pickle
import csv
import pickle
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from tkinter import simpledialog
import pyshark
import psutil
import joblib
from sklearn.preprocessing import StandardScaler
import sklearn.ensemble._forest
from threading import Thread, Event
# Global variable for thread control
# stop_capture() sets this event; capture_packets() checks it at the start of each capture cycle.
stop_event = Event()
# Loop flag for capture_packets() (`while value`); start_capture()/stop_capture() toggle it.
# NOTE(review): `value` is assigned again at module level later in this file — confirm the shared name is intended.
value = False
# Important features and weights as provided
# Column list handed to DataFrame.reindex() in capture_packets() to build the model input.
important_features = [
'pktcount',
'byteperflow',
'tot_kbps',
'rx_kbps',
'flows',
'bytecount',
'tot_dur',
'Protocol_ICMP',
'Protocol_TCP',
'Protocol_UDP',
]
# Drop features you don't need based on what you used in training
# Passed to DataFrame.drop(..., errors='ignore') in capture_packets(), so absent columns are skipped.
drop_features = [
'src', 'dst', 'dt', 'dur', 'pktrate', 'pktperflow',
'Protocol_HTTP', 'Protocol_HTTPS', 'Protocol_SSH',
'Protocol_DHCP', 'Protocol_FTP', 'Protocol_SMTP',
'Protocol_POP3', 'Protocol_IMAP', 'Protocol_DNS'
]
def get_active_interface():
    """Return the name of the first interface with a non-loopback IPv4 address.

    Interfaces are examined in the order reported by ``psutil.net_if_addrs()``.

    Returns:
        The interface name (a key of ``psutil.net_if_addrs()``).

    Raises:
        RuntimeError: if no interface carries a non-loopback IPv4 address.
            ``RuntimeError`` derives from ``Exception``, so callers that
            catch ``Exception`` keep working.
    """
    import socket  # local import: not part of this file's import block

    for interface, addrs in psutil.net_if_addrs().items():
        for addr in addrs:
            # Named constant instead of the magic number 2, and the whole
            # 127.0.0.0/8 loopback range is skipped, not only 127.0.0.1.
            if addr.family == socket.AF_INET and not addr.address.startswith('127.'):
                return interface
    raise RuntimeError("No active interface found")
def preprocess_packet(packet):
    """Convert one captured packet into a single-row feature DataFrame.

    Args:
        packet: object exposing ``length``, ``frame_info.time_delta``,
            ``highest_layer`` and optionally an ``ip`` or ``ipv6`` layer
            with ``src``/``dst`` attributes.

    Returns:
        A one-row ``pandas.DataFrame`` holding the model features plus
        ``src_ip``, ``dst_ip`` and a ``probability`` placeholder of 0.0, or
        ``None`` when the source address contains ``':'`` or when a required
        attribute is missing.
    """
    try:
        delta = float(packet.frame_info.time_delta)
        size = float(packet.length)
        # Below one second the raw length is used; otherwise bytes per second.
        byteperflow = size if delta < 1 else size / float(packet.frame_info.time_delta)

        # Prefer the IPv4 layer, fall back to IPv6, else leave both unset.
        src_ip = dst_ip = None
        if hasattr(packet, 'ip'):
            src_ip, dst_ip = packet.ip.src, packet.ip.dst
        elif hasattr(packet, 'ipv6'):
            src_ip, dst_ip = packet.ipv6.src, packet.ipv6.dst

        # Addresses containing ':' (IPv6 notation) are not scored.
        if src_ip and ':' in src_ip:
            return None

        protocol = packet.highest_layer
        row = {
            'pktcount': int(packet.length),
            'byteperflow': byteperflow,
            'tot_kbps': float(packet.length) / 1000.0,
            'rx_kbps': float(packet.length) / 1000.0,
            'flows': 1,
            'bytecount': float(packet.length),
            'tot_dur': float(packet.frame_info.time_delta),
            # One-hot flags for the three protocols the feature list uses.
            'Protocol_ICMP': int(protocol == "ICMP"),
            'Protocol_TCP': int(protocol == "TCP"),
            'Protocol_UDP': int(protocol == "UDP"),
            'src_ip': src_ip,
            'dst_ip': dst_ip,
            'probability': 0.0,
        }
        return pd.DataFrame([row])
    except AttributeError:
        return None
def prepare_X_test(packets_list, drop_features):
    """Placeholder: ignores both arguments and always returns ``None``."""
    return None
def send_prediction(file_path):
    """Upload a predictions file to the local DDoS prediction endpoint.

    Args:
        file_path: path of the file to send as the multipart field ``file``.

    Raises:
        OSError: if the file cannot be opened.
        requests.RequestException: on connection failure or timeout.
    """
    url = "http://127.0.0.1:8000/ddos-predictions/"
    with open(file_path, 'rb') as f:
        # Without a timeout an unresponsive server would block the caller forever.
        response = requests.post(url, files={'file': f}, timeout=10)
    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
def make_predictions(X_test, X):
    """Score a batch with the five saved classifiers and export the result.

    The mean of the five model predictions is stored per row in
    ``X['probability']``, ``X`` is written to ``predictions.csv`` and the file
    is sent through ``send_prediction`` (best effort).

    Args:
        X_test: feature frame handed to the models (after scaling).
        X: frame with the same rows as ``X_test``; updated in place.

    Returns:
        None.
    """
    model_files = {
        'Logistic Regression': 'logistic_regression_model.pkl',
        'SVM': 'svm_model.pkl',
        'KNN': 'knn_model.pkl',
        'Decision Tree': 'decision_tree_model.pkl',
        'Random Forest': 'random_forest_model.pkl',
    }
    models = {name: joblib.load(path) for name, path in model_files.items()}

    # NOTE(review): the scaler is fitted on the batch being scored, not on the
    # training data — confirm this matches how the models were trained.
    X_test_scaled = StandardScaler().fit_transform(X_test)

    all_predictions = [model.predict(X_test_scaled) for model in models.values()]

    # Whole-column assignment replaces the chained `X['probability'][i] = ...`
    # writes, which pandas does not guarantee to reach the original frame.
    # Averaging over the stacked outputs also removes the hard-coded "/ 5".
    X['probability'] = np.mean(np.vstack(all_predictions), axis=0)

    with open('predictions.csv', mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=list(X.columns))
        writer.writeheader()
        for _, row in X.iterrows():
            writer.writerow(row.to_dict())

    try:
        send_prediction("predictions.csv")
    except Exception:
        # Upload is best effort; a bare `except:` would also have swallowed
        # KeyboardInterrupt/SystemExit.
        print("could not connect to server")
def capture_packets(interface=None):
    """Capture live packets in batches of up to 15 and score each batch.

    Tries to install tshark through apt first, then loops while the
    module-level ``value`` flag is truthy and ``stop_event`` is not set:
    collect packets, build the feature frame, call ``make_predictions`` and
    pause 10 seconds.

    Args:
        interface: capture interface name; when ``None`` the result of
            ``get_active_interface()`` is used.
    """
    try:
        subprocess.check_call(['sudo', 'apt', 'install', '-y', 'tshark'])
        print("tshark installed successfully.")
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing sudo/apt binary.
        print("Failed to install tshark. Please install it manually.")

    if interface is None:
        interface = get_active_interface()
    capture = pyshark.LiveCapture(interface=interface, tshark_path='/usr/bin/tshark')

    try:
        while value and not stop_event.is_set():
            packets_list = []
            for packet in capture:
                if len(packets_list) == 15 or stop_event.is_set():
                    break
                try:
                    processed_packet = preprocess_packet(packet)
                except AttributeError as e:
                    print(f"Error processing packet: {e}")
                    continue
                if processed_packet is None:
                    continue
                # `":" in series` tests the index labels, not the values, so
                # the addresses are pulled out of the one-row frame first.
                endpoints = (
                    processed_packet["src_ip"].iloc[0],
                    processed_packet["dst_ip"].iloc[0],
                )
                if any(isinstance(ip, str) and ":" in ip for ip in endpoints):
                    continue
                packets_list.append(processed_packet)

            if packets_list:
                X_test = pd.concat(packets_list, ignore_index=True)
                X_test_scaled = X_test.drop(drop_features, axis=1, errors='ignore')
                X_test_scaled = X_test_scaled.reindex(columns=important_features, fill_value=0)
                make_predictions(X_test_scaled, X_test)

            # Same 10 s pause as before, but it ends early when stop is requested.
            stop_event.wait(10)
    except KeyboardInterrupt:
        print("\nPacket capturing stopped.")
def start_capture():
    """Start the capture thread, or re-launch the script through sudo.

    When not running as root, the main window is hidden, the sudo password is
    requested and the script is re-run with ``sudo -S``. When running as root
    and ``stop_event`` is not set, the capture thread is started and the
    start/stop buttons are toggled.
    """
    global thread, value

    if os.geteuid() != 0:
        root.withdraw()  # Hide the main window
        password = simpledialog.askstring("Password", "Enter your sudo password and run again:", show='*')
        if not password:
            messagebox.showerror("Error", "No password provided. Unable to run with sudo.")
            return
        try:
            subprocess.run(['sudo', '-S', sys.executable, *sys.argv], input=password.encode(), check=True)
        except subprocess.CalledProcessError:
            messagebox.showerror("Error", "Failed to run the script with sudo.")
        finally:
            root.destroy()
        return

    if stop_event.is_set():
        return

    value = True
    stop_event.clear()
    thread = Thread(target=capture_packets)
    thread.start()
    start_button.config(state=tk.DISABLED)
    stop_button.config(state=tk.NORMAL)
def stop_capture():
    """Signal the capture thread to stop, wait for it, then close the window."""
    global value
    value = False
    stop_event.set()

    worker = thread
    if worker.is_alive():
        worker.join()

    for button, state in ((start_button, tk.NORMAL), (stop_button, tk.DISABLED)):
        button.config(state=state)
    root.destroy()
def setup_gui(frame):
    """Create the Start/Stop capture buttons inside *frame*.

    The buttons are stored in the module-level names ``start_button`` and
    ``stop_button``; the stop button starts out disabled.
    """
    global start_button, stop_button, thread
    start_button = tk.Button(frame, text="Start Capture", command=start_capture)
    stop_button = tk.Button(frame, text="Stop Capture", command=stop_capture, state=tk.DISABLED)
    for widget in (start_button, stop_button):
        widget.pack(pady=20)
# ransomware tested
import os
import subprocess
import sys
import csv
import pickle
import pandas as pd
import tkinter as tk
from tkinter import filedialog
from tkinter import messagebox
import requests
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Gate read by run_predictions(): the directory watchdog is started only while this is truthy.
# NOTE(review): this rebinds the module-level `value` that capture_packets() also reads as its loop flag — confirm the sharing is intended.
value = True
def get_usage_percentages():
    """Sample CPU and memory utilisation.

    Returns:
        ``(cpu_usage, memory_percent)`` where the CPU figure comes from
        ``psutil.cpu_percent(interval=1)`` and the memory figure from
        ``psutil.virtual_memory().percent``.
    """
    return psutil.cpu_percent(interval=1), psutil.virtual_memory().percent
def send_data_to_api(cpu_usage, memory_usage):
    """POST the usage figures as JSON to the local usage-log endpoint.

    Failures are reported on stdout; nothing is raised to the caller.

    Args:
        cpu_usage: value sent under the ``cpu_usage`` key.
        memory_usage: value sent under the ``memory_usage`` key.
    """
    api_url = 'http://127.0.0.1:8000/usage_log/'  # Update this if needed
    payload = {
        'cpu_usage': cpu_usage,
        'memory_usage': memory_usage
    }
    try:
        # The timeout stops an unresponsive server from blocking the caller forever.
        response = requests.post(api_url, json=payload, timeout=10)
        if response.status_code == 201:
            print("Data logged successfully.")
        else:
            print("Failed to log data:", response.json())
    except Exception as e:
        print("Error while sending data:", str(e))
def send_predictions_to_api(file_path):
    """Upload a predictions file to the ransomware-type prediction endpoint.

    NOTE(review): functions with this same name are defined again later in
    this file with a different URL; confirm which definition callers are
    meant to reach.

    Args:
        file_path: path of the file to send as the multipart field ``file``.

    Raises:
        OSError: if the file cannot be opened.
        requests.RequestException: on connection failure or timeout.
    """
    url = "http://127.0.0.1:8000/ransomware-type-predictions/"
    with open(file_path, 'rb') as f:
        # Without a timeout an unresponsive server would block the caller forever.
        response = requests.post(url, files={'file': f}, timeout=10)
    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
def browse_directory():
    """Open a dialog to choose a directory.

    Returns:
        The value returned by ``filedialog.askdirectory``.
    """
    root = tk.Tk()
    root.withdraw()  # Hide the helper window; only the dialog should show
    try:
        return filedialog.askdirectory(title="Select a Directory")
    finally:
        # The helper root was previously leaked on every call.
        root.destroy()
def md5_hash(file_path):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    Computed in-process with ``hashlib`` rather than by parsing ``md5sum``
    output, so it does not depend on an external binary and an unreadable
    file raises a meaningful error instead of ``IndexError``.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    import hashlib  # local import: not part of this file's import block

    digest = hashlib.md5()
    with open(file_path, 'rb') as fh:
        # Read in 1 MiB chunks so large binaries are not loaded at once.
        for chunk in iter(lambda: fh.read(1 << 20), b''):
            digest.update(chunk)
    return digest.hexdigest()
def get_machine_type(file_path):
    """Return the ``Machine:`` value printed by ``readelf -h`` for *file_path*.

    Returns:
        The stripped text after the first ``:`` on the first line containing
        ``Machine:``, or ``None`` when no such line exists or the command fails.
    """
    try:
        header = subprocess.run(['readelf', '-h', file_path], capture_output=True, text=True).stdout
        hit = next((entry for entry in header.splitlines() if 'Machine:' in entry), None)
        if hit is not None:
            return hit.split(':')[1].strip()
    except Exception as e:
        print(f"Error getting machine type: {e}")
    return None
def get_number_of_sections(file_path):
    """Return the section-header count reported by ``readelf -h``.

    Returns:
        The integer after ``Number of section headers:``, or ``None`` when the
        line is missing, cannot be parsed, or the command fails.
    """
    try:
        header = subprocess.run(['readelf', '-h', file_path], capture_output=True, text=True).stdout
        hit = next(
            (entry for entry in header.splitlines() if 'Number of section headers:' in entry),
            None,
        )
        if hit is not None:
            return int(hit.split(':')[1].strip())
    except Exception as e:
        print(f"Error getting number of sections: {e}")
    return None
def get_resource_size(file_path):
    """Return a size figure for the ``.rodata`` section from ``readelf -S``.

    The sixth whitespace-separated token of the first line mentioning
    ``.rodata`` is parsed as hexadecimal.
    NOTE(review): which column that token is depends on how readelf lays out
    the line — verify it is the size field for the binaries being scanned.

    Returns:
        The parsed integer, or ``0`` when no line matches or anything fails.
    """
    try:
        listing = subprocess.run(['readelf', '-S', file_path], capture_output=True, text=True).stdout
        hit = next((entry for entry in listing.splitlines() if '.rodata' in entry), None)
        if hit is not None:
            return int(hit.split()[5], 16)
    except Exception as e:
        print(f"Error getting resource size: {e}")
    return 0
def get_linker_version(file_path):
    """Return ``(major, minor)`` strings from the first ``Version:`` line of ``objdump -p``.

    The text after the first ``:`` is split on ``.``; the minor part defaults
    to ``'0'`` when there is no dot.

    Returns:
        ``(major, minor)`` as strings, or ``(None, None)`` when no line
        matches or the command fails.
    """
    try:
        dump = subprocess.run(['objdump', '-p', file_path], capture_output=True, text=True)
        for entry in dump.stdout.splitlines():
            if 'Version:' not in entry:
                continue
            version = entry.split(':')[1].strip()
            major, dot, remainder = version.partition('.')
            minor = remainder.split('.')[0] if dot else '0'
            return major, minor
    except Exception as e:
        print(f"Error getting linker version: {e}")
    return None, None
def get_dynamic_info(file_path):
    """Return the output lines of ``readelf -d`` for *file_path*.

    Returns:
        A list of stdout lines (possibly empty), or ``None`` when running the
        command raises.
    """
    try:
        completed = subprocess.run(['readelf', '-d', file_path], capture_output=True, text=True)
        return list(completed.stdout.splitlines())
    except Exception as e:
        print(f"Error getting dynamic linking info: {e}")
    return None
def extract_features(file_path):
    """Collect the per-file feature dictionary for an ELF binary.

    Returns:
        A dict with the keys ``FileName``, ``md5Hash``, ``Machine``,
        ``NumberOfSections``, ``ResourceSize``, ``LinkerVersionMajor``,
        ``LinkerVersionMinor`` and ``DynamicInfo``, in that order (callers
        write ``values()`` straight into a CSV row).
    """
    digest = md5_hash(file_path)
    machine = get_machine_type(file_path)
    section_count = get_number_of_sections(file_path)
    resource_size = get_resource_size(file_path)
    dynamic_info = get_dynamic_info(file_path)
    linker_major, linker_minor = get_linker_version(file_path)
    return {
        'FileName': file_path,
        'md5Hash': digest,
        'Machine': machine,
        'NumberOfSections': section_count,
        'ResourceSize': resource_size,
        'LinkerVersionMajor': linker_major,
        'LinkerVersionMinor': linker_minor,
        'DynamicInfo': dynamic_info,
    }
def find_elf_files(start_dir, status_label):
    """Scan *start_dir* for ELF binaries and append their features to ``data.csv``.

    Regular files are listed with ``find`` (pruning ``/proc``, ``/sys`` and
    ``/run``), classified with ``file --mime-type``, and every executable or
    shared library is passed to ``extract_features``.

    Args:
        start_dir: directory to scan; returns immediately if it is not a directory.
        status_label: widget with ``config``/``update`` used for progress
            text; ``None`` disables progress updates.
    """
    if not os.path.isdir(start_dir):
        return

    # Modern `file` reports position-independent executables under their own
    # MIME type, so it is accepted next to the two original ones.
    elf_mime_types = (
        'application/x-executable',
        'application/x-sharedlib',
        'application/x-pie-executable',
    )
    try:
        find_command = [
            'find', start_dir,
            '(', '-path', '/proc', '-o', '-path', '/sys', '-o', '-path', '/run', ')', '-prune', '-o',
            '-type', 'f', '-print'
        ]
        find_result = subprocess.run(find_command, capture_output=True, text=True, check=False)
        if find_result.returncode != 0:
            print(f"Error running find command: {find_result.stderr}", file=sys.stderr)
            # find exits non-zero when only some sub-directories are
            # unreadable; keep whatever it did list instead of aborting.
            if not find_result.stdout:
                return

        file_paths = find_result.stdout.splitlines()
        print(f"Found files in {start_dir}:")
        print(file_paths)

        for file_path in file_paths:
            try:
                file_result = subprocess.run(
                    ['file', '--mime-type', file_path],
                    capture_output=True, text=True, check=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"Error running file command on {file_path}: {e}", file=sys.stderr)
                continue
            if not any(mime in file_result.stdout for mime in elf_mime_types):
                continue

            if status_label is not None:
                status_label.config(text=f"Processing: {os.path.basename(file_path)}")
                status_label.update()

            if not os.path.exists(file_path):
                print("File not found!")
                continue
            try:
                extracted_features = extract_features(file_path)
            except Exception:
                # Narrowed from a bare `except:` so Ctrl-C still interrupts the scan.
                print("Error Reading File" + file_path)
                continue
            with open('data.csv', 'a', newline='') as file:
                csv.writer(file).writerow(extracted_features.values())
    except Exception as e:
        print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)
def load_model():
    """Unpickle and return the object stored in ``model.pkl`` (relative to the cwd).

    The file is loaded with ``pickle``, which executes code embedded in the
    file, so ``model.pkl`` must come from a trusted source.
    """
    with open('model.pkl', 'rb') as model_file:
        return pickle.load(model_file)
def predict_ransomware(file_path):
    """Predict the ransomware class of one file with the pickled model.

    NOTE(review): a second module-level ``predict_ransomware`` taking no
    arguments is defined later in this file and would rebind this name —
    confirm which definition callers are meant to reach.

    Args:
        file_path: path of the binary to analyse.

    Returns:
        The first element of ``model.predict`` for the file's feature row.
    """
    classifier = load_model()
    # One-row frame built from the extracted feature dictionary.
    frame = pd.DataFrame([extract_features(file_path)])
    # Identifier and free-text columns are dropped before the frame is scored.
    model_input = frame.drop(columns=['FileName', 'md5Hash', 'Machine', 'DynamicInfo'])
    return classifier.predict(model_input)[0]
def run_predictions(selected_dir, status_label=None):
    """Scan *selected_dir*, predict each ELF file and publish the results.

    ``data.csv`` and ``predictions.csv`` are recreated with their headers on
    every call. When a directory is given, the ELF features are collected,
    every file listed in ``data.csv`` is predicted, the predictions and the
    current CPU/memory usage are sent to the API (best effort), and on the
    first call (module flag ``value``) the directory watchdog is started.

    NOTE(review): the watchdog handlers call this function again on every
    file event; if the CSV files written here live inside the watched
    directory this may re-trigger itself — verify the working directory.

    Args:
        selected_dir: directory to scan; falsy means nothing was selected.
        status_label: optional widget with ``config``/``update`` for progress
            text. Defaults to ``None`` so the function can be called with the
            directory alone.
    """
    global value

    def set_status(text, refresh=False):
        # Progress reporting is skipped when no label was supplied.
        if status_label is None:
            return
        status_label.config(text=text)
        if refresh:
            status_label.update()

    with open("data.csv", mode='w', newline='') as file:
        csv.writer(file).writerow(['FileName',
                                   'md5Hash',
                                   'Machine',
                                   'NumberOfSections',
                                   'ResourceSize',
                                   'LinkerVersionMajor',
                                   'LinkerVersionMinor',
                                   'DynamicInfo'])
    with open("predictions.csv", mode='w', newline='') as file:
        csv.writer(file).writerow(['filename', 'predicted_class'])

    if not selected_dir:
        set_status("No directory selected")
        # Leave `value` untouched so a later call with a real directory can
        # still start the watchdog; scheduling an observer on an empty path
        # cannot work.
        return

    set_status("Processing...", refresh=True)
    find_elf_files(selected_dir, status_label)

    with open("data.csv", "r") as file:
        files = [row['FileName'] for row in csv.DictReader(file)]

    # predictions.csv already has its header (written above), so rows are
    # simply appended; the old per-row "does the file exist" check was dead.
    with open("predictions.csv", mode='a', newline='') as file:
        writer = csv.writer(file)
        for ransomware_file in files:
            print(ransomware_file)
            result = predict_ransomware(ransomware_file)
            writer.writerow([os.path.basename(ransomware_file), result])
    set_status("Predictions Saved")

    try:
        send_predictions_to_api("predictions.csv")
    except Exception:
        print("Connection to API failed")
    try:
        cpu_percent, memory_percent = get_usage_percentages()
        send_data_to_api(cpu_percent, memory_percent)
    except Exception:
        print("Connection to API failed")

    if value:
        value = False
        print("VALLLLLL")
        start_watchdog(selected_dir, status_label)
class WatcherHandler(FileSystemEventHandler):
    """Watchdog handler that re-runs the predictions when a file changes."""

    def __init__(self, status_label):
        # Forwarded unchanged to run_predictions().
        self.status_label = status_label

    def _rerun_for(self, changed_path):
        # Predictions are re-run on the directory containing the changed file.
        run_predictions(os.path.dirname(changed_path), self.status_label)

    def on_modified(self, event):
        """Re-run predictions after a file (not directory) modification."""
        if not event.is_directory:
            print(f"File modified: {event.src_path}")
            self._rerun_for(event.src_path)

    def on_created(self, event):
        """Re-run predictions after a file (not directory) creation."""
        if not event.is_directory:
            print(f"File created: {event.src_path}")
            self._rerun_for(event.src_path)
def start_watchdog(directory, status_label):
    """Start a recursive watchdog observer on *directory*.

    Returns:
        The started ``Observer``; the caller is responsible for stopping it.
    """
    watcher = Observer()
    watcher.schedule(WatcherHandler(status_label), path=directory, recursive=True)
    watcher.start()
    print(f"Started monitoring {directory} for changes.")
    return watcher
# Entry point to run predictions for selected directory
# Runs only when the file is executed as a script; the directory is the first CLI argument.
if __name__ == "__main__":
selected_directory = sys.argv[1] if len(sys.argv) > 1 else None
if selected_directory:
# NOTE(review): run_predictions also takes a status_label; this call supplies
# only the directory, so it relies on status_label being optional — verify.
run_predictions(selected_directory)
else:
print("Please specify a directory.")
# ransomware audit
import tkinter as tk
from tkinter import messagebox
import subprocess
import os
import csv
import inotify_simple
import threading
import time
import re
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from collections import defaultdict
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from datetime import datetime
# Module-level table of permission-operation counters; stays None until
# AuditDManagerApp.__init__ replaces it with the chmod/chown/chgrp dicts.
permission_operations = None
# Define event masks manually
# NOTE(review): presumably these mirror the Linux inotify flag values — confirm against <sys/inotify.h>.
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200
IN_MODIFY = 0x00000002
IN_OPEN = 0x00000020
IN_ISDIR = 0x40000000
####################
# Header-less CSV that predict_ransomware() loads as its input rows.
TEST_DATA_PATH = 'combined_log_summary.csv'
# Text file read by predict_ransomware(); the second ';'-separated field of each line is a column name.
VARIABLE_NAMES_PATH = 'output.txt'
def predict_ransomware():
    """Score the collected activity rows with the Keras model and publish them.

    Reads the column names from ``VARIABLE_NAMES_PATH`` and the rows from
    ``TEST_DATA_PATH``, standardises them, thresholds the model output at 0.5
    into "Yes"/"No", writes ``prediction.csv`` and uploads it (best effort).
    """
    # Load the trained model
    model = tf.keras.models.load_model('updated_ransomware_classifier.h5')

    # Column names are the second ';'-separated field of each line; blank
    # lines are skipped so they cannot raise IndexError.
    with open(VARIABLE_NAMES_PATH, encoding='utf-8') as f:
        columns = [line.split(';')[1].strip() for line in f if line.strip()]

    data = pd.read_csv(TEST_DATA_PATH, header=None, names=columns)
    data.columns = data.columns.str.strip()

    # NOTE(review): the scaler is fitted on the rows being scored, not on the
    # training data — confirm this matches how the model was trained.
    X = StandardScaler().fit_transform(data)

    predictions = model.predict(X)
    predicted_labels = (predictions > 0.5).astype(int)
    predicted_labels_text = ['Yes' if label == 1 else 'No' for label in predicted_labels.flatten()]

    timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    output_df = pd.DataFrame({
        'Timestamp': [timestamp] * len(predicted_labels_text),
        'Predicted Label': predicted_labels_text
    })
    output_file = 'prediction.csv'
    output_df.to_csv(output_file, index=False)
    print(f"Predictions saved to {output_file} ({timestamp})")

    def send_predictions_to_api(file_path):
        # Local uploader for the malware prediction endpoint.
        url = "http://142.93.221.85:8000/predict-malware/"
        with open(file_path, 'rb') as f:
            # The timeout keeps the calling thread from hanging on a dead server.
            response = requests.post(url, files={'file': f}, timeout=10)
        if response.status_code == 200:
            print(f"Successfully sent {file_path} to API.")
        else:
            print(f"Failed to send {file_path} to API. Status code: {response.status_code}")

    try:
        send_predictions_to_api(output_file)
    except Exception:
        # Narrowed from a bare `except:`; the upload stays best effort.
        print("Error Connection Server")
####################
# Running row id; AuditDManagerApp.update_csv() increments it before writing each CSV row.
ID = 0
# Toggle pair maintained by AuditDManagerApp.update_csv(); nothing in the visible code reads is_flip.
is_flip = 0
flipped = False
class PermissionChangeHandler(FileSystemEventHandler):
    """Watchdog handler that bumps the permission counters for modified files.

    On every file modification, each counter in the module-level
    ``permission_operations`` table whose key ends with the file's extension
    is incremented.
    """

    def __init__(self):
        super().__init__()
        self.file_types = set()

    def get_file_extension(self, file_path):
        """Extracts the file extension from the file path."""
        _, ext = os.path.splitext(file_path)
        return ext.strip(".")  # Strip the dot from the extension

    def on_modified(self, event):
        """Increment every counter whose key ends with ``.<extension>``."""
        # The table is None until AuditDManagerApp has been constructed.
        if event.is_directory or not permission_operations:
            return
        file_extension = self.get_file_extension(event.src_path)
        if not file_extension:
            return
        # Match on the exact suffix: the previous substring test made "js"
        # also bump the ".json" counters, "doc" the ".docx" ones, and so on.
        suffix = f".{file_extension}"
        for perms in permission_operations.values():
            for key in perms:
                if key.endswith(suffix):
                    perms[key] += 1
class AuditDManagerApp:
    """Manage the auditd service and collect activity counters for the model.

    The instance counts audit-log record types, inotify events on ``/etc``
    and file modifications under ``/home`` (through
    ``PermissionChangeHandler``), and appends one row per monitoring cycle to
    ``combined_log_summary.csv`` before calling ``predict_ransomware()``.

    NOTE(review): ``monitor_logs`` re-reads the whole audit log on every cycle
    and never resets ``log_counts``, so lines already seen are counted again —
    confirm this is the intended feature definition.
    """

    def __init__(self, root):
        """Set up counters and the inotify watch.

        Args:
            root: a ``tk.Tk`` or ``tk.Toplevel`` window.

        Raises:
            TypeError: if *root* is neither of those.
        """
        global permission_operations

        self.root = root
        if not isinstance(self.root, (tk.Tk, tk.Toplevel)):
            raise TypeError("root must be a Tk or Toplevel window")
        self.root.title("AuditD Manager")

        # Monitoring state and file locations
        self.monitoring = False
        self.log_file = "/var/log/audit/audit.log"
        self.combined_csv_file = "combined_log_summary.csv"
        self.monitored_files_set = {
            'bash.bashrc', 'bash_completion.d', 'environment', 'fstab', 'fwupd', 'group', 'host.conf', 'hosts', 'init.d',
            'inputrc', 'ld.so.cache', 'locale.alias', 'locale.conf', 'login.defs', 'machine-id', 'modprobe.d', 'nsswitch.conf',
            'passwd', 'sensors.d', 'sensors3.conf', 'shadow', 'shells', 'sudo.conf', 'sudoers', 'sudoers.d'
        }
        self.log_counts = {key: 0 for key in [
            'Id', 'PROCTITLE', 'AVC', 'SYSCALL', 'USER_AUTH', 'USER_ACCT',
            'USER_CMD', 'CRED_REFR', 'USER_START', 'USER_AVC', 'USER_END', 'CRED_DISP', 'CRED_ACQ',
            'LOGIN', 'SERVICE_START', 'SERVICE_STOP']}

        # Per-extension event counters
        self.ext_count = {ext: {'modified': 0, 'created': 0, 'deleted': 0, 'opened': 0} for ext in [
            '.db', '.AR', '.01', '.GIF', '.TXT', '.scc', '.dat', '.bmp', '.STF', '.scf',
            '.exe', '.typelib', '.cl', '.ocx', '.xml', '.json', '.csv', '.html', '.css',
            '.js', '.py', '.log', '.sql', '.pdf', '.doc', '.docx', '.ppt', '.pptx',
            '.xlsx', '.jpg', '.jpeg', '.png', '.mp4', '.mp3', '.zip', '.tar', '.gz', '.rar', '.7z', '.apk', '.iso']}

        # Permission-operation counters, shared with PermissionChangeHandler
        # through the module-level name.
        permission_operations = {
            'chmod': {f'chmod{perm}{ext}': 0 for perm in ['644', '755', '777'] for ext in self.ext_count},
            'chown': {f'chown{owner}{ext}': 0 for owner in ['user', 'group'] for ext in self.ext_count},
            'chgrp': {f'chgrp{group}{ext}': 0 for group in ['staff', 'admin'] for ext in self.ext_count}
        }

        # Directory operations tracking
        self.directory_count = {'created': 0, 'deleted': 0, 'modified': 0, 'opened': 0}

        # inotify watch on /etc
        self.inotify = inotify_simple.INotify()
        self.EVENT_MASKS = IN_CREATE | IN_DELETE | IN_MODIFY | IN_OPEN | IN_ISDIR
        self.watch_path = '/etc'
        self.watch_descriptor2 = self.inotify.add_watch(self.watch_path, self.EVENT_MASKS)

        # Watchdog observer is created lazily in start_monitoring()
        self.observer = None
        self.event_handler = None
        self.monitor_thread = threading.Thread(target=self.monitor_logs)

        # Event counts for the monitored file names
        self.open_count = defaultdict(int)

    def run_command(self, command, success_message, error_message, input_data=None):
        """Run *command* and report the outcome in a message box.

        Args:
            command: a shell string (run with ``shell=True``) or an argument
                list (run without a shell).
            success_message: text shown when the command exits with 0.
            error_message: text shown, followed by stderr, otherwise.
            input_data: optional bytes written to the command's stdin.
        """
        try:
            subprocess.run(command, shell=isinstance(command, str), check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE, input=input_data)
            messagebox.showinfo("Success", success_message)
        except subprocess.CalledProcessError as e:
            messagebox.showerror("Error", f"{error_message}\n\n{e.stderr.decode()}")

    def prompt_for_password(self, command, success_message, error_message):
        """Ask for the sudo password, then run *command* with root rights."""
        password_window = tk.Toplevel(self.root)
        password_window.title("Enter Sudo Password")
        tk.Label(password_window, text="Enter your sudo password:").pack(pady=10)
        password_entry = tk.Entry(password_window, show="*")
        password_entry.pack(pady=5)

        def on_submit():
            password = password_entry.get()
            password_window.destroy()
            if not password:
                messagebox.showwarning("Input Error", "Please enter your sudo password.")
                return
            # The password goes to sudo on stdin (-S) instead of being pasted
            # into a shell command line, where shell metacharacters in it
            # would be executed and the text would show up in the process
            # list. `-p ''` suppresses the prompt; the original command
            # string is then run by a root shell, so `&&` chains run whole.
            sudo_command = ['sudo', '-S', '-p', '', 'sh', '-c', command]
            self.run_command(sudo_command, success_message, error_message,
                             input_data=(password + '\n').encode())

        tk.Button(password_window, text="Submit", command=on_submit).pack(pady=10)

    def install_auditd(self):
        """Install auditd through apt-get (asks for the sudo password)."""
        command = "sudo apt-get update && sudo apt-get install -y auditd"
        self.prompt_for_password(command, "AuditD installed successfully!", "Failed to install AuditD.")

    def start_auditd(self):
        """Start the auditd service and begin monitoring."""
        command = "sudo systemctl start auditd"
        self.prompt_for_password(command, "AuditD started successfully!", "Failed to start AuditD.")
        self.start_monitoring()

    def stop_auditd(self):
        """Stop the auditd service and stop monitoring."""
        command = "sudo systemctl stop auditd"
        self.prompt_for_password(command, "AuditD stopped successfully!", "Failed to stop AuditD.")
        self.stop_monitoring()

    def check_status(self):
        """Show the output of ``systemctl status auditd`` in a message box."""
        command = "systemctl status auditd"
        try:
            result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            status = result.stdout.decode()
            messagebox.showinfo("AuditD Status", status)
        except subprocess.CalledProcessError as e:
            messagebox.showerror("Error", f"Failed to check status of AuditD.\n\n{e.stderr.decode()}")

    def start_monitoring(self):
        """Start the log-monitoring thread and the /home filesystem observer."""
        self.monitoring = True
        if not self.monitor_thread.is_alive():
            self.monitor_thread = threading.Thread(target=self.monitor_logs)
            self.monitor_thread.start()

        # The inotify watch path is fixed to /etc.
        self.watch_path = '/etc'
        self.watch_descriptor = self.inotify.add_watch(self.watch_path, self.EVENT_MASKS)

        # Start filesystem event monitoring
        if self.observer is None:
            self.event_handler = PermissionChangeHandler()
            self.observer = Observer()
            self.observer.schedule(self.event_handler, '/home', recursive=True)
            self.observer.start()

    def stop_monitoring(self):
        """Stop the monitoring thread and the filesystem observer.

        NOTE(review): the join waits for the current cycle, which includes an
        inotify read with a 100000 ms timeout.
        """
        self.monitoring = False
        if self.monitor_thread.is_alive():
            self.monitor_thread.join()

        if self.observer:
            self.observer.stop()
            self.observer.join()
            # A stopped observer thread cannot be started again; dropping the
            # reference lets start_monitoring() create a fresh one.
            self.observer = None

    def monitor_logs(self):
        """Thread body: every cycle, count log records, write a row and predict."""
        while self.monitoring:
            if os.path.exists(self.log_file):
                with open(self.log_file, 'r') as f:
                    lines = f.readlines()
                for line in lines:
                    if 'type=' in line:
                        log_type = line.split('type=')[1].split(' ')[0]
                        if log_type in self.log_counts:
                            self.log_counts[log_type] += 1
                self.update_csv()
                self.monitor_extensions()
                predict_ransomware()
            time.sleep(5)  # Pause five seconds before the next cycle

    def update_csv(self):
        """Append the current counters as one row to ``combined_csv_file``.

        Column order as written: running ID, the 15 audit record counts,
        chmod/chown/chgrp counters, then for each extension its
        modified/created/deleted/opened counts, the four directory counts and
        the event counts of the monitored file names in sorted order. No
        header row is written.
        NOTE(review): the per-extension counts are grouped by extension, not
        by operation — confirm this matches the column list the model expects.
        """
        global ID, is_flip, flipped
        ID += 1
        if flipped:
            is_flip = 1
            flipped = False
        else:
            is_flip = 0
            flipped = True

        record_types = [
            'PROCTITLE', 'AVC', 'SYSCALL', 'USER_AUTH', 'USER_ACCT',
            'USER_CMD', 'CRED_REFR', 'USER_START', 'USER_AVC', 'USER_END', 'CRED_DISP', 'CRED_ACQ',
            'LOGIN', 'SERVICE_START', 'SERVICE_STOP',
        ]
        row = [ID]
        row.extend(self.log_counts.get(record_type, 0) for record_type in record_types)

        # Permission operation counters
        row.extend(permission_operations['chmod'].values())
        row.extend(permission_operations['chown'].values())
        row.extend(permission_operations['chgrp'].values())

        # Per-extension modification, creation, deletion and open counts
        for ext in self.ext_count:
            row.extend([
                self.ext_count[ext]['modified'],
                self.ext_count[ext]['created'],
                self.ext_count[ext]['deleted'],
                self.ext_count[ext]['opened'],
            ])

        # Directory counts
        row.extend([
            self.directory_count['created'],
            self.directory_count['deleted'],
            self.directory_count['modified'],
            self.directory_count['opened']
        ])

        # Event counts of the monitored file names
        row.extend(self.open_count.get(file, 0) for file in sorted(self.monitored_files_set))

        with open(self.combined_csv_file, 'a', newline='') as csv_file:
            csv.writer(csv_file).writerow(row)

    def monitor_extensions(self):
        """Read pending inotify events (waiting up to 100 s) and update the counters."""
        events = self.inotify.read(timeout=100000)
        for event in events:
            filename = event.name
            ext = os.path.splitext(filename)[1]
            if ext in self.ext_count:
                if event.mask & IN_CREATE:
                    self.ext_count[ext]['created'] += 1
                if event.mask & IN_DELETE:
                    self.ext_count[ext]['deleted'] += 1
                if event.mask & IN_MODIFY:
                    self.ext_count[ext]['modified'] += 1
                if event.mask & IN_OPEN:
                    self.ext_count[ext]['opened'] += 1
            # Any event on a monitored file name is counted, whatever its type.
            if filename in self.monitored_files_set:
                self.open_count[filename] += 1
            if event.mask & IN_ISDIR:
                if event.mask & IN_CREATE:
                    self.directory_count['created'] += 1
                if event.mask & IN_DELETE:
                    self.directory_count['deleted'] += 1
                if event.mask & IN_MODIFY:
                    self.directory_count['modified'] += 1
                if event.mask & IN_OPEN:
                    self.directory_count['opened'] += 1
# malware tested
import os
import time
import logging
import subprocess
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
import pandas as pd
import pickle
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import sys
import os
import pandas as pd
import numpy as np
import codecs
import pickle
import requests
# Monitoring flag; initialised to False here.
isMonitoring = False

# Working folders for generated hex dumps / disassembly and for results.
output_directory = "outputs"
bytes_output_directory = "outputs/bytes_output"
asm_output_directory = "outputs/asm_output"
result_folder = "results"
bytes_result_directory = "results/bytes_result"
asm_result_directory = "results/asm_result"

# Model folders that must already exist.
bytes_model_directory = "bytes_models"
asm_model_directory = "asm_models"

# Abort at import time when either model folder is missing.
if not (os.path.exists(asm_model_directory) and os.path.exists(bytes_model_directory)):
    messagebox.showinfo("Error", "Models Not Found for Prediction")
    exit(-1)

# Create the working folders that are missing, in the original order.
for _working_dir in (
    output_directory,
    asm_output_directory,
    bytes_output_directory,
    result_folder,
    asm_result_directory,
    bytes_result_directory,
):
    if not os.path.exists(_working_dir):
        os.makedirs(_working_dir)
del _working_dir

logging.basicConfig(level=logging.INFO)
def send_predictions_to_api(file_path):
    """Upload a predictions file to the malware prediction endpoint.

    Args:
        file_path: path of the file to send as the multipart field ``file``.

    Raises:
        OSError: if the file cannot be opened.
        requests.RequestException: on connection failure or timeout.
    """
    url = "http://142.93.221.85:8000/predict-malware/"
    with open(file_path, 'rb') as f:
        # Without a timeout an unreachable remote host would block the caller forever.
        response = requests.post(url, files={'file': f}, timeout=10)
    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
def send_asm_predictions_to_api(file_path):
    """Upload an ASM predictions file to the malware prediction endpoint.

    Args:
        file_path: path of the file to send as the multipart field ``file``.

    Raises:
        OSError: if the file cannot be opened.
        requests.RequestException: on connection failure or timeout.
    """
    url = "http://142.93.221.85:8000/predict-malware/"
    with open(file_path, 'rb') as f:
        # Without a timeout an unreachable remote host would block the caller forever.
        response = requests.post(url, files={'file': f}, timeout=10)
    if response.status_code == 200:
        print(f"Successfully sent {file_path} to API.")
    else:
        print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
def format_bytes_to_hex(data):
    """Render *data* as a hex dump with 16 bytes per line.

    Each line is ``<8-digit uppercase hex offset> <space-separated byte
    values>`` followed by a newline.

    Args:
        data: a bytes-like object (indexing must yield integers).

    Returns:
        The dump as one string; empty input gives an empty string.
    """
    lines = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        hex_values = " ".join(f"{byte:02X}" for byte in chunk)
        lines.append(f"{offset:08X} {hex_values}\n")
    # One join instead of repeated `+=`, which can go quadratic on large files.
    return "".join(lines)
def convert_file_to_hex(input_file, output_file):
    """Write a hex dump of *input_file* to *output_file*.

    Any exception is logged through ``logging.error`` and not re-raised.
    """
    try:
        with open(input_file, 'rb') as source:
            raw = source.read()
        dump = format_bytes_to_hex(raw)
        with open(output_file, 'w') as target:
            target.write(dump)
        logging.info(f"Converted '{input_file}' to hex dump and saved to '{output_file}'")
    except Exception as e:
        logging.error(f"Error converting '{input_file}': {e}")
def scan_and_convert_directory(directory, output_dir):
    """Hex-dump every file under *directory* into *output_dir*.

    Symlinked directories are followed. Files already named ``*.bytes`` are
    skipped, as are files whose ``<name>.bytes`` dump already exists.
    """
    for folder, _, names in os.walk(directory, followlinks=True):
        for filename in names:
            if filename.endswith(".bytes"):
                continue
            target = os.path.join(output_dir, f"{filename}.bytes")
            if os.path.exists(target):
                continue
            convert_file_to_hex(os.path.join(folder, filename), target)
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that hex-dumps and disassembles newly created files."""

    def __init__(self, output_dir, hex_dirs, disasm_dirs):
        # Stored for reference; the methods below write to the module-level
        # output directories.
        self.output_dir = output_dir
        self.hex_dirs = hex_dirs
        self.disasm_dirs = disasm_dirs
        super().__init__()

    def on_created(self, event):
        """Start hex conversion and disassembly threads for a new file."""
        if event.is_directory:
            return
        input_file = event.src_path
        output_file_hex = os.path.join(bytes_output_directory, f"{os.path.basename(input_file)}.bytes")
        if os.path.exists(output_file_hex):
            return
        # Each step runs in its own thread.
        workers = (
            threading.Thread(target=self.run_hex_conversion, args=(input_file, output_file_hex)),
            threading.Thread(target=self.run_disassembly, args=(input_file,)),
        )
        for worker in workers:
            worker.start()

    def run_hex_conversion(self, input_file, output_file):
        """Write the hex dump, then run the bytes-based analysis."""
        convert_file_to_hex(input_file, output_file)
        run_malware_ai_analysis_bytes()

    def run_disassembly(self, file_path):
        """Disassemble *file_path* with objdump, then run the asm-based analysis."""
        try:
            print(f"Disassembling {file_path}")
            result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True)
            if file_path.endswith(".asm"):
                return
            asm_file_path = os.path.join(asm_output_directory, f"{os.path.basename(file_path)}.asm")
            with open(asm_file_path, "w") as asm_file:
                asm_file.write(result.stdout)
            print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
            run_malware_analysis_asm()
        except subprocess.CalledProcessError as e:
            print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
def monitor_directories(directories, output_dir):
    """Watch ``directories`` recursively and block until interrupted.

    A single ``FileChangeHandler`` is scheduled for every directory. The
    function then sleeps in one-second steps; on ``KeyboardInterrupt`` the
    observer is stopped, and the observer thread is joined before returning.
    """
    handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
    watcher = Observer()
    for watched in directories:
        watcher.schedule(handler, path=watched, recursive=True)
        logging.info(f"Monitoring directory: {watched}")
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
def start_observer(directories, output_dir):
    """Start a watchdog observer over ``directories`` and return it.

    Unlike ``monitor_directories`` this does not block; the caller owns the
    returned observer and is responsible for stopping it.
    """
    watcher = Observer()
    handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
    for watched in directories:
        watcher.schedule(handler, path=watched, recursive=True)
        logging.info(f"Monitoring directory: {watched}")
    watcher.start()
    return watcher
def disassemble_elf(file_path, output_dir):
    """Disassemble ``file_path`` with ``objdump -d`` into ``output_dir``.

    The listing is saved as ``<basename>.asm`` unless ``file_path`` itself
    already ends in ``.asm``. A non-zero objdump exit status is reported on
    stderr rather than raised.
    """
    try:
        print(f"Disassembling {file_path}")
        completed = subprocess.run(
            ['objdump', '-d', file_path], capture_output=True, text=True, check=True
        )
        listing = completed.stdout
        if file_path.endswith(".asm"):
            return
        asm_file_path = os.path.join(output_dir, f"{os.path.basename(file_path)}.asm")
        with open(asm_file_path, "w") as out:
            out.write(listing)
        print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
    except subprocess.CalledProcessError as e:
        print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
def find_elf_files(start_dirs):
    """Return the paths of ELF executables and shared libraries under ``start_dirs``.

    Each existing directory is listed with ``find`` (pruning ``/proc``,
    ``/sys`` and ``/run``) and every regular file is classified with
    ``file --mime-type``. Entries of ``start_dirs`` that are not directories
    are skipped. Errors from either tool are reported on stderr and the scan
    continues.

    Args:
        start_dirs: Iterable of directory paths to scan.

    Returns:
        List of file paths whose MIME type identifies an ELF binary.
    """
    # Recent releases of file(1) report position-independent executables as
    # "application/x-pie-executable"; that string does not contain
    # "application/x-executable", so it has to be listed explicitly or PIE
    # binaries are silently missed.
    elf_mime_types = (
        'application/x-executable',
        'application/x-pie-executable',
        'application/x-sharedlib',
    )
    elf_files = []
    for start_dir in start_dirs:
        if not os.path.isdir(start_dir):
            continue
        try:
            find_command = ['find', start_dir, '-path', '/proc', '-prune', '-o', '-path', '/sys', '-prune', '-o', '-path', '/run', '-prune', '-o', '-type', 'f', '-print']
            find_result = subprocess.run(find_command, capture_output=True, text=True, check=False)
            if find_result.returncode != 0:
                print(f"Error running find command: {find_result.stderr}", file=sys.stderr)
                continue
            file_paths = find_result.stdout.splitlines()
            print(f"Found files in {start_dir}:")
            print(file_paths)
            for file_path in file_paths:
                try:
                    file_command = ['file', '--mime-type', file_path]
                    file_result = subprocess.run(file_command, capture_output=True, text=True, check=True)
                    if any(mime in file_result.stdout for mime in elf_mime_types):
                        elf_files.append(file_path)
                except subprocess.CalledProcessError as e:
                    print(f"Error running file command on {file_path}: {e}", file=sys.stderr)
        except Exception as e:
            print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)
    print(f"Found ELF files: {elf_files}")
    return elf_files
def process_files(output_dir, start_dirs):
    """Disassemble every ELF file found under ``start_dirs`` into ``output_dir``.

    ``output_dir`` is created if missing. When no ELF file is found a message
    is printed and nothing else happens.
    """
    os.makedirs(output_dir, exist_ok=True)
    discovered = find_elf_files(start_dirs)
    if discovered:
        for binary_path in discovered:
            disassemble_elf(binary_path, output_dir)
        print("Disassembly complete. Assembly files are saved in the output directory.")
    else:
        print("No ELF files found.")
def process_files_malware(folder_path, files_to_process):
    """Build a min-max scaled token-count matrix for ``.bytes`` hex dumps.

    Every space-separated token of every line is parsed as hexadecimal.
    Values below 257 increment the column of that value; larger values
    increment column 257. ``??`` tokens are ignored. Files whose name does
    not end in ``bytes`` keep an all-zero row.

    Args:
        folder_path: Directory containing the files.
        files_to_process: File names (relative to ``folder_path``), one row each.

    Returns:
        Float array of shape ``(len(files_to_process), 258)``.
    """
    feature_matrix = np.zeros((len(files_to_process), 258), dtype=int)  # 258 columns
    for k, file in enumerate(files_to_process):
        if not file.endswith("bytes"):
            continue
        try:
            with open(os.path.join(folder_path, file), "r") as byte_file:
                for lines in byte_file:
                    for hex_code in lines.rstrip().split(" "):
                        if hex_code == '??':
                            continue
                        # Everything at or above 257 shares the last column.
                        feature_matrix[k][min(int(hex_code, 16), 257)] += 1
        except Exception as exc:
            # Best effort: an unreadable or malformed file keeps whatever was
            # counted so far. Unlike a bare ``except`` this lets
            # KeyboardInterrupt/SystemExit through and records the reason.
            logging.warning(f"Skipping remainder of '{file}': {exc}")
            continue
    if len(files_to_process) == 0:
        # MinMaxScaler cannot be fitted on zero samples.
        return feature_matrix.astype(float)
    # Normalize the features.
    # NOTE(review): the scaler is fitted on this batch only, so the scaling
    # depends on which files are processed together - confirm this matches
    # how the models were trained.
    scaler = MinMaxScaler()
    feature_matrix = scaler.fit_transform(feature_matrix)
    return feature_matrix
def test_files(folder_path, model_path, output_csv):
    """Score not-yet-scanned files in ``folder_path`` and append to ``output_csv``.

    File names already listed in the ``File`` column of an existing
    ``output_csv`` are skipped. The remaining files are turned into features
    with ``process_files_malware``, scored by the pickled model at
    ``model_path``, and written with their predicted class and highest class
    probability.
    """
    if os.path.exists(output_csv):
        seen = set(pd.read_csv(output_csv)['File'].tolist())
    else:
        seen = set()

    # Only files that have no row in the CSV yet.
    pending = [name for name in os.listdir(folder_path) if name not in seen]
    if not pending:
        print("All files have already been scanned.")
        return

    features = process_files_malware(folder_path, pending)

    # The model is unpickled from disk; only trusted model files belong here.
    with open(model_path, 'rb') as model_file:
        model = pickle.load(model_file)

    predicted = model.predict(features)
    probabilities = model.predict_proba(features)

    new_results = pd.DataFrame({
        'File': pending,
        'Predicted Class': predicted,
        'Prediction Probability': [max(row) for row in probabilities]
    })

    # Append without a header to an existing CSV, otherwise create it.
    appending = os.path.exists(output_csv)
    new_results.to_csv(
        output_csv,
        mode='a' if appending else 'w',
        header=not appending,
        index=False,
    )
    print(f"New predictions appended to {output_csv}")
def run_malware_ai_analysis_bytes():
    """Score the hex dumps with every ``.pkl`` model and upload each result.

    For each model in ``bytes_model_directory`` the files in
    ``bytes_output_directory`` are scored via ``test_files`` into
    ``bytes_predictions_<model>.csv`` inside ``bytes_result_directory``, and
    that CSV is then sent to the API. Upload failures are reported and do not
    stop the remaining models.
    """
    print("bytes malware analysis started")
    directory = bytes_output_directory
    model_folder = bytes_model_directory  # Folder containing the .pkl files
    model_files = [f for f in os.listdir(model_folder) if f.endswith('.pkl')]
    for model_file in model_files:
        model_path = os.path.join(model_folder, model_file)
        output_csv = os.path.join(bytes_result_directory, f"bytes_predictions_{os.path.splitext(model_file)[0]}.csv")
        test_files(directory, model_path, output_csv)
        try:
            send_predictions_to_api(output_csv)
        except Exception as exc:
            # Catch Exception rather than everything, so Ctrl-C and
            # interpreter shutdown are not swallowed, and keep the real cause
            # (which may be a missing CSV rather than the network).
            print("Connection Failed")
            logging.error(f"Could not send '{output_csv}' to API: {exc}")
def preprocess_asm_file(file_path):
    """Count section prefixes, opcodes, registers and keywords in an ASM listing.

    Each non-blank line is split on whitespace. The first token is matched
    against the section prefixes; the remaining tokens are matched against
    opcodes (exact token match), registers (substring match, only when the
    first token contains ``text`` or ``CODE``) and keywords (substring match).
    Every category is counted at most once per line per entry.

    Returns:
        1-D integer array: prefix counts, then opcode counts, then register
        counts, then keyword counts.
    """
    prefixes = ['.text:', '.Pav:', '.idata:', '.data:', '.bss:', '.rdata:', '.edata:', '.rsrc:', '.tls:', '.reloc:', '.BSS:', '.CODE']
    opcodes = ['jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub', 'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call', 'shl', 'ror', 'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx']
    keywords = ['.dll', 'std::', ':dword']
    registers = ['edx', 'esi', 'eax', 'ebx', 'ecx', 'edi', 'ebp', 'esp', 'eip']

    prefix_counts = np.zeros(len(prefixes), dtype=int)
    opcode_counts = np.zeros(len(opcodes), dtype=int)
    keyword_counts = np.zeros(len(keywords), dtype=int)
    register_counts = np.zeros(len(registers), dtype=int)

    with open(file_path, 'r', encoding='cp1252', errors='replace') as handle:
        for raw_line in handle:
            tokens = raw_line.rstrip().split()
            if not tokens:
                continue
            head, operands = tokens[0], tokens[1:]

            for idx, prefix in enumerate(prefixes):
                if prefix in head:
                    prefix_counts[idx] += 1

            for idx, opcode in enumerate(opcodes):
                if opcode in operands:
                    opcode_counts[idx] += 1

            # Registers only count on lines from a code section.
            if 'text' in head or 'CODE' in head:
                for idx, register in enumerate(registers):
                    if any(register in token for token in operands):
                        register_counts[idx] += 1

            for idx, keyword in enumerate(keywords):
                if any(keyword in token for token in operands):
                    keyword_counts[idx] += 1

    return np.concatenate([prefix_counts, opcode_counts, register_counts, keyword_counts])
# Main function to load models and make predictions
def run_malware_analysis_asm(asm_folder_path=asm_output_directory, models_folder=asm_model_directory):
    """Score every new ``.asm`` file with every pickled model and upload results.

    For each ``.pkl`` model in ``models_folder`` the per-model CSV
    ``asm_prediction_<model>.csv`` in ``asm_result_directory`` is loaded (or
    started empty), files not yet listed in it are scored, new rows are
    appended, and the CSV is sent to the API. Upload failures are reported
    and do not stop the remaining models.

    Args:
        asm_folder_path: Directory containing the ``.asm`` listings.
        models_folder: Directory containing the pickled models.
    """
    print("Starting analysis...")
    # Get all .asm files in the folder
    asm_files = [f for f in os.listdir(asm_folder_path) if f.endswith('.asm')]

    # Load all .pkl models from the models folder.
    # The models are unpickled; only trusted model files belong in this folder.
    models = {}
    for model_file in os.listdir(models_folder):
        if not model_file.endswith('.pkl'):
            continue
        with open(os.path.join(models_folder, model_file), 'rb') as f:
            models[os.path.splitext(model_file)[0]] = pickle.load(f)

    for model_name, model_clf in models.items():
        print(f"Making asm predictions with {model_name}...")

        # Label offsets are kept exactly as before: identity for a model named
        # 'XGBClassifier', +1 otherwise, plus a further +1 for any model whose
        # name contains "XGB".
        # NOTE(review): a model whose name contains "XGB" but is not exactly
        # 'XGBClassifier' ends up shifted by 2 - confirm that is intended.
        if model_name == 'XGBClassifier':
            class_mapping = {i: i for i in range(9)}
        else:
            class_mapping = {i: i + 1 for i in range(9)}
        xgb_offset = 1 if "XGB" in model_name.upper() else 0

        # Check if result file for the model already exists
        results_file_path = f'{asm_result_directory}/asm_prediction_{model_name}.csv'
        if os.path.exists(results_file_path):
            results_df = pd.read_csv(results_file_path)
        else:
            results_df = pd.DataFrame(columns=['file_name', 'prediction', 'probability'])
        # Build the lookup once instead of scanning the column per file.
        already_scored = set(results_df['file_name'])

        # predict_proba columns are ordered like ``classes_``, so the predicted
        # label must be translated to its column before indexing. Indexing by
        # the raw label is only right when the labels are exactly 0..n-1.
        classes = getattr(model_clf, 'classes_', None)
        column_of = {} if classes is None else {label: col for col, label in enumerate(classes)}

        new_predictions = []
        for asm_file in asm_files:
            if asm_file in already_scored:
                continue
            file_path = os.path.join(asm_folder_path, asm_file)
            feature_vector = preprocess_asm_file(file_path).reshape(1, -1)

            # Predict using the current model
            prediction = model_clf.predict(feature_vector)
            probability = model_clf.predict_proba(feature_vector)
            predicted_label = prediction[0]
            # Fall back to the raw label when the model exposes no classes_.
            column = column_of.get(predicted_label, predicted_label)
            new_predictions.append({
                'file_name': asm_file,
                'prediction': class_mapping[predicted_label] + xgb_offset,
                'probability': probability[0][column]
            })

        # Append new predictions to results DataFrame
        if new_predictions:
            new_predictions_df = pd.DataFrame(new_predictions)
            results_df = pd.concat([results_df, new_predictions_df], ignore_index=True)
            results_df.to_csv(results_file_path, index=False)
            print(f"Predictions saved to {results_file_path}.")
        try:
            send_asm_predictions_to_api(results_file_path)
        except Exception as exc:
            # Catch Exception rather than everything, so Ctrl-C is not
            # swallowed, and keep the real cause in the log.
            print("Connection Failed")
            logging.error(f"Could not send '{results_file_path}' to API: {exc}")
def run_hex_conversion():
    """Hex-convert the directories listed in the hex entry on a worker thread.

    The comma-separated entry text is split into directory paths. When no
    path is given (or no output directory is configured) a warning is shown
    and nothing is started. Otherwise a background thread converts each
    directory, runs the bytes-based analysis and, if monitoring is not yet
    active, starts directory monitoring.
    """
    # Drop blank items: "".split(',') yields [''], which would otherwise make
    # the "nothing specified" check below unreachable.
    hex_dirs = [d.strip() for d in hex_files_entry.get().split(',') if d.strip()]
    hex_output_dir = bytes_output_directory
    if not hex_dirs or not hex_output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return

    def hex_conversion_task():
        global isMonitoring
        for hex_dir in hex_dirs:
            if os.path.isdir(hex_dir):
                scan_and_convert_directory(hex_dir, hex_output_dir)
            else:
                messagebox.showwarning("Warning", f"{hex_dir} is not a directory.")
        print("Hex conversion complete.")
        run_malware_ai_analysis_bytes()
        if not isMonitoring:
            isMonitoring = True
            start_monitoring()

    threading.Thread(target=hex_conversion_task).start()
def run_disassembly():
    """Disassemble ELF files under the listed directories on a worker thread.

    The comma-separated entry text is split into directory paths. When no
    path is given (or no output directory is configured) a warning is shown
    and nothing is started. Otherwise a background thread disassembles the
    ELF files, runs the ASM analysis and, if monitoring is not yet active,
    starts directory monitoring.
    """
    # Drop blank items: "".split(',') yields [''], which would otherwise make
    # the "nothing specified" check below unreachable.
    start_dirs = [d.strip() for d in start_dirs_entry.get().split(',') if d.strip()]
    output_dir = asm_output_directory
    if not start_dirs or not output_dir:
        messagebox.showwarning("Warning", "Please specify both directories and output directory.")
        return

    def disassembly_task():
        global isMonitoring
        process_files(output_dir, start_dirs)
        run_malware_analysis_asm()
        if not isMonitoring:
            isMonitoring = True
            start_monitoring()

    threading.Thread(target=disassembly_task).start()
def start_monitoring():
    """Start watching the directories from both entry fields in a daemon thread.

    Paths are taken from the hex-conversion entry and the disassembly entry.
    Blank items, duplicates and paths that are not existing directories are
    dropped before scheduling, because a single unusable path would otherwise
    abort the whole monitoring thread. If nothing usable remains, no thread
    is started.
    """
    requested = [d.strip() for d in hex_files_entry.get().split(',')]
    requested += [d.strip() for d in start_dirs_entry.get().split(',')]

    directories = []
    for candidate in requested:
        # An empty entry field contributes '' after splitting.
        if not candidate or candidate in directories:
            continue
        if not os.path.isdir(candidate):
            logging.warning(f"Not monitoring '{candidate}': not a directory.")
            continue
        directories.append(candidate)

    output_dir = output_directory
    if not directories:
        print("No valid directories to monitor.")
        return

    def monitoring_task():
        monitor_directories(directories, output_dir)

    # Start monitoring in a new thread
    threading.Thread(target=monitoring_task, daemon=True).start()
    print("Started monitoring directories.")
def on_closing():
    """Destroy the root window.

    A later definition of ``on_closing`` in this file rebinds the name, so
    this version is replaced once the module has finished loading.
    """
    root.destroy()
def browse_hex_directories():
    """Ask for directories repeatedly and put them in the hex entry.

    The directory dialog reopens after each selection until it is cancelled.
    If at least one directory was chosen, the entry text is replaced by the
    selections joined with ``', '``; otherwise the entry is left unchanged.
    """
    chosen = []
    selection = filedialog.askdirectory(title="Select a Directory")
    while selection:
        chosen.append(selection)
        selection = filedialog.askdirectory(title="Select a Directory")
    if not chosen:
        return
    hex_files_entry.delete(0, tk.END)
    hex_files_entry.insert(0, ', '.join(chosen))
def browse_start_dirs():
    """Ask for directories repeatedly and put them in the ELF scan entry.

    The directory dialog reopens after each selection until it is cancelled.
    If at least one directory was chosen, the entry text is replaced by the
    selections joined with ``', '``; otherwise the entry is left unchanged.
    """
    chosen = []
    selection = filedialog.askdirectory(title="Select a Directory")
    while selection:
        chosen.append(selection)
        selection = filedialog.askdirectory(title="Select a Directory")
    if not chosen:
        return
    start_dirs_entry.delete(0, tk.END)
    start_dirs_entry.insert(0, ', '.join(chosen))
# def malware_gui(frame):
# frame.tkraise() # Raise the malware frame (if needed)
def malware_gui(parent_frame):
    """Open a separate "Malware Analysis" window as a child of ``parent_frame``.

    The window holds a heading, a prompt, a text entry, a "Browse..." button
    wired to ``browse_hex_directories`` and an "Analyze Malware" button that
    only prints a message.
    """
    window = tk.Toplevel(parent_frame)
    window.title("Malware Analysis")

    heading = tk.Label(window, text="Malware Analysis Section", font=("Arial", 16))
    heading.pack(pady=20)

    prompt = tk.Label(window, text="Select files for analysis:")
    prompt.pack(pady=5)

    malware_files_entry = tk.Entry(window, width=80)
    malware_files_entry.pack(pady=5)

    browse_button = tk.Button(window, text="Browse...", command=browse_hex_directories)
    browse_button.pack(pady=5)

    analyze_button = tk.Button(window, text="Analyze Malware", command=lambda: print("Analyzing..."))
    analyze_button.pack(pady=10)
import tkinter as tk
from tkinter import ttk
def create_wizard_window():
    """Build the setup wizard window and run the Tk main loop.

    Six frames are created. The navigation buttons built here switch between
    frames 1-4 (welcome, packet capture, malware analysis, ransomware
    detection); frames 5 and 6 are populated but no button in this function
    shows them. ``root``, ``hex_files_entry`` and ``start_dirs_entry`` are
    published as module globals for the callbacks defined elsewhere.
    """
    global root, hex_files_entry, start_dirs_entry

    panel_bg = "#f0f0f0"

    root = tk.Tk()
    root.title("File Conversion and Disassembly Wizard")
    root.geometry("600x400")
    root.resizable(False, False)

    # Wizard frames
    frames = [tk.Frame(root, bg=panel_bg) for _ in range(6)]
    frame1, frame2, frame3, frame4, frame5, frame6 = frames

    def show_frame(frame):
        """Hide all frames and show only the specified one."""
        for frm in frames:
            frm.pack_forget()
        frame.pack(fill='both', expand=True)

    def update_progress(step):
        """Update the progress bar and label to reflect the current step."""
        progress_label.config(text=f"Step {step} of 4")
        progress_bar['value'] = (step / 4) * 100

    def go_to(step, frame):
        """Return a button callback that moves the wizard to ``frame``."""
        def navigate():
            update_progress(step)
            show_frame(frame)
        return navigate

    # Title bar frame for better aesthetics
    title_frame = tk.Frame(root, bg="#0078d7")
    title_frame.pack(fill="x", side="top")
    title_label = tk.Label(title_frame, text="Setup Wizard", font=("Arial", 14, "bold"), fg="white", bg="#0078d7")
    title_label.pack(pady=10)

    # Progress bar
    progress_bar = ttk.Progressbar(root, orient="horizontal", mode="determinate", length=400)
    progress_bar.pack(side="bottom", pady=10)
    progress_label = tk.Label(root, text="Step 1 of 4", font=("Arial", 12))
    progress_label.pack(side="bottom")

    # Frame 1 - Welcome Screen
    welcome_label = tk.Label(frame1, text="Welcome to the File Conversion Wizard", font=("Arial", 16), bg=panel_bg)
    welcome_label.pack(pady=40)
    welcome_desc = tk.Label(frame1, text="This wizard will guide you through the steps.", bg=panel_bg, font=("Arial", 12))
    welcome_desc.pack(pady=10)
    next_button1 = ttk.Button(frame1, text="Next", command=go_to(2, frame2))
    next_button1.pack(pady=10, side="bottom")

    # Frame 2 - Packet Capture UI
    capture_label = tk.Label(frame2, text="Packet Capture Setup", font=("Arial", 16), bg=panel_bg)
    capture_label.pack(pady=40)
    # The packet capture widgets are added by setup_gui.
    setup_gui(frame2)
    # Separate frame for the navigation buttons
    button_frame2 = tk.Frame(frame2, bg=panel_bg)
    button_frame2.pack(side="bottom", pady=10)
    next_button2 = ttk.Button(button_frame2, text="Next", command=go_to(3, frame3), width=10)
    next_button2.pack(side="right", padx=10)
    back_button2 = ttk.Button(button_frame2, text="Back", command=go_to(1, frame1), width=10)
    back_button2.pack(side="left", padx=10)

    # Frame 3 - Malware Analysis
    notebook = ttk.Notebook(frame3)
    notebook.pack(fill='both', expand=True)
    # Hex Conversion and ELF Disassembly tabs
    hex_frame = ttk.Frame(notebook)
    asm_frame = ttk.Frame(notebook)
    notebook.add(hex_frame, text='Hex Conversion')
    notebook.add(asm_frame, text='ELF Disassembly')

    # Hex conversion tab
    tk.Label(hex_frame, text="Select Directories to Convert to Hex:", font=("Arial", 12)).pack(pady=5)
    hex_files_entry = tk.Entry(hex_frame, width=80)
    hex_files_entry.pack(pady=5)
    tk.Button(hex_frame, text="Browse...", command=browse_hex_directories).pack(pady=5)
    tk.Button(hex_frame, text="Convert to Hex", command=run_hex_conversion).pack(pady=10)

    # ELF disassembly tab
    tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:", font=("Arial", 12)).pack(pady=5)
    start_dirs_entry = tk.Entry(asm_frame, width=80)
    start_dirs_entry.pack(pady=5)
    tk.Button(asm_frame, text="Browse...", command=browse_start_dirs).pack(pady=5)
    tk.Button(asm_frame, text="Disassemble ELF Files", command=run_disassembly).pack(pady=10)

    next_button3 = ttk.Button(frame3, text="Next", command=go_to(4, frame4))
    next_button3.pack(side="right", padx=10, pady=20)
    back_button3 = ttk.Button(frame3, text="Back", command=go_to(2, frame2))
    back_button3.pack(side="left", padx=10, pady=20)

    # Frame 4 - Ransomware Detection
    ransomware_label = tk.Label(frame4, text="Ransomware Detection", font=("Arial", 16), bg=panel_bg)
    ransomware_label.pack(pady=40)
    directory_frame = tk.Frame(frame4, bg=panel_bg)
    directory_frame.pack(pady=10)
    selected_dir_label = tk.Label(directory_frame, text="No Directory Selected", width=40, bg=panel_bg)
    selected_dir_label.grid(row=1, column=0)

    def select_directory():
        """Show the directory picked via browse_directory in the label."""
        directory = browse_directory()
        if directory:
            selected_dir_label.config(text=directory)

    def run_selected_predictions():
        """Run predictions on whatever the directory label currently shows."""
        return run_predictions(selected_dir_label.cget("text"), status_label)

    browse_button = ttk.Button(directory_frame, text="Select Directory", command=select_directory)
    browse_button.grid(row=0, column=0)
    status_label = tk.Label(frame4, text="", fg="blue", bg=panel_bg)
    status_label.pack(pady=10)
    run_button = ttk.Button(frame4, text="Run Predictions", command=run_selected_predictions)
    run_button.pack(pady=10)
    finish_button4 = ttk.Button(frame4, text="Finish", command=root.quit)
    finish_button4.pack(side="right", padx=10, pady=20)
    back_button4 = ttk.Button(frame4, text="Back", command=go_to(3, frame3))
    back_button4.pack(side="left", padx=10, pady=20)

    # Frame 5 - AuditD Manager
    # AuditDManagerApp receives root (not frame5); its frame is packed here.
    audit_app = AuditDManagerApp(root)
    audit_app.frame.pack(fill='both', expand=True)
    # Navigation buttons for Frame 5
    button_frame5 = tk.Frame(frame5, bg=panel_bg)
    button_frame5.pack(side="bottom", pady=10)
    back_button5 = ttk.Button(button_frame5, text="Back", command=go_to(4, frame4), width=10)
    back_button5.pack(side="left", padx=10)
    finish_button5 = ttk.Button(button_frame5, text="Finish", command=root.quit, width=10)
    finish_button5.pack(side="right", padx=10)

    # Frame 6 - Summary (Optional)
    done_label = tk.Label(frame6, text="Setup Complete!", font=("Arial", 16), bg=panel_bg)
    done_label.pack(pady=40)
    done_desc = tk.Label(frame6, text="Thank you for using the setup wizard.", bg=panel_bg, font=("Arial", 12))
    done_desc.pack(pady=10)

    # Show the first frame
    show_frame(frame1)
    root.mainloop()
def on_closing():
    """Stop the Tk main loop by calling ``root.quit()``."""
    root.quit()
# Launch the wizard only when this file is executed as a script.
if __name__ == "__main__":
    create_wizard_window()