codenuk_backend_mine/scripts/rabbitmq/test-queues.py
2025-10-10 08:56:39 +05:30

146 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
RabbitMQ Queue Testing Script
Tests all queues and exchanges for the development pipeline
"""
import pika
import json
import sys
import time
from datetime import datetime
def test_rabbitmq_connection():
"""Test basic RabbitMQ connection"""
try:
# Connection parameters
credentials = pika.PlainCredentials('pipeline_admin', 'rabbit_secure_2024')
parameters = pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
# Establish connection
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("✅ Successfully connected to RabbitMQ")
# Test exchanges
exchanges = ['pipeline.direct', 'pipeline.fanout', 'pipeline.topic', 'pipeline.deadletter']
for exchange in exchanges:
try:
channel.exchange_declare(exchange=exchange, passive=True)
print(f"✅ Exchange '{exchange}' exists and is accessible")
except Exception as e:
print(f"❌ Exchange '{exchange}' error: {e}")
# Test queues
queues = [
'requirements.processing',
'techstack.selection',
'architecture.design',
'code.generation',
'test.generation',
'deployment.management',
'notifications',
'deadletter'
]
for queue in queues:
try:
method = channel.queue_declare(queue=queue, passive=True)
print(f"✅ Queue '{queue}' exists (messages: {method.method.message_count})")
except Exception as e:
print(f"❌ Queue '{queue}' error: {e}")
# Test message publishing and consuming
test_message = {
"test": True,
"timestamp": datetime.now().isoformat(),
"message": "Pipeline test message"
}
# Publish test message
channel.basic_publish(
exchange='pipeline.direct',
routing_key='requirements',
body=json.dumps(test_message),
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
content_type='application/json',
timestamp=int(time.time())
)
)
print("✅ Test message published successfully")
# Consume test message
method, properties, body = channel.basic_get(queue='requirements.processing', auto_ack=True)
if method:
received_message = json.loads(body)
print(f"✅ Test message consumed successfully: {received_message['message']}")
else:
print("⚠️ No message received (queue might be empty)")
connection.close()
print("✅ RabbitMQ test completed successfully")
return True
except Exception as e:
print(f"❌ RabbitMQ connection failed: {e}")
return False
def show_queue_stats():
"""Show statistics for all queues"""
try:
credentials = pika.PlainCredentials('pipeline_admin', 'rabbit_secure_2024')
parameters = pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("\n📊 Queue Statistics:")
print("-" * 50)
queues = [
'requirements.processing',
'techstack.selection',
'architecture.design',
'code.generation',
'test.generation',
'deployment.management',
'notifications',
'deadletter'
]
for queue in queues:
try:
method = channel.queue_declare(queue=queue, passive=True)
print(f"{queue:<25} | Messages: {method.method.message_count:>3} | Consumers: {method.method.consumer_count:>2}")
except Exception as e:
print(f"{queue:<25} | Error: {str(e)[:20]}")
connection.close()
except Exception as e:
print(f"❌ Failed to get queue statistics: {e}")
if __name__ == "__main__":
print("🧪 Testing RabbitMQ Configuration")
print("=" * 40)
if test_rabbitmq_connection():
show_queue_stats()
sys.exit(0)
else:
sys.exit(1)