# Markdown-stripping rules, applied in order.  Structural markers (code
# fences, horizontal rules, headings, quotes, list bullets) are removed first
# so that a leading "* " bullet is never mistaken for the opening star of an
# emphasis span, and a "***" rule line is never half-eaten by the emphasis
# patterns.
_MARKDOWN_RULES = tuple(
    (re.compile(pattern, flags), replacement)
    for pattern, replacement, flags in (
        # Fenced code: drop only the fence markers, keep the code text.
        (r'```[\w]*\n', '', 0),
        (r'```', '', 0),
        # Horizontal rules such as "---", "***", "___" on their own line.
        (r'^[ \t]*[-*_]{3,}[ \t]*$', '', re.MULTILINE),
        # Headings: only at the start of a line, so "route #5" keeps its '#'.
        (r'^\s*#{1,6}\s*', '', re.MULTILINE),
        # Block quotes, bullet lists, numbered lists.
        (r'^\s*>\s+', '', re.MULTILINE),
        (r'^\s*[-*+]\s+', '', re.MULTILINE),
        (r'^\s*\d+\.\s+', '', re.MULTILINE),
        # Links and images: keep the visible label only.
        (r'!?\[(.+?)\]\(.+?\)', r'\1', 0),
        # Emphasis, longest marker first.
        (r'\*\*\*(.+?)\*\*\*', r'\1', 0),
        (r'\*\*(.+?)\*\*', r'\1', 0),
        (r'(?<!\w)__(.+?)__(?!\w)', r'\1', 0),
        # Single markers must hug their content ("*word*"), which leaves
        # arithmetic such as "2 * 3 * 4" untouched ...
        (r'\*(?=\S)(.+?)(?<=\S)\*', r'\1', 0),
        # ... and must not sit inside a word, which leaves snake_case alone.
        (r'(?<!\w)_(?=\S)(.+?)(?<=\S)_(?!\w)', r'\1', 0),
        # Inline code.
        (r'`(.+?)`', r'\1', 0),
        # Finally collapse all runs of whitespace (including newlines).
        (r'\s+', ' ', 0),
    )
)


def clean_text_for_speech(text):
    """Strip Markdown formatting so the text can be passed to a TTS engine.

    Removes code fences, horizontal rules, headings, block-quote and list
    markers, link/image syntax, emphasis markers and inline-code backticks,
    then collapses whitespace to single spaces.

    Args:
        text: Raw text, typically a chunk of LLM output. Falsy values are
            treated as empty input.

    Returns:
        The cleaned, single-line string (possibly empty).
    """
    if not text:
        return ""
    for pattern, replacement in _MARKDOWN_RULES:
        text = pattern.sub(replacement, text)
    return text.strip()
with gTTS (needs internet first time) + if current_lang == "en": + tld = "co.uk" if voice_gender == "female" else "com" + tts = gTTS(text=clean_text, lang='en', tld=tld, slow=False) + else: # Hindi - ENHANCED + # Use slower speed for better Hindi pronunciation + tts = gTTS(text=clean_text, lang='hi', slow=False) + + tts.save(cache_file) + + # Play using mpg123 (faster than converting to WAV) + subprocess.run(['mpg123', '-q', cache_file], check=True) + + # Natural pause - adjusted for Hindi + if current_lang == "hi": + # Hindi needs slightly longer pauses for better comprehension + if clean_text.endswith(("?", "!", "।")): + time.sleep(0.25) + elif clean_text.endswith("."): + time.sleep(0.20) + else: + time.sleep(0.08) + else: + if clean_text.endswith(("?", "!")): + time.sleep(0.15) + elif clean_text.endswith("."): + time.sleep(0.10) + else: + time.sleep(0.05) + + except Exception as e: + print(f"[TTS ERROR] {e}") + # Fallback to espeak if gTTS fails + try: + if isinstance(data, dict): + lang_voice = 'hi' if data.get('lang') == 'hi' else 'en' + # Use slower speed for Hindi in espeak as well + speed = '150' if lang_voice == 'hi' else '175' + subprocess.run(['espeak-ng', '-v', lang_voice, '-s', speed, clean_text], + check=True, capture_output=True) + else: + subprocess.run(['espeak-ng', clean_text], check=True, capture_output=True) + except: + pass + + except ImportError: + print("\n❌ gTTS not installed. 
def resample_audio(audio, orig_sr, target_sr=16000):
    """Resample mono audio to ``target_sr`` (Whisper expects 16 kHz).

    Uses polyphase FIR resampling (``scipy.signal.resample_poly``) rather than
    FFT resampling: the FFT method treats the clip as periodic, which is not
    true of a one-shot microphone recording.

    Args:
        audio: 1-D sequence/array of samples.
        orig_sr: Sample rate of ``audio`` in Hz.
        target_sr: Desired sample rate in Hz.

    Returns:
        ``audio`` itself when the rates already match; otherwise a float32
        array of ``len(audio) * target_sr // orig_sr`` samples.

    Raises:
        ValueError: If either sample rate is not positive.
    """
    if orig_sr == target_sr:
        return audio

    orig_sr, target_sr = int(orig_sr), int(target_sr)
    if orig_sr <= 0 or target_sr <= 0:
        raise ValueError(
            f"Sample rates must be positive (got {orig_sr} -> {target_sr})"
        )

    samples = np.asarray(audio, dtype=np.float32)

    # Integer arithmetic avoids float rounding dropping a sample.
    target_length = len(samples) * target_sr // orig_sr
    if target_length == 0:
        # Nothing (or less than one output sample) to resample.
        return np.zeros(0, dtype=np.float32)

    from math import gcd
    from scipy import signal

    # Reduce the rate ratio so the polyphase filter stays small
    # (e.g. 44100 -> 16000 becomes up=160, down=441).
    divisor = gcd(target_sr, orig_sr)
    resampled = signal.resample_poly(
        samples, target_sr // divisor, orig_sr // divisor
    )

    # resample_poly rounds the length up; trim to the expected sample count.
    return resampled[:target_length].astype(np.float32)
Provide clear, concise, and practical answers.", + "hi": """आप ट्रक ड्राइवरों के लिए एक सहायक AI सहायक हैं। +स्पष्ट, संक्षिप्त और व्यावहारिक उत्तर प्रदान करें। +कृपया केवल हिंदी में उत्तर दें। सरल और समझने योग्य भाषा का उपयोग करें। +तकनीकी शब्दों को सरल हिंदी में समझाएं।""" + } + + # ENHANCED: Use larger Whisper model for Hindi for better accuracy + if language == "hi": + whisper_model = "small" # Better for Hindi than tiny + print(f"Loading Whisper model ({whisper_model} - Enhanced for Hindi accuracy)...") + compute_type = "int8" # Balanced performance + else: + whisper_model = "tiny.en" + print(f"Loading Whisper model ({whisper_model} - optimized for speed)...") + compute_type = "int8" + + self.whisper = WhisperModel( + whisper_model, + device="cpu", + compute_type=compute_type, + num_workers=2 + ) + + # TTS queue + process + self.tts_queue = Queue() + + if use_gtts: + self.tts_process = Process( + target=gtts_tts_worker, + args=(self.tts_queue, voice_gender, language), + daemon=True + ) + else: + voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3" + self.tts_process = Process( + target=espeak_tts_worker, + args=(self.tts_queue, voice, language), + daemon=True + ) + + self.tts_process.start() + + # ========== ENHANCED MIC RECORDING WITH BETTER VAD FOR HINDI ========== + def record_audio(self, max_duration=8): + """Records audio with Voice Activity Detection - Enhanced for Hindi""" + print("\n🎤 सुन रहा हूँ... अब बोलें। / Listening... 
Speak now.\n") + + # Adjusted thresholds for better Hindi detection + silence_threshold = 0.008 # Slightly lower for Hindi consonants + silence_duration = 2.0 if self.language == "hi" else 1.5 # Longer for Hindi + + chunk_size = int(0.1 * self.native_samplerate) + max_chunks = int(max_duration / 0.1) + + audio_chunks = [] + silent_chunks = 0 + speech_detected = False + + try: + stream = sd.InputStream( + samplerate=self.native_samplerate, + channels=1, + dtype='float32' + ) + stream.start() + + for i in range(max_chunks): + chunk, _ = stream.read(chunk_size) + audio_chunks.append(chunk) + + energy = np.sqrt(np.mean(chunk**2)) + + if energy > silence_threshold: + speech_detected = True + silent_chunks = 0 + elif speech_detected: + silent_chunks += 1 + + if silent_chunks > (silence_duration / 0.1): + print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]") + break + + stream.stop() + stream.close() + + audio = np.concatenate(audio_chunks, axis=0).flatten() + + # Resample to 16kHz for Whisper + if self.native_samplerate != self.whisper_samplerate: + print(f"Resampling audio from {self.native_samplerate}Hz to {self.whisper_samplerate}Hz...") + audio = resample_audio(audio, self.native_samplerate, self.whisper_samplerate) + + return audio + + except Exception as e: + print(f"❌ Recording error: {e}") + return None + + # ========== ENHANCED STT FOR HINDI ========== + def speech_to_text(self, audio): + """Enhanced transcription with better Hindi support""" + if audio is None: + return "" + + status_msg = "बोली को टेक्स्ट में बदल रहे हैं..." if self.language == "hi" else "Converting speech to text..." 
+ print(status_msg) + + lang_code = "hi" if self.language == "hi" else "en" + + try: + # ENHANCED: Better parameters for Hindi recognition + if self.language == "hi": + segments, info = self.whisper.transcribe( + audio, + beam_size=5, # Higher beam size for better Hindi accuracy + vad_filter=True, + vad_parameters=dict( + threshold=0.3, # Lower threshold for Hindi + min_speech_duration_ms=100, + min_silence_duration_ms=500 + ), + language="hi", + condition_on_previous_text=True, # Better context for Hindi + initial_prompt="ट्रक, ड्राइवर, सड़क, गाड़ी" # Domain-specific Hindi prompt + ) + else: + segments, info = self.whisper.transcribe( + audio, + beam_size=1, + vad_filter=True, + language="en", + condition_on_previous_text=False + ) + + text = " ".join(seg.text for seg in segments).strip() + + # Display with proper language prefix + prefix = "आपने कहा:" if self.language == "hi" else "You said:" + print(f"{prefix} {text}\n") + + return text + except Exception as e: + print(f"❌ Transcription error: {e}") + return "" + + # ========== VOICE CHAT PIPELINE ========== + def voice_chat(self): + audio = self.record_audio() + + if audio is None: + msg = "रिकॉर्डिंग विफल। पुनः प्रयास करें।" if self.language == "hi" else "Recording failed. Try again." + print(f"{msg}\n") + return + + text = self.speech_to_text(audio) + + if not text: + msg = "कोई बोली नहीं सुनी। पुनः प्रयास करें।" if self.language == "hi" else "No speech detected. Try again." 
+ print(f"{msg}\n") + return + + self.chat(text) + + # ========== ENHANCED LLaMA CHAT WITH BETTER HINDI SUPPORT ========== + def chat(self, prompt, stream=True): + url = f"{self.base_url}/api/chat" + + # Prepare messages with enhanced system prompt + messages = [{"role": "system", "content": self.system_prompts[self.language]}] + messages.extend(self.conversation_history) + messages.append({"role": "user", "content": prompt}) + + # ENHANCED: Better parameters for Hindi generation + if self.language == "hi": + options = { + "temperature": 0.7, + "top_p": 0.9, + "num_predict": 200, # Slightly more for Hindi explanations + "num_ctx": 2048, + "repeat_penalty": 1.1, # Reduce repetition in Hindi + "stop": ["```", "---"] # Stop tokens + } + else: + options = { + "temperature": 0.7, + "top_p": 0.9, + "num_predict": 150, + "num_ctx": 2048 + } + + payload = { + "model": self.model, + "messages": messages, + "stream": stream, + "options": options + } + + assistant_prefix = "सहायक:" if self.language == "hi" else "Assistant:" + print(f"\n{assistant_prefix} ", end="", flush=True) + + start_time = time.time() + full_response = "" + token_count = 0 + + try: + response = requests.post(url, json=payload, stream=True, timeout=30) + + if stream: + sentence_buffer = "" + + for line in response.iter_lines(): + if not line: + continue + + chunk = json.loads(line) + + if "message" in chunk and "content" in chunk["message"]: + content = chunk["message"]["content"] + + print(content, end="", flush=True) + + full_response += content + sentence_buffer += content + token_count += 1 + + # ENHANCED: Better sentence detection for Hindi + if self.language == "hi": + # Hindi sentence endings: । (purna viram), ? and ! 
def _ollama_is_running(base_url="http://localhost:11434"):
    """Return True if an Ollama server answers at ``base_url``."""
    try:
        requests.get(f"{base_url}/api/tags", timeout=5)
    except requests.RequestException:
        return False
    return True


def _menu_choice(prompt, valid, default):
    """Read a menu choice; empty or unrecognised input yields ``default``."""
    choice = input(prompt).strip() or default
    return choice if choice in valid else default


def _run_voice_mode(assistant, language):
    """Loop: wait for Enter, record one utterance, answer it."""
    if language == "hi":
        print("\n🎤 वॉयस मोड - बोलने के लिए Enter दबाएं, बाहर निकलने के लिए Ctrl+C\n")
        print("💡 टिप: स्पष्ट रूप से और थोड़ी धीमी गति से बोलें\n")
    else:
        print("\n🎤 VOICE MODE - Press Enter to speak, Ctrl+C to exit\n")

    prompt_msg = "बोलने के लिए Enter दबाएं..." if language == "hi" else "Press Enter to speak..."
    while True:
        input(prompt_msg)
        assistant.voice_chat()
        print("\n" + "="*60 + "\n")


def _run_text_mode(assistant, language):
    """Loop: read a typed question and answer it until the user quits."""
    if language == "hi":
        print("\n💬 टेक्स्ट मोड - बाहर निकलने के लिए 'quit' लिखें\n")
    else:
        print("\n💬 TEXT MODE - type 'quit' to exit\n")

    prompt_txt = "आप: " if language == "hi" else "You: "
    while True:
        user_input = input(prompt_txt).strip()
        if user_input.lower() in ["quit", "exit", "q", "बाहर", "बंद"]:
            bye_msg = "\n👋 धन्यवाद! फिर मिलेंगे!" if language == "hi" else "\n👋 Goodbye!"
            print(bye_msg)
            return
        if user_input:
            assistant.chat(user_input)
            print("\n" + "="*60 + "\n")


def _run_demo_mode(assistant, language):
    """Ask a few canned trucking questions so the pipeline can be shown hands-free."""
    if language == "hi":
        questions = [
            "लंबी यात्रा शुरू करने से पहले मुझे क्या जाँचना चाहिए?",
            "हाईवे पर ईंधन कैसे बचाएं?",
            "अगर ब्रेक कमज़ोर लगें तो क्या करना चाहिए?",
        ]
        prefix = "आप: "
    else:
        questions = [
            "What should I check before starting a long trip?",
            "How can I save fuel on the highway?",
            "What should I do if my brakes feel weak?",
        ]
        prefix = "You: "

    for question in questions:
        print(f"{prefix}{question}")
        assistant.chat(question)
        print("\n" + "="*60 + "\n")


def main():
    """Interactive entry point: pick language, voice and mode, then run it.

    The Ollama server is checked *before* the assistant is constructed, so a
    missing server no longer costs a Whisper model load and a spawned TTS
    process. The assistant is always stopped on the way out, whether the
    session ends via 'quit', Ctrl+C or end-of-input.
    """
    print("\n🚀 Truck Assistant - Raspberry Pi 5")
    print("🎤 Natural Human Voice (Google TTS)")
    print("🌍 Enhanced Hindi Support\n")

    # Language selection (anything other than an explicit "2" keeps the
    # advertised default of English).
    print("Select Language / भाषा चुनें:")
    print("1. English")
    print("2. Hindi (हिंदी) - ENHANCED")
    lang_choice = _menu_choice("\nLanguage (1 or 2, default=1): ", ("1", "2"), "1")
    language = "hi" if lang_choice == "2" else "en"

    # Simple voice selection
    print("\nSelect Voice / आवाज़ चुनें:")
    print("1. Female (Natural / महिला)")
    print("2. Male (Natural / पुरुष)")
    voice_choice = _menu_choice("\nVoice (1 or 2, default=1): ", ("1", "2"), "1")
    voice_gender = "male" if voice_choice == "2" else "female"

    lang_display = "English" if language == "en" else "हिंदी (ENHANCED)"
    print(f"\n✅ Language: {lang_display}")
    print(f"✅ Voice: {voice_gender.capitalize()}")

    if language == "hi":
        print("\n🔥 Hindi Enhancements:")
        print(" • Better speech recognition (Whisper 'small' model)")
        print(" • Improved pronunciation and pacing")
        print(" • Enhanced LLM responses in Hindi")
        print(" • Longer pause detection for natural speech")

    # Cheap check first: without Ollama nothing below can work.
    if not _ollama_is_running():
        print("❌ Ollama not running. Start with: ollama serve\n")
        return
    print("✅ Ollama running\n")

    # Nothing is installed here; this step loads Whisper and starts the TTS
    # worker, so say so.
    print("\n🔥 Loading speech models...\n")
    assistant = TruckAssistant(voice_gender=voice_gender, use_gtts=True, language=language)

    interrupted = False
    try:
        print("="*60)
        if language == "hi":
            print("मोड चुनें:")
            print("1. डेमो")
            print("2. टेक्स्ट चैट")
            print("3. वॉयस चैट (बेहतर हिंदी समर्थन)")
        else:
            print("Mode:")
            print("1. Demo")
            print("2. Text chat")
            print("3. Voice chat")
        print("="*60)

        mode = input("\nSelect (1-3): ").strip()

        if mode == "3":
            _run_voice_mode(assistant, language)
        elif mode == "1":
            _run_demo_mode(assistant, language)
        else:
            _run_text_mode(assistant, language)
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C or closed stdin: leave quietly instead of a traceback.
        interrupted = True
        bye_msg = "\n\n👋 धन्यवाद! फिर मिलेंगे..." if language == "hi" else "\n\n👋 Exiting gracefully..."
        print(bye_msg)
    finally:
        assistant.stop()

    if interrupted:
        print("Goodbye! / अलविदा!")
def _speech_pause_seconds(clean_text, lang):
    """Return the pause (seconds) to insert after speaking ``clean_text``."""
    if lang == "hi":
        # Hindi gets slightly longer pauses for better comprehension.
        if clean_text.endswith(("?", "!", "।")):
            return 0.25
        if clean_text.endswith("."):
            return 0.20
        return 0.08
    if clean_text.endswith(("?", "!")):
        return 0.15
    if clean_text.endswith("."):
        return 0.10
    return 0.05


def _espeak_fallback(clean_text, lang):
    """Speak ``clean_text`` with espeak-ng; report (never raise) on failure."""
    lang_voice = 'hi' if lang == 'hi' else 'en'
    # Slower rate for Hindi, espeak's usual rate otherwise.
    speed = '150' if lang_voice == 'hi' else '175'
    try:
        subprocess.run(['espeak-ng', '-v', lang_voice, '-s', speed, clean_text],
                       check=True, capture_output=True)
    except (OSError, subprocess.SubprocessError) as err:
        print(f"[TTS ERROR] espeak-ng fallback failed: {err}")


def gtts_tts_worker(tts_queue, voice_gender="female", language="en"):
    """TTS worker process: speak queued text with gTTS, caching MP3s on disk.

    Each distinct (language, voice, text) triple is synthesised once via
    Google TTS (requires internet) and stored under
    ``~/.cache/truck_assistant_tts``; later requests for the same text play
    from the cache and work offline. If synthesis or playback fails, the text
    is spoken with espeak-ng instead.

    Args:
        tts_queue: Queue yielding either a plain string, a dict with keys
            ``'text'`` and optional ``'lang'``, or the sentinel ``"__EXIT__"``
            which ends the worker.
        voice_gender: ``"female"`` selects the co.uk English voice, anything
            else the .com voice. gTTS offers a single Hindi voice.
        language: Default language (``"en"`` or ``"hi"``) for items that do
            not carry their own ``'lang'``.
    """
    try:
        from gtts import gTTS
    except ImportError:
        print("\n❌ gTTS not installed. Install with: pip install gtts")
        print("Falling back to espeak-ng...\n")
        espeak_tts_worker(tts_queue, "en-gb+f3" if voice_gender == "female" else "en-us+m3", language)
        return

    import hashlib

    # Create cache directory
    cache_dir = os.path.expanduser("~/.cache/truck_assistant_tts")
    os.makedirs(cache_dir, exist_ok=True)

    lang_name = "English" if language == "en" else "Hindi"
    print(f"✅ Using Google TTS ({voice_gender} voice, {lang_name}) with local cache\n")
    print("💡 First run needs internet, then works offline from cache\n")

    while True:
        data = tts_queue.get()
        if data == "__EXIT__":
            break

        # Decode the item *before* the guarded section so the fallback below
        # always sees the text of the current item, never a stale one.
        if isinstance(data, dict):
            text = data.get('text', '')
            current_lang = data.get('lang', language)
        else:
            text = data
            current_lang = language

        clean_text = clean_text_for_speech(text) if isinstance(text, str) else ""
        if not clean_text:
            continue

        try:
            # Cache key includes language and voice so they never collide.
            text_hash = hashlib.md5(
                f"{current_lang}_{voice_gender}_{clean_text}".encode()
            ).hexdigest()
            cache_file = os.path.join(cache_dir, f"{text_hash}.mp3")

            cached = os.path.exists(cache_file) and os.path.getsize(cache_file) > 0
            if not cached:
                # Generate with gTTS (needs internet the first time).
                if current_lang == "en":
                    tld = "co.uk" if voice_gender == "female" else "com"
                    tts = gTTS(text=clean_text, lang='en', tld=tld, slow=False)
                else:
                    # Hindi, at normal speed (pacing comes from the pauses).
                    tts = gTTS(text=clean_text, lang='hi', slow=False)

                # Write to a scratch file and rename, so an interrupted
                # download never leaves a truncated MP3 in the cache.
                part_file = f"{cache_file}.{os.getpid()}.part"
                try:
                    tts.save(part_file)
                    os.replace(part_file, cache_file)
                finally:
                    if os.path.exists(part_file):
                        os.remove(part_file)

            # Play using mpg123 (faster than converting to WAV)
            subprocess.run(['mpg123', '-q', cache_file], check=True)

            # Natural pause between chunks.
            time.sleep(_speech_pause_seconds(clean_text, current_lang))

        except Exception as e:
            # Best effort: report, then try the offline synthesiser.
            print(f"[TTS ERROR] {e}")
            _espeak_fallback(clean_text, current_lang)
class TruckAssistant:
    """Voice/text assistant: microphone -> Whisper STT -> Ollama chat -> TTS.

    Speech output is produced by a separate worker process fed through
    ``tts_queue``; LLM output is streamed and handed to that worker one
    sentence-sized chunk at a time.
    """

    # Per language: characters that end a speakable chunk while streaming,
    # and the minimum chunk length worth sending to the TTS worker.
    _FLUSH_RULES = {
        "hi": (("।", "?", "!", ".", ","), 10),
        "en": ((".", "!", "?", ",", ";"), 5),
    }

    def __init__(self, model="llama3.2:3b-instruct-q4_K_M", base_url="http://localhost:11434",
                 voice_gender="female", use_gtts=True, language="en"):
        """Load the Whisper model and start the TTS worker process.

        Args:
            model: Ollama model tag used for chat.
            base_url: Base URL of the Ollama server.
            voice_gender: ``"female"`` or ``"male"`` voice selection.
            use_gtts: Use the gTTS worker if True, else the espeak-ng worker.
            language: ``"hi"`` for Hindi; any other value is handled as English.
        """
        self.model = model
        self.base_url = base_url
        self.conversation_history = []
        self.language = language

        # Keep only the most recent messages: the model runs with
        # num_ctx=2048, so an ever-growing history would overflow the context.
        self.max_history_messages = 12

        # Detect and store the device's native sample rate
        self.native_samplerate = get_default_samplerate()
        self.whisper_samplerate = 16000  # Whisper expects 16kHz

        self.system_prompts = {
            "en": "You are a helpful AI assistant for truck drivers. Provide clear, concise, and practical answers.",
            "hi": """You are a helpful AI assistant for truck drivers.
Provide clear, concise, and practical answers in Hindi only.
Always respond in simple Hindi language that truck drivers can easily understand.
Explain technical terms in simple Hindi words."""
        }

        # Larger multilingual model for Hindi, small English-only one otherwise.
        if language == "hi":
            whisper_model = "small"
            print(f"Loading Whisper model ({whisper_model} - Enhanced for Hindi accuracy)...")
        else:
            whisper_model = "tiny.en"
            print(f"Loading Whisper model ({whisper_model} - optimized for speed)...")
        compute_type = "int8"

        self.whisper = WhisperModel(
            whisper_model,
            device="cpu",
            compute_type=compute_type,
            num_workers=2
        )

        # TTS queue + process
        self.tts_queue = Queue()

        if use_gtts:
            self.tts_process = Process(
                target=gtts_tts_worker,
                args=(self.tts_queue, voice_gender, language),
                daemon=True
            )
        else:
            voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3"
            self.tts_process = Process(
                target=espeak_tts_worker,
                args=(self.tts_queue, voice, language),
                daemon=True
            )

        self.tts_process.start()

    # ========== MIC RECORDING WITH ENERGY-BASED VAD ==========
    def record_audio(self, max_duration=8):
        """Record from the default microphone until silence or ``max_duration``.

        Recording stops early once speech has been heard and is followed by a
        run of quiet chunks (2.0 s for Hindi, 1.5 s otherwise).

        Args:
            max_duration: Upper bound on the recording length in seconds.

        Returns:
            A 1-D float32 array at ``whisper_samplerate``, or None on error.
        """
        print("\nListening... Speak now.\n")

        silence_threshold = 0.008  # RMS level below which a chunk is "quiet"
        silence_duration = 2.0 if self.language == "hi" else 1.5

        chunk_seconds = 0.1
        chunk_size = int(chunk_seconds * self.native_samplerate)
        max_chunks = int(max_duration / chunk_seconds)
        max_silent_chunks = silence_duration / chunk_seconds

        audio_chunks = []
        silent_chunks = 0
        speech_detected = False

        try:
            # The context manager starts the stream and guarantees it is
            # stopped and closed even if a read fails.
            with sd.InputStream(
                samplerate=self.native_samplerate,
                channels=1,
                dtype='float32'
            ) as stream:
                for i in range(max_chunks):
                    chunk, _ = stream.read(chunk_size)
                    audio_chunks.append(chunk)

                    energy = np.sqrt(np.mean(chunk**2))

                    if energy > silence_threshold:
                        speech_detected = True
                        silent_chunks = 0
                    elif speech_detected:
                        silent_chunks += 1

                    if silent_chunks > max_silent_chunks:
                        print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]")
                        break

            audio = np.concatenate(audio_chunks, axis=0).flatten()

            # Resample to 16kHz for Whisper
            if self.native_samplerate != self.whisper_samplerate:
                print(f"Resampling audio from {self.native_samplerate}Hz to {self.whisper_samplerate}Hz...")
                audio = resample_audio(audio, self.native_samplerate, self.whisper_samplerate)

            return audio

        except Exception as e:
            print(f"❌ Recording error: {e}")
            return None

    # ========== SPEECH TO TEXT ==========
    def speech_to_text(self, audio):
        """Transcribe ``audio`` with Whisper.

        Args:
            audio: 1-D float32 samples at 16 kHz, or None.

        Returns:
            The transcript, or "" if there is no audio or transcription fails.
        """
        if audio is None or len(audio) == 0:
            return ""

        print("Converting speech to text...")

        try:
            if self.language == "hi":
                segments, _ = self.whisper.transcribe(
                    audio,
                    beam_size=5,  # wider beam for better Hindi accuracy
                    vad_filter=True,
                    vad_parameters=dict(
                        threshold=0.3,
                        min_speech_duration_ms=100,
                        min_silence_duration_ms=500
                    ),
                    language="hi",
                    condition_on_previous_text=True,
                    initial_prompt="ट्रक, ड्राइवर, सड़क, गाड़ी"  # domain vocabulary hint
                )
            else:
                segments, _ = self.whisper.transcribe(
                    audio,
                    beam_size=1,
                    vad_filter=True,
                    language="en",
                    condition_on_previous_text=False
                )

            text = " ".join(seg.text for seg in segments).strip()

            print(f"You said: {text}\n")

            return text
        except Exception as e:
            print(f"❌ Transcription error: {e}")
            return ""

    # ========== VOICE CHAT PIPELINE ==========
    def voice_chat(self):
        """Record one utterance, transcribe it and answer it."""
        audio = self.record_audio()

        if audio is None:
            print("Recording failed. Try again.\n")
            return

        text = self.speech_to_text(audio)

        if not text:
            print("No speech detected. Try again.\n")
            return

        self.chat(text)

    # ========== HELPERS ==========
    @staticmethod
    def _take_speakable(buffer, language):
        """Split a streaming buffer into (chunk_to_speak, remaining_buffer).

        A chunk is released when the buffer, ignoring trailing whitespace,
        ends on a sentence/clause mark and is long enough to be worth
        speaking; otherwise ``(None, buffer)`` is returned and the text keeps
        accumulating.
        """
        enders, min_len = TruckAssistant._FLUSH_RULES.get(
            language, TruckAssistant._FLUSH_RULES["en"]
        )
        stripped = buffer.strip()
        if stripped.endswith(enders) and len(stripped) > min_len:
            return stripped, ""
        return None, buffer

    def _remember(self, prompt, reply):
        """Append one exchange to the history, dropping the oldest beyond the cap."""
        self.conversation_history.append({"role": "user", "content": prompt})
        self.conversation_history.append({"role": "assistant", "content": reply})
        limit = getattr(self, "max_history_messages", 12)
        if limit and len(self.conversation_history) > limit:
            del self.conversation_history[:-limit]

    def _tts_is_playing(self):
        """Return True while the TTS worker has a player subprocess running."""
        try:
            return bool(psutil.Process(self.tts_process.pid).children())
        except psutil.Error:
            return False

    def _wait_for_speech(self, settle=0.5, poll=0.1):
        """Block until queued speech has been played.

        An empty queue only means the worker has *taken* the last item, so
        this also waits for the worker's player subprocess to exit and for
        the idle state to hold for ``settle`` seconds. Returns immediately if
        the worker process is no longer alive (waiting on the queue would
        never end).

        NOTE(review): a slow first-time gTTS download happens before any
        player subprocess exists, so that gap can still be missed; a complete
        fix needs the workers to acknowledge finished items.
        """
        idle_since = None
        while self.tts_process.is_alive():
            busy = (not self.tts_queue.empty()) or self._tts_is_playing()
            now = time.time()
            if busy:
                idle_since = None
            elif idle_since is None:
                idle_since = now
            elif now - idle_since >= settle:
                return
            time.sleep(poll)

    # ========== LLM CHAT ==========
    def chat(self, prompt, stream=True):
        """Send ``prompt`` to Ollama, print and speak the reply.

        Args:
            prompt: User message.
            stream: Stream tokens (and speak sentence by sentence) if True;
                otherwise wait for the whole reply.

        Returns:
            The full reply text, or None if the request failed.
        """
        url = f"{self.base_url}/api/chat"

        system_prompt = self.system_prompts.get(self.language, self.system_prompts["en"])
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(self.conversation_history)
        messages.append({"role": "user", "content": prompt})

        if self.language == "hi":
            options = {
                "temperature": 0.7,
                "top_p": 0.9,
                "num_predict": 200,  # slightly more room for Hindi explanations
                "num_ctx": 2048,
                "repeat_penalty": 1.1,
                "stop": ["```", "---"]
            }
        else:
            options = {
                "temperature": 0.7,
                "top_p": 0.9,
                "num_predict": 150,
                "num_ctx": 2048
            }

        payload = {
            "model": self.model,
            "messages": messages,
            "stream": stream,
            "options": options
        }

        print("\nAssistant: ", end="", flush=True)

        start_time = time.time()
        full_response = ""
        token_count = 0

        try:
            # `with` releases the HTTP connection even if parsing fails.
            with requests.post(url, json=payload, stream=stream, timeout=30) as response:
                # Surface HTTP errors (e.g. unknown model) instead of parsing
                # an error body as if it were a reply.
                response.raise_for_status()

                if stream:
                    sentence_buffer = ""

                    for line in response.iter_lines():
                        if not line:
                            continue

                        try:
                            chunk = json.loads(line)
                        except ValueError:
                            # One malformed line should not discard the reply.
                            continue

                        if "error" in chunk:
                            raise RuntimeError(chunk["error"])

                        content = (chunk.get("message") or {}).get("content", "")
                        if content:
                            print(content, end="", flush=True)

                            full_response += content
                            sentence_buffer += content
                            token_count += 1

                            speakable, sentence_buffer = self._take_speakable(
                                sentence_buffer, self.language
                            )
                            if speakable:
                                self.tts_queue.put({"text": speakable, "lang": self.language})

                        if chunk.get("done"):
                            # Prefer the server's token count when reported.
                            token_count = chunk.get("eval_count", token_count)
                            break

                    if sentence_buffer.strip():
                        self.tts_queue.put({"text": sentence_buffer.strip(), "lang": self.language})

                else:
                    data = response.json()
                    full_response = data["message"]["content"]
                    token_count = data.get("eval_count", 0)
                    print(full_response)
                    self.tts_queue.put({"text": full_response, "lang": self.language})

            inference_time = time.time() - start_time
            tokens_per_sec = token_count / inference_time if inference_time > 0 else 0
            print(f"\n\n⚡ Time: {inference_time:.2f}s | Speed: {tokens_per_sec:.1f} tokens/sec")

            self._remember(prompt, full_response)

            # Do not return (and re-open the microphone) while still speaking.
            print("\n[Waiting for speech to complete...]")
            self._wait_for_speech()

            return full_response

        except Exception as e:
            print(f"\n❌ Error: {e}")
            return None

    # ========== CLEANUP ==========
    def stop(self, timeout=2.0):
        """Shut down the TTS worker.

        Queues the exit sentinel and gives the worker ``timeout`` seconds to
        finish on its own; only then is it terminated.
        """
        self.tts_queue.put("__EXIT__")
        self.tts_process.join(timeout=timeout)
        if self.tts_process.is_alive():
            self.tts_process.terminate()
            self.tts_process.join(timeout=1.0)
Voice chat") + print("="*60) + + mode = input("\nSelect (1-3): ").strip() + + if mode == "3": + print("\n🎤 VOICE MODE - Press Enter to speak, Ctrl+C to exit\n") + if language == "hi": + print("💡 Tip: Speak clearly and at a moderate pace for best results\n") + + try: + while True: + input("Press Enter to speak...") + assistant.voice_chat() + print("\n" + "="*60 + "\n") + except KeyboardInterrupt: + print("\n\n👋 Exiting gracefully...") + assistant.stop() + print("Goodbye!") + else: + print("\n💬 TEXT MODE - type 'quit' to exit\n") + + try: + while True: + user_input = input("You: ").strip() + if user_input.lower() in ["quit", "exit", "q"]: + assistant.stop() + print("\n👋 Goodbye!") + break + if user_input: + assistant.chat(user_input) + print("\n" + "="*60 + "\n") + except KeyboardInterrupt: + print("\n\n👋 Exiting gracefully...") + assistant.stop() + print("Goodbye!") + + +if __name__ == "__main__": + main() \ No newline at end of file