# n8n Bulk Read Integration - Updated Flow ## Overview The bulk read process is split between **n8n** (job orchestration) and **backend** (data processing): - **n8n**: Initiates bulk jobs with providers and monitors completion - **Backend**: Downloads CSV files, parses data, and stores in MySQL --- ## Complete Flow Diagram ``` ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │ Backend │────▶│ n8n │────▶│ Provider │ │ (Initiate) │ │ (Monitor) │ │ (Salesforce/ │ │ │ │ │ │ Zoho) │ └─────────────┘ └─────────────┘ └──────────────┘ ▲ │ │ │ │ │ │ (Job Complete) (Job Ready) │ │ │ │ ▼ │ │ ┌─────────────┐ │ └────────────│ Callback │ │ │ to Backend │ │ └─────────────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Backend │◀─────────────┘ │ Downloads │ (Download CSV) │ & Parses │ │ CSV File │ └─────────────┘ │ ▼ ┌─────────────┐ │ MySQL │ │ Tables │ └─────────────┘ ``` --- ## Step-by-Step Process ### 1. Backend → n8n (Initiate Job) **Backend sends request to n8n:** ```bash POST https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://yourorg.my.salesforce.com Content-Type: application/json { "provider": "salesforce", "service": "crm", "module": "contacts", "fields": ["Id", "FirstName", "LastName", "Email", "Phone"], "provider_access_token": "00D5g000008XXXX!AQEAQXXX...", "backend_access_token": "backend_jwt_token_here", "callback_url": "https://backend.com/api/v1/bulk-read/webhook/callback", "job_id": "salesforce_contacts_1698765432_abc123", "user_id": "550e8400-e29b-41d4-a716-446655440000", "options": {} } ``` **Query Parameters:** - `instance_url` - **Required for Salesforce** (user's instance URL) **Body Fields:** | Field | Type | Description | |-------|------|-------------| | `provider` | string | Provider name (salesforce, zoho, etc.) | | `service` | string | Service type (crm, books, etc.) | | `module` | string | Module name (contacts, leads, etc.) | | `fields` | array | Fields to fetch | | `provider_access_token` | string | **Decrypted** provider token for API calls | | `backend_access_token` | string | Backend JWT for callback authentication | | `callback_url` | string | Backend webhook URL (without access_token in URL) | | `job_id` | string | Unique job identifier | | `user_id` | string | User UUID | | `options` | object | Additional options | --- ### 2. n8n Processing **What n8n should do:** #### For Salesforce: ```javascript // 1. Extract data from webhook const { provider, service, module, fields, provider_access_token, callback_url, job_id } = $input.item.json; const instance_url = $input.item.query.instance_url; // 2. Create Salesforce bulk query job const bulkJobResponse = await axios.post( `${instance_url}/services/data/v57.0/jobs/query`, { operation: 'query', query: `SELECT ${fields.join(',')} FROM Contact` }, { headers: { 'Authorization': `Bearer ${provider_access_token}`, 'Content-Type': 'application/json' } } ); const salesforceJobId = bulkJobResponse.data.id; // 3. Poll for job completion let jobComplete = false; let jobState = ''; let downloadUrl = ''; while (!jobComplete) { const statusResponse = await axios.get( `${instance_url}/services/data/v57.0/jobs/query/${salesforceJobId}`, { headers: { 'Authorization': `Bearer ${provider_access_token}` } } ); jobState = statusResponse.data.state; if (jobState === 'JobComplete') { jobComplete = true; downloadUrl = `${instance_url}/services/data/v57.0/jobs/query/${salesforceJobId}/results`; } else if (jobState === 'Failed' || jobState === 'Aborted') { // Send failure callback await axios.post(callback_url, { job_id: job_id, status: 'failed', provider: provider, service: service, module: module, provider_access_token: provider_access_token, error_message: statusResponse.data.errorMessage, metadata: { salesforce_job_id: salesforceJobId, state: jobState } }); return; } else { await sleep(5000); // Wait 5 seconds } } // 4. Send success callback to backend await axios.post(callback_url, { job_id: job_id, status: 'completed', provider: provider, service: service, module: module, provider_access_token: provider_access_token, metadata: { salesforce_job_id: salesforceJobId, state: jobState, download_url: downloadUrl, result_url: downloadUrl } }); ``` #### For Zoho: ```javascript // Similar flow but with Zoho bulk read API // Zoho returns a download URL for zipped CSV file ``` --- ### 3. n8n → Backend Callback **When job completes, n8n sends:** ```bash POST https://backend.com/api/v1/bulk-read/webhook/callback Content-Type: application/json { "job_id": "salesforce_contacts_1698765432_abc123", "status": "completed", "provider": "salesforce", "service": "crm", "module": "contacts", "provider_access_token": "00D5g000008XXXX!AQEAQXXX...", "backend_access_token": "backend_jwt_token_here", "metadata": { "salesforce_job_id": "7504x00000AbCdEf", "state": "JobComplete", "download_url": "https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results", "result_url": "https://yourorg.my.salesforce.com/services/data/v57.0/jobs/query/7504x00000AbCdEf/results" } } ``` **Required Fields:** | Field | Type | Required | Description | |-------|------|----------|-------------| | `job_id` | string | ✅ | Same job_id from initial request | | `status` | string | ✅ | `completed` or `failed` | | `provider` | string | ✅ | Provider name | | `service` | string | ✅ | Service name | | `module` | string | ✅ | Module name | | `provider_access_token` | string | ✅ | Provider token for downloading CSV | | `backend_access_token` | string | ❌ | Backend JWT (optional) | | `metadata` | object | ✅ | Must include `download_url` or `result_url` | | `error_message` | string | ❌ | Required if status is `failed` | **Metadata Object:** ```json { "salesforce_job_id": "7504x00000AbCdEf", "state": "JobComplete", "download_url": "https://...", "result_url": "https://...", "total_processing_time": "45 seconds" } ``` --- ### 4. Backend Processing **What backend does after receiving callback:** 1. **Updates job status** to `processing` 2. **Downloads CSV file** from `metadata.download_url` - For Salesforce: Plain CSV file - For Zoho: Zipped CSV file (extracts with JSZip) 3. **Parses CSV** to JSON records 4. **Maps fields** to standardized schema 5. **Inserts records** in batches of 1000 6. **Updates job status** to `completed` #### Download & Extract Flow: ```javascript // For Salesforce - Direct CSV download const response = await axios.get(downloadUrl, { headers: { 'Authorization': `Bearer ${provider_access_token}` }, responseType: 'text' }); const csvContent = response.data; // For Zoho - Download and extract zip const response = await axios.get(downloadUrl, { headers: { 'Authorization': `Zoho-oauthtoken ${provider_access_token}` }, responseType: 'arraybuffer' }); const JSZip = require('jszip'); const zip = await JSZip.loadAsync(response.data); const csvFileName = Object.keys(zip.files).find(name => name.endsWith('.csv')); const csvContent = await zip.files[csvFileName].async('text'); ``` #### CSV Parsing: ```javascript const csv = require('csv-parser'); const { Readable } = require('stream'); const records = []; await Readable.from(csvContent) .pipe(csv()) .on('data', (row) => records.push(row)) .on('end', () => { console.log(`Parsed ${records.length} records`); }); ``` #### Field Mapping Example: ```javascript // Salesforce CSV → Database { "Id": "0035g00000XXXXX", "FirstName": "John", "LastName": "Doe", "Email": "john@example.com" } // Mapped to: { "external_id": "0035g00000XXXXX", "user_uuid": "user-uuid", "provider": "salesforce", "service": "crm", "first_name": "John", "last_name": "Doe", "email": "john@example.com", "raw_data": { /* original record */ }, "bulk_job_id": "salesforce_contacts_1698765432_abc123" } ``` --- ## Error Scenarios ### Scenario 1: Job Failed in Provider **n8n sends:** ```json { "job_id": "salesforce_contacts_1698765432_abc123", "status": "failed", "provider": "salesforce", "service": "crm", "module": "contacts", "provider_access_token": "token_here", "error_message": "INVALID_SESSION_ID: Session expired or invalid", "metadata": { "salesforce_job_id": "7504x00000AbCdEf", "state": "Failed", "error_code": "INVALID_SESSION_ID" } } ``` **Backend action:** - Updates job status to `failed` - Stores error message - Returns success response to n8n --- ### Scenario 2: Download Failed **Backend handles:** - Catches download error - Updates job status to `failed` - Logs error details --- ### Scenario 3: CSV Parsing Failed **Backend handles:** - Catches parsing error - Updates job status to `failed` - Logs error with CSV sample --- ## Provider-Specific Details ### Salesforce **Download URL Format:** ``` https://{instance_url}/services/data/v57.0/jobs/query/{job_id}/results ``` **CSV Format:** ```csv Id,FirstName,LastName,Email,Phone 0035g00000XXXXX,John,Doe,john@example.com,+1234567890 0035g00000YYYYY,Jane,Smith,jane@example.com,+0987654321 ``` **Headers:** ``` Authorization: Bearer {access_token} Accept: text/csv ``` --- ### Zoho **Download URL Format:** ``` https://www.zohoapis.com/crm/bulk/v2/read/{job_id}/result ``` **Response:** - ZIP file containing CSV - CSV file name varies **Headers:** ``` Authorization: Zoho-oauthtoken {access_token} ``` **Extraction:** ```javascript const JSZip = require('jszip'); const zip = await JSZip.loadAsync(zipBuffer); const csvFile = zip.files['Contacts.csv']; const csvContent = await csvFile.async('text'); ``` --- ## Database Storage ### Job Status Flow: ``` pending → processing → completed ↓ failed ``` ### Tables Used: 1. **bulk_read_jobs** - Job tracking 2. **{module}_bulk** - Actual data (e.g., `contacts_bulk`, `leads_bulk`) --- ## Configuration ### Environment Variables: ```bash # n8n webhook URL N8N_BULK_READ_WEBHOOK_URL=https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3 # Backend callback base URL API_BASE_URL=https://your-backend.com ``` ### Required NPM Packages: ```json { "csv-parser": "^3.2.0", "jszip": "^3.10.1", "axios": "^1.11.0" } ``` Install: ```bash npm install csv-parser jszip ``` --- ## Testing ### Test n8n Workflow: ```bash curl -X POST 'https://workflows.tech4bizsolutions.com/webhook-test/48b613f6-1bb8-4e9c-b35a-a93748acddb3?instance_url=https://test.my.salesforce.com' \ -H 'Content-Type: application/json' \ -d '{ "provider": "salesforce", "service": "crm", "module": "contacts", "fields": ["Id", "FirstName", "LastName"], "provider_access_token": "test_token", "backend_access_token": "test_jwt", "callback_url": "https://backend.com/api/v1/bulk-read/webhook/callback", "job_id": "test_job_123", "user_id": "test_user_456" }' ``` ### Test Backend Callback: ```bash curl -X POST 'https://backend.com/api/v1/bulk-read/webhook/callback' \ -H 'Content-Type: application/json' \ -d '{ "job_id": "test_job_123", "status": "completed", "provider": "salesforce", "service": "crm", "module": "contacts", "provider_access_token": "test_token", "metadata": { "download_url": "https://test.my.salesforce.com/services/data/v57.0/jobs/query/750.../results" } }' ``` --- ## Summary ### n8n Responsibilities: ✅ Initiate bulk job with provider ✅ Poll for job completion ✅ Send callback when complete ❌ Download CSV (backend handles) ❌ Parse CSV (backend handles) ❌ Store data (backend handles) ### Backend Responsibilities: ✅ Send job request to n8n ✅ Receive callback from n8n ✅ Download CSV from provider ✅ Extract ZIP (for Zoho) ✅ Parse CSV to JSON ✅ Map fields ✅ Store in MySQL This separation keeps n8n lightweight and backend handles all data processing! 🎯