const axios = require('axios'); const WebSocket = require('ws'); const logger = require('../utils/logger'); const database = require('../config/database'); const redis = require('../config/redis'); const aiAgentService = require('./aiAgentService'); const alertService = require('./alertService'); class StreamPipesService { constructor() { // Support both old format (host:port) and new format (full URL) if (process.env.STREAMPIPES_BASE_URL) { this.baseUrl = process.env.STREAMPIPES_BASE_URL; } else { this.baseUrl = `http://${process.env.STREAMPIPES_HOST || 'localhost'}:${process.env.STREAMPIPES_PORT || 8080}`; } this.username = process.env.STREAMPIPES_USERNAME || 'admin'; this.password = process.env.STREAMPIPES_PASSWORD || 'admin'; this.token = null; this.wsConnections = new Map(); this.isInitialized = false; } async initialize() { try { await this.authenticate(); if (this.token) { await this.setupDataStreams(); this.isInitialized = true; logger.info('StreamPipes service initialized successfully'); } else { logger.warn('StreamPipes service not available - continuing without StreamPipes integration'); this.isInitialized = false; } } catch (error) { logger.warn('StreamPipes service not available - continuing without StreamPipes integration'); this.isInitialized = false; } } async authenticate() { try { const response = await axios.post(`${this.baseUrl}/streampipes-backend/api/v2/auth/login`, { username: this.username, password: this.password }); this.token = response.data.token; logger.info('StreamPipes authentication successful'); } catch (error) { logger.error('StreamPipes authentication failed:', { message: error.message, status: error.response?.status, statusText: error.response?.statusText }); // Don't throw error, just log it and continue this.token = null; } } async setupDataStreams() { try { // Get all data streams const streams = await this.getDataStreams(); for (const stream of streams) { await this.subscribeToStream(stream); } logger.info(`Subscribed to ${streams.length} data streams`); } catch (error) { logger.error('Failed to setup data streams:', error); throw error; } } async getDataStreams() { try { const response = await axios.get(`${this.baseUrl}/streampipes-backend/api/v2/streams`, { headers: { 'Authorization': `Bearer ${this.token}` } }); return response.data || []; } catch (error) { logger.error('Failed to get data streams:', error); return []; } } async subscribeToStream(stream) { try { // Convert HTTP URL to WebSocket URL let wsUrl; if (process.env.STREAMPIPES_BASE_URL) { wsUrl = process.env.STREAMPIPES_BASE_URL.replace('https://', 'wss://').replace('http://', 'ws://'); } else { wsUrl = `ws://${process.env.STREAMPIPES_HOST || 'localhost'}:${process.env.STREAMPIPES_PORT || 8080}`; } wsUrl += `/streampipes-backend/api/v2/streams/${stream.elementId}/data`; const ws = new WebSocket(wsUrl, { headers: { 'Authorization': `Bearer ${this.token}` } }); ws.on('open', () => { logger.info(`Connected to StreamPipes stream: ${stream.name}`); this.wsConnections.set(stream.elementId, ws); }); ws.on('message', async (data) => { try { const message = JSON.parse(data); await this.processStreamData(stream, message); } catch (error) { logger.error('Error processing stream data:', error); } }); ws.on('error', (error) => { logger.error(`WebSocket error for stream ${stream.name}:`, error); }); ws.on('close', () => { logger.info(`Disconnected from StreamPipes stream: ${stream.name}`); this.wsConnections.delete(stream.elementId); // Attempt to reconnect after 5 seconds setTimeout(() => { this.subscribeToStream(stream); }, 5000); }); } catch (error) { logger.error(`Failed to subscribe to stream ${stream.name}:`, error); } } async processStreamData(stream, data) { try { const startTime = Date.now(); // Extract device information const deviceId = data.deviceId || data.sensorId || stream.elementId; const timestamp = data.timestamp || new Date().toISOString(); // Log the incoming data logger.logDeviceData(deviceId, 'stream', data); // Store raw data in database await this.storeDeviceData(deviceId, stream, data, timestamp); // Cache latest data in Redis await redis.cacheDeviceData(deviceId, { stream: stream.name, data: data, timestamp: timestamp }); // Process with AI Agent if (process.env.AI_AGENT_ENABLED === 'true') { await aiAgentService.processDeviceData(deviceId, data, timestamp); } // Check for alerts await alertService.checkDeviceAlerts(deviceId, data, timestamp); const processingTime = Date.now() - startTime; logger.logPerformance('stream_data_processing', processingTime, { deviceId, streamName: stream.name, dataSize: JSON.stringify(data).length }); } catch (error) { logger.error('Error processing stream data:', error); } } async storeDeviceData(deviceId, stream, data, timestamp) { try { await database.query( `INSERT INTO device_data (device_id, stream_id, stream_name, raw_data, timestamp, created_at) VALUES (?, ?, ?, ?, ?, NOW())`, [deviceId, stream.elementId, stream.name, JSON.stringify(data), timestamp] ); } catch (error) { logger.error('Failed to store device data:', error); } } async getDeviceData(deviceId, limit = 100) { try { const data = await database.query( `SELECT * FROM device_data WHERE device_id = ? ORDER BY timestamp DESC LIMIT ?`, [deviceId, limit] ); return data; } catch (error) { logger.error('Failed to get device data:', error); return []; } } async getDeviceDataByTimeRange(deviceId, startTime, endTime) { try { const data = await database.query( `SELECT * FROM device_data WHERE device_id = ? AND timestamp BETWEEN ? AND ? ORDER BY timestamp ASC`, [deviceId, startTime, endTime] ); return data; } catch (error) { logger.error('Failed to get device data by time range:', error); return []; } } async getStreamStatistics() { try { const stats = await database.query(` SELECT stream_name, COUNT(*) as message_count, MIN(timestamp) as first_message, MAX(timestamp) as last_message, AVG(JSON_LENGTH(raw_data)) as avg_data_size FROM device_data WHERE created_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR) GROUP BY stream_name ORDER BY message_count DESC `); return stats; } catch (error) { logger.error('Failed to get stream statistics:', error); return []; } } async cleanupOldData(daysToKeep = 30) { try { const result = await database.query( 'DELETE FROM device_data WHERE created_at < DATE_SUB(NOW(), INTERVAL ? DAY)', [daysToKeep] ); logger.info(`Cleaned up ${result.affectedRows} old data records`); return result.affectedRows; } catch (error) { logger.error('Failed to cleanup old data:', error); return 0; } } async healthCheck() { try { if (!this.isInitialized) { return { status: 'not_initialized', message: 'Service not initialized' }; } if (!this.token) { return { status: 'not_authenticated', message: 'Not authenticated' }; } // Test API connection await axios.get(`${this.baseUrl}/streampipes-backend/api/v2/streams`, { headers: { 'Authorization': `Bearer ${this.token}` } }); return { status: 'healthy', message: 'Service is healthy', activeConnections: this.wsConnections.size }; } catch (error) { return { status: 'unhealthy', message: 'Service health check failed', error: error.message }; } } async disconnect() { try { // Close all WebSocket connections for (const [streamId, ws] of this.wsConnections) { ws.close(); } this.wsConnections.clear(); logger.info('StreamPipes service disconnected'); } catch (error) { logger.error('Error disconnecting StreamPipes service:', error); } } } module.exports = new StreamPipesService();