1411 lines
61 KiB
Python
1411 lines
61 KiB
Python
# # FLEXIBLE REQUIREMENT-PROCESSOR - ACCEPTS ANY BODY STRUCTURE
|
|
# # NO strict validation, accepts any JSON and extracts features dynamically
|
|
# # Just extract features and let Claude decide everything
|
|
|
|
# import os
|
|
# import sys
|
|
# import json
|
|
# from datetime import datetime
|
|
# from typing import Dict, Any, Optional, Union
|
|
# from pydantic import BaseModel
|
|
# from fastapi import FastAPI, HTTPException, Request
|
|
# from fastapi.middleware.cors import CORSMiddleware
|
|
# from loguru import logger
|
|
# import anthropic
|
|
|
|
# # Configure logging
|
|
# logger.remove()
|
|
# logger.add(sys.stdout, level="INFO", format="{time} | {level} | {message}")
|
|
|
|
# # Initialize Claude client
|
|
# try:
|
|
# claude_client = anthropic.Anthropic(
|
|
# api_key=os.getenv("ANTHROPIC_API_KEY", "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")
|
|
# )
|
|
# logger.info("✅ Claude client initialized successfully")
|
|
# except Exception as e:
|
|
# logger.warning(f"⚠️ Claude client not initialized: {e}")
|
|
# claude_client = None
|
|
|
|
# # ================================================================================================
|
|
# # FLEXIBLE MODELS
|
|
# # ================================================================================================
|
|
|
|
# class FlexibleRequirementRequest(BaseModel):
|
|
# """Flexible request model that accepts any structure"""
|
|
|
|
# class Config:
|
|
# extra = "allow" # Allow any additional fields
|
|
|
|
# # ================================================================================================
|
|
# # FLEXIBLE FASTAPI APPLICATION
|
|
# # ================================================================================================
|
|
|
|
# app = FastAPI(
|
|
# title="Flexible Requirements Processor",
|
|
# description="Flexible feature extraction - accepts any body structure, no strict validation",
|
|
# version="5.0.0"
|
|
# )
|
|
|
|
# app.add_middleware(
|
|
# CORSMiddleware,
|
|
# allow_origins=["*"],
|
|
# allow_credentials=True,
|
|
# allow_methods=["*"],
|
|
# allow_headers=["*"],
|
|
# )
|
|
|
|
# @app.get("/health")
|
|
# async def health_check():
|
|
# return {
|
|
# "status": "healthy",
|
|
# "service": "flexible-requirements-processor",
|
|
# "version": "5.0.0",
|
|
# "approach": "accepts_any_body_structure",
|
|
# "claude_available": claude_client is not None
|
|
# }
|
|
|
|
# @app.post("/api/v1/process-requirements")
|
|
# async def process_flexible_requirements(request: Request):
|
|
# """
|
|
# FLEXIBLE: Accepts ANY body structure and extracts features dynamically
|
|
# NO strict validation, NO required fields
|
|
# Works with any JSON structure from n8n
|
|
# """
|
|
# try:
|
|
# # Get raw JSON body
|
|
# raw_body = await request.json()
|
|
# logger.info(f"Received raw body: {json.dumps(raw_body, indent=2)}")
|
|
|
|
# # Extract project name from various possible locations
|
|
# project_name = extract_project_name(raw_body)
|
|
|
|
# # Extract description from various possible locations
|
|
# description = extract_description(raw_body)
|
|
|
|
# # Extract ALL features from ANY part of the data
|
|
# all_features, scale_info, complete_requirements = extract_all_data(raw_body)
|
|
|
|
# logger.info(f"✅ Extracted {len(all_features)} features from flexible structure")
|
|
|
|
# # STEP 3: Build simple response with ALL data preserved
|
|
# response = {
|
|
# "success": True,
|
|
# "data": {
|
|
# "project_id": f"flexible-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}",
|
|
# "project_name": project_name,
|
|
# "project_description": description,
|
|
|
|
# # PURE DATA - NO ANALYSIS
|
|
# "all_features": all_features,
|
|
# "total_features": len(all_features),
|
|
# "scale_information": scale_info,
|
|
# "complete_requirements": complete_requirements, # EVERYTHING PRESERVED
|
|
|
|
# "processing_metadata": {
|
|
# "approach": "flexible_data_extraction",
|
|
# "analysis_performed": "none_let_llm_decide",
|
|
# "features_extracted": len(all_features),
|
|
# "timestamp": datetime.utcnow().isoformat(),
|
|
# "input_structure": "flexible_any_body"
|
|
# }
|
|
# }
|
|
# }
|
|
|
|
# logger.info(f"✅ Successfully processed flexible requirements - {len(all_features)} features extracted")
|
|
# return response
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"❌ Flexible requirements processing failed: {e}")
|
|
# # Return error but don't crash
|
|
# return {
|
|
# "success": False,
|
|
# "error": str(e),
|
|
# "message": "Flexible processor encountered an error but continues running"
|
|
# }
|
|
|
|
# @app.post("/api/v1/generate-business-questions")
|
|
# async def generate_business_questions(request: Request):
|
|
# """
|
|
# Generate business questions based on enhanced feature analysis
|
|
# Input: {featureName, description, requirements, complexity, logicRules}
|
|
# Output: Same input + businessQuestions array
|
|
# """
|
|
# try:
|
|
# # Get the enhanced feature data
|
|
# feature_data = await request.json()
|
|
# logger.info(f"Generating business questions for: {feature_data.get('featureName', 'Unknown')}")
|
|
|
|
# # Extract feature information
|
|
# feature_name = feature_data.get('featureName', '')
|
|
# description = feature_data.get('description', '')
|
|
# requirements = feature_data.get('requirements', [])
|
|
# complexity = feature_data.get('complexity', 'medium')
|
|
# logic_rules = feature_data.get('logicRules', [])
|
|
|
|
# if not claude_client:
|
|
# logger.warning("Claude not available, using fallback business questions")
|
|
# business_questions = generate_fallback_business_questions(feature_name, complexity)
|
|
# else:
|
|
# business_questions = await generate_ai_business_questions(
|
|
# feature_name, description, requirements, complexity, logic_rules
|
|
# )
|
|
|
|
# # Return the complete feature data with business questions added
|
|
# response_data = {
|
|
# **feature_data, # Include all original data
|
|
# "businessQuestions": business_questions,
|
|
# "questionsGenerated": True,
|
|
# "timestamp": datetime.utcnow().isoformat()
|
|
# }
|
|
|
|
# logger.info(f"✅ Generated {len(business_questions)} business questions")
|
|
|
|
# return {
|
|
# "success": True,
|
|
# "data": response_data
|
|
# }
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"❌ Business questions generation failed: {e}")
|
|
# return {
|
|
# "success": False,
|
|
# "error": str(e),
|
|
# "message": "Failed to generate business questions"
|
|
# }
|
|
|
|
# @app.post("/api/v1/generate-comprehensive-business-questions")
|
|
# async def generate_comprehensive_business_questions(request: Request):
|
|
# """
|
|
# Generate comprehensive business questions for ALL features as ONE INTEGRATED SYSTEM
|
|
# Analyzes all logic rules, requirements, and feature interactions
|
|
# """
|
|
# try:
|
|
# request_data = await request.json()
|
|
# logger.info(f"Generating comprehensive business questions for integrated system")
|
|
|
|
# # Extract all features and their details
|
|
# all_features = request_data.get('allFeatures', [])
|
|
# project_name = request_data.get('projectName', 'Software System')
|
|
# project_type = request_data.get('projectType', 'Business Application')
|
|
|
|
# if not all_features:
|
|
# return {
|
|
# "success": False,
|
|
# "error": "No features provided for analysis"
|
|
# }
|
|
|
|
# logger.info(f"Processing {len(all_features)} features as integrated system")
|
|
|
|
# if not claude_client:
|
|
# logger.warning("Claude not available, using comprehensive fallback")
|
|
# business_questions = generate_comprehensive_fallback_questions(all_features, project_type)
|
|
# else:
|
|
# business_questions = await generate_comprehensive_ai_questions(
|
|
# all_features, project_name, project_type
|
|
# )
|
|
|
|
# logger.info(f"✅ Generated {len(business_questions)} comprehensive business questions")
|
|
|
|
# return {
|
|
# "success": True,
|
|
# "data": {
|
|
# "businessQuestions": business_questions,
|
|
# "questionsGenerated": True,
|
|
# "systemAnalysis": {
|
|
# "totalFeatures": len(all_features),
|
|
# "projectType": project_type,
|
|
# "analysisType": "comprehensive_integrated_system"
|
|
# },
|
|
# "timestamp": datetime.utcnow().isoformat()
|
|
# }
|
|
# }
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"❌ Comprehensive business questions generation failed: {e}")
|
|
# return {
|
|
# "success": False,
|
|
# "error": str(e),
|
|
# "message": "Failed to generate comprehensive business questions"
|
|
# }
|
|
|
|
# async def generate_ai_business_questions(feature_name: str, description: str, requirements: list, complexity: str, logic_rules: list):
|
|
# """Generate business questions using Claude AI"""
|
|
# try:
|
|
# requirements_text = "\n".join([f"- {req}" for req in requirements])
|
|
# logic_rules_text = "\n".join([f"- {rule}" for rule in logic_rules])
|
|
|
|
# prompt = f"""
|
|
# Based on this feature specification, generate relevant business questions that will help determine the technology stack and architecture requirements:
|
|
|
|
# Feature: {feature_name}
|
|
# Description: {description}
|
|
# Complexity: {complexity}
|
|
|
|
# Technical Requirements:
|
|
# {requirements_text}
|
|
|
|
# Business Logic Rules:
|
|
# {logic_rules_text}
|
|
|
|
# Generate 5-8 specific business questions that would help determine:
|
|
# 1. Scale and usage patterns
|
|
# 2. Performance requirements
|
|
# 3. Integration needs
|
|
# 4. Compliance and security requirements
|
|
# 5. Budget and timeline constraints
|
|
# 6. Team capabilities
|
|
|
|
# Return ONLY a JSON array of questions in this format:
|
|
# [
|
|
# "How many concurrent users do you expect for this feature?",
|
|
# "What is your expected data volume and growth rate?",
|
|
# "Do you have specific compliance requirements (HIPAA, GDPR, etc.)?",
|
|
# "What is your target response time for this feature?",
|
|
# "Do you need real-time data synchronization?",
|
|
# "What is your budget range for this implementation?",
|
|
# "What is your preferred deployment timeline?",
|
|
# "Do you have existing systems this needs to integrate with?"
|
|
# ]
|
|
# """
|
|
|
|
# message = await claude_client.messages.create(
|
|
# model="claude-3-5-sonnet-20241022",
|
|
# max_tokens=1000,
|
|
# temperature=0.3,
|
|
# messages=[{"role": "user", "content": prompt}]
|
|
# )
|
|
|
|
# response_text = message.content[0].text.strip()
|
|
# logger.info(f"Claude response: {response_text}")
|
|
|
|
# # Extract JSON array from response
|
|
# import re
|
|
# json_match = re.search(r'\[[\s\S]*\]', response_text)
|
|
# if json_match:
|
|
# questions = json.loads(json_match.group())
|
|
# return questions
|
|
# else:
|
|
# logger.warning("Could not parse Claude response as JSON array")
|
|
# return generate_fallback_business_questions(feature_name, complexity)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Claude business questions generation failed: {e}")
|
|
# return generate_fallback_business_questions(feature_name, complexity)
|
|
|
|
# async def generate_comprehensive_ai_questions(all_features: list, project_name: str, project_type: str):
|
|
# """Generate comprehensive business questions using Claude AI for integrated system"""
|
|
# try:
|
|
# # Extract all logic rules and requirements from features
|
|
# all_logic_rules = []
|
|
# all_requirements = []
|
|
# feature_complexities = []
|
|
|
|
# system_overview = f"INTEGRATED {project_type.upper()} SYSTEM: {project_name}\n\n"
|
|
|
|
# for idx, feature in enumerate(all_features, 1):
|
|
# feature_name = feature.get('featureName') or feature.get('name', f'Feature {idx}')
|
|
# feature_desc = feature.get('description', '')
|
|
# complexity = feature.get('complexity', 'medium')
|
|
|
|
# system_overview += f"{idx}. {feature_name.upper()}"
|
|
# if feature_desc:
|
|
# system_overview += f" - {feature_desc}"
|
|
# system_overview += f" (Complexity: {complexity})\n"
|
|
|
|
# # Extract logic rules
|
|
# logic_rules = feature.get('logicRules', [])
|
|
# if logic_rules:
|
|
# system_overview += f" Logic Rules ({len(logic_rules)}):\n"
|
|
# for rule_idx, rule in enumerate(logic_rules, 1):
|
|
# system_overview += f" R{rule_idx}: {rule}\n"
|
|
# all_logic_rules.append(f"{feature_name} - {rule}")
|
|
|
|
# # Extract requirements
|
|
# requirements = feature.get('requirements', [])
|
|
# if requirements:
|
|
# system_overview += f" Requirements ({len(requirements)}):\n"
|
|
# for req in requirements:
|
|
# system_overview += f" • {req}\n"
|
|
# all_requirements.append(f"{feature_name}: {req}")
|
|
|
|
# feature_complexities.append(complexity)
|
|
# system_overview += "\n"
|
|
|
|
# # Determine overall system complexity
|
|
# complexity_weights = {'low': 1, 'medium': 2, 'high': 3}
|
|
# avg_complexity = sum(complexity_weights.get(c, 2) for c in feature_complexities) / len(feature_complexities)
|
|
# system_complexity = 'high' if avg_complexity >= 2.5 else 'medium' if avg_complexity >= 1.5 else 'low'
|
|
|
|
# prompt = f"""
|
|
# You are a senior business analyst and technical architect. Analyze this COMPLETE INTEGRATED SOFTWARE SYSTEM and generate comprehensive business questions that will provide ALL necessary information for:
|
|
|
|
# 1. Technology Stack Selection
|
|
# 2. System Architecture Design
|
|
# 3. Code Generation and Implementation
|
|
# 4. Infrastructure and Deployment Planning
|
|
|
|
# {system_overview}
|
|
|
|
# SYSTEM ANALYSIS:
|
|
# - Total Features: {len(all_features)}
|
|
# - Overall Complexity: {system_complexity}
|
|
# - Total Logic Rules: {len(all_logic_rules)}
|
|
# - Total Requirements: {len(all_requirements)}
|
|
|
|
# GENERATE COMPREHENSIVE BUSINESS QUESTIONS COVERING:
|
|
|
|
# **SYSTEM INTEGRATION & DATA FLOW:**
|
|
# - How should these {len(all_features)} features integrate and share data?
|
|
# - What are the workflow dependencies between features?
|
|
# - How should data flow across the entire system?
|
|
|
|
# **TECHNICAL IMPLEMENTATION (Based on Logic Rules):**
|
|
# {chr(10).join([f"- Question about: {rule}" for rule in all_logic_rules[:10]])}
|
|
|
|
# **SCALE & PERFORMANCE:**
|
|
# - User load across all features combined
|
|
# - Data volume and growth projections
|
|
# - Performance requirements for integrated system
|
|
# - Concurrent usage patterns
|
|
|
|
# **SECURITY & COMPLIANCE:**
|
|
# - Authentication/authorization across all features
|
|
# - Data protection and privacy requirements
|
|
# - Industry-specific compliance needs
|
|
# - Audit and logging requirements
|
|
|
|
# **INFRASTRUCTURE & DEPLOYMENT:**
|
|
# - Cloud vs on-premise preferences
|
|
# - Scalability and high availability needs
|
|
# - Backup and disaster recovery
|
|
# - Integration with existing systems
|
|
|
|
# **BUSINESS OPERATIONS:**
|
|
# - Budget for complete system development
|
|
# - Timeline and phased rollout preferences
|
|
# - Team capabilities and training needs
|
|
# - Success metrics and KPIs
|
|
|
|
# **FEATURE-SPECIFIC TECHNICAL DECISIONS:**
|
|
# Generate specific questions for each complex logic rule that impacts technical choices.
|
|
|
|
# IMPORTANT:
|
|
# - Generate as many questions as needed for COMPLETE coverage
|
|
# - Each question should help make specific technical decisions
|
|
# - Consider feature interactions and dependencies
|
|
# - Include questions that clarify implementation details for ALL logic rules
|
|
# - Think about the system as ONE integrated platform, not separate features
|
|
|
|
# Return ONLY a JSON array of comprehensive business questions:
|
|
# [
|
|
# "How many total users will access your integrated {project_type} system across all features?",
|
|
# "What data should be shared between [specific features based on analysis]?",
|
|
# "How should [specific logic rule] be implemented technically?",
|
|
# ...
|
|
# ]
|
|
# """
|
|
|
|
# message = await claude_client.messages.create(
|
|
# model="claude-3-5-sonnet-20241022",
|
|
# max_tokens=4000,
|
|
# temperature=0.3,
|
|
# messages=[{"role": "user", "content": prompt}]
|
|
# )
|
|
|
|
# response_text = message.content[0].text.strip()
|
|
# logger.info(f"Claude comprehensive response length: {len(response_text)}")
|
|
|
|
# # Extract JSON array from response
|
|
# import re
|
|
# json_match = re.search(r'\[[\s\S]*\]', response_text)
|
|
# if json_match:
|
|
# questions = json.loads(json_match.group())
|
|
# logger.info(f"Successfully parsed {len(questions)} comprehensive questions")
|
|
# return questions
|
|
# else:
|
|
# logger.warning("Could not parse Claude response as JSON array, using fallback")
|
|
# return generate_comprehensive_fallback_questions(all_features, project_type)
|
|
|
|
# except Exception as e:
|
|
# logger.error(f"Claude comprehensive questions generation failed: {e}")
|
|
# return generate_comprehensive_fallback_questions(all_features, project_type)
|
|
|
|
# def generate_fallback_business_questions(feature_name: str, complexity: str):
|
|
# """Generate fallback business questions when Claude is not available"""
|
|
|
|
# base_questions = [
|
|
# f"How many users do you expect to use the {feature_name} feature?",
|
|
# "What is your expected launch timeline for this feature?",
|
|
# "Do you have specific performance requirements?",
|
|
# "What is your budget range for this implementation?",
|
|
# "Do you need this feature to integrate with existing systems?"
|
|
# ]
|
|
|
|
# if complexity == "high":
|
|
# base_questions.extend([
|
|
# "Do you have specific compliance or security requirements?",
|
|
# "What level of data encryption do you need?",
|
|
# "Do you need real-time data processing capabilities?"
|
|
# ])
|
|
# elif complexity == "medium":
|
|
# base_questions.extend([
|
|
# "Do you need user authentication and authorization?",
|
|
# "What level of data backup and recovery do you need?"
|
|
# ])
|
|
|
|
# return base_questions
|
|
|
|
# def generate_comprehensive_fallback_questions(all_features: list, project_type: str):
|
|
# """Generate comprehensive fallback questions when Claude is not available"""
|
|
|
|
# feature_names = [f.get('featureName') or f.get('name', 'Feature') for f in all_features]
|
|
# features_text = ', '.join(feature_names)
|
|
|
|
# # Extract all logic rules for fallback questions
|
|
# all_logic_rules = []
|
|
# for feature in all_features:
|
|
# logic_rules = feature.get('logicRules', [])
|
|
# feature_name = feature.get('featureName') or feature.get('name', 'Feature')
|
|
# for rule in logic_rules:
|
|
# all_logic_rules.append((feature_name, rule))
|
|
|
|
# comprehensive_questions = [
|
|
# # System Integration Questions
|
|
# f"How many total users will access your integrated {project_type} system?",
|
|
# f"How should {features_text} features integrate and share data?",
|
|
# f"What are the workflow dependencies between {features_text}?",
|
|
# f"Do you need real-time data synchronization across all features?",
|
|
|
|
# # Scale and Performance
|
|
# f"What is the expected concurrent user load for your complete {project_type} system?",
|
|
# f"What data volume do you expect across all {len(all_features)} features?",
|
|
# f"What are your performance requirements for the integrated system?",
|
|
# f"Do you need the system to handle peak loads during business hours?",
|
|
|
|
# # Technical Implementation
|
|
# f"What is your total budget for developing this complete {project_type} system?",
|
|
# f"What is your preferred timeline for implementing all {len(all_features)} features?",
|
|
# f"Do you prefer cloud-based or on-premise deployment for the entire system?",
|
|
# f"What existing systems need to integrate with your new {project_type} platform?",
|
|
|
|
# # Security and Compliance
|
|
# f"What authentication method do you prefer for users across all features?",
|
|
# f"Do you have specific security requirements for your {project_type} system?",
|
|
# f"What level of data backup and recovery do you need for the complete system?",
|
|
# f"Are there any compliance requirements (GDPR, HIPAA, SOX) for your industry?",
|
|
|
|
# # Business Operations
|
|
# f"How do you measure success for your {project_type} system?",
|
|
# f"What reporting capabilities do you need across all features?",
|
|
# f"Do you need mobile access for your {project_type} system?",
|
|
# f"What level of customization do you need for different user roles?"
|
|
# ]
|
|
|
|
# # Add specific questions for logic rules
|
|
# for feature_name, logic_rule in all_logic_rules[:10]: # Limit to avoid too many questions
|
|
# if 'tax' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What tax jurisdictions and rates do you need to support?")
|
|
# elif 'currency' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What currencies do you need to support and do you need real-time exchange rates?")
|
|
# elif 'approval' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What approval workflows and thresholds do you need for {feature_name}?")
|
|
# elif 'notification' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"How should users be notified for {feature_name} events?")
|
|
# elif 'integration' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What third-party integrations do you need for {feature_name}?")
|
|
# elif 'status' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What status tracking and reporting do you need for {feature_name}?")
|
|
# elif 'numbering' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What numbering or identification systems do you need for {feature_name}?")
|
|
# elif 'payment' in logic_rule.lower():
|
|
# comprehensive_questions.append(f"What payment processing capabilities do you need for {feature_name}?")
|
|
|
|
# # Remove duplicates while preserving order
|
|
# seen = set()
|
|
# unique_questions = []
|
|
# for question in comprehensive_questions:
|
|
# if question.lower() not in seen:
|
|
# seen.add(question.lower())
|
|
# unique_questions.append(question)
|
|
|
|
# return unique_questions
|
|
|
|
# def extract_project_name(data: Dict[str, Any]) -> str:
|
|
# """Extract project name from various possible locations"""
|
|
|
|
# # Try different possible locations
|
|
# possible_locations = [
|
|
# data.get('project_name'),
|
|
# data.get('projectName'),
|
|
# data.get('name'),
|
|
# data.get('title'),
|
|
# ]
|
|
|
|
# # Check in nested structures
|
|
# if isinstance(data.get('body'), dict):
|
|
# possible_locations.extend([
|
|
# data['body'].get('project_name'),
|
|
# data['body'].get('projectName'),
|
|
# data['body'].get('name'),
|
|
# ])
|
|
|
|
# if isinstance(data.get('requirements'), dict):
|
|
# possible_locations.extend([
|
|
# data['requirements'].get('project_name'),
|
|
# data['requirements'].get('name'),
|
|
# ])
|
|
|
|
# # Return the first non-empty value found
|
|
# for location in possible_locations:
|
|
# if location and isinstance(location, str) and location.strip():
|
|
# return location.strip()
|
|
|
|
# return "Unknown Project"
|
|
|
|
# def extract_description(data: Dict[str, Any]) -> str:
|
|
# """Extract description from various possible locations"""
|
|
|
|
# possible_locations = [
|
|
# data.get('description'),
|
|
# data.get('desc'),
|
|
# data.get('project_description'),
|
|
# ]
|
|
|
|
# # Check in nested structures
|
|
# if isinstance(data.get('body'), dict):
|
|
# possible_locations.extend([
|
|
# data['body'].get('description'),
|
|
# data['body'].get('desc'),
|
|
# ])
|
|
|
|
# # Return the first non-empty value found
|
|
# for location in possible_locations:
|
|
# if location and isinstance(location, str) and location.strip():
|
|
# return location.strip()
|
|
|
|
# return ""
|
|
|
|
# def extract_all_data(data: Dict[str, Any]) -> tuple[list, dict, dict]:
|
|
# """Extract ALL features, scale info, and complete requirements from ANY structure"""
|
|
|
|
# all_features = []
|
|
# scale_info = {}
|
|
# complete_requirements = {}
|
|
|
|
# # Recursive function to find all boolean features and scale info
|
|
# def extract_from_object(obj: Any, path: str = ""):
|
|
# if isinstance(obj, dict):
|
|
# for key, value in obj.items():
|
|
# current_path = f"{path}.{key}" if path else key
|
|
|
|
# # Extract boolean features
|
|
# if value is True:
|
|
# all_features.append(key)
|
|
# complete_requirements[key] = value
|
|
|
|
# # Extract scale information
|
|
# elif key in ['team_size', 'timeline', 'budget', 'expected_users', 'industry', 'scalability',
|
|
# 'concurrent_users', 'data_volume', 'performance_requirements', 'compliance_requirements']:
|
|
# scale_info[key] = value
|
|
# complete_requirements[key] = value
|
|
|
|
# # Extract other non-boolean values that might be features
|
|
# elif isinstance(value, str) and value.strip():
|
|
# complete_requirements[key] = value
|
|
|
|
# # Extract numeric values
|
|
# elif isinstance(value, (int, float)) and not isinstance(value, bool):
|
|
# complete_requirements[key] = value
|
|
|
|
# # Recurse into nested objects
|
|
# elif isinstance(value, (dict, list)):
|
|
# extract_from_object(value, current_path)
|
|
|
|
# elif isinstance(obj, list):
|
|
# for i, item in enumerate(obj):
|
|
# if isinstance(item, (dict, list)):
|
|
# extract_from_object(item, f"{path}[{i}]" if path else f"[{i}]")
|
|
|
|
# # Extract from the entire data structure
|
|
# extract_from_object(data)
|
|
|
|
# # Also try to extract from common nested locations
|
|
# nested_locations = [
|
|
# data.get('body'),
|
|
# data.get('requirements'),
|
|
# data.get('params'),
|
|
# data.get('query'),
|
|
# data.get('data')
|
|
# ]
|
|
|
|
# for nested_data in nested_locations:
|
|
# if isinstance(nested_data, dict):
|
|
# extract_from_object(nested_data)
|
|
|
|
# # Remove duplicates
|
|
# all_features = list(set(all_features))
|
|
|
|
# logger.info(f"Extracted features: {all_features}")
|
|
# logger.info(f"Extracted scale info: {scale_info}")
|
|
|
|
# return all_features, scale_info, complete_requirements
|
|
|
|
# if __name__ == "__main__":
|
|
# import uvicorn
|
|
|
|
# logger.info("🚀 FLEXIBLE REQUIREMENTS PROCESSOR - Accepts Any Body Structure")
|
|
# logger.info("✅ NO strict validation, NO required fields")
|
|
# logger.info("✅ Accepts any JSON structure from n8n")
|
|
# logger.info("✅ Extracts features from anywhere in the data")
|
|
# logger.info("✅ Generates business questions with Claude AI")
|
|
# logger.info("✅ NEW: Comprehensive business questions for integrated systems")
|
|
|
|
# uvicorn.run("main:app", host="0.0.0.0", port=5678, log_level="info")
|
|
|
|
|
|
# FLEXIBLE REQUIREMENT-PROCESSOR - ACCEPTS ANY BODY STRUCTURE
|
|
# NO strict validation, accepts any JSON and extracts features dynamically
|
|
# Just extract features and let Claude decide everything
|
|
# ENHANCED: Now supports tagged rules from detailed requirements
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, Union
|
|
from pydantic import BaseModel
|
|
from fastapi import FastAPI, HTTPException, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from loguru import logger
|
|
import anthropic
|
|
|
|
# Configure logging
|
|
logger.remove()
|
|
logger.add(sys.stdout, level="INFO", format="{time} | {level} | {message}")
|
|
|
|
# Initialize Claude client
|
|
try:
|
|
claude_client = anthropic.Anthropic(
|
|
api_key=os.getenv("ANTHROPIC_API_KEY", "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")
|
|
)
|
|
logger.info("✅ Claude client initialized successfully")
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Claude client not initialized: {e}")
|
|
claude_client = None
|
|
|
|
# ================================================================================================
|
|
# FLEXIBLE MODELS
|
|
# ================================================================================================
|
|
|
|
class FlexibleRequirementRequest(BaseModel):
|
|
"""Flexible request model that accepts any structure"""
|
|
|
|
class Config:
|
|
extra = "allow" # Allow any additional fields
|
|
|
|
# ================================================================================================
|
|
# FLEXIBLE FASTAPI APPLICATION
|
|
# ================================================================================================
|
|
|
|
app = FastAPI(
|
|
title="Flexible Requirements Processor",
|
|
description="Flexible feature extraction - accepts any body structure, no strict validation",
|
|
version="5.1.0"
|
|
)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
return {
|
|
"status": "healthy",
|
|
"service": "flexible-requirements-processor",
|
|
"version": "5.1.0",
|
|
"approach": "accepts_any_body_structure",
|
|
"claude_available": claude_client is not None,
|
|
"new_features": ["tagged_rules_support", "enhanced_comprehensive_questions"]
|
|
}
|
|
|
|
@app.post("/api/v1/process-requirements")
|
|
async def process_flexible_requirements(request: Request):
|
|
"""
|
|
FLEXIBLE: Accepts ANY body structure and extracts features dynamically
|
|
NO strict validation, NO required fields
|
|
Works with any JSON structure from n8n
|
|
"""
|
|
try:
|
|
# Get raw JSON body
|
|
raw_body = await request.json()
|
|
logger.info(f"Received raw body: {json.dumps(raw_body, indent=2)}")
|
|
|
|
# Extract project name from various possible locations
|
|
project_name = extract_project_name(raw_body)
|
|
|
|
# Extract description from various possible locations
|
|
description = extract_description(raw_body)
|
|
|
|
# Extract ALL features from ANY part of the data
|
|
all_features, scale_info, complete_requirements = extract_all_data(raw_body)
|
|
|
|
logger.info(f"✅ Extracted {len(all_features)} features from flexible structure")
|
|
|
|
# STEP 3: Build simple response with ALL data preserved
|
|
response = {
|
|
"success": True,
|
|
"data": {
|
|
"project_id": f"flexible-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}",
|
|
"project_name": project_name,
|
|
"project_description": description,
|
|
|
|
# PURE DATA - NO ANALYSIS
|
|
"all_features": all_features,
|
|
"total_features": len(all_features),
|
|
"scale_information": scale_info,
|
|
"complete_requirements": complete_requirements, # EVERYTHING PRESERVED
|
|
|
|
"processing_metadata": {
|
|
"approach": "flexible_data_extraction",
|
|
"analysis_performed": "none_let_llm_decide",
|
|
"features_extracted": len(all_features),
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"input_structure": "flexible_any_body"
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.info(f"✅ Successfully processed flexible requirements - {len(all_features)} features extracted")
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Flexible requirements processing failed: {e}")
|
|
# Return error but don't crash
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"message": "Flexible processor encountered an error but continues running"
|
|
}
|
|
|
|
@app.post("/api/v1/generate-business-questions")
|
|
async def generate_business_questions(request: Request):
|
|
"""
|
|
Generate business questions based on enhanced feature analysis
|
|
Input: {featureName, description, requirements, complexity, logicRules}
|
|
Output: Same input + businessQuestions array
|
|
"""
|
|
try:
|
|
# Get the enhanced feature data
|
|
feature_data = await request.json()
|
|
logger.info(f"Generating business questions for: {feature_data.get('featureName', 'Unknown')}")
|
|
|
|
# Extract feature information
|
|
feature_name = feature_data.get('featureName', '')
|
|
description = feature_data.get('description', '')
|
|
requirements = feature_data.get('requirements', [])
|
|
complexity = feature_data.get('complexity', 'medium')
|
|
logic_rules = feature_data.get('logicRules', [])
|
|
|
|
if not claude_client:
|
|
logger.warning("Claude not available, using fallback business questions")
|
|
business_questions = generate_fallback_business_questions(feature_name, complexity)
|
|
else:
|
|
business_questions = await generate_ai_business_questions(
|
|
feature_name, description, requirements, complexity, logic_rules
|
|
)
|
|
|
|
# Return the complete feature data with business questions added
|
|
response_data = {
|
|
**feature_data, # Include all original data
|
|
"businessQuestions": business_questions,
|
|
"questionsGenerated": True,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
logger.info(f"✅ Generated {len(business_questions)} business questions")
|
|
|
|
return {
|
|
"success": True,
|
|
"data": response_data
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Business questions generation failed: {e}")
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"message": "Failed to generate business questions"
|
|
}
|
|
|
|
@app.post("/api/v1/generate-comprehensive-business-questions")
|
|
async def generate_comprehensive_business_questions(request: Request):
|
|
"""
|
|
ENHANCED: Generate comprehensive business questions for ALL features as ONE INTEGRATED SYSTEM
|
|
Now supports tagged rules from detailed requirements + AI features
|
|
Analyzes all logic rules, requirements, and feature interactions
|
|
"""
|
|
try:
|
|
request_data = await request.json()
|
|
logger.info(f"🚀 Generating comprehensive business questions for integrated system")
|
|
|
|
# Extract all features and their details - ENHANCED to handle tagged rules
|
|
all_features = request_data.get('allFeatures', [])
|
|
project_name = request_data.get('projectName', 'Software System')
|
|
project_type = request_data.get('projectType', 'Business Application')
|
|
|
|
if not all_features:
|
|
return {
|
|
"success": False,
|
|
"error": "No features provided for analysis"
|
|
}
|
|
|
|
logger.info(f"📊 Processing {len(all_features)} features as integrated system")
|
|
|
|
# Log the structure of features to understand the data
|
|
for idx, feature in enumerate(all_features):
|
|
feature_name = feature.get('featureName') or feature.get('name', f'Feature {idx+1}')
|
|
has_requirement_analysis = 'requirementAnalysis' in feature
|
|
has_tagged_rules = 'taggedLogicRules' in feature
|
|
has_regular_rules = 'logicRules' in feature
|
|
|
|
logger.info(f" Feature {idx+1}: {feature_name}")
|
|
logger.info(f" - Has requirementAnalysis: {has_requirement_analysis}")
|
|
logger.info(f" - Has taggedLogicRules: {has_tagged_rules}")
|
|
logger.info(f" - Has regular logicRules: {has_regular_rules}")
|
|
|
|
if not claude_client:
|
|
logger.warning("Claude not available, using comprehensive fallback")
|
|
business_questions = generate_enhanced_comprehensive_fallback_questions(all_features, project_type)
|
|
else:
|
|
business_questions = await generate_enhanced_comprehensive_ai_questions(
|
|
all_features, project_name, project_type
|
|
)
|
|
|
|
logger.info(f"✅ Generated {len(business_questions)} comprehensive business questions")
|
|
|
|
return {
|
|
"success": True,
|
|
"data": {
|
|
"businessQuestions": business_questions,
|
|
"questionsGenerated": True,
|
|
"systemAnalysis": {
|
|
"totalFeatures": len(all_features),
|
|
"projectType": project_type,
|
|
"analysisType": "enhanced_comprehensive_integrated_system_with_tagged_rules"
|
|
},
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Comprehensive business questions generation failed: {e}")
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"message": "Failed to generate comprehensive business questions"
|
|
}
|
|
|
|
async def generate_ai_business_questions(feature_name: str, description: str, requirements: list, complexity: str, logic_rules: list):
|
|
"""Generate business questions using Claude AI"""
|
|
try:
|
|
requirements_text = "\n".join([f"- {req}" for req in requirements])
|
|
logic_rules_text = "\n".join([f"- {rule}" for rule in logic_rules])
|
|
|
|
prompt = f"""
|
|
Based on this feature specification, generate relevant business questions that will help determine the technology stack and architecture requirements:
|
|
|
|
Feature: {feature_name}
|
|
Description: {description}
|
|
Complexity: {complexity}
|
|
|
|
Technical Requirements:
|
|
{requirements_text}
|
|
|
|
Business Logic Rules:
|
|
{logic_rules_text}
|
|
|
|
Generate 5-8 specific business questions that would help determine:
|
|
1. Scale and usage patterns
|
|
2. Performance requirements
|
|
3. Integration needs
|
|
4. Compliance and security requirements
|
|
5. Budget and timeline constraints
|
|
6. Team capabilities
|
|
|
|
Return ONLY a JSON array of questions in this format:
|
|
[
|
|
"How many concurrent users do you expect for this feature?",
|
|
"What is your expected data volume and growth rate?",
|
|
"Do you have specific compliance requirements (HIPAA, GDPR, etc.)?",
|
|
"What is your target response time for this feature?",
|
|
"Do you need real-time data synchronization?",
|
|
"What is your budget range for this implementation?",
|
|
"What is your preferred deployment timeline?",
|
|
"Do you have existing systems this needs to integrate with?"
|
|
]
|
|
"""
|
|
|
|
message = await claude_client.messages.create(
|
|
model="claude-3-5-sonnet-20241022",
|
|
max_tokens=1000,
|
|
temperature=0.3,
|
|
messages=[{"role": "user", "content": prompt}]
|
|
)
|
|
|
|
response_text = message.content[0].text.strip()
|
|
logger.info(f"Claude response: {response_text}")
|
|
|
|
# Extract JSON array from response
|
|
import re
|
|
json_match = re.search(r'\[[\s\S]*\]', response_text)
|
|
if json_match:
|
|
questions = json.loads(json_match.group())
|
|
return questions
|
|
else:
|
|
logger.warning("Could not parse Claude response as JSON array")
|
|
return generate_fallback_business_questions(feature_name, complexity)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Claude business questions generation failed: {e}")
|
|
return generate_fallback_business_questions(feature_name, complexity)
|
|
|
|
async def generate_enhanced_comprehensive_ai_questions(all_features: list, project_name: str, project_type: str):
|
|
"""ENHANCED: Generate comprehensive business questions using Claude AI for integrated system with tagged rules"""
|
|
try:
|
|
# Extract all logic rules and requirements from features - ENHANCED for tagged rules
|
|
all_logic_rules = []
|
|
all_requirements = []
|
|
feature_complexities = []
|
|
detailed_requirements = []
|
|
|
|
system_overview = f"INTEGRATED {project_type.upper()} SYSTEM: {project_name}\n\n"
|
|
|
|
for idx, feature in enumerate(all_features, 1):
|
|
feature_name = feature.get('featureName') or feature.get('name', f'Feature {idx}')
|
|
feature_desc = feature.get('description', '')
|
|
complexity = feature.get('complexity', 'medium')
|
|
|
|
system_overview += f"{idx}. {feature_name.upper()}"
|
|
if feature_desc:
|
|
system_overview += f" - {feature_desc}"
|
|
system_overview += f" (Complexity: {complexity})\n"
|
|
|
|
# ENHANCED: Extract tagged rules from requirementAnalysis
|
|
requirement_analysis = feature.get('requirementAnalysis', [])
|
|
if requirement_analysis:
|
|
system_overview += f" DETAILED REQUIREMENTS WITH TAGGED RULES:\n"
|
|
for req_idx, req_analysis in enumerate(requirement_analysis):
|
|
req_name = req_analysis.get('requirement', f'Requirement {req_idx+1}')
|
|
req_rules = req_analysis.get('logicRules', [])
|
|
|
|
system_overview += f" • {req_name.upper()}:\n"
|
|
detailed_requirements.append(req_name)
|
|
|
|
if req_rules:
|
|
for rule_idx, rule in enumerate(req_rules, 1):
|
|
system_overview += f" R{rule_idx}: {rule}\n"
|
|
all_logic_rules.append(f"{feature_name}→{req_name}: {rule}")
|
|
system_overview += f"\n"
|
|
|
|
# Fallback: Extract regular logic rules if no tagged rules
|
|
elif feature.get('logicRules'):
|
|
logic_rules = feature.get('logicRules', [])
|
|
system_overview += f" Logic Rules ({len(logic_rules)}):\n"
|
|
for rule_idx, rule in enumerate(logic_rules, 1):
|
|
system_overview += f" R{rule_idx}: {rule}\n"
|
|
all_logic_rules.append(f"{feature_name}: {rule}")
|
|
|
|
# Extract requirements
|
|
requirements = feature.get('requirements', [])
|
|
if requirements:
|
|
system_overview += f" General Requirements ({len(requirements)}):\n"
|
|
for req in requirements:
|
|
system_overview += f" • {req}\n"
|
|
all_requirements.append(f"{feature_name}: {req}")
|
|
|
|
feature_complexities.append(complexity)
|
|
system_overview += "\n"
|
|
|
|
# Determine overall system complexity
|
|
complexity_weights = {'low': 1, 'medium': 2, 'high': 3}
|
|
avg_complexity = sum(complexity_weights.get(c, 2) for c in feature_complexities) / len(feature_complexities)
|
|
system_complexity = 'high' if avg_complexity >= 2.5 else 'medium' if avg_complexity >= 1.5 else 'low'
|
|
|
|
prompt = f"""
|
|
You are a senior business analyst and technical architect. Analyze this COMPLETE INTEGRATED SOFTWARE SYSTEM with TAGGED LOGIC RULES and generate comprehensive business questions that will provide ALL necessary information for:
|
|
|
|
1. Technology Stack Selection
|
|
2. System Architecture Design
|
|
3. Code Generation and Implementation
|
|
4. Infrastructure and Deployment Planning
|
|
|
|
{system_overview}
|
|
|
|
ENHANCED SYSTEM ANALYSIS:
|
|
- Total Features: {len(all_features)}
|
|
- Detailed Requirements: {len(detailed_requirements)}
|
|
- Overall Complexity: {system_complexity}
|
|
- Total Tagged Logic Rules: {len(all_logic_rules)}
|
|
- Total Requirements: {len(all_requirements)}
|
|
|
|
GENERATE COMPREHENSIVE BUSINESS QUESTIONS COVERING:
|
|
|
|
**SYSTEM INTEGRATION & DATA FLOW:**
|
|
- How should these {len(all_features)} features with {len(detailed_requirements)} detailed requirements integrate?
|
|
- What data should be shared between specific detailed requirements?
|
|
- How should workflow dependencies work across the integrated system?
|
|
|
|
**TECHNICAL IMPLEMENTATION (Based on ALL Tagged Logic Rules):**
|
|
Generate specific technical questions for EACH major logic rule category:
|
|
{chr(10).join([f"- {rule}" for rule in all_logic_rules[:15]])}
|
|
|
|
**DETAILED REQUIREMENT INTEGRATION:**
|
|
For each detailed requirement, ask specific integration questions:
|
|
{chr(10).join([f"- How should '{req}' integrate with other system components?" for req in detailed_requirements[:8]])}
|
|
|
|
**SCALE & PERFORMANCE:**
|
|
- User load across all features and detailed requirements
|
|
- Data volume projections for integrated workflows
|
|
- Performance requirements considering all logic rules
|
|
- Concurrent usage patterns across detailed requirements
|
|
|
|
**SECURITY & COMPLIANCE:**
|
|
- Authentication/authorization across all detailed requirements
|
|
- Data protection for integrated workflows
|
|
- Compliance needs considering all business logic rules
|
|
- Audit requirements for complex integrated operations
|
|
|
|
**INFRASTRUCTURE & DEPLOYMENT:**
|
|
- Cloud architecture for integrated system with multiple detailed requirements
|
|
- Scalability for complex logic rule processing
|
|
- Integration points with existing systems
|
|
- Deployment strategy for feature dependencies
|
|
|
|
**BUSINESS OPERATIONS:**
|
|
- Budget for complete integrated system development
|
|
- Phased rollout considering detailed requirement dependencies
|
|
- Success metrics across all integrated features
|
|
- Training needs for complex integrated workflows
|
|
|
|
**LOGIC RULE SPECIFIC QUESTIONS:**
|
|
Generate targeted questions for complex logic rules that need technical clarification.
|
|
|
|
IMPORTANT:
|
|
- Generate comprehensive questions covering ALL detailed requirements
|
|
- Each question should help make specific technical architecture decisions
|
|
- Consider interactions between detailed requirements and their tagged rules
|
|
- Include questions about data flow between specific requirements
|
|
- Ask about technical implementation of complex rule combinations
|
|
- Focus on the system as ONE integrated platform with detailed sub-components
|
|
|
|
Return ONLY a JSON array of comprehensive business questions:
|
|
[
|
|
"How many total users will access your integrated {project_type} system across all detailed requirements?",
|
|
"How should data flow between [specific detailed requirements based on analysis]?",
|
|
"What technical infrastructure do you need for [specific complex logic rule]?",
|
|
"How should [detailed requirement A] integrate with [detailed requirement B]?",
|
|
...
|
|
]
|
|
"""
|
|
|
|
message = await claude_client.messages.create(
|
|
model="claude-3-5-sonnet-20241022",
|
|
max_tokens=4000,
|
|
temperature=0.3,
|
|
messages=[{"role": "user", "content": prompt}]
|
|
)
|
|
|
|
response_text = message.content[0].text.strip()
|
|
logger.info(f"Claude comprehensive response length: {len(response_text)}")
|
|
|
|
# Extract JSON array from response
|
|
import re
|
|
json_match = re.search(r'\[[\s\S]*\]', response_text)
|
|
if json_match:
|
|
questions = json.loads(json_match.group())
|
|
logger.info(f"Successfully parsed {len(questions)} enhanced comprehensive questions")
|
|
return questions
|
|
else:
|
|
logger.warning("Could not parse Claude response as JSON array, using enhanced fallback")
|
|
return generate_enhanced_comprehensive_fallback_questions(all_features, project_type)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Claude enhanced comprehensive questions generation failed: {e}")
|
|
return generate_enhanced_comprehensive_fallback_questions(all_features, project_type)
|
|
|
|
# LEGACY FUNCTION - PRESERVED for backward compatibility
|
|
async def generate_comprehensive_ai_questions(all_features: list, project_name: str, project_type: str):
|
|
"""LEGACY: Generate comprehensive business questions using Claude AI for integrated system"""
|
|
# Call the enhanced version for backward compatibility
|
|
return await generate_enhanced_comprehensive_ai_questions(all_features, project_name, project_type)
|
|
|
|
def generate_fallback_business_questions(feature_name: str, complexity: str):
|
|
"""Generate fallback business questions when Claude is not available"""
|
|
|
|
base_questions = [
|
|
f"How many users do you expect to use the {feature_name} feature?",
|
|
"What is your expected launch timeline for this feature?",
|
|
"Do you have specific performance requirements?",
|
|
"What is your budget range for this implementation?",
|
|
"Do you need this feature to integrate with existing systems?"
|
|
]
|
|
|
|
if complexity == "high":
|
|
base_questions.extend([
|
|
"Do you have specific compliance or security requirements?",
|
|
"What level of data encryption do you need?",
|
|
"Do you need real-time data processing capabilities?"
|
|
])
|
|
elif complexity == "medium":
|
|
base_questions.extend([
|
|
"Do you need user authentication and authorization?",
|
|
"What level of data backup and recovery do you need?"
|
|
])
|
|
|
|
return base_questions
|
|
|
|
def generate_enhanced_comprehensive_fallback_questions(all_features: list, project_type: str):
|
|
"""ENHANCED: Generate comprehensive fallback questions with tagged rules support"""
|
|
|
|
feature_names = [f.get('featureName') or f.get('name', 'Feature') for f in all_features]
|
|
features_text = ', '.join(feature_names)
|
|
|
|
# ENHANCED: Extract all tagged logic rules and detailed requirements
|
|
all_logic_rules = []
|
|
detailed_requirements = []
|
|
|
|
for feature in all_features:
|
|
feature_name = feature.get('featureName') or feature.get('name', 'Feature')
|
|
|
|
# Extract from tagged rules structure
|
|
requirement_analysis = feature.get('requirementAnalysis', [])
|
|
if requirement_analysis:
|
|
for req_analysis in requirement_analysis:
|
|
req_name = req_analysis.get('requirement', 'Requirement')
|
|
detailed_requirements.append(req_name)
|
|
req_rules = req_analysis.get('logicRules', [])
|
|
for rule in req_rules:
|
|
all_logic_rules.append((feature_name, req_name, rule))
|
|
|
|
# Fallback to regular logic rules
|
|
else:
|
|
logic_rules = feature.get('logicRules', [])
|
|
for rule in logic_rules:
|
|
all_logic_rules.append((feature_name, 'General', rule))
|
|
|
|
comprehensive_questions = [
|
|
# System Integration Questions - ENHANCED
|
|
f"How many total users will access your integrated {project_type} system across all detailed requirements?",
|
|
f"How should {features_text} features with their detailed requirements integrate and share data?",
|
|
f"What are the workflow dependencies between detailed requirements: {', '.join(detailed_requirements[:5])}?",
|
|
f"Do you need real-time data synchronization across all detailed requirements?",
|
|
|
|
# Detailed Requirements Integration - NEW
|
|
f"How should data flow between these detailed requirements: {', '.join(detailed_requirements[:3])}?",
|
|
f"What shared services are needed across detailed requirements?",
|
|
f"How should user permissions work across {len(detailed_requirements)} detailed requirements?",
|
|
|
|
# Scale and Performance - ENHANCED
|
|
f"What is the expected concurrent user load across all {len(detailed_requirements)} detailed requirements?",
|
|
f"What data volume do you expect for integrated workflows across detailed requirements?",
|
|
f"What are your performance requirements for complex operations involving multiple detailed requirements?",
|
|
f"Do you need the system to handle peak loads across all detailed requirements simultaneously?",
|
|
|
|
# Technical Implementation - ENHANCED
|
|
f"What is your total budget for developing this integrated {project_type} system with {len(detailed_requirements)} detailed requirements?",
|
|
f"What is your preferred timeline for implementing all detailed requirements with their complex logic rules?",
|
|
f"Do you prefer cloud-based or on-premise deployment for the complete integrated system?",
|
|
f"What existing systems need to integrate with detailed requirements in your new {project_type} platform?",
|
|
|
|
# Security and Compliance - ENHANCED
|
|
f"What authentication method do you prefer for users across all detailed requirements?",
|
|
f"Do you have specific security requirements for data shared between detailed requirements?",
|
|
f"What level of data backup and recovery do you need for integrated workflows?",
|
|
f"Are there any compliance requirements (GDPR, HIPAA, SOX) that affect multiple detailed requirements?",
|
|
|
|
# Business Operations - ENHANCED
|
|
f"How do you measure success for your integrated {project_type} system across all detailed requirements?",
|
|
f"What reporting capabilities do you need that combine data from multiple detailed requirements?",
|
|
f"Do you need mobile access for all detailed requirements in your {project_type} system?",
|
|
f"What level of customization do you need for different user roles across detailed requirements?"
|
|
]
|
|
|
|
# Add specific questions for tagged logic rules - ENHANCED
|
|
for feature_name, req_name, logic_rule in all_logic_rules[:12]: # Process more rules
|
|
if 'status' in logic_rule.lower() and 'workflow' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What status workflow systems do you need for {req_name} in {feature_name}?")
|
|
elif 'tax' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What tax calculation requirements do you have for {req_name}?")
|
|
elif 'currency' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What currency support do you need for {req_name} and do you need real-time exchange rates?")
|
|
elif 'approval' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What approval workflows and thresholds do you need for {req_name}?")
|
|
elif 'validation' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What data validation rules are required for {req_name}?")
|
|
elif 'notification' in logic_rule.lower():
|
|
comprehensive_questions.append(f"How should users be notified for {req_name} events?")
|
|
elif 'integration' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What third-party integrations do you need for {req_name}?")
|
|
elif 'numbering' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What numbering or identification systems do you need for {req_name}?")
|
|
elif 'payment' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What payment processing capabilities do you need for {req_name}?")
|
|
elif 'audit' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What audit logging requirements do you have for {req_name}?")
|
|
elif 'document' in logic_rule.lower():
|
|
comprehensive_questions.append(f"What document management capabilities do you need for {req_name}?")
|
|
|
|
# Remove duplicates while preserving order
|
|
seen = set()
|
|
unique_questions = []
|
|
for question in comprehensive_questions:
|
|
if question.lower() not in seen:
|
|
seen.add(question.lower())
|
|
unique_questions.append(question)
|
|
|
|
return unique_questions
|
|
|
|
# LEGACY FUNCTION - PRESERVED for backward compatibility
|
|
def generate_comprehensive_fallback_questions(all_features: list, project_type: str):
|
|
"""LEGACY: Generate comprehensive fallback questions when Claude is not available"""
|
|
# Call the enhanced version for backward compatibility
|
|
return generate_enhanced_comprehensive_fallback_questions(all_features, project_type)
|
|
|
|
def extract_project_name(data: Dict[str, Any]) -> str:
|
|
"""Extract project name from various possible locations"""
|
|
|
|
# Try different possible locations
|
|
possible_locations = [
|
|
data.get('project_name'),
|
|
data.get('projectName'),
|
|
data.get('name'),
|
|
data.get('title'),
|
|
]
|
|
|
|
# Check in nested structures
|
|
if isinstance(data.get('body'), dict):
|
|
possible_locations.extend([
|
|
data['body'].get('project_name'),
|
|
data['body'].get('projectName'),
|
|
data['body'].get('name'),
|
|
])
|
|
|
|
if isinstance(data.get('requirements'), dict):
|
|
possible_locations.extend([
|
|
data['requirements'].get('project_name'),
|
|
data['requirements'].get('name'),
|
|
])
|
|
|
|
# Return the first non-empty value found
|
|
for location in possible_locations:
|
|
if location and isinstance(location, str) and location.strip():
|
|
return location.strip()
|
|
|
|
return "Unknown Project"
|
|
|
|
def extract_description(data: Dict[str, Any]) -> str:
|
|
"""Extract description from various possible locations"""
|
|
|
|
possible_locations = [
|
|
data.get('description'),
|
|
data.get('desc'),
|
|
data.get('project_description'),
|
|
]
|
|
|
|
# Check in nested structures
|
|
if isinstance(data.get('body'), dict):
|
|
possible_locations.extend([
|
|
data['body'].get('description'),
|
|
data['body'].get('desc'),
|
|
])
|
|
|
|
# Return the first non-empty value found
|
|
for location in possible_locations:
|
|
if location and isinstance(location, str) and location.strip():
|
|
return location.strip()
|
|
|
|
return ""
|
|
|
|
def extract_all_data(data: Dict[str, Any]) -> tuple[list, dict, dict]:
|
|
"""Extract ALL features, scale info, and complete requirements from ANY structure"""
|
|
|
|
all_features = []
|
|
scale_info = {}
|
|
complete_requirements = {}
|
|
|
|
# Recursive function to find all boolean features and scale info
|
|
def extract_from_object(obj: Any, path: str = ""):
|
|
if isinstance(obj, dict):
|
|
for key, value in obj.items():
|
|
current_path = f"{path}.{key}" if path else key
|
|
|
|
# Extract boolean features
|
|
if value is True:
|
|
all_features.append(key)
|
|
complete_requirements[key] = value
|
|
|
|
# Extract scale information
|
|
elif key in ['team_size', 'timeline', 'budget', 'expected_users', 'industry', 'scalability',
|
|
'concurrent_users', 'data_volume', 'performance_requirements', 'compliance_requirements']:
|
|
scale_info[key] = value
|
|
complete_requirements[key] = value
|
|
|
|
# Extract other non-boolean values that might be features
|
|
elif isinstance(value, str) and value.strip():
|
|
complete_requirements[key] = value
|
|
|
|
# Extract numeric values
|
|
elif isinstance(value, (int, float)) and not isinstance(value, bool):
|
|
complete_requirements[key] = value
|
|
|
|
# Recurse into nested objects
|
|
elif isinstance(value, (dict, list)):
|
|
extract_from_object(value, current_path)
|
|
|
|
elif isinstance(obj, list):
|
|
for i, item in enumerate(obj):
|
|
if isinstance(item, (dict, list)):
|
|
extract_from_object(item, f"{path}[{i}]" if path else f"[{i}]")
|
|
|
|
# Extract from the entire data structure
|
|
extract_from_object(data)
|
|
|
|
# Also try to extract from common nested locations
|
|
nested_locations = [
|
|
data.get('body'),
|
|
data.get('requirements'),
|
|
data.get('params'),
|
|
data.get('query'),
|
|
data.get('data')
|
|
]
|
|
|
|
for nested_data in nested_locations:
|
|
if isinstance(nested_data, dict):
|
|
extract_from_object(nested_data)
|
|
|
|
# Remove duplicates
|
|
all_features = list(set(all_features))
|
|
|
|
logger.info(f"Extracted features: {all_features}")
|
|
logger.info(f"Extracted scale info: {scale_info}")
|
|
|
|
return all_features, scale_info, complete_requirements
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
logger.info("🚀 ENHANCED FLEXIBLE REQUIREMENTS PROCESSOR - Accepts Any Body Structure")
|
|
logger.info("✅ NO strict validation, NO required fields")
|
|
logger.info("✅ Accepts any JSON structure from n8n")
|
|
logger.info("✅ Extracts features from anywhere in the data")
|
|
logger.info("✅ Generates business questions with Claude AI")
|
|
logger.info("✅ ENHANCED: Comprehensive business questions for integrated systems")
|
|
logger.info("✅ NEW: Tagged rules support for detailed requirements")
|
|
logger.info("✅ NEW: Enhanced fallback questions with detailed requirement integration")
|
|
|
|
uvicorn.run("main:app", host="0.0.0.0", port=5678, log_level="info") |