1987 lines
70 KiB
Python
1987 lines
70 KiB
Python
|
|
import os
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import tkinter as tk
|
|
from tkinter import filedialog, messagebox, ttk
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
import threading
|
|
import pandas as pd
|
|
import pickle
|
|
import numpy as np
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
import sys
|
|
import codecs
|
|
import pickle
|
|
import csv
|
|
import pickle
|
|
import psutil
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
from tkinter import simpledialog
|
|
import pyshark
|
|
import psutil
|
|
import joblib
|
|
from sklearn.preprocessing import StandardScaler
|
|
import sklearn.ensemble._forest
|
|
from threading import Thread, Event
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#hardware check
|
|
# Function to run device check before showing the wizard window
|
|
def device_check():
|
|
try:
|
|
subprocess.run(['python3', 'intaller.py'], check=True)
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error running device check script: {e}")
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Global variable for thread control
|
|
stop_event = Event()
|
|
value = False
|
|
|
|
# Important features and weights as provided
|
|
important_features = [
|
|
'pktcount',
|
|
'byteperflow',
|
|
'tot_kbps',
|
|
'rx_kbps',
|
|
'flows',
|
|
'bytecount',
|
|
'tot_dur',
|
|
'Protocol_ICMP',
|
|
'Protocol_TCP',
|
|
'Protocol_UDP',
|
|
]
|
|
|
|
# Drop features you don't need based on what you used in training
|
|
# Drop features you don't need based on what you used in training
|
|
drop_features = ['src', 'dst', 'dt', 'dur', 'pktrate', 'pktperflow',
|
|
|
|
'Protocol_HTTP',
|
|
'Protocol_HTTPS',
|
|
'Protocol_SSH',
|
|
'Protocol_DHCP',
|
|
'Protocol_FTP',
|
|
'Protocol_SMTP',
|
|
'Protocol_POP3',
|
|
'Protocol_IMAP',
|
|
'Protocol_DNS']
|
|
|
|
# Automatically detect active network interface
|
|
def get_active_interface():
|
|
interfaces = psutil.net_if_addrs()
|
|
for interface, addrs in interfaces.items():
|
|
for addr in addrs:
|
|
if addr.family == 2: # AF_INET (IPv4)
|
|
if addr.address != '127.0.0.1': # Skip localhost (lo)
|
|
return interface
|
|
raise Exception("No active interface found")
|
|
|
|
# Preprocessing function to extract specific features from packets
|
|
def preprocess_packet(packet):
|
|
try:
|
|
if float(packet.frame_info.time_delta) < 1:
|
|
byteperflow = float(packet.length)
|
|
else:
|
|
byteperflow = float(packet.length) / float(packet.frame_info.time_delta)
|
|
|
|
# Capture IP or IPv6 addresses
|
|
src_ip = None
|
|
dst_ip = None
|
|
if hasattr(packet, 'ip'):
|
|
src_ip = packet.ip.src
|
|
dst_ip = packet.ip.dst
|
|
elif hasattr(packet, 'ipv6'):
|
|
src_ip = packet.ipv6.src
|
|
dst_ip = packet.ipv6.dst
|
|
if src_ip and ':' in src_ip:
|
|
return None
|
|
|
|
# Capture protocol layer
|
|
protocol = packet.highest_layer
|
|
|
|
# Add flags for common protocols
|
|
protocol_icmp = 1 if protocol == "ICMP" else 0
|
|
protocol_tcp = 1 if protocol == "TCP" else 0
|
|
protocol_udp = 1 if protocol == "UDP" else 0
|
|
|
|
features = {
|
|
'pktcount': int(packet.length),
|
|
'byteperflow': byteperflow,
|
|
'tot_kbps': float(packet.length) / 1000.0,
|
|
'rx_kbps': float(packet.length) / 1000.0,
|
|
'flows': 1,
|
|
'bytecount': float(packet.length),
|
|
'tot_dur': float(packet.frame_info.time_delta),
|
|
'Protocol_ICMP': protocol_icmp,
|
|
'Protocol_TCP': protocol_tcp,
|
|
'Protocol_UDP': protocol_udp,
|
|
'src_ip': src_ip,
|
|
'dst_ip': dst_ip,
|
|
'probability': 0.0
|
|
}
|
|
|
|
return pd.DataFrame([features])
|
|
except AttributeError:
|
|
return None
|
|
|
|
def prepare_X_test(packets_list, drop_features):
|
|
return None
|
|
|
|
def send_prediction(file_path):
|
|
url = "http://127.0.0.1:8000/ddos-predictions/"
|
|
with open(file_path, 'rb') as f:
|
|
files = {'file': f}
|
|
response = requests.post(url, files=files)
|
|
if response.status_code == 200:
|
|
print(f"Successfully sent {file_path} to API.")
|
|
else:
|
|
print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
|
|
|
|
def make_predictions(X_test, X):
|
|
logistic_regression_model = joblib.load('logistic_regression_model.pkl')
|
|
svm_model = joblib.load('svm_model.pkl')
|
|
knn_model = joblib.load('knn_model.pkl')
|
|
decision_tree_model = joblib.load('decision_tree_model.pkl')
|
|
random_forest_model = joblib.load('random_forest_model.pkl')
|
|
|
|
scaler = StandardScaler()
|
|
X_test_scaled = scaler.fit_transform(X_test)
|
|
|
|
models = {
|
|
'Logistic Regression': logistic_regression_model,
|
|
'SVM': svm_model,
|
|
'KNN': knn_model,
|
|
'Decision Tree': decision_tree_model,
|
|
'Random Forest': random_forest_model
|
|
}
|
|
|
|
all_predictions = []
|
|
for model_name, model in models.items():
|
|
y_pred = model.predict(X_test_scaled)
|
|
all_predictions.append(y_pred)
|
|
|
|
transposed_predictions = list(zip(*all_predictions))
|
|
i = 0
|
|
for row in transposed_predictions:
|
|
row_sum = sum(row)
|
|
avg = row_sum / 5
|
|
X.loc[i, 'probability'] = avg
|
|
i += 1
|
|
|
|
with open('predictions.csv', mode='w', newline='') as file:
|
|
writer = csv.DictWriter(file, fieldnames=X.keys())
|
|
writer.writeheader()
|
|
for index, row in X.iterrows():
|
|
writer.writerow(row.to_dict())
|
|
try:
|
|
send_prediction("predictions.csv")
|
|
except:
|
|
print("could not connect to server")
|
|
|
|
def capture_packets(interface=None):
|
|
|
|
try:
|
|
subprocess.check_call(['sudo', 'apt', 'install', '-y', 'tshark'])
|
|
print("tshark installed successfully.")
|
|
except subprocess.CalledProcessError:
|
|
print("Failed to install tshark. Please install it manually.")
|
|
if interface is None:
|
|
interface = get_active_interface()
|
|
|
|
capture = pyshark.LiveCapture(interface=interface, tshark_path='/usr/bin/tshark')
|
|
|
|
|
|
|
|
try:
|
|
# print("here")
|
|
# capture.sniff(timeout=60)
|
|
while value:
|
|
# print(value)
|
|
packets_list = []
|
|
if stop_event.is_set():
|
|
break
|
|
# print("c")
|
|
count = 0
|
|
# print(packets_list)
|
|
for packet in capture:
|
|
# print("Packet", packet)
|
|
|
|
# if(count == 15):
|
|
# break
|
|
# print(f"Packet No. - {count} Received!")
|
|
try:
|
|
processed_packet = preprocess_packet(packet)
|
|
|
|
if processed_packet is not None:
|
|
# print(processed_packet["dst_ip"])
|
|
# print(processed_packet["src_ip"])
|
|
|
|
if ":" in processed_packet["dst_ip"] or ":" in processed_packet["src_ip"]:
|
|
print("packet isn't correct")
|
|
continue
|
|
# print(processed_packet)
|
|
packets_list.append(processed_packet)
|
|
count+=1
|
|
print(count, len(packets_list))
|
|
|
|
# X_test_scaled = prepare_X_test(packets_list, drop_features)
|
|
if len(packets_list) >= 1:
|
|
X_test = pd.concat(packets_list, ignore_index=True)
|
|
X_test_scaled = X_test.drop(drop_features, axis=1, errors='ignore')
|
|
X_test_scaled = X_test_scaled.reindex(columns=important_features, fill_value=0)
|
|
|
|
if X_test_scaled is not None:
|
|
make_predictions(X_test_scaled,X_test)
|
|
time.sleep(10)
|
|
|
|
except AttributeError as e:
|
|
print(f"Error processing packet: {e}")
|
|
print("Packets being Captured..!")
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nPacket capturing stopped.")
|
|
|
|
def start_capture():
|
|
global thread
|
|
if os.geteuid() != 0:
|
|
root.withdraw() # Hide the main window
|
|
password = simpledialog.askstring("Password", "Enter your sudo password and run again:", show='*')
|
|
if password:
|
|
try:
|
|
subprocess.run(['sudo', '-S', sys.executable] + sys.argv, input=password.encode(), check=True)
|
|
except subprocess.CalledProcessError:
|
|
messagebox.showerror("Error", "Failed to run the script with sudo.")
|
|
finally:
|
|
root.destroy()
|
|
else:
|
|
messagebox.showerror("Error", "No password provided. Unable to run with sudo.")
|
|
elif not stop_event.is_set():
|
|
global value
|
|
value = True
|
|
stop_event.clear()
|
|
thread = Thread(target=capture_packets)
|
|
thread.start()
|
|
|
|
start_button.config(state=tk.DISABLED)
|
|
stop_button.config(state=tk.NORMAL)
|
|
|
|
def stop_capture():
|
|
global value
|
|
value = False
|
|
stop_event.set()
|
|
if thread.is_alive():
|
|
thread.join()
|
|
start_button.config(state=tk.NORMAL)
|
|
stop_button.config(state=tk.DISABLED)
|
|
root.destroy()
|
|
|
|
def setup_gui(frame):
|
|
global start_button, stop_button, thread
|
|
start_button = tk.Button(frame, text="Start Capture", command=start_capture)
|
|
start_button.pack(pady=20)
|
|
|
|
stop_button = tk.Button(frame, text="Stop Capture", command=stop_capture, state=tk.DISABLED)
|
|
stop_button.pack(pady=20)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#rensoomware tested
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import csv
|
|
import pickle
|
|
import pandas as pd
|
|
import tkinter as tk
|
|
from tkinter import filedialog
|
|
from tkinter import messagebox
|
|
import requests
|
|
import psutil
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
value = True
|
|
|
|
# Function to get CPU and memory usage percentages
|
|
def get_usage_percentages():
|
|
cpu_usage = psutil.cpu_percent(interval=1)
|
|
memory_percent = psutil.virtual_memory().percent
|
|
return cpu_usage, memory_percent
|
|
|
|
# Function to send usage data to the API
|
|
def send_data_to_api(cpu_usage, memory_usage):
|
|
api_url = 'http://127.0.0.1:8000/usage_log/' # Update this if needed
|
|
payload = {
|
|
'cpu_usage': cpu_usage,
|
|
'memory_usage': memory_usage
|
|
}
|
|
try:
|
|
response = requests.post(api_url, json=payload)
|
|
if response.status_code == 201:
|
|
print("Data logged successfully.")
|
|
else:
|
|
print("Failed to log data:", response.json())
|
|
except Exception as e:
|
|
print("Error while sending data:", str(e))
|
|
|
|
# Function to send ransomware prediction data to the API
|
|
def send_predictions_to_api(file_path):
|
|
url = "http://127.0.0.1:8000/ransomware-type-predictions/"
|
|
with open(file_path, 'rb') as f:
|
|
files = {'file': f}
|
|
response = requests.post(url, files=files)
|
|
if response.status_code == 200:
|
|
print(f"Successfully sent {file_path} to API.")
|
|
else:
|
|
print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
|
|
|
|
# Function to browse directory using Tkinter
|
|
def browse_directory():
|
|
"""Open a dialog to choose a directory."""
|
|
root = tk.Tk()
|
|
root.withdraw() # Hide the root window
|
|
directory = filedialog.askdirectory(title="Select a Directory")
|
|
return directory
|
|
|
|
# Function to calculate the MD5 hash of a file
|
|
def md5_hash(file_path):
|
|
result = subprocess.run(['md5sum', file_path], capture_output=True, text=True)
|
|
return result.stdout.split()[0]
|
|
|
|
# Extract machine type from ELF file
|
|
def get_machine_type(file_path):
|
|
try:
|
|
result = subprocess.run(['readelf', '-h', file_path], capture_output=True, text=True)
|
|
for line in result.stdout.splitlines():
|
|
if 'Machine:' in line:
|
|
return line.split(':')[1].strip()
|
|
except Exception as e:
|
|
print(f"Error getting machine type: {e}")
|
|
return None
|
|
|
|
# Extract number of sections from ELF file
|
|
def get_number_of_sections(file_path):
|
|
try:
|
|
result = subprocess.run(['readelf', '-h', file_path], capture_output=True, text=True)
|
|
for line in result.stdout.splitlines():
|
|
if 'Number of section headers:' in line:
|
|
return int(line.split(':')[1].strip())
|
|
except Exception as e:
|
|
print(f"Error getting number of sections: {e}")
|
|
return None
|
|
|
|
# Extract resource size from ELF file
|
|
def get_resource_size(file_path):
|
|
try:
|
|
result = subprocess.run(['readelf', '-S', file_path], capture_output=True, text=True)
|
|
for line in result.stdout.splitlines():
|
|
if '.rodata' in line:
|
|
size_hex = line.split()[5]
|
|
return int(size_hex, 16)
|
|
except Exception as e:
|
|
print(f"Error getting resource size: {e}")
|
|
return 0
|
|
|
|
# Extract linker version from ELF file
|
|
def get_linker_version(file_path):
|
|
try:
|
|
result = subprocess.run(['objdump', '-p', file_path], capture_output=True, text=True)
|
|
for line in result.stdout.splitlines():
|
|
if 'Version:' in line:
|
|
version = line.split(':')[1].strip()
|
|
major_version = version.split('.')[0]
|
|
minor_version = version.split('.')[1] if '.' in version else '0'
|
|
return major_version, minor_version
|
|
except Exception as e:
|
|
print(f"Error getting linker version: {e}")
|
|
return None, None
|
|
|
|
# Extract dynamic linking information from ELF file
|
|
def get_dynamic_info(file_path):
|
|
try:
|
|
result = subprocess.run(['readelf', '-d', file_path], capture_output=True, text=True)
|
|
dynamic_info = []
|
|
for line in result.stdout.splitlines():
|
|
dynamic_info.append(line)
|
|
return dynamic_info
|
|
except Exception as e:
|
|
print(f"Error getting dynamic linking info: {e}")
|
|
return None
|
|
|
|
# Function to extract features from an ELF file
|
|
def extract_features(file_path):
|
|
features = {
|
|
'FileName': file_path,
|
|
'md5Hash': md5_hash(file_path),
|
|
'Machine': get_machine_type(file_path),
|
|
'NumberOfSections': get_number_of_sections(file_path),
|
|
'ResourceSize': get_resource_size(file_path),
|
|
'LinkerVersionMajor': None,
|
|
'LinkerVersionMinor': None,
|
|
'DynamicInfo': get_dynamic_info(file_path)
|
|
}
|
|
|
|
# Get linker version
|
|
major_version, minor_version = get_linker_version(file_path)
|
|
features['LinkerVersionMajor'] = major_version
|
|
features['LinkerVersionMinor'] = minor_version
|
|
|
|
return features
|
|
|
|
# Function to find ELF files in the selected directory
|
|
def find_elf_files(start_dir, status_label):
|
|
if not os.path.isdir(start_dir):
|
|
return
|
|
|
|
try:
|
|
find_command = [
|
|
'find', start_dir,
|
|
'(', '-path', '/proc', '-o', '-path', '/sys', '-o', '-path', '/run', ')', '-prune', '-o',
|
|
'-type', 'f', '-print'
|
|
]
|
|
find_result = subprocess.run(find_command, capture_output=True, text=True, check=False)
|
|
|
|
if find_result.returncode != 0:
|
|
print(f"Error running find command: {find_result.stderr}", file=sys.stderr)
|
|
return
|
|
|
|
file_paths = find_result.stdout.splitlines()
|
|
print(f"Found files in {start_dir}:")
|
|
print(file_paths)
|
|
|
|
for file_path in file_paths:
|
|
try:
|
|
file_command = ['file', '--mime-type', file_path]
|
|
file_result = subprocess.run(file_command, capture_output=True, text=True, check=True)
|
|
|
|
if 'application/x-executable' in file_result.stdout or 'application/x-sharedlib' in file_result.stdout:
|
|
status_label.config(text=f"Processing: {os.path.basename(file_path)}")
|
|
status_label.update()
|
|
|
|
if os.path.exists(file_path):
|
|
try:
|
|
extracted_features = extract_features(file_path)
|
|
except:
|
|
print("Error Reading File" + file_path)
|
|
continue
|
|
|
|
with open('data.csv', 'a', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(extracted_features.values())
|
|
|
|
else:
|
|
print("File not found!")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error running file command on {file_path}: {e}", file=sys.stderr)
|
|
|
|
except Exception as e:
|
|
print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)
|
|
|
|
# Function to load the model
|
|
def load_model():
|
|
with open('model.pkl', 'rb') as f:
|
|
model = pickle.load(f)
|
|
return model
|
|
|
|
# Function to predict ransomware based on features
|
|
def predict_ransomware(file_path):
|
|
model = load_model()
|
|
|
|
# Extract features from the given file
|
|
features = extract_features(file_path)
|
|
feature_df = pd.DataFrame([features])
|
|
|
|
# Predict ransomware type
|
|
prediction = model.predict(feature_df.drop(columns=['FileName', 'md5Hash', 'Machine', 'DynamicInfo']))
|
|
return prediction[0]
|
|
|
|
# Function to run predictions on selected ELF files
|
|
def run_predictions(selected_dir, status_label):
|
|
csv_filename = "data.csv"
|
|
with open(csv_filename, mode='w', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(['FileName',
|
|
'md5Hash',
|
|
'Machine',
|
|
'NumberOfSections',
|
|
'ResourceSize',
|
|
'LinkerVersionMajor',
|
|
'LinkerVersionMinor',
|
|
'DynamicInfo']) # Write header if file is new
|
|
|
|
csv_filename = "predictions.csv"
|
|
with open(csv_filename, mode='w', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(['filename', 'predicted_class']) # Write header if file is new
|
|
|
|
if selected_dir:
|
|
status_label.config(text="Processing...")
|
|
status_label.update()
|
|
find_elf_files(selected_dir, status_label)
|
|
|
|
with open("data.csv", "r") as file:
|
|
reader = csv.DictReader(file)
|
|
files = [row['FileName'] for row in reader]
|
|
for ransomware_file in files:
|
|
print(ransomware_file)
|
|
result = predict_ransomware(ransomware_file)
|
|
|
|
csv_filename = "predictions.csv"
|
|
file_exists = os.path.exists(csv_filename)
|
|
|
|
with open(csv_filename, mode='a', newline='') as file:
|
|
writer = csv.writer(file)
|
|
if not file_exists:
|
|
writer.writerow(['filename', 'predicted_class'])
|
|
writer.writerow([os.path.basename(ransomware_file), result])
|
|
|
|
status_label.config(text="Predictions Saved")
|
|
|
|
try:
|
|
send_predictions_to_api("predictions.csv")
|
|
except:
|
|
print("Connection to API failed")
|
|
|
|
try:
|
|
cpu_percent, memory_percent = get_usage_percentages()
|
|
send_data_to_api(cpu_percent, memory_percent)
|
|
except:
|
|
print("Connection to API failed")
|
|
else:
|
|
status_label.config(text="No directory selected")
|
|
global value
|
|
if(value):
|
|
value = False
|
|
print("VALLLLLL")
|
|
start_watchdog(selected_dir, status_label)
|
|
|
|
|
|
# Watchdog Event Handler
|
|
class WatcherHandler(FileSystemEventHandler):
|
|
def __init__(self, status_label):
|
|
self.status_label = status_label
|
|
|
|
def on_modified(self, event):
|
|
if event.is_directory:
|
|
return
|
|
else:
|
|
print(f"File modified: {event.src_path}")
|
|
run_predictions(os.path.dirname(event.src_path), self.status_label)
|
|
|
|
def on_created(self, event):
|
|
if event.is_directory:
|
|
return
|
|
else:
|
|
print(f"File created: {event.src_path}")
|
|
run_predictions(os.path.dirname(event.src_path), self.status_label)
|
|
|
|
# Function to start the watchdog observer
|
|
def start_watchdog(directory, status_label):
|
|
event_handler = WatcherHandler(status_label)
|
|
observer = Observer()
|
|
observer.schedule(event_handler, path=directory, recursive=True)
|
|
observer.start()
|
|
print(f"Started monitoring {directory} for changes.")
|
|
return observer
|
|
# Entry point to run predictions for selected directory
|
|
if __name__ == "__main__":
|
|
selected_directory = sys.argv[1] if len(sys.argv) > 1 else None
|
|
if selected_directory:
|
|
run_predictions(selected_directory)
|
|
else:
|
|
print("Please specify a directory.")
|
|
|
|
|
|
|
|
|
|
#remsomwareaudit
|
|
import tkinter as tk
|
|
from tkinter import messagebox
|
|
import subprocess
|
|
import os
|
|
import csv
|
|
import inotify_simple
|
|
import threading
|
|
import time
|
|
import re
|
|
import requests
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
from collections import defaultdict
|
|
import numpy as np
|
|
import pandas as pd
|
|
from sklearn.preprocessing import StandardScaler
|
|
import tensorflow as tf
|
|
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
|
|
from datetime import datetime
|
|
|
|
permission_operations = None
|
|
# Define event masks manually
|
|
IN_CREATE = 0x00000100
|
|
IN_DELETE = 0x00000200
|
|
IN_MODIFY = 0x00000002
|
|
IN_OPEN = 0x00000020
|
|
IN_ISDIR = 0x40000000
|
|
|
|
####################
|
|
|
|
|
|
TEST_DATA_PATH = 'combined_log_summary.csv'
|
|
VARIABLE_NAMES_PATH = 'output.txt'
|
|
def predict_ransomware():
|
|
# Load the trained model
|
|
model = tf.keras.models.load_model('updated_ransomware_classifier.h5')
|
|
|
|
# Load and prepare test data
|
|
# Read variable names
|
|
with open(VARIABLE_NAMES_PATH, encoding='utf-8') as f:
|
|
columns = [line.split(';')[1].strip() for line in f]
|
|
|
|
# Load test data
|
|
data = pd.read_csv(TEST_DATA_PATH, header=None, names=columns)
|
|
|
|
# Check and clean column names
|
|
data.columns = data.columns.str.strip()
|
|
X = data
|
|
# Standardize the features
|
|
scaler = StandardScaler()
|
|
X = scaler.fit_transform(X)
|
|
|
|
# Make predictions
|
|
predictions = model.predict(X)
|
|
predicted_labels = (predictions > 0.5).astype(int)
|
|
|
|
|
|
# Convert predictions to "Yes" or "No"
|
|
predicted_labels_text = ['Yes' if label == 1 else 'No' for label in predicted_labels.flatten()]
|
|
|
|
|
|
# Get current timestamp
|
|
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
|
|
|
|
|
|
# Save predictions and true labels to a CSV file with timestamp
|
|
output_df = pd.DataFrame({
|
|
'Timestamp': [timestamp] * len(predicted_labels_text), # Add timestamp column
|
|
'Predicted Label': predicted_labels_text
|
|
})
|
|
|
|
output_file = f'prediction.csv'
|
|
output_df.to_csv(output_file, index=False)
|
|
print(f"Predictions saved to {output_file} ({timestamp})")
|
|
|
|
|
|
def send_predictions_to_api(file_path):
|
|
url = "http://142.93.221.85:8000/predict-malware/"
|
|
with open(file_path, 'rb') as f:
|
|
files = {'file': f}
|
|
response = requests.post(url, files=files)
|
|
if response.status_code == 200:
|
|
print(f"Successfully sent {file_path} to API.")
|
|
else:
|
|
print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
|
|
try:
|
|
send_predictions_to_api(output_file)
|
|
except:
|
|
print("Error Connection Server")
|
|
|
|
####################
|
|
|
|
ID = 0
|
|
|
|
is_flip = 0
|
|
flipped = False
|
|
class PermissionChangeHandler(FileSystemEventHandler):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.file_types = set()
|
|
|
|
def get_file_extension(self, file_path):
|
|
"""Extracts the file extension from the file path."""
|
|
_, ext = os.path.splitext(file_path)
|
|
return ext.strip(".") # Strip the dot from the extension
|
|
|
|
def on_modified(self, event):
|
|
if not event.is_directory:
|
|
file_path = event.src_path
|
|
file_extension = self.get_file_extension(file_path)
|
|
|
|
# Collect all file types
|
|
file_types = set()
|
|
for operations in permission_operations.values():
|
|
for key in operations:
|
|
match = re.search(r'\.\w+$', key)
|
|
if match:
|
|
file_types.add(match.group().strip('.'))
|
|
|
|
if file_extension in file_types:
|
|
current_permissions = oct(os.stat(file_path).st_mode & 0o777)
|
|
|
|
|
|
# Check all operations (chmod/chown) for this file type
|
|
for operation, perms in permission_operations.items():
|
|
for key in perms:
|
|
if file_extension in key:
|
|
perms[key] += 1
|
|
# print(f"Updated {operation} for {file_extension}: {perms[key]}")
|
|
|
|
class AuditDManagerApp:
|
|
def __init__(self, frame):
|
|
self.frame = frame
|
|
# self.root = frame
|
|
# self.root.title("AuditD Manager")
|
|
# self.root.geometry("400x350")
|
|
# # Adjusted for additional widget
|
|
self.frame.pack(fill='both', expand=True)
|
|
|
|
|
|
# Create Widgets
|
|
self.install_button = tk.Button(self.frame , text="Install AuditD", command=self.install_auditd)
|
|
self.install_button.pack(pady=10)
|
|
|
|
self.start_button = tk.Button(self.frame , text="Start AuditD", command=self.start_auditd)
|
|
self.start_button.pack(pady=10)
|
|
|
|
self.stop_button = tk.Button(self.frame , text="Stop AuditD", command=self.stop_auditd)
|
|
self.stop_button.pack(pady=10)
|
|
|
|
self.status_button = tk.Button(self.frame, text="Check Status", command=self.check_status)
|
|
self.status_button.pack(pady=10)
|
|
|
|
|
|
# Add Text Entry for Watch Path
|
|
|
|
# Initialize monitoring flags and data structures
|
|
self.monitoring = False
|
|
self.log_file = "/var/log/audit/audit.log"
|
|
self.combined_csv_file = "combined_log_summary.csv"
|
|
self.monitored_files_set = {
|
|
'bash.bashrc', 'bash_completion.d', 'environment', 'fstab', 'fwupd', 'group', 'host.conf', 'hosts', 'init.d',
|
|
'inputrc', 'ld.so.cache', 'locale.alias', 'locale.conf', 'login.defs', 'machine-id', 'modprobe.d', 'nsswitch.conf',
|
|
'passwd', 'sensors.d', 'sensors3.conf', 'shadow', 'shells', 'sudo.conf', 'sudoers', 'sudoers.d'
|
|
}
|
|
self.log_counts = {key: 0 for key in [
|
|
'Id','PROCTITLE', 'AVC', 'SYSCALL', 'USER_AUTH', 'USER_ACCT',
|
|
'USER_CMD', 'CRED_REFR', 'USER_START', 'USER_AVC', 'USER_END', 'CRED_DISP', 'CRED_ACQ',
|
|
'LOGIN', 'SERVICE_START', 'SERVICE_STOP']}
|
|
|
|
# Track file extensions
|
|
self.ext_count = {ext: {'modified': 0, 'created': 0, 'deleted': 0, 'opened': 0} for ext in [
|
|
'.db', '.AR', '.01', '.GIF', '.TXT', '.scc', '.dat', '.bmp', '.STF', '.scf',
|
|
'.exe', '.typelib', '.cl', '.ocx', '.xml', '.json', '.csv', '.html', '.css',
|
|
'.js', '.py', '.log', '.sql', '.pdf', '.doc', '.docx', '.ppt', '.pptx',
|
|
'.xlsx', '.jpg', '.jpeg', '.png', '.mp4', '.mp3', '.zip', '.tar', '.gz', '.rar', '.7z', '.apk', '.iso']}
|
|
|
|
# Track permission operations
|
|
global permission_operations
|
|
permission_operations = {
|
|
'chmod': {f'chmod{perm}{ext}': 0 for perm in ['644', '755', '777'] for ext in self.ext_count},
|
|
'chown': {f'chown{owner}{ext}': 0 for owner in ['user', 'group'] for ext in self.ext_count},
|
|
'chgrp': {f'chgrp{group}{ext}': 0 for group in ['staff', 'admin'] for ext in self.ext_count}
|
|
}
|
|
|
|
# Directory operations tracking
|
|
self.directory_count = {'created': 0, 'deleted': 0, 'modified': 0, 'opened': 0}
|
|
|
|
# Initialize inotify
|
|
self.inotify = inotify_simple.INotify()
|
|
self.EVENT_MASKS = IN_CREATE | IN_DELETE | IN_MODIFY | IN_OPEN | IN_ISDIR
|
|
self.watch_path = '/etc' # Default path, will be updated
|
|
self.watch_descriptor2 = self.inotify.add_watch(self.watch_path, self.EVENT_MASKS)
|
|
|
|
# Observer for filesystem events
|
|
self.observer = None
|
|
self.event_handler = None
|
|
self.monitor_thread = threading.Thread(target=self.monitor_logs)
|
|
|
|
# Initialize file monitoring data
|
|
self.open_count = defaultdict(int)
|
|
|
|
def run_command(self, command, success_message, error_message):
|
|
try:
|
|
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
messagebox.showinfo("Success", success_message)
|
|
except subprocess.CalledProcessError as e:
|
|
messagebox.showerror("Error", f"{error_message}\n\n{e.stderr.decode()}")
|
|
|
|
def prompt_for_password(self, command, success_message, error_message):
|
|
password_window = tk.Toplevel(self.root)
|
|
password_window.title("Enter Sudo Password")
|
|
|
|
tk.Label(password_window, text="Enter your sudo password:").pack(pady=10)
|
|
|
|
password_entry = tk.Entry(password_window, show="*")
|
|
password_entry.pack(pady=5)
|
|
|
|
def on_submit():
|
|
password = password_entry.get()
|
|
password_window.destroy()
|
|
if not password:
|
|
messagebox.showwarning("Input Error", "Please enter your sudo password.")
|
|
return
|
|
|
|
full_command = f"echo {password} | sudo -S {command}"
|
|
self.run_command(full_command, success_message, error_message)
|
|
tk.Button(password_window, text="Submit", command=on_submit).pack(pady=10)
|
|
|
|
def install_auditd(self):
|
|
command = "sudo apt-get update && sudo apt-get install -y auditd"
|
|
self.prompt_for_password(command, "AuditD installed successfully!", "Failed to install AuditD.")
|
|
|
|
def start_auditd(self):
|
|
command = "sudo systemctl start auditd"
|
|
self.prompt_for_password(command, "AuditD started successfully!", "Failed to start AuditD.")
|
|
self.start_monitoring()
|
|
|
|
def stop_auditd(self):
|
|
command = "sudo systemctl stop auditd"
|
|
self.prompt_for_password(command, "AuditD stopped successfully!", "Failed to stop AuditD.")
|
|
self.stop_monitoring()
|
|
|
|
def check_status(self):
|
|
command = "systemctl status auditd"
|
|
try:
|
|
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
status = result.stdout.decode()
|
|
messagebox.showinfo("AuditD Status", status)
|
|
except subprocess.CalledProcessError as e:
|
|
messagebox.showerror("Error", f"Failed to check status of AuditD.\n\n{e.stderr.decode()}")
|
|
|
|
def start_monitoring(self):
|
|
self.monitoring = True
|
|
if not self.monitor_thread.is_alive():
|
|
self.monitor_thread = threading.Thread(target=self.monitor_logs)
|
|
self.monitor_thread.start()
|
|
|
|
# Get the user-defined watch path
|
|
self.watch_path = '/etc' # Default to root if empty
|
|
self.watch_descriptor = self.inotify.add_watch(self.watch_path, self.EVENT_MASKS)
|
|
|
|
# Start filesystem event monitoring
|
|
if self.observer is None:
|
|
self.event_handler = PermissionChangeHandler()
|
|
self.observer = Observer()
|
|
self.observer.schedule(self.event_handler, '/home', recursive=True)
|
|
self.observer.start()
|
|
|
|
def stop_monitoring(self):
|
|
self.monitoring = False
|
|
if self.monitor_thread.is_alive():
|
|
self.monitor_thread.join()
|
|
|
|
# Stop filesystem event monitoring
|
|
if self.observer:
|
|
self.observer.stop()
|
|
self.observer.join()
|
|
|
|
def monitor_logs(self):
|
|
while self.monitoring:
|
|
if os.path.exists(self.log_file):
|
|
with open(self.log_file, 'r') as f:
|
|
lines = f.readlines()
|
|
|
|
for line in lines:
|
|
if 'type=' in line:
|
|
log_type = line.split('type=')[1].split(' ')[0]
|
|
if log_type in self.log_counts:
|
|
self.log_counts[log_type] += 1
|
|
|
|
self.update_csv()
|
|
|
|
self.monitor_extensions()
|
|
predict_ransomware()
|
|
time.sleep(5) # Sleep for one second before the next update
|
|
|
|
def update_csv(self):
|
|
# headers = [
|
|
# 'Id' ,'PROCTITLE', 'AVC', 'SYSCALL', 'USER_AUTH', 'USER_ACCT',
|
|
# 'USER_CMD', 'CRED_REFR', 'USER_START', 'USER_AVC', 'USER_END', 'CRED_DISP', 'CRED_ACQ',
|
|
# 'LOGIN', 'SERVICE_START', 'SERVICE_STOP'
|
|
# ] + [f'chmod{perm}{ext}' for perm in ['644', '755', '777'] for ext in self.ext_count] + \
|
|
# [f'chown{owner}{ext}' for owner in ['user', 'group'] for ext in self.ext_count] + \
|
|
# [f'chgrp{group}{ext}' for group in ['staff', 'admin'] for ext in self.ext_count] + \
|
|
# [f'Modified({ext})' for ext in self.ext_count] + \
|
|
# [f'Created({ext})' for ext in self.ext_count] + \
|
|
# [f'Deleted({ext})' for ext in self.ext_count] + \
|
|
# [f'Opened({ext})' for ext in self.ext_count] + \
|
|
# ['Directories Created', 'Directories Deleted', 'Directories Modified', 'Directories Opened']+ \
|
|
# list(self.monitored_files_set)
|
|
|
|
global ID
|
|
ID += 1
|
|
global is_flip
|
|
global flipped
|
|
if flipped:
|
|
is_flip = 1
|
|
flipped = False
|
|
else:
|
|
is_flip = 0
|
|
flipped = True
|
|
|
|
row = [
|
|
ID,
|
|
self.log_counts.get('PROCTITLE', 0),
|
|
self.log_counts.get('AVC', 0),
|
|
self.log_counts.get('SYSCALL', 0),
|
|
self.log_counts.get('USER_AUTH', 0),
|
|
self.log_counts.get('USER_ACCT', 0),
|
|
self.log_counts.get('USER_CMD', 0),
|
|
self.log_counts.get('CRED_REFR', 0),
|
|
self.log_counts.get('USER_START', 0),
|
|
self.log_counts.get('USER_AVC', 0),
|
|
self.log_counts.get('USER_END', 0),
|
|
self.log_counts.get('CRED_DISP', 0),
|
|
self.log_counts.get('CRED_ACQ', 0),
|
|
self.log_counts.get('LOGIN', 0),
|
|
self.log_counts.get('SERVICE_START', 0),
|
|
self.log_counts.get('SERVICE_STOP', 0),
|
|
]
|
|
|
|
# print(permission_operations['chmod'])
|
|
# Add permission operations and extensions
|
|
row.extend(permission_operations['chmod'].values())
|
|
row.extend(permission_operations['chown'].values())
|
|
row.extend(permission_operations['chgrp'].values())
|
|
|
|
# Add extension counts for modification, creation, deletion, and opening
|
|
for ext in self.ext_count:
|
|
row.extend([
|
|
self.ext_count[ext]['modified'],
|
|
self.ext_count[ext]['created'],
|
|
self.ext_count[ext]['deleted'],
|
|
self.ext_count[ext]['opened'],
|
|
])
|
|
|
|
# Add directory counts
|
|
row.extend([
|
|
self.directory_count['created'],
|
|
self.directory_count['deleted'],
|
|
self.directory_count['modified'],
|
|
self.directory_count['opened']
|
|
])
|
|
|
|
# Add monitored files open counts
|
|
row.extend(self.open_count.get(file, 0) for file in sorted(self.monitored_files_set))
|
|
|
|
# Write to CSV, append if file exists
|
|
file_exists = os.path.isfile(self.combined_csv_file)
|
|
with open(self.combined_csv_file, 'a', newline='') as csv_file:
|
|
writer = csv.writer(csv_file)
|
|
if not file_exists:
|
|
pass
|
|
writer.writerow(row)
|
|
|
|
|
|
def monitor_extensions(self):
|
|
events = self.inotify.read(timeout=100000)
|
|
for event in events:
|
|
(_, event_types, _, filename) = event
|
|
|
|
filename = event.name
|
|
ext = os.path.splitext(filename)[1]
|
|
if ext in self.ext_count:
|
|
if event.mask & IN_CREATE:
|
|
self.ext_count[ext]['created'] += 1
|
|
if event.mask & IN_DELETE:
|
|
self.ext_count[ext]['deleted'] += 1
|
|
if event.mask & IN_MODIFY:
|
|
self.ext_count[ext]['modified'] += 1
|
|
if event.mask & IN_OPEN:
|
|
self.ext_count[ext]['opened'] += 1
|
|
if filename in self.monitored_files_set:
|
|
self.open_count[filename] += 1
|
|
|
|
if event.mask & IN_ISDIR:
|
|
if event.mask & IN_CREATE:
|
|
self.directory_count['created'] += 1
|
|
if event.mask & IN_DELETE:
|
|
self.directory_count['deleted'] += 1
|
|
if event.mask & IN_MODIFY:
|
|
self.directory_count['modified'] += 1
|
|
if event.mask & IN_OPEN:
|
|
self.directory_count['opened'] += 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#malwaretested
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import tkinter as tk
|
|
from tkinter import filedialog, messagebox, ttk
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
import threading
|
|
import pandas as pd
|
|
import pickle
|
|
import numpy as np
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
import sys
|
|
import os
|
|
import pandas as pd
|
|
import numpy as np
|
|
import codecs
|
|
import pickle
|
|
import requests
|
|
|
|
|
|
|
|
isMonitoring = False
|
|
|
|
output_directory = "outputs"
|
|
bytes_output_directory = "outputs/bytes_output"
|
|
asm_output_directory = "outputs/asm_output"
|
|
result_folder = "results"
|
|
bytes_result_directory = "results/bytes_result"
|
|
asm_result_directory = "results/asm_result"
|
|
bytes_model_directory = "bytes_models"
|
|
asm_model_directory = "asm_models"
|
|
|
|
if not os.path.exists(asm_model_directory) or not os.path.exists(bytes_model_directory):
|
|
messagebox.showinfo("Error", "Models Not Found for Prediction")
|
|
exit(-1)
|
|
|
|
if not os.path.exists(output_directory):
|
|
os.makedirs(output_directory)
|
|
|
|
if not os.path.exists(asm_output_directory):
|
|
os.makedirs(asm_output_directory)
|
|
|
|
if not os.path.exists(bytes_output_directory):
|
|
os.makedirs(bytes_output_directory)
|
|
|
|
if not os.path.exists(result_folder):
|
|
os.makedirs(result_folder)
|
|
|
|
if not os.path.exists(asm_result_directory):
|
|
os.makedirs(asm_result_directory)
|
|
|
|
if not os.path.exists(bytes_result_directory):
|
|
os.makedirs(bytes_result_directory)
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
|
|
|
|
def send_predictions_to_api(file_path):
|
|
url = "http://142.93.221.85:8000/predict-malware/"
|
|
with open(file_path, 'rb') as f:
|
|
files = {'file': f}
|
|
response = requests.post(url, files=files)
|
|
if response.status_code == 200:
|
|
print(f"Successfully sent {file_path} to API.")
|
|
else:
|
|
print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
|
|
|
|
|
|
def send_asm_predictions_to_api(file_path):
|
|
url = "http://142.93.221.85:8000/predict-malware/"
|
|
with open(file_path, 'rb') as f:
|
|
files = {'file': f}
|
|
response = requests.post(url, files=files)
|
|
if response.status_code == 200:
|
|
print(f"Successfully sent {file_path} to API.")
|
|
else:
|
|
print(f"Failed to send {file_path} to API. Status code: {response.status_code}")
|
|
|
|
|
|
|
|
def format_bytes_to_hex(data):
|
|
hex_dump = ""
|
|
for i in range(0, len(data), 16):
|
|
chunk = data[i:i+16]
|
|
hex_values = " ".join(f"{byte:02X}" for byte in chunk)
|
|
address = f"{i:08X}"
|
|
hex_dump += f"{address} {hex_values}\n"
|
|
return hex_dump
|
|
|
|
def convert_file_to_hex(input_file, output_file):
|
|
try:
|
|
with open(input_file, 'rb') as f:
|
|
data = f.read()
|
|
|
|
hex_dump = format_bytes_to_hex(data)
|
|
|
|
with open(output_file, 'w') as f:
|
|
f.write(hex_dump)
|
|
|
|
logging.info(f"Converted '{input_file}' to hex dump and saved to '{output_file}'")
|
|
except Exception as e:
|
|
logging.error(f"Error converting '{input_file}': {e}")
|
|
|
|
def scan_and_convert_directory(directory, output_dir):
|
|
for root, _, files in os.walk(directory, followlinks=True):
|
|
for filename in files:
|
|
input_file = os.path.join(root, filename)
|
|
if not filename.endswith(".bytes"):
|
|
output_file = os.path.join(output_dir, f"{filename}.bytes")
|
|
if not os.path.exists(output_file):
|
|
convert_file_to_hex(input_file, output_file)
|
|
|
|
class FileChangeHandler(FileSystemEventHandler):
|
|
def __init__(self, output_dir, hex_dirs, disasm_dirs):
|
|
self.output_dir = output_dir
|
|
self.hex_dirs = hex_dirs
|
|
self.disasm_dirs = disasm_dirs
|
|
super().__init__()
|
|
|
|
def on_created(self, event):
|
|
if not event.is_directory:
|
|
input_file = event.src_path
|
|
output_file_hex = os.path.join(bytes_output_directory, f"{os.path.basename(input_file)}.bytes")
|
|
if not os.path.exists(output_file_hex):
|
|
# Convert to hex in a new thread
|
|
threading.Thread(target=self.run_hex_conversion, args=(input_file, output_file_hex)).start()
|
|
threading.Thread(target=self.run_disassembly, args=(input_file,)).start()
|
|
|
|
# Disassemble in a new thread
|
|
|
|
def run_hex_conversion(self, input_file, output_file):
|
|
convert_file_to_hex(input_file, output_file)
|
|
run_malware_ai_analysis_bytes()
|
|
def run_disassembly(self, file_path):
|
|
try:
|
|
print(f"Disassembling {file_path}")
|
|
result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True)
|
|
assembly_code = result.stdout
|
|
|
|
base_name = os.path.basename(file_path)
|
|
if not file_path.endswith(".asm"):
|
|
asm_file_name = f"{base_name}.asm"
|
|
asm_file_path = os.path.join(asm_output_directory, asm_file_name)
|
|
|
|
with open(asm_file_path, "w") as asm_file:
|
|
asm_file.write(assembly_code)
|
|
|
|
print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
|
|
run_malware_analysis_asm()
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
|
|
|
|
def monitor_directories(directories, output_dir):
|
|
event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
|
|
observer = Observer()
|
|
for directory in directories:
|
|
observer.schedule(event_handler, path=directory, recursive=True)
|
|
logging.info(f"Monitoring directory: {directory}")
|
|
|
|
observer.start()
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
observer.stop()
|
|
observer.join()
|
|
|
|
|
|
def start_observer(directories, output_dir):
|
|
|
|
observer = Observer()
|
|
event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories)
|
|
for directory in directories:
|
|
observer.schedule(event_handler, path=directory, recursive=True)
|
|
logging.info(f"Monitoring directory: {directory}")
|
|
|
|
observer.start()
|
|
return observer
|
|
|
|
|
|
|
|
def disassemble_elf(file_path, output_dir):
|
|
try:
|
|
print(f"Disassembling {file_path}")
|
|
result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True)
|
|
assembly_code = result.stdout
|
|
|
|
base_name = os.path.basename(file_path)
|
|
if not file_path.endswith(".asm"):
|
|
asm_file_name = f"{base_name}.asm"
|
|
asm_file_path = os.path.join(output_dir, asm_file_name)
|
|
|
|
with open(asm_file_path, "w") as asm_file:
|
|
asm_file.write(assembly_code)
|
|
|
|
print(f"Disassembly complete. Assembly code saved to {asm_file_path}")
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error disassembling file {file_path}: {e}", file=sys.stderr)
|
|
|
|
def find_elf_files(start_dirs):
|
|
elf_files = []
|
|
for start_dir in start_dirs:
|
|
if not os.path.isdir(start_dir):
|
|
continue
|
|
|
|
try:
|
|
find_command = ['find', start_dir, '-path', '/proc', '-prune', '-o', '-path', '/sys', '-prune', '-o', '-path', '/run', '-prune', '-o', '-type', 'f', '-print']
|
|
find_result = subprocess.run(find_command, capture_output=True, text=True, check=False)
|
|
|
|
if find_result.returncode != 0:
|
|
print(f"Error running find command: {find_result.stderr}", file=sys.stderr)
|
|
continue
|
|
|
|
file_paths = find_result.stdout.splitlines()
|
|
print(f"Found files in {start_dir}:")
|
|
print(file_paths)
|
|
|
|
for file_path in file_paths:
|
|
try:
|
|
file_command = ['file', '--mime-type', file_path]
|
|
file_result = subprocess.run(file_command, capture_output=True, text=True, check=True)
|
|
|
|
if 'application/x-executable' in file_result.stdout or 'application/x-sharedlib' in file_result.stdout:
|
|
elf_files.append(file_path)
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error running file command on {file_path}: {e}", file=sys.stderr)
|
|
|
|
except Exception as e:
|
|
print(f"Error processing directory {start_dir}: {e}", file=sys.stderr)
|
|
|
|
print(f"Found ELF files: {elf_files}")
|
|
return elf_files
|
|
|
|
def process_files(output_dir, start_dirs):
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
elf_files = find_elf_files(start_dirs)
|
|
|
|
if not elf_files:
|
|
print("No ELF files found.")
|
|
return
|
|
|
|
for elf_file in elf_files:
|
|
disassemble_elf(elf_file, output_dir)
|
|
|
|
print("Disassembly complete. Assembly files are saved in the output directory.")
|
|
|
|
def process_files_malware(folder_path, files_to_process):
|
|
feature_matrix = np.zeros((len(files_to_process), 258), dtype=int) # Adjusted to 258 columns
|
|
|
|
for k, file in enumerate(files_to_process):
|
|
if file.endswith("bytes"):
|
|
try:
|
|
with open(os.path.join(folder_path, file), "r") as byte_file:
|
|
for lines in byte_file:
|
|
line = lines.rstrip().split(" ")
|
|
for hex_code in line:
|
|
if hex_code != '??':
|
|
index = int(hex_code, 16)
|
|
if index < 257: # Keep the bounds check for 257
|
|
feature_matrix[k][index] += 1
|
|
else:
|
|
feature_matrix[k][257] += 1 # This now references the 258th feature
|
|
except:
|
|
continue
|
|
# Normalize the features
|
|
scaler = MinMaxScaler()
|
|
feature_matrix = scaler.fit_transform(feature_matrix)
|
|
|
|
return feature_matrix
|
|
|
|
def test_files(folder_path, model_path, output_csv):
|
|
files = os.listdir(folder_path)
|
|
|
|
# Check if the CSV file already exists
|
|
if os.path.exists(output_csv):
|
|
existing_results = pd.read_csv(output_csv)
|
|
already_scanned_files = set(existing_results['File'].tolist())
|
|
else:
|
|
already_scanned_files = set()
|
|
|
|
# Filter out files that have already been scanned
|
|
files_to_process = [file for file in files if file not in already_scanned_files]
|
|
|
|
if not files_to_process:
|
|
print("All files have already been scanned.")
|
|
return
|
|
|
|
# Process only the files that haven't been scanned yet
|
|
feature_matrix = process_files_malware(folder_path, files_to_process)
|
|
|
|
# Load the trained model
|
|
with open(model_path, 'rb') as model_file:
|
|
model = pickle.load(model_file)
|
|
|
|
# Make predictions
|
|
predictions = model.predict(feature_matrix)
|
|
prediction_probs = model.predict_proba(feature_matrix)
|
|
|
|
# Create a DataFrame for the new results
|
|
new_results = pd.DataFrame({
|
|
'File': files_to_process,
|
|
'Predicted Class': predictions,
|
|
'Prediction Probability': [max(probs) for probs in prediction_probs]
|
|
})
|
|
|
|
# Append new results to the existing CSV file or create a new one
|
|
if os.path.exists(output_csv):
|
|
new_results.to_csv(output_csv, mode='a', header=False, index=False)
|
|
else:
|
|
new_results.to_csv(output_csv, index=False)
|
|
|
|
print(f"New predictions appended to {output_csv}")
|
|
|
|
def run_malware_ai_analysis_bytes():
|
|
print("bytes malware analysis started")
|
|
directory = bytes_output_directory
|
|
model_files = bytes_model_directory
|
|
|
|
model_folder = model_files # Folder containing the .pkl files
|
|
model_files = [f for f in os.listdir(model_folder) if f.endswith('.pkl')]
|
|
|
|
for model_file in model_files:
|
|
model_path = os.path.join(model_folder, model_file)
|
|
output_csv = os.path.join(bytes_result_directory, f"bytes_predictions_{os.path.splitext(model_file)[0]}.csv")
|
|
test_files(directory, model_path, output_csv)
|
|
try:
|
|
send_predictions_to_api(output_csv)
|
|
except:
|
|
print("Connection Failed")
|
|
|
|
|
|
|
|
|
|
def preprocess_asm_file(file_path):
|
|
prefixes = ['.text:', '.Pav:', '.idata:', '.data:', '.bss:', '.rdata:', '.edata:', '.rsrc:', '.tls:', '.reloc:', '.BSS:', '.CODE']
|
|
opcodes = ['jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub', 'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call', 'shl', 'ror', 'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx']
|
|
keywords = ['.dll', 'std::', ':dword']
|
|
registers = ['edx', 'esi', 'eax', 'ebx', 'ecx', 'edi', 'ebp', 'esp', 'eip']
|
|
|
|
# Initialize counts
|
|
prefix_counts = np.zeros(len(prefixes), dtype=int)
|
|
opcode_counts = np.zeros(len(opcodes), dtype=int)
|
|
keyword_counts = np.zeros(len(keywords), dtype=int)
|
|
register_counts = np.zeros(len(registers), dtype=int)
|
|
|
|
# Process file
|
|
with open(file_path, 'r', encoding='cp1252', errors='replace') as f:
|
|
for line in f:
|
|
line = line.rstrip().split()
|
|
if not line:
|
|
continue
|
|
l = line[0]
|
|
for i, prefix in enumerate(prefixes):
|
|
if prefix in l:
|
|
prefix_counts[i] += 1
|
|
line = line[1:]
|
|
for i, opcode in enumerate(opcodes):
|
|
if any(opcode == li for li in line):
|
|
opcode_counts[i] += 1
|
|
for i, register in enumerate(registers):
|
|
if any(register in li and ('text' in l or 'CODE' in l) for li in line):
|
|
register_counts[i] += 1
|
|
for i, keyword in enumerate(keywords):
|
|
if any(keyword in li for li in line):
|
|
keyword_counts[i] += 1
|
|
|
|
# Create feature vector
|
|
feature_vector = np.concatenate([prefix_counts, opcode_counts, register_counts, keyword_counts])
|
|
|
|
return feature_vector
|
|
|
|
|
|
# Main function to load models and make predictions
|
|
def run_malware_analysis_asm(asm_folder_path=asm_output_directory, models_folder=asm_model_directory):
|
|
print("Starting analysis...")
|
|
|
|
# Get all .asm files in the folder
|
|
asm_files = [f for f in os.listdir(asm_folder_path) if f.endswith('.asm')]
|
|
|
|
# Load all .pkl models from the models folder
|
|
model_files = [f for f in os.listdir(models_folder) if f.endswith('.pkl')]
|
|
|
|
models = {}
|
|
for model_file in model_files:
|
|
model_name = os.path.splitext(model_file)[0]
|
|
with open(os.path.join(models_folder, model_file), 'rb') as f:
|
|
model_clf = pickle.load(f)
|
|
models[model_name] = model_clf
|
|
|
|
# Prediction and saving results
|
|
for model_name, model_clf in models.items():
|
|
print(f"Making asm predictions with {model_name}...")
|
|
|
|
# Generate the correct class mapping
|
|
def get_class_mapping(model_name):
|
|
if model_name == 'XGBClassifier':
|
|
return {i: i for i in range(9)} # XGB uses 0-8
|
|
else:
|
|
return {i: i+1 for i in range(9)} # Other models use 1-9
|
|
|
|
class_mapping = get_class_mapping(model_name)
|
|
|
|
# Check if result file for the model already exists
|
|
results_file_path = f'{asm_result_directory}/asm_prediction_{model_name}.csv'
|
|
if os.path.exists(results_file_path):
|
|
results_df = pd.read_csv(results_file_path)
|
|
else:
|
|
results_df = pd.DataFrame(columns=['file_name', 'prediction', 'probability'])
|
|
|
|
new_predictions = []
|
|
|
|
for asm_file in asm_files:
|
|
if asm_file not in results_df['file_name'].values:
|
|
file_path = os.path.join(asm_folder_path, asm_file)
|
|
feature_vector = preprocess_asm_file(file_path)
|
|
feature_vector = feature_vector.reshape(1, -1)
|
|
|
|
# Predict using the current model
|
|
prediction = model_clf.predict(feature_vector)
|
|
probability = model_clf.predict_proba(feature_vector)
|
|
|
|
mapped_prediction = class_mapping[prediction[0]]
|
|
predicted_prob = probability[0][prediction[0]]
|
|
|
|
|
|
if "XGB" in model_name.upper():
|
|
new_predictions.append({
|
|
'file_name': asm_file,
|
|
'prediction': mapped_prediction+1,
|
|
'probability': predicted_prob
|
|
})
|
|
else:
|
|
new_predictions.append({
|
|
'file_name': asm_file,
|
|
'prediction': mapped_prediction,
|
|
'probability': predicted_prob
|
|
})
|
|
|
|
# Append new predictions to results DataFrame
|
|
if new_predictions:
|
|
new_predictions_df = pd.DataFrame(new_predictions)
|
|
results_df = pd.concat([results_df, new_predictions_df], ignore_index=True)
|
|
results_df.to_csv(results_file_path, index=False)
|
|
|
|
print(f"Predictions saved to {results_file_path}.")
|
|
try:
|
|
send_asm_predictions_to_api(results_file_path)
|
|
except:
|
|
print("Connection Failed")
|
|
|
|
|
|
def run_hex_conversion():
|
|
hex_dirs = [d.strip() for d in hex_files_entry.get().split(',')]
|
|
hex_output_dir =bytes_output_directory
|
|
|
|
if not hex_dirs or not hex_output_dir:
|
|
messagebox.showwarning("Warning", "Please specify both directories and output directory.")
|
|
return
|
|
|
|
def hex_conversion_task():
|
|
for hex_dir in hex_dirs:
|
|
hex_dir = hex_dir.strip()
|
|
if os.path.isdir(hex_dir):
|
|
scan_and_convert_directory(hex_dir, hex_output_dir)
|
|
else:
|
|
messagebox.showwarning("Warning", f"{hex_dir} is not a directory.")
|
|
|
|
print("Hex conversion complete.")
|
|
run_malware_ai_analysis_bytes()
|
|
global isMonitoring
|
|
if(not isMonitoring):
|
|
isMonitoring = True
|
|
start_monitoring()
|
|
# hex_conversion_task()
|
|
threading.Thread(target=hex_conversion_task).start()
|
|
|
|
def run_disassembly():
|
|
start_dirs = [d.strip() for d in start_dirs_entry.get().split(',')]
|
|
output_dir = asm_output_directory
|
|
|
|
if not start_dirs or not output_dir:
|
|
messagebox.showwarning("Warning", "Please specify both directories and output directory.")
|
|
return
|
|
|
|
def disassembly_task():
|
|
|
|
process_files(output_dir, start_dirs)
|
|
run_malware_analysis_asm()
|
|
|
|
global isMonitoring
|
|
if(not isMonitoring):
|
|
isMonitoring = True
|
|
start_monitoring()
|
|
# disassembly_task()
|
|
threading.Thread(target=disassembly_task).start()
|
|
|
|
def start_monitoring():
|
|
|
|
directories = [d.strip() for d in hex_files_entry.get().split(',')]
|
|
directories += [d.strip() for d in start_dirs_entry.get().split(',')]
|
|
output_dir = output_directory
|
|
|
|
def monitoring_task():
|
|
monitor_directories(directories, output_dir)
|
|
|
|
# Start monitoring in a new thread
|
|
threading.Thread(target=monitoring_task, daemon=True).start()
|
|
print("Started monitoring directories.")
|
|
|
|
def on_closing():
|
|
|
|
root.destroy()
|
|
|
|
def browse_hex_directories():
|
|
directories = []
|
|
while True:
|
|
directory = filedialog.askdirectory(title="Select a Directory")
|
|
if not directory:
|
|
break # Stop if no more directories are selected
|
|
directories.append(directory)
|
|
|
|
if directories:
|
|
hex_files_entry.delete(0, tk.END)
|
|
hex_files_entry.insert(0, ', '.join(directories))
|
|
|
|
def browse_start_dirs():
|
|
directories = []
|
|
while True:
|
|
directory = filedialog.askdirectory(title="Select a Directory")
|
|
if not directory:
|
|
break # Stop if no more directories are selected
|
|
directories.append(directory)
|
|
|
|
if directories:
|
|
start_dirs_entry.delete(0, tk.END)
|
|
start_dirs_entry.insert(0, ', '.join(directories))
|
|
|
|
|
|
def malware_gui(parent_frame):
|
|
# Create a new window for malware analysis
|
|
malware_window = tk.Toplevel(parent_frame)
|
|
malware_window.title("Malware Analysis")
|
|
|
|
# Add content to the malware window
|
|
tk.Label(malware_window, text="Malware Analysis Section", font=("Arial", 16)).pack(pady=20)
|
|
|
|
# Add any additional widgets needed for malware analysis
|
|
tk.Label(malware_window, text="Select files for analysis:").pack(pady=5)
|
|
malware_files_entry = tk.Entry(malware_window, width=80)
|
|
malware_files_entry.pack(pady=5)
|
|
tk.Button(malware_window, text="Browse...", command=browse_hex_directories).pack(pady=5)
|
|
tk.Button(malware_window, text="Analyze Malware", command=lambda: print("Analyzing...")).pack(pady=10)
|
|
|
|
|
|
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
# Requires `pip install ttkthemes`
|
|
|
|
def create_wizard_window():
|
|
global root
|
|
root = tk.Tk()
|
|
root.title("File Conversion and Disassembly Wizard")
|
|
root.geometry("700x450")
|
|
root.resizable(False, False)
|
|
|
|
# Wizard frames with a more appealing color scheme
|
|
frame1 = tk.Frame(root, bg="#ffffff")
|
|
frame2 = tk.Frame(root, bg="#f9f9f9")
|
|
frame3 = tk.Frame(root, bg="#f9f9f9")
|
|
frame4 = tk.Frame(root, bg="#ffffff")
|
|
frame5 = tk.Frame(root, bg="#f9f9f9")
|
|
frame6 = tk.Frame(root, bg="#ffffff")
|
|
frames = [frame1, frame2, frame3, frame4, frame5, frame6]
|
|
|
|
def show_frame(frame):
|
|
"""Hide all frames and show only the specified one."""
|
|
for frm in frames:
|
|
frm.pack_forget()
|
|
frame.pack(fill='both', expand=True)
|
|
|
|
def update_progress(step):
|
|
"""Update the progress bar and label to reflect the current step."""
|
|
progress_label.config(text=f"Step {step} of 5")
|
|
progress_bar['value'] = (step / 5) * 100
|
|
|
|
# Enhanced Title Bar with Icons
|
|
title_frame = tk.Frame(root, bg="#283593")
|
|
title_frame.pack(fill="x", side="top")
|
|
|
|
title_label = tk.Label(
|
|
title_frame,
|
|
text="🛠 Setup Wizard",
|
|
font=("Arial", 16, "bold"),
|
|
fg="white",
|
|
bg="#283593",
|
|
anchor="w", padx=20
|
|
)
|
|
title_label.pack(pady=10)
|
|
|
|
# Progress bar with percentage display
|
|
progress_frame = tk.Frame(root, bg="#f0f0f0")
|
|
progress_frame.pack(fill="x", side="bottom")
|
|
progress_bar = ttk.Progressbar(progress_frame, orient="horizontal", mode="determinate", length=500)
|
|
progress_bar.pack(side="left", padx=20, pady=10)
|
|
progress_label = tk.Label(progress_frame, text="Step 1 of 5", font=("Arial", 12), bg="#f0f0f0")
|
|
progress_label.pack(side="left", padx=10)
|
|
|
|
# Frame 1 - Welcome Screen
|
|
label1 = tk.Label(frame1, text="Welcome to the File Conversion Wizard", font=("Arial", 18, "bold"), bg="#ffffff")
|
|
label1.pack(pady=40)
|
|
desc_label1 = tk.Label(frame1, text="This wizard will guide you through the steps.", bg="#ffffff", font=("Arial", 12))
|
|
desc_label1.pack(pady=10)
|
|
|
|
next_button1 = ttk.Button(frame1, text="Next ➡️", command=lambda: [update_progress(2), show_frame(frame2)])
|
|
next_button1.pack(pady=20)
|
|
|
|
# Frame 2 - Packet Capture Setup
|
|
label2 = tk.Label(frame2, text="📡 Packet Capture Setup", font=("Arial", 16, "bold"), bg="#f9f9f9")
|
|
label2.pack(pady=40)
|
|
setup_gui(frame2) # Assuming you have this function defined
|
|
|
|
|
|
button_frame2 = tk.Frame(frame2, bg="#f9f9f9")
|
|
button_frame2.pack(side="bottom", pady=10)
|
|
|
|
next_button2 = ttk.Button(button_frame2, text="Next ➡️", command=lambda: [update_progress(3), show_frame(frame3)], width=10)
|
|
next_button2.pack(side="right", padx=10)
|
|
back_button2 = ttk.Button(button_frame2, text="⬅️ Back", command=lambda: [update_progress(1), show_frame(frame1)], width=10)
|
|
back_button2.pack(side="left", padx=10)
|
|
|
|
# Frame 3 - Malware Analysis with Tabbed Layout
|
|
notebook = ttk.Notebook(frame3)
|
|
notebook.pack(fill='both', expand=True)
|
|
|
|
hex_frame = ttk.Frame(notebook)
|
|
asm_frame = ttk.Frame(notebook)
|
|
notebook.add(hex_frame, text='🔢 Hex Conversion')
|
|
notebook.add(asm_frame, text='⚙️ ELF Disassembly')
|
|
|
|
tk.Label(hex_frame, text="Select Directories to Convert to Hex:", font=("Arial", 12)).pack(pady=5)
|
|
global hex_files_entry
|
|
hex_files_entry = tk.Entry(hex_frame, width=80)
|
|
hex_files_entry.pack(pady=5)
|
|
tk.Button(hex_frame, text="Browse...").pack(pady=5)
|
|
tk.Button(hex_frame, text="Convert to Hex").pack(pady=10)
|
|
|
|
tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:", font=("Arial", 12)).pack(pady=5)
|
|
global start_dirs_entry
|
|
start_dirs_entry = tk.Entry(asm_frame, width=80)
|
|
start_dirs_entry.pack(pady=5)
|
|
tk.Button(asm_frame, text="Browse...").pack(pady=5)
|
|
tk.Button(asm_frame, text="Disassemble ELF Files").pack(pady=10)
|
|
|
|
button_frame3 = tk.Frame(frame3, bg="#f9f9f9")
|
|
button_frame3.pack(side="bottom", pady=10)
|
|
next_button3 = ttk.Button(button_frame3, text="Next ➡️", command=lambda: [update_progress(4), show_frame(frame4)], width=10)
|
|
next_button3.pack(side="right", padx=10)
|
|
back_button3 = ttk.Button(button_frame3, text="⬅️ Back", command=lambda: [update_progress(2), show_frame(frame2)], width=10)
|
|
back_button3.pack(side="left", padx=10)
|
|
|
|
# Frame 4 - Ransomware Detection
|
|
label4 = tk.Label(frame4, text="🛡️ Ransomware Detection", font=("Arial", 16, "bold"), bg="#f0f0f0", fg="#333")
|
|
label4.pack(pady=20)
|
|
|
|
directory_frame = tk.Frame(frame4, bg="#f0f0f0")
|
|
directory_frame.pack(pady=10)
|
|
|
|
selected_dir_label = tk.Label(directory_frame, text="No Directory Selected", width=50, bg="#f0f0f0", font=("Arial", 12), fg="#666")
|
|
selected_dir_label.grid(row=1, column=0, pady=10)
|
|
|
|
def select_directory():
|
|
# Placeholder for directory selection functionality
|
|
selected_dir_label.config(text="Selected Directory Path")
|
|
|
|
browse_button = ttk.Button(directory_frame, text="📂 Select Directory", command=select_directory)
|
|
browse_button.grid(row=0, column=0, pady=5)
|
|
|
|
status_label = tk.Label(frame4, text="", fg="blue", bg="#f0f0f0", font=("Arial", 12))
|
|
status_label.pack(pady=10)
|
|
|
|
button_frame4 = tk.Frame(frame4, bg="#f0f0f0")
|
|
button_frame4.pack(side="bottom", pady=15)
|
|
|
|
next_button4 = ttk.Button(button_frame4, text="Next ➡️", command=lambda: [update_progress(5), show_frame(frame5)], width=10)
|
|
next_button4.pack(side="right", padx=10)
|
|
back_button4 = ttk.Button(button_frame4, text="⬅️ Back", command=lambda: [update_progress(3), show_frame(frame3)], width=10)
|
|
back_button4.pack(side="left", padx=10)
|
|
|
|
# Frame 5 - AuditD Manager
|
|
label5 = tk.Label(frame5, text="🔍 AuditD Manager", font=("Arial", 16, "bold"), bg="#f0f0f0", fg="#333")
|
|
label5.pack(pady=20)
|
|
|
|
# Placeholder for AuditD Manager
|
|
audit_app = AuditDManagerApp(frame5) # Uncomment and define if AuditDManagerApp is available
|
|
audit_app.frame.pack(fill='both', expand=True, pady=10)
|
|
|
|
button_frame5 = tk.Frame(frame5, bg="#f0f0f0")
|
|
button_frame5.pack(side="bottom", pady=15)
|
|
|
|
next_button5 = ttk.Button(button_frame5, text="Next ➡️", command=lambda: show_frame(frame6), width=10)
|
|
next_button5.pack(side="right", padx=10)
|
|
back_button5 = ttk.Button(button_frame5, text="⬅️ Back", command=lambda: [update_progress(4), show_frame(frame4)], width=10)
|
|
back_button5.pack(side="left", padx=10)
|
|
|
|
# Frame 6 - Completion Screen
|
|
label6 = tk.Label(frame6, text="🎉 Setup Complete!", font=("Arial", 18, "bold"), bg="#ffffff")
|
|
label6.pack(pady=40)
|
|
desc_label6 = tk.Label(frame6, text="Thank you for using the setup wizard.", bg="#ffffff", font=("Arial", 12))
|
|
desc_label6.pack(pady=10)
|
|
finish_button = ttk.Button(frame6, text="➡️ Finish ⬅️", command=root.quit, width=10)
|
|
finish_button.pack(pady=10)
|
|
|
|
# Show the first frame
|
|
show_frame(frame1)
|
|
root.mainloop()
|
|
|
|
def on_closing():
|
|
root.quit()
|
|
|
|
if device_check():
|
|
# If device check is successful, initialize the Tkinter window
|
|
create_wizard_window()
|
|
else:
|
|
# If the device check fails, show an error message and exit
|
|
print("Device check failed. Exiting program.")
|
|
messagebox.showerror("Error", "Device check failed. The wizard will not start.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# def create_wizard_window():
|
|
# global root
|
|
# root = tk.Tk()
|
|
# root.title("File Conversion and Disassembly Wizard")
|
|
# root.geometry("600x400")
|
|
# root.resizable(False, False)
|
|
|
|
# # Wizard frames
|
|
# frame1 = tk.Frame(root, bg="#f0f0f0")
|
|
# frame2 = tk.Frame(root, bg="#f0f0f0")
|
|
# frame3 = tk.Frame(root, bg="#f0f0f0")
|
|
# frame4 = tk.Frame(root, bg="#f0f0f0")
|
|
# frame5 = tk.Frame(root, bg="#f0f0f0")
|
|
# frame6 = tk.Frame(root, bg="#f0f0f0")
|
|
|
|
# frames = [frame1, frame2, frame3, frame4, frame5, frame6]
|
|
|
|
# current_step = 1
|
|
|
|
# def show_frame(frame):
|
|
# """Hide all frames and show only the specified one."""
|
|
# for frm in frames:
|
|
# frm.pack_forget()
|
|
# frame.pack(fill='both', expand=True)
|
|
|
|
# def update_progress(step):
|
|
# """Update the progress bar and label to reflect the current step."""
|
|
# progress_label.config(text=f"Step {step} of 5")
|
|
# progress_bar['value'] = (step / 5) * 100
|
|
|
|
# # Title bar frame for better aesthetics
|
|
# title_frame = tk.Frame(root, bg="#0078d7")
|
|
# title_frame.pack(fill="x", side="top")
|
|
|
|
# title_label = tk.Label(title_frame, text="Setup Wizard", font=("Arial", 14, "bold"), fg="white", bg="#0078d7")
|
|
# title_label.pack(pady=10)
|
|
|
|
# # Progress bar
|
|
# progress_bar = ttk.Progressbar(root, orient="horizontal", mode="determinate", length=400)
|
|
# progress_bar.pack(side="bottom", pady=10)
|
|
|
|
# progress_label = tk.Label(root, text="Step 1 of 5", font=("Arial", 12))
|
|
# progress_label.pack(side="bottom")
|
|
|
|
# # Frame 1 - Welcome Screen
|
|
# label1 = tk.Label(frame1, text="Welcome to the File Conversion Wizard", font=("Arial", 16), bg="#f0f0f0")
|
|
# label1.pack(pady=40)
|
|
# desc_label1 = tk.Label(frame1, text="This wizard will guide you through the steps.", bg="#f0f0f0", font=("Arial", 12))
|
|
# desc_label1.pack(pady=10)
|
|
|
|
# next_button1 = ttk.Button(frame1, text="Next", command=lambda: [update_progress(2), show_frame(frame2)])
|
|
# next_button1.pack(pady=10, side="bottom")
|
|
|
|
# # Frame 2 - Packet Capture UI
|
|
# label2 = tk.Label(frame2, text="Packet Capture Setup", font=("Arial", 16), bg="#f0f0f0")
|
|
# label2.pack(pady=40)
|
|
|
|
# # Insert your packet capture setup UI here
|
|
# setup_gui(frame2) # Assuming you have this function defined
|
|
|
|
# button_frame2 = tk.Frame(frame2, bg="#f0f0f0")
|
|
# button_frame2.pack(side="bottom", pady=10)
|
|
|
|
# next_button2 = ttk.Button(button_frame2, text="Next", command=lambda: [update_progress(3), show_frame(frame3)], width=10)
|
|
# next_button2.pack(side="right", padx=10)
|
|
|
|
# back_button2 = ttk.Button(button_frame2, text="Back", command=lambda: [update_progress(1), show_frame(frame1)], width=10)
|
|
# back_button2.pack(side="left", padx=10)
|
|
|
|
# # Frame 3 - Malware Analysis
|
|
# notebook = ttk.Notebook(frame3)
|
|
# notebook.pack(fill='both', expand=True)
|
|
|
|
# hex_frame = ttk.Frame(notebook)
|
|
# asm_frame = ttk.Frame(notebook)
|
|
# notebook.add(hex_frame, text='Hex Conversion')
|
|
# notebook.add(asm_frame, text='ELF Disassembly')
|
|
|
|
# tk.Label(hex_frame, text="Select Directories to Convert to Hex:", font=("Arial", 12)).pack(pady=5)
|
|
# global hex_files_entry
|
|
# hex_files_entry = tk.Entry(hex_frame, width=80)
|
|
# hex_files_entry.pack(pady=5)
|
|
# tk.Button(hex_frame, text="Browse...",command=browse_hex_directories).pack(pady=5)
|
|
# tk.Button(hex_frame, text="Convert to Hex",command=run_hex_conversion).pack(pady=10)
|
|
|
|
# tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:", font=("Arial", 12)).pack(pady=5)
|
|
# global start_dirs_entry
|
|
# start_dirs_entry = tk.Entry(asm_frame, width=80)
|
|
# start_dirs_entry.pack(pady=5)
|
|
# tk.Button(asm_frame, text="Browse...", command=browse_start_dirs).pack(pady=5)
|
|
# tk.Button(asm_frame, text="Disassemble ELF Files",command=run_disassembly).pack(pady=10)
|
|
|
|
# button_frame3 = tk.Frame(frame3, bg="#f0f0f0")
|
|
# button_frame3.pack(side="bottom", pady=10)
|
|
|
|
# next_button3 = ttk.Button(button_frame3, text="Next", command=lambda: [update_progress(4), show_frame(frame4)], width=10)
|
|
# next_button3.pack(side="right", padx=10)
|
|
|
|
# back_button3 = ttk.Button(button_frame3, text="Back", command=lambda: [update_progress(2), show_frame(frame2)], width=10)
|
|
# back_button3.pack(side="left", padx=10)
|
|
|
|
# # Frame 4 - Ransomware Detection
|
|
# label4 = tk.Label(frame4, text="Ransomware Detection", font=("Arial", 16), bg="#f0f0f0")
|
|
# label4.pack(pady=40)
|
|
|
|
# directory_frame = tk.Frame(frame4, bg="#f0f0f0")
|
|
# directory_frame.pack(pady=10)
|
|
|
|
# selected_dir_label = tk.Label(directory_frame, text="No Directory Selected", width=40, bg="#f0f0f0")
|
|
# selected_dir_label.grid(row=1, column=0)
|
|
|
|
# # Directory label
|
|
|
|
|
|
# def select_directory():
|
|
# directory = browse_directory()
|
|
# if directory:
|
|
# selected_dir_label.config(text=directory)
|
|
# # Start the watchdog observer
|
|
|
|
# browse_button = ttk.Button(directory_frame, text="Select Directory", command=select_directory)
|
|
# browse_button.grid(row=0, column=0)
|
|
|
|
# status_label = tk.Label(frame4, text="", fg="blue", bg="#f0f0f0")
|
|
# status_label.pack(pady=10)
|
|
|
|
# run_button = ttk.Button(frame4, text="Run Predictions", command=lambda: run_predictions(selected_dir_label.cget("text"), status_label))
|
|
# run_button.pack(pady=10)
|
|
|
|
# button_frame4 = tk.Frame(frame4, bg="#f0f0f0")
|
|
# button_frame4.pack(side="bottom", pady=10)
|
|
|
|
# next_button4 = ttk.Button(button_frame4, text="Next", command=lambda: [update_progress(5), show_frame(frame5)], width=10)
|
|
# next_button4.pack(side="right", padx=10)
|
|
|
|
# back_button4 = ttk.Button(button_frame4, text="Back", command=lambda: [update_progress(3), show_frame(frame3)], width=10)
|
|
# back_button4.pack(side="left", padx=10)
|
|
|
|
# # Frame 5 - AuditD Manager
|
|
# audit_app = AuditDManagerApp(frame5)
|
|
# audit_app.frame.pack(fill='both', expand=True)
|
|
|
|
# # tk.Label(frame5, text="AuditD Manager", font=("Arial", 16), bg="#f0f0f0").pack(pady=40)
|
|
|
|
|
|
# button_frame5 = tk.Frame(frame5, bg="#f0f0f0")
|
|
# button_frame5.pack(side="bottom", pady=10)
|
|
|
|
# next_button5 = ttk.Button(button_frame5, text="Next", command=lambda: show_frame(frame6), width=10)
|
|
# next_button5.pack(side="right", padx=10)
|
|
|
|
# back_button5 = ttk.Button(button_frame5, text="Back", command=lambda: [update_progress(4), show_frame(frame4)], width=10)
|
|
# back_button5.pack(side="left", padx=10)
|
|
|
|
# # Frame 6 - Summary
|
|
# label6 = tk.Label(frame6, text="Setup Complete!", font=("Arial", 16), bg="#f0f0f0")
|
|
# label6.pack(pady=40)
|
|
# desc_label6 = tk.Label(frame6, text="Thank you for using the setup wizard.", bg="#f0f0f0", font=("Arial", 12))
|
|
# desc_label6.pack(pady=10)
|
|
|
|
# finish_button = ttk.Button(frame6, text="Finish", command=root.quit, width=10)
|
|
# finish_button.pack(pady=10)
|
|
|
|
# # Show the first frame
|
|
# show_frame(frame1)
|
|
|
|
# root.mainloop()
|
|
|
|
|
|
# def on_closing():
|
|
# root.quit()
|
|
|
|
# if __name__ == "__main__":
|
|
|
|
# root = tk.Tk()
|
|
# create_wizard_window()
|
|
|
|
|
|
|