From e39ed831e33c1dcf6038376fccdfceb1c48caf39 Mon Sep 17 00:00:00 2001
From: prakash
Date: Wed, 26 Nov 2025 12:08:00 +0530
Subject: [PATCH] first commit

---
 ai_assistant2.py | 531 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 531 insertions(+)
 create mode 100644 ai_assistant2.py

+ """ + try: + from gtts import gTTS + import hashlib + + # Create cache directory + cache_dir = os.path.expanduser("~/.cache/truck_assistant_tts") + os.makedirs(cache_dir, exist_ok=True) + + lang_name = "English" if language == "en" else "Hindi" + print(f"✅ Using Google TTS ({voice_gender} voice, {lang_name}) with local cache\n") + print("💡 First run needs internet, then works offline from cache\n") + + while True: + data = tts_queue.get() + if data == "__EXIT__": + break + + try: + # Support for language switching + if isinstance(data, dict): + text = data['text'] + current_lang = data.get('lang', language) + else: + text = data + current_lang = language + + clean_text = clean_text_for_speech(text) + + if not clean_text: + continue + + # Create hash for caching (include language in hash) + text_hash = hashlib.md5(f"{current_lang}_{clean_text}".encode()).hexdigest() + cache_file = os.path.join(cache_dir, f"{text_hash}.mp3") + + # Check if cached + if not os.path.exists(cache_file): + # Generate with gTTS (needs internet first time) + if current_lang == "en": + tld = "co.uk" if voice_gender == "female" else "com" + tts = gTTS(text=clean_text, lang='en', tld=tld, slow=False) + else: # Hindi + tts = gTTS(text=clean_text, lang='hi', slow=False) + + tts.save(cache_file) + + # Play using mpg123 (faster than converting to WAV) + subprocess.run(['mpg123', '-q', cache_file], check=True) + + # Natural pause + if clean_text.endswith(("?", "!")): + time.sleep(0.15) + elif clean_text.endswith("."): + time.sleep(0.10) + else: + time.sleep(0.05) + + except Exception as e: + print(f"[TTS ERROR] {e}") + # Fallback to espeak if gTTS fails + try: + if isinstance(data, dict): + subprocess.run(['espeak-ng', '-v', 'hi' if data.get('lang') == 'hi' else 'en', clean_text], + check=True, capture_output=True) + else: + subprocess.run(['espeak-ng', clean_text], check=True, capture_output=True) + except: + pass + + except ImportError: + print("\n❌ gTTS not installed. Install with: pip install gtts") + print("Falling back to espeak-ng...\n") + espeak_tts_worker(tts_queue, "en-gb+f3" if voice_gender == "female" else "en-us+m3", language) + + +# -------------------------------------------------------------- +# ESPEAK-NG TTS WORKER (Fallback) +# -------------------------------------------------------------- +def espeak_tts_worker(tts_queue, voice="en-gb+f3", language="en"): + """Fallback to eSpeak-NG with Hindi support""" + + try: + subprocess.run(['espeak-ng', '--version'], + capture_output=True, text=True, timeout=2, check=True) + except: + print("\n❌ eSpeak-NG not found! 
Install with: sudo apt install espeak-ng") + return + + lang_name = "English" if language == "en" else "Hindi" + print(f"✅ Using eSpeak-NG ({voice} voice, {lang_name})\n") + + while True: + data = tts_queue.get() + if data == "__EXIT__": + break + + try: + # Support for language switching + if isinstance(data, dict): + text = data['text'] + current_lang = data.get('lang', language) + else: + text = data + current_lang = language + + clean_text = clean_text_for_speech(text) + + if not clean_text: + continue + + espeak_voice = 'hi' if current_lang == 'hi' else voice + subprocess.run(['espeak-ng', '-v', espeak_voice, '-s', '175', clean_text], + check=True, capture_output=True) + + if clean_text.endswith(("?", "!")): + time.sleep(0.15) + elif clean_text.endswith("."): + time.sleep(0.10) + else: + time.sleep(0.05) + + except Exception as e: + print(f"[TTS ERROR] {e}") + + +# -------------------------------------------------------------- +# MAIN ASSISTANT CLASS +# -------------------------------------------------------------- +class TruckAssistant: + def __init__(self, model="llama3.2:3b-instruct-q4_K_M", base_url="http://localhost:11434", + voice_gender="female", use_gtts=True, language="en"): + self.model = model + self.base_url = base_url + self.conversation_history = [] + self.language = language + + # Language-specific system prompts + self.system_prompts = { + "en": "You are a helpful AI assistant for truck drivers. Provide clear, concise, and practical answers.", + "hi": "आप ट्रक ड्राइवरों के लिए एक सहायक AI सहायक हैं। स्पष्ट, संक्षिप्त और व्यावहारिक उत्तर प्रदान करें। कृपया हिंदी में उत्तर दें।" + } + + whisper_model = "tiny" if language == "hi" else "tiny.en" + print(f"Loading Whisper model ({whisper_model} - optimized for speed)...") + self.whisper = WhisperModel( + whisper_model, + device="cpu", + compute_type="int8", + num_workers=2 + ) + + # TTS queue + process + self.tts_queue = Queue() + + if use_gtts: + self.tts_process = Process( + target=gtts_tts_worker, + args=(self.tts_queue, voice_gender, language), + daemon=True + ) + else: + voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3" + self.tts_process = Process( + target=espeak_tts_worker, + args=(self.tts_queue, voice, language), + daemon=True + ) + + self.tts_process.start() + + # ========== ADAPTIVE MIC RECORDING WITH VAD ========== + def record_audio(self, max_duration=5, samplerate=16000): + """Records audio with Voice Activity Detection""" + print("\nListening... 
Speak now.\n") + + silence_threshold = 0.01 + silence_duration = 1.5 + + chunk_size = int(0.1 * samplerate) + max_chunks = int(max_duration / 0.1) + + audio_chunks = [] + silent_chunks = 0 + speech_detected = False + + stream = sd.InputStream(samplerate=samplerate, channels=1, dtype='float32') + stream.start() + + for i in range(max_chunks): + chunk, _ = stream.read(chunk_size) + audio_chunks.append(chunk) + + energy = np.sqrt(np.mean(chunk**2)) + + if energy > silence_threshold: + speech_detected = True + silent_chunks = 0 + elif speech_detected: + silent_chunks += 1 + + if silent_chunks > (silence_duration / 0.1): + print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]") + break + + stream.stop() + stream.close() + + audio = np.concatenate(audio_chunks, axis=0).flatten() + return audio + + # ========== OPTIMIZED STT ========== + def speech_to_text(self, audio): + """Faster transcription with optimized parameters""" + print("Converting speech to text...") + + lang_code = "hi" if self.language == "hi" else "en" + + segments, info = self.whisper.transcribe( + audio, + beam_size=1, + vad_filter=True, + language=lang_code, + condition_on_previous_text=False + ) + + text = " ".join(seg.text for seg in segments).strip() + print(f"You said: {text}\n") + return text + + # ========== VOICE CHAT PIPELINE ========== + def voice_chat(self): + audio = self.record_audio() + text = self.speech_to_text(audio) + + if not text: + print("No speech detected. Try again.\n") + return + + self.chat(text) + + # ========== OPTIMIZED LLaMA CHAT WITH LIVE TTS ========== + def chat(self, prompt, stream=True): + url = f"{self.base_url}/api/chat" + + # Prepare messages with system prompt + messages = [{"role": "system", "content": self.system_prompts[self.language]}] + messages.extend(self.conversation_history) + messages.append({"role": "user", "content": prompt}) + + payload = { + "model": self.model, + "messages": messages, + "stream": stream, + "options": { + "temperature": 0.7, + "top_p": 0.9, + "num_predict": 150, + "num_ctx": 2048 + } + } + + print(f"\nAssistant: ", end="", flush=True) + + start_time = time.time() + full_response = "" + token_count = 0 + + try: + response = requests.post(url, json=payload, stream=True, timeout=30) + + if stream: + sentence_buffer = "" + + for line in response.iter_lines(): + if not line: + continue + + chunk = json.loads(line) + + if "message" in chunk and "content" in chunk["message"]: + content = chunk["message"]["content"] + + print(content, end="", flush=True) + + full_response += content + sentence_buffer += content + token_count += 1 + + # Sentence end detection (works for both English and Hindi) + if any(sentence_buffer.endswith(p) for p in [".", "!", "?", ",", ";", "।", "?"]): + stripped = sentence_buffer.strip() + if len(stripped) > 5: + self.tts_queue.put({"text": stripped, "lang": self.language}) + sentence_buffer = "" + + if sentence_buffer.strip(): + self.tts_queue.put({"text": sentence_buffer.strip(), "lang": self.language}) + + else: + data = response.json() + full_response = data["message"]["content"] + print(full_response) + self.tts_queue.put({"text": full_response, "lang": self.language}) + + inference_time = time.time() - start_time + tokens_per_sec = token_count / inference_time if inference_time > 0 else 0 + print(f"\n\n⚡ Time: {inference_time:.2f}s | Speed: {tokens_per_sec:.1f} tokens/sec") + + self.conversation_history.append({"role": "user", "content": prompt}) + self.conversation_history.append({"role": "assistant", "content": 
full_response}) + + return full_response + + except Exception as e: + print(f"\n❌ Error: {e}") + return None + + # ========== CLEANUP ========== + def stop(self): + self.tts_queue.put("__EXIT__") + self.tts_process.terminate() + + +# -------------------------------------------------------------- +# MAIN +# -------------------------------------------------------------- +def main(): + print("\n🚀 Truck Assistant - Raspberry Pi 5") + print("🎤 Natural Human Voice (Google TTS)") + print("🌐 Multilingual Support (English & Hindi)\n") + + # Language selection + print("Select Language:") + print("1. English") + print("2. Hindi (हिंदी)") + + lang_choice = input("\nLanguage (1 or 2, default=1): ").strip() or "1" + language = "en" if lang_choice == "1" else "hi" + + # Simple voice selection + print("\nSelect Voice:") + print("1. Female (Natural)") + print("2. Male (Natural)") + + voice_choice = input("\nVoice (1 or 2, default=1): ").strip() or "1" + voice_gender = "female" if voice_choice == "1" else "male" + + lang_display = "English" if language == "en" else "हिंदी" + print(f"\n✅ Language: {lang_display}") + print(f"✅ Voice: {voice_gender.capitalize()}") + print("📥 Installing dependencies if needed...\n") + + assistant = TruckAssistant(voice_gender=voice_gender, use_gtts=True, language=language) + + # Check Ollama + try: + requests.get("http://localhost:11434/api/tags", timeout=5) + print("✅ Ollama running\n") + except: + print("❌ Ollama not running. Start with: ollama serve\n") + return + + print("="*60) + print("Mode:") + print("1. Demo") + print("2. Text chat") + print("3. Voice chat") + print("="*60) + + mode = input("\nSelect (1-3): ").strip() + + if mode == "3": + print("\n🎤 VOICE MODE - Press Enter to speak\n") + while True: + input("Press Enter...") + assistant.voice_chat() + else: + print("\n💬 TEXT MODE - type 'quit' to exit\n") + while True: + user_input = input("You: ").strip() + if user_input.lower() in ["quit", "exit", "q"]: + assistant.stop() + print("\n👋 Goodbye!") + break + if user_input: + assistant.chat(user_input) + + +if __name__ == "__main__": + main() diff --git a/ai_asistant1.py b/ai_asistant1.py new file mode 100644 index 0000000..958436e --- /dev/null +++ b/ai_asistant1.py @@ -0,0 +1,520 @@ +#!/usr/bin/env python3 +""" +Truck HPC AI Assistant - POC Demo (OPTIMIZED + TRULY NATURAL VOICE + HINDI SUPPORT) +Optimized for Raspberry Pi 5 with Ollama + Whisper STT + MaryTTS/Festival +NATURAL VOICE: Downloads and uses better quality voices +OFFLINE: 100% offline capability +MULTILINGUAL: English and Hindi support +FIXED: Auto-detects correct audio sample rate +""" + +import requests +import json +import time +import psutil +import sounddevice as sd +import numpy as np +import subprocess +import os +import re +import tempfile +import wave +from multiprocessing import Process, Queue +from faster_whisper import WhisperModel +from datetime import datetime + +# -------------------------------------------------------------- +# AUDIO DEVICE DETECTION +# -------------------------------------------------------------- +def get_default_samplerate(): + """Detect the default sample rate supported by the input device""" + try: + device_info = sd.query_devices(kind='input') + default_sr = int(device_info['default_samplerate']) + print(f"🎤 Detected audio device: {device_info['name']}") + print(f"🎵 Using sample rate: {default_sr} Hz") + return default_sr + except Exception as e: + print(f"⚠️ Could not detect sample rate, using 44100 Hz: {e}") + return 44100 + +# 
-------------------------------------------------------------- +# TEXT CLEANING FUNCTION +# -------------------------------------------------------------- +def clean_text_for_speech(text): + """Removes markdown formatting and special characters""" + text = re.sub(r'#{1,6}\s*', '', text) + text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', text) + text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) + text = re.sub(r'__(.+?)__', r'\1', text) + text = re.sub(r'\*(.+?)\*', r'\1', text) + text = re.sub(r'_(.+?)_', r'\1', text) + text = re.sub(r'```[\w]*\n', '', text) + text = re.sub(r'```', '', text) + text = re.sub(r'`(.+?)`', r'\1', text) + text = re.sub(r'^[-*_]{3,}$', '', text, flags=re.MULTILINE) + text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text) + text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE) + text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE) + text = re.sub(r'^\s*>\s+', '', text, flags=re.MULTILINE) + text = re.sub(r'\s+', ' ', text) + return text.strip() + + +# -------------------------------------------------------------- +# GTTS CACHED TTS WORKER (Natural Voice with Local Cache + Hindi Support) +# -------------------------------------------------------------- +def gtts_tts_worker(tts_queue, voice_gender="female", language="en"): + """ + Uses gTTS with local caching for natural voice. + First run needs internet to download, then works offline. + Supports English and Hindi. + """ + try: + from gtts import gTTS + import hashlib + + # Create cache directory + cache_dir = os.path.expanduser("~/.cache/truck_assistant_tts") + os.makedirs(cache_dir, exist_ok=True) + + lang_name = "English" if language == "en" else "Hindi" + print(f"✅ Using Google TTS ({voice_gender} voice, {lang_name}) with local cache\n") + print("💡 First run needs internet, then works offline from cache\n") + + while True: + data = tts_queue.get() + if data == "__EXIT__": + break + + try: + # Support for language switching + if isinstance(data, dict): + text = data['text'] + current_lang = data.get('lang', language) + else: + text = data + current_lang = language + + clean_text = clean_text_for_speech(text) + + if not clean_text: + continue + + # Create hash for caching (include language in hash) + text_hash = hashlib.md5(f"{current_lang}_{clean_text}".encode()).hexdigest() + cache_file = os.path.join(cache_dir, f"{text_hash}.mp3") + + # Check if cached + if not os.path.exists(cache_file): + # Generate with gTTS (needs internet first time) + if current_lang == "en": + tld = "co.uk" if voice_gender == "female" else "com" + tts = gTTS(text=clean_text, lang='en', tld=tld, slow=False) + else: # Hindi + tts = gTTS(text=clean_text, lang='hi', slow=False) + + tts.save(cache_file) + + # Play using mpg123 (faster than converting to WAV) + subprocess.run(['mpg123', '-q', cache_file], check=True) + + # Natural pause + if clean_text.endswith(("?", "!")): + time.sleep(0.15) + elif clean_text.endswith("."): + time.sleep(0.10) + else: + time.sleep(0.05) + + except Exception as e: + print(f"[TTS ERROR] {e}") + # Fallback to espeak if gTTS fails + try: + if isinstance(data, dict): + subprocess.run(['espeak-ng', '-v', 'hi' if data.get('lang') == 'hi' else 'en', clean_text], + check=True, capture_output=True) + else: + subprocess.run(['espeak-ng', clean_text], check=True, capture_output=True) + except: + pass + + except ImportError: + print("\n❌ gTTS not installed. 
Install with: pip install gtts") + print("Falling back to espeak-ng...\n") + espeak_tts_worker(tts_queue, "en-gb+f3" if voice_gender == "female" else "en-us+m3", language) + + +# -------------------------------------------------------------- +# ESPEAK-NG TTS WORKER (Fallback) +# -------------------------------------------------------------- +def espeak_tts_worker(tts_queue, voice="en-gb+f3", language="en"): + """Fallback to eSpeak-NG with Hindi support""" + + try: + subprocess.run(['espeak-ng', '--version'], + capture_output=True, text=True, timeout=2, check=True) + except: + print("\n❌ eSpeak-NG not found! Install with: sudo apt install espeak-ng") + return + + lang_name = "English" if language == "en" else "Hindi" + print(f"✅ Using eSpeak-NG ({voice} voice, {lang_name})\n") + + while True: + data = tts_queue.get() + if data == "__EXIT__": + break + + try: + # Support for language switching + if isinstance(data, dict): + text = data['text'] + current_lang = data.get('lang', language) + else: + text = data + current_lang = language + + clean_text = clean_text_for_speech(text) + + if not clean_text: + continue + + espeak_voice = 'hi' if current_lang == 'hi' else voice + subprocess.run(['espeak-ng', '-v', espeak_voice, '-s', '175', clean_text], + check=True, capture_output=True) + + if clean_text.endswith(("?", "!")): + time.sleep(0.15) + elif clean_text.endswith("."): + time.sleep(0.10) + else: + time.sleep(0.05) + + except Exception as e: + print(f"[TTS ERROR] {e}") + + +# -------------------------------------------------------------- +# AUDIO RESAMPLING FUNCTION +# -------------------------------------------------------------- +def resample_audio(audio, orig_sr, target_sr=16000): + """Resample audio to target sample rate for Whisper""" + if orig_sr == target_sr: + return audio + + # Simple resampling using linear interpolation + duration = len(audio) / orig_sr + target_length = int(duration * target_sr) + + from scipy import signal + resampled = signal.resample(audio, target_length) + return resampled.astype(np.float32) + + +# -------------------------------------------------------------- +# MAIN ASSISTANT CLASS +# -------------------------------------------------------------- +class TruckAssistant: + def __init__(self, model="llama3.2:3b-instruct-q4_K_M", base_url="http://localhost:11434", + voice_gender="female", use_gtts=True, language="en"): + self.model = model + self.base_url = base_url + self.conversation_history = [] + self.language = language + + # Detect and store the device's native sample rate + self.native_samplerate = get_default_samplerate() + self.whisper_samplerate = 16000 # Whisper expects 16kHz + + # Language-specific system prompts + self.system_prompts = { + "en": "You are a helpful AI assistant for truck drivers. 
Provide clear, concise, and practical answers.", + "hi": "आप ट्रक ड्राइवरों के लिए एक सहायक AI सहायक हैं। स्पष्ट, संक्षिप्त और व्यावहारिक उत्तर प्रदान करें। कृपया हिंदी में उत्तर दें।" + } + + whisper_model = "tiny" if language == "hi" else "tiny.en" + print(f"Loading Whisper model ({whisper_model} - optimized for speed)...") + self.whisper = WhisperModel( + whisper_model, + device="cpu", + compute_type="int8", + num_workers=2 + ) + + # TTS queue + process + self.tts_queue = Queue() + + if use_gtts: + self.tts_process = Process( + target=gtts_tts_worker, + args=(self.tts_queue, voice_gender, language), + daemon=True + ) + else: + voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3" + self.tts_process = Process( + target=espeak_tts_worker, + args=(self.tts_queue, voice, language), + daemon=True + ) + + self.tts_process.start() + + # ========== ADAPTIVE MIC RECORDING WITH VAD ========== + def record_audio(self, max_duration=5): + """Records audio with Voice Activity Detection using device's native sample rate""" + print("\nListening... Speak now.\n") + + silence_threshold = 0.01 + silence_duration = 1.5 + + chunk_size = int(0.1 * self.native_samplerate) + max_chunks = int(max_duration / 0.1) + + audio_chunks = [] + silent_chunks = 0 + speech_detected = False + + try: + stream = sd.InputStream( + samplerate=self.native_samplerate, + channels=1, + dtype='float32' + ) + stream.start() + + for i in range(max_chunks): + chunk, _ = stream.read(chunk_size) + audio_chunks.append(chunk) + + energy = np.sqrt(np.mean(chunk**2)) + + if energy > silence_threshold: + speech_detected = True + silent_chunks = 0 + elif speech_detected: + silent_chunks += 1 + + if silent_chunks > (silence_duration / 0.1): + print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]") + break + + stream.stop() + stream.close() + + audio = np.concatenate(audio_chunks, axis=0).flatten() + + # Resample to 16kHz for Whisper + if self.native_samplerate != self.whisper_samplerate: + print(f"Resampling audio from {self.native_samplerate}Hz to {self.whisper_samplerate}Hz...") + audio = resample_audio(audio, self.native_samplerate, self.whisper_samplerate) + + return audio + + except Exception as e: + print(f"❌ Recording error: {e}") + return None + + # ========== OPTIMIZED STT ========== + def speech_to_text(self, audio): + """Faster transcription with optimized parameters""" + if audio is None: + return "" + + print("Converting speech to text...") + + lang_code = "hi" if self.language == "hi" else "en" + + try: + segments, info = self.whisper.transcribe( + audio, + beam_size=1, + vad_filter=True, + language=lang_code, + condition_on_previous_text=False + ) + + text = " ".join(seg.text for seg in segments).strip() + print(f"You said: {text}\n") + return text + except Exception as e: + print(f"❌ Transcription error: {e}") + return "" + + # ========== VOICE CHAT PIPELINE ========== + def voice_chat(self): + audio = self.record_audio() + + if audio is None: + print("Recording failed. Try again.\n") + return + + text = self.speech_to_text(audio) + + if not text: + print("No speech detected. 
Try again.\n") + return + + self.chat(text) + + # ========== OPTIMIZED LLaMA CHAT WITH LIVE TTS ========== + def chat(self, prompt, stream=True): + url = f"{self.base_url}/api/chat" + + # Prepare messages with system prompt + messages = [{"role": "system", "content": self.system_prompts[self.language]}] + messages.extend(self.conversation_history) + messages.append({"role": "user", "content": prompt}) + + payload = { + "model": self.model, + "messages": messages, + "stream": stream, + "options": { + "temperature": 0.7, + "top_p": 0.9, + "num_predict": 150, + "num_ctx": 2048 + } + } + + print(f"\nAssistant: ", end="", flush=True) + + start_time = time.time() + full_response = "" + token_count = 0 + + try: + response = requests.post(url, json=payload, stream=True, timeout=30) + + if stream: + sentence_buffer = "" + + for line in response.iter_lines(): + if not line: + continue + + chunk = json.loads(line) + + if "message" in chunk and "content" in chunk["message"]: + content = chunk["message"]["content"] + + print(content, end="", flush=True) + + full_response += content + sentence_buffer += content + token_count += 1 + + # Sentence end detection (works for both English and Hindi) + if any(sentence_buffer.endswith(p) for p in [".", "!", "?", ",", ";", "।", "?"]): + stripped = sentence_buffer.strip() + if len(stripped) > 5: + self.tts_queue.put({"text": stripped, "lang": self.language}) + sentence_buffer = "" + + if sentence_buffer.strip(): + self.tts_queue.put({"text": sentence_buffer.strip(), "lang": self.language}) + + else: + data = response.json() + full_response = data["message"]["content"] + print(full_response) + self.tts_queue.put({"text": full_response, "lang": self.language}) + + inference_time = time.time() - start_time + tokens_per_sec = token_count / inference_time if inference_time > 0 else 0 + print(f"\n\n⚡ Time: {inference_time:.2f}s | Speed: {tokens_per_sec:.1f} tokens/sec") + + self.conversation_history.append({"role": "user", "content": prompt}) + self.conversation_history.append({"role": "assistant", "content": full_response}) + + return full_response + + except Exception as e: + print(f"\n❌ Error: {e}") + return None + + # ========== CLEANUP ========== + def stop(self): + self.tts_queue.put("__EXIT__") + self.tts_process.terminate() + + +# -------------------------------------------------------------- +# MAIN +# -------------------------------------------------------------- +def main(): + print("\n🚀 Truck Assistant - Raspberry Pi 5") + print("🎤 Natural Human Voice (Google TTS)") + print("🌐 Multilingual Support (English & Hindi)\n") + + # Language selection + print("Select Language:") + print("1. English") + print("2. Hindi (हिंदी)") + + lang_choice = input("\nLanguage (1 or 2, default=1): ").strip() or "1" + language = "en" if lang_choice == "1" else "hi" + + # Simple voice selection + print("\nSelect Voice:") + print("1. Female (Natural)") + print("2. Male (Natural)") + + voice_choice = input("\nVoice (1 or 2, default=1): ").strip() or "1" + voice_gender = "female" if voice_choice == "1" else "male" + + lang_display = "English" if language == "en" else "हिंदी" + print(f"\n✅ Language: {lang_display}") + print(f"✅ Voice: {voice_gender.capitalize()}") + print("📥 Installing dependencies if needed...\n") + + assistant = TruckAssistant(voice_gender=voice_gender, use_gtts=True, language=language) + + # Check Ollama + try: + requests.get("http://localhost:11434/api/tags", timeout=5) + print("✅ Ollama running\n") + except: + print("❌ Ollama not running. 
diff --git a/ai_assistant2.py b/ai_assistant2.py
new file mode 100644
index 0000000..ade3f0e
--- /dev/null
+++ b/ai_assistant2.py
@@ -0,0 +1,531 @@
+#!/usr/bin/env python3
+"""
+Truck HPC AI Assistant - POC Demo (OPTIMIZED + TRULY NATURAL VOICE + HINDI SUPPORT)
+Optimized for Raspberry Pi 5 with Ollama + Whisper STT + gTTS/eSpeak-NG TTS
+NATURAL VOICE: caches higher-quality Google TTS audio locally
+OFFLINE: works offline once phrases are cached (first synthesis needs internet)
+MULTILINGUAL: English and Hindi support
+FIXED: Auto-detects correct audio sample rate
+FIXED: Proper loop control - waits for speech to complete before next input
+"""
+
+import requests
+import json
+import time
+import psutil
+import sounddevice as sd
+import numpy as np
+import subprocess
+import os
+import re
+import tempfile
+import wave
+from multiprocessing import Process, Queue
+from faster_whisper import WhisperModel
+from datetime import datetime
+
+# --------------------------------------------------------------
+# AUDIO DEVICE DETECTION
+# --------------------------------------------------------------
+def get_default_samplerate():
+    """Detect the default sample rate supported by the input device"""
+    try:
+        device_info = sd.query_devices(kind='input')
+        default_sr = int(device_info['default_samplerate'])
+        print(f"🎤 Detected audio device: {device_info['name']}")
+        print(f"🎵 Using sample rate: {default_sr} Hz")
+        return default_sr
+    except Exception as e:
+        print(f"⚠️ Could not detect sample rate, using 44100 Hz: {e}")
+        return 44100
+
+# --------------------------------------------------------------
+# TEXT CLEANING FUNCTION
+# --------------------------------------------------------------
+def clean_text_for_speech(text):
+    """Removes markdown formatting and special characters"""
+    text = re.sub(r'#{1,6}\s*', '', text)
+    text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', text)
+    text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
+    text = re.sub(r'__(.+?)__', r'\1', text)
+    text = re.sub(r'\*(.+?)\*', r'\1', text)
+    text = re.sub(r'_(.+?)_', r'\1', text)
+    text = re.sub(r'```[\w]*\n', '', text)
+    text = re.sub(r'```', '', text)
+    text = re.sub(r'`(.+?)`', r'\1', text)
+    text = re.sub(r'^[-*_]{3,}$', '', text, flags=re.MULTILINE)
+    text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)
+    text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
+    text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
+    text = re.sub(r'^\s*>\s+', '', text, flags=re.MULTILINE)
+    text = re.sub(r'\s+', ' ', text)
+    return text.strip()
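(Editor's illustration, not part of the committed file.) A quick sanity check of what clean_text_for_speech() above does to typical model output; the sample string and expected result are illustrative only:

    sample = "## Tips\n**Check** your *mirrors* and read the `manual`."
    print(clean_text_for_speech(sample))
    # -> "Tips Check your mirrors and read the manual."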
+
+
+# --------------------------------------------------------------
+# GTTS CACHED TTS WORKER (Natural Voice with Local Cache + Hindi Support)
+# --------------------------------------------------------------
+def gtts_tts_worker(tts_queue, voice_gender="female", language="en"):
+    """
+    Uses gTTS with local caching for natural voice.
+    First run needs internet to download, then works offline.
+    Supports English and Hindi.
+    """
+    try:
+        from gtts import gTTS
+        import hashlib
+
+        # Create cache directory
+        cache_dir = os.path.expanduser("~/.cache/truck_assistant_tts")
+        os.makedirs(cache_dir, exist_ok=True)
+
+        lang_name = "English" if language == "en" else "Hindi"
+        print(f"✅ Using Google TTS ({voice_gender} voice, {lang_name}) with local cache\n")
+        print("💡 First run needs internet, then works offline from cache\n")
+
+        while True:
+            data = tts_queue.get()
+            if data == "__EXIT__":
+                break
+
+            try:
+                # Support for language switching
+                if isinstance(data, dict):
+                    text = data['text']
+                    current_lang = data.get('lang', language)
+                else:
+                    text = data
+                    current_lang = language
+
+                clean_text = clean_text_for_speech(text)
+
+                if not clean_text:
+                    continue
+
+                # Create hash for caching (include language in hash)
+                text_hash = hashlib.md5(f"{current_lang}_{clean_text}".encode()).hexdigest()
+                cache_file = os.path.join(cache_dir, f"{text_hash}.mp3")
+
+                # Check if cached
+                if not os.path.exists(cache_file):
+                    # Generate with gTTS (needs internet first time)
+                    if current_lang == "en":
+                        tld = "co.uk" if voice_gender == "female" else "com"
+                        tts = gTTS(text=clean_text, lang='en', tld=tld, slow=False)
+                    else:  # Hindi
+                        tts = gTTS(text=clean_text, lang='hi', slow=False)
+
+                    tts.save(cache_file)
+
+                # Play using mpg123 (faster than converting to WAV)
+                subprocess.run(['mpg123', '-q', cache_file], check=True)
+
+                # Natural pause
+                if clean_text.endswith(("?", "!")):
+                    time.sleep(0.15)
+                elif clean_text.endswith("."):
+                    time.sleep(0.10)
+                else:
+                    time.sleep(0.05)
+
+            except Exception as e:
+                print(f"[TTS ERROR] {e}")
+                # Fallback to espeak if gTTS fails
+                try:
+                    if isinstance(data, dict):
+                        subprocess.run(['espeak-ng', '-v', 'hi' if data.get('lang') == 'hi' else 'en', clean_text],
+                                       check=True, capture_output=True)
+                    else:
+                        subprocess.run(['espeak-ng', clean_text], check=True, capture_output=True)
+                except Exception:
+                    pass
+
+    except ImportError:
+        print("\n❌ gTTS not installed. Install with: pip install gtts")
+        print("Falling back to espeak-ng...\n")
+        espeak_tts_worker(tts_queue, "en-gb+f3" if voice_gender == "female" else "en-us+m3", language)
+
+
+# --------------------------------------------------------------
+# ESPEAK-NG TTS WORKER (Fallback)
+# --------------------------------------------------------------
+def espeak_tts_worker(tts_queue, voice="en-gb+f3", language="en"):
+    """Fallback to eSpeak-NG with Hindi support"""
+
+    try:
+        subprocess.run(['espeak-ng', '--version'],
+                       capture_output=True, text=True, timeout=2, check=True)
+    except Exception:
+        print("\n❌ eSpeak-NG not found! Install with: sudo apt install espeak-ng")
+        return
+
+    lang_name = "English" if language == "en" else "Hindi"
+    print(f"✅ Using eSpeak-NG ({voice} voice, {lang_name})\n")
+
+    while True:
+        data = tts_queue.get()
+        if data == "__EXIT__":
+            break
+
+        try:
+            # Support for language switching
+            if isinstance(data, dict):
+                text = data['text']
+                current_lang = data.get('lang', language)
+            else:
+                text = data
+                current_lang = language
+
+            clean_text = clean_text_for_speech(text)
+
+            if not clean_text:
+                continue
+
+            espeak_voice = 'hi' if current_lang == 'hi' else voice
+            subprocess.run(['espeak-ng', '-v', espeak_voice, '-s', '175', clean_text],
+                           check=True, capture_output=True)
+
+            if clean_text.endswith(("?", "!")):
+                time.sleep(0.15)
+            elif clean_text.endswith("."):
+                time.sleep(0.10)
+            else:
+                time.sleep(0.05)
+
+        except Exception as e:
+            print(f"[TTS ERROR] {e}")
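(Editor's sketch, not part of the committed file.) The two workers above share one protocol: run in a child process, block on a multiprocessing Queue, speak each item (a dict with text/lang, or a plain string), and exit on the "__EXIT__" sentinel. A minimal standalone driver under that assumption:

    from multiprocessing import Process, Queue

    if __name__ == "__main__":
        q = Queue()
        p = Process(target=gtts_tts_worker, args=(q, "female", "en"), daemon=True)
        p.start()
        q.put({"text": "Route check complete.", "lang": "en"})  # dict form picks the language per message
        q.put("Drive safely.")                                  # plain strings use the worker's default
        q.put("__EXIT__")                                       # sentinel ends the worker loop
        p.join(timeout=30)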
+
+
+# --------------------------------------------------------------
+# AUDIO RESAMPLING FUNCTION
+# --------------------------------------------------------------
+def resample_audio(audio, orig_sr, target_sr=16000):
+    """Resample audio to target sample rate for Whisper"""
+    if orig_sr == target_sr:
+        return audio
+
+    # FFT-based resampling via scipy.signal.resample
+    duration = len(audio) / orig_sr
+    target_length = int(duration * target_sr)
+
+    from scipy import signal
+    resampled = signal.resample(audio, target_length)
+    return resampled.astype(np.float32)
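(Editor's sketch, not part of the committed file.) scipy.signal.resample is FFT-based, which assumes a periodic signal and can ring at the edges of short clips. If that ever shows up in transcription quality, polyphase filtering is a drop-in alternative with the same contract as resample_audio above:

    import numpy as np
    from scipy import signal

    def resample_audio_poly(audio, orig_sr, target_sr=16000):
        """Polyphase resampling; avoids FFT edge artifacts on short clips."""
        if orig_sr == target_sr:
            return audio
        g = np.gcd(int(orig_sr), int(target_sr))
        up, down = int(target_sr) // g, int(orig_sr) // g  # e.g. 44100 -> 16000 gives 160/441
        return signal.resample_poly(audio, up, down).astype(np.float32)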
+
+
+# --------------------------------------------------------------
+# MAIN ASSISTANT CLASS
+# --------------------------------------------------------------
+class TruckAssistant:
+    def __init__(self, model="llama3.2:3b-instruct-q4_K_M", base_url="http://localhost:11434",
+                 voice_gender="female", use_gtts=True, language="en"):
+        self.model = model
+        self.base_url = base_url
+        self.conversation_history = []
+        self.language = language
+
+        # Detect and store the device's native sample rate
+        self.native_samplerate = get_default_samplerate()
+        self.whisper_samplerate = 16000  # Whisper expects 16kHz
+
+        # Language-specific system prompts
+        self.system_prompts = {
+            "en": "You are a helpful AI assistant for truck drivers. Provide clear, concise, and practical answers.",
+            "hi": "आप ट्रक ड्राइवरों के लिए एक सहायक AI सहायक हैं। स्पष्ट, संक्षिप्त और व्यावहारिक उत्तर प्रदान करें। कृपया हिंदी में उत्तर दें।"
+        }
+
+        whisper_model = "tiny" if language == "hi" else "tiny.en"
+        print(f"Loading Whisper model ({whisper_model} - optimized for speed)...")
+        self.whisper = WhisperModel(
+            whisper_model,
+            device="cpu",
+            compute_type="int8",
+            num_workers=2
+        )
+
+        # TTS queue + process
+        self.tts_queue = Queue()
+
+        if use_gtts:
+            self.tts_process = Process(
+                target=gtts_tts_worker,
+                args=(self.tts_queue, voice_gender, language),
+                daemon=True
+            )
+        else:
+            voice = "en-gb+f3" if voice_gender == "female" else "en-us+m3"
+            self.tts_process = Process(
+                target=espeak_tts_worker,
+                args=(self.tts_queue, voice, language),
+                daemon=True
+            )
+
+        self.tts_process.start()
+
+    # ========== ADAPTIVE MIC RECORDING WITH VAD ==========
+    def record_audio(self, max_duration=5):
+        """Records audio with Voice Activity Detection using device's native sample rate"""
+        print("\nListening... Speak now.\n")
+
+        silence_threshold = 0.01
+        silence_duration = 1.5
+
+        chunk_size = int(0.1 * self.native_samplerate)
+        max_chunks = int(max_duration / 0.1)
+
+        audio_chunks = []
+        silent_chunks = 0
+        speech_detected = False
+
+        try:
+            stream = sd.InputStream(
+                samplerate=self.native_samplerate,
+                channels=1,
+                dtype='float32'
+            )
+            stream.start()
+
+            for i in range(max_chunks):
+                chunk, _ = stream.read(chunk_size)
+                audio_chunks.append(chunk)
+
+                energy = np.sqrt(np.mean(chunk**2))
+
+                if energy > silence_threshold:
+                    speech_detected = True
+                    silent_chunks = 0
+                elif speech_detected:
+                    silent_chunks += 1
+
+                    if silent_chunks > (silence_duration / 0.1):
+                        print(f"[Silence detected - stopping early after {(i+1)*0.1:.1f}s]")
+                        break
+
+            stream.stop()
+            stream.close()
+
+            audio = np.concatenate(audio_chunks, axis=0).flatten()
+
+            # Resample to 16kHz for Whisper
+            if self.native_samplerate != self.whisper_samplerate:
+                print(f"Resampling audio from {self.native_samplerate}Hz to {self.whisper_samplerate}Hz...")
+                audio = resample_audio(audio, self.native_samplerate, self.whisper_samplerate)
+
+            return audio
+
+        except Exception as e:
+            print(f"❌ Recording error: {e}")
+            return None
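(Editor's sketch, not part of the committed file.) The VAD in record_audio() above is a plain RMS-energy gate: a 100 ms chunk counts as speech when its root-mean-square amplitude exceeds silence_threshold (0.01 for float32 samples in [-1, 1]), and recording stops after 1.5 s of consecutive quiet chunks once speech has started. The core arithmetic, isolated:

    import numpy as np

    def is_speech(chunk, threshold=0.01):
        """RMS energy gate used by the recorder above (float32 samples)."""
        return np.sqrt(np.mean(chunk ** 2)) > threshold

    silence_chunks_needed = int(1.5 / 0.1)  # 15 quiet 100 ms chunks end the recording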
+
+    # ========== OPTIMIZED STT ==========
+    def speech_to_text(self, audio):
+        """Faster transcription with optimized parameters"""
+        if audio is None:
+            return ""
+
+        print("Converting speech to text...")
+
+        lang_code = "hi" if self.language == "hi" else "en"
+
+        try:
+            segments, info = self.whisper.transcribe(
+                audio,
+                beam_size=1,
+                vad_filter=True,
+                language=lang_code,
+                condition_on_previous_text=False
+            )
+
+            text = " ".join(seg.text for seg in segments).strip()
+            print(f"You said: {text}\n")
+            return text
+        except Exception as e:
+            print(f"❌ Transcription error: {e}")
+            return ""
+
+    # ========== VOICE CHAT PIPELINE ==========
+    def voice_chat(self):
+        audio = self.record_audio()
+
+        if audio is None:
+            print("Recording failed. Try again.\n")
+            return
+
+        text = self.speech_to_text(audio)
+
+        if not text:
+            print("No speech detected. Try again.\n")
+            return
+
+        self.chat(text)
+
+    # ========== OPTIMIZED LLaMA CHAT WITH LIVE TTS ==========
+    def chat(self, prompt, stream=True):
+        url = f"{self.base_url}/api/chat"
+
+        # Prepare messages with system prompt
+        messages = [{"role": "system", "content": self.system_prompts[self.language]}]
+        messages.extend(self.conversation_history)
+        messages.append({"role": "user", "content": prompt})
+
+        payload = {
+            "model": self.model,
+            "messages": messages,
+            "stream": stream,
+            "options": {
+                "temperature": 0.7,
+                "top_p": 0.9,
+                "num_predict": 150,
+                "num_ctx": 2048
+            }
+        }
+
+        print("\nAssistant: ", end="", flush=True)
+
+        start_time = time.time()
+        full_response = ""
+        token_count = 0
+
+        try:
+            response = requests.post(url, json=payload, stream=True, timeout=30)
+
+            if stream:
+                sentence_buffer = ""
+
+                for line in response.iter_lines():
+                    if not line:
+                        continue
+
+                    chunk = json.loads(line)
+
+                    if "message" in chunk and "content" in chunk["message"]:
+                        content = chunk["message"]["content"]
+
+                        print(content, end="", flush=True)
+
+                        full_response += content
+                        sentence_buffer += content
+                        token_count += 1
+
+                        # Sentence end detection (works for both English and Hindi)
+                        if any(sentence_buffer.endswith(p) for p in [".", "!", "?", ",", ";", "।"]):
+                            stripped = sentence_buffer.strip()
+                            if len(stripped) > 5:
+                                self.tts_queue.put({"text": stripped, "lang": self.language})
+                                sentence_buffer = ""
+
+                if sentence_buffer.strip():
+                    self.tts_queue.put({"text": sentence_buffer.strip(), "lang": self.language})
+
+            else:
+                data = response.json()
+                full_response = data["message"]["content"]
+                print(full_response)
+                self.tts_queue.put({"text": full_response, "lang": self.language})
+
+            inference_time = time.time() - start_time
+            tokens_per_sec = token_count / inference_time if inference_time > 0 else 0
+            print(f"\n\n⚡ Time: {inference_time:.2f}s | Speed: {tokens_per_sec:.1f} tokens/sec")
+
+            self.conversation_history.append({"role": "user", "content": prompt})
+            self.conversation_history.append({"role": "assistant", "content": full_response})
+
+            # Wait until the TTS queue is drained (dequeued items may still be playing; see the sketch below)
+            print("\n[Waiting for speech to complete...]")
+            while not self.tts_queue.empty():
+                time.sleep(0.1)
+
+            # Additional small delay to ensure the last audio finishes playing
+            time.sleep(0.5)
+
+            return full_response
+
+        except Exception as e:
+            print(f"\n❌ Error: {e}")
+            return None
+
+    # ========== CLEANUP ==========
+    def stop(self):
+        self.tts_queue.put("__EXIT__")
+        self.tts_process.terminate()
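(Editor's sketch, not part of the committed file.) The wait loop at the end of chat() relies on Queue.empty(), which only reports that the worker dequeued the item, not that mpg123 finished playing it; that is why the extra 0.5 s sleep exists. A more robust pattern, shown here as a hypothetical variant that would require changes on both sides, is an acknowledgement queue the worker signals after each utterance actually finishes:

    from multiprocessing import Queue

    def tts_worker_with_ack(tts_queue, done_queue):
        """Hypothetical worker variant that reports per-utterance completion."""
        while True:
            item = tts_queue.get()
            if item == "__EXIT__":
                break
            # ... synthesize and play the utterance, as in the workers above ...
            done_queue.put("done")  # sent only after playback returns

    def speak_and_wait(tts_queue, done_queue, sentences):
        for s in sentences:
            tts_queue.put(s)
        for _ in sentences:
            done_queue.get()        # blocks until every utterance has played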
+
+
+# --------------------------------------------------------------
+# MAIN
+# --------------------------------------------------------------
+def main():
+    print("\n🚀 Truck Assistant - Raspberry Pi 5")
+    print("🎤 Natural Human Voice (Google TTS)")
+    print("🌐 Multilingual Support (English & Hindi)\n")
+
+    # Language selection
+    print("Select Language:")
+    print("1. English")
+    print("2. Hindi (हिंदी)")
+
+    lang_choice = input("\nLanguage (1 or 2, default=1): ").strip() or "1"
+    language = "en" if lang_choice == "1" else "hi"
+
+    # Simple voice selection
+    print("\nSelect Voice:")
+    print("1. Female (Natural)")
+    print("2. Male (Natural)")
+
+    voice_choice = input("\nVoice (1 or 2, default=1): ").strip() or "1"
+    voice_gender = "female" if voice_choice == "1" else "male"
+
+    lang_display = "English" if language == "en" else "हिंदी"
+    print(f"\n✅ Language: {lang_display}")
+    print(f"✅ Voice: {voice_gender.capitalize()}")
+    print("📥 Loading models...\n")
+
+    assistant = TruckAssistant(voice_gender=voice_gender, use_gtts=True, language=language)
+
+    # Check Ollama
+    try:
+        requests.get("http://localhost:11434/api/tags", timeout=5)
+        print("✅ Ollama running\n")
+    except Exception:
+        print("❌ Ollama not running. Start with: ollama serve\n")
+        return
+
+    print("="*60)
+    print("Mode:")
+    print("1. Demo")
+    print("2. Text chat")
+    print("3. Voice chat")
+    print("="*60)
+
+    mode = input("\nSelect (1-3): ").strip()
+
+    if mode == "3":
+        print("\n🎤 VOICE MODE - Press Enter to speak, Ctrl+C to exit\n")
+        try:
+            while True:
+                input("Press Enter to speak...")
+                assistant.voice_chat()
+                print("\n" + "="*60 + "\n")
+        except KeyboardInterrupt:
+            print("\n\n👋 Exiting gracefully...")
+            assistant.stop()
+            print("Goodbye!")
+    else:
+        print("\n💬 TEXT MODE - type 'quit' to exit\n")
+        try:
+            while True:
+                user_input = input("You: ").strip()
+                if user_input.lower() in ["quit", "exit", "q"]:
+                    assistant.stop()
+                    print("\n👋 Goodbye!")
+                    break
+                if user_input:
+                    assistant.chat(user_input)
+                    print("\n" + "="*60 + "\n")
+        except KeyboardInterrupt:
+            print("\n\n👋 Exiting gracefully...")
+            assistant.stop()
+            print("Goodbye!")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
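(Editor's sketch, not part of the committed file.) Because the gTTS path only needs the network the first time a phrase is synthesized, a demo can be made genuinely offline by pre-warming the cache with the fixed prompts it will speak. This reuses the worker's MD5 hash scheme; it assumes the phrases are already clean (the worker hashes the cleaned text) and uses the same female-English tld as the worker default:

    import hashlib, os
    from gtts import gTTS

    CACHE_DIR = os.path.expanduser("~/.cache/truck_assistant_tts")

    def prewarm(phrases, lang="en", tld="co.uk"):
        """Generate and cache MP3s ahead of time (run once, with internet)."""
        os.makedirs(CACHE_DIR, exist_ok=True)
        for text in phrases:
            key = hashlib.md5(f"{lang}_{text}".encode()).hexdigest()
            path = os.path.join(CACHE_DIR, f"{key}.mp3")
            if not os.path.exists(path):
                gTTS(text=text, lang=lang, tld=tld, slow=False).save(path)

    prewarm(["No speech detected. Try again.", "Recording failed. Try again."])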