359 lines
14 KiB
Python
359 lines
14 KiB
Python
from django.shortcuts import render, redirect
|
|
from .models import*
|
|
from .forms import *
|
|
import string
|
|
import secrets
|
|
from django.shortcuts import render, redirect, get_object_or_404
|
|
import requests
|
|
from django.shortcuts import render
|
|
import mapbox
|
|
from django.http import JsonResponse
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.views.decorators.cache import never_cache
|
|
import pytz
|
|
from datetime import datetime
|
|
import json
|
|
from django.http import JsonResponse
|
|
from django.views.decorators.http import require_http_methods
|
|
from django.shortcuts import get_object_or_404
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from rest_framework.decorators import api_view
|
|
from rest_framework.response import Response
|
|
from Dashboard .models import DdosPrediction, Rensomware_TypePrediction, Rensomware_AuditPrediction
|
|
from malware .models import MalwarePredictionsDevice
|
|
import os
|
|
from django.conf import settings
|
|
from rest_framework.response import Response
|
|
from rest_framework.decorators import api_view
|
|
import pandas as pd
|
|
from rest_framework import status
|
|
from rest_framework.authentication import TokenAuthentication
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
|
|
|
|
|
|
#=================================================================================================
|
|
|
|
def generate_random_string(length=15):
|
|
characters = string.ascii_letters + string.digits
|
|
return ''.join(secrets.choice(characters) for _ in range(length))
|
|
|
|
from django.db import IntegrityError
|
|
|
|
from django.forms.utils import ErrorList
|
|
|
|
@login_required(login_url='login')
|
|
@never_cache
|
|
def add_device(request):
|
|
mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
|
|
if request.method == 'POST':
|
|
form = DevicesForm(request.POST)
|
|
if form.is_valid():
|
|
try:
|
|
random_string = generate_random_string()
|
|
form.instance.device_unique_id = random_string
|
|
form.save()
|
|
return redirect('devices')
|
|
except IntegrityError as e:
|
|
# Add the integrity error to the form's non-field errors
|
|
form._errors.setdefault("__all__", ErrorList()).append(str(e))
|
|
else:
|
|
form = DevicesForm(initial=request.POST)
|
|
return render(request, 'device/add_device.html', {'form': form, "mapbox_access_token": mapbox_access_token})
|
|
|
|
#=================================================================================================
|
|
|
|
|
|
@login_required(login_url='login')
|
|
@never_cache
|
|
def edit_device(request, device_unique_id):
|
|
mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
|
|
device = get_object_or_404(Devices, device_unique_id=device_unique_id)
|
|
if request.method == 'POST':
|
|
form = EditDevicesForm(request.POST, instance=device)
|
|
if form.is_valid():
|
|
form.save()
|
|
return redirect('devices')
|
|
else:
|
|
form = EditDevicesForm(instance=device)
|
|
return render(request, 'device/edit_device.html', {'form': form,"mapbox_access_token": mapbox_access_token})
|
|
|
|
def device_list(request):
|
|
devices = Devices.objects.all()
|
|
return render(request, 'device/device_list.html', {'devices': devices})
|
|
|
|
|
|
|
|
#=======================================================================================================
|
|
|
|
|
|
def getLocation(request):
|
|
mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
|
|
context = {
|
|
'mapbox_access_token': mapbox_access_token
|
|
}
|
|
return render(request, 'device/getLocation.html', context)
|
|
|
|
|
|
#===================================================================================================
|
|
# @never_cache
|
|
# def map_view(request):
|
|
# mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
|
|
# devices = Devices.objects.all()
|
|
# device_data = []
|
|
# geocoder = mapbox.Geocoder(access_token=mapbox_access_token)
|
|
# for device in devices:
|
|
# response = geocoder.forward(device.pod)
|
|
# lon, lat = response.json()['features'][0]['geometry']['coordinates']
|
|
# device_data.append({'name': device.device_name,'pod':device.pod,"timezone":device.timezone, 'device_id':device.device_unique_id, 'lon': lon, 'lat': lat})
|
|
|
|
# return JsonResponse({'mapbox_access_token': mapbox_access_token, 'device_data': device_data})
|
|
|
|
@never_cache
|
|
def map_view(request):
|
|
mapbox_access_token = 'pk.eyJ1IjoiZmxleHhvbiIsImEiOiJjbHVtYzNoM2cwNXI2MnFveW51c2tyejVwIn0.ceqt6Ot6nU67CUmxVAWPEQ'
|
|
devices = Devices.objects.all()
|
|
device_data = []
|
|
geocoder = mapbox.Geocoder(access_token=mapbox_access_token)
|
|
|
|
for device in devices:
|
|
if device.pod: # Check if pod is not None
|
|
response = geocoder.forward(device.pod)
|
|
if response.status_code == 200: # Ensure request was successful
|
|
features = response.json().get('features')
|
|
if features:
|
|
lon, lat = features[0]['geometry']['coordinates']
|
|
device_data.append({
|
|
'name': device.device_name,
|
|
'pod': device.pod,
|
|
"timezone": device.timezone,
|
|
'device_id': device.device_unique_id,
|
|
'lon': lon,
|
|
'lat': lat
|
|
})
|
|
|
|
return JsonResponse({'mapbox_access_token': mapbox_access_token, 'device_data': device_data})
|
|
|
|
|
|
#=================================================================================================
|
|
@login_required(login_url='login')
|
|
@never_cache
|
|
def maps(request):
|
|
return render(request, 'device/map.html')
|
|
|
|
#=================================================================================================\
|
|
def get_device_ids_by_user_id(user_id):
|
|
try:
|
|
# Get the UserProfile instance using the user ID
|
|
user_profile = UserProfile.objects.get(user__id=user_id)
|
|
print('user_profile',user_profile)
|
|
|
|
# Retrieve all Devices associated with this UserProfile
|
|
devices = Devices.objects.filter(used_by=user_profile )
|
|
print('devices',devices)
|
|
|
|
# Get the device IDs
|
|
device_ids = [device.id for device in devices]
|
|
return device_ids
|
|
except UserProfile.DoesNotExist:
|
|
return []
|
|
|
|
|
|
|
|
|
|
@csrf_exempt
|
|
@require_http_methods(["PUT"])
|
|
# def update_device(request, device_id):
|
|
# try:
|
|
# # Parse the request body
|
|
# data = json.loads(request.body)
|
|
|
|
# # Example: Access the user ID from the request (you might need to adjust this)
|
|
# # You can customize this part based on how your user information is available
|
|
# user_id = request.user.id if request.user.is_authenticated else data.get('user_id')
|
|
|
|
# if not user_id:
|
|
# return JsonResponse({"error": "User ID not provided or user not authenticated."}, status=400)
|
|
|
|
# # Get the device IDs associated with the user
|
|
# device_ids = get_device_ids_by_user_id(user_id)
|
|
# print("device id",device_ids)
|
|
|
|
# if int(device_id) not in device_ids:
|
|
# return JsonResponse({"error": "Device not associated with this user."}, status=403)
|
|
|
|
# # Retrieve the device to update
|
|
# device = Devices.objects.get(id=device_id)
|
|
|
|
# # Update the device details (modify as needed based on the fields you're updating)
|
|
# device.device_name = data.get('device_name', device.device_name)
|
|
# device.operating_system = data.get('operating_system', device.operating_system)
|
|
# # Add more fields to update as needed
|
|
# device.save()
|
|
|
|
# return JsonResponse({"message": "Device updated successfully."})
|
|
# except Devices.DoesNotExist:
|
|
# return JsonResponse({"error": "Device not found."}, status=404)
|
|
# except json.JSONDecodeError:
|
|
# return JsonResponse({"error": "Invalid JSON payload."}, status=400)
|
|
@csrf_exempt
|
|
@require_http_methods(["PUT"])
|
|
def update_device(request, device_id=None):
|
|
try:
|
|
# Parse the request body
|
|
data = json.loads(request.body)
|
|
|
|
# Get the user_id from the authenticated request (or passed in data)
|
|
user_id = request.user.id if request.user.is_authenticated else data.get('user_id')
|
|
|
|
if not user_id:
|
|
return JsonResponse({"error": "User ID not provided or user not authenticated."}, status=400)
|
|
|
|
# Get the device IDs associated with the user
|
|
device_ids = get_device_ids_by_user_id(user_id)
|
|
|
|
if not device_ids:
|
|
return JsonResponse({"error": "No devices found for this user."}, status=404)
|
|
|
|
# If device_id is not provided, automatically select the first device ID from the user's devices
|
|
if not device_id:
|
|
device_id = device_ids[0]
|
|
|
|
# Retrieve the device to update
|
|
device = Devices.objects.get(id=device_id)
|
|
|
|
# Update the device details
|
|
# device.device_name = data.get('device_name', device.device_name)
|
|
# device.operating_system = data.get('operating_system', device.operating_system)
|
|
device.mac_address = data.get('mac_address', device.mac_address)
|
|
device.unique_id = data.get('unique_id', device.unique_id)
|
|
device.save()
|
|
|
|
# Serialize the used_by field if it's a complex object
|
|
used_by_data = {
|
|
"id": device.used_by.id,
|
|
|
|
"email": device.used_by.email
|
|
} if device.used_by else None
|
|
|
|
# Prepare the updated device details for the response
|
|
updated_device_data = {
|
|
"id": device.id,
|
|
"mac_address": device.mac_address,
|
|
"unique_id": device.unique_id,
|
|
"used_by": used_by_data,
|
|
"created_at": device.created_at,
|
|
"updated_at": device.updated_at
|
|
}
|
|
|
|
# Return a response with the device data
|
|
return JsonResponse({
|
|
"message": "Device updated successfully.",
|
|
"device": updated_device_data,
|
|
})
|
|
|
|
except Devices.DoesNotExist:
|
|
return JsonResponse({"error": "Device not found."}, status=404)
|
|
|
|
except json.JSONDecodeError:
|
|
return JsonResponse({"error": "Invalid JSON payload."}, status=400)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@api_view(['GET'])
|
|
def get_device_data(request, device_id):
|
|
# print('Entering get_device_data function')
|
|
|
|
# Retrieve DDoS, ransomware, and malware data for the given device
|
|
ddos_data = DdosPrediction.objects.filter(device_id=device_id).values('file_path', 'uploaded_at')
|
|
malware_data = MalwarePredictionsDevice.objects.filter(device_id=device_id).values('file_path', 'uploaded_at')
|
|
ransomware_type_data = Rensomware_TypePrediction.objects.filter(device_id=device_id).values('file_path', 'uploaded_at')
|
|
ransomware_audit_data = Rensomware_AuditPrediction.objects.filter(device_id=device_id).values('file_path', 'uploaded_at')
|
|
|
|
# print(f"Retrieved ddos_data: {list(ddos_data)}")
|
|
# print(f"Retrieved malware_data: {list(malware_data)}")
|
|
# print(f"Retrieved ransomware_type_data: {list(ransomware_type_data)}")
|
|
# print(f"Retrieved ransomware_audit_data: {list(ransomware_audit_data)}")
|
|
|
|
# Prepare the response data
|
|
response_data = {
|
|
'ddos_data': list(ddos_data),
|
|
'malware_data': list(malware_data), # Only includes file path and timestamp
|
|
'ransomware_type_data': list(ransomware_type_data),
|
|
'ransomware_audit_data': list(ransomware_audit_data)
|
|
}
|
|
|
|
# print("Prepared response data:")
|
|
# print(response_data)
|
|
# print("Exiting get_device_data function")
|
|
|
|
return Response(response_data)
|
|
|
|
|
|
|
|
|
|
class CheckDeviceAssociation(APIView):
|
|
"""
|
|
API to check if the given device is associated with the user based on
|
|
user ID, device MAC address, and unique ID, and return the required responses.
|
|
"""
|
|
|
|
# Remove authentication and permission classes
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
|
|
def post(self, request):
|
|
try:
|
|
# Extract the required parameters from the request body
|
|
email = request.data.get("email")
|
|
mac_address = request.data.get("mac_address")
|
|
unique_id = request.data.get("unique_id")
|
|
|
|
# Validate the presence of required parameters
|
|
if not email or not mac_address or not unique_id:
|
|
return Response(
|
|
{"error": "user_id, mac_address, and unique_id are required."},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Check if the user exists
|
|
try:
|
|
user_profile = UserProfile.objects.get(email=email)
|
|
print(user_profile)
|
|
except UserProfile.DoesNotExist:
|
|
# User does not exist
|
|
return Response(
|
|
{"info": "User does not exist. Proceeding to send OTP."},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
# Check if the device exists and is associated with the user
|
|
try:
|
|
device = Devices.objects.get(
|
|
used_by=user_profile,
|
|
mac_address=mac_address,
|
|
unique_id=unique_id
|
|
)
|
|
print(device)
|
|
# Device is already registered
|
|
return Response(
|
|
{"message": "Device already registered."},
|
|
status=status.HTTP_200_OK
|
|
)
|
|
except Devices.DoesNotExist:
|
|
# User exists but device is not registered
|
|
return Response(
|
|
{"message": "User exists but device is not registered."},
|
|
status=status.HTTP_201_CREATED
|
|
)
|
|
|
|
except Exception as e:
|
|
return Response(
|
|
{"error": f"An unexpected error occurred: {str(e)}"},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR
|
|
) |