Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Dict, Optional

from openviking.models.embedder.base import EmbedResult
from openviking.models.embedder.volcengine_embedders import is_429_error
from openviking.server.identity import RequestContext, Role
from openviking.storage.errors import CollectionNotFoundError
from openviking.storage.queuefs.embedding_msg import EmbeddingMsg
Expand Down Expand Up @@ -192,7 +193,6 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
self._initialize_embedder(config)

# Generate embedding vector(s)
is_rate_limit_error = False
if self._embedder:
try:
# embed() is a blocking HTTP call; offload to thread pool to avoid
Expand Down Expand Up @@ -221,12 +221,14 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
error_msg = f"Failed to generate embedding: {e}"
logger.error(error_msg)

# 检查是否是限流错误
from openviking.models.embedder.volcengine_embedders import is_429_error

if is_429_error(e):
is_rate_limit_error = True
logger.info("Rate limit error detected, will attempt to re-enqueue")
if is_429_error(e) and self._vikingdb.has_queue_manager:
try:
await self._vikingdb.enqueue_embedding_msg(embedding_msg)
logger.info(f"Re-enqueued embedding message: {embedding_msg.id}")
self.report_success()
return None
except Exception as requeue_err:
logger.error(f"Failed to re-enqueue message: {requeue_err}")

self.report_error(error_msg, data)
return None
Expand Down Expand Up @@ -283,24 +285,6 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
self.report_error(str(db_err), data)
return None

# 如果是限流错误,尝试重新入队
if is_rate_limit_error:
try:
# 安全地检查和使用 queue_manager
has_queue_manager = getattr(self._vikingdb, "has_queue_manager", False)
if has_queue_manager:
enqueue_func = getattr(self._vikingdb, "enqueue_embedding_msg", None)
if enqueue_func:
await enqueue_func(embedding_msg)
logger.info(
f"Successfully re-enqueued embedding message: {embedding_msg.id}"
)
# 报告成功,因为我们已经重新入队了
self.report_success()
return None
except Exception as requeue_err:
logger.error(f"Failed to re-enqueue message: {requeue_err}")

self.report_success()
return inserted_data

Expand Down
7 changes: 7 additions & 0 deletions openviking/storage/viking_vector_index_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,13 @@ async def get_stats(self) -> Dict[str, Any]:
def is_closing(self) -> bool:
return False

@property
def has_queue_manager(self) -> bool:
return False

async def enqueue_embedding_msg(self, _embedding_msg) -> bool:
raise NotImplementedError("Queue management requires VikingDBManager")

# =========================================================================
# Tenant-Aware 方法(保持向后兼容)
# =========================================================================
Expand Down
2 changes: 0 additions & 2 deletions tests/session/test_session_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ async def test_active_count_incremented_after_commit(self, client_with_resource_

# Look up the record by URI
records_before = await vikingdb.get_context_by_uri(
account_id="default",
uri=uri,
limit=1,
ctx=ctx,
Expand All @@ -120,7 +119,6 @@ async def test_active_count_incremented_after_commit(self, client_with_resource_

# Verify the count actually changed in storage
records_after = await vikingdb.get_context_by_uri(
account_id="default",
uri=uri,
limit=1,
ctx=ctx,
Expand Down
187 changes: 0 additions & 187 deletions tests/unit/session/test_deduplicator_uri.py
Original file line number Diff line number Diff line change
@@ -1,194 +1,7 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import AsyncMock, MagicMock

import pytest

from openviking.session.memory_deduplicator import MemoryDeduplicator
from openviking.session.memory_extractor import CandidateMemory, MemoryCategory
from tests.utils.mock_context import make_test_ctx, make_test_user

ctx = make_test_ctx()


class _DummyEmbedResult:
def __init__(self, dense_vector):
self.dense_vector = dense_vector


class _DummyEmbedder:
def embed(self, _text):
return _DummyEmbedResult([0.1, 0.2, 0.3])


def _make_candidate(category: MemoryCategory = MemoryCategory.PREFERENCES) -> CandidateMemory:
return CandidateMemory(
category=category,
abstract="User prefers concise summaries",
overview="User asks for concise answers frequently.",
content="The user prefers concise summaries over long explanations.",
source_session="session_test",
user=make_test_user(),
language="en",
)


def _make_existing_user_memory(uri_suffix: str = "existing.md") -> dict:
user_space = make_test_user().user_space_name()
return {
"id": f"uri_{uri_suffix}",
"uri": f"viking://user/{user_space}/memories/preferences/{uri_suffix}",
"context_type": "memory",
"level": 2,
"account_id": "acc1",
"owner_space": user_space,
"abstract": "Existing preference memory",
"category": "preferences",
"_score": 0.85,
}


def _make_existing_agent_memory(uri_suffix: str = "case1.md") -> dict:
user = make_test_user()
agent_space = user.agent_space_name()
return {
"id": f"uri_{uri_suffix}",
"uri": f"viking://agent/{agent_space}/memories/cases/{uri_suffix}",
"context_type": "memory",
"level": 2,
"account_id": "acc1",
"owner_space": agent_space,
"abstract": "Existing case memory",
"category": "cases",
"_score": 0.90,
}


@pytest.mark.asyncio
class TestFindSimilarMemoriesURIConversion:
async def test_user_uri_converted_to_temp_uri(self):
vikingdb = MagicMock()
vikingdb.get_embedder.return_value = _DummyEmbedder()
vikingdb.search_similar_memories = AsyncMock(
return_value=[_make_existing_user_memory("pref1.md")]
)

dedup = MemoryDeduplicator(vikingdb=vikingdb)
candidate = _make_candidate()

user_temp_uri = "viking://user/temp_user_123"
similar = await dedup._find_similar_memories(
candidate,
ctx,
user_temp_uri=user_temp_uri,
agent_temp_uri=None,
)

assert len(similar) == 1
user_space = make_test_user().user_space_name()
original_uri = f"viking://user/{user_space}/memories/preferences/pref1.md"
expected_uri = f"{user_temp_uri}/memories/preferences/pref1.md"
assert similar[0].uri == expected_uri
assert similar[0].uri != original_uri

async def test_agent_uri_converted_to_temp_uri(self):
vikingdb = MagicMock()
vikingdb.get_embedder.return_value = _DummyEmbedder()
vikingdb.search_similar_memories = AsyncMock(
return_value=[_make_existing_agent_memory("case1.md")]
)

dedup = MemoryDeduplicator(vikingdb=vikingdb)
candidate = _make_candidate(category=MemoryCategory.CASES)

agent_temp_uri = "viking://agent/temp_agent_456"
similar = await dedup._find_similar_memories(
candidate,
ctx,
user_temp_uri=None,
agent_temp_uri=agent_temp_uri,
)

assert len(similar) == 1
user = make_test_user()
agent_space = user.agent_space_name()
original_uri = f"viking://agent/{agent_space}/memories/cases/case1.md"
expected_uri = f"{agent_temp_uri}/memories/cases/case1.md"
assert similar[0].uri == expected_uri
assert similar[0].uri != original_uri

async def test_no_conversion_when_no_temp_uri(self):
vikingdb = MagicMock()
vikingdb.get_embedder.return_value = _DummyEmbedder()
vikingdb.search_similar_memories = AsyncMock(
return_value=[_make_existing_user_memory("pref1.md")]
)

dedup = MemoryDeduplicator(vikingdb=vikingdb)
candidate = _make_candidate()

similar = await dedup._find_similar_memories(
candidate,
ctx,
user_temp_uri=None,
agent_temp_uri=None,
)

assert len(similar) == 1
user_space = make_test_user().user_space_name()
expected_uri = f"viking://user/{user_space}/memories/preferences/pref1.md"
assert similar[0].uri == expected_uri

async def test_mixed_uris_only_convert_matching_type(self):
agent_space = make_test_user().agent_space_name()

vikingdb = MagicMock()
vikingdb.get_embedder.return_value = _DummyEmbedder()
vikingdb.search_similar_memories = AsyncMock(
return_value=[
_make_existing_user_memory("pref1.md"),
_make_existing_agent_memory("case1.md"),
]
)

dedup = MemoryDeduplicator(vikingdb=vikingdb)
candidate = _make_candidate()

user_temp_uri = "viking://user/temp_user_123"
similar = await dedup._find_similar_memories(
candidate,
ctx,
user_temp_uri=user_temp_uri,
agent_temp_uri=None,
)

assert len(similar) == 2
uris = {m.uri for m in similar}
assert f"{user_temp_uri}/memories/preferences/pref1.md" in uris
assert f"viking://agent/{agent_space}/memories/cases/case1.md" in uris

async def test_uri_conversion_preserves_meta_and_score(self):
vikingdb = MagicMock()
vikingdb.get_embedder.return_value = _DummyEmbedder()
vikingdb.search_similar_memories = AsyncMock(
return_value=[_make_existing_user_memory("pref1.md")]
)

dedup = MemoryDeduplicator(vikingdb=vikingdb)
candidate = _make_candidate()

user_temp_uri = "viking://user/temp_user_123"
similar = await dedup._find_similar_memories(
candidate,
ctx,
user_temp_uri=user_temp_uri,
agent_temp_uri=None,
)

assert len(similar) == 1
assert similar[0].meta is not None
assert similar[0].meta.get("_dedup_score") == 0.85


class TestExtractFacetKey:
Expand Down
Loading