"""S3 direct-upload helpers.""" from __future__ import annotations from pathlib import Path from uuid import uuid4 import boto3 from app.config.settings import get_settings from app.domain.enums import LibraryResourceType, WorkflowStepName RESOURCE_PREFIXES: dict[LibraryResourceType, str] = { LibraryResourceType.MODEL: "models", LibraryResourceType.SCENE: "scenes", LibraryResourceType.GARMENT: "garments", } class S3PresignService: """Generate presigned upload URLs and derived public URLs.""" def __init__(self) -> None: self.settings = get_settings() self._client = boto3.client( "s3", region_name=self.settings.s3_region or None, endpoint_url=self.settings.s3_endpoint or None, aws_access_key_id=self.settings.s3_access_key or None, aws_secret_access_key=self.settings.s3_secret_key or None, ) def create_upload(self, resource_type: LibraryResourceType, file_name: str, content_type: str) -> tuple[str, str]: """Return a storage key and presigned PUT URL for a resource file.""" storage_key = self._build_storage_key(resource_type, file_name) upload_url = self._client.generate_presigned_url( "put_object", Params={ "Bucket": self.settings.s3_bucket, "Key": storage_key, "ContentType": content_type, }, ExpiresIn=self.settings.s3_presign_expiry_seconds, HttpMethod="PUT", ) return storage_key, upload_url def get_public_url(self, storage_key: str) -> str: """Return the public CDN URL for an uploaded object.""" if self.settings.s3_cname: base = self.settings.s3_cname if not base.startswith("http://") and not base.startswith("https://"): base = f"https://{base}" return f"{base.rstrip('/')}/{storage_key}" endpoint = self.settings.s3_endpoint.rstrip("/") return f"{endpoint}/{self.settings.s3_bucket}/{storage_key}" def _build_storage_key(self, resource_type: LibraryResourceType, file_name: str) -> str: suffix = Path(file_name).suffix or ".bin" stem = Path(file_name).stem.replace(" ", "-").lower() or "file" return f"library/{RESOURCE_PREFIXES[resource_type]}/{uuid4().hex}-{stem}{suffix.lower()}" class S3ObjectStorageService: """Upload generated workflow artifacts to the configured object store.""" def __init__(self) -> None: self.settings = get_settings() self._client = boto3.client( "s3", region_name=self.settings.s3_region or None, endpoint_url=self.settings.s3_endpoint or None, aws_access_key_id=self.settings.s3_access_key or None, aws_secret_access_key=self.settings.s3_secret_key or None, ) self._presign = S3PresignService() async def upload_generated_image( self, *, order_id: int, step_name: WorkflowStepName, image_bytes: bytes, mime_type: str, ) -> tuple[str, str]: """Upload bytes and return the storage key plus public URL.""" storage_key = self._build_storage_key(order_id=order_id, step_name=step_name, mime_type=mime_type) self._client.put_object( Bucket=self.settings.s3_bucket, Key=storage_key, Body=image_bytes, ContentType=mime_type, ) return storage_key, self._presign.get_public_url(storage_key) @staticmethod def _build_storage_key(*, order_id: int, step_name: WorkflowStepName, mime_type: str) -> str: suffix = { "image/png": ".png", "image/jpeg": ".jpg", "image/webp": ".webp", }.get(mime_type, ".bin") return f"orders/{order_id}/{step_name.value}/{uuid4().hex}{suffix}"