This document provides a high-level technical overview of the Omi system, an open-source AI wearable platform for capturing, transcribing, and processing conversations. It covers the architecture spanning hardware devices, mobile/desktop applications, backend services, and the plugin ecosystem.
For detailed information on specific subsystems, refer to the dedicated subsystem wiki pages.
Omi captures audio from wearable devices, streams it to mobile apps via BLE, transcribes conversations in real-time, processes them with LLMs to extract insights, and provides an AI assistant interface. The system supports multiple hardware types, offline operation, end-to-end encryption, and extensibility through a plugin marketplace.
Sources: README.md:1-105, backend/main.py:1-136, app/lib/main.dart:1-513
The system follows a five-layer architecture with clear separation of concerns:
Diagram: Five-Layer System Architecture with Code Entities
The architecture separates hardware capture, client-side state management, server-side processing, persistent storage, and external service integrations. Device types are identified by BtDevice.type enum values. Audio flows through getBleAudioBytesListener() to CaptureProvider.streamAudioToWs(), which creates a TranscriptSegmentSocketService connection to _stream_handler() on the backend. The handler calls STT processing functions like process_audio_dg() or process_audio_soniox() and stores results via conversations_db.upsert_conversation().
Sources: backend/routers/transcribe.py:115-128, app/lib/providers/capture_provider.dart:626-674, backend/main.py:58-99, backend/database/conversations.py:1-50, app/lib/backend/schema/bt_device/bt_device.dart:1-100
Diagram: Real-time Audio Processing Flow with Function Calls
Audio captured by the device is OPUS-encoded and streamed via BLE to CaptureProvider.getBleAudioBytesListener(). The app extracts the audio bytes with value.sublist(3) (dropping the 3 leading padding bytes) and sends them to the WAL service via _wal.onByteStream() for offline backup. Decoded audio is forwarded to _stream_handler(), which waits for the speech_profile_complete event before streaming to STT. The handler populates realtime_segment_buffers and sends MessageEvent JSON to the client. After conversation_creation_timeout (120s default), _process_conversation() triggers either Pusher processing or the local _create_conversation_fallback(), which calls process_conversation() for LLM extraction and upsert_vector() for Pinecone storage.
Sources: backend/routers/transcribe.py:115-656, app/lib/providers/capture_provider.dart:626-674, backend/utils/conversations/process_conversation.py:1-100, backend/database/conversations.py:1-100
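A minimal, self-contained sketch of this buffering-and-timeout pattern is shown below. ConversationBuffer, its 5-second polling loop, and the demo callback are illustrative stand-ins for the backend's realtime_segment_buffers and _create_conversation_fallback(), not the actual code.

```python
import asyncio
import time
from collections import deque


class ConversationBuffer:
    """Illustrative stand-in for realtime_segment_buffers plus the silence timeout."""

    def __init__(self, timeout_s: float = 120.0, max_segments: int = 500):
        self.segments: deque = deque(maxlen=max_segments)
        self.timeout_s = timeout_s
        self.last_segment_at = time.monotonic()

    def add(self, segment: dict) -> None:
        # Called from the STT callback (stream_transcript in the real handler).
        self.segments.append(segment)
        self.last_segment_at = time.monotonic()

    async def watch_for_silence(self, on_timeout) -> None:
        # Periodically check whether the speaker has been silent long enough
        # to close out the in-progress conversation.
        while True:
            await asyncio.sleep(5)
            idle = time.monotonic() - self.last_segment_at
            if self.segments and idle >= self.timeout_s:
                segments = list(self.segments)
                self.segments.clear()
                await on_timeout(segments)


async def _demo():
    async def create_conversation(segments):
        # Stand-in for _create_conversation_fallback() -> process_conversation().
        print(f"creating conversation from {len(segments)} segments")

    buf = ConversationBuffer(timeout_s=1.0)
    watcher = asyncio.create_task(buf.watch_for_silence(create_conversation))
    buf.add({"text": "hello", "speaker": "SPEAKER_00"})
    await asyncio.sleep(6)
    watcher.cancel()


if __name__ == "__main__":
    asyncio.run(_demo())
```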
| Component | Technology | Role |
|---|---|---|
| Omi Device | nRF5340 + Zephyr RTOS | Primary wearable, dual-mic capture, OPUS encoding, BLE streaming |
| Omi DevKit | nRF52840 | Development kit for prototyping |
| Omi Glass | ESP32-S3 | Smart glasses variant with camera |
| Apple Watch | WatchOS | Companion app for iOS ecosystem |
All devices stream audio to the mobile app via BLE, with the Omi Device supporting offline SD card storage when disconnected.
Sources: README.md:70-75, Diagram 1 from high-level diagrams
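As a language-neutral illustration of that BLE framing (the app drops the first 3 bytes of each audio notification before decoding, per value.sublist(3) above), here is a small Python sketch. AudioPacket and split_ble_packet() are hypothetical helpers; the meaning of the leading bytes is firmware-specific and not assumed here.

```python
from dataclasses import dataclass


@dataclass
class AudioPacket:
    header: bytes   # the leading 3 bytes the app strips (value.sublist(3))
    payload: bytes  # OPUS-encoded audio frame


def split_ble_packet(value: bytes) -> AudioPacket:
    """Split a BLE audio notification into its leading 3 bytes and OPUS payload."""
    if len(value) < 3:
        raise ValueError("packet shorter than the 3-byte prefix")
    return AudioPacket(header=value[:3], payload=value[3:])


# Example: a fake 3-byte prefix followed by placeholder OPUS bytes.
pkt = split_ble_packet(bytes([0x01, 0x00, 0x00]) + b"\xfc\xff\xfe")
assert len(pkt.header) == 3 and len(pkt.payload) == 3
```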
Built with Flutter, supporting iOS, Android, macOS, and Windows. The app uses the Provider pattern for state management with an offline-first design, wiring dependencies via ChangeNotifierProvider and ChangeNotifierProxyProvider (app/lib/main.dart:317-387).
| Provider | File | Key Methods/Properties | Dependencies |
|---|---|---|---|
| CaptureProvider | app/lib/providers/capture_provider.dart | streamDeviceRecording(), streamAudioToWs(), _initiateWebsocket(), recordingState | ConversationProvider, MessageProvider, PeopleProvider, UsageProvider |
| ConversationProvider | app/lib/providers/conversation_provider.dart | getInitialConversations(), refreshConversations(), conversations list | None |
| MessageProvider | app/lib/providers/message_provider.dart:1-100 | sendMessage(), refreshMessages(), messages list | AppProvider |
| DeviceProvider | app/lib/providers/device_provider.dart | scanAndConnectToDevice(), periodicConnect(), connectedDevice | CaptureProvider |
| AppProvider | app/lib/providers/app_provider.dart | getApps(), enableApp(), disableApp(), apps list | None |
Data is cached locally in SharedPreferencesUtil (app/lib/backend/preferences.dart:16-350) with properties like btDevice, cachedConversations, and cachedMessages for offline access. The app maintains a write-ahead log (WAL) via the IWalService interface (app/lib/services/wals.dart:1-25), accessed through ServiceManager.instance().wal, for audio chunks on supported devices (Omi, OpenGlass with the OPUS codec).
Sources: app/lib/main.dart:317-387, app/lib/providers/capture_provider.dart:62-237, app/lib/backend/preferences.dart:16-350, app/lib/services/wals.dart:1-25
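The WAL itself is implemented in Dart (IWalService in app/lib/services/wals.dart); as a language-neutral sketch of the same idea, the Python class below buffers audio chunks to local files while the socket is down and replays them on reconnect. File naming and flush policy are illustrative assumptions.

```python
import os
import time


class AudioWal:
    """Minimal write-ahead-log sketch: persist audio chunks locally, replay later."""

    def __init__(self, directory: str = "wal"):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def on_byte_stream(self, chunk: bytes) -> str:
        # Analogous to _wal.onByteStream(): append the raw chunk to local storage.
        path = os.path.join(self.directory, f"{time.time_ns()}.bin")
        with open(path, "wb") as f:
            f.write(chunk)
        return path

    def flush(self, send) -> int:
        # Replay pending chunks in order once a connection is available again.
        sent = 0
        for name in sorted(os.listdir(self.directory)):
            path = os.path.join(self.directory, name)
            with open(path, "rb") as f:
                send(f.read())
            os.remove(path)
            sent += 1
        return sent


wal = AudioWal()
wal.on_byte_stream(b"\x01\x02\x03")
print(wal.flush(send=lambda chunk: None), "chunks replayed")
```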
Built with Python 3.11+ and FastAPI, deployed on Google Cloud Run. Core routers are defined in backend/main.py:58-99 and mounted on the api() FastAPI app (backend/main.py:119-136):
| Router | File | Key Endpoints/Functions | Purpose |
|---|---|---|---|
| transcribe.router | backend/routers/transcribe.py | @router.websocket('/v1/listen') → _stream_handler() (line 115) | WebSocket transcription with STT streaming |
| conversations.router | backend/routers/conversations.py | @router.get('/v1/conversations') → get_conversations() | Conversation CRUD + processing |
| chat.router | backend/routers/chat.py:66-200 | @router.post('/v2/messages') → send_message() returns StreamingResponse | Streaming AI chat with LangGraph |
| apps.router | backend/routers/apps.py:198-277 | @router.get('/v1/apps'), @router.get('/v2/apps') → get_apps_v2() | App marketplace with capability filtering |
| memories.router | backend/routers/memories.py:30-108 | @router.get('/v3/memories') → get_memories() returns List[MemoryDB] | Memory CRUD with encryption support |
The backend uses a LangGraph state machine via execute_graph_chat_stream() (backend/utils/retrieval/graph.py:1-250), which routes through determine_conversation_type() to select between a simple response, persona chat, or agentic RAG with tool calling. The graph state is defined in GraphState(TypedDict) (backend/utils/retrieval/graph.py:95-118).
Sources: backend/main.py:58-136, backend/routers/transcribe.py:101-115, backend/routers/chat.py:66-200, backend/utils/retrieval/graph.py:95-150
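The routing pattern can be illustrated with a minimal LangGraph sketch. The node names follow the ones listed above, but the state fields, the router logic, and the canned responses are simplified stand-ins for the real graph in backend/utils/retrieval/graph.py; the MemorySaver import path may also vary by langgraph version.

```python
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph


class GraphState(TypedDict):
    uid: str
    messages: list
    answer: str


def determine_conversation(state: GraphState) -> GraphState:
    return state


def route(state: GraphState) -> str:
    # Simplified stand-in for determine_conversation_type(): pick a branch.
    last = state["messages"][-1] if state["messages"] else ""
    if len(last) < 20:
        return "no_context_conversation"
    return "agentic_context_dependent_conversation"


def no_context_conversation(state: GraphState) -> GraphState:
    return {**state, "answer": "simple response"}


def agentic_context_dependent_conversation(state: GraphState) -> GraphState:
    return {**state, "answer": "RAG answer with tool calls"}


workflow = StateGraph(GraphState)
workflow.add_node("determine_conversation", determine_conversation)
workflow.add_node("no_context_conversation", no_context_conversation)
workflow.add_node("agentic_context_dependent_conversation", agentic_context_dependent_conversation)
workflow.set_entry_point("determine_conversation")
workflow.add_conditional_edges("determine_conversation", route)
workflow.add_edge("no_context_conversation", END)
workflow.add_edge("agentic_context_dependent_conversation", END)
graph = workflow.compile(checkpointer=MemorySaver())

result = graph.invoke(
    {"uid": "user_1", "messages": ["hi"], "answer": ""},
    config={"configurable": {"thread_id": "chat_1"}},
)
print(result["answer"])  # "simple response"
```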
Diagram: Multi-Tier Storage System
The system implements a multi-tier caching strategy:

- SharedPreferencesUtil stores conversations, messages, and app data on-device for instant offline access
- Pinecone vector namespaces separate data types (ns1 for conversations, ns2 for memories; sketched below)

Sources: backend/database/redis_db.py:1-100, backend/database/vector_db.py:1-50, backend/utils/other/storage.py:1-100, Diagram 5 from high-level diagrams
Sources: backend/requirements.txt:1-200, app/lib/main.dart:1-300, README.md:70-86
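A minimal sketch of the conversation-embedding upsert into the ns1 namespace is shown below, assuming the current Pinecone Python client. The index name, key handling, and embed() placeholder are illustrative, not taken from backend/database/vector_db.py.

```python
from pinecone import Pinecone

pc = Pinecone(api_key="PINECONE_API_KEY")  # placeholder key handling
index = pc.Index("omi")                    # placeholder index name


def embed(text: str) -> list[float]:
    # Placeholder embedding; the real dimension must match the index.
    return [0.0] * 1536


def upsert_conversation_vector(uid: str, conversation_id: str, transcript: str) -> None:
    # Conversations go to namespace "ns1"; memories would use "ns2".
    index.upsert(
        vectors=[{
            "id": f"{uid}-{conversation_id}",
            "values": embed(transcript),
            "metadata": {"uid": uid, "conversation_id": conversation_id},
        }],
        namespace="ns1",
    )
```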
The system supports multiple data protection levels configured per-user:
| Level | Description | Implementation |
|---|---|---|
| Standard | Server-side encryption at rest | Firestore default encryption |
| Enhanced | Per-field client-side encryption | AES with user-derived keys (utils/encryption.py) |
| E2EE (planned) | End-to-end encryption | Zero-knowledge architecture |
Decorators @prepare_for_write and @prepare_for_read (backend/database/helpers.py) automatically handle encryption/decryption for conversations (backend/database/conversations.py), memories (backend/database/memories.py:20-56), and chat messages (backend/database/chat.py:20-47).
Authentication uses Firebase Auth; JWT tokens are refreshed when they are due to expire within 5 minutes (app/lib/providers/auth_provider.dart).
Sources: backend/database/memories.py:20-56, backend/database/chat.py:20-47, Diagram 7 from high-level diagrams
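The decorator pattern can be sketched as below. This uses Fernet and an in-memory key table purely for illustration; the real helpers use AES with user-derived keys (utils/encryption.py) and handle many more fields.

```python
import functools

from cryptography.fernet import Fernet

# Illustrative per-user key table; the real system derives keys per user.
_USER_KEYS = {"user_1": Fernet.generate_key()}
_FAKE_DB: dict = {}


def _fernet(uid: str) -> Fernet:
    return Fernet(_USER_KEYS[uid])


def prepare_for_write(func):
    """Encrypt the sensitive field before the wrapped DB write runs."""
    @functools.wraps(func)
    def wrapper(uid: str, doc: dict, *args, **kwargs):
        doc = {**doc, "transcript": _fernet(uid).encrypt(doc["transcript"].encode()).decode()}
        return func(uid, doc, *args, **kwargs)
    return wrapper


def prepare_for_read(func):
    """Decrypt the sensitive field after the wrapped DB read returns."""
    @functools.wraps(func)
    def wrapper(uid: str, *args, **kwargs):
        doc = func(uid, *args, **kwargs)
        doc["transcript"] = _fernet(uid).decrypt(doc["transcript"].encode()).decode()
        return doc
    return wrapper


@prepare_for_write
def upsert_conversation(uid: str, doc: dict) -> None:
    _FAKE_DB[doc["id"]] = doc


@prepare_for_read
def get_conversation(uid: str, conversation_id: str) -> dict:
    return dict(_FAKE_DB[conversation_id])


upsert_conversation("user_1", {"id": "c1", "transcript": "hello world"})
assert get_conversation("user_1", "c1")["transcript"] == "hello world"
```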
Omi supports four plugin types.
Plugins declare capabilities (memories, chat, external_integration), triggers (memory_creation, transcript_processed, audio_bytes), and actions (create_conversation, read_memories, etc.) (backend/routers/apps.py:400-450).
The plugin execution engine (backend/utils/app_integrations.py) filters by capabilities, selects apps (AI-based or user-preferred), executes LLM processing, and stores results.
Sources: backend/routers/apps.py:1-700, backend/utils/app_integrations.py, Diagram 6 from high-level diagrams
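The capability-filtering-and-webhook step can be sketched as follows. The app records, field names, and the example URL are assumptions for illustration; the real model lives in backend/routers/apps.py and backend/utils/app_integrations.py.

```python
import requests

# Illustrative app records, not the real schema.
APPS = [
    {"id": "summarizer", "capabilities": {"external_integration"},
     "triggers": {"memory_creation"}, "webhook_url": "https://example.com/hook"},
    {"id": "chat-persona", "capabilities": {"chat"}, "triggers": set(), "webhook_url": None},
]


def trigger_external_integrations(trigger: str, conversation: dict) -> list[str]:
    """Post the conversation to every app whose capabilities and triggers match."""
    notified = []
    for app in APPS:
        if "external_integration" not in app["capabilities"]:
            continue
        if trigger not in app["triggers"] or not app["webhook_url"]:
            continue
        resp = requests.post(app["webhook_url"], json=conversation, timeout=10)
        if resp.ok:
            notified.append(app["id"])
    return notified


# Example (makes a real HTTP call, so it is left commented out):
# trigger_external_integrations("memory_creation", {"id": "c1", "transcript": "..."})
```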
Diagram: WebSocket Communication Architecture with Code Flow
CaptureProvider._initiateWebsocket() (app/lib/providers/capture_provider.dart:424-475) creates a TranscriptSegmentSocketService that connects to _stream_handler() (backend/routers/transcribe.py:115-128). The handler uses mapCodecToSampleRate(codec) to determine the audio format (supporting BleAudioCodec.opus, BleAudioCodec.pcm8, BleAudioCodec.pcm16, and BleAudioCodec.lc3) and calls process_audio_soniox() or process_audio_dg() (backend/utils/stt/streaming.py:1-250). Transcripts arrive via the stream_transcript(segments) callback and populate realtime_segment_buffers (a bounded deque with maxlen=MAX_SEGMENT_BUFFER_SIZE). The _process_segments() async loop sends MessageEvent.to_json() back to the client.
Connection health is maintained via the following mechanisms (a minimal sketch follows the list):
- send_heartbeat() task sending "ping" every 10s (backend/routers/transcribe.py:403-433)
- inactivity_timeout_seconds = 90 checking last_activity_time (backend/routers/transcribe.py:397-423)
- speech_profile_complete event controlling STT socket routing (backend/routers/transcribe.py:699-732)
- _record_usage_periodically() for freemium credit monitoring (backend/routers/transcribe.py:300-373)

Sources: backend/routers/transcribe.py:115-433, app/lib/providers/capture_provider.dart:424-475, backend/utils/stt/streaming.py:1-250
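A minimal sketch of the heartbeat/inactivity pattern, assuming a FastAPI WebSocket endpoint; the interval and timeout constants mirror the values above, while the rest of the handler (STT streaming, usage tracking) is elided.

```python
import asyncio
import time

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

HEARTBEAT_INTERVAL_S = 10
INACTIVITY_TIMEOUT_S = 90


@app.websocket("/v1/listen")
async def listen(websocket: WebSocket):
    await websocket.accept()
    last_activity_time = time.time()

    async def send_heartbeat():
        # Ping every 10s and close the socket after 90s without client audio.
        while True:
            await asyncio.sleep(HEARTBEAT_INTERVAL_S)
            await websocket.send_text("ping")
            if time.time() - last_activity_time > INACTIVITY_TIMEOUT_S:
                await websocket.close(code=1000)
                return

    heartbeat_task = asyncio.create_task(send_heartbeat())
    try:
        while True:
            audio = await websocket.receive_bytes()  # resets the inactivity clock
            last_activity_time = time.time()
            # ... forward `audio` to the STT stream here ...
    except WebSocketDisconnect:
        pass
    finally:
        heartbeat_task.cancel()
```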
After conversation_creation_timeout (configurable via user setting, default 120s; backend/routers/transcribe.py:456-462), conversations enter post-processing:
Diagram: Conversation Processing Pipeline Flow
The pipeline stages (a simplified sketch of the local fallback follows the list):
1. _process_conversation() is called after the silence timeout (backend/routers/transcribe.py:601-614)
2. The PUSHER_ENABLED environment variable routes processing to the Pusher service or the local fallback (backend/routers/transcribe.py:607-612)
3. get_cached_user_geolocation(uid) is read from Redis, then get_google_maps_location() is called (backend/routers/transcribe.py:488-491)
4. process_conversation() (backend/utils/conversations/process_conversation.py:1-100) calls qa_rag() to extract structured data (title, overview, action_items, category)
5. upsert_vector(uid, conversation.id, transcript, category) writes to Pinecone namespace ns1 (backend/database/vector_db.py:1-100)
6. trigger_external_integrations() (backend/utils/app_integrations.py:1-100) filters apps with matching capabilities and posts webhooks
7. trigger_webhook() POSTs the conversation JSON to user-configured URLs (backend/utils/webhooks.py:1-100)
8. ConversationEvent(event_type='memory_created') is sent to the client via _send_message_event() (backend/routers/transcribe.py:466-476)

The conversation status enum progresses: ConversationStatus.in_progress → ConversationStatus.processing → ConversationStatus.completed or ConversationStatus.discarded (backend/models/conversation.py:1-50).
Sources: backend/routers/transcribe.py:456-614, backend/utils/conversations/process_conversation.py:1-100, backend/database/vector_db.py:1-100, backend/utils/app_integrations.py:1-100
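A simplified sketch of the local fallback path, with the persistence and fan-out calls stubbed so it runs standalone; in the backend these map to process_conversation()/qa_rag(), conversations_db.upsert_conversation(), upsert_vector(), trigger_external_integrations(), and trigger_webhook(), and the real pipeline also adds geolocation and message events around these steps.

```python
import uuid


# Stub dependencies so the sketch runs standalone.
def extract_structured_data(transcript: str) -> dict:
    return {"title": transcript[:30], "overview": "", "action_items": [], "category": "other"}

def upsert_conversation(uid: str, conversation: dict) -> None: ...
def upsert_vector(uid: str, conversation_id: str, transcript: str, category: str) -> None: ...
def trigger_external_integrations(trigger: str, conversation: dict) -> None: ...
def trigger_webhook(uid: str, conversation: dict) -> None: ...


def create_conversation_fallback(uid: str, segments: list[dict]) -> dict:
    """Simplified fallback: extract structured data, persist, embed, then fan out."""
    transcript = " ".join(s["text"] for s in segments)
    structured = extract_structured_data(transcript)

    conversation = {
        "id": str(uuid.uuid4()),
        "status": "processing",
        "transcript_segments": segments,
        "structured": structured,
    }

    upsert_conversation(uid, conversation)
    upsert_vector(uid, conversation["id"], transcript, structured["category"])
    trigger_external_integrations("memory_creation", conversation)
    trigger_webhook(uid, conversation)

    conversation["status"] = "completed"
    return conversation


print(create_conversation_fallback("user_1", [{"text": "hello", "speaker": "SPEAKER_00"}])["status"])
```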
| Entry Point | Function/File | Initialization Steps | Purpose |
|---|---|---|---|
| Mobile App | main() (app/lib/main.dart:211-241) | _init() → Firebase.initializeApp() → ServiceManager.init() → NotificationService.instance.initialize() → SharedPreferencesUtil.init() → runApp(MyApp()) | Initializes Firebase, service manager, notifications, local storage, and starts the Flutter widget tree |
| Backend API | api() (backend/main.py:119-136) | app = FastAPI() → app.include_router() for 35+ routers → app.add_middleware(CORSMiddleware) | Creates the FastAPI app, mounts routers, adds CORS/compression middleware |
| Provider Setup | _MyAppState.build() (app/lib/main.dart:316-388) | MultiProvider(providers: [...]) → 20+ providers with dependency injection via ChangeNotifierProxyProvider | Creates the provider hierarchy with dependency injection |
| WebSocket | _stream_handler() (backend/routers/transcribe.py:115-128) | Validate uid → _initiateWebsocket() → process_audio_soniox() → send_heartbeat() task → _record_usage_periodically() task | Accepts the WebSocket, starts STT streaming, heartbeat, and usage tracking |
| Chat Graph | execute_graph_chat_stream() (backend/utils/retrieval/graph.py:1-250) | Build StateGraph() → add nodes (determine_conversation, no_context_conversation, agentic_context_dependent_conversation, persona_question) → graph.compile(checkpointer=MemorySaver()) | Compiles the LangGraph state machine for chat routing |
Sources: app/lib/main.dart:124-241, app/lib/main.dart:316-388, backend/main.py:119-136, backend/routers/transcribe.py:115-128, backend/utils/retrieval/graph.py:1-250
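The backend entry point's shape can be sketched as below; the router mounting is abbreviated to comments, and GZipMiddleware stands in for whatever compression middleware the real backend/main.py uses.

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware


def api() -> FastAPI:
    app = FastAPI()

    # The real module mounts 35+ routers here, e.g.:
    # app.include_router(transcribe.router)
    # app.include_router(conversations.router)
    # app.include_router(chat.router)
    # ...

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(GZipMiddleware, minimum_size=1000)
    return app


app = api()  # served by Uvicorn/Cloud Run
```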
This overview provides the foundational understanding of the Omi system architecture. For detailed documentation on specific subsystems, refer to the dedicated subsystem wiki pages.