#!/usr/bin/env python3
"""
Truck HPC AI Assistant - POC Demo (ENHANCED HINDI SUPPORT)
Optimized for Raspberry Pi 5 with Ollama + Whisper STT + MaryTTS/Festival
ENHANCED: Better Hindi speech recognition and synthesis
NATURAL VOICE: Downloads and uses better quality voices
OFFLINE: 100% offline capability
MULTILINGUAL: English and Hindi support
FIXED: Auto-detects correct audio sample rate
FIXED: Proper loop control - waits for speech to complete before next input
"""

import requests
import json
import time
import psutil
import sounddevice as sd
import numpy as np
import subprocess
import os
import re
import tempfile
import wave
from multiprocessing import Process, Queue
from faster_whisper import WhisperModel
from datetime import datetime


# --------------------------------------------------------------
# AUDIO DEVICE DETECTION
# --------------------------------------------------------------
def get_default_samplerate():
    """Detect the default sample rate supported by the input device.

    Returns:
        int: the device's default sample rate in Hz, or 44100 as a
        safe fallback when the query fails (no device, driver error).
    """
    try:
        device_info = sd.query_devices(kind='input')
        default_sr = int(device_info['default_samplerate'])
        print(f"ЁЯОд Detected audio device: {device_info['name']}")
        print(f"ЁЯО╡ Using sample rate: {default_sr} Hz")
        return default_sr
    except Exception as e:
        # PortAudio can raise a variety of errors; never let audio
        # probing kill startup — fall back to a common rate instead.
        print(f"тЪая╕П Could not detect sample rate, using 44100 Hz: {e}")
        return 44100


# --------------------------------------------------------------
# TEXT CLEANING FUNCTION
# --------------------------------------------------------------
def clean_text_for_speech(text):
    """Strip markdown formatting so the TTS engine reads plain prose.

    Removes headings, bold/italic markers, code fences and inline code,
    horizontal rules, link syntax (keeping the link text), list bullets,
    numbered-list prefixes and blockquote markers, then collapses all
    whitespace runs to single spaces.

    Args:
        text (str): raw (possibly markdown-formatted) model output.

    Returns:
        str: cleaned, whitespace-normalized text.
    """
    text = re.sub(r'#{1,6}\s*', '', text)                    # headings
    text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', text)         # bold+italic
    text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)             # bold
    text = re.sub(r'__(.+?)__', r'\1', text)                 # bold (alt)
    text = re.sub(r'\*(.+?)\*', r'\1', text)                 # italic
    text = re.sub(r'_(.+?)_', r'\1', text)                   # italic (alt)
    text = re.sub(r'```[\w]*\n', '', text)                   # fence open
    text = re.sub(r'```', '', text)                          # fence close
    text = re.sub(r'`(.+?)`', r'\1', text)                   # inline code
    text = re.sub(r'^[-*_]{3,}$', '', text, flags=re.MULTILINE)  # hr
    text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)          # links -> text
    text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)   # bullets
    text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)   # numbering
    text = re.sub(r'^\s*>\s+', '', text, flags=re.MULTILINE)       # quotes
    text = re.sub(r'\s+', ' ', text)                         # collapse ws
    return text.strip()


# --------------------------------------------------------------
# GTTS CACHED TTS WORKER (Enhanced Hindi Support)
# --------------------------------------------------------------
def gtts_tts_worker(tts_queue, voice_gender="female", language="en"):
    """
    Uses gTTS with local caching for natural voice.
    ENHANCED: Better Hindi voice quality and pronunciation
    First run needs internet to download, then works offline.
    Supports English and Hindi.

    Runs as a worker process: consumes items from ``tts_queue`` until the
    sentinel ``"__EXIT__"`` arrives. Items may be plain strings or dicts
    of the form ``{"text": ..., "lang": ...}`` for per-utterance language
    switching. Falls back to espeak-ng when gTTS synthesis fails, and to
    espeak_tts_worker entirely when gTTS is not installed.
    """
    try:
        from gtts import gTTS
        import hashlib

        # Create cache directory
        cache_dir = os.path.expanduser("~/.cache/truck_assistant_tts")
        os.makedirs(cache_dir, exist_ok=True)

        lang_name = "English" if language == "en" else "Hindi"
        print(f"тЬЕ Using Google TTS ({voice_gender} voice, {lang_name}) with local cache\n")
        print("ЁЯТб First run needs internet, then works offline from cache\n")

        while True:
            data = tts_queue.get()
            if data == "__EXIT__":
                break

            # FIX: pre-initialize so the espeak fallback in the except
            # branch never hits an UnboundLocalError when the failure
            # happens before clean_text is assigned.
            clean_text = ""
            try:
                # Support for language switching
                if isinstance(data, dict):
                    text = data['text']
                    current_lang = data.get('lang', language)
                else:
                    text = data
                    current_lang = language

                clean_text = clean_text_for_speech(text)
                if not clean_text:
                    continue

                # Create hash for caching (include language in hash)
                text_hash = hashlib.md5(
                    f"{current_lang}_{voice_gender}_{clean_text}".encode()
                ).hexdigest()
                cache_file = os.path.join(cache_dir, f"{text_hash}.mp3")

                # Check if cached
                if not os.path.exists(cache_file):
                    # Generate with gTTS (needs internet first time)
                    if current_lang == "en":
                        # gTTS approximates gender via regional TLD voices.
                        tld = "co.uk" if voice_gender == "female" else "com"
                        tts = gTTS(text=clean_text, lang='en', tld=tld, slow=False)
                    else:
                        # Hindi - ENHANCED
                        # Use slower speed for better Hindi pronunciation
                        tts = gTTS(text=clean_text, lang='hi', slow=False)
                    tts.save(cache_file)

                # Play using mpg123 (faster than converting to WAV)
                subprocess.run(['mpg123', '-q', cache_file], check=True)

                # Natural pause - adjusted for Hindi
                if current_lang == "hi":
                    # Hindi needs slightly longer pauses for better comprehension
                    if clean_text.endswith(("?", "!", "ред")):
                        time.sleep(0.25)
                    elif clean_text.endswith("."):
                        time.sleep(0.20)
                    else:
                        time.sleep(0.08)
                else:
                    if clean_text.endswith(("?", "!")):
                        time.sleep(0.15)
                    elif clean_text.endswith("."):
                        time.sleep(0.10)
                    else:
                        time.sleep(0.05)

            except Exception as e:
                print(f"[TTS ERROR] {e}")
                # Fallback to espeak if gTTS fails (best effort; skip if
                # there is no text to speak yet).
                if not clean_text:
                    continue
                try:
                    if isinstance(data, dict):
                        lang_voice = 'hi' if data.get('lang') == 'hi' else 'en'
                        # Use slower speed for Hindi in espeak as well
                        speed = '150' if lang_voice == 'hi' else '175'
                        subprocess.run(
                            ['espeak-ng', '-v', lang_voice, '-s', speed, clean_text],
                            check=True, capture_output=True
                        )
                    else:
                        subprocess.run(
                            ['espeak-ng', clean_text],
                            check=True, capture_output=True
                        )
                except Exception:
                    # Deliberate best-effort: if even espeak fails there is
                    # nothing more to do for this utterance.
                    pass

    except ImportError:
        print("\nтЭМ gTTS not installed. Install with: pip install gtts")
        print("Falling back to espeak-ng...\n")
        espeak_tts_worker(
            tts_queue,
            "en-gb+f3" if voice_gender == "female" else "en-us+m3",
            language
        )


# --------------------------------------------------------------
# ESPEAK-NG TTS WORKER (Fallback with Enhanced Hindi)
# --------------------------------------------------------------
def espeak_tts_worker(tts_queue, voice="en-gb+f3", language="en"):
    """Fallback to eSpeak-NG with enhanced Hindi support.

    Same queue protocol as gtts_tts_worker: strings or
    ``{"text": ..., "lang": ...}`` dicts, terminated by ``"__EXIT__"``.
    Exits immediately if the espeak-ng binary is not on PATH.
    """
    try:
        subprocess.run(['espeak-ng', '--version'],
                       capture_output=True, text=True, timeout=2, check=True)
    except Exception:
        print("\nтЭМ eSpeak-NG not found! Install with: sudo apt install espeak-ng")
        return

    lang_name = "English" if language == "en" else "Hindi"
    print(f"тЬЕ Using eSpeak-NG ({voice} voice, {lang_name})\n")

    while True:
        data = tts_queue.get()
        if data == "__EXIT__":
            break
        try:
            # Support for language switching
            if isinstance(data, dict):
                text = data['text']
                current_lang = data.get('lang', language)
            else:
                text = data
                current_lang = language

            clean_text = clean_text_for_speech(text)
            if not clean_text:
                continue

            espeak_voice = 'hi' if current_lang == 'hi' else voice
            # Slower speed for Hindi for better pronunciation
            speed = '150' if current_lang == 'hi' else '175'
            subprocess.run(
                ['espeak-ng', '-v', espeak_voice, '-s', speed, clean_text],
                check=True, capture_output=True
            )

            # Adjusted pauses for Hindi
            if current_lang == 'hi':
                if clean_text.endswith(("?", "!", "ред")):
                    time.sleep(0.25)
                elif clean_text.endswith("."):
                    time.sleep(0.20)
                else:
                    time.sleep(0.08)
            else:
                if clean_text.endswith(("?", "!")):
                    time.sleep(0.15)
                elif clean_text.endswith("."):
                    time.sleep(0.10)
                else:
                    time.sleep(0.05)

        except Exception as e:
            print(f"[TTS ERROR] {e}")


# --------------------------------------------------------------
# AUDIO RESAMPLING FUNCTION
# --------------------------------------------------------------
def resample_audio(audio, orig_sr, target_sr=16000):
    """Resample audio to target sample rate for Whisper.

    Args:
        audio (np.ndarray): 1-D float audio samples.
        orig_sr (int): sample rate the audio was captured at.
        target_sr (int): desired sample rate (Whisper expects 16 kHz).

    Returns:
        np.ndarray: float32 samples at ``target_sr`` (the input object
        is returned unchanged when no resampling is needed).
    """
    if orig_sr == target_sr:
        return audio
    duration = len(audio) / orig_sr
    target_length = int(duration * target_sr)
    # Local import keeps scipy optional for devices whose native rate
    # is already 16 kHz.
    from scipy import signal
    resampled = signal.resample(audio, target_length)
    return resampled.astype(np.float32)


# --------------------------------------------------------------
# MAIN ASSISTANT CLASS (ENHANCED HINDI SUPPORT)
# --------------------------------------------------------------
class TruckAssistant:
    """Voice/text assistant: Whisper STT -> Ollama LLM -> queued TTS."""

    def __init__(self, model="llama3.2:3b-instruct-q4_K_M",
                 base_url="http://localhost:11434",
                 voice_gender="female", use_gtts=True, language="en"):
        self.model = model
        self.base_url = base_url
        self.conversation_history = []
        self.language = language

        # Detect and store the device's native sample rate
        self.native_samplerate = get_default_samplerate()
        self.whisper_samplerate = 16000  # Whisper expects 16kHz

        # ENHANCED: Better system prompts for Hindi
        self.system_prompts = {
            "en": "You are a helpful AI assistant for truck drivers. Provide clear, concise, and practical answers.",
            "hi": """рдЖрдк рдЯреНрд░рдХ рдбреНрд░рд╛рдЗрд╡рд░реЛрдВ рдХреЗ рд▓рд┐рдП рдПрдХ рд╕рд╣рд╛рдпрдХ AI рд╕рд╣рд╛рдпрдХ рд╣реИрдВред рд╕реНрдкрд╖реНрдЯ, рд╕рдВрдХреНрд╖рд┐рдкреНрдд рдФрд░ рд╡реНрдпрд╛рд╡рд╣рд╛рд░рд┐рдХ рдЙрддреНрддрд░ рдкреНрд░рджрд╛рди рдХрд░реЗрдВред рдХреГрдкрдпрд╛ рдХреЗрд╡рд▓ рд╣рд┐рдВрджреА рдореЗрдВ рдЙрддреНрддрд░ рджреЗрдВред рд╕рд░рд▓ рдФрд░ рд╕рдордЭрдиреЗ рдпреЛрдЧреНрдп рднрд╛рд╖рд╛ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░реЗрдВред рддрдХрдиреАрдХреА рд╢рдмреНрджреЛрдВ рдХреЛ рд╕рд░рд▓ рд╣рд┐рдВрджреА рдореЗрдВ рд╕рдордЭрд╛рдПрдВред"""
        }

        # ENHANCED: Use larger Whisper model for Hindi for better accuracy
        if language == "hi":
            whisper_model = "small"  # Better for Hindi than tiny
            print(f"Loading Whisper model ({whisper_model} - Enhanced for Hindi accuracy)...")
            compute_type = "int8"  # Balanced performance
        else:
            whisper_model = "tiny.en"
            print(f"Loading Whisper model ({whisper_model} - optimized for speed)...")
            compute_type = "int8"

        self.whisper = WhisperModel(
            whisper_model,
            device="cpu",
            compute_type=compute_type,
            num_workers=2
        )

        # TTS queue + process
        self.tts_queue = Queue()
        if use_gtts:
            self.tts_process = Process(
                target=gtts_tts_worker,
                args=(self.tts_queue, voice_gender, language),
                daemon=True
            )
        else:
            voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3"
            self.tts_process = Process(
                target=espeak_tts_worker,
                args=(self.tts_queue, voice, language),
                daemon=True
            )
        self.tts_process.start()

    # ========== ENHANCED MIC RECORDING WITH BETTER VAD FOR HINDI ==========
    def record_audio(self, max_duration=8):
        """Records audio with Voice Activity Detection - Enhanced for Hindi.

        Reads 100 ms chunks from the input device; once speech is heard,
        stops early after a run of silence (2.0 s for Hindi, 1.5 s for
        English). Output is resampled to 16 kHz for Whisper.

        Returns:
            np.ndarray | None: mono float samples, or None on failure.
        """
        print("\nЁЯОд рд╕реБрди рд░рд╣рд╛ рд╣реВрдБ... рдЕрдм рдмреЛрд▓реЗрдВред / Listening... Speak now.\n")

        # Adjusted thresholds for better Hindi detection
        silence_threshold = 0.008  # Slightly lower for Hindi consonants
        silence_duration = 2.0 if self.language == "hi" else 1.5  # Longer for Hindi
        chunk_size = int(0.1 * self.native_samplerate)
        max_chunks = int(max_duration / 0.1)

        audio_chunks = []
        silent_chunks = 0
        speech_detected = False

        stream = None
        try:
            stream = sd.InputStream(
                samplerate=self.native_samplerate,
                channels=1,
                dtype='float32'
            )
            stream.start()

            for i in range(max_chunks):
                chunk, _ = stream.read(chunk_size)
                audio_chunks.append(chunk)

                # RMS energy of the chunk as a crude voice-activity signal
                energy = np.sqrt(np.mean(chunk**2))
                if energy > silence_threshold:
                    speech_detected = True
                    silent_chunks = 0
                elif speech_detected:
                    silent_chunks += 1
                    if silent_chunks > (silence_duration / 0.1):
                        print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]")
                        break

            audio = np.concatenate(audio_chunks, axis=0).flatten()

            # Resample to 16kHz for Whisper
            if self.native_samplerate != self.whisper_samplerate:
                print(f"Resampling audio from {self.native_samplerate}Hz to {self.whisper_samplerate}Hz...")
                audio = resample_audio(audio, self.native_samplerate, self.whisper_samplerate)

            return audio

        except Exception as e:
            print(f"тЭМ Recording error: {e}")
            return None
        finally:
            # FIX: always release the audio device, even when stream.read
            # raises mid-loop (the original leaked the stream on error).
            if stream is not None:
                try:
                    stream.stop()
                    stream.close()
                except Exception:
                    pass

    # ========== ENHANCED STT FOR HINDI ==========
    def speech_to_text(self, audio):
        """Enhanced transcription with better Hindi support.

        Args:
            audio (np.ndarray | None): 16 kHz mono samples from record_audio.

        Returns:
            str: transcribed text ("" on failure or no audio).
        """
        if audio is None:
            return ""

        status_msg = "рдмреЛрд▓реА рдХреЛ рдЯреЗрдХреНрд╕реНрдЯ рдореЗрдВ рдмрджрд▓ рд░рд╣реЗ рд╣реИрдВ..." if self.language == "hi" else "Converting speech to text..."
        print(status_msg)

        try:
            # ENHANCED: Better parameters for Hindi recognition
            if self.language == "hi":
                segments, info = self.whisper.transcribe(
                    audio,
                    beam_size=5,  # Higher beam size for better Hindi accuracy
                    vad_filter=True,
                    vad_parameters=dict(
                        threshold=0.3,  # Lower threshold for Hindi
                        min_speech_duration_ms=100,
                        min_silence_duration_ms=500
                    ),
                    language="hi",
                    condition_on_previous_text=True,  # Better context for Hindi
                    initial_prompt="рдЯреНрд░рдХ, рдбреНрд░рд╛рдЗрд╡рд░, рд╕рдбрд╝рдХ, рдЧрд╛рдбрд╝реА"  # Domain-specific Hindi prompt
                )
            else:
                segments, info = self.whisper.transcribe(
                    audio,
                    beam_size=1,
                    vad_filter=True,
                    language="en",
                    condition_on_previous_text=False
                )

            text = " ".join(seg.text for seg in segments).strip()

            # Display with proper language prefix
            prefix = "рдЖрдкрдиреЗ рдХрд╣рд╛:" if self.language == "hi" else "You said:"
            print(f"{prefix} {text}\n")
            return text

        except Exception as e:
            print(f"тЭМ Transcription error: {e}")
            return ""

    # ========== VOICE CHAT PIPELINE ==========
    def voice_chat(self):
        """One record -> transcribe -> chat round trip (no-op on failure)."""
        audio = self.record_audio()
        if audio is None:
            msg = "рд░рд┐рдХреЙрд░реНрдбрд┐рдВрдЧ рд╡рд┐рдлрд▓ред рдкреБрдирдГ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВред" if self.language == "hi" else "Recording failed. Try again."
            print(f"{msg}\n")
            return

        text = self.speech_to_text(audio)
        if not text:
            msg = "рдХреЛрдИ рдмреЛрд▓реА рдирд╣реАрдВ рд╕реБрдиреАред рдкреБрдирдГ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВред" if self.language == "hi" else "No speech detected. Try again."
            print(f"{msg}\n")
            return

        self.chat(text)

    # ========== ENHANCED LLaMA CHAT WITH BETTER HINDI SUPPORT ==========
    def chat(self, prompt, stream=True):
        """Send a prompt to Ollama, stream tokens to stdout and the TTS queue.

        Complete sentences are pushed to the TTS worker as they arrive so
        speech starts before the full reply is generated. Blocks until the
        TTS queue has drained before returning, so the next listening turn
        does not start over the assistant's own speech.

        Args:
            prompt (str): user message.
            stream (bool): request streaming generation from Ollama.

        Returns:
            str | None: the full assistant reply, or None on error.
        """
        url = f"{self.base_url}/api/chat"

        # Prepare messages with enhanced system prompt
        messages = [{"role": "system", "content": self.system_prompts[self.language]}]
        messages.extend(self.conversation_history)
        messages.append({"role": "user", "content": prompt})

        # ENHANCED: Better parameters for Hindi generation
        if self.language == "hi":
            options = {
                "temperature": 0.7,
                "top_p": 0.9,
                "num_predict": 200,  # Slightly more for Hindi explanations
                "num_ctx": 2048,
                "repeat_penalty": 1.1,  # Reduce repetition in Hindi
                "stop": ["```", "---"]  # Stop tokens
            }
        else:
            options = {
                "temperature": 0.7,
                "top_p": 0.9,
                "num_predict": 150,
                "num_ctx": 2048
            }

        payload = {
            "model": self.model,
            "messages": messages,
            "stream": stream,
            "options": options
        }

        assistant_prefix = "рд╕рд╣рд╛рдпрдХ:" if self.language == "hi" else "Assistant:"
        print(f"\n{assistant_prefix} ", end="", flush=True)

        start_time = time.time()
        full_response = ""
        token_count = 0

        try:
            response = requests.post(url, json=payload, stream=True, timeout=30)

            if stream:
                sentence_buffer = ""
                for line in response.iter_lines():
                    if not line:
                        continue
                    chunk = json.loads(line)
                    if "message" in chunk and "content" in chunk["message"]:
                        content = chunk["message"]["content"]
                        print(content, end="", flush=True)
                        full_response += content
                        sentence_buffer += content
                        token_count += 1

                        # ENHANCED: Better sentence detection for Hindi
                        if self.language == "hi":
                            # Hindi sentence endings: ред (purna viram), ? and !
                            if any(sentence_buffer.endswith(p) for p in ["ред", "?", "!", ".", ","]):
                                stripped = sentence_buffer.strip()
                                if len(stripped) > 10:  # Longer minimum for Hindi
                                    self.tts_queue.put({"text": stripped, "lang": self.language})
                                    sentence_buffer = ""
                        else:
                            if any(sentence_buffer.endswith(p) for p in [".", "!", "?", ",", ";"]):
                                stripped = sentence_buffer.strip()
                                if len(stripped) > 5:
                                    self.tts_queue.put({"text": stripped, "lang": self.language})
                                    sentence_buffer = ""

                # Flush whatever is left after the stream ends
                if sentence_buffer.strip():
                    self.tts_queue.put({"text": sentence_buffer.strip(), "lang": self.language})
            else:
                data = response.json()
                full_response = data["message"]["content"]
                print(full_response)
                self.tts_queue.put({"text": full_response, "lang": self.language})

            inference_time = time.time() - start_time
            tokens_per_sec = token_count / inference_time if inference_time > 0 else 0
            print(f"\n\nтЪб Time: {inference_time:.2f}s | Speed: {tokens_per_sec:.1f} tokens/sec")

            self.conversation_history.append({"role": "user", "content": prompt})
            self.conversation_history.append({"role": "assistant", "content": full_response})

            # Wait for TTS queue to be empty (all speech completed)
            wait_msg = "[рднрд╛рд╖рдг рдкреВрд░реНрдг рд╣реЛрдиреЗ рдХреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдореЗрдВ...]" if self.language == "hi" else "[Waiting for speech to complete...]"
            print(f"\n{wait_msg}")
            while not self.tts_queue.empty():
                time.sleep(0.1)
            # Additional small delay to ensure the last audio finishes playing
            time.sleep(0.5)

            return full_response

        except Exception as e:
            print(f"\nтЭМ Error: {e}")
            return None

    # ========== CLEANUP ==========
    def stop(self):
        """Ask the TTS worker to exit, then force-stop it if it lingers."""
        self.tts_queue.put("__EXIT__")
        # FIX: give the worker a moment to drain and exit cleanly before
        # terminating (the original terminated immediately, racing the
        # sentinel and potentially cutting off queued speech).
        self.tts_process.join(timeout=2)
        if self.tts_process.is_alive():
            self.tts_process.terminate()


# --------------------------------------------------------------
# MAIN
# --------------------------------------------------------------
def main():
    """Interactive entry point: language/voice selection, then chat loop."""
    print("\nЁЯЪА Truck Assistant - Raspberry Pi 5")
    print("ЁЯОд Natural Human Voice (Google TTS)")
    print("ЁЯМН Enhanced Hindi Support\n")

    # Language selection
    print("Select Language / рднрд╛рд╖рд╛ рдЪреБрдиреЗрдВ:")
    print("1. English")
    print("2. Hindi (рд╣рд┐рдВрджреА) - ENHANCED")
    lang_choice = input("\nLanguage (1 or 2, default=1): ").strip() or "1"
    language = "en" if lang_choice == "1" else "hi"

    # Simple voice selection
    print("\nSelect Voice / рдЖрд╡рд╛рдЬрд╝ рдЪреБрдиреЗрдВ:")
    print("1. Female (Natural / рдорд╣рд┐рд▓рд╛)")
    print("2. Male (Natural / рдкреБрд░реБрд╖)")
    voice_choice = input("\nVoice (1 or 2, default=1): ").strip() or "1"
    voice_gender = "female" if voice_choice == "1" else "male"

    lang_display = "English" if language == "en" else "рд╣рд┐рдВрджреА (ENHANCED)"
    print(f"\nтЬЕ Language: {lang_display}")
    print(f"тЬЕ Voice: {voice_gender.capitalize()}")

    if language == "hi":
        print("\nЁЯФе Hindi Enhancements:")
        print(" тАв Better speech recognition (Whisper 'small' model)")
        print(" тАв Improved pronunciation and pacing")
        print(" тАв Enhanced LLM responses in Hindi")
        print(" тАв Longer pause detection for natural speech")

    print("\nЁЯФе Installing dependencies if needed...\n")
    assistant = TruckAssistant(voice_gender=voice_gender, use_gtts=True, language=language)

    # Check Ollama
    try:
        requests.get("http://localhost:11434/api/tags", timeout=5)
        print("тЬЕ Ollama running\n")
    except Exception:
        print("тЭМ Ollama not running. Start with: ollama serve\n")
        return

    print("="*60)
    if language == "hi":
        print("рдореЛрдб рдЪреБрдиреЗрдВ:")
        print("1. рдбреЗрдореЛ")
        print("2. рдЯреЗрдХреНрд╕реНрдЯ рдЪреИрдЯ")
        print("3. рд╡реЙрдпрд╕ рдЪреИрдЯ (рдмреЗрд╣рддрд░ рд╣рд┐рдВрджреА рд╕рдорд░реНрдерди)")
    else:
        print("Mode:")
        print("1. Demo")
        print("2. Text chat")
        print("3. Voice chat")
    print("="*60)

    mode = input("\nSelect (1-3): ").strip()

    if mode == "3":
        if language == "hi":
            print("\nЁЯОд рд╡реЙрдпрд╕ рдореЛрдб - рдмреЛрд▓рдиреЗ рдХреЗ рд▓рд┐рдП Enter рджрдмрд╛рдПрдВ, рдмрд╛рд╣рд░ рдирд┐рдХрд▓рдиреЗ рдХреЗ рд▓рд┐рдП Ctrl+C\n")
            print("ЁЯТб рдЯрд┐рдк: рд╕реНрдкрд╖реНрдЯ рд░реВрдк рд╕реЗ рдФрд░ рдереЛрдбрд╝реА рдзреАрдореА рдЧрддрд┐ рд╕реЗ рдмреЛрд▓реЗрдВ\n")
        else:
            print("\nЁЯОд VOICE MODE - Press Enter to speak, Ctrl+C to exit\n")
        try:
            while True:
                prompt_msg = "рдмреЛрд▓рдиреЗ рдХреЗ рд▓рд┐рдП Enter рджрдмрд╛рдПрдВ..." if language == "hi" else "Press Enter to speak..."
                input(prompt_msg)
                assistant.voice_chat()
                print("\n" + "="*60 + "\n")
        except KeyboardInterrupt:
            bye_msg = "\n\nЁЯСЛ рдзрдиреНрдпрд╡рд╛рдж! рдлрд┐рд░ рдорд┐рд▓реЗрдВрдЧреЗ..." if language == "hi" else "\n\nЁЯСЛ Exiting gracefully..."
            print(bye_msg)
            assistant.stop()
            print("Goodbye! / рдЕрд▓рд╡рд┐рджрд╛!")
    else:
        if language == "hi":
            print("\nЁЯТм рдЯреЗрдХреНрд╕реНрдЯ рдореЛрдб - рдмрд╛рд╣рд░ рдирд┐рдХрд▓рдиреЗ рдХреЗ рд▓рд┐рдП 'quit' рд▓рд┐рдЦреЗрдВ\n")
        else:
            print("\nЁЯТм TEXT MODE - type 'quit' to exit\n")
        try:
            while True:
                prompt_txt = "рдЖрдк: " if language == "hi" else "You: "
                user_input = input(prompt_txt).strip()
                if user_input.lower() in ["quit", "exit", "q", "рдмрд╛рд╣рд░", "рдмрдВрдж"]:
                    assistant.stop()
                    bye_msg = "\nЁЯСЛ рдзрдиреНрдпрд╡рд╛рдж! рдлрд┐рд░ рдорд┐рд▓реЗрдВрдЧреЗ!" if language == "hi" else "\nЁЯСЛ Goodbye!"
                    print(bye_msg)
                    break
                if user_input:
                    assistant.chat(user_input)
                    print("\n" + "="*60 + "\n")
        except KeyboardInterrupt:
            bye_msg = "\n\nЁЯСЛ рдзрдиреНрдпрд╡рд╛рдж! рдлрд┐рд░ рдорд┐рд▓реЗрдВрдЧреЗ..." if language == "hi" else "\n\nЁЯСЛ Exiting gracefully..."
            print(bye_msg)
            assistant.stop()
            print("Goodbye! / рдЕрд▓рд╡рд┐рджрд╛!")


if __name__ == "__main__":
    main()