from confluent_kafka import Producer, KafkaException from confluent_kafka.admin import AdminClient from src.infrastructure.kafka.kafka.config import kafka_config from src.infrastructure.observability.logger import logger import json import os from datetime import datetime """ Kafka Producer Service Production-ready event producer with error handling, retries, and DLQ support """ class KafkaProducerService: def __init__(self): self.producer = None self.is_connected = False self.dlq_enabled = os.getenv('KAFKA_DLQ_ENABLED', 'true').lower() == 'true' self.max_retries = int(os.getenv('KAFKA_MAX_RETRIES', '3')) async def connect(self): """Connect producer to Kafka""" try: if self.is_connected: logger.warning('Kafka Producer: Already connected') return config = kafka_config.get_config() self.producer = Producer(config) self.is_connected = True logger.info('Kafka Producer: Connected successfully') except Exception as e: logger.error(f'Kafka Producer: Connection failed: {e}') self.is_connected = False raise async def disconnect(self): """Disconnect producer from Kafka""" try: if self.producer and self.is_connected: self.producer.flush(timeout=10) self.producer = None self.is_connected = False logger.info('Kafka Producer: Disconnected') except Exception as e: logger.error(f'Kafka Producer: Disconnect error: {e}') async def publish(self, topic: str, data: dict, key: str = None, headers: dict = None): """ Publish event to Kafka topic Args: topic: Topic name data: Event data key: Optional message key (for partitioning) headers: Optional message headers Returns: bool: True if published successfully """ try: if not self.is_connected: logger.warning('Kafka Producer: Not connected, attempting to connect...') await self.connect() message_value = { **data, 'timestamp': datetime.utcnow().isoformat(), 'source': kafka_config.get_client_id() } message_headers = { 'content-type': 'application/json', 'event-type': data.get('eventType', topic), **(headers or {}) } message_key = key or data.get('id') or data.get('key') def delivery_callback(err, msg): if err: logger.error(f'Kafka Producer: Message delivery failed: {err}') else: logger.debug(f'Kafka Producer: Event published to {msg.topic()} partition {msg.partition()} offset {msg.offset()}') self.producer.produce( topic=topic, key=message_key.encode() if message_key else None, value=json.dumps(message_value).encode(), headers=message_headers, callback=delivery_callback ) # Trigger delivery callbacks self.producer.poll(0) return True except Exception as e: logger.error(f'Kafka Producer: Publish error: {e}', extra={'topic': topic}) # Send to DLQ if enabled if self.dlq_enabled: await self._send_to_dlq(topic, data, e) return False async def publish_batch(self, events: list): """ Publish multiple events in batch Args: events: Array of {topic, data, key, headers} Returns: bool: True if all published successfully """ try: if not self.is_connected: await self.connect() for event in events: await self.publish( topic=event['topic'], data=event['data'], key=event.get('key'), headers=event.get('headers') ) # Flush all messages self.producer.flush(timeout=10) logger.debug(f'Kafka Producer: Batch published {len(events)} events') return True except Exception as e: logger.error(f'Kafka Producer: Batch publish error: {e}') return False async def _send_to_dlq(self, topic: str, data: dict, error: Exception): """Send failed message to Dead Letter Queue""" try: dlq_topic = f'{topic}.dlq' dlq_data = { 'originalTopic': topic, 'originalData': data, 'error': { 'message': str(error), 'type': type(error).__name__, 'timestamp': datetime.utcnow().isoformat() }, 'retryCount': 0 } await self.publish(dlq_topic, dlq_data) logger.warning(f'Kafka Producer: Sent to DLQ', extra={'originalTopic': topic, 'dlqTopic': dlq_topic}) except Exception as dlq_error: logger.error(f'Kafka Producer: DLQ send failed: {dlq_error}', extra={'topic': topic}) def is_ready(self): """Check if producer is connected""" return self.is_connected and self.producer is not None # Singleton instance kafka_producer = KafkaProducerService() # Auto-connect on module load (optional) if os.getenv('KAFKA_AUTO_CONNECT', 'true').lower() != 'false': import asyncio asyncio.create_task(kafka_producer.connect())