""" Report generation services. """ import os import pandas as pd from datetime import datetime, timedelta from django.conf import settings from django.utils import timezone from django.db.models import Avg, Count, Min, Max, Sum from celery import shared_task from .models import Report from apps.analytics.models import Transaction, Forecast from apps.analytics.services import AnalyticsService import logging logger = logging.getLogger(__name__) class ReportGenerationService: """Service for generating various types of reports.""" @staticmethod @shared_task def generate_transaction_summary_report(report_id, area=None, property_type=None, start_date=None, end_date=None, format_type='pdf'): """Generate transaction summary report.""" try: report = Report.objects.get(id=report_id) report.status = 'processing' report.save() # Get transaction data queryset = Transaction.objects.all() if area: queryset = queryset.filter(area_en__icontains=area) if property_type: queryset = queryset.filter(property_type=property_type) if start_date: queryset = queryset.filter(instance_date__gte=start_date) if end_date: queryset = queryset.filter(instance_date__lte=end_date) # Generate report based on format if format_type == 'pdf': file_path = ReportGenerationService._generate_pdf_report( report, queryset, 'transaction_summary' ) elif format_type == 'excel': file_path = ReportGenerationService._generate_excel_report( report, queryset, 'transaction_summary' ) elif format_type == 'csv': file_path = ReportGenerationService._generate_csv_report( report, queryset, 'transaction_summary' ) else: raise ValueError(f"Unsupported format: {format_type}") # Update report with file info report.file_path = file_path report.file_size = os.path.getsize(file_path) report.status = 'completed' report.save() except Exception as e: logger.error(f'Error generating transaction summary report: {e}') report.status = 'failed' report.error_message = str(e) report.save() @staticmethod @shared_task def generate_area_analysis_report(report_id, area, format_type='pdf'): """Generate area analysis report.""" try: report = Report.objects.get(id=report_id) report.status = 'processing' report.save() # Get area data queryset = Transaction.objects.filter(area_en__icontains=area) # Generate report if format_type == 'pdf': file_path = ReportGenerationService._generate_pdf_report( report, queryset, 'area_analysis' ) elif format_type == 'excel': file_path = ReportGenerationService._generate_excel_report( report, queryset, 'area_analysis' ) elif format_type == 'csv': file_path = ReportGenerationService._generate_csv_report( report, queryset, 'area_analysis' ) else: raise ValueError(f"Unsupported format: {format_type}") report.file_path = file_path report.file_size = os.path.getsize(file_path) report.status = 'completed' report.save() except Exception as e: logger.error(f'Error generating area analysis report: {e}') report.status = 'failed' report.error_message = str(e) report.save() @staticmethod @shared_task def generate_forecast_report(report_id, area, property_type, forecast_periods, format_type='pdf'): """Generate forecast report.""" try: report = Report.objects.get(id=report_id) report.status = 'processing' report.save() # Get forecast data forecasts = Forecast.objects.filter( area_en__icontains=area, property_type=property_type ).order_by('forecast_date') # Generate report if format_type == 'pdf': file_path = ReportGenerationService._generate_pdf_forecast_report( report, forecasts, area, property_type ) elif format_type == 'excel': file_path = ReportGenerationService._generate_excel_forecast_report( report, forecasts, area, property_type ) elif format_type == 'csv': file_path = ReportGenerationService._generate_csv_forecast_report( report, forecasts, area, property_type ) else: raise ValueError(f"Unsupported format: {format_type}") report.file_path = file_path report.file_size = os.path.getsize(file_path) report.status = 'completed' report.save() except Exception as e: logger.error(f'Error generating forecast report: {e}') report.status = 'failed' report.error_message = str(e) report.save() @staticmethod def _generate_pdf_report(report, queryset, report_type): """Generate PDF report.""" try: from reportlab.lib.pagesizes import letter, A4 from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib import colors from reportlab.lib.units import inch # Create file path filename = f"report_{report.id}_{report_type}.pdf" file_path = os.path.join(settings.MEDIA_ROOT, 'reports', filename) # Ensure directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Create PDF document doc = SimpleDocTemplate(file_path, pagesize=A4) styles = getSampleStyleSheet() story = [] # Title title_style = ParagraphStyle( 'CustomTitle', parent=styles['Heading1'], fontSize=16, spaceAfter=30, alignment=1 # Center alignment ) story.append(Paragraph(f"{report.title}", title_style)) story.append(Spacer(1, 20)) # Generate data based on report type if report_type == 'transaction_summary': data = ReportGenerationService._get_transaction_summary_data(queryset) elif report_type == 'area_analysis': data = ReportGenerationService._get_area_analysis_data(queryset) else: data = [] # Create table if data: table = Table(data) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 12), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black) ])) story.append(table) # Build PDF doc.build(story) return file_path except Exception as e: logger.error(f'Error generating PDF report: {e}') raise @staticmethod def _generate_excel_report(report, queryset, report_type): """Generate Excel report.""" try: filename = f"report_{report.id}_{report_type}.xlsx" file_path = os.path.join(settings.MEDIA_ROOT, 'reports', filename) # Ensure directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Create Excel writer with pd.ExcelWriter(file_path, engine='openpyxl') as writer: # Generate data based on report type if report_type == 'transaction_summary': data = ReportGenerationService._get_transaction_summary_data(queryset) elif report_type == 'area_analysis': data = ReportGenerationService._get_area_analysis_data(queryset) else: data = [] if data: # Convert to DataFrame df = pd.DataFrame(data[1:], columns=data[0]) df.to_excel(writer, sheet_name='Summary', index=False) return file_path except Exception as e: logger.error(f'Error generating Excel report: {e}') raise @staticmethod def _generate_csv_report(report, queryset, report_type): """Generate CSV report.""" try: filename = f"report_{report.id}_{report_type}.csv" file_path = os.path.join(settings.MEDIA_ROOT, 'reports', filename) # Ensure directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Generate data based on report type if report_type == 'transaction_summary': data = ReportGenerationService._get_transaction_summary_data(queryset) elif report_type == 'area_analysis': data = ReportGenerationService._get_area_analysis_data(queryset) else: data = [] if data: # Convert to DataFrame and save as CSV df = pd.DataFrame(data[1:], columns=data[0]) df.to_csv(file_path, index=False) return file_path except Exception as e: logger.error(f'Error generating CSV report: {e}') raise @staticmethod def _get_transaction_summary_data(queryset): """Get transaction summary data for reports.""" # Calculate summary statistics stats = queryset.aggregate( total_transactions=Count('id'), total_value=Sum('transaction_value'), average_price=Avg('transaction_value'), min_price=Min('transaction_value'), max_price=Max('transaction_value'), ) # Calculate average price per sqft total_area = queryset.aggregate(total_area=Sum('actual_area'))['total_area'] avg_price_per_sqft = (stats['total_value'] / total_area) if total_area and total_area > 0 else 0 data = [ ['Metric', 'Value'], ['Total Transactions', stats['total_transactions']], ['Total Value', f"AED {stats['total_value']:,.2f}"], ['Average Price', f"AED {stats['average_price']:,.2f}"], ['Min Price', f"AED {stats['min_price']:,.2f}"], ['Max Price', f"AED {stats['max_price']:,.2f}"], ['Average Price per Sqft', f"AED {avg_price_per_sqft:,.2f}"], ] return data @staticmethod def _get_area_analysis_data(queryset): """Get area analysis data for reports.""" # Group by area and calculate statistics area_stats = queryset.values('area_en').annotate( transaction_count=Count('id'), average_price=Avg('transaction_value'), total_value=Sum('transaction_value'), ).order_by('-transaction_count')[:10] data = [['Area', 'Transactions', 'Average Price', 'Total Value']] for stat in area_stats: data.append([ stat['area_en'], stat['transaction_count'], f"AED {stat['average_price']:,.2f}", f"AED {stat['total_value']:,.2f}", ]) return data @staticmethod def _generate_pdf_forecast_report(report, forecasts, area, property_type): """Generate PDF forecast report.""" # Similar to _generate_pdf_report but for forecast data # Implementation would be similar to above pass @staticmethod def _generate_excel_forecast_report(report, forecasts, area, property_type): """Generate Excel forecast report.""" # Similar to _generate_excel_report but for forecast data pass @staticmethod def _generate_csv_forecast_report(report, forecasts, area, property_type): """Generate CSV forecast report.""" # Similar to _generate_csv_report but for forecast data pass