bulk read webhook feature implemented for CRM data — leads, tasks, and contacts working

This commit is contained in:
yashwin-foxy 2025-09-18 17:56:24 +05:30
parent 8ac686c13e
commit 81dcfcd843
32 changed files with 2377 additions and 18 deletions

50
package-lock.json generated
View File

@ -1,13 +1,18 @@
{ {
"name": "CentralizedReportingBackend", "name": "centralized-reporting-backend",
"version": "1.0.0",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "centralized-reporting-backend",
"version": "1.0.0",
"license": "MIT",
"dependencies": { "dependencies": {
"axios": "^1.11.0", "axios": "^1.11.0",
"bcrypt": "^6.0.0", "bcrypt": "^6.0.0",
"cors": "^2.8.5", "cors": "^2.8.5",
"csv-parser": "^3.2.0",
"dotenv": "^17.2.2", "dotenv": "^17.2.2",
"express": "^4.21.2", "express": "^4.21.2",
"express-async-errors": "^3.1.1", "express-async-errors": "^3.1.1",
@ -22,7 +27,8 @@
"sequelize": "^6.37.7", "sequelize": "^6.37.7",
"swagger-jsdoc": "^6.2.8", "swagger-jsdoc": "^6.2.8",
"swagger-ui-express": "^5.0.1", "swagger-ui-express": "^5.0.1",
"uuid": "^13.0.0" "uuid": "^13.0.0",
"yauzl": "^3.2.0"
}, },
"devDependencies": { "devDependencies": {
"nodemon": "^3.1.10" "nodemon": "^3.1.10"
@ -350,6 +356,15 @@
"node": ">=8" "node": ">=8"
} }
}, },
"node_modules/buffer-crc32": {
"version": "0.2.13",
"resolved": "https://registry.npmjs.org/buffer-crc32/-/buffer-crc32-0.2.13.tgz",
"integrity": "sha512-VO9Ht/+p3SN7SKWqcrgEzjGbRSJYTx+Q1pTQC0wrWqHx0vpJraQ6GtHx8tvcg1rlK1byhU5gccxgOgj7B0TDkQ==",
"license": "MIT",
"engines": {
"node": "*"
}
},
"node_modules/buffer-equal-constant-time": { "node_modules/buffer-equal-constant-time": {
"version": "1.0.1", "version": "1.0.1",
"resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz",
@ -542,6 +557,18 @@
"node": ">= 0.10" "node": ">= 0.10"
} }
}, },
"node_modules/csv-parser": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/csv-parser/-/csv-parser-3.2.0.tgz",
"integrity": "sha512-fgKbp+AJbn1h2dcAHKIdKNSSjfp43BZZykXsCjzALjKy80VXQNHPFJ6T9Afwdzoj24aMkq8GwDS7KGcDPpejrA==",
"license": "MIT",
"bin": {
"csv-parser": "bin/csv-parser"
},
"engines": {
"node": ">= 10"
}
},
"node_modules/debug": { "node_modules/debug": {
"version": "2.6.9", "version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
@ -1828,6 +1855,12 @@
"integrity": "sha512-RA1GjUVMnvYFxuqovrEqZoxxW5NUZqbwKtYz/Tt7nXerk0LbLblQmrsgdeOxV5SFHf0UDggjS/bSeOZwt1pmEQ==", "integrity": "sha512-RA1GjUVMnvYFxuqovrEqZoxxW5NUZqbwKtYz/Tt7nXerk0LbLblQmrsgdeOxV5SFHf0UDggjS/bSeOZwt1pmEQ==",
"license": "MIT" "license": "MIT"
}, },
"node_modules/pend": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/pend/-/pend-1.2.0.tgz",
"integrity": "sha512-F3asv42UuXchdzt+xXqfW1OGlVBe+mxa2mqI0pg5yAHZPvFmY3Y6drSf/GQ1A86WgWEN9Kzh/WrgKa6iGcHXLg==",
"license": "MIT"
},
"node_modules/pg-connection-string": { "node_modules/pg-connection-string": {
"version": "2.9.1", "version": "2.9.1",
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.9.1.tgz", "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.9.1.tgz",
@ -2525,6 +2558,19 @@
"node": ">= 6" "node": ">= 6"
} }
}, },
"node_modules/yauzl": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/yauzl/-/yauzl-3.2.0.tgz",
"integrity": "sha512-Ow9nuGZE+qp1u4JIPvg+uCiUr7xGQWdff7JQSk5VGYTAZMDe2q8lxJ10ygv10qmSj031Ty/6FNJpLO4o1Sgc+w==",
"license": "MIT",
"dependencies": {
"buffer-crc32": "~0.2.3",
"pend": "~1.2.0"
},
"engines": {
"node": ">=12"
}
},
"node_modules/z-schema": { "node_modules/z-schema": {
"version": "5.0.5", "version": "5.0.5",
"resolved": "https://registry.npmjs.org/z-schema/-/z-schema-5.0.5.tgz", "resolved": "https://registry.npmjs.org/z-schema/-/z-schema-5.0.5.tgz",

View File

@ -13,6 +13,7 @@
"axios": "^1.11.0", "axios": "^1.11.0",
"bcrypt": "^6.0.0", "bcrypt": "^6.0.0",
"cors": "^2.8.5", "cors": "^2.8.5",
"csv-parser": "^3.2.0",
"dotenv": "^17.2.2", "dotenv": "^17.2.2",
"express": "^4.21.2", "express": "^4.21.2",
"express-async-errors": "^3.1.1", "express-async-errors": "^3.1.1",
@ -27,7 +28,8 @@
"sequelize": "^6.37.7", "sequelize": "^6.37.7",
"swagger-jsdoc": "^6.2.8", "swagger-jsdoc": "^6.2.8",
"swagger-ui-express": "^5.0.1", "swagger-ui-express": "^5.0.1",
"uuid": "^13.0.0" "uuid": "^13.0.0",
"yauzl": "^3.2.0"
}, },
"devDependencies": { "devDependencies": {
"nodemon": "^3.1.10" "nodemon": "^3.1.10"

View File

@ -5,7 +5,7 @@ async function getData(req, res) {
try { try {
const { provider, service, resource, page, limit, filters } = req.query; const { provider, service, resource, page, limit, filters } = req.query;
console.log('query is', req.query); console.log('query is', req.query);
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
@ -26,7 +26,7 @@ async function getData(req, res) {
async function getServices(req, res) { async function getServices(req, res) {
try { try {
const { provider } = req.query; const { provider } = req.query;
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const services = await integrationService.getAvailableServices(provider); const services = await integrationService.getAvailableServices(provider);
res.json(success(`${provider} available services`, services)); res.json(success(`${provider} available services`, services));
} catch (error) { } catch (error) {
@ -37,7 +37,7 @@ async function getServices(req, res) {
async function getResources(req, res) { async function getResources(req, res) {
try { try {
const { provider, service } = req.query; const { provider, service } = req.query;
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const resources = await integrationService.getAvailableResources(provider, service); const resources = await integrationService.getAvailableResources(provider, service);
res.json(success(`${provider} ${service} available resources`, resources)); res.json(success(`${provider} ${service} available resources`, resources));
} catch (error) { } catch (error) {
@ -52,7 +52,7 @@ async function getPortals(req, res) {
return res.status(400).json(failure('Portals are only available for Zoho provider', 'INVALID_PROVIDER')); return res.status(400).json(failure('Portals are only available for Zoho provider', 'INVALID_PROVIDER'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const portals = await integrationService.getPortals(provider); const portals = await integrationService.getPortals(provider);
res.json(success('Zoho portals retrieved successfully', portals)); res.json(success('Zoho portals retrieved successfully', portals));
} catch (error) { } catch (error) {
@ -67,7 +67,7 @@ async function getAllProjects(req, res) {
return res.status(400).json(failure('All projects are only available for Zoho provider', 'INVALID_PROVIDER')); return res.status(400).json(failure('All projects are only available for Zoho provider', 'INVALID_PROVIDER'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -96,7 +96,7 @@ async function getAllProjectTasks(req, res) {
return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -125,7 +125,7 @@ async function getAllProjectTaskLists(req, res) {
return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -155,7 +155,7 @@ async function getAllProjectIssues(req, res) {
return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -186,7 +186,7 @@ async function getAllProjectPhases(req, res) {
return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -211,7 +211,7 @@ async function getSalesOrders(req, res) {
return res.status(400).json(failure('Sales Orders are only available for Zoho provider', 'INVALID_PROVIDER')); return res.status(400).json(failure('Sales Orders are only available for Zoho provider', 'INVALID_PROVIDER'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -236,7 +236,7 @@ async function getPurchaseOrders(req, res) {
return res.status(400).json(failure('Purchase Orders are only available for Zoho provider', 'INVALID_PROVIDER')); return res.status(400).json(failure('Purchase Orders are only available for Zoho provider', 'INVALID_PROVIDER'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {
@ -261,7 +261,7 @@ async function getInvoices(req, res) {
return res.status(400).json(failure('Invoices are only available for Zoho provider', 'INVALID_PROVIDER')); return res.status(400).json(failure('Invoices are only available for Zoho provider', 'INVALID_PROVIDER'));
} }
const integrationService = new IntegrationService(req.user.id); const integrationService = new IntegrationService(req.user.uuid);
const params = { page, limit }; const params = { page, limit };
if (filters) { if (filters) {
try { try {

View File

@ -33,6 +33,7 @@ module.exports = { register, me, updateMe, removeMe };
// Exchange Zoho authorization code for tokens and persist // Exchange Zoho authorization code for tokens and persist
async function exchangeZohoToken(req, res) { async function exchangeZohoToken(req, res) {
const { authorization_code, id, service_name } = req.body; const { authorization_code, id, service_name } = req.body;
console.log('exchangeZohoToken', req.body);
// Optional: ensure the id belongs to the authenticated user (if business rule requires) // Optional: ensure the id belongs to the authenticated user (if business rule requires)
const params = new URLSearchParams(); const params = new URLSearchParams();
params.append('code', authorization_code); params.append('code', authorization_code);

View File

@ -0,0 +1,131 @@
const express = require('express');
const Joi = require('joi');
const BulkReadService = require('../../services/bulkReadService');
const auth = require('../middlewares/auth');
const { success, failure } = require('../../utils/response');
const router = express.Router();
// Generic Joi validation middleware factory.
// `source` selects which request section is validated: 'body', 'params',
// or (default) 'query'. On success the validated/coerced value (with
// unknown keys stripped) replaces that section; on failure a 400 with
// errorCode VALIDATION_ERROR is returned and the chain stops.
// Fix: the original only distinguished 'body' vs everything-else, so
// routes registered with source 'params' (e.g. /job/:job_id) validated
// req.query instead of req.params and always failed on required params.
function validate(schema, source = 'query') {
  return (req, res, next) => {
    const data =
      source === 'body' ? req.body :
      source === 'params' ? req.params :
      req.query;
    const { error, value } = schema.validate(data, { abortEarly: false, stripUnknown: true });
    if (error) {
      return res.status(400).json({
        status: 'error',
        message: 'Validation failed',
        errorCode: 'VALIDATION_ERROR',
        details: error.details,
        timestamp: new Date().toISOString()
      });
    }
    if (source === 'body') {
      req.body = value;
    } else if (source === 'params') {
      req.params = value;
    } else {
      req.query = value;
    }
    next();
  };
}
// POST /initiate — start a Zoho bulk read export for one CRM module.
// Body: module (one of the supported CRM modules), fields (non-empty list
// of field API names), optional page/limit (Zoho bulk API caps at 200000).
const initiateBulkReadSchema = Joi.object({
  module: Joi.string().valid('contacts', 'leads', 'accounts', 'tasks', 'vendors', 'invoices', 'sales_orders', 'purchase_orders').required(),
  fields: Joi.array().items(Joi.string()).min(1).required(),
  page: Joi.number().min(1).default(1),
  limit: Joi.number().min(1).max(200000).default(200000)
});
router.post('/initiate', auth, validate(initiateBulkReadSchema, 'body'), async (req, res) => {
  try {
    const { module, fields, page, limit } = req.body;
    // Jobs are always scoped to the authenticated user's uuid.
    const job = await new BulkReadService().initiateBulkRead(req.user.uuid, module, fields, { page, limit });
    res.json(success('Bulk read job initiated successfully', job));
  } catch (err) {
    res.status(400).json(failure(err.message, 'BULK_READ_ERROR'));
  }
});
// Get bulk read job status
// GET /job/:job_id — status of a single bulk read job owned by the caller.
const jobStatusSchema = Joi.object({
  job_id: Joi.string().required()
});
// NOTE(review): this registers validate(..., 'params'), but validate() as
// written only distinguishes 'body' from everything-else (req.query), so
// the required job_id is checked against req.query and this route 400s for
// every request — confirm validate() supports a 'params' source.
router.get('/job/:job_id', auth, validate(jobStatusSchema, 'params'), async (req, res) => {
  try {
    const { job_id } = req.params;
    // Scope the lookup to the authenticated user so jobs are not leaked
    // across accounts.
    const userId = req.user.uuid;
    const bulkReadService = new BulkReadService();
    const result = await bulkReadService.getBulkReadJobStatus(userId, job_id);
    res.json(success('Job status retrieved successfully', result));
  } catch (error) {
    // Service-layer failures (unknown job, provider errors) surface as 400.
    res.status(400).json(failure(error.message, 'JOB_STATUS_ERROR'));
  }
});
// GET /jobs — paginated list of the caller's bulk read jobs, optionally
// filtered by job status.
const userJobsSchema = Joi.object({
  page: Joi.number().min(1).default(1),
  limit: Joi.number().min(1).max(100).default(50),
  status: Joi.string().valid('pending', 'in_progress', 'completed', 'failed').optional()
});
router.get('/jobs', auth, validate(userJobsSchema), async (req, res) => {
  try {
    const { page, limit, status } = req.query;
    const jobs = await new BulkReadService().getUserBulkReadJobs(req.user.uuid, { page, limit, status });
    res.json(success('Bulk read jobs retrieved successfully', jobs));
  } catch (err) {
    res.status(400).json(failure(err.message, 'JOBS_RETRIEVAL_ERROR'));
  }
});
// GET /data/:module — page through previously ingested bulk read rows for
// one CRM module.
// Fix: the original used a single schema mixing the route param (module)
// with query options (page/limit/orderBy/orderDirection) and applied it to
// 'params' only, so the query options the handler reads from req.query were
// never validated nor given their defaults. Validate the two request
// sections with separate schemas.
const moduleParamsSchema = Joi.object({
  module: Joi.string().valid('contacts', 'leads', 'accounts', 'tasks', 'vendors', 'invoices', 'sales_orders', 'purchase_orders').required()
});
const moduleQuerySchema = Joi.object({
  page: Joi.number().min(1).default(1),
  limit: Joi.number().min(1).max(1000).default(100),
  orderBy: Joi.string().default('created_time'),
  orderDirection: Joi.string().valid('ASC', 'DESC').default('DESC')
});
router.get('/data/:module', auth, validate(moduleParamsSchema, 'params'), validate(moduleQuerySchema, 'query'), async (req, res) => {
  try {
    const { module } = req.params;
    const { page, limit, orderBy, orderDirection } = req.query;
    // Data is partitioned per authenticated user.
    const userId = req.user.uuid;
    const bulkReadService = new BulkReadService();
    const result = await bulkReadService.getBulkReadData(userId, module, {
      page, limit, orderBy, orderDirection
    });
    res.json(success(`${module} data retrieved successfully`, result));
  } catch (error) {
    res.status(400).json(failure(error.message, 'DATA_RETRIEVAL_ERROR'));
  }
});
// GET /modules — the CRM modules supported for bulk reads.
router.get('/modules', auth, async (req, res) => {
  try {
    const available = new BulkReadService().getAvailableModules();
    res.json(success('Available modules retrieved successfully', available));
  } catch (err) {
    res.status(400).json(failure(err.message, 'MODULES_RETRIEVAL_ERROR'));
  }
});
module.exports = router;

View File

@ -151,5 +151,6 @@ const zohoHandler = new ZohoHandler();
router.post('/webhooks/zoho/crm', zohoHandler.handleCrmWebhook.bind(zohoHandler)); router.post('/webhooks/zoho/crm', zohoHandler.handleCrmWebhook.bind(zohoHandler));
router.post('/webhooks/zoho/people', zohoHandler.handlePeopleWebhook.bind(zohoHandler)); router.post('/webhooks/zoho/people', zohoHandler.handlePeopleWebhook.bind(zohoHandler));
router.post('/webhooks/zoho/projects', zohoHandler.handleProjectsWebhook.bind(zohoHandler)); router.post('/webhooks/zoho/projects', zohoHandler.handleProjectsWebhook.bind(zohoHandler));
router.post('/webhooks/zoho/bulkread', zohoHandler.handleBulkReadWebhook.bind(zohoHandler));
module.exports = router; module.exports = router;

View File

@ -5,6 +5,7 @@ const { register, me, updateMe, removeMe, exchangeZohoToken } = require('../cont
const auth = require('../middlewares/auth'); const auth = require('../middlewares/auth');
const { registerSchema, updateSchema } = require('../validators/userValidator'); const { registerSchema, updateSchema } = require('../validators/userValidator');
const Joi = require('joi'); const Joi = require('joi');
const crypto = require('crypto');
const router = express.Router(); const router = express.Router();
@ -14,6 +15,20 @@ const storage = multer.diskStorage({
}); });
const upload = multer({ storage }); const upload = multer({ storage });
// Decrypts an AES-256-GCM token payload produced by the matching encryption
// helper. Payload layout (base64-encoded): 12-byte IV | 16-byte auth tag |
// ciphertext. Throws if the auth tag does not verify.
// FIXME(review): the key is derived from the hard-coded passphrase
// 'changeme' — move this secret into configuration/environment before
// production use (the encrypting side must change in lockstep).
function decrypt(ciphertext) {
  const payload = Buffer.from(ciphertext, 'base64');
  const iv = payload.subarray(0, 12);
  const tag = payload.subarray(12, 28);
  const body = payload.subarray(28);
  const key = crypto.createHash('sha256').update('changeme').digest();
  const gcm = crypto.createDecipheriv('aes-256-gcm', key, iv);
  gcm.setAuthTag(tag);
  return Buffer.concat([gcm.update(body), gcm.final()]).toString('utf8');
}
function validate(schema) { function validate(schema) {
return (req, res, next) => { return (req, res, next) => {
const toValidate = req.method === 'GET' ? req.query : req.body; const toValidate = req.method === 'GET' ? req.query : req.body;
@ -34,9 +49,66 @@ router.delete('/me', auth, removeMe);
// OAuth token exchange (Zoho request currently) // OAuth token exchange (Zoho request currently)
const zohoTokenSchema = Joi.object({ const zohoTokenSchema = Joi.object({
authorization_code: Joi.string().required(), authorization_code: Joi.string().required(),
id: Joi.number().required(), id: Joi.string().required(),
service_name: Joi.string().valid('zoho', 'keka', 'bamboohr', 'hubspot', 'other').required() service_name: Joi.string().valid('zoho', 'keka', 'bamboohr', 'hubspot', 'other').required()
}); });
router.post('/zoho/token', auth, validate(zohoTokenSchema), exchangeZohoToken); router.post('/zoho/token', auth, validate(zohoTokenSchema), exchangeZohoToken);
// Decrypt access token route — returns the caller's decrypted provider
// access token for the requested service.
// SECURITY NOTE(review): this endpoint hands a plaintext OAuth access token
// to the HTTP client; confirm this exposure is intentional, and restrict
// and audit its callers.
// Fix: the original mixed a .then()/.catch() chain inside try/catch, so a
// decrypt() failure thrown inside .then() was swallowed by the lookup
// .catch() and reported as TOKEN_RETRIEVAL_ERROR instead of
// DECRYPTION_ERROR. Rewritten with async/await so each failure mode keeps
// its own status/errorCode.
const decryptTokenSchema = Joi.object({
  service_name: Joi.string().valid('zoho', 'keka', 'bamboohr', 'hubspot', 'other').required()
});
router.get('/decrypt-token', auth, validate(decryptTokenSchema), async (req, res) => {
  const { service_name } = req.query;
  const userId = req.user.uuid;
  const userAuthTokenRepo = require('../../data/repositories/userAuthTokenRepository');
  let token;
  try {
    // Look up the stored (encrypted) token for this user + service.
    token = await userAuthTokenRepo.findByUserAndService(userId, service_name);
  } catch (error) {
    return res.status(500).json({
      status: 'error',
      message: 'Error retrieving token',
      errorCode: 'TOKEN_RETRIEVAL_ERROR',
      details: error.message,
      timestamp: new Date().toISOString()
    });
  }
  if (!token) {
    return res.status(404).json({
      status: 'error',
      message: `${service_name} token not found for user`,
      errorCode: 'TOKEN_NOT_FOUND',
      timestamp: new Date().toISOString()
    });
  }
  try {
    const decryptedToken = decrypt(token.accessToken);
    return res.json({
      status: 'success',
      message: `${service_name} token fetched successfully`,
      data: {
        service: service_name,
        accessToken: decryptedToken,
        expiresAt: token.expiresAt
      },
      timestamp: new Date().toISOString()
    });
  } catch (error) {
    return res.status(500).json({
      status: 'error',
      message: 'Error decrypting token',
      errorCode: 'DECRYPTION_ERROR',
      details: error.message,
      timestamp: new Date().toISOString()
    });
  }
});
module.exports = router; module.exports = router;

View File

@ -11,6 +11,7 @@ const config = require('./config');
const userRoutes = require('./api/routes/userRoutes'); const userRoutes = require('./api/routes/userRoutes');
const authRoutes = require('./api/routes/authRoutes'); const authRoutes = require('./api/routes/authRoutes');
const integrationRoutes = require('./api/routes/integrationRoutes'); const integrationRoutes = require('./api/routes/integrationRoutes');
const bulkReadRoutes = require('./api/routes/bulkReadRoutes');
const sequelize = require('./db/pool'); const sequelize = require('./db/pool');
const app = express(); const app = express();
@ -41,6 +42,7 @@ app.get('/health', async (req, res) => {
app.use(`${config.app.apiPrefix}/auth`, authRoutes); app.use(`${config.app.apiPrefix}/auth`, authRoutes);
app.use(`${config.app.apiPrefix}/users`, userRoutes); app.use(`${config.app.apiPrefix}/users`, userRoutes);
app.use(`${config.app.apiPrefix}/integrations`, integrationRoutes); app.use(`${config.app.apiPrefix}/integrations`, integrationRoutes);
app.use(`${config.app.apiPrefix}/bulk-read`, bulkReadRoutes);
module.exports = app; module.exports = app;

View File

@ -0,0 +1,72 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Accounts ingested via bulk read jobs.
// Rows are scoped per user/provider and linked to the originating job
// through bulk_job_id. Backing table: zoho_accounts_bulk.
const ZohoAccountsBulk = sequelize.define('ZohoAccountsBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM — presumably the Account's Id field;
  // TODO confirm against the bulk read ingestion code.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  // Source system; fixed to 'zoho' for this table today.
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  account_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  phone: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  website: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  industry: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  ownership: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  annual_revenue: {
    type: DataTypes.DECIMAL(15, 2),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Creation timestamp carried over from Zoho (distinct from created_at,
  // which records local insertion time).
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_accounts_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoAccountsBulk;

View File

@ -0,0 +1,70 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model tracking Zoho bulk read jobs and their lifecycle.
// Backing table: zoho_bulk_read_jobs.
const ZohoBulkReadJobs = sequelize.define('ZohoBulkReadJobs', {
  // Job id used as primary key — presumably the id returned by Zoho's bulk
  // read API; TODO confirm against bulkReadService.
  id: {
    type: DataTypes.STRING(255),
    primaryKey: true,
    allowNull: false
  },
  // Application user (uuid string) that initiated the job.
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  // CRM module being exported (e.g. contacts, leads).
  module: {
    type: DataTypes.STRING(100),
    allowNull: false
  },
  operation: {
    type: DataTypes.STRING(50),
    allowNull: false
  },
  // Provider-reported job state (separate from our local `status` below).
  state: {
    type: DataTypes.STRING(50),
    allowNull: false
  },
  file_type: {
    type: DataTypes.STRING(10),
    allowNull: false
  },
  // URL to fetch the exported file once the job completes.
  download_url: {
    type: DataTypes.TEXT,
    allowNull: true
  },
  records_count: {
    type: DataTypes.INTEGER,
    allowNull: false,
    defaultValue: 0
  },
  // How many records have been ingested locally so far.
  processed_count: {
    type: DataTypes.INTEGER,
    allowNull: false,
    defaultValue: 0
  },
  // Local processing status; new jobs start as 'pending'.
  status: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'pending'
  },
  error_message: {
    type: DataTypes.TEXT,
    allowNull: true
  }
}, {
  tableName: 'zoho_bulk_read_jobs',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['module'] },
    { fields: ['status'] }
  ]
});
module.exports = ZohoBulkReadJobs;

View File

@ -0,0 +1,80 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Contacts ingested via bulk read jobs.
// Backing table: zoho_contacts_bulk.
const ZohoContactsBulk = sequelize.define('ZohoContactsBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  first_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  last_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  email: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  phone: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  mobile: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  lead_source: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  account_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Created/modified timestamps carried over from Zoho (distinct from the
  // local created_at/updated_at audit columns).
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  modified_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_contacts_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoContactsBulk;

View File

@ -0,0 +1,76 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Invoices ingested via bulk read jobs.
// Backing table: zoho_invoices_bulk.
const ZohoInvoicesBulk = sequelize.define('ZohoInvoicesBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  invoice_number: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Date-only fields (no time component) as reported by Zoho.
  invoice_date: {
    type: DataTypes.DATEONLY,
    allowNull: true
  },
  due_date: {
    type: DataTypes.DATEONLY,
    allowNull: true
  },
  status: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Monetary amounts; 2 decimal places, 15 digits total.
  total: {
    type: DataTypes.DECIMAL(15, 2),
    allowNull: true
  },
  balance: {
    type: DataTypes.DECIMAL(15, 2),
    allowNull: true
  },
  account_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Creation timestamp carried over from Zoho.
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_invoices_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoInvoicesBulk;

View File

@ -0,0 +1,76 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Leads ingested via bulk read jobs.
// Backing table: zoho_leads_bulk.
const ZohoLeadsBulk = sequelize.define('ZohoLeadsBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  first_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  last_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  company: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  lead_source: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  lead_status: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  email: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  phone: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Creation timestamp carried over from Zoho.
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_leads_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoLeadsBulk;

View File

@ -0,0 +1,72 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Purchase Orders ingested via bulk read jobs.
// Backing table: zoho_purchase_orders_bulk.
const ZohoPurchaseOrdersBulk = sequelize.define('ZohoPurchaseOrdersBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  purchase_order_number: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  subject: {
    type: DataTypes.STRING(500),
    allowNull: true
  },
  vendor_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  status: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  due_date: {
    type: DataTypes.DATEONLY,
    allowNull: true
  },
  total: {
    type: DataTypes.DECIMAL(15, 2),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Creation timestamp carried over from Zoho.
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_purchase_orders_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoPurchaseOrdersBulk;

View File

@ -0,0 +1,72 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Sales Orders ingested via bulk read jobs.
// Backing table: zoho_sales_orders_bulk.
const ZohoSalesOrdersBulk = sequelize.define('ZohoSalesOrdersBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  sales_order_number: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  subject: {
    type: DataTypes.STRING(500),
    allowNull: true
  },
  status: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  due_date: {
    type: DataTypes.DATEONLY,
    allowNull: true
  },
  total: {
    type: DataTypes.DECIMAL(15, 2),
    allowNull: true
  },
  account_name: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Creation timestamp carried over from Zoho.
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_sales_orders_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoSalesOrdersBulk;

View File

@ -0,0 +1,68 @@
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
// Sequelize model for Zoho CRM Tasks ingested via bulk read jobs.
// Backing table: zoho_tasks_bulk.
const ZohoTasksBulk = sequelize.define('ZohoTasksBulk', {
  // Surrogate auto-increment key; the Zoho-side id is kept separately.
  internal_id: {
    type: DataTypes.INTEGER,
    primaryKey: true,
    autoIncrement: true,
    allowNull: false
  },
  // Record id assigned by Zoho CRM.
  zoho_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Application user the record was synced for (uuid string).
  user_id: {
    type: DataTypes.STRING(255),
    allowNull: false
  },
  provider: {
    type: DataTypes.STRING(50),
    allowNull: false,
    defaultValue: 'zoho'
  },
  subject: {
    type: DataTypes.STRING(500),
    allowNull: true
  },
  owner: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  status: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  priority: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  due_date: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Related-record reference — presumably Zoho's What_Id lookup (the
  // account/deal the task is attached to); TODO confirm.
  what_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  },
  // Creation timestamp carried over from Zoho.
  created_time: {
    type: DataTypes.DATE,
    allowNull: true
  },
  // Id of the bulk read job that produced this row.
  bulk_job_id: {
    type: DataTypes.STRING(255),
    allowNull: true
  }
}, {
  tableName: 'zoho_tasks_bulk',
  timestamps: true,
  createdAt: 'created_at',
  updatedAt: 'updated_at',
  indexes: [
    { fields: ['user_id', 'provider'] },
    { fields: ['bulk_job_id'] },
    { fields: ['created_time'] }
  ]
});
module.exports = ZohoTasksBulk;

View File

@ -0,0 +1,64 @@
// Sequelize model for `zoho_vendors_bulk`: vendor records imported from
// Zoho CRM via the bulk-read API, scoped per user/provider and per job.
const { DataTypes } = require('sequelize');
const sequelize = require('../../db/pool');
const ZohoVendorsBulk = sequelize.define('ZohoVendorsBulk', {
// Local surrogate primary key (Zoho's own id is kept in zoho_id).
internal_id: {
type: DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true,
allowNull: false
},
// Record id assigned by Zoho CRM; nullable since rows may arrive without one.
zoho_id: {
type: DataTypes.STRING(255),
allowNull: true
},
// Application user (UUID string) the imported row belongs to.
user_id: {
type: DataTypes.STRING(255),
allowNull: false
},
// Source CRM provider; fixed to 'zoho' today but kept for multi-provider use.
provider: {
type: DataTypes.STRING(50),
allowNull: false,
defaultValue: 'zoho'
},
vendor_name: {
type: DataTypes.STRING(255),
allowNull: true
},
email: {
type: DataTypes.STRING(255),
allowNull: true
},
phone: {
type: DataTypes.STRING(255),
allowNull: true
},
website: {
type: DataTypes.STRING(255),
allowNull: true
},
owner: {
type: DataTypes.STRING(255),
allowNull: true
},
created_time: {
type: DataTypes.DATE,
allowNull: true
},
// Bulk-read job that produced this row; used to clear/replace per-job data.
bulk_job_id: {
type: DataTypes.STRING(255),
allowNull: true
}
}, {
tableName: 'zoho_vendors_bulk',
timestamps: true,
createdAt: 'created_at',
updatedAt: 'updated_at',
// Mirrors the indexes declared in the SQL migration.
indexes: [
{ fields: ['user_id', 'provider'] },
{ fields: ['bulk_job_id'] },
{ fields: ['created_time'] }
]
});
module.exports = ZohoVendorsBulk;

View File

@ -4,6 +4,10 @@ async function createUser(payload) {
return User.create(payload); return User.create(payload);
} }
async function findById(id) {
return User.findByPk(id);
}
async function findByEmail(email) { async function findByEmail(email) {
return User.findOne({ where: { email } }); return User.findOne({ where: { email } });
} }
@ -26,5 +30,5 @@ async function deleteByUuid(uuid) {
return true; return true;
} }
module.exports = { createUser, findByEmail, findByUuid, updateByUuid, deleteByUuid }; module.exports = { createUser, findById, findByEmail, findByUuid, updateByUuid, deleteByUuid };

View File

@ -0,0 +1,250 @@
const ZohoContactsBulk = require('../models/zohoContactsBulk');
const ZohoLeadsBulk = require('../models/zohoLeadsBulk');
const ZohoAccountsBulk = require('../models/zohoAccountsBulk');
const ZohoTasksBulk = require('../models/zohoTasksBulk');
const ZohoVendorsBulk = require('../models/zohoVendorsBulk');
const ZohoInvoicesBulk = require('../models/zohoInvoicesBulk');
const ZohoSalesOrdersBulk = require('../models/zohoSalesOrdersBulk');
const ZohoPurchaseOrdersBulk = require('../models/zohoPurchaseOrdersBulk');
const ZohoBulkReadJobs = require('../models/zohoBulkReadJobs');
/**
 * Data-access layer for Zoho bulk-read imports.
 *
 * Routes each supported CRM module to its dedicated `*_bulk` Sequelize model
 * and manages the `zoho_bulk_read_jobs` tracking table. Exported as a
 * singleton instance.
 */
class ZohoBulkReadRepository {
  constructor() {
    // Lower-case module name -> Sequelize model backing its bulk table.
    this.models = {
      'contacts': ZohoContactsBulk,
      'leads': ZohoLeadsBulk,
      'accounts': ZohoAccountsBulk,
      'tasks': ZohoTasksBulk,
      'vendors': ZohoVendorsBulk,
      'invoices': ZohoInvoicesBulk,
      'sales_orders': ZohoSalesOrdersBulk,
      'purchase_orders': ZohoPurchaseOrdersBulk
    };
  }
  /**
   * Get the appropriate model for a module.
   * @param {string} module - Module name (matched case-insensitively)
   * @returns {Object} Sequelize model
   * @throws {Error} When no model is registered for the module
   */
  getModel(module) {
    // String() guard: a non-string module now raises the descriptive Error
    // below instead of a bare TypeError from .toLowerCase().
    const model = this.models[String(module).toLowerCase()];
    if (!model) {
      throw new Error(`No model found for module: ${module}`);
    }
    return model;
  }
  /**
   * Bulk insert data for a specific module.
   * @param {string} module - Module name
   * @param {Array} data - Array of records to insert
   * @returns {Promise<Array>} Inserted records
   */
  async bulkInsert(module, data) {
    try {
      const model = this.getModel(module);
      console.log(`💾 Bulk inserting ${data.length} records for ${module}`);
      // ignoreDuplicates: a re-delivered webhook must not abort the batch.
      const result = await model.bulkCreate(data, {
        ignoreDuplicates: true,
        validate: true
      });
      console.log(`✅ Successfully inserted ${result.length} records for ${module}`);
      return result;
    } catch (error) {
      console.error(`❌ Error bulk inserting ${module}:`, error.message);
      throw error;
    }
  }
  /**
   * Clear existing data for a user, module, and bulk job.
   * @param {string} userId - User UUID
   * @param {string} module - Module name
   * @param {string} jobId - Bulk job ID
   * @returns {Promise<number>} Number of deleted records
   */
  async clearUserData(userId, module, jobId) {
    try {
      const model = this.getModel(module);
      console.log(`🗑️ Clearing existing data for user ${userId}, module ${module}, job ${jobId}`);
      const result = await model.destroy({
        where: {
          user_id: userId,
          provider: 'zoho',
          bulk_job_id: jobId
        }
      });
      console.log(`✅ Cleared ${result} existing records for ${module}`);
      return result;
    } catch (error) {
      console.error(`❌ Error clearing data for ${module}:`, error.message);
      throw error;
    }
  }
  /**
   * Get data for a user and module.
   * @param {string} userId - User UUID
   * @param {string} module - Module name
   * @param {Object} options - Query options (limit, offset, orderBy, orderDirection)
   * @returns {Promise<Array>} Records
   */
  async getUserData(userId, module, options = {}) {
    try {
      const model = this.getModel(module);
      const { limit = 100, offset = 0, orderBy = 'created_time', orderDirection = 'DESC' } = options;
      // Whitelist the sort direction so unvalidated caller input never
      // reaches the ORDER BY clause; anything but 'ASC' falls back to 'DESC'.
      const direction = String(orderDirection).toUpperCase() === 'ASC' ? 'ASC' : 'DESC';
      const records = await model.findAll({
        where: {
          user_id: userId,
          provider: 'zoho'
        },
        limit: Number.parseInt(limit, 10),
        offset: Number.parseInt(offset, 10),
        // NOTE(review): orderBy is still a caller-supplied column name —
        // Sequelize quotes it, but consider whitelisting columns too.
        order: [[orderBy, direction]]
      });
      return records;
    } catch (error) {
      console.error(`❌ Error getting user data for ${module}:`, error.message);
      throw error;
    }
  }
  /**
   * Get count of records for a user and module.
   * @param {string} userId - User UUID
   * @param {string} module - Module name
   * @returns {Promise<number>} Record count
   */
  async getUserDataCount(userId, module) {
    try {
      const model = this.getModel(module);
      const count = await model.count({
        where: {
          user_id: userId,
          provider: 'zoho'
        }
      });
      return count;
    } catch (error) {
      console.error(`❌ Error getting count for ${module}:`, error.message);
      throw error;
    }
  }
  // ---- Bulk Read Jobs methods ----
  /**
   * Create a new bulk read job record.
   * @param {Object} jobData - Job data (id, user_id, module, operation, state, file_type, download_url, records_count)
   * @returns {Promise<Object>} Created job
   */
  async createBulkReadJob(jobData) {
    try {
      console.log('📝 Creating bulk read job:', jobData.id);
      const job = await ZohoBulkReadJobs.create({
        id: jobData.id,
        user_id: jobData.user_id,
        provider: 'zoho',
        module: jobData.module,
        operation: jobData.operation,
        state: jobData.state,
        file_type: jobData.file_type,
        download_url: jobData.download_url,
        records_count: jobData.records_count || 0,
        status: 'pending'
      });
      console.log('✅ Bulk read job created successfully');
      return job;
    } catch (error) {
      console.error('❌ Error creating bulk read job:', error.message);
      throw error;
    }
  }
  /**
   * Update a bulk read job.
   * @param {string} jobId - Job ID
   * @param {Object} updateData - Columns to update
   * @returns {Promise<Object>} Updated job
   * @throws {Error} When the job does not exist
   */
  async updateBulkReadJob(jobId, updateData) {
    try {
      console.log(`🔄 Updating bulk read job ${jobId}:`, updateData);
      const [updatedRows] = await ZohoBulkReadJobs.update(updateData, {
        where: { id: jobId }
      });
      if (updatedRows === 0) {
        throw new Error(`Job ${jobId} not found`);
      }
      // Re-fetch so callers get the post-update row, not the affected count.
      const updatedJob = await ZohoBulkReadJobs.findByPk(jobId);
      console.log('✅ Bulk read job updated successfully');
      return updatedJob;
    } catch (error) {
      console.error('❌ Error updating bulk read job:', error.message);
      throw error;
    }
  }
  /**
   * Get a bulk read job by ID.
   * @param {string} jobId - Job ID
   * @returns {Promise<Object|null>} Job record, or null when absent
   */
  async getBulkReadJob(jobId) {
    try {
      const job = await ZohoBulkReadJobs.findByPk(jobId);
      return job;
    } catch (error) {
      console.error('❌ Error getting bulk read job:', error.message);
      throw error;
    }
  }
  /**
   * Get bulk read jobs for a user, newest first.
   * @param {string} userId - User UUID
   * @param {Object} options - Query options (limit, offset, status)
   * @returns {Promise<Array>} Job records
   */
  async getUserBulkReadJobs(userId, options = {}) {
    try {
      const { limit = 50, offset = 0, status } = options;
      const whereClause = {
        user_id: userId,
        provider: 'zoho'
      };
      if (status) {
        whereClause.status = status;
      }
      const jobs = await ZohoBulkReadJobs.findAll({
        where: whereClause,
        limit: Number.parseInt(limit, 10),
        offset: Number.parseInt(offset, 10),
        order: [['created_at', 'DESC']]
      });
      return jobs;
    } catch (error) {
      console.error('❌ Error getting user bulk read jobs:', error.message);
      throw error;
    }
  }
}
module.exports = new ZohoBulkReadRepository();

View File

@ -1,6 +1,6 @@
CREATE TABLE IF NOT EXISTS user_auth_tokens ( CREATE TABLE IF NOT EXISTS user_auth_tokens (
id INT AUTO_INCREMENT PRIMARY KEY, id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL, user_id CHAR(36) NOT NULL,
service_name ENUM('zoho','keka','bamboohr','hubspot','other') NOT NULL, service_name ENUM('zoho','keka','bamboohr','hubspot','other') NOT NULL,
access_token TEXT NOT NULL, access_token TEXT NOT NULL,
refresh_token TEXT NULL, refresh_token TEXT NULL,

View File

@ -0,0 +1,26 @@
-- Migration: Create Zoho Contacts Bulk Table
-- Description: Creates table for storing bulk read contacts data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_contacts_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL, -- app user UUID; NOTE(review): user_auth_tokens uses CHAR(36) -- confirm intended type
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
first_name VARCHAR(255),
last_name VARCHAR(255),
email VARCHAR(255),
phone VARCHAR(255),
mobile VARCHAR(255),
lead_source VARCHAR(255),
account_name VARCHAR(255),
owner VARCHAR(255),
created_time DATETIME, -- timestamps as reported by Zoho (created_at/updated_at are local)
modified_time DATETIME,
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,25 @@
-- Migration: Create Zoho Leads Bulk Table
-- Description: Creates table for storing bulk read leads data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_leads_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
first_name VARCHAR(255),
last_name VARCHAR(255),
company VARCHAR(255),
lead_source VARCHAR(255),
lead_status VARCHAR(255),
owner VARCHAR(255),
email VARCHAR(255),
phone VARCHAR(255),
created_time DATETIME, -- timestamp as reported by Zoho
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,24 @@
-- Migration: Create Zoho Accounts Bulk Table
-- Description: Creates table for storing bulk read accounts data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_accounts_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
account_name VARCHAR(255),
phone VARCHAR(255),
website VARCHAR(255),
industry VARCHAR(255),
ownership VARCHAR(255),
annual_revenue DECIMAL(15,2),
owner VARCHAR(255),
created_time DATETIME, -- timestamp as reported by Zoho
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,23 @@
-- Migration: Create Zoho Tasks Bulk Table
-- Description: Creates table for storing bulk read tasks data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_tasks_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
subject VARCHAR(500),
owner VARCHAR(255),
status VARCHAR(255),
priority VARCHAR(255),
due_date DATETIME, -- full timestamp (orders/invoices use DATE for due dates)
what_id VARCHAR(255), -- Zoho "What_Id": related-record reference
created_time DATETIME,
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,22 @@
-- Migration: Create Zoho Vendors Bulk Table
-- Description: Creates table for storing bulk read vendors data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_vendors_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
vendor_name VARCHAR(255),
email VARCHAR(255),
phone VARCHAR(255),
website VARCHAR(255),
owner VARCHAR(255),
created_time DATETIME, -- timestamp as reported by Zoho
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,25 @@
-- Migration: Create Zoho Invoices Bulk Table
-- Description: Creates table for storing bulk read invoices data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_invoices_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
invoice_number VARCHAR(255),
invoice_date DATE,
due_date DATE,
status VARCHAR(255),
total DECIMAL(15,2),
balance DECIMAL(15,2),
account_name VARCHAR(255),
owner VARCHAR(255),
created_time DATETIME,
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,24 @@
-- Migration: Create Zoho Sales Orders Bulk Table
-- Description: Creates table for storing bulk read sales orders data from Zoho CRM
CREATE TABLE IF NOT EXISTS zoho_sales_orders_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
sales_order_number VARCHAR(255),
subject VARCHAR(500),
status VARCHAR(255),
due_date DATE,
total DECIMAL(15,2),
account_name VARCHAR(255),
owner VARCHAR(255),
created_time DATETIME,
bulk_job_id VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,24 @@
-- Migration: Create Zoho Purchase Orders Bulk Table
-- Description: Creates table for storing bulk read purchase orders data from Zoho CRM
-- Rows are scoped per application user (user_id) and per bulk-read job
-- (bulk_job_id) so one job's import can be cleared/replaced independently.
CREATE TABLE IF NOT EXISTS zoho_purchase_orders_bulk (
internal_id INT AUTO_INCREMENT PRIMARY KEY,
zoho_id VARCHAR(255), -- record id assigned by Zoho CRM
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
purchase_order_number VARCHAR(255),
subject VARCHAR(500),
vendor_name VARCHAR(255),
status VARCHAR(255),
due_date DATE,
total DECIMAL(15,2),
owner VARCHAR(255),
created_time DATETIME,
bulk_job_id VARCHAR(255), -- originating bulk-read job (zoho_bulk_read_jobs.id)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_bulk_job (bulk_job_id),
INDEX idx_created_time (created_time),
INDEX idx_zoho_id (zoho_id)
);

View File

@ -0,0 +1,22 @@
-- Migration: Create Zoho Bulk Read Jobs Table
-- Description: Creates table for tracking bulk read jobs from Zoho CRM
-- One row per bulk-read job; the *_bulk data tables reference this id via
-- their bulk_job_id column.
CREATE TABLE IF NOT EXISTS zoho_bulk_read_jobs (
id VARCHAR(255) PRIMARY KEY, -- job id assigned by Zoho
user_id VARCHAR(255) NOT NULL,
provider VARCHAR(50) NOT NULL DEFAULT 'zoho',
module VARCHAR(100) NOT NULL,
operation VARCHAR(50) NOT NULL,
state VARCHAR(50) NOT NULL, -- Zoho lifecycle state (CREATED/IN_PROGRESS/COMPLETED/FAILED)
file_type VARCHAR(10) NOT NULL,
download_url TEXT,
records_count INT DEFAULT 0, -- records reported by Zoho for the job
processed_count INT DEFAULT 0, -- records actually imported locally
status VARCHAR(50) DEFAULT 'pending', -- local processing status (pending/in_progress/completed/failed)
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_provider (user_id, provider),
INDEX idx_module (module),
INDEX idx_status (status)
);

View File

@ -23,6 +23,7 @@ class ZohoHandler {
// Handle Zoho CRM webhook events // Handle Zoho CRM webhook events
async handleCrmWebhook(req, res) { async handleCrmWebhook(req, res) {
console.log('handleCrmWebhook', JSON.stringify(req.body));
try { try {
const signature = req.headers['x-zoho-signature']; const signature = req.headers['x-zoho-signature'];
const payload = JSON.stringify(req.body); const payload = JSON.stringify(req.body);
@ -229,6 +230,372 @@ class ZohoHandler {
// Add your business logic here // Add your business logic here
} }
// Handle Zoho Bulk Read webhook events.
// Entry point for Zoho's bulk-read callback: authenticates the caller via an
// `access_token` query parameter, optionally verifies the webhook signature,
// then dispatches on the job lifecycle state before acknowledging with 200.
async handleBulkReadWebhook(req, res) {
try {
// NOTE(review): this dump logs the full headers, body, query, and URL on
// every call; req.query/req.url include access_token, so credentials end
// up in plain logs — confirm this is acceptable or redact before shipping.
console.log('=== ZOHO BULK READ WEBHOOK RECEIVED ===');
console.log('Headers:', JSON.stringify(req.headers, null, 2));
console.log('Body:', JSON.stringify(req.body, null, 2));
console.log('Query Parameters:', JSON.stringify(req.query, null, 2));
console.log('Request Method:', req.method);
console.log('Request URL:', req.url);
console.log('Request IP:', req.ip);
console.log('User Agent:', req.get('User-Agent'));
console.log('Content Type:', req.get('Content-Type'));
console.log('Content Length:', req.get('Content-Length'));
console.log('==========================================');
// Extract access token from query parameters.
// NOTE(review): secrets in query strings can leak via proxy/access logs —
// TODO confirm this is the intended authentication channel for callbacks.
const accessToken = req.query.access_token;
if (!accessToken) {
console.error('❌ No access token provided in query parameters');
return res.status(400).json({
status: 'error',
message: 'Access token is required in query parameters'
});
}
console.log('🔑 Access token received from query parameters');
// Optional: Verify webhook signature if provided. Unsigned requests are
// still accepted (verification is skipped), relying on the access token.
const signature = req.headers['x-zoho-signature'];
if (signature) {
const payload = JSON.stringify(req.body);
if (!this.verifyWebhookSignature(payload, signature)) {
logger.warn('Invalid Zoho Bulk Read webhook signature', {
correlationId: logger.getCorrelationId(req),
ip: req.ip
});
return res.status(401).json({ status: 'error', message: 'Invalid signature' });
}
console.log('✅ Webhook signature verified successfully');
} else {
console.log('⚠️ No webhook signature provided - skipping verification');
}
// Log the bulk read data structure. Counts and the download URL are
// nested under `result`; the row data itself is NOT in the payload.
const { job_id, operation, state, query, fileType, result } = req.body;
console.log('req body after extracting', req.body);
console.log('📊 BULK READ JOB DETAILS:');
console.log('- Job ID:', job_id);
console.log('- Operation:', operation);
console.log('- State:', state);
console.log('- Module:', query?.module);
console.log('- Fields:', query?.fields);
console.log('- File Type:', fileType);
console.log('- Records Count:', result?.count);
console.log('- Download URL:', result?.download_url);
console.log('- More Records:', result?.more_records);
// Note: The actual data will be fetched from the download URL
console.log('📋 BULK READ DATA:');
console.log('- Data will be fetched from download URL:', result?.download_url);
console.log('- Total records to process:', result?.count);
// Log to structured logger as well
logger.info('Zoho Bulk Read webhook received', {
correlationId: logger.getCorrelationId(req),
jobId: job_id,
operation,
state,
module: query?.module,
recordsCount: result?.count,
hasDownloadUrl: !!result?.download_url,
fileType
});
// Dispatch on the lifecycle state reported by Zoho.
// NOTE(review): handlers (including the CSV import on COMPLETED) run
// before the 200 response is sent — a long import may exceed the
// caller's webhook timeout; consider acknowledging first. TODO confirm.
switch (state) {
case 'COMPLETED':
console.log('✅ Bulk read job completed successfully');
await this.handleBulkReadCompleted(req.body, accessToken);
break;
case 'FAILED':
console.log('❌ Bulk read job failed');
await this.handleBulkReadFailed(req.body, accessToken);
break;
case 'IN_PROGRESS':
console.log('⏳ Bulk read job in progress');
await this.handleBulkReadInProgress(req.body, accessToken);
break;
case 'CREATED':
console.log('🆕 Bulk read job created');
await this.handleBulkReadCreated(req.body, accessToken);
break;
default:
console.log('❓ Unknown bulk read state:', state);
logger.warn('Unknown Zoho Bulk Read state', { state, jobId: job_id });
}
res.json({
status: 'success',
message: 'Bulk read webhook processed',
receivedAt: new Date().toISOString()
});
} catch (error) {
console.error('❌ Error processing bulk read webhook:', error);
logger.error('Zoho Bulk Read webhook processing failed', {
correlationId: logger.getCorrelationId(req),
error: error.message,
stack: error.stack
});
res.status(500).json({
status: 'error',
message: 'Bulk read webhook processing failed',
error: error.message
});
}
}
// Helper function to get user data from access token
async getUserFromAccessToken(accessToken) {
try {
const jwtService = require('../../auth/jwt.service');
const { decrypt } = require('../../utils/crypto');
// Decrypt the access token if it's encrypted
let decryptedToken;
try {
decryptedToken = decrypt(accessToken);
} catch (error) {
// If decryption fails, assume it's already decrypted
decryptedToken = accessToken;
}
// Verify and decode the JWT token using the same service
const decoded = jwtService.verify(decryptedToken);
// Get user data from database using the user ID from token
const userRepository = require('../../data/repositories/userRepository');
const user = await userRepository.findById(decoded.id);
if (!user) {
throw new Error('User not found');
}
console.log(`👤 User authenticated: ${user.uuid} (${user.email})`);
return user;
} catch (error) {
console.error('❌ Error getting user from access token:', error);
throw error;
}
}
// Bulk Read Event Handlers
async handleBulkReadCompleted(data, accessToken) {
console.log('🎉 Processing completed bulk read job:', data.job_id);
logger.info('Bulk read job completed', {
jobId: data.job_id,
recordsCount: data.records_count,
downloadUrl: data.download_url
});
try {
// Import services
const CsvService = require('../../services/csvService');
const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
const userAuthTokenRepo = require('../../data/repositories/userAuthTokenRepository');
const { decrypt } = require('../../utils/crypto');
const csvService = new CsvService();
// Extract job details
const { job_id, query, result, operation, state, fileType } = data;
const module = query.module;
const downloadUrl = result.download_url;
const recordsCount = result.count;
console.log(`📊 Processing bulk read for module: ${module}`);
console.log(`📥 Download URL: ${downloadUrl}`);
console.log(`📈 Records count: ${recordsCount}`);
// Get user from access token
let user;
try {
user = await this.getUserFromAccessToken(accessToken);
console.log(`👤 User authenticated: ${user.uuid} (${user.email})`);
} catch (error) {
console.error('❌ Failed to authenticate user from access token:', error);
await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
status: 'failed',
error_message: 'Failed to authenticate user from access token'
});
return;
}
const userId = user.uuid;
// Get or create job record
let jobRecord = await ZohoBulkReadRepository.getBulkReadJob(job_id);
if (!jobRecord) {
console.log('📝 Job record not found, creating new one...');
// Create job record with the webhook data
const jobData = {
id: job_id,
user_id: userId,
module: query.module,
operation: operation,
state: state,
file_type: fileType,
download_url: result.download_url,
records_count: result.count
};
jobRecord = await ZohoBulkReadRepository.createBulkReadJob(jobData);
console.log('✅ Created new job record');
} else {
// Update existing job record with user ID if it was unknown
if (jobRecord.user_id === 'unknown') {
await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
user_id: userId
});
console.log('✅ Updated job record with user ID');
}
}
// Get Zoho access token for the user
const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, 'zoho');
if (!tokenRecord) {
console.error('❌ No Zoho token found for user:', userId);
await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
status: 'failed',
error_message: 'No Zoho access token found for user'
});
return;
}
const zohoAccessToken = decrypt(tokenRecord.accessToken);
console.log('🔑 Zoho access token retrieved successfully');
// Fetch CSV data
const csvData = await csvService.fetchCsvData(downloadUrl, zohoAccessToken);
console.log(`📥 Fetched ${csvData.length} records from CSV`);
// Parse and map data
const mappedData = csvService.parseCsvData(csvData, module, userId, job_id);
console.log(`🔄 Mapped ${mappedData.length} records for database insertion`);
// Clear existing data for this job
await ZohoBulkReadRepository.clearUserData(userId, module, job_id);
// Bulk insert data
const insertedRecords = await ZohoBulkReadRepository.bulkInsert(module, mappedData);
console.log(`✅ Successfully inserted ${insertedRecords.length} records`);
// Update job status
await ZohoBulkReadRepository.updateBulkReadJob(job_id, {
status: 'completed',
processed_count: insertedRecords.length,
state: 'COMPLETED'
});
console.log('🎉 Bulk read processing completed successfully');
logger.info('Bulk read processing completed', {
jobId: job_id,
module,
userId,
recordsProcessed: insertedRecords.length,
totalRecords: recordsCount
});
} catch (error) {
console.error('❌ Error processing bulk read completion:', error);
logger.error('Bulk read processing failed', {
jobId: data.job_id,
error: error.message,
stack: error.stack
});
// Update job status to failed
try {
const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, {
status: 'failed',
error_message: error.message
});
} catch (updateError) {
console.error('❌ Error updating job status:', updateError);
}
}
}
async handleBulkReadFailed(data, accessToken) {
console.log('💥 Processing failed bulk read job:', data.job_id);
logger.error('Bulk read job failed', {
jobId: data.job_id,
errorMessage: data.error_message
});
try {
// Get user from access token to update job record
const user = await this.getUserFromAccessToken(accessToken);
const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, {
status: 'failed',
state: 'FAILED',
error_message: data.error_message || 'Bulk read job failed'
});
console.log('✅ Updated job status to failed');
} catch (error) {
console.error('❌ Error updating failed job status:', error);
}
}
async handleBulkReadInProgress(data, accessToken) {
console.log('⏳ Bulk read job in progress:', data.job_id);
logger.info('Bulk read job in progress', {
jobId: data.job_id
});
try {
// Get user from access token to update job record
const user = await this.getUserFromAccessToken(accessToken);
const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, {
status: 'in_progress',
state: 'IN_PROGRESS'
});
console.log('✅ Updated job status to in_progress');
} catch (error) {
console.error('❌ Error updating job status:', error);
}
}
async handleBulkReadCreated(data, accessToken) {
console.log('🆕 Bulk read job created:', data.job_id);
logger.info('Bulk read job created', {
jobId: data.job_id,
module: data.query?.module
});
try {
// Get user from access token
const user = await this.getUserFromAccessToken(accessToken);
const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository');
const jobData = {
id: data.job_id,
user_id: user.uuid,
module: data.query?.module || 'unknown',
operation: data.operation || 'read',
state: data.state || 'CREATED',
file_type: data.fileType || 'csv',
download_url: data.result?.download_url,
records_count: data.result?.count || 0
};
await ZohoBulkReadRepository.createBulkReadJob(jobData);
console.log('✅ Created job record in database');
} catch (error) {
console.error('❌ Error creating job record:', error);
}
}
// Projects Event Handlers // Projects Event Handlers
async handleProjectCreated(data) { async handleProjectCreated(data) {
logger.info('Project created', { projectId: data.id, name: data.name }); logger.info('Project created', { projectId: data.id, name: data.name });

View File

@ -0,0 +1,304 @@
const axios = require('axios');
const ZohoBulkReadRepository = require('../data/repositories/zohoBulkReadRepository');
const userAuthTokenRepo = require('../data/repositories/userAuthTokenRepository');
const { decrypt } = require('../utils/crypto');
const logger = require('../utils/logger');
class BulkReadService {
// Zoho REST API root; all bulk-read endpoints below are relative to this host.
constructor() {
this.baseUrl = 'https://www.zohoapis.com';
}
/**
* Initiate a bulk read job for a specific module
* @param {string} userId - User UUID
* @param {string} module - Zoho module name
* @param {Array} fields - Fields to fetch
* @param {Object} options - Additional options
* @returns {Promise<Object>} Job details
*/
async initiateBulkRead(userId, module, fields, options = {}) {
try {
console.log(`🚀 Initiating bulk read for user ${userId}, module ${module}`);
// Get access token
const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, 'zoho');
if (!tokenRecord) {
throw new Error('No Zoho access token found for user');
}
const accessToken = decrypt(tokenRecord.accessToken);
// Prepare bulk read request
const bulkReadData = {
callback: {
url: `${process.env.API_BASE_URL || 'http://localhost:3000'}/api/v1/integrations/webhooks/zoho/bulkread`,
method: 'post'
},
query: {
module: module,
fields: fields,
page: options.page || 1,
limit: options.limit || 200000
}
};
console.log('📋 Bulk read request:', JSON.stringify(bulkReadData, null, 2));
// Make API call to Zoho
const response = await axios.post(
`${this.baseUrl}/crm/bulk/v2/read`,
bulkReadData,
{
headers: {
'Authorization': `Zoho-oauthtoken ${accessToken}`,
'Content-Type': 'application/json'
}
}
);
const jobData = response.data;
console.log('✅ Bulk read job initiated:', jobData);
// Store job in database
const jobRecord = await ZohoBulkReadRepository.createBulkReadJob({
id: jobData.id,
user_id: userId,
module: module,
operation: 'read',
state: 'CREATED',
file_type: 'csv',
records_count: 0,
status: 'pending'
});
logger.info('Bulk read job initiated', {
userId,
module,
jobId: jobData.id,
fields: fields.length
});
return {
jobId: jobData.id,
status: 'initiated',
message: `Bulk read job initiated for ${module}`,
estimatedTime: this.getEstimatedTime(module, options.limit)
};
} catch (error) {
console.error('❌ Error initiating bulk read:', error.message);
logger.error('Bulk read initiation failed', {
userId,
module,
error: error.message
});
throw error;
}
}
/**
* Get bulk read job status
* @param {string} userId - User UUID
* @param {string} jobId - Job ID
* @returns {Promise<Object>} Job status
*/
async getBulkReadJobStatus(userId, jobId) {
try {
const job = await ZohoBulkReadRepository.getBulkReadJob(jobId);
if (!job) {
throw new Error('Job not found');
}
if (job.user_id !== userId) {
throw new Error('Unauthorized access to job');
}
return {
jobId: job.id,
module: job.module,
status: job.status,
state: job.state,
recordsCount: job.records_count,
processedCount: job.processed_count,
createdAt: job.created_at,
updatedAt: job.updated_at,
errorMessage: job.error_message
};
} catch (error) {
console.error('❌ Error getting job status:', error.message);
throw error;
}
}
/**
* Get user's bulk read jobs
* @param {string} userId - User UUID
* @param {Object} options - Query options
* @returns {Promise<Array>} Job list
*/
async getUserBulkReadJobs(userId, options = {}) {
try {
const jobs = await ZohoBulkReadRepository.getUserBulkReadJobs(userId, options);
return jobs.map(job => ({
jobId: job.id,
module: job.module,
status: job.status,
state: job.state,
recordsCount: job.records_count,
processedCount: job.processed_count,
createdAt: job.created_at,
updatedAt: job.updated_at
}));
} catch (error) {
console.error('❌ Error getting user jobs:', error.message);
throw error;
}
}
/**
* Get bulk read data for a module
* @param {string} userId - User UUID
* @param {string} module - Module name
* @param {Object} options - Query options
* @returns {Promise<Object>} Data and pagination info
*/
async getBulkReadData(userId, module, options = {}) {
try {
const { page = 1, limit = 100, orderBy = 'created_time', orderDirection = 'DESC' } = options;
const data = await ZohoBulkReadRepository.getUserData(userId, module, {
limit: parseInt(limit),
offset: (parseInt(page) - 1) * parseInt(limit),
orderBy,
orderDirection
});
const totalCount = await ZohoBulkReadRepository.getUserDataCount(userId, module);
return {
data: data,
pagination: {
page: parseInt(page),
limit: parseInt(limit),
total: totalCount,
pages: Math.ceil(totalCount / parseInt(limit))
}
};
} catch (error) {
console.error('❌ Error getting bulk read data:', error.message);
throw error;
}
}
/**
* Get estimated processing time for a module
* @param {string} module - Module name
* @param {number} limit - Record limit
* @returns {string} Estimated time
*/
getEstimatedTime(module, limit = 200000) {
const baseTime = {
'contacts': 2,
'leads': 2,
'accounts': 1,
'tasks': 3,
'vendors': 1,
'invoices': 2,
'sales_orders': 2,
'purchase_orders': 2
};
const minutes = baseTime[module.toLowerCase()] || 2;
const adjustedMinutes = Math.ceil(minutes * (limit / 100000));
return `${adjustedMinutes} minutes`;
}
/**
* Get available modules for bulk read
* @returns {Array} Available modules
*/
getAvailableModules() {
return [
{
name: 'contacts',
displayName: 'Contacts',
description: 'Customer contact information',
fields: [
'id', 'First_Name', 'Last_Name', 'Email', 'Phone', 'Mobile',
'Lead_Source', 'Account_Name.Account_Name', 'Owner', 'Created_Time', 'Modified_Time'
]
},
{
name: 'leads',
displayName: 'Leads',
description: 'Sales lead information',
fields: [
'id', 'First_Name', 'Last_Name', 'Company', 'Lead_Source',
'Lead_Status', 'Owner', 'Email', 'Phone', 'Created_Time'
]
},
{
name: 'accounts',
displayName: 'Accounts',
description: 'Account information',
fields: [
'id', 'Account_Name', 'Phone', 'Website', 'Industry',
'Ownership', 'Annual_Revenue', 'Owner', 'Created_Time'
]
},
{
name: 'tasks',
displayName: 'Tasks',
description: 'Task information',
fields: [
'id', 'Subject', 'Owner', 'Status', 'Priority',
'Due_Date', 'What_Id', 'Created_Time'
]
},
{
name: 'vendors',
displayName: 'Vendors',
description: 'Vendor information',
fields: [
'id', 'Vendor_Name', 'Email', 'Phone', 'Website', 'Owner', 'Created_Time'
]
},
{
name: 'invoices',
displayName: 'Invoices',
description: 'Invoice information',
fields: [
'id', 'Invoice_Number', 'Invoice_Date', 'Due_Date', 'Status',
'Total', 'Balance', 'Account_Name.Account_Name', 'Owner', 'Created_Time'
]
},
{
name: 'sales_orders',
displayName: 'Sales Orders',
description: 'Sales order information',
fields: [
'id', 'Sales_Order_Number', 'Subject', 'Status', 'Due_Date',
'Total', 'Account_Name.Account_Name', 'Owner', 'Created_Time'
]
},
{
name: 'purchase_orders',
displayName: 'Purchase Orders',
description: 'Purchase order information',
fields: [
'id', 'Purchase_Order_Number', 'Subject', 'Vendor_Name.Vendor_Name',
'Status', 'Due_Date', 'Total', 'Owner', 'Created_Time'
]
}
];
}
}
// Export the service class; callers are responsible for instantiating it.
module.exports = BulkReadService;

314
src/services/csvService.js Normal file
View File

@ -0,0 +1,314 @@
const axios = require('axios');
const csv = require('csv-parser');
const { Readable } = require('stream');
const yauzl = require('yauzl');
const logger = require('../utils/logger');
/**
 * Service for downloading Zoho bulk-read result files and mapping their CSV
 * rows to the internal database column names used by this backend.
 */
class CsvService {
  constructor() {
    // Base host used to resolve relative Zoho download paths.
    this.baseUrl = 'https://www.zohoapis.com';
  }

  /**
   * Fetch CSV data from Zoho download URL.
   *
   * The endpoint may return either a ZIP archive containing a CSV, or the raw
   * CSV itself; the two cases are distinguished by sniffing the ZIP magic bytes.
   * @param {string} downloadUrl - The download URL from webhook (absolute, or relative to the Zoho host)
   * @param {string} accessToken - Zoho access token
   * @returns {Promise<Array>} Parsed CSV data as array of objects
   * @throws {Error} On HTTP failure or ZIP/CSV parse failure
   */
  async fetchCsvData(downloadUrl, accessToken) {
    try {
      console.log('📥 Fetching CSV data from:', downloadUrl);
      const fullUrl = downloadUrl.startsWith('http') ? downloadUrl : `${this.baseUrl}${downloadUrl}`;
      const response = await axios.get(fullUrl, {
        headers: {
          'Authorization': `Zoho-oauthtoken ${accessToken}`,
          'Accept': 'application/zip, application/octet-stream'
        },
        // Binary download: keep the payload as a raw buffer.
        responseType: 'arraybuffer'
      });
      console.log('✅ Data fetched successfully, size:', response.data.length);
      console.log('📊 Response status:', response.status);
      console.log('📊 Content-Type:', response.headers['content-type']);
      const buffer = Buffer.from(response.data);
      // ZIP local-file-header magic bytes: "PK\x03\x04".
      if (buffer.toString('hex', 0, 4) === '504b0304') {
        console.log('📦 Detected ZIP file, extracting CSV...');
        return await this.extractCsvFromZip(buffer);
      } else {
        console.log('📄 Treating as direct CSV...');
        return await this.parseCsvFromBuffer(buffer);
      }
    } catch (error) {
      console.error('❌ Error fetching CSV data:', error.message);
      logger.error('CSV fetch failed', {
        downloadUrl,
        error: error.message,
        status: error.response?.status,
        statusText: error.response?.statusText
      });
      throw error;
    }
  }

  /**
   * Extract and parse the first `.csv` entry found inside a ZIP buffer.
   * @param {Buffer} zipBuffer - In-memory ZIP archive
   * @returns {Promise<Array>} Parsed CSV rows
   * @throws {Error} If the archive is unreadable or contains no CSV entry
   */
  async extractCsvFromZip(zipBuffer) {
    return new Promise((resolve, reject) => {
      yauzl.fromBuffer(zipBuffer, { lazyEntries: true }, (err, zipfile) => {
        if (err) {
          console.error('❌ Error opening ZIP file:', err);
          return reject(err);
        }
        // Tracks whether any CSV entry was seen, so an archive without one
        // rejects instead of leaving the promise pending forever.
        let csvFound = false;
        zipfile.readEntry();
        zipfile.on('entry', (entry) => {
          if (entry.fileName.endsWith('.csv')) {
            csvFound = true;
            console.log('📄 Found CSV file in ZIP:', entry.fileName);
            zipfile.openReadStream(entry, (err, readStream) => {
              if (err) {
                console.error('❌ Error reading CSV from ZIP:', err);
                return reject(err);
              }
              const results = [];
              const csvStream = readStream.pipe(csv());
              csvStream.on('data', (data) => {
                results.push(data);
              });
              csvStream.on('end', () => {
                console.log(`📊 Extracted ${results.length} records from CSV in ZIP`);
                resolve(results);
              });
              csvStream.on('error', (error) => {
                console.error('❌ Error parsing CSV from ZIP:', error);
                reject(error);
              });
            });
          } else {
            // Not a CSV: advance to the next archive entry.
            zipfile.readEntry();
          }
        });
        zipfile.on('end', () => {
          console.log('📦 Finished reading ZIP file');
          // Previously this path hung the promise forever; now it fails fast.
          if (!csvFound) {
            reject(new Error('No CSV entry found in ZIP archive'));
          }
        });
        zipfile.on('error', (error) => {
          console.error('❌ ZIP file error:', error);
          reject(error);
        });
      });
    });
  }

  /**
   * Parse a raw (non-zipped) CSV buffer into row objects.
   * @param {Buffer} buffer - CSV file contents
   * @returns {Promise<Array>} Parsed CSV rows
   */
  async parseCsvFromBuffer(buffer) {
    return new Promise((resolve, reject) => {
      const results = [];
      const stream = Readable.from(buffer.toString()).pipe(csv());
      stream.on('data', (data) => {
        results.push(data);
      });
      stream.on('end', () => {
        console.log(`📊 Parsed ${results.length} records from CSV`);
        resolve(results);
      });
      stream.on('error', (error) => {
        console.error('❌ Error parsing CSV:', error);
        reject(error);
      });
    });
  }

  /**
   * Parse CSV data and map to database fields
   * @param {Array} csvData - Raw CSV data
   * @param {string} module - Zoho module name (case-insensitive)
   * @param {string} userId - User UUID
   * @param {string} jobId - Bulk job ID
   * @returns {Array} Mapped data ready for database insertion
   */
  parseCsvData(csvData, module, userId, jobId) {
    console.log(`🔄 Parsing ${csvData.length} records for module: ${module}`);
    const mappedData = csvData.map(record => {
      // Columns shared by every module's destination table.
      const baseRecord = {
        user_id: userId,
        provider: 'zoho',
        bulk_job_id: jobId
      };
      switch (module.toLowerCase()) {
        case 'contacts':
          return this.mapContactFields(record, baseRecord);
        case 'leads':
          return this.mapLeadFields(record, baseRecord);
        case 'accounts':
          return this.mapAccountFields(record, baseRecord);
        case 'tasks':
          return this.mapTaskFields(record, baseRecord);
        case 'vendors':
          return this.mapVendorFields(record, baseRecord);
        case 'invoices':
          return this.mapInvoiceFields(record, baseRecord);
        case 'sales_orders':
          return this.mapSalesOrderFields(record, baseRecord);
        case 'purchase_orders':
          return this.mapPurchaseOrderFields(record, baseRecord);
        default:
          // Unknown module: pass the raw CSV columns through unchanged.
          console.warn(`⚠️ Unknown module: ${module}`);
          return { ...baseRecord, ...record };
      }
    });
    console.log(`✅ Mapped ${mappedData.length} records for ${module}`);
    return mappedData;
  }

  /** Map a Contacts CSV row to contact table columns. */
  mapContactFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      first_name: record.First_Name,
      last_name: record.Last_Name,
      email: record.Email,
      phone: record.Phone,
      mobile: record.Mobile,
      lead_source: record.Lead_Source,
      // Lookup fields may arrive flattened ("Account_Name.Account_Name") or plain.
      account_name: record['Account_Name.Account_Name'] || record.Account_Name,
      owner: record.Owner,
      created_time: this.parseDate(record.Created_Time),
      modified_time: this.parseDate(record.Modified_Time)
    };
  }

  /** Map a Leads CSV row to lead table columns. */
  mapLeadFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      first_name: record.First_Name,
      last_name: record.Last_Name,
      company: record.Company,
      lead_source: record.Lead_Source,
      lead_status: record.Lead_Status,
      owner: record.Owner,
      email: record.Email,
      phone: record.Phone,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /** Map an Accounts CSV row to account table columns. */
  mapAccountFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      account_name: record.Account_Name,
      phone: record.Phone,
      website: record.Website,
      industry: record.Industry,
      ownership: record.Ownership,
      annual_revenue: this.parseDecimal(record.Annual_Revenue),
      owner: record.Owner,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /** Map a Tasks CSV row to task table columns. */
  mapTaskFields(record, baseRecord) {
    return {
      ...baseRecord,
      // Bug fix: previously read only `record.Id`, but every other mapper and
      // the requested bulk-read field list use lowercase `id`, so task rows
      // were stored with an undefined zoho_id. Prefer `id`, keep `Id` as a
      // fallback for exports that capitalize it.
      zoho_id: record.id ?? record.Id,
      subject: record.Subject,
      owner: record.Owner,
      status: record.Status,
      priority: record.Priority,
      due_date: this.parseDate(record.Due_Date),
      what_id: record.What_Id,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /** Map a Vendors CSV row to vendor table columns. */
  mapVendorFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      vendor_name: record.Vendor_Name,
      email: record.Email,
      phone: record.Phone,
      website: record.Website,
      owner: record.Owner,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /** Map an Invoices CSV row to invoice table columns. */
  mapInvoiceFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      invoice_number: record.Invoice_Number,
      invoice_date: this.parseDate(record.Invoice_Date),
      due_date: this.parseDate(record.Due_Date),
      status: record.Status,
      total: this.parseDecimal(record.Total),
      balance: this.parseDecimal(record.Balance),
      account_name: record['Account_Name.Account_Name'] || record.Account_Name,
      owner: record.Owner,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /** Map a Sales Orders CSV row to sales-order table columns. */
  mapSalesOrderFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      sales_order_number: record.Sales_Order_Number,
      subject: record.Subject,
      status: record.Status,
      due_date: this.parseDate(record.Due_Date),
      total: this.parseDecimal(record.Total),
      account_name: record['Account_Name.Account_Name'] || record.Account_Name,
      owner: record.Owner,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /** Map a Purchase Orders CSV row to purchase-order table columns. */
  mapPurchaseOrderFields(record, baseRecord) {
    return {
      ...baseRecord,
      zoho_id: record.id,
      purchase_order_number: record.Purchase_Order_Number,
      subject: record.Subject,
      vendor_name: record['Vendor_Name.Vendor_Name'] || record.Vendor_Name,
      status: record.Status,
      due_date: this.parseDate(record.Due_Date),
      total: this.parseDecimal(record.Total),
      owner: record.Owner,
      created_time: this.parseDate(record.Created_Time)
    };
  }

  /**
   * Parse a CSV date string to a Date, or null when missing/unparseable.
   * Relies on native Date parsing; Zoho exports appear to use ISO-like
   * timestamps — NOTE(review): confirm the export format against real data.
   */
  parseDate(dateString) {
    if (!dateString || dateString === '') return null;
    const date = new Date(dateString);
    return isNaN(date.getTime()) ? null : date;
  }

  /** Parse a CSV numeric string to a float, or null when missing/unparseable. */
  parseDecimal(decimalString) {
    if (!decimalString || decimalString === '') return null;
    const parsed = parseFloat(decimalString);
    return isNaN(parsed) ? null : parsed;
  }
}
// Export the service class; callers are responsible for instantiating it.
module.exports = CsvService;