# 函数参考了下面这个大佬的代码,用 AI 跑出来的。
# Open WebUI 函数:使用华为 ModelArts 免费满血 DeepSeek R1 / V3
import asyncio
import secrets
import time
import uuid
import json
import re
import httpx
import html
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
    """Open WebUI pipe that proxies chat completions to Huawei ModelArts
    DeepSeek-R1 / DeepSeek-V3 streaming inference endpoints.

    Session metadata (device ID / conversation ID) is round-tripped through a
    leading ``<details>`` block injected into assistant messages, so it
    survives across turns without any server-side state.
    """

    class Valves(BaseModel):
        # Admin-editable connection settings (Open WebUI "valves").
        API_DOMAIN: str = Field(
            default="https://infer-modelarts-cn-southwest-2.modelarts-infer.com",
            description="ModelArts API Domain",
        )
        API_KEY: str = Field(
            default="你的api key",
            description="ModelArts API Key",
        )
        MODEL_ID_R1_32K: str = Field(
            default="952e4f88-ef93-4398-ae8d-af37f63f0d8e",
            description="ModelArts DeepSeek-R1 Model ID",
        )
        MODEL_ID_V3_32K: str = Field(
            default="fd53915b-8935-48fe-be70-449d76c0fc87",
            description="ModelArts DeepSeek-V3 Model ID",
        )
        MODEL_ID_R1_8K: str = Field(
            default="861b6827-e5ef-4fa6-90d2-5fd1b2975882",
            description="ModelArts DeepSeek-R1 Alternative Model ID",
        )
        MODEL_ID_V3_8K: str = Field(
            default="707c01c8-517c-46ca-827a-d0b21c71b074",
            # Fixed: description previously mislabeled this V3 model as R1.
            description="ModelArts DeepSeek-V3 Alternative Model ID",
        )
        MODEL_ID_V3_4K: str = Field(
            default="f354eacc-a2c5-43b4-a785-e5aadca988b3",
            # Fixed: description previously mislabeled this V3 model as R1.
            description="ModelArts DeepSeek-V3 Alternative Model ID",
        )
        MODEL_ID_R1_4K: str = Field(
            default="c3cfa9e2-40c9-485f-a747-caae405296ef",
            description="ModelArts DeepSeek-R1 Alternative Model ID",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Desktop Chrome UA string, kept for parity with the original client.
        self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"

    def pipes(self):
        """Models exposed in the Open WebUI model picker."""
        return [
            {"id": "deepseek-r1-32K", "name": "DeepSeek-R1-32K 华为"},
            {"id": "deepseek-r1-8K", "name": "DeepSeek-R1-8K 华为"},
            {"id": "deepseek-r1-4K", "name": "DeepSeek-R1-4K 华为"},
            {"id": "deepseek-v3-32K", "name": "DeepSeek-V3-32K 华为"},
            {"id": "deepseek-v3-8K", "name": "DeepSeek-V3-8K 华为"},
            {"id": "deepseek-v3-4K", "name": "DeepSeek-V3-4K 华为"},
        ]

    def _extract_session_info(self, body: dict) -> tuple:
        """Recover ``(device_id, conversation_id)`` from the ``<details>``
        metadata block of earlier assistant messages.

        The last metadata block found wins; either value may be None.
        """
        device_id = None
        conversation_id = None
        for message in [m for m in body["messages"] if m["role"] == "assistant"]:
            meta = re.search(
                r"^<details>(.*?)</details>", message["content"], re.DOTALL
            )
            if not meta:
                continue
            meta_str = meta.group(1)
            device_id_match = re.search(r"Device ID:\s*(\S+)", meta_str)
            if device_id_match:
                device_id = device_id_match.group(1)
            conversation_id_match = re.search(r"Conversation ID:\s*(\S+)", meta_str)
            if conversation_id_match:
                conversation_id = conversation_id_match.group(1)
        return device_id, conversation_id

    def _build_messages(self, body: dict) -> list:
        """Build the upstream message list: a fixed system prompt followed by
        the user/assistant history, with our injected ``<details>`` metadata
        stripped from assistant messages."""
        messages = [{"role": "system", "content": "You are a helpful assistant."}]
        for msg in body["messages"]:
            if msg["role"] not in ("user", "assistant"):
                continue
            content = msg["content"]
            if msg["role"] == "assistant":
                # Remove the leading metadata block we emitted on a prior turn.
                content = re.sub(
                    r"^<details>.*?</details>\s*", "", content, flags=re.DOTALL
                )
            messages.append({"role": msg["role"], "content": content})
        return messages

    def _resolve_model(self, model_id: str) -> tuple:
        """Map a pipe model id (e.g. ``deepseek-r1-32K``) to a
        ``(ModelArts model UUID, upstream model name, context window)`` triple.

        Unknown families and missing suffixes fall back to the 32K variant
        (R1-32K when the family itself is unrecognized).
        """
        family = re.sub(r"-\d{1,2}K$", "", model_id)
        mapping = {
            "deepseek-r1": {
                "32K": (self.valves.MODEL_ID_R1_32K, "DeepSeek-R1", 32768),
                "8K": (self.valves.MODEL_ID_R1_8K, "DeepSeek-R1", 8192),
                "4K": (self.valves.MODEL_ID_R1_4K, "DeepSeek-R1", 4096),
            },
            "deepseek-v3": {
                "32K": (self.valves.MODEL_ID_V3_32K, "DeepSeek-V3", 32768),
                "8K": (self.valves.MODEL_ID_V3_8K, "DeepSeek-V3", 8192),
                "4K": (self.valves.MODEL_ID_V3_4K, "DeepSeek-V3", 4096),
            },
        }
        if family not in mapping:
            # Unrecognized model id: default to R1 with the largest context.
            return (self.valves.MODEL_ID_R1_32K, "DeepSeek-R1", 32768)
        for suffix in ("32K", "8K", "4K"):
            if model_id.endswith(suffix):
                return mapping[family][suffix]
        return mapping[family]["32K"]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Stream a chat completion from ModelArts.

        Yields, in order: a ``<details>`` session-metadata block, then the
        model's reasoning wrapped in ``<think>…</think>``, then the answer
        text. Errors are yielded as small JSON strings.
        """
        device_id, conversation_id = self._extract_session_info(body)
        if not device_id:
            device_id = self._generate_device_id()
        if not conversation_id:
            conversation_id = str(uuid.uuid4())

        # Echo the session metadata so the next turn can recover it.
        yield "<details>\n"
        yield "<summary>Conversation Info</summary>\n"
        yield f"Device ID: {device_id}\n"
        yield f"Conversation ID: {conversation_id}\n"
        yield "</details>\n\n"

        messages = self._build_messages(body)
        model_uuid, model_name, max_context = self._resolve_model(
            body["model"].split(".", 1)[-1]
        )

        # Conservative token estimate: one token per character. CJK text runs
        # close to 1 token/char and ASCII averages ~0.25, so this only ever
        # over-reserves input budget. (The old comment claimed 0.25 tokens
        # per char, which contradicted the code.)
        input_tokens = sum(
            len(msg["content"])
            for msg in messages
            if isinstance(msg.get("content"), str)
        )
        # Leave at least 1 token of output space.
        max_tokens = max(max_context - input_tokens, 1)

        payload = {
            "model": model_name,
            "max_tokens": max_tokens,  # dynamic per remaining context
            "messages": messages,
            "stream": True,
            "temperature": 1.0,
        }
        # Headers per the ModelArts API documentation.
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.valves.API_KEY}",
        }
        api = f"{self.valves.API_DOMAIN}/v1/infers/{model_uuid}/v1/chat/completions"

        is_thinking = False  # True while a <think> tag is open
        try:
            # NOTE(review): verify=False disables TLS certificate checking;
            # kept for compatibility with the original — confirm it is needed.
            async with httpx.AsyncClient(verify=False) as client:
                async with client.stream(
                    "POST",
                    api,
                    json=payload,
                    headers=headers,
                    timeout=1200,
                ) as response:
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return
                    # aiter_lines() decodes incrementally, so multi-byte UTF-8
                    # sequences split across network chunks are handled
                    # correctly (the previous manual byte buffer decoded each
                    # chunk independently and could raise UnicodeDecodeError
                    # mid-character), and a trailing line without a final
                    # newline is no longer dropped.
                    async for line in response.aiter_lines():
                        line = line.strip()
                        if not line or line == "data: [DONE]":
                            continue
                        if not line.startswith("data: "):
                            continue
                        json_str = line[6:]
                        try:
                            data = json.loads(json_str)
                        except json.JSONDecodeError as e:
                            yield json.dumps(
                                {
                                    "error": f"JSON解析错误: {str(e)}\nData: {json_str}"
                                },
                                ensure_ascii=False,
                            )
                            return
                        if "choices" in data and len(data["choices"]) > 0:
                            delta = data["choices"][0].get("delta", {})
                            # Reasoning tokens: wrap in a <think> block.
                            if "reasoning_content" in delta:
                                if not is_thinking:
                                    yield "<think>"
                                    is_thinking = True
                                yield delta["reasoning_content"]
                            # Answer tokens: close the <think> block first.
                            if "content" in delta and delta["content"]:
                                if is_thinking:
                                    yield "</think>"
                                    is_thinking = False
                                yield delta["content"]
        except Exception as e:
            yield self._format_exception(e)
        # Close a dangling <think> tag. The original only reached this on the
        # exception path: its `return` after the stream loop skipped it, so a
        # reasoning-only stream ended with the tag still open.
        if is_thinking:
            yield "</think>"

    def _format_error(self, status_code: int, error: bytes) -> str:
        """Format an HTTP error body (str or bytes) as a short JSON string."""
        if isinstance(error, str):
            error_str = error
        else:
            error_str = error.decode(errors="ignore")
        try:
            # Prefer the API's own "message" field; truncate to 200 chars.
            err_msg = json.loads(error_str).get("message", error_str)[:200]
        except Exception:
            err_msg = error_str[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Format an exception as a JSON error string."""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)

    def _nanoid(self, size=21) -> str:
        """Generate a nanoid-style random id.

        The alphabet has exactly 64 characters, so masking each random byte
        with ``& 63`` picks a character uniformly.
        """
        url_alphabet = (
            "useandom-26T198340PX75pxJACKVERYMINDBUSHWOLF_GQZbfghjklqvwyzrict"
        )
        size = max(int(size), 0)
        random_bytes = secrets.token_bytes(size)
        return "".join([url_alphabet[b & 63] for b in random_bytes])

    def _generate_device_id(self) -> str:
        """Generate a device ID: a UUID4 hex string plus a 20-char nanoid."""
        return f"{uuid.uuid4().hex}_{self._nanoid(20)}"