// Re_Backend/src/queues/pauseResumeWorker.ts
//
// 78 lines
// 2.6 KiB
// TypeScript

import { Worker } from 'bullmq';
import { sharedRedisConnection } from './redisConnection';
import { handlePauseResumeJob } from './pauseResumeProcessor';
import logger from '@utils/logger';
/**
 * Module-level BullMQ worker consuming the `pauseResumeQueue`.
 *
 * Remains `null` when the worker could not be constructed (the failure is
 * logged instead of thrown), so consumers must null-check before using it.
 */
let pauseResumeWorker: Worker | null = null;
try {
  const worker = new Worker('pauseResumeQueue', handlePauseResumeJob, {
    connection: sharedRedisConnection,
    concurrency: 1, // Process one at a time to avoid race conditions
    autorun: true,
    // Rate limit: at most `max` jobs per `duration` milliseconds.
    limiter: {
      max: 1,
      duration: 1000,
    },
  });
  pauseResumeWorker = worker;

  // Lifecycle logging.
  worker.on('ready', () => {
    logger.info('[Pause Resume Worker] ✅ Ready and listening for pause resume jobs');
  });
  worker.on('active', (job) => {
    logger.info(`[Pause Resume Worker] Processing: ${job.name} (${job.id})`);
  });
  worker.on('completed', (job) => {
    logger.info(`[Pause Resume Worker] Completed: ${job.name} (${job.id})`);
  });
  // `job` is accessed with `?.` because it may be undefined when a failure is reported.
  worker.on('failed', (job, err) => {
    logger.error(`[Pause Resume Worker] Failed: ${job?.name} (${job?.id})`, err?.message || err);
  });

  worker.on('error', (err) => {
    // Connection errors are common if Redis is unavailable - log as warning
    const errorCode = (err as Error & { code?: string })?.code;
    const message = err?.message;
    // Substrings of the error message that are treated as "Redis is unreachable".
    const connectionMarkers = ['connect', 'ECONNREFUSED', 'Redis'];
    const isConnectionError =
      errorCode === 'ECONNREFUSED' ||
      connectionMarkers.some((marker) => message?.includes(marker));

    if (isConnectionError) {
      logger.warn(
        '[Pause Resume Worker] Connection issue (Redis may be unavailable):',
        message || errorCode || String(err)
      );
      return;
    }

    // Log full error details for non-connection errors to diagnose issues
    logger.error('[Pause Resume Worker] Error:', {
      message: message || 'Unknown error',
      code: errorCode,
      name: err?.name,
      stack: err?.stack,
    });
  });
} catch (workerError: unknown) {
  // Construction failed: log and leave the export as null rather than crashing the importer.
  const detail = (workerError instanceof Error && workerError.message) || workerError;
  logger.error('[Pause Resume Worker] Failed to create worker:', detail);
  pauseResumeWorker = null;
}
// Graceful shutdown

/**
 * Closes the pause/resume worker, if one was created, when a termination
 * signal arrives.
 *
 * A failing `close()` is caught and logged so it cannot surface as an
 * unhandled promise rejection while the process is shutting down.
 *
 * @param signal - Name of the signal that triggered the shutdown (used in the log line).
 */
const closeWorkerOnSignal = async (signal: 'SIGTERM' | 'SIGINT'): Promise<void> => {
  if (!pauseResumeWorker) {
    return;
  }
  logger.info(`[Pause Resume Worker] ${signal} received, closing worker...`);
  try {
    await pauseResumeWorker.close();
  } catch (closeError: unknown) {
    logger.error(
      '[Pause Resume Worker] Error while closing worker:',
      closeError instanceof Error ? closeError.message : closeError
    );
  }
};

// NOTE(review): installing listeners for these signals replaces Node's default
// exit-on-signal behaviour, and nothing here calls process.exit — presumably
// another module ends the process; confirm.
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.on(signal, () => {
    // The returned promise never rejects (errors are handled inside), so it is safe to discard.
    void closeWorkerOnSignal(signal);
  });
}

/** The worker instance, or `null` if it could not be created. */
export { pauseResumeWorker };