From 81dcfcd84364e4057dc73f96dd0eb79f58bf7bf0 Mon Sep 17 00:00:00 2001 From: yashwin-foxy Date: Thu, 18 Sep 2025 17:56:24 +0530 Subject: [PATCH] bulk red web hook feature implemented for crm data lead tasks and contacts working --- package-lock.json | 50 ++- package.json | 4 +- src/api/controllers/integrationController.js | 24 +- src/api/controllers/userController.js | 1 + src/api/routes/bulkReadRoutes.js | 131 +++++++ src/api/routes/integrationRoutes.js | 1 + src/api/routes/userRoutes.js | 74 +++- src/app.js | 2 + src/data/models/zohoAccountsBulk.js | 72 ++++ src/data/models/zohoBulkReadJobs.js | 70 ++++ src/data/models/zohoContactsBulk.js | 80 ++++ src/data/models/zohoInvoicesBulk.js | 76 ++++ src/data/models/zohoLeadsBulk.js | 76 ++++ src/data/models/zohoPurchaseOrdersBulk.js | 72 ++++ src/data/models/zohoSalesOrdersBulk.js | 72 ++++ src/data/models/zohoTasksBulk.js | 68 ++++ src/data/models/zohoVendorsBulk.js | 64 +++ src/data/repositories/userRepository.js | 6 +- .../repositories/zohoBulkReadRepository.js | 250 ++++++++++++ .../002_create_user_auth_tokens.sql | 2 +- .../003_create_zoho_contacts_bulk.sql | 26 ++ .../migrations/004_create_zoho_leads_bulk.sql | 25 ++ .../005_create_zoho_accounts_bulk.sql | 24 ++ .../migrations/006_create_zoho_tasks_bulk.sql | 23 ++ .../007_create_zoho_vendors_bulk.sql | 22 ++ .../008_create_zoho_invoices_bulk.sql | 25 ++ .../009_create_zoho_sales_orders_bulk.sql | 24 ++ .../010_create_zoho_purchase_orders_bulk.sql | 24 ++ .../011_create_zoho_bulk_read_jobs.sql | 22 ++ src/integrations/zoho/handler.js | 367 ++++++++++++++++++ src/services/bulkReadService.js | 304 +++++++++++++++ src/services/csvService.js | 314 +++++++++++++++ 32 files changed, 2377 insertions(+), 18 deletions(-) create mode 100644 src/api/routes/bulkReadRoutes.js create mode 100644 src/data/models/zohoAccountsBulk.js create mode 100644 src/data/models/zohoBulkReadJobs.js create mode 100644 src/data/models/zohoContactsBulk.js create mode 100644 src/data/models/zohoInvoicesBulk.js create mode 100644 src/data/models/zohoLeadsBulk.js create mode 100644 src/data/models/zohoPurchaseOrdersBulk.js create mode 100644 src/data/models/zohoSalesOrdersBulk.js create mode 100644 src/data/models/zohoTasksBulk.js create mode 100644 src/data/models/zohoVendorsBulk.js create mode 100644 src/data/repositories/zohoBulkReadRepository.js create mode 100644 src/db/migrations/003_create_zoho_contacts_bulk.sql create mode 100644 src/db/migrations/004_create_zoho_leads_bulk.sql create mode 100644 src/db/migrations/005_create_zoho_accounts_bulk.sql create mode 100644 src/db/migrations/006_create_zoho_tasks_bulk.sql create mode 100644 src/db/migrations/007_create_zoho_vendors_bulk.sql create mode 100644 src/db/migrations/008_create_zoho_invoices_bulk.sql create mode 100644 src/db/migrations/009_create_zoho_sales_orders_bulk.sql create mode 100644 src/db/migrations/010_create_zoho_purchase_orders_bulk.sql create mode 100644 src/db/migrations/011_create_zoho_bulk_read_jobs.sql create mode 100644 src/services/bulkReadService.js create mode 100644 src/services/csvService.js diff --git a/package-lock.json b/package-lock.json index cf12b19..2027fc0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,13 +1,18 @@ { - "name": "CentralizedReportingBackend", + "name": "centralized-reporting-backend", + "version": "1.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { + "name": "centralized-reporting-backend", + "version": "1.0.0", + "license": "MIT", "dependencies": { "axios": "^1.11.0", "bcrypt": "^6.0.0", "cors": "^2.8.5", + "csv-parser": "^3.2.0", "dotenv": "^17.2.2", "express": "^4.21.2", "express-async-errors": "^3.1.1", @@ -22,7 +27,8 @@ "sequelize": "^6.37.7", "swagger-jsdoc": "^6.2.8", "swagger-ui-express": "^5.0.1", - "uuid": "^13.0.0" + "uuid": "^13.0.0", + "yauzl": "^3.2.0" }, "devDependencies": { "nodemon": "^3.1.10" @@ -350,6 +356,15 @@ "node": ">=8" } }, + "node_modules/buffer-crc32": { + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/buffer-crc32/-/buffer-crc32-0.2.13.tgz", + "integrity": "sha512-VO9Ht/+p3SN7SKWqcrgEzjGbRSJYTx+Q1pTQC0wrWqHx0vpJraQ6GtHx8tvcg1rlK1byhU5gccxgOgj7B0TDkQ==", + "license": "MIT", + "engines": { + "node": "*" + } + }, "node_modules/buffer-equal-constant-time": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", @@ -542,6 +557,18 @@ "node": ">= 0.10" } }, + "node_modules/csv-parser": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/csv-parser/-/csv-parser-3.2.0.tgz", + "integrity": "sha512-fgKbp+AJbn1h2dcAHKIdKNSSjfp43BZZykXsCjzALjKy80VXQNHPFJ6T9Afwdzoj24aMkq8GwDS7KGcDPpejrA==", + "license": "MIT", + "bin": { + "csv-parser": "bin/csv-parser" + }, + "engines": { + "node": ">= 10" + } + }, "node_modules/debug": { "version": "2.6.9", "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", @@ -1828,6 +1855,12 @@ "integrity": "sha512-RA1GjUVMnvYFxuqovrEqZoxxW5NUZqbwKtYz/Tt7nXerk0LbLblQmrsgdeOxV5SFHf0UDggjS/bSeOZwt1pmEQ==", "license": "MIT" }, + "node_modules/pend": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/pend/-/pend-1.2.0.tgz", + "integrity": "sha512-F3asv42UuXchdzt+xXqfW1OGlVBe+mxa2mqI0pg5yAHZPvFmY3Y6drSf/GQ1A86WgWEN9Kzh/WrgKa6iGcHXLg==", + "license": "MIT" + }, "node_modules/pg-connection-string": { "version": "2.9.1", "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.9.1.tgz", @@ -2525,6 +2558,19 @@ "node": ">= 6" } }, + "node_modules/yauzl": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/yauzl/-/yauzl-3.2.0.tgz", + "integrity": "sha512-Ow9nuGZE+qp1u4JIPvg+uCiUr7xGQWdff7JQSk5VGYTAZMDe2q8lxJ10ygv10qmSj031Ty/6FNJpLO4o1Sgc+w==", + "license": "MIT", + "dependencies": { + "buffer-crc32": "~0.2.3", + "pend": "~1.2.0" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/z-schema": { "version": "5.0.5", "resolved": "https://registry.npmjs.org/z-schema/-/z-schema-5.0.5.tgz", diff --git a/package.json b/package.json index b5e1588..32cd3c3 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "axios": "^1.11.0", "bcrypt": "^6.0.0", "cors": "^2.8.5", + "csv-parser": "^3.2.0", "dotenv": "^17.2.2", "express": "^4.21.2", "express-async-errors": "^3.1.1", @@ -27,7 +28,8 @@ "sequelize": "^6.37.7", "swagger-jsdoc": "^6.2.8", "swagger-ui-express": "^5.0.1", - "uuid": "^13.0.0" + "uuid": "^13.0.0", + "yauzl": "^3.2.0" }, "devDependencies": { "nodemon": "^3.1.10" diff --git a/src/api/controllers/integrationController.js b/src/api/controllers/integrationController.js index f9c741b..808ac89 100644 --- a/src/api/controllers/integrationController.js +++ b/src/api/controllers/integrationController.js @@ -5,7 +5,7 @@ async function getData(req, res) { try { const { provider, service, resource, page, limit, filters } = req.query; console.log('query is', req.query); - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { @@ -26,7 +26,7 @@ async function getData(req, res) { async function getServices(req, res) { try { const { provider } = req.query; - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const services = await integrationService.getAvailableServices(provider); res.json(success(`${provider} available services`, services)); } catch (error) { @@ -37,7 +37,7 @@ async function getServices(req, res) { async function getResources(req, res) { try { const { provider, service } = req.query; - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const resources = await integrationService.getAvailableResources(provider, service); res.json(success(`${provider} ${service} available resources`, resources)); } catch (error) { @@ -52,7 +52,7 @@ async function getPortals(req, res) { return res.status(400).json(failure('Portals are only available for Zoho provider', 'INVALID_PROVIDER')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const portals = await integrationService.getPortals(provider); res.json(success('Zoho portals retrieved successfully', portals)); } catch (error) { @@ -67,7 +67,7 @@ async function getAllProjects(req, res) { return res.status(400).json(failure('All projects are only available for Zoho provider', 'INVALID_PROVIDER')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -96,7 +96,7 @@ async function getAllProjectTasks(req, res) { return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -125,7 +125,7 @@ async function getAllProjectTaskLists(req, res) { return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -155,7 +155,7 @@ async function getAllProjectIssues(req, res) { return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -186,7 +186,7 @@ async function getAllProjectPhases(req, res) { return res.status(400).json(failure('portal_id is required in query parameters', 'MISSING_PORTAL_ID')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -211,7 +211,7 @@ async function getSalesOrders(req, res) { return res.status(400).json(failure('Sales Orders are only available for Zoho provider', 'INVALID_PROVIDER')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -236,7 +236,7 @@ async function getPurchaseOrders(req, res) { return res.status(400).json(failure('Purchase Orders are only available for Zoho provider', 'INVALID_PROVIDER')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { @@ -261,7 +261,7 @@ async function getInvoices(req, res) { return res.status(400).json(failure('Invoices are only available for Zoho provider', 'INVALID_PROVIDER')); } - const integrationService = new IntegrationService(req.user.id); + const integrationService = new IntegrationService(req.user.uuid); const params = { page, limit }; if (filters) { try { diff --git a/src/api/controllers/userController.js b/src/api/controllers/userController.js index cc3091e..b674e88 100644 --- a/src/api/controllers/userController.js +++ b/src/api/controllers/userController.js @@ -33,6 +33,7 @@ module.exports = { register, me, updateMe, removeMe }; // Exchange Zoho authorization code for tokens and persist async function exchangeZohoToken(req, res) { const { authorization_code, id, service_name } = req.body; + console.log('exchangeZohoToken', req.body); // Optional: ensure the id belongs to the authenticated user (if business rule requires) const params = new URLSearchParams(); params.append('code', authorization_code); diff --git a/src/api/routes/bulkReadRoutes.js b/src/api/routes/bulkReadRoutes.js new file mode 100644 index 0000000..6fa5686 --- /dev/null +++ b/src/api/routes/bulkReadRoutes.js @@ -0,0 +1,131 @@ +const express = require('express'); +const Joi = require('joi'); +const BulkReadService = require('../../services/bulkReadService'); +const auth = require('../middlewares/auth'); +const { success, failure } = require('../../utils/response'); + +const router = express.Router(); + +function validate(schema, source = 'query') { + return (req, res, next) => { + const data = source === 'body' ? req.body : req.query; + const { error, value } = schema.validate(data, { abortEarly: false, stripUnknown: true }); + if (error) { + return res.status(400).json({ + status: 'error', + message: 'Validation failed', + errorCode: 'VALIDATION_ERROR', + details: error.details, + timestamp: new Date().toISOString() + }); + } + if (source === 'body') { + req.body = value; + } else { + req.query = value; + } + next(); + }; +} + +// Initiate bulk read job +const initiateBulkReadSchema = Joi.object({ + module: Joi.string().valid('contacts', 'leads', 'accounts', 'tasks', 'vendors', 'invoices', 'sales_orders', 'purchase_orders').required(), + fields: Joi.array().items(Joi.string()).min(1).required(), + page: Joi.number().min(1).default(1), + limit: Joi.number().min(1).max(200000).default(200000) +}); + +router.post('/initiate', auth, validate(initiateBulkReadSchema, 'body'), async (req, res) => { + try { + const { module, fields, page, limit } = req.body; + const userId = req.user.uuid; + + const bulkReadService = new BulkReadService(); + const result = await bulkReadService.initiateBulkRead(userId, module, fields, { page, limit }); + + res.json(success('Bulk read job initiated successfully', result)); + } catch (error) { + res.status(400).json(failure(error.message, 'BULK_READ_ERROR')); + } +}); + +// Get bulk read job status +const jobStatusSchema = Joi.object({ + job_id: Joi.string().required() +}); + +router.get('/job/:job_id', auth, validate(jobStatusSchema, 'params'), async (req, res) => { + try { + const { job_id } = req.params; + const userId = req.user.uuid; + + const bulkReadService = new BulkReadService(); + const result = await bulkReadService.getBulkReadJobStatus(userId, job_id); + + res.json(success('Job status retrieved successfully', result)); + } catch (error) { + res.status(400).json(failure(error.message, 'JOB_STATUS_ERROR')); + } +}); + +// Get user's bulk read jobs +const userJobsSchema = Joi.object({ + page: Joi.number().min(1).default(1), + limit: Joi.number().min(1).max(100).default(50), + status: Joi.string().valid('pending', 'in_progress', 'completed', 'failed').optional() +}); + +router.get('/jobs', auth, validate(userJobsSchema), async (req, res) => { + try { + const { page, limit, status } = req.query; + const userId = req.user.uuid; + + const bulkReadService = new BulkReadService(); + const result = await bulkReadService.getUserBulkReadJobs(userId, { page, limit, status }); + + res.json(success('Bulk read jobs retrieved successfully', result)); + } catch (error) { + res.status(400).json(failure(error.message, 'JOBS_RETRIEVAL_ERROR')); + } +}); + +// Get bulk read data for a module +const moduleDataSchema = Joi.object({ + module: Joi.string().valid('contacts', 'leads', 'accounts', 'tasks', 'vendors', 'invoices', 'sales_orders', 'purchase_orders').required(), + page: Joi.number().min(1).default(1), + limit: Joi.number().min(1).max(1000).default(100), + orderBy: Joi.string().default('created_time'), + orderDirection: Joi.string().valid('ASC', 'DESC').default('DESC') +}); + +router.get('/data/:module', auth, validate(moduleDataSchema, 'params'), async (req, res) => { + try { + const { module } = req.params; + const { page, limit, orderBy, orderDirection } = req.query; + const userId = req.user.uuid; + + const bulkReadService = new BulkReadService(); + const result = await bulkReadService.getBulkReadData(userId, module, { + page, limit, orderBy, orderDirection + }); + + res.json(success(`${module} data retrieved successfully`, result)); + } catch (error) { + res.status(400).json(failure(error.message, 'DATA_RETRIEVAL_ERROR')); + } +}); + +// Get available modules +router.get('/modules', auth, async (req, res) => { + try { + const bulkReadService = new BulkReadService(); + const modules = bulkReadService.getAvailableModules(); + + res.json(success('Available modules retrieved successfully', modules)); + } catch (error) { + res.status(400).json(failure(error.message, 'MODULES_RETRIEVAL_ERROR')); + } +}); + +module.exports = router; diff --git a/src/api/routes/integrationRoutes.js b/src/api/routes/integrationRoutes.js index 28a268f..229b11c 100644 --- a/src/api/routes/integrationRoutes.js +++ b/src/api/routes/integrationRoutes.js @@ -151,5 +151,6 @@ const zohoHandler = new ZohoHandler(); router.post('/webhooks/zoho/crm', zohoHandler.handleCrmWebhook.bind(zohoHandler)); router.post('/webhooks/zoho/people', zohoHandler.handlePeopleWebhook.bind(zohoHandler)); router.post('/webhooks/zoho/projects', zohoHandler.handleProjectsWebhook.bind(zohoHandler)); +router.post('/webhooks/zoho/bulkread', zohoHandler.handleBulkReadWebhook.bind(zohoHandler)); module.exports = router; diff --git a/src/api/routes/userRoutes.js b/src/api/routes/userRoutes.js index d7bd7ed..4d4af0f 100644 --- a/src/api/routes/userRoutes.js +++ b/src/api/routes/userRoutes.js @@ -5,6 +5,7 @@ const { register, me, updateMe, removeMe, exchangeZohoToken } = require('../cont const auth = require('../middlewares/auth'); const { registerSchema, updateSchema } = require('../validators/userValidator'); const Joi = require('joi'); +const crypto = require('crypto'); const router = express.Router(); @@ -14,6 +15,20 @@ const storage = multer.diskStorage({ }); const upload = multer({ storage }); +// Decryption function +function decrypt(ciphertext) { + const algorithm = 'aes-256-gcm'; + const key = crypto.createHash('sha256').update('changeme').digest(); + const buf = Buffer.from(ciphertext, 'base64'); + const iv = buf.subarray(0, 12); + const authTag = buf.subarray(12, 28); + const encrypted = buf.subarray(28); + const decipher = crypto.createDecipheriv(algorithm, key, iv); + decipher.setAuthTag(authTag); + const decrypted = Buffer.concat([decipher.update(encrypted), decipher.final()]); + return decrypted.toString('utf8'); +} + function validate(schema) { return (req, res, next) => { const toValidate = req.method === 'GET' ? req.query : req.body; @@ -34,9 +49,66 @@ router.delete('/me', auth, removeMe); // OAuth token exchange (Zoho request currently) const zohoTokenSchema = Joi.object({ authorization_code: Joi.string().required(), - id: Joi.number().required(), + id: Joi.string().required(), service_name: Joi.string().valid('zoho', 'keka', 'bamboohr', 'hubspot', 'other').required() }); router.post('/zoho/token', auth, validate(zohoTokenSchema), exchangeZohoToken); +// Decrypt access token route +const decryptTokenSchema = Joi.object({ + service_name: Joi.string().valid('zoho', 'keka', 'bamboohr', 'hubspot', 'other').required() +}); +router.get('/decrypt-token', auth, validate(decryptTokenSchema), (req, res) => { + try { + const { service_name } = req.query; + const userId = req.user.uuid; + // Import the userAuthTokenRepository + const userAuthTokenRepo = require('../../data/repositories/userAuthTokenRepository'); + + // Find the access token for the user and service + userAuthTokenRepo.findByUserAndService(userId, service_name) + .then(token => { + if (!token) { + return res.status(404).json({ + status: 'error', + message: `${service_name} token not found for user`, + errorCode: 'TOKEN_NOT_FOUND', + timestamp: new Date().toISOString() + }); + } + + // Decrypt the access token + const decryptedToken = decrypt(token.accessToken); + + res.json({ + status: 'success', + message: `${service_name} token fetched successfully`, + data: { + service: service_name, + accessToken: decryptedToken, + expiresAt: token.expiresAt + }, + timestamp: new Date().toISOString() + }); + }) + .catch(error => { + res.status(500).json({ + status: 'error', + message: 'Error retrieving token', + errorCode: 'TOKEN_RETRIEVAL_ERROR', + details: error.message, + timestamp: new Date().toISOString() + }); + }); + } catch (error) { + res.status(500).json({ + status: 'error', + message: 'Error decrypting token', + errorCode: 'DECRYPTION_ERROR', + details: error.message, + timestamp: new Date().toISOString() + }); + } +}); + module.exports = router; diff --git a/src/app.js b/src/app.js index 9580aa0..9408d3c 100644 --- a/src/app.js +++ b/src/app.js @@ -11,6 +11,7 @@ const config = require('./config'); const userRoutes = require('./api/routes/userRoutes'); const authRoutes = require('./api/routes/authRoutes'); const integrationRoutes = require('./api/routes/integrationRoutes'); +const bulkReadRoutes = require('./api/routes/bulkReadRoutes'); const sequelize = require('./db/pool'); const app = express(); @@ -41,6 +42,7 @@ app.get('/health', async (req, res) => { app.use(`${config.app.apiPrefix}/auth`, authRoutes); app.use(`${config.app.apiPrefix}/users`, userRoutes); app.use(`${config.app.apiPrefix}/integrations`, integrationRoutes); +app.use(`${config.app.apiPrefix}/bulk-read`, bulkReadRoutes); module.exports = app; diff --git a/src/data/models/zohoAccountsBulk.js b/src/data/models/zohoAccountsBulk.js new file mode 100644 index 0000000..16ae79f --- /dev/null +++ b/src/data/models/zohoAccountsBulk.js @@ -0,0 +1,72 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoAccountsBulk = sequelize.define('ZohoAccountsBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + account_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + phone: { + type: DataTypes.STRING(255), + allowNull: true + }, + website: { + type: DataTypes.STRING(255), + allowNull: true + }, + industry: { + type: DataTypes.STRING(255), + allowNull: true + }, + ownership: { + type: DataTypes.STRING(255), + allowNull: true + }, + annual_revenue: { + type: DataTypes.DECIMAL(15, 2), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_accounts_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoAccountsBulk; diff --git a/src/data/models/zohoBulkReadJobs.js b/src/data/models/zohoBulkReadJobs.js new file mode 100644 index 0000000..10dda9a --- /dev/null +++ b/src/data/models/zohoBulkReadJobs.js @@ -0,0 +1,70 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoBulkReadJobs = sequelize.define('ZohoBulkReadJobs', { + id: { + type: DataTypes.STRING(255), + primaryKey: true, + allowNull: false + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + module: { + type: DataTypes.STRING(100), + allowNull: false + }, + operation: { + type: DataTypes.STRING(50), + allowNull: false + }, + state: { + type: DataTypes.STRING(50), + allowNull: false + }, + file_type: { + type: DataTypes.STRING(10), + allowNull: false + }, + download_url: { + type: DataTypes.TEXT, + allowNull: true + }, + records_count: { + type: DataTypes.INTEGER, + allowNull: false, + defaultValue: 0 + }, + processed_count: { + type: DataTypes.INTEGER, + allowNull: false, + defaultValue: 0 + }, + status: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'pending' + }, + error_message: { + type: DataTypes.TEXT, + allowNull: true + } +}, { + tableName: 'zoho_bulk_read_jobs', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['module'] }, + { fields: ['status'] } + ] +}); + +module.exports = ZohoBulkReadJobs; diff --git a/src/data/models/zohoContactsBulk.js b/src/data/models/zohoContactsBulk.js new file mode 100644 index 0000000..ee60b2f --- /dev/null +++ b/src/data/models/zohoContactsBulk.js @@ -0,0 +1,80 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoContactsBulk = sequelize.define('ZohoContactsBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + first_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + last_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + email: { + type: DataTypes.STRING(255), + allowNull: true + }, + phone: { + type: DataTypes.STRING(255), + allowNull: true + }, + mobile: { + type: DataTypes.STRING(255), + allowNull: true + }, + lead_source: { + type: DataTypes.STRING(255), + allowNull: true + }, + account_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + modified_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_contacts_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoContactsBulk; diff --git a/src/data/models/zohoInvoicesBulk.js b/src/data/models/zohoInvoicesBulk.js new file mode 100644 index 0000000..abd2864 --- /dev/null +++ b/src/data/models/zohoInvoicesBulk.js @@ -0,0 +1,76 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoInvoicesBulk = sequelize.define('ZohoInvoicesBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + invoice_number: { + type: DataTypes.STRING(255), + allowNull: true + }, + invoice_date: { + type: DataTypes.DATEONLY, + allowNull: true + }, + due_date: { + type: DataTypes.DATEONLY, + allowNull: true + }, + status: { + type: DataTypes.STRING(255), + allowNull: true + }, + total: { + type: DataTypes.DECIMAL(15, 2), + allowNull: true + }, + balance: { + type: DataTypes.DECIMAL(15, 2), + allowNull: true + }, + account_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_invoices_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoInvoicesBulk; diff --git a/src/data/models/zohoLeadsBulk.js b/src/data/models/zohoLeadsBulk.js new file mode 100644 index 0000000..4f7e090 --- /dev/null +++ b/src/data/models/zohoLeadsBulk.js @@ -0,0 +1,76 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoLeadsBulk = sequelize.define('ZohoLeadsBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + first_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + last_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + company: { + type: DataTypes.STRING(255), + allowNull: true + }, + lead_source: { + type: DataTypes.STRING(255), + allowNull: true + }, + lead_status: { + type: DataTypes.STRING(255), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + email: { + type: DataTypes.STRING(255), + allowNull: true + }, + phone: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_leads_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoLeadsBulk; diff --git a/src/data/models/zohoPurchaseOrdersBulk.js b/src/data/models/zohoPurchaseOrdersBulk.js new file mode 100644 index 0000000..a2fe48e --- /dev/null +++ b/src/data/models/zohoPurchaseOrdersBulk.js @@ -0,0 +1,72 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoPurchaseOrdersBulk = sequelize.define('ZohoPurchaseOrdersBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + purchase_order_number: { + type: DataTypes.STRING(255), + allowNull: true + }, + subject: { + type: DataTypes.STRING(500), + allowNull: true + }, + vendor_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + status: { + type: DataTypes.STRING(255), + allowNull: true + }, + due_date: { + type: DataTypes.DATEONLY, + allowNull: true + }, + total: { + type: DataTypes.DECIMAL(15, 2), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_purchase_orders_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoPurchaseOrdersBulk; diff --git a/src/data/models/zohoSalesOrdersBulk.js b/src/data/models/zohoSalesOrdersBulk.js new file mode 100644 index 0000000..61d17b1 --- /dev/null +++ b/src/data/models/zohoSalesOrdersBulk.js @@ -0,0 +1,72 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoSalesOrdersBulk = sequelize.define('ZohoSalesOrdersBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + sales_order_number: { + type: DataTypes.STRING(255), + allowNull: true + }, + subject: { + type: DataTypes.STRING(500), + allowNull: true + }, + status: { + type: DataTypes.STRING(255), + allowNull: true + }, + due_date: { + type: DataTypes.DATEONLY, + allowNull: true + }, + total: { + type: DataTypes.DECIMAL(15, 2), + allowNull: true + }, + account_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_sales_orders_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoSalesOrdersBulk; diff --git a/src/data/models/zohoTasksBulk.js b/src/data/models/zohoTasksBulk.js new file mode 100644 index 0000000..ef1e785 --- /dev/null +++ b/src/data/models/zohoTasksBulk.js @@ -0,0 +1,68 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoTasksBulk = sequelize.define('ZohoTasksBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + subject: { + type: DataTypes.STRING(500), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + status: { + type: DataTypes.STRING(255), + allowNull: true + }, + priority: { + type: DataTypes.STRING(255), + allowNull: true + }, + due_date: { + type: DataTypes.DATE, + allowNull: true + }, + what_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_tasks_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoTasksBulk; diff --git a/src/data/models/zohoVendorsBulk.js b/src/data/models/zohoVendorsBulk.js new file mode 100644 index 0000000..4675458 --- /dev/null +++ b/src/data/models/zohoVendorsBulk.js @@ -0,0 +1,64 @@ +const { DataTypes } = require('sequelize'); +const sequelize = require('../../db/pool'); + +const ZohoVendorsBulk = sequelize.define('ZohoVendorsBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false + }, + zoho_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_id: { + type: DataTypes.STRING(255), + allowNull: false + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'zoho' + }, + vendor_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + email: { + type: DataTypes.STRING(255), + allowNull: true + }, + phone: { + type: DataTypes.STRING(255), + allowNull: true + }, + website: { + type: DataTypes.STRING(255), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + } +}, { + tableName: 'zoho_vendors_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { fields: ['user_id', 'provider'] }, + { fields: ['bulk_job_id'] }, + { fields: ['created_time'] } + ] +}); + +module.exports = ZohoVendorsBulk; diff --git a/src/data/repositories/userRepository.js b/src/data/repositories/userRepository.js index d2e0eca..642aacf 100644 --- a/src/data/repositories/userRepository.js +++ b/src/data/repositories/userRepository.js @@ -4,6 +4,10 @@ async function createUser(payload) { return User.create(payload); } +async function findById(id) { + return User.findByPk(id); +} + async function findByEmail(email) { return User.findOne({ where: { email } }); } @@ -26,5 +30,5 @@ async function deleteByUuid(uuid) { return true; } -module.exports = { createUser, findByEmail, findByUuid, updateByUuid, deleteByUuid }; +module.exports = { createUser, findById, findByEmail, findByUuid, updateByUuid, deleteByUuid }; diff --git a/src/data/repositories/zohoBulkReadRepository.js b/src/data/repositories/zohoBulkReadRepository.js new file mode 100644 index 0000000..3ce9ead --- /dev/null +++ b/src/data/repositories/zohoBulkReadRepository.js @@ -0,0 +1,250 @@ +const ZohoContactsBulk = require('../models/zohoContactsBulk'); +const ZohoLeadsBulk = require('../models/zohoLeadsBulk'); +const ZohoAccountsBulk = require('../models/zohoAccountsBulk'); +const ZohoTasksBulk = require('../models/zohoTasksBulk'); +const ZohoVendorsBulk = require('../models/zohoVendorsBulk'); +const ZohoInvoicesBulk = require('../models/zohoInvoicesBulk'); +const ZohoSalesOrdersBulk = require('../models/zohoSalesOrdersBulk'); +const ZohoPurchaseOrdersBulk = require('../models/zohoPurchaseOrdersBulk'); +const ZohoBulkReadJobs = require('../models/zohoBulkReadJobs'); + +class ZohoBulkReadRepository { + constructor() { + this.models = { + 'contacts': ZohoContactsBulk, + 'leads': ZohoLeadsBulk, + 'accounts': ZohoAccountsBulk, + 'tasks': ZohoTasksBulk, + 'vendors': ZohoVendorsBulk, + 'invoices': ZohoInvoicesBulk, + 'sales_orders': ZohoSalesOrdersBulk, + 'purchase_orders': ZohoPurchaseOrdersBulk + }; + } + + /** + * Get the appropriate model for a module + * @param {string} module - Module name + * @returns {Object} Sequelize model + */ + getModel(module) { + const model = this.models[module.toLowerCase()]; + if (!model) { + throw new Error(`No model found for module: ${module}`); + } + return model; + } + + /** + * Bulk insert data for a specific module + * @param {string} module - Module name + * @param {Array} data - Array of records to insert + * @returns {Promise} Inserted records + */ + async bulkInsert(module, data) { + try { + const model = this.getModel(module); + console.log(`💾 Bulk inserting ${data.length} records for ${module}`); + + const result = await model.bulkCreate(data, { + ignoreDuplicates: true, + validate: true + }); + + console.log(`✅ Successfully inserted ${result.length} records for ${module}`); + return result; + } catch (error) { + console.error(`❌ Error bulk inserting ${module}:`, error.message); + throw error; + } + } + + /** + * Clear existing data for a user and module + * @param {string} userId - User UUID + * @param {string} module - Module name + * @param {string} jobId - Bulk job ID + * @returns {Promise} Number of deleted records + */ + async clearUserData(userId, module, jobId) { + try { + const model = this.getModel(module); + console.log(`🗑️ Clearing existing data for user ${userId}, module ${module}, job ${jobId}`); + + const result = await model.destroy({ + where: { + user_id: userId, + provider: 'zoho', + bulk_job_id: jobId + } + }); + + console.log(`✅ Cleared ${result} existing records for ${module}`); + return result; + } catch (error) { + console.error(`❌ Error clearing data for ${module}:`, error.message); + throw error; + } + } + + /** + * Get data for a user and module + * @param {string} userId - User UUID + * @param {string} module - Module name + * @param {Object} options - Query options + * @returns {Promise} Records + */ + async getUserData(userId, module, options = {}) { + try { + const model = this.getModel(module); + const { limit = 100, offset = 0, orderBy = 'created_time', orderDirection = 'DESC' } = options; + + const records = await model.findAll({ + where: { + user_id: userId, + provider: 'zoho' + }, + limit: parseInt(limit), + offset: parseInt(offset), + order: [[orderBy, orderDirection]] + }); + + return records; + } catch (error) { + console.error(`❌ Error getting user data for ${module}:`, error.message); + throw error; + } + } + + /** + * Get count of records for a user and module + * @param {string} userId - User UUID + * @param {string} module - Module name + * @returns {Promise} Record count + */ + async getUserDataCount(userId, module) { + try { + const model = this.getModel(module); + + const count = await model.count({ + where: { + user_id: userId, + provider: 'zoho' + } + }); + + return count; + } catch (error) { + console.error(`❌ Error getting count for ${module}:`, error.message); + throw error; + } + } + + // Bulk Read Jobs methods + /** + * Create a new bulk read job record + * @param {Object} jobData - Job data + * @returns {Promise} Created job + */ + async createBulkReadJob(jobData) { + try { + console.log('📝 Creating bulk read job:', jobData.id); + + const job = await ZohoBulkReadJobs.create({ + id: jobData.id, + user_id: jobData.user_id, + provider: 'zoho', + module: jobData.module, + operation: jobData.operation, + state: jobData.state, + file_type: jobData.file_type, + download_url: jobData.download_url, + records_count: jobData.records_count || 0, + status: 'pending' + }); + + console.log('✅ Bulk read job created successfully'); + return job; + } catch (error) { + console.error('❌ Error creating bulk read job:', error.message); + throw error; + } + } + + /** + * Update bulk read job status + * @param {string} jobId - Job ID + * @param {Object} updateData - Update data + * @returns {Promise} Updated job + */ + async updateBulkReadJob(jobId, updateData) { + try { + console.log(`🔄 Updating bulk read job ${jobId}:`, updateData); + + const [updatedRows] = await ZohoBulkReadJobs.update(updateData, { + where: { id: jobId } + }); + + if (updatedRows === 0) { + throw new Error(`Job ${jobId} not found`); + } + + const updatedJob = await ZohoBulkReadJobs.findByPk(jobId); + console.log('✅ Bulk read job updated successfully'); + return updatedJob; + } catch (error) { + console.error('❌ Error updating bulk read job:', error.message); + throw error; + } + } + + /** + * Get bulk read job by ID + * @param {string} jobId - Job ID + * @returns {Promise} Job record + */ + async getBulkReadJob(jobId) { + try { + const job = await ZohoBulkReadJobs.findByPk(jobId); + return job; + } catch (error) { + console.error('❌ Error getting bulk read job:', error.message); + throw error; + } + } + + /** + * Get bulk read jobs for a user + * @param {string} userId - User UUID + * @param {Object} options - Query options + * @returns {Promise} Job records + */ + async getUserBulkReadJobs(userId, options = {}) { + try { + const { limit = 50, offset = 0, status } = options; + + const whereClause = { + user_id: userId, + provider: 'zoho' + }; + + if (status) { + whereClause.status = status; + } + + const jobs = await ZohoBulkReadJobs.findAll({ + where: whereClause, + limit: parseInt(limit), + offset: parseInt(offset), + order: [['created_at', 'DESC']] + }); + + return jobs; + } catch (error) { + console.error('❌ Error getting user bulk read jobs:', error.message); + throw error; + } + } +} + +module.exports = new ZohoBulkReadRepository(); diff --git a/src/db/migrations/002_create_user_auth_tokens.sql b/src/db/migrations/002_create_user_auth_tokens.sql index 6c8a875..5d5d366 100644 --- a/src/db/migrations/002_create_user_auth_tokens.sql +++ b/src/db/migrations/002_create_user_auth_tokens.sql @@ -1,6 +1,6 @@ CREATE TABLE IF NOT EXISTS user_auth_tokens ( id INT AUTO_INCREMENT PRIMARY KEY, - user_id INT NOT NULL, + user_id CHAR(36) NOT NULL, service_name ENUM('zoho','keka','bamboohr','hubspot','other') NOT NULL, access_token TEXT NOT NULL, refresh_token TEXT NULL, diff --git a/src/db/migrations/003_create_zoho_contacts_bulk.sql b/src/db/migrations/003_create_zoho_contacts_bulk.sql new file mode 100644 index 0000000..613e933 --- /dev/null +++ b/src/db/migrations/003_create_zoho_contacts_bulk.sql @@ -0,0 +1,26 @@ +-- Migration: Create Zoho Contacts Bulk Table +-- Description: Creates table for storing bulk read contacts data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_contacts_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + first_name VARCHAR(255), + last_name VARCHAR(255), + email VARCHAR(255), + phone VARCHAR(255), + mobile VARCHAR(255), + lead_source VARCHAR(255), + account_name VARCHAR(255), + owner VARCHAR(255), + created_time DATETIME, + modified_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/004_create_zoho_leads_bulk.sql b/src/db/migrations/004_create_zoho_leads_bulk.sql new file mode 100644 index 0000000..227bdfe --- /dev/null +++ b/src/db/migrations/004_create_zoho_leads_bulk.sql @@ -0,0 +1,25 @@ +-- Migration: Create Zoho Leads Bulk Table +-- Description: Creates table for storing bulk read leads data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_leads_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + first_name VARCHAR(255), + last_name VARCHAR(255), + company VARCHAR(255), + lead_source VARCHAR(255), + lead_status VARCHAR(255), + owner VARCHAR(255), + email VARCHAR(255), + phone VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/005_create_zoho_accounts_bulk.sql b/src/db/migrations/005_create_zoho_accounts_bulk.sql new file mode 100644 index 0000000..f7a7c6a --- /dev/null +++ b/src/db/migrations/005_create_zoho_accounts_bulk.sql @@ -0,0 +1,24 @@ +-- Migration: Create Zoho Accounts Bulk Table +-- Description: Creates table for storing bulk read accounts data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_accounts_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + account_name VARCHAR(255), + phone VARCHAR(255), + website VARCHAR(255), + industry VARCHAR(255), + ownership VARCHAR(255), + annual_revenue DECIMAL(15,2), + owner VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/006_create_zoho_tasks_bulk.sql b/src/db/migrations/006_create_zoho_tasks_bulk.sql new file mode 100644 index 0000000..8982f37 --- /dev/null +++ b/src/db/migrations/006_create_zoho_tasks_bulk.sql @@ -0,0 +1,23 @@ +-- Migration: Create Zoho Tasks Bulk Table +-- Description: Creates table for storing bulk read tasks data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_tasks_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + subject VARCHAR(500), + owner VARCHAR(255), + status VARCHAR(255), + priority VARCHAR(255), + due_date DATETIME, + what_id VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/007_create_zoho_vendors_bulk.sql b/src/db/migrations/007_create_zoho_vendors_bulk.sql new file mode 100644 index 0000000..8e08dfa --- /dev/null +++ b/src/db/migrations/007_create_zoho_vendors_bulk.sql @@ -0,0 +1,22 @@ +-- Migration: Create Zoho Vendors Bulk Table +-- Description: Creates table for storing bulk read vendors data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_vendors_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + vendor_name VARCHAR(255), + email VARCHAR(255), + phone VARCHAR(255), + website VARCHAR(255), + owner VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/008_create_zoho_invoices_bulk.sql b/src/db/migrations/008_create_zoho_invoices_bulk.sql new file mode 100644 index 0000000..2ce5260 --- /dev/null +++ b/src/db/migrations/008_create_zoho_invoices_bulk.sql @@ -0,0 +1,25 @@ +-- Migration: Create Zoho Invoices Bulk Table +-- Description: Creates table for storing bulk read invoices data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_invoices_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + invoice_number VARCHAR(255), + invoice_date DATE, + due_date DATE, + status VARCHAR(255), + total DECIMAL(15,2), + balance DECIMAL(15,2), + account_name VARCHAR(255), + owner VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/009_create_zoho_sales_orders_bulk.sql b/src/db/migrations/009_create_zoho_sales_orders_bulk.sql new file mode 100644 index 0000000..032540f --- /dev/null +++ b/src/db/migrations/009_create_zoho_sales_orders_bulk.sql @@ -0,0 +1,24 @@ +-- Migration: Create Zoho Sales Orders Bulk Table +-- Description: Creates table for storing bulk read sales orders data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_sales_orders_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + sales_order_number VARCHAR(255), + subject VARCHAR(500), + status VARCHAR(255), + due_date DATE, + total DECIMAL(15,2), + account_name VARCHAR(255), + owner VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/010_create_zoho_purchase_orders_bulk.sql b/src/db/migrations/010_create_zoho_purchase_orders_bulk.sql new file mode 100644 index 0000000..4c358a6 --- /dev/null +++ b/src/db/migrations/010_create_zoho_purchase_orders_bulk.sql @@ -0,0 +1,24 @@ +-- Migration: Create Zoho Purchase Orders Bulk Table +-- Description: Creates table for storing bulk read purchase orders data from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_purchase_orders_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + zoho_id VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + purchase_order_number VARCHAR(255), + subject VARCHAR(500), + vendor_name VARCHAR(255), + status VARCHAR(255), + due_date DATE, + total DECIMAL(15,2), + owner VARCHAR(255), + created_time DATETIME, + bulk_job_id VARCHAR(255), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_zoho_id (zoho_id) +); diff --git a/src/db/migrations/011_create_zoho_bulk_read_jobs.sql b/src/db/migrations/011_create_zoho_bulk_read_jobs.sql new file mode 100644 index 0000000..907bb99 --- /dev/null +++ b/src/db/migrations/011_create_zoho_bulk_read_jobs.sql @@ -0,0 +1,22 @@ +-- Migration: Create Zoho Bulk Read Jobs Table +-- Description: Creates table for tracking bulk read jobs from Zoho CRM + +CREATE TABLE IF NOT EXISTS zoho_bulk_read_jobs ( + id VARCHAR(255) PRIMARY KEY, + user_id VARCHAR(255) NOT NULL, + provider VARCHAR(50) NOT NULL DEFAULT 'zoho', + module VARCHAR(100) NOT NULL, + operation VARCHAR(50) NOT NULL, + state VARCHAR(50) NOT NULL, + file_type VARCHAR(10) NOT NULL, + download_url TEXT, + records_count INT DEFAULT 0, + processed_count INT DEFAULT 0, + status VARCHAR(50) DEFAULT 'pending', + error_message TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider (user_id, provider), + INDEX idx_module (module), + INDEX idx_status (status) +); diff --git a/src/integrations/zoho/handler.js b/src/integrations/zoho/handler.js index 8624207..ebeae08 100644 --- a/src/integrations/zoho/handler.js +++ b/src/integrations/zoho/handler.js @@ -23,6 +23,7 @@ class ZohoHandler { // Handle Zoho CRM webhook events async handleCrmWebhook(req, res) { + console.log('handleCrmWebhook', JSON.stringify(req.body)); try { const signature = req.headers['x-zoho-signature']; const payload = JSON.stringify(req.body); @@ -229,6 +230,372 @@ class ZohoHandler { // Add your business logic here } + // Handle Zoho Bulk Read webhook events + async handleBulkReadWebhook(req, res) { + try { + console.log('=== ZOHO BULK READ WEBHOOK RECEIVED ==='); + console.log('Headers:', JSON.stringify(req.headers, null, 2)); + console.log('Body:', JSON.stringify(req.body, null, 2)); + console.log('Query Parameters:', JSON.stringify(req.query, null, 2)); + console.log('Request Method:', req.method); + console.log('Request URL:', req.url); + console.log('Request IP:', req.ip); + console.log('User Agent:', req.get('User-Agent')); + console.log('Content Type:', req.get('Content-Type')); + console.log('Content Length:', req.get('Content-Length')); + console.log('=========================================='); + + // Extract access token from query parameters + const accessToken = req.query.access_token; + if (!accessToken) { + console.error('❌ No access token provided in query parameters'); + return res.status(400).json({ + status: 'error', + message: 'Access token is required in query parameters' + }); + } + console.log('🔑 Access token received from query parameters'); + + // Optional: Verify webhook signature if provided + const signature = req.headers['x-zoho-signature']; + if (signature) { + const payload = JSON.stringify(req.body); + if (!this.verifyWebhookSignature(payload, signature)) { + logger.warn('Invalid Zoho Bulk Read webhook signature', { + correlationId: logger.getCorrelationId(req), + ip: req.ip + }); + return res.status(401).json({ status: 'error', message: 'Invalid signature' }); + } + console.log('✅ Webhook signature verified successfully'); + } else { + console.log('⚠️ No webhook signature provided - skipping verification'); + } + + // Log the bulk read data structure + const { job_id, operation, state, query, fileType, result } = req.body; + console.log('req body after extracting', req.body); + + console.log('📊 BULK READ JOB DETAILS:'); + console.log('- Job ID:', job_id); + console.log('- Operation:', operation); + console.log('- State:', state); + console.log('- Module:', query?.module); + console.log('- Fields:', query?.fields); + console.log('- File Type:', fileType); + console.log('- Records Count:', result?.count); + console.log('- Download URL:', result?.download_url); + console.log('- More Records:', result?.more_records); + + // Note: The actual data will be fetched from the download URL + console.log('📋 BULK READ DATA:'); + console.log('- Data will be fetched from download URL:', result?.download_url); + console.log('- Total records to process:', result?.count); + + // Log to structured logger as well + logger.info('Zoho Bulk Read webhook received', { + correlationId: logger.getCorrelationId(req), + jobId: job_id, + operation, + state, + module: query?.module, + recordsCount: result?.count, + hasDownloadUrl: !!result?.download_url, + fileType + }); + + // Process different bulk read events + switch (state) { + case 'COMPLETED': + console.log('✅ Bulk read job completed successfully'); + await this.handleBulkReadCompleted(req.body, accessToken); + break; + case 'FAILED': + console.log('❌ Bulk read job failed'); + await this.handleBulkReadFailed(req.body, accessToken); + break; + case 'IN_PROGRESS': + console.log('⏳ Bulk read job in progress'); + await this.handleBulkReadInProgress(req.body, accessToken); + break; + case 'CREATED': + console.log('🆕 Bulk read job created'); + await this.handleBulkReadCreated(req.body, accessToken); + break; + default: + console.log('❓ Unknown bulk read state:', state); + logger.warn('Unknown Zoho Bulk Read state', { state, jobId: job_id }); + } + + res.json({ + status: 'success', + message: 'Bulk read webhook processed', + receivedAt: new Date().toISOString() + }); + } catch (error) { + console.error('❌ Error processing bulk read webhook:', error); + logger.error('Zoho Bulk Read webhook processing failed', { + correlationId: logger.getCorrelationId(req), + error: error.message, + stack: error.stack + }); + res.status(500).json({ + status: 'error', + message: 'Bulk read webhook processing failed', + error: error.message + }); + } + } + + // Helper function to get user data from access token + async getUserFromAccessToken(accessToken) { + try { + const jwtService = require('../../auth/jwt.service'); + const { decrypt } = require('../../utils/crypto'); + + // Decrypt the access token if it's encrypted + let decryptedToken; + try { + decryptedToken = decrypt(accessToken); + } catch (error) { + // If decryption fails, assume it's already decrypted + decryptedToken = accessToken; + } + + // Verify and decode the JWT token using the same service + const decoded = jwtService.verify(decryptedToken); + + // Get user data from database using the user ID from token + const userRepository = require('../../data/repositories/userRepository'); + const user = await userRepository.findById(decoded.id); + + if (!user) { + throw new Error('User not found'); + } + + console.log(`👤 User authenticated: ${user.uuid} (${user.email})`); + return user; + } catch (error) { + console.error('❌ Error getting user from access token:', error); + throw error; + } + } + + // Bulk Read Event Handlers + async handleBulkReadCompleted(data, accessToken) { + console.log('🎉 Processing completed bulk read job:', data.job_id); + logger.info('Bulk read job completed', { + jobId: data.job_id, + recordsCount: data.records_count, + downloadUrl: data.download_url + }); + + try { + // Import services + const CsvService = require('../../services/csvService'); + const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository'); + const userAuthTokenRepo = require('../../data/repositories/userAuthTokenRepository'); + const { decrypt } = require('../../utils/crypto'); + + const csvService = new CsvService(); + + // Extract job details + const { job_id, query, result, operation, state, fileType } = data; + const module = query.module; + const downloadUrl = result.download_url; + const recordsCount = result.count; + + console.log(`📊 Processing bulk read for module: ${module}`); + console.log(`📥 Download URL: ${downloadUrl}`); + console.log(`📈 Records count: ${recordsCount}`); + + // Get user from access token + let user; + try { + user = await this.getUserFromAccessToken(accessToken); + console.log(`👤 User authenticated: ${user.uuid} (${user.email})`); + } catch (error) { + console.error('❌ Failed to authenticate user from access token:', error); + await ZohoBulkReadRepository.updateBulkReadJob(job_id, { + status: 'failed', + error_message: 'Failed to authenticate user from access token' + }); + return; + } + + const userId = user.uuid; + + // Get or create job record + let jobRecord = await ZohoBulkReadRepository.getBulkReadJob(job_id); + + if (!jobRecord) { + console.log('📝 Job record not found, creating new one...'); + // Create job record with the webhook data + const jobData = { + id: job_id, + user_id: userId, + module: query.module, + operation: operation, + state: state, + file_type: fileType, + download_url: result.download_url, + records_count: result.count + }; + + jobRecord = await ZohoBulkReadRepository.createBulkReadJob(jobData); + console.log('✅ Created new job record'); + } else { + // Update existing job record with user ID if it was unknown + if (jobRecord.user_id === 'unknown') { + await ZohoBulkReadRepository.updateBulkReadJob(job_id, { + user_id: userId + }); + console.log('✅ Updated job record with user ID'); + } + } + + // Get Zoho access token for the user + const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, 'zoho'); + if (!tokenRecord) { + console.error('❌ No Zoho token found for user:', userId); + await ZohoBulkReadRepository.updateBulkReadJob(job_id, { + status: 'failed', + error_message: 'No Zoho access token found for user' + }); + return; + } + + const zohoAccessToken = decrypt(tokenRecord.accessToken); + console.log('🔑 Zoho access token retrieved successfully'); + + // Fetch CSV data + const csvData = await csvService.fetchCsvData(downloadUrl, zohoAccessToken); + console.log(`📥 Fetched ${csvData.length} records from CSV`); + + // Parse and map data + const mappedData = csvService.parseCsvData(csvData, module, userId, job_id); + console.log(`🔄 Mapped ${mappedData.length} records for database insertion`); + + // Clear existing data for this job + await ZohoBulkReadRepository.clearUserData(userId, module, job_id); + + // Bulk insert data + const insertedRecords = await ZohoBulkReadRepository.bulkInsert(module, mappedData); + console.log(`✅ Successfully inserted ${insertedRecords.length} records`); + + // Update job status + await ZohoBulkReadRepository.updateBulkReadJob(job_id, { + status: 'completed', + processed_count: insertedRecords.length, + state: 'COMPLETED' + }); + + console.log('🎉 Bulk read processing completed successfully'); + logger.info('Bulk read processing completed', { + jobId: job_id, + module, + userId, + recordsProcessed: insertedRecords.length, + totalRecords: recordsCount + }); + + } catch (error) { + console.error('❌ Error processing bulk read completion:', error); + logger.error('Bulk read processing failed', { + jobId: data.job_id, + error: error.message, + stack: error.stack + }); + + // Update job status to failed + try { + const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository'); + await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, { + status: 'failed', + error_message: error.message + }); + } catch (updateError) { + console.error('❌ Error updating job status:', updateError); + } + } + } + + async handleBulkReadFailed(data, accessToken) { + console.log('💥 Processing failed bulk read job:', data.job_id); + logger.error('Bulk read job failed', { + jobId: data.job_id, + errorMessage: data.error_message + }); + + try { + // Get user from access token to update job record + const user = await this.getUserFromAccessToken(accessToken); + const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository'); + + await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, { + status: 'failed', + state: 'FAILED', + error_message: data.error_message || 'Bulk read job failed' + }); + + console.log('✅ Updated job status to failed'); + } catch (error) { + console.error('❌ Error updating failed job status:', error); + } + } + + async handleBulkReadInProgress(data, accessToken) { + console.log('⏳ Bulk read job in progress:', data.job_id); + logger.info('Bulk read job in progress', { + jobId: data.job_id + }); + + try { + // Get user from access token to update job record + const user = await this.getUserFromAccessToken(accessToken); + const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository'); + + await ZohoBulkReadRepository.updateBulkReadJob(data.job_id, { + status: 'in_progress', + state: 'IN_PROGRESS' + }); + + console.log('✅ Updated job status to in_progress'); + } catch (error) { + console.error('❌ Error updating job status:', error); + } + } + + async handleBulkReadCreated(data, accessToken) { + console.log('🆕 Bulk read job created:', data.job_id); + logger.info('Bulk read job created', { + jobId: data.job_id, + module: data.query?.module + }); + + try { + // Get user from access token + const user = await this.getUserFromAccessToken(accessToken); + const ZohoBulkReadRepository = require('../../data/repositories/zohoBulkReadRepository'); + + const jobData = { + id: data.job_id, + user_id: user.uuid, + module: data.query?.module || 'unknown', + operation: data.operation || 'read', + state: data.state || 'CREATED', + file_type: data.fileType || 'csv', + download_url: data.result?.download_url, + records_count: data.result?.count || 0 + }; + + await ZohoBulkReadRepository.createBulkReadJob(jobData); + console.log('✅ Created job record in database'); + } catch (error) { + console.error('❌ Error creating job record:', error); + } + } + // Projects Event Handlers async handleProjectCreated(data) { logger.info('Project created', { projectId: data.id, name: data.name }); diff --git a/src/services/bulkReadService.js b/src/services/bulkReadService.js new file mode 100644 index 0000000..2be5140 --- /dev/null +++ b/src/services/bulkReadService.js @@ -0,0 +1,304 @@ +const axios = require('axios'); +const ZohoBulkReadRepository = require('../data/repositories/zohoBulkReadRepository'); +const userAuthTokenRepo = require('../data/repositories/userAuthTokenRepository'); +const { decrypt } = require('../utils/crypto'); +const logger = require('../utils/logger'); + +class BulkReadService { + constructor() { + this.baseUrl = 'https://www.zohoapis.com'; + } + + /** + * Initiate a bulk read job for a specific module + * @param {string} userId - User UUID + * @param {string} module - Zoho module name + * @param {Array} fields - Fields to fetch + * @param {Object} options - Additional options + * @returns {Promise} Job details + */ + async initiateBulkRead(userId, module, fields, options = {}) { + try { + console.log(`🚀 Initiating bulk read for user ${userId}, module ${module}`); + + // Get access token + const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, 'zoho'); + if (!tokenRecord) { + throw new Error('No Zoho access token found for user'); + } + + const accessToken = decrypt(tokenRecord.accessToken); + + // Prepare bulk read request + const bulkReadData = { + callback: { + url: `${process.env.API_BASE_URL || 'http://localhost:3000'}/api/v1/integrations/webhooks/zoho/bulkread`, + method: 'post' + }, + query: { + module: module, + fields: fields, + page: options.page || 1, + limit: options.limit || 200000 + } + }; + + console.log('📋 Bulk read request:', JSON.stringify(bulkReadData, null, 2)); + + // Make API call to Zoho + const response = await axios.post( + `${this.baseUrl}/crm/bulk/v2/read`, + bulkReadData, + { + headers: { + 'Authorization': `Zoho-oauthtoken ${accessToken}`, + 'Content-Type': 'application/json' + } + } + ); + + const jobData = response.data; + console.log('✅ Bulk read job initiated:', jobData); + + // Store job in database + const jobRecord = await ZohoBulkReadRepository.createBulkReadJob({ + id: jobData.id, + user_id: userId, + module: module, + operation: 'read', + state: 'CREATED', + file_type: 'csv', + records_count: 0, + status: 'pending' + }); + + logger.info('Bulk read job initiated', { + userId, + module, + jobId: jobData.id, + fields: fields.length + }); + + return { + jobId: jobData.id, + status: 'initiated', + message: `Bulk read job initiated for ${module}`, + estimatedTime: this.getEstimatedTime(module, options.limit) + }; + + } catch (error) { + console.error('❌ Error initiating bulk read:', error.message); + logger.error('Bulk read initiation failed', { + userId, + module, + error: error.message + }); + throw error; + } + } + + /** + * Get bulk read job status + * @param {string} userId - User UUID + * @param {string} jobId - Job ID + * @returns {Promise} Job status + */ + async getBulkReadJobStatus(userId, jobId) { + try { + const job = await ZohoBulkReadRepository.getBulkReadJob(jobId); + + if (!job) { + throw new Error('Job not found'); + } + + if (job.user_id !== userId) { + throw new Error('Unauthorized access to job'); + } + + return { + jobId: job.id, + module: job.module, + status: job.status, + state: job.state, + recordsCount: job.records_count, + processedCount: job.processed_count, + createdAt: job.created_at, + updatedAt: job.updated_at, + errorMessage: job.error_message + }; + + } catch (error) { + console.error('❌ Error getting job status:', error.message); + throw error; + } + } + + /** + * Get user's bulk read jobs + * @param {string} userId - User UUID + * @param {Object} options - Query options + * @returns {Promise} Job list + */ + async getUserBulkReadJobs(userId, options = {}) { + try { + const jobs = await ZohoBulkReadRepository.getUserBulkReadJobs(userId, options); + + return jobs.map(job => ({ + jobId: job.id, + module: job.module, + status: job.status, + state: job.state, + recordsCount: job.records_count, + processedCount: job.processed_count, + createdAt: job.created_at, + updatedAt: job.updated_at + })); + + } catch (error) { + console.error('❌ Error getting user jobs:', error.message); + throw error; + } + } + + /** + * Get bulk read data for a module + * @param {string} userId - User UUID + * @param {string} module - Module name + * @param {Object} options - Query options + * @returns {Promise} Data and pagination info + */ + async getBulkReadData(userId, module, options = {}) { + try { + const { page = 1, limit = 100, orderBy = 'created_time', orderDirection = 'DESC' } = options; + + const data = await ZohoBulkReadRepository.getUserData(userId, module, { + limit: parseInt(limit), + offset: (parseInt(page) - 1) * parseInt(limit), + orderBy, + orderDirection + }); + + const totalCount = await ZohoBulkReadRepository.getUserDataCount(userId, module); + + return { + data: data, + pagination: { + page: parseInt(page), + limit: parseInt(limit), + total: totalCount, + pages: Math.ceil(totalCount / parseInt(limit)) + } + }; + + } catch (error) { + console.error('❌ Error getting bulk read data:', error.message); + throw error; + } + } + + /** + * Get estimated processing time for a module + * @param {string} module - Module name + * @param {number} limit - Record limit + * @returns {string} Estimated time + */ + getEstimatedTime(module, limit = 200000) { + const baseTime = { + 'contacts': 2, + 'leads': 2, + 'accounts': 1, + 'tasks': 3, + 'vendors': 1, + 'invoices': 2, + 'sales_orders': 2, + 'purchase_orders': 2 + }; + + const minutes = baseTime[module.toLowerCase()] || 2; + const adjustedMinutes = Math.ceil(minutes * (limit / 100000)); + + return `${adjustedMinutes} minutes`; + } + + /** + * Get available modules for bulk read + * @returns {Array} Available modules + */ + getAvailableModules() { + return [ + { + name: 'contacts', + displayName: 'Contacts', + description: 'Customer contact information', + fields: [ + 'id', 'First_Name', 'Last_Name', 'Email', 'Phone', 'Mobile', + 'Lead_Source', 'Account_Name.Account_Name', 'Owner', 'Created_Time', 'Modified_Time' + ] + }, + { + name: 'leads', + displayName: 'Leads', + description: 'Sales lead information', + fields: [ + 'id', 'First_Name', 'Last_Name', 'Company', 'Lead_Source', + 'Lead_Status', 'Owner', 'Email', 'Phone', 'Created_Time' + ] + }, + { + name: 'accounts', + displayName: 'Accounts', + description: 'Account information', + fields: [ + 'id', 'Account_Name', 'Phone', 'Website', 'Industry', + 'Ownership', 'Annual_Revenue', 'Owner', 'Created_Time' + ] + }, + { + name: 'tasks', + displayName: 'Tasks', + description: 'Task information', + fields: [ + 'id', 'Subject', 'Owner', 'Status', 'Priority', + 'Due_Date', 'What_Id', 'Created_Time' + ] + }, + { + name: 'vendors', + displayName: 'Vendors', + description: 'Vendor information', + fields: [ + 'id', 'Vendor_Name', 'Email', 'Phone', 'Website', 'Owner', 'Created_Time' + ] + }, + { + name: 'invoices', + displayName: 'Invoices', + description: 'Invoice information', + fields: [ + 'id', 'Invoice_Number', 'Invoice_Date', 'Due_Date', 'Status', + 'Total', 'Balance', 'Account_Name.Account_Name', 'Owner', 'Created_Time' + ] + }, + { + name: 'sales_orders', + displayName: 'Sales Orders', + description: 'Sales order information', + fields: [ + 'id', 'Sales_Order_Number', 'Subject', 'Status', 'Due_Date', + 'Total', 'Account_Name.Account_Name', 'Owner', 'Created_Time' + ] + }, + { + name: 'purchase_orders', + displayName: 'Purchase Orders', + description: 'Purchase order information', + fields: [ + 'id', 'Purchase_Order_Number', 'Subject', 'Vendor_Name.Vendor_Name', + 'Status', 'Due_Date', 'Total', 'Owner', 'Created_Time' + ] + } + ]; + } +} + +module.exports = BulkReadService; diff --git a/src/services/csvService.js b/src/services/csvService.js new file mode 100644 index 0000000..21219e0 --- /dev/null +++ b/src/services/csvService.js @@ -0,0 +1,314 @@ +const axios = require('axios'); +const csv = require('csv-parser'); +const { Readable } = require('stream'); +const yauzl = require('yauzl'); +const logger = require('../utils/logger'); + +class CsvService { + constructor() { + this.baseUrl = 'https://www.zohoapis.com'; + } + + /** + * Fetch CSV data from Zoho download URL + * @param {string} downloadUrl - The download URL from webhook + * @param {string} accessToken - Zoho access token + * @returns {Promise} Parsed CSV data as array of objects + */ + async fetchCsvData(downloadUrl, accessToken) { + try { + console.log('📥 Fetching CSV data from:', downloadUrl); + + const fullUrl = downloadUrl.startsWith('http') ? downloadUrl : `${this.baseUrl}${downloadUrl}`; + + const response = await axios.get(fullUrl, { + headers: { + 'Authorization': `Zoho-oauthtoken ${accessToken}`, + 'Accept': 'application/zip, application/octet-stream' + }, + responseType: 'arraybuffer' + }); + + console.log('✅ Data fetched successfully, size:', response.data.length); + console.log('📊 Response status:', response.status); + console.log('📊 Content-Type:', response.headers['content-type']); + + // Check if it's a ZIP file + const buffer = Buffer.from(response.data); + if (buffer.toString('hex', 0, 4) === '504b0304') { + console.log('📦 Detected ZIP file, extracting CSV...'); + return await this.extractCsvFromZip(buffer); + } else { + console.log('📄 Treating as direct CSV...'); + return await this.parseCsvFromBuffer(buffer); + } + } catch (error) { + console.error('❌ Error fetching CSV data:', error.message); + logger.error('CSV fetch failed', { + downloadUrl, + error: error.message, + status: error.response?.status, + statusText: error.response?.statusText + }); + throw error; + } + } + + async extractCsvFromZip(zipBuffer) { + return new Promise((resolve, reject) => { + yauzl.fromBuffer(zipBuffer, { lazyEntries: true }, (err, zipfile) => { + if (err) { + console.error('❌ Error opening ZIP file:', err); + return reject(err); + } + + zipfile.readEntry(); + zipfile.on('entry', (entry) => { + if (entry.fileName.endsWith('.csv')) { + console.log('📄 Found CSV file in ZIP:', entry.fileName); + zipfile.openReadStream(entry, (err, readStream) => { + if (err) { + console.error('❌ Error reading CSV from ZIP:', err); + return reject(err); + } + + const results = []; + const csvStream = readStream.pipe(csv()); + + csvStream.on('data', (data) => { + results.push(data); + }); + + csvStream.on('end', () => { + console.log(`📊 Extracted ${results.length} records from CSV in ZIP`); + resolve(results); + }); + + csvStream.on('error', (error) => { + console.error('❌ Error parsing CSV from ZIP:', error); + reject(error); + }); + }); + } else { + zipfile.readEntry(); + } + }); + + zipfile.on('end', () => { + console.log('📦 Finished reading ZIP file'); + }); + + zipfile.on('error', (error) => { + console.error('❌ ZIP file error:', error); + reject(error); + }); + }); + }); + } + + async parseCsvFromBuffer(buffer) { + return new Promise((resolve, reject) => { + const results = []; + const stream = Readable.from(buffer.toString()).pipe(csv()); + + stream.on('data', (data) => { + results.push(data); + }); + + stream.on('end', () => { + console.log(`📊 Parsed ${results.length} records from CSV`); + resolve(results); + }); + + stream.on('error', (error) => { + console.error('❌ Error parsing CSV:', error); + reject(error); + }); + }); + } + + /** + * Parse CSV data and map to database fields + * @param {Array} csvData - Raw CSV data + * @param {string} module - Zoho module name + * @param {string} userId - User UUID + * @param {string} jobId - Bulk job ID + * @returns {Array} Mapped data ready for database insertion + */ + parseCsvData(csvData, module, userId, jobId) { + console.log(`🔄 Parsing ${csvData.length} records for module: ${module}`); + + const mappedData = csvData.map(record => { + const baseRecord = { + user_id: userId, + provider: 'zoho', + bulk_job_id: jobId + }; + + // Map fields based on module type + switch (module.toLowerCase()) { + case 'contacts': + return this.mapContactFields(record, baseRecord); + case 'leads': + return this.mapLeadFields(record, baseRecord); + case 'accounts': + return this.mapAccountFields(record, baseRecord); + case 'tasks': + return this.mapTaskFields(record, baseRecord); + case 'vendors': + return this.mapVendorFields(record, baseRecord); + case 'invoices': + return this.mapInvoiceFields(record, baseRecord); + case 'sales_orders': + return this.mapSalesOrderFields(record, baseRecord); + case 'purchase_orders': + return this.mapPurchaseOrderFields(record, baseRecord); + default: + console.warn(`⚠️ Unknown module: ${module}`); + return { ...baseRecord, ...record }; + } + }); + + console.log(`✅ Mapped ${mappedData.length} records for ${module}`); + return mappedData; + } + + // Field mapping methods for each module + mapContactFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + first_name: record.First_Name, + last_name: record.Last_Name, + email: record.Email, + phone: record.Phone, + mobile: record.Mobile, + lead_source: record.Lead_Source, + account_name: record['Account_Name.Account_Name'] || record.Account_Name, + owner: record.Owner, + created_time: this.parseDate(record.Created_Time), + modified_time: this.parseDate(record.Modified_Time) + }; + } + + mapLeadFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + first_name: record.First_Name, + last_name: record.Last_Name, + company: record.Company, + lead_source: record.Lead_Source, + lead_status: record.Lead_Status, + owner: record.Owner, + email: record.Email, + phone: record.Phone, + created_time: this.parseDate(record.Created_Time) + }; + } + + mapAccountFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + account_name: record.Account_Name, + phone: record.Phone, + website: record.Website, + industry: record.Industry, + ownership: record.Ownership, + annual_revenue: this.parseDecimal(record.Annual_Revenue), + owner: record.Owner, + created_time: this.parseDate(record.Created_Time) + }; + } + + mapTaskFields(record, baseRecord) { + const mapped = { + ...baseRecord, + zoho_id: record.Id, + subject: record.Subject, + owner: record.Owner, + status: record.Status, + priority: record.Priority, + due_date: this.parseDate(record.Due_Date), + what_id: record.What_Id, + created_time: this.parseDate(record.Created_Time) + }; + + return mapped; + } + + mapVendorFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + vendor_name: record.Vendor_Name, + email: record.Email, + phone: record.Phone, + website: record.Website, + owner: record.Owner, + created_time: this.parseDate(record.Created_Time) + }; + } + + mapInvoiceFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + invoice_number: record.Invoice_Number, + invoice_date: this.parseDate(record.Invoice_Date), + due_date: this.parseDate(record.Due_Date), + status: record.Status, + total: this.parseDecimal(record.Total), + balance: this.parseDecimal(record.Balance), + account_name: record['Account_Name.Account_Name'] || record.Account_Name, + owner: record.Owner, + created_time: this.parseDate(record.Created_Time) + }; + } + + mapSalesOrderFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + sales_order_number: record.Sales_Order_Number, + subject: record.Subject, + status: record.Status, + due_date: this.parseDate(record.Due_Date), + total: this.parseDecimal(record.Total), + account_name: record['Account_Name.Account_Name'] || record.Account_Name, + owner: record.Owner, + created_time: this.parseDate(record.Created_Time) + }; + } + + mapPurchaseOrderFields(record, baseRecord) { + return { + ...baseRecord, + zoho_id: record.id, + purchase_order_number: record.Purchase_Order_Number, + subject: record.Subject, + vendor_name: record['Vendor_Name.Vendor_Name'] || record.Vendor_Name, + status: record.Status, + due_date: this.parseDate(record.Due_Date), + total: this.parseDecimal(record.Total), + owner: record.Owner, + created_time: this.parseDate(record.Created_Time) + }; + } + + // Utility methods + parseDate(dateString) { + if (!dateString || dateString === '') return null; + const date = new Date(dateString); + return isNaN(date.getTime()) ? null : date; + } + + parseDecimal(decimalString) { + if (!decimalString || decimalString === '') return null; + const parsed = parseFloat(decimalString); + return isNaN(parsed) ? null : parsed; + } +} + +module.exports = CsvService;