兼容 chat01.ai 最新 o3-mini/o3-mini-high 的api 的思维链函数

参考我之前的o1的帖子改的
给使用chat01各位佬友们的新年礼物,openwebui 显示 chat01上的 o1/o1pro 思维链 - 开发调优 - LINUX DO
还好chat01的api返回格式都是一样的,基本上没改什么,各位可以开箱即食

o3-mini

"""
title: chat01流式输出代码
author: konbakuyomu
description: 在OpwenWebUI中显示o3-mini模型的思维链
version: 1.0.1
licence: MIT
"""

import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        CHAT01_API_BASE_URL: str = Field(
            default="https://chat01.ai/v1",
            description="chat01的基础请求地址",
        )
        CHAT01_API_KEY: str = Field(
            default="", description="用于身份验证的caht01密钥,可从控制台获取"
        )

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data: "
        self.thinking = -1  # -1:未开始 0:思考中 1:已回答
        self.emitter = None

    def pipes(self):
        return [
            {
                "id": "o3-mini",
                "name": "chat01/o3-mini-cot",
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """主处理管道(已移除缓冲)"""
        self.thinking = -1
        self.emitter = __event_emitter__

        # 验证配置
        if not self.valves.CHAT01_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # 准备请求参数
        headers = {
            "Authorization": f"Bearer {self.valves.CHAT01_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # 模型ID提取
            model_id = "o3-mini"  # 写死默认使用o3-mini
            payload = {**body, "model": model_id}

            # 处理消息以防止连续的相同角色
            messages = payload["messages"]
            i = 0
            while i < len(messages) - 1:
                if messages[i]["role"] == messages[i + 1]["role"]:
                    # 插入具有替代角色的占位符消息
                    alternate_role = (
                        "assistant" if messages[i]["role"] == "user" else "user"
                    )
                    messages.insert(
                        i + 1,
                        {"role": alternate_role, "content": "[Unfinished thinking]"},
                    )
                i += 1

            # yield json.dumps(payload, ensure_ascii=False)

            # 发起API请求
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.CHAT01_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=30,
                ) as response:
                    # 错误处理
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # 流式处理响应
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue

                        data = json.loads(line[len(self.data_prefix) :])
                        choice = data.get("choices", [{}])[0]

                        # 结束条件判断
                        if choice.get("finish_reason"):
                            return

                        # 状态机处理
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {})
                        )
                        if state_output:
                            yield state_output  # 直接发送状态标记
                            if state_output == "<think>":
                                yield "\n"

                        # 内容处理并立即发送
                        content = self._process_content(choice["delta"])
                        if content:
                            yield content

        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict) -> str:
        state_output = ""
        text = delta.get("content", "")

        # 检查 text 里是否含有“## 思考:”这样的字样
        if self.thinking == -1 and "## 思考:" in text:
            self.thinking = 0
            state_output = "<think>"
        elif self.thinking == 0 and "## 回答:" in text:
            self.thinking = 1
            state_output = "\n</think>\n\n"

        return state_output

    def _process_content(self, delta: dict) -> str:
        """优化内容处理,去除标题和时间信息"""
        content = delta.get("content", "")

        # 去除标题部分"## 回答:" 和 "## 思考:"
        content = re.sub(r"(?m)^## 回答:.*$", "", content)
        content = re.sub(r"(?m)^## 思考:.*$", "", content)

        # 去除持续时间信息
        content = re.sub(r"思考,持续\s.*?秒", "", content)

        # 返回优化后的内容
        return content

    def _format_error(self, status_code: int, error: bytes) -> str:
        """错误格式化保持不变"""
        try:
            err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[
                :200
            ]
        except:
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """异常格式化保持不变"""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)

o3-mini-high

"""
title: chat01流式输出代码
author: konbakuyomu
description: 在OpwenWebUI中显示o3-mini-high模型的思维链
version: 1.0.1
licence: MIT
"""

import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        CHAT01_API_BASE_URL: str = Field(
            default="https://chat01.ai/v1",
            description="chat01的基础请求地址",
        )
        CHAT01_API_KEY: str = Field(
            default="", description="用于身份验证的caht01密钥,可从控制台获取"
        )

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data: "
        self.thinking = -1  # -1:未开始 0:思考中 1:已回答
        self.emitter = None

    def pipes(self):
        return [
            {
                "id": "o3-mini-high",
                "name": "chat01/o3-mini-high-cot",
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """主处理管道(已移除缓冲)"""
        self.thinking = -1
        self.emitter = __event_emitter__

        # 验证配置
        if not self.valves.CHAT01_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # 准备请求参数
        headers = {
            "Authorization": f"Bearer {self.valves.CHAT01_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # 模型ID提取
            model_id = "o3-mini-high"  # 写死默认使用o3-mini-high
            payload = {**body, "model": model_id}

            # 处理消息以防止连续的相同角色
            messages = payload["messages"]
            i = 0
            while i < len(messages) - 1:
                if messages[i]["role"] == messages[i + 1]["role"]:
                    # 插入具有替代角色的占位符消息
                    alternate_role = (
                        "assistant" if messages[i]["role"] == "user" else "user"
                    )
                    messages.insert(
                        i + 1,
                        {"role": alternate_role, "content": "[Unfinished thinking]"},
                    )
                i += 1

            # yield json.dumps(payload, ensure_ascii=False)

            # 发起API请求
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.CHAT01_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=30,
                ) as response:
                    # 错误处理
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # 流式处理响应
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue

                        data = json.loads(line[len(self.data_prefix) :])
                        choice = data.get("choices", [{}])[0]

                        # 结束条件判断
                        if choice.get("finish_reason"):
                            return

                        # 状态机处理
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {})
                        )
                        if state_output:
                            yield state_output  # 直接发送状态标记
                            if state_output == "<think>":
                                yield "\n"

                        # 内容处理并立即发送
                        content = self._process_content(choice["delta"])
                        if content:
                            yield content

        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict) -> str:
        state_output = ""
        text = delta.get("content", "")

        # 检查 text 里是否含有“## 思考:”这样的字样
        if self.thinking == -1 and "## 思考:" in text:
            self.thinking = 0
            state_output = "<think>"
        elif self.thinking == 0 and "## 回答:" in text:
            self.thinking = 1
            state_output = "\n</think>\n\n"

        return state_output

    def _process_content(self, delta: dict) -> str:
        """优化内容处理,去除标题和时间信息"""
        content = delta.get("content", "")

        # 去除标题部分"## 回答:" 和 "## 思考:"
        content = re.sub(r"(?m)^## 回答:.*$", "", content)
        content = re.sub(r"(?m)^## 思考:.*$", "", content)

        # 去除持续时间信息
        content = re.sub(r"思考,持续\s.*?秒", "", content)

        # 返回优化后的内容
        return content

    def _format_error(self, status_code: int, error: bytes) -> str:
        """错误格式化保持不变"""
        try:
            err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[
                :200
            ]
        except:
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """异常格式化保持不变"""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)

给萌新的pipe函数使用教程

1.第一步:点击右上角,选择管理员面板

2.第二步:进入函数,点击右上角的+号
image

3.第三步 替换函数然后点击右下角保存(左上角的函数名称注意第一次创建的时候只允许数字字母+下划线的命名组合,出现其他字符会报错,后面修改名字就不会报错了

4.第四步 回到上一级,也就是函数列表中,把开关打开(默认是关闭)
image

5.第五步 pipe函数你可以理解为直接就是一个模型了,此时你的可用模型能直接看到了,并且能编辑
image

5 个赞

太强了,感谢教程!

大帅哥高强度刷帖啊,感谢支持 :smiling_face_with_three_hearts:

没必要写两个函数,一个函数秒了:

import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable, Optional, Any
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        CHAT01_API_BASE_URL: str = Field(
            default="https://chat01.ai/v1",
            description="chat01的基础请求地址",
        )
        CHAT01_API_KEY: str = Field(
            default="", description="用于身份验证的chat01密钥,可从控制台获取"
        )

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data: "
        self.thinking = -1  # -1:未开始 0:思考中 1:已回答
        self.emitter = None
        # 新增一个标志,用于是否支持“思考”逻辑
        self.thinking_supported = True

    def pipes(self):
        """
        在 Open-WebUI 前端中将会列出多个可选的「模型」按钮/选项。
        """
        return [
            {"id": "o1", "name": "chat01-o1"},
            {"id": "o1-pro", "name": "chat01-o1-pro"},
            {"id": "gpt-4o", "name": "chat01-gpt-4o"},
            {"id": "o3-mini", "name": "chat01-o3-mini"},
            {"id": "o3-mini-high", "name": "chat01-o3-mini-high"},
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """
        主处理管道
        """
        self.thinking = -1
        self.emitter = __event_emitter__

        # 1) 验证配置
        if not self.valves.CHAT01_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # 2) 准备请求头
        headers = {
            "Authorization": f"Bearer {self.valves.CHAT01_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # 3) 确定模型ID
            supported_models = ["o1", "o1-pro", "gpt-4o", "o3-mini", "o3-mini-high"]
            requested_model = body.get("model", "o1")
            # 问题1:去掉 "chat01." 前缀
            if requested_model.startswith("chat01."):
                requested_model = requested_model[len("chat01.") :]

            # 如果模型不在可选列表里,就默认为 "o1"
            if requested_model not in supported_models:
                requested_model = "gpt-4o"

            # 记录到 payload
            payload = {**body, "model": requested_model}

            # 问题2:如果是 gpt-4o,则不输出思考过程
            self.thinking_supported = requested_model != "gpt-4o"

            # 4) 检查用户最新消息里有没有图片
            self._inject_image_if_any(payload)

            # 5) 防止连续角色相同
            messages = payload["messages"]
            i = 0
            while i < len(messages) - 1:
                if messages[i]["role"] == messages[i + 1]["role"]:
                    alternate_role = (
                        "assistant" if messages[i]["role"] == "user" else "user"
                    )
                    messages.insert(
                        i + 1,
                        {"role": alternate_role, "content": "[Unfinished thinking]"},
                    )
                i += 1

            # 6) 发起流式请求
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.CHAT01_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=30,
                ) as response:
                    # 如果HTTP出错,直接yield并return
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # 流式处理响应
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue
                        data = json.loads(line[len(self.data_prefix) :])
                        choice = data.get("choices", [{}])[0]

                        # 如果返回里有 finish_reason,说明回答结束
                        if choice.get("finish_reason"):
                            return

                        # (1) 状态机插入<think> 或 </think>
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {})
                        )
                        if state_output:
                            yield state_output
                            # 如果是 <think>,可以额外输出个换行以区分
                            if state_output == "<think>":
                                yield "\n"

                        # (2) 内容处理并发送
                        content = self._process_content(choice["delta"])
                        if content:
                            yield content

        except Exception as e:
            yield self._format_exception(e)

    # -----------------------------------------------------------------------
    # 下面是提取用户输入的图片URL相关逻辑
    # -----------------------------------------------------------------------
    def _inject_image_if_any(self, payload: dict) -> None:
        """
        检查用户最新的一条消息,若里面有图片URL,就把它(们)改造成可携带图片的结构
        """
        messages = payload.get("messages", [])
        if not messages:
            return
        last_msg = messages[-1]
        # 确保这条消息是 "user" 角色,否则就不处理
        if last_msg["role"] != "user":
            return
        content_str = last_msg.get("content", "")
        if not isinstance(content_str, str):
            return

        # 找出全部图片链接
        image_urls = self._find_image_urls_in_text(content_str)
        if not image_urls:
            return

        # 限制最多3张
        limited_urls = image_urls[:3]
        # 去掉文本里的所有这些URL,让文字里不再出现它们
        cleaned_text = re.sub(
            r"(https?://[^\s]+?\.(?:png|jpg|jpeg|gif|bmp|tiff|webp))",
            "",
            content_str,
            flags=re.IGNORECASE,
        ).strip()

        # 构造新的 content 数组
        content_list = []
        if cleaned_text:
            content_list.append({"type": "text", "text": cleaned_text})
        for url in limited_urls:
            content_list.append({"type": "image_url", "image_url": {"url": url}})

        # 用这个列表替换掉 last_msg["content"]
        last_msg["content"] = content_list

    def _find_image_urls_in_text(self, text: str) -> list:
        """
        使用正则表达式在文本中查找所有图片 URL。
        """
        pattern = re.compile(
            r"(https?://[^\s]+?\.(?:png|jpg|jpeg|gif|bmp|tiff|webp))", re.IGNORECASE
        )
        return pattern.findall(text)

    # -----------------------------------------------------------------------
    # 思考状态机
    # -----------------------------------------------------------------------
    async def _update_thinking_state(self, delta: dict) -> str:
        """
        当 thinking_supported=False(即模型是 gpt-4o)时,直接跳过。
        """
        if not self.thinking_supported:
            return ""

        state_output = ""
        text = delta.get("content", "")
        if self.thinking == -1 and "## 思考:" in text:
            self.thinking = 0
            state_output = "<think>"
        elif self.thinking == 0 and "## 回答:" in text:
            self.thinking = 1
            state_output = "\n</think>\n\n"
        return state_output

    # -----------------------------------------------------------------------
    # 内容清洗
    # -----------------------------------------------------------------------
    def _process_content(self, delta: dict) -> str:
        """去除标题、去除思考时间等不想要的内容"""
        content = delta.get("content", "")
        # 去除标题: ## 回答: / ## 思考:
        content = re.sub(r"(?m)^## 回答:.*$", "", content)
        content = re.sub(r"(?m)^## 思考:.*$", "", content)
        return content

    # -----------------------------------------------------------------------
    # 错误处理
    # -----------------------------------------------------------------------
    def _format_error(self, status_code: int, error: bytes) -> str:
        try:
            err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[
                :200
            ]
        except:
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)

3 个赞

确实很棒!

我去,我刚刚在搜openwebUI的函数,直接搜到了,谢谢佬

mark,对我有用

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。