const db = require('../config/database'); const express = require('express'); const multer = require('multer'); const axios = require('axios'); const fs = require('fs'); const FormData = require('form-data'); // Ensure this is imported correctly const path = require('path') let queue = []; // Job queue let failedQueue = []; let isProcessing = false; // Flag to track processing state const CHECK_INTERVAL = 60000; // Interval to check status update (60 sec) const checkDocumentStatus = async (documentId) => { try { const [rows] = await db.query( 'SELECT processed_status, failed_page FROM documents WHERE id = ?', [documentId] ); if (rows && rows.processed_status) { const processed_status = rows.processed_status; const failed_page = rows.failed_page; return { processed_status, failed_page }; } else { return { processed_status: null, failed_page: null }; } } catch (error) { console.error(`Error checking document status: ${error.message}`); return { processed_status: null, failed_page: null }; } }; exports.repopulateQueueOnStartup = async () => { try { console.log("Checking documents on startup..."); // Query documents with 'Pending' or 'Failed' status const result = await db.query( 'SELECT id, file_url, hospital_id, failed_page FROM documents WHERE processed_status IN (?, ?)', ['Pending', 'Failed'] ); if (Array.isArray(result) && result.length > 0) { result.forEach(doc => { if (!doc.file_url || typeof doc.file_url !== "string" || doc.file_url.trim() === "") { console.warn(`⚠️ Skipping document ${doc.id}: Invalid or missing file_url`); return; // Skip documents with invalid file_url } queue.push({ file: { path: String(doc.file_url).trim(), // Ensure it's a valid string name: path.basename(doc.file_url) // Extract file name safely }, hospital_id: doc.hospital_id, documentId: doc.id, failed_page: doc.failed_page }); }); } // console.log("✅ Documents added to queue:", queue); // Start processing if the queue is not empty if (queue.length > 0 && !isProcessing) { processQueue(); } } catch (error) { console.error('Error repopulating queue on startup:', error.message); } }; // Function to process the queue const RETRY_DELAY = 5000; // 5 seconds const RETRY_LIMIT = 3; const retryMap = new Map(); // To track retry counts per document // Implementation Steps // Add new PDFs to the queue with Pending status. // Start processing only if no job is currently active. // Wait for Python API to update processed_status in the database. // Check database periodically for status change. // Once status is updated, process the next PDF. const processQueue = async () => { if (isProcessing) return; // If already processing, don't start again isProcessing = true; // Start the queue processing while (queue.length > 0 || failedQueue.length > 0) { // Check if there are jobs in the queue (either main or failed queue) if (queue.length === 0 && failedQueue.length === 0) { console.log("Queue is empty. Waiting for jobs..."); await new Promise(resolve => { const checkForNewJobs = setInterval(() => { if (queue.length > 0 || failedQueue.length > 0) { clearInterval(checkForNewJobs); resolve(); // Resume once a new job is added } }, 1000); // Check for new jobs every second }); } // Move jobs from failed queue to main queue if necessary if (queue.length === 0 && failedQueue.length > 0) { console.log("Switching to failed queue..."); queue.push(...failedQueue); failedQueue.length = 0; await new Promise(resolve => setTimeout(resolve, RETRY_DELAY)); // Delay before retrying } // If there are jobs, process the next one if (queue.length > 0) { console.log("the queue is :", queue); const job = queue.shift(); let filePath = path.resolve(__dirname, '..','..', 'uploads', 'documents', path.basename(job.file.path)); if (!fs.existsSync(filePath)) { console.error(`File not found: "${filePath}". Removing from queue.`); // Clean up retry tracking retryMap.delete(job.documentId); // Remove from queue queue = queue.filter(item => item.documentId !== job.documentId); // failedQueue = failedQueue.filter(item => item.documentId !== job.documentId); // const job = queue.shift(); continue; // Skip to next job } filePath = path.resolve(__dirname, '..','..', 'uploads', 'documents', path.basename(job.file.path)); console.log(`Processing document: ${job.file.path}`); await db.query('UPDATE documents SET processed_status = ? WHERE id = ?', ['Pending', job.documentId]); // const filePath = job.file.path.trim(); console.log("🔍 Checking file at:", filePath); if (!fs.existsSync(filePath)) { console.error(`File not found: "${filePath}"`); return; // Stop execution if the file does not exist } // Ensure filePath is valid before using fs.createReadStream const formData = new FormData(); try { const fileStream = fs.createReadStream(filePath); formData.append('pdf', fileStream); // Ensure fileStream is valid formData.append('doc_id', job.documentId); formData.append('hospital_id', job.hospital_id); formData.append('failed_page', job.failed_page); } catch (error) { // console.error(" Error creating read stream:", error.message); } try { await axios.post('http://127.0.0.1:5000/flask-api/process-pdf', formData, { headers: formData.getHeaders(), }); console.log(`Python API called for ${job.file.path}`); // Poll the status of the document until it is processed or fails const pollStatus = async () => { const statusData = await checkDocumentStatus(job.documentId); if (statusData.processed_status === 'Processed') { console.log(`Document ${job.file.path} marked as ${statusData.processed_status}.`); retryMap.delete(job.documentId); // Clear retry count } else if (statusData.processed_status === 'Failed') { const newRetry = (retryMap.get(job.documentId) || 0) + 1; retryMap.set(job.documentId, newRetry); if (newRetry >= RETRY_LIMIT) { console.warn(` Document ${job.file.path} failed ${newRetry} times. Removing from all queues.`); retryMap.delete(job.documentId); console.log("prompting user ---- ") await db.query( 'UPDATE documents SET reason = ? WHERE id = ?', ['This PDF could not be processed due to access restrictions.', job.documentId] ); console.log("prompted user---- ") queue = queue.filter(item => item.documentId !== job.documentId); failedQueue = failedQueue.filter(item => item.documentId !== job.documentId); } else { console.log(`Retrying (${newRetry}/${RETRY_LIMIT}) for ${job.file.path}`); // fetch freshly const statusdata = await checkDocumentStatus(job.documentId); failedQueue.push({ file: {path:statusdata.file_url}, hospital_id: statusdata.hospital_id, documentId: statusdata.id, failed_page: statusdata.failed_page }); // queue.push({ ...job }); } console.log(`Document ${job.file.path} failed. Adding back to failedQueue.`); } else { if (queue.length === 0 && failedQueue.length === 0) { console.log(" Queue is empty during polling. Stopping poll."); return; } setTimeout(pollStatus, CHECK_INTERVAL); } }; await pollStatus(); } catch (error) { console.error(`Error processing document ${job.file.path}: ${error.message}`); const newRetry = (retryMap.get(job.documentId) || 0) + 1; retryMap.set(job.documentId, newRetry); console.warn(` Document ${job.file.path} failed ${newRetry} times.`); const statusdata = await checkDocumentStatus(job.documentId); console.log('statusdata------',statusdata) if (newRetry >= RETRY_LIMIT) { console.warn(`Skipping ${job.file.path} after ${newRetry} failed attempts. Removing from all queues.`); retryMap.delete(job.documentId); await db.query( 'UPDATE documents SET reason = ? WHERE id = ?', ['This PDF could not be processed due to access restrictions.', job.documentId] ); queue = queue.filter(item => item.documentId !== job.documentId); failedQueue = failedQueue.filter(item => item.documentId !== job.documentId); } else { job.failed_page = statusdata.failed_page; failedQueue.push(job); await new Promise(resolve => setTimeout(resolve, RETRY_DELAY)); } } } } console.log("All jobs processed."); isProcessing = false; }; // Function to add a document to the queue const processDocumentFromPy = async (file, hospital_id, documentId, failed_page) => { queue.push({ file, hospital_id, documentId, failed_page }); // console.log(`Added to queue: ${file.path}`); if (!isProcessing) { processQueue(); // Start processing if idle } }; exports.uploadDocument = async (req, res) => { try { // const { hospital_id } = req.body; const hospital_id = req.user.hospital_id; const uploaded_by = req.user.id; const file_name = req.file.originalname; const file_url = `/uploads/documents/${req.file.filename}`; const failed_page = req.body.failed_page; console.log("req.user----",req.user) if (!["Superadmin","Admin",7,8].includes(req.user.role)) { return res .status(403) .json({ error: "You are not authorized to upload documents" }); } // Step 1: Insert document details into the `documents` table const insertQuery = ` INSERT INTO documents (hospital_id, uploaded_by, file_name, file_url, processed_status) VALUES (?, ?, ?, ?, 'Pending') `; const result = await db.query(insertQuery, [hospital_id, uploaded_by, file_name, file_url]); const documentId = result.insertId; // Step 2: Send the file to the Python Flask API // const pythonApiUrl = 'http://127.0.0.1:5000/process-pdf'; processDocumentFromPy(req.file, hospital_id, documentId, failed_page) if (result || result.affectedRows > 0) { res.status(200).json({ message: 'Document uploaded!', }); } } catch (error) { // console.error('Error uploading document:', error.message); res.status(500).json({ error: error.message }); } }; exports.getDocumentsByHospital = async (req, res) => { try { const { hospital_id } = req.params; // Ensure the authenticated user is either Admin or Superadmin if (!['Admin', 'Superadmin', 'Viewer', 8, 9, 7].includes(req.user.role)) { return res.status(403).json({ error: 'You are not authorized to view documents' }); } // Ensure the user belongs to the correct hospital if (req.user.hospital_id !== parseInt(hospital_id, 10)) { return res.status(403).json({ error: 'You are not authorized to access documents for this hospital' }); } // Fetch documents const documents = await db.query('SELECT * FROM documents WHERE hospital_id = ?', [hospital_id]); res.status(200).json({ documents }); } catch (error) { // console.error('Error fetching documents:', error.message); res.status(500).json({ error: 'Internal server error' }); } }; exports.getDocumentsByHospitalappUser = async (req, res) => { try { const { hospital_id } = req.params; // Fetch documents const documents = await db.query('SELECT * FROM documents WHERE hospital_id = ?', [hospital_id]); res.status(200).json({ documents }); } catch (error) { // console.error('Error fetching documents:', error.message); res.status(500).json({ error: 'Internal server error' }); } }; exports.updateDocumentStatus = async (req, res) => { try { const { id } = req.params; const { processed_status } = req.body; // Fetch the document to validate ownership const documentQuery = 'SELECT hospital_id FROM documents WHERE id = ?'; const documentResult = await db.query(documentQuery, [id]); if (documentResult.length === 0) { return res.status(404).json({ error: 'Document not found' }); } const document = documentResult[0]; // Ensure the authenticated user is either Admin or Superadmin if (!['Admin', 'Superadmin', 8, 7].includes(req.user.role)) { return res.status(403).json({ error: 'You are not authorized to update documents' }); } // Ensure the user belongs to the same hospital as the document if (req.user.hospital_id !== document.hospital_id) { return res.status(403).json({ error: 'You are not authorized to update documents for this hospital' }); } // Update document status const updateQuery = 'UPDATE documents SET processed_status = ? WHERE id = ?'; const result = await db.query(updateQuery, [processed_status, id]); if (result.affectedRows === 0) { return res.status(404).json({ message: 'Document not found or no changes made' }); } res.status(200).json({ message: 'Document status updated successfully!' }); } catch (error) { console.error('Error updating document status:', error.message); res.status(500).json({ error: 'Internal server error' }); } }; exports.deleteDocument = async (req, res) => { try { const { id } = req.params; if (!id) { return res.status(400).json({ error: 'Document ID is required' }); } // Fetch the document to validate ownership const documentQuery = 'SELECT * FROM documents WHERE id = ?'; const documentResult = await db.query(documentQuery, [id]); if (documentResult.length === 0) { return res.status(404).json({ error: 'Document not found' }); } const document = documentResult[0]; // Authorization check if (!['Admin', 'Superadmin', 8, 7].includes(req.user.role)) { return res.status(403).json({ error: 'You are not authorized to delete documents' }); } if (req.user.hospital_id !== document.hospital_id) { return res.status(403).json({ error: 'You are not authorized to delete documents for this hospital' }); } // 🔁 Make a call to Flask API to delete vectors try { const flaskResponse = await axios.delete('http://localhost:5000/flask-api/delete-document-vectors', { data: { hospital_id: document.hospital_id, doc_id: document.id } }); if (flaskResponse.status !== 200) { return res.status(flaskResponse.status).json(flaskResponse.data); } } catch (flaskError) { console.error('Flask API error:', flaskError.message); const errorData = flaskError.response?.data || { error: 'Failed to delete document vectors' }; return res.status(500).json(errorData); } // Delete dependent records try { await Promise.all([ db.query('DELETE FROM questions_answers WHERE document_id = ?', [id]), db.query('DELETE FROM document_metadata WHERE document_id = ?', [id]) ]); } catch (error) { console.error("Error deleting dependent records:", error.message); return res.status(500).json({ error: "Failed to delete dependent records" }); } // Delete file if it exists const filePath = path.join(__dirname, '..','..', 'uploads', document.file_url.replace(/^\/uploads\//, '')); fs.access(filePath, fs.constants.F_OK, (err) => { if (err) { console.warn(`File not found: ${filePath}`); } else { fs.unlink(filePath, (err) => { if (err) { console.error('Error deleting file:', err.message); } else { console.log('File deleted successfully:', filePath); } }); } }); // Finally, delete the document const deleteQuery = 'DELETE FROM documents WHERE id = ?'; const result = await db.query(deleteQuery, [id]); if (result.affectedRows === 0) { return res.status(404).json({ message: 'Document not found' }); } res.status(200).json({ message: 'Document deleted successfully!' }); } catch (error) { console.error('Error deleting document:', error.message); res.status(500).json({ error: 'Internal server error' }); } };