web_defender/Dashboard/views.py
2024-12-09 13:43:16 +05:30

1692 lines
59 KiB
Python

from django.shortcuts import render
from botocore.exceptions import NoCredentialsError
import boto3
import io
import csv
import json
from django.http import HttpResponse
import os
import botocore
from django.http import JsonResponse
from django.conf import settings
import re
import random
from cryptography.fernet import Fernet
from django.http import JsonResponse
from Accounts.models import UserProfile
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
from django.views.decorators.cache import never_cache
from .models import*
from .serializers import*
from rest_framework import generics
from django.shortcuts import render,redirect,get_object_or_404
from django.views.decorators.csrf import csrf_exempt
import pytz
from datetime import datetime
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import SqlStatus
from .serializers import SqlStatusSerializer
import csv
from rest_framework.decorators import api_view
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
import pandas as pd
import time
# NOTE: storage credentials must be supplied via settings or environment variables, never committed in source.
#==========================================================================
def current_user(request):
    """Return the requesting user's username and company name as JSON.

    Anonymous requests get a 401 JSON error instead of reaching the
    ``UserProfile`` lookup, which cannot be performed for an
    unauthenticated user.  A missing profile yields the same error payload
    as before.
    """
    if not request.user.is_authenticated:
        return JsonResponse({'error': 'Authentication required'}, status=401)
    try:
        user_profile = UserProfile.objects.get(user=request.user)
    except UserProfile.DoesNotExist:
        return JsonResponse({'error': 'UserProfile not found for this user'})
    return JsonResponse({
        'username': request.user.username,
        'company_name': user_profile.company_name,
    })
#==========================================================================================
@never_cache
@login_required(login_url='login')
def home(request):
    """Render the dashboard landing page (login required, never cached)."""
    template_name = "dashboard/index.html"
    return render(request, template_name)
@never_cache
@login_required(login_url='login')
def navbar(request):
    """Render the navigation bar template (login required, never cached)."""
    template_name = 'navbar/nav.html'
    return render(request, template_name)
#===========================================================================================
@never_cache
@login_required(login_url='login')
def ddos(request):
    """Render the DDoS page from ``media/ddos_predictions/predictions.csv``.

    The template receives column totals, per-source and per-destination IP
    counts, per-protocol counts, and the source-IP counts restricted to
    rows whose ``probability`` exceeds 0.9.
    """
    data = pd.read_csv('media/ddos_predictions/predictions.csv')

    # CSV column name -> short label used as the key in ``protocol_counts``.
    protocol_mapping = {
        f"Protocol_{label}": label
        for label in (
            "ICMP", "TCP", "UDP", "HTTP", "HTTPS", "SSH",
            "DHCP", "FTP", "SMTP", "POP3", "IMAP", "DNS",
        )
    }

    ddos_sums = {}
    for column in ('pktcount', 'byteperflow', 'tot_kbps', 'rx_kbps', 'flows', 'bytecount', 'tot_dur'):
        ddos_sums[column] = int(data[column].sum())
    # These two totals are divided by a fixed 15 before display.
    for scaled in ('byteperflow', 'tot_kbps'):
        ddos_sums[scaled] = ddos_sums[scaled] / 15

    src_ip_dict = data['src_ip'].value_counts().to_dict()
    dest_ip_dict = data['dst_ip'].value_counts().to_dict()

    # Protocol indicator columns are taken by position (columns 7..18).
    protocol_counts = {
        protocol_mapping.get(column, column): int((data[column] == 1).sum())
        for column in data.columns[7:19]
    }
    print(protocol_counts)

    likely_attacks = data[data['probability'] > 0.9]
    src_ip_dict2 = likely_attacks['src_ip'].value_counts().to_dict()

    context = {
        'ddos_sums': ddos_sums,
        'src_ip_dict': src_ip_dict,
        'dest_ip_dict': dest_ip_dict,
        'protocol_counts': protocol_counts,
        'src_ip_dict2': src_ip_dict2,
    }
    return render(request, 'ddos/ddos.html', context)
#================================================================================
@never_cache
def read_tx_bytes(request):
    """Return the ``tx_bytes``, ``tx_kbps`` and ``switch`` columns of ``network_data.csv``.

    The CSV is fetched from the ``Extract`` bucket of the DigitalOcean
    Spaces endpoint.  Credentials are read from Django settings
    (``SPACES_ACCESS_KEY_ID`` / ``SPACES_SECRET_ACCESS_KEY``) or, failing
    that, from environment variables of the same names, so that no secret
    is stored in source.  Any failure is reported as ``{"error": ...}``
    with HTTP 500.
    """
    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    try:
        client = boto3.session.Session().client(
            's3',
            region_name="sfo3",
            endpoint_url="https://li-phy.sfo3.digitaloceanspaces.com",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        response = client.get_object(Bucket="Extract", Key='network_data.csv')
        csv_data = response['Body'].read().decode('utf-8')
        rows = list(csv.DictReader(io.StringIO(csv_data)))
        return JsonResponse({
            "tx_bytes_data": [row['tx_bytes'] for row in rows],
            "tx_kbps_data": [row['tx_kbps'] for row in rows],
            "switch_data": [row['switch'] for row in rows],
        })
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)
@never_cache
def dataset_sdn(request):
    """Return selected columns of ``network_data.csv`` from the ``Extract`` bucket.

    ``tx_bytes_data`` is a flat list holding, for each CSV row in order,
    the values of ``dt``, ``rx_bytes``, ``Protocol``, ``pktrate``,
    ``packetins``, ``tot_dur``, ``tx_bytes``, ``bytecount`` and ``dst``.
    ``tx_kbps_data`` holds ``tot_kbps`` then ``rx_kbps`` per row, and
    ``switch_data`` holds ``switch``.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  Any failure is reported as
    ``{"error": ...}`` with HTTP 500.
    """
    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    byte_fields = ('dt', 'rx_bytes', 'Protocol', 'pktrate', 'packetins',
                   'tot_dur', 'tx_bytes', 'bytecount', 'dst')
    try:
        client = boto3.session.Session().client(
            's3',
            region_name="sfo3",
            endpoint_url="https://li-phy.sfo3.digitaloceanspaces.com",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        response = client.get_object(Bucket="Extract", Key='network_data.csv')
        csv_data = response['Body'].read().decode('utf-8')
        tx_bytes_data = []
        tx_kbps_data = []
        switch_data = []
        for row in csv.DictReader(io.StringIO(csv_data)):
            tx_bytes_data.extend(row[field] for field in byte_fields)
            switch_data.append(row['switch'])
            tx_kbps_data.extend((row['tot_kbps'], row['rx_kbps']))
        return JsonResponse({
            "tx_bytes_data": tx_bytes_data,
            "tx_kbps_data": tx_kbps_data,
            "switch_data": switch_data,
        })
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)
#================================== DMA =====================================================
@never_cache
def kern_log(request):
    """Return the text of ``kern.txt`` from the ``Extract`` bucket as JSON.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``kern_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        kern_text = s3.get_object(Bucket='Extract', Key='kern.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        # NoCredentialsError is a BotoCoreError; missing keys and network
        # failures are now reported the same way instead of raising.
        kern_text = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'kern_log': kern_text})
#===============================================================================================
def extract_addresses(log_data):
    """Extract the address text of every ``[mem ...]`` token in *log_data*.

    Only tokens whose content consists of hex digits, ``x`` and ``-`` are
    matched.

    Returns:
        A ``(full_addresses, filtered_addresses)`` tuple: the matched
        strings, and the last nine characters of each matched string.
    """
    full_addresses = re.findall(r'\[mem ([0-9a-fx-]+)\]', log_data)
    filtered_addresses = [address[-9:] for address in full_addresses]
    return full_addresses, filtered_addresses
@never_cache
def address_log(request):
    """Return the ``[mem ...]`` addresses found in ``address.txt`` (``Extract`` bucket).

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, ``full_addresses`` carries the error text and
    ``filtered_addresses`` is empty.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        address_logs = s3.get_object(Bucket='Extract', Key='address.txt')['Body'].read().decode('utf-8')
        full_addresses, filtered_addresses = extract_addresses(address_logs)
    except (BotoCoreError, ClientError) as e:
        full_addresses = [f"Error fetching logs.txt data: {str(e)}"]
        filtered_addresses = []
    return JsonResponse({
        'full_addresses': full_addresses,
        'filtered_addresses': filtered_addresses,
    })
@never_cache
def encrypt_log(request):
    """Return suffixes of the ``[mem ...]`` tokens in ``encrypt.txt`` (``Extract`` bucket).

    If the remote object is empty, ``encrypt.txt`` under ``MEDIA_ROOT`` is
    read instead.  Despite their names, ``last_20_characters_list`` holds
    the last 35 characters of each match and ``last_8_characters_list`` the
    last 7; the JSON keys are kept for existing consumers.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  On a botocore error both
    lists are empty.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        encrypted_log = s3.get_object(Bucket='Extract', Key='encrypt.txt')['Body'].read().decode('utf-8')
        if not encrypted_log:
            # The path used to be built as a tuple (MEDIA_ROOT, '/encrypt.txt'),
            # which open() rejects; join it into a real path.
            local_file_path = os.path.join(settings.MEDIA_ROOT, 'encrypt.txt')
            with open(local_file_path, 'r') as local_file:
                encrypted_log = local_file.read()
        matches = re.findall(r'\[mem (.*?)\]', encrypted_log)
        last_20_characters_list = [match[-35:] for match in matches]
        last_8_characters_list = [match[-7:] for match in matches]
    except (BotoCoreError, ClientError):
        last_20_characters_list = []
        last_8_characters_list = []
    return JsonResponse({
        'last_8_characters_list': last_8_characters_list,
        'last_20_characters_list': last_20_characters_list,
    })
def encrypt_password(password, encryption_key):
    """Encrypt *password* with Fernet under *encryption_key*; return the token bytes."""
    cipher = Fernet(encryption_key)
    return cipher.encrypt(password.encode())
def random_line_from_file(file_name):
    """Return one randomly chosen, stripped line of *file_name* under ``MEDIA_ROOT``."""
    with open(os.path.join(settings.MEDIA_ROOT, file_name), 'r') as handle:
        candidates = handle.readlines()
    chosen = random.choice(candidates)
    return chosen.strip()
@never_cache
def password_view(request):
    """Return a random line of ``flexxon.txt`` and its Fernet-encrypted form.

    The Fernet key is read from Django settings
    (``PASSWORD_ENCRYPTION_KEY``) or the environment variable of the same
    name instead of being stored in source.  If no key is configured, a
    JSON error with HTTP 500 is returned.
    """
    encryption_key = (getattr(settings, 'PASSWORD_ENCRYPTION_KEY', None)
                      or os.environ.get('PASSWORD_ENCRYPTION_KEY'))
    if not encryption_key:
        return JsonResponse({'error': 'PASSWORD_ENCRYPTION_KEY is not configured'}, status=500)
    if isinstance(encryption_key, str):
        # Environment variables are text; pass the key on as bytes.
        encryption_key = encryption_key.encode()
    original_password = random_line_from_file('flexxon.txt')
    encrypted_password = encrypt_password(original_password, encryption_key)
    return JsonResponse({
        "original_password": original_password,
        "encrypted_password": encrypted_password.decode(),
    })
@never_cache
def processes_log(request):
    """Return the text of ``logs.txt`` from the ``Extract`` bucket as JSON.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``processes_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        log_text = s3.get_object(Bucket='Extract', Key='logs.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        log_text = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'processes_log': log_text})
#---------------------------------------------------------------------------------------
@login_required(login_url='login')
@never_cache
def dma(request):
    """Render the DMA page (login required, never cached)."""
    template_name = 'dma/dma.html'
    return render(request, template_name)
def _merge_model_predictions(frames, row_count, use_mean):
    """Merge per-model prediction frames into one frame, row by row.

    For each of the first *row_count* row positions, the class predicted by
    the model with the highest probability is kept.  The stored probability
    is the mean over the models when *use_mean* is true, otherwise the
    winning model's own probability.  A frame that has no row at a given
    position is skipped for that row.
    """
    base = frames[0]
    merged = pd.DataFrame()
    merged['File'] = base['File']  # file names are taken from the first model's output
    merged['Predicted Class'] = base['Predicted Class']
    merged['Prediction Probability'] = 0.0
    for i in range(row_count):
        probs, classes = [], []
        for frame in frames:
            try:
                probs.append(frame['Prediction Probability'].iloc[i])
                classes.append(frame['Predicted Class'].iloc[i])
            except IndexError:
                # This model produced fewer rows than the others.
                continue
        if not probs:
            continue
        best = probs.index(max(probs))
        merged.at[i, 'Predicted Class'] = classes[best]
        merged.at[i, 'Prediction Probability'] = (
            sum(probs) / len(probs) if use_mean else probs[best]
        )
    return merged


def get_combined_files():
    """Combine the malware prediction CSVs under ``media/malware_predictions``.

    The four "bytes" model outputs are merged with the mean probability per
    row; the four "ASM" model outputs are merged with the winning model's
    probability per row.  The two results are concatenated.

    The ASM half previously indexed all four frames by label and raised
    ``KeyError`` when one model file was shorter than the first; it now
    skips missing rows the same way the bytes half does.

    Returns:
        A DataFrame with ``File``, ``Predicted Class`` and
        ``Prediction Probability`` columns.
    """
    base_dir = 'media/malware_predictions/'
    bytes_frames = [
        pd.read_csv(base_dir + name)
        for name in (
            'bytes_predictions_KNeighborsClassifier.csv',
            'bytes_predictions_RandomForestClassifier.csv',
            'latest_malware_bytes_predictions_SGD.csv',
            'latest_malware_bytes_predictions_XGB.csv',
        )
    ]
    combined_data1 = _merge_model_predictions(
        bytes_frames, max(len(frame) for frame in bytes_frames), use_mean=True
    )

    asm_frames = [
        pd.read_csv(base_dir + name)
        for name in (
            'latest_malware_ASM_predictions_KNeighbours.csv',
            'latest_malware_ASM_predictions_LogisticRegression.csv',
            'latest_malware_ASM_predictions_RandomForest.csv',
            'latest_malware_ASM_predictions_XGB.csv',
        )
    ]
    # The ASM half only walks the rows present in its first model's output.
    combined_data2 = _merge_model_predictions(
        asm_frames, len(asm_frames[0]), use_mean=False
    )

    return pd.concat([combined_data1, combined_data2], ignore_index=True)
@login_required(login_url='login')
@never_cache
def malware(request):
    """Render the malware page from the combined model predictions.

    Template context:
        files_list: ``File`` values whose probability is >= 0.9.
        frequency: class counts (index 0..9) for probabilities in [0.7, 0.9).
        class_frequency: counts over all rows for classes 1..9, keyed by name.
        average: mean probability per class name (0 where a class is absent).
        logs: lines of ``logs/logs.txt`` under ``MEDIA_ROOT``, newest first,
            or ``None`` when the file cannot be read.
    """
    combined_data = get_combined_files()
    class_names = {
        1: "Ramnit",
        2: "Lollipop",
        3: "Kelihos_ver3",
        4: "Vundo",
        5: "Simda",
        6: "Tracur",
        7: "Kelihos_ver1",
        8: "Obfuscator.ACY",
        9: "Gatak"
    }
    probability = combined_data['Prediction Probability']

    files_list = combined_data[probability >= 0.9]['File'].tolist()

    files70_90 = combined_data[(probability >= 0.7) & (probability < 0.9)]
    frequency = files70_90['Predicted Class'].value_counts().sort_index()
    frequency = frequency.reindex(pd.Index(range(10)), fill_value=0)

    all_frequency = (combined_data['Predicted Class'].value_counts()
                     .reindex(range(1, 10), fill_value=0).sort_index())
    frequency_with_names = all_frequency.rename(class_names)

    avg_probability = combined_data.groupby('Predicted Class')['Prediction Probability'].mean().reset_index()
    all_classes = pd.DataFrame({'Predicted Class': range(1, 10)})
    avg_probability = pd.merge(all_classes, avg_probability, on='Predicted Class', how='left')
    # Assign the result back: an in-place fillna on a selected column is a
    # chained assignment and may not update the frame.
    avg_probability['Prediction Probability'] = avg_probability['Prediction Probability'].fillna(0)
    avg_probability['Class Name'] = avg_probability['Predicted Class'].map(class_names)
    average_probability_dict = dict(zip(avg_probability['Class Name'], avg_probability['Prediction Probability']))

    file_path = os.path.join(settings.MEDIA_ROOT, 'logs', 'logs.txt')
    data = None
    try:
        with open(file_path, 'r') as file:
            data = file.readlines()[::-1]  # Reverse lines for latest logs
    except (OSError, UnicodeDecodeError):
        # Logs are best-effort: the page still renders without them.
        data = None

    return render(request, 'malware/malware.html', {
        'files_list': files_list,
        'frequency': frequency.to_dict(),
        'class_frequency': frequency_with_names.to_dict(),
        'average': average_probability_dict,
        "logs": data,
    })
@never_cache
def bye_asm_log(request):
    """Return ``extract.log`` from the ``Extract`` bucket, lines sorted descending.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``extract_log``.
    The whole log is no longer printed to stdout on every request.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        raw_log = s3.get_object(Bucket='Extract', Key='extract.log')['Body'].read().decode('utf-8')
        # Plain string sort, descending, applied line by line.
        extract_log = '\n'.join(sorted(raw_log.split('\n'), reverse=True))
    except (BotoCoreError, ClientError) as e:
        extract_log = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'extract_log': extract_log})
@login_required(login_url='login')
@never_cache
def ransomware(request):
    """Render the ransomware page from the usage log and prediction CSVs.

    Template context:
        type: frequency of each class name listed in ``mapping_win.txt``.
        cpu / memory: JSON lists built from the first and second
            comma-separated fields of the last five usage-log lines.
        flag / time: first-row values of the last and second-to-last
            columns of ``ransomware.csv``.
    """
    usage_log_path = 'media/logs/usage_log.txt'
    cpu_data = []
    memory_data = []
    if os.path.exists(usage_log_path):
        with open(usage_log_path, 'r') as f:
            recent_lines = f.readlines()[-5:]
        for line in recent_lines:
            parts = line.strip().split(",")
            if len(parts) < 2:
                # Blank or truncated line: skip it instead of raising IndexError.
                continue
            cpu_data.append(parts[0])
            memory_data.append(parts[1])

    df = pd.read_csv('media/ransomware_predictions/latest_ransomware_type.csv')
    mapping_df = pd.read_csv('media/ransomware_predictions/mapping_win.txt',
                             header=None, names=['predicted_class', 'class_name'])
    class_mapping = dict(zip(mapping_df['predicted_class'], mapping_df['class_name']))
    df['class_name'] = df['predicted_class'].map(class_mapping)
    class_frequency = df['class_name'].value_counts()
    # Report every mapped class, with 0 for classes absent from the predictions.
    all_classes_df = pd.DataFrame({'class_name': mapping_df['class_name']})
    all_classes_df['frequency'] = all_classes_df['class_name'].map(class_frequency).fillna(0).astype(int)
    class_frequency_dict = dict(zip(all_classes_df['class_name'], all_classes_df['frequency']))

    yes_no = pd.read_csv('media/ransomware_predictions/ransomware.csv')
    flag = yes_no[yes_no.columns[-1]].iloc[0]
    # Named ``detected_at`` so the imported ``time`` module is not shadowed.
    detected_at = yes_no[yes_no.columns[-2]].iloc[0]

    return render(request, 'ransomware/ransomware.html', context={
        'type': class_frequency_dict,
        'cpu': json.dumps(cpu_data),
        'memory': json.dumps(memory_data),
        'flag': flag,
        'time': detected_at,
    })
#==================================================================================================
import time
def read_data_from_file(file_path):
    """Return the lines of the UTF-8 text file *file_path*, each stripped of surrounding whitespace.

    Blank lines are kept as empty strings.  The file is streamed line by
    line rather than loaded with ``readlines()`` and traversed a second time.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return [line.strip() for line in file]
def variable_names(request):
    """Return the stripped lines of ``media/VariableNames.txt`` as ``{"data": [...]}``."""
    names = read_data_from_file('media/VariableNames.txt')
    return JsonResponse({'data': names})
#=============================================================================================
@login_required(login_url='login')
@never_cache
def bubble(request):
    """Render the bubble dashboard template (login required, never cached)."""
    template_name = 'dashboard/bubble.html'
    return render(request, template_name)
#==========================================================================================
@csrf_exempt
def get_number_status(request):
    """Return the ``status`` of the ``Number`` row with id 1, or a JSON error if absent."""
    try:
        record = Number.objects.get(id=1)
    except Number.DoesNotExist:
        return JsonResponse({'error': 'Number object not found'})
    return JsonResponse({'status': record.status})
@csrf_exempt
def get_number_status1(request):
    """Return the ``status`` of the ``Number`` row with id 2, or a JSON error if absent."""
    try:
        record = Number.objects.get(id=2)
    except Number.DoesNotExist:
        return JsonResponse({'error': 'Number object not found'})
    return JsonResponse({'status': record.status})
@csrf_exempt
def status_change(request, pk):
    """Clear the ``status`` of ``Number`` *pk* if it is set; return the resulting status.

    Responds with 404 when no such row exists.
    """
    record = get_object_or_404(Number, pk=pk)
    if not record.status:
        # Already cleared: nothing to save.
        return JsonResponse({'status': record.status})
    record.status = False
    record.save()
    return JsonResponse({'status': record.status})
@never_cache
def detect_log(request):
    """Return the text of ``detect.txt`` from the ``Extract`` bucket as JSON.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``processes_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        log_text = s3.get_object(Bucket='Extract', Key='detect.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        log_text = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'processes_log': log_text})
@csrf_exempt
def change_status(request):
    """On POST, set the first ``Number`` row's status to 1 if it is 0.

    Returns the resulting status, or a JSON ``error`` for a non-POST
    request, a missing row, or any exception raised along the way.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'})
    try:
        record = Number.objects.first()
        if not record:
            return JsonResponse({'error': 'Number object not found'})
        if record.status == 0:
            record.status = 1  # flip 0 -> 1
            record.save()
        return JsonResponse({'status': record.status})
    except Exception as e:
        return JsonResponse({'error': str(e)})
def combined_info(request):
    """Render ``ransomware/view_logs.html`` with the text of ``detect.txt`` (``Extract`` bucket).

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is shown in place of the log content.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        detect_txt_data = s3.get_object(Bucket='Extract', Key='detect.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        detect_txt_data = f"Error fetching detect.txt data: {str(e)}"
    return render(request, 'ransomware/view_logs.html', {
        's3_detect_txt_content': detect_txt_data,
    })
#========================================================================================================
def get_log_data(request):
    """Delegate to ``combined_info`` and return its response unchanged."""
    return combined_info(request)
#========================================================================================================
@csrf_exempt
def change_status_shadow(request, pk):
    """Set the status of ``Number`` *pk* to True if it is currently False.

    Returns the resulting status.  Any exception raised inside the ``try``
    (including the one ``get_object_or_404`` raises for a missing row) is
    reported as a JSON ``error`` message.
    """
    try:
        record = get_object_or_404(Number, pk=pk)
        if not record:
            return JsonResponse({'error': 'Number object not found'})
        if record.status == False:  # noqa: E712 - equality test kept as written
            record.status = True
            record.save()
        return JsonResponse({'status': record.status})
    except Exception as e:
        return JsonResponse({'error': str(e)})
@csrf_exempt
def change_status_shadow_web(request, pk):
    """Set the status of ``Number`` *pk* to False if it is currently True.

    Returns the resulting status.  Any exception raised inside the ``try``
    (including the one ``get_object_or_404`` raises for a missing row) is
    reported as a JSON ``error`` message.
    """
    try:
        record = get_object_or_404(Number, pk=pk)
        if not record:
            return JsonResponse({'error': 'Number object not found'})
        if record.status == True:  # noqa: E712 - equality test kept as written
            record.status = False
            record.save()
        return JsonResponse({'status': record.status})
    except Exception as e:
        return JsonResponse({'error': str(e)})
# device 2 functions ===========================================================================
@csrf_exempt
def get_number_status11(request):
    """Return the ``status`` of the ``Number`` row with id 3, or a JSON error if absent."""
    try:
        record = Number.objects.get(id=3)
    except Number.DoesNotExist:
        return JsonResponse({'error': 'Number object not found'})
    return JsonResponse({'status': record.status})
@csrf_exempt
def get_number_status12(request):
    """Return the ``status`` of the ``Number`` row with id 4, or a JSON error if absent."""
    try:
        record = Number.objects.get(id=4)
    except Number.DoesNotExist:
        return JsonResponse({'error': 'Number object not found'})
    return JsonResponse({'status': record.status})
@csrf_exempt
def status_change1(request, pk):
    """Clear the ``status`` of ``Number`` *pk* if it is set; return the resulting status.

    Responds with 404 when no such row exists.
    """
    record = get_object_or_404(Number, pk=pk)
    if not record.status:
        # Already cleared: nothing to save.
        return JsonResponse({'status': record.status})
    record.status = False
    record.save()
    return JsonResponse({'status': record.status})
@csrf_exempt
def change_status1(request):
    """On POST, set the status of the ``Number`` row with id 3 to 1 if it is 0.

    Returns the resulting status, or a JSON ``error`` for a non-POST
    request or any exception raised along the way (including a missing row).
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'})
    try:
        record = Number.objects.get(id=3)
        if not record:
            return JsonResponse({'error': 'Number object not found'})
        if record.status == 0:
            record.status = 1  # flip 0 -> 1
            record.save()
        return JsonResponse({'status': record.status})
    except Exception as e:
        return JsonResponse({'error': str(e)})
@csrf_exempt
def change_status_shadow1(request, pk):
    """Set the status of ``Number`` *pk* to True if it is currently False.

    Returns the resulting status.  Any exception raised inside the ``try``
    (including the one ``get_object_or_404`` raises for a missing row) is
    reported as a JSON ``error`` message.
    """
    try:
        record = get_object_or_404(Number, pk=pk)
        if not record:
            return JsonResponse({'error': 'Number object not found'})
        if record.status == False:  # noqa: E712 - equality test kept as written
            record.status = True
            record.save()
        return JsonResponse({'status': record.status})
    except Exception as e:
        return JsonResponse({'error': str(e)})
@csrf_exempt
def change_status_shadow_web1(request, pk):
    """Set the status of ``Number`` *pk* to False if it is currently True.

    Returns the resulting status.  Any exception raised inside the ``try``
    (including the one ``get_object_or_404`` raises for a missing row) is
    reported as a JSON ``error`` message.
    """
    try:
        record = get_object_or_404(Number, pk=pk)
        if not record:
            return JsonResponse({'error': 'Number object not found'})
        if record.status == True:  # noqa: E712 - equality test kept as written
            record.status = False
            record.save()
        return JsonResponse({'status': record.status})
    except Exception as e:
        return JsonResponse({'error': str(e)})
#=================================================================================================
#===============================Singapor device==================================================
@never_cache
def read_tx_bytes1(request):
    """Return the ``tx_bytes``, ``tx_kbps`` and ``switch`` columns of ``network_data.csv``.

    Same as ``read_tx_bytes`` but reads from the ``LA`` bucket.
    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  Any failure is reported as
    ``{"error": ...}`` with HTTP 500.
    """
    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    try:
        client = boto3.session.Session().client(
            's3',
            region_name="sfo3",
            endpoint_url="https://li-phy.sfo3.digitaloceanspaces.com",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        response = client.get_object(Bucket="LA", Key='network_data.csv')
        csv_data = response['Body'].read().decode('utf-8')
        rows = list(csv.DictReader(io.StringIO(csv_data)))
        return JsonResponse({
            "tx_bytes_data": [row['tx_bytes'] for row in rows],
            "tx_kbps_data": [row['tx_kbps'] for row in rows],
            "switch_data": [row['switch'] for row in rows],
        })
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)
#==========================================================================================
@never_cache
def kern_log1(request):
    """Return the text of ``kern.txt`` from the ``LA`` bucket as JSON.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``kern_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        kern_text = s3.get_object(Bucket='LA', Key='kern.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        kern_text = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'kern_log': kern_text})
@never_cache
def address_log1(request):
    """Return the ``[mem ...]`` addresses found in ``address.txt`` (``LA`` bucket).

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, ``full_addresses`` carries the error text and
    ``filtered_addresses`` is empty.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        address_logs = s3.get_object(Bucket='LA', Key='address.txt')['Body'].read().decode('utf-8')
        full_addresses, filtered_addresses = extract_addresses(address_logs)
    except (BotoCoreError, ClientError) as e:
        full_addresses = [f"Error fetching logs.txt data: {str(e)}"]
        filtered_addresses = []
    return JsonResponse({
        'full_addresses': full_addresses,
        'filtered_addresses': filtered_addresses,
    })
@never_cache
def encrypt_log1(request):
    """Return suffixes of the ``[mem ...]`` tokens in ``encrypt.txt`` (``LA`` bucket).

    If the remote object is empty, ``encrypt.txt`` under ``MEDIA_ROOT`` is
    read instead.  Despite their names, ``last_20_characters_list`` holds
    the last 35 characters of each match and ``last_8_characters_list`` the
    last 7; the JSON keys are kept for existing consumers.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  On a botocore error both
    lists are empty.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        encrypted_log = s3.get_object(Bucket='LA', Key='encrypt.txt')['Body'].read().decode('utf-8')
        if not encrypted_log:
            # The path used to be built as a tuple (MEDIA_ROOT, '/encrypt.txt'),
            # which open() rejects; join it into a real path.
            local_file_path = os.path.join(settings.MEDIA_ROOT, 'encrypt.txt')
            with open(local_file_path, 'r') as local_file:
                encrypted_log = local_file.read()
        matches = re.findall(r'\[mem (.*?)\]', encrypted_log)
        last_20_characters_list = [match[-35:] for match in matches]
        last_8_characters_list = [match[-7:] for match in matches]
    except (BotoCoreError, ClientError):
        last_20_characters_list = []
        last_8_characters_list = []
    return JsonResponse({
        'last_8_characters_list': last_8_characters_list,
        'last_20_characters_list': last_20_characters_list,
    })
@never_cache
def processes_log1(request):
    """Return the text of ``logs.txt`` from the ``LA`` bucket as JSON.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``processes_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        log_text = s3.get_object(Bucket='LA', Key='logs.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        log_text = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'processes_log': log_text})
@never_cache
def bye_asm_log1(request):
    """Return ``extract.log`` from the ``LA`` bucket, lines sorted descending.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``extract_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        raw_log = s3.get_object(Bucket='LA', Key='extract.log')['Body'].read().decode('utf-8')
        # Plain string sort, descending, applied line by line.
        extract_log = '\n'.join(sorted(raw_log.split('\n'), reverse=True))
    except (BotoCoreError, ClientError) as e:
        extract_log = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'extract_log': extract_log})
@never_cache
def detect_log1(request):
    """Return the text of ``detect.txt`` from the ``LA`` bucket as JSON.

    Credentials are read from Django settings (``SPACES_ACCESS_KEY_ID`` /
    ``SPACES_SECRET_ACCESS_KEY``) or environment variables of the same
    names rather than being stored in source.  When the fetch fails with a
    botocore error, the error text is returned under ``processes_log``.
    """
    from botocore.exceptions import BotoCoreError, ClientError

    access_key = getattr(settings, 'SPACES_ACCESS_KEY_ID', None) or os.environ.get('SPACES_ACCESS_KEY_ID')
    secret_key = getattr(settings, 'SPACES_SECRET_ACCESS_KEY', None) or os.environ.get('SPACES_SECRET_ACCESS_KEY')
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    try:
        log_text = s3.get_object(Bucket='LA', Key='detect.txt')['Body'].read().decode('utf-8')
    except (BotoCoreError, ClientError) as e:
        log_text = f"Error fetching logs.txt data: {str(e)}"
    return JsonResponse({'processes_log': log_text})
def get_log_data1(request):
    """Delegate to ``combined_info1`` and return its response unchanged."""
    return combined_info1(request)
def combined_info1(request):
    """Render ``ransomware/view_logs1.html`` with the contents of ``detect.txt``.

    The file is read from the ``LA`` Space and passed to the template as
    ``s3_detect_txt_content``.  When the fetch fails, that context value is a
    human-readable error message instead of the file contents.
    """
    space_name = 'LA'
    object_key = 'detect.txt'
    # SECURITY NOTE(review): these credentials are hard-coded in source; they
    # should be rotated and loaded from configuration instead.
    aws_access_key_id = 'DO00JUFYECFVU7FNT4GX'
    aws_secret_access_key = '4tdFBteJ29Mpc8rUN+eaOq/q9BLvpdiQh9iR+ZVjE1w'
    endpoint_url = 'https://li-phy.sfo3.digitaloceanspaces.com'
    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      region_name='sfo3',
                      endpoint_url=endpoint_url)
    try:
        detect_txt_data = s3.get_object(Bucket=space_name, Key=object_key)['Body'].read().decode('utf-8')
    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
        # BotoCoreError includes NoCredentialsError and connection problems;
        # ClientError covers service-side failures (missing key, access
        # denied).  Show the error in the page instead of raising.
        detect_txt_data = f"Error fetching detect.txt data: {str(e)}"
    context = {
        's3_detect_txt_content': detect_txt_data,
    }
    return render(request, 'ransomware/view_logs1.html', context)
#====================================================================================================
def generate_random_values(request):
    """Return a JSON object mapping six fixed labels to random integers in 1..10.

    A fresh ``random.randint(1, 10)`` is drawn for each label, in the order
    the labels are listed below.
    """
    labels = ('MSSQL', 'Portmap', 'UDP', 'NetBIOS', 'Syn', 'UDPLag')
    return JsonResponse({label: random.randint(1, 10) for label in labels})
#===============================================================================================
# aws_access_key_id = 'DO00A4LNJKM8FZ4PVRD4'
# aws_secret_access_key = 'zc22Pm7xmG5pfSHl5aHbyhdYHMJAN1sIRV7+/ZzLVPo'
# space_name = 'ddos'
# region_name = 'sfo3'
# endpoint_url = 'https://li-phy.sfo3.digitaloceanspaces.com'
# # Initialize the boto3 client
# s3 = boto3.client('s3',
# aws_access_key_id=aws_access_key_id,
# aws_secret_access_key=aws_secret_access_key,
# region_name=region_name,
# endpoint_url=endpoint_url)
# def fetch_ddos_value(request):
# object_key = "udp.log"
# try:
# # Fetch the file content directly from DigitalOcean Space
# response = s3.get_object(Bucket=space_name, Key=object_key)
# content = response['Body'].read().decode('utf-8')
# # Return the content as a JSON response
# return JsonResponse({"log_content": content})
# except Exception as e:
# return JsonResponse({"error": str(e)}, status=500)
def fetch_ddos_value(request):
    """Fetch six log files from the ``ddos`` Space and return them as JSON.

    On full success the response is ``{"data": {<file name>: <text>}}``.
    If any file fails to load, the response is ``{"errors": [...]}`` with
    HTTP status 500 and the successfully fetched contents are not returned.
    """
    bucket = 'ddos'
    log_keys = (
        'mysql.log',
        'netbios.log',
        'portmap.log',
        'syn.log',
        'udplag.log',
        'udp.log',
    )
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration.
    client = boto3.client(
        's3',
        aws_access_key_id='DO00A4LNJKM8FZ4PVRD4',
        aws_secret_access_key='zc22Pm7xmG5pfSHl5aHbyhdYHMJAN1sIRV7+/ZzLVPo',
        region_name='sfo3',
        endpoint_url='https://li-phy.sfo3.digitaloceanspaces.com',
    )
    contents = {}
    failures = []
    for key in log_keys:
        try:
            fetched = client.get_object(Bucket=bucket, Key=key)
            contents[key] = fetched['Body'].read().decode('utf-8')
        except NoCredentialsError as exc:
            failures.append(f"Error fetching {key} data: No credentials; {str(exc)}")
        except Exception as exc:
            failures.append(f"Error fetching {key} data: {str(exc)}")
    # Any single failure turns the whole response into an error report.
    if failures:
        return JsonResponse({"errors": failures}, status=500)
    return JsonResponse({"data": contents})
#===================================================================
class SqlStatusView(APIView):
    """API view exposing the first stored ``SqlStatus`` row."""

    def get(self, request):
        """Return the serialized first ``SqlStatus`` row, or a 404 error body if there is none."""
        first_row = SqlStatus.objects.first()
        if not first_row:
            return Response({"error": "SqlStatus instance not found"}, status=status.HTTP_404_NOT_FOUND)
        return Response(SqlStatusSerializer(first_row).data)
#===============================================================================
from django.http import JsonResponse
import boto3
from botocore.exceptions import NoCredentialsError
def sql_status_info(request):
    """Return the contents of ``mysql.log`` from the ``ddos`` Space as JSON.

    The response always has the shape ``{"mysql": <str>}``; when the object
    cannot be fetched the value is a human-readable error message instead of
    the log text.
    """
    space_name = 'ddos'
    object_key = 'mysql.log'
    # SECURITY NOTE(review): these credentials are hard-coded in source; they
    # should be rotated and loaded from configuration instead.
    aws_access_key_id = 'DO00A4LNJKM8FZ4PVRD4'
    aws_secret_access_key = 'zc22Pm7xmG5pfSHl5aHbyhdYHMJAN1sIRV7+/ZzLVPo'
    endpoint_url = 'https://li-phy.sfo3.digitaloceanspaces.com'
    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      region_name='sfo3',
                      endpoint_url=endpoint_url)
    try:
        mysql = s3.get_object(Bucket=space_name, Key=object_key)['Body'].read().decode('utf-8')
    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
        # BotoCoreError includes NoCredentialsError and connection problems;
        # ClientError covers service-side failures (missing key, access
        # denied).  Report them in the payload instead of raising.
        mysql = f"Error fetching mysql.log data: {str(e)}"
    response_data = {
        'mysql': mysql,
    }
    return JsonResponse(response_data)
@csrf_exempt
def restore_database(request):
    """Set ``value`` to ``"1"`` on the first ``RestoreDatabase`` row (POST only).

    Every outcome is reported as a JSON body with a ``status`` field
    (``"success"`` or ``"error"``) and, for errors, a ``message``.
    """
    # Guard clause: anything other than POST is rejected up front.
    if request.method != "POST":
        return JsonResponse({"status": "error", "message": "Invalid request"})
    try:
        row = RestoreDatabase.objects.first()
        if not row:
            return JsonResponse({"status": "error", "message": "No record found"})
        row.value = "1"
        row.save()
        return JsonResponse({"status": "success"})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)})
@csrf_exempt
def restore_database_default(request):
    """Set ``value`` back to ``"0"`` on the first ``RestoreDatabase`` row.

    The HTTP method is not checked.  The outcome is reported as a JSON body
    with a ``status`` field and, for errors, a ``message``.
    """
    try:
        row = RestoreDatabase.objects.first()
        if not row:
            return JsonResponse({"status": "error", "message": "No record found"})
        row.value = "0"
        row.save()
        return JsonResponse({"status": "success"})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)})
def check_restore_value(request):
    """Return ``{"value": ...}`` for the first ``RestoreDatabase`` row.

    If no row exists, or reading it raises, a JSON body with
    ``status: "error"`` and a ``message`` is returned instead.
    """
    try:
        row = RestoreDatabase.objects.first()
        if not row:
            return JsonResponse({"status": "error", "message": "No record found"})
        return JsonResponse({"value": row.value})
    except Exception as exc:
        return JsonResponse({"status": "error", "message": str(exc)})
def check_restore_value1(request):
    """Return the constant JSON payload ``{"mysql": 1}``.

    The previous body wrapped two plain assignments in a ``try`` whose
    ``except NoCredentialsError`` branch could never run, and set a
    ``status`` local that was never read; both have been removed.  The
    response is unchanged.
    """
    return JsonResponse({'mysql': 1})
@api_view(['POST'])
def upload_csv(request):
    """Parse an uploaded CSV file and print each row.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  Returns:

    * 400 with ``error`` when the file is missing, is not named ``*.csv``,
      or is not valid UTF-8;
    * 500 with ``error`` when any other exception occurs while processing;
    * 200 with ``message`` on success.
    """
    # Check if the request contains a file
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    # Check if the file is a CSV
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    try:
        file_text = csv_file.read().decode('utf-8')
        # Feed the reader a text stream opened with newline='' (as the csv
        # module documents) so quoted fields containing line breaks stay
        # intact; str.splitlines() would cut them apart and also splits on
        # separators that are not CSV row terminators.
        csv_reader = csv.reader(io.StringIO(file_text, newline=''))
        # Example: iterate through CSV rows and do processing
        for row in csv_reader:
            # Placeholder processing: replace with real logic (e.g. DB save).
            print(row)
        return JsonResponse({'message': 'File processed successfully'})
    except UnicodeDecodeError:
        # An undecodable upload is a client error, not a server failure.
        return JsonResponse({'error': 'File is not valid UTF-8'}, status=400)
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
import csv
from django.http import JsonResponse
from rest_framework.decorators import api_view
@api_view(['POST'])
def malware_bytes_predictions_XGB(request):
    """Save an uploaded CSV as ``latest_malware_bytes_predictions_XGB.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_bytes_predictions_XGB.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_bytes_predictions_SGD(request):
    """Save an uploaded CSV as ``latest_malware_bytes_predictions_SGD.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_bytes_predictions_SGD.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_bytes_predictions_RandomForest(request):
    """Save an uploaded CSV as ``latest_malware_bytes_predictions_RandomForest.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_bytes_predictions_RandomForest.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_bytes_predictions_KNeighbours(request):
    """Save an uploaded CSV as ``latest_malware_bytes_predictions_KNeighbours.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_bytes_predictions_KNeighbours.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_ASM_predictions_XGB(request):
    """Save an uploaded CSV as ``latest_malware_ASM_predictions_XGB.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_ASM_predictions_XGB.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_ASM_predictions_LogisticRegression(request):
    """Save an uploaded CSV as ``latest_malware_ASM_predictions_LogisticRegression.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_ASM_predictions_LogisticRegression.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_ASM_predictions_RandomForest(request):
    """Save an uploaded CSV as ``latest_malware_ASM_predictions_RandomForest.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_ASM_predictions_RandomForest.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def malware_ASM_predictions_KNeighbours(request):
    """Save an uploaded CSV as ``latest_malware_ASM_predictions_KNeighbours.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/malware_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'malware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'latest_malware_ASM_predictions_KNeighbours.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def upload_logs(request):
    """Save an uploaded ``.txt`` file as ``<MEDIA_ROOT>/logs/logs.txt``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.txt``.  The stored file starts with ``"<current datetime> - "``
    followed by the uploaded bytes, and replaces any previous ``logs.txt``.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.txt``.
    """
    log_file = request.FILES.get('file')
    if not log_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not log_file.name.endswith('.txt'):
        return JsonResponse({'error': 'File is not TXT'}, status=400)
    # Directory and fixed file name where the log is stored
    folder_path = os.path.join(settings.MEDIA_ROOT, 'logs')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    save_path = os.path.join(folder_path, 'logs.txt')
    # Best-effort removal of the previous file: a missing file is expected,
    # any other OS error is only reported as a warning.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"warning: {e}")
    # Write in binary mode.  The chunks are bytes, so formatting them into a
    # text f-string stored their repr (b'...') instead of the log content.
    # The timestamp is written once up front; chunk boundaries are arbitrary,
    # so a per-chunk prefix would land in the middle of the log text.
    with open(save_path, 'wb') as destination:
        destination.write(f'{datetime.now()} - '.encode('utf-8'))
        for chunk in log_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def ransomware_predictions(request):
    """Save an uploaded CSV as ``latest_ransomware.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to
    ``<MEDIA_ROOT>/ransomware_predictions/``, replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    # Directory where the CSV will be stored
    folder_path = os.path.join(settings.MEDIA_ROOT, 'ransomware_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    # The file is always named 'latest_ransomware.csv'
    save_path = os.path.join(folder_path, 'latest_ransomware.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    # Save the new file
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def ransomware_type_predictions(request):
    """Save an uploaded CSV as ``latest_ransomware_type.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to
    ``<MEDIA_ROOT>/ransomware_predictions/``, replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, a 400
    response with ``error`` when the file is missing or not ``*.csv``, and a
    500 response with ``error`` if saving raises.
    """
    try:
        csv_file = request.FILES.get('file')
        if not csv_file:
            return JsonResponse({'error': 'No file provided'}, status=400)
        if not csv_file.name.endswith('.csv'):
            return JsonResponse({'error': 'File is not CSV'}, status=400)
        # Directory where the CSV will be stored
        folder_path = os.path.join(settings.MEDIA_ROOT, 'ransomware_predictions')
        # exist_ok avoids FileExistsError when two requests create it at once.
        os.makedirs(folder_path, exist_ok=True)
        # The file is always named 'latest_ransomware_type.csv'
        save_path = os.path.join(folder_path, 'latest_ransomware_type.csv')
        # Drop the previous upload; a concurrent request may already have
        # removed it, which is fine.
        try:
            os.remove(save_path)
        except FileNotFoundError:
            pass
        # Save the new file
        with open(save_path, 'wb') as destination:
            for chunk in csv_file.chunks():
                destination.write(chunk)
        return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
    except Exception as e:
        print(e)
        # A view must return a response; previously this branch fell through
        # and returned None after printing the error.
        return JsonResponse({'error': str(e)}, status=500)
@api_view(['POST'])
def ddos_predictions(request):
    """Save an uploaded CSV as ``predictions.csv``.

    Expects a multipart upload under the ``file`` field whose name ends in
    ``.csv``.  The file is written to ``<MEDIA_ROOT>/ddos_predictions/``,
    replacing any previous upload.

    Returns a JSON body with ``message`` and ``file_path`` on success, or a
    400 response with ``error`` when the file is missing or not ``*.csv``.
    """
    csv_file = request.FILES.get('file')
    if not csv_file:
        return JsonResponse({'error': 'No file provided'}, status=400)
    if not csv_file.name.endswith('.csv'):
        return JsonResponse({'error': 'File is not CSV'}, status=400)
    # Directory where the CSV will be stored
    folder_path = os.path.join(settings.MEDIA_ROOT, 'ddos_predictions')
    # exist_ok avoids FileExistsError when two requests create it at once.
    os.makedirs(folder_path, exist_ok=True)
    # The file is always named 'predictions.csv'
    save_path = os.path.join(folder_path, 'predictions.csv')
    # Drop the previous upload; a concurrent request may already have
    # removed it, which is fine.
    try:
        os.remove(save_path)
    except FileNotFoundError:
        pass
    # Save the new file
    with open(save_path, 'wb') as destination:
        for chunk in csv_file.chunks():
            destination.write(chunk)
    return JsonResponse({'message': 'File uploaded and overwritten successfully', 'file_path': save_path})
@api_view(['POST'])
def usage_log(request):
    """Append a ``cpu_usage,memory_usage`` line to the usage log, keeping the last 5.

    Reads ``cpu_usage`` and ``memory_usage`` from the request data.  Returns
    400 when either is missing, 201 once the line has been written to
    ``media/logs/usage_log.txt`` (relative to the working directory), and
    500 with the error text if anything raises.
    """
    try:
        cpu_usage = request.data.get('cpu_usage')
        memory_usage = request.data.get('memory_usage')
        # Validate the data
        if cpu_usage is None or memory_usage is None:
            return Response({'error': 'Invalid data'}, status=status.HTTP_400_BAD_REQUEST)
        # File path
        file_path = 'media/logs/usage_log.txt'
        # Make sure the parent directory exists; otherwise the first write
        # would fail with FileNotFoundError.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        # Read existing data; a missing file simply means no entries yet.
        try:
            with open(file_path, 'r') as f:
                lines = f.readlines()
        except FileNotFoundError:
            lines = []
        # Append new data
        lines.append(f"{cpu_usage},{memory_usage}\n")
        # Keep only the last 5 entries
        lines = lines[-5:]
        # Write back to the file
        with open(file_path, 'w') as f:
            f.writelines(lines)
        return Response({'message': 'Data logged successfully'}, status=status.HTTP_201_CREATED)
    except Exception as e:
        return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)