AI-Voice-Agent-Node/services/webSocket.js
2025-05-26 22:26:00 +05:30

169 lines
4.4 KiB
JavaScript

const WebSocket = require('ws');
const jwt = require('jsonwebtoken');
// user_id -> websocket mapping
const userSocketMap = new Map();
// Keep track of connection attempts to prevent rapid reconnection loops
const connectionAttempts = new WeakMap();
/**
* Initializes the WebSocket server.
* @param {https.Server} server - The HTTP/HTTPS server instance from Express.
*/
function initWebSocket(server) {
const wss = new WebSocket.Server({
server,
clientTracking: true,
// Consider adding ping/pong for connection health
// clientTracking: true,
// maxPayload: 100 * 1024 * 1024, // 100MB
});
console.log('WebSocket server initialized');
// Handle new connections
wss.on('connection', (ws, req) => {
const clientIp = req.socket.remoteAddress;
// console.log(`New WebSocket connection from ${clientIp}`);
// Set up ping/pong to detect dead connections
let isAlive = true;
ws.isAlive = true;
const pingInterval = setInterval(() => {
if (!isAlive) {
console.log('Terminating dead connection');
return ws.terminate();
}
isAlive = false;
try {
ws.ping(() => {});
} catch (e) {
console.error('Error sending ping:', e);
}
}, 30000);
ws.on('pong', () => {
isAlive = true;
});
ws.on('message', (message) => {
try {
const data = JSON.parse(message);
// Handle different message types
if (data.type === 'MESSAGE') {
console.log(`Received message: ${data.content}`);
ws.send(JSON.stringify({
type: 'MESSAGE_RECEIVED',
content: data.content,
timestamp: new Date().toISOString()
}));
return;
}
// Handle authentication with token
const { token } = data;
if (!token) {
ws.send(JSON.stringify({
type: 'ERROR',
error: 'No token provided',
timestamp: new Date().toISOString()
}));
return;
}
const decoded = jwt.decode(token); // Use jwt.verify() in production
const userId = decoded?.claims?.user_id;
if (!userId) {
ws.send(JSON.stringify({
type: 'ERROR',
error: 'Invalid token or missing user_id in claims',
timestamp: new Date().toISOString()
}));
return;
}
console.log(`Authenticated user: ${userId}`);
// Store the WebSocket connection with the user ID
userSocketMap.set(userId, ws);
// Store user ID in the WebSocket object for cleanup
ws.userId = userId;
ws.send(JSON.stringify({
type: 'CONNECTED',
message: `Connected as ${userId}`,
timestamp: new Date().toISOString()
}));
} catch (err) {
console.error('WebSocket message error:', err);
try {
ws.send(JSON.stringify({
type: 'ERROR',
error: 'Invalid message format or processing error',
timestamp: new Date().toISOString()
}));
} catch (sendErr) {
console.error('Failed to send error message:', sendErr);
}
}
});
// Handle connection close
ws.on('close', () => {
// console.log(`WebSocket closed for ${ws.userId || 'unknown user'}`);
if (ws.userId) {
userSocketMap.delete(ws.userId);
}
clearInterval(pingInterval);
});
// Handle errors
ws.on('error', (error) => {
console.error('WebSocket error:', error);
if (ws.readyState === WebSocket.OPEN) {
try {
ws.close(1011, 'Internal server error');
} catch (e) {
// Ignore errors during close
}
}
});
});
// Handle server errors
wss.on('error', (error) => {
console.error('WebSocket server error:', error);
});
return wss;
}
/**
* Sends an alert to a user via WebSocket.
* @param {string} userId - The target user ID.
* @param {string|object} alert - The alert message or payload.
* @returns {boolean} - Success/failure.
*/
function sendAlertToUser(userId, alert) {
console.log("socket -- invoked---")
const ws = userSocketMap.get(userId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ALERT', payload: alert }));
return true;
}
return false;
}
module.exports = {
initWebSocket,
sendAlertToUser,
};