Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
520 changes: 520 additions & 0 deletions docs/design/operation_telemetry.md

Large diffs are not rendered by default.

File renamed without changes.
17 changes: 14 additions & 3 deletions openviking/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from openviking.client import LocalClient, Session
from openviking.service.debug_service import SystemStatus
from openviking.telemetry import TelemetryRequest
from openviking_cli.client.base import BaseClient
from openviking_cli.session.user_id import UserIdentifier
from openviking_cli.utils import get_logger
Expand Down Expand Up @@ -164,10 +165,12 @@ async def add_message(
session_id=session_id, role=role, content=content, parts=parts
)

async def commit_session(self, session_id: str) -> Dict[str, Any]:
async def commit_session(
self, session_id: str, telemetry: TelemetryRequest = False
) -> Dict[str, Any]:
"""Commit a session (archive and extract memories)."""
await self._ensure_initialized()
return await self._client.commit_session(session_id)
return await self._client.commit_session(session_id, telemetry=telemetry)

# ============= Resource methods =============

Expand All @@ -182,6 +185,7 @@ async def add_resource(
timeout: float = None,
build_index: bool = True,
summarize: bool = False,
telemetry: TelemetryRequest = False,
**kwargs,
) -> Dict[str, Any]:
"""
Expand All @@ -196,10 +200,10 @@ async def add_resource(
parent: Target parent URI (must already exist).
build_index: Whether to build vector index immediately (default: True).
summarize: Whether to generate summary (default: False).
telemetry: Whether to attach operation telemetry data to the result.
"""
await self._ensure_initialized()

# Validate that only one of 'to' or 'parent' is set
if to and parent:
raise ValueError("Cannot specify both 'to' and 'parent' at the same time.")

Expand All @@ -213,6 +217,7 @@ async def add_resource(
timeout=timeout,
build_index=build_index,
summarize=summarize,
telemetry=telemetry,
**kwargs,
)

Expand Down Expand Up @@ -246,6 +251,7 @@ async def add_skill(
data: Any,
wait: bool = False,
timeout: float = None,
telemetry: TelemetryRequest = False,
) -> Dict[str, Any]:
"""Add skill to OpenViking.

Expand All @@ -258,6 +264,7 @@ async def add_skill(
data=data,
wait=wait,
timeout=timeout,
telemetry=telemetry,
)

# ============= Search methods =============
Expand All @@ -271,6 +278,7 @@ async def search(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict] = None,
telemetry: TelemetryRequest = False,
):
"""
Complex search with session context.
Expand All @@ -295,6 +303,7 @@ async def search(
limit=limit,
score_threshold=score_threshold,
filter=filter,
telemetry=telemetry,
)

async def find(
Expand All @@ -304,6 +313,7 @@ async def find(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict] = None,
telemetry: TelemetryRequest = False,
):
"""Semantic search"""
await self._ensure_initialized()
Expand All @@ -313,6 +323,7 @@ async def find(
limit=limit,
score_threshold=score_threshold,
filter=filter,
telemetry=telemetry,
)

# ============= FS methods =============
Expand Down
132 changes: 95 additions & 37 deletions openviking/client/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

from openviking.server.identity import RequestContext, Role
from openviking.service import OpenVikingService
from openviking.telemetry import TelemetryRequest
from openviking.telemetry.execution import (
attach_telemetry_payload,
run_with_telemetry,
)
from openviking_cli.client.base import BaseClient
from openviking_cli.session.user_id import UserIdentifier
from openviking_cli.utils import run_async
Expand Down Expand Up @@ -62,37 +67,58 @@ async def add_resource(
instruction: str = "",
wait: bool = False,
timeout: Optional[float] = None,
build_index: bool = True,
summarize: bool = False,
telemetry: TelemetryRequest = False,
**kwargs,
) -> Dict[str, Any]:
"""Add resource to OpenViking."""
# Validate that only one of 'to' or 'parent' is set
if to and parent:
raise ValueError("Cannot specify both 'to' and 'parent' at the same time.")

return await self._service.resources.add_resource(
path=path,
ctx=self._ctx,
to=to,
parent=parent,
reason=reason,
instruction=instruction,
wait=wait,
timeout=timeout,
**kwargs,
execution = await run_with_telemetry(
operation="resources.add_resource",
telemetry=telemetry,
fn=lambda: self._service.resources.add_resource(
path=path,
ctx=self._ctx,
to=to,
parent=parent,
reason=reason,
instruction=instruction,
wait=wait,
timeout=timeout,
build_index=build_index,
summarize=summarize,
**kwargs,
),
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def add_skill(
self,
data: Any,
wait: bool = False,
timeout: Optional[float] = None,
telemetry: TelemetryRequest = False,
) -> Dict[str, Any]:
"""Add skill to OpenViking."""
return await self._service.resources.add_skill(
data=data,
ctx=self._ctx,
wait=wait,
timeout=timeout,
execution = await run_with_telemetry(
operation="resources.add_skill",
telemetry=telemetry,
fn=lambda: self._service.resources.add_skill(
data=data,
ctx=self._ctx,
wait=wait,
timeout=timeout,
),
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]:
Expand Down Expand Up @@ -196,15 +222,24 @@ async def find(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict[str, Any]] = None,
telemetry: TelemetryRequest = False,
) -> Any:
"""Semantic search without session context."""
return await self._service.search.find(
query=query,
ctx=self._ctx,
target_uri=target_uri,
limit=limit,
score_threshold=score_threshold,
filter=filter,
execution = await run_with_telemetry(
operation="search.find",
telemetry=telemetry,
fn=lambda: self._service.search.find(
query=query,
ctx=self._ctx,
target_uri=target_uri,
limit=limit,
score_threshold=score_threshold,
filter=filter,
),
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def search(
Expand All @@ -215,20 +250,33 @@ async def search(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict[str, Any]] = None,
telemetry: TelemetryRequest = False,
) -> Any:
"""Semantic search with optional session context."""
session = None
if session_id:
session = self._service.sessions.session(self._ctx, session_id)
await session.load()
return await self._service.search.search(
query=query,
ctx=self._ctx,
target_uri=target_uri,
session=session,
limit=limit,
score_threshold=score_threshold,
filter=filter,

async def _search():
session = None
if session_id:
session = self._service.sessions.session(self._ctx, session_id)
await session.load()
return await self._service.search.search(
query=query,
ctx=self._ctx,
target_uri=target_uri,
session=session,
limit=limit,
score_threshold=score_threshold,
filter=filter,
)

execution = await run_with_telemetry(
operation="search.search",
telemetry=telemetry,
fn=_search,
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def grep(self, uri: str, pattern: str, case_insensitive: bool = False) -> Dict[str, Any]:
Expand Down Expand Up @@ -284,9 +332,19 @@ async def delete_session(self, session_id: str) -> None:
"""Delete a session."""
await self._service.sessions.delete(session_id, self._ctx)

async def commit_session(self, session_id: str) -> Dict[str, Any]:
async def commit_session(
self, session_id: str, telemetry: TelemetryRequest = False
) -> Dict[str, Any]:
"""Commit a session (archive and extract memories)."""
return await self._service.sessions.commit(session_id, self._ctx)
execution = await run_with_telemetry(
operation="session.commit",
telemetry=telemetry,
fn=lambda: self._service.sessions.commit(session_id, self._ctx),
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def add_message(
self,
Expand Down
5 changes: 3 additions & 2 deletions openviking/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from openviking.message.part import Part
from openviking.telemetry import TelemetryRequest
from openviking_cli.session.user_id import UserIdentifier

if TYPE_CHECKING:
Expand Down Expand Up @@ -57,13 +58,13 @@ async def add_message(
return await self._client.add_message(self.session_id, role, parts=parts_dicts)
return await self._client.add_message(self.session_id, role, content=content)

async def commit(self) -> Dict[str, Any]:
async def commit(self, telemetry: TelemetryRequest = False) -> Dict[str, Any]:
"""Commit the session (archive messages and extract memories).

Returns:
Commit result
"""
return await self._client.commit_session(self.session_id)
return await self._client.commit_session(self.session_id, telemetry=telemetry)

async def delete(self) -> None:
"""Delete the session."""
Expand Down
32 changes: 32 additions & 0 deletions openviking/console/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

import json
import re
from contextlib import asynccontextmanager
from pathlib import Path
Expand Down Expand Up @@ -46,6 +47,36 @@
}


def _is_json_content_type(content_type: str) -> bool:
value = (content_type or "").lower()
return "application/json" in value or "+json" in value


def _should_default_telemetry(upstream_path: str) -> bool:
if upstream_path in {"/api/v1/search/find", "/api/v1/resources"}:
return True
return upstream_path.startswith("/api/v1/sessions/") and upstream_path.endswith("/commit")


def _with_default_telemetry(request: Request, upstream_path: str, body: bytes) -> bytes:
if request.method.upper() != "POST":
return body
if not _should_default_telemetry(upstream_path):
return body
if not _is_json_content_type(request.headers.get("content-type", "")):
return body

try:
payload = json.loads(body.decode("utf-8")) if body else {}
except (json.JSONDecodeError, UnicodeDecodeError):
return body
if not isinstance(payload, dict):
return body

payload.setdefault("telemetry", True)
return json.dumps(payload).encode("utf-8")


def _error_response(status_code: int, code: str, message: str, details: Optional[dict] = None):
return JSONResponse(
status_code=status_code,
Expand Down Expand Up @@ -80,6 +111,7 @@ async def _forward_request(request: Request, upstream_path: str) -> Response:
"""Forward the incoming request to OpenViking upstream."""
client: httpx.AsyncClient = request.app.state.upstream_client
body = await request.body()
body = _with_default_telemetry(request, upstream_path, body)
try:
upstream_response = await client.request(
method=request.method,
Expand Down
2 changes: 1 addition & 1 deletion openviking/console/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _build_parser() -> argparse.ArgumentParser:
parser.add_argument(
"--request-timeout-sec",
type=float,
default=30.0,
default=3600.0,
help="Upstream request timeout in seconds",
)
parser.add_argument(
Expand Down
Loading
Loading