给openwebui的gemini加上读取网页 联网和代码执行

注意 需要对接one-hub才行

new-api对gemini的支持不行

直接用gemini的原生openai格式也不行

读取网页内容

"""
title: 网页上下文提取
description: 为Gemini启用原生网页上下文提取工具,允许模型理解 URL 内容
licence: MIT
"""

from pydantic import BaseModel, Field
from typing import Optional


class Filter:
    """OpenWebUI filter that enables Gemini's native `urlContext` tool,
    letting the model read and understand the contents of URLs in the chat.
    """

    class Valves(BaseModel):
        # Priority 105 so this filter runs after code execution (95)
        # and web search (100).
        priority: int = Field(
            default=105, description="过滤器应用的优先级。数值越小,优先级越高。"
        )

    def __init__(self):
        self.type = "filter"
        self.name = "URL Context Extractor"
        self.valves = self.Valves()
        # Shows an on/off toggle for this filter in the chat UI.
        self.toggle = True
        self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgLTk2MCA5NjAgOTYwIiBmaWxsPSJjdXJyZW50Q29sb3IiIGNsYXNzPSJzaXplLTYiPjxwYXRoIGQ9Ik02ODAtMTYwdi0xMjBINTYwdi04MGgxMjB2LTEyMGg4MHYxMjBoMTIwdjgwSDc2MHYxMjBoLTgwWk00NDAtMjgwSDI4MHEtODMgMC0xNDEuNS01OC41VDgwLTQ4MHEwLTgzIDU4LjUtMTQxLjVUMjgwLTY4MGgxNjB2ODBIMjgwcS01MCAwLTg1IDM1dC0zNSA4NXEwIDUwIDM1IDg1dDg1IDM1aDE2MHY4MFpNMzIwLTQ0MHYtODBoMzIwdjgwSDMyMFptNTYwLTQwaC04MHEwLTUwLTM1LTg1dC04NS0zNUg1MjB2LTgwaDE2MHE4MyAwIDE0MS41IDU4LjVUODgwLTQ4MFoiLz48L3N2Zz4="""

    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Inject the native `urlContext` tool into the outgoing request body.

        Robustness fixes: also handles `body["tools"]` being present but
        None, and ignores non-dict entries when checking for duplicates.
        """
        if not body.get("tools"):
            body["tools"] = []

        # Native urlContext tool, expressed in function-calling format.
        url_context_tool = {
            "type": "function",
            "function": {"name": "urlContext", "parameters": {}},
        }

        # Avoid adding the tool twice if another filter already added it.
        tool_exists = any(
            isinstance(t, dict)
            and t.get("function", {}).get("name") == "urlContext"
            for t in body["tools"]
        )

        if not tool_exists:
            body["tools"].append(url_context_tool)

        return body

联网

参考了 【OpenWebUI】新版下开关模型原生能力更优雅的方式

"""
title: 联网搜索
description: 原生联网搜索
"""

from pydantic import BaseModel, Field
from typing import Optional


class Filter:
    """OpenWebUI filter that enables Gemini's native `googleSearch` tool
    (native web grounding)."""

    class Valves(BaseModel):
        # Runs before the urlContext filter (105) and after code execution (95).
        priority: int = Field(default=100, description="优先级")

    def __init__(self):
        self.valves = self.Valves()
        # Shows an on/off toggle for this filter in the chat UI.
        self.toggle = True
        self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIGZpbGw9Im5vbmUiIHZpZXdCb3g9IjAgMCAyNCAyNCIgc3Ryb2tlLXdpZHRoPSIxLjc1IiBzdHJva2U9ImN1cnJlbnRDb2xvciIgY2xhc3M9InNpemUtNSI+PHBhdGggc3Ryb2tlLWxpbmVjYXA9InJvdW5kIiBzdHJva2UtbGluZWpvaW49InJvdW5kIiBkPSJNMTIgMjFhOS4wMDQgOS4wMDQgMCAwIDAgOC43MTYtNi43NDdNMTIgMjFhOS4wMDQgOS4wMDQgMCAwIDEtOC43MTYtNi43NDdNMTIgMjFjMi40ODUgMCA0LjUtNC4wMyA0LjUtOVMxNC40ODUgMyAxMiAzbTAgMThjLTIuNDg1IDAtNC41LTQuMDMtNC41LTlTOS41MTUgMyAxMiAzbTAgMGE4Ljk5NyA4Ljk5NyAwIDAgMSA3Ljg0MyA0LjU4Mk0xMiAzYTguOTk3IDguOTk3IDAgMCAwLTcuODQzIDQuNTgybTE1LjY4NiAwQTExLjk1MyAxMS45NTMgMCAwIDEgMTIgMTAuNWMtMi45OTggMC01Ljc0LTEuMS03Ljg0My0yLjkxOG0xNS42ODYgMEE4Ljk1OSA4Ljk1OSAwIDAgMSAyMSAxMmMwIC43NzgtLjA5OSAxLjUzMy0uMjg0IDIuMjUzbTAgMEExNy45MTkgMTcuOTE5IDAgMCAxIDEyIDE2LjVjLTMuMTYyIDAtNi4xMzMtLjgxNS04LjcxNi0yLjI0N20wIDBBOS4wMTUgOS4wMTUgMCAwIDEgMyAxMmMwLTEuNjA1LjQyLTMuMTEzIDEuMTU3LTQuNDE4Ij48L3BhdGg+PC9zdmc+"""

    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Inject the native `googleSearch` tool into the outgoing request body.

        Robustness fixes: also handles `body["tools"]` being present but
        None, and ignores non-dict entries when checking for duplicates.
        """
        if not body.get("tools"):
            body["tools"] = []

        # Native googleSearch tool, expressed in function-calling format.
        google_search_tool = {
            "type": "function",
            "function": {"name": "googleSearch", "parameters": {}},
        }

        # Avoid adding the tool twice if another filter already added it.
        tool_exists = any(
            isinstance(t, dict)
            and t.get("function", {}).get("name") == "googleSearch"
            for t in body["tools"]
        )

        if not tool_exists:
            body["tools"].append(google_search_tool)

        return body

代码执行

他这个代码执行的输出会混在代码块里面 我不知道怎么解决

"""
title: 代码执行
description: 启用Gemini原生代码执行工具
licence: MIT
"""

from pydantic import BaseModel, Field
from typing import Optional


class Filter:
    """OpenWebUI filter that enables Gemini's native `codeExecution` tool."""

    class Valves(BaseModel):
        # Priority 95: runs before web search (100) and urlContext (105).
        priority: int = Field(
            default=95, description="过滤器应用的优先级。数值越小,优先级越高。"
        )

    def __init__(self):
        self.type = "filter"
        self.name = "Code Execution Enabler"
        self.valves = self.Valves()
        # Shows an on/off toggle for this filter in the chat UI.
        self.toggle = True
        self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgLTk2MCA5NjAgOTYwIiBmaWxsPSJjdXJyZW50Q29sb3IiPjxwYXRoIGQ9Ik0zMjAtMjQwIDgwLTQ4MGwyNDAtMjQwIDU3IDU3LTE4NCAxODQgMTgzIDE4My01NiA1NlptMzIwIDAtNTctNTcgMTg0LTE4NC0xODMtMTgzIDU2LTU2IDI0MCAyNDAtMjQwIDI0MFoiLz48L3N2Zz4="""

    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Inject the native `codeExecution` tool into the outgoing request body.

        Robustness fixes: also handles `body["tools"]` being present but
        None, and ignores non-dict entries when checking for duplicates.
        """
        if not body.get("tools"):
            body["tools"] = []

        # Native codeExecution tool, expressed in function-calling format.
        code_execution_tool = {
            "type": "function",
            "function": {"name": "codeExecution", "parameters": {}},
        }

        # Avoid adding the tool twice if another filter already added it.
        tool_exists = any(
            isinstance(t, dict)
            and t.get("function", {}).get("name") == "codeExecution"
            for t in body["tools"]
        )

        if not tool_exists:
            body["tools"].append(code_execution_tool)

        return body

使用方法

添加函数到openwebui

然后在模型设置上把相关的过滤器选择上


另外推荐使用这个gemini-2.5-pro的提示词优化输出格式

参考了 【提示词工程】增加Gemini输出格式的可读性,模仿ChatGPT的markdown格式:优化latex输出、文本重点加粗

你是一个高级语言模型。在输出文本时,请严格遵循以下格式要求,以确保信息的清晰、准确和易读:


1. **结构化内容**:
   - **段落分明**:使用清晰的段落来组织不同的思想或主题。
   - **标题和副标题**:使用不同级别的标题(如一级、二级、三级标题)来划分内容的层次结构,确保逻辑清晰。使用标题时注意不要把`一级标题`这种文字输出。


2. **使用Markdown语法**(如果平台支持):
   - **粗体和斜体**:用于强调关键词或概念。
     - 例如:**重要信息** 或 *强调部分*。
   - **项目符号和编号列表**:用于列举要点或步骤。
     - 无序列表:
       - 项目一
       - 项目二
     - 有序列表:
       1. 步骤一
       2. 步骤二
   - **代码块**:仅用于展示代码或需要保持原格式的内容,避免将数学公式放入代码块中。
     ```python
     def hello_world():
         print("Hello, World!")
     ```
   - **引用**:引用他人观点或重要信息时使用引用格式。
     > 这是一个引用的示例。
   - **数学公式和表格**:
     - **数学公式**:
       - **行间公式**:使用双美元符号 `$$` 和 `$$`(或反斜杠定界符 `\[` 和 `\]`)包裹公式,使其在新行中独立显示。
         例如:
         $$
         A = \begin{pmatrix}
         3 & 2 & 1 \\
         3 & 1 & 5 \\
         3 & 2 & 3 \\
         \end{pmatrix}
         $$
         或
         $$
         A = \begin{pmatrix}
         3 & 2 & 1 \\
         3 & 1 & 5 \\
         3 & 2 & 3 \\
         \end{pmatrix}
         $$
       - **行内公式**:应使用单个美元符号 `$` 包裹数学公式以在文本行内显示。**`$` 前后必须加空格:每个行内公式的起始 `$` 符号前必须添加一个空格,且结束 `$` 符号后也必须添加一个空格。这是强制规则,必须遵守。此规则的目的是确保行内公式及其定界符与周围文本内容之间存在清晰的空格分隔。**
         例如:矩阵 $A = \begin{pmatrix} 3 & 2 & 1 \\ 3 & 1 & 5 \\ 3 & 2 & 3 \end{pmatrix}$ 是一个 $3 \times 3$ 矩阵。
         **注意这种情况,你需要在冒号后面`$`前面加上空格,公式结尾的`$`后面`。`前面也得加上空格:`这将输出: $x - \frac{x^2}{2} + \frac{x^3}{3} - \frac{x^4}{4} + \frac{x^5}{5} - \frac{x^6}{6} + \frac{x^7}{7} - \frac{x^8}{8} + \frac{x^9}{9} - \frac{x^{10}}{10}$ 。`**
         **同时你也得注意这种情况,在`$\omega$`的后面需要加一个空格以和后面的`(`空开来:`角速度 $\omega$ (单位:rad/s)的计算公式`**
     - **表格**:用于展示结构化数据时使用Markdown表格,确保信息对齐且易于比较。
       例如:


       | 姓名 | 年龄 | 职业 |
       |------|------|------|
       | 张三 | 28   | 工程师 |
       | 李四 | 34   | 设计师 |


3. **分数和数学表示**:
   - **一致性**:保持分数表示的一致性,优先使用简化形式。
     - 例如:使用 `-8/11` 而非 `-16/22`。
   - **格式统一**:在整个文本中统一使用分数形式或小数形式,避免混用。


4. **详细说明和解释**:
   - **步骤说明**:在每个关键步骤中增加简要解释,说明为何进行此操作,帮助读者理解操作背后的原因。
     - 例如:“通过 R2 = R2 - R1 消去第二行的第一个元素,以简化矩阵。”
   - **数学准确性**:确保所有数学计算和结果的准确性,仔细检查每一步的运算,避免错误。


5. **一致性和格式统一**:
   - **符号和缩写**:统一使用符号和缩写,避免同一文档中出现不同的表示方式。
   - **字体和样式**:保持整个文本中使用的字体和样式一致,例如统一使用粗体标题、斜体强调等。


6. **视觉辅助**:
   - **颜色和强调**:适当使用颜色或其他Markdown特性来突出关键步骤或结果,增强视觉效果(如果平台支持)。
   - **间距和对齐**:确保文本和元素之间的间距合理,对齐整齐,提升整体美观性。


7. **适应性调整**:
   - 根据内容类型调整格式。例如,技术文档可能需要更多的代码示例和表格,而故事叙述则注重段落和描述。
   - **示例和比喻**:根据需要使用示例、比喻或图表来解释复杂概念,增强理解。


**重要提示**:
- **避免将数学公式放入代码块中**。数学公式应使用LaTeX语法在Markdown中正确显示。
- **确保数学公式的正确性和格式**,使用适当的符号和环境来展示复杂的数学表达式。


通过严格遵循以上格式要求,你能够生成结构清晰、内容准确、格式统一且易于阅读和理解的简体中文文本,帮助用户更有效地获取和理解所需的信息。

还有这个修改版的openwebui镜像
a15355447898a/open-webui:latest

优化代码块的折叠显示 在输入框添加了一个markdown编辑器

这是 【openwebui美化】open-webUI美化增强第二弹,为openwebui输入框增加带实时预览的文本编辑器。 的代码
以及感谢 【openwebui美化】open-webUI美化增强第二弹,为openwebui输入框增加带实时预览的文本编辑器。 - #25,来自 yuewei 的github action构建方案

39 个赞

感谢大佬 !

2 个赞

提示错误:我无法直接访问您提供的网页链接并提取内容来编辑新闻稿。我的工具集不包含网页抓取功能。

1 个赞

你用的是one-hub吗 :thinking:

我之前接了new-api发现不支持

3 个赞

用的原生gemini

中转一下吧

gemini的原生openai格式不知道有什么坑 :upside_down_face:

我这个函数照着one-hub文档写的

1 个赞

我已经有 new api, 支持读取网页,和联网吗? 我试了下,好像还是不行,难道要重新搭建 one hub

1 个赞

联网我实测是可以的 读取网页和代码执行我记得不行

new-api对gemini的支持一直不好

搭建one hub后, 你的函数 确实可以使用了,非常感谢。

1 个赞

很感谢佬 用上了 我的newapi可以 :tieba_072:

1 个赞

vertex的gemini同时只能用一个工具,aistudio的gemini可以同时启用联网和代码执行,或者联网和网页上下文

1 个赞

不错,Mark

2 个赞

哇,好厉害厉害啊

2 个赞

如果你用的是 @coker 佬的pipe,也不想用各种newapi

"""
title: Gemini Pipe
author_url:https://linux.do/u/coker/summary
author:coker
version: 1.1.9
license: MIT
"""

import json
import random
import httpx
from typing import List, AsyncGenerator, Callable, Awaitable, Optional, Union
from pydantic import BaseModel, Field
import re
import time
import asyncio
import base64


class Pipe:
    class Valves(BaseModel):
        """User-tunable settings for the Gemini pipe."""

        # Comma-separated Google API keys; one is picked at random per request.
        GOOGLE_API_KEYS: str = Field(
            default="", description="API Keys for Google, use , to split"
        )
        # Gemini REST API base URL.
        BASE_URL: str = Field(
            default="https://generativelanguage.googleapis.com/v1beta",
            description="API Base Url",
        )
        # Whether to append grounding (search-result) links to answers.
        OPEN_SEARCH_INFO: bool = Field(
            default=True, description="Open search info show "
        )
        # Number of images per imagen request (1-4).
        IMAGE_NUM: int = Field(default=2, description="1-4")
        # Aspect ratio for imagen requests.
        IMAGE_RATIO: str = Field(
            default="16:9", description="1:1, 3:4, 4:3, 16:9, 9:16"
        )
        # NOTE(review): name is misspelled ("THINGING") but kept as-is —
        # renaming would break saved valve configurations.
        THINGING_BUDGET: int = Field(
            default=1000, description="Thinking budget , Max 24576"
        )
        # Veo video generation parameters.
        VIDEO_RATIO: str = Field(default="16:9", description="16:9, 9:16")
        VIDEO_NUM: int = Field(default=1, description="1-2")
        VIDEO_DURATION: int = Field(default=5, description="5-8")
        # NOTE(review): "NeGATIVE" casing is a typo, kept for compatibility.
        VIDEO_NeGATIVE_PROMPT: str = Field(default="", description="Negative prompt")

    def __init__(self):
        # "manifold" means this pipe exposes multiple model entries in OpenWebUI.
        self.type = "manifold"
        self.name = "Google: "
        self.valves = self.Valves()
        # Model-id lists used to decide search / thinking-budget handling.
        self.OPEN_SEARCH_MODELS = ["gemini-2.5-pro"]
        self.OPEN_THINK_BUDGET_MODELS = ["gemini-2.5-flash-preview-05-20"]
        # Per-request event emitter supplied by OpenWebUI in pipe().
        self.emitter = None
        # Per-request feature flags; reset at the top of every pipe() call.
        self.open_search = False
        self.open_image = False
        self.open_think = False
        self.think_first = True
        # Thinking-state machine used by do_parts()
        self.thinking_state = {"thinking": -1}  # -1: not started, 0: thinking, 1: finished

    def get_google_models(self) -> List[dict]:
        """Return the hard-coded list of models exposed by this pipe.

        Edit `model_ids` to expose the models you actually use; the
        "-thinking" suffix enables the thinking-budget handling in pipe().
        """
        model_ids = [
            "gemini-2.5-pro-thinking",
        ]
        return [{"id": mid, "name": mid} for mid in model_ids]

    async def emit_status(
        self,
        message: str = "",
        done: bool = False,
    ):
        """Send a status event to the UI; no-op when no emitter is attached."""
        if not self.emitter:
            return
        event = {
            "type": "status",
            "data": {
                "description": message,
                "done": done,
            },
        }
        await self.emitter(event)

    def pipes(self) -> List[dict]:
        """OpenWebUI hook: list the models this manifold pipe provides."""
        return self.get_google_models()

    def create_search_link(self, idx, web):
        """Format one grounding source as a numbered markdown link line."""
        title = web["title"]
        uri = web["uri"]
        return "\n" + f"{idx:02d}: [**{title}**]({uri})"

    def create_think_info(self, think_info):
        # Placeholder: thinking output is currently rendered inline by do_parts().
        pass

    def _get_safety_settings(self, model: str):
        """Build safetySettings that disable Gemini content filtering.

        The 2.0-flash-exp variants only accept the "OFF" threshold; every
        other model uses "BLOCK_NONE".
        """
        exp_models = (
            "gemini-2.0-flash-exp",
            "gemini-2.0-flash-exp-image-generation",
        )
        threshold = "OFF" if model in exp_models else "BLOCK_NONE"
        categories = (
            "HARM_CATEGORY_HARASSMENT",
            "HARM_CATEGORY_HATE_SPEECH",
            "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "HARM_CATEGORY_DANGEROUS_CONTENT",
            "HARM_CATEGORY_CIVIC_INTEGRITY",
        )
        return [{"category": c, "threshold": threshold} for c in categories]

    def split_image(self, content):
        """Split inline base64 images out of a markdown message.

        Returns a tuple ``(text, images)`` where ``images`` is a list of
        ``{"mimeType", "data"}`` dicts. Falls back to a placeholder prompt
        when the message contained only images.

        Fix: the removal regex was a duplicated literal that could drift
        from the extraction regex; it is now compiled once and reused.
        """
        pattern = re.compile(r"!\[image\]\(data:([^;]+);base64,([^)]+)\)")
        image_data_list = [
            {"mimeType": mime_type, "data": base64_data}
            for mime_type, base64_data in pattern.findall(content)
        ]
        if image_data_list:
            # Strip embedded images so only the prose is sent as text.
            content = pattern.sub("", content)
        if not content:
            # Gemini rejects empty text parts; supply a minimal prompt.
            content = "请参考图片内容"
        return content, image_data_list

    def convert_message(self, message) -> dict:
        """Convert one OpenAI-style chat message to Gemini `contents` format.

        - "assistant" maps to role "model"; everything else maps to "user"
          (Gemini only accepts user/model roles inside `contents`).
        - Inline base64 images embedded in assistant markdown are split
          back out into inline_data parts.

        Fix: for list-style content, the mime type is now parsed from the
        data URL header instead of always assuming "image/jpeg" (PNG/WebP
        uploads were previously mislabeled).
        """
        new_message = {
            "role": "model" if message["role"] == "assistant" else "user",
            "parts": [],
        }
        content = message.get("content")
        if isinstance(content, str):
            if message["role"] != "assistant":
                new_message["parts"].append({"text": content})
                return new_message
            # Assistant history may contain inline images generated earlier.
            text, image_data_list = self.split_image(content)
            new_message["parts"].append({"text": text})
            for image_data in image_data_list:
                new_message["parts"].append(
                    {
                        "inline_data": {
                            "mime_type": image_data["mimeType"],
                            "data": image_data["data"],
                        }
                    }
                )
            return new_message
        if isinstance(content, list):
            for item in content:
                if item["type"] == "text":
                    new_message["parts"].append({"text": item["text"]})
                elif item["type"] == "image_url":
                    image_url = item["image_url"]["url"]
                    if image_url.startswith("data:image"):
                        # "data:<mime>;base64,<payload>" — parse both halves.
                        header, _, image_data = image_url.partition(",")
                        mime_type = header[len("data:"):].split(";")[0] or "image/jpeg"
                        new_message["parts"].append(
                            {
                                "inline_data": {
                                    "mime_type": mime_type,
                                    "data": image_data,
                                }
                            }
                        )
        return new_message

    async def do_parts(self, parts):
        """Render a Gemini response `parts` list to markdown text.

        Separates "thought" parts from regular parts. In thinking mode the
        thought stream is wrapped in <think>...</think> tags, driven by
        ``self.thinking_state["thinking"]``:
        -1 = not started, 0 = thinking, 1 = finished.

        Fix: the previous elif chain dropped content when a single chunk
        carried BOTH thought and regular text, and dropped regular text
        that arrived before any thought; both streams are now always
        emitted in order.
        """
        if not parts or not isinstance(parts, list):
            return "Error: No parts found"

        thinking_content = ""
        regular_content = ""

        for part in parts:
            if "text" in part and part["text"]:
                # Thought parts are flagged by the API with "thought": true.
                if part.get("thought", False):
                    thinking_content += part["text"]
                else:
                    regular_content += part["text"]
            elif "inlineData" in part and part["inlineData"]:
                # Re-embed inline images as markdown data URLs.
                try:
                    regular_content += f'\n ![image](data:{part["inlineData"]["mimeType"]};base64,{part["inlineData"]["data"]}) \n'
                except Exception:
                    pass

        if not self.open_think:
            # Non-thinking mode: emit everything as-is.
            return thinking_content + regular_content

        res = ""
        if thinking_content:
            if self.thinking_state["thinking"] == -1:
                # First thought chunk: open the think block.
                self.thinking_state["thinking"] = 0
                res += "<think>\n"
            res += thinking_content
        if regular_content:
            if self.thinking_state["thinking"] == 0:
                # First regular chunk after thinking: close the think block.
                self.thinking_state["thinking"] = 1
                res += "\n</think>\n\n"
            res += regular_content
        return res

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        """Main entry point: translate an OpenAI-style request body into a
        Gemini REST call and yield the answer as markdown chunks.

        Routes imagen/veo model ids to the dedicated generators, merges
        tools injected by filters, and handles both streaming (SSE) and
        non-streaming responses.

        Fix: ``emit_status`` is a coroutine and was previously called
        without ``await`` for the imagen/veo status messages, so those
        statuses never reached the UI.
        """
        self.emitter = __event_emitter__
        # Reset per-request state (the pipe object is reused across calls).
        self.thinking_state = {"thinking": -1}
        self.open_think = False
        self.open_search = False
        self.open_image = False

        # Naive load balancing: pick one key at random per request.
        self.GOOGLE_API_KEY = random.choice(
            self.valves.GOOGLE_API_KEYS.split(",")
        ).strip()
        self.base_url = self.valves.BASE_URL
        if not self.GOOGLE_API_KEY:
            yield "Error: GOOGLE_API_KEY is not set"
            return
        try:
            model_id = body["model"]
            # OpenWebUI prefixes manifold models with "<pipe-id>."; strip it.
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            if "imagen" in model_id:
                await self.emit_status(message="🐎 图像生成中……")
                async for res in self.gen_image(body["messages"][-1], model_id):
                    yield res
                return
            if "veo" in model_id:
                await self.emit_status(message="🐎 视频生成中……")
                async for res in self.gen_veo(body["messages"][-1], model_id):
                    yield res
                return
            messages = body["messages"]
            stream = body.get("stream", False)
            # Prepare the request payload.
            contents = []
            request_data = {
                "generationConfig": {
                    "temperature": body.get("temperature", 0.7),
                    "topP": body.get("top_p", 0.9),
                    "topK": body.get("top_k", 40),
                    "maxOutputTokens": body.get("max_tokens", 16384),
                    "stopSequences": body.get("stop", []),
                },
            }

            for message in messages:
                if message["role"] == "system":
                    # System prompts map to Gemini's system_instruction field.
                    request_data["system_instruction"] = {
                        "parts": [{"text": message["content"]}]
                    }
                    continue
                contents.append(self.convert_message(message))
            request_data["contents"] = contents

            # Tools injected by OpenWebUI filters (search / urlContext / code).
            tools_from_filters = body.get("tools", [])

            if model_id.endswith("-search"):
                model_id = model_id[:-7]
                # Merge native search with any filter-provided tools.
                search_tool = {"googleSearch": {}}
                if tools_from_filters:
                    request_data["tools"] = [search_tool] + tools_from_filters
                else:
                    request_data["tools"] = [search_tool]
                self.open_search = True
                await self.emit_status(message="🔍 我好像在搜索……")
            elif "thinking" in model_id:
                # Thinking models can also use filter-provided tools.
                if tools_from_filters:
                    request_data["tools"] = tools_from_filters
                await self.emit_status(message="🧐 别急我在思考……")
                self.open_think = True
                self.think_first = True
                if model_id.endswith("-thinking"):
                    model_id = model_id[:-9]
                    request_data["generationConfig"]["thinking_config"] = {
                        "thinking_budget": self.valves.THINGING_BUDGET,
                        "includeThoughts": True,
                    }
            elif model_id.endswith("-image-generation"):
                # Image-generation models can also use filter-provided tools.
                if tools_from_filters:
                    request_data["tools"] = tools_from_filters
                request_data["generationConfig"]["response_modalities"] = [
                    "Text",
                    "Image",
                ]
                self.open_image = True
            else:
                # Plain text models: just pass the filter tools through.
                if tools_from_filters:
                    request_data["tools"] = tools_from_filters
                await self.emit_status(message="🚀 飞速生成中……")
            request_data["safetySettings"] = self._get_safety_settings(model_id)
            params = {"key": self.GOOGLE_API_KEY}
            if stream:
                url = f"{self.valves.BASE_URL}/models/{model_id}:streamGenerateContent"
                params["alt"] = "sse"
            else:
                url = f"{self.valves.BASE_URL}/models/{model_id}:generateContent"
            headers = {"Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                if stream:
                    async with client.stream(
                        "POST",
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=500,
                    ) as response:
                        if response.status_code != 200:
                            error_content = await response.aread()
                            yield f"Error: HTTP {response.status_code}: {error_content.decode('utf-8')}"
                            await self.emit_status(message="❌ 生成失败", done=True)
                            return

                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    if "candidates" in data and data["candidates"]:
                                        try:
                                            parts = data["candidates"][0]["content"][
                                                "parts"
                                            ]
                                        except (KeyError, IndexError, TypeError):
                                            # No parts: either an abnormal finish
                                            # or a keep-alive chunk to skip.
                                            if (
                                                "finishReason" in data["candidates"][0]
                                                and data["candidates"][0][
                                                    "finishReason"
                                                ]
                                                != "STOP"
                                            ):
                                                yield "\n---\n" + "异常结束: " + data[
                                                    "candidates"
                                                ][0]["finishReason"]
                                                return
                                            else:
                                                continue
                                        text = await self.do_parts(parts)
                                        yield text
                                        try:
                                            # Append grounding links when search
                                            # metadata is attached to the chunk.
                                            if (
                                                self.open_search
                                                and self.valves.OPEN_SEARCH_INFO
                                                and data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                            ):
                                                yield "\n---------------------------------\n"
                                                groundingChunks = data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                                for idx, groundingChunk in enumerate(
                                                    groundingChunks, 1
                                                ):
                                                    if "web" in groundingChunk:
                                                        yield self.create_search_link(
                                                            idx, groundingChunk["web"]
                                                        )
                                        except Exception:
                                            # Grounding metadata is optional.
                                            pass
                                except Exception:
                                    # Skip malformed SSE lines silently.
                                    pass
                        await self.emit_status(message="🎉 生成成功", done=True)
                else:
                    response = await client.post(
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        return
                    data = response.json()
                    res = ""
                    if "candidates" in data and data["candidates"]:
                        parts = data["candidates"][0]["content"]["parts"]
                        res = await self.do_parts(parts)
                        try:
                            # Append grounding links, if present.
                            if (
                                self.open_search
                                and self.valves.OPEN_SEARCH_INFO
                                and data["candidates"][0]["groundingMetadata"][
                                    "groundingChunks"
                                ]
                            ):
                                res += "\n---------------------------------\n"
                                groundingChunks = data["candidates"][0][
                                    "groundingMetadata"
                                ]["groundingChunks"]
                                for idx, groundingChunk in enumerate(
                                    groundingChunks, 1
                                ):
                                    if "web" in groundingChunk:
                                        res += self.create_search_link(
                                            idx, groundingChunk["web"]
                                        )
                        except Exception:
                            # Grounding metadata is optional.
                            pass
                        await self.emit_status(message="🎉 生成成功", done=True)
                        yield res
                    else:
                        yield "No response data"
        except Exception as e:
            yield f"Error: {str(e)}"
            await self.emit_status(message="❌ 生成失败", done=True)

    async def gen_image(
        self, message: Optional[Union[dict, list]], model: str
    ) -> AsyncGenerator[str, None]:
        """Generate images via an Imagen `:predict` model and yield markdown.

        Fixes: ``emit_status`` is a coroutine and was previously called
        without ``await``; ``prompt`` could be unbound (NameError) when the
        content list contained no text part.
        """
        content = message.get("content", "")
        prompt = ""
        if isinstance(content, str):
            prompt = content
        elif isinstance(content, list) and len(content) > 0:
            for msg in content:
                if msg["type"] == "text":
                    prompt = msg["text"]
                    break
        if not prompt:
            yield "Error: No prompt found"
            return
        url = f"{self.base_url}/models/{model}:predict"
        params = {"key": self.GOOGLE_API_KEY}
        headers = {"Content-Type": "application/json"}
        request_data = {
            "instances": [{"prompt": prompt}],
            "parameters": {
                "sampleCount": self.valves.IMAGE_NUM,  # 1-4
                "personGeneration": "allow_adult",  # "dont_allow" | "allow_adult"
                "aspectRatio": self.valves.IMAGE_RATIO,  # "1:1", "3:4", "4:3", "16:9", "9:16"
            },
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url, json=request_data, headers=headers, params=params, timeout=120
            )
            if response.status_code != 200:
                yield f"Error: HTTP {response.status_code}: {response.text}"
                await self.emit_status(message="❌ 生成失败", done=True)
                return
            data = response.json()
            await self.emit_status(message="🎉 生成成功", done=True)
            if "predictions" in data and isinstance(data["predictions"], list):
                yield f"生成图像数量: {len(data['predictions'])}\n\n"
                for index, prediction in enumerate(data["predictions"]):
                    base64_str = (
                        prediction["bytesBase64Encoded"]
                        if "bytesBase64Encoded" in prediction
                        else None
                    )
                    if base64_str:
                        # Estimate the decoded payload size for display.
                        size_bytes = len(base64_str) * 3 / 4
                        if size_bytes >= 1024 * 1024:
                            size = round(size_bytes / (1024 * 1024), 1)
                            unit = "MB"
                        else:
                            size = round(size_bytes / 1024, 1)
                            unit = "KB"
                        yield f"图像 {index+1} 大小: {size} {unit}\n"
                        yield f'![image](data:{prediction["mimeType"]};base64,{base64_str}) \n\n'

                    else:
                        yield "No image data found"
    async def gen_veo(
        self, message: Optional[Union[dict, list]], model: str
    ) -> AsyncGenerator[str, None]:
        """Generate video(s) with a Veo model via the long-running predict API.

        Extracts the text prompt (and an optional base64 seed image for
        image-to-video) from *message*, submits a ``predictLongRunning``
        request, polls the returned operation until it completes, then
        downloads each generated video and yields it inline as a base64
        ``<video>`` tag.

        Args:
            message: Chat message whose ``content`` is either a plain string
                prompt or an OpenAI-style list of ``text`` / ``image_url`` parts.
            model: Veo model identifier used to build the endpoint URL.

        Yields:
            Markdown/HTML chunks for the chat stream, or ``Error: ...``
            strings on failure.
        """
        content = message.get("content", "")
        # Fix: initialize prompt so a parts list without any "text" item
        # reaches the "No prompt found" error below instead of raising
        # NameError at the `if not prompt` check.
        prompt = ""
        img_base64_str = None
        if isinstance(content, str):
            prompt = content
        elif isinstance(content, list) and len(content) > 0:
            for msg in content:
                # .get() tolerates malformed parts that lack a "type" key.
                if msg.get("type") == "text":
                    prompt = msg["text"]
                elif msg.get("type") == "image_url":
                    # Only inline data URIs are accepted as the seed image.
                    if msg["image_url"]["url"].startswith("data:image"):
                        img_base64_str = msg["image_url"]["url"].split(",")[1]
        else:
            yield "Error: Error message format"
            return
        url = f"{self.base_url}/models/{model}:predictLongRunning"
        if not prompt:
            yield "Error: No prompt found"
            return
        request_data = {
            "instances": [
                {
                    "prompt": prompt,
                }
            ],
            "parameters": {
                "aspectRatio": self.valves.VIDEO_RATIO,  # "16:9" or "9:16"
                "negativePrompt": self.valves.VIDEO_NeGATIVE_PROMPT,
                "personGeneration": "allow_adult",
                "sampleCount": self.valves.VIDEO_NUM,  # 1..2 samples
                "durationSeconds": self.valves.VIDEO_DURATION,  # 5..8 seconds
            },
        }
        if img_base64_str:
            # Image-to-video: attach the seed frame. personGeneration is
            # not accepted together with an input image, so drop it.
            request_data["instances"][0]["image"] = {
                "bytesBase64Encoded": img_base64_str,
                "mimeType": "image/jpeg",
            }
            request_data["parameters"].pop("personGeneration", None)
        params = {"key": self.GOOGLE_API_KEY}
        headers = {"Content-Type": "application/json"}
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url, json=request_data, headers=headers, params=params, timeout=120
            )
            if response.status_code != 200:
                yield f"Error: HTTP {response.status_code}: {response.text}"
                self.emit_status(message="❌ 生成失败", done=True)
                return
            try:
                res_text = response.text
                # Some relays wrap the JSON reply in an SSE "data: " frame.
                if res_text.startswith("data: "):
                    res_text = res_text[6:].strip()
                data = json.loads(res_text)
                plan_id = data.get("name", "")
                if not plan_id:
                    yield "Error: No plan ID found"
                    return
                self.emit_status(message=plan_id, done=False)
                start_time = time.time()
                result_url = []
                # Poll the long-running operation every 10 s, up to 5 minutes.
                while True:
                    if time.time() - start_time > 300:
                        yield "Error: Timeout"
                        return
                    response = await client.get(
                        f"{self.base_url}/{plan_id}",
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        return
                    data = response.json()
                    if data.get("done"):
                        if "error" in data:
                            if "message" in data["error"]:
                                yield f"Error: {data['error']['message']}"
                            else:
                                yield f"Error: {response.text}"
                            self.emit_status(message="❌ 生成失败", done=True)
                            return
                        if (
                            "generateVideoResponse" not in data["response"]
                            or "generatedSamples"
                            not in data["response"]["generateVideoResponse"]
                        ):
                            yield "Error: " + response.text
                            self.emit_status(message="❌ 生成失败", done=True)
                            return
                        self.emit_status(message="🎉 生成成功", done=True)
                        for sample in data["response"]["generateVideoResponse"][
                            "generatedSamples"
                        ]:
                            # Drop the query string; the key/alt params are
                            # re-attached for the download request below.
                            result_url.append(sample["video"]["uri"].split("?")[0])
                        break
                    else:
                        self.emit_status(message="正在生成视频中……", done=False)
                        await asyncio.sleep(10)
                if result_url:
                    params["alt"] = "media"  # request the raw media bytes
                    # video_url (not `url`) so the outer request URL is not
                    # shadowed inside the loop.
                    for idx, video_url in enumerate(result_url, 1):
                        try:
                            resp = await client.get(
                                video_url,
                                params=params,
                                timeout=120,
                                follow_redirects=True,
                            )
                            resp.raise_for_status()
                            video_bytes = resp.content
                            b64_video = base64.b64encode(video_bytes).decode("utf-8")
                            yield "\n\n" + "```html\n<video width='350px' height='280px' controls='controls' autoplay='autoplay' loop='loop' preload='auto' src='data:video/mp4;base64,{}'></video>\n```".format(
                                b64_video
                            )

                        except Exception as e:
                            yield f"Error downloading/encoding 视频 {idx}: {e}" + "\n\n"
                            continue
            except Exception as e:
                yield f"Error: {str(e)}"
                self.emit_status(message="❌ 生成失败", done=True)
                return
  • 图片生成模型也可以使用过滤器工具,但相关逻辑可能有问题;我用不上也没去查证,就简单写了一下
"""
title: 代码执行
description: 启用Gemini原生代码执行工具
licence: MIT
"""

from pydantic import BaseModel, Field
from typing import Optional


class Filter:
    """OpenWebUI toggle filter that enables Gemini's native code execution."""

    class Valves(BaseModel):
        priority: int = Field(
            default=95, description="过滤器应用的优先级。数值越小,优先级越高。"
        )

    def __init__(self):
        self.type = "filter"
        self.name = "Code Execution Enabler"
        self.valves = self.Valves()
        self.toggle = True
        self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgLTk2MCA5NjAgOTYwIiBmaWxsPSJjdXJyZW50Q29sb3IiPjxwYXRoIGQ9Ik0zMjAtMjQwIDgwLTQ4MGwyNDAtMjQwIDU3IDU3LTE4NCAxODQgMTgzIDE4My01NiA1NlptMzIwIDAtNTctNTcgMTg0LTE4NC0xODMtMTgzIDU2LTU2IDI0MCAyNDAtMjQwIDI0MFoiLz48L3N2Zz4="""

    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Inject the native {"code_execution": {}} tool into the request body.

        Ensures body["tools"] exists, then appends the tool only when no
        existing entry already carries the "code_execution" key.
        """
        tools = body.setdefault("tools", [])

        # Skip injection if the native tool key is already present.
        if not any("code_execution" in entry for entry in tools):
            tools.append({"code_execution": {}})

        return body
"""
title: 网页上下文提取
description: 为Gemini启用原生网页上下文提取工具,允许模型理解 URL 内容
licence: MIT
"""

from pydantic import BaseModel, Field
from typing import Optional


class Filter:
    """OpenWebUI toggle filter enabling Gemini's native URL-context tool."""

    class Valves(BaseModel):
        # Priority 105 so this runs after code execution (95) and web
        # search (100); lower numbers run first.
        priority: int = Field(
            default=105, description="过滤器应用的优先级。数值越小,优先级越高。"
        )

    def __init__(self):
        self.type = "filter"
        self.name = "URL Context Extractor"
        self.valves = self.Valves()
        self.toggle = True
        self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgLTk2MCA5NjAgOTYwIiBmaWxsPSJjdXJyZW50Q29sb3IiIGNsYXNzPSJzaXplLTYiPjxwYXRoIGQ9Ik02ODAtMTYwdi0xMjBINTYwdi04MGgxMjB2LTEyMGg4MHYxMjBoMTIwdjgwSDc2MHYxMjBoLTgwWk00NDAtMjgwSDI4MHEtODMgMC0xNDEuNS01OC41VDgwLTQ4MHEwLTgzIDU4LjUtMTQxLjVUMjgwLTY4MGgxNjB2ODBIMjgwcS01MCAwLTg1IDM1dC0zNSA4NXEwIDUwIDM1IDg1dDg1IDM1aDE2MHY4MFpNMzIwLTQ0MHYtODBoMzIwdjgwSDMyMFptNTYwLTQwaC04MHEwLTUwLTM1LTg1dC04NS0zNUg1MjB2LTgwaDE2MHE4MyAwIDE0MS41IDU4LjVUODgwLTQ4MFoiLz48L3N2Zz4="""

    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Attach the native {"url_context": {}} tool to the outgoing request.

        Ensures body["tools"] exists, then appends the tool only when no
        existing entry already carries the "url_context" key.
        """
        tools = body.setdefault("tools", [])

        # Avoid duplicating the tool on repeated inlet invocations.
        if not any("url_context" in entry for entry in tools):
            tools.append({"url_context": {}})

        return body
"""
title: 联网搜索
description: 原生联网搜索
"""

from pydantic import BaseModel, Field
from typing import Optional


class Filter:
    """OpenWebUI toggle filter enabling Gemini's native Google Search tool."""

    class Valves(BaseModel):
        # Lower value = higher priority; runs after code execution (95)
        # and before URL-context extraction (105).
        priority: int = Field(default=100, description="优先级")

    def __init__(self):
        # Consistency: the sibling code-execution and URL-context filters
        # declare type/name explicitly; do the same here so the three
        # filters present uniformly in the UI.
        self.type = "filter"
        self.name = "Google Search"
        self.valves = self.Valves()
        self.toggle = True
        self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIGZpbGw9Im5vbmUiIHZpZXdCb3g9IjAgMCAyNCAyNCIgc3Ryb2tlLXdpZHRoPSIxLjc1IiBzdHJva2U9ImN1cnJlbnRDb2xvciIgY2xhc3M9InNpemUtNSI+PHBhdGggc3Ryb2tlLWxpbmVjYXA9InJvdW5kIiBzdHJva2UtbGluZWpvaW49InJvdW5kIiBkPSJNMTIgMjFhOS4wMDQgOS4wMDQgMCAwIDAgOC43MTYtNi43NDNNMTIgMjFhOS4wMDQgOS4wMDQgMCAwIDEtOC43MTYtNi43NDNNMTIgMjFjMi40ODUgMCA0LjUtNC4wMyA0LjUtOVMxNC40ODUgMyAxMiAzbTAgMThjLTIuNDg1IDAtNC41LTQuMDMtNC41LTlTOS41MTUgMyAxMiAzbTAgMGE4Ljk5NyA4Ljk5NyAwIDAgMSA3Ljg0MyA0LjU4Mk0xMiAzYTguOTk3IDguOTk3IDAgMCAwLTcuODQzIDQuNTgybTE1LjY4NiAwQTExLjk1MyAxMS45NTMgMCAwIDEgMTIgMTAuNWMtMi45OTggMC01Ljc0LTEuMS03Ljg0My0yLjkxOG0xNS42ODYgMEE4Ljk1OSA4Ljk1OSAwIDAgMSAyMSAxMmMwIC43NzgtLjA5OSAxLjUzMy0uMjg0IDIuMjUzbTAgMEExNy45MTkgMTcuOTE5IDAgMCAxIDEyIDE2LjVjLTMuMTYyIDAtNi4xMzMtLjgxNS04LjcxNi0yLjI0N20wIDBBOS4wMTUgOS4wMTUgMCAwIDEgMyAxMmMwLTEuNjA1LjQyLTMuMTEzIDEuMTU3LTQuNDE4Ij48L3BhdGg+PC9zdmc+"""

    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Append the native {"google_search": {}} tool unless already present.

        Args:
            body: Outgoing chat-completion request payload (mutated in place).
            user: Requesting user metadata (unused).

        Returns:
            The request body with the tool guaranteed to be present once.
        """
        if "tools" not in body:
            body["tools"] = []

        # Gemini's native tool format: a bare key with an empty config object.
        google_search_tool = {"google_search": {}}

        # Avoid duplicating the tool on repeated inlet invocations.
        tool_exists = any("google_search" in t for t in body["tools"])

        if not tool_exists:
            body["tools"].append(google_search_tool)

        return body
5 个赞

感谢分享

好帖子啊


onehub 填的官方 key,显示如图,是不是不能用了啊?直接提示无法读取链接。佬,能不能帮我一下?挺想使用这个功能的。

我刚才试了试,github 的看不了,但是能看 L 站的。
有点随机了,现在又可以看了,但是有的地方会乱说。

1 个赞

你之前可以看github么