// AI-Voice-Agent-Node/services/webSocket.js
// 2025-05-27 21:07:33 +05:30
//
// 531 lines
// 14 KiB
// JavaScript

const WebSocket = require('ws');
const jwt = require('jsonwebtoken');
const url = require('url');
const { getPool } = require('../config/database');

// user_id -> websocket mapping
const userSocketMap = new Map();

/**
 * Sends the user's unacknowledged alerts (newest first) over the socket as a
 * single `ALERT_HISTORY` message.
 *
 * Failures are logged and, when the socket is still open, reported to the
 * client as an `ERROR` message. The returned promise never rejects, so the
 * function is safe to call without awaiting it.
 *
 * @param {WebSocket} ws - The client socket to send the history to.
 * @param {string} userId - Value matched against `user_alerts.user_id`.
 * @returns {Promise<void>}
 */
async function sendUserAlertHistory(ws, userId) {
  try {
    const pool = getPool(); // Assumes mysql2/promise pool
    const [results] = await pool.query(
      'SELECT * FROM user_alerts WHERE user_id = ? AND acknowledged = 0 ORDER BY created_at DESC',
      [userId]
    );
    console.log(`✅ Fetched ${results.length} unacknowledged alerts for user ${userId}`);

    // The client may have disconnected while the query was in flight.
    if (ws.readyState !== WebSocket.OPEN) {
      console.warn(`WebSocket not open for user ${userId}, skipping send.`);
      return;
    }

    ws.send(JSON.stringify({
      type: 'ALERT_HISTORY',
      alerts: results,
      timestamp: new Date().toISOString()
    }));
  } catch (err) {
    console.error("❌ Error in sendUserAlertHistory:", err);
    if (ws.readyState === WebSocket.OPEN) {
      try {
        ws.send(JSON.stringify({
          type: 'ERROR',
          error: 'Failed to load unacknowledged alerts',
          timestamp: new Date().toISOString()
        }));
      } catch (sendErr) {
        // Reporting is best-effort; a throw here would surface as an
        // unhandled rejection in callers that do not await this function.
        console.error('Failed to report alert history error to client:', sendErr);
      }
    }
  }
}

/**
 * Initializes the WebSocket server.
 * @param {https.Server} server - The HTTP/HTTPS server instance from Express.
 * @returns {WebSocket.Server} The created WebSocket server.
 */
function initWebSocket(server) {
  // Attaches a WebSocket server to `server` (the HTTP/HTTPS server instance),
  // identifies each client from the `token` query parameter of its request
  // URL, registers the socket in `userSocketMap` under the token's user id,
  // and returns the created WebSocket.Server.

  // Milliseconds between keep-alive pings. A socket that has not answered the
  // previous ping with a pong by the next tick is terminated.
  const PING_INTERVAL_MS = 30000;

  // Serializes `payload` and sends it; a failed send is logged, never thrown,
  // so one broken socket cannot take down an event handler.
  const sendJson = (ws, payload) => {
    try {
      ws.send(JSON.stringify(payload));
    } catch (err) {
      console.error('Failed to send WebSocket message:', err);
    }
  };

  // Tells the client why it is being refused, then closes the socket.
  const rejectConnection = (ws, error) => {
    sendJson(ws, {
      type: 'ERROR',
      error,
      timestamp: new Date().toISOString()
    });
    ws.close();
  };

  // Reads `?token=...` from the request URL. `req.url` is only a path, so a
  // placeholder base is supplied; a malformed URL is treated as "no token".
  const readToken = (req) => {
    try {
      return new URL(req.url, 'http://localhost').searchParams.get('token');
    } catch (err) {
      return null;
    }
  };

  const wss = new WebSocket.Server({
    server,
    clientTracking: true,
  });
  console.log('WebSocket server initialized');

  wss.on('connection', (ws, req) => {
    // Registered before anything else: an 'error' event with no listener is
    // thrown by the emitter, and that must not happen for sockets that are
    // rejected below either.
    ws.on('error', (error) => {
      console.error('WebSocket error:', error);
      if (ws.readyState === WebSocket.OPEN) {
        try {
          ws.close(1011, 'Internal server error');
        } catch (closeErr) {
          console.error('Failed to close WebSocket after error:', closeErr);
        }
      }
    });

    // The token itself is deliberately not logged: it is a credential.
    const token = readToken(req);
    if (!token) {
      rejectConnection(ws, 'Token missing from URL');
      return;
    }

    // NOTE(security): jwt.decode() only parses the token and does NOT check
    // its signature, so the user id is taken on trust. Switch to jwt.verify()
    // with the signing key before relying on this for authorization.
    const decoded = jwt.decode(token);
    const userId = decoded?.claims?.user_id;
    if (!userId) {
      rejectConnection(ws, 'Invalid token or missing user_id');
      return;
    }

    ws.userId = userId;
    console.log("userid--", userId);
    userSocketMap.set(userId, ws);
    sendJson(ws, {
      type: 'CONNECTED',
      message: `Connected as ${userId}`,
      timestamp: new Date().toISOString()
    });

    // Send alert history on connect. Fire-and-forget; the catch keeps an
    // unexpected rejection from becoming an unhandled one.
    sendUserAlertHistory(ws, userId).catch((err) => {
      console.error('Failed to send alert history:', err);
    });

    // Keep-alive ping/pong
    ws.isAlive = true;
    const pingInterval = setInterval(() => {
      if (!ws.isAlive) {
        // terminate() leads to 'close', which clears this interval.
        ws.terminate();
        return;
      }
      ws.isAlive = false;
      try {
        ws.ping(() => {});
      } catch (e) {
        console.error('Ping error:', e);
      }
    }, PING_INTERVAL_MS);
    ws.on('pong', () => {
      ws.isAlive = true;
    });

    ws.on('message', (message) => {
      try {
        const data = JSON.parse(message);
        if (data.type === 'MESSAGE') {
          sendJson(ws, {
            type: 'MESSAGE_RECEIVED',
            content: data.content,
            timestamp: new Date().toISOString()
          });
        }
      } catch (err) {
        console.error("Message error:", err);
        sendJson(ws, {
          type: 'ERROR',
          error: 'Invalid message format',
          timestamp: new Date().toISOString()
        });
      }
    });

    ws.on('close', () => {
      // Only unregister if this socket is still the user's current one. When
      // the same user reconnects, the new socket replaces the map entry and
      // the old socket's late 'close' must not remove it.
      if (userSocketMap.get(ws.userId) === ws) {
        userSocketMap.delete(ws.userId);
      }
      clearInterval(pingInterval);
    });
  });

  // Handle server errors
  wss.on('error', (error) => {
    console.error('WebSocket server error:', error);
  });

  return wss;
}
/**
 * Persists an alert for a user and pushes it to them in real time if they
 * currently have an open WebSocket.
 *
 * The alert is first inserted into `user_alerts`. A failed insert is logged
 * and does not prevent delivery. The alert is then sent over the user's
 * socket as `{ type: 'ALERT', payload: alert }`.
 *
 * @param {string} userId - The target user ID.
 * @param {string|object} alert - The alert message or payload (stored as JSON).
 * @returns {Promise<boolean>} `true` if the alert was sent over an open
 *   socket, `false` if the user has no open socket or the send failed.
 */
async function sendAlertToUser(userId, alert) {
  console.log("socket -- invoked---");
  console.log("userID---", userId);
  console.log("alert---", alert);

  try {
    const pool = getPool();
    // sendUserAlertHistory in this file awaits pool.query(), i.e. it treats
    // the pool as a promise-style (mysql2/promise) pool; such a pool rejects
    // callback arguments, so the insert must be awaited as well. If the pool
    // turns out to be a callback-style mysql2 pool, use its promise wrapper.
    // NOTE(review): confirm which pool ../config/database actually returns.
    const db = typeof pool.promise === 'function' ? pool.promise() : pool;
    await db.query(
      'INSERT INTO user_alerts (user_id, alert) VALUES (?, ?)',
      [userId, JSON.stringify(alert)]
    );
    console.log("Alert saved to database.");
  } catch (err) {
    console.error("Failed to save alert to DB:", err);
  }

  const ws = userSocketMap.get(userId);
  if (!ws || ws.readyState !== WebSocket.OPEN) {
    return false;
  }

  try {
    ws.send(JSON.stringify({ type: 'ALERT', payload: alert }));
    return true;
  } catch (err) {
    // The socket can still fail between the state check and the write.
    console.error('Failed to send alert over WebSocket:', err);
    return false;
  }
}
// Public API of this module: the server bootstrap and the alert push helper.
module.exports = {
initWebSocket,
sendAlertToUser,
};