547 lines
20 KiB
Python
547 lines
20 KiB
Python
"""
|
||
Comprehensive Post-Processor for Simulated Assessment Engine
|
||
===========================================================
|
||
|
||
This script performs all post-processing steps on generated assessment files:
|
||
1. Header Coloring: Green for omission items, Red for reverse-scored items
|
||
2. Omitted Value Replacement: Replace all values in omitted columns with "--"
|
||
3. Quality Verification: Comprehensive quality checks at granular level
|
||
|
||
Usage:
|
||
python scripts/comprehensive_post_processor.py [--skip-colors] [--skip-replacement] [--skip-quality]
|
||
|
||
Options:
|
||
--skip-colors: Skip header coloring step
|
||
--skip-replacement: Skip omitted value replacement step
|
||
--skip-quality: Skip quality verification step
|
||
"""
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
from openpyxl.styles import Font
|
||
from openpyxl.utils.dataframe import dataframe_to_rows
|
||
from pathlib import Path
|
||
import sys
|
||
import io
|
||
import json
|
||
from typing import Dict, List, Tuple, Optional
|
||
from datetime import datetime
|
||
|
||
# Fix Windows console encoding.
# Rewrap stdout as UTF-8 so the emoji status glyphs printed throughout this
# script don't raise UnicodeEncodeError on the default Windows console code
# page (typically cp1252).
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
||
# ============================================================================
# CONFIGURATION
# ============================================================================

# Project root: this script lives in <root>/scripts/, so go up two levels.
BASE_DIR = Path(__file__).resolve().parent.parent
# All processed workbooks live under output/full_run/<age_group>/5_domain/.
OUTPUT_DIR = BASE_DIR / "output" / "full_run"
# Question mapping workbook; load_question_mapping() reads its
# 'Type', 'tag', and 'code' columns.
MAPPING_FILE = BASE_DIR / "data" / "AllQuestions.xlsx"
# NOTE(review): PERSONAS_FILE is not referenced anywhere in this script —
# presumably used by the simulation stage; confirm before removing.
PERSONAS_FILE = BASE_DIR / "data" / "merged_personas.xlsx"

# Domain files to process, keyed by age-group directory name.
# NOTE(review): 'adolescense' is a misspelling of 'adolescence', but it must
# match the on-disk directory name — rename both together if ever corrected.
DOMAIN_FILES = {
    'adolescense': [
        'Personality_14-17.xlsx',
        'Grit_14-17.xlsx',
        'Emotional_Intelligence_14-17.xlsx',
        'Vocational_Interest_14-17.xlsx',
        'Learning_Strategies_14-17.xlsx'
    ],
    'adults': [
        'Personality_18-23.xlsx',
        'Grit_18-23.xlsx',
        'Emotional_Intelligence_18-23.xlsx',
        'Vocational_Interest_18-23.xlsx',
        'Learning_Strategies_18-23.xlsx'
    ]
}
||
# ============================================================================
|
||
# STEP 1: HEADER COLORING
|
||
# ============================================================================
|
||
|
||
def load_question_mapping() -> Tuple[set, set]:
    """Load the question-code mapping workbook and extract two code sets.

    Returns:
        ``(omission_codes, reverse_codes)``: sets of stripped string codes —
        rows whose ``Type`` equals "omission", and rows whose ``tag``
        contains "reverse" (reverse-scored items).

    Raises:
        FileNotFoundError: if the mapping workbook does not exist.
    """
    if not MAPPING_FILE.exists():
        raise FileNotFoundError(f"Mapping file not found: {MAPPING_FILE}")

    mapping = pd.read_excel(MAPPING_FILE, engine='openpyxl')

    def codes_for(mask) -> set:
        # Normalise matched codes to stripped strings so they compare
        # exactly against workbook header cells.
        return set(mapping.loc[mask, 'code'].astype(str).str.strip())

    omission_mask = mapping['Type'].str.lower() == 'omission'
    reverse_mask = mapping['tag'].str.lower().str.contains('reverse', na=False)

    return codes_for(omission_mask), codes_for(reverse_mask)
||
|
||
def color_headers(file_path: Path, omission_codes: set, reverse_codes: set) -> Tuple[bool, object]:
    """Color the header row of a workbook: red for reverse-scored codes,
    green for omission codes. Red takes priority when a code is in both sets.

    Args:
        file_path: Excel file whose active sheet's row 1 holds question codes.
        omission_codes: header codes to color dark green.
        reverse_codes: header codes to color bright red (wins over omission).

    Returns:
        ``(True, n_colored)`` on success, or ``(False, error_message)`` on
        failure — matching :func:`replace_omitted_values`, so the caller's
        "Error: {result}" report shows the actual error instead of ``0``.
    """
    try:
        wb = load_workbook(file_path)
        ws = wb.active

        green_font = Font(color="008000")  # Dark Green: omission items
        red_font = Font(color="FF0000")    # Bright Red: reverse-scored items

        modified_cols = 0
        for col_idx, header in enumerate((cell.value for cell in ws[1]), start=1):
            if not header:
                continue

            header_str = str(header).strip()
            # Priority: Red (Reverse) > Green (Omission)
            if header_str in reverse_codes:
                target_font = red_font
            elif header_str in omission_codes:
                target_font = green_font
            else:
                continue

            ws.cell(row=1, column=col_idx).font = target_font
            modified_cols += 1

        wb.save(file_path)
        return True, modified_cols
    except Exception as e:
        # BUGFIX: previously returned (False, 0), losing the error message
        # the caller prints; surface the exception text instead.
        return False, str(e)
||
|
||
def step1_color_headers(skip: bool = False) -> Dict:
    """Step 1: apply red/green header fonts to every domain workbook.

    Args:
        skip: when True, do nothing and report ``{'skipped': True}``.

    Returns:
        Summary dict: ``success`` flag plus file counts, colored-header
        total, and a ``failed`` list of ``(file_name, reason)`` pairs.
    """
    if skip:
        print("⏭️ Skipping Step 1: Header Coloring")
        return {'skipped': True}

    bar = "=" * 80
    print(bar)
    print("STEP 1: HEADER COLORING")
    print(bar)
    print()

    try:
        omission_codes, reverse_codes = load_question_mapping()
        print(f"📊 Loaded mapping: {len(omission_codes)} omission items, {len(reverse_codes)} reverse-scored items")
        print()
    except Exception as e:
        print(f"❌ ERROR loading mapping: {e}")
        return {'success': False, 'error': str(e)}

    summary = {
        'total_files': 0,
        'processed': 0,
        'failed': [],
        'total_colored': 0,
    }

    for age_group, file_names in DOMAIN_FILES.items():
        print(f"📂 Processing {age_group.upper()} files...")
        print("-" * 80)

        for file_name in file_names:
            summary['total_files'] += 1
            target = OUTPUT_DIR / age_group / "5_domain" / file_name

            if not target.exists():
                print(f" ⚠️ SKIP: {file_name} (not found)")
                summary['failed'].append((file_name, "File not found"))
                continue

            print(f" 🎨 {file_name}")
            ok, outcome = color_headers(target, omission_codes, reverse_codes)

            if ok:
                summary['processed'] += 1
                summary['total_colored'] += outcome
                print(f" ✅ {outcome} headers colored")
            else:
                summary['failed'].append((file_name, outcome))
                print(f" ❌ Error: {outcome}")
        print()

    print(bar)
    print(f"✅ STEP 1 COMPLETE: {summary['processed']}/{summary['total_files']} files processed")
    print(f" Total headers colored: {summary['total_colored']}")
    if summary['failed']:
        print(f" Failed: {len(summary['failed'])} files")
    print(bar)
    print()

    return {'success': not summary['failed'], **summary}
||
|
||
# ============================================================================
|
||
# STEP 2: OMITTED VALUE REPLACEMENT
|
||
# ============================================================================
|
||
|
||
def replace_omitted_values(file_path: Path, omitted_codes: set) -> Tuple[bool, object]:
    """Replace every data value in omitted-question columns with '--'.

    Operates directly on the worksheet cells, so header colors, all other
    formatting, and the untouched columns' exact cell values/types are
    preserved. (The previous implementation round-tripped the whole sheet
    through a DataFrame — clearing and rewriting every cell — which was
    O(rows x cols) extra work and could alter cell types in columns it
    had no business touching.)

    Args:
        file_path: Excel workbook whose active sheet holds one header row
            followed by data rows.
        omitted_codes: header codes whose columns must be blanked to '--'.

    Returns:
        ``(True, n_replaced)`` where ``n_replaced`` counts the previously
        non-empty cells that were overwritten (0 when no omitted column is
        present), or ``(False, error_message)`` on failure.
    """
    try:
        wb = load_workbook(file_path)
        ws = wb.active

        # Map header codes (row 1) to their 1-based column indices.
        omitted_col_idxs = [
            idx
            for idx, cell in enumerate(ws[1], start=1)
            if cell.value is not None and str(cell.value).strip() in omitted_codes
        ]

        if not omitted_col_idxs:
            return True, 0

        # Overwrite every data cell in each omitted column, counting the
        # ones that actually held a value.
        total_replaced = 0
        for col_idx in omitted_col_idxs:
            for row_idx in range(2, ws.max_row + 1):
                cell = ws.cell(row=row_idx, column=col_idx)
                if cell.value is not None:
                    total_replaced += 1
                cell.value = "--"

        wb.save(file_path)
        return True, total_replaced

    except Exception as e:
        return False, str(e)
||
|
||
def step2_replace_omitted(skip: bool = False) -> Dict:
    """Step 2: blank out omitted-question columns with the '--' marker.

    Args:
        skip: when True, do nothing and report ``{'skipped': True}``.

    Returns:
        Summary dict: ``success`` flag plus file counts, total replaced
        values, and a ``failed`` list of ``(file_name, reason)`` pairs.
    """
    if skip:
        print("⏭️ Skipping Step 2: Omitted Value Replacement")
        return {'skipped': True}

    bar = "=" * 80
    print(bar)
    print("STEP 2: OMITTED VALUE REPLACEMENT")
    print(bar)
    print()

    try:
        omission_codes, _ = load_question_mapping()
        print(f"📊 Loaded {len(omission_codes)} omitted question codes")
        print()
    except Exception as e:
        print(f"❌ ERROR loading mapping: {e}")
        return {'success': False, 'error': str(e)}

    summary = {
        'total_files': 0,
        'processed': 0,
        'failed': [],
        'total_values_replaced': 0,
    }

    for age_group, file_names in DOMAIN_FILES.items():
        print(f"📂 Processing {age_group.upper()} files...")
        print("-" * 80)

        for file_name in file_names:
            summary['total_files'] += 1
            target = OUTPUT_DIR / age_group / "5_domain" / file_name

            if not target.exists():
                print(f" ⚠️ SKIP: {file_name} (not found)")
                summary['failed'].append((file_name, "File not found"))
                continue

            print(f" 🔄 {file_name}")
            ok, outcome = replace_omitted_values(target, omission_codes)

            if not ok:
                summary['failed'].append((file_name, outcome))
                print(f" ❌ Error: {outcome}")
                continue

            summary['processed'] += 1
            if isinstance(outcome, int):
                summary['total_values_replaced'] += outcome
                if outcome > 0:
                    print(f" ✅ Replaced {outcome} values in omitted columns")
                else:
                    print(f" ℹ️ No omitted columns found")
            else:
                print(f" ✅ Processed")
        print()

    print(bar)
    print(f"✅ STEP 2 COMPLETE: {summary['processed']}/{summary['total_files']} files processed")
    print(f" Total values replaced: {summary['total_values_replaced']:,}")
    if summary['failed']:
        print(f" Failed: {len(summary['failed'])} files")
    print(bar)
    print()

    return {'success': not summary['failed'], **summary}
||
|
||
# ============================================================================
|
||
# STEP 3: QUALITY VERIFICATION
|
||
# ============================================================================
|
||
|
||
def verify_file_quality(file_path: Path, domain_name: str, age_group: str) -> Dict:
    """Run the per-file quality checks and return a structured result.

    Checks performed:
      * ID column present ('Student CPID' or 'Participant') and unique per row
      * data density over question columns (cells neither null nor '--')
      * mean per-row standard deviation of numeric answers (response variance)
      * count of colored header cells (best-effort formatting check)

    Args:
        file_path: domain workbook to verify.
        domain_name: human-readable domain label, copied into the result.
        age_group: age-group directory name, copied into the result.

    Returns:
        Dict with 'status' ('PASS' | 'WARN' | 'FAIL'), a list of 'issues',
        and a 'metrics' dict. Never raises: read errors are reported as FAIL.
    """
    results = {
        'file': file_path.name,
        'domain': domain_name,
        'age_group': age_group,
        'status': 'PASS',
        'issues': [],
        'metrics': {}
    }

    try:
        df = pd.read_excel(file_path, engine='openpyxl')

        # Basic metrics
        results['metrics']['total_rows'] = len(df)
        results['metrics']['total_cols'] = len(df.columns)

        # ID column: prefer 'Student CPID', fall back to 'Participant'.
        id_col = 'Student CPID' if 'Student CPID' in df.columns else 'Participant'
        if id_col not in df.columns:
            results['status'] = 'FAIL'
            results['issues'].append('Missing ID column')
            return results

        # Every row must carry a unique, non-null ID.
        unique_ids = df[id_col].dropna().nunique()
        results['metrics']['unique_ids'] = unique_ids
        if unique_ids != len(df):
            results['status'] = 'FAIL'
            results['issues'].append(f'Duplicate IDs: {unique_ids}/{len(df)}')

        # Data density over question columns only (metadata excluded).
        metadata_cols = {'Participant', 'First Name', 'Last Name', 'Student CPID', 'Age', 'Gender', 'Age Category'}
        question_cols = [c for c in df.columns if c not in metadata_cols]
        question_df = df[question_cols]

        total_cells = len(question_df) * len(question_df.columns)
        # A cell is "valid" when it is neither the '--' omission marker nor null.
        valid_cells = ((question_df != "--") & question_df.notna()).sum().sum()
        density = (valid_cells / total_cells) * 100 if total_cells > 0 else 0
        results['metrics']['data_density'] = round(density, 2)

        if density < 95:
            # WARN only when not already FAILed by an earlier check.
            results['status'] = 'WARN' if results['status'] == 'PASS' else results['status']
            results['issues'].append(f'Low data density: {density:.2f}%')

        # Response variance: mean of per-row std devs over numeric answers.
        # to_numeric(errors='coerce') already maps '--' (and any other
        # non-numeric text) to NaN, so no separate replacement is needed —
        # the old replace("--", pd.NA) after coercion was dead code.
        numeric_df = question_df.apply(pd.to_numeric, errors='coerce')
        std_devs = numeric_df.std(axis=1)
        avg_variance = std_devs.mean()
        if pd.isna(avg_variance):
            # No numeric data at all — record None so the JSON quality
            # report stays standards-compliant (json would emit a bare NaN).
            results['metrics']['avg_variance'] = None
        else:
            results['metrics']['avg_variance'] = round(avg_variance, 3)
            if avg_variance < 0.5:
                results['status'] = 'WARN' if results['status'] == 'PASS' else results['status']
                results['issues'].append(f'Low response variance: {avg_variance:.3f}')

        # Header colors (best-effort: a formatting read must never fail the file).
        try:
            wb = load_workbook(file_path)
            ws = wb.active
            colored_headers = 0
            for col_idx in range(1, len(ws[1]) + 1):
                cell_font = ws.cell(row=1, column=col_idx).font
                if cell_font and cell_font.color:
                    colored_headers += 1
            results['metrics']['colored_headers'] = colored_headers
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are no longer swallowed.
            pass

    except Exception as e:
        results['status'] = 'FAIL'
        results['issues'].append(f'Error: {str(e)}')

    return results
||
|
||
def step3_quality_verification(skip: bool = False) -> Dict:
    """Step 3: verify every domain file and write a JSON quality report.

    Args:
        skip: when True, do nothing and report ``{'skipped': True}``.

    Returns:
        Summary dict: ``success`` flag plus pass/warn/fail counts and the
        per-file result dicts.
    """
    if skip:
        print("⏭️ Skipping Step 3: Quality Verification")
        return {'skipped': True}

    bar = "=" * 80
    print(bar)
    print("STEP 3: QUALITY VERIFICATION")
    print(bar)
    print()

    results = {
        'total_files': 0,
        'passed': 0,
        'warnings': 0,
        'failed': 0,
        'file_results': [],
    }

    # File name -> human-readable domain label (same stems in both age bands).
    domain_names = {
        f'{stem}_{ages}.xlsx': label
        for ages in ('14-17', '18-23')
        for stem, label in (
            ('Personality', 'Personality'),
            ('Grit', 'Grit'),
            ('Emotional_Intelligence', 'Emotional Intelligence'),
            ('Vocational_Interest', 'Vocational Interest'),
            ('Learning_Strategies', 'Learning Strategies'),
        )
    }

    status_icons = {'PASS': "✅", 'WARN': "⚠️"}
    status_buckets = {'PASS': 'passed', 'WARN': 'warnings'}

    for age_group, file_names in DOMAIN_FILES.items():
        print(f"📂 Verifying {age_group.upper()} files...")
        print("-" * 80)

        for file_name in file_names:
            results['total_files'] += 1
            target = OUTPUT_DIR / age_group / "5_domain" / file_name

            if not target.exists():
                print(f" ❌ {file_name}: NOT FOUND")
                results['failed'] += 1
                continue

            file_result = verify_file_quality(target, domain_names.get(file_name, 'Unknown'), age_group)
            results['file_results'].append(file_result)

            status = file_result['status']
            metrics = file_result['metrics']
            print(f" {status_icons.get(status, '❌')} {file_name}")
            print(f" Rows: {metrics.get('total_rows', 'N/A')}, "
                  f"Cols: {metrics.get('total_cols', 'N/A')}, "
                  f"Density: {metrics.get('data_density', 'N/A')}%, "
                  f"Variance: {metrics.get('avg_variance', 'N/A')}")

            for issue in file_result['issues']:
                print(f" ⚠️ {issue}")

            # Tally into passed/warnings/failed; unknown statuses count as failed.
            results[status_buckets.get(status, 'failed')] += 1
        print()

    print(bar)
    print(f"✅ STEP 3 COMPLETE: {results['passed']} passed, {results['warnings']} warnings, {results['failed']} failed")
    print(bar)
    print()

    # Persist a machine-readable report next to the processed files.
    report_path = OUTPUT_DIR / "quality_report.json"
    report = {
        'timestamp': datetime.now().isoformat(),
        'summary': {
            'total_files': results['total_files'],
            'passed': results['passed'],
            'warnings': results['warnings'],
            'failed': results['failed']
        },
        'file_results': results['file_results']
    }
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"📄 Detailed quality report saved: {report_path}")
    print()

    return {'success': results['failed'] == 0, **results}
||
|
||
# ============================================================================
|
||
# MAIN ORCHESTRATION
|
||
# ============================================================================
|
||
|
||
def main():
    """Orchestrate all three post-processing steps and exit with a status code.

    Recognised CLI flags: --skip-colors, --skip-replacement, --skip-quality.
    Exits 0 when every executed step succeeded (or was skipped), 1 otherwise.
    """
    divider = "=" * 80
    print(divider)
    print("COMPREHENSIVE POST-PROCESSOR")
    print("Simulated Assessment Engine - Production Ready")
    print(divider)
    print()

    # Flag parsing: simple membership tests over argv.
    argv = set(sys.argv)
    skip_colors = '--skip-colors' in argv
    skip_replacement = '--skip-replacement' in argv
    skip_quality = '--skip-quality' in argv

    # Prerequisites: mapping workbook and simulation output must both exist.
    if not MAPPING_FILE.exists():
        print(f"❌ ERROR: Mapping file not found: {MAPPING_FILE}")
        print(" Please ensure AllQuestions.xlsx exists in data/ directory")
        sys.exit(1)

    if not OUTPUT_DIR.exists():
        print(f"❌ ERROR: Output directory not found: {OUTPUT_DIR}")
        print(" Please run simulation first (python main.py --full)")
        sys.exit(1)

    # Execute the pipeline in order (dict literals evaluate left-to-right).
    all_results = {
        'step1': step1_color_headers(skip=skip_colors),
        'step2': step2_replace_omitted(skip=skip_replacement),
        'step3': step3_quality_verification(skip=skip_quality),
    }

    # Final summary
    print(divider)
    print("POST-PROCESSING COMPLETE")
    print(divider)

    if not skip_colors:
        s1 = all_results['step1']
        if s1.get('success', False):
            print(f"✅ Step 1 (Header Coloring): {s1.get('processed', 0)}/{s1.get('total_files', 0)} files")
        else:
            print(f"❌ Step 1 (Header Coloring): Failed")

    if not skip_replacement:
        s2 = all_results['step2']
        if s2.get('success', False):
            print(f"✅ Step 2 (Omitted Replacement): {s2.get('processed', 0)}/{s2.get('total_files', 0)} files, {s2.get('total_values_replaced', 0):,} values")
        else:
            print(f"❌ Step 2 (Omitted Replacement): Failed")

    if not skip_quality:
        s3 = all_results['step3']
        if s3.get('success', False):
            print(f"✅ Step 3 (Quality Verification): {s3.get('passed', 0)} passed, {s3.get('warnings', 0)} warnings")
        else:
            print(f"❌ Step 3 (Quality Verification): {s3.get('failed', 0)} files failed")

    print(divider)

    # Exit 0 only when every step either succeeded or was skipped.
    steps = (all_results.get('step1', {}), all_results.get('step2', {}), all_results.get('step3', {}))
    overall_success = all(r.get('success', True) or r.get('skipped', False) for r in steps)

    sys.exit(0 if overall_success else 1)
||
|
||
# Script entry point: run the full post-processing pipeline when executed
# directly (importing this module triggers no side effects beyond setup).
if __name__ == "__main__":
    main()
|