// Centralized_Reporting_Backend/src/integrations/zoho/handler.js
//
// 763 lines
// 27 KiB
// JavaScript

// Webhook/event handler for Zoho integration
const crypto = require('crypto');
const logger = require('../../utils/logger');
// Routing tables: webhook event (or bulk-read state) name -> name of the ZohoHandler
// method that processes it. Maps are used instead of plain objects so that a
// caller-supplied name such as "constructor" or "__proto__" can never resolve to
// an inherited property.
const CRM_EVENT_ROUTES = new Map([
  ['leads.create', 'handleLeadCreated'],
  ['leads.update', 'handleLeadUpdated'],
  ['leads.delete', 'handleLeadDeleted'],
  ['contacts.create', 'handleContactCreated'],
  ['contacts.update', 'handleContactUpdated'],
  ['deals.create', 'handleDealCreated'],
  ['deals.update', 'handleDealUpdated'],
]);
const PEOPLE_EVENT_ROUTES = new Map([
  ['employees.create', 'handleEmployeeCreated'],
  ['employees.update', 'handleEmployeeUpdated'],
  ['employees.delete', 'handleEmployeeDeleted'],
]);
const PROJECTS_EVENT_ROUTES = new Map([
  ['projects.create', 'handleProjectCreated'],
  ['projects.update', 'handleProjectUpdated'],
  ['tasks.create', 'handleTaskCreated'],
  ['tasks.update', 'handleTaskUpdated'],
]);
const BOOKS_EVENT_ROUTES = new Map([
  ['customers.create', 'handleCustomerCreated'],
  ['customers.update', 'handleCustomerUpdated'],
  ['invoices.create', 'handleBooksInvoiceCreated'],
  ['invoices.update', 'handleBooksInvoiceUpdated'],
  ['bills.create', 'handleBillCreated'],
  ['bills.update', 'handleBillUpdated'],
  ['expenses.create', 'handleExpenseCreated'],
  ['expenses.update', 'handleExpenseUpdated'],
]);
const BULK_READ_STATE_ROUTES = new Map([
  ['COMPLETED', 'handleBulkReadCompleted'],
  ['FAILED', 'handleBulkReadFailed'],
  ['IN_PROGRESS', 'handleBulkReadInProgress'],
  ['CREATED', 'handleBulkReadCreated'],
]);

/**
 * Express-style request handlers for Zoho webhooks (CRM, People, Projects, Books
 * and Bulk Read callbacks).
 *
 * The `handle*Webhook(req, res)` methods are the HTTP entry points; every other
 * `handle*` method processes a single event or job state and is looked up by name
 * through the routing tables above.
 */
class ZohoHandler {
  /**
   * Reads the shared webhook secret from `ZOHO_WEBHOOK_SECRET`.
   *
   * The historical `'changeme'` fallback is kept so existing deployments keep
   * working, but it is publicly guessable, so its use is logged as a warning.
   */
  constructor() {
    const configuredSecret = process.env.ZOHO_WEBHOOK_SECRET;
    if (!configuredSecret) {
      logger.warn('ZOHO_WEBHOOK_SECRET is not set; using the insecure default webhook secret', {
        envVar: 'ZOHO_WEBHOOK_SECRET',
      });
    }
    this.webhookSecret = configuredSecret || 'changeme';
  }

  /**
   * Checks a hex-encoded HMAC-SHA256 signature against `payload`.
   *
   * Never throws for bad caller input: a missing, non-string, wrongly sized or
   * non-hex signature (or an unsignable payload) yields `false`, so callers can
   * answer 401 instead of crashing into their 500 path.
   *
   * @param {string|Buffer|ArrayBufferView} payload - Exact content that was signed.
   * @param {string} signature - Hex digest supplied by the sender.
   * @param {?string} [secret=null] - Overrides the instance secret when truthy.
   * @returns {boolean} `true` only when the signature matches.
   */
  verifyWebhookSignature(payload, signature, secret = null) {
    const isSignable = typeof payload === 'string' || ArrayBuffer.isView(payload);
    if (!isSignable || typeof signature !== 'string') {
      return false;
    }

    const webhookSecret = secret || this.webhookSecret;
    const expectedHex = crypto
      .createHmac('sha256', webhookSecret)
      .update(payload, 'utf8')
      .digest('hex');

    // Comparing string lengths first rejects truncated signatures and signatures
    // with trailing garbage (hex decoding silently stops at the first bad character).
    if (signature.length !== expectedHex.length) {
      return false;
    }
    const provided = Buffer.from(signature, 'hex');
    const expected = Buffer.from(expectedHex, 'hex');
    // timingSafeEqual throws on unequal byte lengths, which happens when the
    // signature has the right length but contains non-hex characters.
    if (provided.length !== expected.length) {
      return false;
    }
    return crypto.timingSafeEqual(provided, expected);
  }

  /**
   * Shared pipeline for the signed webhooks: verify the signature, log receipt,
   * route the event to its handler method and send the JSON response.
   *
   * Responses: 401 invalid/missing signature, 400 known event without `data`,
   * 500 when a handler throws, otherwise 200 (unknown events are logged and
   * acknowledged).
   *
   * @param {object} req - Express-style request (`headers`, `body`, `ip`).
   * @param {object} res - Express-style response (`status()`, `json()`).
   * @param {object} options
   * @param {string} options.product - Product label used in log messages.
   * @param {string} options.idLogKey - Log field name under which `data.id` is recorded.
   * @param {Map<string, string>} options.routes - Event name -> handler method name.
   * @returns {Promise<*>} Whatever `res.json()` returned on an early rejection, else `undefined`.
   */
  async _processSignedWebhook(req, res, { product, idLogKey, routes }) {
    try {
      const signature = req.headers['x-zoho-signature'];
      // NOTE(review): the HMAC is computed over a re-serialisation of the parsed
      // body; if the sender signs the raw request bytes this only matches when
      // both serialisations are byte-identical. Confirm against the body parser setup.
      const payload = JSON.stringify(req.body);
      if (!this.verifyWebhookSignature(payload, signature)) {
        logger.warn(`Invalid Zoho ${product} webhook signature`, {
          correlationId: logger.getCorrelationId(req),
          ip: req.ip,
        });
        return res.status(401).json({ status: 'error', message: 'Invalid signature' });
      }

      const { event, data } = req.body;
      logger.info(`Zoho ${product} webhook received`, {
        correlationId: logger.getCorrelationId(req),
        event,
        [idLogKey]: data?.id,
      });

      const handlerName = routes.get(event);
      if (!handlerName) {
        logger.warn(`Unknown Zoho ${product} event`, { event });
      } else if (data == null) {
        // Every event handler dereferences `data`; reject instead of crashing with a 500.
        logger.warn(`Zoho ${product} webhook event has no data payload`, {
          correlationId: logger.getCorrelationId(req),
          event,
        });
        return res.status(400).json({ status: 'error', message: 'Missing event data' });
      } else {
        // Looked up on `this` so subclasses and test doubles can override handlers.
        await this[handlerName](data);
      }

      res.json({ status: 'success', message: 'Webhook processed' });
    } catch (error) {
      logger.error(`Zoho ${product} webhook processing failed`, {
        correlationId: logger.getCorrelationId(req),
        error: error.message,
        stack: error.stack,
      });
      res.status(500).json({ status: 'error', message: 'Webhook processing failed' });
    }
  }

  /** Handle Zoho CRM webhook events (leads, contacts, deals). */
  async handleCrmWebhook(req, res) {
    return this._processSignedWebhook(req, res, {
      product: 'CRM',
      idLogKey: 'recordId',
      routes: CRM_EVENT_ROUTES,
    });
  }

  /** Handle Zoho People webhook events (employees). */
  async handlePeopleWebhook(req, res) {
    return this._processSignedWebhook(req, res, {
      product: 'People',
      idLogKey: 'employeeId',
      routes: PEOPLE_EVENT_ROUTES,
    });
  }

  /** Handle Zoho Projects webhook events (projects, tasks). */
  async handleProjectsWebhook(req, res) {
    return this._processSignedWebhook(req, res, {
      product: 'Projects',
      idLogKey: 'projectId',
      routes: PROJECTS_EVENT_ROUTES,
    });
  }

  /** Handle Zoho Books webhook events (customers, invoices, bills, expenses). */
  async handleBooksWebhook(req, res) {
    return this._processSignedWebhook(req, res, {
      product: 'Books',
      idLogKey: 'recordId',
      routes: BOOKS_EVENT_ROUTES,
    });
  }

  // CRM Event Handlers
  // Each handler currently only records the event; business logic (e.g. syncing
  // to other systems) is meant to be added inside the relevant method.

  /** @param {object} data - Lead record from the webhook payload. */
  async handleLeadCreated(data) {
    logger.info('Lead created', { leadId: data.id, name: data.Full_Name });
  }

  /** @param {object} data - Lead record from the webhook payload. */
  async handleLeadUpdated(data) {
    logger.info('Lead updated', { leadId: data.id, name: data.Full_Name });
  }

  /** @param {object} data - Lead record from the webhook payload. */
  async handleLeadDeleted(data) {
    logger.info('Lead deleted', { leadId: data.id });
  }

  /** @param {object} data - Contact record from the webhook payload. */
  async handleContactCreated(data) {
    logger.info('Contact created', { contactId: data.id, name: `${data.First_Name} ${data.Last_Name}` });
  }

  /** @param {object} data - Contact record from the webhook payload. */
  async handleContactUpdated(data) {
    logger.info('Contact updated', { contactId: data.id, name: `${data.First_Name} ${data.Last_Name}` });
  }

  /** @param {object} data - Deal record from the webhook payload. */
  async handleDealCreated(data) {
    logger.info('Deal created', { dealId: data.id, name: data.Deal_Name, amount: data.Amount });
  }

  /** @param {object} data - Deal record from the webhook payload. */
  async handleDealUpdated(data) {
    logger.info('Deal updated', { dealId: data.id, name: data.Deal_Name, amount: data.Amount });
  }

  // People Event Handlers

  /** @param {object} data - Employee record from the webhook payload. */
  async handleEmployeeCreated(data) {
    logger.info('Employee created', { employeeId: data.id, name: `${data.firstName} ${data.lastName}` });
  }

  /** @param {object} data - Employee record from the webhook payload. */
  async handleEmployeeUpdated(data) {
    logger.info('Employee updated', { employeeId: data.id, name: `${data.firstName} ${data.lastName}` });
  }

  /** @param {object} data - Employee record from the webhook payload. */
  async handleEmployeeDeleted(data) {
    logger.info('Employee deleted', { employeeId: data.id });
  }

  /**
   * Handle Zoho Bulk Read job callbacks.
   *
   * The caller must pass `access_token` as a query parameter (400 otherwise). A
   * signature header is optional, but when present it must be valid (401
   * otherwise). The job `state` selects the handler; unknown states are logged
   * and acknowledged.
   *
   * Request headers, query string, URL and raw body are deliberately NOT logged:
   * they contain the caller's access token.
   *
   * @param {object} req - Express-style request.
   * @param {object} res - Express-style response.
   */
  async handleBulkReadWebhook(req, res) {
    try {
      const accessToken = req.query?.access_token;
      if (!accessToken) {
        logger.warn('Zoho Bulk Read webhook rejected: no access token in query parameters', {
          correlationId: logger.getCorrelationId(req),
          ip: req.ip,
        });
        return res.status(400).json({
          status: 'error',
          message: 'Access token is required in query parameters',
        });
      }

      // Optional: verify the webhook signature only when one is provided.
      const signature = req.headers['x-zoho-signature'];
      if (signature) {
        const payload = JSON.stringify(req.body);
        if (!this.verifyWebhookSignature(payload, signature)) {
          logger.warn('Invalid Zoho Bulk Read webhook signature', {
            correlationId: logger.getCorrelationId(req),
            ip: req.ip,
          });
          return res.status(401).json({ status: 'error', message: 'Invalid signature' });
        }
      } else {
        logger.warn('Zoho Bulk Read webhook has no signature; skipping verification', {
          correlationId: logger.getCorrelationId(req),
          ip: req.ip,
        });
      }

      const body = req.body ?? {};
      const { job_id, operation, state, query, fileType, result } = body;
      logger.info('Zoho Bulk Read webhook received', {
        correlationId: logger.getCorrelationId(req),
        jobId: job_id,
        operation,
        state,
        module: query?.module,
        recordsCount: result?.count,
        hasDownloadUrl: !!result?.download_url,
        fileType,
      });

      const handlerName = BULK_READ_STATE_ROUTES.get(state);
      if (handlerName) {
        await this[handlerName](body, accessToken);
      } else {
        logger.warn('Unknown Zoho Bulk Read state', { state, jobId: job_id });
      }

      res.json({
        status: 'success',
        message: 'Bulk read webhook processed',
        receivedAt: new Date().toISOString(),
      });
    } catch (error) {
      logger.error('Zoho Bulk Read webhook processing failed', {
        correlationId: logger.getCorrelationId(req),
        error: error.message,
        stack: error.stack,
      });
      res.status(500).json({
        status: 'error',
        message: 'Bulk read webhook processing failed',
        error: error.message,
      });
    }
  }

  /**
   * Resolve the application user that an access token belongs to.
   *
   * The token is decrypted when possible (a decryption failure means it is used
   * as-is), verified through the JWT service, and its `uuid` claim is looked up
   * in the user repository.
   *
   * @param {string} accessToken - Possibly encrypted JWT.
   * @returns {Promise<object>} The user record.
   * @throws {Error} When verification fails or no user matches; the error is logged and rethrown.
   */
  async getUserFromAccessToken(accessToken) {
    try {
      // Project modules are loaded lazily inside the methods that need them,
      // presumably to avoid circular imports at load time (TODO confirm).
      const jwtService = require('../../auth/jwt.service');
      const { decrypt } = require('../../utils/crypto');

      let token;
      try {
        token = decrypt(accessToken);
      } catch (decryptError) {
        // If decryption fails, assume the token was already plain.
        token = accessToken;
      }

      const decoded = jwtService.verify(token);
      const userRepository = require('../../data/repositories/userRepository');
      const user = await userRepository.findByUuid(decoded.uuid);
      if (!user) {
        throw new Error('User not found');
      }

      // Only the id is logged; the email address is personal data.
      logger.info('User authenticated from access token', { userId: user.uuid });
      return user;
    } catch (error) {
      logger.error('Failed to get user from access token', { error: error.message });
      throw error;
    }
  }

  // Bulk Read Event Handlers

  /**
   * Import the result of a completed bulk read job.
   *
   * Steps: authenticate the user, ensure a job record exists, fetch and map the
   * CSV from `result.download_url`, then replace the user's rows for the module
   * (clear + bulk insert) inside one transaction, and finally mark the job
   * completed. Any failure is logged and recorded on the job as `failed`; this
   * method does not rethrow.
   *
   * @param {object} data - Webhook body (`job_id`, `query`, `result`, `operation`, `state`, `fileType`).
   * @param {string} accessToken - Token identifying the user who owns the job.
   */
  async handleBulkReadCompleted(data, accessToken) {
    logger.info('Bulk read job completed', {
      jobId: data.job_id,
      recordsCount: data.result?.count,
      hasDownloadUrl: Boolean(data.result?.download_url),
    });

    try {
      const CsvService = require('../../services/csvService');
      const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
      const userAuthTokenRepo = require('../../data/repositories/userAuthTokenRepository');
      const { decrypt } = require('../../utils/crypto');
      const csvService = new CsvService();

      const { job_id, query, result, operation, state, fileType } = data;
      const moduleName = query?.module;
      const downloadUrl = result?.download_url;
      const recordsCount = result?.count;
      // Refuse to run the destructive replace below without knowing which module
      // and which file it applies to.
      if (!moduleName || !downloadUrl) {
        throw new Error('Bulk read payload is missing query.module or result.download_url');
      }

      let user;
      try {
        user = await this.getUserFromAccessToken(accessToken);
      } catch (authError) {
        logger.error('Bulk read completion aborted: user could not be authenticated', {
          jobId: job_id,
          error: authError.message,
        });
        // NOTE(review): this marks the job failed on behalf of a caller that could
        // not be authenticated; kept for backward compatibility, but consider
        // rejecting such requests before any job record is touched.
        await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
          status: 'failed',
          error_message: 'Failed to authenticate user from access token',
        });
        return;
      }
      const userId = user.uuid;

      // Get or create the job record.
      const jobRecord = await ZohoBulkReadRepository.getBulkReadJob(job_id);
      if (!jobRecord) {
        await ZohoBulkReadRepository.createBulkReadJob({
          id: job_id,
          user_uuid: userId,
          module: moduleName,
          operation,
          state,
          file_type: fileType,
          download_url: downloadUrl,
          records_count: recordsCount,
        });
        logger.info('Created bulk read job record', { jobId: job_id, userId });
      } else if (jobRecord.user_uuid === 'unknown') {
        // The job was recorded before its owner was known; attach the user now.
        await ZohoBulkReadRepository.updateBulkReadJob(job_id, { user_uuid: userId });
        logger.info('Attached user to bulk read job record', { jobId: job_id, userId });
      }

      // The CSV download is authorised with the user's stored Zoho token.
      const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, 'zoho');
      if (!tokenRecord) {
        logger.error('No Zoho token found for user', { jobId: job_id, userId });
        await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
          status: 'failed',
          error_message: 'No Zoho access token found for user',
        });
        return;
      }
      const zohoAccessToken = decrypt(tokenRecord.accessToken);

      const csvData = await csvService.fetchCsvData(downloadUrl, zohoAccessToken);
      const mappedData = csvService.parseCsvData(csvData, moduleName, userId, job_id);
      logger.info('Bulk read CSV fetched and mapped', {
        jobId: job_id,
        fetchedCount: csvData.length,
        mappedCount: mappedData.length,
      });

      // One transaction so the delete + insert (REPLACE strategy) is atomic.
      const sequelize = require('../../db/pool');
      const transaction = await sequelize.transaction();
      let insertedRecords = [];
      try {
        const beforeCount = await ZohoBulkReadRepository.getUserDataCount(userId, moduleName, transaction);
        const deletedCount = await ZohoBulkReadRepository.clearUserData(userId, moduleName, transaction);
        const remainingCount = await ZohoBulkReadRepository.getUserDataCount(userId, moduleName, transaction);
        if (remainingCount > 0) {
          logger.warn('Records still exist after bulk read cleanup', {
            jobId: job_id,
            userId,
            module: moduleName,
            remainingCount,
          });
        }

        insertedRecords = await ZohoBulkReadRepository.bulkInsert(moduleName, mappedData, transaction);
        const finalCount = await ZohoBulkReadRepository.getUserDataCount(userId, moduleName, transaction);
        logger.info('Bulk read data replaced', {
          jobId: job_id,
          userId,
          module: moduleName,
          beforeCount,
          deletedCount,
          insertedCount: insertedRecords.length,
          finalCount,
        });

        await transaction.commit();
      } catch (txError) {
        // A failing rollback (e.g. after a failed commit) must not mask the
        // error that caused it.
        try {
          await transaction.rollback();
        } catch (rollbackError) {
          logger.error('Bulk read transaction rollback failed', {
            jobId: job_id,
            error: rollbackError.message,
          });
        }
        throw txError;
      }

      await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
        status: 'completed',
        processed_count: insertedRecords.length,
        state: 'COMPLETED',
      });
      logger.info('Bulk read processing completed', {
        jobId: job_id,
        module: moduleName,
        userId,
        recordsProcessed: insertedRecords.length,
        totalRecords: recordsCount,
      });
    } catch (error) {
      logger.error('Bulk read processing failed', {
        jobId: data.job_id,
        error: error.message,
        stack: error.stack,
      });
      // Best effort: record the failure on the job without letting a second
      // error escape.
      try {
        const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
        await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, {
          status: 'failed',
          error_message: error.message,
        });
      } catch (updateError) {
        logger.error('Failed to mark bulk read job as failed', {
          jobId: data.job_id,
          error: updateError.message,
        });
      }
    }
  }

  /**
   * Record that a bulk read job failed. Errors are logged, never rethrown.
   *
   * @param {object} data - Webhook body (`job_id`, optional `error_message`).
   * @param {string} accessToken - Must resolve to a user before the job is updated.
   */
  async handleBulkReadFailed(data, accessToken) {
    logger.error('Bulk read job failed', {
      jobId: data.job_id,
      errorMessage: data.error_message,
    });
    try {
      // Authentication gate only: the resolved user is not needed for the update.
      await this.getUserFromAccessToken(accessToken);
      const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
      await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, {
        status: 'failed',
        state: 'FAILED',
        error_message: data.error_message || 'Bulk read job failed',
      });
      logger.info('Updated bulk read job status to failed', { jobId: data.job_id });
    } catch (error) {
      logger.error('Failed to update failed bulk read job status', {
        jobId: data.job_id,
        error: error.message,
      });
    }
  }

  /**
   * Record that a bulk read job is in progress. Errors are logged, never rethrown.
   *
   * @param {object} data - Webhook body (`job_id`).
   * @param {string} accessToken - Must resolve to a user before the job is updated.
   */
  async handleBulkReadInProgress(data, accessToken) {
    logger.info('Bulk read job in progress', {
      jobId: data.job_id,
    });
    try {
      // Authentication gate only: the resolved user is not needed for the update.
      await this.getUserFromAccessToken(accessToken);
      const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
      await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, {
        status: 'in_progress',
        state: 'IN_PROGRESS',
      });
      logger.info('Updated bulk read job status to in_progress', { jobId: data.job_id });
    } catch (error) {
      logger.error('Failed to update bulk read job status', {
        jobId: data.job_id,
        error: error.message,
      });
    }
  }

  /**
   * Create the job record for a newly created bulk read job, owned by the user
   * the access token resolves to. Errors are logged, never rethrown.
   *
   * @param {object} data - Webhook body; missing fields fall back to defaults.
   * @param {string} accessToken - Identifies the owning user.
   */
  async handleBulkReadCreated(data, accessToken) {
    logger.info('Bulk read job created', {
      jobId: data.job_id,
      module: data.query?.module,
    });
    try {
      const user = await this.getUserFromAccessToken(accessToken);
      const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
      await ZohoBulkReadRepository.createBulkReadJob({
        id: data.job_id,
        user_uuid: user.uuid,
        module: data.query?.module || 'unknown',
        operation: data.operation || 'read',
        state: data.state || 'CREATED',
        file_type: data.fileType || 'csv',
        download_url: data.result?.download_url,
        records_count: data.result?.count || 0,
      });
      logger.info('Created bulk read job record', { jobId: data.job_id, userId: user.uuid });
    } catch (error) {
      logger.error('Failed to create bulk read job record', {
        jobId: data.job_id,
        error: error.message,
      });
    }
  }

  // Projects Event Handlers

  /** @param {object} data - Project record from the webhook payload. */
  async handleProjectCreated(data) {
    logger.info('Project created', { projectId: data.id, name: data.name });
  }

  /** @param {object} data - Project record from the webhook payload. */
  async handleProjectUpdated(data) {
    logger.info('Project updated', { projectId: data.id, name: data.name });
  }

  /** @param {object} data - Task record; `project` is optional. */
  async handleTaskCreated(data) {
    logger.info('Task created', { taskId: data.id, name: data.name, projectId: data.project?.id });
  }

  /** @param {object} data - Task record; `project` is optional. */
  async handleTaskUpdated(data) {
    logger.info('Task updated', { taskId: data.id, name: data.name, projectId: data.project?.id });
  }

  // Books Event Handlers

  /** @param {object} data - Customer record from the webhook payload. */
  async handleCustomerCreated(data) {
    logger.info('Customer created', { customerId: data.contact_id, name: data.contact_name });
  }

  /** @param {object} data - Customer record from the webhook payload. */
  async handleCustomerUpdated(data) {
    logger.info('Customer updated', { customerId: data.contact_id, name: data.contact_name });
  }

  /** @param {object} data - Invoice record from the webhook payload. */
  async handleBooksInvoiceCreated(data) {
    logger.info('Books Invoice created', { invoiceId: data.invoice_id, invoiceNumber: data.invoice_number, total: data.total });
  }

  /** @param {object} data - Invoice record from the webhook payload. */
  async handleBooksInvoiceUpdated(data) {
    logger.info('Books Invoice updated', { invoiceId: data.invoice_id, invoiceNumber: data.invoice_number, total: data.total });
  }

  /** @param {object} data - Bill record from the webhook payload. */
  async handleBillCreated(data) {
    logger.info('Bill created', { billId: data.bill_id, billNumber: data.bill_number, total: data.total });
  }

  /** @param {object} data - Bill record from the webhook payload. */
  async handleBillUpdated(data) {
    logger.info('Bill updated', { billId: data.bill_id, billNumber: data.bill_number, total: data.total });
  }

  /** @param {object} data - Expense record from the webhook payload. */
  async handleExpenseCreated(data) {
    logger.info('Expense created', { expenseId: data.expense_id, amount: data.amount, description: data.description });
  }

  /** @param {object} data - Expense record from the webhook payload. */
  async handleExpenseUpdated(data) {
    logger.info('Expense updated', { expenseId: data.expense_id, amount: data.amount, description: data.description });
  }
}
// Export the class itself (not an instance); the webhook secret is read from the
// environment each time a caller runs `new ZohoHandler()`.
module.exports = ZohoHandler;