import { google } from 'googleapis'; import fs from 'fs/promises'; import path from 'path'; import { gmailConfig } from '../config/gmail.config'; import { IncomingEmail } from '../models/IncomingEmail'; import { WorkflowRequest } from '../models/WorkflowRequest'; import { ApprovalLevel } from '../models/ApprovalLevel'; import { User } from '../models/User'; import { ApprovalService } from './approval.service'; import { parseEmailAction } from '../utils/gmailParser'; import { ApprovalAction } from '../types/approval.types'; import logger from '../utils/logger'; const approvalService = new ApprovalService(); export class GmailService { private auth: any; private gmail: any; constructor() { this.initAuth(); } private initAuth() { try { const keyPath = path.resolve(gmailConfig.serviceAccountPath); // Use Service Account with Domain-Wide Delegation to impersonate the approval mailbox this.auth = new google.auth.JWT({ keyFile: keyPath, scopes: ['https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/gmail.settings.basic'], subject: gmailConfig.impersonateEmail }); this.gmail = google.gmail({ version: 'v1', auth: this.auth }); logger.info(`[GmailService] Initialized for ${gmailConfig.impersonateEmail}`); } catch (error) { logger.error(`[GmailService] Failed to initialize Google Auth:`, error); } } /** * Setup Gmail Watch for Pub/Sub notifications */ async setupWatch() { try { const res = await this.gmail.users.watch({ userId: 'me', requestBody: { labelIds: ['INBOX'], topicName: `projects/${process.env.GCP_PROJECT_ID}/topics/${gmailConfig.pubsubTopic}` } }); logger.info(`[GmailService] Watch setup successfully:`, res.data); return res.data; } catch (error) { logger.error(`[GmailService] Failed to setup watch:`, error); throw error; } } /** * Process notification from Pub/Sub */ async processNotification(notificationData: any) { try { const { emailAddress, historyId } = notificationData; logger.info(`[GmailService] Received notification for ${emailAddress}, historyId: ${historyId}`); // In a production environment, you might want to store the last processed historyId // and only fetch changes since then using users.history.list. // For simplicity in this requirement, we'll list the latest messages. const res = await this.gmail.users.messages.list({ userId: 'me', maxResults: 10, q: 'label:INBOX is:unread' }); const messages = res.data.messages || []; for (const msg of messages) { await this.processMessage(msg.id); } } catch (error) { logger.error(`[GmailService] Error processing notification:`, error); } } /** * Fetch and process a single message */ private async processMessage(messageId: string) { try { // 1. Check if already processed const existing = await IncomingEmail.findOne({ where: { messageId } }); if (existing && existing.processed) return; // 2. Fetch full message const res = await this.gmail.users.messages.get({ userId: 'me', id: messageId, format: 'full' }); const message = res.data; const headers = message.payload?.headers || []; const from = headers.find((h: any) => h.name?.toLowerCase() === 'from')?.value || ''; const to = headers.find((h: any) => h.name?.toLowerCase() === 'to')?.value || ''; const subject = headers.find((h: any) => h.name?.toLowerCase() === 'subject')?.value || ''; // Extract body let body = ''; if (message.payload?.parts) { const textPart = message.payload.parts.find((p: any) => p.mimeType === 'text/plain'); if (textPart && textPart.body?.data) { body = Buffer.from(textPart.body.data, 'base64').toString(); } } else if (message.payload?.body?.data) { body = Buffer.from(message.payload.body.data, 'base64').toString(); } logger.info(`[GmailService] Processing message ${messageId} from ${from}: ${subject}`); // 3. Save to database const incomingEmail = await IncomingEmail.create({ messageId, threadId: message.threadId!, from, to, subject, body, receivedAt: new Date(parseInt(message.internalDate!)), processed: false } as any); // 4. Parse Action const { action, requestNumber, comments } = parseEmailAction(subject, body); if (action === 'NONE' || !requestNumber) { logger.info(`[GmailService] No action or request number found in message ${messageId}`); await incomingEmail.update({ processed: true, actionTaken: 'NONE' }); // Mark as read anyway? await this.markAsRead(messageId); return; } // 5. Apply Workflow Logic await this.applyWorkflowAction(incomingEmail, { action, requestNumber, comments }); // 6. Mark message as read/processed in Gmail await this.markAsRead(messageId); } catch (error) { logger.error(`[GmailService] Error processing message ${messageId}:`, error); } } private async applyWorkflowAction(incomingEmail: any, parsedAction: any) { const { action, requestNumber, comments } = parsedAction; try { // 1. Find Request const request = await WorkflowRequest.findOne({ where: { requestNumber } }); if (!request) { throw new Error(`Request ${requestNumber} not found`); } // 2. Resolve User by Email // Extract email from "Name " const emailMatch = incomingEmail.from.match(/<(.+?)>/) || [null, incomingEmail.from]; const approverEmail = emailMatch[1].trim(); const user = await User.findOne({ where: { email: approverEmail } }); if (!user) { throw new Error(`User with email ${approverEmail} not found`); } // 3. Find current pending level for this user and request const currentLevel = await ApprovalLevel.findOne({ where: { requestId: request.requestId, approverId: user.userId, status: 'IN_PROGRESS' // Or PENDING if it was just assigned } }); if (!currentLevel) { // Broaden search to PENDING if not yet IN_PROGRESS (some implementations delay IN_PROGRESS) const pendingLevel = await ApprovalLevel.findOne({ where: { requestId: request.requestId, approverId: user.userId, status: 'PENDING' } }); if (!pendingLevel) { throw new Error(`No pending/in-progress approval step found for user ${approverEmail} on request ${requestNumber}`); } // Use pending level await this.executeApproval(request, pendingLevel, action, user, comments, incomingEmail); } else { await this.executeApproval(request, currentLevel, action, user, comments, incomingEmail); } } catch (error: any) { logger.error(`[GmailService] Workflow action failed for ${requestNumber}:`, error); await incomingEmail.update({ processed: true, error: error.message, actionTaken: action }); } } private async executeApproval(request: any, level: any, action: string, user: any, comments: string, incomingEmail: any) { const approvalAction: ApprovalAction = { action: action as any, comments: comments || `${action} via email`, rejectionReason: action === 'REJECT' ? (comments || 'Rejected via email') : undefined }; logger.info(`[GmailService] Executing ${action} for ${request.requestNumber} level ${level.levelNumber} by ${user.email}`); await approvalService.approveLevel( level.levelId, approvalAction, user.userId, { ipAddress: 'GMAIL_WEBHOOK', userAgent: 'Gmail/PubSub' } ); await incomingEmail.update({ processed: true, actionTaken: action, parsedComments: comments, requestId: request.requestId }); logger.info(`[GmailService] ✅ Workflow ${request.requestNumber} updated successfully via email.`); } private async markAsRead(messageId: string) { try { await this.gmail.users.messages.batchModify({ userId: 'me', requestBody: { ids: [messageId], removeLabelIds: ['UNREAD'] } }); } catch (error) { logger.error(`[GmailService] Failed to mark message ${messageId} as read:`, error); } } } export const gmailService = new GmailService();