diff --git a/.env b/.env index 19683b6..818f76c 100644 --- a/.env +++ b/.env @@ -3,12 +3,12 @@ ZOHO_CLIENT_SECRET=772c42df00054668efb6a5839f1874b1dc89e1a127 ZOHO_REDIRECT_URI=centralizedreportingsystem://oauth/callback SALESFORCE_CLIENT_ID=3MVG9GBhY6wQjl2sueQtv2NXMm3EuWtEvOQoeKRAzYcgs2.AWhkCPFitVFPYyUkiLRRdIww2fpr48_Inokd3F SALESFORCE_CLIENT_SECRET=B2B23A4A55B801C74C3C8228E88382121E8D851B67A9282AFA46A28A2EC6E187 -SALESFORCE_REDIRECT_URI=https://512acb53a4a4.ngrok-free.app/api/v1/users/oauth/callback +SALESFORCE_REDIRECT_URI=https://ab06d4e82ca0.ngrok-free.app/api/v1/users/oauth/callback DB_USER=root DB_PASSWORD=Admin@123 DB_NAME=centralized_reporting DB_HOST=127.0.0.1 DB_PORT=3306 -MY_BASE_URL=http://160.187.167.216 +MY_BASE_URL=https://41043ae70837.ngrok-free.app N8N_WEBHOOK_URL=https://workflows.tech4bizsolutions.com N8N_WEBHOOK_ID=04e677f5-ec57-4772-bf12-96f2610d4b9c \ No newline at end of file diff --git a/BULK_READ_REFACTORING_SUMMARY.md b/BULK_READ_REFACTORING_SUMMARY.md new file mode 100644 index 0000000..811b167 --- /dev/null +++ b/BULK_READ_REFACTORING_SUMMARY.md @@ -0,0 +1,266 @@ +# Bulk Read Functionality Refactoring Summary + +## Overview + +The bulk read functionality has been completely refactored to support multiple service providers and services instead of being limited to Zoho only. This refactoring enables the system to handle data from various providers like Zoho, Salesforce, HubSpot, Keka, BambooHR, Intuit/QuickBooks, etc. + +## Key Changes + +### 1. Database Schema Changes + +#### New Provider-Agnostic Tables + +Created new module-specific tables that support multiple providers: + +- `contacts_bulk` - Contact data from any provider/service +- `leads_bulk` - Lead data from any provider/service +- `accounts_bulk` - Account data from any provider/service +- `deals_bulk` - Deal/Opportunity data from any provider/service +- `tasks_bulk` - Task data from any provider/service +- `vendors_bulk` - Vendor data from any provider/service +- `invoices_bulk` - Invoice data from any provider/service +- `sales_orders_bulk` - Sales order data from any provider/service +- `purchase_orders_bulk` - Purchase order data from any provider/service +- `employees_bulk` - Employee/HR data from any provider/service +- `products_bulk` - Product/inventory data from any provider/service +- `customers_bulk` - Customer data from any provider/service + +#### New Job Tracking Table + +- `bulk_read_jobs` - Provider-agnostic job tracking table replacing `zoho_bulk_read_jobs` + +#### Key Schema Features + +- **Provider Field**: Identifies the service provider (zoho, salesforce, hubspot, etc.) +- **Service Field**: Identifies the specific service within a provider (crm, books, hr, etc.) +- **Optional Fields**: Most data fields are now optional/nullable to accommodate varying data structures +- **Raw Data Field**: JSON field to store original response data +- **Enhanced Indexing**: Composite indexes on (user_uuid, provider, service) for efficient querying + +### 2. 
API Endpoint Changes + +#### Updated Routes in `bulkReadRoutes.js` + +**Before:** +```javascript +POST /api/v1/bulk-read/initiate +Body: { module, fields, page, limit } + +GET /api/v1/bulk-read/data/:module +``` + +**After:** +```javascript +POST /api/v1/bulk-read/initiate +Body: { provider, service, module, fields, page, limit } + +GET /api/v1/bulk-read/data/:provider/:service/:module +GET /api/v1/bulk-read/providers +GET /api/v1/bulk-read/modules/:provider/:service +``` + +#### New Query Parameters +- Added `provider`, `service`, `module` filters for job listing +- Enhanced filtering capabilities for multi-provider support + +### 3. Service Layer Refactoring + +#### BulkReadService Updates + +**Provider Configuration System:** +- Configurable provider endpoints and authentication methods +- Support for multiple services per provider +- Provider-specific request formatting + +**Supported Providers & Services:** +```javascript +{ + zoho: { services: ['crm', 'books', 'inventory'] }, + salesforce: { services: ['crm'] }, + hubspot: { services: ['crm'] }, + keka: { services: ['hr'] }, + bamboohr: { services: ['hr'] }, + intuit: { services: ['accounting'] }, + quickbooks: { services: ['accounting'] } +} +``` + +**Key Methods:** +- `initiateBulkRead(userId, provider, service, module, fields, options)` +- `getBulkReadData(userId, provider, service, module, options)` +- `getAvailableProviders()` - Returns all supported providers/services +- `getAvailableModules(provider, service)` - Returns modules for specific provider/service + +#### Provider-Specific Adapters + +- **Request Formatting**: Each provider has custom request structure +- **Authentication Headers**: Provider-specific auth token handling +- **Response Parsing**: Custom job ID extraction per provider + +### 4. Repository Layer Changes + +#### New `bulkReadRepository.js` + +Replaces provider-specific repositories with a unified repository that: +- Handles multiple table operations +- Supports provider/service filtering +- Provides bulk data insertion capabilities +- Includes error handling for non-existent tables + +#### Key Methods +- `createBulkReadJob()` - Creates jobs in new schema +- `getUserData(userId, provider, service, module)` - Gets filtered data +- `insertBulkData(module, data)` - Inserts processed webhook data + +### 5. Data Models + +#### New Sequelize Models + +Created `contactsBulk.js` and `bulkReadJobs.js` models following the new schema structure. + +#### Features +- Provider/service aware models +- JSON field support for raw data +- Comprehensive indexing strategy +- Foreign key relationships maintained + +## Migration Path + +### Database Migrations (Files 014-026) + +1. **014-022**: Core module tables (contacts, leads, accounts, deals, tasks, vendors, invoices, sales_orders, purchase_orders) +2. **023**: New bulk_read_jobs table +3. **024-026**: Additional provider tables (employees, products, customers) + +### Backward Compatibility + +The refactoring maintains API compatibility for existing Zoho integrations while extending support for new providers. 
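To make the new configuration concrete, the sketch below shows how the documented `getAvailableProviders()` and `getAvailableModules(provider, service)` lookups might sit on top of the provider map; the nested object shape and the error message are illustrative assumptions, while the module lists mirror the provider-specific sections below.
+
+```javascript
+// Illustrative sketch only, not the actual implementation. The nested
+// shape and error text are assumptions; the module lists follow the
+// provider-specific sections documented below.
+const PROVIDER_CONFIG = {
+  zoho: {
+    services: {
+      crm: ['contacts', 'leads', 'accounts', 'deals', 'tasks', 'vendors'],
+      books: ['invoices', 'sales_orders', 'purchase_orders', 'customers', 'vendors'],
+      inventory: ['products', 'customers', 'vendors', 'sales_orders', 'purchase_orders']
+    }
+  },
+  salesforce: { services: { crm: ['contacts', 'leads', 'accounts', 'deals', 'tasks'] } },
+  hubspot: { services: { crm: ['contacts', 'leads', 'accounts', 'deals'] } },
+  keka: { services: { hr: ['employees'] } },
+  bamboohr: { services: { hr: ['employees'] } },
+  intuit: { services: { accounting: ['customers', 'vendors', 'invoices', 'products'] } },
+  quickbooks: { services: { accounting: ['customers', 'vendors', 'invoices', 'products'] } }
+};
+
+// Returns e.g. { zoho: ['crm', 'books', 'inventory'], salesforce: ['crm'], ... }
+function getAvailableProviders() {
+  return Object.fromEntries(
+    Object.entries(PROVIDER_CONFIG).map(([provider, cfg]) => [provider, Object.keys(cfg.services)])
+  );
+}
+
+// Throws on unknown combinations so routes can return a 400 error
+function getAvailableModules(provider, service) {
+  const modules = PROVIDER_CONFIG[provider] && PROVIDER_CONFIG[provider].services[service];
+  if (!modules) {
+    throw new Error(`Unsupported provider/service combination: ${provider}/${service}`);
+  }
+  return modules;
+}
+```
+
+A call such as `GET /api/v1/bulk-read/modules/zoho/crm` would then resolve through `getAvailableModules('zoho', 'crm')`.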
+ +## Provider-Specific Implementation Details + +### Zoho Integration +- **CRM Service**: contacts, leads, accounts, deals, tasks, vendors +- **Books Service**: invoices, sales_orders, purchase_orders, customers, vendors +- **Inventory Service**: products, customers, vendors, sales_orders, purchase_orders + +### Salesforce Integration +- **CRM Service**: contacts, leads, accounts, deals, tasks +- Uses SOQL query format for bulk operations +- Bearer token authentication + +### HubSpot Integration +- **CRM Service**: contacts, leads, accounts, deals +- REST API with properties-based queries +- Bearer token authentication + +### HR Providers (Keka, BambooHR) +- **HR Service**: employees +- Employee data with comprehensive fields (hire_date, department, salary, etc.) + +### Accounting Providers (Intuit, QuickBooks) +- **Accounting Service**: customers, vendors, invoices, products +- Financial data with proper decimal handling + +## Benefits of Refactoring + +1. **Scalability**: Easy to add new providers and services +2. **Maintainability**: Single codebase handles all providers +3. **Flexibility**: Optional fields accommodate varying data structures +4. **Performance**: Optimized indexing for multi-provider queries +5. **Extensibility**: JSON raw_data field preserves original data +6. **Consistency**: Unified API across all providers + +## Usage Examples + +### Initiating a Bulk Read Job + +```javascript +// Zoho CRM Contacts +POST /api/v1/bulk-read/initiate +{ + "provider": "zoho", + "service": "crm", + "module": "contacts", + "fields": ["first_name", "last_name", "email", "phone"] +} + +// Salesforce CRM Leads +POST /api/v1/bulk-read/initiate +{ + "provider": "salesforce", + "service": "crm", + "module": "leads", + "fields": ["FirstName", "LastName", "Company", "Email"] +} + +// Keka HR Employees +POST /api/v1/bulk-read/initiate +{ + "provider": "keka", + "service": "hr", + "module": "employees", + "fields": ["employee_id", "first_name", "last_name", "department"] +} +``` + +### Retrieving Data + +```javascript +// Get Zoho CRM contacts +GET /api/v1/bulk-read/data/zoho/crm/contacts + +// Get Salesforce leads +GET /api/v1/bulk-read/data/salesforce/crm/leads + +// Get employee data from Keka +GET /api/v1/bulk-read/data/keka/hr/employees +``` + +### Discovery Endpoints + +```javascript +// Get all available providers +GET /api/v1/bulk-read/providers + +// Get modules for Zoho CRM +GET /api/v1/bulk-read/modules/zoho/crm + +// Get modules for Salesforce CRM +GET /api/v1/bulk-read/modules/salesforce/crm +``` + +## Next Steps + +1. **Run Database Migrations**: Execute migrations 014-026 to create new schema +2. **Update Webhook Handlers**: Modify webhook processors to use new repository methods +3. **Test Provider Integrations**: Validate each provider's bulk read functionality +4. **Update Documentation**: Update API documentation with new endpoints +5. 
**Monitor Performance**: Ensure new indexing strategy performs well with real data + +## Files Modified/Created + +### New Migrations +- `src/db/migrations/014_create_contacts_bulk.sql` +- `src/db/migrations/015_create_leads_bulk.sql` +- `src/db/migrations/016_create_accounts_bulk.sql` +- `src/db/migrations/017_create_deals_bulk.sql` +- `src/db/migrations/018_create_tasks_bulk.sql` +- `src/db/migrations/019_create_vendors_bulk.sql` +- `src/db/migrations/020_create_invoices_bulk.sql` +- `src/db/migrations/021_create_sales_orders_bulk.sql` +- `src/db/migrations/022_create_purchase_orders_bulk.sql` +- `src/db/migrations/023_create_bulk_read_jobs.sql` +- `src/db/migrations/024_create_employees_bulk.sql` +- `src/db/migrations/025_create_products_bulk.sql` +- `src/db/migrations/026_create_customers_bulk.sql` + +### Updated Files +- `src/api/routes/bulkReadRoutes.js` - Updated for multi-provider support +- `src/services/bulkReadService.js` - Complete refactoring for provider-agnostic operations + +### New Files +- `src/data/repositories/bulkReadRepository.js` - Unified repository for all providers +- `src/data/models/contactsBulk.js` - Example model for new schema +- `src/data/models/bulkReadJobs.js` - New job tracking model + +This refactoring provides a solid foundation for supporting multiple service providers while maintaining clean, maintainable code architecture. diff --git a/docs/BULK_READ_CORRECT_FLOW.md b/docs/BULK_READ_CORRECT_FLOW.md new file mode 100644 index 0000000..6355abd --- /dev/null +++ b/docs/BULK_READ_CORRECT_FLOW.md @@ -0,0 +1,380 @@ +# Bulk Read - Correct Flow with Provider Job IDs + +## Overview + +The bulk read process properly handles **two different job IDs**: +1. **Backend Job ID** - Created by backend for tracking +2. **Provider Job ID** - Returned by provider (Salesforce/Zoho) after job initiation + +--- + +## Complete Flow + +### **Step 1: Backend Initiates Request** + +User calls backend API: +```javascript +POST /api/v1/bulk-read/initiate +Headers: { Authorization: "Bearer user_jwt" } +Body: { + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName", "Email"] +} +``` + +**Backend creates tracking job:** +```javascript +const backendJobId = "salesforce_contacts_1698765432_abc123"; + +await BulkReadRepository.createBulkReadJob({ + id: backendJobId, // Backend's tracking ID + user_uuid: userId, + provider: "salesforce", + service: "crm", + module: "contacts", + status: "pending" +}); +``` + +**Backend sends to n8n:** +```javascript +POST https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://yourorg.my.salesforce.com + +Body: { + provider: "salesforce", + service: "crm", + module: "contacts", + fields: ["Id", "FirstName", "LastName", "Email"], + provider_access_token: "decrypted_token", + backend_access_token: "user_jwt", + callback_url: "https://backend.com/api/v1/bulk-read/webhook/callback", + job_id: "salesforce_contacts_1698765432_abc123", // Backend's job ID + user_id: "user-uuid-here" +} +``` + +--- + +### **Step 2: n8n Initiates Provider Job** + +**n8n calls Salesforce API:** +```javascript +POST https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query +Headers: { Authorization: "Bearer provider_access_token" } +Body: { + operation: "query", + query: "SELECT Id,FirstName,LastName,Email FROM Contact" +} + +// Salesforce Response: +{ + "id": "7504x00000AbCdEf", // ← PROVIDER'S JOB ID + "state": "UploadComplete", + "object": "Contact", + 
"createdDate": "2024-01-15T10:30:00.000Z" +} +``` + +**n8n stores both IDs:** +- `backend_job_id`: "salesforce_contacts_1698765432_abc123" (from request) +- `provider_job_id`: "7504x00000AbCdEf" (from Salesforce) + +--- + +### **Step 3: n8n Monitors Job** + +**n8n polls Salesforce:** +```javascript +GET https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf +Headers: { Authorization: "Bearer provider_access_token" } + +// Response (every 5 seconds until complete): +{ + "id": "7504x00000AbCdEf", + "state": "JobComplete", // ← Job finished! + "numberRecordsProcessed": 15420 +} +``` + +--- + +### **Step 4: n8n Sends Callback with PROVIDER Job ID** + +**When job completes, n8n calls backend:** +```javascript +POST https://backend.com/api/v1/bulk-read/webhook/callback +Headers: { Content-Type: "application/json" } +Body: { + "job_id": "7504x00000AbCdEf", // ← PROVIDER's job ID (IMPORTANT!) + "backend_job_id": "salesforce_contacts_1698765432_abc123", // ← Backend's job ID + "user_id": "user-uuid-here", + "status": "completed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "provider_access_token": "token_for_download", + "metadata": { + "instance_url": "https://yourorg.my.salesforce.com", + "state": "JobComplete", + "numberRecordsProcessed": 15420 + } +} +``` + +**Key Points:** +- `job_id` = **Provider's job ID** (used to download results) +- `backend_job_id` = **Backend's tracking ID** (optional, for finding our record) +- `user_id` = Required to find backend job if `backend_job_id` not provided + +--- + +### **Step 5: Backend Processes Callback** + +**Backend finds its tracking record:** +```javascript +// Option 1: Use backend_job_id if provided +if (backend_job_id) { + job = await BulkReadRepository.getBulkReadJob(backend_job_id); +} +// Option 2: Find by user + provider + module +else { + const jobs = await BulkReadRepository.getUserBulkReadJobs(user_id, { + provider: "salesforce", + service: "crm", + module: "contacts", + status: "pending", + limit: 1 + }); + job = jobs[0]; +} +``` + +**Backend builds download URL using PROVIDER job_id:** +```javascript +const providerJobId = "7504x00000AbCdEf"; // From callback +const instanceUrl = metadata.instance_url; + +const downloadUrl = `${instanceUrl}/services/data/v57.0/jobs/query/${providerJobId}/results`; +// Result: https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results +``` + +**Backend downloads CSV using provider job_id:** +```javascript +GET https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results +Headers: { Authorization: "Bearer provider_access_token" } + +// Response: CSV file +"Id","FirstName","LastName","Email" +"0035g00000XXXXX","John","Doe","john@example.com" +"0035g00000YYYYY","Jane","Smith","jane@example.com" +``` + +**Backend parses and stores:** +```javascript +// 1. Parse CSV → JSON +const records = [{ + Id: "0035g00000XXXXX", + FirstName: "John", + LastName: "Doe", + Email: "john@example.com" +}, ...]; + +// 2. Map to database schema +const preparedRecords = records.map(record => ({ + external_id: record.Id, + user_uuid: "user-uuid", + provider: "salesforce", + service: "crm", + first_name: record.FirstName, + last_name: record.LastName, + email: record.Email, + raw_data: record, + bulk_job_id: "salesforce_contacts_1698765432_abc123" // Backend's job ID +})); + +// 3. Insert into contacts_bulk table +await BulkReadRepository.insertBulkData("contacts", preparedRecords); + +// 4. 
Update job status +await BulkReadRepository.updateBulkReadJob("salesforce_contacts_1698765432_abc123", { + status: "completed", + records_count: 15420, + processed_count: 15420, + response_meta: { + provider_job_id: "7504x00000AbCdEf" // Store provider's job ID + } +}); +``` + +--- + +## Two Job IDs Explained + +### **Backend Job ID** +```javascript +"salesforce_contacts_1698765432_abc123" +``` +- Created by backend +- Used for tracking in `bulk_read_jobs` table +- References in `{module}_bulk` tables +- Format: `{provider}_{module}_{timestamp}_{random}` + +### **Provider Job ID** +```javascript +"7504x00000AbCdEf" // Salesforce +"4384050000012345678" // Zoho +``` +- Created by provider (Salesforce/Zoho) +- Used to download results +- Stored in `response_meta.provider_job_id` +- Format varies by provider + +--- + +## Provider-Specific Download URLs + +### **Salesforce** +```javascript +// n8n receives from Salesforce +{ + "id": "7504x00000AbCdEf", + "state": "JobComplete" +} + +// n8n sends to backend +{ + "job_id": "7504x00000AbCdEf", + "metadata": { + "instance_url": "https://yourorg.my.salesforce.com" + } +} + +// Backend constructs download URL +`${instance_url}/services/data/v57.0/jobs/query/${job_id}/results` +// → https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results +``` + +### **Zoho** +```javascript +// n8n receives from Zoho +{ + "details": { + "id": "4384050000012345678", + "state": "COMPLETED" + } +} + +// n8n sends to backend +{ + "job_id": "4384050000012345678", + "metadata": { + "download_url": "https://www.zohoapis.com/crm/bulk/v2/read/4384050000012345678/result" + } +} + +// Backend uses download_url directly or constructs +`https://www.zohoapis.com/crm/bulk/v2/read/${job_id}/result` +``` + +--- + +## n8n Callback Format + +### **Required Fields** +```json +{ + "job_id": "7504x00000AbCdEf", // ✅ Provider's job ID + "user_id": "user-uuid", // ✅ User UUID + "status": "completed", // ✅ Job status + "provider": "salesforce", // ✅ Provider name + "service": "crm", // ✅ Service name + "module": "contacts", // ✅ Module name + "provider_access_token": "token", // ✅ For downloading CSV + "metadata": { // ✅ Provider-specific data + "instance_url": "https://yourorg.my.salesforce.com" + } +} +``` + +### **Optional Fields** +```json +{ + "backend_job_id": "salesforce_contacts_...", // Backend's tracking ID + "backend_access_token": "jwt_token", // For authentication + "error_message": "Error details if failed" // If status is "failed" +} +``` + +--- + +## Example: Complete Salesforce Flow + +### **1. User Request** +```bash +POST /api/v1/bulk-read/initiate +Body: { provider: "salesforce", service: "crm", module: "contacts", fields: [...] } +``` + +### **2. Backend Response** +```json +{ + "status": "success", + "data": { + "jobId": "salesforce_contacts_1698765432_abc123", // Backend's tracking ID + "status": "initiated" + } +} +``` + +### **3. Backend → n8n** +```json +{ + "job_id": "salesforce_contacts_1698765432_abc123", + "user_id": "user-uuid", + "provider": "salesforce", + ... +} +``` + +### **4. n8n → Salesforce** +``` +Creates job → Gets provider_job_id: "7504x00000AbCdEf" +``` + +### **5. n8n → Backend Callback** +```json +{ + "job_id": "7504x00000AbCdEf", // Provider's ID (for download) + "backend_job_id": "salesforce_contacts_1698765432_abc123", // Backend's ID (for tracking) + "metadata": { + "instance_url": "https://yourorg.my.salesforce.com" + } +} +``` + +### **6. 
Backend Downloads** +``` +URL: https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results +Using: provider_job_id from callback +``` + +### **7. Backend Stores** +``` +Table: contacts_bulk +bulk_job_id: "salesforce_contacts_1698765432_abc123" // Backend's ID +``` + +--- + +## Summary + +✅ **Two job IDs** are used throughout the process +✅ **Backend job ID** for internal tracking +✅ **Provider job ID** for downloading results +✅ n8n sends **provider job ID** in callback +✅ Backend builds download URL using **provider job ID** +✅ Backend stores data with **backend job ID** reference + +This separation allows proper tracking while enabling direct result downloads! 🎯 diff --git a/docs/N8N_BULK_READ_INTEGRATION.md b/docs/N8N_BULK_READ_INTEGRATION.md new file mode 100644 index 0000000..bb2c9bc --- /dev/null +++ b/docs/N8N_BULK_READ_INTEGRATION.md @@ -0,0 +1,510 @@ +# n8n Bulk Read Integration Guide + +## Overview + +The bulk read functionality now uses **n8n as the orchestration layer** to simplify the backend and make it completely provider-agnostic. The backend simply sends job requests to n8n, and n8n handles all provider-specific API interactions and callbacks. + +--- + +## Architecture Flow + +``` +┌─────────────┐ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ Backend │─────▶│ n8n │─────▶│ Provider │ │ Database │ +│ API Server │ │ Workflow │ │ (Salesforce/ │ │ (MySQL) │ +│ │◀─────│ │◀─────│ Zoho) │ │ │ +└─────────────┘ └─────────────┘ └──────────────┘ └─────────────┘ + │ │ + └──────────────────────────────────────────────────────────────────┘ + Stores job & processes data +``` + +### Process Steps: + +1. **User initiates bulk read** → Backend API +2. **Backend sends request** → n8n webhook +3. **n8n calls provider API** (Salesforce, Zoho, etc.) +4. **Provider processes job** asynchronously +5. **n8n receives data** from provider +6. **n8n calls backend webhook** with processed data +7. **Backend stores data** in MySQL tables + +--- + +## 1. Backend → n8n Request + +### Endpoint Configuration + +```javascript +// Environment Variable +N8N_BULK_READ_WEBHOOK_URL=https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3 + +// Or hardcoded in BulkReadService constructor +this.n8nWebhookUrl = 'https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3'; +``` + +### Request Format + +#### URL +``` +POST https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3 +``` + +#### Query Parameters (Salesforce Only) +```javascript +?instance_url=https://yourorg.my.salesforce.com +``` + +#### Request Body +```json +{ + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName", "Email", "Phone"], + "access_token": "00D5g000008XXXX!AQEAQXXX...", + "callback_url": "https://your-backend.com/api/v1/bulk-read/webhook/callback?access_token=backend_jwt_token", + "job_id": "salesforce_contacts_1698765432_abc123xyz", + "user_id": "550e8400-e29b-41d4-a716-446655440000", + "options": { + "page": 1, + "limit": 10000 + } +} +``` + +### Field Descriptions + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `provider` | string | ✅ | Provider name: `salesforce`, `zoho`, `hubspot`, etc. | +| `service` | string | ✅ | Service type: `crm`, `books`, `hr`, `accounting` | +| `module` | string | ✅ | Module name: `contacts`, `leads`, `accounts`, etc. 
| +| `fields` | array | ✅ | Array of field names to fetch from provider | +| `access_token` | string | ✅ | Decrypted provider access token | +| `callback_url` | string | ✅ | Backend webhook URL to call when complete | +| `job_id` | string | ✅ | Unique job identifier (generated by backend) | +| `user_id` | string | ✅ | User UUID from backend | +| `options` | object | ❌ | Additional options (page, limit, filters) | + +### Query Parameters + +| Parameter | Required | When | Description | +|-----------|----------|------|-------------| +| `instance_url` | ✅ | Salesforce only | User's Salesforce instance URL | + +--- + +## 2. n8n → Provider API + +### What n8n Should Do + +1. **Receive the webhook request** from backend +2. **Extract parameters** from body and query +3. **Format provider-specific request**: + - Salesforce: Create bulk query job + - Zoho: Create bulk read job + - Others: Provider-specific format +4. **Call provider API** with access token +5. **Poll for job completion** (if needed) +6. **Fetch results** when ready +7. **Call backend callback** with processed data + +### Example: Salesforce Flow in n8n + +```javascript +// Step 1: Parse webhook input +const { provider, service, module, fields, access_token, callback_url, job_id, user_id, options } = $input.item.json; +const instance_url = $input.item.query.instance_url; + +// Step 2: Create Salesforce bulk query +const salesforceObject = mapModuleToObject(module); // contacts → Contact +const query = `SELECT ${fields.join(',')} FROM ${salesforceObject}`; + +const bulkJobResponse = await axios.post( + `${instance_url}/services/data/v57.0/jobs/query`, + { + operation: 'query', + query: query + }, + { + headers: { + 'Authorization': `Bearer ${access_token}`, + 'Content-Type': 'application/json' + } + } +); + +const salesforceJobId = bulkJobResponse.data.id; + +// Step 3: Poll for job completion +let jobComplete = false; +while (!jobComplete) { + const statusResponse = await axios.get( + `${instance_url}/services/data/v57.0/jobs/query/${salesforceJobId}`, + { + headers: { 'Authorization': `Bearer ${access_token}` } + } + ); + + if (statusResponse.data.state === 'JobComplete') { + jobComplete = true; + } else { + await sleep(5000); // Wait 5 seconds + } +} + +// Step 4: Fetch results +const resultsResponse = await axios.get( + `${instance_url}/services/data/v57.0/jobs/query/${salesforceJobId}/results`, + { + headers: { + 'Authorization': `Bearer ${access_token}`, + 'Accept': 'application/json' + } + } +); + +const records = resultsResponse.data.records; + +// Step 5: Call backend callback +await axios.post(callback_url, { + job_id: job_id, + status: 'completed', + provider: provider, + service: service, + module: module, + records: records, + metadata: { + salesforce_job_id: salesforceJobId, + state: 'JobComplete', + total_records: records.length, + processing_time: processingTime + } +}); +``` + +--- + +## 3. 
n8n → Backend Callback + +### When Job Completes + +n8n should call the backend callback URL with the following format: + +### Callback URL +``` +POST https://your-backend.com/api/v1/bulk-read/webhook/callback?access_token=backend_jwt_token +``` + +### Expected Request Body Format + +#### ✅ Success Response + +```json +{ + "job_id": "salesforce_contacts_1698765432_abc123xyz", + "status": "completed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "records": [ + { + "Id": "0035g00000XXXXX", + "FirstName": "John", + "LastName": "Doe", + "Email": "john.doe@example.com", + "Phone": "+1234567890", + "Account": { + "Name": "Acme Corp" + }, + "CreatedDate": "2024-01-15T10:30:00.000Z", + "LastModifiedDate": "2024-01-20T14:45:00.000Z" + }, + { + "Id": "0035g00000YYYYY", + "FirstName": "Jane", + "LastName": "Smith", + "Email": "jane.smith@example.com", + "Phone": "+0987654321", + "Account": { + "Name": "Tech Solutions" + }, + "CreatedDate": "2024-01-16T09:15:00.000Z", + "LastModifiedDate": "2024-01-21T11:30:00.000Z" + } + ], + "metadata": { + "salesforce_job_id": "7504x00000AbCdEf", + "state": "JobComplete", + "total_records": 2, + "processing_time": "45 seconds", + "query_executed": "SELECT Id,FirstName,LastName,Email,Phone FROM Contact" + } +} +``` + +#### ❌ Failure Response + +```json +{ + "job_id": "salesforce_contacts_1698765432_abc123xyz", + "status": "failed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "records": [], + "error_message": "INVALID_SESSION_ID: Session expired or invalid", + "metadata": { + "salesforce_job_id": "7504x00000AbCdEf", + "state": "Failed", + "error_code": "INVALID_SESSION_ID", + "failed_at": "2024-01-15T10:35:00.000Z" + } +} +``` + +### Field Descriptions + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `job_id` | string | ✅ | The job_id sent by backend (same as request) | +| `status` | string | ✅ | Job status: `completed`, `failed`, `in_progress` | +| `provider` | string | ✅ | Provider name (same as request) | +| `service` | string | ✅ | Service name (same as request) | +| `module` | string | ✅ | Module name (same as request) | +| `records` | array | ✅ | Array of record objects (empty if failed) | +| `metadata` | object | ❌ | Additional metadata about job execution | +| `error_message` | string | ❌ | Error message (required if status is `failed`) | + +### Record Format + +Each record in the `records` array should contain: +- **All requested fields** from the provider API +- **Original field names** from provider (e.g., `FirstName`, not `first_name`) +- **Nested objects** preserved (e.g., `Account.Name`) +- **Date fields** in ISO 8601 format + +The backend will automatically map these to the standardized database schema. + +--- + +## 4. Backend Processing + +### What Backend Does After Receiving Callback + +1. **Updates job status** in `bulk_read_jobs` table +2. **Maps provider fields** to standardized schema +3. **Inserts records** into module-specific table (e.g., `contacts_bulk`) +4. **Updates processed count** +5. 
**Sends response** to n8n + +### Automatic Field Mapping + +The backend automatically maps provider-specific fields to standardized fields: + +```javascript +// Salesforce → Database +{ + "FirstName": "John" → "first_name": "John" + "LastName": "Doe" → "last_name": "Doe" + "Email": "john@example.com" → "email": "john@example.com" + "Phone": "+1234567890" → "phone": "+1234567890" + "Account": { "Name": "Acme" } → "account_name": "Acme" + "CreatedDate": "2024-01-15..." → "created_time": "2024-01-15..." +} +``` + +### Database Storage + +Records are stored in module-specific tables with: +- `external_id`: Provider's record ID +- `user_uuid`: User identifier +- `provider`: Provider name +- `service`: Service name +- `raw_data`: Original JSON from provider +- `bulk_job_id`: Job identifier +- All mapped standardized fields + +--- + +## 5. Example: Complete Salesforce Flow + +### User Request +```bash +POST /api/v1/bulk-read/initiate +Authorization: Bearer +Content-Type: application/json + +{ + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName", "Email", "Phone"] +} +``` + +### Backend → n8n +```bash +POST https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://yourorg.my.salesforce.com + +{ + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName", "Email", "Phone"], + "access_token": "00D5g000008XXXX!AQEAQXXX...", + "callback_url": "https://backend.com/api/v1/bulk-read/webhook/callback?access_token=jwt_123", + "job_id": "salesforce_contacts_1698765432_abc123", + "user_id": "550e8400-e29b-41d4-a716-446655440000", + "options": {} +} +``` + +### n8n Processing +1. Creates Salesforce bulk query job +2. Polls for completion +3. Fetches results + +### n8n → Backend Callback +```bash +POST https://backend.com/api/v1/bulk-read/webhook/callback?access_token=jwt_123 + +{ + "job_id": "salesforce_contacts_1698765432_abc123", + "status": "completed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "records": [...], + "metadata": {...} +} +``` + +### Backend Response to User +```json +{ + "status": "success", + "message": "Bulk read job initiated via n8n for salesforce crm contacts. Processing will be handled asynchronously.", + "data": { + "jobId": "salesforce_contacts_1698765432_abc123", + "status": "initiated", + "provider": "salesforce", + "service": "crm", + "estimatedTime": "2 minutes" + } +} +``` + +--- + +## 6. Error Handling + +### n8n Should Handle + +1. **API Errors**: Catch provider API errors and send failed status +2. **Timeout Errors**: Set maximum processing time (e.g., 30 minutes) +3. **Token Expiry**: Detect and report authentication errors +4. **Rate Limiting**: Handle rate limits with retries + +### Example Error Callback + +```json +{ + "job_id": "salesforce_contacts_1698765432_abc123", + "status": "failed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "records": [], + "error_message": "INVALID_SESSION_ID: Session expired or invalid", + "metadata": { + "error_code": "INVALID_SESSION_ID", + "failed_at": "2024-01-15T10:35:00.000Z" + } +} +``` + +--- + +## 7. 
Provider-Specific Notes + +### Salesforce +- Requires `instance_url` in query parameter +- Uses bulk query API v2.0 +- Field names are PascalCase +- Maximum 50,000 records per job + +### Zoho +- Uses bulk read API v2 +- Field names can vary (First_Name, First Name) +- Supports callback URLs natively +- Maximum 200,000 records per job + +### HubSpot +- Uses CRM API v3 +- Pagination with `after` parameter +- Property-based queries +- Maximum 100 records per page + +--- + +## 8. Configuration + +### Environment Variables + +```bash +# n8n webhook URL +N8N_BULK_READ_WEBHOOK_URL=https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3 + +# Backend callback base URL +API_BASE_URL=https://your-backend.com +``` + +--- + +## Benefits of n8n Integration + +✅ **Simplified Backend**: No provider-specific code in backend +✅ **Centralized Logic**: All provider integrations in n8n +✅ **Easy Updates**: Update workflows without deploying backend +✅ **Visual Workflows**: See and debug flows in n8n UI +✅ **Error Handling**: n8n handles retries and error workflows +✅ **Scalability**: n8n can handle high volume processing + +--- + +## Testing + +### Test n8n Webhook +```bash +curl -X POST 'https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://test.my.salesforce.com' \ +-H 'Content-Type: application/json' \ +-d '{ + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName"], + "access_token": "test_token", + "callback_url": "https://backend.com/api/v1/bulk-read/webhook/callback", + "job_id": "test_job_123", + "user_id": "test_user_456" +}' +``` + +### Test Backend Callback +```bash +curl -X POST 'https://your-backend.com/api/v1/bulk-read/webhook/callback' \ +-H 'Content-Type: application/json' \ +-d '{ + "job_id": "test_job_123", + "status": "completed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "records": [ + {"Id": "001", "FirstName": "Test", "LastName": "User"} + ] +}' +``` + diff --git a/docs/N8N_BULK_READ_UPDATED_FLOW.md b/docs/N8N_BULK_READ_UPDATED_FLOW.md new file mode 100644 index 0000000..6915f82 --- /dev/null +++ b/docs/N8N_BULK_READ_UPDATED_FLOW.md @@ -0,0 +1,519 @@ +# n8n Bulk Read Integration - Updated Flow + +## Overview + +The bulk read process is split between **n8n** (job orchestration) and **backend** (data processing): + +- **n8n**: Initiates bulk jobs with providers and monitors completion +- **Backend**: Downloads CSV files, parses data, and stores in MySQL + +--- + +## Complete Flow Diagram + +``` +┌─────────────┐ ┌─────────────┐ ┌──────────────┐ +│ Backend │────▶│ n8n │────▶│ Provider │ +│ (Initiate) │ │ (Monitor) │ │ (Salesforce/ │ +│ │ │ │ │ Zoho) │ +└─────────────┘ └─────────────┘ └──────────────┘ + ▲ │ │ + │ │ │ + │ (Job Complete) (Job Ready) + │ │ │ + │ ▼ │ + │ ┌─────────────┐ │ + └────────────│ Callback │ │ + │ to Backend │ │ + └─────────────┘ │ + │ │ + ▼ │ + ┌─────────────┐ │ + │ Backend │◀─────────────┘ + │ Downloads │ (Download CSV) + │ & Parses │ + │ CSV File │ + └─────────────┘ + │ + ▼ + ┌─────────────┐ + │ MySQL │ + │ Tables │ + └─────────────┘ +``` + +--- + +## Step-by-Step Process + +### 1. 
Backend → n8n (Initiate Job) + +**Backend sends request to n8n:** + +```bash +POST https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://yourorg.my.salesforce.com +Content-Type: application/json + +{ + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName", "Email", "Phone"], + "provider_access_token": "00D5g000008XXXX!AQEAQXXX...", + "backend_access_token": "backend_jwt_token_here", + "callback_url": "https://backend.com/api/v1/bulk-read/webhook/callback", + "job_id": "salesforce_contacts_1698765432_abc123", + "user_id": "550e8400-e29b-41d4-a716-446655440000", + "options": {} +} +``` + +**Query Parameters:** +- `instance_url` - **Required for Salesforce** (user's instance URL) + +**Body Fields:** +| Field | Type | Description | +|-------|------|-------------| +| `provider` | string | Provider name (salesforce, zoho, etc.) | +| `service` | string | Service type (crm, books, etc.) | +| `module` | string | Module name (contacts, leads, etc.) | +| `fields` | array | Fields to fetch | +| `provider_access_token` | string | **Decrypted** provider token for API calls | +| `backend_access_token` | string | Backend JWT for callback authentication | +| `callback_url` | string | Backend webhook URL (without access_token in URL) | +| `job_id` | string | Unique job identifier | +| `user_id` | string | User UUID | +| `options` | object | Additional options | + +--- + +### 2. n8n Processing + +**What n8n should do:** + +#### For Salesforce: +```javascript +// 1. Extract data from webhook +const { + provider, + service, + module, + fields, + provider_access_token, + callback_url, + job_id +} = $input.item.json; + +const instance_url = $input.item.query.instance_url; + +// 2. Create Salesforce bulk query job +const bulkJobResponse = await axios.post( + `${instance_url}/services/data/v57.0/jobs/query`, + { + operation: 'query', + query: `SELECT ${fields.join(',')} FROM Contact` + }, + { + headers: { + 'Authorization': `Bearer ${provider_access_token}`, + 'Content-Type': 'application/json' + } + } +); + +const salesforceJobId = bulkJobResponse.data.id; + +// 3. Poll for job completion +let jobComplete = false; +let jobState = ''; +let downloadUrl = ''; + +while (!jobComplete) { + const statusResponse = await axios.get( + `${instance_url}/services/data/v57.0/jobs/query/${salesforceJobId}`, + { + headers: { 'Authorization': `Bearer ${provider_access_token}` } + } + ); + + jobState = statusResponse.data.state; + + if (jobState === 'JobComplete') { + jobComplete = true; + downloadUrl = `${instance_url}/services/data/v57.0/jobs/query/${salesforceJobId}/results`; + } else if (jobState === 'Failed' || jobState === 'Aborted') { + // Send failure callback + await axios.post(callback_url, { + job_id: job_id, + status: 'failed', + provider: provider, + service: service, + module: module, + provider_access_token: provider_access_token, + error_message: statusResponse.data.errorMessage, + metadata: { + salesforce_job_id: salesforceJobId, + state: jobState + } + }); + return; + } else { + await sleep(5000); // Wait 5 seconds + } +} + +// 4. 
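Define the sleep() helper used by the polling loop above (assumed here, since the original sketch never defines it; a function declaration hoists, so the loop can call it)
+function sleep(ms) {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+// 5. 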
Send success callback to backend +await axios.post(callback_url, { + job_id: job_id, + status: 'completed', + provider: provider, + service: service, + module: module, + provider_access_token: provider_access_token, + metadata: { + salesforce_job_id: salesforceJobId, + state: jobState, + download_url: downloadUrl, + result_url: downloadUrl + } +}); +``` + +#### For Zoho: +```javascript +// Similar flow but with Zoho bulk read API +// Zoho returns a download URL for zipped CSV file +``` + +--- + +### 3. n8n → Backend Callback + +**When job completes, n8n sends:** + +```bash +POST https://backend.com/api/v1/bulk-read/webhook/callback +Content-Type: application/json + +{ + "job_id": "salesforce_contacts_1698765432_abc123", + "status": "completed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "provider_access_token": "00D5g000008XXXX!AQEAQXXX...", + "backend_access_token": "backend_jwt_token_here", + "metadata": { + "salesforce_job_id": "7504x00000AbCdEf", + "state": "JobComplete", + "download_url": "https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results", + "result_url": "https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results" + } +} +``` + +**Required Fields:** +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `job_id` | string | ✅ | Same job_id from initial request | +| `status` | string | ✅ | `completed` or `failed` | +| `provider` | string | ✅ | Provider name | +| `service` | string | ✅ | Service name | +| `module` | string | ✅ | Module name | +| `provider_access_token` | string | ✅ | Provider token for downloading CSV | +| `backend_access_token` | string | ❌ | Backend JWT (optional) | +| `metadata` | object | ✅ | Must include `download_url` or `result_url` | +| `error_message` | string | ❌ | Required if status is `failed` | + +**Metadata Object:** +```json +{ + "salesforce_job_id": "7504x00000AbCdEf", + "state": "JobComplete", + "download_url": "https://...", + "result_url": "https://...", + "total_processing_time": "45 seconds" +} +``` + +--- + +### 4. Backend Processing + +**What backend does after receiving callback:** + +1. **Updates job status** to `processing` +2. **Downloads CSV file** from `metadata.download_url` + - For Salesforce: Plain CSV file + - For Zoho: Zipped CSV file (extracts with JSZip) +3. **Parses CSV** to JSON records +4. **Maps fields** to standardized schema +5. **Inserts records** in batches of 1000 +6. 
**Updates job status** to `completed`
+
+#### Download & Extract Flow:
+
+```javascript
+// `provider` and `provider_access_token` come from the n8n callback body
+let csvContent;
+
+if (provider === 'salesforce') {
+  // For Salesforce - direct CSV download
+  const response = await axios.get(downloadUrl, {
+    headers: {
+      'Authorization': `Bearer ${provider_access_token}`
+    },
+    responseType: 'text'
+  });
+  csvContent = response.data;
+}
+
+if (provider === 'zoho') {
+  // For Zoho - download and extract the zipped CSV
+  const response = await axios.get(downloadUrl, {
+    headers: {
+      'Authorization': `Zoho-oauthtoken ${provider_access_token}`
+    },
+    responseType: 'arraybuffer'
+  });
+
+  const JSZip = require('jszip');
+  const zip = await JSZip.loadAsync(response.data);
+  const csvFileName = Object.keys(zip.files).find(name => name.endsWith('.csv'));
+  csvContent = await zip.files[csvFileName].async('text');
+}
+```
+
+#### CSV Parsing:
+
+```javascript
+const csv = require('csv-parser');
+const { Readable } = require('stream');
+
+// Wrap the stream in a Promise; awaiting `.pipe()` directly resolves
+// immediately and would leave `records` empty when it is used.
+const records = [];
+await new Promise((resolve, reject) => {
+  Readable.from(csvContent)
+    .pipe(csv())
+    .on('data', (row) => records.push(row))
+    .on('error', reject)
+    .on('end', resolve);
+});
+console.log(`Parsed ${records.length} records`);
+```
+
+#### Field Mapping Example:
+
+```javascript
+// Salesforce CSV → Database
+{
+  "Id": "0035g00000XXXXX",
+  "FirstName": "John",
+  "LastName": "Doe",
+  "Email": "john@example.com"
+}
+
+// Mapped to:
+{
+  "external_id": "0035g00000XXXXX",
+  "user_uuid": "user-uuid",
+  "provider": "salesforce",
+  "service": "crm",
+  "first_name": "John",
+  "last_name": "Doe",
+  "email": "john@example.com",
+  "raw_data": { /* original record */ },
+  "bulk_job_id": "salesforce_contacts_1698765432_abc123"
+}
+```
+
+---
+
+## Error Scenarios
+
+### Scenario 1: Job Failed in Provider
+
+**n8n sends:**
+```json
+{
+  "job_id": "salesforce_contacts_1698765432_abc123",
+  "status": "failed",
+  "provider": "salesforce",
+  "service": "crm",
+  "module": "contacts",
+  "provider_access_token": "token_here",
+  "error_message": "INVALID_SESSION_ID: Session expired or invalid",
+  "metadata": {
+    "salesforce_job_id": "7504x00000AbCdEf",
+    "state": "Failed",
+    "error_code": "INVALID_SESSION_ID"
+  }
+}
+```
+
+**Backend action:**
+- Updates job status to `failed`
+- Stores error message
+- Returns success response to n8n
+
+---
+
+### Scenario 2: Download Failed
+
+**Backend handles:**
+- Catches download error
+- Updates job status to `failed`
+- Logs error details
+
+---
+
+### Scenario 3: CSV Parsing Failed
+
+**Backend handles:**
+- Catches parsing error
+- Updates job status to `failed`
+- Logs error with CSV sample
+
+---
+
+## Provider-Specific Details
+
+### Salesforce
+
+**Download URL Format:**
+```
+https://{instance_url}/services/data/v57.0/jobs/query/{job_id}/results
+```
+
+**CSV Format:**
+```csv
+Id,FirstName,LastName,Email,Phone
+0035g00000XXXXX,John,Doe,john@example.com,+1234567890
+0035g00000YYYYY,Jane,Smith,jane@example.com,+0987654321
+```
+
+**Headers:**
+```
+Authorization: Bearer {access_token}
+Accept: text/csv
+```
+
+---
+
+### Zoho
+
+**Download URL Format:**
+```
+https://www.zohoapis.com/crm/bulk/v2/read/{job_id}/result
+```
+
+**Response:**
+- ZIP file containing CSV
+- CSV file name varies
+
+**Headers:**
+```
+Authorization: Zoho-oauthtoken {access_token}
+```
+
+**Extraction:**
+```javascript
+const JSZip = require('jszip');
+const zip = await JSZip.loadAsync(zipBuffer);
+// The CSV file name varies, so locate it rather than hardcoding it
+const csvFileName = Object.keys(zip.files).find(name => name.endsWith('.csv'));
+const csvContent = await zip.files[csvFileName].async('text');
+```
+
+---
+
+## Database Storage
+
+### Job Status Flow:
+```
+pending → processing → completed
+    ↓
+  failed
+```
+
+### Tables Used:
+1. **bulk_read_jobs** - Job tracking
+2. 
**{module}_bulk** - Actual data (e.g., `contacts_bulk`, `leads_bulk`) + +--- + +## Configuration + +### Environment Variables: +```bash +# n8n webhook URL +N8N_BULK_READ_WEBHOOK_URL=https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3 + +# Backend callback base URL +API_BASE_URL=https://your-backend.com +``` + +### Required NPM Packages: +```json +{ + "csv-parser": "^3.2.0", + "jszip": "^3.10.1", + "axios": "^1.11.0" +} +``` + +Install: +```bash +npm install csv-parser jszip +``` + +--- + +## Testing + +### Test n8n Workflow: +```bash +curl -X POST 'https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://test.my.salesforce.com' \ +-H 'Content-Type: application/json' \ +-d '{ + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "fields": ["Id", "FirstName", "LastName"], + "provider_access_token": "test_token", + "backend_access_token": "test_jwt", + "callback_url": "https://backend.com/api/v1/bulk-read/webhook/callback", + "job_id": "test_job_123", + "user_id": "test_user_456" +}' +``` + +### Test Backend Callback: +```bash +curl -X POST 'https://backend.com/api/v1/bulk-read/webhook/callback' \ +-H 'Content-Type: application/json' \ +-d '{ + "job_id": "test_job_123", + "status": "completed", + "provider": "salesforce", + "service": "crm", + "module": "contacts", + "provider_access_token": "test_token", + "metadata": { + "download_url": "https://test.my.salesforce.com/services/data/v57.0/jobs/query/750.../results" + } +}' +``` + +--- + +## Summary + +### n8n Responsibilities: +✅ Initiate bulk job with provider +✅ Poll for job completion +✅ Send callback when complete +❌ Download CSV (backend handles) +❌ Parse CSV (backend handles) +❌ Store data (backend handles) + +### Backend Responsibilities: +✅ Send job request to n8n +✅ Receive callback from n8n +✅ Download CSV from provider +✅ Extract ZIP (for Zoho) +✅ Parse CSV to JSON +✅ Map fields +✅ Store in MySQL + +This separation keeps n8n lightweight and backend handles all data processing! 
🎯 + diff --git a/src/api/routes/bulkReadRoutes.js b/src/api/routes/bulkReadRoutes.js index 6fa5686..b4cecf5 100644 --- a/src/api/routes/bulkReadRoutes.js +++ b/src/api/routes/bulkReadRoutes.js @@ -30,7 +30,9 @@ function validate(schema, source = 'query') { // Initiate bulk read job const initiateBulkReadSchema = Joi.object({ - module: Joi.string().valid('contacts', 'leads', 'accounts', 'tasks', 'vendors', 'invoices', 'sales_orders', 'purchase_orders').required(), + provider: Joi.string().valid('zoho', 'salesforce', 'hubspot', 'keka', 'bamboohr', 'intuit', 'quickbooks').required(), + service: Joi.string().valid('crm', 'books', 'inventory', 'hr', 'accounting', 'payroll').required(), + module: Joi.string().valid('contacts', 'leads', 'accounts', 'deals', 'opportunities', 'tasks', 'events', 'campaigns', 'cases', 'vendors', 'invoices', 'sales_orders', 'purchase_orders', 'employees', 'products', 'customers').required(), fields: Joi.array().items(Joi.string()).min(1).required(), page: Joi.number().min(1).default(1), limit: Joi.number().min(1).max(200000).default(200000) @@ -38,11 +40,11 @@ const initiateBulkReadSchema = Joi.object({ router.post('/initiate', auth, validate(initiateBulkReadSchema, 'body'), async (req, res) => { try { - const { module, fields, page, limit } = req.body; + const { provider, service, module, fields, page, limit } = req.body; const userId = req.user.uuid; const bulkReadService = new BulkReadService(); - const result = await bulkReadService.initiateBulkRead(userId, module, fields, { page, limit }); + const result = await bulkReadService.initiateBulkRead(userId, provider, service, module, fields, { page, limit }); res.json(success('Bulk read job initiated successfully', result)); } catch (error) { @@ -73,16 +75,19 @@ router.get('/job/:job_id', auth, validate(jobStatusSchema, 'params'), async (req const userJobsSchema = Joi.object({ page: Joi.number().min(1).default(1), limit: Joi.number().min(1).max(100).default(50), - status: Joi.string().valid('pending', 'in_progress', 'completed', 'failed').optional() + status: Joi.string().valid('pending', 'in_progress', 'completed', 'failed').optional(), + provider: Joi.string().valid('zoho', 'salesforce', 'hubspot', 'keka', 'bamboohr', 'intuit', 'quickbooks').optional(), + service: Joi.string().valid('crm', 'books', 'inventory', 'hr', 'accounting', 'payroll').optional(), + module: Joi.string().valid('contacts', 'leads', 'accounts', 'deals', 'opportunities', 'tasks', 'events', 'campaigns', 'cases', 'vendors', 'invoices', 'sales_orders', 'purchase_orders', 'employees', 'products', 'customers').optional() }); router.get('/jobs', auth, validate(userJobsSchema), async (req, res) => { try { - const { page, limit, status } = req.query; + const { page, limit, status, provider, service, module } = req.query; const userId = req.user.uuid; const bulkReadService = new BulkReadService(); - const result = await bulkReadService.getUserBulkReadJobs(userId, { page, limit, status }); + const result = await bulkReadService.getUserBulkReadJobs(userId, { page, limit, status, provider, service, module }); res.json(success('Bulk read jobs retrieved successfully', result)); } catch (error) { @@ -92,40 +97,85 @@ router.get('/jobs', auth, validate(userJobsSchema), async (req, res) => { // Get bulk read data for a module const moduleDataSchema = Joi.object({ - module: Joi.string().valid('contacts', 'leads', 'accounts', 'tasks', 'vendors', 'invoices', 'sales_orders', 'purchase_orders').required(), + provider: Joi.string().valid('zoho', 'salesforce', 'hubspot', 
'keka', 'bamboohr', 'intuit', 'quickbooks').required(), + service: Joi.string().valid('crm', 'books', 'inventory', 'hr', 'accounting', 'payroll').required(), + module: Joi.string().valid('contacts', 'leads', 'accounts', 'deals', 'opportunities', 'tasks', 'events', 'campaigns', 'cases', 'vendors', 'invoices', 'sales_orders', 'purchase_orders', 'employees', 'products', 'customers').required(), page: Joi.number().min(1).default(1), limit: Joi.number().min(1).max(1000).default(100), orderBy: Joi.string().default('created_time'), orderDirection: Joi.string().valid('ASC', 'DESC').default('DESC') }); -router.get('/data/:module', auth, validate(moduleDataSchema, 'params'), async (req, res) => { +router.get('/data/:provider/:service/:module', auth, validate(moduleDataSchema, 'params'), async (req, res) => { try { - const { module } = req.params; + const { provider, service, module } = req.params; const { page, limit, orderBy, orderDirection } = req.query; const userId = req.user.uuid; const bulkReadService = new BulkReadService(); - const result = await bulkReadService.getBulkReadData(userId, module, { + const result = await bulkReadService.getBulkReadData(userId, provider, service, module, { page, limit, orderBy, orderDirection }); - res.json(success(`${module} data retrieved successfully`, result)); + res.json(success(`${provider} ${service} ${module} data retrieved successfully`, result)); } catch (error) { res.status(400).json(failure(error.message, 'DATA_RETRIEVAL_ERROR')); } }); -// Get available modules -router.get('/modules', auth, async (req, res) => { +// Get available providers, services and modules +router.get('/providers', auth, async (req, res) => { try { const bulkReadService = new BulkReadService(); - const modules = bulkReadService.getAvailableModules(); + const providers = bulkReadService.getAvailableProviders(); - res.json(success('Available modules retrieved successfully', modules)); + res.json(success('Available providers retrieved successfully', providers)); + } catch (error) { + res.status(400).json(failure(error.message, 'PROVIDERS_RETRIEVAL_ERROR')); + } +}); + +// Get available modules for a provider and service +router.get('/modules/:provider/:service', auth, async (req, res) => { + try { + const { provider, service } = req.params; + const bulkReadService = new BulkReadService(); + const modules = bulkReadService.getAvailableModules(provider, service); + + res.json(success(`Available modules for ${provider} ${service} retrieved successfully`, modules)); } catch (error) { res.status(400).json(failure(error.message, 'MODULES_RETRIEVAL_ERROR')); } }); +// Webhook callback endpoint for n8n +// n8n sends provider's job_id which is used to fetch results +const n8nCallbackSchema = Joi.object({ + job_id: Joi.string().required(), // Provider's job ID (from Salesforce/Zoho) + backend_job_id: Joi.string().optional(), // Backend's tracking job ID (optional) + user_id: Joi.string().required(), // User UUID + status: Joi.string().valid('pending', 'in_progress', 'completed', 'failed').required(), + provider: Joi.string().required(), + service: Joi.string().required(), + module: Joi.string().required(), + provider_access_token: Joi.string().required(), // For downloading CSV from provider + backend_access_token: Joi.string().optional(), // For user authentication (if needed) + metadata: Joi.object().optional(), // Should include instance_url for Salesforce + error_message: Joi.string().optional() +}); + +router.post('/webhook/callback', validate(n8nCallbackSchema, 'body'), async (req, 
res) => { + try { + const callbackData = req.body; + + const bulkReadService = new BulkReadService(); + const result = await bulkReadService.handleN8nCallback(callbackData); + + res.json(success('Webhook callback processed successfully', result)); + } catch (error) { + res.status(400).json(failure(error.message, 'WEBHOOK_CALLBACK_ERROR')); + } +}); + module.exports = router; + diff --git a/src/data/models/bulkReadJobs.js b/src/data/models/bulkReadJobs.js new file mode 100644 index 0000000..84b633c --- /dev/null +++ b/src/data/models/bulkReadJobs.js @@ -0,0 +1,107 @@ +const { DataTypes } = require('sequelize'); + +module.exports = (sequelize) => { + const BulkReadJobs = sequelize.define('BulkReadJobs', { + id: { + type: DataTypes.STRING(255), + primaryKey: true + }, + user_uuid: { + type: DataTypes.CHAR(36), + allowNull: false, + references: { + model: 'users', + key: 'uuid' + } + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false + }, + service: { + type: DataTypes.STRING(50), + allowNull: false + }, + module: { + type: DataTypes.STRING(100), + allowNull: false + }, + operation: { + type: DataTypes.STRING(50), + allowNull: false, + defaultValue: 'bulk_read' + }, + state: { + type: DataTypes.STRING(50), + allowNull: false + }, + file_type: { + type: DataTypes.STRING(10), + allowNull: false, + defaultValue: 'CSV' + }, + download_url: { + type: DataTypes.TEXT, + allowNull: true + }, + records_count: { + type: DataTypes.INTEGER, + defaultValue: 0 + }, + processed_count: { + type: DataTypes.INTEGER, + defaultValue: 0 + }, + status: { + type: DataTypes.STRING(50), + defaultValue: 'pending' + }, + error_message: { + type: DataTypes.TEXT, + allowNull: true + }, + request_params: { + type: DataTypes.JSON, + allowNull: true + }, + response_meta: { + type: DataTypes.JSON, + allowNull: true + }, + created_at: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW + }, + updated_at: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW + } + }, { + tableName: 'bulk_read_jobs', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { + name: 'idx_user_provider_service', + fields: ['user_uuid', 'provider', 'service'] + }, + { + name: 'idx_module', + fields: ['module'] + }, + { + name: 'idx_status', + fields: ['status'] + }, + { + name: 'idx_created_at', + fields: ['created_at'] + } + ] + }); + + return BulkReadJobs; +}; diff --git a/src/data/models/contactsBulk.js b/src/data/models/contactsBulk.js new file mode 100644 index 0000000..d06205d --- /dev/null +++ b/src/data/models/contactsBulk.js @@ -0,0 +1,178 @@ +const { DataTypes } = require('sequelize'); + +module.exports = (sequelize) => { + const ContactsBulk = sequelize.define('ContactsBulk', { + internal_id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true + }, + external_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + user_uuid: { + type: DataTypes.CHAR(36), + allowNull: false, + references: { + model: 'users', + key: 'uuid' + } + }, + provider: { + type: DataTypes.STRING(50), + allowNull: false + }, + service: { + type: DataTypes.STRING(50), + allowNull: false + }, + first_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + last_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + email: { + type: DataTypes.STRING(255), + allowNull: true + }, + phone: { + type: DataTypes.STRING(255), + allowNull: true + }, + mobile: { + type: DataTypes.STRING(255), + allowNull: true + }, + lead_source: { + type: 
DataTypes.STRING(255), + allowNull: true + }, + account_name: { + type: DataTypes.STRING(255), + allowNull: true + }, + company: { + type: DataTypes.STRING(255), + allowNull: true + }, + owner: { + type: DataTypes.STRING(255), + allowNull: true + }, + title: { + type: DataTypes.STRING(255), + allowNull: true + }, + department: { + type: DataTypes.STRING(255), + allowNull: true + }, + address_line_1: { + type: DataTypes.STRING(500), + allowNull: true + }, + address_line_2: { + type: DataTypes.STRING(500), + allowNull: true + }, + city: { + type: DataTypes.STRING(255), + allowNull: true + }, + state: { + type: DataTypes.STRING(255), + allowNull: true + }, + country: { + type: DataTypes.STRING(255), + allowNull: true + }, + postal_code: { + type: DataTypes.STRING(50), + allowNull: true + }, + website: { + type: DataTypes.STRING(255), + allowNull: true + }, + description: { + type: DataTypes.TEXT, + allowNull: true + }, + lead_status: { + type: DataTypes.STRING(255), + allowNull: true + }, + contact_status: { + type: DataTypes.STRING(255), + allowNull: true + }, + created_time: { + type: DataTypes.DATE, + allowNull: true + }, + modified_time: { + type: DataTypes.DATE, + allowNull: true + }, + bulk_job_id: { + type: DataTypes.STRING(255), + allowNull: true + }, + raw_data: { + type: DataTypes.JSON, + allowNull: true + }, + created_at: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW + }, + updated_at: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW + } + }, { + tableName: 'contacts_bulk', + timestamps: true, + createdAt: 'created_at', + updatedAt: 'updated_at', + indexes: [ + { + name: 'idx_user_provider_service', + fields: ['user_uuid', 'provider', 'service'] + }, + { + name: 'idx_external_id', + fields: ['external_id'] + }, + { + name: 'idx_bulk_job', + fields: ['bulk_job_id'] + }, + { + name: 'idx_created_time', + fields: ['created_time'] + }, + { + name: 'idx_email', + fields: ['email'] + }, + { + name: 'idx_provider', + fields: ['provider'] + }, + { + name: 'idx_service', + fields: ['service'] + } + ] + }); + + return ContactsBulk; +}; diff --git a/src/data/repositories/bulkReadRepository.js b/src/data/repositories/bulkReadRepository.js new file mode 100644 index 0000000..5b4c90c --- /dev/null +++ b/src/data/repositories/bulkReadRepository.js @@ -0,0 +1,420 @@ +const sequelize = require('../../db/pool'); +const logger = require('../../utils/logger'); + +class BulkReadRepository { + /** + * Create a new bulk read job + * @param {Object} jobData - Job data + * @returns {Promise} Created job + */ + static async createBulkReadJob(jobData) { + try { + const query = ` + INSERT INTO bulk_read_jobs ( + id, user_uuid, provider, service, module, operation, + state, file_type, records_count, processed_count, + status, request_params, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW(), NOW()) + `; + + const values = [ + jobData.id, + jobData.user_uuid, + jobData.provider, + jobData.service, + jobData.module, + jobData.operation || 'bulk_read', + jobData.state || 'ADDED', + jobData.file_type || 'CSV', + jobData.records_count || 0, + jobData.processed_count || 0, + jobData.status || 'pending', + JSON.stringify(jobData.request_params || {}) + ]; + + await sequelize.query(query, { + replacements: values, + type: sequelize.QueryTypes.INSERT + }); + + logger.info('Bulk read job created', { + jobId: jobData.id, + provider: jobData.provider, + service: jobData.service, + module: jobData.module + }); + + return jobData; + } catch 
(error) {
+      logger.error('Error creating bulk read job', {
+        jobData,
+        error: error.message
+      });
+      throw error;
+    }
+  }
+
+  /**
+   * Get bulk read job by ID
+   * @param {string} jobId - Job ID
+   * @returns {Promise} Job data
+   */
+  static async getBulkReadJob(jobId) {
+    try {
+      const query = `
+        SELECT * FROM bulk_read_jobs
+        WHERE id = ?
+      `;
+
+      const rows = await sequelize.query(query, {
+        replacements: [jobId],
+        type: sequelize.QueryTypes.SELECT
+      });
+
+      return rows[0] || null;
+    } catch (error) {
+      logger.error('Error getting bulk read job', {
+        jobId,
+        error: error.message
+      });
+      throw error;
+    }
+  }
+
+  /**
+   * Update bulk read job
+   * @param {string} jobId - Job ID
+   * @param {Object} updateData - Data to update
+   * @returns {Promise} Success status
+   */
+  static async updateBulkReadJob(jobId, updateData) {
+    // Declared outside the try block so the catch block can log them without a ReferenceError
+    let query;
+    let values = [];
+    try {
+      const updateFields = [];
+
+      // Filter out undefined values and build the update query
+      Object.keys(updateData).forEach(key => {
+        const value = updateData[key];
+
+        // Skip undefined values
+        if (value === undefined) {
+          return;
+        }
+
+        if (key === 'response_meta' || key === 'request_params') {
+          updateFields.push(`${key} = ?`);
+          values.push(JSON.stringify(value));
+        } else {
+          updateFields.push(`${key} = ?`);
+          values.push(value);
+        }
+      });
+
+      // Add updated_at without placeholder since it uses NOW()
+      updateFields.push('updated_at = NOW()');
+
+      // Add jobId for WHERE clause
+      values.push(jobId);
+
+      query = `
+        UPDATE bulk_read_jobs
+        SET ${updateFields.join(', ')}
+        WHERE id = ?
+      `;
+
+      console.log('🔄 Update query:', query);
+      console.log('🔄 Update values:', values);
+      console.log('🔄 Values count:', values.length);
+
+      const [results, metadata] = await sequelize.query(query, {
+        replacements: values,
+        type: sequelize.QueryTypes.UPDATE
+      });
+
+      // For UPDATE queries, Sequelize returns [results, metadata] where metadata contains affectedRows
+      return (metadata && metadata.affectedRows > 0) || results > 0;
+    } catch (error) {
+      logger.error('Error updating bulk read job', {
+        jobId,
+        updateData,
+        query,
+        values,
+        error: error.message
+      });
+      throw error;
+    }
+  }
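+
+  // Usage sketch (illustrative values only, following the bulk_read_jobs columns above):
+  //   await BulkReadRepository.updateBulkReadJob('salesforce_accounts_1700000000000_ab12cd34e', {
+  //     status: 'completed',
+  //     processed_count: 1200,
+  //     response_meta: { provider_job_id: '750XX0000000001AAA' }
+  //   });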
+
+  /**
+   * Get user's bulk read jobs
+   * @param {string} userId - User UUID
+   * @param {Object} options - Query options
+   * @returns {Promise} Jobs list
+   */
+  static async getUserBulkReadJobs(userId, options = {}) {
+    try {
+      const { page = 1, limit = 50, status, provider, service, module } = options;
+      const offset = (page - 1) * limit;
+
+      let whereConditions = ['user_uuid = ?'];
+      let values = [userId];
+
+      if (status) {
+        whereConditions.push('status = ?');
+        values.push(status);
+      }
+
+      if (provider) {
+        whereConditions.push('provider = ?');
+        values.push(provider);
+      }
+
+      if (service) {
+        whereConditions.push('service = ?');
+        values.push(service);
+      }
+
+      if (module) {
+        whereConditions.push('module = ?');
+        values.push(module);
+      }
+
+      const query = `
+        SELECT * FROM bulk_read_jobs
+        WHERE ${whereConditions.join(' AND ')}
+        ORDER BY created_at DESC
+        LIMIT ? OFFSET ?
+      `;
+
+      values.push(limit, offset);
+
+      const rows = await sequelize.query(query, {
+        replacements: values,
+        type: sequelize.QueryTypes.SELECT
+      });
+
+      return rows;
+    } catch (error) {
+      logger.error('Error getting user bulk read jobs', {
+        userId,
+        options,
+        error: error.message
+      });
+      throw error;
+    }
+  }
+
+  /**
+   * Get user data for a specific module
+   * @param {string} userId - User UUID
+   * @param {string} provider - Provider name
+   * @param {string} service - Service name
+   * @param {string} module - Module name
+   * @param {Object} options - Query options
+   * @returns {Promise} Data rows
+   */
+  static async getUserData(userId, provider, service, module, options = {}) {
+    try {
+      const { limit = 100, offset = 0, orderBy = 'created_time', orderDirection = 'DESC' } = options;
+
+      const tableName = `${module}_bulk`;
+
+      // orderBy/orderDirection are interpolated into the SQL string, so restrict them
+      // to safe values instead of trusting caller input
+      const safeOrderBy = /^[A-Za-z0-9_]+$/.test(orderBy) ? orderBy : 'created_time';
+      const safeOrderDirection = String(orderDirection).toUpperCase() === 'ASC' ? 'ASC' : 'DESC';
+
+      const query = `
+        SELECT * FROM ${tableName}
+        WHERE user_uuid = ? AND provider = ? AND service = ?
+        ORDER BY ${safeOrderBy} ${safeOrderDirection}
+        LIMIT ? OFFSET ?
+      `;
+
+      const rows = await sequelize.query(query, {
+        replacements: [userId, provider, service, limit, offset],
+        type: sequelize.QueryTypes.SELECT
+      });
+
+      return rows;
+    } catch (error) {
+      // If table doesn't exist, return empty array
+      if (error.message.includes('ER_NO_SUCH_TABLE') || error.message.includes("doesn't exist")) {
+        return [];
+      }
+
+      logger.error('Error getting user data', {
+        userId,
+        provider,
+        service,
+        module,
+        error: error.message
+      });
+      throw error;
+    }
+  }
+
+  /**
+   * Get user data count for a specific module
+   * @param {string} userId - User UUID
+   * @param {string} provider - Provider name
+   * @param {string} service - Service name
+   * @param {string} module - Module name
+   * @returns {Promise} Count
+   */
+  static async getUserDataCount(userId, provider, service, module) {
+    try {
+      const tableName = `${module}_bulk`;
+
+      const query = `
+        SELECT COUNT(*) as count FROM ${tableName}
+        WHERE user_uuid = ? AND provider = ? AND service = ?
+ `; + + const rows = await sequelize.query(query, { + replacements: [userId, provider, service], + type: sequelize.QueryTypes.SELECT + }); + + return rows[0]?.count || 0; + } catch (error) { + // If table doesn't exist, return 0 + if (error.message.includes('ER_NO_SUCH_TABLE') || error.message.includes("doesn't exist")) { + return 0; + } + + logger.error('Error getting user data count', { + userId, + provider, + service, + module, + error: error.message + }); + throw error; + } + } + + /** + * Insert bulk data into module table + * @param {string} module - Module name + * @param {Array} data - Data to insert + * @returns {Promise} Number of inserted records + */ + static async insertBulkData(module, data) { + if (!data || data.length === 0) { + return 0; + } + + try { + const tableName = `${module}_bulk`; + const firstRecord = data[0]; + + // Get all columns from first record, filtering out undefined values + const allColumns = Object.keys(firstRecord); + const columns = allColumns.filter(col => firstRecord[col] !== undefined); + + const placeholders = columns.map(() => '?').join(', '); + const query = ` + INSERT INTO ${tableName} (${columns.join(', ')}) + VALUES ${data.map(() => `(${placeholders})`).join(', ')} + `; + + const values = []; + data.forEach(record => { + columns.forEach(column => { + let value = record[column]; + + // Handle undefined values - convert to NULL + if (value === undefined) { + value = null; + } + + // Handle JSON fields + if (column === 'raw_data' && typeof value === 'object' && value !== null) { + value = JSON.stringify(value); + } + + values.push(value); + }); + }); + + console.log('🔍 Insert Debug:'); + console.log('📋 Table:', tableName); + console.log('📋 Columns:', columns); + console.log('📋 Columns count:', columns.length); + console.log('📋 Records count:', data.length); + console.log('📋 Expected values count:', columns.length * data.length); + console.log('📋 Actual values count:', values.length); + console.log('📋 First record structure:', JSON.stringify(data[0], null, 2)); + console.log('📋 Query preview:', query.substring(0, 300) + '...'); + console.log('📋 Full query:', query); + console.log('📋 First record values:', values.slice(0, columns.length)); + console.log('📋 Values per record:', columns.length); + console.log('📋 Total values:', values.length); + console.log('📋 Expected values for all records:', data.length * columns.length); + + // Log each record being inserted + data.slice(0, 3).forEach((record, index) => { + console.log(`📋 Record ${index + 1} mapping:`); + columns.forEach((col, colIndex) => { + const valueIndex = index * columns.length + colIndex; + console.log(` ${col}: "${values[valueIndex]}"`); + }); + }); + + console.log('🚀 Executing SQL query...'); + let results, metadata; + + try { + [results, metadata] = await sequelize.query(query, { + replacements: values, + type: sequelize.QueryTypes.INSERT + }); + console.log('✅ SQL query executed successfully'); + console.log('📋 Results:', JSON.stringify(results, null, 2)); + console.log('📋 Metadata:', JSON.stringify(metadata, null, 2)); + + // Check for warnings + if (metadata.warningCount > 0) { + console.warn(`⚠️ SQL warnings detected: ${metadata.warningCount} warnings`); + } + } catch (sqlError) { + console.error('❌ SQL execution failed:'); + console.error('📋 Error code:', sqlError.code); + console.error('📋 Error message:', sqlError.message); + console.error('📋 SQL State:', sqlError.sqlState); + console.error('📋 SQL Query (first 500 chars):', query.substring(0, 500)); + console.error('📋 First 10 
values:', values.slice(0, 10)); + throw sqlError; + } + + // For INSERT queries, check the actual results + const insertedCount = metadata.affectedRows || data.length; + + console.log(`📊 Insert Results Summary:`); + console.log(` Records attempted: ${data.length}`); + console.log(` Records actually inserted: ${insertedCount}`); + console.log(` Affected rows: ${metadata.affectedRows}`); + console.log(` Insert ID: ${metadata.insertId}`); + + if (insertedCount !== data.length) { + console.warn(`⚠️ Mismatch: Expected to insert ${data.length} records, but only ${insertedCount} were inserted`); + console.warn(` This could indicate constraint violations or other SQL issues`); + } + + logger.info('Bulk data inserted', { + module, + tableName, + recordsCount: data.length, + insertedCount: insertedCount + }); + + return insertedCount; + } catch (error) { + logger.error('Error inserting bulk data', { + module, + error: error.message, + recordsCount: data.length, + stack: error.stack + }); + throw error; + } + } +} + +module.exports = BulkReadRepository; diff --git a/src/db/migrations/014_create_contacts_bulk.sql b/src/db/migrations/014_create_contacts_bulk.sql new file mode 100644 index 0000000..34fdc8f --- /dev/null +++ b/src/db/migrations/014_create_contacts_bulk.sql @@ -0,0 +1,45 @@ +-- Migration: Create Contacts Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read contacts data from multiple service providers + +CREATE TABLE IF NOT EXISTS contacts_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + first_name VARCHAR(255) NULL, + last_name VARCHAR(255) NULL, + email VARCHAR(255) NULL, + phone VARCHAR(255) NULL, + mobile VARCHAR(255) NULL, + lead_source VARCHAR(255) NULL, + account_name VARCHAR(255) NULL, + company VARCHAR(255) NULL, + owner VARCHAR(255) NULL, + title VARCHAR(255) NULL, + department VARCHAR(255) NULL, + address_line_1 VARCHAR(500) NULL, + address_line_2 VARCHAR(500) NULL, + city VARCHAR(255) NULL, + state VARCHAR(255) NULL, + country VARCHAR(255) NULL, + postal_code VARCHAR(50) NULL, + website VARCHAR(255) NULL, + description TEXT NULL, + lead_status VARCHAR(255) NULL, + contact_status VARCHAR(255) NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_email (email), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_contacts_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/015_create_leads_bulk.sql b/src/db/migrations/015_create_leads_bulk.sql new file mode 100644 index 0000000..59f7ea1 --- /dev/null +++ b/src/db/migrations/015_create_leads_bulk.sql @@ -0,0 +1,47 @@ +-- Migration: Create Leads Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read leads data from multiple service providers + +CREATE TABLE IF NOT EXISTS leads_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + first_name VARCHAR(255) 
NULL, + last_name VARCHAR(255) NULL, + company VARCHAR(255) NULL, + email VARCHAR(255) NULL, + phone VARCHAR(255) NULL, + mobile VARCHAR(255) NULL, + lead_source VARCHAR(255) NULL, + lead_status VARCHAR(255) NULL, + owner VARCHAR(255) NULL, + title VARCHAR(255) NULL, + industry VARCHAR(255) NULL, + annual_revenue DECIMAL(15,2) NULL, + num_employees INT NULL, + address_line_1 VARCHAR(500) NULL, + address_line_2 VARCHAR(500) NULL, + city VARCHAR(255) NULL, + state VARCHAR(255) NULL, + country VARCHAR(255) NULL, + postal_code VARCHAR(50) NULL, + website VARCHAR(255) NULL, + description TEXT NULL, + rating VARCHAR(50) NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_email (email), + INDEX idx_lead_status (lead_status), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_leads_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/016_create_accounts_bulk.sql b/src/db/migrations/016_create_accounts_bulk.sql new file mode 100644 index 0000000..7f41e32 --- /dev/null +++ b/src/db/migrations/016_create_accounts_bulk.sql @@ -0,0 +1,43 @@ +-- Migration: Create Accounts Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read accounts data from multiple service providers + +CREATE TABLE IF NOT EXISTS accounts_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + account_name VARCHAR(255) NULL, + account_number VARCHAR(255) NULL, + account_type VARCHAR(255) NULL, + industry VARCHAR(255) NULL, + annual_revenue DECIMAL(15,2) NULL, + num_employees INT NULL, + owner VARCHAR(255) NULL, + parent_account VARCHAR(255) NULL, + phone VARCHAR(255) NULL, + fax VARCHAR(255) NULL, + website VARCHAR(255) NULL, + address_line_1 VARCHAR(500) NULL, + address_line_2 VARCHAR(500) NULL, + city VARCHAR(255) NULL, + state VARCHAR(255) NULL, + country VARCHAR(255) NULL, + postal_code VARCHAR(50) NULL, + description TEXT NULL, + account_status VARCHAR(255) NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_account_name (account_name), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_accounts_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/017_create_deals_bulk.sql b/src/db/migrations/017_create_deals_bulk.sql new file mode 100644 index 0000000..ee87e8a --- /dev/null +++ b/src/db/migrations/017_create_deals_bulk.sql @@ -0,0 +1,40 @@ +-- Migration: Create Deals Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read deals/opportunities data from multiple service providers + +CREATE TABLE IF NOT EXISTS 
deals_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + deal_name VARCHAR(255) NULL, + account_name VARCHAR(255) NULL, + contact_name VARCHAR(255) NULL, + amount DECIMAL(15,2) NULL, + stage VARCHAR(255) NULL, + probability DECIMAL(5,2) NULL, + expected_revenue DECIMAL(15,2) NULL, + close_date DATE NULL, + lead_source VARCHAR(255) NULL, + next_step VARCHAR(500) NULL, + type VARCHAR(255) NULL, + owner VARCHAR(255) NULL, + campaign_source VARCHAR(255) NULL, + description TEXT NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_deal_name (deal_name), + INDEX idx_stage (stage), + INDEX idx_close_date (close_date), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_deals_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/018_create_tasks_bulk.sql b/src/db/migrations/018_create_tasks_bulk.sql new file mode 100644 index 0000000..017926d --- /dev/null +++ b/src/db/migrations/018_create_tasks_bulk.sql @@ -0,0 +1,36 @@ +-- Migration: Create Tasks Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read tasks data from multiple service providers + +CREATE TABLE IF NOT EXISTS tasks_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + subject VARCHAR(255) NULL, + status VARCHAR(255) NULL, + priority VARCHAR(255) NULL, + due_date DATETIME NULL, + owner VARCHAR(255) NULL, + related_to VARCHAR(255) NULL, + related_to_type VARCHAR(100) NULL, + contact_name VARCHAR(255) NULL, + account_name VARCHAR(255) NULL, + description TEXT NULL, + comments TEXT NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_due_date (due_date), + INDEX idx_status (status), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_tasks_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/019_create_vendors_bulk.sql b/src/db/migrations/019_create_vendors_bulk.sql new file mode 100644 index 0000000..53e72d0 --- /dev/null +++ b/src/db/migrations/019_create_vendors_bulk.sql @@ -0,0 +1,46 @@ +-- Migration: Create Vendors Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read vendors/suppliers data from multiple service providers + +CREATE TABLE IF NOT EXISTS vendors_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + vendor_name VARCHAR(255) NULL, + vendor_code VARCHAR(255) 
NULL, + vendor_type VARCHAR(255) NULL, + contact_person VARCHAR(255) NULL, + email VARCHAR(255) NULL, + phone VARCHAR(255) NULL, + mobile VARCHAR(255) NULL, + fax VARCHAR(255) NULL, + website VARCHAR(255) NULL, + tax_id VARCHAR(255) NULL, + payment_terms VARCHAR(255) NULL, + currency VARCHAR(10) NULL, + credit_limit DECIMAL(15,2) NULL, + address_line_1 VARCHAR(500) NULL, + address_line_2 VARCHAR(500) NULL, + city VARCHAR(255) NULL, + state VARCHAR(255) NULL, + country VARCHAR(255) NULL, + postal_code VARCHAR(50) NULL, + description TEXT NULL, + vendor_status VARCHAR(255) NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_vendor_name (vendor_name), + INDEX idx_email (email), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_vendors_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/020_create_invoices_bulk.sql b/src/db/migrations/020_create_invoices_bulk.sql new file mode 100644 index 0000000..77f48cb --- /dev/null +++ b/src/db/migrations/020_create_invoices_bulk.sql @@ -0,0 +1,44 @@ +-- Migration: Create Invoices Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read invoices data from multiple service providers + +CREATE TABLE IF NOT EXISTS invoices_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + invoice_number VARCHAR(255) NULL, + customer_name VARCHAR(255) NULL, + customer_id VARCHAR(255) NULL, + invoice_date DATE NULL, + due_date DATE NULL, + status VARCHAR(255) NULL, + sub_total DECIMAL(15,2) NULL, + tax_amount DECIMAL(15,2) NULL, + total_amount DECIMAL(15,2) NULL, + balance DECIMAL(15,2) NULL, + currency VARCHAR(10) NULL, + payment_terms VARCHAR(255) NULL, + sales_person VARCHAR(255) NULL, + reference_number VARCHAR(255) NULL, + subject VARCHAR(255) NULL, + notes TEXT NULL, + terms_conditions TEXT NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_invoice_number (invoice_number), + INDEX idx_customer_name (customer_name), + INDEX idx_invoice_date (invoice_date), + INDEX idx_status (status), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_invoices_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/021_create_sales_orders_bulk.sql b/src/db/migrations/021_create_sales_orders_bulk.sql new file mode 100644 index 0000000..cf552aa --- /dev/null +++ b/src/db/migrations/021_create_sales_orders_bulk.sql @@ -0,0 +1,46 @@ +-- Migration: Create Sales Orders Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read sales orders data from multiple service providers + 
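+-- Illustrative read path for this table (a sketch; column names as defined below):
+--   SELECT so_number, customer_name, total_amount, status
+--   FROM sales_orders_bulk
+--   WHERE user_uuid = ? AND provider = 'zoho' AND service = 'books'
+--   ORDER BY order_date DESC LIMIT 100;
+-- The (user_uuid, provider, service) composite index below is designed to cover this filter.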
+CREATE TABLE IF NOT EXISTS sales_orders_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + so_number VARCHAR(255) NULL, + customer_name VARCHAR(255) NULL, + customer_id VARCHAR(255) NULL, + order_date DATE NULL, + shipment_date DATE NULL, + delivery_date DATE NULL, + status VARCHAR(255) NULL, + sub_total DECIMAL(15,2) NULL, + tax_amount DECIMAL(15,2) NULL, + total_amount DECIMAL(15,2) NULL, + discount DECIMAL(15,2) NULL, + currency VARCHAR(10) NULL, + sales_person VARCHAR(255) NULL, + reference_number VARCHAR(255) NULL, + subject VARCHAR(255) NULL, + notes TEXT NULL, + terms_conditions TEXT NULL, + shipping_address TEXT NULL, + billing_address TEXT NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_so_number (so_number), + INDEX idx_customer_name (customer_name), + INDEX idx_order_date (order_date), + INDEX idx_status (status), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_sales_orders_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/022_create_purchase_orders_bulk.sql b/src/db/migrations/022_create_purchase_orders_bulk.sql new file mode 100644 index 0000000..1efc949 --- /dev/null +++ b/src/db/migrations/022_create_purchase_orders_bulk.sql @@ -0,0 +1,46 @@ +-- Migration: Create Purchase Orders Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read purchase orders data from multiple service providers + +CREATE TABLE IF NOT EXISTS purchase_orders_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + po_number VARCHAR(255) NULL, + vendor_name VARCHAR(255) NULL, + vendor_id VARCHAR(255) NULL, + order_date DATE NULL, + expected_date DATE NULL, + delivery_date DATE NULL, + status VARCHAR(255) NULL, + sub_total DECIMAL(15,2) NULL, + tax_amount DECIMAL(15,2) NULL, + total_amount DECIMAL(15,2) NULL, + discount DECIMAL(15,2) NULL, + currency VARCHAR(10) NULL, + buyer VARCHAR(255) NULL, + reference_number VARCHAR(255) NULL, + subject VARCHAR(255) NULL, + notes TEXT NULL, + terms_conditions TEXT NULL, + shipping_address TEXT NULL, + billing_address TEXT NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_po_number (po_number), + INDEX idx_vendor_name (vendor_name), + INDEX idx_order_date (order_date), + INDEX idx_status (status), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_purchase_orders_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/023_create_bulk_read_jobs.sql 
b/src/db/migrations/023_create_bulk_read_jobs.sql new file mode 100644 index 0000000..ca858be --- /dev/null +++ b/src/db/migrations/023_create_bulk_read_jobs.sql @@ -0,0 +1,27 @@ +-- Migration: Create Bulk Read Jobs Table (Provider Agnostic) +-- Description: Creates table for tracking bulk read jobs from multiple service providers + +CREATE TABLE IF NOT EXISTS bulk_read_jobs ( + id VARCHAR(255) PRIMARY KEY, + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + module VARCHAR(100) NOT NULL, + operation VARCHAR(50) NOT NULL DEFAULT 'bulk_read', + state VARCHAR(50) NOT NULL, + file_type VARCHAR(10) NOT NULL DEFAULT 'CSV', + download_url TEXT NULL, + records_count INT DEFAULT 0, + processed_count INT DEFAULT 0, + status VARCHAR(50) DEFAULT 'pending', + error_message TEXT NULL, + request_params JSON NULL, + response_meta JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_module (module), + INDEX idx_status (status), + INDEX idx_created_at (created_at), + CONSTRAINT fk_bulk_read_jobs_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/024_create_employees_bulk.sql b/src/db/migrations/024_create_employees_bulk.sql new file mode 100644 index 0000000..90d1b6f --- /dev/null +++ b/src/db/migrations/024_create_employees_bulk.sql @@ -0,0 +1,54 @@ +-- Migration: Create Employees Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read employee/HR data from multiple service providers (Keka, BambooHR, etc.) + +CREATE TABLE IF NOT EXISTS employees_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + employee_id VARCHAR(255) NULL, + first_name VARCHAR(255) NULL, + last_name VARCHAR(255) NULL, + email VARCHAR(255) NULL, + phone VARCHAR(255) NULL, + mobile VARCHAR(255) NULL, + job_title VARCHAR(255) NULL, + department VARCHAR(255) NULL, + manager VARCHAR(255) NULL, + employment_type VARCHAR(255) NULL, + employment_status VARCHAR(255) NULL, + hire_date DATE NULL, + termination_date DATE NULL, + salary DECIMAL(15,2) NULL, + currency VARCHAR(10) NULL, + location VARCHAR(255) NULL, + address_line_1 VARCHAR(500) NULL, + address_line_2 VARCHAR(500) NULL, + city VARCHAR(255) NULL, + state VARCHAR(255) NULL, + country VARCHAR(255) NULL, + postal_code VARCHAR(50) NULL, + date_of_birth DATE NULL, + gender VARCHAR(50) NULL, + marital_status VARCHAR(50) NULL, + emergency_contact VARCHAR(255) NULL, + emergency_phone VARCHAR(255) NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_employee_id (employee_id), + INDEX idx_email (email), + INDEX idx_department (department), + INDEX idx_employment_status (employment_status), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_employees_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git 
a/src/db/migrations/025_create_products_bulk.sql b/src/db/migrations/025_create_products_bulk.sql new file mode 100644 index 0000000..b0dd65e --- /dev/null +++ b/src/db/migrations/025_create_products_bulk.sql @@ -0,0 +1,50 @@ +-- Migration: Create Products Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read product/inventory data from multiple service providers + +CREATE TABLE IF NOT EXISTS products_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + product_name VARCHAR(255) NULL, + product_code VARCHAR(255) NULL, + sku VARCHAR(255) NULL, + category VARCHAR(255) NULL, + subcategory VARCHAR(255) NULL, + brand VARCHAR(255) NULL, + unit VARCHAR(50) NULL, + description TEXT NULL, + cost_price DECIMAL(15,2) NULL, + selling_price DECIMAL(15,2) NULL, + mrp DECIMAL(15,2) NULL, + currency VARCHAR(10) NULL, + tax_percentage DECIMAL(5,2) NULL, + hsn_code VARCHAR(255) NULL, + barcode VARCHAR(255) NULL, + weight DECIMAL(10,3) NULL, + weight_unit VARCHAR(50) NULL, + dimensions VARCHAR(255) NULL, + stock_quantity INT NULL, + reorder_level INT NULL, + vendor_name VARCHAR(255) NULL, + product_status VARCHAR(255) NULL, + is_active BOOLEAN DEFAULT TRUE, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_product_name (product_name), + INDEX idx_product_code (product_code), + INDEX idx_sku (sku), + INDEX idx_category (category), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_products_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/db/migrations/026_create_customers_bulk.sql b/src/db/migrations/026_create_customers_bulk.sql new file mode 100644 index 0000000..85dd8c7 --- /dev/null +++ b/src/db/migrations/026_create_customers_bulk.sql @@ -0,0 +1,50 @@ +-- Migration: Create Customers Bulk Table (Provider Agnostic) +-- Description: Creates table for storing bulk read customer data from multiple service providers + +CREATE TABLE IF NOT EXISTS customers_bulk ( + internal_id INT AUTO_INCREMENT PRIMARY KEY, + external_id VARCHAR(255), + user_uuid CHAR(36) NOT NULL, + provider VARCHAR(50) NOT NULL, + service VARCHAR(50) NOT NULL, + customer_name VARCHAR(255) NULL, + customer_code VARCHAR(255) NULL, + customer_type VARCHAR(255) NULL, + contact_person VARCHAR(255) NULL, + email VARCHAR(255) NULL, + phone VARCHAR(255) NULL, + mobile VARCHAR(255) NULL, + fax VARCHAR(255) NULL, + website VARCHAR(255) NULL, + tax_id VARCHAR(255) NULL, + credit_limit DECIMAL(15,2) NULL, + payment_terms VARCHAR(255) NULL, + currency VARCHAR(10) NULL, + price_list VARCHAR(255) NULL, + sales_person VARCHAR(255) NULL, + address_line_1 VARCHAR(500) NULL, + address_line_2 VARCHAR(500) NULL, + city VARCHAR(255) NULL, + state VARCHAR(255) NULL, + country VARCHAR(255) NULL, + postal_code VARCHAR(50) NULL, + shipping_address TEXT NULL, + billing_address TEXT NULL, + notes TEXT NULL, + customer_status VARCHAR(255) NULL, + created_time DATETIME NULL, + modified_time DATETIME NULL, + bulk_job_id VARCHAR(255) NULL, + raw_data JSON NULL, + 
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_user_provider_service (user_uuid, provider, service), + INDEX idx_external_id (external_id), + INDEX idx_bulk_job (bulk_job_id), + INDEX idx_created_time (created_time), + INDEX idx_customer_name (customer_name), + INDEX idx_email (email), + INDEX idx_provider (provider), + INDEX idx_service (service), + CONSTRAINT fk_customers_bulk_user FOREIGN KEY (user_uuid) REFERENCES users(uuid) ON DELETE CASCADE +); diff --git a/src/services/bulkReadService.js b/src/services/bulkReadService.js index 1e32dfd..75b4408 100644 --- a/src/services/bulkReadService.js +++ b/src/services/bulkReadService.js @@ -1,45 +1,929 @@ const axios = require('axios'); -const ZohoBulkReadRepository = require('../data/repositories/zohoBulkReadRepository'); +const BulkReadRepository = require('../data/repositories/bulkReadRepository'); const userAuthTokenRepo = require('../data/repositories/userAuthTokenRepository'); +const CsvService = require('./csvService'); const { decrypt } = require('../utils/crypto'); const logger = require('../utils/logger'); class BulkReadService { constructor() { - this.baseUrl = 'https://www.zohoapis.com'; + // n8n webhook URL for bulk read orchestration + this.n8nWebhookUrl = process.env.N8N_BULK_READ_WEBHOOK_URL || 'https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3'; + + // Backend callback URL base + this.backendCallbackBaseUrl = process.env.API_BASE_URL || process.env.MY_BASE_URL || 'http://localhost:3000'; + + this.providerConfigs = { + zoho: { + baseUrl: 'https://www.zohoapis.com', + services: ['crm', 'books', 'inventory'], + endpoints: { + crm: '/crm/bulk/v2/read', + books: '/books/v3/bulkread', + inventory: '/inventory/v1/bulkread' + } + }, + salesforce: { + baseUrl: 'https://login.salesforce.com', // Used for auth only + services: ['crm'], + endpoints: { + crm: '/services/data/v57.0/jobs/query' // Will be used with instance URL + } + }, + hubspot: { + baseUrl: 'https://api.hubapi.com', + services: ['crm'], + endpoints: { + crm: '/crm/v3/objects' + } + }, + keka: { + baseUrl: 'https://api.keka.com', + services: ['hr'], + endpoints: { + hr: '/v1/bulk-export' + } + }, + bamboohr: { + baseUrl: 'https://api.bamboohr.com', + services: ['hr'], + endpoints: { + hr: '/v1/reports' + } + }, + intuit: { + baseUrl: 'https://sandbox-quickbooks.api.intuit.com', + services: ['accounting'], + endpoints: { + accounting: '/v3/companyid/reports' + } + }, + quickbooks: { + baseUrl: 'https://sandbox-quickbooks.api.intuit.com', + services: ['accounting'], + endpoints: { + accounting: '/v3/companyid/reports' + } + } + }; } /** * Initiate a bulk read job for a specific module * @param {string} userId - User UUID - * @param {string} module - Zoho module name + * @param {string} provider - Provider name (zoho, salesforce, etc.) + * @param {string} service - Service name (crm, books, etc.) 
+     * @param {string} module - Module name
      * @param {Array} fields - Fields to fetch
      * @param {Object} options - Additional options
      * @param {string} accessToken - Frontend access token for callback URL
      * @returns {Promise} Job details
      */
-    async initiateBulkRead(userId, module, fields, options = {}, accessToken = null) {
+    async initiateBulkRead(userId, provider, service, module, fields, options = {}, accessToken = null) {
        try {
-            console.log(`🚀 Initiating bulk read for user ${userId}, module ${module}`);
+            console.log(`🚀 Initiating bulk read via n8n for user ${userId}, provider ${provider}, service ${service}, module ${module}`);
+
+            // Validate user ID format (should be a valid UUID)
+            if (!userId || typeof userId !== 'string' || userId.length !== 36) {
+                throw new Error(`Invalid user ID format. Expected UUID, got: ${userId}`);
+            }
+
+            // Validate provider and service
+            if (!this.providerConfigs[provider]) {
+                throw new Error(`Unsupported provider: ${provider}`);
+            }
+
+            if (!this.providerConfigs[provider].services.includes(service)) {
+                throw new Error(`Unsupported service ${service} for provider ${provider}`);
+            }

            // Get access token
-            const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, 'zoho');
+            const tokenRecord = await userAuthTokenRepo.findByUserAndService(userId, provider);
            if (!tokenRecord) {
-                throw new Error('No Zoho access token found for user');
+                throw new Error(`No ${provider} access token found for user`);
            }

-            const zohoAccessToken = decrypt(tokenRecord.accessToken);
+            const accessTokenDecrypted = decrypt(tokenRecord.accessToken);

-            // Prepare bulk read request
-            const baseUrl = process.env.API_BASE_URL || process.env.MY_BASE_URL || 'http://localhost:3000';
-            let callbackUrl = `${baseUrl}/api/v1/integrations/webhooks/zoho/bulkread`;
+            // Generate unique job ID
+            const jobId = `${provider}_${module}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;

-            // Add access token to callback URL if provided
-            if (accessToken) {
-                callbackUrl += `?access_token=${accessToken}`;
+            // Prepare callback URL for n8n to call when job is complete; the frontend
+            // access token travels in the request body (backend_access_token), not in the URL
+            const callbackUrl = `${this.backendCallbackBaseUrl}/api/v1/bulk-read/webhook/callback`;
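+
+            // For reference, the completed-job callback n8n later POSTs to callbackUrl is
+            // expected to match n8nCallbackSchema in bulkReadRoutes.js (a sketch, with
+            // illustrative values):
+            //   { job_id: '750XX0000000001AAA', user_id: userId, status: 'completed',
+            //     provider, service, module, provider_access_token: '...',
+            //     metadata: { instance_url: 'https://example.my.salesforce.com' } }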
+
+            // Prepare request body for n8n
+            // n8n will handle the entire bulk read process and return parsed records
+            const n8nRequestBody = {
+                provider: provider,
+                service: service,
+                module: module,
+                fields: fields,
+                provider_access_token: accessTokenDecrypted, // For provider API calls
+                backend_access_token: accessToken, // For backend callback authentication
+                callback_url: callbackUrl, // Without access_token in URL since it's in the body
+                job_id: jobId,
+                user_id: userId, // IMPORTANT: This must be a valid user UUID that exists in users table
+                options: options
+            };
+
+            // Build n8n webhook URL with query params
+            let n8nUrl = this.n8nWebhookUrl;
+
+            // For Salesforce, add instance_url as query parameter
+            if (provider === 'salesforce' && tokenRecord.instanceUrl) {
+                n8nUrl += `?instance_url=${encodeURIComponent(tokenRecord.instanceUrl)}`;
+                console.log(`🔗 Adding Salesforce instance URL to n8n request: ${tokenRecord.instanceUrl}`);
            }

-            const bulkReadData = {
+            console.log('📋 n8n webhook URL:', n8nUrl);
+            console.log('📋 Request to n8n:', JSON.stringify(n8nRequestBody, null, 2));
+
+            // Store job in database immediately with pending status
+            await BulkReadRepository.createBulkReadJob({
+                id: jobId,
+                user_uuid: userId,
+                provider: provider,
+                service: service,
+                module: module,
+                operation: 'bulk_read',
+                state: 'INITIATED',
+                file_type: 'CSV',
+                records_count: 0,
+                status: 'pending',
+                request_params: { fields, ...options }
+            });
+
+            // Send request to n8n webhook
+            const response = await axios.post(n8nUrl, n8nRequestBody, {
+                headers: {
+                    'Content-Type': 'application/json'
+                },
+                timeout: 30000 // 30 second timeout
+            });
+
+            console.log(`✅ n8n webhook response:`, JSON.stringify(response.data, null, 2));
+
+            logger.info('Bulk read job sent to n8n', {
+                userId,
+                provider,
+                service,
+                module,
+                jobId: jobId
+            });
+
+            return {
+                jobId: jobId,
+                status: 'initiated',
+                message: `Bulk read job initiated via n8n for ${provider} ${service} ${module}. Processing will be handled asynchronously.`,
+                provider: provider,
+                service: service,
+                state: 'INITIATED',
+                operation: 'bulk_read',
+                estimatedTime: this.getEstimatedTime(module, options.limit)
+            };
+
+        } catch (error) {
+            console.error('❌ Error initiating bulk read via n8n:', error.message);
+            logger.error('Bulk read initiation via n8n failed', {
+                userId,
+                provider,
+                service,
+                module,
+                error: error.message,
+                errorDetails: error.response?.data
+            });
+            throw error;
+        }
+    }
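+
+    // Usage sketch (illustrative arguments; assumes the user has a stored provider token):
+    //   const bulkReadService = new BulkReadService();
+    //   const job = await bulkReadService.initiateBulkRead(
+    //       'b3b0c442-98fc-4e1b-9f3a-2d1c5e8a7f60',        // user UUID (hypothetical)
+    //       'salesforce', 'crm', 'accounts',
+    //       ['Id', 'Name', 'Industry'],
+    //       { limit: 1000 },
+    //       frontendAccessToken
+    //   );
+    //   // job => { jobId, status: 'initiated', state: 'INITIATED', ... }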
+
+    /**
+     * Handle webhook callback from n8n after bulk read job completion
+     * n8n sends provider's job_id which is used to download results
+     * @param {Object} callbackData - Data from n8n webhook
+     * @returns {Promise} Processing result
+     */
+    async handleN8nCallback(callbackData) {
+        try {
+            const {
+                job_id,           // Provider's job ID (from Salesforce/Zoho)
+                backend_job_id,   // Backend's tracking job ID (optional)
+                user_id,          // User UUID
+                status,
+                provider,
+                service,
+                module,
+                provider_access_token,
+                metadata
+            } = callbackData;
+
+            console.log(`📥 Received n8n callback for provider job ${job_id}, status: ${status}`);
+            console.log('📋 Callback data:', JSON.stringify({...callbackData, provider_access_token: '***'}, null, 2));
+
+            // Find backend job record
+            // Priority: Use backend_job_id if provided, otherwise find by user + provider + module + recent
+            let job;
+            if (backend_job_id) {
+                job = await BulkReadRepository.getBulkReadJob(backend_job_id);
+            } else {
+                // Find most recent pending/processing job for this user/provider/module
+                const jobs = await BulkReadRepository.getUserBulkReadJobs(user_id, {
+                    provider,
+                    service,
+                    module,
+                    status: 'pending',
+                    limit: 1
+                });
+                job = jobs[0];
+            }
+
+            if (!job) {
+                throw new Error(`Job not found for user ${user_id}, provider ${provider}, module ${module}`);
+            }
+
+            const backendJobId = job.id;
+            console.log(`📋 Found backend job: ${backendJobId} for provider job: ${job_id}`);
+
+            // Update job status with provider job ID
+            await BulkReadRepository.updateBulkReadJob(backendJobId, {
+                status: status === 'completed' ? 'processing' : status,
+                state: metadata?.state || status.toUpperCase(),
+                response_meta: {
+                    ...metadata,
+                    provider_job_id: job_id // Store provider's job ID
+                }
+            });
+
+            // If job completed successfully, download and process CSV
+            if (status === 'completed') {
+                console.log(`🔽 Downloading CSV file from ${provider} using provider job_id: ${job_id}...`);
+
+                // Build download URL using provider's job_id
+                const downloadUrl = this.buildDownloadUrl(provider, job_id, metadata);
+                console.log(`📥 Download URL: ${downloadUrl}`);
+
+                // Download and parse CSV file (handles ZIP extraction for Zoho)
+                console.log(`🔽 Downloading CSV from: ${downloadUrl}`);
+                const records = await this.downloadAndParseResultFile(
+                    provider,
+                    downloadUrl,
+                    provider_access_token,
+                    metadata
+                );
+                console.log('Records received from CSV:', records);
+                console.log(`📊 CSV parsing result: ${records ?
records.length : 0} records`); + + console.log(`✅ Parsed ${records.length} records`); + console.log(`📋 First few record IDs: ${records.slice(0, 3).map(r => r.Id || r.id || r.ID).join(', ')}`); + + // Update record count + await BulkReadRepository.updateBulkReadJob(backendJobId, { + records_count: records.length + }); + + // Prepare and insert records in batches + if (records.length > 0) { + console.log(`💾 Starting preparation of ${records.length} records for ${module}_bulk table`); + console.log(`📋 Sample raw record keys: ${Object.keys(records[0] || {}).join(', ')}`); + + // Use user_id from n8n callback (which should be a valid user UUID) + const finalUserUuid = user_id; + console.log(`👤 Using user_uuid from n8n callback: ${finalUserUuid}`); + + const preparedRecords = records.map((record, index) => { + const mappedFields = this.mapRecordFields(module, record); + + const preparedRecord = { + external_id: record.Id || record.id || record.ID, + user_uuid: finalUserUuid, + provider: provider, + service: service, + ...mappedFields, + raw_data: record, + created_time: this.formatDateForMySQL(record.CreatedDate || record.created_time || record.Created_Time), + modified_time: this.formatDateForMySQL(record.LastModifiedDate || record.modified_time || record.Modified_Time), + bulk_job_id: backendJobId + }; + + // Log first record structure for debugging + if (index === 0) { + console.log('🔍 First prepared record structure:'); + console.log('📋 Mapped fields:', JSON.stringify(mappedFields, null, 2)); + console.log('📋 Final record keys:', Object.keys(preparedRecord)); + console.log('📋 Final record sample:', JSON.stringify(preparedRecord, (key, value) => { + if (key === 'raw_data') return '[OBJECT]'; + if (key === 'provider_access_token') return '***'; + return value; + }, 2)); + } + + return preparedRecord; + }); + + console.log(`✅ Successfully prepared ${preparedRecords.length} records for insertion`); + + // Insert in batches of 1000 + const batchSize = 1000; + let totalInserted = 0; + + for (let i = 0; i < preparedRecords.length; i += batchSize) { + const batch = preparedRecords.slice(i, i + batchSize); + console.log(`🔄 Processing batch ${Math.floor(i / batchSize) + 1}: ${batch.length} records`); + + const insertedCount = await BulkReadRepository.insertBulkData(module, batch); + totalInserted += insertedCount; + console.log(`📝 Inserted batch ${Math.floor(i / batchSize) + 1}: ${insertedCount} records`); + } + + console.log(`✅ Successfully inserted ${totalInserted} records`); + + // Update to completed status + await BulkReadRepository.updateBulkReadJob(backendJobId, { + status: 'completed', + processed_count: totalInserted + }); + } else { + await BulkReadRepository.updateBulkReadJob(backendJobId, { + status: 'completed', + processed_count: 0 + }); + } + } + + logger.info('n8n callback processed successfully', { + backend_job_id: backendJobId, + provider_job_id: job_id, + status, + provider, + service, + module + }); + + return { + success: true, + message: 'Callback processed successfully', + backend_job_id: backendJobId, + provider_job_id: job_id, + status: status + }; + + } catch (error) { + console.error('❌ Error processing n8n callback:', error.message); + logger.error('n8n callback processing failed', { + job_id: callbackData.job_id, + error: error.message, + stack: error.stack + }); + + // Try to update job status to failed + if (callbackData.backend_job_id) { + try { + await BulkReadRepository.updateBulkReadJob(callbackData.backend_job_id, { + status: 'failed', + error_message: 
error.message + }); + } catch (updateError) { + console.error('Failed to update job status:', updateError.message); + } + } else if (callbackData.user_id) { + // Try to find and update the job + try { + const jobs = await BulkReadRepository.getUserBulkReadJobs(callbackData.user_id, { + provider: callbackData.provider, + module: callbackData.module, + status: 'pending', + limit: 1 + }); + if (jobs[0]) { + await BulkReadRepository.updateBulkReadJob(jobs[0].id, { + status: 'failed', + error_message: error.message + }); + } + } catch (updateError) { + console.error('Failed to update job status:', updateError.message); + } + } + + throw error; + } + } + + /** + * Build download URL for provider's bulk job results + * @param {string} provider - Provider name + * @param {string} providerJobId - Provider's job ID + * @param {Object} metadata - Metadata from callback (may contain instance_url, etc.) + * @returns {string} Download URL + */ + buildDownloadUrl(provider, providerJobId, metadata = {}) { + switch (provider) { + case 'salesforce': + // Salesforce needs instance URL + job ID + const instanceUrl = metadata.instance_url || metadata.instanceUrl; + if (!instanceUrl) { + throw new Error('Salesforce instance_url not provided in metadata'); + } + return `${instanceUrl}/services/data/v57.0/jobs/query/${providerJobId}/results`; + + case 'zoho': + // Zoho provides full download URL in metadata or construct it + if (metadata.download_url) { + return metadata.download_url; + } + return `https://www.zohoapis.com/crm/bulk/v2/read/${providerJobId}/result`; + + default: + // Use metadata download_url if provided + if (metadata.download_url || metadata.result_url) { + return metadata.download_url || metadata.result_url; + } + throw new Error(`Cannot construct download URL for provider: ${provider}`); + } + } + + /** + * Download and parse bulk result file from provider + * Uses existing CsvService which handles ZIP extraction for Zoho + * @param {string} provider - Provider name + * @param {string} downloadUrl - Download URL from provider + * @param {string} accessToken - Provider access token + * @param {Object} metadata - Additional metadata + * @returns {Promise} Parsed records + */ + async downloadAndParseResultFile(provider, downloadUrl, accessToken, metadata) { + try { + let records; + + if (provider === 'zoho') { + // Use existing CsvService which handles Zoho ZIP extraction + console.log('📥 Using CsvService for Zoho ZIP handling...'); + const csvService = new CsvService(); + records = await csvService.fetchCsvData(downloadUrl, accessToken); + } else if (provider === 'salesforce') { + // Salesforce returns plain CSV + console.log('📥 Downloading Salesforce CSV...'); + console.log('🔗 Download URL:', downloadUrl); + console.log('📋 Auth headers:', JSON.stringify(this.getAuthHeaders(provider, accessToken), null, 2)); + + const response = await axios.get(downloadUrl, { + headers: this.getAuthHeaders(provider, accessToken), + responseType: 'text' + }); + + console.log('📥 Salesforce response status:', response.status); + console.log('📏 Salesforce response size:', response.data.length, 'characters'); + console.log('📄 Salesforce response preview (first 1000 chars):'); + console.log(response.data.substring(0, 1000)); + console.log('📄 Salesforce response preview (last 500 chars):'); + console.log(response.data.substring(Math.max(0, response.data.length - 500))); + + // Parse CSV + records = await this.parseCsvContent(response.data); + } else { + // Generic handler for other providers + console.log(`📥 
Downloading ${provider} data...`); + const response = await axios.get(downloadUrl, { + headers: this.getAuthHeaders(provider, accessToken), + responseType: 'text' + }); + + records = await this.parseCsvContent(response.data); + } + + // Log sample record to debug data structure + if (records && records.length > 0) { + console.log('📋 Sample record structure:'); + console.log(JSON.stringify(records[0], null, 2)); + } + + return records; + + } catch (error) { + console.error('❌ Error downloading/parsing result file:', error.message); + throw new Error(`Failed to download/parse: ${error.message}`); + } + } + + /** + * Parse CSV content to JSON records + * @param {string} csvContent - CSV file content + * @returns {Promise} Parsed records + */ + async parseCsvContent(csvContent) { + try { + console.log('📄 CSV content preview (first 1000 chars):'); + console.log(csvContent.substring(0, 1000)); + console.log('📏 CSV content length:', csvContent.length); + console.log('📄 CSV content preview (last 500 chars):'); + console.log(csvContent.substring(Math.max(0, csvContent.length - 500))); + + // Check for different line endings + const hasWindows = csvContent.includes('\r\n'); + const hasUnix = csvContent.includes('\n'); + const hasMac = csvContent.includes('\r'); + + console.log('🔍 Line ending analysis:'); + console.log('- Windows (\\r\\n):', hasWindows); + console.log('- Unix (\\n):', hasUnix); + console.log('- Mac (\\r):', hasMac); + + // Count actual lines + const lines = csvContent.split(/\r?\n|\r/).filter(line => line.trim() !== ''); + console.log('📊 Total lines detected:', lines.length); + console.log('📋 All lines:'); + lines.forEach((line, index) => { + console.log(`${index + 1}: ${line}`); + }); + + // Debug: Test with sample data first + if (csvContent.includes('Acme (Sample)')) { + console.log('🧪 Detected sample Salesforce data - running inline test...'); + const testResult = await this.testSalesforceParsingInline(csvContent.substring(0, 1000)); + console.log(`🧪 Inline test result: ${testResult} records`); + + // Also test simple CSV parsing logic + console.log('🧪 Testing basic CSV line parsing...'); + const testLine = '"001dN00000g7Kt4QAE","Acme (Sample)","Manufacturing","Prospect","1 (800) 667-6389","New York","NY","USA","","005dN0000085cZVQAY","2025-09-17T10:43:32.000Z"'; + const testValues = this.parseCsvLine(testLine); + console.log(`🧪 Test line values (${testValues.length}):`, testValues); + } + + // Try multiple parsing approaches + return await this.tryMultipleCsvParsers(csvContent); + + } catch (error) { + console.error('❌ Error parsing CSV:', error.message); + throw new Error(`Failed to parse CSV: ${error.message}`); + } + } + + /** + * Test parsing with inline Salesforce sample data for debugging + * @param {string} sampleCsv - Sample CSV content + * @returns {Promise} Number of records parsed + */ + async testSalesforceParsingInline(sampleCsv) { + try { + const testCsv = `"Id","Name","Industry","Type","Phone","BillingCity","BillingState","BillingCountry","Website","OwnerId","CreatedDate" +"001dN00000g7Kt4QAE","Acme (Sample)","Manufacturing","Prospect","1 (800) 667-6389","New York","NY","USA","","005dN0000085cZVQAY","2025-09-17T10:43:32.000Z" +"001dN00000g7Kt5QAE","Global Media (Sample)","Media","Prospect","1 (800) 667-6389","Toronto","Ontario","Canada","","005dN0000085cZVQAY","2025-09-17T10:43:32.000Z"`; + + const lines = testCsv.split(/\r?\n|\r/).map(line => line.trim()).filter(line => line !== ''); + console.log(`🧪 Test CSV lines: ${lines.length}`); + + if (lines.length < 2) 

  /**
   * Test parsing with inline Salesforce sample data for debugging.
   * Note: the fixed two-row fixture below is parsed instead of the supplied
   * sample, so the result is a known-good baseline for parseCsvLine.
   * @param {string} sampleCsv - Sample CSV content (currently unused; the fixture is parsed instead)
   * @returns {Promise<number>} Number of records parsed
   */
  async testSalesforceParsingInline(sampleCsv) {
    try {
      const testCsv = `"Id","Name","Industry","Type","Phone","BillingCity","BillingState","BillingCountry","Website","OwnerId","CreatedDate"
"001dN00000g7Kt4QAE","Acme (Sample)","Manufacturing","Prospect","1 (800) 667-6389","New York","NY","USA","","005dN0000085cZVQAY","2025-09-17T10:43:32.000Z"
"001dN00000g7Kt5QAE","Global Media (Sample)","Media","Prospect","1 (800) 667-6389","Toronto","Ontario","Canada","","005dN0000085cZVQAY","2025-09-17T10:43:32.000Z"`;

      const lines = testCsv.split(/\r?\n|\r/).map(line => line.trim()).filter(line => line !== '');
      console.log(`🧪 Test CSV lines: ${lines.length}`);

      if (lines.length < 2) return 0;

      const headers = this.parseCsvLine(lines[0]);
      console.log(`🧪 Test headers (${headers.length}):`, headers);

      let recordCount = 0;
      for (let i = 1; i < lines.length; i++) {
        const values = this.parseCsvLine(lines[i]);
        console.log(`🧪 Line ${i} values (${values.length}):`, values.slice(0, 3));

        if (values.length >= headers.length) {
          recordCount++;
          const record = {};
          headers.forEach((header, index) => {
            const cleanHeader = header.replace(/"/g, '').trim();
            const value = values[index] ? values[index].replace(/"/g, '').trim() : '';
            record[cleanHeader] = value === '' ? null : value;
          });
          console.log(`🧪 Test record ${recordCount}:`, JSON.stringify(record, null, 2));
        }
      }

      return recordCount;
    } catch (error) {
      console.log('🧪 Test failed:', error.message);
      return 0;
    }
  }
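
  // Illustrative only: how parseCsvLine (defined after the parser methods
  // below) treats quoting. Commas inside quotes do not split the field, and
  // a doubled quote inside a quoted field collapses to a literal quote:
  //
  //   this.parseCsvLine('"Acme, Inc.","He said ""hi""",42');
  //   // -> [ 'Acme, Inc.', 'He said "hi"', '42' ]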

  /**
   * Try multiple CSV parsing approaches to handle different formats
   * @param {string} csvContent - CSV content
   * @returns {Promise<Array<Object>>} Parsed records
   */
  async tryMultipleCsvParsers(csvContent) {
    // Method 1: Manual parsing for this specific Salesforce format
    try {
      console.log('🔄 Trying Method 1: Manual parsing for Salesforce format...');

      // Clean the CSV content and split on actual newlines
      const cleanContent = csvContent.trim();
      const lines = cleanContent.split(/\r?\n|\r/).map(line => line.trim()).filter(line => line !== '');

      console.log(`📊 Lines after splitting: ${lines.length}`);
      console.log('📋 First few lines:');
      lines.slice(0, 8).forEach((line, index) => {
        console.log(`${index + 1}: "${line}"`);
        console.log(`   Length: ${line.length}, ends with \\r: ${line.endsWith('\r')}, character codes at end: [${line.slice(-3).split('').map(c => c.charCodeAt(0)).join(',')}]`);
      });

      if (lines.length < 2) {
        throw new Error('CSV must have at least header + 1 data row');
      }

      // Parse the header line
      const headerLine = lines[0];
      const headers = this.parseCsvLine(headerLine);
      console.log('📋 Parsed headers:', headers);

      // Parse the data rows
      const records = [];
      for (let i = 1; i < lines.length; i++) {
        const line = lines[i];
        if (line.trim() === '') continue; // Skip empty lines

        const values = this.parseCsvLine(line);
        console.log(`🔍 Line ${i}: "${line.substring(0, 100)}${line.length > 100 ? '...' : ''}"`);
        console.log(`   Line length: ${line.length}, Values parsed: ${values.length}, Expected: ${headers.length}`);

        // Show all values for debugging
        console.log(`   All values: [${values.map((v, idx) => `${idx}:"${v}"`).join(', ')}]`);

        if (values.length >= headers.length) {
          const record = {};
          headers.forEach((header, index) => {
            const cleanHeader = header.replace(/"/g, '').trim(); // Remove quotes from headers
            const value = values[index] ? values[index].replace(/"/g, '').trim() : ''; // Remove quotes from values
            record[cleanHeader] = value === '' ? null : value;
          });

          console.log(`✅ ACCEPTED Line ${i} -> Record ${records.length + 1}`);
          console.log(`   Record keys: [${Object.keys(record).join(', ')}]`);
          console.log(`   Sample values: Id="${record.Id}", Name="${record.Name}", Industry="${record.Industry}"`);

          records.push(record);
        } else {
          console.error(`❌ REJECTED Line ${i}: has ${values.length} values, expected ${headers.length}`);
          console.error(`   Line content: "${line}"`);
          console.error(`   Parsed values: [${values.map(v => `"${v}"`).join(', ')}]`);
        }
      }

      console.log(`✅ Method 1 (Manual) result: ${records.length} records parsed successfully`);
      console.log(`📊 Processing summary: ${lines.length - 1} lines processed, ${records.length} records created`);

      if (records.length > 0) {
        console.log(`🎯 Returning ${records.length} records for database insertion`);
        return records;
      } else {
        console.log('⚠️ No records were successfully parsed, trying next method...');
      }

    } catch (error) {
      console.log('❌ Method 1 (Manual) failed:', error.message);
    }

    // Method 2: Standard csv-parser with a stream approach
    try {
      console.log('🔄 Trying Method 2: Standard csv-parser...');

      const csv = require('csv-parser');
      const { Readable } = require('stream');

      const records = await new Promise((resolve, reject) => {
        const results = [];

        const readable = Readable.from(csvContent);

        readable
          .pipe(csv({
            // csv-parser has no skipEmptyLines option; mapHeaders and strict
            // are the supported knobs used here
            mapHeaders: ({ header }) => header.replace(/"/g, '').trim(),
            strict: false
          }))
          .on('data', (row) => {
            results.push(row);
          })
          .on('end', () => {
            console.log(`Method 2 result: ${results.length} records`);
            resolve(results);
          })
          .on('error', reject);
      });

      if (records.length > 0) {
        return records;
      }

    } catch (error) {
      console.log('❌ Method 2 failed:', error.message);
    }

    // If all methods fail, throw an error
    throw new Error('Unable to parse CSV with any method. 
CSV format might be invalid.'); + } + + /** + * Parse a single CSV line handling quotes and commas + * Handles the Salesforce CSV format with quoted fields + * @param {string} line - CSV line + * @returns {Array} Parsed values + */ + parseCsvLine(line) { + const result = []; + let current = ''; + let inQuotes = false; + + for (let i = 0; i < line.length; i++) { + const char = line[i]; + + if (char === '"') { + if (inQuotes && line[i + 1] === '"') { + // Escaped quote (double quotes) + current += '"'; + i++; // Skip next quote + } else { + // Toggle quote state + inQuotes = !inQuotes; + } + } else if (char === ',' && !inQuotes) { + // End of field - push current value + result.push(current); + current = ''; + } else { + current += char; + } + } + + // Add the last field + result.push(current); + + // Clean up the values - remove surrounding quotes and trim + return result.map(value => { + if (value.startsWith('"') && value.endsWith('"')) { + return value.slice(1, -1); // Remove surrounding quotes + } + return value.trim(); + }); + } + + /** + * Format datetime for MySQL + * Converts ISO 8601 format to MySQL datetime format + * @param {string|Date} dateValue - Date value from provider + * @returns {string|null} MySQL-compatible datetime string or null + */ + formatDateForMySQL(dateValue) { + if (!dateValue) { + return null; + } + + try { + const date = new Date(dateValue); + + // Check if date is valid + if (isNaN(date.getTime())) { + return null; + } + + // Format as YYYY-MM-DD HH:MM:SS for MySQL + const year = date.getFullYear(); + const month = String(date.getMonth() + 1).padStart(2, '0'); + const day = String(date.getDate()).padStart(2, '0'); + const hours = String(date.getHours()).padStart(2, '0'); + const minutes = String(date.getMinutes()).padStart(2, '0'); + const seconds = String(date.getSeconds()).padStart(2, '0'); + + return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`; + } catch (error) { + console.error('Error formatting date:', dateValue, error.message); + return null; + } + } + + /** + * Map record fields from provider format to standardized format + * @param {string} module - Module name + * @param {Object} record - Raw record from provider + * @returns {Object} Mapped fields + */ + mapRecordFields(module, record) { + // Common field mappings + const commonMappings = { + first_name: record.FirstName || record.first_name || record.First_Name, + last_name: record.LastName || record.last_name || record.Last_Name, + email: record.Email || record.email, + phone: record.Phone || record.phone, + mobile: record.Mobile || record.mobile || record.MobilePhone, + company: record.Company || record.company || record.Account?.Name, + title: record.Title || record.title, + owner: record.Owner?.Name || record.owner || record.OwnerId, + description: record.Description || record.description + }; + + // Module-specific mappings + const moduleMappings = { + contacts: { + ...commonMappings, + account_name: record.Account?.Name || record.account_name || record.Account_Name, + lead_source: record.LeadSource || record.lead_source || record.Lead_Source, + department: record.Department || record.department + }, + leads: { + ...commonMappings, + lead_source: record.LeadSource || record.lead_source || record.Lead_Source, + lead_status: record.Status || record.lead_status || record.Lead_Status + }, + accounts: { + account_name: record.Name || record.account_name || record.Account_Name, + account_type: record.Type || record.account_type, + phone: record.Phone || record.phone, + website: 
record.Website || record.website, + industry: record.Industry || record.industry, + annual_revenue: record.AnnualRevenue || record.annual_revenue, + num_employees: record.NumberOfEmployees || record.num_employees, + owner: record.Owner?.Name || record.OwnerId || record.owner, + city: record.BillingCity || record.city || record.City, + state: record.BillingState || record.state || record.State, + country: record.BillingCountry || record.country || record.Country, + description: record.Description || record.description + }, + deals: { + deal_name: record.Name || record.deal_name || record.Deal_Name, + amount: record.Amount || record.amount, + stage: record.StageName || record.Stage || record.stage, + close_date: this.formatDateForMySQL(record.CloseDate || record.close_date || record.Closing_Date), + probability: record.Probability || record.probability, + account_name: record.Account?.Name || record.account_name + }, + opportunities: { + deal_name: record.Name || record.deal_name, + amount: record.Amount || record.amount, + stage: record.StageName || record.Stage || record.stage, + close_date: this.formatDateForMySQL(record.CloseDate || record.close_date), + probability: record.Probability || record.probability + }, + tasks: { + subject: record.Subject || record.subject, + status: record.Status || record.status, + priority: record.Priority || record.priority, + due_date: this.formatDateForMySQL(record.ActivityDate || record.due_date || record.Due_Date), + owner: record.Owner?.Name || record.owner + }, + events: { + subject: record.Subject || record.subject, + start_date_time: this.formatDateForMySQL(record.StartDateTime || record.start_date_time), + end_date_time: this.formatDateForMySQL(record.EndDateTime || record.end_date_time), + location: record.Location || record.location, + is_all_day_event: record.IsAllDayEvent || record.is_all_day_event + } + }; + + return moduleMappings[module] || commonMappings; + } + + /** + * Get the appropriate base URL for API calls + * @param {string} provider - Provider name + * @param {Object} tokenRecord - Token record from database + * @returns {string} Base URL to use for API calls + */ + getApiBaseUrl(provider, tokenRecord) { + const defaultBaseUrl = this.providerConfigs[provider].baseUrl; + + switch (provider) { + case 'salesforce': + // For Salesforce, use instance URL if available, otherwise fall back to default + if (tokenRecord && tokenRecord.instanceUrl) { + console.log(`🔗 Using Salesforce instance URL: ${tokenRecord.instanceUrl}`); + return tokenRecord.instanceUrl; + } + console.warn('⚠️ No Salesforce instance URL found, using default auth URL'); + return defaultBaseUrl; + + case 'zoho': + // Zoho might also have different regional URLs in the future + if (tokenRecord && tokenRecord.instance_url) { + return tokenRecord.instance_url; + } + return defaultBaseUrl; + + default: + return defaultBaseUrl; + } + } + + /** + * Prepare bulk read request based on provider + * @param {string} provider - Provider name + * @param {string} service - Service name + * @param {string} module - Module name + * @param {Array} fields - Fields to fetch + * @param {Object} options - Additional options + * @param {string} callbackUrl - Webhook callback URL + * @returns {Object} Request data + */ + prepareBulkReadRequest(provider, service, module, fields, options, callbackUrl) { + switch (provider) { + case 'zoho': + return { callback: { url: callbackUrl, method: 'post' @@ -52,74 +936,128 @@ class BulkReadService { } }; - console.log('📋 Bulk read request:', 
JSON.stringify(bulkReadData, null, 2)); + case 'salesforce': + // Convert module name to Salesforce object name + const salesforceObject = this.getSalesforceObjectName(module); + return { + operation: 'query', + query: `SELECT ${fields.join(',')} FROM ${salesforceObject}` + }; - // Make API call to Zoho - const response = await axios.post( - `${this.baseUrl}/crm/bulk/v2/read`, - bulkReadData, - { - headers: { - 'Authorization': `Zoho-oauthtoken ${zohoAccessToken}`, - 'Content-Type': 'application/json' + case 'hubspot': + return { + properties: fields, + limit: options.limit || 100, + after: options.after || 0 + }; + + default: + return { + module: module, + fields: fields, + options: options, + callback: callbackUrl + }; + } + } + + /** + * Convert module name to Salesforce object name + * @param {string} module - Lowercase module name + * @returns {string} Salesforce object name + */ + getSalesforceObjectName(module) { + const objectMap = { + 'contacts': 'Contact', + 'leads': 'Lead', + 'accounts': 'Account', + 'deals': 'Opportunity', + 'tasks': 'Task', + 'events': 'Event', + 'campaigns': 'Campaign', + 'cases': 'Case', + 'opportunities': 'Opportunity' + }; + + return objectMap[module.toLowerCase()] || module; + } + + /** + * Get authentication headers for provider + * @param {string} provider - Provider name + * @param {string} accessToken - Access token + * @returns {Object} Headers + */ + getAuthHeaders(provider, accessToken) { + switch (provider) { + case 'zoho': + return { + 'Authorization': `Zoho-oauthtoken ${accessToken}`, + 'Content-Type': 'application/json' + }; + + case 'salesforce': + return { + 'Authorization': `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + 'Sforce-Call-Options': 'client=RestForce' + }; + + case 'hubspot': + return { + 'Authorization': `Bearer ${accessToken}`, + 'Content-Type': 'application/json' + }; + + default: + return { + 'Authorization': `Bearer ${accessToken}`, + 'Content-Type': 'application/json' + }; + } + } + + /** + * Extract job information from provider response + * @param {string} provider - Provider name + * @param {Object} responseData - API response data + * @returns {Object} Job ID and details + */ + extractJobInfo(provider, responseData) { + switch (provider) { + case 'zoho': + const jobDetails = responseData.data && responseData.data[0] && responseData.data[0].details; + return { + jobId: jobDetails?.id, + jobDetails: jobDetails || {} + }; + + case 'salesforce': + return { + jobId: responseData.id, + jobDetails: { + operation: responseData.operation, + state: responseData.state, + created_by: responseData.createdBy, + created_time: responseData.createdDate } - } - ); - - const jobData = response.data; - console.log('✅ Bulk read job response from Zoho:', JSON.stringify(jobData, null, 2)); - - // Extract job ID from nested response structure - const jobDetails = jobData.data && jobData.data[0] && jobData.data[0].details; - const jobId = jobDetails?.id; - - if (!jobId) { - console.log('⚠️ No job ID found in response structure'); - console.log('Response structure:', JSON.stringify(jobData, null, 2)); - throw new Error('Invalid response structure from Zoho API - no job ID found'); - } - - console.log('📋 Job ID received from Zoho:', jobId); - console.log('📋 Job details:', jobDetails); - - // Store job in database with real job ID - const jobRecord = await ZohoBulkReadRepository.createBulkReadJob({ - id: jobId, - user_uuid: userId, - module: module, - operation: jobDetails.operation || 'read', - state: jobDetails.state || 'ADDED', 
- file_type: 'csv', - records_count: 0, - status: 'pending' - }); - - logger.info('Bulk read job created and stored', { - userId, - module, - jobId: jobId, - zohoState: jobDetails.state || 'ADDED' - }); + }; + case 'hubspot': return { - jobId: jobId, - status: 'created', - message: `Bulk read job created for ${module}. Processing will be handled asynchronously via webhook.`, - zohoState: jobDetails.state || 'ADDED', - operation: jobDetails.operation || 'read', - createdBy: jobDetails.created_by, - createdTime: jobDetails.created_time, - estimatedTime: this.getEstimatedTime(module, options.limit) - }; + jobId: `hubspot_${Date.now()}`, + jobDetails: { + operation: 'export', + state: 'QUEUED', + created_time: new Date().toISOString() + } + }; - } catch (error) { - console.error('❌ Error initiating bulk read:', error.message); - logger.error('Bulk read initiation failed', { - userId, - module, - error: error.message - }); - throw error; + default: + return { + jobId: responseData.id || `${provider}_${Date.now()}`, + jobDetails: responseData + }; } } @@ -131,7 +1069,7 @@ class BulkReadService { */ async getBulkReadJobStatus(userId, jobId) { try { - const job = await ZohoBulkReadRepository.getBulkReadJob(jobId); + const job = await BulkReadRepository.getBulkReadJob(jobId); if (!job) { throw new Error('Job not found'); @@ -143,6 +1081,8 @@ class BulkReadService { return { jobId: job.id, + provider: job.provider, + service: job.service, module: job.module, status: job.status, state: job.state, @@ -150,7 +1090,9 @@ class BulkReadService { processedCount: job.processed_count, createdAt: job.created_at, updatedAt: job.updated_at, - errorMessage: job.error_message + errorMessage: job.error_message, + requestParams: job.request_params, + responseMeta: job.response_meta }; } catch (error) { @@ -167,10 +1109,12 @@ class BulkReadService { */ async getUserBulkReadJobs(userId, options = {}) { try { - const jobs = await ZohoBulkReadRepository.getUserBulkReadJobs(userId, options); + const jobs = await BulkReadRepository.getUserBulkReadJobs(userId, options); return jobs.map(job => ({ jobId: job.id, + provider: job.provider, + service: job.service, module: job.module, status: job.status, state: job.state, @@ -193,18 +1137,18 @@ class BulkReadService { * @param {Object} options - Query options * @returns {Promise} Data and pagination info */ - async getBulkReadData(userId, module, options = {}) { + async getBulkReadData(userId, provider, service, module, options = {}) { try { const { page = 1, limit = 100, orderBy = 'created_time', orderDirection = 'DESC' } = options; - const data = await ZohoBulkReadRepository.getUserData(userId, module, { + const data = await BulkReadRepository.getUserData(userId, provider, service, module, { limit: parseInt(limit), offset: (parseInt(page) - 1) * parseInt(limit), orderBy, orderDirection }); - const totalCount = await ZohoBulkReadRepository.getUserDataCount(userId, module); + const totalCount = await BulkReadRepository.getUserDataCount(userId, provider, service, module); return { data: data, @@ -248,93 +1192,201 @@ class BulkReadService { } /** - * Get available modules for bulk read - * @returns {Array} Available modules + * Get available providers + * @returns {Object} Available providers with their services and modules */ - getAvailableModules() { - return [ - { + getAvailableProviders() { + return { + zoho: { + name: 'Zoho', + services: { + crm: { + name: 'Zoho CRM', + modules: ['contacts', 'leads', 'accounts', 'deals', 'tasks', 'vendors'] + }, + books: { + name: 'Zoho 
Books', + modules: ['invoices', 'sales_orders', 'purchase_orders', 'customers', 'vendors'] + }, + inventory: { + name: 'Zoho Inventory', + modules: ['products', 'customers', 'vendors', 'sales_orders', 'purchase_orders'] + } + } + }, + salesforce: { + name: 'Salesforce', + services: { + crm: { + name: 'Salesforce CRM', + modules: ['contacts', 'leads', 'accounts', 'deals', 'opportunities', 'tasks', 'events', 'campaigns', 'cases'] + } + } + }, + hubspot: { + name: 'HubSpot', + services: { + crm: { + name: 'HubSpot CRM', + modules: ['contacts', 'leads', 'accounts', 'deals'] + } + } + }, + keka: { + name: 'Keka', + services: { + hr: { + name: 'Keka HR', + modules: ['employees'] + } + } + }, + bamboohr: { + name: 'BambooHR', + services: { + hr: { + name: 'BambooHR', + modules: ['employees'] + } + } + }, + intuit: { + name: 'Intuit', + services: { + accounting: { + name: 'QuickBooks', + modules: ['customers', 'vendors', 'invoices', 'products'] + } + } + }, + quickbooks: { + name: 'QuickBooks', + services: { + accounting: { + name: 'QuickBooks', + modules: ['customers', 'vendors', 'invoices', 'products'] + } + } + } + }; + } + + /** + * Get available modules for a specific provider and service + * @param {string} provider - Provider name + * @param {string} service - Service name + * @returns {Array} Available modules with their field configurations + */ + getAvailableModules(provider, service) { + const moduleConfigs = { + contacts: { name: 'contacts', displayName: 'Contacts', description: 'Customer contact information', - fields: [ - 'id', 'First_Name', 'Last_Name', 'Email', 'Phone', 'Mobile', - 'Lead_Source', 'Account_Name.Account_Name', 'Owner', 'Created_Time', 'Modified_Time' - ] + fields: ['id', 'first_name', 'last_name', 'email', 'phone', 'mobile', 'company', 'title', 'address', 'created_time', 'modified_time'] }, - { + leads: { name: 'leads', displayName: 'Leads', description: 'Sales lead information', - fields: [ - 'id', 'First_Name', 'Last_Name', 'Company', 'Lead_Source', - 'Lead_Status', 'Owner', 'Email', 'Phone', 'Created_Time' - ] + fields: ['id', 'first_name', 'last_name', 'company', 'lead_source', 'lead_status', 'email', 'phone', 'created_time', 'modified_time'] }, - { + accounts: { name: 'accounts', displayName: 'Accounts', description: 'Account information', - fields: [ - 'id', 'Account_Name', 'Phone', 'Website', 'Industry', - 'Ownership', 'Annual_Revenue', 'Owner', 'Created_Time' - ] + fields: ['id', 'account_name', 'phone', 'website', 'industry', 'annual_revenue', 'num_employees', 'created_time', 'modified_time'] }, - { + deals: { + name: 'deals', + displayName: 'Deals/Opportunities', + description: 'Sales deal information', + fields: ['id', 'deal_name', 'stage', 'amount', 'close_date', 'probability', 'account_name', 'contact_name', 'created_time', 'modified_time'] + }, + tasks: { name: 'tasks', displayName: 'Tasks', description: 'Task information', - fields: [ - 'id', 'Subject', 'Owner', 'Status', 'Priority', - 'Due_Date', 'What_Id', 'Created_Time' - ] + fields: ['id', 'subject', 'status', 'priority', 'due_date', 'owner', 'related_to', 'created_time', 'modified_time'] }, - { - name: 'deals', - displayName: 'Deals', - description: 'Sales deal information', - fields: [ - 'id', 'Deal_Name', 'Stage', 'Amount', 'Closing_Date', - 'Account_Name', 'Contact_Name', 'Pipeline', 'Probability', - 'Lead_Source', 'Owner', 'Created_Time', 'Modified_Time' - ] - }, - { + vendors: { name: 'vendors', - displayName: 'Vendors', + displayName: 'Vendors/Suppliers', description: 'Vendor information', - 
fields: [ - 'id', 'Vendor_Name', 'Email', 'Phone', 'Website', 'Owner', 'Created_Time' - ] + fields: ['id', 'vendor_name', 'email', 'phone', 'website', 'contact_person', 'payment_terms', 'created_time', 'modified_time'] }, - { + invoices: { name: 'invoices', displayName: 'Invoices', description: 'Invoice information', - fields: [ - 'id', 'Invoice_Number', 'Invoice_Date', 'Due_Date', 'Status', - 'Grand_Total', 'Account_Name.Account_Name', 'Owner', 'Created_Time' - ] + fields: ['id', 'invoice_number', 'customer_name', 'invoice_date', 'due_date', 'status', 'total_amount', 'created_time', 'modified_time'] }, - { + sales_orders: { name: 'sales_orders', displayName: 'Sales Orders', description: 'Sales order information', - fields: [ - 'id', 'Subject', 'Status', 'Due_Date', 'Grand_Total', - 'Account_Name.Account_Name', 'Owner', 'Created_Time' - ] + fields: ['id', 'so_number', 'customer_name', 'order_date', 'status', 'total_amount', 'created_time', 'modified_time'] }, - { + purchase_orders: { name: 'purchase_orders', displayName: 'Purchase Orders', description: 'Purchase order information', - fields: [ - 'id', 'Subject', 'Vendor_Name.Vendor_Name', 'Status', - 'Due_Date', 'Grand_Total', 'Owner', 'Created_Time' - ] + fields: ['id', 'po_number', 'vendor_name', 'order_date', 'status', 'total_amount', 'created_time', 'modified_time'] + }, + employees: { + name: 'employees', + displayName: 'Employees', + description: 'Employee information', + fields: ['id', 'employee_id', 'first_name', 'last_name', 'email', 'job_title', 'department', 'hire_date', 'employment_status', 'created_time', 'modified_time'] + }, + products: { + name: 'products', + displayName: 'Products', + description: 'Product information', + fields: ['id', 'product_name', 'product_code', 'sku', 'category', 'selling_price', 'cost_price', 'stock_quantity', 'created_time', 'modified_time'] + }, + customers: { + name: 'customers', + displayName: 'Customers', + description: 'Customer information', + fields: ['id', 'customer_name', 'email', 'phone', 'address', 'payment_terms', 'credit_limit', 'created_time', 'modified_time'] + }, + events: { + name: 'events', + displayName: 'Events', + description: 'Event/Calendar information', + fields: ['id', 'subject', 'start_date_time', 'end_date_time', 'location', 'description', 'owner_id', 'what_id', 'who_id', 'is_all_day_event', 'created_date'] + }, + campaigns: { + name: 'campaigns', + displayName: 'Campaigns', + description: 'Marketing campaign information', + fields: ['id', 'name', 'type', 'status', 'start_date', 'end_date', 'expected_revenue', 'budgeted_cost', 'actual_cost', 'num_sent', 'created_date'] + }, + cases: { + name: 'cases', + displayName: 'Cases', + description: 'Customer service case information', + fields: ['id', 'case_number', 'subject', 'status', 'priority', 'origin', 'type', 'account_name', 'contact_name', 'owner', 'created_date'] + }, + opportunities: { + name: 'opportunities', + displayName: 'Opportunities', + description: 'Sales opportunity information', + fields: ['id', 'name', 'stage_name', 'amount', 'close_date', 'probability', 'account_name', 'owner', 'created_date', 'modified_date'] } - ]; + }; + + const providers = this.getAvailableProviders(); + const providerConfig = providers[provider]; + + if (!providerConfig || !providerConfig.services[service]) { + return []; + } + + const availableModules = providerConfig.services[service].modules; + + return availableModules.map(module => moduleConfigs[module]).filter(Boolean); } }
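
// Illustrative only: how the two discovery methods compose. The calls match
// the code above; the surrounding usage is a sketch, not part of this diff.
//
//   const service = new BulkReadService();
//
//   service.getAvailableProviders();
//   // -> { zoho: { name: 'Zoho', services: { crm: {...}, books: {...}, inventory: {...} } }, ... }
//
//   service.getAvailableModules('zoho', 'books');
//   // -> [ { name: 'invoices', ... }, { name: 'sales_orders', ... }, { name: 'purchase_orders', ... },
//   //      { name: 'customers', ... }, { name: 'vendors', ... } ]
//
//   service.getAvailableModules('keka', 'crm');
//   // -> [] (unknown provider/service combinations return an empty list)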