参考我之前的o1的帖子改的
给使用chat01各位佬友们的新年礼物,openwebui 显示 chat01上的 o1/o1pro 思维链 - 开发调优 - LINUX DO
还好chat01的api返回格式都是一样的,基本上没改什么,各位可以开箱即食
o3-mini
"""
title: chat01流式输出代码
author: konbakuyomu
description: 在OpwenWebUI中显示o3-mini模型的思维链
version: 1.0.1
licence: MIT
"""
import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
class Valves(BaseModel):
CHAT01_API_BASE_URL: str = Field(
default="https://chat01.ai/v1",
description="chat01的基础请求地址",
)
CHAT01_API_KEY: str = Field(
default="", description="用于身份验证的caht01密钥,可从控制台获取"
)
def __init__(self):
self.valves = self.Valves()
self.data_prefix = "data: "
self.thinking = -1 # -1:未开始 0:思考中 1:已回答
self.emitter = None
def pipes(self):
return [
{
"id": "o3-mini",
"name": "chat01/o3-mini-cot",
}
]
async def pipe(
self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
) -> AsyncGenerator[str, None]:
"""主处理管道(已移除缓冲)"""
self.thinking = -1
self.emitter = __event_emitter__
# 验证配置
if not self.valves.CHAT01_API_KEY:
yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
return
# 准备请求参数
headers = {
"Authorization": f"Bearer {self.valves.CHAT01_API_KEY}",
"Content-Type": "application/json",
}
try:
# 模型ID提取
model_id = "o3-mini" # 写死默认使用o3-mini
payload = {**body, "model": model_id}
# 处理消息以防止连续的相同角色
messages = payload["messages"]
i = 0
while i < len(messages) - 1:
if messages[i]["role"] == messages[i + 1]["role"]:
# 插入具有替代角色的占位符消息
alternate_role = (
"assistant" if messages[i]["role"] == "user" else "user"
)
messages.insert(
i + 1,
{"role": alternate_role, "content": "[Unfinished thinking]"},
)
i += 1
# yield json.dumps(payload, ensure_ascii=False)
# 发起API请求
async with httpx.AsyncClient(http2=True) as client:
async with client.stream(
"POST",
f"{self.valves.CHAT01_API_BASE_URL}/chat/completions",
json=payload,
headers=headers,
timeout=30,
) as response:
# 错误处理
if response.status_code != 200:
error = await response.aread()
yield self._format_error(response.status_code, error)
return
# 流式处理响应
async for line in response.aiter_lines():
if not line.startswith(self.data_prefix):
continue
data = json.loads(line[len(self.data_prefix) :])
choice = data.get("choices", [{}])[0]
# 结束条件判断
if choice.get("finish_reason"):
return
# 状态机处理
state_output = await self._update_thinking_state(
choice.get("delta", {})
)
if state_output:
yield state_output # 直接发送状态标记
if state_output == "<think>":
yield "\n"
# 内容处理并立即发送
content = self._process_content(choice["delta"])
if content:
yield content
except Exception as e:
yield self._format_exception(e)
async def _update_thinking_state(self, delta: dict) -> str:
state_output = ""
text = delta.get("content", "")
# 检查 text 里是否含有“## 思考:”这样的字样
if self.thinking == -1 and "## 思考:" in text:
self.thinking = 0
state_output = "<think>"
elif self.thinking == 0 and "## 回答:" in text:
self.thinking = 1
state_output = "\n</think>\n\n"
return state_output
def _process_content(self, delta: dict) -> str:
"""优化内容处理,去除标题和时间信息"""
content = delta.get("content", "")
# 去除标题部分"## 回答:" 和 "## 思考:"
content = re.sub(r"(?m)^## 回答:.*$", "", content)
content = re.sub(r"(?m)^## 思考:.*$", "", content)
# 去除持续时间信息
content = re.sub(r"思考,持续\s.*?秒", "", content)
# 返回优化后的内容
return content
def _format_error(self, status_code: int, error: bytes) -> str:
"""错误格式化保持不变"""
try:
err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[
:200
]
except:
err_msg = error.decode(errors="ignore")[:200]
return json.dumps(
{"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
)
def _format_exception(self, e: Exception) -> str:
"""异常格式化保持不变"""
err_type = type(e).__name__
return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)
o3-mini-high
"""
title: chat01流式输出代码
author: konbakuyomu
description: 在OpwenWebUI中显示o3-mini-high模型的思维链
version: 1.0.1
licence: MIT
"""
import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
class Valves(BaseModel):
CHAT01_API_BASE_URL: str = Field(
default="https://chat01.ai/v1",
description="chat01的基础请求地址",
)
CHAT01_API_KEY: str = Field(
default="", description="用于身份验证的caht01密钥,可从控制台获取"
)
def __init__(self):
self.valves = self.Valves()
self.data_prefix = "data: "
self.thinking = -1 # -1:未开始 0:思考中 1:已回答
self.emitter = None
def pipes(self):
return [
{
"id": "o3-mini-high",
"name": "chat01/o3-mini-high-cot",
}
]
async def pipe(
self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
) -> AsyncGenerator[str, None]:
"""主处理管道(已移除缓冲)"""
self.thinking = -1
self.emitter = __event_emitter__
# 验证配置
if not self.valves.CHAT01_API_KEY:
yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
return
# 准备请求参数
headers = {
"Authorization": f"Bearer {self.valves.CHAT01_API_KEY}",
"Content-Type": "application/json",
}
try:
# 模型ID提取
model_id = "o3-mini-high" # 写死默认使用o3-mini-high
payload = {**body, "model": model_id}
# 处理消息以防止连续的相同角色
messages = payload["messages"]
i = 0
while i < len(messages) - 1:
if messages[i]["role"] == messages[i + 1]["role"]:
# 插入具有替代角色的占位符消息
alternate_role = (
"assistant" if messages[i]["role"] == "user" else "user"
)
messages.insert(
i + 1,
{"role": alternate_role, "content": "[Unfinished thinking]"},
)
i += 1
# yield json.dumps(payload, ensure_ascii=False)
# 发起API请求
async with httpx.AsyncClient(http2=True) as client:
async with client.stream(
"POST",
f"{self.valves.CHAT01_API_BASE_URL}/chat/completions",
json=payload,
headers=headers,
timeout=30,
) as response:
# 错误处理
if response.status_code != 200:
error = await response.aread()
yield self._format_error(response.status_code, error)
return
# 流式处理响应
async for line in response.aiter_lines():
if not line.startswith(self.data_prefix):
continue
data = json.loads(line[len(self.data_prefix) :])
choice = data.get("choices", [{}])[0]
# 结束条件判断
if choice.get("finish_reason"):
return
# 状态机处理
state_output = await self._update_thinking_state(
choice.get("delta", {})
)
if state_output:
yield state_output # 直接发送状态标记
if state_output == "<think>":
yield "\n"
# 内容处理并立即发送
content = self._process_content(choice["delta"])
if content:
yield content
except Exception as e:
yield self._format_exception(e)
async def _update_thinking_state(self, delta: dict) -> str:
state_output = ""
text = delta.get("content", "")
# 检查 text 里是否含有“## 思考:”这样的字样
if self.thinking == -1 and "## 思考:" in text:
self.thinking = 0
state_output = "<think>"
elif self.thinking == 0 and "## 回答:" in text:
self.thinking = 1
state_output = "\n</think>\n\n"
return state_output
def _process_content(self, delta: dict) -> str:
"""优化内容处理,去除标题和时间信息"""
content = delta.get("content", "")
# 去除标题部分"## 回答:" 和 "## 思考:"
content = re.sub(r"(?m)^## 回答:.*$", "", content)
content = re.sub(r"(?m)^## 思考:.*$", "", content)
# 去除持续时间信息
content = re.sub(r"思考,持续\s.*?秒", "", content)
# 返回优化后的内容
return content
def _format_error(self, status_code: int, error: bytes) -> str:
"""错误格式化保持不变"""
try:
err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[
:200
]
except:
err_msg = error.decode(errors="ignore")[:200]
return json.dumps(
{"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
)
def _format_exception(self, e: Exception) -> str:
"""异常格式化保持不变"""
err_type = type(e).__name__
return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)
给萌新的pipe函数使用教程
1.第一步:点击右上角,选择管理员面板
2.第二步:进入函数,点击右上角的+号

3.第三步 替换函数然后点击右下角保存(左上角的函数名称注意第一次创建的时候只允许数字字母+下划线的命名组合,出现其他字符会报错,后面修改名字就不会报错了)
4.第四步 回到上一级,也就是函数列表中,把开关打开(默认是关闭)
![]()
5.第五步 pipe函数你可以理解为直接就是一个模型了,此时你的可用模型能直接看到了,并且能编辑


