Re_Backend/src/services/tatScheduler.service.ts

384 lines
14 KiB
TypeScript

import { tatQueue } from '../queues/tatQueue';
import { calculateDelay, addWorkingHours, addWorkingHoursExpress } from '@utils/tatTimeUtils';
import { getTatThresholds } from './configReader.service';
import dayjs from 'dayjs';
import logger, { logTATEvent } from '@utils/logger';
import { Priority } from '../types/common.types';
/**
 * Schedules and cancels the delayed queue jobs that drive TAT notifications
 * (two configurable warning thresholds plus the 100% breach) for an approval level.
 *
 * Every job id has the form `tat-<type>-<requestId>-<levelId>`, where `<type>` is
 * `threshold1`, `threshold2` or `breach`. Ids therefore do not depend on the
 * configured threshold percentages, which is what lets `cancelTatJobs` find them again.
 *
 * All methods are no-ops (with a warning) when `tatQueue` is not available.
 * Helpers are static and referenced by class name so the public methods keep
 * working even when they are passed around unbound.
 */
export class TatSchedulerService {
  /**
   * Schedule TAT notification jobs for an approval level.
   *
   * Target times are computed with `addWorkingHoursExpress` for EXPRESS priority and
   * `addWorkingHours` otherwise. NOTE(review): earlier comments in this file disagreed on
   * whether EXPRESS means 24/7 or "all calendar days, working hours only" - the
   * authoritative rules live in `@utils/tatTimeUtils`.
   *
   * @param requestId - The workflow request ID
   * @param levelId - The approval level ID
   * @param approverId - The approver user ID
   * @param tatDurationHours - TAT duration in hours
   * @param startTime - Optional start time (defaults to now)
   * @param priority - Request priority (enum value or case-insensitive string)
   * @throws Re-throws any error raised while reading thresholds, computing times or enqueueing
   */
  async scheduleTatJobs(
    requestId: string,
    levelId: string,
    approverId: string,
    tatDurationHours: number,
    startTime?: Date,
    priority: Priority = Priority.STANDARD
  ): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available (Redis not connected). Skipping TAT job scheduling.`);
        return;
      }

      const now = startTime ?? new Date();
      const isExpress = TatSchedulerService.isExpressPriority(priority);

      // Thresholds are percentages of the total TAT, read from configuration on every call.
      const thresholds = await getTatThresholds();

      // Resolved one after another, in the same order as before (threshold1, threshold2, breach).
      const threshold1Time = await TatSchedulerService.resolveTargetTime(
        now,
        tatDurationHours * (thresholds.first / 100),
        isExpress
      );
      const threshold2Time = await TatSchedulerService.resolveTargetTime(
        now,
        tatDurationHours * (thresholds.second / 100),
        isExpress
      );
      const breachTime = await TatSchedulerService.resolveTargetTime(now, tatDurationHours, isExpress);

      logger.info(`[TAT Scheduler] Scheduling TAT jobs - Request: ${requestId}, Priority: ${priority}, TAT: ${tatDurationHours}h`);

      const jobs = [
        { type: 'threshold1' as const, threshold: thresholds.first, targetTime: threshold1Time },
        { type: 'threshold2' as const, threshold: thresholds.second, targetTime: threshold2Time },
        { type: 'breach' as const, threshold: 100, targetTime: breachTime }
      ].map(job => ({ ...job, delay: calculateDelay(job.targetTime) }));

      // Test mode flag; per the original notes the hour-to-minute scaling itself happens in tatTimeUtils.
      const isTestMode = process.env.TAT_TEST_MODE === 'true';

      // Two milestones resolving to the same instant means the jobs would fire together,
      // so they get spread out below.
      const distinctTimes = new Set(jobs.map(job => job.targetTime.getTime()));
      const hasCollision = distinctTimes.size < jobs.length;

      // Counts only jobs actually enqueued; it doubles as the spacing multiplier.
      let scheduledCount = 0;
      for (const job of jobs) {
        if (job.delay < 0) {
          logger.error(`[TAT Scheduler] Skipping ${job.type} - time in past`);
          continue;
        }

        let spacedDelay = job.delay;
        if (isTestMode) {
          // At least 5s out, then 5s apart, so short test delays stay ordered.
          spacedDelay = Math.max(job.delay, 5000) + scheduledCount * 5000;
        } else if (hasCollision) {
          // Production with colliding times: 5-minute spacing between jobs.
          spacedDelay = job.delay + scheduledCount * 300000;
        }

        await tatQueue.add(
          job.type,
          {
            type: job.type,
            threshold: job.threshold,
            requestId,
            levelId,
            approverId
          },
          TatSchedulerService.buildJobOptions(
            TatSchedulerService.buildJobId(job.type, requestId, levelId),
            spacedDelay
          )
        );
        scheduledCount++;
      }

      // NOTE(review): the 'warning' event type and the level derived from the last
      // dash-separated segment of levelId are kept as they were - confirm both are intended.
      logTATEvent('warning', requestId, {
        level: parseInt(levelId.split('-').pop() || '1', 10),
        tatHours: tatDurationHours,
        priority,
        message: 'TAT jobs scheduled',
      });
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to schedule TAT jobs:`, error);
      throw error;
    }
  }

  /**
   * Schedule TAT jobs on resume - only for alerts that have not been sent yet and whose
   * threshold had not already been passed at the moment of pausing.
   *
   * The original TAT is reconstructed from the remaining hours and the percentage used at
   * pause, e.g. 35 min used (58.33%) with 25 min remaining gives an original TAT of 60 min.
   *
   * @param requestId - The workflow request ID
   * @param levelId - The approval level ID
   * @param approverId - The approver user ID
   * @param remainingTatHours - Remaining TAT duration in hours (from resume point)
   * @param startTime - Resume start time
   * @param priority - Request priority (enum value or case-insensitive string)
   * @param alertStatus - Which alerts were already sent and the percentage used at pause
   * @throws Re-throws any error raised while reading thresholds, computing times or enqueueing
   */
  async scheduleTatJobsOnResume(
    requestId: string,
    levelId: string,
    approverId: string,
    remainingTatHours: number,
    startTime: Date,
    priority: Priority = Priority.STANDARD,
    alertStatus: {
      tat50AlertSent: boolean;
      tat75AlertSent: boolean;
      tatBreached: boolean;
      percentageUsedAtPause: number;
    }
  ): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available (Redis not connected). Skipping TAT job scheduling on resume.`);
        return;
      }

      const isExpress = TatSchedulerService.isExpressPriority(priority);
      const thresholds = await getTatThresholds();

      // elapsed = remaining * used% / (100 - used%). The upper bound keeps a value of
      // 100 or more from dividing by zero or producing negative hours.
      const usedPct = alertStatus.percentageUsedAtPause;
      const elapsedHours = usedPct > 0 && usedPct < 100
        ? (remainingTatHours * usedPct) / (100 - usedPct)
        : 0;
      const originalTatHours = elapsedHours + remainingTatHours;

      logger.info(`[TAT Scheduler] Resuming TAT scheduling - Request: ${requestId}, Remaining: ${(remainingTatHours * 60).toFixed(1)} min, Priority: ${isExpress ? 'EXPRESS' : 'STANDARD'}`);

      const jobsToSchedule: Array<{
        type: 'threshold1' | 'threshold2' | 'breach';
        threshold: number;
        hoursFromNow: number;
      }> = [];

      const warningCandidates = [
        { type: 'threshold1' as const, threshold: thresholds.first, alreadySent: alertStatus.tat50AlertSent },
        { type: 'threshold2' as const, threshold: thresholds.second, alreadySent: alertStatus.tat75AlertSent }
      ];
      for (const candidate of warningCandidates) {
        // Skip if the alert already went out or the threshold was passed before the pause.
        if (candidate.alreadySent || !(usedPct < candidate.threshold)) {
          continue;
        }
        // Hours still to run before this threshold: its position on the original TAT minus time used.
        const hoursFromNow = originalTatHours * (candidate.threshold / 100) - elapsedHours;
        if (hoursFromNow > 0) {
          jobsToSchedule.push({ type: candidate.type, threshold: candidate.threshold, hoursFromNow });
        }
      }

      if (!alertStatus.tatBreached) {
        // Breach is always scheduled for the end of the remaining TAT.
        jobsToSchedule.push({ type: 'breach', threshold: 100, hoursFromNow: remainingTatHours });
      }

      if (jobsToSchedule.length === 0) {
        logger.info(`[TAT Scheduler] No TAT jobs to schedule (all alerts already sent)`);
        return;
      }

      // Counted separately so the summary reflects jobs actually enqueued, not candidates.
      let scheduledCount = 0;
      for (const job of jobsToSchedule) {
        const targetTime = await TatSchedulerService.resolveTargetTime(startTime, job.hoursFromNow, isExpress);
        const delay = calculateDelay(targetTime);
        if (delay < 0) {
          logger.warn(`[TAT Scheduler] Skipping ${job.type} - calculated time is in past`);
          continue;
        }

        await tatQueue.add(
          job.type,
          {
            type: job.type,
            threshold: job.threshold,
            requestId,
            levelId,
            approverId
          },
          TatSchedulerService.buildJobOptions(
            TatSchedulerService.buildJobId(job.type, requestId, levelId),
            delay
          )
        );
        scheduledCount++;
        logger.info(`[TAT Scheduler] ✓ Scheduled ${job.type} (${job.threshold}%) for ${dayjs(targetTime).format('YYYY-MM-DD HH:mm')}`);
      }

      logger.info(`[TAT Scheduler] ✅ ${scheduledCount} TAT job(s) scheduled for request ${requestId}`);
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to schedule TAT jobs on resume:`, error);
      throw error;
    }
  }

  /**
   * Cancel the three TAT jobs of a specific approval level.
   * Useful when an approver acts before the TAT expires.
   * Never throws: a cancellation failure must not break the workflow.
   *
   * @param requestId - The workflow request ID
   * @param levelId - The approval level ID
   */
  async cancelTatJobs(requestId: string, levelId: string): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available. Skipping job cancellation.`);
        return;
      }

      const jobIds = (['threshold1', 'threshold2', 'breach'] as const).map(type =>
        TatSchedulerService.buildJobId(type, requestId, levelId)
      );

      for (const jobId of jobIds) {
        try {
          const job = await tatQueue.getJob(jobId);
          if (job) {
            await job.remove();
            logger.info(`[TAT Scheduler] Cancelled job ${jobId}`);
          }
        } catch (error) {
          // A missing job yields a falsy result above, so reaching this point means the
          // lookup or removal itself failed. Keep going, but do not hide the cause.
          logger.warn(`[TAT Scheduler] Could not cancel job ${jobId} (it may be locked or already processed):`, error);
        }
      }

      logger.info(`[TAT Scheduler] ✅ TAT jobs cancelled for level ${levelId}`);
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to cancel TAT jobs:`, error);
      // Don't throw - cancellation failure shouldn't break the workflow
    }
  }

  /**
   * Cancel all delayed or waiting TAT jobs whose payload carries the given request ID.
   * Each job is removed independently, so one failing removal does not stop the rest.
   * Never throws: a cancellation failure must not break the workflow.
   *
   * @param requestId - The workflow request ID
   */
  async cancelAllTatJobsForRequest(requestId: string): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available. Skipping job cancellation.`);
        return;
      }

      const jobs = await tatQueue.getJobs(['delayed', 'waiting']);
      // Optional chaining guards against empty entries for jobs that vanished mid-listing.
      const requestJobs = jobs.filter(job => job?.data?.requestId === requestId);

      let failures = 0;
      for (const job of requestJobs) {
        try {
          await job.remove();
          logger.info(`[TAT Scheduler] Cancelled job ${job.id}`);
        } catch (error) {
          failures++;
          logger.warn(`[TAT Scheduler] Could not cancel job ${job.id}:`, error);
        }
      }

      if (failures > 0) {
        logger.warn(`[TAT Scheduler] ${requestJobs.length - failures} of ${requestJobs.length} TAT jobs cancelled for request ${requestId}`);
      } else {
        logger.info(`[TAT Scheduler] ✅ All TAT jobs cancelled for request ${requestId}`);
      }
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to cancel all TAT jobs:`, error);
      // Don't throw - cancellation failure shouldn't break the workflow
    }
  }

  /** True when the priority is EXPRESS, given either as the enum value or a case-insensitive string. */
  private static isExpressPriority(priority: Priority): boolean {
    const normalized = typeof priority === 'string' ? priority.toUpperCase() : priority;
    return normalized === Priority.EXPRESS || normalized === 'EXPRESS';
  }

  /** Adds `hours` to `from` using the EXPRESS or STANDARD calculator and returns a plain Date. */
  private static async resolveTargetTime(from: Date, hours: number, isExpress: boolean): Promise<Date> {
    const target = isExpress
      ? await addWorkingHoursExpress(from, hours)
      : await addWorkingHours(from, hours);
    return target.toDate();
  }

  /** Builds the job id shared by scheduling and cancellation. */
  private static buildJobId(
    type: 'threshold1' | 'threshold2' | 'breach',
    requestId: string,
    levelId: string
  ): string {
    return `tat-${type}-${requestId}-${levelId}`;
  }

  /** Queue options common to every TAT job: completed jobs are kept briefly, failed ones are kept. */
  private static buildJobOptions(jobId: string, delay: number) {
    return {
      delay,
      jobId,
      removeOnComplete: {
        age: 3600, // Keep for 1 hour for debugging
        count: 1000
      },
      removeOnFail: false
    };
  }
}
// Module-level instance created once at import time and exported for callers to share.
export const tatSchedulerService = new TatSchedulerService();