diff --git a/README.md b/README.md
index 124ff85..ba9fd10 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
- instant performance | Cohere / Parakeet / Whisper / Gemini / ElevenLabs / REST API | stylish visuals
+ instant performance | Cohere / Parakeet / Whisper / Gemini / ElevenLabs / 60db / REST API | stylish visuals
@@ -28,7 +28,7 @@ https://github.com/user-attachments/assets/4c223e85-2916-494f-b7b1-766ce1bdc991
- **GPU memory efficient** - Limit or zero memory usage easily, more for other local models
- **onnx-asr for wild CPU speeds** - No GPU? Optimized for great speed on any hardware
- **Translation** - Translate non-English to English with a single config
-- **REST API or websockets** - Secure, fast wires to top clouds like Gemini, ElevenLabs
+- **REST API or websockets** - Secure, fast wires to top clouds like Gemini, ElevenLabs, 60db
- **Themed visualizer** - Visualizes your voice, will automatch Omarchy theme
- **Word overides and prompts** - Custom hot keys, common words, and more
- **Multi-lingual** - Great performance in many languages
diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md
index 7933f03..4d088ae 100644
--- a/docs/CONFIGURATION.md
+++ b/docs/CONFIGURATION.md
@@ -172,7 +172,7 @@ Use a different hotkey for a specific language:
> **Note**: Works with backends that support language parameters:
> - **REST API**: Works if the endpoint accepts `language` in the request body
-> - **Realtime WebSocket**: Fully supported (OpenAI, Google, ElevenLabs)
+> - **Realtime WebSocket**: Fully supported (OpenAI, Google, ElevenLabs, 60db)
> - **Local whisper models**: Fully supported (all pywhispercpp models)
> - **Custom REST endpoints**: May not work if the endpoint doesn't accept a language parameter
@@ -289,7 +289,7 @@ For up-to-date accuracy rankings across open-source models, see the [Open ASR Le
| faster-whisper | Local | NVIDIA or CPU | Fast | 99 | Very good | — |
| whisper.cpp | Local | NVIDIA, AMD/Intel, CPU | Very fast | 99 | Very good | — |
| REST API | Cloud | — | Varies | Varies | Varies | Cohere, OpenAI, Groq, Regolo |
-| Realtime WebSocket | Cloud | — | Real-time | Varies | Varies | Google Gemini, OpenAI, ElevenLabs |
+| Realtime WebSocket | Cloud | — | Real-time | Varies | Varies | Google Gemini, OpenAI, ElevenLabs, 60db |
---
@@ -691,6 +691,54 @@ Uses native 16kHz audio (no resampling) and auto-reconnects on connection drops.
}
```
+#### 60db Realtime STT
+
+Realtime streaming transcription via [60db](https://60db.ai/), 39 languages with optional speaker diarization.
+
+Bring an API key from your 60db account (sent as `?apiKey=` on the socket; keys start with `sk_live_`).
+
+Uses native 16kHz audio (no resampling) and auto-reconnects on connection drops.
+
+- **transcribe** (default) - speech-to-text
+
+```jsonc
+{
+ "transcription_backend": "realtime-ws",
+ "websocket_provider": "60db",
+ "websocket_model": "60db-stt-realtime",
+ "realtime_timeout": 30, // Advanced: seconds to wait after stop for final transcript
+ "realtime_buffer_max_seconds": 5, // Advanced: max unsent audio backlog (seconds) before dropping old chunks
+ "sixtydb_diarize": false, // Advanced: enable speaker diarization (adds cost)
+ "sixtydb_utterance_end_ms": 500, // Advanced: silence (ms) before an utterance is finalized (>=300)
+ "sixtydb_audio_enhancement": "adaptive" // Advanced: "off", "light", or "adaptive"
+}
+```
+
+#### 60db Text-to-Speech (CLI)
+
+60db also provides text-to-speech. This is a standalone utility — it is **not** wired into the dictation pipeline (hyprwhspr never speaks on its own). It reuses the `60db` API key stored above.
+
+List the voices available to your account:
+
+```bash
+hyprwhspr 60db voices
+```
+
+Synthesize text to a WAV file:
+
+```bash
+hyprwhspr 60db tts --text "Hello from hyprwhspr" --voice --out hello.wav
+# Options: --sample-rate {8000,16000,24000,48000} (default 24000), --speed 0.5-2.0 (default 1.0)
+```
+
+Set a default voice so `--voice` can be omitted:
+
+```jsonc
+{
+ "sixtydb_tts_voice_id": "" // Default voice for `hyprwhspr 60db tts`
+}
+```
+
## Audio and visual feedback
### Themed visualizer
diff --git a/lib/cli.py b/lib/cli.py
index ab91d46..324094e 100755
--- a/lib/cli.py
+++ b/lib/cli.py
@@ -37,6 +37,7 @@
keyboard_command,
record_command,
record_capture_command,
+ sixtydb_command,
)
@@ -178,6 +179,22 @@ def main():
help='Language code for transcription (e.g., en, it, de)')
record_subparsers.add_parser('status', help='Show current recording status')
+ # 60db command (text-to-speech utilities)
+ sixtydb_parser = subparsers.add_parser('60db', help='60db text-to-speech utilities')
+ sixtydb_subparsers = sixtydb_parser.add_subparsers(dest='sixtydb_action', help='60db actions')
+ sixtydb_subparsers.add_parser('voices', help='List voices available to your 60db account')
+ sixtydb_tts_parser = sixtydb_subparsers.add_parser('tts', help='Synthesize text to a WAV file')
+ sixtydb_tts_parser.add_argument('--text', required=True, help='Text to synthesize')
+ sixtydb_tts_parser.add_argument('--voice', dest='voice', metavar='VOICE_ID',
+ help='Voice ID (default: sixtydb_tts_voice_id from config)')
+ sixtydb_tts_parser.add_argument('--out', metavar='PATH', default='tts-output.wav',
+ help='Output WAV path (default: tts-output.wav)')
+ sixtydb_tts_parser.add_argument('--sample-rate', dest='sample_rate', type=int, default=24000,
+ choices=[8000, 16000, 24000, 48000],
+ help='Output sample rate in Hz (default: 24000)')
+ sixtydb_tts_parser.add_argument('--speed', type=float, default=1.0,
+ help='Speech speed multiplier 0.5-2.0 (default: 1.0)')
+
# backend command
backend_parser = subparsers.add_parser('backend', help='Backend management')
backend_subparsers = backend_parser.add_subparsers(dest='backend_action', help='Backend actions')
@@ -312,6 +329,11 @@ def main():
record_capture_command(language=getattr(args, 'language', None))
else:
record_command(args.record_action, language=getattr(args, 'language', None))
+ elif args.command == '60db':
+ if not getattr(args, 'sixtydb_action', None):
+ sixtydb_parser.print_help()
+ sys.exit(1)
+ sixtydb_command(args.sixtydb_action, args=args)
elif args.command == 'uninstall':
uninstall_command(
keep_models=getattr(args, 'keep_models', False),
diff --git a/lib/src/cli_commands.py b/lib/src/cli_commands.py
index b431f64..f261c8a 100644
--- a/lib/src/cli_commands.py
+++ b/lib/src/cli_commands.py
@@ -5788,3 +5788,124 @@ def record_capture_command(language: str = None):
except OSError as e:
log_error(f"Capture socket error: {e}")
sys.exit(1)
+
+
+def sixtydb_command(action: str, args=None):
+ """
+ 60db text-to-speech utilities (independent of the dictation pipeline).
+
+ Actions:
+ voices - list the voices available to your 60db account
+ tts - synthesize text to a WAV file
+
+ Args:
+ action: Sub-action ('voices' or 'tts')
+ args: Parsed argparse namespace (used by 'tts')
+ """
+ api_key = get_credential('60db')
+ if not api_key:
+ log_error("No 60db API key found.")
+ log_info("Add one with the realtime setup, or store it directly:")
+ log_info(" hyprwhspr setup (choose realtime-ws -> 60db)")
+ sys.exit(1)
+
+ if action == 'voices':
+ _sixtydb_voices(api_key)
+ elif action == 'tts':
+ _sixtydb_tts(api_key, args)
+ else:
+ log_error(f"Unknown 60db action: {action}")
+ sys.exit(1)
+
+
+def _sixtydb_voices(api_key: str):
+ """List the caller's 60db voices in a table."""
+ try:
+ from .sixtydb_tts_client import list_my_voices, SixtyDbTTSError
+ except ImportError:
+ from sixtydb_tts_client import list_my_voices, SixtyDbTTSError
+
+ try:
+ voices = list_my_voices(api_key)
+ except SixtyDbTTSError as e:
+ log_error(f"Failed to list voices: {e}")
+ sys.exit(1)
+ except Exception as e:
+ log_error(f"Failed to list voices: {e}")
+ sys.exit(1)
+
+ if not voices:
+ log_warning("No voices found for this 60db account.")
+ return
+
+ console = Console()
+ table = Table(title="60db Voices")
+ table.add_column("Voice ID", style="cyan", no_wrap=True)
+ table.add_column("Name", style="green")
+ table.add_column("Category")
+ table.add_column("Model")
+ table.add_column("Language")
+ table.add_column("Gender")
+
+ for v in voices:
+ labels = v.get('labels') or {}
+ table.add_row(
+ str(v.get('voice_id', '')),
+ str(v.get('name', '')),
+ str(v.get('category', '')),
+ str(v.get('model', '')),
+ str(labels.get('language_name') or labels.get('language') or ''),
+ str(labels.get('gender', '')),
+ )
+
+ console.print(table)
+
+
+def _sixtydb_tts(api_key: str, args):
+ """Synthesize text to a WAV file via the 60db WebSocket TTS API."""
+ try:
+ from .sixtydb_tts_client import SixtyDbTTSClient, pcm16_to_wav, SixtyDbTTSError
+ except ImportError:
+ from sixtydb_tts_client import SixtyDbTTSClient, pcm16_to_wav, SixtyDbTTSError
+
+ text = getattr(args, 'text', None)
+ voice_id = getattr(args, 'voice', None)
+ out_path = getattr(args, 'out', None) or 'tts-output.wav'
+ sample_rate = int(getattr(args, 'sample_rate', None) or 24000)
+ speed = float(getattr(args, 'speed', None) or 1.0)
+
+ if not text:
+ log_error("--text is required")
+ sys.exit(1)
+
+ # Fall back to a configured default voice if one isn't supplied.
+ if not voice_id:
+ try:
+ voice_id = ConfigManager().get_setting('sixtydb_tts_voice_id', None)
+ except Exception:
+ voice_id = None
+ if not voice_id:
+ log_error("--voice is required (no sixtydb_tts_voice_id configured).")
+ log_info("List available voices with: hyprwhspr 60db voices")
+ sys.exit(1)
+
+ client = SixtyDbTTSClient(api_key)
+ try:
+ log_info(f"Synthesizing {len(text)} chars with voice {voice_id}...")
+ pcm = client.synthesize(text, voice_id, sample_rate=sample_rate, speed=speed)
+ except SixtyDbTTSError as e:
+ log_error(f"TTS failed: {e}")
+ sys.exit(1)
+ except Exception as e:
+ log_error(f"TTS failed: {e}")
+ sys.exit(1)
+
+ wav_bytes = pcm16_to_wav(pcm, sample_rate=sample_rate)
+ try:
+ Path(out_path).write_bytes(wav_bytes)
+ except OSError as e:
+ log_error(f"Failed to write {out_path}: {e}")
+ sys.exit(1)
+
+ duration = len(pcm) / 2 / sample_rate # 16-bit mono
+ log_success(f"Wrote {out_path} ({len(wav_bytes)} bytes, ~{duration:.1f}s audio)")
diff --git a/lib/src/provider_registry.py b/lib/src/provider_registry.py
index eabc0fb..d94f5fe 100644
--- a/lib/src/provider_registry.py
+++ b/lib/src/provider_registry.py
@@ -140,6 +140,27 @@
'hidden': True
}
}
+ },
+ '60db': {
+ 'name': '60db',
+ 'endpoint': 'https://api.60db.ai/stt',
+ 'websocket_endpoint': 'wss://api.60db.ai/ws/stt',
+ # 60db authenticates the WebSocket via an ?apiKey= query param (REST uses
+ # an Authorization: Bearer header). Keys are prefixed 'sk_live_' / 'sk_test_',
+ # but we don't enforce a prefix to stay permissive across key types.
+ 'api_key_prefix': None,
+ 'api_key_description': '60db API key (from app.60db.ai, starts with sk_live_)',
+ 'models': {
+ '60db-stt-realtime': {
+ 'name': '60db Realtime STT',
+ 'description': 'Realtime streaming transcription, 39 languages, optional diarization',
+ # 60db's STT WebSocket selects models server-side from languages/config,
+ # so there is no model_id to send. Kept for the REST batch endpoint.
+ 'body': {},
+ 'realtime_model': True,
+ 'hidden': True
+ }
+ }
}
}
diff --git a/lib/src/sixtydb_realtime_client.py b/lib/src/sixtydb_realtime_client.py
new file mode 100644
index 0000000..0b1a178
--- /dev/null
+++ b/lib/src/sixtydb_realtime_client.py
@@ -0,0 +1,656 @@
+"""
+60db Realtime STT client.
+
+Streaming speech-to-text over 60db's WebSocket API
+(wss://api.60db.ai/ws/stt). Provider-specific protocol, but exposes the same
+interface as RealtimeClient / ElevenLabsRealtimeClient so whisper_manager can
+drive it interchangeably:
+
+ connect / append_audio / clear_audio_buffer / commit_and_get_text /
+ update_language / set_max_buffer_seconds / close (+ `connected`, `language`)
+
+60db accepts 16kHz linear PCM natively, which is exactly what AudioCapture
+produces, so no resampling is needed (same as ElevenLabs Scribe v2).
+
+Protocol summary (see https://docs.60db.ai/websocket-api/stt):
+ - Auth via ?apiKey=... query param on the socket URL.
+ - Server emits {"connection_established": {...}} after auth.
+ - Client sends {"type": "start", languages, config} to open a session.
+ - Server replies {"type": "connected", ...} when ready.
+ - Client streams {"type": "audio", "audio": , ...}.
+ - Server emits {"type": "transcription", text, is_final, speech_final, ...}.
+ Interim results have is_final=False; finalized results have
+ is_final=True AND speech_final=True.
+ - Client sends {"type": "stop"} to flush and end the session.
+"""
+
+import sys
+import json
+import base64
+import threading
+import time
+from typing import Optional
+from queue import Queue, Empty
+from collections import deque
+
+try:
+ import numpy as np
+except (ImportError, ModuleNotFoundError) as e:
+ print("ERROR: python-numpy is not available in this Python environment.", file=sys.stderr)
+ print(f"ImportError: {e}", file=sys.stderr)
+ sys.exit(1)
+
+try:
+ import websocket
+except (ImportError, ModuleNotFoundError) as e:
+ print("ERROR: websocket-client is not available in this Python environment.", file=sys.stderr)
+ print(f"ImportError: {e}", file=sys.stderr)
+ print("\nThis is a required dependency. Please install it:", file=sys.stderr)
+ print(" pip install websocket-client>=1.6.0", file=sys.stderr)
+ sys.exit(1)
+
+
+class SixtyDbRealtimeClient:
+ """WebSocket client for 60db realtime speech-to-text."""
+
+ SAMPLE_RATE = 16000 # 60db supports 16kHz linear PCM natively (no resampling)
+ VALID_AUDIO_ENHANCEMENTS = {'off', 'light', 'adaptive'}
+
+ def __init__(self):
+ self.ws = None
+ self.url = None
+ self.api_key = None
+ self.model = None # Unused by 60db STT WS (kept for interface parity)
+ self.language = None # ISO 639-1 code, or None for auto-detect
+ self.partial_transcript_callback = None
+
+ # 60db-specific tuning (set by whisper_manager from config before connect)
+ self.diarize = False
+ self.utterance_end_ms = 500
+ self.audio_enhancement = 'adaptive'
+ self.continuous_mode = True
+
+ # 60db updates language by resending `config` mid-session, no reconnect needed.
+ self.supports_mid_session_language_update = True
+
+ # Threading
+ self.lock = threading.Lock()
+
+ # Connection state
+ self.connected = False # True once 60db sends {"type":"connected"}
+ self._socket_open = False # True once the WS transport is open
+ self.connecting = False
+ self.receiver_thread = None
+ self.receiver_running = False
+
+ # Event handling
+ self.event_queue = Queue()
+ self.response_event = threading.Event()
+
+ # Transcription assembly
+ self._transcript_generation = 0
+ self._committed_segments = []
+ self._partial_transcript = ""
+
+ # Track whether new audio has been queued since the last received transcript,
+ # so we don't return stale mid-stream text on stop.
+ self._audio_activity_id = 0
+ self._last_transcript_audio_activity_id = 0
+
+ # Audio streaming. append_audio() runs on the sounddevice callback thread,
+ # so it must be fast and non-blocking: just enqueue here, send elsewhere.
+ self._audio_queue = deque()
+ self.audio_buffer_seconds = 0.0
+ self.max_buffer_seconds = 5.0
+ self._dropped_chunks = 0
+ self._last_drop_log_time = 0.0
+
+ self._queue_cond = threading.Condition(self.lock)
+ self._sender_thread = None
+ self._sender_running = False
+
+ # Reconnection
+ self.reconnect_attempts = 0
+ self.max_reconnect_attempts = 5
+ self.reconnect_delays = [1, 2, 4, 8, 16] # Exponential backoff
+
+ # ------------------------------------------------------------------ #
+ # Connection
+ # ------------------------------------------------------------------ #
+ def connect(self, url: str, api_key: str, model: str, instructions: Optional[str] = None) -> bool:
+ """
+ Establish the WebSocket connection and open a 60db STT session.
+
+ Args:
+ url: Base WebSocket URL (e.g. 'wss://api.60db.ai/ws/stt').
+ api_key: 60db API key (sent as ?apiKey= query param).
+ model: Ignored by 60db STT (kept for interface parity).
+ instructions: Ignored.
+ """
+ self.url = url
+ self.api_key = api_key
+ self.model = model
+ return self._connect_internal()
+
+ def _auth_url(self) -> str:
+ """Append the apiKey query param expected by 60db."""
+ sep = '&' if ('?' in (self.url or '')) else '?'
+ return f'{self.url}{sep}apiKey={self.api_key}'
+
+ def _connect_internal(self) -> bool:
+ if self.connecting:
+ return False
+ self.connecting = True
+
+ try:
+ print(f'[60DB] Connecting to {self.url}...', flush=True)
+
+ self.ws = websocket.WebSocketApp(
+ self._auth_url(),
+ on_open=self._on_open,
+ on_message=self._on_message,
+ on_error=self._on_error,
+ on_close=self._on_close,
+ )
+
+ ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
+ ws_thread.start()
+
+ # Wait for the session to become ready ({"type":"connected"}).
+ timeout = 10.0
+ start_time = time.time()
+ while not self.connected and (time.time() - start_time) < timeout:
+ time.sleep(0.1)
+
+ if self.connected:
+ print('[60DB] Connected successfully', flush=True)
+ self.reconnect_attempts = 0
+ return True
+
+ print('[60DB] Connection timeout', flush=True)
+ try:
+ self.ws.close()
+ except Exception:
+ pass
+ return False
+
+ except Exception as e:
+ print(f'[60DB] Connection error: {e}', flush=True)
+ return False
+ finally:
+ self.connecting = False
+
+ def _on_open(self, _ws):
+ """Transport opened: start the receiver/sender threads.
+
+ The `start` message is sent only after the server confirms auth with
+ `connection_established` (see _handle_event), per the 60db handshake.
+ """
+ start_receiver = False
+ with self.lock:
+ self._socket_open = True
+ self.connecting = False
+ if not self.receiver_running:
+ self.receiver_running = True
+ start_receiver = True
+ self._queue_cond.notify_all()
+
+ if start_receiver:
+ self.receiver_thread = threading.Thread(target=self._receiver_loop, daemon=True)
+ self.receiver_thread.start()
+
+ self._start_sender_thread()
+
+ def _send_start(self):
+ """Send the session-initialization message that 60db requires before audio."""
+ if not self._socket_open or not self.ws:
+ return
+
+ start_msg = {
+ 'type': 'start',
+ 'config': self._build_config(),
+ }
+ languages = self._languages_list()
+ if languages:
+ start_msg['languages'] = languages
+
+ try:
+ self.ws.send(json.dumps(start_msg))
+ print('[60DB] Sent start', flush=True)
+ except Exception as e:
+ print(f'[60DB] Failed to send start: {e}', flush=True)
+
+ def _build_config(self) -> dict:
+ enhancement = self.audio_enhancement
+ if enhancement not in self.VALID_AUDIO_ENHANCEMENTS:
+ enhancement = 'adaptive'
+ return {
+ 'encoding': 'linear', # 16-bit linear PCM
+ 'sample_rate': self.SAMPLE_RATE,
+ 'utterance_end_ms': max(300, int(self.utterance_end_ms or 500)),
+ 'continuous_mode': bool(self.continuous_mode),
+ 'audio_enhancement': enhancement,
+ 'diarize': bool(self.diarize),
+ }
+
+ def _languages_list(self):
+ """60db wants an array of ISO codes (max 5); None/empty means auto-detect."""
+ if not self.language:
+ return None
+ return [self.language]
+
+ # ------------------------------------------------------------------ #
+ # Sender / receiver threads
+ # ------------------------------------------------------------------ #
+ def _start_sender_thread(self):
+ thread_to_join = None
+ with self.lock:
+ if self._sender_thread and self._sender_thread.is_alive():
+ if not self._sender_running:
+ thread_to_join = self._sender_thread
+ else:
+ return
+
+ if thread_to_join:
+ try:
+ thread_to_join.join(timeout=1.0)
+ except Exception:
+ pass
+
+ with self.lock:
+ if self._sender_thread and self._sender_thread.is_alive() and self._sender_running:
+ return
+ self._sender_running = True
+ self._sender_thread = threading.Thread(target=self._sender_loop, daemon=True)
+ self._sender_thread.start()
+
+ def _sender_loop(self):
+ """Drain queued audio and stream it as base64 linear-PCM `audio` messages."""
+ while True:
+ with self.lock:
+ self._queue_cond.wait_for(
+ lambda: (not self._sender_running)
+ or (self.connected and self.ws and len(self._audio_queue) > 0)
+ )
+
+ if not self._sender_running:
+ return
+
+ if not (self.connected and self.ws):
+ continue
+
+ audio_chunk = self._audio_queue.popleft()
+ chunk_duration = len(audio_chunk) / float(self.SAMPLE_RATE)
+ self.audio_buffer_seconds = max(0.0, self.audio_buffer_seconds - chunk_duration)
+ ws = self.ws
+
+ if not self._audio_queue:
+ self._queue_cond.notify_all()
+
+ try:
+ base64_audio = base64.b64encode(self._float32_to_pcm16(audio_chunk)).decode('utf-8')
+ event = {
+ 'type': 'audio',
+ 'audio': base64_audio,
+ 'encoding': 'linear',
+ 'sample_rate': self.SAMPLE_RATE,
+ }
+ ws.send(json.dumps(event))
+ except Exception as e:
+ print(f'[60DB] Failed to send queued audio: {e}', flush=True)
+
+ def _receiver_loop(self):
+ while self.receiver_running:
+ try:
+ event = self.event_queue.get(timeout=0.1)
+ self._handle_event(event)
+ except Empty:
+ continue
+ except Exception as e:
+ print(f'[60DB] Error in receiver loop: {e}', flush=True)
+
+ def _on_message(self, _ws, message):
+ try:
+ event = json.loads(message)
+ except json.JSONDecodeError as e:
+ print(f'[60DB] Failed to parse event: {e}', flush=True)
+ return
+ self.event_queue.put(event)
+
+ def _on_error(self, _ws, error):
+ print(f'[60DB] WebSocket error: {error}', flush=True)
+
+ def _on_close(self, _ws, close_status_code, _close_msg):
+ with self.lock:
+ self.connected = False
+ self._socket_open = False
+ self._sender_running = False
+ self._audio_queue.clear()
+ self.audio_buffer_seconds = 0.0
+ self._queue_cond.notify_all()
+
+ print(f'[60DB] WebSocket closed (code: {close_status_code})', flush=True)
+
+ if self.receiver_running and close_status_code != 1000: # 1000 = normal
+ self._attempt_reconnect()
+
+ # ------------------------------------------------------------------ #
+ # Event handling
+ # ------------------------------------------------------------------ #
+ def _handle_event(self, event: dict):
+ event_type = event.get('type', '')
+
+ # Some server frames are unkeyed objects (e.g. {"connection_established": {...}}).
+ if not event_type:
+ if 'connection_established' in event:
+ # Auth confirmed — only now open the transcription session.
+ print('[60DB] Authenticated', flush=True)
+ self._send_start()
+ return
+
+ if event_type == 'connected':
+ # Session ready — only now can we safely stream audio.
+ with self.lock:
+ self.connected = True
+ self._queue_cond.notify_all()
+ print('[60DB] Session ready', flush=True)
+
+ elif event_type == 'speech_started':
+ with self.lock:
+ self._partial_transcript = ""
+ self._notify_partial_transcript("")
+
+ elif event_type == 'transcription':
+ self._handle_transcription(event)
+
+ elif event_type in ('language_changed', 'mode_changed'):
+ print(f'[60DB] {event_type}', flush=True)
+
+ elif event_type == 'session_stopped':
+ print('[60DB] Session stopped', flush=True)
+
+ elif event_type == 'error':
+ message = event.get('message', 'Unknown error')
+ print(f'[60DB] Server error: {message}', flush=True)
+ with self.lock:
+ self._partial_transcript = ""
+ self._notify_partial_transcript("")
+ self.response_event.set() # Unblock any waiter
+
+ def _handle_transcription(self, event: dict):
+ text = (event.get('text', '') or '').strip()
+ is_final = bool(event.get('is_final', False))
+ speech_final = bool(event.get('speech_final', False))
+
+ # A finalized utterance: is_final AND speech_final (the canonical result).
+ if is_final and speech_final:
+ with self.lock:
+ if text: # Empty finals are silence/hallucination rejections — skip.
+ self._committed_segments.append(text)
+ self._transcript_generation += 1
+ self._last_transcript_audio_activity_id = self._audio_activity_id
+ self._partial_transcript = ""
+ self._notify_partial_transcript("")
+ self.response_event.set()
+ print(f'[60DB] Final transcript ({len(text)} chars)', flush=True)
+ else:
+ # Interim (is_final False) or fast pre-refinement final (speech_final False):
+ # treat as a live preview without committing.
+ with self.lock:
+ self._partial_transcript = text
+ self._notify_partial_transcript(text)
+
+ # ------------------------------------------------------------------ #
+ # Language / callbacks
+ # ------------------------------------------------------------------ #
+ def update_language(self, language: Optional[str]):
+ """Update transcription language (resends `config` mid-session, no reconnect)."""
+ self.language = language
+ if not self.connected or not self.ws:
+ return
+ msg = {'type': 'config', 'languages': self._languages_list() or []}
+ try:
+ self.ws.send(json.dumps(msg))
+ print(f'[60DB] Language set to: {language or "auto-detect"}', flush=True)
+ except Exception as e:
+ print(f'[60DB] Failed to update language: {e}', flush=True)
+
+ def set_partial_transcript_callback(self, callback):
+ """Register a callback for live transcription previews."""
+ self.partial_transcript_callback = callback
+
+ def _notify_partial_transcript(self, text: str):
+ callback = self.partial_transcript_callback
+ if not callback:
+ return
+ try:
+ callback(text)
+ except Exception as e:
+ print(f'[60DB] Partial transcript callback failed: {e}', flush=True)
+
+ # ------------------------------------------------------------------ #
+ # Reconnection
+ # ------------------------------------------------------------------ #
+ def _attempt_reconnect(self):
+ if self.reconnect_attempts >= self.max_reconnect_attempts:
+ print('[60DB] Max reconnection attempts reached', flush=True)
+ return False
+
+ delay = self.reconnect_delays[min(self.reconnect_attempts, len(self.reconnect_delays) - 1)]
+ self.reconnect_attempts += 1
+
+ print(
+ f'[60DB] Reconnecting (attempt {self.reconnect_attempts}/'
+ f'{self.max_reconnect_attempts}) in {delay}s...',
+ flush=True,
+ )
+ time.sleep(delay)
+ return self._connect_internal()
+
+ # ------------------------------------------------------------------ #
+ # Audio in
+ # ------------------------------------------------------------------ #
+ def _float32_to_pcm16(self, audio_data: np.ndarray) -> bytes:
+ audio_clipped = np.clip(audio_data, -1.0, 1.0)
+ audio_int16 = (audio_clipped * 32767).astype(np.int16)
+ return audio_int16.tobytes() # little-endian int16
+
+ def clear_audio_buffer(self):
+ """Reset transcription/audio state before a new recording."""
+ with self.lock:
+ self._audio_queue.clear()
+ self.audio_buffer_seconds = 0.0
+ self._transcript_generation = 0
+ self._committed_segments = []
+ self._partial_transcript = ""
+ self._audio_activity_id = 0
+ self._last_transcript_audio_activity_id = 0
+ self._dropped_chunks = 0
+ self._last_drop_log_time = 0.0
+ self._queue_cond.notify_all()
+ self.response_event.clear()
+ self._notify_partial_transcript("")
+
+ def append_audio(self, audio_chunk: np.ndarray):
+ """
+ Queue an audio chunk for streaming (float32, mono, 16kHz).
+
+ Called from the sounddevice callback thread — fast/non-blocking only.
+ Caps the unsent backlog at max_buffer_seconds by dropping the oldest chunks.
+ """
+ if not self.connected or not self.ws:
+ return
+
+ drop_msg = None
+ with self.lock:
+ chunk_duration = len(audio_chunk) / float(self.SAMPLE_RATE)
+
+ while (
+ (self.audio_buffer_seconds + chunk_duration) > self.max_buffer_seconds
+ and self._audio_queue
+ ):
+ dropped = self._audio_queue.popleft()
+ dropped_duration = len(dropped) / float(self.SAMPLE_RATE)
+ self.audio_buffer_seconds = max(0.0, self.audio_buffer_seconds - dropped_duration)
+ self._dropped_chunks += 1
+
+ if (self.audio_buffer_seconds + chunk_duration) > self.max_buffer_seconds:
+ self._dropped_chunks += 1
+ else:
+ self._audio_queue.append(audio_chunk)
+ self.audio_buffer_seconds += chunk_duration
+ self._audio_activity_id += 1
+ self._queue_cond.notify_all()
+
+ now = time.time()
+ if self._dropped_chunks and (now - self._last_drop_log_time) > 2.0:
+ drop_msg = (
+ f'[60DB] Dropping audio chunk(s) (queued>{self.max_buffer_seconds:.1f}s). '
+ f'dropped_chunks={self._dropped_chunks}'
+ )
+ self._last_drop_log_time = now
+
+ if drop_msg:
+ print(drop_msg, flush=True)
+
+ # ------------------------------------------------------------------ #
+ # Commit / fetch transcript
+ # ------------------------------------------------------------------ #
+ def _full_committed_text_locked(self) -> str:
+ parts = [p for p in self._committed_segments if p]
+ return ' '.join(parts).strip()
+
+ def commit_and_get_text(self, timeout: float = 30.0) -> str:
+ """
+ Flush the session and return the final transcript.
+
+ 60db finalizes utterances itself (VAD + utterance_end_ms), so a transcript
+ is often already available before the user stops. We drain queued audio,
+ send {"type":"stop"} to flush the tail, then wait for the newest final.
+ """
+ if not self.connected or not self.ws:
+ print('[60DB] Not connected, cannot commit', flush=True)
+ return ""
+
+ try:
+ with self.lock:
+ existing_generation = self._transcript_generation
+ existing_transcript = self._full_committed_text_locked()
+ has_new_audio_since_transcript = (
+ self._audio_activity_id != self._last_transcript_audio_activity_id
+ )
+ has_queued_audio = len(self._audio_queue) > 0
+
+ # Common case: 60db already finalized before the user released.
+ if existing_transcript and (not has_new_audio_since_transcript) and (not has_queued_audio):
+ result = existing_transcript
+ self._committed_segments = []
+ self._transcript_generation = 0
+ self.audio_buffer_seconds = 0.0
+ self.response_event.clear()
+ print(f'[60DB] Using existing transcript ({len(result)} chars)', flush=True)
+ return result
+
+ self.response_event.clear()
+ queued_seconds = float(self.audio_buffer_seconds)
+ max_backlog = float(self.max_buffer_seconds)
+
+ # Best-effort: let queued audio drain before flushing the session.
+ drain_timeout = min(max_backlog + 1.0, max(0.5, timeout * 0.5, queued_seconds + 0.25))
+ with self.lock:
+ self._queue_cond.wait_for(lambda: len(self._audio_queue) == 0, timeout=drain_timeout)
+
+ # Small grace so any in-flight send reaches the server before we flush.
+ time.sleep(0.05)
+
+ try:
+ self.ws.send(json.dumps({'type': 'stop'}))
+ print('[60DB] Sent stop, waiting for transcript...', flush=True)
+ except Exception as e:
+ print(f'[60DB] Failed to send stop: {e}', flush=True)
+
+ deadline = time.time() + max(0.0, timeout)
+ best_generation = existing_generation
+ best_text = ""
+
+ while time.time() < deadline:
+ remaining = max(0.0, deadline - time.time())
+ if not self.response_event.wait(timeout=remaining):
+ break
+
+ with self.lock:
+ if self._transcript_generation > best_generation:
+ best_generation = self._transcript_generation
+ best_text = self._full_committed_text_locked()
+
+ # Settle briefly to catch a late LLM-refined / punctuation final.
+ settle_deadline = min(deadline, time.time() + 0.6)
+ self.response_event.clear()
+ while time.time() < settle_deadline:
+ settle_remaining = max(0.0, settle_deadline - time.time())
+ if not self.response_event.wait(timeout=settle_remaining):
+ break
+ with self.lock:
+ if self._transcript_generation > best_generation:
+ best_generation = self._transcript_generation
+ best_text = self._full_committed_text_locked()
+ self.response_event.clear()
+
+ result = (best_text or "").strip()
+ with self.lock:
+ self._committed_segments = []
+ self._transcript_generation = 0
+ self.audio_buffer_seconds = 0.0
+ print(f'[60DB] Transcript received ({len(result)} chars)', flush=True)
+ return result
+
+ print(f'[60DB] Timeout waiting for transcript ({timeout}s)', flush=True)
+
+ # Fallback: latest committed text, else last partial, else empty.
+ with self.lock:
+ fallback = self._full_committed_text_locked()
+ if fallback:
+ self._committed_segments = []
+ self._transcript_generation = 0
+ return fallback
+ if self._partial_transcript:
+ result = self._partial_transcript.strip()
+ self._partial_transcript = ""
+ return result
+ return ""
+
+ except Exception as e:
+ print(f'[60DB] Error in commit_and_get_text: {e}', flush=True)
+ return ""
+
+ # ------------------------------------------------------------------ #
+ # Lifecycle
+ # ------------------------------------------------------------------ #
+ def close(self):
+ with self.lock:
+ self._sender_running = False
+ self.receiver_running = False
+ self._audio_queue.clear()
+ self.audio_buffer_seconds = 0.0
+ self._queue_cond.notify_all()
+
+ if self.ws:
+ try:
+ self.ws.close()
+ except Exception:
+ pass
+
+ if self.receiver_thread and self.receiver_thread.is_alive():
+ self.receiver_thread.join(timeout=1.0)
+
+ if self._sender_thread and self._sender_thread.is_alive():
+ self._sender_thread.join(timeout=1.0)
+
+ with self.lock:
+ self.connected = False
+ self._socket_open = False
+
+ print('[60DB] Connection closed', flush=True)
+
+ def set_max_buffer_seconds(self, seconds: float):
+ self.max_buffer_seconds = max(1.0, seconds)
diff --git a/lib/src/sixtydb_tts_client.py b/lib/src/sixtydb_tts_client.py
new file mode 100644
index 0000000..e832597
--- /dev/null
+++ b/lib/src/sixtydb_tts_client.py
@@ -0,0 +1,233 @@
+"""
+60db Text-to-Speech client (library + CLI helper).
+
+Synthesizes speech from text over 60db's WebSocket TTS API
+(wss://api.60db.ai/ws/tts) and lists the caller's voices via the REST
+endpoint (GET https://api.60db.ai/myvoices).
+
+This is a standalone capability: hyprwhspr's dictation pipeline does not consume
+TTS, so nothing here is wired to a hotkey. It exists for the `hyprwhspr 60db`
+CLI commands and for programmatic use.
+
+Protocol summary (see https://docs.60db.ai/websocket-api/tts):
+ - Auth via ?apiKey=... query param on the socket URL.
+ - Server emits {"connection_established": {...}} after auth.
+ - Client sends create_context (context_id, voice_id, audio_config, tuning).
+ - Server replies context_created.
+ - Client sends send_text, then flush_context to trigger synthesis.
+ - Server streams audio_chunk messages ({audioContent: }), then
+ flush_completed when the flushed text is fully synthesized.
+ - Client sends close_context; server replies context_closed and closes.
+
+Audio is 16-bit signed little-endian PCM (LINEAR16), mono, at the requested
+sample rate (8k/16k/24k/48k). Chunks concatenate directly.
+"""
+
+import io
+import json
+import sys
+import time
+import uuid
+import wave
+from typing import List, Optional
+
+try:
+ import websocket # websocket-client
+except (ImportError, ModuleNotFoundError) as e:
+ print("ERROR: websocket-client is not available in this Python environment.", file=sys.stderr)
+ print(f"ImportError: {e}", file=sys.stderr)
+ print("\nThis is a required dependency. Please install it:", file=sys.stderr)
+ print(" pip install websocket-client>=1.6.0", file=sys.stderr)
+ raise
+
+try:
+ import requests
+except (ImportError, ModuleNotFoundError) as e:
+ print("ERROR: requests is not available in this Python environment.", file=sys.stderr)
+ print(f"ImportError: {e}", file=sys.stderr)
+ raise
+
+
+DEFAULT_WS_URL = 'wss://api.60db.ai/ws/tts'
+DEFAULT_VOICES_URL = 'https://api.60db.ai/myvoices'
+VALID_SAMPLE_RATES = (8000, 16000, 24000, 48000)
+
+
+class SixtyDbTTSError(Exception):
+ """Raised on a 60db TTS synthesis or API failure."""
+
+
+def list_my_voices(api_key: str, url: str = DEFAULT_VOICES_URL, timeout: float = 30.0) -> List[dict]:
+ """
+ Return the caller's available 60db voices.
+
+ Args:
+ api_key: 60db API key (sent as 'Authorization: Bearer ...').
+
+ Returns:
+ A list of voice dicts (voice_id, name, category, model, labels, ...).
+ Empty list if the account has no voices.
+ """
+ resp = requests.get(
+ url,
+ headers={'Authorization': f'Bearer {api_key}'},
+ timeout=timeout,
+ )
+ if resp.status_code != 200:
+ raise SixtyDbTTSError(f'Voices request failed (HTTP {resp.status_code}): {resp.text[:200]}')
+
+ try:
+ payload = resp.json()
+ except ValueError as e:
+ raise SixtyDbTTSError(f'Voices response was not valid JSON: {e}')
+
+ if isinstance(payload, dict):
+ if payload.get('success') is False:
+ raise SixtyDbTTSError(payload.get('message', 'Voices request unsuccessful'))
+ data = payload.get('data', [])
+ else:
+ data = payload
+ return data if isinstance(data, list) else []
+
+
+class SixtyDbTTSClient:
+ """One-shot / reusable synchronous client for 60db WebSocket TTS."""
+
+ def __init__(self, api_key: str, ws_url: str = DEFAULT_WS_URL):
+ self.api_key = api_key
+ self.ws_url = ws_url
+
+ def _auth_url(self) -> str:
+ sep = '&' if ('?' in self.ws_url) else '?'
+ return f'{self.ws_url}{sep}apiKey={self.api_key}'
+
+ def synthesize(
+ self,
+ text: str,
+ voice_id: str,
+ sample_rate: int = 24000,
+ speed: float = 1.0,
+ stability: float = 50,
+ similarity: float = 75,
+ timeout: float = 60.0,
+ ) -> bytes:
+ """
+ Synthesize `text` with `voice_id` and return raw PCM16 (mono) bytes.
+
+ Use pcm16_to_wav() to wrap the result in a playable WAV container.
+ """
+ if not text or not text.strip():
+ raise SixtyDbTTSError('text is required')
+ if not voice_id:
+ raise SixtyDbTTSError('voice_id is required')
+ if sample_rate not in VALID_SAMPLE_RATES:
+ raise SixtyDbTTSError(
+ f'sample_rate must be one of {VALID_SAMPLE_RATES}, got {sample_rate}'
+ )
+
+ context_id = uuid.uuid4().hex
+ ws = websocket.create_connection(self._auth_url(), timeout=timeout)
+ audio = bytearray()
+ deadline = time.time() + timeout
+
+ try:
+ # 1) Open a synthesis context.
+ ws.send(json.dumps({
+ 'type': 'create_context',
+ 'context_id': context_id,
+ 'voice_id': voice_id,
+ 'audio_config': {
+ 'audio_encoding': 'LINEAR16',
+ 'sample_rate_hertz': sample_rate,
+ },
+ 'speed': speed,
+ 'stability': stability,
+ 'similarity': similarity,
+ }))
+
+ # 2) Send the text and flush to trigger synthesis.
+ ws.send(json.dumps({
+ 'type': 'send_text',
+ 'context_id': context_id,
+ 'text': text,
+ }))
+ ws.send(json.dumps({
+ 'type': 'flush_context',
+ 'context_id': context_id,
+ }))
+
+ # 3) Collect audio_chunk messages until flush completes.
+ flushed = False
+ while time.time() < deadline:
+ remaining = max(0.1, deadline - time.time())
+ ws.settimeout(remaining)
+ try:
+ raw = ws.recv()
+ except websocket.WebSocketTimeoutException:
+ break
+ if raw is None or raw == '':
+ continue
+ if isinstance(raw, bytes):
+ # Defensive: a server that streams binary frames sends raw PCM.
+ audio.extend(raw)
+ continue
+
+ msg = self._parse(raw)
+ if msg is None:
+ continue
+
+ mtype = msg.get('type') or self._implicit_type(msg)
+ if mtype == 'audio_chunk':
+ chunk_b64 = msg.get('audioContent') or msg.get('audio') or ''
+ if chunk_b64:
+ import base64
+ audio.extend(base64.b64decode(chunk_b64))
+ elif mtype == 'flush_completed':
+ flushed = True
+ break
+ elif mtype == 'error':
+ raise SixtyDbTTSError(msg.get('message', 'Unknown TTS error'))
+
+ if not flushed and not audio:
+ raise SixtyDbTTSError('No audio received before timeout')
+
+ # 4) Close the context cleanly.
+ try:
+ ws.send(json.dumps({'type': 'close_context', 'context_id': context_id}))
+ except Exception:
+ pass
+
+ return bytes(audio)
+
+ finally:
+ try:
+ ws.close()
+ except Exception:
+ pass
+
+ @staticmethod
+ def _parse(raw) -> Optional[dict]:
+ try:
+ obj = json.loads(raw)
+ return obj if isinstance(obj, dict) else None
+ except (json.JSONDecodeError, TypeError):
+ return None
+
+ @staticmethod
+ def _implicit_type(msg: dict) -> str:
+ """Map unkeyed server frames (e.g. {"context_created": {...}}) to a type."""
+ for key in ('audio_chunk', 'flush_completed', 'context_created', 'context_closed', 'error'):
+ if key in msg:
+ return key
+ return ''
+
+
+def pcm16_to_wav(pcm_bytes: bytes, sample_rate: int = 24000, channels: int = 1) -> bytes:
+ """Wrap raw 16-bit little-endian PCM in a WAV container."""
+ buf = io.BytesIO()
+ with wave.open(buf, 'wb') as wf:
+ wf.setnchannels(channels)
+ wf.setsampwidth(2) # 16-bit
+ wf.setframerate(sample_rate)
+ wf.writeframes(pcm_bytes)
+ return buf.getvalue()
diff --git a/lib/src/whisper_manager.py b/lib/src/whisper_manager.py
index cae6c10..714031b 100644
--- a/lib/src/whisper_manager.py
+++ b/lib/src/whisper_manager.py
@@ -480,9 +480,72 @@ def _send_direct(audio_chunk: np.ndarray):
self._realtime_client.append_audio(audio_chunk)
except Exception as e:
print(f'[ELEVENLABS] Streaming error: {e}', flush=True)
-
+
self._realtime_streaming_callback = _send_direct
-
+
+ elif provider_id == '60db':
+ # Use 60db-specific realtime STT client
+ try:
+ from .sixtydb_realtime_client import SixtyDbRealtimeClient
+ except ImportError:
+ from sixtydb_realtime_client import SixtyDbRealtimeClient
+
+ self._realtime_client = SixtyDbRealtimeClient()
+
+ # Get WebSocket URL
+ websocket_url = self.config.get_setting('websocket_url')
+ if not websocket_url:
+ provider = get_provider(provider_id)
+ if provider and 'websocket_endpoint' in provider:
+ websocket_url = provider['websocket_endpoint']
+ else:
+ websocket_url = 'wss://api.60db.ai/ws/stt'
+
+ # Set language (sent in the `start`/`config` message)
+ language = self.config.get_setting('language', None)
+ self._realtime_client.language = language
+
+ # 60db-specific tuning (all optional; sane defaults in the client)
+ self._realtime_client.diarize = bool(
+ self.config.get_setting('sixtydb_diarize', False)
+ )
+ self._realtime_client.utterance_end_ms = self.config.get_setting(
+ 'sixtydb_utterance_end_ms', 500
+ )
+ self._realtime_client.audio_enhancement = self.config.get_setting(
+ 'sixtydb_audio_enhancement', 'adaptive'
+ )
+
+ # Set buffer max seconds
+ buffer_max = self.config.get_setting('realtime_buffer_max_seconds', 5)
+ self._realtime_client.set_max_buffer_seconds(buffer_max)
+
+ # Connect (60db doesn't use instructions; API key goes in URL query param)
+ self._realtime_connect_params = {
+ 'websocket_url': websocket_url,
+ 'api_key': api_key,
+ 'model_id': model_id,
+ 'instructions': None,
+ }
+ if not self._realtime_client.connect(websocket_url, api_key, model_id, None):
+ print('ERROR: Failed to connect to 60db Realtime WebSocket')
+ try:
+ self._realtime_client.close()
+ except Exception:
+ pass
+ self._realtime_client = None
+ return False
+
+ # 60db uses 16kHz audio - no resampling needed!
+ def _send_direct(audio_chunk: np.ndarray):
+ """Send audio directly to 60db (16kHz, no resampling)"""
+ try:
+ self._realtime_client.append_audio(audio_chunk)
+ except Exception as e:
+ print(f'[60DB] Streaming error: {e}', flush=True)
+
+ self._realtime_streaming_callback = _send_direct
+
else:
# Use OpenAI-compatible client (default)
try:
diff --git a/requirements.txt b/requirements.txt
index 8c68aae..c10d1da 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -27,3 +27,6 @@ pycairo>=1.25.0
# ElevenLabs integration
elevenlabs>=2.40.0
+
+# 60db integration (realtime STT + TTS) reuses websocket-client and requests above;
+# no additional dependency required.
diff --git a/share/config.schema.json b/share/config.schema.json
index e75dc0f..37d75af 100644
--- a/share/config.schema.json
+++ b/share/config.schema.json
@@ -242,7 +242,7 @@
"websocket_provider": {
"type": ["string", "null"],
"default": null,
- "description": "Provider identifier for WebSocket credential lookup (e.g., 'openai', 'google', 'elevenlabs')"
+ "description": "Provider identifier for WebSocket credential lookup (e.g., 'openai', 'google', 'elevenlabs', '60db')"
},
"websocket_model": {
"type": ["string", "null"],
@@ -278,6 +278,28 @@
"default": "low",
"description": "Latency/accuracy delay setting for OpenAI gpt-realtime-whisper transcription"
},
+ "sixtydb_diarize": {
+ "type": "boolean",
+ "default": false,
+ "description": "Enable speaker diarization for the 60db realtime STT backend (adds cost)"
+ },
+ "sixtydb_utterance_end_ms": {
+ "type": "integer",
+ "minimum": 300,
+ "default": 500,
+ "description": "Silence threshold in ms before 60db finalizes an utterance (>=300)"
+ },
+ "sixtydb_audio_enhancement": {
+ "type": "string",
+ "enum": ["off", "light", "adaptive"],
+ "default": "adaptive",
+ "description": "Audio enhancement mode for the 60db realtime STT backend"
+ },
+ "sixtydb_tts_voice_id": {
+ "type": ["string", "null"],
+ "default": null,
+ "description": "Default 60db voice ID used by 'hyprwhspr 60db tts' when --voice is omitted"
+ },
"onnx_asr_model": {
"type": "string",
"default": "nemo-parakeet-tdt-0.6b-v3",
diff --git a/tests/test_sixtydb_client.py b/tests/test_sixtydb_client.py
new file mode 100644
index 0000000..22be049
--- /dev/null
+++ b/tests/test_sixtydb_client.py
@@ -0,0 +1,139 @@
+import json
+import sys
+import types
+import unittest
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "lib" / "src"))
+sys.modules.setdefault("websocket", types.SimpleNamespace(WebSocketApp=object))
+
+from sixtydb_realtime_client import SixtyDbRealtimeClient
+
+
+class FakeWebSocket:
+ def __init__(self):
+ self.sent = []
+
+ def send(self, payload):
+ self.sent.append(json.loads(payload))
+
+
+class SixtyDbRealtimeClientTests(unittest.TestCase):
+ def _client_with_ws(self):
+ client = SixtyDbRealtimeClient()
+ client.connected = True
+ client._socket_open = True
+ client.ws = FakeWebSocket()
+ return client
+
+ def test_auth_url_appends_api_key_query_param(self):
+ client = SixtyDbRealtimeClient()
+ client.url = "wss://api.60db.ai/ws/stt"
+ client.api_key = "sk_live_abc"
+ self.assertEqual(client._auth_url(), "wss://api.60db.ai/ws/stt?apiKey=sk_live_abc")
+
+ def test_start_message_carries_language_and_config(self):
+ client = self._client_with_ws()
+ client.language = "en"
+ client.diarize = True
+ client.utterance_end_ms = 700
+
+ client._send_start()
+
+ start = client.ws.sent[-1]
+ self.assertEqual(start["type"], "start")
+ self.assertEqual(start["languages"], ["en"])
+ self.assertEqual(start["config"]["encoding"], "linear")
+ self.assertEqual(start["config"]["sample_rate"], 16000)
+ self.assertEqual(start["config"]["utterance_end_ms"], 700)
+ self.assertTrue(start["config"]["diarize"])
+
+ def test_start_message_omits_languages_for_auto_detect(self):
+ client = self._client_with_ws()
+ client.language = None
+
+ client._send_start()
+
+ self.assertNotIn("languages", client.ws.sent[-1])
+
+ def test_interim_is_preview_only_and_final_commits(self):
+ previews = []
+ client = self._client_with_ws()
+ client.set_partial_transcript_callback(previews.append)
+
+ client._handle_event({"type": "transcription", "text": "hello", "is_final": False, "speech_final": False})
+ client._handle_event({"type": "transcription", "text": "hello world", "is_final": True, "speech_final": True})
+
+ self.assertEqual(previews, ["hello", ""])
+ self.assertEqual(client.commit_and_get_text(timeout=0.1), "hello world")
+
+ def test_fast_prefinal_is_not_committed(self):
+ # is_final True but speech_final False = fast pre-refinement text -> preview only.
+ previews = []
+ client = self._client_with_ws()
+ client.set_partial_transcript_callback(previews.append)
+
+ client._handle_event({"type": "transcription", "text": "draft text", "is_final": True, "speech_final": False})
+
+ self.assertEqual(client._partial_transcript, "draft text")
+ self.assertEqual(client._committed_segments, [])
+ self.assertEqual(previews, ["draft text"])
+
+ def test_empty_final_is_skipped(self):
+ client = self._client_with_ws()
+ client._handle_event({"type": "transcription", "text": "", "is_final": True, "speech_final": True})
+ self.assertEqual(client._committed_segments, [])
+
+ def test_multiple_finals_are_stitched_in_order(self):
+ client = self._client_with_ws()
+ client._handle_event({"type": "transcription", "text": "first.", "is_final": True, "speech_final": True})
+ client._handle_event({"type": "transcription", "text": "second.", "is_final": True, "speech_final": True})
+ self.assertEqual(client.commit_and_get_text(timeout=0.1), "first. second.")
+
+ def test_speech_started_clears_stale_partial(self):
+ previews = []
+ client = self._client_with_ws()
+ client.set_partial_transcript_callback(previews.append)
+
+ client._handle_event({"type": "transcription", "text": "stale", "is_final": False, "speech_final": False})
+ client._handle_event({"type": "speech_started"})
+
+ self.assertEqual(client._partial_transcript, "")
+ self.assertEqual(previews[-1], "")
+
+ def test_commit_sends_stop(self):
+ client = self._client_with_ws()
+ # No committed text yet and no queued audio -> falls through to sending stop.
+ client._handle_event({"type": "transcription", "text": "done", "is_final": True, "speech_final": True})
+ # Mark there is "new audio" so the fast path is skipped and stop is sent.
+ client._audio_activity_id = 5
+ client._last_transcript_audio_activity_id = 4
+ client.commit_and_get_text(timeout=0.1)
+ self.assertIn({"type": "stop"}, client.ws.sent)
+
+ def test_clear_audio_buffer_resets_state(self):
+ previews = []
+ client = self._client_with_ws()
+ client.set_partial_transcript_callback(previews.append)
+ client._handle_event({"type": "transcription", "text": "x", "is_final": True, "speech_final": True})
+
+ client.clear_audio_buffer()
+
+ self.assertEqual(client._committed_segments, [])
+ self.assertEqual(client._partial_transcript, "")
+ self.assertEqual(client._transcript_generation, 0)
+ self.assertEqual(previews[-1], "")
+
+ def test_provider_registry_has_60db_realtime_model(self):
+ from provider_registry import get_provider
+ provider = get_provider("60db")
+ self.assertIsNotNone(provider)
+ self.assertEqual(provider["websocket_endpoint"], "wss://api.60db.ai/ws/stt")
+ realtime = [m for m, d in provider["models"].items() if d.get("realtime_model")]
+ self.assertTrue(realtime)
+
+
+if __name__ == "__main__":
+ unittest.main()