""" Complete Pipeline Orchestrator - Simulated Assessment Engine =========================================================== This script orchestrates the complete 3-step workflow: 1. Persona Preparation: Merge persona factory output with enrichment data 2. Simulation: Generate all assessment responses 3. Post-Processing: Color headers, replace omitted values, verify quality Usage: python run_complete_pipeline.py [--step1] [--step2] [--step3] [--all] Options: --step1: Run only persona preparation --step2: Run only simulation --step3: Run only post-processing --all: Run all steps (default if no step specified) --skip-prep: Skip persona preparation (use existing merged_personas.xlsx) --skip-sim: Skip simulation (use existing output files) --skip-post: Skip post-processing --dry-run: Run simulation with 5 students only (for testing) Examples: python run_complete_pipeline.py --all python run_complete_pipeline.py --step1 python run_complete_pipeline.py --step2 --dry-run python run_complete_pipeline.py --step3 """ import sys import os import subprocess from pathlib import Path import time from typing import Optional # Add scripts directory to path BASE_DIR = Path(__file__).resolve().parent SCRIPTS_DIR = BASE_DIR / "scripts" sys.path.insert(0, str(SCRIPTS_DIR)) # ============================================================================ # CONFIGURATION # ============================================================================ # All paths are now relative to project directory # Note: Persona factory is optional - if not present, use existing merged_personas.xlsx PERSONA_FACTORY = BASE_DIR / "scripts" / "persona_factory.py" # Optional - can be added if needed FIXED_PERSONAS = BASE_DIR / "support" / "fixed_3k_personas.xlsx" PREPARE_DATA_SCRIPT = BASE_DIR / "scripts" / "prepare_data.py" MAIN_SCRIPT = BASE_DIR / "main.py" POST_PROCESS_SCRIPT = BASE_DIR / "scripts" / "comprehensive_post_processor.py" MERGED_PERSONAS_OUTPUT = BASE_DIR / "data" / "merged_personas.xlsx" STUDENTS_FILE = BASE_DIR / "support" / "3000-students.xlsx" STUDENTS_OUTPUT_FILE = BASE_DIR / "support" / "3000_students_output.xlsx" # ============================================================================ # STEP 1: PERSONA PREPARATION # ============================================================================ def check_prerequisites_step1() -> tuple[bool, list[str]]: """Check prerequisites for Step 1""" issues = [] # Persona factory is optional - if merged_personas.xlsx exists, we can skip # Only check if merged_personas.xlsx doesn't exist if not MERGED_PERSONAS_OUTPUT.exists(): # Check if fixed personas exists if not FIXED_PERSONAS.exists(): issues.append(f"Fixed personas file not found: {FIXED_PERSONAS}") issues.append(" Note: This file contains 22 enrichment columns (goals, interests, etc.)") issues.append(" Location: support/fixed_3k_personas.xlsx") # Check if prepare_data script exists if not PREPARE_DATA_SCRIPT.exists(): issues.append(f"Prepare data script not found: {PREPARE_DATA_SCRIPT}") # Check for student data files (needed for merging) if not STUDENTS_FILE.exists(): issues.append(f"Student data file not found: {STUDENTS_FILE}") issues.append(" Location: support/3000-students.xlsx") if not STUDENTS_OUTPUT_FILE.exists(): issues.append(f"Student output file not found: {STUDENTS_OUTPUT_FILE}") issues.append(" Location: support/3000_students_output.xlsx") else: # merged_personas.xlsx exists - can skip preparation print(" โ„น๏ธ merged_personas.xlsx already exists - Step 1 can be skipped") return len(issues) == 0, issues def run_step1_persona_preparation(skip: bool = False) -> dict: """Step 1: Prepare personas by merging factory output with enrichment data""" if skip: print("โญ๏ธ Skipping Step 1: Persona Preparation") print(" Using existing merged_personas.xlsx") return {'skipped': True} print("=" * 80) print("STEP 1: PERSONA PREPARATION") print("=" * 80) print() print("This step:") print(" 1. Generates personas using persona factory (if needed)") print(" 2. Merges with enrichment columns from fixed_3k_personas.xlsx") print(" 3. Combines with student data (3000-students.xlsx + 3000_students_output.xlsx)") print(" 4. Creates merged_personas.xlsx for simulation") print() # Check prerequisites print("๐Ÿ” Checking prerequisites...") all_good, issues = check_prerequisites_step1() if not all_good: print("โŒ PREREQUISITES NOT MET:") for issue in issues: print(f" - {issue}") print() print("๐Ÿ’ก Note: Step 1 requires:") print(" - Fixed personas file (support/fixed_3k_personas.xlsx) with 22 enrichment columns") print(" - Student data files (support/3000-students.xlsx, support/3000_students_output.xlsx)") print(" - Note: Persona factory is optional - existing merged_personas.xlsx can be used") print() return {'success': False, 'error': 'Prerequisites not met', 'issues': issues} print("โœ… All prerequisites met") print() # Run prepare_data script print("๐Ÿš€ Running persona preparation...") print("-" * 80) try: result = subprocess.run( [sys.executable, str(PREPARE_DATA_SCRIPT)], cwd=str(BASE_DIR), capture_output=True, text=True, check=True ) print(result.stdout) if MERGED_PERSONAS_OUTPUT.exists(): print() print("=" * 80) print("โœ… STEP 1 COMPLETE: merged_personas.xlsx created") print(f" Location: {MERGED_PERSONAS_OUTPUT}") print("=" * 80) print() return {'success': True} else: print("โŒ ERROR: merged_personas.xlsx was not created") return {'success': False, 'error': 'Output file not created'} except subprocess.CalledProcessError as e: print("โŒ ERROR running persona preparation:") print(e.stderr) return {'success': False, 'error': str(e)} except Exception as e: print(f"โŒ ERROR: {e}") return {'success': False, 'error': str(e)} # ============================================================================ # STEP 2: SIMULATION # ============================================================================ def check_prerequisites_step2() -> tuple[bool, list[str]]: """Check prerequisites for Step 2""" issues = [] # Check if merged personas exists if not MERGED_PERSONAS_OUTPUT.exists(): issues.append(f"merged_personas.xlsx not found: {MERGED_PERSONAS_OUTPUT}") issues.append(" Run Step 1 first to create this file") # Check if main script exists if not MAIN_SCRIPT.exists(): issues.append(f"Main simulation script not found: {MAIN_SCRIPT}") # Check if AllQuestions.xlsx exists questions_file = BASE_DIR / "data" / "AllQuestions.xlsx" if not questions_file.exists(): issues.append(f"Questions file not found: {questions_file}") return len(issues) == 0, issues def run_step2_simulation(skip: bool = False, dry_run: bool = False) -> dict: """Step 2: Run simulation to generate assessment responses""" if skip: print("โญ๏ธ Skipping Step 2: Simulation") print(" Using existing output files") return {'skipped': True} print("=" * 80) print("STEP 2: SIMULATION") print("=" * 80) print() if dry_run: print("๐Ÿงช DRY RUN MODE: Processing 5 students only (for testing)") else: print("๐Ÿš€ PRODUCTION MODE: Processing all 3,000 students") print() print("This step:") print(" 1. Loads personas from merged_personas.xlsx") print(" 2. Simulates responses for 5 domains (Personality, Grit, EI, VI, LS)") print(" 3. Simulates 12 cognition tests") print(" 4. Generates 34 output files (10 domain + 24 cognition)") print() # Check prerequisites print("๐Ÿ” Checking prerequisites...") all_good, issues = check_prerequisites_step2() if not all_good: print("โŒ PREREQUISITES NOT MET:") for issue in issues: print(f" - {issue}") print() return {'success': False, 'error': 'Prerequisites not met', 'issues': issues} print("โœ… All prerequisites met") print() # Run simulation print("๐Ÿš€ Starting simulation...") print("-" * 80) print(" โš ๏ธ This may take 12-15 hours for full 3,000 students") print(" โš ๏ธ Progress is saved incrementally (safe to interrupt)") print("-" * 80) print() try: if dry_run: result = subprocess.run( [sys.executable, str(MAIN_SCRIPT), "--dry"], cwd=str(BASE_DIR), check=False # Don't fail on dry run ) else: result = subprocess.run( [sys.executable, str(MAIN_SCRIPT), "--full"], cwd=str(BASE_DIR), check=False # Don't fail - simulation can be resumed ) print() print("=" * 80) if result.returncode == 0: print("โœ… STEP 2 COMPLETE: Simulation finished") else: print("โš ๏ธ STEP 2: Simulation ended (may be incomplete - can resume)") print("=" * 80) print() return {'success': True, 'returncode': result.returncode} except Exception as e: print(f"โŒ ERROR: {e}") return {'success': False, 'error': str(e)} # ============================================================================ # STEP 3: POST-PROCESSING # ============================================================================ def check_prerequisites_step3() -> tuple[bool, list[str]]: """Check prerequisites for Step 3""" issues = [] # Check if output directory exists output_dir = BASE_DIR / "output" / "full_run" if not output_dir.exists(): issues.append(f"Output directory not found: {output_dir}") issues.append(" Run Step 2 first to generate output files") # Check if mapping file exists mapping_file = BASE_DIR / "data" / "AllQuestions.xlsx" if not mapping_file.exists(): issues.append(f"Mapping file not found: {mapping_file}") # Check if post-process script exists if not POST_PROCESS_SCRIPT.exists(): issues.append(f"Post-process script not found: {POST_PROCESS_SCRIPT}") return len(issues) == 0, issues def run_step3_post_processing(skip: bool = False) -> dict: """Step 3: Post-process output files""" if skip: print("โญ๏ธ Skipping Step 3: Post-Processing") return {'skipped': True} print("=" * 80) print("STEP 3: POST-PROCESSING") print("=" * 80) print() print("This step:") print(" 1. Colors headers (Green: omission, Red: reverse-scored)") print(" 2. Replaces omitted values with '--'") print(" 3. Verifies quality (data density, variance, schema)") print() # Check prerequisites print("๐Ÿ” Checking prerequisites...") all_good, issues = check_prerequisites_step3() if not all_good: print("โŒ PREREQUISITES NOT MET:") for issue in issues: print(f" - {issue}") print() return {'success': False, 'error': 'Prerequisites not met', 'issues': issues} print("โœ… All prerequisites met") print() # Run post-processing print("๐Ÿš€ Starting post-processing...") print("-" * 80) try: result = subprocess.run( [sys.executable, str(POST_PROCESS_SCRIPT)], cwd=str(BASE_DIR), check=True ) print() print("=" * 80) print("โœ… STEP 3 COMPLETE: Post-processing finished") print("=" * 80) print() return {'success': True} except subprocess.CalledProcessError as e: print(f"โŒ ERROR: Post-processing failed with return code {e.returncode}") return {'success': False, 'error': f'Return code: {e.returncode}'} except Exception as e: print(f"โŒ ERROR: {e}") return {'success': False, 'error': str(e)} # ============================================================================ # MAIN ORCHESTRATION # ============================================================================ def main(): """Main orchestration""" print("=" * 80) print("COMPLETE PIPELINE ORCHESTRATOR") print("Simulated Assessment Engine - Production Workflow") print("=" * 80) print() # Parse arguments run_step1 = '--step1' in sys.argv run_step2 = '--step2' in sys.argv run_step3 = '--step3' in sys.argv run_all = '--all' in sys.argv or (not run_step1 and not run_step2 and not run_step3) skip_prep = '--skip-prep' in sys.argv skip_sim = '--skip-sim' in sys.argv skip_post = '--skip-post' in sys.argv dry_run = '--dry-run' in sys.argv # Determine which steps to run if run_all: run_step1 = True run_step2 = True run_step3 = True print("๐Ÿ“‹ Execution Plan:") if run_step1 and not skip_prep: print(" โœ… Step 1: Persona Preparation") elif skip_prep: print(" โญ๏ธ Step 1: Persona Preparation (SKIPPED)") if run_step2 and not skip_sim: mode = "DRY RUN (5 students)" if dry_run else "FULL (3,000 students)" print(f" โœ… Step 2: Simulation ({mode})") elif skip_sim: print(" โญ๏ธ Step 2: Simulation (SKIPPED)") if run_step3 and not skip_post: print(" โœ… Step 3: Post-Processing") elif skip_post: print(" โญ๏ธ Step 3: Post-Processing (SKIPPED)") print() # Confirm before starting if run_step2 and not skip_sim and not dry_run: print("โš ๏ธ WARNING: Full simulation will process 3,000 students") print(" This may take 12-15 hours and consume API credits") print(" Press Ctrl+C within 5 seconds to cancel...") print() try: time.sleep(5) except KeyboardInterrupt: print("\nโŒ Cancelled by user") sys.exit(0) print() print("=" * 80) print("STARTING PIPELINE EXECUTION") print("=" * 80) print() start_time = time.time() results = {} # Step 1: Persona Preparation if run_step1: results['step1'] = run_step1_persona_preparation(skip=skip_prep) if not results['step1'].get('success', False) and not results['step1'].get('skipped', False): print("โŒ Step 1 failed. Stopping pipeline.") sys.exit(1) # Step 2: Simulation if run_step2: results['step2'] = run_step2_simulation(skip=skip_sim, dry_run=dry_run) # Don't fail on simulation - it can be resumed # Step 3: Post-Processing if run_step3: results['step3'] = run_step3_post_processing(skip=skip_post) if not results['step3'].get('success', False) and not results['step3'].get('skipped', False): print("โŒ Step 3 failed.") sys.exit(1) # Final summary elapsed = time.time() - start_time hours = int(elapsed // 3600) minutes = int((elapsed % 3600) // 60) print("=" * 80) print("PIPELINE EXECUTION COMPLETE") print("=" * 80) print() print(f"โฑ๏ธ Total time: {hours}h {minutes}m") print() if run_step1 and not skip_prep: s1 = results.get('step1', {}) if s1.get('success'): print("โœ… Step 1: Persona Preparation - SUCCESS") elif s1.get('skipped'): print("โญ๏ธ Step 1: Persona Preparation - SKIPPED") else: print("โŒ Step 1: Persona Preparation - FAILED") if run_step2 and not skip_sim: s2 = results.get('step2', {}) if s2.get('success'): print("โœ… Step 2: Simulation - SUCCESS") elif s2.get('skipped'): print("โญ๏ธ Step 2: Simulation - SKIPPED") else: print("โš ๏ธ Step 2: Simulation - INCOMPLETE (can be resumed)") if run_step3 and not skip_post: s3 = results.get('step3', {}) if s3.get('success'): print("โœ… Step 3: Post-Processing - SUCCESS") elif s3.get('skipped'): print("โญ๏ธ Step 3: Post-Processing - SKIPPED") else: print("โŒ Step 3: Post-Processing - FAILED") print() print("=" * 80) # Exit code all_success = all( r.get('success', True) or r.get('skipped', False) for r in results.values() ) sys.exit(0 if all_success else 1) if __name__ == "__main__": main()