// const WebSocket = require('ws'); // const jwt = require('jsonwebtoken'); // const db = require('../config/database'); // const { getPool } = require('../config/database'); // // user_id -> websocket mapping // const userSocketMap = new Map(); // // Keep track of connection attempts to prevent rapid reconnection loops // const connectionAttempts = new WeakMap(); // /** // * Initializes the WebSocket server. // * @param {https.Server} server - The HTTP/HTTPS server instance from Express. // */ // function initWebSocket(server) { // const wss = new WebSocket.Server({ // server, // clientTracking: true, // // Consider adding ping/pong for connection health // // clientTracking: true, // // maxPayload: 100 * 1024 * 1024, // 100MB // }); // console.log('WebSocket server initialized'); // // Handle new connections // wss.on('connection', (ws, req) => { // const clientIp = req.socket.remoteAddress; // // console.log(`New WebSocket connection from ${clientIp}`); // // Set up ping/pong to detect dead connections // let isAlive = true; // ws.isAlive = true; // const pingInterval = setInterval(() => { // if (!isAlive) { // console.log('Terminating dead connection'); // return ws.terminate(); // } // isAlive = false; // try { // ws.ping(() => {}); // } catch (e) { // console.error('Error sending ping:', e); // } // }, 30000); // ws.on('pong', () => { // isAlive = true; // }); // ws.on('message', (message) => { // try { // const data = JSON.parse(message); // // Handle different message types // if (data.type === 'MESSAGE') { // console.log(`Received message: ${data.content}`); // ws.send(JSON.stringify({ // type: 'MESSAGE_RECEIVED', // content: data.content, // timestamp: new Date().toISOString() // })); // return; // } // // Handle authentication with token // const { token } = data; // if (!token) { // ws.send(JSON.stringify({ // type: 'ERROR', // error: 'No token provided', // timestamp: new Date().toISOString() // })); // return; // } // const decoded = jwt.decode(token); // Use jwt.verify() in production // const userId = decoded?.claims?.user_id; // if (!userId) { // ws.send(JSON.stringify({ // type: 'ERROR', // error: 'Invalid token or missing user_id in claims', // timestamp: new Date().toISOString() // })); // return; // } // // console.log(`Authenticated user: ${userId}`); // // Store the WebSocket connection with the user ID // userSocketMap.set(userId, ws); // // Store user ID in the WebSocket object for cleanup // ws.userId = userId; // ws.send(JSON.stringify({ // type: 'CONNECTED', // message: `Connected as ${userId}`, // timestamp: new Date().toISOString() // })); // } catch (err) { // console.error('WebSocket message error:', err); // try { // ws.send(JSON.stringify({ // type: 'ERROR', // error: 'Invalid message format or processing error', // timestamp: new Date().toISOString() // })); // } catch (sendErr) { // console.error('Failed to send error message:', sendErr); // } // } // }); // // Handle connection close // ws.on('close', () => { // // console.log(`WebSocket closed for ${ws.userId || 'unknown user'}`); // if (ws.userId) { // userSocketMap.delete(ws.userId); // } // clearInterval(pingInterval); // }); // // Handle errors // ws.on('error', (error) => { // console.error('WebSocket error:', error); // if (ws.readyState === WebSocket.OPEN) { // try { // ws.close(1011, 'Internal server error'); // } catch (e) { // // Ignore errors during close // } // } // }); // }); // // Handle server errors // wss.on('error', (error) => { // console.error('WebSocket server error:', error); // }); // return wss; // } // /** // * Sends an alert to a user via WebSocket. // * @param {string} userId - The target user ID. // * @param {string|object} alert - The alert message or payload. // * @returns {boolean} - Success/failure. // */ // async function sendAlertToUser(userId, alert) { // console.log("socket -- invoked---"); // console.log("userID---", userId); // console.log("alert---", alert); // // Save to database // try { // const pool = getPool(); // pool.query( // 'INSERT INTO user_alerts (user_id, alert) VALUES (?, ?)', // [userId, JSON.stringify(alert)], // (err, results) => { // if (err) { // console.error("Failed to save alert to DB:", err); // } else { // console.log("Alert saved to database."); // } // } // ); // } catch (err) { // console.error("Database operation failed:", err); // } // const ws = userSocketMap.get(userId); // if (ws && ws.readyState === WebSocket.OPEN) { // ws.send(JSON.stringify({ type: 'ALERT', payload: alert })); // return true; // } // return false; // } // module.exports = { // initWebSocket, // sendAlertToUser, // }; const WebSocket = require('ws'); const jwt = require('jsonwebtoken'); const url = require('url'); const { getPool } = require('../config/database'); const userSocketMap = new Map(); /** * Sends the user's alert history from DB. */ async function sendUserAlertHistory(ws, userId) { try { const pool = getPool(); // Assumes mysql2/promise pool const [results] = await pool.query( 'SELECT * FROM user_alerts WHERE user_id = ? AND acknowledged = 0 ORDER BY created_at DESC', [userId] ); console.log(`✅ Fetched ${results.length} unacknowledged alerts for user ${userId}`); if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ type: 'ALERT_HISTORY', alerts: results, timestamp: new Date().toISOString() })); } else { console.warn(`WebSocket not open for user ${userId}, skipping send.`); } } catch (err) { console.error("❌ Error in sendUserAlertHistory:", err); if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ type: 'ERROR', error: 'Failed to load unacknowledged alerts', timestamp: new Date().toISOString() })); } } } /** * Initializes the WebSocket server. */ function initWebSocket(server) { const wss = new WebSocket.Server({ server, clientTracking: true, }); console.log('WebSocket server initialized'); wss.on('connection', (ws, req) => { const location = url.parse(req.url, true); const token = location.query?.token; console.log("received token is ",token) if (!token) { ws.send(JSON.stringify({ type: 'ERROR', error: 'Token missing from URL', timestamp: new Date().toISOString() })); ws.close(); return; } const decoded = jwt.decode(token); const userId = decoded?.claims?.user_id; if (!userId) { ws.send(JSON.stringify({ type: 'ERROR', error: 'Invalid token or missing user_id', timestamp: new Date().toISOString() })); ws.close(); return; } ws.userId = userId; console.log("userid--",userId) userSocketMap.set(userId, ws); ws.send(JSON.stringify({ type: 'CONNECTED', message: `Connected as ${userId}`, timestamp: new Date().toISOString() })); // ✅ Send alert history on connect sendUserAlertHistory(ws, userId); // Keep-alive ping/pong ws.isAlive = true; const pingInterval = setInterval(() => { if (!ws.isAlive) return ws.terminate(); ws.isAlive = false; try { ws.ping(() => {}); } catch (e) { console.error('Ping error:', e); } }, 30000); ws.on('pong', () => { ws.isAlive = true; }); ws.on('message', (message) => { try { const data = JSON.parse(message); if (data.type === 'MESSAGE') { ws.send(JSON.stringify({ type: 'MESSAGE_RECEIVED', content: data.content, timestamp: new Date().toISOString() })); } } catch (err) { console.error("Message error:", err); ws.send(JSON.stringify({ type: 'ERROR', error: 'Invalid message format', timestamp: new Date().toISOString() })); } }); ws.on('close', () => { if (ws.userId) userSocketMap.delete(ws.userId); clearInterval(pingInterval); }); ws.on('error', (error) => { console.error('WebSocket error:', error); if (ws.readyState === WebSocket.OPEN) { ws.close(1011, 'Internal server error'); } }); }); wss.on('error', (error) => { console.error('WebSocket server error:', error); }); return wss; } /** * Send a real-time alert to a specific user via WebSocket. */ async function sendAlertToUser(userId, alert) { console.log("socket -- invoked---"); console.log("userID---", userId); console.log("alert---", alert); try { const pool = getPool(); pool.query( 'INSERT INTO user_alerts (user_id, alert) VALUES (?, ?)', [userId, JSON.stringify(alert)], (err, results) => { if (err) { console.error("Failed to save alert to DB:", err); } else { console.log("Alert saved to database."); } } ); } catch (err) { console.error("Database operation failed:", err); } const ws = userSocketMap.get(userId); if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ type: 'ALERT', payload: alert })); return true; } return false; } module.exports = { initWebSocket, sendAlertToUser, };