// Re_Backend/src/queues/tatWorker.ts
//
// 69 lines
// 2.0 KiB
// TypeScript

import { Worker } from 'bullmq';
import { sharedRedisConnection } from './redisConnection';
import { handleTatJob } from './tatProcessor';
import logger from '@utils/logger';
/** Substrings of an error message that are treated as a Redis connectivity problem. */
const CONNECTION_ERROR_MARKERS = ['connect', 'ECONNREFUSED', 'Redis'] as const;

/**
 * Picks the most useful value to hand to the logger for a caught/emitted error:
 * the message of an `Error` when it has a non-empty one, otherwise the raw value.
 */
function errorDetail(err: unknown): unknown {
  if (err instanceof Error && err.message) {
    return err.message;
  }
  return err;
}

/**
 * Heuristic check for Redis connectivity failures, based purely on substring
 * matches against the error message (see CONNECTION_ERROR_MARKERS).
 */
function isConnectionError(err: unknown): boolean {
  if (!(err instanceof Error)) {
    return false;
  }
  const message = err.message;
  return CONNECTION_ERROR_MARKERS.some((marker) => message.includes(marker));
}

/**
 * Creates the BullMQ worker that consumes the `tatQueue` queue with
 * `handleTatJob`, and attaches logging listeners for its lifecycle events.
 *
 * Construction failures are logged and swallowed on purpose so that importing
 * this module never throws.
 *
 * @returns The running worker, or `null` when it could not be created.
 */
function createTatWorker(): Worker | null {
  try {
    const worker = new Worker('tatQueue', handleTatJob, {
      connection: sharedRedisConnection,
      // BullMQ worker options: up to 5 jobs in parallel, and at most
      // 10 jobs started per 1000 ms window.
      concurrency: 5,
      autorun: true,
      limiter: {
        max: 10,
        duration: 1000
      }
    });

    worker.on('ready', () => {
      logger.info('[TAT Worker] ✅ Ready and listening for TAT jobs');
    });

    worker.on('active', (job) => {
      // Optional chaining keeps a job without a payload from throwing inside the listener.
      logger.info(`[TAT Worker] Processing: ${job.name} for request ${job.data?.requestId}`);
    });

    worker.on('completed', (job) => {
      logger.info(`[TAT Worker] Completed: ${job.name}`);
    });

    worker.on('failed', (job, err) => {
      // `job` is accessed defensively, as it may be missing on this event.
      logger.error(`[TAT Worker] Failed: ${job?.name} (${job?.id})`, errorDetail(err));
    });

    worker.on('error', (err) => {
      // Connection errors are common if Redis is unavailable - log as warning
      if (isConnectionError(err)) {
        logger.warn('[TAT Worker] Connection issue (Redis may be unavailable):', errorDetail(err));
      } else {
        logger.error('[TAT Worker] Error:', errorDetail(err));
      }
    });

    return worker;
  } catch (workerError: unknown) {
    logger.error('[TAT Worker] Failed to create worker:', errorDetail(workerError));
    return null;
  }
}

/**
 * The TAT queue worker, created once when this module is first imported.
 * `null` when worker creation failed (the failure is logged, not thrown).
 */
const tatWorker: Worker | null = createTatWorker();
/**
 * Closes the TAT worker in response to a termination signal.
 *
 * Does nothing when no worker was created. A failure while closing is logged
 * rather than rethrown, because this runs from a signal listener where a
 * rejected promise would otherwise go unhandled.
 *
 * NOTE(review): registering a listener for SIGTERM/SIGINT replaces Node's
 * default terminate-on-signal behaviour, and this handler does not exit the
 * process itself — confirm that process exit is handled elsewhere.
 *
 * @param signal Name of the signal that triggered the shutdown (used in the log line).
 */
async function closeTatWorkerOnSignal(signal: 'SIGTERM' | 'SIGINT'): Promise<void> {
  const worker = tatWorker;
  if (!worker) {
    return;
  }

  logger.info(`[TAT Worker] ${signal} received, closing worker...`);
  try {
    await worker.close();
  } catch (closeError: unknown) {
    logger.error(
      '[TAT Worker] Error while closing worker:',
      closeError instanceof Error ? closeError.message : closeError
    );
  }
}

// Graceful shutdown: the same close routine is used for both termination signals.
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.on(signal, () => {
    // `void` marks the promise as intentionally not awaited; errors are handled inside.
    void closeTatWorkerOnSignal(signal);
  });
}

export { tatWorker };