388 lines
16 KiB
Python
388 lines
16 KiB
Python
"""
Analytics services for data processing and forecasting.
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
from django.db.models import Avg, Count, Min, Max, Sum
|
|
from django.utils import timezone
|
|
from .models import Transaction, Forecast, MarketTrend
|
|
from sklearn.linear_model import LinearRegression
|
|
from sklearn.preprocessing import PolynomialFeatures
|
|
from sklearn.metrics import mean_absolute_error, mean_squared_error
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AnalyticsService:
    """Service for analytics calculations and market analysis.

    Reads ``Transaction`` and ``Forecast`` rows through the Django ORM and
    turns them into summary metrics, human-readable trend statements and
    recommendations.
    """

    def get_market_analysis(self, area, property_type):
        """Get comprehensive market analysis for an area and property type.

        Args:
            area: Text matched case-insensitively as a substring of
                ``Transaction.area_en``.
            property_type: Value matched exactly against
                ``Transaction.property_type``.

        Returns:
            A dict with the keys ``area``, ``property_type``,
            ``analysis_period``, ``key_metrics``, ``trends``,
            ``recommendations`` and ``forecast_accuracy``. When no
            transaction matches, the metrics are empty and a single
            "Insufficient data for analysis" recommendation is returned.

        Raises:
            Exception: Any error raised while querying or computing is
                logged and re-raised unchanged.
        """
        try:
            transactions = Transaction.objects.filter(
                area_en__icontains=area,
                property_type=property_type,
            ).order_by('instance_date')

            if not transactions.exists():
                return {
                    'area': area,
                    'property_type': property_type,
                    'analysis_period': 'No data available',
                    'key_metrics': {},
                    'trends': [],
                    'recommendations': ['Insufficient data for analysis'],
                    'forecast_accuracy': None,
                }

            key_metrics = self._calculate_key_metrics(transactions)
            trends = self._analyze_trends(transactions)
            recommendations = self._generate_recommendations(key_metrics, trends)
            forecast_accuracy = self._calculate_forecast_accuracy(area, property_type)

            return {
                'area': area,
                'property_type': property_type,
                # Same number the metrics were computed from, so the label
                # can never disagree with ``key_metrics``.
                'analysis_period': f"Last {key_metrics['total_transactions']} transactions",
                'key_metrics': key_metrics,
                'trends': trends,
                'recommendations': recommendations,
                'forecast_accuracy': forecast_accuracy,
            }

        except Exception as e:
            logger.exception('Error in market analysis: %s', e)
            raise

    def _calculate_key_metrics(self, transactions):
        """Calculate key market metrics.

        Args:
            transactions: Iterable of objects exposing ``transaction_value``
                and ``actual_area`` (a queryset or a plain list).

        Returns:
            A dict with ``total_transactions``, ``total_value``,
            ``average_price``, ``median_price``, ``min_price``,
            ``max_price``, ``average_price_per_sqft``, ``price_volatility``
            (standard deviation divided by mean) and ``average_area``.
            Every metric is 0 when ``transactions`` is empty.
        """
        rows = list(transactions)  # iterate the source exactly once
        if not rows:
            return {
                'total_transactions': 0,
                'total_value': 0,
                'average_price': 0,
                'median_price': 0,
                'min_price': 0,
                'max_price': 0,
                'average_price_per_sqft': 0,
                'price_volatility': 0,
                'average_area': 0,
            }

        prices = [t.transaction_value for t in rows]
        # Only rows that actually carry an area may contribute to the
        # price-per-area ratio; otherwise the numerator would include value
        # for which the denominator holds no area and the ratio is inflated.
        with_area = [t for t in rows if t.actual_area]
        areas = [t.actual_area for t in with_area]
        value_with_area = sum(t.transaction_value for t in with_area)
        total_area = sum(areas)

        mean_price = np.mean(prices)

        return {
            'total_transactions': len(rows),
            'total_value': sum(prices),
            'average_price': mean_price,
            'median_price': np.median(prices),
            'min_price': min(prices),
            'max_price': max(prices),
            'average_price_per_sqft': value_with_area / total_area if total_area > 0 else 0,
            'price_volatility': np.std(prices) / mean_price if mean_price > 0 else 0,
            'average_area': np.mean(areas) if areas else 0,
        }

    @staticmethod
    def _price_trend(values):
        """Describe the price change between the older and newer half.

        Args:
            values: Transaction values in chronological order. The caller
                only invokes this with at least 10 values.

        Returns:
            A sentence classifying the percentage change of the mean of the
            second half relative to the mean of the first half.
        """
        mid_point = len(values) // 2
        older_avg = np.mean(values[:mid_point])
        recent_avg = np.mean(values[mid_point:])
        price_change = ((recent_avg - older_avg) / older_avg * 100) if older_avg > 0 else 0

        if price_change > 10:
            return f"Strong price growth: {price_change:.1f}% increase"
        if price_change > 5:
            return f"Moderate price growth: {price_change:.1f}% increase"
        # The wording already says "decrease", so report the magnitude;
        # printing the signed value would read as "-12.0% decrease".
        if price_change < -10:
            return f"Significant price decline: {abs(price_change):.1f}% decrease"
        if price_change < -5:
            return f"Moderate price decline: {abs(price_change):.1f}% decrease"
        return f"Stable prices: {price_change:.1f}% change"

    @staticmethod
    def _volume_trend(recent_count, previous_count):
        """Describe the change in transaction count between two periods.

        Returns:
            A sentence classifying the percentage change, or ``None`` when
            ``previous_count`` is 0 and no percentage can be computed.
        """
        if previous_count <= 0:
            return None

        volume_change = (recent_count - previous_count) / previous_count * 100
        if volume_change > 20:
            return f"High transaction volume: {volume_change:.1f}% increase"
        if volume_change < -20:
            return f"Low transaction volume: {abs(volume_change):.1f}% decrease"
        return f"Stable transaction volume: {volume_change:.1f}% change"

    def _analyze_trends(self, transactions):
        """Analyze market trends.

        Args:
            transactions: Transactions in chronological order. With 6 or
                more rows this must be a queryset, because the volume trend
                uses ``.filter()`` on ``instance_date``.

        Returns:
            A list of trend sentences: a price trend when there are at
            least 10 transactions, and a volume trend (last 90 days versus
            the 90 days before) when there are at least 6 transactions and
            the earlier window is not empty.
        """
        trends = []
        rows = list(transactions)

        # Price trend over time: older half versus newer half.
        if len(rows) >= 10:
            trends.append(self._price_trend([t.transaction_value for t in rows]))

        # Volume trend: last 3 months versus the previous 3 months.
        if len(rows) >= 6:
            now = timezone.now()  # one reference instant for both windows
            three_months_ago = now - timedelta(days=90)
            six_months_ago = now - timedelta(days=180)

            recent_count = transactions.filter(instance_date__gte=three_months_ago).count()
            previous_count = transactions.filter(
                instance_date__gte=six_months_ago,
                instance_date__lt=three_months_ago,
            ).count()

            volume_trend = self._volume_trend(recent_count, previous_count)
            if volume_trend is not None:
                trends.append(volume_trend)

        return trends

    def _generate_recommendations(self, key_metrics, trends):
        """Generate market recommendations based on metrics and trends.

        Args:
            key_metrics: Dict as produced by ``_calculate_key_metrics``;
                ``price_volatility`` and ``total_transactions`` are read and
                default to 0 when absent.
            trends: Trend sentences as produced by ``_analyze_trends``.

        Returns:
            A list of recommendation strings, possibly empty.
        """
        recommendations = []

        # Price recommendations
        volatility = key_metrics.get('price_volatility', 0)
        if volatility > 0.3:
            recommendations.append("High price volatility - consider market timing carefully")
        elif volatility < 0.1:
            recommendations.append("Low price volatility - stable market conditions")

        # Volume recommendations
        total_transactions = key_metrics.get('total_transactions', 0)
        if total_transactions < 10:
            recommendations.append("Limited transaction data - consider broader area analysis")
        elif total_transactions > 100:
            recommendations.append("High transaction volume - good market liquidity")

        # Trend-based recommendations, matched on the sentence prefixes
        # emitted by the trend helpers.
        for trend in trends:
            if "Strong price growth" in trend:
                recommendations.append("Consider investing before prices rise further")
            elif "Significant price decline" in trend:
                recommendations.append("Potential buying opportunity - prices may be undervalued")
            elif "High transaction volume" in trend:
                recommendations.append("Active market - good time for transactions")
            elif "Low transaction volume" in trend:
                recommendations.append("Quiet market - consider waiting for better conditions")

        return recommendations

    def _calculate_forecast_accuracy(self, area, property_type):
        """Calculate forecast accuracy for the area and property type.

        For every forecast whose ``forecast_date`` is within the last 30
        days or later, the predicted price is compared with the average
        transaction value in the 30 days starting at ``forecast_date``.

        Returns:
            ``100 - MAE / mean(actual) * 100`` clamped at 0 and rounded to
            two decimals; 0 when the mean actual price is not positive;
            ``None`` when fewer than three forecasts have matching
            transactions or when any error occurs (the error is logged).
        """
        try:
            recent_forecasts = Forecast.objects.filter(
                area_en__icontains=area,
                property_type=property_type,
                forecast_date__gte=timezone.now() - timedelta(days=30),
            )

            actual_prices = []
            predicted_prices = []

            for forecast in recent_forecasts:
                # Let the database compute the mean: one query per forecast
                # instead of an ``exists()`` check plus a full row fetch.
                actual_avg = Transaction.objects.filter(
                    area_en__icontains=area,
                    property_type=property_type,
                    instance_date__gte=forecast.forecast_date,
                    instance_date__lt=forecast.forecast_date + timedelta(days=30),
                ).aggregate(avg=Avg('transaction_value'))['avg']

                if actual_avg is None:  # no transactions in this window
                    continue

                actual_prices.append(float(actual_avg))
                predicted_prices.append(float(forecast.predicted_price))

            if len(actual_prices) < 3:
                return None

            actual = np.asarray(actual_prices, dtype=float)
            predicted = np.asarray(predicted_prices, dtype=float)
            mae = float(np.mean(np.abs(actual - predicted)))

            avg_actual = float(actual.mean())
            accuracy = max(0, 100 - (mae / avg_actual * 100)) if avg_actual > 0 else 0

            return round(accuracy, 2)

        except Exception as e:
            # Deliberate best effort: accuracy is optional in the report.
            logger.exception('Error calculating forecast accuracy: %s', e)
            return None
|
|
|
|
|
|
class ForecastingService:
    """Service for property price forecasting.

    Fits a straight line through the monthly price per unit of area and
    extrapolates it, attaching a constant-width confidence band derived
    from the in-sample residuals.
    """

    # Conventional rounded two-sided z-scores, kept verbatim so results for
    # the common confidence levels stay exactly as before.
    _Z_SCORES = {0.95: 1.96, 0.90: 1.645}
    _DEFAULT_Z_SCORE = 1.96
    _FORECAST_COLUMNS = ['price', 'area', 'price_per_sqft']

    def generate_forecast(self, area_en, property_type, property_sub_type='',
                          forecast_periods=12, confidence_level=0.95):
        """Generate property price forecast.

        Args:
            area_en: Text matched case-insensitively as a substring of
                ``Transaction.area_en``.
            property_type: Value matched exactly against
                ``Transaction.property_type``.
            property_sub_type: Optional exact sub-type filter; ignored when
                empty.
            forecast_periods: Number of months to forecast (at least 1).
            confidence_level: Two-sided confidence level in ``(0, 1)``.

        Returns:
            The dict built by ``_generate_time_series_forecast`` with its
            ``area`` key set to ``area_en``.

        Raises:
            ValueError: If ``forecast_periods`` is below 1, fewer than 10
                transactions match, or no month has usable area data.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            if forecast_periods < 1:
                raise ValueError("forecast_periods must be at least 1")

            transactions = Transaction.objects.filter(
                area_en__icontains=area_en,
                property_type=property_type,
            ).order_by('instance_date')

            if property_sub_type:
                transactions = transactions.filter(property_sub_type=property_sub_type)

            if len(transactions) < 10:
                raise ValueError("Insufficient data for forecasting")

            df = self._prepare_forecast_data(transactions)
            if df.empty:
                # Every month lacked area data, so no price per unit of
                # area can be derived.
                raise ValueError("Insufficient data for forecasting")

            forecast_data = self._generate_time_series_forecast(
                df, forecast_periods, confidence_level
            )
            # The time-series helper does not know the area; without this
            # the ``area`` key would carry the data date range instead.
            forecast_data['area'] = area_en

            self._save_forecast(area_en, property_type, property_sub_type, forecast_data)

            return forecast_data

        except Exception as e:
            logger.exception('Error generating forecast: %s', e)
            raise

    def _prepare_forecast_data(self, transactions):
        """Prepare transaction data for forecasting.

        Args:
            transactions: Iterable of objects exposing ``instance_date``,
                ``transaction_value`` and ``actual_area``.

        Returns:
            A DataFrame indexed by month-end date (index name ``date``)
            holding the monthly means of ``price``, ``area`` and
            ``price_per_sqft``. Transactions without a positive area still
            contribute to ``price`` but not to the area-based columns;
            months with no usable area at all are dropped.
        """
        records = []
        for t in transactions:
            price = float(t.transaction_value)
            # A missing area is "unknown", not zero: NaN keeps it out of
            # the monthly means instead of dragging them towards 0.
            has_area = bool(t.actual_area) and t.actual_area > 0
            area = float(t.actual_area) if has_area else np.nan
            records.append({
                'date': t.instance_date,
                'price': price,
                'area': area,
                'price_per_sqft': price / area,  # NaN when area is NaN
            })

        if not records:
            return pd.DataFrame(
                columns=self._FORECAST_COLUMNS,
                index=pd.DatetimeIndex([], name='date'),
            )

        df = pd.DataFrame(records)
        df['date'] = pd.to_datetime(df['date'])

        # Group by calendar month through periods rather than
        # ``resample('M')``: the 'M' offset alias is deprecated in recent
        # pandas, while the period alias is not.
        months = df['date'].dt.to_period('M')
        monthly = df.groupby(months)[self._FORECAST_COLUMNS].mean().dropna()
        monthly.index = (
            pd.PeriodIndex(monthly.index, freq='M').to_timestamp(how='end').normalize()
        )
        monthly.index.name = 'date'

        return monthly

    @classmethod
    def _z_score(cls, confidence_level):
        """Return the two-sided normal z-score for ``confidence_level``.

        0.95 and 0.90 map to the conventional rounded values 1.96 and
        1.645. Any other level in ``(0, 1)`` is computed from the standard
        normal distribution. Anything else falls back to 1.96 with a
        warning.
        """
        try:
            level = float(confidence_level)
        except (TypeError, ValueError):
            level = float('nan')

        for known_level, z_score in cls._Z_SCORES.items():
            if abs(level - known_level) < 1e-9:
                return z_score

        if 0 < level < 1:
            from statistics import NormalDist  # stdlib; only needed here
            return NormalDist().inv_cdf(0.5 + level / 2)

        logger.warning(
            'Unsupported confidence level %r; falling back to z=%s',
            confidence_level, cls._DEFAULT_Z_SCORE,
        )
        return cls._DEFAULT_Z_SCORE

    def _generate_time_series_forecast(self, df, periods, confidence_level):
        """Generate time series forecast using linear regression.

        Args:
            df: Monthly frame with ``price`` and ``area`` columns and a
                datetime index, as produced by ``_prepare_forecast_data``.
                It is not modified.
            periods: Number of future months to predict (at least 1).
            confidence_level: Two-sided confidence level; see ``_z_score``.

        Returns:
            A dict with ``area`` (the covered date range, kept for callers
            of this helper; ``generate_forecast`` overwrites it),
            ``data_period`` (the same date range), ``forecast_periods``,
            ``confidence_level``, ``model_accuracy`` and ``forecasts``, a
            list with one entry per future month-end.

        Raises:
            ValueError: If ``periods`` is below 1 or no row has a positive
                area.
        """
        try:
            if periods < 1:
                raise ValueError("forecast_periods must be at least 1")

            # Keep rows with a positive area only. Substituting 1 for a
            # zero area would turn "price per area" into the full price
            # and badly skew the fit.
            data = df[df['area'] > 0]
            if data.empty:
                raise ValueError("Insufficient data for forecasting")

            # Use price per sqft (monthly mean price over monthly mean
            # area) against a simple 0..n-1 time index.
            sample_count = len(data)
            X = np.arange(sample_count).reshape(-1, 1)
            y = (data['price'] / data['area']).to_numpy(dtype=float)

            model = LinearRegression()
            model.fit(X, y)

            future_indices = np.arange(sample_count, sample_count + periods).reshape(-1, 1)
            predictions = model.predict(future_indices)

            # Constant-width band from the spread of in-sample residuals.
            residuals = y - model.predict(X)
            margin_per_sqft = self._z_score(confidence_level) * np.std(residuals)

            # Month-end dates for the months following the last observation.
            last_period = pd.Timestamp(data.index[-1]).to_period('M')
            forecast_dates = pd.period_range(
                start=last_period + 1, periods=periods, freq='M'
            ).to_timestamp(how='end').normalize()

            data_period = (
                data.index[0].strftime('%Y-%m-%d')
                + ' to '
                + data.index[-1].strftime('%Y-%m-%d')
            )

            # Convert back to total price assuming the average area.
            avg_area = float(data['area'].mean())
            margin = float(margin_per_sqft) * avg_area

            forecasts = []
            for date, pred in zip(forecast_dates, predictions):
                total_price = float(pred) * avg_area
                forecasts.append({
                    'date': date.strftime('%Y-%m-%d'),
                    'predicted_price': round(total_price, 2),
                    'predicted_price_per_sqft': round(float(pred), 2),
                    'confidence_lower': round(total_price - margin, 2),
                    'confidence_upper': round(total_price + margin, 2),
                    'confidence_interval': round(margin, 2),
                })

            return {
                'area': data_period,
                'data_period': data_period,
                'forecast_periods': periods,
                'confidence_level': confidence_level,
                'model_accuracy': self._calculate_model_accuracy(model, X, y),
                'forecasts': forecasts,
            }

        except Exception as e:
            logger.exception('Error in time series forecast: %s', e)
            raise

    def _calculate_model_accuracy(self, model, X, y):
        """Calculate model accuracy metrics.

        Args:
            model: Fitted estimator exposing ``predict(X)``.
            X: Feature matrix passed to ``model.predict``.
            y: Observed target values.

        Returns:
            A dict with ``mae``, ``mse`` and ``rmse`` rounded to two
            decimals and ``r_squared`` rounded to four. ``r_squared`` is 0
            when ``y`` has no variance.
        """
        observed = np.asarray(y, dtype=float)
        # One residual vector feeds every metric below.
        residuals = observed - np.asarray(model.predict(X), dtype=float)

        squared = residuals ** 2
        mae = float(np.mean(np.abs(residuals)))
        mse = float(np.mean(squared))
        rmse = float(np.sqrt(mse))

        # R-squared
        ss_res = float(np.sum(squared))
        ss_tot = float(np.sum((observed - observed.mean()) ** 2))
        r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0

        return {
            'mae': round(mae, 2),
            'mse': round(mse, 2),
            'rmse': round(rmse, 2),
            'r_squared': round(r_squared, 4),
        }

    def _save_forecast(self, area_en, property_type, property_sub_type, forecast_data):
        """Save forecast data to database.

        Creates one ``Forecast`` row per entry of
        ``forecast_data['forecasts']``. The rows are written inside a
        single atomic block, so a failure leaves no partial forecast
        behind. Errors are logged and swallowed, because the forecast
        itself was already generated and can still be returned.
        """
        from django.db import transaction  # local: only this method needs it

        try:
            model_accuracy = forecast_data['model_accuracy']
            metadata = {
                'confidence_level': forecast_data['confidence_level'],
                'model_accuracy': model_accuracy,
            }

            with transaction.atomic():
                for forecast_item in forecast_data['forecasts']:
                    Forecast.objects.create(
                        area_en=area_en,
                        property_type=property_type,
                        property_sub_type=property_sub_type,
                        forecast_date=datetime.strptime(forecast_item['date'], '%Y-%m-%d'),
                        predicted_price=forecast_item['predicted_price'],
                        confidence_interval_lower=forecast_item['confidence_lower'],
                        confidence_interval_upper=forecast_item['confidence_upper'],
                        model_version='1.0',
                        accuracy_score=model_accuracy['r_squared'],
                        metadata=dict(metadata),
                    )
        except Exception as e:
            logger.exception('Error saving forecast: %s', e)
            # Don't raise exception here as forecast generation can still succeed
|
|
|