# 最近在用 OpenWebUI 的原生格式调用 GPT-5 时总是会报一些错误,很奇怪,就写了一个函数来调用,目前没看到报错;不过思考过程没法像 LobeChat 那样显示,可能跟上游有关。
# 参考这个,已经非常完美地适配了思考过程。
# 下边这个函数就不推荐了。
"""
title: GPT-5 Pipe with Thinking Process
author_url: https://github.com/yourusername
author: Assistant
version: 1.0.0
license: MIT
description: OpenWebUI plugin for GPT-5 with thinking process visualization
"""
import json
import random
import httpx
import asyncio
import re
import time
from typing import List, AsyncGenerator, Callable, Awaitable, Optional, Union, Dict, Any
from pydantic import BaseModel, Field
class Pipe:
    """
    GPT-5 manifold pipe for OpenWebUI with an optional "thinking" stage.

    A system prompt instructs the model to wrap its reasoning in
    <thinking>...</thinking> tags; the streaming and non-streaming handlers
    then split that reasoning from the final answer and render it either
    inline or inside a collapsible <details> block. Available models come
    exclusively from user configuration (SIMPLE_CUSTOM_MODELS).
    """

    class Valves(BaseModel):
        """User-configurable settings exposed in the OpenWebUI admin panel."""

        OPENAI_API_KEYS: str = Field(
            default="", description="OpenAI API Keys,使用逗号分隔多个 Key"
        )
        BASE_URL: str = Field(
            default="https://api.openai.com/v1",
            description="API 基础 URL,可以使用自定义端点",
        )
        THINKING_LEVEL: int = Field(
            default=5, description="思考深度级别 (1-10),越高思考越详细"
        )
        SHOW_THINKING_PROCESS: bool = Field(
            default=True, description="是否显示思考过程"
        )
        THINKING_FORMAT: str = Field(
            default="separate",
            description="思考过程显示格式: inline(内联)或 separate(分离)",
        )
        # DISPLAY_MODELS is deprecated; use SIMPLE_CUSTOM_MODELS instead.
        DEFAULT_MAX_TOKENS: int = Field(default=4096, description="默认最大 token 数")
        STREAM_THINKING: bool = Field(default=True, description="是否流式显示思考过程")
        ENABLE_THINKING: bool = Field(
            default=True, description="是否为所有模型启用思考过程功能"
        )
        REQUEST_TIMEOUT: int = Field(default=120, description="请求超时时间(秒)")
        RETRY_ATTEMPTS: int = Field(default=3, description="请求失败时的重试次数")
        THINKING_MARKER: str = Field(
            default="<thinking>", description="思考过程的标记符"
        )
        SIMPLE_CUSTOM_MODELS: str = Field(
            default="gpt-5,gpt-5-mini",
            description="简单自定义模型列表,逗号分隔。例如: claude-3-opus,llama-70b,my-local-model",
        )
        ADVANCED_CUSTOM_MODELS: str = Field(
            default="",
            description="""高级自定义模型配置(JSON 格式)。示例:
[
{
"id": "my-custom-gpt",
"name": "我的自定义 GPT",
"base_model": "gpt-4",
"max_tokens": 8192,
"supports_thinking": true,
"supports_vision": false,
"custom_params": {}
}
]""",
        )

    def __init__(self):
        """Initialize static identity and per-request runtime state."""
        self.type = "manifold"
        self.name = "GPT-5: "
        self.valves = self.Valves()
        # Runtime state, (re)assigned on each call to pipe().
        self.emitter = None
        self.current_api_key = None
        self.is_thinking_mode = False
        self.thinking_buffer = []

    @staticmethod
    def _split_csv(raw: str) -> List[str]:
        """Split a comma-separated string into stripped, non-empty items."""
        return [item.strip() for item in raw.split(",") if item.strip()]

    def get_api_key(self) -> str:
        """
        Return one API key chosen at random from the configured list
        (cheap round-robin-ish load spreading across keys).

        Raises:
            ValueError: if no key is configured.
        """
        if not self.valves.OPENAI_API_KEYS:
            raise ValueError("未配置 OpenAI API Key")
        keys = self._split_csv(self.valves.OPENAI_API_KEYS)
        if not keys:
            raise ValueError("API Key 列表为空")
        return random.choice(keys)

    def validate_model_config(self, config: dict) -> bool:
        """
        Validate a custom model configuration.

        Args:
            config: model configuration dict.

        Returns:
            True when "id" is present and a string, and any capability
            flags ("supports_thinking", "supports_vision") present are bools.
        """
        if "id" not in config or not isinstance(config.get("id"), str):
            return False
        for field in ("supports_thinking", "supports_vision"):
            if field in config and not isinstance(config[field], bool):
                return False
        return True

    def parse_custom_models(self) -> List[dict]:
        """
        Build the combined custom-model list from both valve settings.

        Simple (comma-separated) models get default capabilities; advanced
        JSON entries are validated and filled with defaults. Invalid entries
        or unparseable JSON are logged and skipped so one bad entry never
        drops the rest.

        Returns:
            List of normalized model-config dicts.
        """
        custom_models: List[dict] = []
        for model_id in self._split_csv(self.valves.SIMPLE_CUSTOM_MODELS):
            custom_models.append(
                {
                    "id": model_id,
                    "name": f"{model_id} (自定义)",
                    "is_custom": True,
                    "supports_thinking": True,  # simple models default to thinking-capable
                    "supports_vision": False,
                    "base_model": model_id,
                    "max_tokens": None,
                    "custom_params": {},
                }
            )
        if self.valves.ADVANCED_CUSTOM_MODELS:
            try:
                advanced_configs = json.loads(self.valves.ADVANCED_CUSTOM_MODELS)
                if isinstance(advanced_configs, list):
                    for config in advanced_configs:
                        if self.validate_model_config(config):
                            # Fill defaults and tag as custom.
                            custom_models.append(
                                {
                                    "id": config.get("id"),
                                    "name": config.get(
                                        "name", f"{config['id']} (自定义)"
                                    ),
                                    "is_custom": True,
                                    "supports_thinking": config.get(
                                        "supports_thinking", True
                                    ),
                                    "supports_vision": config.get(
                                        "supports_vision", False
                                    ),
                                    "base_model": config.get(
                                        "base_model", config["id"]
                                    ),
                                    "max_tokens": config.get("max_tokens"),
                                    "custom_params": config.get("custom_params", {}),
                                }
                            )
                        else:
                            # Log the bad entry but keep processing the rest.
                            print(f"警告: 无效的模型配置 - {config}")
            except json.JSONDecodeError as e:
                # Bad JSON: fall back to the already-parsed simple models.
                print(f"警告: 高级模型配置 JSON 解析失败 - {e}")
            except Exception as e:
                print(f"警告: 解析高级模型配置时出错 - {e}")
        return custom_models

    def pipes(self) -> List[dict]:
        """
        Return the model entries shown in the OpenWebUI model picker.

        Only SIMPLE_CUSTOM_MODELS is consulted here; a placeholder entry
        is returned when nothing is configured.
        """
        try:
            models = [
                {"id": model_id, "name": model_id}
                for model_id in self._split_csv(self.valves.SIMPLE_CUSTOM_MODELS)
            ]
            if not models:
                return [
                    {"id": "no-models", "name": "请在 SIMPLE_CUSTOM_MODELS 中配置模型"}
                ]
            return models
        except Exception as e:
            return [{"id": "error", "name": f"错误: 无法获取模型列表 - {str(e)}"}]

    async def emit_status(
        self,
        message: str = "",
        done: bool = False,
    ) -> None:
        """Send a status event to the client, if an emitter is attached."""
        if self.emitter:
            await self.emitter(
                {
                    "type": "status",
                    "data": {
                        "description": message,
                        "done": done,
                    },
                }
            )

    def convert_message(self, message: dict) -> dict:
        """
        Convert an OpenWebUI message into OpenAI chat-completions format.

        String content passes through unchanged; list (multimodal) content
        keeps only the "text" and "image_url" parts.
        """
        converted = {"role": message.get("role", "user")}
        content = message.get("content", "")
        if isinstance(content, str):
            converted["content"] = content
        elif isinstance(content, list):
            parts = []
            for item in content:
                if item.get("type") == "text":
                    parts.append({"type": "text", "text": item.get("text", "")})
                elif item.get("type") == "image_url":
                    parts.append(
                        {"type": "image_url", "image_url": item.get("image_url", {})}
                    )
            converted["content"] = parts
        return converted

    def inject_thinking_prompt(
        self, messages: List[dict], thinking_level: int
    ) -> List[dict]:
        """
        Return ``messages`` with a thinking-instruction system prompt added.

        Args:
            messages: original chat messages (not mutated).
            thinking_level: depth level (1-10); mapped to the nearest of the
                predefined prompt tiers (1, 3, 5, 7, 10).

        Returns:
            A message list with the instruction merged into the first
            existing system message, or prepended as a new system message.
        """
        thinking_prompts = {
            1: "请简要思考后回答。",
            3: "请先进行基本分析,然后给出答案。",
            5: "请详细分析问题,展示推理步骤,然后给出答案。用 <thinking>...</thinking> 标记思考过程。",
            7: "请进行深入的分析和推理,考虑多个角度和可能性。详细展示你的思考链。用 <thinking>...</thinking> 标记思考过程。",
            10: "请进行极其详细的分析,包括:问题分解、多角度考虑、推理链展示、假设验证、潜在问题识别等。用 <thinking>...</thinking> 标记所有思考过程。",
        }
        # Pick the tier closest to the requested level.
        level_key = min(thinking_prompts.keys(), key=lambda x: abs(x - thinking_level))
        thinking_instruction = thinking_prompts[level_key]
        system_message = {
            "role": "system",
            "content": f"""你是一个具有深度思考能力的 AI 助手。
{thinking_instruction}
重要规则:
1. 思考过程使用 <thinking></thinking> 标签包裹
2. 思考过程应该展示你的推理链和分析过程
3. 最终答案在思考标签之外给出
4. 保持思考过程的逻辑性和连贯性""",
        }
        if any(msg.get("role") == "system" for msg in messages):
            # Merge into the first system message without mutating the
            # caller's dicts (the previous version edited them in place).
            merged: List[dict] = []
            appended = False
            for msg in messages:
                if not appended and msg.get("role") == "system":
                    msg = {
                        **msg,
                        "content": f"{msg['content']}\n\n{system_message['content']}",
                    }
                    appended = True
                merged.append(msg)
            return merged
        return [system_message] + messages

    def extract_thinking_process(self, text: str) -> tuple[str, str]:
        """
        Split ``text`` into (thinking_process, final_answer).

        The thinking process is everything inside THINKING_MARKER tag pairs
        (multiple blocks are joined with blank lines); the answer is the
        remainder with the tags removed. When no marker is found, returns
        ("", text) with the input untouched.
        """
        marker = self.valves.THINKING_MARKER
        end_marker = marker.replace("<", "</")
        pattern = f"{re.escape(marker)}(.*?){re.escape(end_marker)}"
        thinking_matches = re.findall(pattern, text, re.DOTALL)
        thinking_process = "\n\n".join(thinking_matches) if thinking_matches else ""
        final_answer = re.sub(pattern, "", text, flags=re.DOTALL).strip()
        if not thinking_process:
            return "", text
        return thinking_process, final_answer

    def format_thinking_output(
        self, thinking: str, answer: str, format_type: str
    ) -> str:
        """
        Render thinking + answer as markdown.

        "inline" shows both sections one after the other; any other value
        (normally "separate") wraps the thinking in a collapsible
        <details> block above the answer. Empty thinking returns the
        answer unchanged.
        """
        if not thinking:
            return answer
        if format_type == "inline":
            return f"""💭 **思考过程:**
_{thinking}_
📝 **最终答案:**
{answer}"""
        else:
            return f"""<details>
<summary>💭 查看思考过程</summary>
{thinking}
</details>
---
{answer}"""

    async def handle_streaming_response(
        self, response: httpx.Response, model_id: str
    ) -> AsyncGenerator[str, None]:
        """
        Yield text chunks from an SSE chat-completions stream.

        While inside a <thinking> block the text is buffered (and echoed
        with a 💭 prefix when STREAM_THINKING is on); answer text is held
        back once thinking has started, and on [DONE] the full
        thinking/answer pair is emitted via format_thinking_output.

        NOTE(review): marker detection assumes each opening/closing tag
        arrives within a single delta chunk; a tag split across chunks is
        missed — confirm against the upstream tokenizer behavior.
        """
        thinking_buffer: List[str] = []
        answer_buffer: List[str] = []
        in_thinking = False
        marker = self.valves.THINKING_MARKER
        end_marker = marker.replace("<", "</")
        async for line in response.aiter_lines():
            if not line.startswith("data: "):
                continue
            data_str = line[6:]
            if data_str == "[DONE]":
                # End of stream: emit the final formatted content.
                if self.is_thinking_mode and self.valves.SHOW_THINKING_PROCESS:
                    thinking_text = "".join(thinking_buffer)
                    answer_text = "".join(answer_buffer)
                    if thinking_text:
                        yield self.format_thinking_output(
                            thinking_text, answer_text, self.valves.THINKING_FORMAT
                        )
                    else:
                        yield answer_text
                break
            try:
                data = json.loads(data_str)
            except json.JSONDecodeError:
                continue  # skip malformed / keep-alive lines
            if not data.get("choices"):
                continue
            delta = data["choices"][0].get("delta", {})
            if "content" not in delta:
                continue
            content = delta["content"]
            # Track whether we are inside a thinking block; strip the tags.
            if marker in content:
                in_thinking = True
                content = content.replace(marker, "")
            if end_marker in content:
                in_thinking = False
                content = content.replace(end_marker, "")
            if self.is_thinking_mode:
                if in_thinking:
                    thinking_buffer.append(content)
                    if self.valves.STREAM_THINKING:
                        yield f"💭 {content}"
                else:
                    answer_buffer.append(content)
                    # Once thinking was streamed live, hold the answer back
                    # until [DONE] so it appears inside the formatted block.
                    if not self.valves.STREAM_THINKING or not thinking_buffer:
                        yield content
            else:
                yield content

    async def handle_non_streaming_response(
        self, response_data: dict, model_id: str
    ) -> str:
        """
        Extract the assistant text from a non-streaming completion and, in
        thinking mode, split/format the thinking and answer sections.
        """
        if not response_data.get("choices"):
            return "错误:响应中没有内容"
        content = response_data["choices"][0]["message"]["content"]
        if self.is_thinking_mode and self.valves.SHOW_THINKING_PROCESS:
            thinking, answer = self.extract_thinking_process(content)
            return self.format_thinking_output(
                thinking, answer, self.valves.THINKING_FORMAT
            )
        return content

    async def make_api_request(
        self, url: str, headers: dict, payload: dict, stream: bool
    ) -> Union[httpx.Response, dict]:
        """
        POST to the API with retries and exponential backoff.

        Args:
            url: full endpoint URL.
            headers: request headers (must include Authorization).
            payload: JSON request body.
            stream: when True the raw (fully-read) httpx.Response is
                returned; otherwise the parsed JSON body is returned.

        Raises:
            The last httpx / other exception after RETRY_ATTEMPTS failures.
        """
        # NOTE: the previous version passed extensions={"stream": True} to
        # client.post(), which is not a supported httpx streaming mechanism
        # (true streaming uses client.stream(), as pipe() does); the body is
        # fully read either way, so a single plain POST is correct here.
        async with httpx.AsyncClient() as client:
            for attempt in range(self.valves.RETRY_ATTEMPTS):
                try:
                    response = await client.post(
                        url,
                        headers=headers,
                        json=payload,
                        timeout=self.valves.REQUEST_TIMEOUT,
                    )
                    response.raise_for_status()
                    return response if stream else response.json()
                except Exception:
                    if attempt == self.valves.RETRY_ATTEMPTS - 1:
                        raise
                    await asyncio.sleep(2**attempt)  # exponential backoff

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        """
        Entry point called by OpenWebUI for each chat request.

        Builds a chat-completions payload from ``body``, optionally injects
        the thinking prompt, then yields response text (streamed or not).
        Errors are reported as chat text prefixed with ❌ rather than raised.
        """
        self.emitter = __event_emitter__
        try:
            self.current_api_key = self.get_api_key()
            # OpenWebUI prefixes the pipe id ("pipeid.model"); keep the suffix.
            model_id = body.get("model", "gpt-5")
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            self.is_thinking_mode = self.valves.ENABLE_THINKING
            thinking_level = self.valves.THINKING_LEVEL
            if self.is_thinking_mode:
                await self.emit_status(message="💭 思考中...")
            else:
                await self.emit_status(message="✨ 生成中...")
            messages = [self.convert_message(msg) for msg in body.get("messages", [])]
            if self.is_thinking_mode:
                messages = self.inject_thinking_prompt(messages, thinking_level)
            # Build the request payload, falling back to configured defaults.
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "max_tokens": body.get("max_tokens", self.valves.DEFAULT_MAX_TOKENS),
                "top_p": body.get("top_p", 1.0),
                "frequency_penalty": body.get("frequency_penalty", 0),
                "presence_penalty": body.get("presence_penalty", 0),
                "stream": body.get("stream", True),
            }
            if "stop" in body:
                payload["stop"] = body["stop"]
            headers = {
                "Authorization": f"Bearer {self.current_api_key}",
                "Content-Type": "application/json",
            }
            url = f"{self.valves.BASE_URL}/chat/completions"
            if payload["stream"]:
                # True streaming: keep the connection open for the whole
                # generator lifetime (unlike make_api_request).
                async with httpx.AsyncClient() as client:
                    async with client.stream(
                        "POST",
                        url,
                        headers=headers,
                        json=payload,
                        timeout=self.valves.REQUEST_TIMEOUT,
                    ) as response:
                        response.raise_for_status()
                        async for chunk in self.handle_streaming_response(
                            response, model_id
                        ):
                            yield chunk
                await self.emit_status(message="✅ 完成", done=True)
            else:
                response_data = await self.make_api_request(
                    url, headers, payload, False
                )
                result = await self.handle_non_streaming_response(
                    response_data, model_id
                )
                yield result
                await self.emit_status(message="✅ 完成", done=True)
        except httpx.HTTPStatusError as e:
            error_msg = f"API 请求失败: HTTP {e.response.status_code}"
            try:
                error_detail = e.response.json()
                if "error" in error_detail:
                    error_msg += f" - {error_detail['error'].get('message', '')}"
            except Exception:  # body not JSON (or unread stream): use raw text
                error_msg += f" - {e.response.text[:200]}"
            yield f"❌ {error_msg}"
            await self.emit_status(message="请求失败", done=True)
        except ValueError as e:
            yield f"❌ 配置错误: {str(e)}"
            await self.emit_status(message="配置错误", done=True)
        except Exception as e:
            yield f"❌ 未知错误: {str(e)}"
            await self.emit_status(message="发生错误", done=True)
