485 lines
17 KiB
Python
485 lines
17 KiB
Python
"""
Complete Pipeline Orchestrator - Simulated Assessment Engine
===========================================================

This script orchestrates the complete 3-step workflow:
1. Persona Preparation: Merge persona factory output with enrichment data
2. Simulation: Generate all assessment responses
3. Post-Processing: Color headers, replace omitted values, verify quality

Usage:
    python run_complete_pipeline.py [--step1] [--step2] [--step3] [--all]
                                    [--skip-prep] [--skip-sim] [--skip-post]
                                    [--dry-run]

Options:
    --step1: Run only persona preparation
    --step2: Run only simulation
    --step3: Run only post-processing
    --all: Run all steps (default if no step specified)
    --skip-prep: Skip persona preparation (use existing merged_personas.xlsx)
    --skip-sim: Skip simulation (use existing output files)
    --skip-post: Skip post-processing
    --dry-run: Run simulation with 5 students only (for testing)

Examples:
    python run_complete_pipeline.py --all
    python run_complete_pipeline.py --step1
    python run_complete_pipeline.py --step2 --dry-run
    python run_complete_pipeline.py --step3
"""
|
||
|
||
import sys
|
||
import os
|
||
import subprocess
|
||
from pathlib import Path
|
||
import time
|
||
from typing import Optional
|
||
|
||
# Locate the project root from this file and make scripts/ importable.
BASE_DIR = Path(__file__).resolve().parent
SCRIPTS_DIR = BASE_DIR.joinpath("scripts")
sys.path.insert(0, str(SCRIPTS_DIR))

# ============================================================================
# CONFIGURATION
# ============================================================================

# Every path is anchored at the project directory (BASE_DIR).
# The persona factory is optional: when it is not present, an existing
# merged_personas.xlsx is used instead.
PERSONA_FACTORY = SCRIPTS_DIR / "persona_factory.py"  # Optional - can be added if needed
FIXED_PERSONAS = BASE_DIR.joinpath("support", "fixed_3k_personas.xlsx")
PREPARE_DATA_SCRIPT = SCRIPTS_DIR / "prepare_data.py"
MAIN_SCRIPT = BASE_DIR.joinpath("main.py")
POST_PROCESS_SCRIPT = SCRIPTS_DIR / "comprehensive_post_processor.py"

# Step 1 output and the student spreadsheets it is merged from.
MERGED_PERSONAS_OUTPUT = BASE_DIR.joinpath("data", "merged_personas.xlsx")
STUDENTS_FILE = BASE_DIR.joinpath("support", "3000-students.xlsx")
STUDENTS_OUTPUT_FILE = BASE_DIR.joinpath("support", "3000_students_output.xlsx")
|
||
|
||
# ============================================================================
|
||
# STEP 1: PERSONA PREPARATION
|
||
# ============================================================================
|
||
|
||
def check_prerequisites_step1() -> tuple[bool, list[str]]:
    """Report whether the inputs needed by Step 1 are present.

    When merged_personas.xlsx already exists nothing else is inspected and an
    informational note is printed instead. Otherwise the fixed personas file,
    the prepare_data script and both student spreadsheets are checked.

    Returns:
        A ``(ok, problems)`` pair. ``ok`` is True when ``problems`` is empty;
        ``problems`` holds the messages to show the user.
    """
    # An existing merged file means preparation can be skipped entirely.
    if MERGED_PERSONAS_OUTPUT.exists():
        print(" ℹ️ merged_personas.xlsx already exists - Step 1 can be skipped")
        return True, []

    problems: list[str] = []

    # Enrichment source (the persona factory itself is optional).
    if not FIXED_PERSONAS.exists():
        problems.extend([
            f"Fixed personas file not found: {FIXED_PERSONAS}",
            " Note: This file contains 22 enrichment columns (goals, interests, etc.)",
            " Location: support/fixed_3k_personas.xlsx",
        ])

    # Script that performs the merge.
    if not PREPARE_DATA_SCRIPT.exists():
        problems.append(f"Prepare data script not found: {PREPARE_DATA_SCRIPT}")

    # Student spreadsheets that get merged in.
    if not STUDENTS_FILE.exists():
        problems.extend([
            f"Student data file not found: {STUDENTS_FILE}",
            " Location: support/3000-students.xlsx",
        ])

    if not STUDENTS_OUTPUT_FILE.exists():
        problems.extend([
            f"Student output file not found: {STUDENTS_OUTPUT_FILE}",
            " Location: support/3000_students_output.xlsx",
        ])

    return not problems, problems
|
||
|
||
def run_step1_persona_preparation(skip: bool = False) -> dict:
    """Run Step 1: build merged_personas.xlsx via the prepare_data script.

    Args:
        skip: When True, nothing is executed and the step is reported as
            skipped.

    Returns:
        ``{'skipped': True}`` when skipped, ``{'success': True}`` when the
        script ran and the merged file exists afterwards, otherwise
        ``{'success': False, 'error': ...}`` (plus ``'issues'`` when the
        prerequisites were not met).
    """
    if skip:
        print("⏭️ Skipping Step 1: Persona Preparation")
        print(" Using existing merged_personas.xlsx")
        return {'skipped': True}

    rule = "=" * 80
    intro_lines = (
        rule,
        "STEP 1: PERSONA PREPARATION",
        rule,
        "",
        "This step:",
        " 1. Generates personas using persona factory (if needed)",
        " 2. Merges with enrichment columns from fixed_3k_personas.xlsx",
        " 3. Combines with student data (3000-students.xlsx + 3000_students_output.xlsx)",
        " 4. Creates merged_personas.xlsx for simulation",
        "",
    )
    for line in intro_lines:
        print(line)

    # Bail out early if any required input is missing.
    print("🔍 Checking prerequisites...")
    ready, problems = check_prerequisites_step1()
    if not ready:
        print("❌ PREREQUISITES NOT MET:")
        for problem in problems:
            print(f" - {problem}")
        print()
        print("💡 Note: Step 1 requires:")
        print(" - Fixed personas file (support/fixed_3k_personas.xlsx) with 22 enrichment columns")
        print(" - Student data files (support/3000-students.xlsx, support/3000_students_output.xlsx)")
        print(" - Note: Persona factory is optional - existing merged_personas.xlsx can be used")
        print()
        return {'success': False, 'error': 'Prerequisites not met', 'issues': problems}

    print("✅ All prerequisites met")
    print()

    print("🚀 Running persona preparation...")
    print("-" * 80)

    try:
        # Output is captured and echoed once the script has finished.
        completed = subprocess.run(
            [sys.executable, str(PREPARE_DATA_SCRIPT)],
            cwd=str(BASE_DIR),
            capture_output=True,
            text=True,
            check=True,
        )
        print(completed.stdout)

        # A zero exit code is not enough: the merged file must exist.
        if not MERGED_PERSONAS_OUTPUT.exists():
            print("❌ ERROR: merged_personas.xlsx was not created")
            return {'success': False, 'error': 'Output file not created'}

        print()
        print(rule)
        print("✅ STEP 1 COMPLETE: merged_personas.xlsx created")
        print(f" Location: {MERGED_PERSONAS_OUTPUT}")
        print(rule)
        print()
        return {'success': True}

    except subprocess.CalledProcessError as exc:
        print("❌ ERROR running persona preparation:")
        print(exc.stderr)
        return {'success': False, 'error': str(exc)}
    except Exception as exc:
        print(f"❌ ERROR: {exc}")
        return {'success': False, 'error': str(exc)}
|
||
|
||
# ============================================================================
|
||
# STEP 2: SIMULATION
|
||
# ============================================================================
|
||
|
||
def check_prerequisites_step2() -> tuple[bool, list[str]]:
    """Report whether the inputs needed by Step 2 are present.

    Checks, in order: merged_personas.xlsx, the main simulation script and
    data/AllQuestions.xlsx.

    Returns:
        A ``(ok, problems)`` pair. ``ok`` is True when ``problems`` is empty.
    """
    questions_file = BASE_DIR / "data" / "AllQuestions.xlsx"

    # Each required path is paired with the messages reported when it is missing.
    requirements = (
        (
            MERGED_PERSONAS_OUTPUT,
            (
                f"merged_personas.xlsx not found: {MERGED_PERSONAS_OUTPUT}",
                " Run Step 1 first to create this file",
            ),
        ),
        (MAIN_SCRIPT, (f"Main simulation script not found: {MAIN_SCRIPT}",)),
        (questions_file, (f"Questions file not found: {questions_file}",)),
    )

    problems: list[str] = []
    for required_path, messages in requirements:
        if not required_path.exists():
            problems.extend(messages)

    return not problems, problems
|
||
|
||
def run_step2_simulation(skip: bool = False, dry_run: bool = False) -> dict:
    """Run Step 2: launch the main simulation script.

    Args:
        skip: When True, nothing is executed and the step is reported as
            skipped.
        dry_run: When True the script is invoked with ``--dry`` instead of
            ``--full``.

    Returns:
        ``{'skipped': True}`` when skipped;
        ``{'success': True, 'returncode': rc}`` once the script has run (a
        non-zero return code is reported but not treated as a failure);
        ``{'success': False, 'error': ...}`` when prerequisites are missing or
        the script could not be launched.
    """
    if skip:
        print("⏭️ Skipping Step 2: Simulation")
        print(" Using existing output files")
        return {'skipped': True}

    rule = "=" * 80
    dashes = "-" * 80

    print(rule)
    print("STEP 2: SIMULATION")
    print(rule)
    print()

    print(
        "🧪 DRY RUN MODE: Processing 5 students only (for testing)"
        if dry_run
        else "🚀 PRODUCTION MODE: Processing all 3,000 students"
    )
    print()
    for line in (
        "This step:",
        " 1. Loads personas from merged_personas.xlsx",
        " 2. Simulates responses for 5 domains (Personality, Grit, EI, VI, LS)",
        " 3. Simulates 12 cognition tests",
        " 4. Generates 34 output files (10 domain + 24 cognition)",
    ):
        print(line)
    print()

    # Bail out early if any required input is missing.
    print("🔍 Checking prerequisites...")
    ready, problems = check_prerequisites_step2()
    if not ready:
        print("❌ PREREQUISITES NOT MET:")
        for problem in problems:
            print(f" - {problem}")
        print()
        return {'success': False, 'error': 'Prerequisites not met', 'issues': problems}

    print("✅ All prerequisites met")
    print()

    print("🚀 Starting simulation...")
    print(dashes)
    print(" ⚠️ This may take 12-15 hours for full 3,000 students")
    print(" ⚠️ Progress is saved incrementally (safe to interrupt)")
    print(dashes)
    print()

    # Both modes run the same script; only the flag differs.
    mode_flag = "--dry" if dry_run else "--full"

    try:
        # check=False: a non-zero exit is not an error here, the run can be resumed.
        completed = subprocess.run(
            [sys.executable, str(MAIN_SCRIPT), mode_flag],
            cwd=str(BASE_DIR),
            check=False,
        )

        print()
        print(rule)
        if completed.returncode != 0:
            print("⚠️ STEP 2: Simulation ended (may be incomplete - can resume)")
        else:
            print("✅ STEP 2 COMPLETE: Simulation finished")
        print(rule)
        print()

        return {'success': True, 'returncode': completed.returncode}

    except Exception as exc:
        print(f"❌ ERROR: {exc}")
        return {'success': False, 'error': str(exc)}
|
||
|
||
# ============================================================================
|
||
# STEP 3: POST-PROCESSING
|
||
# ============================================================================
|
||
|
||
def check_prerequisites_step3() -> tuple[bool, list[str]]:
    """Report whether the inputs needed by Step 3 are present.

    Checks, in order: the output/full_run directory, data/AllQuestions.xlsx
    (used as the mapping file) and the post-processing script.

    Returns:
        A ``(ok, problems)`` pair. ``ok`` is True when ``problems`` is empty.
    """
    output_dir = BASE_DIR / "output" / "full_run"
    mapping_file = BASE_DIR / "data" / "AllQuestions.xlsx"

    # Each required path is paired with the messages reported when it is missing.
    requirements = (
        (
            output_dir,
            (
                f"Output directory not found: {output_dir}",
                " Run Step 2 first to generate output files",
            ),
        ),
        (mapping_file, (f"Mapping file not found: {mapping_file}",)),
        (POST_PROCESS_SCRIPT, (f"Post-process script not found: {POST_PROCESS_SCRIPT}",)),
    )

    problems: list[str] = []
    for required_path, messages in requirements:
        if not required_path.exists():
            problems.extend(messages)

    return not problems, problems
|
||
|
||
def run_step3_post_processing(skip: bool = False) -> dict:
    """Run Step 3: launch the post-processing script.

    Args:
        skip: When True, nothing is executed and the step is reported as
            skipped.

    Returns:
        ``{'skipped': True}`` when skipped, ``{'success': True}`` when the
        script exits with code 0, otherwise
        ``{'success': False, 'error': ...}`` (plus ``'issues'`` when the
        prerequisites were not met).
    """
    if skip:
        print("⏭️ Skipping Step 3: Post-Processing")
        return {'skipped': True}

    rule = "=" * 80
    for line in (
        rule,
        "STEP 3: POST-PROCESSING",
        rule,
        "",
        "This step:",
        " 1. Colors headers (Green: omission, Red: reverse-scored)",
        " 2. Replaces omitted values with '--'",
        " 3. Verifies quality (data density, variance, schema)",
        "",
    ):
        print(line)

    # Bail out early if any required input is missing.
    print("🔍 Checking prerequisites...")
    ready, problems = check_prerequisites_step3()
    if not ready:
        print("❌ PREREQUISITES NOT MET:")
        for problem in problems:
            print(f" - {problem}")
        print()
        return {'success': False, 'error': 'Prerequisites not met', 'issues': problems}

    print("✅ All prerequisites met")
    print()

    print("🚀 Starting post-processing...")
    print("-" * 80)

    try:
        # check=True: a non-zero exit raises CalledProcessError, handled below.
        subprocess.run(
            [sys.executable, str(POST_PROCESS_SCRIPT)],
            cwd=str(BASE_DIR),
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        print(f"❌ ERROR: Post-processing failed with return code {exc.returncode}")
        return {'success': False, 'error': f'Return code: {exc.returncode}'}
    except Exception as exc:
        print(f"❌ ERROR: {exc}")
        return {'success': False, 'error': str(exc)}

    print()
    print(rule)
    print("✅ STEP 3 COMPLETE: Post-processing finished")
    print(rule)
    print()

    return {'success': True}
|
||
|
||
# ============================================================================
|
||
# MAIN ORCHESTRATION
|
||
# ============================================================================
|
||
|
||
def main():
    """Parse the command line and run the selected pipeline steps in order.

    Recognised flags: ``--step1``, ``--step2``, ``--step3``, ``--all``,
    ``--skip-prep``, ``--skip-sim``, ``--skip-post`` and ``--dry-run``.
    When no step flag is given, all three steps are selected.

    Unknown arguments are rejected and ``--help`` prints usage instead of
    starting a run. Previously any unrecognised argument (for example a
    mistyped ``--dry-run``) was silently ignored and the full simulation was
    launched.

    Exit codes:
        0: every executed step succeeded or was skipped, or the user
           cancelled during the 5-second grace period.
        1: Step 1 or Step 3 failed, or any step reported ``success: False``.
        2: invalid command-line arguments (raised by argparse).
    """
    import argparse  # function-scope import: only the CLI entry point needs it

    parser = argparse.ArgumentParser(
        description="Complete Pipeline Orchestrator - Simulated Assessment Engine",
        allow_abbrev=False,  # require exact flag names; no silent prefix matching
    )
    for flag, help_text in (
        ('--step1', "Run only persona preparation"),
        ('--step2', "Run only simulation"),
        ('--step3', "Run only post-processing"),
        ('--all', "Run all steps (default if no step specified)"),
        ('--skip-prep', "Skip persona preparation (use existing merged_personas.xlsx)"),
        ('--skip-sim', "Skip simulation (use existing output files)"),
        ('--skip-post', "Skip post-processing"),
        ('--dry-run', "Run simulation with 5 students only (for testing)"),
    ):
        parser.add_argument(flag, action='store_true', help=help_text)
    args = parser.parse_args()

    rule = "=" * 80

    print(rule)
    print("COMPLETE PIPELINE ORCHESTRATOR")
    print("Simulated Assessment Engine - Production Workflow")
    print(rule)
    print()

    # Determine which steps to run; no explicit step flag means "all".
    run_step1, run_step2, run_step3 = args.step1, args.step2, args.step3
    if args.all or not (run_step1 or run_step2 or run_step3):
        run_step1 = run_step2 = run_step3 = True

    skip_prep = args.skip_prep
    skip_sim = args.skip_sim
    skip_post = args.skip_post
    dry_run = args.dry_run

    # The plan only lists steps that were actually selected.
    print("📋 Execution Plan:")
    if run_step1:
        if skip_prep:
            print(" ⏭️ Step 1: Persona Preparation (SKIPPED)")
        else:
            print(" ✅ Step 1: Persona Preparation")

    if run_step2:
        if skip_sim:
            print(" ⏭️ Step 2: Simulation (SKIPPED)")
        else:
            mode = "DRY RUN (5 students)" if dry_run else "FULL (3,000 students)"
            print(f" ✅ Step 2: Simulation ({mode})")

    if run_step3:
        if skip_post:
            print(" ⏭️ Step 3: Post-Processing (SKIPPED)")
        else:
            print(" ✅ Step 3: Post-Processing")

    print()

    # Give the user a chance to abort before a full (long, billable) run.
    if run_step2 and not skip_sim and not dry_run:
        print("⚠️ WARNING: Full simulation will process 3,000 students")
        print(" This may take 12-15 hours and consume API credits")
        print(" Press Ctrl+C within 5 seconds to cancel...")
        print()
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            print("\n❌ Cancelled by user")
            sys.exit(0)

    print()
    print(rule)
    print("STARTING PIPELINE EXECUTION")
    print(rule)
    print()

    start_time = time.time()
    results = {}

    # Step 1: Persona Preparation - a failure here stops the pipeline.
    if run_step1:
        results['step1'] = run_step1_persona_preparation(skip=skip_prep)
        step1 = results['step1']
        if not (step1.get('success', False) or step1.get('skipped', False)):
            print("❌ Step 1 failed. Stopping pipeline.")
            sys.exit(1)

    # Step 2: Simulation - deliberately not fatal, because it can be resumed.
    if run_step2:
        results['step2'] = run_step2_simulation(skip=skip_sim, dry_run=dry_run)

    # Step 3: Post-Processing - a failure here ends the run with exit code 1.
    if run_step3:
        results['step3'] = run_step3_post_processing(skip=skip_post)
        step3 = results['step3']
        if not (step3.get('success', False) or step3.get('skipped', False)):
            print("❌ Step 3 failed.")
            sys.exit(1)

    # Final summary
    elapsed = time.time() - start_time
    hours = int(elapsed // 3600)
    minutes = int((elapsed % 3600) // 60)

    print(rule)
    print("PIPELINE EXECUTION COMPLETE")
    print(rule)
    print()
    print(f"⏱️ Total time: {hours}h {minutes}m")
    print()

    # One row per step: (selected?, results key, label, line shown when the
    # step did not finish cleanly). Skipped steps are listed as SKIPPED.
    summary_rows = (
        (run_step1, 'step1', "Step 1: Persona Preparation",
         "❌ Step 1: Persona Preparation - FAILED"),
        (run_step2, 'step2', "Step 2: Simulation",
         "⚠️ Step 2: Simulation - INCOMPLETE (can be resumed)"),
        (run_step3, 'step3', "Step 3: Post-Processing",
         "❌ Step 3: Post-Processing - FAILED"),
    )
    for selected, key, label, unfinished_line in summary_rows:
        if not selected:
            continue
        outcome = results.get(key, {})
        # Step 2 reports success even on a non-zero return code, so the
        # return code is also consulted; other steps carry no 'returncode'.
        finished_cleanly = (
            outcome.get('success') and outcome.get('returncode', 0) == 0
        )
        if finished_cleanly:
            print(f"✅ {label} - SUCCESS")
        elif outcome.get('skipped'):
            print(f"⏭️ {label} - SKIPPED")
        else:
            print(unfinished_line)

    print()
    print(rule)

    # Exit code: a resumable (non-zero return code) simulation still counts
    # as success here; only an explicit success=False fails the run.
    all_success = all(
        r.get('success', True) or r.get('skipped', False)
        for r in results.values()
    )

    sys.exit(0 if all_success else 1)


if __name__ == "__main__":
    main()
|