// 568 lines · 16 KiB · JavaScript
const logger = require('../utils/logger');
|
|
const database = require('../config/database');
|
|
const redis = require('../config/redis');
|
|
const notificationService = require('./notificationService');
|
|
|
|
/** How often the in-memory rule set is reloaded from the database. */
const RULE_REFRESH_INTERVAL_MS = 30 * 1000;

/** Row limit used when a caller passes a limit that is not a non-negative integer. */
const DEFAULT_QUERY_LIMIT = 100;

/**
 * Threshold operators understood by checkThresholdAlerts, keyed by the
 * `operator` value of a device_thresholds row. Rows with any other operator
 * are ignored.
 */
const THRESHOLD_OPERATORS = new Map([
  ['gt', {
    type: 'threshold_exceeded',
    phrase: 'exceeded',
    symbol: '>',
    matches: (value, limit) => value > limit,
  }],
  ['lt', {
    type: 'threshold_below',
    phrase: 'below',
    symbol: '<',
    matches: (value, limit) => value < limit,
  }],
]);

/**
 * Decode a JSON column value. Strings are parsed; anything else (already
 * decoded by the driver, or null) is returned unchanged.
 * @throws {SyntaxError} when a string value is not valid JSON
 */
function parseJsonColumn(raw) {
  return typeof raw === 'string' ? JSON.parse(raw) : raw;
}

/** Map `undefined` to `null` so it can be bound as an SQL parameter. */
function nullIfUndefined(value) {
  return value === undefined ? null : value;
}

/** Coerce a caller-supplied row limit to a non-negative integer. */
function normalizeLimit(limit) {
  const parsed = Number.parseInt(limit, 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : DEFAULT_QUERY_LIMIT;
}

/**
 * Evaluates incoming device data against configured rules, per-device
 * thresholds and a simple statistical anomaly check, then stores the
 * resulting alerts and fans them out to actions and notifications.
 *
 * Relies on the module-level `logger`, `database`, `redis` and
 * `notificationService` objects. `database.query(sql, params)` is used as if
 * it resolves to an array of rows for SELECTs and to an object carrying
 * `insertId` for INSERTs — NOTE(review): confirm against ../config/database.
 */
class AlertService {
  constructor() {
    // rule id -> rule row with `conditions` / `actions` decoded from JSON
    this.alertRules = new Map();
    this.isInitialized = false;
    // Handle of the periodic rule refresh timer, or null when not running.
    this.refreshTimer = null;
  }

  /**
   * Load the rule set and start the periodic refresh.
   * @throws rethrows whatever setupAlertMonitoring throws; note that
   *   loadAlertRules logs its own failures instead of throwing.
   */
  async initialize() {
    try {
      await this.loadAlertRules();
      await this.setupAlertMonitoring();
      this.isInitialized = true;
      logger.info('Alert service initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize Alert service:', error);
      throw error;
    }
  }

  /**
   * Reload all active rules from `alert_rules`.
   *
   * The new rule set replaces the old one in a single assignment, so rules
   * that are no longer active stop firing and an evaluation loop that is
   * already iterating the previous map is not disturbed. A rule whose JSON
   * cannot be decoded is logged and skipped rather than aborting the load.
   * If the query itself fails the previous rule set is kept and the error is
   * logged (this method never throws).
   */
  async loadAlertRules() {
    try {
      const rules = await database.query(
        "SELECT * FROM alert_rules WHERE status = 'active'"
      );

      const freshRules = new Map();
      for (const rule of rules) {
        try {
          freshRules.set(rule.id, {
            ...rule,
            conditions: parseJsonColumn(rule.conditions),
            actions: parseJsonColumn(rule.actions),
          });
        } catch (parseError) {
          logger.error(`Skipping alert rule ${rule.id} with invalid JSON:`, parseError);
        }
      }

      this.alertRules = freshRules;
      logger.info(`Loaded ${freshRules.size} alert rules`);
    } catch (error) {
      logger.error('Failed to load alert rules:', error);
    }
  }

  /**
   * Start reloading the rule set every RULE_REFRESH_INTERVAL_MS.
   * Any timer started by an earlier call is cleared first, so calling this
   * repeatedly does not stack intervals.
   */
  async setupAlertMonitoring() {
    this.stopAlertMonitoring();
    this.refreshTimer = setInterval(async () => {
      await this.refreshAlertRules();
    }, RULE_REFRESH_INTERVAL_MS);
  }

  /** Stop the periodic rule refresh, if it is running. */
  stopAlertMonitoring() {
    if (this.refreshTimer !== null) {
      clearInterval(this.refreshTimer);
      this.refreshTimer = null;
    }
  }

  /**
   * Run every alert check for one data point and process the alerts found.
   *
   * @param deviceId  id of the device that produced `data`
   * @param data      decoded device payload
   * @param timestamp timestamp stored with each alert
   * @returns {Promise<Array>} every alert that was detected. An alert whose
   *   processing failed is still returned (the failure is logged by
   *   processAlert) and does not prevent the remaining alerts from being
   *   processed. Returns [] if detection itself throws.
   */
  async checkDeviceAlerts(deviceId, data, timestamp) {
    try {
      const alerts = [];

      // Rules bound to this device plus global rules (no device_id).
      for (const rule of this.alertRules.values()) {
        const appliesToDevice = rule.device_id == null || rule.device_id === deviceId;
        if (!appliesToDevice) {
          continue;
        }
        const alert = await this.evaluateAlertRule(rule, data, timestamp, deviceId);
        if (alert) {
          alerts.push(alert);
        }
      }

      const thresholdAlerts = await this.checkThresholdAlerts(deviceId, data, timestamp);
      alerts.push(...thresholdAlerts);

      const anomalyAlerts = await this.checkAnomalyAlerts(deviceId, data, timestamp);
      alerts.push(...anomalyAlerts);

      for (const alert of alerts) {
        try {
          await this.processAlert(alert);
        } catch (error) {
          // Already logged inside processAlert; keep going so one failing
          // alert does not drop the others.
        }
      }

      return alerts;
    } catch (error) {
      logger.error('Error checking device alerts:', error);
      return [];
    }
  }

  /**
   * Evaluate one rule against a payload. All conditions must hold.
   *
   * @param rule      rule object as stored in `this.alertRules`
   * @param data      decoded device payload
   * @param timestamp timestamp copied onto the alert
   * @param [deviceId] device that produced `data`; when omitted the alert
   *   falls back to `rule.device_id` (which is null for global rules)
   * @returns {Promise<object|null>} the alert, or null when the rule does not
   *   match or evaluation throws (the error is logged)
   */
  async evaluateAlertRule(rule, data, timestamp, deviceId) {
    try {
      let allConditionsMet = true;

      for (const condition of rule.conditions) {
        const value = this.extractValueFromData(data, condition.field);
        if (!this.evaluateCondition(value, condition.operator, condition.value)) {
          allConditionsMet = false;
          break;
        }
      }

      if (!allConditionsMet) {
        return null;
      }

      return {
        deviceId: deviceId != null ? deviceId : rule.device_id,
        type: rule.alert_type,
        severity: rule.severity,
        message: rule.message,
        category: rule.category,
        data,
        timestamp,
        ruleId: rule.id,
        actions: rule.actions,
      };
    } catch (error) {
      logger.error('Error evaluating alert rule:', error);
      return null;
    }
  }

  /**
   * Compare the payload against the device's active `device_thresholds` rows.
   * Only the operators in THRESHOLD_OPERATORS ('gt', 'lt') produce alerts;
   * metrics missing from the payload are skipped.
   * @returns {Promise<Array>} threshold alerts, or [] on error (logged)
   */
  async checkThresholdAlerts(deviceId, data, timestamp) {
    try {
      const alerts = [];

      const thresholds = await database.query(
        "SELECT * FROM device_thresholds WHERE device_id = ? AND status = 'active'",
        [deviceId]
      );

      for (const threshold of thresholds) {
        const operator = THRESHOLD_OPERATORS.get(threshold.operator);
        if (!operator) {
          continue;
        }

        const value = this.extractValueFromData(data, threshold.metric);
        if (value === null || value === undefined) {
          continue;
        }
        if (!operator.matches(value, threshold.value)) {
          continue;
        }

        alerts.push({
          deviceId,
          type: operator.type,
          severity: threshold.severity,
          message: `${threshold.metric} ${operator.phrase} threshold: ${value} ${operator.symbol} ${threshold.value}`,
          category: 'threshold',
          data: { metric: threshold.metric, value, threshold: threshold.value },
          timestamp,
          thresholdId: threshold.id,
        });
      }

      return alerts;
    } catch (error) {
      logger.error('Error checking threshold alerts:', error);
      return [];
    }
  }

  /**
   * Flag the payload when any of its numeric values lies more than three
   * standard deviations from the mean of the device's last 50 stored payloads.
   *
   * All numeric fields are pooled into a single distribution.
   * NOTE(review): pooling different metrics together is statistically crude;
   * per-metric statistics would be more meaningful — confirm intent.
   *
   * No alert is produced when there are fewer than 10 history rows, when no
   * finite numbers are available, or when the history has zero spread (the
   * z-score is undefined in that case).
   * @returns {Promise<Array>} zero or one anomaly alert, or [] on error (logged)
   */
  async checkAnomalyAlerts(deviceId, data, timestamp) {
    try {
      const alerts = [];

      const historicalData = await database.query(
        'SELECT raw_data FROM device_data WHERE device_id = ? ORDER BY timestamp DESC LIMIT 50',
        [deviceId]
      );

      if (historicalData.length < 10) {
        return alerts; // Not enough data for anomaly detection
      }

      const values = [];
      for (const row of historicalData) {
        let payload = row.raw_data;
        if (typeof payload === 'string') {
          // raw_data may come back as JSON text; undecodable rows are skipped.
          try {
            payload = JSON.parse(payload);
          } catch (parseError) {
            continue;
          }
        }
        values.push(...this.extractNumericValues(payload));
      }

      const currentValues = this.extractNumericValues(data);
      if (values.length === 0 || currentValues.length === 0) {
        return alerts;
      }

      const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
      const variance =
        values.reduce((sum, v) => sum + Math.pow(v - mean, 2), 0) / values.length;
      const stdDev = Math.sqrt(variance);

      // Also false for NaN: without spread a z-score cannot be computed.
      if (!(stdDev > 0)) {
        return alerts;
      }

      const anomalyScores = currentValues.map((value) => Math.abs((value - mean) / stdDev));
      const maxAnomalyScore = Math.max(...anomalyScores);

      if (maxAnomalyScore > 3) { // 3 standard deviations
        alerts.push({
          deviceId,
          type: 'anomaly_detected',
          severity: maxAnomalyScore > 5 ? 'critical' : 'warning',
          message: `Anomaly detected with z-score ${maxAnomalyScore.toFixed(2)}`,
          category: 'anomaly',
          data: { anomalyScore: maxAnomalyScore, values: currentValues },
          timestamp,
        });
      }

      return alerts;
    } catch (error) {
      logger.error('Error checking anomaly alerts:', error);
      return [];
    }
  }

  /**
   * Persist an alert and run everything that hangs off it: cache, log line,
   * rule actions, user notifications and (for critical alerts when
   * AI_HEALING_ENABLED is 'true') healing.
   *
   * Downstream steps receive a copy of the alert carrying the new database
   * id as `id`; the caller's object is not modified. A cache failure is
   * logged and does not stop the remaining steps.
   * @returns {Promise<*>} the id returned by storeAlert
   * @throws rethrows (after logging) when the alert cannot be stored
   */
  async processAlert(alert) {
    try {
      const alertId = await this.storeAlert(alert);
      const storedAlert = { ...alert, id: alertId };

      try {
        await redis.cacheAlert(alertId, storedAlert);
      } catch (cacheError) {
        logger.error('Failed to cache alert:', cacheError);
      }

      logger.logAlert(alert.type, alert.severity, alert.message, alert.deviceId);

      await this.executeAlertActions(storedAlert);
      await this.sendAlertNotifications(storedAlert);

      if (alert.severity === 'critical' && process.env.AI_HEALING_ENABLED === 'true') {
        await this.triggerHealing(storedAlert);
      }

      return alertId;
    } catch (error) {
      logger.error('Error processing alert:', error);
      throw error;
    }
  }

  /**
   * Insert the alert into `alerts` with status 'active'.
   * Missing fields are bound as SQL NULL instead of `undefined`.
   * @returns {Promise<*>} `insertId` of the query result
   * @throws rethrows the database error after logging it
   */
  async storeAlert(alert) {
    try {
      const result = await database.query(
        `INSERT INTO alerts
          (device_id, type, severity, message, category, data, timestamp, status, created_at)
          VALUES (?, ?, ?, ?, ?, ?, ?, ?, NOW())`,
        [
          nullIfUndefined(alert.deviceId),
          nullIfUndefined(alert.type),
          nullIfUndefined(alert.severity),
          nullIfUndefined(alert.message),
          nullIfUndefined(alert.category),
          alert.data === undefined ? null : JSON.stringify(alert.data),
          nullIfUndefined(alert.timestamp),
          'active',
        ]
      );

      return result.insertId;
    } catch (error) {
      logger.error('Failed to store alert:', error);
      throw error;
    }
  }

  /** Run each entry of `alert.actions` in order; errors are logged, not thrown. */
  async executeAlertActions(alert) {
    try {
      if (!Array.isArray(alert.actions)) {
        return;
      }
      for (const action of alert.actions) {
        await this.executeAction(action, alert);
      }
    } catch (error) {
      logger.error('Error executing alert actions:', error);
    }
  }

  /**
   * Dispatch one action by `action.type` ('email', 'sms', 'webhook',
   * 'device_control'). Unknown types and failures are logged, never thrown.
   */
  async executeAction(action, alert) {
    try {
      switch (action.type) {
        case 'email':
          await notificationService.sendEmail(action.recipients, {
            subject: `Alert: ${alert.type}`,
            body: alert.message,
            severity: alert.severity,
          });
          break;

        case 'sms':
          await notificationService.sendSMS(action.recipients, {
            message: `${String(alert.severity).toUpperCase()}: ${alert.message}`,
            deviceId: alert.deviceId,
          });
          break;

        case 'webhook':
          await notificationService.sendWebhook(action.url, {
            alert,
            timestamp: new Date().toISOString(),
          });
          break;

        case 'device_control':
          await this.executeDeviceControl(action, alert);
          break;

        default:
          logger.error(`Unknown alert action type: ${action.type}`);
      }
    } catch (error) {
      logger.error('Error executing action:', error);
    }
  }

  /**
   * Queue a device control request (status 'pending') in `device_controls`,
   * linked to the alert through `alert.id`. Errors are logged, not thrown.
   */
  async executeDeviceControl(action, alert) {
    try {
      await database.query(
        `INSERT INTO device_controls
          (device_id, action, parameters, triggered_by_alert, status)
          VALUES (?, ?, ?, ?, ?)`,
        [
          nullIfUndefined(alert.deviceId),
          nullIfUndefined(action.control),
          action.parameters === undefined ? null : JSON.stringify(action.parameters),
          nullIfUndefined(alert.id),
          'pending',
        ]
      );

      logger.info(`Device control action triggered by alert: ${action.control}`);
    } catch (error) {
      logger.error('Error executing device control:', error);
    }
  }

  /**
   * Notify every user selected by getUsersToNotify. A failure for one user
   * is logged and does not stop delivery to the others.
   */
  async sendAlertNotifications(alert) {
    try {
      const users = await this.getUsersToNotify(alert);

      for (const user of users) {
        try {
          await notificationService.sendNotification(user.id, {
            type: 'alert',
            title: `Alert: ${alert.type}`,
            message: alert.message,
            severity: alert.severity,
            deviceId: alert.deviceId,
            data: alert.data,
          });
        } catch (sendError) {
          logger.error(`Error sending alert notification to user ${user.id}:`, sendError);
        }
      }
    } catch (error) {
      logger.error('Error sending alert notifications:', error);
    }
  }

  /**
   * Select active users to notify: admins, plus users with access to
   * `alert.deviceId` when the alert has one. Users whose preferences set
   * the alert's severity to `false` are filtered out; missing or unreadable
   * preferences mean "notify".
   * @returns {Promise<Array>} user rows, or [] when the query fails (logged)
   */
  async getUsersToNotify(alert) {
    try {
      let query = `
        SELECT DISTINCT u.id, u.username, u.email, u.notification_preferences
        FROM users u
        LEFT JOIN user_device_access uda ON u.id = uda.user_id
        WHERE u.status = 'active'
      `;
      const params = [];

      if (alert.deviceId) {
        query += " AND (uda.device_id = ? OR u.role = 'admin')";
        params.push(alert.deviceId);
      } else {
        query += " AND u.role = 'admin'";
      }

      const users = await database.query(query, params);

      return users.filter((user) => {
        const preferences = this.readNotificationPreferences(user);
        return preferences[alert.severity] !== false;
      });
    } catch (error) {
      logger.error('Error getting users to notify:', error);
      return [];
    }
  }

  /**
   * Decode `user.notification_preferences` into an object. Empty, non-object
   * or malformed values yield `{}` so a single bad row cannot silence
   * notifications for everyone.
   */
  readNotificationPreferences(user) {
    const raw = user.notification_preferences;
    if (raw === null || raw === undefined || raw === '') {
      return {};
    }
    try {
      const parsed = parseJsonColumn(raw);
      return parsed !== null && typeof parsed === 'object' ? parsed : {};
    } catch (error) {
      logger.error(`Invalid notification preferences for user ${user.id}:`, error);
      return {};
    }
  }

  /** Hand the alert to the healing service; errors are logged, not thrown. */
  async triggerHealing(alert) {
    try {
      // Import healing service dynamically to avoid circular dependency
      const healingService = require('./healingService');
      await healingService.triggerHealing(alert);
    } catch (error) {
      logger.error('Error triggering healing:', error);
    }
  }

  /**
   * Mark an alert as acknowledged by `userId`.
   * @throws rethrows the database error after logging it
   */
  async acknowledgeAlert(alertId, userId) {
    try {
      await database.query(
        "UPDATE alerts SET acknowledged_by = ?, acknowledged_at = NOW(), status = 'acknowledged' WHERE id = ?",
        [userId, alertId]
      );

      logger.info(`Alert ${alertId} acknowledged by user ${userId}`);
    } catch (error) {
      logger.error('Error acknowledging alert:', error);
      throw error;
    }
  }

  /**
   * Mark an alert as resolved by `userId` with an optional resolution note.
   * @throws rethrows the database error after logging it
   */
  async resolveAlert(alertId, userId, resolution = '') {
    try {
      await database.query(
        "UPDATE alerts SET resolved_by = ?, resolved_at = NOW(), status = 'resolved', resolution = ? WHERE id = ?",
        [userId, resolution, alertId]
      );

      logger.info(`Alert ${alertId} resolved by user ${userId}`);
    } catch (error) {
      logger.error('Error resolving alert:', error);
      throw error;
    }
  }

  /**
   * Fetch active alerts, newest first, optionally for one device.
   * `limit` is coerced to a non-negative integer (default 100) before it is
   * bound, so numeric strings from request parameters are accepted.
   * @returns {Promise<Array>} alert rows, or [] on error (logged)
   */
  async getActiveAlerts(deviceId = null, limit = 100) {
    try {
      let query = "SELECT * FROM alerts WHERE status = 'active'";
      const params = [];

      if (deviceId) {
        query += ' AND device_id = ?';
        params.push(deviceId);
      }

      query += ' ORDER BY timestamp DESC LIMIT ?';
      params.push(normalizeLimit(limit));

      return await database.query(query, params);
    } catch (error) {
      logger.error('Error getting active alerts:', error);
      return [];
    }
  }

  /**
   * Fetch alerts of any status, newest first, optionally filtered by device
   * and an inclusive timestamp range. `limit` is coerced like in
   * getActiveAlerts.
   * @returns {Promise<Array>} alert rows, or [] on error (logged)
   */
  async getAlertHistory(deviceId = null, startDate = null, endDate = null, limit = 100) {
    try {
      let query = 'SELECT * FROM alerts WHERE 1=1';
      const params = [];

      if (deviceId) {
        query += ' AND device_id = ?';
        params.push(deviceId);
      }
      if (startDate) {
        query += ' AND timestamp >= ?';
        params.push(startDate);
      }
      if (endDate) {
        query += ' AND timestamp <= ?';
        params.push(endDate);
      }

      query += ' ORDER BY timestamp DESC LIMIT ?';
      params.push(normalizeLimit(limit));

      return await database.query(query, params);
    } catch (error) {
      logger.error('Error getting alert history:', error);
      return [];
    }
  }

  /** Reload the rule set; used by the periodic refresh. Never throws. */
  async refreshAlertRules() {
    try {
      await this.loadAlertRules();
    } catch (error) {
      logger.error('Error refreshing alert rules:', error);
    }
  }

  // Helper methods

  /**
   * Resolve a dot-separated path (e.g. 'sensors.temp') inside `data`.
   * Only own properties are followed, so names inherited from
   * Object.prototype such as 'toString' do not resolve.
   * @returns the value at the path, or null when any segment is missing
   */
  extractValueFromData(data, field) {
    try {
      if (typeof field !== 'string') {
        return null;
      }

      let value = data;
      for (const key of field.split('.')) {
        const isContainer = value !== null && typeof value === 'object';
        if (!isContainer || !Object.prototype.hasOwnProperty.call(value, key)) {
          return null;
        }
        value = value[key];
      }

      return value;
    } catch (error) {
      return null;
    }
  }

  /**
   * Apply one rule operator to a value.
   *
   * For the ordering and text operators (gt, gte, lt, lte, contains, regex)
   * a missing value (null/undefined) never matches; without this guard
   * `null < 5` would be true because null coerces to 0. 'eq' and 'ne' use
   * strict (in)equality. Unknown operators and evaluation errors (for
   * example an invalid regex) yield false.
   */
  evaluateCondition(value, operator, expectedValue) {
    try {
      const isMissing = value === null || value === undefined;

      switch (operator) {
        case 'eq':
          return value === expectedValue;
        case 'ne':
          return value !== expectedValue;
        case 'gt':
          return !isMissing && value > expectedValue;
        case 'gte':
          return !isMissing && value >= expectedValue;
        case 'lt':
          return !isMissing && value < expectedValue;
        case 'lte':
          return !isMissing && value <= expectedValue;
        case 'contains':
          return !isMissing && String(value).includes(String(expectedValue));
        case 'regex':
          // NOTE(review): the pattern comes from rule configuration; a
          // pathological pattern can be slow (ReDoS) — restrict who may
          // author rules or validate patterns when rules are saved.
          return !isMissing && new RegExp(expectedValue).test(String(value));
        default:
          return false;
      }
    } catch (error) {
      return false;
    }
  }

  /**
   * Collect every finite number found anywhere inside `data`, depth-first in
   * property enumeration order. NaN and ±Infinity are skipped so they cannot
   * poison the statistics computed from the result.
   */
  extractNumericValues(data) {
    const values = [];

    const collect = (node) => {
      for (const key in node) {
        const entry = node[key];
        if (typeof entry === 'number') {
          if (Number.isFinite(entry)) {
            values.push(entry);
          }
        } else if (typeof entry === 'object' && entry !== null) {
          collect(entry);
        }
      }
    };

    collect(data);
    return values;
  }

  /** Report initialization state and the number of loaded rules. */
  async healthCheck() {
    try {
      return {
        status: this.isInitialized ? 'healthy' : 'not_initialized',
        message: this.isInitialized ? 'Alert service is healthy' : 'Service not initialized',
        activeRules: this.alertRules.size,
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        message: 'Alert service health check failed',
        error: error.message,
      };
    }
  }
}
|
|
|
|
// Export one AlertService instance created at module load; Node's module
// cache hands the same object to every require() of this file.
module.exports = new AlertService();
|