-
Notifications
You must be signed in to change notification settings - Fork 764
Open
Labels
question — Further information is requested
Description
oss.py
from datetime import timedelta
import uuid
from fastapi import UploadFile
from minio import Minio
from minio.commonconfig import Filter
from minio.error import S3Error
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule
from ..core.config import settings
import os
class OSS:
    """Thin wrapper around a MinIO client bound to one configured bucket.

    The endpoint, credentials, region, bucket name and object lifetime
    (in days) are all read from ``settings``.
    """

    # Presigned URLs may not live longer than seven days; asking the client
    # for more raises ValueError, so the expiry is capped at this value.
    _MAX_PRESIGN_EXPIRY = timedelta(days=7)

    def __init__(self):
        """Read the OSS settings and build the MinIO client."""
        self.bucket_name = settings.oss_bucket_name
        self.lifecycle_days = settings.oss_lifecycle_days
        self.client = Minio(
            endpoint=settings.oss_endpoint,
            access_key=settings.oss_access_key,
            secret_key=settings.oss_secret_key,
            secure=settings.oss_secure,
            region=settings.oss_region,
        )

    def new_uuid(self):
        """Return a fresh random UUID (version 4) as a string."""
        return str(uuid.uuid4())

    def create_bucket(self):
        """Ensure the bucket exists and carries the expiration rule.

        The lifecycle rule is (re)applied on every call, not only when the
        bucket is newly created.  Previously a bucket that already existed
        never received the rule, so objects in it were never expired.

        Note: applying the rule replaces the bucket's whole lifecycle
        configuration with the single rule built by
        ``set_lifecycle_expiration``.

        Raises:
            S3Error: if bucket creation fails for any reason other than the
                bucket already being owned by this account, or if setting
                the lifecycle configuration fails.
        """
        if self.client.bucket_exists(self.bucket_name):
            print(f"Bucket {self.bucket_name} already exists")
        else:
            try:
                self.client.make_bucket(self.bucket_name)
            except S3Error as exc:
                # Another process may have created the bucket between the
                # existence check and make_bucket; that is not an error.
                if exc.code != "BucketAlreadyOwnedByYou":
                    raise
                print(f"Bucket {self.bucket_name} already owned by you; continuing")
            else:
                print(f"Bucket {self.bucket_name} created successfully")

        # Setting the same rule again is harmless, so always apply it.
        self.set_lifecycle_expiration(days=self.lifecycle_days)

    def set_lifecycle_expiration(self, days: int = 1, prefix: str = "") -> None:
        """Install a rule that deletes objects ``days`` days after creation.

        Per the original author's note, MinIO expires by whole days and
        scans roughly once a day, so values below one day have no effect.

        Args:
            days: Object lifetime in days; converted with ``int()``.
            prefix: Only objects whose key starts with this prefix are
                affected.  An empty prefix matches every object.
        """
        rule_filter = Filter(prefix=prefix or "")
        rule = Rule(
            rule_id=f"expire-{prefix or 'all'}-{days}d",
            status="Enabled",
            rule_filter=rule_filter,
            expiration=Expiration(days=int(days)),
        )
        cfg = LifecycleConfig([rule])
        self.client.set_bucket_lifecycle(self.bucket_name, cfg)

    # The annotation is quoted so it is not evaluated when the class is defined.
    def upload_file(self, file: "UploadFile") -> str:
        """Upload ``file`` to the bucket and return the generated object name.

        The object name is a new UUID followed by the extension of the
        uploaded file's original name (no extension when the upload has no
        file name).

        Args:
            file: The upload to store; its stream is rewound before reading.

        Returns:
            The object name under which the content was stored.
        """
        # ``filename`` can be None for uploads that carry no file name.
        ext = os.path.splitext(file.filename or "")[1]
        object_name = f"{self.new_uuid()}{ext}"
        file.file.seek(0)
        # length=-1 streams data of unknown size as a multipart upload,
        # which requires an explicit part size.
        self.client.put_object(
            self.bucket_name,
            object_name,
            file.file,
            length=-1,
            part_size=10 * 1024 * 1024,
        )
        return object_name

    def get_presigned_url(self, filename: str):
        """Return a presigned download URL for the object ``filename``.

        The URL stays valid for ``lifecycle_days`` days, capped at seven
        days because longer presigned expiries are rejected.
        """
        expires = min(timedelta(days=self.lifecycle_days), self._MAX_PRESIGN_EXPIRY)
        return self.client.presigned_get_object(
            self.bucket_name, filename, expires=expires
        )
def get_oss():
    """Build and return a new ``OSS`` instance.

    Every call constructs a fresh object; nothing is cached.
    """
    return OSS()


# test.py
from app.core.oss import get_oss
def go():
    """Create the configured bucket through a newly obtained OSS instance."""
    get_oss().create_bucket()
if __name__ == "__main__":
    go()

- No expiration mechanism appears to be enabled.
- Is it currently running normally? What does the expiration delete marker mean?

Metadata
Metadata
Assignees
Labels
question — Further information is requested