Re_Backend/src/queues/tatWorker.ts

63 lines
1.5 KiB
TypeScript

import { Worker } from 'bullmq';
import { sharedRedisConnection } from './redisConnection';
import { handleTatJob } from './tatProcessor';
import logger from '@utils/logger';
/**
 * BullMQ worker that consumes jobs from the `tatQueue` queue and passes each
 * one to `handleTatJob`.
 *
 * Remains `null` when the worker could not be set up (the failure is logged
 * instead of being thrown at import time), so importers must null-check it.
 */
let tatWorker: Worker | null = null;

try {
  // Build the worker in a non-nullable local so the listeners below can be
  // attached without re-checking the nullable module-level binding.
  const worker = new Worker('tatQueue', handleTatJob, {
    connection: sharedRedisConnection,
    concurrency: 5, // jobs this worker may process in parallel
    autorun: true, // start processing as soon as the worker is constructed
    limiter: {
      // rate limit: at most `max` jobs per `duration` milliseconds
      max: 10,
      duration: 1000
    }
  });
  tatWorker = worker;
  logger.info('[TAT Worker] Worker initialized');

  // Lifecycle logging only; none of these listeners alter job handling.
  worker.on('ready', () => {
    logger.info('[TAT Worker] Ready and listening');
  });

  worker.on('active', (job) => {
    logger.info(`[TAT Worker] Processing: ${job.name}`);
  });

  worker.on('completed', (job) => {
    logger.info(`[TAT Worker] Completed: ${job.name}`);
  });

  // `job` is accessed with `?.` because it may be absent on this event.
  worker.on('failed', (job, err) => {
    logger.error(`[TAT Worker] Failed: ${job?.name}`, err.message);
  });

  worker.on('error', (err) => {
    logger.error('[TAT Worker] Error:', err.message);
  });
} catch (workerError: unknown) {
  // Setup failed: log it and leave the export as `null` rather than crashing
  // whichever module imported this file.
  logger.error('[TAT Worker] Failed to create worker:', workerError);
  tatWorker = null;
}
// Graceful shutdown

/**
 * Closes the TAT worker (if one was created) in response to a termination
 * signal.
 *
 * Any error raised by `close()` is caught and logged so that it does not
 * surface as an unhandled promise rejection while the process is shutting
 * down.
 *
 * NOTE(review): this handler does not call `process.exit()`. Installing a
 * listener for SIGTERM/SIGINT replaces Node's default exit-on-signal
 * behaviour, so actual termination relies on the event loop draining or on
 * another shutdown handler elsewhere — confirm that is intended.
 *
 * @param signal Name of the signal that triggered the shutdown; used in the log line.
 */
async function closeTatWorkerOnSignal(signal: 'SIGTERM' | 'SIGINT'): Promise<void> {
  const worker = tatWorker;
  if (!worker) {
    return;
  }

  logger.info(`[TAT Worker] ${signal} received, closing worker...`);
  try {
    await worker.close();
  } catch (closeError: unknown) {
    const reason = closeError instanceof Error ? closeError.message : String(closeError);
    logger.error('[TAT Worker] Error while closing worker:', reason);
  }
}

process.on('SIGTERM', () => {
  // `void` marks the promise as intentionally not awaited; the helper never rejects.
  void closeTatWorkerOnSignal('SIGTERM');
});

process.on('SIGINT', () => {
  void closeTatWorkerOnSignal('SIGINT');
});

export { tatWorker };