OpenWebUI Monitor 的更新版本

原文:【已支持按条计费】为 OpenWebUI 编写了一个用量监控和用户余额管理平台

最近使用了佬友的这个 Filter 来控制用户的用量,细读代码发现可能会有一些小问题导致计费异常,所以手动改了下,现分享给社区

主要改动点

  1. 更换了变量名称,修改后需要重新设置 base_urlapi_key
  2. 移除了翻译文件,个人认为简单的英文不会影响阅读
  3. 移除了展示与否的开关,展示 Token,花费,余额信息
  4. 使用 httpx 替代 requests,可以直接格式化 BaseModel 而不需要手动 model_dump
  5. 修复了一些潜在 BUG;由于 Filter 常驻内存,之前存在大量的 self.xxx 可能导致不同用户的计费互相串门,user 变量声明为 {} 也容易导致使用异常

暂未阅读 OpenWebUI 相关源代码,“Filter 常驻内存” 这个结论来自社区插件 “Max Turns”,能在多次请求时判定不同用户是否超过限制,所以 Filter 应该是一个单例实现

效果展示

代码

"""
title: Usage Monitor
author: VariantConst & OVINC CN
version: 0.3.4
requirements: httpx
license: MIT
"""

import logging
from typing import Dict, Optional

from httpx import AsyncClient
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class CustomException(Exception):
    pass


class Filter:
    class Valves(BaseModel):
        api_endpoint: str = Field(
            default="", description="openwebui-monitor's base url"
        )
        api_key: str = Field(default="", description="openwebui-monitor's api key")
        priority: int = Field(default=5, description="filter priority")

    def __init__(self):
        self.type = "filter"
        self.valves = self.Valves()
        self.outage_map: Dict[str, bool] = {}

    async def request(self, client: AsyncClient, url: str, headers: dict, json: dict):
        response = await client.post(url=url, headers=headers, json=json)
        response.raise_for_status()
        response_data = response.json()
        if not response_data.get("success"):
            logger.error("[usage_monitor] req monitor failed: %s", response_data)
            raise CustomException(
                "calculate usage failed, please contact administrator"
            )
        return response_data

    async def inlet(
        self,
        body: dict,
        __metadata__: Optional[dict] = None,
        __user__: Optional[dict] = None,
    ) -> dict:
        __user__ = __user__ or {}
        __metadata__ = __metadata__ or {}
        user_id = __user__["id"]

        client = AsyncClient()

        try:
            response_data = await self.request(
                client=client,
                url=f"{self.valves.api_endpoint}/api/v1/inlet",
                headers={"Authorization": f"Bearer {self.valves.api_key}"},
                json={"user": __user__, "body": body},
            )
            self.outage_map[user_id] = response_data.get("balance", 0) <= 0
            if self.outage_map[user_id]:
                logger.info("[usage_monitor] no balance: %s", user_id)
                raise CustomException("no balance, please contact administrator")

            return body

        except Exception as err:
            logger.exception("[usage_monitor] error calculating usage: %s", err)
            if isinstance(err, CustomException):
                raise err
            raise Exception(f"error calculating usage, {err}") from err

        finally:
            await client.aclose()

    async def outlet(
        self,
        body: dict,
        __metadata__: Optional[dict] = None,
        __user__: Optional[dict] = None,
        __event_emitter__: callable = None,
    ) -> dict:
        __user__ = __user__ or {}
        __metadata__ = __metadata__ or {}
        user_id = __user__["id"]

        if self.outage_map[user_id]:
            return body

        client = AsyncClient()

        try:
            response_data = await self.request(
                client=client,
                url=f"{self.valves.api_endpoint}/api/v1/outlet",
                headers={"Authorization": f"Bearer {self.valves.api_key}"},
                json={"user": __user__, "body": body},
            )

            # pylint: disable=C0209
            stats = " | ".join(
                [
                    f"Tokens: {response_data['inputTokens']} + {response_data['outputTokens']}",
                    "Cost: %.4f" % response_data["totalCost"],
                    "Balance: %.4f" % response_data["newBalance"],
                ]
            )

            await __event_emitter__(
                {"type": "status", "data": {"description": stats, "done": True}}
            )

            logger.info("usage_monitor: %s %s", user_id, stats)
            return body

        except Exception as err:
            logger.exception("[usage_monitor] error calculating usage: %s", err)
            raise Exception(f"error calculating usage, {err}") from err

        finally:
            await client.aclose()

3 个赞

强,支持下大佬

谢谢,可否提交 PR

好的佬,这就去提

感谢大佬!

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。