# web_defender/Device/views.py
# 2024-12-10 13:41:54 +05:30
#
# 359 lines
# 14 KiB
# Python

from django.shortcuts import render, redirect
from .models import*
from .forms import *
import string
import secrets
from django.shortcuts import render, redirect, get_object_or_404
import requests
from django.shortcuts import render
import mapbox
from django.http import JsonResponse
from django.contrib.auth.decorators import login_required
from django.views.decorators.cache import never_cache
import pytz
from datetime import datetime
import json
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.shortcuts import get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from rest_framework.decorators import api_view
from rest_framework.response import Response
from Dashboard .models import DdosPrediction, Rensomware_TypePrediction, Rensomware_AuditPrediction
from malware .models import MalwarePredictionsDevice
import os
from django.conf import settings
from rest_framework.response import Response
from rest_framework.decorators import api_view
import pandas as pd
from rest_framework import status
from rest_framework.authentication import TokenAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
#=================================================================================================
def generate_random_string(length=15):
    """Return a random string of ASCII letters and digits.

    Each character is drawn independently with ``secrets.choice``.

    Args:
        length: Number of characters to generate (default 15).

    Returns:
        A string of ``length`` characters taken from ``[A-Za-z0-9]``.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return ''.join(picked)
from django.db import IntegrityError
from django.forms.utils import ErrorList
@login_required(login_url='login')
@never_cache
def add_device(request):
    """Render the "add device" form and create a device on a valid POST.

    On POST the submitted data is validated with ``DevicesForm``; a freshly
    generated random string is stored as ``device_unique_id`` before saving,
    and the user is redirected to the ``devices`` URL on success. If saving
    raises ``IntegrityError``, the message is attached to the form as a
    non-field error and the form is re-rendered.

    Any other method renders the form without binding it.

    Args:
        request: The incoming ``HttpRequest``.

    Returns:
        A redirect to ``devices`` after a successful save, otherwise the
        rendered ``device/add_device.html`` template with the form and the
        Mapbox access token in its context.
    """
    mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
    if request.method == 'POST':
        form = DevicesForm(request.POST)
        if form.is_valid():
            try:
                form.instance.device_unique_id = generate_random_string()
                form.save()
            except IntegrityError as e:
                # Use the public API for non-field errors rather than
                # reaching into the private ``form._errors`` mapping.
                form.add_error(None, str(e))
            else:
                return redirect('devices')
    else:
        form = DevicesForm(initial=request.POST)
    return render(request, 'device/add_device.html', {'form': form, "mapbox_access_token": mapbox_access_token})
#=================================================================================================
@login_required(login_url='login')
@never_cache
def edit_device(request, device_unique_id):
    """Edit the device identified by ``device_unique_id``.

    Responds with 404 when no such device exists. A valid POST saves the
    changes and redirects to the ``devices`` URL; an invalid POST or any
    other method renders the edit form.

    Args:
        request: The incoming ``HttpRequest``.
        device_unique_id: Value matched against ``Devices.device_unique_id``.

    Returns:
        A redirect after a successful save, otherwise the rendered
        ``device/edit_device.html`` template.
    """
    target = get_object_or_404(Devices, device_unique_id=device_unique_id)
    if request.method != 'POST':
        form = EditDevicesForm(instance=target)
    else:
        form = EditDevicesForm(request.POST, instance=target)
        if form.is_valid():
            form.save()
            return redirect('devices')
    context = {
        'form': form,
        "mapbox_access_token": 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ',
    }
    return render(request, 'device/edit_device.html', context)
def device_list(request):
    """Render the list of all devices.

    NOTE(review): unlike ``add_device`` / ``edit_device`` this view carries no
    ``@login_required`` decorator - confirm that anonymous access is intended.

    Args:
        request: The incoming ``HttpRequest``.

    Returns:
        The rendered ``device/device_list.html`` template with every
        ``Devices`` row under the ``devices`` context key.
    """
    context = {'devices': Devices.objects.all()}
    return render(request, 'device/device_list.html', context)
#=======================================================================================================
def getLocation(request):
    """Render the location page, passing the Mapbox access token to it.

    Args:
        request: The incoming ``HttpRequest``.

    Returns:
        The rendered ``device/getLocation.html`` template.
    """
    return render(
        request,
        'device/getLocation.html',
        {'mapbox_access_token': 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'},
    )
#===================================================================================================
# @never_cache
# def map_view(request):
# mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
# devices = Devices.objects.all()
# device_data = []
# geocoder = mapbox.Geocoder(access_token=mapbox_access_token)
# for device in devices:
# response = geocoder.forward(device.pod)
# lon, lat = response.json()['features'][0]['geometry']['coordinates']
# device_data.append({'name': device.device_name,'pod':device.pod,"timezone":device.timezone, 'device_id':device.device_unique_id, 'lon': lon, 'lat': lat})
# return JsonResponse({'mapbox_access_token': mapbox_access_token, 'device_data': device_data})
@never_cache
def map_view(request):
    """Return JSON with map coordinates for every device that can be geocoded.

    Each device's ``pod`` value is forward-geocoded through Mapbox. A device
    is skipped when its ``pod`` is empty, when the geocoding response is not
    HTTP 200, or when the response contains no features. The coordinates of
    the first returned feature are used.

    Args:
        request: The incoming ``HttpRequest``.

    Returns:
        A ``JsonResponse`` with ``mapbox_access_token`` and ``device_data``
        (a list of dicts with name, pod, timezone, device_id, lon and lat).
    """
    token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
    located = []
    geocoder = mapbox.Geocoder(access_token=token)
    for device in Devices.objects.all():
        # Nothing to geocode without a pod value.
        if not device.pod:
            continue
        reply = geocoder.forward(device.pod)
        # Only successful geocoding requests are usable.
        if reply.status_code != 200:
            continue
        matches = reply.json().get('features')
        if not matches:
            continue
        lon, lat = matches[0]['geometry']['coordinates']
        located.append({
            'name': device.device_name,
            'pod': device.pod,
            "timezone": device.timezone,
            'device_id': device.device_unique_id,
            'lon': lon,
            'lat': lat
        })
    return JsonResponse({'mapbox_access_token': token, 'device_data': located})
#=================================================================================================
@login_required(login_url='login')
@never_cache
def maps(request):
    """Render the map page for a logged-in user.

    Args:
        request: The incoming ``HttpRequest``.

    Returns:
        The rendered ``device/map.html`` template.
    """
    template_name = 'device/map.html'
    return render(request, template_name)
#=================================================================================================
def get_device_ids_by_user_id(user_id):
    """Return the ids of all devices used by the given user.

    The ``UserProfile`` whose related ``user`` has id ``user_id`` is looked
    up, and the ids of every ``Devices`` row whose ``used_by`` is that profile
    are returned.

    Args:
        user_id: Primary key of the user whose profile is looked up.

    Returns:
        A list of device ids; an empty list when no profile exists for the
        user or the profile has no devices.
    """
    try:
        user_profile = UserProfile.objects.get(user__id=user_id)
    except UserProfile.DoesNotExist:
        return []
    # Fetch only the id column instead of loading whole model instances.
    return list(
        Devices.objects.filter(used_by=user_profile).values_list('id', flat=True)
    )
@csrf_exempt
@require_http_methods(["PUT"])
def update_device(request, device_id=None):
    """Update the MAC address and unique ID of one of a user's devices.

    The request body must be a JSON object. ``mac_address`` and ``unique_id``
    are optional keys; a missing key leaves the stored value unchanged.

    The acting user is ``request.user`` when authenticated, otherwise the
    ``user_id`` key of the JSON body. When ``device_id`` is not supplied the
    first device returned for that user is updated. When it is supplied it
    must be one of the user's devices.

    Args:
        request: The incoming ``HttpRequest`` (PUT only).
        device_id: Optional id of the device to update.

    Returns:
        A ``JsonResponse``:
        * 200 with the updated device data on success;
        * 400 for an invalid JSON payload, or a missing/invalid user id;
        * 403 when ``device_id`` is not associated with the user;
        * 404 when the user has no devices or the device does not exist.
    """
    try:
        data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({"error": "Invalid JSON payload."}, status=400)
    # Valid JSON that is not an object (e.g. a list) has no ``.get``.
    if not isinstance(data, dict):
        return JsonResponse({"error": "Invalid JSON payload."}, status=400)

    # NOTE(review): for unauthenticated requests the user id is taken from the
    # request body, so the caller's identity is not verified - confirm that
    # this fallback is intended for this CSRF-exempt endpoint.
    user_id = request.user.id if request.user.is_authenticated else data.get('user_id')
    if not user_id:
        return JsonResponse({"error": "User ID not provided or user not authenticated."}, status=400)

    try:
        device_ids = get_device_ids_by_user_id(user_id)
    except (TypeError, ValueError):
        # A body-supplied user id of the wrong type cannot be used in a lookup.
        return JsonResponse({"error": "Invalid user ID."}, status=400)
    if not device_ids:
        return JsonResponse({"error": "No devices found for this user."}, status=404)

    if not device_id:
        # No explicit target: fall back to the user's first device.
        device_id = device_ids[0]
    elif str(device_id) not in {str(pk) for pk in device_ids}:
        # Refuse to touch a device that does not belong to this user.
        return JsonResponse({"error": "Device not associated with this user."}, status=403)

    try:
        device = Devices.objects.get(id=device_id)
    except Devices.DoesNotExist:
        return JsonResponse({"error": "Device not found."}, status=404)

    device.mac_address = data.get('mac_address', device.mac_address)
    device.unique_id = data.get('unique_id', device.unique_id)
    device.save()

    # ``used_by`` is a related object; expose only its id and email.
    used_by_data = {
        "id": device.used_by.id,
        "email": device.used_by.email
    } if device.used_by else None

    updated_device_data = {
        "id": device.id,
        "mac_address": device.mac_address,
        "unique_id": device.unique_id,
        "used_by": used_by_data,
        "created_at": device.created_at,
        "updated_at": device.updated_at
    }
    return JsonResponse({
        "message": "Device updated successfully.",
        "device": updated_device_data,
    })
@api_view(['GET'])
def get_device_data(request, device_id):
    """Return the stored prediction file records for one device.

    For each prediction model (DDoS, malware, ransomware type, ransomware
    audit) the rows whose ``device_id`` matches are returned, reduced to their
    ``file_path`` and ``uploaded_at`` values.

    Args:
        request: The incoming DRF request.
        device_id: Device id used to filter each prediction table.

    Returns:
        A DRF ``Response`` mapping ``ddos_data``, ``malware_data``,
        ``ransomware_type_data`` and ``ransomware_audit_data`` to lists of
        ``{'file_path': ..., 'uploaded_at': ...}`` dicts.
    """
    # Response key -> model that supplies its rows, in response order.
    sources = (
        ('ddos_data', DdosPrediction),
        ('malware_data', MalwarePredictionsDevice),
        ('ransomware_type_data', Rensomware_TypePrediction),
        ('ransomware_audit_data', Rensomware_AuditPrediction),
    )
    payload = {
        key: list(
            model.objects.filter(device_id=device_id).values('file_path', 'uploaded_at')
        )
        for key, model in sources
    }
    return Response(payload)
class CheckDeviceAssociation(APIView):
    """
    API to check whether a device is registered to a user.

    The POST body must carry the user's ``email``, the device ``mac_address``
    and the device ``unique_id``. Responses:

    * 400 - one of the three fields is missing or empty.
    * 404 - no user profile exists for the email.
    * 200 - a device with that MAC address and unique ID is used by the user.
    * 201 - the user exists but no such device is registered to them.
    * 500 - any unexpected error.
    """
    # Authentication and permission classes are removed, so this endpoint can
    # be called without credentials.
    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Report whether the posted device is registered to the posted user."""
        try:
            email = request.data.get("email")
            mac_address = request.data.get("mac_address")
            unique_id = request.data.get("unique_id")

            if not (email and mac_address and unique_id):
                # The message names the fields that are actually read above.
                return Response(
                    {"error": "email, mac_address, and unique_id are required."},
                    status=status.HTTP_400_BAD_REQUEST
                )

            try:
                user_profile = UserProfile.objects.get(email=email)
            except UserProfile.DoesNotExist:
                return Response(
                    {"info": "User does not exist. Proceeding to send OTP."},
                    status=status.HTTP_404_NOT_FOUND
                )

            # Only existence matters, so avoid fetching the row; this also
            # tolerates duplicate device rows instead of failing on them.
            is_registered = Devices.objects.filter(
                used_by=user_profile,
                mac_address=mac_address,
                unique_id=unique_id
            ).exists()
            if is_registered:
                return Response(
                    {"message": "Device already registered."},
                    status=status.HTTP_200_OK
                )

            # NOTE(review): 201 is returned although nothing is created here;
            # kept as-is because clients may branch on this status code.
            return Response(
                {"message": "User exists but device is not registered."},
                status=status.HTTP_201_CREATED
            )
        except Exception as e:
            # NOTE(review): the exception text is returned to an
            # unauthenticated caller - consider logging it server-side and
            # returning a generic message instead.
            return Response(
                {"error": f"An unexpected error occurred: {str(e)}"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )