From e743b8869a5a1e1f1da9ef1c6caa8dfff79e5ee4 Mon Sep 17 00:00:00 2001 From: Classic298 <27028174+Classic298@users.noreply.github.com> Date: Thu, 13 Nov 2025 10:01:29 +0100 Subject: [PATCH 1/2] Update utils.py --- backend/open_webui/retrieval/utils.py | 423 ++++++++++++++------------ 1 file changed, 221 insertions(+), 202 deletions(-) diff --git a/backend/open_webui/retrieval/utils.py b/backend/open_webui/retrieval/utils.py index f20884a4d2b..1426cd99a49 100644 --- a/backend/open_webui/retrieval/utils.py +++ b/backend/open_webui/retrieval/utils.py @@ -2,7 +2,8 @@ import os from typing import Optional, Union -import requests +import aiohttp +import asyncio import hashlib from concurrent.futures import ThreadPoolExecutor import time @@ -160,18 +161,10 @@ def query_doc_with_hybrid_search( hybrid_bm25_weight: float, ) -> dict: try: - # First check if collection_result has the required attributes if ( not collection_result or not hasattr(collection_result, "documents") - or not hasattr(collection_result, "metadatas") - ): - log.warning(f"query_doc_with_hybrid_search:no_docs {collection_name}") - return {"documents": [], "metadatas": [], "distances": []} - - # Now safely check the documents content after confirming attributes exist - if ( - not collection_result.documents + or not collection_result.documents or len(collection_result.documents) == 0 or not collection_result.documents[0] ): @@ -464,6 +457,168 @@ def process_query(collection_name, query): return merge_and_sort_query_results(results, k=k) +async def generate_openai_batch_embeddings_async( + model: str, + texts: list[str], + url: str = "https://api.openai.com/v1", + key: str = "", + prefix: str = None, + user: UserModel = None, +) -> Optional[list[list[float]]]: + try: + log.debug( + f"generate_openai_batch_embeddings_async:model {model} batch size: {len(texts)}" + ) + json_data = {"input": texts, "model": model} + if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str): + json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {key}", + } + if ENABLE_FORWARD_USER_INFO_HEADERS and user: + headers.update({ + "X-OpenWebUI-User-Name": quote(user.name, safe=" "), + "X-OpenWebUI-User-Id": user.id, + "X-OpenWebUI-User-Email": user.email, + "X-OpenWebUI-User-Role": user.role, + }) + + async with aiohttp.ClientSession() as session: + async with session.post(f"{url}/embeddings", headers=headers, json=json_data) as r: + r.raise_for_status() + data = await r.json() + if "data" in data: + return [elem["embedding"] for elem in data["data"]] + else: + raise Exception("Something went wrong :/") + except Exception as e: + log.exception(f"Error generating openai batch embeddings: {e}") + return None + + +async def generate_azure_openai_batch_embeddings_async( + model: str, + texts: list[str], + url: str, + key: str = "", + version: str = "", + prefix: str = None, + user: UserModel = None, +) -> Optional[list[list[float]]]: + try: + log.debug( + f"generate_azure_openai_batch_embeddings_async:deployment {model} batch size: {len(texts)}" + ) + json_data = {"input": texts} + if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str): + json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix + + full_url = f"{url}/openai/deployments/{model}/embeddings?api-version={version}" + + headers = { + "Content-Type": "application/json", + "api-key": key, + } + if ENABLE_FORWARD_USER_INFO_HEADERS and user: + headers.update({ + 
"X-OpenWebUI-User-Name": quote(user.name, safe=" "), + "X-OpenWebUI-User-Id": user.id, + "X-OpenWebUI-User-Email": user.email, + "X-OpenWebUI-User-Role": user.role, + }) + + for attempt in range(5): + async with aiohttp.ClientSession() as session: + async with session.post(full_url, headers=headers, json=json_data) as r: + if r.status == 429: + retry_after = float(r.headers.get("Retry-After", "1")) + log.warning(f"Rate limited, retrying after {retry_after}s") + await asyncio.sleep(retry_after) + continue + r.raise_for_status() + data = await r.json() + if "data" in data: + return [elem["embedding"] for elem in data["data"]] + else: + raise Exception("Something went wrong :/") + return None + except Exception as e: + log.exception(f"Error generating azure openai batch embeddings: {e}") + return None + + +async def generate_ollama_batch_embeddings_async( + model: str, + texts: list[str], + url: str, + key: str = "", + prefix: str = None, + user: UserModel = None, +) -> Optional[list[list[float]]]: + try: + log.debug( + f"generate_ollama_batch_embeddings_async:model {model} batch size: {len(texts)}" + ) + json_data = {"input": texts, "model": model} + if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str): + json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {key}", + } + if ENABLE_FORWARD_USER_INFO_HEADERS and user: + headers.update({ + "X-OpenWebUI-User-Name": quote(user.name, safe=" "), + "X-OpenWebUI-User-Id": user.id, + "X-OpenWebUI-User-Email": user.email, + "X-OpenWebUI-User-Role": user.role, + }) + + async with aiohttp.ClientSession() as session: + async with session.post(f"{url}/api/embed", headers=headers, json=json_data) as r: + r.raise_for_status() + data = await r.json() + if "embeddings" in data: + return data["embeddings"] + else: + raise Exception("Something went wrong :/") + except Exception as e: + log.exception(f"Error generating ollama batch embeddings: {e}") + return None + + +async def generate_multiple_async(query, prefix, user, func_async, embedding_batch_size): + if isinstance(query, list): + # Create batches + batches = [ + query[i : i + embedding_batch_size] + for i in range(0, len(query), embedding_batch_size) + ] + + log.debug(f"generate_multiple_async: Processing {len(batches)} batches in parallel") + + # Execute all batches in parallel + tasks = [ + func_async(batch, prefix=prefix, user=user) + for batch in batches + ] + batch_results = await asyncio.gather(*tasks) + + # Flatten results + embeddings = [] + for batch_embeddings in batch_results: + if isinstance(batch_embeddings, list): + embeddings.extend(batch_embeddings) + + log.debug(f"generate_multiple_async: Generated {len(embeddings)} embeddings from {len(batches)} parallel batches") + return embeddings + else: + return await func_async([query], prefix=prefix, user=user) + + def get_embedding_function( embedding_engine, embedding_model, @@ -478,36 +633,46 @@ def get_embedding_function( query, **({"prompt": prefix} if prefix else {}) ).tolist() elif embedding_engine in ["ollama", "openai", "azure_openai"]: - func = lambda query, prefix=None, user=None: generate_embeddings( - engine=embedding_engine, - model=embedding_model, - text=query, - prefix=prefix, - url=url, - key=key, - user=user, - azure_api_version=azure_api_version, - ) + # Create async function based on engine + if embedding_engine == "openai": + func_async = lambda texts, prefix=None, user=None: generate_openai_batch_embeddings_async( + 
model=embedding_model, + texts=texts, + url=url, + key=key, + prefix=prefix, + user=user, + ) + elif embedding_engine == "azure_openai": + func_async = lambda texts, prefix=None, user=None: generate_azure_openai_batch_embeddings_async( + model=embedding_model, + texts=texts, + url=url, + key=key, + version=azure_api_version, + prefix=prefix, + user=user, + ) + elif embedding_engine == "ollama": + func_async = lambda texts, prefix=None, user=None: generate_ollama_batch_embeddings_async( + model=embedding_model, + texts=texts, + url=url, + key=key, + prefix=prefix, + user=user, + ) - def generate_multiple(query, prefix, user, func): + # Return synchronous wrapper that runs async function + def embedding_wrapper(query, prefix=None, user=None): if isinstance(query, list): - embeddings = [] - for i in range(0, len(query), embedding_batch_size): - batch_embeddings = func( - query[i : i + embedding_batch_size], - prefix=prefix, - user=user, - ) - - if isinstance(batch_embeddings, list): - embeddings.extend(batch_embeddings) - return embeddings + return asyncio.run( + generate_multiple_async(query, prefix, user, func_async, embedding_batch_size) + ) else: - return func(query, prefix, user) - - return lambda query, prefix=None, user=None: generate_multiple( - query, prefix, user, func - ) + return asyncio.run(func_async([query], prefix=prefix, user=user))[0] + + return embedding_wrapper else: raise ValueError(f"Unknown embedding engine: {embedding_engine}") @@ -516,13 +681,11 @@ def get_reranking_function(reranking_engine, reranking_model, reranking_function if reranking_function is None: return None if reranking_engine == "external": - return lambda query, documents, user=None: reranking_function.predict( - [(query, doc.page_content) for doc in documents], user=user + return lambda sentences, user=None: reranking_function.predict( + sentences, user=user ) else: - return lambda query, documents, user=None: reranking_function.predict( - [(query, doc.page_content) for doc in documents] - ) + return lambda sentences, user=None: reranking_function.predict(sentences) def get_sources_from_items( @@ -846,151 +1009,6 @@ def get_model_path(model: str, update_model: bool = False): return model -def generate_openai_batch_embeddings( - model: str, - texts: list[str], - url: str = "https://api.openai.com/v1", - key: str = "", - prefix: str = None, - user: UserModel = None, -) -> Optional[list[list[float]]]: - try: - log.debug( - f"generate_openai_batch_embeddings:model {model} batch size: {len(texts)}" - ) - json_data = {"input": texts, "model": model} - if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str): - json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix - - r = requests.post( - f"{url}/embeddings", - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {key}", - **( - { - "X-OpenWebUI-User-Name": quote(user.name, safe=" "), - "X-OpenWebUI-User-Id": user.id, - "X-OpenWebUI-User-Email": user.email, - "X-OpenWebUI-User-Role": user.role, - } - if ENABLE_FORWARD_USER_INFO_HEADERS and user - else {} - ), - }, - json=json_data, - ) - r.raise_for_status() - data = r.json() - if "data" in data: - return [elem["embedding"] for elem in data["data"]] - else: - raise "Something went wrong :/" - except Exception as e: - log.exception(f"Error generating openai batch embeddings: {e}") - return None - - -def generate_azure_openai_batch_embeddings( - model: str, - texts: list[str], - url: str, - key: str = "", - version: str = "", - prefix: str = None, - user: UserModel = 
None, -) -> Optional[list[list[float]]]: - try: - log.debug( - f"generate_azure_openai_batch_embeddings:deployment {model} batch size: {len(texts)}" - ) - json_data = {"input": texts} - if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str): - json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix - - url = f"{url}/openai/deployments/{model}/embeddings?api-version={version}" - - for _ in range(5): - r = requests.post( - url, - headers={ - "Content-Type": "application/json", - "api-key": key, - **( - { - "X-OpenWebUI-User-Name": quote(user.name, safe=" "), - "X-OpenWebUI-User-Id": user.id, - "X-OpenWebUI-User-Email": user.email, - "X-OpenWebUI-User-Role": user.role, - } - if ENABLE_FORWARD_USER_INFO_HEADERS and user - else {} - ), - }, - json=json_data, - ) - if r.status_code == 429: - retry = float(r.headers.get("Retry-After", "1")) - time.sleep(retry) - continue - r.raise_for_status() - data = r.json() - if "data" in data: - return [elem["embedding"] for elem in data["data"]] - else: - raise Exception("Something went wrong :/") - return None - except Exception as e: - log.exception(f"Error generating azure openai batch embeddings: {e}") - return None - - -def generate_ollama_batch_embeddings( - model: str, - texts: list[str], - url: str, - key: str = "", - prefix: str = None, - user: UserModel = None, -) -> Optional[list[list[float]]]: - try: - log.debug( - f"generate_ollama_batch_embeddings:model {model} batch size: {len(texts)}" - ) - json_data = {"input": texts, "model": model} - if isinstance(RAG_EMBEDDING_PREFIX_FIELD_NAME, str) and isinstance(prefix, str): - json_data[RAG_EMBEDDING_PREFIX_FIELD_NAME] = prefix - - r = requests.post( - f"{url}/api/embed", - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {key}", - **( - { - "X-OpenWebUI-User-Name": quote(user.name, safe=" "), - "X-OpenWebUI-User-Id": user.id, - "X-OpenWebUI-User-Email": user.email, - "X-OpenWebUI-User-Role": user.role, - } - if ENABLE_FORWARD_USER_INFO_HEADERS - else {} - ), - }, - json=json_data, - ) - r.raise_for_status() - data = r.json() - - if "embeddings" in data: - return data["embeddings"] - else: - raise "Something went wrong :/" - except Exception as e: - log.exception(f"Error generating ollama batch embeddings: {e}") - return None - - def generate_embeddings( engine: str, model: str, @@ -1008,26 +1026,25 @@ def generate_embeddings( else: text = f"{prefix}{text}" + # Run async embedding generation synchronously if engine == "ollama": - embeddings = generate_ollama_batch_embeddings( - **{ - "model": model, - "texts": text if isinstance(text, list) else [text], - "url": url, - "key": key, - "prefix": prefix, - "user": user, - } - ) + embeddings = asyncio.run(generate_ollama_batch_embeddings_async( + model=model, + texts=text if isinstance(text, list) else [text], + url=url, + key=key, + prefix=prefix, + user=user, + )) return embeddings[0] if isinstance(text, str) else embeddings elif engine == "openai": - embeddings = generate_openai_batch_embeddings( + embeddings = asyncio.run(generate_openai_batch_embeddings_async( model, text if isinstance(text, list) else [text], url, key, prefix, user - ) + )) return embeddings[0] if isinstance(text, str) else embeddings elif engine == "azure_openai": azure_api_version = kwargs.get("azure_api_version", "") - embeddings = generate_azure_openai_batch_embeddings( + embeddings = asyncio.run(generate_azure_openai_batch_embeddings_async( model, text if isinstance(text, list) else [text], url, @@ -1035,7 +1052,7 @@ def 
generate_embeddings( azure_api_version, prefix, user, - ) + )) return embeddings[0] if isinstance(text, str) else embeddings @@ -1066,7 +1083,9 @@ def compress_documents( scores = None if reranking: - scores = self.reranking_function(query, documents) + scores = self.reranking_function( + [(query, doc.page_content) for doc in documents] + ) else: from sentence_transformers import util From 769e1e589813c00b15fd68896ab03369f57f86d0 Mon Sep 17 00:00:00 2001 From: Classic298 <27028174+Classic298@users.noreply.github.com> Date: Thu, 13 Nov 2025 22:13:54 +0100 Subject: [PATCH 2/2] Update utils.py --- backend/open_webui/retrieval/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/open_webui/retrieval/utils.py b/backend/open_webui/retrieval/utils.py index 1426cd99a49..cd1921ebc95 100644 --- a/backend/open_webui/retrieval/utils.py +++ b/backend/open_webui/retrieval/utils.py @@ -164,6 +164,7 @@ def query_doc_with_hybrid_search( if ( not collection_result or not hasattr(collection_result, "documents") + or not hasattr(collection_result, "metadatas") or not collection_result.documents or len(collection_result.documents) == 0 or not collection_result.documents[0]
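A minimal, self-contained sketch of the fan-out pattern this patch introduces in `generate_multiple_async`: split the input into fixed-size batches, issue all requests concurrently with `asyncio.gather`, and flatten the per-batch results in input order. `embed_batch` here is a hypothetical stand-in for the real `generate_*_batch_embeddings_async` coroutines, which POST each batch to the provider API.

```python
import asyncio


async def embed_batch(texts: list[str]) -> list[list[float]]:
    # Hypothetical stand-in: a real implementation would POST the batch
    # to the embeddings endpoint, as the coroutines in this patch do.
    await asyncio.sleep(0.1)  # simulate network latency
    return [[0.0, 0.0, 0.0] for _ in texts]


async def embed_all(texts: list[str], batch_size: int) -> list[list[float]]:
    batches = [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]
    # gather() preserves task order, so embeddings stay aligned with texts
    results = await asyncio.gather(*(embed_batch(batch) for batch in batches))
    return [emb for batch_embeddings in results for emb in batch_embeddings]


if __name__ == "__main__":
    vectors = asyncio.run(embed_all([f"chunk {i}" for i in range(10)], batch_size=4))
    print(len(vectors))  # 10
```

One design note on the synchronous entry points: `embedding_wrapper` and `generate_embeddings` bridge into the async code with `asyncio.run`, which creates a fresh event loop per call. That is safe from plain synchronous callers, but `asyncio.run` raises `RuntimeError` if invoked from code already running inside an event loop.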
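The Azure variant also gains a bounded retry on HTTP 429. A sketch of that pattern, assuming `session`, `full_url`, `headers`, and `json_data` are prepared as in `generate_azure_openai_batch_embeddings_async`: honor the `Retry-After` header (defaulting to one second) for up to five attempts, then give up and return `None`.

```python
import asyncio
from typing import Optional

import aiohttp


async def post_with_retry(
    session: aiohttp.ClientSession,
    full_url: str,
    headers: dict,
    json_data: dict,
    attempts: int = 5,
) -> Optional[dict]:
    for _ in range(attempts):
        async with session.post(full_url, headers=headers, json=json_data) as r:
            if r.status == 429:
                # The server says how long to back off; default to 1s
                await asyncio.sleep(float(r.headers.get("Retry-After", "1")))
                continue
            r.raise_for_status()
            return await r.json()
    return None  # rate-limited on every attempt
```

Unlike this helper, the patched coroutine opens a new `aiohttp.ClientSession` on each attempt; reusing one session across retries would keep the connection pool warm, but both forms implement the same backoff loop.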