如果你用的是 @coker 佬的pipe,也不想用各种newapi
"""
title: Gemini Pipe
author_url:https://linux.do/u/coker/summary
author:coker
version: 1.1.9
license: MIT
"""
import json
import random
import httpx
from typing import List, AsyncGenerator, Callable, Awaitable, Optional, Union
from pydantic import BaseModel, Field
import re
import time
import asyncio
import base64
class Pipe:
class Valves(BaseModel):
GOOGLE_API_KEYS: str = Field(
default="", description="API Keys for Google, use , to split"
)
BASE_URL: str = Field(
default="https://generativelanguage.googleapis.com/v1beta",
description="API Base Url",
)
OPEN_SEARCH_INFO: bool = Field(
default=True, description="Open search info show "
)
IMAGE_NUM: int = Field(default=2, description="1-4")
IMAGE_RATIO: str = Field(
default="16:9", description="1:1, 3:4, 4:3, 16:9, 9:16"
)
THINGING_BUDGET: int = Field(
default=1000, description="Thinking budget , Max 24576"
)
VIDEO_RATIO: str = Field(default="16:9", description="16:9, 9:16")
VIDEO_NUM: int = Field(default=1, description="1-2")
VIDEO_DURATION: int = Field(default=5, description="5-8")
VIDEO_NeGATIVE_PROMPT: str = Field(default="", description="Negative prompt")
def __init__(self):
self.type = "manifold"
self.name = "Google: "
self.valves = self.Valves()
self.OPEN_SEARCH_MODELS = ["gemini-2.5-pro"]
self.OPEN_THINK_BUDGET_MODELS = ["gemini-2.5-flash-preview-05-20"]
self.emitter = None
self.open_search = False
self.open_image = False
self.open_think = False
self.think_first = True
# 添加思考状态管理
self.thinking_state = {"thinking": -1} # -1: 未开始, 0: 思考中, 1: 思考结束
def get_google_models(self) -> List[dict]:
# 硬编码模型列表,你用什么自己改
models = []
# # 添加搜索变体
# models.append(
# {
# "id": "gemini-2.5-pro-preview-06-05-search",
# "name": "gemini-2.5-pro-preview-06-05-search",
# }
# )
# 添加思考变体
models.append(
{
"id": "gemini-2.5-pro-thinking",
"name": "gemini-2.5-pro-thinking",
}
)
return models
async def emit_status(
self,
message: str = "",
done: bool = False,
):
if self.emitter:
await self.emitter(
{
"type": "status",
"data": {
"description": message,
"done": done,
},
}
)
def pipes(self) -> List[dict]:
return self.get_google_models()
def create_search_link(self, idx, web):
return f'\n{idx:02d}: [**{web["title"]}**]({web["uri"]})'
def create_think_info(self, think_info):
pass
def _get_safety_settings(self, model: str):
if (
model == "gemini-2.0-flash-exp"
or model == "gemini-2.0-flash-exp-image-generation"
):
return [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
{"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "OFF"},
]
return [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"},
]
def split_image(self, content):
pattern = r"!\[image\]\(data:([^;]+);base64,([^)]+)\)"
matches = re.findall(pattern, content)
image_data_list = []
if matches:
for mime_type, base64_data in matches:
image_data_list.append({"mimeType": mime_type, "data": base64_data})
content = re.sub(r"!\[image\]\(data:[^;]+;base64,[^)]+\)", "", content)
if not content:
content = "请参考图片内容"
return content, image_data_list
def convert_message(self, message) -> dict:
new_message = {
"role": "model" if message["role"] == "assistant" else "user",
"parts": [],
}
if isinstance(message.get("content"), str):
if not message["role"] == "assistant":
new_message["parts"].append({"text": message["content"]})
return new_message
content, image_data_list = self.split_image(message["content"])
new_message["parts"].append({"text": content})
if image_data_list:
for image_data in image_data_list:
new_message["parts"].append(
{
"inline_data": {
"mime_type": image_data["mimeType"],
"data": image_data["data"],
}
}
)
return new_message
if isinstance(message.get("content"), list):
for content in message["content"]:
if content["type"] == "text":
new_message["parts"].append({"text": content["text"]})
elif content["type"] == "image_url":
image_url = content["image_url"]["url"]
if image_url.startswith("data:image"):
image_data = image_url.split(",")[1]
new_message["parts"].append(
{
"inline_data": {
"mime_type": "image/jpeg",
"data": image_data,
}
}
)
return new_message
async def do_parts(self, parts):
res = ""
if not parts or not isinstance(parts, list):
return "Error: No parts found"
thinking_content = ""
regular_content = ""
for part in parts:
# 检查是否有 text 内容
if "text" in part and part["text"]:
# 检查是否为思考内容 (根据 Gemini API 文档,思考内容通过 thought 布尔值标识)
if part.get("thought", False):
thinking_content += part["text"]
else:
regular_content += part["text"]
# 处理图片内容
elif "inlineData" in part and part["inlineData"]:
try:
regular_content += f'\n  \n'
except:
pass
# 根据思考状态处理输出
if self.open_think:
# 如果有思考内容且是第一次输出思考
if thinking_content and self.thinking_state["thinking"] == -1:
self.thinking_state["thinking"] = 0
res += "<think>\n" + thinking_content
# 如果正在思考中,继续输出思考内容
elif thinking_content and self.thinking_state["thinking"] == 0:
res += thinking_content
# 如果有常规内容且之前在思考,结束思考标签
elif regular_content and self.thinking_state["thinking"] == 0:
self.thinking_state["thinking"] = 1
res += "\n</think>\n\n" + regular_content
# 如果思考已结束,只输出常规内容
elif regular_content and self.thinking_state["thinking"] == 1:
res += regular_content
# 只有思考内容但还没开始思考状态
elif thinking_content and not regular_content:
if self.thinking_state["thinking"] == -1:
self.thinking_state["thinking"] = 0
res += "<think>\n" + thinking_content
else:
res += thinking_content
else:
# 非思考模式,正常输出所有内容
res = thinking_content + regular_content
return res
async def pipe(
self,
body: dict,
__event_emitter__: Callable[[dict], Awaitable[None]] = None,
) -> AsyncGenerator[str, None]:
self.emitter = __event_emitter__
# 重置思考状态
self.thinking_state = {"thinking": -1}
self.open_think = False
self.open_search = False
self.open_image = False
self.GOOGLE_API_KEY = random.choice(
self.valves.GOOGLE_API_KEYS.split(",")
).strip()
self.base_url = self.valves.BASE_URL
if not self.GOOGLE_API_KEY:
yield "Error: GOOGLE_API_KEY is not set"
return
try:
model_id = body["model"]
if "." in model_id:
model_id = model_id.split(".", 1)[1]
if "imagen" in model_id:
self.emit_status(message="🐎 图像生成中……")
async for res in self.gen_image(body["messages"][-1], model_id):
yield res
return
if "veo" in model_id:
self.emit_status(message="🐎 视频生成中……")
async for res in self.gen_veo(body["messages"][-1], model_id):
yield res
return
messages = body["messages"]
stream = body.get("stream", False)
# Prepare the request payload
contents = []
request_data = {
"generationConfig": {
"temperature": body.get("temperature", 0.7),
"topP": body.get("top_p", 0.9),
"topK": body.get("top_k", 40),
"maxOutputTokens": body.get("max_tokens", 16384),
"stopSequences": body.get("stop", []),
},
}
for message in messages:
if message["role"] == "system":
request_data["system_instruction"] = {
"parts": [{"text": message["content"]}]
}
continue
contents.append(self.convert_message(message))
# if len(str(contents)) >1000:
# yield "contents :"+str(contents)[:1000]+"......"
request_data["contents"] = contents
# 处理来自过滤器的工具
tools_from_filters = body.get("tools", [])
if model_id.endswith("-search"):
model_id = model_id[:-7]
# 合并搜索工具和过滤器工具
search_tool = {"googleSearch": {}}
if tools_from_filters:
request_data["tools"] = [search_tool] + tools_from_filters
else:
request_data["tools"] = [search_tool]
self.open_search = True
await self.emit_status(message="🔍 我好像在搜索……")
elif "thinking" in model_id:
# 思考模型也可以使用过滤器工具
if tools_from_filters:
request_data["tools"] = tools_from_filters
await self.emit_status(message="🧐 别急我在思考……")
self.open_think = True
self.think_first = True
if model_id.endswith("-thinking"):
model_id = model_id[:-9]
request_data["generationConfig"]["thinking_config"] = {
"thinking_budget": self.valves.THINGING_BUDGET,
"includeThoughts": True,
}
elif model_id.endswith("-image-generation"):
# 图片生成模型也可以使用过滤器工具
if tools_from_filters:
request_data["tools"] = tools_from_filters
request_data["generationConfig"]["response_modalities"] = [
"Text",
"Image",
]
self.open_image = True
else:
# 普通模型使用过滤器工具
if tools_from_filters:
request_data["tools"] = tools_from_filters
await self.emit_status(message="🚀 飞速生成中……")
request_data["safetySettings"] = self._get_safety_settings(model_id)
params = {"key": self.GOOGLE_API_KEY}
if stream:
url = f"{self.valves.BASE_URL}/models/{model_id}:streamGenerateContent"
params["alt"] = "sse"
else:
url = f"{self.valves.BASE_URL}/models/{model_id}:generateContent"
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
if stream:
async with client.stream(
"POST",
url,
json=request_data,
headers=headers,
params=params,
timeout=500,
) as response:
if response.status_code != 200:
error_content = await response.aread()
yield f"Error: HTTP {response.status_code}: {error_content.decode('utf-8')}"
await self.emit_status(message="❌ 生成失败", done=True)
return
async for line in response.aiter_lines():
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if "candidates" in data and data["candidates"]:
try:
parts = data["candidates"][0]["content"][
"parts"
]
except:
if (
"finishReason" in data["candidates"][0]
and data["candidates"][0][
"finishReason"
]
!= "STOP"
):
yield "\n---\n" + "异常结束: " + data[
"candidates"
][0]["finishReason"]
return
else:
continue
text = await self.do_parts(parts)
yield text
try:
if (
self.open_search
and self.valves.OPEN_SEARCH_INFO
and data["candidates"][0][
"groundingMetadata"
]["groundingChunks"]
):
yield "\n---------------------------------\n"
groundingChunks = data["candidates"][0][
"groundingMetadata"
]["groundingChunks"]
for idx, groundingChunk in enumerate(
groundingChunks, 1
):
if "web" in groundingChunk:
yield self.create_search_link(
idx, groundingChunk["web"]
)
except Exception as e:
pass
except Exception as e:
# yield f"Error parsing stream: {str(e)}"
pass
await self.emit_status(message="🎉 生成成功", done=True)
else:
response = await client.post(
url,
json=request_data,
headers=headers,
params=params,
timeout=120,
)
if response.status_code != 200:
yield f"Error: HTTP {response.status_code}: {response.text}"
return
data = response.json()
res = ""
if "candidates" in data and data["candidates"]:
parts = data["candidates"][0]["content"]["parts"]
res = await self.do_parts(parts)
try:
if (
self.open_search
and self.valves.OPEN_SEARCH_INFO
and data["candidates"][0]["groundingMetadata"][
"groundingChunks"
]
):
res += "\n---------------------------------\n"
groundingChunks = data["candidates"][0][
"groundingMetadata"
]["groundingChunks"]
for idx, groundingChunk in enumerate(
groundingChunks, 1
):
if "web" in groundingChunk:
res += self.create_search_link(
idx, groundingChunk["web"]
)
except Exception as e:
pass
await self.emit_status(message="🎉 生成成功", done=True)
yield res
else:
yield "No response data"
except Exception as e:
yield f"Error: {str(e)}"
await self.emit_status(message="❌ 生成失败", done=True)
async def gen_image(
self, message: Optional[Union[dict, list]], model: str
) -> AsyncGenerator[str, None]:
content = message.get("content", "")
if isinstance(content, str):
prompt = content
elif isinstance(content, list) and len(content) > 0:
for msg in content:
if msg["type"] == "text":
prompt = msg["text"]
break
else:
yield "Error: No prompt found"
return
url = f"{self.base_url}/models/{model}:predict"
params = {"key": self.GOOGLE_API_KEY}
headers = {"Content-Type": "application/json"}
request_data = {
"instances": [{"prompt": prompt}],
"parameters": {
"sampleCount": self.valves.IMAGE_NUM, # @param {type:"number", min:1, max:4}
"personGeneration": "allow_adult", # @param ["dont_allow", "allow_adult"]
"aspectRatio": self.valves.IMAGE_RATIO, # @param ["1:1", "3:4", "4:3", "16:9", "9:16"]
},
}
async with httpx.AsyncClient() as client:
response = await client.post(
url, json=request_data, headers=headers, params=params, timeout=120
)
if response.status_code != 200:
yield f"Error: HTTP {response.status_code}: {response.text}"
self.emit_status(message="❌ 生成失败", done=True)
return
data = response.json()
self.emit_status(message="🎉 生成成功", done=True)
if "predictions" in data and isinstance(data["predictions"], list):
yield f"生成图像数量: {len(data['predictions'])}\n\n"
for index, prediction in enumerate(data["predictions"]):
base64_str = (
prediction["bytesBase64Encoded"]
if "bytesBase64Encoded" in prediction
else None
)
if base64_str:
size_bytes = len(base64_str) * 3 / 4
if size_bytes >= 1024 * 1024:
size = round(size_bytes / (1024 * 1024), 1)
unit = "MB"
else:
size = round(size_bytes / 1024, 1)
unit = "KB"
yield f"图像 {index+1} 大小: {size} {unit}\n"
yield f' \n\n'
else:
yield "No image data found"
async def gen_veo(
self, message: Optional[Union[dict, list]], model: str
) -> AsyncGenerator[str, None]:
content = message.get("content", "")
img_base64_str = None
if isinstance(content, str):
prompt = content
elif isinstance(content, list) and len(content) > 0:
for msg in content:
if msg["type"] == "text":
prompt = msg["text"]
elif msg["type"] == "image_url":
if msg["image_url"]["url"].startswith("data:image"):
img_base64_str = msg["image_url"]["url"].split(",")[1]
else:
yield "Error: Error message format"
return
url = f"{self.base_url}/models/{model}:predictLongRunning"
if not prompt:
yield "Error: No prompt found"
return
request_data = {
"instances": [
{
"prompt": prompt,
}
],
"parameters": {
"aspectRatio": self.valves.VIDEO_RATIO, # @param ["16:9", "9:16"]
"negativePrompt": self.valves.VIDEO_NeGATIVE_PROMPT, # @param {type:"string"}
"personGeneration": "allow_adult",
"sampleCount": self.valves.VIDEO_NUM, # @param {type:"number", min:1, max:2}
"durationSeconds": self.valves.VIDEO_DURATION, # @param {type:"number", min:5, max:8}
},
}
if img_base64_str:
request_data["instances"][0]["image"] = {
"bytesBase64Encoded": img_base64_str,
"mimeType": "image/jpeg",
}
request_data["parameters"].pop("personGeneration", None)
params = {"key": self.GOOGLE_API_KEY}
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
response = await client.post(
url, json=request_data, headers=headers, params=params, timeout=120
)
if response.status_code != 200:
yield f"Error: HTTP {response.status_code}: {response.text}"
self.emit_status(message="❌ 生成失败", done=True)
return
try:
res_text = response.text
if res_text.startswith("data: "):
res_text = res_text[6:].strip()
data = json.loads(res_text)
plan_id = data.get("name", "")
if not plan_id:
yield "Error: No plan ID found"
return
self.emit_status(message=plan_id, done=False)
start_time = time.time()
result_url = []
while True:
if time.time() - start_time > 300:
yield "Error: Timeout"
return
response = await client.get(
f"{self.base_url}/{plan_id}",
headers=headers,
params=params,
timeout=120,
)
if response.status_code != 200:
yield f"Error: HTTP {response.status_code}: {response.text}"
return
data = response.json()
if data.get("done", "") and data["done"]:
if "error" in data:
if "message" in data["error"]:
yield f"Error: {data['error']['message']}"
else:
yield f"Error: {response.text}"
self.emit_status(message="❌ 生成失败", done=True)
return
if (
"generateVideoResponse" not in data["response"]
or "generatedSamples"
not in data["response"]["generateVideoResponse"]
):
yield "Error: " + response.text
self.emit_status(message="❌ 生成失败", done=True)
return
self.emit_status(message="🎉 生成成功", done=True)
for i in data["response"]["generateVideoResponse"][
"generatedSamples"
]:
result_url.append(i["video"]["uri"].split("?")[0])
break
else:
self.emit_status(message="正在生成视频中……", done=False)
await asyncio.sleep(10)
if result_url:
params["alt"] = "media"
for idx, url in enumerate(result_url, 1):
try:
resp = await client.get(
url, params=params, timeout=120, follow_redirects=True
)
resp.raise_for_status()
video_bytes = resp.content
b64_video = base64.b64encode(video_bytes).decode("utf-8")
yield "\n\n" + "```html\n<video width='350px' height='280px' controls='controls' autoplay='autoplay' loop='loop' preload='auto' src='data:video/mp4;base64,{}'></video>\n```".format(
b64_video
)
except Exception as e:
yield f"Error downloading/encoding 视频 {idx}: {e}" + "\n\n"
continue
except Exception as e:
yield f"Error: {str(e)}"
self.emit_status(message="❌ 生成失败", done=True)
return
- 图片生成模型也可以使用过滤器工具等逻辑可能有问题,我用不上也没去查证,就简单写了一下
"""
title: 代码执行
description: 启用Gemini原生代码执行工具
licence: MIT
"""
from pydantic import BaseModel, Field
from typing import Optional
class Filter:
class Valves(BaseModel):
priority: int = Field(
default=95, description="过滤器应用的优先级。数值越小,优先级越高。"
)
def __init__(self):
self.type = "filter"
self.name = "Code Execution Enabler"
self.valves = self.Valves()
self.toggle = True
self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgLTk2MCA5NjAgOTYwIiBmaWxsPSJjdXJyZW50Q29sb3IiPjxwYXRoIGQ9Ik0zMjAtMjQwIDgwLTQ4MGwyNDAtMjQwIDU3IDU3LTE4NCAxODQgMTgzIDE4My01NiA1NlptMzIwIDAtNTctNTcgMTg0LTE4NC0xODMtMTgzIDU2LTU2IDI0MCAyNDAtMjQwIDI0MFoiLz48L3N2Zz4="""
pass
def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
if "tools" not in body:
body["tools"] = []
# 使用正确的 Gemini API 原生工具格式
code_execution_tool = {"code_execution": {}}
# 为避免重复添加,先检查是否有同名的工具
tool_exists = any("code_execution" in t for t in body["tools"])
if not tool_exists:
body["tools"].append(code_execution_tool)
return body
"""
title: 网页上下文提取
description: 为Gemini启用原生网页上下文提取工具,允许模型理解 URL 内容
licence: MIT
"""
from pydantic import BaseModel, Field
from typing import Optional
class Filter:
class Valves(BaseModel):
# 将优先级设置为 105,以确保它在代码执行(95)和联网搜索(100)之后运行
priority: int = Field(
default=105, description="过滤器应用的优先级。数值越小,优先级越高。"
)
def __init__(self):
self.type = "filter"
self.name = "URL Context Extractor"
self.valves = self.Valves()
self.toggle = True
self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgLTk2MCA5NjAgOTYwIiBmaWxsPSJjdXJyZW50Q29sb3IiIGNsYXNzPSJzaXplLTYiPjxwYXRoIGQ9Ik02ODAtMTYwdi0xMjBINTYwdi04MGgxMjB2LTEyMGg4MHYxMjBoMTIwdjgwSDc2MHYxMjBoLTgwWk00NDAtMjgwSDI4MHEtODMgMC0xNDEuNS01OC41VDgwLTQ4MHEwLTgzIDU4LjUtMTQxLjVUMjgwLTY4MGgxNjB2ODBIMjgwcS01MCAwLTg1IDM1dC0zNSA4NXEwIDUwIDM1IDg1dDg1IDM1aDE2MHY4MFpNMzIwLTQ0MHYtODBoMzIwdjgwSDMyMFptNTYwLTQwaC04MHEwLTUwLTM1LTg1dC04NS0zNUg1MjB2LTgwaDE2MHE4MyAwIDE0MS41IDU4LjVUODgwLTQ4MFoiLz48L3N2Zz4="""
pass
def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
if "tools" not in body:
body["tools"] = []
# 使用正确的 Gemini API 原生工具格式
url_context_tool = {"url_context": {}}
# 检查是否已存在同名工具,避免重复添加
tool_exists = any("url_context" in t for t in body["tools"])
if not tool_exists:
body["tools"].append(url_context_tool)
return body
"""
title: 联网搜索
description: 原生联网搜索
"""
from pydantic import BaseModel, Field
from typing import Optional
class Filter:
class Valves(BaseModel):
priority: int = Field(default=100, description="优先级")
def __init__(self):
self.valves = self.Valves()
self.toggle = True
self.icon = """data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIGZpbGw9Im5vbmUiIHZpZXdCb3g9IjAgMCAyNCAyNCIgc3Ryb2tlLXdpZHRoPSIxLjc1IiBzdHJva2U9ImN1cnJlbnRDb2xvciIgY2xhc3M9InNpemUtNSI+PHBhdGggc3Ryb2tlLWxpbmVjYXA9InJvdW5kIiBzdHJva2UtbGluZWpvaW49InJvdW5kIiBkPSJNMTIgMjFhOS4wMDQgOS4wMDQgMCAwIDAgOC43MTYtNi43NDNNMTIgMjFhOS4wMDQgOS4wMDQgMCAwIDEtOC43MTYtNi43NDNNMTIgMjFjMi40ODUgMCA0LjUtNC4wMyA0LjUtOVMxNC40ODUgMyAxMiAzbTAgMThjLTIuNDg1IDAtNC41LTQuMDMtNC41LTlTOS41MTUgMyAxMiAzbTAgMGE4Ljk5NyA4Ljk5NyAwIDAgMSA3Ljg0MyA0LjU4Mk0xMiAzYTguOTk3IDguOTk3IDAgMCAwLTcuODQzIDQuNTgybTE1LjY4NiAwQTExLjk1MyAxMS45NTMgMCAwIDEgMTIgMTAuNWMtMi45OTggMC01Ljc0LTEuMS03Ljg0My0yLjkxOG0xNS42ODYgMEE4Ljk1OSA4Ljk1OSAwIDAgMSAyMSAxMmMwIC43NzgtLjA5OSAxLjUzMy0uMjg0IDIuMjUzbTAgMEExNy45MTkgMTcuOTE5IDAgMCAxIDEyIDE2LjVjLTMuMTYyIDAtNi4xMzMtLjgxNS04LjcxNi0yLjI0N20wIDBBOS4wMTUgOS4wMTUgMCAwIDEgMyAxMmMwLTEuNjA1LjQyLTMuMTEzIDEuMTU3LTQuNDE4Ij48L3BhdGg+PC9zdmc+"""
pass
def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
if "tools" not in body:
body["tools"] = []
# 使用正确的 Gemini API 原生工具格式
google_search_tool = {"google_search": {}}
# 检查工具是否已存在,避免重复添加
tool_exists = any("google_search" in t for t in body["tools"])
if not tool_exists:
body["tools"].append(google_search_tool)
return body