#!/usr/bin/env python3
"""
Truck HPC AI Assistant - POC Demo (English + Hindi voice assistant).

Written for a Raspberry Pi 5 and built from:
  * Ollama (local LLM reached over HTTP) for chat,
  * faster-whisper for speech-to-text,
  * gTTS with an on-disk MP3 cache (played through ``mpg123``) for speech
    output, with eSpeak-NG as the fallback voice.

gTTS needs internet access the first time a given sentence is spoken; later
requests for the same sentence are served from the local cache.
"""

import hashlib
import json
import os
import re
import subprocess
import tempfile
import time
import wave
from datetime import datetime
from multiprocessing import Process, Queue

import numpy as np
import psutil
import requests
import sounddevice as sd
from faster_whisper import WhisperModel

# Queue item that tells a TTS worker to leave its loop.
EXIT_SENTINEL = "__EXIT__"

# Punctuation at which streamed LLM output is handed to the TTS worker.
# Commas/semicolons are included so speech can start before a sentence ends.
_SPEAK_BREAK_MARKS = (".", "!", "?", ",", ";", "।")


# --------------------------------------------------------------
# TEXT CLEANING FUNCTION
# --------------------------------------------------------------
def clean_text_for_speech(text):
    """Strip Markdown formatting so the text can be read aloud.

    Args:
        text: Raw (possibly Markdown) text; ``None`` or empty is accepted.

    Returns:
        The text with Markdown markers removed and all whitespace runs
        collapsed to single spaces, stripped at both ends.
    """
    if not text:
        return ""

    # Line-anchored constructs go first. If emphasis stripping ran before
    # them, a horizontal rule such as "***" would be half-eaten (leaving a
    # spoken "*") and a "* " list bullet could be paired with a later "*".
    text = re.sub(r'```[\w]*\n', '', text)
    text = re.sub(r'```', '', text)
    text = re.sub(r'^[ \t]*[-*_]{3,}[ \t]*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*>\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
    text = re.sub(r'#{1,6}\s*', '', text)

    # Links before emphasis so underscores/asterisks inside a URL are dropped
    # together with the URL instead of being matched as emphasis markers.
    text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)

    # Inline emphasis, longest marker first.
    text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', text)
    text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
    text = re.sub(r'__(.+?)__', r'\1', text)
    text = re.sub(r'\*(.+?)\*', r'\1', text)
    text = re.sub(r'_(.+?)_', r'\1', text)
    text = re.sub(r'`(.+?)`', r'\1', text)

    text = re.sub(r'\s+', ' ', text)
    return text.strip()


# --------------------------------------------------------------
# SHARED TTS HELPERS
# --------------------------------------------------------------
def _unpack_tts_item(item, default_lang):
    """Return ``(text, lang)`` for a queue item (plain string or dict)."""
    if isinstance(item, dict):
        return item['text'], item.get('lang', default_lang)
    return item, default_lang


def _pause_after(text):
    """Sleep briefly after an utterance; longer after sentence-final marks."""
    if text.endswith(("?", "!")):
        time.sleep(0.15)
    elif text.endswith((".", "।")):
        time.sleep(0.10)
    else:
        time.sleep(0.05)


def _synthesize_to_cache(gtts_cls, text, lang, voice_gender, cache_file):
    """Generate speech with gTTS and store it at ``cache_file`` atomically.

    The MP3 is written to a ``.part`` file and renamed only after a complete
    download, so a network failure cannot leave a truncated file that later
    looks like a valid cache hit.
    """
    if lang == "en":
        tld = "co.uk" if voice_gender == "female" else "com"
        tts = gtts_cls(text=text, lang='en', tld=tld, slow=False)
    else:  # Hindi
        tts = gtts_cls(text=text, lang='hi', slow=False)

    partial = cache_file + ".part"
    try:
        tts.save(partial)
        os.replace(partial, cache_file)
    finally:
        # Only still present when save() or replace() failed.
        if os.path.exists(partial):
            os.remove(partial)


# --------------------------------------------------------------
# GTTS CACHED TTS WORKER (Natural Voice with Local Cache + Hindi Support)
# --------------------------------------------------------------
def gtts_tts_worker(tts_queue, voice_gender="female", language="en"):
    """Consume text from ``tts_queue`` and speak it with gTTS + ``mpg123``.

    Runs until the exit sentinel is received. Each utterance is cached as an
    MP3 under ``~/.cache/truck_assistant_tts``; generating a new utterance
    needs internet access. If gTTS is not installed the worker hands over to
    :func:`espeak_tts_worker`; if a single utterance fails it is retried
    once through ``espeak-ng``.

    Args:
        tts_queue: Queue of items to speak. An item is either a string or a
            dict with ``'text'`` and optional ``'lang'`` keys.
        voice_gender: ``"female"`` selects the ``co.uk`` gTTS endpoint for
            English, anything else selects ``com``.
        language: Default language (``"en"`` or ``"hi"``) for items that do
            not carry their own ``'lang'``.
    """
    try:
        from gtts import gTTS
    except ImportError:
        print("\n❌ gTTS not installed. Install with: pip install gtts")
        print("Falling back to espeak-ng...\n")
        espeak_tts_worker(
            tts_queue,
            "en-gb+f3" if voice_gender == "female" else "en-us+m3",
            language,
        )
        return

    # Create cache directory
    cache_dir = os.path.expanduser("~/.cache/truck_assistant_tts")
    os.makedirs(cache_dir, exist_ok=True)

    lang_name = "English" if language == "en" else "Hindi"
    print(f"✅ Using Google TTS ({voice_gender} voice, {lang_name}) with local cache\n")
    print("💡 First run needs internet, then works offline from cache\n")

    while True:
        data = tts_queue.get()
        if data == EXIT_SENTINEL:
            break

        # Reset per item so the fallback below never sees a stale or
        # undefined value when the failure happens early.
        clean_text = ""
        current_lang = language
        try:
            text, current_lang = _unpack_tts_item(data, language)
            clean_text = clean_text_for_speech(text)
            if not clean_text:
                continue

            # The English voice variant is part of the key so switching
            # between female/male does not replay the other voice. Hindi has
            # a single voice, so its key format is left as it was.
            if current_lang == "en":
                tld = "co.uk" if voice_gender == "female" else "com"
                cache_key = f"{current_lang}_{tld}_{clean_text}"
            else:
                cache_key = f"{current_lang}_{clean_text}"
            text_hash = hashlib.md5(cache_key.encode()).hexdigest()
            cache_file = os.path.join(cache_dir, f"{text_hash}.mp3")

            if not os.path.exists(cache_file):
                # Needs internet the first time this sentence is spoken.
                _synthesize_to_cache(
                    gTTS, clean_text, current_lang, voice_gender, cache_file
                )

            # Play using mpg123 (faster than converting to WAV)
            subprocess.run(['mpg123', '-q', cache_file], check=True)
            _pause_after(clean_text)

        except Exception as e:  # keep the worker alive whatever one item does
            print(f"[TTS ERROR] {e}")
            if not clean_text:
                continue
            # Fallback to espeak if gTTS or playback fails
            try:
                subprocess.run(
                    ['espeak-ng', '-v', 'hi' if current_lang == 'hi' else 'en',
                     clean_text],
                    check=True,
                    capture_output=True,
                )
            except (OSError, subprocess.SubprocessError) as fallback_error:
                print(f"[TTS FALLBACK ERROR] {fallback_error}")


# --------------------------------------------------------------
# ESPEAK-NG TTS WORKER (Fallback)
# --------------------------------------------------------------
def espeak_tts_worker(tts_queue, voice="en-gb+f3", language="en"):
    """Consume text from ``tts_queue`` and speak it with ``espeak-ng``.

    Returns immediately (after printing a hint) when ``espeak-ng`` cannot be
    executed; otherwise runs until the exit sentinel is received.

    Args:
        tts_queue: Queue of items to speak (string, or dict with ``'text'``
            and optional ``'lang'``).
        voice: eSpeak-NG voice used for non-Hindi text.
        language: Default language for items without their own ``'lang'``;
            ``"hi"`` switches to the ``hi`` voice.
    """
    try:
        subprocess.run(
            ['espeak-ng', '--version'],
            capture_output=True, text=True, timeout=2, check=True,
        )
    except (OSError, subprocess.SubprocessError):
        print("\n❌ eSpeak-NG not found! Install with: sudo apt install espeak-ng")
        return

    lang_name = "English" if language == "en" else "Hindi"
    print(f"✅ Using eSpeak-NG ({voice} voice, {lang_name})\n")

    while True:
        data = tts_queue.get()
        if data == EXIT_SENTINEL:
            break

        try:
            text, current_lang = _unpack_tts_item(data, language)
            clean_text = clean_text_for_speech(text)
            if not clean_text:
                continue

            espeak_voice = 'hi' if current_lang == 'hi' else voice
            subprocess.run(
                ['espeak-ng', '-v', espeak_voice, '-s', '175', clean_text],
                check=True,
                capture_output=True,
            )
            _pause_after(clean_text)

        except Exception as e:  # keep the worker alive whatever one item does
            print(f"[TTS ERROR] {e}")


# --------------------------------------------------------------
# MAIN ASSISTANT CLASS
# --------------------------------------------------------------
class TruckAssistant:
    """Voice/text chat assistant backed by Ollama, Whisper and a TTS worker.

    Constructing an instance loads the Whisper model and starts a daemon
    process that speaks whatever is put on ``tts_queue``.
    """

    # Upper bound on stored history messages (user + assistant entries).
    # Without a cap the prompt grows every turn and overruns ``num_ctx``.
    MAX_HISTORY_MESSAGES = 20

    def __init__(self, model="llama3.2:3b-instruct-q4_K_M",
                 base_url="http://localhost:11434",
                 voice_gender="female", use_gtts=True, language="en"):
        """Load the STT model and start the TTS worker process.

        Args:
            model: Ollama model tag sent with every chat request.
            base_url: Base URL of the Ollama HTTP server.
            voice_gender: ``"female"`` or ``"male"``; selects the TTS voice.
            use_gtts: Use the gTTS worker when true, eSpeak-NG otherwise.
            language: ``"en"`` or ``"hi"``; selects the Whisper model, the
                system prompt and the TTS language.
        """
        self.model = model
        self.base_url = base_url
        self.conversation_history = []
        self.language = language

        # Language-specific system prompts
        self.system_prompts = {
            "en": "You are a helpful AI assistant for truck drivers. Provide clear, concise, and practical answers.",
            "hi": "आप ट्रक ड्राइवरों के लिए एक सहायक AI सहायक हैं। स्पष्ट, संक्षिप्त और व्यावहारिक उत्तर प्रदान करें। कृपया हिंदी में उत्तर दें।"
        }

        # The ".en" model only handles English; Hindi needs the multilingual one.
        whisper_model = "tiny" if language == "hi" else "tiny.en"
        print(f"Loading Whisper model ({whisper_model} - optimized for speed)...")
        self.whisper = WhisperModel(
            whisper_model,
            device="cpu",
            compute_type="int8",
            num_workers=2
        )

        # TTS queue + process
        self.tts_queue = Queue()
        if use_gtts:
            self.tts_process = Process(
                target=gtts_tts_worker,
                args=(self.tts_queue, voice_gender, language),
                daemon=True
            )
        else:
            voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3"
            self.tts_process = Process(
                target=espeak_tts_worker,
                args=(self.tts_queue, voice, language),
                daemon=True
            )
        self.tts_process.start()

    # ========== ADAPTIVE MIC RECORDING WITH VAD ==========
    def record_audio(self, max_duration=5, samplerate=16000):
        """Record from the default microphone until silence or a time limit.

        Audio is read in 0.1 s chunks. Once a chunk's RMS energy has exceeded
        the threshold, recording stops after more than 1.5 s of consecutive
        quiet chunks; otherwise it stops after ``max_duration`` seconds.

        Args:
            max_duration: Maximum recording length in seconds.
            samplerate: Sampling rate in Hz.

        Returns:
            One-dimensional float32 NumPy array with the recorded samples.
        """
        print("\nListening... Speak now.\n")

        silence_threshold = 0.01
        silence_duration = 1.5
        chunk_seconds = 0.1

        chunk_size = int(chunk_seconds * samplerate)
        # round() rather than int(): 0.3 / 0.1 is 2.999... in floating point.
        # At least one chunk so np.concatenate never receives an empty list.
        max_chunks = max(1, round(max_duration / chunk_seconds))
        silence_limit = round(silence_duration / chunk_seconds)

        audio_chunks = []
        silent_chunks = 0
        speech_detected = False

        # The context manager starts the stream and guarantees it is stopped
        # and closed even when read() raises.
        with sd.InputStream(samplerate=samplerate, channels=1,
                            dtype='float32') as stream:
            for i in range(max_chunks):
                chunk, _ = stream.read(chunk_size)
                audio_chunks.append(chunk)

                energy = np.sqrt(np.mean(chunk ** 2))
                if energy > silence_threshold:
                    speech_detected = True
                    silent_chunks = 0
                elif speech_detected:
                    silent_chunks += 1
                    if silent_chunks > silence_limit:
                        print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]")
                        break

        return np.concatenate(audio_chunks, axis=0).flatten()

    # ========== OPTIMIZED STT ==========
    def speech_to_text(self, audio):
        """Transcribe ``audio`` with Whisper and return the stripped text.

        Args:
            audio: One-dimensional float32 sample array as produced by
                :meth:`record_audio`.

        Returns:
            The transcription, or an empty string when nothing was recognised
            or ``audio`` is empty.
        """
        if audio is None or len(audio) == 0:
            return ""

        print("Converting speech to text...")
        lang_code = "hi" if self.language == "hi" else "en"
        segments, _info = self.whisper.transcribe(
            audio,
            beam_size=1,
            vad_filter=True,
            language=lang_code,
            condition_on_previous_text=False
        )
        text = " ".join(seg.text for seg in segments).strip()
        print(f"You said: {text}\n")
        return text

    # ========== VOICE CHAT PIPELINE ==========
    def voice_chat(self):
        """Record one utterance, transcribe it and send it to :meth:`chat`.

        Returns:
            The assistant's reply, or ``None`` when no speech was recognised
            or the chat request failed.
        """
        audio = self.record_audio()
        text = self.speech_to_text(audio)
        if not text:
            print("No speech detected. Try again.\n")
            return None
        return self.chat(text)

    # ========== OPTIMIZED LLaMA CHAT WITH LIVE TTS ==========
    def _speak(self, text):
        """Queue ``text`` for the TTS worker in the assistant's language."""
        self.tts_queue.put({"text": text, "lang": self.language})

    def chat(self, prompt, stream=True):
        """Send ``prompt`` to Ollama, print the reply and speak it.

        In streaming mode the reply is handed to the TTS worker piece by
        piece, at punctuation marks, so speech starts before generation ends.

        Args:
            prompt: The user's message.
            stream: Request a streamed reply when true.

        Returns:
            The full reply text, or ``None`` if the request failed (the error
            is printed; nothing is raised).
        """
        url = f"{self.base_url}/api/chat"

        # Unknown languages use the English prompt, matching how the rest of
        # the class treats every non-"hi" language as English.
        system_prompt = self.system_prompts.get(
            self.language, self.system_prompts["en"]
        )
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(self.conversation_history)
        messages.append({"role": "user", "content": prompt})

        payload = {
            "model": self.model,
            "messages": messages,
            "stream": stream,
            "options": {
                "temperature": 0.7,
                "top_p": 0.9,
                "num_predict": 150,
                "num_ctx": 2048
            }
        }

        print("\nAssistant: ", end="", flush=True)

        start_time = time.time()
        full_response = ""
        token_count = 0

        try:
            with requests.post(url, json=payload, stream=stream,
                               timeout=30) as response:
                response.raise_for_status()

                if stream:
                    sentence_buffer = ""
                    for line in response.iter_lines():
                        if not line:
                            continue
                        chunk = json.loads(line)
                        if "error" in chunk:
                            raise RuntimeError(chunk["error"])

                        content = chunk.get("message", {}).get("content")
                        if content is not None:
                            print(content, end="", flush=True)
                            full_response += content
                            sentence_buffer += content
                            token_count += 1

                            # Sentence end detection (English and Hindi).
                            # Trailing whitespace is ignored so a token such
                            # as ".\n" still counts as a break.
                            stripped = sentence_buffer.strip()
                            if stripped.endswith(_SPEAK_BREAK_MARKS):
                                # Very short fragments stay in the buffer
                                # and are spoken with the next piece.
                                if len(stripped) > 5:
                                    self._speak(stripped)
                                    sentence_buffer = ""

                        # The final chunk reports the real token count.
                        if chunk.get("done") and "eval_count" in chunk:
                            token_count = chunk["eval_count"]

                    if sentence_buffer.strip():
                        self._speak(sentence_buffer.strip())
                else:
                    data = response.json()
                    if "error" in data:
                        raise RuntimeError(data["error"])
                    full_response = data["message"]["content"]
                    token_count = data.get("eval_count", 0)
                    print(full_response)
                    self._speak(full_response)

            inference_time = time.time() - start_time
            tokens_per_sec = token_count / inference_time if inference_time > 0 else 0
            print(f"\n\n⚡ Time: {inference_time:.2f}s | Speed: {tokens_per_sec:.1f} tokens/sec")

            self.conversation_history.append({"role": "user", "content": prompt})
            self.conversation_history.append({"role": "assistant", "content": full_response})
            # Drop the oldest turns so the prompt stays within the context.
            overflow = len(self.conversation_history) - self.MAX_HISTORY_MESSAGES
            if overflow > 0:
                del self.conversation_history[:overflow]

            return full_response

        except Exception as e:  # interactive boundary: report, never raise
            print(f"\n❌ Error: {e}")
            return None

    # ========== CLEANUP ==========
    def stop(self, timeout=5.0):
        """Shut down the TTS worker.

        The worker is asked to exit and given ``timeout`` seconds to finish
        what it is currently speaking; it is terminated only if it is still
        running after that.

        Args:
            timeout: Seconds to wait for a clean exit before terminating.
        """
        self.tts_queue.put(EXIT_SENTINEL)
        self.tts_process.join(timeout)
        if self.tts_process.is_alive():
            self.tts_process.terminate()
            self.tts_process.join(1.0)


# --------------------------------------------------------------
# MAIN
# --------------------------------------------------------------
def main():
    """Interactive entry point: choose language, voice and chat mode."""
    quit_words = ("quit", "exit", "q")

    print("\n🚀 Truck Assistant - Raspberry Pi 5")
    print("🎤 Natural Human Voice (Google TTS)")
    print("🌐 Multilingual Support (English & Hindi)\n")

    # Language selection
    print("Select Language:")
    print("1. English")
    print("2. Hindi (हिंदी)")
    lang_choice = input("\nLanguage (1 or 2, default=1): ").strip() or "1"
    language = "en" if lang_choice == "1" else "hi"

    # Simple voice selection
    print("\nSelect Voice:")
    print("1. Female (Natural)")
    print("2. Male (Natural)")
    voice_choice = input("\nVoice (1 or 2, default=1): ").strip() or "1"
    voice_gender = "female" if voice_choice == "1" else "male"

    lang_display = "English" if language == "en" else "हिंदी"
    print(f"\n✅ Language: {lang_display}")
    print(f"✅ Voice: {voice_gender.capitalize()}")
    print("📥 Installing dependencies if needed...\n")

    assistant = TruckAssistant(voice_gender=voice_gender, use_gtts=True,
                               language=language)

    # From here on the TTS process exists, so every exit path goes through
    # the ``finally`` below.
    try:
        # Check Ollama on the same server the assistant will talk to.
        try:
            requests.get(f"{assistant.base_url}/api/tags", timeout=5)
            print("✅ Ollama running\n")
        except requests.RequestException:
            print("❌ Ollama not running. Start with: ollama serve\n")
            return

        print("="*60)
        print("Mode:")
        print("1. Demo")
        print("2. Text chat")
        print("3. Voice chat")
        print("="*60)

        mode = input("\nSelect (1-3): ").strip()

        if mode == "3":
            print("\n🎤 VOICE MODE - Press Enter to speak\n")
            while True:
                command = input("Press Enter...").strip().lower()
                if command in quit_words:
                    print("\n👋 Goodbye!")
                    break
                assistant.voice_chat()
        else:
            # NOTE: option 1 ("Demo") has no dedicated implementation and
            # runs text chat, like any other non-"3" choice.
            print("\n💬 TEXT MODE - type 'quit' to exit\n")
            while True:
                user_input = input("You: ").strip()
                if user_input.lower() in quit_words:
                    print("\n👋 Goodbye!")
                    break
                if user_input:
                    assistant.chat(user_input)
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C / Ctrl+D is the normal way out of a prompt loop.
        print("\n👋 Goodbye!")
    finally:
        assistant.stop()


if __name__ == "__main__":
    main()