华为 deepseek open webui 函数

华为云DeepSeek满血版200万Token

本函数参考了下面这位大佬的代码,并借助 AI 生成:
Open WebUI 函数:使用当贝免费满血 DeepSeek R1

import asyncio
import secrets
import time
import uuid
import json
import re
import httpx
import html
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field


class Pipe:
    """Open WebUI pipe that streams DeepSeek chat completions from ModelArts.

    Each reply starts with a ``<details>`` block carrying a device ID and a
    conversation ID. On later turns those IDs are read back from the earlier
    assistant messages so they stay stable for the whole chat; they are only
    displayed and are not part of the upstream request payload.
    """

    class Valves(BaseModel):
        # Base URL that the "/v1/infers/<id>/v1/chat/completions" path is
        # appended to.
        # NOTE(review): the default wraps the ModelArts host inside another
        # service's `?url=` parameter, so the request path is appended after
        # that query string - confirm this is intended.
        API_DOMAIN: str = Field(
            default="https://infer-modelarts-cn-southwest-2.modelarts-infer.com",
            description="ModelArts API Domain",
        )
        # Sent as the bearer token in the Authorization header.
        API_KEY: str = Field(
            default="你的api key",
            description="ModelArts API Key",
        )
        # Service IDs, one per model family and context size; each is
        # substituted into the request URL.
        MODEL_ID_R1_32K: str = Field(
            default="952e4f88-ef93-4398-ae8d-af37f63f0d8e",
            description="ModelArts DeepSeek-R1 Model ID",
        )
        MODEL_ID_V3_32K: str = Field(
            default="fd53915b-8935-48fe-be70-449d76c0fc87",
            description="ModelArts DeepSeek-V3 Model ID",
        )
        MODEL_ID_R1_8K: str = Field(
            default="861b6827-e5ef-4fa6-90d2-5fd1b2975882",
            description="ModelArts DeepSeek-R1 Alternative Model ID",
        )
        MODEL_ID_V3_8K: str = Field(
            default="707c01c8-517c-46ca-827a-d0b21c71b074",
            description="ModelArts DeepSeek-V3 Alternative Model ID",
        )
        MODEL_ID_V3_4K: str = Field(
            default="f354eacc-a2c5-43b4-a785-e5aadca988b3",
            description="ModelArts DeepSeek-V3 Alternative Model ID",
        )
        MODEL_ID_R1_4K: str = Field(
            default="c3cfa9e2-40c9-485f-a747-caae405296ef",
            description="ModelArts DeepSeek-R1 Alternative Model ID",
        )

    def __init__(self):
        """Create the default valves and remember a browser-like user agent."""
        self.valves = self.Valves()
        self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"

    def pipes(self):
        """Return the selectable models as a list of ``{"id", "name"}`` dicts."""
        return [
            {"id": "deepseek-r1-32K", "name": "DeepSeek-R1-32K 华为"},
            {"id": "deepseek-r1-8K", "name": "DeepSeek-R1-8K 华为"},
            {"id": "deepseek-r1-4K", "name": "DeepSeek-R1-4K 华为"},
            {"id": "deepseek-v3-32K", "name": "DeepSeek-V3-32K 华为"},
            {"id": "deepseek-v3-8K", "name": "DeepSeek-V3-8K 华为"},
            {"id": "deepseek-v3-4K", "name": "DeepSeek-V3-4K 华为"},
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Stream one chat completion for ``body`` as text fragments.

        Args:
            body: Request dict; ``body["messages"]`` (list of role/content
                dicts) and ``body["model"]`` (selected pipe id) are read.
            __event_emitter__: Accepted for interface compatibility; unused.

        Yields:
            First the ``<details>`` session block, then the model output.
            Reasoning output is wrapped in ``<think>...</think>``. HTTP
            failures, malformed stream data and unexpected exceptions are
            yielded as a JSON string of the form ``{"error": "..."}``.
        """
        # Reuse the IDs from earlier replies, or mint new ones on first turn.
        device_id, conversation_id = self._extract_session_ids(body["messages"])
        if not device_id:
            device_id = self._generate_device_id()
        if not conversation_id:
            conversation_id = str(uuid.uuid4())

        # Emit the session block so the next turn can read the IDs back.
        yield "<details>\n"
        yield "<summary>Conversation Info</summary>\n"
        yield f"Device ID: {device_id}\n"
        yield f"Conversation ID: {conversation_id}\n"
        yield "</details>\n\n"

        messages = self._build_upstream_messages(body["messages"])
        endpoint_id, api_model, max_context = self._resolve_model(body["model"])

        # Leave at least one token of room for the answer.
        max_tokens = max(max_context - self._estimate_input_tokens(messages), 1)

        payload = {
            "model": api_model,
            "max_tokens": max_tokens,
            "messages": messages,
            "stream": True,
            "temperature": 1.0,
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.valves.API_KEY}",
        }
        api = f"{self.valves.API_DOMAIN}/v1/infers/{endpoint_id}/v1/chat/completions"

        is_thinking = False  # True while a <think> tag is open
        try:
            # NOTE(review): verify=False disables TLS certificate checks while
            # the API key is being sent; kept for compatibility, but consider
            # enabling verification.
            async with httpx.AsyncClient(verify=False) as client:
                async with client.stream(
                    "POST",
                    api,
                    json=payload,
                    headers=headers,
                    timeout=1200,
                ) as response:
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # Buffer raw bytes and decode only complete lines: a
                    # multi-byte UTF-8 character may be split across chunks,
                    # so decoding chunk by chunk can fail mid-character.
                    pending = b""
                    async for chunk in response.aiter_bytes():
                        pending += chunk
                        *complete_lines, pending = pending.split(b"\n")

                        for raw_line in complete_lines:
                            line = raw_line.decode("utf-8", errors="replace").strip()
                            # The space after "data:" is optional in SSE.
                            if not line.startswith("data:"):
                                continue
                            json_str = line[5:].strip()
                            if not json_str or json_str == "[DONE]":
                                continue

                            try:
                                data = json.loads(json_str)
                            except json.JSONDecodeError as e:
                                # Close the think block first so the error is
                                # not rendered as part of the reasoning.
                                if is_thinking:
                                    yield "</think>"
                                    is_thinking = False
                                yield json.dumps(
                                    {
                                        "error": f"JSON解析错误: {str(e)}\nData: {json_str}"
                                    },
                                    ensure_ascii=False,
                                )
                                return

                            delta = self._first_delta(data)

                            # Test the value, not the key: the field can be
                            # present but null/empty on non-reasoning chunks.
                            reasoning = delta.get("reasoning_content")
                            if reasoning:
                                if not is_thinking:
                                    yield "<think>"
                                    is_thinking = True
                                yield reasoning

                            # The first answer text ends the reasoning phase.
                            answer = delta.get("content")
                            if answer:
                                if is_thinking:
                                    yield "</think>"
                                    is_thinking = False
                                yield answer

        except Exception as e:
            # Top-level boundary: report the failure to the user as text.
            if is_thinking:
                yield "</think>"
                is_thinking = False
            yield self._format_exception(e)

        # The stream ended while still reasoning: close the tag.
        if is_thinking:
            yield "</think>"

    def _extract_session_ids(self, messages) -> tuple:
        """Return ``(device_id, conversation_id)`` found in assistant replies.

        Each ID is ``None`` when not found. When several assistant messages
        carry an ID, the value from the last one wins.
        """
        device_id = None
        conversation_id = None
        for message in messages:
            if message.get("role") != "assistant":
                continue
            content = message.get("content")
            # Content may be None or a multimodal list; only text has metadata.
            if not isinstance(content, str):
                continue
            meta = re.search(r"^<details>(.*?)</details>", content, re.DOTALL)
            if not meta:
                continue
            meta_text = meta.group(1)

            device_match = re.search(r"Device ID:\s*(\S+)", meta_text)
            if device_match:
                device_id = device_match.group(1)

            conversation_match = re.search(r"Conversation ID:\s*(\S+)", meta_text)
            if conversation_match:
                conversation_id = conversation_match.group(1)
        return device_id, conversation_id

    def _build_upstream_messages(self, messages) -> list:
        """Return the message list to send: a fixed system prompt plus history.

        Only ``user`` and ``assistant`` messages are forwarded. The leading
        ``<details>`` session block is stripped from assistant text.
        """
        upstream = [{"role": "system", "content": "You are a helpful assistant."}]
        for msg in messages:
            role = msg.get("role")
            if role not in ("user", "assistant"):
                continue
            content = msg.get("content")
            if role == "assistant" and isinstance(content, str):
                content = re.sub(
                    r"^<details>.*?</details>\s*",
                    "",
                    content,
                    flags=re.DOTALL,
                )
            upstream.append({"role": role, "content": content})
        return upstream

    def _resolve_model(self, selected: str) -> tuple:
        """Map a selected pipe id to ``(service_id, api_model_name, max_context)``.

        The part of ``selected`` after the first ``.`` is used. An unknown
        family falls back to DeepSeek-R1 32K; a known family without a
        recognised size suffix falls back to that family's 32K entry.
        """
        valves = self.valves
        families = {
            "deepseek-r1": (
                "DeepSeek-R1",
                {
                    "32K": (valves.MODEL_ID_R1_32K, 32768),
                    "8K": (valves.MODEL_ID_R1_8K, 8192),
                    "4K": (valves.MODEL_ID_R1_4K, 4096),
                },
            ),
            "deepseek-v3": (
                "DeepSeek-V3",
                {
                    "32K": (valves.MODEL_ID_V3_32K, 32768),
                    "8K": (valves.MODEL_ID_V3_8K, 8192),
                    "4K": (valves.MODEL_ID_V3_4K, 4096),
                },
            ),
        }

        requested = selected.split(".", 1)[-1]
        family = re.sub(r"-\d{1,2}K$", "", requested)
        if family not in families:
            return valves.MODEL_ID_R1_32K, "DeepSeek-R1", 32768

        api_model, sizes = families[family]
        suffix = next((s for s in ("32K", "8K", "4K") if requested.endswith(s)), "32K")
        service_id, max_context = sizes[suffix]
        return service_id, api_model, max_context

    @staticmethod
    def _estimate_input_tokens(messages) -> int:
        """Return an upper-bound style token estimate for ``messages``.

        Counts one token per character of string content. This deliberately
        over-estimates so that input plus ``max_tokens`` stays inside the
        context window; non-string content is not counted.
        """
        return sum(
            len(msg["content"])
            for msg in messages
            if isinstance(msg.get("content"), str)
        )

    @staticmethod
    def _first_delta(data) -> dict:
        """Return ``choices[0].delta`` from a stream event, or ``{}`` if absent."""
        if not isinstance(data, dict):
            return {}
        choices = data.get("choices")
        if not isinstance(choices, list) or not choices:
            return {}
        first = choices[0]
        delta = first.get("delta") if isinstance(first, dict) else None
        return delta if isinstance(delta, dict) else {}

    def _format_error(self, status_code: int, error: bytes) -> str:
        """Format a non-200 response as a JSON ``{"error": ...}`` string.

        ``error`` is the response body (bytes, a str is tolerated too). If it
        is a JSON object with a string ``message`` field, that message is
        used; otherwise the raw body is. The text is cut to 200 characters.
        """
        if isinstance(error, str):
            error_str = error
        else:
            error_str = error.decode(errors="ignore")

        try:
            parsed = json.loads(error_str)
        except ValueError:
            parsed = None

        message = parsed.get("message") if isinstance(parsed, dict) else None
        err_msg = (message if isinstance(message, str) else error_str)[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Format an exception as a JSON ``{"error": "<Type>: <message>"}`` string."""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)

    def _nanoid(self, size=21) -> str:
        """Return a random ID of ``size`` characters from a 64-symbol alphabet.

        Negative sizes are treated as 0. Each random byte is masked to six
        bits, so every symbol of the alphabet is equally likely.
        """
        url_alphabet = (
            "useandom-26T198340PX75pxJACKVERYMINDBUSHWOLF_GQZbfghjklqvwyzrict"
        )
        size = max(int(size), 0)
        random_bytes = secrets.token_bytes(size)
        return "".join([url_alphabet[b & 63] for b in random_bytes])

    def _generate_device_id(self) -> str:
        """Return a new device ID: a UUID4 hex string, ``_``, and a 20-char nanoid."""
        return f"{uuid.uuid4().hex}_{self._nanoid(20)}"

9 个赞

不错,支持一下

1 个赞

支持一下大佬

1 个赞

感谢大佬!

1 个赞

感谢分享

感谢分享

感谢分享

感谢分享!!

感谢分享

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。