Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions openviking/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
search_router,
sessions_router,
system_router,
tasks_router,
)
from openviking.service.core import OpenVikingService
from openviking.service.task_tracker import get_task_tracker
from openviking_cli.exceptions import OpenVikingError
from openviking_cli.utils import get_logger

Expand Down Expand Up @@ -83,9 +85,14 @@ async def lifespan(app: FastAPI):
config.host,
)

# Start TaskTracker cleanup loop
task_tracker = get_task_tracker()
task_tracker.start_cleanup_loop()

yield

# Cleanup
task_tracker.stop_cleanup_loop()
if service:
await service.close()
logger.info("OpenVikingService closed")
Expand Down Expand Up @@ -169,6 +176,7 @@ async def general_error_handler(request: Request, exc: Exception):
app.include_router(pack_router)
app.include_router(debug_router)
app.include_router(observer_router)
app.include_router(tasks_router)
app.include_router(bot_router, prefix="/bot/v1")

return app
2 changes: 2 additions & 0 deletions openviking/server/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from openviking.server.routers.search import router as search_router
from openviking.server.routers.sessions import router as sessions_router
from openviking.server.routers.system import router as system_router
from openviking.server.routers.tasks import router as tasks_router

__all__ = [
"admin_router",
Expand All @@ -28,4 +29,5 @@
"pack_router",
"debug_router",
"observer_router",
"tasks_router",
]
64 changes: 52 additions & 12 deletions openviking/server/routers/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from openviking.server.auth import get_request_context
from openviking.server.dependencies import get_service
from openviking.server.identity import RequestContext
from openviking.server.models import Response
from openviking.server.models import ErrorInfo, Response
from openviking.service.task_tracker import get_task_tracker

router = APIRouter(prefix="/api/v1/sessions", tags=["sessions"])
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -152,38 +153,77 @@ async def commit_session(
):
"""Commit a session (archive and extract memories).

When wait=False, the commit is processed in the background.
This is useful for avoiding blocking when the commit involves
LLM calls for memory extraction.
When wait=False, the commit is processed in the background and a
``task_id`` is returned. Use ``GET /tasks/{task_id}`` to poll for
completion status, results, or errors.

When wait=True (default), the commit blocks until complete and
returns the full result inline.
"""
service = get_service()
tracker = get_task_tracker()

if wait:
# Reject if same session already has a background commit running
if tracker.has_running("session_commit", session_id):
return Response(
status="error",
error=ErrorInfo(
code="CONFLICT",
message=f"Session {session_id} already has a commit in progress",
),
)
result = await service.sessions.commit_async(session_id, _ctx)
return Response(status="ok", result=result)

asyncio.create_task(_background_commit(service, session_id, _ctx))
# Atomically check + create to prevent race conditions
task = tracker.create_if_no_running("session_commit", session_id)
if task is None:
return Response(
status="error",
error=ErrorInfo(
code="CONFLICT",
message=f"Session {session_id} already has a commit in progress",
),
)
asyncio.create_task(_background_commit_tracked(service, session_id, _ctx, task.task_id))

return Response(
status="ok",
result={
"session_id": session_id,
"status": "accepted",
"task_id": task.task_id,
"message": "Commit is processing in the background",
},
)


async def _background_commit(service, session_id: str, ctx: RequestContext) -> None:
"""Run session commit in background."""
async def _background_commit_tracked(
service, session_id: str, ctx: RequestContext, task_id: str
) -> None:
"""Run session commit in background with task tracking."""
tracker = get_task_tracker()
tracker.start(task_id)
try:
result = await service.sessions.commit_async(session_id, ctx)
memories = result.get("memories_extracted", 0)
tracker.complete(
task_id,
{
"session_id": session_id,
"memories_extracted": result.get("memories_extracted", 0),
"archived": result.get("archived", False),
},
)
logger.info(
"Background commit completed: session=%s, memories=%d",
"Background commit completed: session=%s task=%s memories=%d",
session_id,
memories,
task_id,
result.get("memories_extracted", 0),
)
except Exception:
logger.exception("Background commit failed: session=%s", session_id)
except Exception as exc:
tracker.fail(task_id, str(exc))
logger.exception("Background commit failed: session=%s task=%s", session_id, task_id)


@router.post("/{session_id}/extract")
Expand Down
47 changes: 47 additions & 0 deletions openviking/server/routers/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Task tracking endpoints for OpenViking HTTP Server.

Provides observability for background operations (e.g. session commit
with ``wait=false``). Callers receive a ``task_id`` and can poll these
endpoints to check completion, results, or errors.
"""

from typing import Optional

from fastapi import APIRouter, HTTPException, Query

from openviking.server.models import Response
from openviking.service.task_tracker import get_task_tracker

router = APIRouter(prefix="/api/v1", tags=["tasks"])


@router.get("/tasks/{task_id}")
async def get_task(task_id: str):
"""Get the status of a single background task."""
tracker = get_task_tracker()
task = tracker.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found or expired")
return Response(status="ok", result=task.to_dict())


@router.get("/tasks")
async def list_tasks(
task_type: Optional[str] = Query(None, description="Filter by task type (e.g. session_commit)"),
status: Optional[str] = Query(
None, description="Filter by status (pending/running/completed/failed)"
),
resource_id: Optional[str] = Query(None, description="Filter by resource ID (e.g. session_id)"),
limit: int = Query(50, le=200, description="Max results"),
):
"""List background tasks with optional filters."""
tracker = get_task_tracker()
tasks = tracker.list_tasks(
task_type=task_type,
status=status,
resource_id=resource_id,
limit=limit,
)
return Response(status="ok", result=[t.to_dict() for t in tasks])
Loading
Loading