saas-market-analysis-dubai/apps/analytics/services.py
2025-09-17 03:04:22 +05:30

388 lines
16 KiB
Python

"""
Analytics services for data processing and forecasting.
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from django.db.models import Avg, Count, Min, Max, Sum
from django.utils import timezone
from .models import Transaction, Forecast, MarketTrend
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.metrics import mean_absolute_error, mean_squared_error
import logging
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
class AnalyticsService:
    """Service for analytics calculations and market analysis."""

    def get_market_analysis(self, area, property_type):
        """Build a market analysis report for an area and property type.

        Args:
            area: Text matched case-insensitively as a substring of
                ``Transaction.area_en``.
            property_type: Exact ``Transaction.property_type`` to match.

        Returns:
            A dict with the keys ``area``, ``property_type``,
            ``analysis_period``, ``key_metrics``, ``trends``,
            ``recommendations`` and ``forecast_accuracy``. When no
            transaction matches, the metrics are empty and a single
            "insufficient data" recommendation is returned.

        Raises:
            Exception: Any error raised while querying or computing is
                logged with its traceback and re-raised unchanged.
        """
        try:
            transactions = Transaction.objects.filter(
                area_en__icontains=area,
                property_type=property_type,
            ).order_by('instance_date')

            if not transactions.exists():
                return {
                    'area': area,
                    'property_type': property_type,
                    'analysis_period': 'No data available',
                    'key_metrics': {},
                    'trends': [],
                    'recommendations': ['Insufficient data for analysis'],
                    'forecast_accuracy': None,
                }

            key_metrics = self._calculate_key_metrics(transactions)
            trends = self._analyze_trends(transactions)
            recommendations = self._generate_recommendations(key_metrics, trends)
            # Only meaningful when stored forecasts exist; otherwise None.
            forecast_accuracy = self._calculate_forecast_accuracy(area, property_type)

            return {
                'area': area,
                'property_type': property_type,
                'analysis_period': f"Last {transactions.count()} transactions",
                'key_metrics': key_metrics,
                'trends': trends,
                'recommendations': recommendations,
                'forecast_accuracy': forecast_accuracy,
            }
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception('Error in market analysis: %s', e)
            raise

    def _calculate_key_metrics(self, transactions):
        """Calculate key market metrics.

        Args:
            transactions: Iterable of objects exposing ``transaction_value``
                and ``actual_area`` (for example a Transaction queryset).

        Returns:
            A dict of plain ``int``/``float`` metrics. Every value is zero
            when there is no usable price, instead of raising on empty input.
        """
        rows = list(transactions)
        # Convert once at the edge so Decimal and float never get mixed.
        prices = [
            float(t.transaction_value)
            for t in rows
            if t.transaction_value is not None
        ]
        if not prices:
            return {
                'total_transactions': len(rows),
                'total_value': 0,
                'average_price': 0,
                'median_price': 0,
                'min_price': 0,
                'max_price': 0,
                'average_price_per_sqft': 0,
                'price_volatility': 0,
                'average_area': 0,
            }

        # Price per sqft must only use transactions that report an area;
        # dividing the value of *all* transactions by the area of *some*
        # overstates the figure.
        sized = [
            (float(t.transaction_value), float(t.actual_area))
            for t in rows
            if t.transaction_value is not None
            and t.actual_area
            and t.actual_area > 0
        ]
        sized_value = sum(value for value, _ in sized)
        areas = [area for _, area in sized]
        total_area = sum(areas)

        mean_price = float(np.mean(prices))
        return {
            'total_transactions': len(rows),
            'total_value': sum(prices),
            'average_price': mean_price,
            'median_price': float(np.median(prices)),
            'min_price': min(prices),
            'max_price': max(prices),
            'average_price_per_sqft': sized_value / total_area if total_area > 0 else 0,
            # Coefficient of variation: standard deviation relative to the mean.
            'price_volatility': float(np.std(prices)) / mean_price if mean_price > 0 else 0,
            'average_area': float(np.mean(areas)) if areas else 0,
        }

    def _analyze_trends(self, transactions):
        """Analyze price and volume trends.

        Args:
            transactions: Transactions ordered by ``instance_date``. The
                volume comparison calls ``transactions.filter(...)``, so a
                queryset is required once at least six rows are present.

        Returns:
            A list of human-readable trend descriptions (possibly empty).
        """
        trends = []
        rows = list(transactions)

        # Price trend: compare the newer half of the history with the older half.
        if len(rows) >= 10:
            mid_point = len(rows) // 2
            older_avg = np.mean([float(t.transaction_value) for t in rows[:mid_point]])
            recent_avg = np.mean([float(t.transaction_value) for t in rows[mid_point:]])
            price_change = ((recent_avg - older_avg) / older_avg * 100) if older_avg > 0 else 0

            if price_change > 10:
                trends.append(f"Strong price growth: {price_change:.1f}% increase")
            elif price_change > 5:
                trends.append(f"Moderate price growth: {price_change:.1f}% increase")
            elif price_change < -10:
                trends.append(f"Significant price decline: {price_change:.1f}% decrease")
            elif price_change < -5:
                trends.append(f"Moderate price decline: {price_change:.1f}% decrease")
            else:
                trends.append(f"Stable prices: {price_change:.1f}% change")

        # Volume trend: last 90 days versus the 90 days before that.
        if len(rows) >= 6:
            # Take "now" once so both windows share the same boundaries.
            now = timezone.now()
            three_months_ago = now - timedelta(days=90)
            six_months_ago = now - timedelta(days=180)

            recent_count = transactions.filter(
                instance_date__gte=three_months_ago
            ).count()
            previous_count = transactions.filter(
                instance_date__gte=six_months_ago,
                instance_date__lt=three_months_ago,
            ).count()

            # Without a previous window there is nothing to compare against.
            if previous_count > 0:
                volume_change = (recent_count - previous_count) / previous_count * 100
                if volume_change > 20:
                    trends.append(f"High transaction volume: {volume_change:.1f}% increase")
                elif volume_change < -20:
                    trends.append(f"Low transaction volume: {volume_change:.1f}% decrease")
                else:
                    trends.append(f"Stable transaction volume: {volume_change:.1f}% change")

        return trends

    def _generate_recommendations(self, key_metrics, trends):
        """Generate market recommendations based on metrics and trends.

        Args:
            key_metrics: Dict as produced by ``_calculate_key_metrics``; only
                ``price_volatility`` and ``total_transactions`` are read.
            trends: Trend strings as produced by ``_analyze_trends``.

        Returns:
            A list of recommendation strings, ordered volatility, volume,
            then one entry per recognised trend.
        """
        recommendations = []

        volatility = key_metrics.get('price_volatility', 0)
        if volatility > 0.3:
            recommendations.append("High price volatility - consider market timing carefully")
        elif volatility < 0.1:
            recommendations.append("Low price volatility - stable market conditions")

        total_transactions = key_metrics.get('total_transactions', 0)
        if total_transactions < 10:
            recommendations.append("Limited transaction data - consider broader area analysis")
        elif total_transactions > 100:
            recommendations.append("High transaction volume - good market liquidity")

        # First matching marker wins for each trend, mirroring an if/elif chain.
        trend_advice = (
            ("Strong price growth", "Consider investing before prices rise further"),
            ("Significant price decline", "Potential buying opportunity - prices may be undervalued"),
            ("High transaction volume", "Active market - good time for transactions"),
            ("Low transaction volume", "Quiet market - consider waiting for better conditions"),
        )
        for trend in trends:
            for marker, advice in trend_advice:
                if marker in trend:
                    recommendations.append(advice)
                    break

        return recommendations

    def _calculate_forecast_accuracy(self, area, property_type):
        """Calculate forecast accuracy for the area and property type.

        Each forecast dated within the last 30 days is compared with the
        average actual transaction value in the 30 days following its
        forecast date.

        Returns:
            A percentage accuracy (``100 - MAE / mean actual * 100``, floored
            at 0 and rounded to 2 decimals), or ``None`` when fewer than three
            forecasts can be compared or when any error occurs (best effort).
        """
        try:
            recent_forecasts = Forecast.objects.filter(
                area_en__icontains=area,
                property_type=property_type,
                forecast_date__gte=timezone.now() - timedelta(days=30),
            )

            actual_prices = []
            predicted_prices = []
            for forecast in recent_forecasts:
                window_end = forecast.forecast_date + timedelta(days=30)
                # Let the database compute the average instead of loading
                # every row; the result is None when the window is empty.
                actual_avg = Transaction.objects.filter(
                    area_en__icontains=area,
                    property_type=property_type,
                    instance_date__gte=forecast.forecast_date,
                    instance_date__lt=window_end,
                ).aggregate(avg=Avg('transaction_value'))['avg']
                if actual_avg is None:
                    continue
                actual_prices.append(float(actual_avg))
                predicted_prices.append(float(forecast.predicted_price))

            # Too few comparable points make the score meaningless.
            if len(actual_prices) < 3:
                return None

            mae = mean_absolute_error(actual_prices, predicted_prices)
            avg_actual = np.mean(actual_prices)
            if avg_actual <= 0:
                return 0
            accuracy = max(0, 100 - (mae / avg_actual * 100))
            return round(float(accuracy), 2)
        except Exception as e:
            # Accuracy is optional in the report, so failures are logged
            # with their traceback and reported as "unknown".
            logger.exception('Error calculating forecast accuracy: %s', e)
            return None
class ForecastingService:
    """Service for property price forecasting."""

    def generate_forecast(self, area_en, property_type, property_sub_type='',
                          forecast_periods=12, confidence_level=0.95):
        """Generate and store a property price forecast.

        Args:
            area_en: Text matched case-insensitively as a substring of
                ``Transaction.area_en``.
            property_type: Exact ``Transaction.property_type`` to match.
            property_sub_type: Optional exact sub type filter; ignored when empty.
            forecast_periods: Number of future months to forecast (>= 1).
            confidence_level: Two-sided confidence level in ``(0, 1)``.

        Returns:
            The forecast dict produced by ``_generate_time_series_forecast``.

        Raises:
            ValueError: If fewer than 10 transactions match, if
                ``forecast_periods`` is below 1, or if no month has usable data.
            Exception: Any other error is logged with its traceback and
                re-raised unchanged.
        """
        try:
            if forecast_periods < 1:
                raise ValueError("forecast_periods must be at least 1")

            transactions = Transaction.objects.filter(
                area_en__icontains=area_en,
                property_type=property_type,
            ).order_by('instance_date')
            if property_sub_type:
                transactions = transactions.filter(property_sub_type=property_sub_type)

            # len() evaluates the queryset once; the rows are iterated
            # again below when the data frame is built.
            if len(transactions) < 10:
                raise ValueError("Insufficient data for forecasting")

            df = self._prepare_forecast_data(transactions)
            forecast_data = self._generate_time_series_forecast(
                df, forecast_periods, confidence_level
            )
            # Persisting is best effort; see _save_forecast.
            self._save_forecast(area_en, property_type, property_sub_type, forecast_data)
            return forecast_data
        except Exception as e:
            logger.exception('Error generating forecast: %s', e)
            raise

    def _prepare_forecast_data(self, transactions):
        """Aggregate transactions into monthly means for forecasting.

        Args:
            transactions: Iterable of objects exposing ``instance_date``,
                ``transaction_value`` and ``actual_area``.

        Returns:
            A DataFrame indexed by month end with the mean ``price``,
            ``area`` and ``price_per_sqft`` per month. Transactions without
            a positive area contribute to ``price`` only, and months with no
            area information at all are dropped.
        """
        columns = ['price', 'area', 'price_per_sqft']
        nan = float('nan')
        records = []
        for t in transactions:
            price = float(t.transaction_value)
            # A missing area is recorded as NaN, not 0, so it is skipped by
            # the monthly mean instead of dragging the average area down.
            has_area = bool(t.actual_area) and t.actual_area > 0
            area = float(t.actual_area) if has_area else nan
            records.append({
                'date': t.instance_date,
                'price': price,
                'area': area,
                'price_per_sqft': price / area if has_area else nan,
            })

        if not records:
            return pd.DataFrame(
                columns=columns, index=pd.DatetimeIndex([], name='date')
            )

        df = pd.DataFrame(records)
        df['date'] = pd.to_datetime(df['date'])
        df = df.set_index('date')
        # An offset object avoids the deprecated 'M' frequency alias while
        # keeping month-end buckets on every pandas version.
        return df[columns].resample(pd.offsets.MonthEnd()).mean().dropna()

    @staticmethod
    def _month_offsets(index):
        """Return the number of calendar months between each entry and the first."""
        if len(index) == 0:
            return np.array([], dtype=int)
        first = index[0]
        return np.asarray(
            (index.year - first.year) * 12 + (index.month - first.month)
        )

    @staticmethod
    def _z_score_for_confidence(confidence_level):
        """Return the two-sided normal z-score for ``confidence_level``.

        0.95 and 0.90 keep their conventional rounded values (1.96 and
        1.645). Any other level in ``(0, 1)`` is computed from the normal
        distribution. Unusable input falls back to 1.96 with a warning.
        """
        default_z = 1.96
        try:
            level = float(confidence_level)
        except (TypeError, ValueError):
            level = None

        if level is not None:
            # Tolerant comparison: 0.95 may arrive as 0.9500000001.
            for known_level, known_z in ((0.95, 1.96), (0.90, 1.645)):
                if abs(level - known_level) < 1e-9:
                    return known_z
            if 0 < level < 1:
                from statistics import NormalDist
                return NormalDist().inv_cdf((1 + level) / 2)

        logger.warning(
            'Unsupported confidence level %r; falling back to 95%%',
            confidence_level,
        )
        return default_z

    def _generate_time_series_forecast(self, df, periods, confidence_level):
        """Generate a time series forecast using linear regression.

        A straight line is fitted to monthly price per sqft against the
        number of months elapsed, then extrapolated ``periods`` months ahead.
        Totals are obtained by multiplying by the historical mean area.

        Args:
            df: Monthly frame as returned by ``_prepare_forecast_data``.
                It is not modified.
            periods: Number of future months to predict.
            confidence_level: Two-sided confidence level for the band.

        Returns:
            A dict with ``area``, ``forecast_periods``, ``confidence_level``,
            ``model_accuracy`` and a ``forecasts`` list with one entry per month.

        Raises:
            ValueError: If ``df`` has no rows.
        """
        try:
            if df.empty:
                raise ValueError("Insufficient data for forecasting")

            # Work on a copy so the caller's frame does not gain columns.
            work = df.copy()
            # Real month distance keeps gaps (dropped months) on the time
            # axis instead of pretending the remaining months are adjacent.
            work['time_index'] = self._month_offsets(work.index)
            # replace(0, 1) guards against division by zero.
            work['price_per_sqft'] = work['price'] / work['area'].replace(0, 1)

            X = work[['time_index']].values
            y = work['price_per_sqft'].values

            model = LinearRegression()
            model.fit(X, y)

            last_time = work['time_index'].iloc[-1]
            future_indices = np.arange(
                last_time + 1, last_time + periods + 1
            ).reshape(-1, 1)
            predictions = model.predict(future_indices)

            # Constant-width band derived from the in-sample residual spread.
            residuals = y - model.predict(X)
            std_error = np.std(residuals)
            z_score = self._z_score_for_confidence(confidence_level)
            confidence_interval = z_score * std_error

            last_date = work.index[-1]
            forecast_dates = pd.date_range(
                start=last_date + pd.DateOffset(months=1),
                periods=periods,
                freq=pd.offsets.MonthEnd(),
            )

            forecast_data = {
                # NOTE: despite its name this key holds the historical date
                # range; it is kept as-is for existing consumers.
                'area': work.index[0].strftime('%Y-%m-%d') + ' to ' + last_date.strftime('%Y-%m-%d'),
                'forecast_periods': periods,
                'confidence_level': confidence_level,
                'model_accuracy': self._calculate_model_accuracy(model, X, y),
                'forecasts': [],
            }

            # Loop invariants: the mean area and the band width in total-price terms.
            avg_area = work['area'].mean()
            band = confidence_interval * avg_area
            for date, pred in zip(forecast_dates, predictions):
                total_price = pred * avg_area
                forecast_data['forecasts'].append({
                    'date': date.strftime('%Y-%m-%d'),
                    'predicted_price': float(round(total_price, 2)),
                    'predicted_price_per_sqft': float(round(pred, 2)),
                    'confidence_lower': float(round(total_price - band, 2)),
                    'confidence_upper': float(round(total_price + band, 2)),
                    'confidence_interval': float(round(band, 2)),
                })

            return forecast_data
        except Exception as e:
            logger.exception('Error in time series forecast: %s', e)
            raise

    def _calculate_model_accuracy(self, model, X, y):
        """Calculate in-sample accuracy metrics for a fitted model.

        Returns:
            A dict with ``mae``, ``mse`` and ``rmse`` rounded to 2 decimals
            and ``r_squared`` rounded to 4. ``r_squared`` is 0 when ``y`` has
            no variance.
        """
        predictions = model.predict(X)
        mae = mean_absolute_error(y, predictions)
        mse = mean_squared_error(y, predictions)
        rmse = np.sqrt(mse)

        ss_res = np.sum((y - predictions) ** 2)
        ss_tot = np.sum((y - np.mean(y)) ** 2)
        r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0

        return {
            'mae': float(round(mae, 2)),
            'mse': float(round(mse, 2)),
            'rmse': float(round(rmse, 2)),
            'r_squared': float(round(r_squared, 4)),
        }

    def _save_forecast(self, area_en, property_type, property_sub_type, forecast_data):
        """Save forecast data to the database (best effort).

        One ``Forecast`` row is created per entry in
        ``forecast_data['forecasts']``. Errors are logged with their
        traceback and swallowed on purpose: a failed save must not fail
        forecast generation. Rows created before an error remain stored.
        """
        try:
            model_accuracy = forecast_data['model_accuracy']
            metadata = {
                'confidence_level': forecast_data['confidence_level'],
                'model_accuracy': model_accuracy,
            }
            for forecast_item in forecast_data['forecasts']:
                Forecast.objects.create(
                    area_en=area_en,
                    property_type=property_type,
                    property_sub_type=property_sub_type,
                    forecast_date=datetime.strptime(forecast_item['date'], '%Y-%m-%d'),
                    predicted_price=forecast_item['predicted_price'],
                    confidence_interval_lower=forecast_item['confidence_lower'],
                    confidence_interval_upper=forecast_item['confidence_upper'],
                    model_version='1.0',
                    accuracy_score=model_accuracy['r_squared'],
                    # Each row gets its own dict so rows never share state.
                    metadata=dict(metadata),
                )
        except Exception as e:
            # Deliberately not re-raised: the forecast itself is still valid.
            logger.exception('Error saving forecast: %s', e)