""" Analytics services for data processing and forecasting. """ import pandas as pd import numpy as np from datetime import datetime, timedelta from django.db.models import Avg, Count, Min, Max, Sum from django.utils import timezone from .models import Transaction, Forecast, MarketTrend from sklearn.linear_model import LinearRegression from sklearn.preprocessing import PolynomialFeatures from sklearn.metrics import mean_absolute_error, mean_squared_error import logging logger = logging.getLogger(__name__) class AnalyticsService: """Service for analytics calculations and market analysis.""" def get_market_analysis(self, area, property_type): """Get comprehensive market analysis for an area and property type.""" try: # Get transactions for the area and property type transactions = Transaction.objects.filter( area_en__icontains=area, property_type=property_type ).order_by('instance_date') if not transactions.exists(): return { 'area': area, 'property_type': property_type, 'analysis_period': 'No data available', 'key_metrics': {}, 'trends': [], 'recommendations': ['Insufficient data for analysis'], 'forecast_accuracy': None } # Calculate key metrics key_metrics = self._calculate_key_metrics(transactions) # Analyze trends trends = self._analyze_trends(transactions) # Generate recommendations recommendations = self._generate_recommendations(key_metrics, trends) # Calculate forecast accuracy if forecasts exist forecast_accuracy = self._calculate_forecast_accuracy(area, property_type) return { 'area': area, 'property_type': property_type, 'analysis_period': f"Last {transactions.count()} transactions", 'key_metrics': key_metrics, 'trends': trends, 'recommendations': recommendations, 'forecast_accuracy': forecast_accuracy } except Exception as e: logger.error(f'Error in market analysis: {e}') raise def _calculate_key_metrics(self, transactions): """Calculate key market metrics.""" total_value = sum(t.transaction_value for t in transactions) total_area = sum(t.actual_area for t in transactions if t.actual_area) prices = [t.transaction_value for t in transactions] areas = [t.actual_area for t in transactions if t.actual_area] return { 'total_transactions': len(transactions), 'total_value': total_value, 'average_price': np.mean(prices), 'median_price': np.median(prices), 'min_price': min(prices), 'max_price': max(prices), 'average_price_per_sqft': total_value / total_area if total_area > 0 else 0, 'price_volatility': np.std(prices) / np.mean(prices) if np.mean(prices) > 0 else 0, 'average_area': np.mean(areas) if areas else 0, } def _analyze_trends(self, transactions): """Analyze market trends.""" trends = [] # Price trend over time if len(transactions) >= 10: # Split into two periods mid_point = len(transactions) // 2 recent_transactions = transactions[mid_point:] older_transactions = transactions[:mid_point] recent_avg = np.mean([t.transaction_value for t in recent_transactions]) older_avg = np.mean([t.transaction_value for t in older_transactions]) price_change = ((recent_avg - older_avg) / older_avg * 100) if older_avg > 0 else 0 if price_change > 10: trends.append(f"Strong price growth: {price_change:.1f}% increase") elif price_change > 5: trends.append(f"Moderate price growth: {price_change:.1f}% increase") elif price_change < -10: trends.append(f"Significant price decline: {price_change:.1f}% decrease") elif price_change < -5: trends.append(f"Moderate price decline: {price_change:.1f}% decrease") else: trends.append(f"Stable prices: {price_change:.1f}% change") # Volume trend if len(transactions) >= 6: # Check last 3 months vs previous 3 months three_months_ago = timezone.now() - timedelta(days=90) six_months_ago = timezone.now() - timedelta(days=180) recent_count = transactions.filter(instance_date__gte=three_months_ago).count() previous_count = transactions.filter( instance_date__gte=six_months_ago, instance_date__lt=three_months_ago ).count() if previous_count > 0: volume_change = ((recent_count - previous_count) / previous_count * 100) if volume_change > 20: trends.append(f"High transaction volume: {volume_change:.1f}% increase") elif volume_change < -20: trends.append(f"Low transaction volume: {volume_change:.1f}% decrease") else: trends.append(f"Stable transaction volume: {volume_change:.1f}% change") return trends def _generate_recommendations(self, key_metrics, trends): """Generate market recommendations based on metrics and trends.""" recommendations = [] # Price recommendations avg_price = key_metrics.get('average_price', 0) volatility = key_metrics.get('price_volatility', 0) if volatility > 0.3: recommendations.append("High price volatility - consider market timing carefully") elif volatility < 0.1: recommendations.append("Low price volatility - stable market conditions") # Volume recommendations total_transactions = key_metrics.get('total_transactions', 0) if total_transactions < 10: recommendations.append("Limited transaction data - consider broader area analysis") elif total_transactions > 100: recommendations.append("High transaction volume - good market liquidity") # Trend-based recommendations for trend in trends: if "Strong price growth" in trend: recommendations.append("Consider investing before prices rise further") elif "Significant price decline" in trend: recommendations.append("Potential buying opportunity - prices may be undervalued") elif "High transaction volume" in trend: recommendations.append("Active market - good time for transactions") elif "Low transaction volume" in trend: recommendations.append("Quiet market - consider waiting for better conditions") return recommendations def _calculate_forecast_accuracy(self, area, property_type): """Calculate forecast accuracy for the area and property type.""" try: # Get recent forecasts recent_forecasts = Forecast.objects.filter( area_en__icontains=area, property_type=property_type, forecast_date__gte=timezone.now() - timedelta(days=30) ) if not recent_forecasts.exists(): return None # Get actual prices for the forecast period actual_prices = [] predicted_prices = [] for forecast in recent_forecasts: actual_transactions = Transaction.objects.filter( area_en__icontains=area, property_type=property_type, instance_date__gte=forecast.forecast_date, instance_date__lt=forecast.forecast_date + timedelta(days=30) ) if actual_transactions.exists(): actual_avg = np.mean([t.transaction_value for t in actual_transactions]) actual_prices.append(actual_avg) predicted_prices.append(float(forecast.predicted_price)) if len(actual_prices) < 3: return None # Calculate accuracy metrics mae = mean_absolute_error(actual_prices, predicted_prices) mse = mean_squared_error(actual_prices, predicted_prices) rmse = np.sqrt(mse) # Calculate percentage accuracy avg_actual = np.mean(actual_prices) accuracy = max(0, 100 - (mae / avg_actual * 100)) if avg_actual > 0 else 0 return round(accuracy, 2) except Exception as e: logger.error(f'Error calculating forecast accuracy: {e}') return None class ForecastingService: """Service for property price forecasting.""" def generate_forecast(self, area_en, property_type, property_sub_type='', forecast_periods=12, confidence_level=0.95): """Generate property price forecast.""" try: # Get historical data transactions = Transaction.objects.filter( area_en__icontains=area_en, property_type=property_type ).order_by('instance_date') if property_sub_type: transactions = transactions.filter(property_sub_type=property_sub_type) if len(transactions) < 10: raise ValueError("Insufficient data for forecasting") # Prepare data for forecasting df = self._prepare_forecast_data(transactions) # Generate forecast forecast_data = self._generate_time_series_forecast( df, forecast_periods, confidence_level ) # Save forecast to database self._save_forecast(area_en, property_type, property_sub_type, forecast_data) return forecast_data except Exception as e: logger.error(f'Error generating forecast: {e}') raise def _prepare_forecast_data(self, transactions): """Prepare transaction data for forecasting.""" data = [] for t in transactions: data.append({ 'date': t.instance_date, 'price': float(t.transaction_value), 'area': float(t.actual_area) if t.actual_area else 0, 'price_per_sqft': float(t.transaction_value / t.actual_area) if t.actual_area and t.actual_area > 0 else 0 }) df = pd.DataFrame(data) df['date'] = pd.to_datetime(df['date']) df = df.set_index('date') df = df.resample('M').mean().dropna() # Monthly aggregation return df def _generate_time_series_forecast(self, df, periods, confidence_level): """Generate time series forecast using linear regression.""" try: # Create time features df['time_index'] = range(len(df)) df['price_per_sqft'] = df['price'] / df['area'].replace(0, 1) # Avoid division by zero # Use price per sqft for forecasting X = df[['time_index']].values y = df['price_per_sqft'].values # Fit linear regression model model = LinearRegression() model.fit(X, y) # Generate future time indices last_time = df['time_index'].iloc[-1] future_indices = np.arange(last_time + 1, last_time + periods + 1).reshape(-1, 1) # Make predictions predictions = model.predict(future_indices) # Calculate confidence intervals residuals = y - model.predict(X) std_error = np.std(residuals) # Z-score for confidence level if confidence_level == 0.95: z_score = 1.96 elif confidence_level == 0.90: z_score = 1.645 else: z_score = 1.96 confidence_interval = z_score * std_error # Generate forecast dates last_date = df.index[-1] forecast_dates = pd.date_range( start=last_date + pd.DateOffset(months=1), periods=periods, freq='M' ) # Prepare results forecast_data = { 'area': df.index[0].strftime('%Y-%m-%d') + ' to ' + df.index[-1].strftime('%Y-%m-%d'), 'forecast_periods': periods, 'confidence_level': confidence_level, 'model_accuracy': self._calculate_model_accuracy(model, X, y), 'forecasts': [] } for i, (date, pred) in enumerate(zip(forecast_dates, predictions)): # Convert back to total price (assuming average area) avg_area = df['area'].mean() total_price = pred * avg_area forecast_data['forecasts'].append({ 'date': date.strftime('%Y-%m-%d'), 'predicted_price': round(total_price, 2), 'predicted_price_per_sqft': round(pred, 2), 'confidence_lower': round(total_price - confidence_interval * avg_area, 2), 'confidence_upper': round(total_price + confidence_interval * avg_area, 2), 'confidence_interval': round(confidence_interval * avg_area, 2) }) return forecast_data except Exception as e: logger.error(f'Error in time series forecast: {e}') raise def _calculate_model_accuracy(self, model, X, y): """Calculate model accuracy metrics.""" predictions = model.predict(X) mae = mean_absolute_error(y, predictions) mse = mean_squared_error(y, predictions) rmse = np.sqrt(mse) # R-squared ss_res = np.sum((y - predictions) ** 2) ss_tot = np.sum((y - np.mean(y)) ** 2) r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 return { 'mae': round(mae, 2), 'mse': round(mse, 2), 'rmse': round(rmse, 2), 'r_squared': round(r_squared, 4) } def _save_forecast(self, area_en, property_type, property_sub_type, forecast_data): """Save forecast data to database.""" try: for forecast_item in forecast_data['forecasts']: Forecast.objects.create( area_en=area_en, property_type=property_type, property_sub_type=property_sub_type, forecast_date=datetime.strptime(forecast_item['date'], '%Y-%m-%d'), predicted_price=forecast_item['predicted_price'], confidence_interval_lower=forecast_item['confidence_lower'], confidence_interval_upper=forecast_item['confidence_upper'], model_version='1.0', accuracy_score=forecast_data['model_accuracy']['r_squared'], metadata={ 'confidence_level': forecast_data['confidence_level'], 'model_accuracy': forecast_data['model_accuracy'] } ) except Exception as e: logger.error(f'Error saving forecast: {e}') # Don't raise exception here as forecast generation can still succeed