codenuk_backend_mine/services/tech-stack-selector/src/main.py

944 lines
43 KiB
Python

# import os
# import sys
# import json
# from datetime import datetime
# from typing import Dict, Any, Optional, List
# from pydantic import BaseModel
# from fastapi import FastAPI, HTTPException, Request
# from fastapi.middleware.cors import CORSMiddleware
# from loguru import logger
# # AI integration
# try:
# import anthropic
# CLAUDE_AVAILABLE = True
# except ImportError:
# CLAUDE_AVAILABLE = False
# # Configure logging
# logger.remove()
# logger.add(sys.stdout, level="INFO", format="{time} | {level} | {message}")
# # API Key
# CLAUDE_API_KEY = os.getenv("CLAUDE_API_KEY", "")  # hard-coded secret redacted; revoke and rotate the leaked key
# if not os.getenv("CLAUDE_API_KEY") and CLAUDE_API_KEY:
# os.environ["CLAUDE_API_KEY"] = CLAUDE_API_KEY
# # ================================================================================================
# # ENHANCED TECH STACK SELECTOR - WITH FUNCTIONAL REQUIREMENTS DISPLAY
# # ================================================================================================
# class EnhancedTechStackSelector:
# """Enhanced selector that handles business context + functional requirements"""
# def __init__(self):
# self.claude_client = anthropic.Anthropic(api_key=CLAUDE_API_KEY) if CLAUDE_AVAILABLE else None
# logger.info("Enhanced Tech Stack Selector initialized")
# # ================================================================================================
# # FASTAPI APPLICATION
# # ================================================================================================
# app = FastAPI(
# title="Enhanced Tech Stack Selector",
# description="Enhanced tech stack recommendations with functional requirements display",
# version="11.0.0"
# )
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# # Initialize enhanced selector
# enhanced_selector = EnhancedTechStackSelector()
# @app.get("/health")
# async def health_check():
# """Health check"""
# return {
# "status": "healthy",
# "service": "enhanced-tech-stack-selector",
# "version": "11.0.0",
# "approach": "functional_requirements_aware_recommendations"
# }
# @app.post("/api/v1/select")
# async def select_enhanced_tech_stack(request: Request):
# """ENHANCED VERSION - Shows functional requirements + tech recommendations for architecture-designer"""
# try:
# request_data = await request.json()
# # Log exactly what we receive
# logger.info("=== RECEIVED ENHANCED DATA START ===")
# logger.info(json.dumps(request_data, indent=2, default=str))
# logger.info("=== RECEIVED ENHANCED DATA END ===")
# # Extract enhanced data components
# extracted_data = extract_enhanced_data(request_data)
# if not extracted_data["features"] and not extracted_data["feature_name"]:
# logger.error("❌ NO FEATURES OR FEATURE DATA FOUND")
# return {
# "error": "No features or feature data found in request",
# "received_data_keys": list(request_data.keys()) if isinstance(request_data, dict) else "not_dict",
# "extraction_attempted": "enhanced_data_extraction"
# }
# # Build comprehensive context for Claude
# context = build_comprehensive_context(extracted_data)
# # Generate enhanced tech stack recommendations
# recommendations = await generate_enhanced_recommendations(context)
# # NEW: Build complete response with functional requirements for architecture-designer
# complete_response = {
# "success": True,
# "enhanced_analysis": True,
# # PROJECT CONTEXT - For Web Dashboard Display
# "project_context": {
# "project_name": extracted_data["project_name"],
# "project_type": extracted_data["project_type"],
# "features_analyzed": len(extracted_data["features"]),
# "business_questions_answered": len(extracted_data["business_answers"]),
# "complexity": extracted_data["complexity"]
# },
# # FUNCTIONAL REQUIREMENTS - For Web Dashboard Display & Architecture Designer
# "functional_requirements": {
# "feature_name": extracted_data["feature_name"],
# "description": extracted_data["description"],
# "technical_requirements": extracted_data["requirements"],
# "business_logic_rules": extracted_data["logic_rules"],
# "complexity_level": extracted_data["complexity"],
# "all_features": extracted_data["features"],
# "business_context": {
# "questions": extracted_data["business_questions"],
# "answers": extracted_data["business_answers"]
# }
# },
# # TECHNOLOGY RECOMMENDATIONS - Claude Generated
# "claude_recommendations": recommendations,
# # COMPLETE DATA FOR ARCHITECTURE DESIGNER
# "architecture_designer_input": {
# "project_data": {
# "project_name": extracted_data["project_name"],
# "project_type": extracted_data["project_type"],
# "complexity": extracted_data["complexity"]
# },
# "functional_specifications": {
# "primary_feature": {
# "name": extracted_data["feature_name"],
# "description": extracted_data["description"],
# "requirements": extracted_data["requirements"],
# "logic_rules": extracted_data["logic_rules"]
# },
# "all_features": extracted_data["features"],
# "business_context": extracted_data["business_answers"]
# },
# "technology_stack": recommendations,
# "business_requirements": context["business_context"]
# },
# "analysis_timestamp": datetime.utcnow().isoformat(),
# "ready_for_architecture_design": True
# }
# logger.info(f"✅ Enhanced tech stack analysis completed with functional requirements")
# logger.info(f" Feature: {extracted_data['feature_name']}")
# logger.info(f" Requirements: {len(extracted_data['requirements'])}")
# logger.info(f" Logic Rules: {len(extracted_data['logic_rules'])}")
# logger.info(f" Business Answers: {len(extracted_data['business_answers'])}")
# return complete_response
# except Exception as e:
# logger.error(f"💥 ERROR in enhanced tech stack selection: {e}")
# return {
# "error": str(e),
# "debug": "Check service logs for detailed error information"
# }
# def extract_enhanced_data(request_data: Dict) -> Dict:
# """Extract enhanced data from web dashboard request"""
# extracted = {
# "project_name": "Unknown Project",
# "project_type": "unknown",
# "feature_name": "",
# "description": "",
# "requirements": [],
# "complexity": "medium",
# "logic_rules": [],
# "business_questions": [],
# "business_answers": [],
# "features": [],
# "all_features": []
# }
# logger.info("🔍 Extracting enhanced data with functional requirements...")
# # Path 1: Direct enhanced data format from web dashboard
# if isinstance(request_data, dict):
# # Extract main feature data
# extracted["feature_name"] = request_data.get("featureName", "")
# extracted["description"] = request_data.get("description", "")
# extracted["requirements"] = request_data.get("requirements", [])
# extracted["complexity"] = request_data.get("complexity", "medium")
# extracted["logic_rules"] = request_data.get("logicRules", [])
# extracted["business_questions"] = request_data.get("businessQuestions", [])
# extracted["business_answers"] = request_data.get("businessAnswers", [])
# extracted["project_name"] = request_data.get("projectName", "Unknown Project")
# extracted["project_type"] = request_data.get("projectType", "unknown")
# extracted["all_features"] = request_data.get("allFeatures", [])
# # If we have business answers in object format, convert to list
# if isinstance(extracted["business_answers"], dict):
# ba_list = []
# for key, value in extracted["business_answers"].items():
# if isinstance(value, str) and value.strip():
# question_idx = int(key) if key.isdigit() else 0
# if question_idx < len(extracted["business_questions"]):
# ba_list.append({
# "question": extracted["business_questions"][question_idx],
# "answer": value.strip()
# })
# extracted["business_answers"] = ba_list
# # Extract features list
# if extracted["feature_name"]:
# extracted["features"] = [extracted["feature_name"]]
# # Add all features if available
# if extracted["all_features"]:
# feature_names = []
# for feature in extracted["all_features"]:
# if isinstance(feature, dict):
# feature_names.append(feature.get("name", feature.get("featureName", "")))
# else:
# feature_names.append(str(feature))
# extracted["features"].extend([f for f in feature_names if f])
# logger.info(f"✅ Extracted enhanced data with functional requirements:")
# logger.info(f" Project: {extracted['project_name']} ({extracted['project_type']})")
# logger.info(f" Main feature: {extracted['feature_name']}")
# logger.info(f" Requirements: {len(extracted['requirements'])}")
# logger.info(f" Logic Rules: {len(extracted['logic_rules'])}")
# logger.info(f" Complexity: {extracted['complexity']}")
# logger.info(f" Business answers: {len(extracted['business_answers'])}")
# logger.info(f" Total features: {len(extracted['features'])}")
# return extracted
# def build_comprehensive_context(extracted_data: Dict) -> Dict:
# """Build comprehensive context for Claude analysis"""
# # Combine all requirements and business insights
# functional_requirements = []
# if extracted_data["feature_name"]:
# functional_requirements.append(f"Core Feature: {extracted_data['feature_name']}")
# if extracted_data["requirements"]:
# functional_requirements.extend([f"• {req}" for req in extracted_data["requirements"]])
# if extracted_data["features"]:
# for feature in extracted_data["features"]:
# if feature and feature != extracted_data["feature_name"]:
# functional_requirements.append(f"• {feature}")
# # Business context from answers
# business_context = {}
# if extracted_data["business_answers"]:
# for answer_data in extracted_data["business_answers"]:
# if isinstance(answer_data, dict):
# question = answer_data.get("question", "")
# answer = answer_data.get("answer", "")
# if question and answer:
# # Categorize business answers
# if any(keyword in question.lower() for keyword in ["user", "scale", "concurrent"]):
# business_context["scale_requirements"] = business_context.get("scale_requirements", [])
# business_context["scale_requirements"].append(f"{question}: {answer}")
# elif any(keyword in question.lower() for keyword in ["compliance", "security", "encryption"]):
# business_context["security_requirements"] = business_context.get("security_requirements", [])
# business_context["security_requirements"].append(f"{question}: {answer}")
# elif any(keyword in question.lower() for keyword in ["budget", "timeline", "timeline"]):
# business_context["project_constraints"] = business_context.get("project_constraints", [])
# business_context["project_constraints"].append(f"{question}: {answer}")
# else:
# business_context["other_requirements"] = business_context.get("other_requirements", [])
# business_context["other_requirements"].append(f"{question}: {answer}")
# return {
# "project_name": extracted_data["project_name"],
# "project_type": extracted_data["project_type"],
# "complexity": extracted_data["complexity"],
# "functional_requirements": functional_requirements,
# "business_context": business_context,
# "logic_rules": extracted_data["logic_rules"]
# }
# async def generate_enhanced_recommendations(context: Dict) -> Dict:
# """Generate enhanced tech stack recommendations using Claude with business context"""
# if not enhanced_selector.claude_client:
# logger.error("❌ Claude client not available")
# return {
# "error": "Claude AI not available",
# "fallback": "Basic recommendations would go here"
# }
# # Build comprehensive prompt with business context
# functional_reqs_text = "\n".join(context["functional_requirements"])
# business_context_text = ""
# for category, requirements in context["business_context"].items():
# business_context_text += f"\n{category.replace('_', ' ').title()}:\n"
# business_context_text += "\n".join([f" - {req}" for req in requirements]) + "\n"
# logic_rules_text = "\n".join([f" - {rule}" for rule in context["logic_rules"]])
# prompt = f"""You are a senior software architect. Analyze this comprehensive project context and recommend the optimal technology stack.
# PROJECT CONTEXT:
# - Name: {context["project_name"]}
# - Type: {context["project_type"]}
# - Complexity: {context["complexity"]}
# FUNCTIONAL REQUIREMENTS:
# {functional_reqs_text}
# BUSINESS CONTEXT & CONSTRAINTS:
# {business_context_text}
# BUSINESS LOGIC RULES:
# {logic_rules_text}
# Based on this comprehensive analysis, provide detailed technology recommendations as a JSON object:
# {{
# "technology_recommendations": {{
# "frontend": {{
# "framework": "recommended framework",
# "libraries": ["lib1", "lib2", "lib3"],
# "reasoning": "detailed reasoning based on requirements and business context"
# }},
# "backend": {{
# "framework": "recommended backend framework",
# "language": "programming language",
# "libraries": ["lib1", "lib2", "lib3"],
# "reasoning": "detailed reasoning based on complexity and business needs"
# }},
# "database": {{
# "primary": "primary database choice",
# "secondary": ["cache", "search", "analytics"],
# "reasoning": "database choice based on data requirements and scale"
# }},
# "infrastructure": {{
# "cloud_provider": "recommended cloud provider",
# "orchestration": "container/orchestration choice",
# "services": ["service1", "service2", "service3"],
# "reasoning": "infrastructure reasoning based on scale and budget"
# }},
# "security": {{
# "authentication": "auth strategy",
# "authorization": "authorization approach",
# "data_protection": "data protection measures",
# "compliance": "compliance approach",
# "reasoning": "security reasoning based on business context"
# }},
# "third_party_services": {{
# "communication": "communication services",
# "monitoring": "monitoring solution",
# "payment": "payment processing",
# "other_services": ["service1", "service2"],
# "reasoning": "third-party service reasoning"
# }}
# }},
# "implementation_strategy": {{
# "architecture_pattern": "recommended architecture pattern",
# "development_phases": ["phase1", "phase2", "phase3"],
# "deployment_strategy": "deployment approach",
# "scalability_approach": "scalability strategy",
# "timeline_estimate": "development timeline estimate"
# }},
# "business_alignment": {{
# "addresses_scale_requirements": "how recommendations address scale needs",
# "addresses_security_requirements": "how recommendations address security needs",
# "addresses_budget_constraints": "how recommendations fit budget",
# "addresses_timeline_constraints": "how recommendations fit timeline",
# "compliance_considerations": "compliance alignment"
# }}
# }}
# CRITICAL: Return ONLY valid JSON, no additional text. Base all recommendations on the provided functional requirements and business context."""
# try:
# logger.info("📞 Calling Claude for enhanced recommendations with functional requirements...")
# message = enhanced_selector.claude_client.messages.create(
# model="claude-3-5-sonnet-20241022",
# max_tokens=8000,
# temperature=0.1,
# messages=[{"role": "user", "content": prompt}]
# )
# claude_response = message.content[0].text.strip()
# logger.info("✅ Received Claude response for enhanced recommendations")
# # Parse JSON response
# try:
# recommendations = json.loads(claude_response)
# logger.info("✅ Successfully parsed enhanced recommendations JSON")
# return recommendations
# except json.JSONDecodeError as e:
# logger.error(f"❌ JSON parse error: {e}")
# return {
# "parse_error": str(e),
# "raw_response": claude_response[:1000] + "..." if len(claude_response) > 1000 else claude_response
# }
# except Exception as e:
# logger.error(f"❌ Claude API error: {e}")
# return {
# "error": str(e),
# "fallback": "Enhanced recommendations generation failed"
# }
# if __name__ == "__main__":
# import uvicorn
# logger.info("="*60)
# logger.info("🚀 ENHANCED TECH STACK SELECTOR v11.0 - FUNCTIONAL REQUIREMENTS AWARE")
# logger.info("="*60)
# logger.info("✅ Enhanced data extraction from web dashboard")
# logger.info("✅ Functional requirements display")
# logger.info("✅ Business context analysis")
# logger.info("✅ Complete data for architecture-designer")
# logger.info("✅ Comprehensive Claude recommendations")
# logger.info("="*60)
# uvicorn.run("main:app", host="0.0.0.0", port=8002, log_level="info")
# ENHANCED TECH STACK SELECTOR - SHOWS FUNCTIONAL REQUIREMENTS + TECH RECOMMENDATIONS
# Now includes requirement-processor data in output for architecture-designer
# ENHANCED: Added tagged rules support while preserving ALL working functionality
import os
import sys
import json
from datetime import datetime
from typing import Dict, Any, Optional, List
from pydantic import BaseModel
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
# AI integration: the Anthropic SDK is optional. CLAUDE_AVAILABLE records
# whether the import succeeded so the rest of the module can degrade gracefully.
try:
    import anthropic
except ImportError:
    CLAUDE_AVAILABLE = False
else:
    CLAUDE_AVAILABLE = True

# Configure logging: drop the sinks registered so far, then log INFO and above
# to stdout using a compact "time | level | message" layout.
logger.remove()
logger.add(
    sys.stdout,
    level="INFO",
    format="{time} | {level} | {message}",
)
# API key
# SECURITY: the key is read from the CLAUDE_API_KEY environment variable and is
# never stored in source. The key that used to be hard-coded here is exposed in
# version history and must be revoked and rotated.
# An empty string means "not configured".
CLAUDE_API_KEY = os.getenv("CLAUDE_API_KEY", "").strip()
# ================================================================================================
# ENHANCED TECH STACK SELECTOR - WITH FUNCTIONAL REQUIREMENTS DISPLAY
# ================================================================================================
class EnhancedTechStackSelector:
    """Enhanced selector that handles business context + functional requirements.

    The instance only holds the Claude client; the request handling lives in
    the module-level functions that read ``claude_client`` from it.
    """

    def __init__(self):
        # claude_client is None when the Anthropic SDK is not installed or no
        # API key is configured, so callers can test it for truthiness instead
        # of discovering the problem on the first API request.
        if CLAUDE_AVAILABLE and CLAUDE_API_KEY:
            self.claude_client = anthropic.Anthropic(api_key=CLAUDE_API_KEY)
        else:
            self.claude_client = None
            logger.warning(
                "Claude client disabled: anthropic SDK missing or CLAUDE_API_KEY not set"
            )
        logger.info("Enhanced Tech Stack Selector initialized")
# ================================================================================================
# FASTAPI APPLICATION
# ================================================================================================
app = FastAPI(
    version="11.1.0",
    title="Enhanced Tech Stack Selector",
    description="Enhanced tech stack recommendations with functional requirements display",
)

# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation; confirm whether the dashboard
# really needs credentialed cross-origin requests before tightening this.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)

# Single shared selector instance used by the request handlers below.
enhanced_selector = EnhancedTechStackSelector()
@app.get("/health")
async def health_check():
    """Return a static payload describing this service and its version."""
    return dict(
        status="healthy",
        service="enhanced-tech-stack-selector",
        version="11.1.0",
        approach="functional_requirements_aware_recommendations",
        new_features=["tagged_rules_support"],
    )
def _build_selection_response(extracted_data: Dict, context: Dict, recommendations: Dict) -> Dict:
    """Assemble the response payload returned by ``/api/v1/select``.

    Args:
        extracted_data: Output of ``extract_enhanced_data``.
        context: Output of ``build_comprehensive_context``; only its
            ``business_context`` entry is read here.
        recommendations: Whatever ``generate_enhanced_recommendations`` returned
            (recommendations or an error dict); embedded unchanged.

    Returns:
        The complete response dict, including the section intended for the
        architecture-designer.
    """
    from datetime import timezone  # local import: the module only imports `datetime`

    detailed_requirements = extracted_data.get("detailed_requirements", [])
    tagged_rules = extracted_data.get("tagged_rules", [])
    # datetime.utcnow() is deprecated; dropping tzinfo keeps the emitted
    # ISO string in the same naive-UTC format as before.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    return {
        "success": True,
        "enhanced_analysis": True,
        # PROJECT CONTEXT - For Web Dashboard Display
        "project_context": {
            "project_name": extracted_data["project_name"],
            "project_type": extracted_data["project_type"],
            "features_analyzed": len(extracted_data["features"] or []),
            # `or []` keeps a null field in the request from raising here,
            # after the Claude call has already been made.
            "business_questions_answered": len(extracted_data["business_answers"] or []),
            "complexity": extracted_data["complexity"],
            "detailed_requirements_count": len(detailed_requirements),
            "total_tagged_rules": extracted_data.get("total_tagged_rules", 0),
        },
        # FUNCTIONAL REQUIREMENTS - For Web Dashboard Display & Architecture Designer
        "functional_requirements": {
            "feature_name": extracted_data["feature_name"],
            "description": extracted_data["description"],
            "technical_requirements": extracted_data["requirements"],
            "business_logic_rules": extracted_data["logic_rules"],
            "complexity_level": extracted_data["complexity"],
            "all_features": extracted_data["features"],
            "detailed_requirements": detailed_requirements,
            "tagged_rules": tagged_rules,
            "business_context": {
                "questions": extracted_data["business_questions"],
                "answers": extracted_data["business_answers"],
            },
        },
        # TECHNOLOGY RECOMMENDATIONS - Claude Generated
        "claude_recommendations": recommendations,
        # COMPLETE DATA FOR ARCHITECTURE DESIGNER
        "architecture_designer_input": {
            "project_data": {
                "project_name": extracted_data["project_name"],
                "project_type": extracted_data["project_type"],
                "complexity": extracted_data["complexity"],
            },
            "functional_specifications": {
                "primary_feature": {
                    "name": extracted_data["feature_name"],
                    "description": extracted_data["description"],
                    "requirements": extracted_data["requirements"],
                    "logic_rules": extracted_data["logic_rules"],
                },
                "all_features": extracted_data["features"],
                "detailed_requirements": detailed_requirements,
                "tagged_rules": tagged_rules,
                "business_context": extracted_data["business_answers"],
            },
            "technology_stack": recommendations,
            "business_requirements": context["business_context"],
        },
        "analysis_timestamp": timestamp,
        "ready_for_architecture_design": True,
    }


@app.post("/api/v1/select")
async def select_enhanced_tech_stack(request: Request):
    """Analyse a dashboard request and return requirements plus tech recommendations.

    Reads the JSON body, extracts the feature/business data, asks Claude for a
    technology stack and returns everything in one payload.

    Returns:
        The complete response dict on success. On failure a dict with an
        ``"error"`` key is returned instead of raising.
        NOTE(review): error payloads are sent with the default 200 status;
        confirm with callers before switching to HTTP error codes.
    """
    try:
        request_data = await request.json()
        # Log exactly what we receive
        logger.info("=== RECEIVED ENHANCED DATA START ===")
        logger.info(json.dumps(request_data, indent=2, default=str))
        logger.info("=== RECEIVED ENHANCED DATA END ===")

        extracted_data = extract_enhanced_data(request_data)
        if not extracted_data["features"] and not extracted_data["feature_name"]:
            logger.error("❌ NO FEATURES OR FEATURE DATA FOUND")
            return {
                "error": "No features or feature data found in request",
                "received_data_keys": list(request_data.keys()) if isinstance(request_data, dict) else "not_dict",
                "extraction_attempted": "enhanced_data_extraction",
            }

        context = build_comprehensive_context(extracted_data)
        recommendations = await generate_enhanced_recommendations(context)
        complete_response = _build_selection_response(extracted_data, context, recommendations)

        logger.info("✅ Enhanced tech stack analysis completed with functional requirements")
        logger.info(f" Feature: {extracted_data['feature_name']}")
        logger.info(f" Requirements: {len(extracted_data['requirements'] or [])}")
        logger.info(f" Logic Rules: {len(extracted_data['logic_rules'] or [])}")
        logger.info(f" Business Answers: {len(extracted_data['business_answers'] or [])}")
        logger.info(f" Detailed Requirements: {len(extracted_data.get('detailed_requirements', []))}")
        logger.info(f" Tagged Rules: {extracted_data.get('total_tagged_rules', 0)}")
        return complete_response
    except Exception as e:
        # Top-level boundary: keep the best-effort error payload, but record
        # the traceback so failures can actually be diagnosed from the logs.
        logger.exception(f"💥 ERROR in enhanced tech stack selection: {e}")
        return {
            "error": str(e),
            "debug": "Check service logs for detailed error information",
        }
def _as_list(value: Any) -> List:
    """Return *value* as a new list: None -> [], list/tuple -> shallow copy, other -> [value]."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def extract_enhanced_data(request_data: Dict) -> Dict:
    """Extract feature, business and tagged-rule data from a dashboard request.

    Args:
        request_data: Decoded JSON body. Anything that is not a dict yields the
            default (empty) result.

    Returns:
        A dict with the keys ``project_name``, ``project_type``,
        ``feature_name``, ``description``, ``requirements``, ``complexity``,
        ``logic_rules``, ``business_questions``, ``business_answers``,
        ``features``, ``all_features``, ``detailed_requirements``,
        ``tagged_rules`` and ``total_tagged_rules``.

    List-valued fields are copied, so extending them here never mutates the
    caller's request, and JSON ``null`` is normalised to an empty list.
    """
    extracted = {
        "project_name": "Unknown Project",
        "project_type": "unknown",
        "feature_name": "",
        "description": "",
        "requirements": [],
        "complexity": "medium",
        "logic_rules": [],
        "business_questions": [],
        "business_answers": [],
        "features": [],
        "all_features": [],
        # Tagged rules support
        "detailed_requirements": [],
        "tagged_rules": [],
        "total_tagged_rules": 0,
    }
    logger.info("🔍 Extracting enhanced data with functional requirements and tagged rules...")

    # Path 1: Direct enhanced data format from web dashboard
    if isinstance(request_data, dict):
        extracted["feature_name"] = request_data.get("featureName", "")
        extracted["description"] = request_data.get("description", "")
        extracted["requirements"] = _as_list(request_data.get("requirements"))
        extracted["complexity"] = request_data.get("complexity", "medium")
        extracted["logic_rules"] = _as_list(request_data.get("logicRules"))
        extracted["business_questions"] = _as_list(request_data.get("businessQuestions"))
        extracted["project_name"] = request_data.get("projectName", "Unknown Project")
        extracted["project_type"] = request_data.get("projectType", "unknown")
        extracted["all_features"] = _as_list(request_data.get("allFeatures"))

        raw_answers = request_data.get("businessAnswers", [])
        if isinstance(raw_answers, dict):
            # Object format {"<question index>": "<answer>"}: pair each
            # non-blank answer with the question at that index.
            questions = extracted["business_questions"]
            answer_pairs = []
            for key, value in raw_answers.items():
                if not (isinstance(value, str) and value.strip()):
                    continue
                key_text = str(key)
                # isdecimal() (not isdigit()) so int() can never raise;
                # non-numeric keys fall back to question 0 as before.
                question_idx = int(key_text) if key_text.isdecimal() else 0
                if question_idx < len(questions):
                    answer_pairs.append({
                        "question": questions[question_idx],
                        "answer": value.strip(),
                    })
            extracted["business_answers"] = answer_pairs
        else:
            extracted["business_answers"] = _as_list(raw_answers)

        if extracted["feature_name"]:
            extracted["features"] = [extracted["feature_name"]]

        if extracted["all_features"]:
            feature_names = []
            for feature in extracted["all_features"]:
                if not isinstance(feature, dict):
                    feature_names.append(str(feature))
                    continue

                feature_name = feature.get("name", feature.get("featureName", ""))
                feature_names.append(feature_name)

                requirement_analysis = _as_list(feature.get("requirementAnalysis"))
                if requirement_analysis:
                    logger.info(f"📋 Found tagged rules for feature: {feature_name}")
                    for req_analysis in requirement_analysis:
                        if not isinstance(req_analysis, dict):
                            # Malformed entry: skip it rather than fail the whole request.
                            continue
                        requirement_name = req_analysis.get("requirement", "Unknown Requirement")
                        requirement_rules = _as_list(req_analysis.get("logicRules"))
                        extracted["detailed_requirements"].append({
                            "feature_name": feature_name,
                            "requirement_name": requirement_name,
                            "description": feature.get("description", ""),
                            "complexity": req_analysis.get("complexity", "medium"),
                            "rules": requirement_rules,
                        })
                        # NOTE(review): rule ids restart at R1 for every
                        # requirement, so they are unique only within one
                        # requirement - confirm this is what consumers expect.
                        for rule_idx, rule in enumerate(requirement_rules):
                            if not isinstance(rule, str) or not rule.strip():
                                continue
                            extracted["tagged_rules"].append({
                                "rule_id": f"R{rule_idx + 1}",
                                "rule_text": rule.strip(),
                                "feature_name": feature_name,
                                "requirement_name": requirement_name,
                            })
                            extracted["total_tagged_rules"] += 1
                elif feature.get("logicRules"):
                    # Fallback: untagged rules go into the main logic_rules list.
                    extracted["logic_rules"].extend(_as_list(feature.get("logicRules")))

            extracted["features"].extend([name for name in feature_names if name])

    logger.info("✅ Extracted enhanced data with functional requirements and tagged rules:")
    logger.info(f" Project: {extracted['project_name']} ({extracted['project_type']})")
    logger.info(f" Main feature: {extracted['feature_name']}")
    logger.info(f" Requirements: {len(extracted['requirements'])}")
    logger.info(f" Logic Rules: {len(extracted['logic_rules'])}")
    logger.info(f" Complexity: {extracted['complexity']}")
    logger.info(f" Business answers: {len(extracted['business_answers'])}")
    logger.info(f" Total features: {len(extracted['features'])}")
    logger.info(f" Detailed Requirements: {len(extracted['detailed_requirements'])}")
    logger.info(f" Tagged Rules: {extracted['total_tagged_rules']}")
    return extracted
# Keyword groups used to bucket business answers by their question text.
# Groups are checked in order and the first match wins; questions matching no
# group land in "other_requirements".
_BUSINESS_CATEGORY_KEYWORDS = (
    ("scale_requirements", ("user", "scale", "concurrent")),
    ("security_requirements", ("compliance", "security", "encryption")),
    ("project_constraints", ("budget", "timeline")),
)


def build_comprehensive_context(extracted_data: Dict) -> Dict:
    """Build the context dict used to prompt Claude.

    Args:
        extracted_data: Output of ``extract_enhanced_data``. The keys
            ``detailed_requirements`` and ``tagged_rules`` are optional.

    Returns:
        A dict with ``project_name``, ``project_type``, ``complexity``,
        ``functional_requirements`` (list of text lines),
        ``business_context`` (category -> list of "question: answer" strings),
        ``logic_rules``, ``detailed_requirements`` and ``tagged_rules``.
    """
    primary_feature = extracted_data["feature_name"]

    functional_requirements = []
    if primary_feature:
        functional_requirements.append(f"Core Feature: {primary_feature}")
    functional_requirements.extend(f"{req}" for req in extracted_data["requirements"] or [])
    # The primary feature is already listed above, so skip it here.
    functional_requirements.extend(
        f"{feature}"
        for feature in extracted_data["features"] or []
        if feature and feature != primary_feature
    )

    # One entry per detailed requirement: a header line followed by its rules.
    for detailed_req in extracted_data.get("detailed_requirements", []):
        # The separator keeps feature and requirement names from running together.
        header = f"📋 {detailed_req['feature_name']} → {detailed_req['requirement_name']}:"
        rule_lines = "".join(f"\n - {rule}" for rule in detailed_req["rules"])
        functional_requirements.append(header + rule_lines)

    business_context = {}
    for answer_data in extracted_data["business_answers"] or []:
        if not isinstance(answer_data, dict):
            continue
        question = answer_data.get("question", "")
        answer = answer_data.get("answer", "")
        if not (question and answer):
            continue
        question_lc = str(question).lower()
        category = next(
            (
                name
                for name, keywords in _BUSINESS_CATEGORY_KEYWORDS
                if any(keyword in question_lc for keyword in keywords)
            ),
            "other_requirements",
        )
        business_context.setdefault(category, []).append(f"{question}: {answer}")

    return {
        "project_name": extracted_data["project_name"],
        "project_type": extracted_data["project_type"],
        "complexity": extracted_data["complexity"],
        "functional_requirements": functional_requirements,
        "business_context": business_context,
        "logic_rules": extracted_data["logic_rules"],
        "detailed_requirements": extracted_data.get("detailed_requirements", []),
        "tagged_rules": extracted_data.get("tagged_rules", []),
    }
def _build_recommendation_prompt(context: Dict) -> str:
    """Render the Claude prompt from the context built by ``build_comprehensive_context``."""
    functional_reqs_text = "\n".join(context["functional_requirements"])
    business_context_text = "".join(
        f"\n{category.replace('_', ' ').title()}:\n"
        + "\n".join(f" - {req}" for req in requirements)
        + "\n"
        for category, requirements in context["business_context"].items()
    )
    logic_rules_text = "\n".join(f" - {rule}" for rule in context["logic_rules"])

    # Tagged rules are optional; only the first 10 are spelled out to bound prompt size.
    tagged_rules_text = ""
    tagged_rules = context.get("tagged_rules")
    if tagged_rules:
        tagged_lines = ["\n\nDETAILED TAGGED RULES:\n"]
        tagged_lines.extend(
            f" {tagged_rule['rule_id']}: {tagged_rule['rule_text']} (Feature: {tagged_rule['feature_name']})\n"
            for tagged_rule in tagged_rules[:10]
        )
        if len(tagged_rules) > 10:
            tagged_lines.append(f" ... and {len(tagged_rules) - 10} more tagged rules\n")
        tagged_rules_text = "".join(tagged_lines)

    return f"""You are a senior software architect. Analyze this comprehensive project context and recommend the optimal technology stack.
PROJECT CONTEXT:
- Name: {context["project_name"]}
- Type: {context["project_type"]}
- Complexity: {context["complexity"]}
FUNCTIONAL REQUIREMENTS:
{functional_reqs_text}
BUSINESS CONTEXT & CONSTRAINTS:
{business_context_text}
BUSINESS LOGIC RULES:
{logic_rules_text}
{tagged_rules_text}
Based on this comprehensive analysis, provide detailed technology recommendations as a JSON object:
{{
"technology_recommendations": {{
"frontend": {{
"framework": "recommended framework",
"libraries": ["lib1", "lib2", "lib3"],
"reasoning": "detailed reasoning based on requirements and business context"
}},
"backend": {{
"framework": "recommended backend framework",
"language": "programming language",
"libraries": ["lib1", "lib2", "lib3"],
"reasoning": "detailed reasoning based on complexity and business needs"
}},
"database": {{
"primary": "primary database choice",
"secondary": ["cache", "search", "analytics"],
"reasoning": "database choice based on data requirements and scale"
}},
"infrastructure": {{
"cloud_provider": "recommended cloud provider",
"orchestration": "container/orchestration choice",
"services": ["service1", "service2", "service3"],
"reasoning": "infrastructure reasoning based on scale and budget"
}},
"security": {{
"authentication": "auth strategy",
"authorization": "authorization approach",
"data_protection": "data protection measures",
"compliance": "compliance approach",
"reasoning": "security reasoning based on business context"
}},
"third_party_services": {{
"communication": "communication services",
"monitoring": "monitoring solution",
"payment": "payment processing",
"other_services": ["service1", "service2"],
"reasoning": "third-party service reasoning"
}}
}},
"implementation_strategy": {{
"architecture_pattern": "recommended architecture pattern",
"development_phases": ["phase1", "phase2", "phase3"],
"deployment_strategy": "deployment approach",
"scalability_approach": "scalability strategy",
"timeline_estimate": "development timeline estimate"
}},
"business_alignment": {{
"addresses_scale_requirements": "how recommendations address scale needs",
"addresses_security_requirements": "how recommendations address security needs",
"addresses_budget_constraints": "how recommendations fit budget",
"addresses_timeline_constraints": "how recommendations fit timeline",
"compliance_considerations": "compliance alignment"
}}
}}
CRITICAL: Return ONLY valid JSON, no additional text. Base all recommendations on the provided functional requirements and business context."""


def _strip_json_fences(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) from *text*, if present."""
    stripped = text.strip()
    if not stripped.startswith("```"):
        return stripped
    # Drop the opening fence line, then a trailing closing fence if there is one.
    _, _, remainder = stripped.partition("\n")
    remainder = remainder.rstrip()
    if remainder.endswith("```"):
        remainder = remainder[:-3]
    return remainder.strip()


async def generate_enhanced_recommendations(context: Dict) -> Dict:
    """Ask Claude for tech stack recommendations based on *context*.

    Args:
        context: Output of ``build_comprehensive_context``.

    Returns:
        The parsed JSON recommendations on success. Otherwise an error dict:
        ``{"error", "fallback"}`` when the client is unavailable or the API
        call fails, or ``{"parse_error", "raw_response"}`` when the reply is
        not valid JSON (``raw_response`` is truncated to 1000 characters).
    """
    import asyncio  # local import keeps this block self-contained

    client = enhanced_selector.claude_client
    if not client:
        logger.error("❌ Claude client not available")
        return {
            "error": "Claude AI not available",
            "fallback": "Basic recommendations would go here",
        }

    prompt = _build_recommendation_prompt(context)

    try:
        logger.info("📞 Calling Claude for enhanced recommendations with functional requirements and tagged rules...")
        # The SDK client is synchronous: run it in the default executor so a
        # slow completion does not block the event loop for other requests.
        loop = asyncio.get_running_loop()
        message = await loop.run_in_executor(
            None,
            lambda: client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=8000,
                temperature=0.1,
                messages=[{"role": "user", "content": prompt}],
            ),
        )
        claude_response = message.content[0].text.strip()
        logger.info("✅ Received Claude response for enhanced recommendations")
    except Exception as e:
        logger.error(f"❌ Claude API error: {e}")
        return {
            "error": str(e),
            "fallback": "Enhanced recommendations generation failed",
        }

    try:
        # Models sometimes wrap JSON in a Markdown fence despite the prompt;
        # strip it before parsing instead of reporting a parse error.
        recommendations = json.loads(_strip_json_fences(claude_response))
    except json.JSONDecodeError as e:
        logger.error(f"❌ JSON parse error: {e}")
        return {
            "parse_error": str(e),
            "raw_response": claude_response[:1000] + "..." if len(claude_response) > 1000 else claude_response,
        }
    logger.info("✅ Successfully parsed enhanced recommendations JSON")
    return recommendations
if __name__ == "__main__":
    import uvicorn

    # Startup banner: a rule, the title, a rule, the feature list, a rule.
    rule = "=" * 60
    startup_lines = (
        rule,
        "🚀 ENHANCED TECH STACK SELECTOR v11.1 - FUNCTIONAL REQUIREMENTS + TAGGED RULES",
        rule,
        "✅ Enhanced data extraction from web dashboard",
        "✅ Functional requirements display",
        "✅ Business context analysis",
        "✅ NEW: Tagged rules support",
        "✅ Complete data for architecture-designer",
        "✅ Comprehensive Claude recommendations",
        rule,
    )
    for line in startup_lines:
        logger.info(line)

    # Serve the app on all interfaces, port 8002.
    uvicorn.run("main:app", host="0.0.0.0", port=8002, log_level="info")