Skip to content

How to start the lifecycle using Minio SDK? #1015

@InfernalAzazel

Description

@InfernalAzazel

oss.py

from datetime import timedelta
import uuid
from fastapi import UploadFile
from minio import Minio
from minio.commonconfig import Filter
from minio.error import S3Error
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule
from ..core.config import settings
import os

class OSS:
    def __init__(self):
        self.bucket_name = settings.oss_bucket_name
        self.lifecycle_days = settings.oss_lifecycle_days
        self.client = Minio(
            endpoint=settings.oss_endpoint,
            access_key=settings.oss_access_key,
            secret_key=settings.oss_secret_key,
            secure=settings.oss_secure,
            region=settings.oss_region,
        )

    def new_uuid(self):
        return str(uuid.uuid4())

    def create_bucket(self):
        found = self.client.bucket_exists(self.bucket_name)
        if not found:
            try:
                self.client.make_bucket(self.bucket_name)
                self.set_lifecycle_expiration(days=self.lifecycle_days)
                print(f"Bucket {self.bucket_name} created successfully")
            except S3Error as exc:
                if exc.code == "BucketAlreadyOwnedByYou":
                    print(f"Bucket {self.bucket_name} already owned by you; continuing")
                else:
                    raise
        else:
            print(f"Bucket {self.bucket_name} already exists")

    def set_lifecycle_expiration(self,days: int = 1, prefix: str = "") -> None:
        """
        设置按天自动过期删除(MinIO 按天、每天巡检一次;<1天不生效)
        """
        rule_filter = Filter(prefix=prefix or "")
        rule = Rule(
            rule_id=f"expire-{prefix or 'all'}-{days}d",
            status="Enabled",
            rule_filter=rule_filter,
            expiration=Expiration(days=int(days)),
        )
        cfg = LifecycleConfig([rule])
        self.client.set_bucket_lifecycle(self.bucket_name, cfg)

    def upload_file(self, file: UploadFile):
        """
        上传文件到OSS,返回文件的UUID
        """
        ext = os.path.splitext(file.filename)[1]
        uuid = self.new_uuid()
        filename = f'{uuid}{ext}'
        file.file.seek(0)
        self.client.put_object(
            self.bucket_name, 
            filename, 
            file.file, 
            length=-1, 
            part_size=10*1024*1024,
        )
        return filename

    def get_presigned_url(self, filename: str):
        """
        获取文件的预签名URL,用于下载文件
        """
        return self.client.presigned_get_object(
            self.bucket_name, filename, expires=timedelta(days=self.lifecycle_days)
        )
        

def get_oss():
    """
    获取OSS实例
    """
    return OSS()

test.py

from app.core.oss import get_oss

def go():
    oss = get_oss()
    oss.create_bucket()

if __name__ == "__main__":
    go()
  • No expiration mechanism enabled
Image - Is it currently running normally? What does the expiration deletion mark mean? Image

Metadata

Metadata

Assignees

Labels

questionFurther information is requested

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions