# File: codenuk_backend_mine/services/architecture-designer/designers/database/mssql_designer_2022.py
# (1068 lines, 47 KiB, Python)

# MS SQL SERVER 2022 DATABASE DESIGNER SPECIALIST
# DYNAMIC - Processes ANY tagged rules from requirement-processor to generate complete database schemas
# Analyzes requirementAnalysis, taggedLogicRules, and business_logic_rules to create tables, relationships, and T-SQL
import json
import re
from typing import Dict, Any, List, Set, Tuple
from loguru import logger
try:
import anthropic
CLAUDE_AVAILABLE = True
except ImportError:
CLAUDE_AVAILABLE = False
class MSSQLServer2022Designer:
"""Dynamic MS SQL Server 2022 Database Designer - Processes ANY tagged rules from requirement-processor"""
def __init__(self):
    """Initialise the designer and try to attach an Anthropic client.

    ``claude_client`` is left as ``None`` when the ``anthropic`` package was
    not importable (``CLAUDE_AVAILABLE`` is False) or when constructing the
    client raises; in both cases a warning is logged instead of failing.
    """
    self.database = "MS SQL Server 2022"
    self.claude_client = None
    # Guard clause: nothing more to do when the SDK could not be imported.
    if not CLAUDE_AVAILABLE:
        logger.warning(f"⚠️ Claude AI not available for {self.database}")
        return
    try:
        self.claude_client = anthropic.Anthropic()
        logger.info(f"{self.database} Designer initialized with Claude AI")
    except Exception as e:
        # Best effort: a failing client construction only disables the AI path.
        logger.warning(f"⚠️ Claude AI not available for {self.database}: {e}")
def get_technology_name(self) -> str:
    """Return the display name of the database technology this designer targets."""
    technology_name = "MS SQL Server 2022"
    return technology_name
async def design_architecture(self, context: Dict[str, Any]) -> Dict[str, Any]:
    """Design a MS SQL Server 2022 database architecture from tagged rules.

    Args:
        context: Request payload. The keys read are ``functional_requirements``,
            ``business_context`` and ``technology_stack``; missing or ``None``
            values are treated as empty dicts.

    Returns:
        The result dict produced by the minimal-schema, AI, or dynamic
        (rule-based) generator, depending on whether rules were found and
        whether a Claude client is available.

    Any exception raised on the main path is logged and answered with the
    dynamic generator. If that fallback itself raises, the exception
    propagates to the caller.
    """
    logger.info(f"🗄️ Designing {self.database} architecture...")
    # Bind everything the fallback needs *before* entering the try block.
    # Previously a failure ahead of these assignments made the except handler
    # die with a NameError, hiding the original error.
    functional_requirements: Dict[str, Any] = {}
    business_context: Dict[str, Any] = {}
    tech_stack: Dict[str, Any] = {}
    tagged_rules: List[Dict[str, Any]] = []
    try:
        context = context or {}
        # `or {}` also covers keys that are present but explicitly None.
        functional_requirements = context.get('functional_requirements') or {}
        business_context = context.get('business_context') or {}
        tech_stack = context.get('technology_stack') or {}
        # Extract all tagged rules from requirement-processor structure
        tagged_rules = self._extract_all_tagged_rules(functional_requirements)
        if not tagged_rules:
            logger.warning("⚠️ No tagged rules found, creating minimal schema")
            return self._create_minimal_schema(functional_requirements)
        logger.info(f"📋 Processing {len(tagged_rules)} tagged rules for MS SQL Server schema generation")
        if self.claude_client:
            return await self._generate_ai_database_architecture(
                tagged_rules, functional_requirements, business_context, tech_stack
            )
        return self._generate_dynamic_database_architecture(
            tagged_rules, functional_requirements, business_context, tech_stack
        )
    except Exception as e:
        logger.error(f"{self.database} architecture design failed: {e}")
        # Fall back to the rule-based generator with whatever was extracted
        # so far (possibly nothing).
        return self._generate_dynamic_database_architecture(
            tagged_rules, functional_requirements, business_context, tech_stack
        )
def _extract_all_tagged_rules(self, functional_requirements: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten every rule found in the requirement-processor payload into one list.

    Sources, in collection order:
      1. ``all_features[*].requirementAnalysis[*].logicRules``
      2. ``all_features[*].taggedLogicRules``
      3. ``all_features[*].logicRules`` (only when the feature has neither 1 nor 2)
      4. ``detailed_requirements[*].rules``
      5. ``business_logic_rules``

    Returns:
        A list of dicts with the keys ``rule_text``, ``feature_name``,
        ``requirement_name``, ``source`` and ``structure``.
    """
    collected = []

    def record(rule_text, feature_name, requirement_name, source, structure):
        # Single place that defines the shape of an extracted rule entry.
        collected.append({
            "rule_text": rule_text,
            "feature_name": feature_name,
            "requirement_name": requirement_name,
            "source": source,
            "structure": structure
        })

    for feature in functional_requirements.get('all_features', []):
        feature_name = feature.get('featureName') or feature.get('name', 'Unknown Feature')

        # Source 1: requirementAnalysis entries carrying their own logicRules
        requirement_analysis = feature.get('requirementAnalysis', [])
        if requirement_analysis:
            logger.info(f"Found requirementAnalysis for {feature_name} with {len(requirement_analysis)} requirements")
            for analysis_entry in requirement_analysis:
                requirement_name = analysis_entry.get('requirement', 'Unknown Requirement')
                for rule in analysis_entry.get('logicRules', []):
                    record(rule, feature_name, requirement_name,
                           "requirementAnalysis", "tagged_detailed_requirements")

        # Source 2: taggedLogicRules, whose items may be dicts or plain values
        tagged_logic_rules = feature.get('taggedLogicRules', [])
        if tagged_logic_rules:
            logger.info(f"Found taggedLogicRules for {feature_name} with {len(tagged_logic_rules)} rules")
            for tagged_rule in tagged_logic_rules:
                if isinstance(tagged_rule, dict):
                    text = tagged_rule.get('rule_text', str(tagged_rule))
                    requirement_name = tagged_rule.get('requirement_name', 'General')
                else:
                    text = str(tagged_rule)
                    requirement_name = 'General'
                record(text, feature_name, requirement_name,
                       "taggedLogicRules", "tagged_rules_array")

        # Source 3: plain logicRules, used only as a fallback
        plain_logic_rules = feature.get('logicRules', [])
        if plain_logic_rules and not (requirement_analysis or tagged_logic_rules):
            logger.info(f"Found regular logicRules for {feature_name} with {len(plain_logic_rules)} rules")
            for rule in plain_logic_rules:
                record(rule, feature_name, "General", "logicRules", "regular_logic_rules")

    # Source 4: detailed_requirements given directly on the payload
    for requirement in functional_requirements.get('detailed_requirements', []):
        requirement_name = requirement.get('requirement_name', 'Unknown')
        owning_feature = requirement.get('feature_name', 'Unknown')
        for rule in requirement.get('rules', []):
            record(rule, owning_feature, requirement_name,
                   "detailed_requirements", "direct_detailed_requirements")

    # Source 5: global business rules, attributed to the payload's feature name
    for rule in functional_requirements.get('business_logic_rules', []):
        record(rule, functional_requirements.get('feature_name', 'System'),
               "Business Logic", "business_logic_rules", "global_business_rules")

    logger.info(f"✅ Extracted {len(collected)} total tagged rules from requirement-processor")

    # Per-source tally, logged for debugging only
    per_source = {}
    for entry in collected:
        per_source.setdefault(entry['source'], 0)
        per_source[entry['source']] += 1
    logger.info(f"📊 Rule sources: {per_source}")
    return collected
async def _generate_ai_database_architecture(
    self,
    tagged_rules: List[Dict[str, Any]],
    functional_requirements: Dict[str, Any],
    business_context: Dict[str, Any],
    tech_stack: Dict[str, Any]
) -> Dict[str, Any]:
    """Ask Claude for a SQL Server 2022 schema covering the tagged rules.

    Args:
        tagged_rules: Rule dicts with ``rule_text``, ``feature_name`` and
            ``requirement_name`` keys.
        functional_requirements: Read for ``feature_name`` and
            ``complexity_level`` (used in the prompt).
        business_context: Only forwarded to the dynamic fallback.
        tech_stack: Only forwarded to the dynamic fallback.

    Returns:
        A result dict with ``architecture`` set to the parsed model reply, or
        the output of ``_generate_dynamic_database_architecture`` when the
        reply is not valid JSON.

    Raises:
        Exception: Whatever the API call (or post-processing of a parsed
            reply) raises is logged and re-raised unchanged.
    """
    # Build comprehensive prompt with all tagged rules
    rule_lines = []
    entities_mentioned = set()
    for rule in tagged_rules:
        rule_text = rule['rule_text']
        feature_name = rule['feature_name']
        requirement_name = rule['requirement_name']
        rule_lines.append(
            f"- Feature: {feature_name} | Requirement: {requirement_name} | Rule: {rule_text}\n"
        )
        # Extract potential entities from rule text for better analysis
        entities_mentioned.update(self._extract_entities_from_rule_text(rule_text))
    # Joined once instead of growing a string with += inside the loop
    rules_analysis = "".join(rule_lines)
    feature_name = functional_requirements.get('feature_name', 'Database System')
    complexity = functional_requirements.get('complexity_level', 'medium')
    prompt = f"""You are a senior MS SQL Server database architect with 15+ years of experience. Design a complete, production-ready SQL Server 2022 database schema based on these specific tagged business rules.
PROJECT CONTEXT:
- System: {feature_name}
- Complexity: {complexity}
- Database: MS SQL Server 2022
- Backend: ASP.NET Core Web API 8 with Entity Framework Core 8
- Frontend: Angular 18
TAGGED BUSINESS RULES TO ANALYZE:
{rules_analysis}
ENTITIES IDENTIFIED: {', '.join(sorted(entities_mentioned))}
CRITICAL REQUIREMENTS:
1. Analyze EACH tagged rule to identify entities, relationships, and constraints
2. Create complete table schemas with proper data types for SQL Server 2022
3. Generate foreign key relationships based on rule analysis
4. Include indexes for performance optimization
5. Create stored procedures for complex business logic
6. Add triggers for business rule enforcement
7. Include Entity Framework Core 8 configurations
8. Generate T-SQL DDL scripts ready for deployment
9. Ensure 100% coverage of ALL tagged business rules
Design a comprehensive MS SQL Server 2022 database with:
**DYNAMIC TABLE ANALYSIS:**
- Parse each rule to identify entities and their properties
- Determine data types based on business context (NVARCHAR, DECIMAL, DATETIME2, etc.)
- Create proper primary keys and identity columns
- Add necessary constraints (CHECK, UNIQUE, NOT NULL)
**RELATIONSHIP MAPPING:**
- Analyze rules to identify entity relationships (1:1, 1:Many, Many:Many)
- Create foreign key relationships with proper cascading rules
- Generate junction tables for many-to-many relationships
- Include referential integrity constraints
**BUSINESS LOGIC IMPLEMENTATION:**
- Create stored procedures for complex business rules
- Add triggers for data validation and business rule enforcement
- Generate functions for calculations and data transformations
- Include audit trails where business rules require tracking
**PERFORMANCE OPTIMIZATION:**
- Create clustered and non-clustered indexes based on expected queries
- Add covering indexes for complex business operations
- Include computed columns for derived data
- Optimize for Entity Framework Core query patterns
**SECURITY & COMPLIANCE:**
- Implement Row-Level Security where rules indicate access control
- Add column-level security for sensitive data
- Create database roles and permissions
- Include data masking for PII protection
Return detailed JSON with complete database schema:
{{
"database_info": {{"name": "MS SQL Server 2022", "version": "2022", "compatibility_level": "160"}},
"tables": [{{
"name": "TableName",
"purpose": "Implements rules: [specific rule texts]",
"columns": [{{
"name": "ColumnName",
"data_type": "NVARCHAR(100)",
"is_nullable": false,
"is_primary_key": false,
"is_identity": false,
"default_value": null,
"check_constraint": null,
"implements_rule": "specific rule text"
}}],
"indexes": [{{
"name": "IX_TableName_ColumnName",
"type": "NONCLUSTERED",
"columns": ["ColumnName"],
"is_unique": false,
"purpose": "Performance optimization for rule: [rule text]"
}}],
"foreign_keys": [{{
"name": "FK_TableName_ReferencedTable",
"column": "ReferencedId",
"referenced_table": "ReferencedTable",
"referenced_column": "Id",
"on_delete": "CASCADE",
"implements_rule": "relationship from rule: [rule text]"
}}],
"triggers": [{{
"name": "TR_TableName_BusinessRule",
"event": "INSERT, UPDATE",
"purpose": "Enforces rule: [specific rule text]",
"t_sql_logic": "T-SQL implementation"
}}],
"implements_rules": ["list of specific rules"]
}}],
"stored_procedures": [{{
"name": "sp_ProcedureName",
"purpose": "Implements complex rule: [rule text]",
"parameters": [{{ "name": "@param", "type": "INT", "default": null }}],
"t_sql_body": "Complete T-SQL implementation",
"implements_rules": ["specific rules"]
}}],
"functions": [{{
"name": "fn_FunctionName",
"return_type": "DECIMAL(18,2)",
"purpose": "Calculates value for rule: [rule text]",
"t_sql_body": "T-SQL function implementation"
}}],
"views": [{{
"name": "vw_ViewName",
"purpose": "Business view for rule: [rule text]",
"t_sql_definition": "SELECT statement"
}}],
"entity_framework": {{
"dbcontext_name": "SystemDbContext",
"connection_string": "Server=localhost;Database=SystemDB;Trusted_Connection=true;TrustServerCertificate=true;",
"entity_configurations": [{{
"entity": "EntityName",
"table_name": "TableName",
"key_configuration": "HasKey configuration",
"property_configurations": ["property configurations"],
"relationship_configurations": ["relationship configurations"]
}}]
}},
"security": {{
"database_users": ["api_user", "read_only_user"],
"roles": ["db_api_access", "db_read_only"],
"row_level_security": [{{
"table": "TableName",
"policy": "Security policy for rule: [rule text]"
}}]
}},
"deployment": {{
"ddl_scripts": {{
"create_tables": "Complete CREATE TABLE statements",
"create_indexes": "Complete CREATE INDEX statements",
"create_procedures": "Complete stored procedure definitions",
"create_triggers": "Complete trigger definitions"
}},
"seed_data": [{{
"table": "TableName",
"data": "INSERT statements for reference data"
}}]
}},
"rule_coverage_analysis": {{
"total_rules_analyzed": {len(tagged_rules)},
"entities_created": "list of tables",
"relationships_created": "list of foreign keys",
"business_logic_implemented": "list of procedures/triggers",
"coverage_details": [{{
"rule_text": "rule",
"implemented_as": "table/procedure/trigger/constraint",
"database_objects": ["list of objects"]
}}]
}},
"ready_for_code_generation": true,
"entity_framework_ready": true,
"t_sql_deployment_ready": true
}}
IMPORTANT: Every table, procedure, and constraint should directly trace back to specific tagged rules. Generate complete T-SQL that can be executed immediately."""
    try:
        # NOTE(review): the client is created with anthropic.Anthropic() in
        # __init__, which looks like the synchronous client; if so this call
        # blocks the event loop for the duration of the request - confirm and
        # consider the async client or an executor.
        message = self.claude_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=8000,
            temperature=0.1,
            messages=[{"role": "user", "content": prompt}]
        )
        claude_response = message.content[0].text.strip()
        # A reply wrapped in a Markdown code fence (```json ... ```) is still
        # usable JSON once unwrapped; without this the whole AI result was
        # thrown away as "invalid JSON".
        fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", claude_response, re.DOTALL | re.IGNORECASE)
        if fenced:
            claude_response = fenced.group(1)
        try:
            architecture = json.loads(claude_response)
        except json.JSONDecodeError:
            logger.warning(f"⚠️ {self.database} AI response wasn't valid JSON, using dynamic fallback")
            return self._generate_dynamic_database_architecture(tagged_rules, functional_requirements, business_context, tech_stack)
        logger.info(f"{self.database} AI architecture generated successfully")
        # Add rule coverage analysis
        architecture["tagged_rules_coverage"] = self._analyze_rule_coverage(tagged_rules, architecture)
        return {
            "success": True,
            "architecture": architecture,
            "specialist": "MS SQL Server 2022",
            "ai_generated": True,
            "rules_processed": len(tagged_rules),
            "code_generation_ready": True
        }
    except Exception as e:
        logger.error(f"{self.database} Claude API error: {e}")
        # Bare raise keeps the original traceback intact
        raise
def _generate_dynamic_database_architecture(
    self,
    tagged_rules: List[Dict[str, Any]],
    functional_requirements: Dict[str, Any],
    business_context: Dict[str, Any],
    tech_stack: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the architecture result purely from rule analysis (no AI call).

    The rules are analysed into entities, relationships, business logic and
    constraints; those drive the generated tables, procedures, indexes and
    triggers, which are then assembled into the result dict.
    ``business_context`` and ``tech_stack`` are accepted but not read here.
    """
    system_label = functional_requirements.get('feature_name', 'System')
    # Spaces and hyphens are removed so the label can be embedded in names
    project_name = system_label.replace(' ', '').replace('-', '')

    # Step 1: analyse the rules
    entities = self._extract_entities_from_rules(tagged_rules)
    relationships = self._extract_relationships_from_rules(tagged_rules, entities)
    business_logic = self._extract_business_logic_from_rules(tagged_rules)
    constraints = self._extract_constraints_from_rules(tagged_rules)

    # Step 2: derive database objects from the analysis
    tables = self._generate_dynamic_tables(entities, tagged_rules)
    stored_procedures = self._generate_dynamic_procedures(business_logic, entities, tagged_rules)
    indexes = self._generate_dynamic_indexes(entities, tagged_rules)
    triggers = self._generate_dynamic_triggers(constraints, entities, tagged_rules)

    # Step 3: assemble the individual sections of the architecture document
    database_info = {
        "name": "MS SQL Server 2022",
        "version": "2022",
        "compatibility_level": "160",
        "database_name": f"{project_name}DB",
        "collation": "SQL_Latin1_General_CP1_CI_AS"
    }
    entity_framework = {
        "dbcontext_name": f"{project_name}DbContext",
        "connection_string": f"Server=localhost;Database={project_name}DB;Trusted_Connection=true;TrustServerCertificate=true;MultipleActiveResultSets=true;",
        "entity_configurations": self._generate_ef_configurations(entities, relationships),
        "migration_name": f"Initial{project_name}Migration"
    }
    security = {
        "database_users": ["api_user", "read_only_user", "admin_user"],
        "roles": ["db_api_access", "db_read_only", "db_admin"],
        "row_level_security": self._generate_rls_policies(entities, tagged_rules)
    }
    deployment = {
        "ddl_scripts": self._generate_ddl_scripts(tables, indexes, stored_procedures, triggers),
        "seed_data": self._generate_seed_data(entities, tagged_rules)
    }
    performance_optimization = {
        "indexes_created": len(indexes),
        "query_optimization": "Indexes created based on rule analysis",
        "partitioning_strategy": self._analyze_partitioning_needs(entities, tagged_rules)
    }
    rule_coverage = self._analyze_rule_coverage(tagged_rules, {
        "tables": tables,
        "procedures": stored_procedures,
        "triggers": triggers
    })

    architecture = {
        "database_info": database_info,
        "tables": tables,
        "stored_procedures": stored_procedures,
        "indexes": indexes,
        "triggers": triggers,
        "relationships": relationships,
        "entity_framework": entity_framework,
        "security": security,
        "deployment": deployment,
        "performance_optimization": performance_optimization,
        "rule_coverage_analysis": rule_coverage,
        "entities_identified": list(entities.keys()),
        "relationships_identified": len(relationships),
        "business_logic_procedures": len(stored_procedures),
        "data_constraints": len(constraints)
    }
    return {
        "success": True,
        "architecture": architecture,
        "specialist": "MS SQL Server 2022",
        "rules_processed": len(tagged_rules),
        "code_generation_ready": True,
        "entity_framework_ready": True,
        "t_sql_deployment_ready": True
    }
def _extract_entities_from_rules(self, tagged_rules: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Collect entities and their inferred properties from tagged rule text.

    Args:
        tagged_rules: Rule dicts with ``rule_text``, ``feature_name`` and
            ``requirement_name`` keys.

    Returns:
        A dict keyed by entity name. Each value holds ``name``,
        ``properties``, ``rules``, ``source_features`` and
        ``source_requirements``; all collections are lists, and the three
        derived from sets are sorted.

    Ordering is deterministic on purpose. Entity candidates and properties
    are produced as sets, whose iteration order for strings changes between
    interpreter runs; leaving them unsorted made the column order, and with
    it the generated DDL, differ from run to run for identical input.
    """
    entities: Dict[str, Dict[str, Any]] = {}
    for rule in tagged_rules:
        rule_text = rule['rule_text'].lower()
        feature_name = rule['feature_name']
        requirement_name = rule['requirement_name']
        # Candidate entities found by the regex heuristics; sorted so the
        # insertion order of `entities` is reproducible.
        potential_entities = self._extract_entities_from_rule_text(rule_text)
        for entity_name in sorted(potential_entities):
            if entity_name not in entities:
                entities[entity_name] = {
                    'name': entity_name,
                    'properties': set(),
                    'rules': [],
                    'source_features': set(),
                    'source_requirements': set()
                }
            entity = entities[entity_name]
            # Keep the original (not lower-cased) rule text for traceability
            entity['rules'].append(rule['rule_text'])
            entity['source_features'].add(feature_name)
            entity['source_requirements'].add(requirement_name)
            # Extract properties from rule text
            entity['properties'].update(
                self._extract_properties_from_rule_text(rule_text, entity_name)
            )
    # Convert sets to sorted lists: JSON-serialisable and stable across runs.
    # key=str keeps sorting safe should a feature/requirement name be None.
    for entity in entities.values():
        entity['properties'] = sorted(entity['properties'])
        entity['source_features'] = sorted(entity['source_features'], key=str)
        entity['source_requirements'] = sorted(entity['source_requirements'], key=str)
    logger.info(f"✅ Identified {len(entities)} entities: {list(entities.keys())}")
    return entities
def _extract_entities_from_rule_text(self, rule_text: str) -> Set[str]:
"""Extract entity names from rule text using NLP patterns"""
entities = set()
# Entity extraction patterns
entity_patterns = [
r'\bthe\s+(\w+)\s+(?:must|should|can|will|shall|has|have|contains|includes)\b',
r'\b(?:create|add|update|delete|manage|handle)\s+(?:a|an|the)?\s*(\w+)\b',
r'\b(\w+)\s+(?:entity|object|record|item|data|table|model)\b',
r'\b(?:each|every|all)\s+(\w+)\b',
r'\b(\w+)\s+(?:has|have|contains|includes|stores|tracks)\b',
r'\b(?:new|existing)\s+(\w+)\b',
r'\b(\w+)\s+(?:information|details|data)\b'
]
for pattern in entity_patterns:
matches = re.finditer(pattern, rule_text, re.IGNORECASE)
for match in matches:
entity = match.group(1).capitalize()
if len(entity) > 2 and entity not in ['The', 'And', 'But', 'For', 'Must', 'Should', 'Can', 'Will', 'Each', 'Every', 'All', 'New', 'Existing']:
entities.add(entity)
return entities
def _extract_properties_from_rule_text(self, rule_text: str, entity_name: str) -> Set[str]:
"""Extract properties for an entity from rule text"""
properties = set()
# Common property patterns
property_patterns = {
'name': ['name', 'title', 'label', 'identifier'],
'description': ['description', 'details', 'notes', 'comments'],
'status': ['status', 'state', 'condition'],
'amount': ['amount', 'price', 'cost', 'value', 'total', 'sum'],
'quantity': ['quantity', 'count', 'number', 'qty'],
'date': ['date', 'time', 'created', 'updated', 'modified', 'due'],
'email': ['email', 'mail'],
'phone': ['phone', 'mobile', 'contact'],
'address': ['address', 'location'],
'active': ['active', 'enabled', 'disabled', 'inactive']
}
for prop_name, keywords in property_patterns.items():
if any(keyword in rule_text for keyword in keywords):
properties.add(prop_name)
# Add standard properties
properties.update(['id', 'created_at', 'updated_at'])
return properties
def _extract_relationships_from_rules(self, tagged_rules: List[Dict[str, Any]], entities: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract relationships between entities from rules"""
relationships = []
entity_names = list(entities.keys())
for rule in tagged_rules:
rule_text = rule['rule_text'].lower()
# Look for relationship patterns
for entity1 in entity_names:
for entity2 in entity_names:
if entity1 != entity2:
# Check for relationship keywords
relationship_patterns = [
f'{entity1.lower()}.*belongs.*{entity2.lower()}',
f'{entity1.lower()}.*has.*{entity2.lower()}',
f'{entity2.lower()}.*contains.*{entity1.lower()}',
f'{entity1.lower()}.*related.*{entity2.lower()}',
f'{entity1.lower()}.*associated.*{entity2.lower()}'
]
for pattern in relationship_patterns:
if re.search(pattern, rule_text):
relationships.append({
'from_table': entity1,
'to_table': entity2,
'relationship_type': 'one_to_many',
'foreign_key': f'{entity2.lower()}_id',
'implements_rule': rule['rule_text']
})
break
return relationships
def _extract_business_logic_from_rules(self, tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract business logic that needs stored procedures"""
business_logic = []
logic_keywords = ['calculate', 'compute', 'process', 'validate', 'check', 'generate', 'update', 'trigger']
for rule in tagged_rules:
rule_text = rule['rule_text'].lower()
if any(keyword in rule_text for keyword in logic_keywords):
business_logic.append({
'rule_text': rule['rule_text'],
'feature_name': rule['feature_name'],
'requirement_name': rule['requirement_name'],
'logic_type': self._determine_logic_type(rule_text)
})
return business_logic
def _extract_constraints_from_rules(self, tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract data constraints from rules"""
constraints = []
constraint_keywords = ['must', 'required', 'mandatory', 'cannot', 'should not', 'unique', 'valid']
for rule in tagged_rules:
rule_text = rule['rule_text'].lower()
if any(keyword in rule_text for keyword in constraint_keywords):
constraints.append({
'rule_text': rule['rule_text'],
'constraint_type': self._determine_constraint_type(rule_text),
'feature_name': rule['feature_name']
})
return constraints
def _generate_dynamic_tables(self, entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate table schemas dynamically based on entities"""
tables = []
for entity_name, entity_info in entities.items():
columns = self._generate_columns_for_entity(entity_name, entity_info)
indexes = self._generate_indexes_for_entity(entity_name, entity_info)
tables.append({
'name': f'{entity_name}s',
'purpose': f'Stores {entity_name} data - implements rules from {", ".join(entity_info["source_features"])}',
'columns': columns,
'indexes': indexes,
'implements_rules': entity_info['rules']
})
return tables
def _generate_columns_for_entity(self, entity_name: str, entity_info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate columns for an entity table"""
columns = [
{
'name': 'Id',
'data_type': 'INT',
'is_nullable': False,
'is_primary_key': True,
'is_identity': True,
'purpose': 'Primary key'
}
]
# Map properties to SQL Server data types
property_mappings = {
'name': {'data_type': 'NVARCHAR(255)', 'is_nullable': False},
'description': {'data_type': 'NVARCHAR(MAX)', 'is_nullable': True},
'status': {'data_type': 'NVARCHAR(50)', 'is_nullable': False, 'default_value': "'Active'"},
'amount': {'data_type': 'DECIMAL(18,2)', 'is_nullable': False, 'default_value': '0'},
'quantity': {'data_type': 'INT', 'is_nullable': False, 'default_value': '0'},
'date': {'data_type': 'DATETIME2(7)', 'is_nullable': False},
'email': {'data_type': 'NVARCHAR(255)', 'is_nullable': True},
'phone': {'data_type': 'NVARCHAR(20)', 'is_nullable': True},
'address': {'data_type': 'NVARCHAR(500)', 'is_nullable': True},
'active': {'data_type': 'BIT', 'is_nullable': False, 'default_value': '1'},
'created_at': {'data_type': 'DATETIME2(7)', 'is_nullable': False, 'default_value': 'GETUTCDATE()'},
'updated_at': {'data_type': 'DATETIME2(7)', 'is_nullable': True}
}
for prop in entity_info['properties']:
if prop != 'id' and prop in property_mappings:
mapping = property_mappings[prop]
columns.append({
'name': prop.replace('_', '').title(),
'data_type': mapping['data_type'],
'is_nullable': mapping.get('is_nullable', True),
'is_primary_key': False,
'is_identity': False,
'default_value': mapping.get('default_value'),
'purpose': f'Stores {prop} information'
})
return columns
def _generate_dynamic_procedures(self, business_logic: List[Dict[str, Any]], entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate stored procedures for business logic"""
procedures = []
for logic in business_logic:
logic_type = logic['logic_type']
rule_text = logic['rule_text']
if logic_type == 'calculation':
procedures.append({
'name': f'sp_Calculate_{logic["feature_name"].replace(" ", "")}',
'purpose': f'Implements calculation rule: {rule_text}',
'parameters': [
{'name': '@EntityId', 'type': 'INT', 'default': None},
{'name': '@CalculationType', 'type': 'NVARCHAR(50)', 'default': None}
],
't_sql_body': self._generate_calculation_procedure_body(rule_text),
'implements_rules': [rule_text]
})
elif logic_type == 'validation':
procedures.append({
'name': f'sp_Validate_{logic["feature_name"].replace(" ", "")}',
'purpose': f'Implements validation rule: {rule_text}',
'parameters': [
{'name': '@EntityId', 'type': 'INT', 'default': None},
{'name': '@ValidationResult', 'type': 'BIT', 'default': None, 'output': True}
],
't_sql_body': self._generate_validation_procedure_body(rule_text),
'implements_rules': [rule_text]
})
return procedures
def _generate_dynamic_indexes(self, entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate performance indexes based on entities and rules"""
indexes = []
for entity_name, entity_info in entities.items():
table_name = f'{entity_name}s'
# Create indexes for common query patterns
if 'name' in entity_info['properties']:
indexes.append({
'name': f'IX_{table_name}_Name',
'table': table_name,
'type': 'NONCLUSTERED',
'columns': ['Name'],
'is_unique': False,
'purpose': f'Optimize name-based queries for {entity_name}'
})
if 'status' in entity_info['properties']:
indexes.append({
'name': f'IX_{table_name}_Status',
'table': table_name,
'type': 'NONCLUSTERED',
'columns': ['Status'],
'is_unique': False,
'purpose': f'Optimize status-based queries for {entity_name}'
})
if 'created_at' in entity_info['properties']:
indexes.append({
'name': f'IX_{table_name}_CreatedAt',
'table': table_name,
'type': 'NONCLUSTERED',
'columns': ['CreatedAt'],
'is_unique': False,
'purpose': f'Optimize date-based queries for {entity_name}'
})
return indexes
def _generate_dynamic_triggers(self, constraints: List[Dict[str, Any]], entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate triggers for business rule enforcement"""
triggers = []
for constraint in constraints:
constraint_type = constraint['constraint_type']
rule_text = constraint['rule_text']
if constraint_type == 'audit':
triggers.append({
'name': f'TR_Audit_{constraint["feature_name"].replace(" ", "")}',
'table': f'{constraint["feature_name"].replace(" ", "")}s',
'event': 'INSERT, UPDATE, DELETE',
'purpose': f'Implements audit rule: {rule_text}',
't_sql_logic': self._generate_audit_trigger_body(rule_text),
'implements_rule': rule_text
})
elif constraint_type == 'validation':
triggers.append({
'name': f'TR_Validate_{constraint["feature_name"].replace(" ", "")}',
'table': f'{constraint["feature_name"].replace(" ", "")}s',
'event': 'INSERT, UPDATE',
'purpose': f'Implements validation rule: {rule_text}',
't_sql_logic': self._generate_validation_trigger_body(rule_text),
'implements_rule': rule_text
})
return triggers
def _generate_ef_configurations(self, entities: Dict[str, Dict[str, Any]], relationships: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate Entity Framework configurations"""
configurations = []
for entity_name, entity_info in entities.items():
configurations.append({
'entity': entity_name,
'table_name': f'{entity_name}s',
'key_configuration': 'HasKey(e => e.Id)',
'property_configurations': [
f'Property(e => e.Id).ValueGeneratedOnAdd()',
f'Property(e => e.CreatedAt).HasDefaultValueSql("GETUTCDATE()")'
],
'relationship_configurations': []
})
return configurations
def _generate_ddl_scripts(self, tables: List[Dict[str, Any]], indexes: List[Dict[str, Any]], procedures: List[Dict[str, Any]], triggers: List[Dict[str, Any]]) -> Dict[str, str]:
"""Generate complete DDL scripts for deployment"""
create_tables = ""
for table in tables:
create_tables += f"-- Table: {table['name']}\n"
create_tables += f"CREATE TABLE [{table['name']}] (\n"
column_definitions = []
for column in table['columns']:
col_def = f" [{column['name']}] {column['data_type']}"
if column.get('is_identity'):
col_def += " IDENTITY(1,1)"
if not column.get('is_nullable', True):
col_def += " NOT NULL"
if column.get('default_value'):
col_def += f" DEFAULT {column['default_value']}"
if column.get('is_primary_key'):
col_def += " PRIMARY KEY"
column_definitions.append(col_def)
create_tables += ",\n".join(column_definitions)
create_tables += "\n);\nGO\n\n"
create_indexes = ""
for index in indexes:
create_indexes += f"-- Index: {index['name']}\n"
create_indexes += f"CREATE {index['type']} INDEX [{index['name']}] ON [{index['table']}] ({', '.join([f'[{col}]' for col in index['columns']])});\nGO\n\n"
create_procedures = ""
for proc in procedures:
create_procedures += f"-- Stored Procedure: {proc['name']}\n"
create_procedures += f"CREATE PROCEDURE [{proc['name']}]\n"
if proc.get('parameters'):
params = [f" {p['name']} {p['type']}" + (" OUTPUT" if p.get('output') else "") for p in proc['parameters']]
create_procedures += "(\n" + ",\n".join(params) + "\n)\n"
create_procedures += "AS\nBEGIN\n"
create_procedures += f" {proc.get('t_sql_body', '-- Implementation needed')}\n"
create_procedures += "END;\nGO\n\n"
return {
'create_tables': create_tables,
'create_indexes': create_indexes,
'create_procedures': create_procedures,
'create_triggers': "-- Triggers would be generated here"
}
def _determine_logic_type(self, rule_text: str) -> str:
"""Determine the type of business logic from rule text"""
if any(word in rule_text for word in ['calculate', 'compute', 'sum', 'total']):
return 'calculation'
elif any(word in rule_text for word in ['validate', 'check', 'verify']):
return 'validation'
elif any(word in rule_text for word in ['generate', 'create', 'auto']):
return 'generation'
else:
return 'general'
def _determine_constraint_type(self, rule_text: str) -> str:
"""Determine the type of constraint from rule text"""
if any(word in rule_text for word in ['audit', 'track', 'log']):
return 'audit'
elif any(word in rule_text for word in ['unique', 'duplicate']):
return 'uniqueness'
elif any(word in rule_text for word in ['required', 'mandatory', 'must']):
return 'validation'
else:
return 'general'
def _generate_calculation_procedure_body(self, rule_text: str) -> str:
"""Generate T-SQL body for calculation procedures"""
return f"""
-- Implementation for: {rule_text}
DECLARE @Result DECIMAL(18,2) = 0;
-- TODO: Implement specific calculation logic based on rule
-- {rule_text}
SELECT @Result AS CalculationResult;
"""
def _generate_validation_procedure_body(self, rule_text: str) -> str:
"""Generate T-SQL body for validation procedures"""
return f"""
-- Implementation for: {rule_text}
DECLARE @IsValid BIT = 1;
-- TODO: Implement specific validation logic based on rule
-- {rule_text}
SET @ValidationResult = @IsValid;
"""
def _generate_audit_trigger_body(self, rule_text: str) -> str:
"""Generate T-SQL body for audit triggers"""
return f"""
-- Audit trigger implementation for: {rule_text}
INSERT INTO AuditLog (TableName, Operation, RecordId, ChangeDate, ChangedBy)
SELECT
'TableName',
CASE
WHEN EXISTS(SELECT 1 FROM inserted) AND EXISTS(SELECT 1 FROM deleted) THEN 'UPDATE'
WHEN EXISTS(SELECT 1 FROM inserted) THEN 'INSERT'
ELSE 'DELETE'
END,
COALESCE(i.Id, d.Id),
GETUTCDATE(),
SYSTEM_USER
FROM inserted i
FULL OUTER JOIN deleted d ON i.Id = d.Id;
"""
def _generate_validation_trigger_body(self, rule_text: str) -> str:
"""Generate T-SQL body for validation triggers"""
return f"""
-- Validation trigger implementation for: {rule_text}
IF EXISTS (SELECT 1 FROM inserted WHERE /* validation condition */)
BEGIN
RAISERROR('Validation failed for rule: {rule_text}', 16, 1);
ROLLBACK TRANSACTION;
END
"""
def _generate_rls_policies(self, entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate Row-Level Security policies where applicable"""
policies = []
for entity_name, entity_info in entities.items():
# Check if any rules indicate access control
access_rules = [rule for rule in entity_info['rules'] if any(word in rule.lower() for word in ['access', 'permission', 'user', 'role'])]
if access_rules:
policies.append({
'table': f'{entity_name}s',
'policy_name': f'RLS_{entity_name}_Access',
'predicate': f'UserId = USER_NAME() OR IS_MEMBER(\'db_admin\') = 1',
'implements_rules': access_rules
})
return policies
def _generate_seed_data(self, entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate seed data for reference tables"""
seed_data = []
for entity_name, entity_info in entities.items():
if 'status' in entity_info['properties']:
seed_data.append({
'table': f'{entity_name}s',
'description': f'Reference data for {entity_name} statuses',
'data': f"-- INSERT seed data for {entity_name} status values"
})
return seed_data
def _analyze_partitioning_needs(self, entities: Dict[str, Dict[str, Any]], tagged_rules: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze if any tables need partitioning based on rules"""
partitioning_needs = {
'tables_needing_partitioning': [],
'partitioning_strategy': 'Date-based partitioning for large tables',
'recommendation': 'Monitor table growth and implement partitioning as needed'
}
return partitioning_needs
def _analyze_rule_coverage(self, tagged_rules: List[Dict[str, Any]], architecture: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze how well the database architecture covers the tagged rules"""
total_rules = len(tagged_rules)
coverage_details = []
for rule in tagged_rules:
coverage_details.append({
'rule_text': rule['rule_text'],
'feature_name': rule['feature_name'],
'requirement_name': rule['requirement_name'],
'coverage_status': 'Analyzed and implemented in database schema',
'database_objects': 'Tables, procedures, triggers, and constraints generated'
})
return {
'total_rules': total_rules,
'coverage_approach': 'Dynamic rule analysis and database object generation',
'coverage_details': coverage_details,
'analysis': f'MS SQL Server database schema dynamically generated from {total_rules} tagged rules'
}
def _create_minimal_schema(self, functional_requirements: Dict[str, Any]) -> Dict[str, Any]:
"""Create minimal schema when no rules are available"""
return {
"success": True,
"architecture": {
"database_info": {
"name": "MS SQL Server 2022",
"version": "2022",
"message": "Minimal schema - no tagged rules provided"
},
"tables": [],
"stored_procedures": [],
"ready_for_enhancement": True
},
"specialist": "MS SQL Server 2022",
"rules_processed": 0
}
async def design_schema(self, context: Dict[str, Any]) -> Dict[str, Any]:
    """Design the MS SQL Server schema for the given context.

    Delegates to ``design_architecture`` and returns its result unchanged.

    Args:
        context: Design context, passed through as-is.

    Returns:
        Whatever ``design_architecture`` returns for this context.
    """
    architecture_result = await self.design_architecture(context)
    return architecture_result
async def design_indexes(self, context: Dict[str, Any]) -> Dict[str, Any]:
    """Design MS SQL Server indexes for the given context.

    Tagged rules are extracted from the context's 'functional_requirements'
    (an empty dict when absent), entities are derived from those rules, and
    both are handed to the index generator.

    Args:
        context: Design context; only 'functional_requirements' is read.

    Returns:
        ``{"indexes": ...}`` wrapping the index generator's output.
    """
    requirements = context.get('functional_requirements', {})
    rules = self._extract_all_tagged_rules(requirements)
    index_definitions = self._generate_dynamic_indexes(
        self._extract_entities_from_rules(rules),
        rules,
    )
    return {"indexes": index_definitions}
async def design_relationships(self, context: Dict[str, Any]) -> Dict[str, Any]:
    """Design MS SQL Server table relationships for the given context.

    Tagged rules are extracted from the context's 'functional_requirements'
    (an empty dict when absent), entities are derived from those rules, and
    both are handed to the relationship extractor. The extractor takes the
    rules first and the entities second.

    Args:
        context: Design context; only 'functional_requirements' is read.

    Returns:
        ``{"relationships": ...}`` wrapping the relationship extractor's output.
    """
    requirements = context.get('functional_requirements', {})
    rules = self._extract_all_tagged_rules(requirements)
    relationship_definitions = self._extract_relationships_from_rules(
        rules,
        self._extract_entities_from_rules(rules),
    )
    return {"relationships": relationship_definitions}