refresh mechanism added for salesforce
This commit is contained in:
parent
c8eedf73ad
commit
fcdf2d1ead
File diff suppressed because it is too large
Load Diff
@ -61,6 +61,7 @@ class N8nClient {
|
|||||||
service: payload.service,
|
service: payload.service,
|
||||||
module: payload.module,
|
module: payload.module,
|
||||||
error: error.message,
|
error: error.message,
|
||||||
|
statusCode: error.response?.status,
|
||||||
response: error.response?.data
|
response: error.response?.data
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,10 @@ const { decrypt, encrypt } = require('../../utils/crypto');
|
|||||||
const logger = require('../../utils/logger');
|
const logger = require('../../utils/logger');
|
||||||
const axios = require('axios');
|
const axios = require('axios');
|
||||||
|
|
||||||
|
// Token refresh lock to prevent concurrent refresh attempts
|
||||||
|
// Map structure: { 'userId-serviceName': Promise }
|
||||||
|
const tokenRefreshLocks = new Map();
|
||||||
|
|
||||||
class N8nHandler {
|
class N8nHandler {
|
||||||
constructor(userId) {
|
constructor(userId) {
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
@ -31,10 +35,47 @@ class N8nHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Refresh Zoho access token
|
* Refresh Zoho access token with locking to prevent concurrent refreshes
|
||||||
* @returns {Promise<string>} New access token
|
* @returns {Promise<string>} New access token
|
||||||
*/
|
*/
|
||||||
async refreshZohoToken() {
|
async refreshZohoToken() {
|
||||||
|
const lockKey = `${this.userId}-zoho`;
|
||||||
|
|
||||||
|
// Check if there's already a refresh in progress
|
||||||
|
if (tokenRefreshLocks.has(lockKey)) {
|
||||||
|
logger.info('Token refresh already in progress, waiting for completion', { userId: this.userId, service: 'zoho' });
|
||||||
|
|
||||||
|
// Wait for the existing refresh to complete
|
||||||
|
try {
|
||||||
|
const newToken = await tokenRefreshLocks.get(lockKey);
|
||||||
|
return newToken;
|
||||||
|
} catch (error) {
|
||||||
|
// If the concurrent refresh failed, try again
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new refresh promise
|
||||||
|
const refreshPromise = this._doRefreshZohoToken();
|
||||||
|
|
||||||
|
// Store it in the lock map
|
||||||
|
tokenRefreshLocks.set(lockKey, refreshPromise);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const newToken = await refreshPromise;
|
||||||
|
return newToken;
|
||||||
|
} finally {
|
||||||
|
// Clean up the lock after a short delay
|
||||||
|
setTimeout(() => {
|
||||||
|
tokenRefreshLocks.delete(lockKey);
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to actually refresh the Zoho token
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
async _doRefreshZohoToken() {
|
||||||
const { refreshToken } = await this.getServiceTokens('zoho');
|
const { refreshToken } = await this.getServiceTokens('zoho');
|
||||||
|
|
||||||
if (!refreshToken) {
|
if (!refreshToken) {
|
||||||
@ -80,10 +121,47 @@ class N8nHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Refresh Salesforce access token
|
* Refresh Salesforce access token with locking to prevent concurrent refreshes
|
||||||
* @returns {Promise<string>} New access token
|
* @returns {Promise<string>} New access token
|
||||||
*/
|
*/
|
||||||
async refreshSalesforceToken() {
|
async refreshSalesforceToken() {
|
||||||
|
const lockKey = `${this.userId}-salesforce`;
|
||||||
|
|
||||||
|
// Check if there's already a refresh in progress
|
||||||
|
if (tokenRefreshLocks.has(lockKey)) {
|
||||||
|
logger.info('Token refresh already in progress, waiting for completion', { userId: this.userId, service: 'salesforce' });
|
||||||
|
|
||||||
|
// Wait for the existing refresh to complete
|
||||||
|
try {
|
||||||
|
const newToken = await tokenRefreshLocks.get(lockKey);
|
||||||
|
return newToken;
|
||||||
|
} catch (error) {
|
||||||
|
// If the concurrent refresh failed, try again
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new refresh promise
|
||||||
|
const refreshPromise = this._doRefreshSalesforceToken();
|
||||||
|
|
||||||
|
// Store it in the lock map
|
||||||
|
tokenRefreshLocks.set(lockKey, refreshPromise);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const newToken = await refreshPromise;
|
||||||
|
return newToken;
|
||||||
|
} finally {
|
||||||
|
// Clean up the lock after a short delay to allow concurrent requests to use the new token
|
||||||
|
setTimeout(() => {
|
||||||
|
tokenRefreshLocks.delete(lockKey);
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to actually refresh the Salesforce token
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
async _doRefreshSalesforceToken() {
|
||||||
const { refreshToken, instanceUrl } = await this.getServiceTokens('salesforce');
|
const { refreshToken, instanceUrl } = await this.getServiceTokens('salesforce');
|
||||||
|
|
||||||
if (!refreshToken) {
|
if (!refreshToken) {
|
||||||
@ -100,6 +178,7 @@ class N8nHandler {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const tokenUrl = process.env.SALESFORCE_INSTANCE_URL || 'https://login.salesforce.com';
|
const tokenUrl = process.env.SALESFORCE_INSTANCE_URL || 'https://login.salesforce.com';
|
||||||
|
|
||||||
const response = await axios.post(`${tokenUrl}/services/oauth2/token`, params.toString(), {
|
const response = await axios.post(`${tokenUrl}/services/oauth2/token`, params.toString(), {
|
||||||
headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
|
headers: { 'Content-Type': 'application/x-www-form-urlencoded' }
|
||||||
});
|
});
|
||||||
@ -247,13 +326,35 @@ class N8nHandler {
|
|||||||
query
|
query
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Check if the result contains an error (n8n returns 200 but with error in body)
|
||||||
|
if (result && (result.status === 401 || result.status === '401')) {
|
||||||
|
const error = new Error(result.message || 'Token expired');
|
||||||
|
error.response = { status: 401, data: result };
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
return this.normalizeResponse(result, 'salesforce');
|
return this.normalizeResponse(result, 'salesforce');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
// Check if error indicates token expiration
|
||||||
|
const isTokenError =
|
||||||
|
error.response?.status === 401 ||
|
||||||
|
error.message?.includes('401') ||
|
||||||
|
error.message?.includes('INVALID_SESSION_ID') ||
|
||||||
|
error.message?.includes('Session expired') ||
|
||||||
|
error.message?.includes('unauthorized');
|
||||||
|
|
||||||
// If unauthorized (401), try refreshing token once
|
// If unauthorized (401), try refreshing token once
|
||||||
if (error.message.includes('401') || error.message.includes('unauthorized')) {
|
if (isTokenError) {
|
||||||
logger.info('Received 401 error, attempting token refresh', { userId: this.userId });
|
logger.info('Received token expiration error, attempting token refresh', {
|
||||||
|
userId: this.userId,
|
||||||
|
errorMessage: error.message,
|
||||||
|
errorStatus: error.response?.status
|
||||||
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Refresh the Salesforce token
|
||||||
const newAccessToken = await this.refreshSalesforceToken();
|
const newAccessToken = await this.refreshSalesforceToken();
|
||||||
|
|
||||||
const { instanceUrl } = await this.getServiceTokens('salesforce');
|
const { instanceUrl } = await this.getServiceTokens('salesforce');
|
||||||
const query = {
|
const query = {
|
||||||
instance_url: instanceUrl,
|
instance_url: instanceUrl,
|
||||||
@ -262,7 +363,9 @@ class N8nHandler {
|
|||||||
nextRecordsUrl: options.nextRecordsUrl || null,
|
nextRecordsUrl: options.nextRecordsUrl || null,
|
||||||
...options.filters
|
...options.filters
|
||||||
};
|
};
|
||||||
|
|
||||||
const result = await this.client.fetchSalesforceData(service, module, newAccessToken, instanceUrl, query);
|
const result = await this.client.fetchSalesforceData(service, module, newAccessToken, instanceUrl, query);
|
||||||
|
|
||||||
return this.normalizeResponse(result, 'salesforce');
|
return this.normalizeResponse(result, 'salesforce');
|
||||||
} catch (retryError) {
|
} catch (retryError) {
|
||||||
logger.error('Failed to fetch Salesforce data after token refresh', {
|
logger.error('Failed to fetch Salesforce data after token refresh', {
|
||||||
|
|||||||
@ -14,7 +14,7 @@ function getCorrelationId(req) {
|
|||||||
function log(level, message, meta = {}) {
|
function log(level, message, meta = {}) {
|
||||||
const timestamp = new Date().toISOString();
|
const timestamp = new Date().toISOString();
|
||||||
const payload = { level, message, timestamp, ...meta };
|
const payload = { level, message, timestamp, ...meta };
|
||||||
// eslint-disable-next-line no-console
|
// eslint-disable-next-line no-consoles
|
||||||
console.log(JSON.stringify(payload));
|
console.log(JSON.stringify(payload));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user