Files

108 lines
3.8 KiB
Python

"""S3 direct-upload helpers."""
from __future__ import annotations

import asyncio
import re
from functools import partial
from pathlib import Path
from urllib.parse import quote
from uuid import uuid4

import boto3

from app.config.settings import get_settings
from app.domain.enums import LibraryResourceType, WorkflowStepName
# Key-path segment used under "library/" for each library resource type when
# building storage keys for direct uploads.
RESOURCE_PREFIXES: dict[LibraryResourceType, str] = {
    LibraryResourceType.MODEL: "models",
    LibraryResourceType.SCENE: "scenes",
    LibraryResourceType.GARMENT: "garments",
}
class S3PresignService:
    """Generate presigned upload URLs and derived public URLs.

    Configuration is read from ``get_settings()``; one boto3 S3 client is
    created per instance.
    """

    # Anything outside this conservative set is replaced in generated object
    # keys so that keys stay URL- and tooling-friendly.
    _UNSAFE_KEY_CHARS = re.compile(r"[^a-z0-9._-]+")
    # Bounds on the user-controlled part of the key; S3 limits key length, and
    # the file name comes from the uploader.
    _MAX_STEM_LENGTH = 100
    _MAX_SUFFIX_LENGTH = 16

    def __init__(self) -> None:
        self.settings = get_settings()
        # Empty settings are passed as None so boto3 applies its own defaults
        # (region/endpoint resolution and credential lookup).
        self._client = boto3.client(
            "s3",
            region_name=self.settings.s3_region or None,
            endpoint_url=self.settings.s3_endpoint or None,
            aws_access_key_id=self.settings.s3_access_key or None,
            aws_secret_access_key=self.settings.s3_secret_key or None,
        )

    def create_upload(self, resource_type: LibraryResourceType, file_name: str, content_type: str) -> tuple[str, str]:
        """Return a storage key and presigned PUT URL for a resource file.

        Args:
            resource_type: Library resource kind; selects the key prefix.
            file_name: Original file name; only a sanitized form of its final
                path component ends up in the key.
            content_type: MIME type included in the signed request parameters.
                The uploader is expected to send the same ``Content-Type``
                header with the PUT.

        Returns:
            ``(storage_key, upload_url)``.

        Raises:
            KeyError: If ``resource_type`` has no entry in ``RESOURCE_PREFIXES``.
        """
        storage_key = self._build_storage_key(resource_type, file_name)
        upload_url = self._client.generate_presigned_url(
            "put_object",
            Params={
                "Bucket": self.settings.s3_bucket,
                "Key": storage_key,
                "ContentType": content_type,
            },
            ExpiresIn=self.settings.s3_presign_expiry_seconds,
            HttpMethod="PUT",
        )
        return storage_key, upload_url

    def get_public_url(self, storage_key: str) -> str:
        """Return the public URL for an uploaded object.

        Resolution order:

        1. ``s3_cname`` if set (``https://`` is assumed when no scheme is given).
        2. ``s3_endpoint`` if set, as a path-style ``endpoint/bucket/key`` URL.
        3. Otherwise the AWS virtual-hosted-style URL for the bucket, using
           ``s3_region`` when it is set.

        The key is percent-encoded (``/`` is kept) so that keys containing
        spaces or reserved characters still yield a valid URL.
        """
        encoded_key = quote(storage_key, safe="/")

        cname = self.settings.s3_cname
        if cname:
            base = cname if cname.startswith(("http://", "https://")) else f"https://{cname}"
            return f"{base.rstrip('/')}/{encoded_key}"

        bucket = self.settings.s3_bucket
        # The endpoint is optional (the client is built with ``or None``), so
        # it may be empty or None here.
        endpoint = (self.settings.s3_endpoint or "").rstrip("/")
        if endpoint:
            return f"{endpoint}/{bucket}/{encoded_key}"

        # No custom endpoint: boto3 talks to AWS itself, so derive the
        # standard virtual-hosted-style address instead of a host-less path.
        region = self.settings.s3_region
        host = f"{bucket}.s3.{region}.amazonaws.com" if region else f"{bucket}.s3.amazonaws.com"
        return f"https://{host}/{encoded_key}"

    def _build_storage_key(self, resource_type: LibraryResourceType, file_name: str) -> str:
        """Build a unique ``library/<prefix>/<uuid>-<name>`` object key."""
        safe_name = self._sanitize_file_name(file_name)
        return f"library/{RESOURCE_PREFIXES[resource_type]}/{uuid4().hex}-{safe_name}"

    @classmethod
    def _sanitize_file_name(cls, file_name: str) -> str:
        """Reduce an uploader-supplied file name to a safe ``stem.suffix`` string.

        Only the final path component is kept, everything is lowercased, runs
        of characters outside ``[a-z0-9._-]`` become a single ``-`` in the stem
        and are dropped from the suffix, and both parts are length-bounded.
        Falls back to ``file`` / ``.bin`` when a part ends up empty.
        """
        # Normalize Windows separators so a full client-side path cannot leak
        # into the key on POSIX hosts.
        path = Path(file_name.replace("\\", "/"))

        stem = cls._UNSAFE_KEY_CHARS.sub("-", path.stem.lower()).strip("-.")
        stem = stem[: cls._MAX_STEM_LENGTH] or "file"

        suffix = cls._UNSAFE_KEY_CHARS.sub("", path.suffix.lower())[: cls._MAX_SUFFIX_LENGTH]
        if suffix in ("", "."):
            suffix = ".bin"

        return f"{stem}{suffix}"
class S3ObjectStorageService:
    """Upload generated workflow artifacts to the configured object store.

    Configuration is read from ``get_settings()``. Public URLs are derived by
    delegating to ``S3PresignService.get_public_url``.
    """

    def __init__(self) -> None:
        self.settings = get_settings()
        # Empty settings are passed as None so boto3 applies its own defaults.
        self._client = boto3.client(
            "s3",
            region_name=self.settings.s3_region or None,
            endpoint_url=self.settings.s3_endpoint or None,
            aws_access_key_id=self.settings.s3_access_key or None,
            aws_secret_access_key=self.settings.s3_secret_key or None,
        )
        self._presign = S3PresignService()

    async def upload_generated_image(
        self,
        *,
        order_id: int,
        step_name: WorkflowStepName,
        image_bytes: bytes,
        mime_type: str,
    ) -> tuple[str, str]:
        """Upload bytes and return the storage key plus public URL.

        Args:
            order_id: Order the image belongs to; becomes part of the key.
            step_name: Workflow step that produced the image; its ``value``
                becomes part of the key.
            image_bytes: Raw image payload.
            mime_type: Stored as the object's ``ContentType`` and used to pick
                the key's file extension.

        Returns:
            ``(storage_key, public_url)``.
        """
        storage_key = self._build_storage_key(order_id=order_id, step_name=step_name, mime_type=mime_type)
        upload = partial(
            self._client.put_object,
            Bucket=self.settings.s3_bucket,
            Key=storage_key,
            Body=image_bytes,
            ContentType=mime_type,
        )
        # put_object is a blocking network call; run it in the default
        # executor so the event loop keeps serving other tasks meanwhile.
        await asyncio.get_running_loop().run_in_executor(None, upload)
        return storage_key, self._presign.get_public_url(storage_key)

    @staticmethod
    def _build_storage_key(*, order_id: int, step_name: WorkflowStepName, mime_type: str) -> str:
        """Build a unique ``orders/<order_id>/<step>/<uuid><ext>`` object key.

        The extension is chosen from the MIME type, ignoring case and any
        parameters (``image/png; charset=binary`` is treated as ``image/png``).
        Unrecognized types get ``.bin``.
        """
        essence = mime_type.partition(";")[0].strip().lower()
        suffix = {
            "image/png": ".png",
            "image/jpeg": ".jpg",
            "image/webp": ".webp",
        }.get(essence, ".bin")
        return f"orders/{order_id}/{step_name.value}/{uuid4().hex}{suffix}"