384 lines
14 KiB
TypeScript
384 lines
14 KiB
TypeScript
import { tatQueue } from '../queues/tatQueue';
|
|
import { calculateDelay, addWorkingHours, addWorkingHoursExpress } from '@utils/tatTimeUtils';
|
|
import { getTatThresholds } from './configReader.service';
|
|
import dayjs from 'dayjs';
|
|
import logger, { logTATEvent } from '@utils/logger';
|
|
import { Priority } from '../types/common.types';
|
|
|
|
export class TatSchedulerService {
  /**
   * Tell whether a priority value selects the EXPRESS calendar.
   * Callers may pass the enum member or a plain string in any letter case.
   */
  private static isExpressPriority(priority: Priority): boolean {
    const normalized = typeof priority === 'string' ? priority.toUpperCase() : priority;
    return normalized === Priority.EXPRESS || normalized === 'EXPRESS';
  }

  /**
   * Derive the hours already consumed before a pause from the hours that
   * remain and the percentage that had been used.
   * Example: 60% used with 2h remaining => 3h elapsed (original TAT 5h).
   * Returns 0 when the percentage is not strictly between 0 and 100, because
   * the formula would otherwise divide by zero or go negative.
   */
  private static estimateElapsedHours(remainingTatHours: number, percentageUsed: number): number {
    if (!(percentageUsed > 0) || percentageUsed >= 100) {
      return 0;
    }
    return (remainingTatHours * percentageUsed) / (100 - percentageUsed);
  }

  /**
   * Extract the numeric suffix after the last '-' of a level ID for the TAT
   * event record. Falls back to 1 when there is no parseable number.
   */
  private static parseLevelNumber(levelId: string): number {
    const parsed = parseInt(levelId.split('-').pop() || '1', 10);
    return Number.isNaN(parsed) ? 1 : parsed;
  }

  /**
   * Apply the spacing rules used when enqueuing the initial job set.
   * - test mode: at least 5s, plus 5s per previously enqueued job
   * - colliding target times: 5 minutes per previously enqueued job
   * - otherwise: the calculated delay unchanged
   */
  private static spaceDelay(
    delay: number,
    position: number,
    isTestMode: boolean,
    hasCollision: boolean
  ): number {
    if (isTestMode) {
      return Math.max(delay, 5000) + position * 5000;
    }
    if (hasCollision) {
      return delay + position * 300000;
    }
    return delay;
  }

  /**
   * Schedule TAT notification jobs (threshold 1, threshold 2, breach) for an
   * approval level. Threshold percentages come from getTatThresholds().
   *
   * Deadlines are computed with addWorkingHoursExpress for EXPRESS priority
   * and addWorkingHours otherwise.
   * NOTE(review): earlier comments disagreed on whether EXPRESS means 24/7 or
   * "working hours on every calendar day" - confirm against tatTimeUtils.
   *
   * Does nothing (with a warning) when the TAT queue is unavailable.
   *
   * @param requestId - The workflow request ID
   * @param levelId - The approval level ID
   * @param approverId - The approver user ID
   * @param tatDurationHours - TAT duration in hours
   * @param startTime - Optional start time (defaults to now)
   * @param priority - Request priority (enum member or case-insensitive string)
   * @throws Re-throws any error raised while computing or enqueuing jobs
   */
  async scheduleTatJobs(
    requestId: string,
    levelId: string,
    approverId: string,
    tatDurationHours: number,
    startTime?: Date,
    priority: Priority = Priority.STANDARD
  ): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available (Redis not connected). Skipping TAT job scheduling.`);
        return;
      }

      const now = startTime || new Date();
      const addHours = TatSchedulerService.isExpressPriority(priority)
        ? addWorkingHoursExpress
        : addWorkingHours;

      // Thresholds are read on every call so configuration changes take effect.
      const thresholds = await getTatThresholds();

      const threshold1Time = (await addHours(now, tatDurationHours * (thresholds.first / 100))).toDate();
      const threshold2Time = (await addHours(now, tatDurationHours * (thresholds.second / 100))).toDate();
      const breachTime = (await addHours(now, tatDurationHours)).toDate();

      logger.info(`[TAT Scheduler] Scheduling TAT jobs - Request: ${requestId}, Priority: ${priority}, TAT: ${tatDurationHours}h`);

      const jobs = [
        { type: 'threshold1' as const, threshold: thresholds.first, targetTime: threshold1Time },
        { type: 'threshold2' as const, threshold: thresholds.second, targetTime: threshold2Time },
        { type: 'breach' as const, threshold: 100, targetTime: breachTime }
      ].map(job => ({ ...job, delay: calculateDelay(job.targetTime) }));

      // Test mode flag (the original note says 1 hour = 1 minute; presumably
      // the time utilities apply that compression - verify in tatTimeUtils).
      const isTestMode = process.env.TAT_TEST_MODE === 'true';

      // Identical target times mean the calendar calculation collapsed two
      // milestones onto each other; they are spread out below.
      const distinctTimes = new Set(jobs.map(job => job.targetTime.getTime()));
      const hasCollision = distinctTimes.size < jobs.length;

      let enqueuedCount = 0;
      for (const job of jobs) {
        if (job.delay < 0) {
          logger.error(`[TAT Scheduler] Skipping ${job.type} - time in past`);
          continue;
        }

        const spacedDelay = TatSchedulerService.spaceDelay(job.delay, enqueuedCount, isTestMode, hasCollision);
        const jobId = `tat-${job.type}-${requestId}-${levelId}`;

        await tatQueue.add(
          job.type,
          {
            type: job.type,
            threshold: job.threshold,
            requestId,
            levelId,
            approverId
          },
          {
            delay: spacedDelay,
            jobId: jobId,
            removeOnComplete: {
              age: 3600, // Keep for 1 hour for debugging
              count: 1000
            },
            removeOnFail: false
          }
        );

        enqueuedCount++;
      }

      logTATEvent('warning', requestId, {
        level: TatSchedulerService.parseLevelNumber(levelId),
        tatHours: tatDurationHours,
        priority,
        message: 'TAT jobs scheduled',
      });
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to schedule TAT jobs:`, error);
      throw error;
    }
  }

  /**
   * Schedule TAT jobs on resume - only for alerts that have not been sent and
   * whose threshold had not been reached when the request was paused.
   *
   * The original TAT is reconstructed from the remaining hours and the
   * percentage used at pause; each pending threshold is then placed
   * (thresholdHours - elapsedHours) ahead of the resume time. The breach job
   * is placed at the end of the remaining TAT unless already breached.
   *
   * Job IDs are the same as those used by scheduleTatJobs, so stale jobs with
   * those IDs should be cancelled first (see the BullMQ job-ID documentation
   * for how duplicate IDs are treated).
   *
   * @param requestId - The workflow request ID
   * @param levelId - The approval level ID
   * @param approverId - The approver user ID
   * @param remainingTatHours - Remaining TAT duration in hours (from resume point)
   * @param startTime - Resume start time
   * @param priority - Request priority (enum member or case-insensitive string)
   * @param alertStatus - Which alerts were already sent and the percentage used at pause
   * @throws Re-throws any error raised while computing or enqueuing jobs
   */
  async scheduleTatJobsOnResume(
    requestId: string,
    levelId: string,
    approverId: string,
    remainingTatHours: number,
    startTime: Date,
    priority: Priority = Priority.STANDARD,
    alertStatus: {
      tat50AlertSent: boolean;
      tat75AlertSent: boolean;
      tatBreached: boolean;
      percentageUsedAtPause: number;
    }
  ): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available (Redis not connected). Skipping TAT job scheduling on resume.`);
        return;
      }

      const isExpress = TatSchedulerService.isExpressPriority(priority);
      const addHours = isExpress ? addWorkingHoursExpress : addWorkingHours;

      const thresholds = await getTatThresholds();

      const usedPercent = alertStatus.percentageUsedAtPause;
      const elapsedHours = TatSchedulerService.estimateElapsedHours(remainingTatHours, usedPercent);
      const originalTatHours = elapsedHours + remainingTatHours;

      logger.info(`[TAT Scheduler] Resuming TAT scheduling - Request: ${requestId}, Remaining: ${(remainingTatHours * 60).toFixed(1)} min, Priority: ${isExpress ? 'EXPRESS' : 'STANDARD'}`);

      const jobsToSchedule: Array<{
        type: 'threshold1' | 'threshold2' | 'breach';
        threshold: number;
        hoursFromNow: number;
      }> = [];

      const thresholdCandidates = [
        { type: 'threshold1' as const, threshold: thresholds.first, alreadySent: alertStatus.tat50AlertSent },
        { type: 'threshold2' as const, threshold: thresholds.second, alreadySent: alertStatus.tat75AlertSent }
      ];

      for (const candidate of thresholdCandidates) {
        // Skip alerts already sent or whose threshold was passed before the pause.
        if (candidate.alreadySent || !(usedPercent < candidate.threshold)) {
          continue;
        }
        const thresholdHours = originalTatHours * (candidate.threshold / 100);
        const hoursFromNow = thresholdHours - elapsedHours;
        if (hoursFromNow > 0) {
          jobsToSchedule.push({
            type: candidate.type,
            threshold: candidate.threshold,
            hoursFromNow
          });
        }
      }

      if (!alertStatus.tatBreached) {
        // Breach always lands at the end of the remaining TAT.
        jobsToSchedule.push({
          type: 'breach',
          threshold: 100,
          hoursFromNow: remainingTatHours
        });
      }

      if (jobsToSchedule.length === 0) {
        logger.info(`[TAT Scheduler] No TAT jobs to schedule (all alerts already sent)`);
        return;
      }

      let scheduledCount = 0;
      for (const job of jobsToSchedule) {
        const targetTime = (await addHours(startTime, job.hoursFromNow)).toDate();
        const delay = calculateDelay(targetTime);

        if (delay < 0) {
          logger.warn(`[TAT Scheduler] Skipping ${job.type} - calculated time is in past`);
          continue;
        }

        const jobId = `tat-${job.type}-${requestId}-${levelId}`;

        await tatQueue.add(
          job.type,
          {
            type: job.type,
            threshold: job.threshold,
            requestId,
            levelId,
            approverId
          },
          {
            delay: delay,
            jobId: jobId,
            removeOnComplete: {
              age: 3600,
              count: 1000
            },
            removeOnFail: false
          }
        );

        scheduledCount++;
        logger.info(`[TAT Scheduler] ✓ Scheduled ${job.type} (${job.threshold}%) for ${dayjs(targetTime).format('YYYY-MM-DD HH:mm')}`);
      }

      // Report what was actually enqueued, not what was merely planned.
      logger.info(`[TAT Scheduler] ✅ ${scheduledCount} TAT job(s) scheduled for request ${requestId}`);
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to schedule TAT jobs on resume:`, error);
      throw error;
    }
  }

  /**
   * Cancel the three TAT jobs (threshold 1, threshold 2, breach) of one
   * approval level, e.g. when the approver acts before the TAT expires.
   * Never throws: each removal is attempted independently and failures are
   * only logged, so cancellation cannot break the calling workflow.
   *
   * @param requestId - The workflow request ID
   * @param levelId - The approval level ID
   */
  async cancelTatJobs(requestId: string, levelId: string): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available. Skipping job cancellation.`);
        return;
      }

      // Job IDs use generic names so they do not depend on threshold percentages.
      const jobIds = ['threshold1', 'threshold2', 'breach'].map(
        type => `tat-${type}-${requestId}-${levelId}`
      );

      for (const jobId of jobIds) {
        try {
          const job = await tatQueue.getJob(jobId);
          if (!job) {
            logger.debug(`[TAT Scheduler] Job ${jobId} not found (may have already been processed)`);
            continue;
          }
          await job.remove();
          logger.info(`[TAT Scheduler] Cancelled job ${jobId}`);
        } catch (error) {
          // A lookup/removal failure is not the same as "not found"; surface it.
          logger.warn(`[TAT Scheduler] Failed to cancel job ${jobId}:`, error);
        }
      }

      logger.info(`[TAT Scheduler] ✅ TAT jobs cancelled for level ${levelId}`);
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to cancel TAT jobs:`, error);
      // Don't throw - cancellation failure shouldn't break the workflow
    }
  }

  /**
   * Cancel every delayed or waiting TAT job whose payload belongs to the
   * given workflow request. Never throws; a failure to remove one job is
   * logged and the remaining jobs are still processed.
   *
   * @param requestId - The workflow request ID
   */
  async cancelAllTatJobsForRequest(requestId: string): Promise<void> {
    try {
      if (!tatQueue) {
        logger.warn(`[TAT Scheduler] TAT queue not available. Skipping job cancellation.`);
        return;
      }

      const jobs = await tatQueue.getJobs(['delayed', 'waiting']);

      for (const job of jobs) {
        // Entries can be missing if a job disappeared while the list was built.
        if (!job || job.data?.requestId !== requestId) {
          continue;
        }
        try {
          await job.remove();
          logger.info(`[TAT Scheduler] Cancelled job ${job.id}`);
        } catch (error) {
          logger.warn(`[TAT Scheduler] Failed to cancel job ${job.id}:`, error);
        }
      }

      logger.info(`[TAT Scheduler] ✅ All TAT jobs cancelled for request ${requestId}`);
    } catch (error) {
      logger.error(`[TAT Scheduler] Failed to cancel all TAT jobs:`, error);
      // Don't throw - cancellation failure shouldn't break the workflow
    }
  }
}
|
|
|
|
// Module-level TatSchedulerService instance, created once at import time and exported for callers to share.
export const tatSchedulerService = new TatSchedulerService();
|
|
|