import os import time import logging import subprocess import tkinter as tk from tkinter import filedialog, messagebox, ttk from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import threading import pandas as pd import pickle import numpy as np from sklearn.preprocessing import MinMaxScaler import sys import codecs import pickle import csv import pickle import psutil from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from tkinter import simpledialog import pyshark import psutil import joblib from sklearn.preprocessing import StandardScaler import sklearn.ensemble._forest from threading import Thread, Event #hardware check # Function to run device check before showing the wizard window def device_check(): try: subprocess.run(['python3', 'intaller.py'], check=True) return True except subprocess.CalledProcessError as e: print(f"Error running device check script: {e}") return False # Global variable for thread control stop_event = Event() value = False # Important features and weights as provided important_features = [ 'pktcount', 'byteperflow', 'tot_kbps', 'rx_kbps', 'flows', 'bytecount', 'tot_dur', 'Protocol_ICMP', 'Protocol_TCP', 'Protocol_UDP', ] # Drop features you don't need based on what you used in training # Drop features you don't need based on what you used in training drop_features = ['src', 'dst', 'dt', 'dur', 'pktrate', 'pktperflow', 'Protocol_HTTP', 'Protocol_HTTPS', 'Protocol_SSH', 'Protocol_DHCP', 'Protocol_FTP', 'Protocol_SMTP', 'Protocol_POP3', 'Protocol_IMAP', 'Protocol_DNS'] # Automatically detect active network interface def get_active_interface(): interfaces = psutil.net_if_addrs() for interface, addrs in interfaces.items(): for addr in addrs: if addr.family == 2: # AF_INET (IPv4) if addr.address != '127.0.0.1': # Skip localhost (lo) return interface raise Exception("No active interface found") # Preprocessing function to extract specific features from packets def preprocess_packet(packet): try: if float(packet.frame_info.time_delta) < 1: byteperflow = float(packet.length) else: byteperflow = float(packet.length) / float(packet.frame_info.time_delta) # Capture IP or IPv6 addresses src_ip = None dst_ip = None if hasattr(packet, 'ip'): src_ip = packet.ip.src dst_ip = packet.ip.dst elif hasattr(packet, 'ipv6'): src_ip = packet.ipv6.src dst_ip = packet.ipv6.dst if src_ip and ':' in src_ip: return None # Capture protocol layer protocol = packet.highest_layer # Add flags for common protocols protocol_icmp = 1 if protocol == "ICMP" else 0 protocol_tcp = 1 if protocol == "TCP" else 0 protocol_udp = 1 if protocol == "UDP" else 0 features = { 'pktcount': int(packet.length), 'byteperflow': byteperflow, 'tot_kbps': float(packet.length) / 1000.0, 'rx_kbps': float(packet.length) / 1000.0, 'flows': 1, 'bytecount': float(packet.length), 'tot_dur': float(packet.frame_info.time_delta), 'Protocol_ICMP': protocol_icmp, 'Protocol_TCP': protocol_tcp, 'Protocol_UDP': protocol_udp, 'src_ip': src_ip, 'dst_ip': dst_ip, 'probability': 0.0 } return pd.DataFrame([features]) except AttributeError: return None def prepare_X_test(packets_list, drop_features): return None def send_prediction(file_path): url = "http://127.0.0.1:8000/ddos-predictions/" with open(file_path, 'rb') as f: files = {'file': f} response = requests.post(url, files=files) if response.status_code == 200: print(f"Successfully sent {file_path} to API.") else: print(f"Failed to send {file_path} to API. Status code: {response.status_code}") def make_predictions(X_test, X): logistic_regression_model = joblib.load('logistic_regression_model.pkl') svm_model = joblib.load('svm_model.pkl') knn_model = joblib.load('knn_model.pkl') decision_tree_model = joblib.load('decision_tree_model.pkl') random_forest_model = joblib.load('random_forest_model.pkl') scaler = StandardScaler() X_test_scaled = scaler.fit_transform(X_test) models = { 'Logistic Regression': logistic_regression_model, 'SVM': svm_model, 'KNN': knn_model, 'Decision Tree': decision_tree_model, 'Random Forest': random_forest_model } all_predictions = [] for model_name, model in models.items(): y_pred = model.predict(X_test_scaled) all_predictions.append(y_pred) transposed_predictions = list(zip(*all_predictions)) i = 0 for row in transposed_predictions: row_sum = sum(row) avg = row_sum / 5 X.loc[i, 'probability'] = avg i += 1 with open('predictions.csv', mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=X.keys()) writer.writeheader() for index, row in X.iterrows(): writer.writerow(row.to_dict()) try: send_prediction("predictions.csv") except: print("could not connect to server") def capture_packets(interface=None): try: subprocess.check_call(['sudo', 'apt', 'install', '-y', 'tshark']) print("tshark installed successfully.") except subprocess.CalledProcessError: print("Failed to install tshark. Please install it manually.") if interface is None: interface = get_active_interface() capture = pyshark.LiveCapture(interface=interface, tshark_path='/usr/bin/tshark') try: # print("here") # capture.sniff(timeout=60) while value: # print(value) packets_list = [] if stop_event.is_set(): break # print("c") count = 0 # print(packets_list) for packet in capture: # print("Packet", packet) # if(count == 15): # break # print(f"Packet No. - {count} Received!") try: processed_packet = preprocess_packet(packet) if processed_packet is not None: # print(processed_packet["dst_ip"]) # print(processed_packet["src_ip"]) if ":" in processed_packet["dst_ip"] or ":" in processed_packet["src_ip"]: print("packet isn't correct") continue # print(processed_packet) packets_list.append(processed_packet) count+=1 print(count, len(packets_list)) # X_test_scaled = prepare_X_test(packets_list, drop_features) if len(packets_list) >= 1: X_test = pd.concat(packets_list, ignore_index=True) X_test_scaled = X_test.drop(drop_features, axis=1, errors='ignore') X_test_scaled = X_test_scaled.reindex(columns=important_features, fill_value=0) if X_test_scaled is not None: make_predictions(X_test_scaled,X_test) time.sleep(10) except AttributeError as e: print(f"Error processing packet: {e}") print("Packets being Captured..!") except KeyboardInterrupt: print("\nPacket capturing stopped.") def start_capture(): global thread if os.geteuid() != 0: root.withdraw() # Hide the main window password = simpledialog.askstring("Password", "Enter your sudo password and run again:", show='*') if password: try: subprocess.run(['sudo', '-S', sys.executable] + sys.argv, input=password.encode(), check=True) except subprocess.CalledProcessError: messagebox.showerror("Error", "Failed to run the script with sudo.") finally: root.destroy() else: messagebox.showerror("Error", "No password provided. Unable to run with sudo.") elif not stop_event.is_set(): global value value = True stop_event.clear() thread = Thread(target=capture_packets) thread.start() start_button.config(state=tk.DISABLED) stop_button.config(state=tk.NORMAL) def stop_capture(): global value value = False stop_event.set() if thread.is_alive(): thread.join() start_button.config(state=tk.NORMAL) stop_button.config(state=tk.DISABLED) root.destroy() def setup_gui(frame): global start_button, stop_button, thread start_button = tk.Button(frame, text="Start Capture", command=start_capture) start_button.pack(pady=20) stop_button = tk.Button(frame, text="Stop Capture", command=stop_capture, state=tk.DISABLED) stop_button.pack(pady=20) #rensoomware tested import os import subprocess import sys import csv import pickle import pandas as pd import tkinter as tk from tkinter import filedialog from tkinter import messagebox import requests import psutil from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler value = True # Function to get CPU and memory usage percentages def get_usage_percentages(): cpu_usage = psutil.cpu_percent(interval=1) memory_percent = psutil.virtual_memory().percent return cpu_usage, memory_percent # Function to send usage data to the API def send_data_to_api(cpu_usage, memory_usage): api_url = 'http://127.0.0.1:8000/usage_log/' # Update this if needed payload = { 'cpu_usage': cpu_usage, 'memory_usage': memory_usage } try: response = requests.post(api_url, json=payload) if response.status_code == 201: print("Data logged successfully.") else: print("Failed to log data:", response.json()) except Exception as e: print("Error while sending data:", str(e)) # Function to send ransomware prediction data to the API def send_predictions_to_api(file_path): url = "http://127.0.0.1:8000/ransomware-type-predictions/" with open(file_path, 'rb') as f: files = {'file': f} response = requests.post(url, files=files) if response.status_code == 200: print(f"Successfully sent {file_path} to API.") else: print(f"Failed to send {file_path} to API. Status code: {response.status_code}") # Function to browse directory using Tkinter def browse_directory(): """Open a dialog to choose a directory.""" root = tk.Tk() root.withdraw() # Hide the root window directory = filedialog.askdirectory(title="Select a Directory") return directory # Function to calculate the MD5 hash of a file def md5_hash(file_path): result = subprocess.run(['md5sum', file_path], capture_output=True, text=True) return result.stdout.split()[0] # Extract machine type from ELF file def get_machine_type(file_path): try: result = subprocess.run(['readelf', '-h', file_path], capture_output=True, text=True) for line in result.stdout.splitlines(): if 'Machine:' in line: return line.split(':')[1].strip() except Exception as e: print(f"Error getting machine type: {e}") return None # Extract number of sections from ELF file def get_number_of_sections(file_path): try: result = subprocess.run(['readelf', '-h', file_path], capture_output=True, text=True) for line in result.stdout.splitlines(): if 'Number of section headers:' in line: return int(line.split(':')[1].strip()) except Exception as e: print(f"Error getting number of sections: {e}") return None # Extract resource size from ELF file def get_resource_size(file_path): try: result = subprocess.run(['readelf', '-S', file_path], capture_output=True, text=True) for line in result.stdout.splitlines(): if '.rodata' in line: size_hex = line.split()[5] return int(size_hex, 16) except Exception as e: print(f"Error getting resource size: {e}") return 0 # Extract linker version from ELF file def get_linker_version(file_path): try: result = subprocess.run(['objdump', '-p', file_path], capture_output=True, text=True) for line in result.stdout.splitlines(): if 'Version:' in line: version = line.split(':')[1].strip() major_version = version.split('.')[0] minor_version = version.split('.')[1] if '.' in version else '0' return major_version, minor_version except Exception as e: print(f"Error getting linker version: {e}") return None, None # Extract dynamic linking information from ELF file def get_dynamic_info(file_path): try: result = subprocess.run(['readelf', '-d', file_path], capture_output=True, text=True) dynamic_info = [] for line in result.stdout.splitlines(): dynamic_info.append(line) return dynamic_info except Exception as e: print(f"Error getting dynamic linking info: {e}") return None # Function to extract features from an ELF file def extract_features(file_path): features = { 'FileName': file_path, 'md5Hash': md5_hash(file_path), 'Machine': get_machine_type(file_path), 'NumberOfSections': get_number_of_sections(file_path), 'ResourceSize': get_resource_size(file_path), 'LinkerVersionMajor': None, 'LinkerVersionMinor': None, 'DynamicInfo': get_dynamic_info(file_path) } # Get linker version major_version, minor_version = get_linker_version(file_path) features['LinkerVersionMajor'] = major_version features['LinkerVersionMinor'] = minor_version return features # Function to find ELF files in the selected directory def find_elf_files(start_dir, status_label): if not os.path.isdir(start_dir): return try: find_command = [ 'find', start_dir, '(', '-path', '/proc', '-o', '-path', '/sys', '-o', '-path', '/run', ')', '-prune', '-o', '-type', 'f', '-print' ] find_result = subprocess.run(find_command, capture_output=True, text=True, check=False) if find_result.returncode != 0: print(f"Error running find command: {find_result.stderr}", file=sys.stderr) return file_paths = find_result.stdout.splitlines() print(f"Found files in {start_dir}:") print(file_paths) for file_path in file_paths: try: file_command = ['file', '--mime-type', file_path] file_result = subprocess.run(file_command, capture_output=True, text=True, check=True) if 'application/x-executable' in file_result.stdout or 'application/x-sharedlib' in file_result.stdout: status_label.config(text=f"Processing: {os.path.basename(file_path)}") status_label.update() if os.path.exists(file_path): try: extracted_features = extract_features(file_path) except: print("Error Reading File" + file_path) continue with open('data.csv', 'a', newline='') as file: writer = csv.writer(file) writer.writerow(extracted_features.values()) else: print("File not found!") except subprocess.CalledProcessError as e: print(f"Error running file command on {file_path}: {e}", file=sys.stderr) except Exception as e: print(f"Error processing directory {start_dir}: {e}", file=sys.stderr) # Function to load the model def load_model(): with open('model.pkl', 'rb') as f: model = pickle.load(f) return model # Function to predict ransomware based on features def predict_ransomware(file_path): model = load_model() # Extract features from the given file features = extract_features(file_path) feature_df = pd.DataFrame([features]) # Predict ransomware type prediction = model.predict(feature_df.drop(columns=['FileName', 'md5Hash', 'Machine', 'DynamicInfo'])) return prediction[0] # Function to run predictions on selected ELF files def run_predictions(selected_dir, status_label): csv_filename = "data.csv" with open(csv_filename, mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(['FileName', 'md5Hash', 'Machine', 'NumberOfSections', 'ResourceSize', 'LinkerVersionMajor', 'LinkerVersionMinor', 'DynamicInfo']) # Write header if file is new csv_filename = "predictions.csv" with open(csv_filename, mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(['filename', 'predicted_class']) # Write header if file is new if selected_dir: status_label.config(text="Processing...") status_label.update() find_elf_files(selected_dir, status_label) with open("data.csv", "r") as file: reader = csv.DictReader(file) files = [row['FileName'] for row in reader] for ransomware_file in files: print(ransomware_file) result = predict_ransomware(ransomware_file) csv_filename = "predictions.csv" file_exists = os.path.exists(csv_filename) with open(csv_filename, mode='a', newline='') as file: writer = csv.writer(file) if not file_exists: writer.writerow(['filename', 'predicted_class']) writer.writerow([os.path.basename(ransomware_file), result]) status_label.config(text="Predictions Saved") try: send_predictions_to_api("predictions.csv") except: print("Connection to API failed") try: cpu_percent, memory_percent = get_usage_percentages() send_data_to_api(cpu_percent, memory_percent) except: print("Connection to API failed") else: status_label.config(text="No directory selected") global value if(value): value = False print("VALLLLLL") start_watchdog(selected_dir, status_label) # Watchdog Event Handler class WatcherHandler(FileSystemEventHandler): def __init__(self, status_label): self.status_label = status_label def on_modified(self, event): if event.is_directory: return else: print(f"File modified: {event.src_path}") run_predictions(os.path.dirname(event.src_path), self.status_label) def on_created(self, event): if event.is_directory: return else: print(f"File created: {event.src_path}") run_predictions(os.path.dirname(event.src_path), self.status_label) # Function to start the watchdog observer def start_watchdog(directory, status_label): event_handler = WatcherHandler(status_label) observer = Observer() observer.schedule(event_handler, path=directory, recursive=True) observer.start() print(f"Started monitoring {directory} for changes.") return observer # Entry point to run predictions for selected directory if __name__ == "__main__": selected_directory = sys.argv[1] if len(sys.argv) > 1 else None if selected_directory: run_predictions(selected_directory) else: print("Please specify a directory.") #remsomwareaudit import tkinter as tk from tkinter import messagebox import subprocess import os import csv import inotify_simple import threading import time import re import requests from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from collections import defaultdict import numpy as np import pandas as pd from sklearn.preprocessing import StandardScaler import tensorflow as tf from sklearn.metrics import accuracy_score, confusion_matrix, classification_report from datetime import datetime permission_operations = None # Define event masks manually IN_CREATE = 0x00000100 IN_DELETE = 0x00000200 IN_MODIFY = 0x00000002 IN_OPEN = 0x00000020 IN_ISDIR = 0x40000000 #################### TEST_DATA_PATH = 'combined_log_summary.csv' VARIABLE_NAMES_PATH = 'output.txt' def predict_ransomware(): # Load the trained model model = tf.keras.models.load_model('updated_ransomware_classifier.h5') # Load and prepare test data # Read variable names with open(VARIABLE_NAMES_PATH, encoding='utf-8') as f: columns = [line.split(';')[1].strip() for line in f] # Load test data data = pd.read_csv(TEST_DATA_PATH, header=None, names=columns) # Check and clean column names data.columns = data.columns.str.strip() X = data # Standardize the features scaler = StandardScaler() X = scaler.fit_transform(X) # Make predictions predictions = model.predict(X) predicted_labels = (predictions > 0.5).astype(int) # Convert predictions to "Yes" or "No" predicted_labels_text = ['Yes' if label == 1 else 'No' for label in predicted_labels.flatten()] # Get current timestamp timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') # Save predictions and true labels to a CSV file with timestamp output_df = pd.DataFrame({ 'Timestamp': [timestamp] * len(predicted_labels_text), # Add timestamp column 'Predicted Label': predicted_labels_text }) output_file = f'prediction.csv' output_df.to_csv(output_file, index=False) print(f"Predictions saved to {output_file} ({timestamp})") def send_predictions_to_api(file_path): url = "http://142.93.221.85:8000/predict-malware/" with open(file_path, 'rb') as f: files = {'file': f} response = requests.post(url, files=files) if response.status_code == 200: print(f"Successfully sent {file_path} to API.") else: print(f"Failed to send {file_path} to API. Status code: {response.status_code}") try: send_predictions_to_api(output_file) except: print("Error Connection Server") #################### ID = 0 is_flip = 0 flipped = False class PermissionChangeHandler(FileSystemEventHandler): def __init__(self): super().__init__() self.file_types = set() def get_file_extension(self, file_path): """Extracts the file extension from the file path.""" _, ext = os.path.splitext(file_path) return ext.strip(".") # Strip the dot from the extension def on_modified(self, event): if not event.is_directory: file_path = event.src_path file_extension = self.get_file_extension(file_path) # Collect all file types file_types = set() for operations in permission_operations.values(): for key in operations: match = re.search(r'\.\w+$', key) if match: file_types.add(match.group().strip('.')) if file_extension in file_types: current_permissions = oct(os.stat(file_path).st_mode & 0o777) # Check all operations (chmod/chown) for this file type for operation, perms in permission_operations.items(): for key in perms: if file_extension in key: perms[key] += 1 # print(f"Updated {operation} for {file_extension}: {perms[key]}") class AuditDManagerApp: def __init__(self, frame): self.frame = frame # self.root = frame # self.root.title("AuditD Manager") # self.root.geometry("400x350") # # Adjusted for additional widget self.frame.pack(fill='both', expand=True) # Create Widgets self.install_button = tk.Button(self.frame , text="Install AuditD", command=self.install_auditd) self.install_button.pack(pady=10) self.start_button = tk.Button(self.frame , text="Start AuditD", command=self.start_auditd) self.start_button.pack(pady=10) self.stop_button = tk.Button(self.frame , text="Stop AuditD", command=self.stop_auditd) self.stop_button.pack(pady=10) self.status_button = tk.Button(self.frame, text="Check Status", command=self.check_status) self.status_button.pack(pady=10) # Add Text Entry for Watch Path # Initialize monitoring flags and data structures self.monitoring = False self.log_file = "/var/log/audit/audit.log" self.combined_csv_file = "combined_log_summary.csv" self.monitored_files_set = { 'bash.bashrc', 'bash_completion.d', 'environment', 'fstab', 'fwupd', 'group', 'host.conf', 'hosts', 'init.d', 'inputrc', 'ld.so.cache', 'locale.alias', 'locale.conf', 'login.defs', 'machine-id', 'modprobe.d', 'nsswitch.conf', 'passwd', 'sensors.d', 'sensors3.conf', 'shadow', 'shells', 'sudo.conf', 'sudoers', 'sudoers.d' } self.log_counts = {key: 0 for key in [ 'Id','PROCTITLE', 'AVC', 'SYSCALL', 'USER_AUTH', 'USER_ACCT', 'USER_CMD', 'CRED_REFR', 'USER_START', 'USER_AVC', 'USER_END', 'CRED_DISP', 'CRED_ACQ', 'LOGIN', 'SERVICE_START', 'SERVICE_STOP']} # Track file extensions self.ext_count = {ext: {'modified': 0, 'created': 0, 'deleted': 0, 'opened': 0} for ext in [ '.db', '.AR', '.01', '.GIF', '.TXT', '.scc', '.dat', '.bmp', '.STF', '.scf', '.exe', '.typelib', '.cl', '.ocx', '.xml', '.json', '.csv', '.html', '.css', '.js', '.py', '.log', '.sql', '.pdf', '.doc', '.docx', '.ppt', '.pptx', '.xlsx', '.jpg', '.jpeg', '.png', '.mp4', '.mp3', '.zip', '.tar', '.gz', '.rar', '.7z', '.apk', '.iso']} # Track permission operations global permission_operations permission_operations = { 'chmod': {f'chmod{perm}{ext}': 0 for perm in ['644', '755', '777'] for ext in self.ext_count}, 'chown': {f'chown{owner}{ext}': 0 for owner in ['user', 'group'] for ext in self.ext_count}, 'chgrp': {f'chgrp{group}{ext}': 0 for group in ['staff', 'admin'] for ext in self.ext_count} } # Directory operations tracking self.directory_count = {'created': 0, 'deleted': 0, 'modified': 0, 'opened': 0} # Initialize inotify self.inotify = inotify_simple.INotify() self.EVENT_MASKS = IN_CREATE | IN_DELETE | IN_MODIFY | IN_OPEN | IN_ISDIR self.watch_path = '/etc' # Default path, will be updated self.watch_descriptor2 = self.inotify.add_watch(self.watch_path, self.EVENT_MASKS) # Observer for filesystem events self.observer = None self.event_handler = None self.monitor_thread = threading.Thread(target=self.monitor_logs) # Initialize file monitoring data self.open_count = defaultdict(int) def run_command(self, command, success_message, error_message): try: result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) messagebox.showinfo("Success", success_message) except subprocess.CalledProcessError as e: messagebox.showerror("Error", f"{error_message}\n\n{e.stderr.decode()}") def prompt_for_password(self, command, success_message, error_message): password_window = tk.Toplevel(self.root) password_window.title("Enter Sudo Password") tk.Label(password_window, text="Enter your sudo password:").pack(pady=10) password_entry = tk.Entry(password_window, show="*") password_entry.pack(pady=5) def on_submit(): password = password_entry.get() password_window.destroy() if not password: messagebox.showwarning("Input Error", "Please enter your sudo password.") return full_command = f"echo {password} | sudo -S {command}" self.run_command(full_command, success_message, error_message) tk.Button(password_window, text="Submit", command=on_submit).pack(pady=10) def install_auditd(self): command = "sudo apt-get update && sudo apt-get install -y auditd" self.prompt_for_password(command, "AuditD installed successfully!", "Failed to install AuditD.") def start_auditd(self): command = "sudo systemctl start auditd" self.prompt_for_password(command, "AuditD started successfully!", "Failed to start AuditD.") self.start_monitoring() def stop_auditd(self): command = "sudo systemctl stop auditd" self.prompt_for_password(command, "AuditD stopped successfully!", "Failed to stop AuditD.") self.stop_monitoring() def check_status(self): command = "systemctl status auditd" try: result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) status = result.stdout.decode() messagebox.showinfo("AuditD Status", status) except subprocess.CalledProcessError as e: messagebox.showerror("Error", f"Failed to check status of AuditD.\n\n{e.stderr.decode()}") def start_monitoring(self): self.monitoring = True if not self.monitor_thread.is_alive(): self.monitor_thread = threading.Thread(target=self.monitor_logs) self.monitor_thread.start() # Get the user-defined watch path self.watch_path = '/etc' # Default to root if empty self.watch_descriptor = self.inotify.add_watch(self.watch_path, self.EVENT_MASKS) # Start filesystem event monitoring if self.observer is None: self.event_handler = PermissionChangeHandler() self.observer = Observer() self.observer.schedule(self.event_handler, '/home', recursive=True) self.observer.start() def stop_monitoring(self): self.monitoring = False if self.monitor_thread.is_alive(): self.monitor_thread.join() # Stop filesystem event monitoring if self.observer: self.observer.stop() self.observer.join() def monitor_logs(self): while self.monitoring: if os.path.exists(self.log_file): with open(self.log_file, 'r') as f: lines = f.readlines() for line in lines: if 'type=' in line: log_type = line.split('type=')[1].split(' ')[0] if log_type in self.log_counts: self.log_counts[log_type] += 1 self.update_csv() self.monitor_extensions() predict_ransomware() time.sleep(5) # Sleep for one second before the next update def update_csv(self): # headers = [ # 'Id' ,'PROCTITLE', 'AVC', 'SYSCALL', 'USER_AUTH', 'USER_ACCT', # 'USER_CMD', 'CRED_REFR', 'USER_START', 'USER_AVC', 'USER_END', 'CRED_DISP', 'CRED_ACQ', # 'LOGIN', 'SERVICE_START', 'SERVICE_STOP' # ] + [f'chmod{perm}{ext}' for perm in ['644', '755', '777'] for ext in self.ext_count] + \ # [f'chown{owner}{ext}' for owner in ['user', 'group'] for ext in self.ext_count] + \ # [f'chgrp{group}{ext}' for group in ['staff', 'admin'] for ext in self.ext_count] + \ # [f'Modified({ext})' for ext in self.ext_count] + \ # [f'Created({ext})' for ext in self.ext_count] + \ # [f'Deleted({ext})' for ext in self.ext_count] + \ # [f'Opened({ext})' for ext in self.ext_count] + \ # ['Directories Created', 'Directories Deleted', 'Directories Modified', 'Directories Opened']+ \ # list(self.monitored_files_set) global ID ID += 1 global is_flip global flipped if flipped: is_flip = 1 flipped = False else: is_flip = 0 flipped = True row = [ ID, self.log_counts.get('PROCTITLE', 0), self.log_counts.get('AVC', 0), self.log_counts.get('SYSCALL', 0), self.log_counts.get('USER_AUTH', 0), self.log_counts.get('USER_ACCT', 0), self.log_counts.get('USER_CMD', 0), self.log_counts.get('CRED_REFR', 0), self.log_counts.get('USER_START', 0), self.log_counts.get('USER_AVC', 0), self.log_counts.get('USER_END', 0), self.log_counts.get('CRED_DISP', 0), self.log_counts.get('CRED_ACQ', 0), self.log_counts.get('LOGIN', 0), self.log_counts.get('SERVICE_START', 0), self.log_counts.get('SERVICE_STOP', 0), ] # print(permission_operations['chmod']) # Add permission operations and extensions row.extend(permission_operations['chmod'].values()) row.extend(permission_operations['chown'].values()) row.extend(permission_operations['chgrp'].values()) # Add extension counts for modification, creation, deletion, and opening for ext in self.ext_count: row.extend([ self.ext_count[ext]['modified'], self.ext_count[ext]['created'], self.ext_count[ext]['deleted'], self.ext_count[ext]['opened'], ]) # Add directory counts row.extend([ self.directory_count['created'], self.directory_count['deleted'], self.directory_count['modified'], self.directory_count['opened'] ]) # Add monitored files open counts row.extend(self.open_count.get(file, 0) for file in sorted(self.monitored_files_set)) # Write to CSV, append if file exists file_exists = os.path.isfile(self.combined_csv_file) with open(self.combined_csv_file, 'a', newline='') as csv_file: writer = csv.writer(csv_file) if not file_exists: pass writer.writerow(row) def monitor_extensions(self): events = self.inotify.read(timeout=100000) for event in events: (_, event_types, _, filename) = event filename = event.name ext = os.path.splitext(filename)[1] if ext in self.ext_count: if event.mask & IN_CREATE: self.ext_count[ext]['created'] += 1 if event.mask & IN_DELETE: self.ext_count[ext]['deleted'] += 1 if event.mask & IN_MODIFY: self.ext_count[ext]['modified'] += 1 if event.mask & IN_OPEN: self.ext_count[ext]['opened'] += 1 if filename in self.monitored_files_set: self.open_count[filename] += 1 if event.mask & IN_ISDIR: if event.mask & IN_CREATE: self.directory_count['created'] += 1 if event.mask & IN_DELETE: self.directory_count['deleted'] += 1 if event.mask & IN_MODIFY: self.directory_count['modified'] += 1 if event.mask & IN_OPEN: self.directory_count['opened'] += 1 #malwaretested import os import time import logging import subprocess import tkinter as tk from tkinter import filedialog, messagebox, ttk from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import threading import pandas as pd import pickle import numpy as np from sklearn.preprocessing import MinMaxScaler import sys import os import pandas as pd import numpy as np import codecs import pickle import requests isMonitoring = False output_directory = "outputs" bytes_output_directory = "outputs/bytes_output" asm_output_directory = "outputs/asm_output" result_folder = "results" bytes_result_directory = "results/bytes_result" asm_result_directory = "results/asm_result" bytes_model_directory = "bytes_models" asm_model_directory = "asm_models" if not os.path.exists(asm_model_directory) or not os.path.exists(bytes_model_directory): messagebox.showinfo("Error", "Models Not Found for Prediction") exit(-1) if not os.path.exists(output_directory): os.makedirs(output_directory) if not os.path.exists(asm_output_directory): os.makedirs(asm_output_directory) if not os.path.exists(bytes_output_directory): os.makedirs(bytes_output_directory) if not os.path.exists(result_folder): os.makedirs(result_folder) if not os.path.exists(asm_result_directory): os.makedirs(asm_result_directory) if not os.path.exists(bytes_result_directory): os.makedirs(bytes_result_directory) logging.basicConfig(level=logging.INFO) def send_predictions_to_api(file_path): url = "http://142.93.221.85:8000/predict-malware/" with open(file_path, 'rb') as f: files = {'file': f} response = requests.post(url, files=files) if response.status_code == 200: print(f"Successfully sent {file_path} to API.") else: print(f"Failed to send {file_path} to API. Status code: {response.status_code}") def send_asm_predictions_to_api(file_path): url = "http://142.93.221.85:8000/predict-malware/" with open(file_path, 'rb') as f: files = {'file': f} response = requests.post(url, files=files) if response.status_code == 200: print(f"Successfully sent {file_path} to API.") else: print(f"Failed to send {file_path} to API. Status code: {response.status_code}") def format_bytes_to_hex(data): hex_dump = "" for i in range(0, len(data), 16): chunk = data[i:i+16] hex_values = " ".join(f"{byte:02X}" for byte in chunk) address = f"{i:08X}" hex_dump += f"{address} {hex_values}\n" return hex_dump def convert_file_to_hex(input_file, output_file): try: with open(input_file, 'rb') as f: data = f.read() hex_dump = format_bytes_to_hex(data) with open(output_file, 'w') as f: f.write(hex_dump) logging.info(f"Converted '{input_file}' to hex dump and saved to '{output_file}'") except Exception as e: logging.error(f"Error converting '{input_file}': {e}") def scan_and_convert_directory(directory, output_dir): for root, _, files in os.walk(directory, followlinks=True): for filename in files: input_file = os.path.join(root, filename) if not filename.endswith(".bytes"): output_file = os.path.join(output_dir, f"{filename}.bytes") if not os.path.exists(output_file): convert_file_to_hex(input_file, output_file) class FileChangeHandler(FileSystemEventHandler): def __init__(self, output_dir, hex_dirs, disasm_dirs): self.output_dir = output_dir self.hex_dirs = hex_dirs self.disasm_dirs = disasm_dirs super().__init__() def on_created(self, event): if not event.is_directory: input_file = event.src_path output_file_hex = os.path.join(bytes_output_directory, f"{os.path.basename(input_file)}.bytes") if not os.path.exists(output_file_hex): # Convert to hex in a new thread threading.Thread(target=self.run_hex_conversion, args=(input_file, output_file_hex)).start() threading.Thread(target=self.run_disassembly, args=(input_file,)).start() # Disassemble in a new thread def run_hex_conversion(self, input_file, output_file): convert_file_to_hex(input_file, output_file) run_malware_ai_analysis_bytes() def run_disassembly(self, file_path): try: print(f"Disassembling {file_path}") result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True) assembly_code = result.stdout base_name = os.path.basename(file_path) if not file_path.endswith(".asm"): asm_file_name = f"{base_name}.asm" asm_file_path = os.path.join(asm_output_directory, asm_file_name) with open(asm_file_path, "w") as asm_file: asm_file.write(assembly_code) print(f"Disassembly complete. Assembly code saved to {asm_file_path}") run_malware_analysis_asm() except subprocess.CalledProcessError as e: print(f"Error disassembling file {file_path}: {e}", file=sys.stderr) def monitor_directories(directories, output_dir): event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories) observer = Observer() for directory in directories: observer.schedule(event_handler, path=directory, recursive=True) logging.info(f"Monitoring directory: {directory}") observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() def start_observer(directories, output_dir): observer = Observer() event_handler = FileChangeHandler(output_dir, hex_dirs=directories, disasm_dirs=directories) for directory in directories: observer.schedule(event_handler, path=directory, recursive=True) logging.info(f"Monitoring directory: {directory}") observer.start() return observer def disassemble_elf(file_path, output_dir): try: print(f"Disassembling {file_path}") result = subprocess.run(['objdump', '-d', file_path], capture_output=True, text=True, check=True) assembly_code = result.stdout base_name = os.path.basename(file_path) if not file_path.endswith(".asm"): asm_file_name = f"{base_name}.asm" asm_file_path = os.path.join(output_dir, asm_file_name) with open(asm_file_path, "w") as asm_file: asm_file.write(assembly_code) print(f"Disassembly complete. Assembly code saved to {asm_file_path}") except subprocess.CalledProcessError as e: print(f"Error disassembling file {file_path}: {e}", file=sys.stderr) def find_elf_files(start_dirs): elf_files = [] for start_dir in start_dirs: if not os.path.isdir(start_dir): continue try: find_command = ['find', start_dir, '-path', '/proc', '-prune', '-o', '-path', '/sys', '-prune', '-o', '-path', '/run', '-prune', '-o', '-type', 'f', '-print'] find_result = subprocess.run(find_command, capture_output=True, text=True, check=False) if find_result.returncode != 0: print(f"Error running find command: {find_result.stderr}", file=sys.stderr) continue file_paths = find_result.stdout.splitlines() print(f"Found files in {start_dir}:") print(file_paths) for file_path in file_paths: try: file_command = ['file', '--mime-type', file_path] file_result = subprocess.run(file_command, capture_output=True, text=True, check=True) if 'application/x-executable' in file_result.stdout or 'application/x-sharedlib' in file_result.stdout: elf_files.append(file_path) except subprocess.CalledProcessError as e: print(f"Error running file command on {file_path}: {e}", file=sys.stderr) except Exception as e: print(f"Error processing directory {start_dir}: {e}", file=sys.stderr) print(f"Found ELF files: {elf_files}") return elf_files def process_files(output_dir, start_dirs): os.makedirs(output_dir, exist_ok=True) elf_files = find_elf_files(start_dirs) if not elf_files: print("No ELF files found.") return for elf_file in elf_files: disassemble_elf(elf_file, output_dir) print("Disassembly complete. Assembly files are saved in the output directory.") def process_files_malware(folder_path, files_to_process): feature_matrix = np.zeros((len(files_to_process), 258), dtype=int) # Adjusted to 258 columns for k, file in enumerate(files_to_process): if file.endswith("bytes"): try: with open(os.path.join(folder_path, file), "r") as byte_file: for lines in byte_file: line = lines.rstrip().split(" ") for hex_code in line: if hex_code != '??': index = int(hex_code, 16) if index < 257: # Keep the bounds check for 257 feature_matrix[k][index] += 1 else: feature_matrix[k][257] += 1 # This now references the 258th feature except: continue # Normalize the features scaler = MinMaxScaler() feature_matrix = scaler.fit_transform(feature_matrix) return feature_matrix def test_files(folder_path, model_path, output_csv): files = os.listdir(folder_path) # Check if the CSV file already exists if os.path.exists(output_csv): existing_results = pd.read_csv(output_csv) already_scanned_files = set(existing_results['File'].tolist()) else: already_scanned_files = set() # Filter out files that have already been scanned files_to_process = [file for file in files if file not in already_scanned_files] if not files_to_process: print("All files have already been scanned.") return # Process only the files that haven't been scanned yet feature_matrix = process_files_malware(folder_path, files_to_process) # Load the trained model with open(model_path, 'rb') as model_file: model = pickle.load(model_file) # Make predictions predictions = model.predict(feature_matrix) prediction_probs = model.predict_proba(feature_matrix) # Create a DataFrame for the new results new_results = pd.DataFrame({ 'File': files_to_process, 'Predicted Class': predictions, 'Prediction Probability': [max(probs) for probs in prediction_probs] }) # Append new results to the existing CSV file or create a new one if os.path.exists(output_csv): new_results.to_csv(output_csv, mode='a', header=False, index=False) else: new_results.to_csv(output_csv, index=False) print(f"New predictions appended to {output_csv}") def run_malware_ai_analysis_bytes(): print("bytes malware analysis started") directory = bytes_output_directory model_files = bytes_model_directory model_folder = model_files # Folder containing the .pkl files model_files = [f for f in os.listdir(model_folder) if f.endswith('.pkl')] for model_file in model_files: model_path = os.path.join(model_folder, model_file) output_csv = os.path.join(bytes_result_directory, f"bytes_predictions_{os.path.splitext(model_file)[0]}.csv") test_files(directory, model_path, output_csv) try: send_predictions_to_api(output_csv) except: print("Connection Failed") def preprocess_asm_file(file_path): prefixes = ['.text:', '.Pav:', '.idata:', '.data:', '.bss:', '.rdata:', '.edata:', '.rsrc:', '.tls:', '.reloc:', '.BSS:', '.CODE'] opcodes = ['jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub', 'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call', 'shl', 'ror', 'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx'] keywords = ['.dll', 'std::', ':dword'] registers = ['edx', 'esi', 'eax', 'ebx', 'ecx', 'edi', 'ebp', 'esp', 'eip'] # Initialize counts prefix_counts = np.zeros(len(prefixes), dtype=int) opcode_counts = np.zeros(len(opcodes), dtype=int) keyword_counts = np.zeros(len(keywords), dtype=int) register_counts = np.zeros(len(registers), dtype=int) # Process file with open(file_path, 'r', encoding='cp1252', errors='replace') as f: for line in f: line = line.rstrip().split() if not line: continue l = line[0] for i, prefix in enumerate(prefixes): if prefix in l: prefix_counts[i] += 1 line = line[1:] for i, opcode in enumerate(opcodes): if any(opcode == li for li in line): opcode_counts[i] += 1 for i, register in enumerate(registers): if any(register in li and ('text' in l or 'CODE' in l) for li in line): register_counts[i] += 1 for i, keyword in enumerate(keywords): if any(keyword in li for li in line): keyword_counts[i] += 1 # Create feature vector feature_vector = np.concatenate([prefix_counts, opcode_counts, register_counts, keyword_counts]) return feature_vector # Main function to load models and make predictions def run_malware_analysis_asm(asm_folder_path=asm_output_directory, models_folder=asm_model_directory): print("Starting analysis...") # Get all .asm files in the folder asm_files = [f for f in os.listdir(asm_folder_path) if f.endswith('.asm')] # Load all .pkl models from the models folder model_files = [f for f in os.listdir(models_folder) if f.endswith('.pkl')] models = {} for model_file in model_files: model_name = os.path.splitext(model_file)[0] with open(os.path.join(models_folder, model_file), 'rb') as f: model_clf = pickle.load(f) models[model_name] = model_clf # Prediction and saving results for model_name, model_clf in models.items(): print(f"Making asm predictions with {model_name}...") # Generate the correct class mapping def get_class_mapping(model_name): if model_name == 'XGBClassifier': return {i: i for i in range(9)} # XGB uses 0-8 else: return {i: i+1 for i in range(9)} # Other models use 1-9 class_mapping = get_class_mapping(model_name) # Check if result file for the model already exists results_file_path = f'{asm_result_directory}/asm_prediction_{model_name}.csv' if os.path.exists(results_file_path): results_df = pd.read_csv(results_file_path) else: results_df = pd.DataFrame(columns=['file_name', 'prediction', 'probability']) new_predictions = [] for asm_file in asm_files: if asm_file not in results_df['file_name'].values: file_path = os.path.join(asm_folder_path, asm_file) feature_vector = preprocess_asm_file(file_path) feature_vector = feature_vector.reshape(1, -1) # Predict using the current model prediction = model_clf.predict(feature_vector) probability = model_clf.predict_proba(feature_vector) mapped_prediction = class_mapping[prediction[0]] predicted_prob = probability[0][prediction[0]] if "XGB" in model_name.upper(): new_predictions.append({ 'file_name': asm_file, 'prediction': mapped_prediction+1, 'probability': predicted_prob }) else: new_predictions.append({ 'file_name': asm_file, 'prediction': mapped_prediction, 'probability': predicted_prob }) # Append new predictions to results DataFrame if new_predictions: new_predictions_df = pd.DataFrame(new_predictions) results_df = pd.concat([results_df, new_predictions_df], ignore_index=True) results_df.to_csv(results_file_path, index=False) print(f"Predictions saved to {results_file_path}.") try: send_asm_predictions_to_api(results_file_path) except: print("Connection Failed") def run_hex_conversion(): hex_dirs = [d.strip() for d in hex_files_entry.get().split(',')] hex_output_dir =bytes_output_directory if not hex_dirs or not hex_output_dir: messagebox.showwarning("Warning", "Please specify both directories and output directory.") return def hex_conversion_task(): for hex_dir in hex_dirs: hex_dir = hex_dir.strip() if os.path.isdir(hex_dir): scan_and_convert_directory(hex_dir, hex_output_dir) else: messagebox.showwarning("Warning", f"{hex_dir} is not a directory.") print("Hex conversion complete.") run_malware_ai_analysis_bytes() global isMonitoring if(not isMonitoring): isMonitoring = True start_monitoring() # hex_conversion_task() threading.Thread(target=hex_conversion_task).start() def run_disassembly(): start_dirs = [d.strip() for d in start_dirs_entry.get().split(',')] output_dir = asm_output_directory if not start_dirs or not output_dir: messagebox.showwarning("Warning", "Please specify both directories and output directory.") return def disassembly_task(): process_files(output_dir, start_dirs) run_malware_analysis_asm() global isMonitoring if(not isMonitoring): isMonitoring = True start_monitoring() # disassembly_task() threading.Thread(target=disassembly_task).start() def start_monitoring(): directories = [d.strip() for d in hex_files_entry.get().split(',')] directories += [d.strip() for d in start_dirs_entry.get().split(',')] output_dir = output_directory def monitoring_task(): monitor_directories(directories, output_dir) # Start monitoring in a new thread threading.Thread(target=monitoring_task, daemon=True).start() print("Started monitoring directories.") def on_closing(): root.destroy() def browse_hex_directories(): directories = [] while True: directory = filedialog.askdirectory(title="Select a Directory") if not directory: break # Stop if no more directories are selected directories.append(directory) if directories: hex_files_entry.delete(0, tk.END) hex_files_entry.insert(0, ', '.join(directories)) def browse_start_dirs(): directories = [] while True: directory = filedialog.askdirectory(title="Select a Directory") if not directory: break # Stop if no more directories are selected directories.append(directory) if directories: start_dirs_entry.delete(0, tk.END) start_dirs_entry.insert(0, ', '.join(directories)) def malware_gui(parent_frame): # Create a new window for malware analysis malware_window = tk.Toplevel(parent_frame) malware_window.title("Malware Analysis") # Add content to the malware window tk.Label(malware_window, text="Malware Analysis Section", font=("Arial", 16)).pack(pady=20) # Add any additional widgets needed for malware analysis tk.Label(malware_window, text="Select files for analysis:").pack(pady=5) malware_files_entry = tk.Entry(malware_window, width=80) malware_files_entry.pack(pady=5) tk.Button(malware_window, text="Browse...", command=browse_hex_directories).pack(pady=5) tk.Button(malware_window, text="Analyze Malware", command=lambda: print("Analyzing...")).pack(pady=10) import tkinter as tk from tkinter import ttk # Requires `pip install ttkthemes` def create_wizard_window(): global root root = tk.Tk() root.title("File Conversion and Disassembly Wizard") root.geometry("700x450") root.resizable(False, False) # Wizard frames with a more appealing color scheme frame1 = tk.Frame(root, bg="#ffffff") frame2 = tk.Frame(root, bg="#f9f9f9") frame3 = tk.Frame(root, bg="#f9f9f9") frame4 = tk.Frame(root, bg="#ffffff") frame5 = tk.Frame(root, bg="#f9f9f9") frame6 = tk.Frame(root, bg="#ffffff") frames = [frame1, frame2, frame3, frame4, frame5, frame6] def show_frame(frame): """Hide all frames and show only the specified one.""" for frm in frames: frm.pack_forget() frame.pack(fill='both', expand=True) def update_progress(step): """Update the progress bar and label to reflect the current step.""" progress_label.config(text=f"Step {step} of 5") progress_bar['value'] = (step / 5) * 100 # Enhanced Title Bar with Icons title_frame = tk.Frame(root, bg="#283593") title_frame.pack(fill="x", side="top") title_label = tk.Label( title_frame, text="🛠 Setup Wizard", font=("Arial", 16, "bold"), fg="white", bg="#283593", anchor="w", padx=20 ) title_label.pack(pady=10) # Progress bar with percentage display progress_frame = tk.Frame(root, bg="#f0f0f0") progress_frame.pack(fill="x", side="bottom") progress_bar = ttk.Progressbar(progress_frame, orient="horizontal", mode="determinate", length=500) progress_bar.pack(side="left", padx=20, pady=10) progress_label = tk.Label(progress_frame, text="Step 1 of 5", font=("Arial", 12), bg="#f0f0f0") progress_label.pack(side="left", padx=10) # Frame 1 - Welcome Screen label1 = tk.Label(frame1, text="Welcome to the File Conversion Wizard", font=("Arial", 18, "bold"), bg="#ffffff") label1.pack(pady=40) desc_label1 = tk.Label(frame1, text="This wizard will guide you through the steps.", bg="#ffffff", font=("Arial", 12)) desc_label1.pack(pady=10) next_button1 = ttk.Button(frame1, text="Next ➡️", command=lambda: [update_progress(2), show_frame(frame2)]) next_button1.pack(pady=20) # Frame 2 - Packet Capture Setup label2 = tk.Label(frame2, text="📡 Packet Capture Setup", font=("Arial", 16, "bold"), bg="#f9f9f9") label2.pack(pady=40) setup_gui(frame2) # Assuming you have this function defined button_frame2 = tk.Frame(frame2, bg="#f9f9f9") button_frame2.pack(side="bottom", pady=10) next_button2 = ttk.Button(button_frame2, text="Next ➡️", command=lambda: [update_progress(3), show_frame(frame3)], width=10) next_button2.pack(side="right", padx=10) back_button2 = ttk.Button(button_frame2, text="⬅️ Back", command=lambda: [update_progress(1), show_frame(frame1)], width=10) back_button2.pack(side="left", padx=10) # Frame 3 - Malware Analysis with Tabbed Layout notebook = ttk.Notebook(frame3) notebook.pack(fill='both', expand=True) hex_frame = ttk.Frame(notebook) asm_frame = ttk.Frame(notebook) notebook.add(hex_frame, text='🔢 Hex Conversion') notebook.add(asm_frame, text='⚙️ ELF Disassembly') tk.Label(hex_frame, text="Select Directories to Convert to Hex:", font=("Arial", 12)).pack(pady=5) global hex_files_entry hex_files_entry = tk.Entry(hex_frame, width=80) hex_files_entry.pack(pady=5) tk.Button(hex_frame, text="Browse...").pack(pady=5) tk.Button(hex_frame, text="Convert to Hex").pack(pady=10) tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:", font=("Arial", 12)).pack(pady=5) global start_dirs_entry start_dirs_entry = tk.Entry(asm_frame, width=80) start_dirs_entry.pack(pady=5) tk.Button(asm_frame, text="Browse...").pack(pady=5) tk.Button(asm_frame, text="Disassemble ELF Files").pack(pady=10) button_frame3 = tk.Frame(frame3, bg="#f9f9f9") button_frame3.pack(side="bottom", pady=10) next_button3 = ttk.Button(button_frame3, text="Next ➡️", command=lambda: [update_progress(4), show_frame(frame4)], width=10) next_button3.pack(side="right", padx=10) back_button3 = ttk.Button(button_frame3, text="⬅️ Back", command=lambda: [update_progress(2), show_frame(frame2)], width=10) back_button3.pack(side="left", padx=10) # Frame 4 - Ransomware Detection label4 = tk.Label(frame4, text="🛡️ Ransomware Detection", font=("Arial", 16, "bold"), bg="#f0f0f0", fg="#333") label4.pack(pady=20) directory_frame = tk.Frame(frame4, bg="#f0f0f0") directory_frame.pack(pady=10) selected_dir_label = tk.Label(directory_frame, text="No Directory Selected", width=50, bg="#f0f0f0", font=("Arial", 12), fg="#666") selected_dir_label.grid(row=1, column=0, pady=10) def select_directory(): # Placeholder for directory selection functionality selected_dir_label.config(text="Selected Directory Path") browse_button = ttk.Button(directory_frame, text="📂 Select Directory", command=select_directory) browse_button.grid(row=0, column=0, pady=5) status_label = tk.Label(frame4, text="", fg="blue", bg="#f0f0f0", font=("Arial", 12)) status_label.pack(pady=10) button_frame4 = tk.Frame(frame4, bg="#f0f0f0") button_frame4.pack(side="bottom", pady=15) next_button4 = ttk.Button(button_frame4, text="Next ➡️", command=lambda: [update_progress(5), show_frame(frame5)], width=10) next_button4.pack(side="right", padx=10) back_button4 = ttk.Button(button_frame4, text="⬅️ Back", command=lambda: [update_progress(3), show_frame(frame3)], width=10) back_button4.pack(side="left", padx=10) # Frame 5 - AuditD Manager label5 = tk.Label(frame5, text="🔍 AuditD Manager", font=("Arial", 16, "bold"), bg="#f0f0f0", fg="#333") label5.pack(pady=20) # Placeholder for AuditD Manager audit_app = AuditDManagerApp(frame5) # Uncomment and define if AuditDManagerApp is available audit_app.frame.pack(fill='both', expand=True, pady=10) button_frame5 = tk.Frame(frame5, bg="#f0f0f0") button_frame5.pack(side="bottom", pady=15) next_button5 = ttk.Button(button_frame5, text="Next ➡️", command=lambda: show_frame(frame6), width=10) next_button5.pack(side="right", padx=10) back_button5 = ttk.Button(button_frame5, text="⬅️ Back", command=lambda: [update_progress(4), show_frame(frame4)], width=10) back_button5.pack(side="left", padx=10) # Frame 6 - Completion Screen label6 = tk.Label(frame6, text="🎉 Setup Complete!", font=("Arial", 18, "bold"), bg="#ffffff") label6.pack(pady=40) desc_label6 = tk.Label(frame6, text="Thank you for using the setup wizard.", bg="#ffffff", font=("Arial", 12)) desc_label6.pack(pady=10) finish_button = ttk.Button(frame6, text="➡️ Finish ⬅️", command=root.quit, width=10) finish_button.pack(pady=10) # Show the first frame show_frame(frame1) root.mainloop() def on_closing(): root.quit() if device_check(): # If device check is successful, initialize the Tkinter window create_wizard_window() else: # If the device check fails, show an error message and exit print("Device check failed. Exiting program.") messagebox.showerror("Error", "Device check failed. The wizard will not start.") # def create_wizard_window(): # global root # root = tk.Tk() # root.title("File Conversion and Disassembly Wizard") # root.geometry("600x400") # root.resizable(False, False) # # Wizard frames # frame1 = tk.Frame(root, bg="#f0f0f0") # frame2 = tk.Frame(root, bg="#f0f0f0") # frame3 = tk.Frame(root, bg="#f0f0f0") # frame4 = tk.Frame(root, bg="#f0f0f0") # frame5 = tk.Frame(root, bg="#f0f0f0") # frame6 = tk.Frame(root, bg="#f0f0f0") # frames = [frame1, frame2, frame3, frame4, frame5, frame6] # current_step = 1 # def show_frame(frame): # """Hide all frames and show only the specified one.""" # for frm in frames: # frm.pack_forget() # frame.pack(fill='both', expand=True) # def update_progress(step): # """Update the progress bar and label to reflect the current step.""" # progress_label.config(text=f"Step {step} of 5") # progress_bar['value'] = (step / 5) * 100 # # Title bar frame for better aesthetics # title_frame = tk.Frame(root, bg="#0078d7") # title_frame.pack(fill="x", side="top") # title_label = tk.Label(title_frame, text="Setup Wizard", font=("Arial", 14, "bold"), fg="white", bg="#0078d7") # title_label.pack(pady=10) # # Progress bar # progress_bar = ttk.Progressbar(root, orient="horizontal", mode="determinate", length=400) # progress_bar.pack(side="bottom", pady=10) # progress_label = tk.Label(root, text="Step 1 of 5", font=("Arial", 12)) # progress_label.pack(side="bottom") # # Frame 1 - Welcome Screen # label1 = tk.Label(frame1, text="Welcome to the File Conversion Wizard", font=("Arial", 16), bg="#f0f0f0") # label1.pack(pady=40) # desc_label1 = tk.Label(frame1, text="This wizard will guide you through the steps.", bg="#f0f0f0", font=("Arial", 12)) # desc_label1.pack(pady=10) # next_button1 = ttk.Button(frame1, text="Next", command=lambda: [update_progress(2), show_frame(frame2)]) # next_button1.pack(pady=10, side="bottom") # # Frame 2 - Packet Capture UI # label2 = tk.Label(frame2, text="Packet Capture Setup", font=("Arial", 16), bg="#f0f0f0") # label2.pack(pady=40) # # Insert your packet capture setup UI here # setup_gui(frame2) # Assuming you have this function defined # button_frame2 = tk.Frame(frame2, bg="#f0f0f0") # button_frame2.pack(side="bottom", pady=10) # next_button2 = ttk.Button(button_frame2, text="Next", command=lambda: [update_progress(3), show_frame(frame3)], width=10) # next_button2.pack(side="right", padx=10) # back_button2 = ttk.Button(button_frame2, text="Back", command=lambda: [update_progress(1), show_frame(frame1)], width=10) # back_button2.pack(side="left", padx=10) # # Frame 3 - Malware Analysis # notebook = ttk.Notebook(frame3) # notebook.pack(fill='both', expand=True) # hex_frame = ttk.Frame(notebook) # asm_frame = ttk.Frame(notebook) # notebook.add(hex_frame, text='Hex Conversion') # notebook.add(asm_frame, text='ELF Disassembly') # tk.Label(hex_frame, text="Select Directories to Convert to Hex:", font=("Arial", 12)).pack(pady=5) # global hex_files_entry # hex_files_entry = tk.Entry(hex_frame, width=80) # hex_files_entry.pack(pady=5) # tk.Button(hex_frame, text="Browse...",command=browse_hex_directories).pack(pady=5) # tk.Button(hex_frame, text="Convert to Hex",command=run_hex_conversion).pack(pady=10) # tk.Label(asm_frame, text="Select Directories to Scan for ELF Files:", font=("Arial", 12)).pack(pady=5) # global start_dirs_entry # start_dirs_entry = tk.Entry(asm_frame, width=80) # start_dirs_entry.pack(pady=5) # tk.Button(asm_frame, text="Browse...", command=browse_start_dirs).pack(pady=5) # tk.Button(asm_frame, text="Disassemble ELF Files",command=run_disassembly).pack(pady=10) # button_frame3 = tk.Frame(frame3, bg="#f0f0f0") # button_frame3.pack(side="bottom", pady=10) # next_button3 = ttk.Button(button_frame3, text="Next", command=lambda: [update_progress(4), show_frame(frame4)], width=10) # next_button3.pack(side="right", padx=10) # back_button3 = ttk.Button(button_frame3, text="Back", command=lambda: [update_progress(2), show_frame(frame2)], width=10) # back_button3.pack(side="left", padx=10) # # Frame 4 - Ransomware Detection # label4 = tk.Label(frame4, text="Ransomware Detection", font=("Arial", 16), bg="#f0f0f0") # label4.pack(pady=40) # directory_frame = tk.Frame(frame4, bg="#f0f0f0") # directory_frame.pack(pady=10) # selected_dir_label = tk.Label(directory_frame, text="No Directory Selected", width=40, bg="#f0f0f0") # selected_dir_label.grid(row=1, column=0) # # Directory label # def select_directory(): # directory = browse_directory() # if directory: # selected_dir_label.config(text=directory) # # Start the watchdog observer # browse_button = ttk.Button(directory_frame, text="Select Directory", command=select_directory) # browse_button.grid(row=0, column=0) # status_label = tk.Label(frame4, text="", fg="blue", bg="#f0f0f0") # status_label.pack(pady=10) # run_button = ttk.Button(frame4, text="Run Predictions", command=lambda: run_predictions(selected_dir_label.cget("text"), status_label)) # run_button.pack(pady=10) # button_frame4 = tk.Frame(frame4, bg="#f0f0f0") # button_frame4.pack(side="bottom", pady=10) # next_button4 = ttk.Button(button_frame4, text="Next", command=lambda: [update_progress(5), show_frame(frame5)], width=10) # next_button4.pack(side="right", padx=10) # back_button4 = ttk.Button(button_frame4, text="Back", command=lambda: [update_progress(3), show_frame(frame3)], width=10) # back_button4.pack(side="left", padx=10) # # Frame 5 - AuditD Manager # audit_app = AuditDManagerApp(frame5) # audit_app.frame.pack(fill='both', expand=True) # # tk.Label(frame5, text="AuditD Manager", font=("Arial", 16), bg="#f0f0f0").pack(pady=40) # button_frame5 = tk.Frame(frame5, bg="#f0f0f0") # button_frame5.pack(side="bottom", pady=10) # next_button5 = ttk.Button(button_frame5, text="Next", command=lambda: show_frame(frame6), width=10) # next_button5.pack(side="right", padx=10) # back_button5 = ttk.Button(button_frame5, text="Back", command=lambda: [update_progress(4), show_frame(frame4)], width=10) # back_button5.pack(side="left", padx=10) # # Frame 6 - Summary # label6 = tk.Label(frame6, text="Setup Complete!", font=("Arial", 16), bg="#f0f0f0") # label6.pack(pady=40) # desc_label6 = tk.Label(frame6, text="Thank you for using the setup wizard.", bg="#f0f0f0", font=("Arial", 12)) # desc_label6.pack(pady=10) # finish_button = ttk.Button(frame6, text="Finish", command=root.quit, width=10) # finish_button.pack(pady=10) # # Show the first frame # show_frame(frame1) # root.mainloop() # def on_closing(): # root.quit() # if __name__ == "__main__": # root = tk.Tk() # create_wizard_window()