Python SDK Documentation
Official Python SDK for the OrbitalsAI API with synchronous, asynchronous, and real-time streaming support. Build powerful African language transcription into your Python applications.
Installation
Install the core SDK using pip:
pip install orbitalsai
To enable streaming and audio helpers, use the optional extras defined in the Python package:
# Add streaming dependencies (WebSocket + NumPy)
pip install "orbitalsai[streaming]"
# Add audio conversion + microphone helpers
pip install "orbitalsai[audio]"
# Install everything (streaming + audio extras)
pip install "orbitalsai[all]"
Authentication
Get your API key from the dashboard and initialize the client:
import orbitalsaiclient = orbitalsai.Client(api_key="your_api_key_here")
Best Practice
Basic Usage
The sync client lets you upload audio and get back transcripts. The subsections below cover uploading and waiting, language and model selection, SRT generation, and listing past tasks.
Upload & Transcribe
Transcribe an audio file with automatic waiting:
import orbitalsaiclient = orbitalsai.Client(api_key="your_api_key_here")# Transcribe with automatic waitingtranscript = client.transcribe("audio.mp3", model_name="Perigee-1")print(transcript.text)print(transcript.task_id)print(transcript.original_filename)
Automatic Waiting
The transcribe() method automatically waits for the transcription to complete before returning.
Specify a Language
Transcribe with a specific African language:
# Transcribe in Hausatranscript = client.transcribe( "audio.mp3", language="hausa")# Transcribe in Igbotranscript = client.transcribe( "audio.mp3", language="igbo")See the full list of supported languages.
Model Selection
Choose the best AI model for your needs. You can list available models and their pricing:
# List available modelsmodels = client.get_models()for model in models: print(f"{model.model_name}: ${model.transcription_rate_per_hour}/hr")# Transcribe using a specific modeltranscript = client.transcribe( "audio.mp3", model_name="Perigee-1" # Default model)
Generate SRT Subtitles
Generate subtitle files in SRT format:
transcript = client.transcribe( "audio.mp3", language="yoruba", generate_srt=True)print(transcript.text)print(transcript.srt_content) # SRT subtitle content# Save to filewith open("subtitles.srt", "w", encoding="utf-8") as f: f.write(transcript.srt_content)List Past Tasks
List all transcription tasks (paginated):
# List all transcription tasks (paginated)tasks = client.get_tasks(page=1, page_size=20)for task in tasks.items: print(f"{task.original_filename}: {task.status} (created: {task.created_at})")
Paginate task history
Use page and page_size to paginate large task histories.
Async Usage
Use the async client for non-blocking operations:
import asyncioimport orbitalsaiasync def transcribe_audio() -> None: async with orbitalsai.AsyncClient(api_key="your_api_key_here") as client: transcript = await client.transcribe( "audio.mp3", language="swahili", model_name="Perigee-1", ) print(transcript.text)if __name__ == "__main__": asyncio.run(transcribe_audio())Concurrent Processing
Manual Task Control
Submit a transcription task without waiting and poll for completion. Use wait=False to get back a task immediately, then poll with get_task() or block with wait_for_task().
import orbitalsaiclient = orbitalsai.Client(api_key="your_api_key_here")# Start transcription without waiting for completiontranscript = client.transcribe( "audio.mp3", language="hausa", model_name="Perigee-1", wait=False,)print(f"Task ID: {transcript.task_id}")# Poll the task statustask = client.get_task(transcript.task_id)print(f"Status: {task.status}")# Or block until the task completesfinal_transcript = client.wait_for_task(transcript.task_id)print(final_transcript.text)Error Handling
Handle common batch transcription errors gracefully:
from orbitalsai.exceptions import ( AuthenticationError, InsufficientBalanceError, TranscriptionError, TimeoutError)try: transcript = client.transcribe("audio.mp3", language="pidgin") print(transcript.text)except AuthenticationError: print("Invalid API key")except InsufficientBalanceError: print("Not enough balance. Please top up your account.")except TranscriptionError as e: print(f"Transcription failed: {e}")except TimeoutError: print("Transcription took too long")except Exception as e: print(f"Unexpected error: {e}")Balance Management
Check your account balance and usage:
from datetime import date, timedelta# Check current balancebalance = client.get_balance()print(f"Current balance: ${balance.balance:.2f}")print(f"Last updated: {balance.last_updated}")# Get daily usage for the last 7 daysend_date = date.today()start_date = end_date - timedelta(days=7)daily_usage = client.get_daily_usage(start_date=start_date, end_date=end_date)print(f"Total cost: ${daily_usage.total_cost:.2f}")print(f"Total audio processed: {daily_usage.total_audio_seconds:.1f} seconds")for record in daily_usage.daily_records: print(f"{record.date}: ${record.total_cost:.4f} ({record.transcription_usage:.1f}s transcription)")
Complete Example
A complete batch transcription example with error handling and balance checking:
import osfrom datetime import date, timedeltaimport orbitalsaifrom orbitalsai.exceptions import InsufficientBalanceError, TranscriptionErrordef main() -> None: # Initialize client client = orbitalsai.Client(api_key=os.environ["ORBITALSAI_API_KEY"]) # Check balance balance = client.get_balance() print(f"Current balance: ${balance.balance:.2f}") if balance.balance <= 0: print("Low or zero balance. Please top up before transcribing.") return # Transcribe audio with SRT try: transcript = client.transcribe( "audio.mp3", language="hausa", model_name="Perigee-1", generate_srt=True, ) print("Transcription successful!\n") print(transcript.text) # Save transcript with open("transcript.txt", "w", encoding="utf-8") as f: f.write(transcript.text) # Save subtitles if available if transcript.srt_content: with open("subtitles.srt", "w", encoding="utf-8") as f: f.write(transcript.srt_content) except InsufficientBalanceError: print("Insufficient balance – please top up your account.") except TranscriptionError as e: print(f"Transcription failed: {e}") # Print recent usage summary end_date = date.today() start_date = end_date - timedelta(days=7) usage = client.get_daily_usage(start_date=start_date, end_date=end_date) print(f"\nLast 7 days cost: ${usage.total_cost:.2f}") print(f"Total audio processed: {usage.total_audio_seconds:.1f}s")if __name__ == "__main__": main()
Real-time streaming from Python
The Python SDK ships with a dedicated streaming module at orbitalsai.streaming for real-time transcription over WebSockets. Use the async client for most applications. The subsections below cover streaming from files, microphone input, configuration, and error handling.
import asynciofrom orbitalsai.streaming import AsyncStreamingClient, PrintingEventHandlersasync def main() -> None: async with AsyncStreamingClient(api_key="your_api_key_here") as client: await client.connect(PrintingEventHandlers()) with open("audio.pcm", "rb") as f: while chunk := f.read(16000): await client.send_audio(chunk) await client.flush()if __name__ == "__main__": asyncio.run(main())Stream from Audio Files (MP3, WAV, M4A…)
Use AudioConverter to convert common audio formats to PCM16 before streaming.
import asyncioimport osfrom orbitalsai.streaming import ( AsyncStreamingClient, StreamingConfig, PrintingEventHandlers, AudioConverter,)API_KEY = os.getenv("ORBITALSAI_API_KEY", "your_api_key_here")async def stream_file(file_path: str, language: str = "english") -> None: config = StreamingConfig( language=language, sample_rate=16000, chunk_size=8000, interim_results=True, ) # Convert audio file (MP3/WAV/M4A/FLAC/OGG) to PCM16 at 16kHz audio_bytes, _ = AudioConverter.from_file(file_path, target_sample_rate=16000) chunks = AudioConverter.split_chunks(audio_bytes, chunk_size=config.chunk_size) async with AsyncStreamingClient(api_key=API_KEY, config=config) as client: await client.connect(PrintingEventHandlers()) for chunk in chunks: await client.send_audio(chunk) await asyncio.sleep(config.chunk_duration_ms / 1000.0) await client.flush()if __name__ == "__main__": asyncio.run(stream_file("speech.mp3", language="english"))Install audio extras
Run pip install "orbitalsai[audio]" to get AudioConverter and its dependencies (soundfile, librosa).
Stream from Microphone
import asyncioimport osimport sysimport numpy as npimport sounddevice as sdfrom orbitalsai.streaming import ( AsyncStreamingClient, StreamingConfig, PrintingEventHandlers,)API_KEY = os.getenv("ORBITALSAI_API_KEY", "your_api_key_here")async def stream_microphone(language: str = "english", duration: int = 30) -> None: sample_rate = 16000 blocksize = 8000 # 500ms chunks config = StreamingConfig( language=language, sample_rate=sample_rate, chunk_size=blocksize, interim_results=True, ) audio_queue: asyncio.Queue[bytes] = asyncio.Queue() def audio_callback(indata, frames, time_info, status) -> None: if status: print(f"Audio status: {status}", file=sys.stderr) audio_queue.put_nowait(indata.tobytes()) async with AsyncStreamingClient(api_key=API_KEY, config=config) as client: await client.connect(PrintingEventHandlers(show_partials=True)) with sd.InputStream( samplerate=sample_rate, channels=1, dtype="int16", blocksize=blocksize, callback=audio_callback, ): end_time = asyncio.get_event_loop().time() + duration while asyncio.get_event_loop().time() < end_time: try: audio_data = await asyncio.wait_for(audio_queue.get(), timeout=1.0) except asyncio.TimeoutError: continue await client.send_audio(audio_data) await client.flush() await asyncio.sleep(2.0)if __name__ == "__main__": asyncio.run(stream_microphone(duration=30))Local environment only
Run pip install "orbitalsai[all]" to include sounddevice.
Word-Level Timestamps
When using streaming (via AsyncStreamingClient), you can request per-word timing by passing return_timestamps=True in the StreamingConfig. When enabled, each final event includes a timestamps list with entries shaped { "start": float, "end": float, "text": str }. These timestamps are also stored in the database for completed sessions and can be retrieved via GET /api/v1/audio/status/{task_id}.
import asynciofrom orbitalsai.streaming import ( AsyncStreamingClient, StreamingConfig, StreamingEventHandlers,)class MyHandlers(StreamingEventHandlers): def on_final(self, text: str, cost: float, audio_seconds: float, timestamps=None) -> None: print(f"Final: {text}") if timestamps: for word in timestamps: print(f" {word['start']:.2f}s – {word['end']:.2f}s: {word['text']}")async def main() -> None: config = StreamingConfig( language="english", sample_rate=16000, return_timestamps=True, # Enable word-level timestamps ) async with AsyncStreamingClient(api_key="your_api_key_here", config=config) as client: await client.connect(MyHandlers()) with open("audio.pcm", "rb") as f: while chunk := f.read(16000): await client.send_audio(chunk) await client.flush()if __name__ == "__main__": asyncio.run(main())Timestamps in saved sessions
GET /api/v1/audio/status/{task_id} — the response includes a timestamps field with the same { start, end, text } format.
StreamingConfig Reference
StreamingConfig controls audio, language, and connection settings for each session.
from orbitalsai.streaming import StreamingConfigconfig = StreamingConfig( sample_rate=16000, # 8000–48000 Hz (16kHz recommended) chunk_size=8000, # Samples per chunk (~500ms at 16kHz) language="english", # english, hausa, igbo, yoruba interim_results=True, # Receive partial transcripts auto_flush=True, # Auto-flush on silence max_retries=5, retry_delay=1.0, connection_timeout=30.0,)Reference
| Field | Type | Description |
|---|---|---|
sample_rate | int | Audio sample rate in Hz (8000–48000). |
chunk_size | int | Samples per chunk (8000 ≈ 500ms at 16kHz). |
language | str | Target language: english, hausa, igbo, yoruba. |
interim_results | bool | Receive partial transcripts as speech is recognized. |
auto_flush | bool | Server flushes automatically on silence. |
return_timestamps | bool | Include per-word timestamps in final events. |
max_retries | int | Max reconnection attempts after unexpected disconnect. |
retry_delay | float | Initial retry delay in seconds (exponential backoff). |
connection_timeout | float | WebSocket connection timeout in seconds. |
Streaming Error Handling
import asynciofrom orbitalsai.streaming import AsyncStreamingClient, StreamingEventHandlersfrom orbitalsai.streaming.exceptions import ( ConnectionError, AuthenticationError, InsufficientCreditsError, ReconnectionFailedError, SessionClosedError,)class MyHandlers(StreamingEventHandlers): def on_error(self, error: Exception) -> None: if isinstance(error, AuthenticationError): print("Invalid API key – check your ORBITALSAI_API_KEY.") elif isinstance(error, InsufficientCreditsError): print("Credits exhausted – please top up your account.") elif isinstance(error, ReconnectionFailedError): print(f"Failed to reconnect after {error.attempts} attempts.") else: print(f"Streaming error: {error}")async def main() -> None: try: async with AsyncStreamingClient(api_key="your_api_key_here") as client: await client.connect(MyHandlers()) await client.flush() except ConnectionError as e: print(f"Connection failed: {e}") except SessionClosedError: print("Session was closed; stop sending audio.")if __name__ == "__main__": asyncio.run(main())