commit cc03da8a945094d4189eaadf4ac54ce4e11aec92 Author: Codex Date: Fri Mar 27 00:10:28 2026 +0800 Implement FastAPI Temporal MVP pipeline diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..7d3b879 --- /dev/null +++ b/.env.example @@ -0,0 +1,7 @@ +APP_NAME=temporal-image-pipeline +API_PREFIX=/api/v1 +DEBUG=false +AUTO_CREATE_TABLES=true +DATABASE_URL=sqlite+aiosqlite:///./temporal_demo.db +TEMPORAL_ADDRESS=localhost:7233 +TEMPORAL_NAMESPACE=default diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..97c6293 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.env +.venv/ +__pycache__/ +.pytest_cache/ +*.pyc +*.pyo +*.pyd +*.db +*.sqlite3 +*.egg-info/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..4a3d266 --- /dev/null +++ b/README.md @@ -0,0 +1,169 @@ +# FastAPI + Temporal MVP 图片流水线 + +这是一个最小可运行的图片生产流水线 MVP,使用 `FastAPI + Temporal + SQLite + SQLAlchemy` 实现,当前所有图像处理步骤都为 mock。 + +## 功能范围 + +- 低端全自动流程 `auto_basic` +- 中端半自动流程 `semi_pro` +- 创建订单、查询订单、查询资产、提交审核、查询 workflow 状态接口 +- 中端流程支持 `review signal` +- 中端流程支持 `rerun_face / rerun_fusion / rerun_scene` +- 提供 `.env.example`、Alembic 初始迁移、基础测试 + +## 环境要求 + +- Python 3.11+ +- Temporal CLI 或可用的 Temporal Server + +## 安装依赖 + +```bash +python -m venv .venv +.venv\Scripts\activate +python -m pip install -U pip +python -m pip install -e . +``` + +## 环境变量 + +复制示例环境变量: + +```bash +copy .env.example .env +``` + +默认数据库: + +- `sqlite+aiosqlite:///./temporal_demo.db` + +## 启动 Temporal Server + +```bash +temporal server start-dev +``` + +## 初始化数据库 + +推荐先执行 Alembic 迁移: + +```bash +alembic upgrade head +``` + +如果没有先迁移,API 启动时也会在 `AUTO_CREATE_TABLES=true` 下自动建表,方便本地 MVP 调试。 + +## 启动 FastAPI + +```bash +uvicorn app.main:app --reload +``` + +健康检查: + +```bash +curl http://127.0.0.1:8000/healthz +``` + +## 启动 Worker + +```bash +python -m app.workers.runner +``` + +## API 调用示例 + +### 创建低端订单 + +```bash +curl -X POST http://127.0.0.1:8000/api/v1/orders ^ + -H "Content-Type: application/json" ^ + -d "{\"customer_level\":\"low\",\"service_mode\":\"auto_basic\",\"model_id\":101,\"pose_id\":3,\"garment_asset_id\":9001,\"scene_ref_asset_id\":8001}" +``` + +### 创建中端订单 + +```bash +curl -X POST http://127.0.0.1:8000/api/v1/orders ^ + -H "Content-Type: application/json" ^ + -d "{\"customer_level\":\"mid\",\"service_mode\":\"semi_pro\",\"model_id\":101,\"pose_id\":3,\"garment_asset_id\":9001,\"scene_ref_asset_id\":8001}" +``` + +### 查询订单详情 + +```bash +curl http://127.0.0.1:8000/api/v1/orders/1 +``` + +### 查询订单资产 + +```bash +curl http://127.0.0.1:8000/api/v1/orders/1/assets +``` + +### 查询待审核列表 + +```bash +curl http://127.0.0.1:8000/api/v1/reviews/pending +``` + +### 提交审核通过 + +```bash +curl -X POST http://127.0.0.1:8000/api/v1/reviews/1/submit ^ + -H "Content-Type: application/json" ^ + -d "{\"decision\":\"approve\",\"reviewer_id\":77,\"comment\":\"通过\"}" +``` + +### 提交 rerun_face + +```bash +curl -X POST http://127.0.0.1:8000/api/v1/reviews/1/submit ^ + -H "Content-Type: application/json" ^ + -d "{\"decision\":\"rerun_face\",\"reviewer_id\":77,\"comment\":\"面部不自然,重跑 face\"}" +``` + +### 查询 workflow 状态 + +```bash +curl http://127.0.0.1:8000/api/v1/workflows/1 +``` + +## 流程说明 + +### 低端 `auto_basic` + +1. `prepare_model` +2. `tryon` +3. `scene` +4. `qc` +5. `export` + +### 中端 `semi_pro` + +1. `prepare_model` +2. `tryon` +3. `scene` +4. `texture` +5. `face` +6. `fusion` +7. `qc` +8. 进入 `waiting_review` +9. `approve` 后进入 `export` +10. `rerun_scene` 回到 `scene` +11. `rerun_face` 回到 `face` +12. `rerun_fusion` 回到 `fusion` + +## 测试 + +```bash +pytest +``` + +覆盖范围: + +- 健康检查 +- 低端流程跑通 +- 中端流程进入 `waiting_review` +- 中端流程审核通过 +- 中端流程 rerun 分支回流 diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..59ad7ce --- /dev/null +++ b/alembic.ini @@ -0,0 +1,37 @@ +[alembic] +script_location = alembic +prepend_sys_path = . +sqlalchemy.url = sqlite:///./temporal_demo.db + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = console +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s + diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..ebc97c4 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,61 @@ +"""Alembic environment configuration.""" + +from logging.config import fileConfig + +from alembic import context +from sqlalchemy import engine_from_config, pool + +from app.config.settings import get_settings +from app.infra.db.base import Base +from app.infra.db.models.asset import AssetORM +from app.infra.db.models.order import OrderORM +from app.infra.db.models.review_task import ReviewTaskORM +from app.infra.db.models.workflow_run import WorkflowRunORM +from app.infra.db.models.workflow_step import WorkflowStepORM + +del AssetORM, OrderORM, ReviewTaskORM, WorkflowRunORM, WorkflowStepORM + +config = context.config + +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +config.set_main_option("sqlalchemy.url", get_settings().sync_database_url) +target_metadata = Base.metadata + + +def run_migrations_offline() -> None: + """Run migrations in offline mode.""" + + context.configure( + url=config.get_main_option("sqlalchemy.url"), + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in online mode.""" + + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() + diff --git a/alembic/versions/20260326_0001_initial.py b/alembic/versions/20260326_0001_initial.py new file mode 100644 index 0000000..5b85595 --- /dev/null +++ b/alembic/versions/20260326_0001_initial.py @@ -0,0 +1,214 @@ +"""initial schema + +Revision ID: 20260326_0001 +Revises: +Create Date: 2026-03-26 23:59:00.000000 +""" + +from collections.abc import Sequence + +from alembic import op +import sqlalchemy as sa + + +revision: str = "20260326_0001" +down_revision: str | None = None +branch_labels: Sequence[str] | None = None +depends_on: Sequence[str] | None = None + + +def upgrade() -> None: + """Create the initial application tables.""" + + op.create_table( + "orders", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("customer_level", sa.Enum("low", "mid", name="customerlevel", native_enum=False), nullable=False), + sa.Column("service_mode", sa.Enum("auto_basic", "semi_pro", name="servicemode", native_enum=False), nullable=False), + sa.Column( + "status", + sa.Enum( + "created", + "running", + "waiting_review", + "succeeded", + "failed", + "cancelled", + name="orderstatus", + native_enum=False, + ), + nullable=False, + ), + sa.Column("model_id", sa.Integer(), nullable=False), + sa.Column("pose_id", sa.Integer(), nullable=False), + sa.Column("garment_asset_id", sa.Integer(), nullable=False), + sa.Column("scene_ref_asset_id", sa.Integer(), nullable=False), + sa.Column("final_asset_id", sa.Integer(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + + op.create_table( + "assets", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("order_id", sa.Integer(), sa.ForeignKey("orders.id"), nullable=False), + sa.Column( + "asset_type", + sa.Enum( + "prepared_model", + "tryon", + "scene", + "texture", + "face", + "fusion", + "qc_candidate", + "final", + name="assettype", + native_enum=False, + ), + nullable=False, + ), + sa.Column( + "step_name", + sa.Enum( + "prepare_model", + "tryon", + "scene", + "texture", + "face", + "fusion", + "qc", + "export", + "review", + name="workflowstepname", + native_enum=False, + ), + nullable=True, + ), + sa.Column("uri", sa.String(length=500), nullable=False), + sa.Column("metadata_json", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + op.create_index("ix_assets_order_id", "assets", ["order_id"]) + + op.create_table( + "review_tasks", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("order_id", sa.Integer(), sa.ForeignKey("orders.id"), nullable=False), + sa.Column( + "status", + sa.Enum("pending", "submitted", name="reviewtaskstatus", native_enum=False), + nullable=False, + ), + sa.Column( + "decision", + sa.Enum( + "approve", + "rerun_scene", + "rerun_face", + "rerun_fusion", + "reject", + name="reviewdecision", + native_enum=False, + ), + nullable=True, + ), + sa.Column("reviewer_id", sa.Integer(), nullable=True), + sa.Column("selected_asset_id", sa.Integer(), nullable=True), + sa.Column("comment", sa.Text(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + op.create_index("ix_review_tasks_order_id", "review_tasks", ["order_id"]) + + op.create_table( + "workflow_runs", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("order_id", sa.Integer(), sa.ForeignKey("orders.id"), nullable=False), + sa.Column("workflow_id", sa.String(length=255), nullable=False), + sa.Column("workflow_type", sa.String(length=255), nullable=False), + sa.Column( + "status", + sa.Enum( + "created", + "running", + "waiting_review", + "succeeded", + "failed", + "cancelled", + name="orderstatus", + native_enum=False, + ), + nullable=False, + ), + sa.Column( + "current_step", + sa.Enum( + "prepare_model", + "tryon", + "scene", + "texture", + "face", + "fusion", + "qc", + "export", + "review", + name="workflowstepname", + native_enum=False, + ), + nullable=True, + ), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.UniqueConstraint("workflow_id"), + ) + op.create_index("ix_workflow_runs_order_id", "workflow_runs", ["order_id"]) + + op.create_table( + "workflow_steps", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("workflow_run_id", sa.Integer(), sa.ForeignKey("workflow_runs.id"), nullable=False), + sa.Column( + "step_name", + sa.Enum( + "prepare_model", + "tryon", + "scene", + "texture", + "face", + "fusion", + "qc", + "export", + "review", + name="workflowstepname", + native_enum=False, + ), + nullable=False, + ), + sa.Column( + "step_status", + sa.Enum("pending", "running", "waiting", "succeeded", "failed", name="stepstatus", native_enum=False), + nullable=False, + ), + sa.Column("input_json", sa.JSON(), nullable=True), + sa.Column("output_json", sa.JSON(), nullable=True), + sa.Column("error_message", sa.Text(), nullable=True), + sa.Column("started_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("ended_at", sa.DateTime(timezone=True), nullable=True), + ) + op.create_index("ix_workflow_steps_workflow_run_id", "workflow_steps", ["workflow_run_id"]) + + +def downgrade() -> None: + """Drop the application tables.""" + + op.drop_index("ix_workflow_steps_workflow_run_id", table_name="workflow_steps") + op.drop_table("workflow_steps") + op.drop_index("ix_workflow_runs_order_id", table_name="workflow_runs") + op.drop_table("workflow_runs") + op.drop_index("ix_review_tasks_order_id", table_name="review_tasks") + op.drop_table("review_tasks") + op.drop_index("ix_assets_order_id", table_name="assets") + op.drop_table("assets") + op.drop_table("orders") diff --git a/app/api/routers/assets.py b/app/api/routers/assets.py new file mode 100644 index 0000000..57e3508 --- /dev/null +++ b/app/api/routers/assets.py @@ -0,0 +1,22 @@ +"""Asset routes.""" + +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.asset import AssetRead +from app.application.services.asset_service import AssetService +from app.infra.db.session import get_db_session + +router = APIRouter(prefix="/orders", tags=["assets"]) +asset_service = AssetService() + + +@router.get("/{order_id}/assets", response_model=list[AssetRead]) +async def list_order_assets( + order_id: int, + session: AsyncSession = Depends(get_db_session), +) -> list[AssetRead]: + """List assets generated for an order.""" + + return await asset_service.list_order_assets(session, order_id) + diff --git a/app/api/routers/health.py b/app/api/routers/health.py new file mode 100644 index 0000000..a88643d --- /dev/null +++ b/app/api/routers/health.py @@ -0,0 +1,13 @@ +"""Health check routes.""" + +from fastapi import APIRouter + +router = APIRouter(tags=["health"]) + + +@router.get("/healthz") +async def healthcheck() -> dict[str, str]: + """Return a simple health check response.""" + + return {"status": "ok"} + diff --git a/app/api/routers/orders.py b/app/api/routers/orders.py new file mode 100644 index 0000000..0b5b5d7 --- /dev/null +++ b/app/api/routers/orders.py @@ -0,0 +1,32 @@ +"""Order routes.""" + +from fastapi import APIRouter, Depends, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.order import CreateOrderRequest, CreateOrderResponse, OrderDetailResponse +from app.application.services.order_service import OrderService +from app.infra.db.session import get_db_session + +router = APIRouter(prefix="/orders", tags=["orders"]) +order_service = OrderService() + + +@router.post("", response_model=CreateOrderResponse, status_code=status.HTTP_201_CREATED) +async def create_order( + payload: CreateOrderRequest, + session: AsyncSession = Depends(get_db_session), +) -> CreateOrderResponse: + """Create a new image pipeline order.""" + + return await order_service.create_order(session, payload) + + +@router.get("/{order_id}", response_model=OrderDetailResponse) +async def get_order( + order_id: int, + session: AsyncSession = Depends(get_db_session), +) -> OrderDetailResponse: + """Fetch order details.""" + + return await order_service.get_order(session, order_id) + diff --git a/app/api/routers/reviews.py b/app/api/routers/reviews.py new file mode 100644 index 0000000..1208697 --- /dev/null +++ b/app/api/routers/reviews.py @@ -0,0 +1,32 @@ +"""Review routes.""" + +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.review import PendingReviewResponse, SubmitReviewRequest, SubmitReviewResponse +from app.application.services.review_service import ReviewService +from app.infra.db.session import get_db_session + +router = APIRouter(prefix="/reviews", tags=["reviews"]) +review_service = ReviewService() + + +@router.get("/pending", response_model=list[PendingReviewResponse]) +async def list_pending_reviews( + session: AsyncSession = Depends(get_db_session), +) -> list[PendingReviewResponse]: + """List review tasks waiting for manual input.""" + + return await review_service.list_pending_reviews(session) + + +@router.post("/{order_id}/submit", response_model=SubmitReviewResponse) +async def submit_review( + order_id: int, + payload: SubmitReviewRequest, + session: AsyncSession = Depends(get_db_session), +) -> SubmitReviewResponse: + """Submit a review decision for a workflow.""" + + return await review_service.submit_review(session, order_id, payload) + diff --git a/app/api/routers/workflows.py b/app/api/routers/workflows.py new file mode 100644 index 0000000..5d8c1aa --- /dev/null +++ b/app/api/routers/workflows.py @@ -0,0 +1,22 @@ +"""Workflow routes.""" + +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.workflow import WorkflowStatusResponse +from app.application.services.workflow_service import WorkflowService +from app.infra.db.session import get_db_session + +router = APIRouter(prefix="/workflows", tags=["workflows"]) +workflow_service = WorkflowService() + + +@router.get("/{order_id}", response_model=WorkflowStatusResponse) +async def get_workflow_status( + order_id: int, + session: AsyncSession = Depends(get_db_session), +) -> WorkflowStatusResponse: + """Fetch persisted workflow status for an order.""" + + return await workflow_service.get_workflow_status(session, order_id) + diff --git a/app/api/schemas/asset.py b/app/api/schemas/asset.py new file mode 100644 index 0000000..1ffa77b --- /dev/null +++ b/app/api/schemas/asset.py @@ -0,0 +1,23 @@ +"""Asset API schemas.""" + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict + +from app.domain.enums import AssetType, WorkflowStepName + + +class AssetRead(BaseModel): + """Serialized asset response.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + order_id: int + asset_type: AssetType + step_name: WorkflowStepName | None + uri: str + metadata_json: dict[str, Any] | None + created_at: datetime + diff --git a/app/api/schemas/order.py b/app/api/schemas/order.py new file mode 100644 index 0000000..f03943a --- /dev/null +++ b/app/api/schemas/order.py @@ -0,0 +1,47 @@ +"""Order API schemas.""" + +from datetime import datetime + +from pydantic import BaseModel + +from app.api.schemas.asset import AssetRead +from app.domain.enums import CustomerLevel, OrderStatus, ServiceMode, WorkflowStepName + + +class CreateOrderRequest(BaseModel): + """Request payload for creating an order.""" + + customer_level: CustomerLevel + service_mode: ServiceMode + model_id: int + pose_id: int + garment_asset_id: int + scene_ref_asset_id: int + + +class CreateOrderResponse(BaseModel): + """Response returned after an order has been created.""" + + order_id: int + workflow_id: str + status: OrderStatus + + +class OrderDetailResponse(BaseModel): + """Order detail response.""" + + order_id: int + customer_level: CustomerLevel + service_mode: ServiceMode + status: OrderStatus + model_id: int + pose_id: int + garment_asset_id: int + scene_ref_asset_id: int + final_asset_id: int | None + workflow_id: str | None + current_step: WorkflowStepName | None + final_asset: AssetRead | None + created_at: datetime + updated_at: datetime + diff --git a/app/api/schemas/review.py b/app/api/schemas/review.py new file mode 100644 index 0000000..0f9ece6 --- /dev/null +++ b/app/api/schemas/review.py @@ -0,0 +1,36 @@ +"""Review API schemas.""" + +from datetime import datetime + +from pydantic import BaseModel + +from app.domain.enums import ReviewDecision, WorkflowStepName + + +class SubmitReviewRequest(BaseModel): + """Request payload for review submission.""" + + decision: ReviewDecision + reviewer_id: int + selected_asset_id: int | None = None + comment: str | None = None + + +class SubmitReviewResponse(BaseModel): + """Response returned after a review signal is sent.""" + + order_id: int + workflow_id: str + decision: ReviewDecision + status: str + + +class PendingReviewResponse(BaseModel): + """Response model for pending review items.""" + + review_task_id: int + order_id: int + workflow_id: str + current_step: WorkflowStepName | None + created_at: datetime + diff --git a/app/api/schemas/workflow.py b/app/api/schemas/workflow.py new file mode 100644 index 0000000..43ab140 --- /dev/null +++ b/app/api/schemas/workflow.py @@ -0,0 +1,38 @@ +"""Workflow API schemas.""" + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict + +from app.domain.enums import OrderStatus, StepStatus, WorkflowStepName + + +class WorkflowStepRead(BaseModel): + """Serialized workflow step record.""" + + model_config = ConfigDict(from_attributes=True) + + id: int + workflow_run_id: int + step_name: WorkflowStepName + step_status: StepStatus + input_json: dict[str, Any] | None + output_json: dict[str, Any] | None + error_message: str | None + started_at: datetime + ended_at: datetime | None + + +class WorkflowStatusResponse(BaseModel): + """Serialized workflow run details.""" + + order_id: int + workflow_id: str + workflow_type: str + workflow_status: OrderStatus + current_step: WorkflowStepName | None + steps: list[WorkflowStepRead] + created_at: datetime + updated_at: datetime + diff --git a/app/application/services/asset_service.py b/app/application/services/asset_service.py new file mode 100644 index 0000000..75d5124 --- /dev/null +++ b/app/application/services/asset_service.py @@ -0,0 +1,26 @@ +"""Asset application service.""" + +from fastapi import HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.asset import AssetRead +from app.infra.db.models.asset import AssetORM +from app.infra.db.models.order import OrderORM + + +class AssetService: + """Application service for asset queries.""" + + async def list_order_assets(self, session: AsyncSession, order_id: int) -> list[AssetRead]: + """Return all assets belonging to an order.""" + + order = await session.get(OrderORM, order_id) + if order is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found") + + result = await session.execute( + select(AssetORM).where(AssetORM.order_id == order_id).order_by(AssetORM.created_at.asc()) + ) + return [AssetRead.model_validate(asset) for asset in result.scalars().all()] + diff --git a/app/application/services/order_service.py b/app/application/services/order_service.py new file mode 100644 index 0000000..42ad39d --- /dev/null +++ b/app/application/services/order_service.py @@ -0,0 +1,122 @@ +"""Order application service.""" + +from fastapi import HTTPException, status +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from app.api.schemas.asset import AssetRead +from app.api.schemas.order import CreateOrderRequest, CreateOrderResponse, OrderDetailResponse +from app.application.services.workflow_service import WorkflowService +from app.domain.enums import CustomerLevel, OrderStatus, ServiceMode +from app.infra.db.models.order import OrderORM +from app.infra.db.models.workflow_run import WorkflowRunORM +from app.workers.workflows.types import PipelineWorkflowInput + + +class OrderService: + """Application service for order management.""" + + def __init__(self) -> None: + self.workflow_service = WorkflowService() + + async def create_order(self, session, payload: CreateOrderRequest) -> CreateOrderResponse: + """Create an order, persist a workflow run, and start Temporal execution.""" + + self._validate_mode(payload.customer_level, payload.service_mode) + + order = OrderORM( + customer_level=payload.customer_level, + service_mode=payload.service_mode, + status=OrderStatus.CREATED, + model_id=payload.model_id, + pose_id=payload.pose_id, + garment_asset_id=payload.garment_asset_id, + scene_ref_asset_id=payload.scene_ref_asset_id, + ) + session.add(order) + await session.flush() + + workflow_id = f"order-{order.id}" + workflow_run = WorkflowRunORM( + order_id=order.id, + workflow_id=workflow_id, + workflow_type=self.workflow_service.workflow_type_for_mode(payload.service_mode), + status=OrderStatus.CREATED, + ) + session.add(workflow_run) + await session.commit() + + workflow_input = PipelineWorkflowInput( + order_id=order.id, + workflow_run_id=workflow_run.id, + customer_level=order.customer_level, + service_mode=order.service_mode, + model_id=order.model_id, + pose_id=order.pose_id, + garment_asset_id=order.garment_asset_id, + scene_ref_asset_id=order.scene_ref_asset_id, + ) + + try: + await self.workflow_service.start_workflow(workflow_input) + except Exception as exc: + order.status = OrderStatus.FAILED + workflow_run.status = OrderStatus.FAILED + await session.commit() + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Failed to start Temporal workflow: {exc}", + ) from exc + + return CreateOrderResponse(order_id=order.id, workflow_id=workflow_id, status=order.status) + + async def get_order(self, session, order_id: int) -> OrderDetailResponse: + """Return a single order with workflow context and final asset.""" + + result = await session.execute( + select(OrderORM) + .where(OrderORM.id == order_id) + .options( + selectinload(OrderORM.assets), + selectinload(OrderORM.workflow_runs), + ) + ) + order = result.scalar_one_or_none() + if order is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found") + + workflow_run = order.workflow_runs[0] if order.workflow_runs else None + final_asset = next((asset for asset in order.assets if asset.id == order.final_asset_id), None) + + return OrderDetailResponse( + order_id=order.id, + customer_level=order.customer_level, + service_mode=order.service_mode, + status=order.status, + model_id=order.model_id, + pose_id=order.pose_id, + garment_asset_id=order.garment_asset_id, + scene_ref_asset_id=order.scene_ref_asset_id, + final_asset_id=order.final_asset_id, + workflow_id=workflow_run.workflow_id if workflow_run else None, + current_step=workflow_run.current_step if workflow_run else None, + final_asset=AssetRead.model_validate(final_asset) if final_asset else None, + created_at=order.created_at, + updated_at=order.updated_at, + ) + + @staticmethod + def _validate_mode(customer_level: CustomerLevel, service_mode: ServiceMode) -> None: + """Validate the allowed customer-level and service-mode combinations.""" + + if customer_level == CustomerLevel.LOW and service_mode != ServiceMode.AUTO_BASIC: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Low-level customers only support auto_basic", + ) + if customer_level == CustomerLevel.MID and service_mode != ServiceMode.SEMI_PRO: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Mid-level customers only support semi_pro", + ) + diff --git a/app/application/services/review_service.py b/app/application/services/review_service.py new file mode 100644 index 0000000..01cc846 --- /dev/null +++ b/app/application/services/review_service.py @@ -0,0 +1,112 @@ +"""Review application service.""" + +from fastapi import HTTPException, status +from sqlalchemy import select + +from app.api.schemas.review import PendingReviewResponse, SubmitReviewRequest, SubmitReviewResponse +from app.application.services.workflow_service import WorkflowService +from app.domain.enums import OrderStatus, ReviewTaskStatus +from app.infra.db.models.asset import AssetORM +from app.infra.db.models.order import OrderORM +from app.infra.db.models.review_task import ReviewTaskORM +from app.infra.db.models.workflow_run import WorkflowRunORM +from app.workers.workflows.types import ReviewSignalPayload + + +class ReviewService: + """Application service for review flows.""" + + def __init__(self) -> None: + self.workflow_service = WorkflowService() + + async def list_pending_reviews(self, session) -> list[PendingReviewResponse]: + """Return all pending review tasks.""" + + result = await session.execute( + select(ReviewTaskORM, WorkflowRunORM) + .join(WorkflowRunORM, WorkflowRunORM.order_id == ReviewTaskORM.order_id) + .where(ReviewTaskORM.status == ReviewTaskStatus.PENDING) + .order_by(ReviewTaskORM.created_at.asc()) + ) + + return [ + PendingReviewResponse( + review_task_id=review_task.id, + order_id=review_task.order_id, + workflow_id=workflow_run.workflow_id, + current_step=workflow_run.current_step, + created_at=review_task.created_at, + ) + for review_task, workflow_run in result.all() + ] + + async def submit_review(self, session, order_id: int, payload: SubmitReviewRequest) -> SubmitReviewResponse: + """Persist the review submission and signal the Temporal workflow.""" + + order = await session.get(OrderORM, order_id) + if order is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found") + if order.status != OrderStatus.WAITING_REVIEW: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Order is not waiting for review", + ) + + workflow_result = await session.execute( + select(WorkflowRunORM).where(WorkflowRunORM.order_id == order_id) + ) + workflow_run = workflow_result.scalar_one_or_none() + if workflow_run is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found") + + if payload.selected_asset_id is not None: + asset = await session.get(AssetORM, payload.selected_asset_id) + if asset is None or asset.order_id != order_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Selected asset does not belong to the order", + ) + + pending_result = await session.execute( + select(ReviewTaskORM) + .where( + ReviewTaskORM.order_id == order_id, + ReviewTaskORM.status == ReviewTaskStatus.PENDING, + ) + .order_by(ReviewTaskORM.created_at.desc()) + ) + review_task = pending_result.scalars().first() + if review_task is None: + review_task = ReviewTaskORM(order_id=order_id, status=ReviewTaskStatus.SUBMITTED) + session.add(review_task) + + review_task.status = ReviewTaskStatus.SUBMITTED + review_task.decision = payload.decision + review_task.reviewer_id = payload.reviewer_id + review_task.selected_asset_id = payload.selected_asset_id + review_task.comment = payload.comment + await session.commit() + + try: + await self.workflow_service.signal_review( + workflow_run.workflow_id, + ReviewSignalPayload( + decision=payload.decision, + reviewer_id=payload.reviewer_id, + selected_asset_id=payload.selected_asset_id, + comment=payload.comment, + ), + ) + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Failed to signal Temporal workflow: {exc}", + ) from exc + + return SubmitReviewResponse( + order_id=order_id, + workflow_id=workflow_run.workflow_id, + decision=payload.decision, + status="submitted", + ) + diff --git a/app/application/services/workflow_service.py b/app/application/services/workflow_service.py new file mode 100644 index 0000000..cc767c7 --- /dev/null +++ b/app/application/services/workflow_service.py @@ -0,0 +1,77 @@ +"""Temporal workflow application service.""" + +from datetime import timedelta + +from fastapi import HTTPException, status +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from app.api.schemas.workflow import WorkflowStatusResponse, WorkflowStepRead +from app.domain.enums import ServiceMode +from app.infra.db.models.workflow_run import WorkflowRunORM +from app.infra.temporal.client import get_temporal_client +from app.infra.temporal.task_queues import IMAGE_PIPELINE_CONTROL_TASK_QUEUE +from app.workers.workflows.low_end_pipeline import LowEndPipelineWorkflow +from app.workers.workflows.mid_end_pipeline import MidEndPipelineWorkflow +from app.workers.workflows.types import PipelineWorkflowInput, ReviewSignalPayload + + +class WorkflowService: + """Application service for Temporal workflow orchestration.""" + + @staticmethod + def workflow_type_for_mode(service_mode: ServiceMode) -> str: + """Return the workflow class name for a service mode.""" + + if service_mode == ServiceMode.AUTO_BASIC: + return LowEndPipelineWorkflow.__name__ + return MidEndPipelineWorkflow.__name__ + + async def start_workflow(self, workflow_input: PipelineWorkflowInput) -> None: + """Start the appropriate Temporal workflow for an order.""" + + client = await get_temporal_client() + workflow_id = f"order-{workflow_input.order_id}" + workflow_callable = ( + LowEndPipelineWorkflow.run + if workflow_input.service_mode == ServiceMode.AUTO_BASIC + else MidEndPipelineWorkflow.run + ) + await client.start_workflow( + workflow_callable, + workflow_input, + id=workflow_id, + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + run_timeout=timedelta(minutes=30), + task_timeout=timedelta(seconds=30), + ) + + async def signal_review(self, workflow_id: str, payload: ReviewSignalPayload) -> None: + """Send a review signal to a running Temporal workflow.""" + + client = await get_temporal_client() + handle = client.get_workflow_handle(workflow_id=workflow_id) + await handle.signal("submit_review", payload) + + async def get_workflow_status(self, session, order_id: int) -> WorkflowStatusResponse: + """Return persisted workflow execution state for an order.""" + + result = await session.execute( + select(WorkflowRunORM) + .where(WorkflowRunORM.order_id == order_id) + .options(selectinload(WorkflowRunORM.steps)) + ) + workflow_run = result.scalar_one_or_none() + if workflow_run is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found") + + return WorkflowStatusResponse( + order_id=workflow_run.order_id, + workflow_id=workflow_run.workflow_id, + workflow_type=workflow_run.workflow_type, + workflow_status=workflow_run.status, + current_step=workflow_run.current_step, + steps=[WorkflowStepRead.model_validate(step) for step in workflow_run.steps], + created_at=workflow_run.created_at, + updated_at=workflow_run.updated_at, + ) diff --git a/app/config/settings.py b/app/config/settings.py new file mode 100644 index 0000000..fa301db --- /dev/null +++ b/app/config/settings.py @@ -0,0 +1,37 @@ +"""Application settings.""" + +from functools import lru_cache + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """Runtime settings loaded from environment variables.""" + + app_name: str = "temporal-image-pipeline" + api_prefix: str = "/api/v1" + debug: bool = False + auto_create_tables: bool = True + database_url: str = "sqlite+aiosqlite:///./temporal_demo.db" + temporal_address: str = "localhost:7233" + temporal_namespace: str = "default" + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False, + ) + + @property + def sync_database_url(self) -> str: + """Return a synchronous SQLAlchemy URL for migrations.""" + + return self.database_url.replace("+aiosqlite", "") + + +@lru_cache(maxsize=1) +def get_settings() -> Settings: + """Return the cached application settings.""" + + return Settings() + diff --git a/app/domain/enums.py b/app/domain/enums.py new file mode 100644 index 0000000..c8569a2 --- /dev/null +++ b/app/domain/enums.py @@ -0,0 +1,83 @@ +"""Domain enums shared across the application.""" + +from enum import Enum + + +class CustomerLevel(str, Enum): + """Supported customer tiers.""" + + LOW = "low" + MID = "mid" + + +class ServiceMode(str, Enum): + """Supported service delivery modes.""" + + AUTO_BASIC = "auto_basic" + SEMI_PRO = "semi_pro" + + +class OrderStatus(str, Enum): + """Lifecycle states for an order and workflow run.""" + + CREATED = "created" + RUNNING = "running" + WAITING_REVIEW = "waiting_review" + SUCCEEDED = "succeeded" + FAILED = "failed" + CANCELLED = "cancelled" + + +class WorkflowStepName(str, Enum): + """Canonical workflow step names.""" + + PREPARE_MODEL = "prepare_model" + TRYON = "tryon" + SCENE = "scene" + TEXTURE = "texture" + FACE = "face" + FUSION = "fusion" + QC = "qc" + EXPORT = "export" + REVIEW = "review" + + +class ReviewDecision(str, Enum): + """Supported review decisions for the mid-end workflow.""" + + APPROVE = "approve" + RERUN_SCENE = "rerun_scene" + RERUN_FACE = "rerun_face" + RERUN_FUSION = "rerun_fusion" + REJECT = "reject" + + +class StepStatus(str, Enum): + """Execution status of a single workflow step record.""" + + PENDING = "pending" + RUNNING = "running" + WAITING = "waiting" + SUCCEEDED = "succeeded" + FAILED = "failed" + + +class ReviewTaskStatus(str, Enum): + """Status of a human review task.""" + + PENDING = "pending" + SUBMITTED = "submitted" + + +class AssetType(str, Enum): + """Asset classes produced by the pipeline.""" + + PREPARED_MODEL = "prepared_model" + TRYON = "tryon" + SCENE = "scene" + TEXTURE = "texture" + FACE = "face" + FUSION = "fusion" + QC_CANDIDATE = "qc_candidate" + FINAL = "final" + diff --git a/app/domain/models/asset.py b/app/domain/models/asset.py new file mode 100644 index 0000000..9fd9003 --- /dev/null +++ b/app/domain/models/asset.py @@ -0,0 +1,21 @@ +"""Domain asset model.""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +from app.domain.enums import AssetType, WorkflowStepName + + +@dataclass(slots=True) +class Asset: + """Business representation of a generated asset.""" + + id: int + order_id: int + asset_type: AssetType + step_name: WorkflowStepName | None + uri: str + metadata_json: dict[str, Any] | None + created_at: datetime + diff --git a/app/domain/models/order.py b/app/domain/models/order.py new file mode 100644 index 0000000..08e599f --- /dev/null +++ b/app/domain/models/order.py @@ -0,0 +1,26 @@ +"""Domain order model.""" + +from dataclasses import dataclass +from datetime import datetime + +from app.domain.enums import CustomerLevel, OrderStatus, ServiceMode, WorkflowStepName + + +@dataclass(slots=True) +class Order: + """Business representation of an image order.""" + + id: int + customer_level: CustomerLevel + service_mode: ServiceMode + status: OrderStatus + model_id: int + pose_id: int + garment_asset_id: int + scene_ref_asset_id: int + final_asset_id: int | None + workflow_id: str | None + current_step: WorkflowStepName | None + created_at: datetime + updated_at: datetime + diff --git a/app/domain/models/review_task.py b/app/domain/models/review_task.py new file mode 100644 index 0000000..d21d2b3 --- /dev/null +++ b/app/domain/models/review_task.py @@ -0,0 +1,22 @@ +"""Domain review task model.""" + +from dataclasses import dataclass +from datetime import datetime + +from app.domain.enums import ReviewDecision, ReviewTaskStatus + + +@dataclass(slots=True) +class ReviewTask: + """Business representation of a review task.""" + + id: int + order_id: int + status: ReviewTaskStatus + decision: ReviewDecision | None + reviewer_id: int | None + selected_asset_id: int | None + comment: str | None + created_at: datetime + updated_at: datetime + diff --git a/app/domain/models/workflow_run.py b/app/domain/models/workflow_run.py new file mode 100644 index 0000000..677218a --- /dev/null +++ b/app/domain/models/workflow_run.py @@ -0,0 +1,21 @@ +"""Domain workflow run model.""" + +from dataclasses import dataclass +from datetime import datetime + +from app.domain.enums import OrderStatus, WorkflowStepName + + +@dataclass(slots=True) +class WorkflowRun: + """Business representation of a workflow execution.""" + + id: int + order_id: int + workflow_id: str + workflow_type: str + status: OrderStatus + current_step: WorkflowStepName | None + created_at: datetime + updated_at: datetime + diff --git a/app/domain/models/workflow_step.py b/app/domain/models/workflow_step.py new file mode 100644 index 0000000..13edd1f --- /dev/null +++ b/app/domain/models/workflow_step.py @@ -0,0 +1,23 @@ +"""Domain workflow step model.""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +from app.domain.enums import StepStatus, WorkflowStepName + + +@dataclass(slots=True) +class WorkflowStep: + """Business representation of a workflow step attempt.""" + + id: int + workflow_run_id: int + step_name: WorkflowStepName + step_status: StepStatus + input_json: dict[str, Any] | None + output_json: dict[str, Any] | None + error_message: str | None + started_at: datetime + ended_at: datetime | None + diff --git a/app/infra/db/base.py b/app/infra/db/base.py new file mode 100644 index 0000000..7e757ae --- /dev/null +++ b/app/infra/db/base.py @@ -0,0 +1,33 @@ +"""Database base declarations.""" + +from datetime import datetime, timezone + +from sqlalchemy import DateTime +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +def utc_now() -> datetime: + """Return the current UTC timestamp.""" + + return datetime.now(timezone.utc) + + +class Base(DeclarativeBase): + """Shared declarative base.""" + + +class TimestampMixin: + """Mixin that adds created and updated timestamps.""" + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + nullable=False, + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + onupdate=utc_now, + nullable=False, + ) + diff --git a/app/infra/db/models/asset.py b/app/infra/db/models/asset.py new file mode 100644 index 0000000..854c28c --- /dev/null +++ b/app/infra/db/models/asset.py @@ -0,0 +1,31 @@ +"""Asset ORM model.""" + +from typing import Any + +from sqlalchemy import Enum, ForeignKey, Integer, JSON, String +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.domain.enums import AssetType, WorkflowStepName +from app.infra.db.base import Base, TimestampMixin + + +class AssetORM(TimestampMixin, Base): + """Persisted generated asset.""" + + __tablename__ = "assets" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), nullable=False, index=True) + asset_type: Mapped[AssetType] = mapped_column( + Enum(AssetType, native_enum=False), + nullable=False, + ) + step_name: Mapped[WorkflowStepName | None] = mapped_column( + Enum(WorkflowStepName, native_enum=False), + nullable=True, + ) + uri: Mapped[str] = mapped_column(String(500), nullable=False) + metadata_json: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) + + order = relationship("OrderORM", back_populates="assets") + diff --git a/app/infra/db/models/order.py b/app/infra/db/models/order.py new file mode 100644 index 0000000..e3e2c55 --- /dev/null +++ b/app/infra/db/models/order.py @@ -0,0 +1,38 @@ +"""Order ORM model.""" + +from sqlalchemy import Enum, Integer +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.domain.enums import CustomerLevel, OrderStatus, ServiceMode +from app.infra.db.base import Base, TimestampMixin + + +class OrderORM(TimestampMixin, Base): + """Persisted order record.""" + + __tablename__ = "orders" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + customer_level: Mapped[CustomerLevel] = mapped_column( + Enum(CustomerLevel, native_enum=False), + nullable=False, + ) + service_mode: Mapped[ServiceMode] = mapped_column( + Enum(ServiceMode, native_enum=False), + nullable=False, + ) + status: Mapped[OrderStatus] = mapped_column( + Enum(OrderStatus, native_enum=False), + nullable=False, + default=OrderStatus.CREATED, + ) + model_id: Mapped[int] = mapped_column(Integer, nullable=False) + pose_id: Mapped[int] = mapped_column(Integer, nullable=False) + garment_asset_id: Mapped[int] = mapped_column(Integer, nullable=False) + scene_ref_asset_id: Mapped[int] = mapped_column(Integer, nullable=False) + final_asset_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + + assets = relationship("AssetORM", back_populates="order", lazy="selectin") + review_tasks = relationship("ReviewTaskORM", back_populates="order", lazy="selectin") + workflow_runs = relationship("WorkflowRunORM", back_populates="order", lazy="selectin") + diff --git a/app/infra/db/models/review_task.py b/app/infra/db/models/review_task.py new file mode 100644 index 0000000..f6d9351 --- /dev/null +++ b/app/infra/db/models/review_task.py @@ -0,0 +1,31 @@ +"""Review task ORM model.""" + +from sqlalchemy import Enum, ForeignKey, Integer, String, Text +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.domain.enums import ReviewDecision, ReviewTaskStatus +from app.infra.db.base import Base, TimestampMixin + + +class ReviewTaskORM(TimestampMixin, Base): + """Persisted review task.""" + + __tablename__ = "review_tasks" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), nullable=False, index=True) + status: Mapped[ReviewTaskStatus] = mapped_column( + Enum(ReviewTaskStatus, native_enum=False), + nullable=False, + default=ReviewTaskStatus.PENDING, + ) + decision: Mapped[ReviewDecision | None] = mapped_column( + Enum(ReviewDecision, native_enum=False), + nullable=True, + ) + reviewer_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + selected_asset_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + comment: Mapped[str | None] = mapped_column(Text, nullable=True) + + order = relationship("OrderORM", back_populates="review_tasks") + diff --git a/app/infra/db/models/workflow_run.py b/app/infra/db/models/workflow_run.py new file mode 100644 index 0000000..4e84942 --- /dev/null +++ b/app/infra/db/models/workflow_run.py @@ -0,0 +1,36 @@ +"""Workflow run ORM model.""" + +from sqlalchemy import Enum, ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.domain.enums import OrderStatus, WorkflowStepName +from app.infra.db.base import Base, TimestampMixin + + +class WorkflowRunORM(TimestampMixin, Base): + """Persisted workflow execution state.""" + + __tablename__ = "workflow_runs" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), nullable=False, index=True) + workflow_id: Mapped[str] = mapped_column(String(255), nullable=False, unique=True) + workflow_type: Mapped[str] = mapped_column(String(255), nullable=False) + status: Mapped[OrderStatus] = mapped_column( + Enum(OrderStatus, native_enum=False), + nullable=False, + default=OrderStatus.CREATED, + ) + current_step: Mapped[WorkflowStepName | None] = mapped_column( + Enum(WorkflowStepName, native_enum=False), + nullable=True, + ) + + order = relationship("OrderORM", back_populates="workflow_runs") + steps = relationship( + "WorkflowStepORM", + back_populates="workflow_run", + lazy="selectin", + order_by="WorkflowStepORM.started_at", + ) + diff --git a/app/infra/db/models/workflow_step.py b/app/infra/db/models/workflow_step.py new file mode 100644 index 0000000..077ce64 --- /dev/null +++ b/app/infra/db/models/workflow_step.py @@ -0,0 +1,42 @@ +"""Workflow step ORM model.""" + +from datetime import datetime +from typing import Any + +from sqlalchemy import DateTime, Enum, ForeignKey, Integer, JSON, Text +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.domain.enums import StepStatus, WorkflowStepName +from app.infra.db.base import Base, utc_now + + +class WorkflowStepORM(Base): + """Persisted workflow step execution record.""" + + __tablename__ = "workflow_steps" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + workflow_run_id: Mapped[int] = mapped_column( + ForeignKey("workflow_runs.id"), + nullable=False, + index=True, + ) + step_name: Mapped[WorkflowStepName] = mapped_column( + Enum(WorkflowStepName, native_enum=False), + nullable=False, + ) + step_status: Mapped[StepStatus] = mapped_column( + Enum(StepStatus, native_enum=False), + nullable=False, + ) + input_json: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) + output_json: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + started_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + nullable=False, + ) + ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + workflow_run = relationship("WorkflowRunORM", back_populates="steps") diff --git a/app/infra/db/session.py b/app/infra/db/session.py new file mode 100644 index 0000000..78b092f --- /dev/null +++ b/app/infra/db/session.py @@ -0,0 +1,65 @@ +"""Async database engine and session helpers.""" + +from collections.abc import AsyncGenerator + +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from app.config.settings import get_settings +from app.infra.db.base import Base + +_engine: AsyncEngine | None = None +_session_factory: async_sessionmaker[AsyncSession] | None = None + + +def get_async_engine() -> AsyncEngine: + """Return the lazily created async SQLAlchemy engine.""" + + global _engine + if _engine is None: + _engine = create_async_engine( + get_settings().database_url, + future=True, + echo=False, + ) + return _engine + + +def get_session_factory() -> async_sessionmaker[AsyncSession]: + """Return the lazily created async session factory.""" + + global _session_factory + if _session_factory is None: + _session_factory = async_sessionmaker(get_async_engine(), expire_on_commit=False) + return _session_factory + + +async def get_db_session() -> AsyncGenerator[AsyncSession, None]: + """Yield a database session for FastAPI dependencies.""" + + async with get_session_factory()() as session: + yield session + + +async def init_database() -> None: + """Create database tables when running the MVP without migrations.""" + + from app.infra.db.models.asset import AssetORM + from app.infra.db.models.order import OrderORM + from app.infra.db.models.review_task import ReviewTaskORM + from app.infra.db.models.workflow_run import WorkflowRunORM + from app.infra.db.models.workflow_step import WorkflowStepORM + + del AssetORM, OrderORM, ReviewTaskORM, WorkflowRunORM, WorkflowStepORM + + async with get_async_engine().begin() as connection: + await connection.run_sync(Base.metadata.create_all) + + +async def dispose_database() -> None: + """Dispose the active engine and clear cached session objects.""" + + global _engine, _session_factory + if _engine is not None: + await _engine.dispose() + _engine = None + _session_factory = None diff --git a/app/infra/temporal/client.py b/app/infra/temporal/client.py new file mode 100644 index 0000000..e471ec2 --- /dev/null +++ b/app/infra/temporal/client.py @@ -0,0 +1,35 @@ +"""Temporal client helpers.""" + +import asyncio + +from temporalio.client import Client + +from app.config.settings import get_settings + +_client: Client | None = None +_client_lock = asyncio.Lock() + + +async def get_temporal_client() -> Client: + """Return a cached Temporal client.""" + + global _client + if _client is not None: + return _client + + async with _client_lock: + if _client is None: + settings = get_settings() + _client = await Client.connect( + settings.temporal_address, + namespace=settings.temporal_namespace, + ) + return _client + + +def set_temporal_client(client: Client | None) -> None: + """Override the cached Temporal client, primarily for tests.""" + + global _client + _client = client + diff --git a/app/infra/temporal/task_queues.py b/app/infra/temporal/task_queues.py new file mode 100644 index 0000000..7570f74 --- /dev/null +++ b/app/infra/temporal/task_queues.py @@ -0,0 +1,8 @@ +"""Temporal task queue names.""" + +IMAGE_PIPELINE_CONTROL_TASK_QUEUE = "image-pipeline-control" +IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE = "image-pipeline-image-gen" +IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE = "image-pipeline-post-process" +IMAGE_PIPELINE_QC_TASK_QUEUE = "image-pipeline-qc" +IMAGE_PIPELINE_EXPORT_TASK_QUEUE = "image-pipeline-export" + diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..e53f0ed --- /dev/null +++ b/app/main.py @@ -0,0 +1,39 @@ +"""FastAPI application entrypoint.""" + +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from app.api.routers.assets import router as assets_router +from app.api.routers.health import router as health_router +from app.api.routers.orders import router as orders_router +from app.api.routers.reviews import router as reviews_router +from app.api.routers.workflows import router as workflows_router +from app.config.settings import get_settings +from app.infra.db.session import init_database + + +@asynccontextmanager +async def lifespan(_: FastAPI): + """Initialize local resources for the MVP runtime.""" + + settings = get_settings() + if settings.auto_create_tables: + await init_database() + yield + + +def create_app() -> FastAPI: + """Create and configure the FastAPI application.""" + + settings = get_settings() + app = FastAPI(title=settings.app_name, debug=settings.debug, lifespan=lifespan) + app.include_router(health_router) + app.include_router(orders_router, prefix=settings.api_prefix) + app.include_router(assets_router, prefix=settings.api_prefix) + app.include_router(reviews_router, prefix=settings.api_prefix) + app.include_router(workflows_router, prefix=settings.api_prefix) + return app + + +app = create_app() diff --git a/app/workers/activities/export_activities.py b/app/workers/activities/export_activities.py new file mode 100644 index 0000000..605ff03 --- /dev/null +++ b/app/workers/activities/export_activities.py @@ -0,0 +1,20 @@ +"""Export mock activity.""" + +from temporalio import activity + +from app.domain.enums import AssetType +from app.workers.activities.tryon_activities import execute_asset_step +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +@activity.defn +async def run_export_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock final asset export.""" + + return await execute_asset_step( + payload, + AssetType.FINAL, + filename="final.png", + finalize=True, + ) + diff --git a/app/workers/activities/face_activities.py b/app/workers/activities/face_activities.py new file mode 100644 index 0000000..dd1dc4b --- /dev/null +++ b/app/workers/activities/face_activities.py @@ -0,0 +1,15 @@ +"""Face mock activity.""" + +from temporalio import activity + +from app.domain.enums import AssetType +from app.workers.activities.tryon_activities import execute_asset_step +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +@activity.defn +async def run_face_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock face enhancement.""" + + return await execute_asset_step(payload, AssetType.FACE) + diff --git a/app/workers/activities/fusion_activities.py b/app/workers/activities/fusion_activities.py new file mode 100644 index 0000000..6c2fb3b --- /dev/null +++ b/app/workers/activities/fusion_activities.py @@ -0,0 +1,19 @@ +"""Fusion mock activity.""" + +from temporalio import activity + +from app.domain.enums import AssetType +from app.workers.activities.tryon_activities import execute_asset_step +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +@activity.defn +async def run_fusion_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock face and body fusion.""" + + return await execute_asset_step( + payload, + AssetType.FUSION, + extra_metadata={"face_asset_id": payload.selected_asset_id}, + ) + diff --git a/app/workers/activities/qc_activities.py b/app/workers/activities/qc_activities.py new file mode 100644 index 0000000..1b10e1f --- /dev/null +++ b/app/workers/activities/qc_activities.py @@ -0,0 +1,69 @@ +"""Quality-control mock activity.""" + +from temporalio import activity + +from app.domain.enums import AssetType, OrderStatus, StepStatus +from app.infra.db.models.asset import AssetORM +from app.infra.db.session import get_session_factory +from app.workers.activities.tryon_activities import create_step_record, jsonable, load_order_and_run, mock_uri, utc_now +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +@activity.defn +async def run_qc_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock automated quality control.""" + + async with get_session_factory()() as session: + order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) + step = create_step_record(payload) + session.add(step) + + order.status = OrderStatus.RUNNING + workflow_run.status = OrderStatus.RUNNING + workflow_run.current_step = payload.step_name + await session.flush() + + try: + passed = not payload.metadata.get("force_fail", False) + candidate_asset_ids: list[int] = [] + candidate_uri: str | None = None + + if passed: + candidate = AssetORM( + order_id=payload.order_id, + asset_type=AssetType.QC_CANDIDATE, + step_name=payload.step_name, + uri=mock_uri(payload.order_id, payload.step_name.value, "candidate.png"), + metadata_json=jsonable({"source_asset_id": payload.source_asset_id}), + ) + session.add(candidate) + await session.flush() + candidate_asset_ids = [candidate.id] + candidate_uri = candidate.uri + + result = MockActivityResult( + step_name=payload.step_name, + success=True, + asset_id=candidate_asset_ids[0] if candidate_asset_ids else None, + uri=candidate_uri, + score=0.95 if passed else 0.35, + passed=passed, + message="mock success" if passed else "mock qc rejected", + candidate_asset_ids=candidate_asset_ids, + metadata={"source_asset_id": payload.source_asset_id}, + ) + + step.step_status = StepStatus.SUCCEEDED if passed else StepStatus.FAILED + step.output_json = jsonable(result) + step.error_message = None if passed else "QC rejected the asset" + step.ended_at = utc_now() + await session.commit() + return result + except Exception as exc: + step.step_status = StepStatus.FAILED + step.error_message = str(exc) + step.ended_at = utc_now() + order.status = OrderStatus.FAILED + workflow_run.status = OrderStatus.FAILED + await session.commit() + raise diff --git a/app/workers/activities/review_activities.py b/app/workers/activities/review_activities.py new file mode 100644 index 0000000..73e5d32 --- /dev/null +++ b/app/workers/activities/review_activities.py @@ -0,0 +1,117 @@ +"""Review state management mock activities.""" + +from sqlalchemy import select + +from temporalio import activity + +from app.domain.enums import OrderStatus, ReviewDecision, ReviewTaskStatus, StepStatus, WorkflowStepName +from app.infra.db.models.review_task import ReviewTaskORM +from app.infra.db.models.workflow_step import WorkflowStepORM +from app.infra.db.session import get_session_factory +from app.workers.activities.tryon_activities import jsonable, load_order_and_run, utc_now +from app.workers.workflows.types import ( + ReviewResolutionActivityInput, + ReviewWaitActivityInput, + WorkflowFailureActivityInput, +) + + +@activity.defn +async def mark_waiting_for_review_activity(payload: ReviewWaitActivityInput) -> None: + """Mark a workflow as waiting for a human review.""" + + async with get_session_factory()() as session: + order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) + review_step = WorkflowStepORM( + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.REVIEW, + step_status=StepStatus.WAITING, + input_json=jsonable(payload), + started_at=utc_now(), + ) + session.add(review_step) + session.add( + ReviewTaskORM( + order_id=payload.order_id, + status=ReviewTaskStatus.PENDING, + selected_asset_id=payload.candidate_asset_ids[0] if payload.candidate_asset_ids else None, + comment=payload.comment, + ) + ) + + order.status = OrderStatus.WAITING_REVIEW + workflow_run.status = OrderStatus.WAITING_REVIEW + workflow_run.current_step = WorkflowStepName.REVIEW + await session.commit() + + +@activity.defn +async def complete_review_wait_activity(payload: ReviewResolutionActivityInput) -> None: + """Resolve the current waiting-review step before the next branch runs.""" + + async with get_session_factory()() as session: + order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) + step_result = await session.execute( + select(WorkflowStepORM) + .where( + WorkflowStepORM.workflow_run_id == payload.workflow_run_id, + WorkflowStepORM.step_name == WorkflowStepName.REVIEW, + WorkflowStepORM.step_status == StepStatus.WAITING, + ) + .order_by(WorkflowStepORM.started_at.desc(), WorkflowStepORM.id.desc()) + ) + review_step = step_result.scalars().first() + if review_step is not None: + review_step.step_status = ( + StepStatus.FAILED if payload.decision == ReviewDecision.REJECT else StepStatus.SUCCEEDED + ) + review_step.output_json = jsonable(payload) + review_step.error_message = payload.comment if payload.decision == ReviewDecision.REJECT else None + review_step.ended_at = utc_now() + + if payload.decision == ReviewDecision.REJECT: + order.status = OrderStatus.FAILED + workflow_run.status = OrderStatus.FAILED + else: + order.status = OrderStatus.RUNNING + workflow_run.status = OrderStatus.RUNNING + + workflow_run.current_step = WorkflowStepName.REVIEW + await session.commit() + + +@activity.defn +async def mark_workflow_failed_activity(payload: WorkflowFailureActivityInput) -> None: + """Mark the persisted workflow state as failed.""" + + async with get_session_factory()() as session: + order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) + + step_result = await session.execute( + select(WorkflowStepORM) + .where( + WorkflowStepORM.workflow_run_id == payload.workflow_run_id, + WorkflowStepORM.step_name == payload.current_step, + ) + .order_by(WorkflowStepORM.started_at.desc(), WorkflowStepORM.id.desc()) + ) + workflow_step = step_result.scalars().first() + if workflow_step is None: + workflow_step = WorkflowStepORM( + workflow_run_id=payload.workflow_run_id, + step_name=payload.current_step, + step_status=StepStatus.FAILED, + input_json=jsonable(payload), + started_at=utc_now(), + ) + session.add(workflow_step) + + workflow_step.step_status = StepStatus.FAILED + workflow_step.error_message = payload.message + workflow_step.output_json = jsonable({"message": payload.message, "status": payload.status.value}) + workflow_step.ended_at = workflow_step.ended_at or utc_now() + + order.status = payload.status + workflow_run.status = payload.status + workflow_run.current_step = payload.current_step + await session.commit() diff --git a/app/workers/activities/scene_activities.py b/app/workers/activities/scene_activities.py new file mode 100644 index 0000000..05e81bf --- /dev/null +++ b/app/workers/activities/scene_activities.py @@ -0,0 +1,19 @@ +"""Scene mock activity.""" + +from temporalio import activity + +from app.domain.enums import AssetType +from app.workers.activities.tryon_activities import execute_asset_step +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +@activity.defn +async def run_scene_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock scene replacement.""" + + return await execute_asset_step( + payload, + AssetType.SCENE, + extra_metadata={"scene_ref_asset_id": payload.scene_ref_asset_id}, + ) + diff --git a/app/workers/activities/texture_activities.py b/app/workers/activities/texture_activities.py new file mode 100644 index 0000000..847a82a --- /dev/null +++ b/app/workers/activities/texture_activities.py @@ -0,0 +1,15 @@ +"""Texture mock activity.""" + +from temporalio import activity + +from app.domain.enums import AssetType +from app.workers.activities.tryon_activities import execute_asset_step +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +@activity.defn +async def run_texture_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock garment texture enhancement.""" + + return await execute_asset_step(payload, AssetType.TEXTURE) + diff --git a/app/workers/activities/tryon_activities.py b/app/workers/activities/tryon_activities.py new file mode 100644 index 0000000..412ad2e --- /dev/null +++ b/app/workers/activities/tryon_activities.py @@ -0,0 +1,170 @@ +"""Prepare-model and try-on mock activities plus shared helpers.""" + +from __future__ import annotations + +from dataclasses import asdict, is_dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import Any +from uuid import uuid4 + +from temporalio import activity + +from app.domain.enums import AssetType, OrderStatus, StepStatus +from app.infra.db.models.asset import AssetORM +from app.infra.db.models.order import OrderORM +from app.infra.db.models.workflow_run import WorkflowRunORM +from app.infra.db.models.workflow_step import WorkflowStepORM +from app.infra.db.session import get_session_factory +from app.workers.workflows.types import MockActivityResult, StepActivityInput + + +def utc_now() -> datetime: + """Return the current UTC timestamp.""" + + return datetime.now(timezone.utc) + + +def jsonable(value: Any) -> Any: + """Convert enums, dataclasses, and nested values to JSON-safe structures.""" + + if value is None: + return None + if isinstance(value, Enum): + return value.value + if isinstance(value, datetime): + return value.isoformat() + if is_dataclass(value): + return jsonable(asdict(value)) + if isinstance(value, dict): + return {key: jsonable(item) for key, item in value.items() if item is not None} + if isinstance(value, (list, tuple, set)): + return [jsonable(item) for item in value] + return value + + +def mock_uri(order_id: int, step_name: str, filename: str = "result.png") -> str: + """Build a deterministic-looking mock URI for an order step.""" + + return f"mock://orders/{order_id}/{step_name}/{uuid4().hex[:8]}-{filename}" + + +async def load_order_and_run(session, order_id: int, workflow_run_id: int) -> tuple[OrderORM, WorkflowRunORM]: + """Load the order and workflow run required by an activity.""" + + order = await session.get(OrderORM, order_id) + workflow_run = await session.get(WorkflowRunORM, workflow_run_id) + if order is None or workflow_run is None: + raise ValueError("Order or workflow run not found for activity execution") + return order, workflow_run + + +def create_step_record(payload: StepActivityInput) -> WorkflowStepORM: + """Create a running workflow step row for an activity execution.""" + + return WorkflowStepORM( + workflow_run_id=payload.workflow_run_id, + step_name=payload.step_name, + step_status=StepStatus.RUNNING, + input_json=jsonable(payload), + started_at=utc_now(), + ) + + +async def execute_asset_step( + payload: StepActivityInput, + asset_type: AssetType, + *, + score: float = 0.95, + filename: str = "result.png", + message: str = "mock success", + extra_metadata: dict[str, Any] | None = None, + finalize: bool = False, +) -> MockActivityResult: + """Persist a mock asset-producing step and return its result.""" + + async with get_session_factory()() as session: + order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) + step = create_step_record(payload) + session.add(step) + + order.status = OrderStatus.RUNNING + workflow_run.status = OrderStatus.RUNNING + workflow_run.current_step = payload.step_name + await session.flush() + + try: + metadata = { + **payload.metadata, + "source_asset_id": payload.source_asset_id, + "selected_asset_id": payload.selected_asset_id, + **(extra_metadata or {}), + } + metadata = {key: value for key, value in metadata.items() if value is not None} + + asset = AssetORM( + order_id=payload.order_id, + asset_type=asset_type, + step_name=payload.step_name, + uri=mock_uri(payload.order_id, payload.step_name.value, filename), + metadata_json=jsonable(metadata), + ) + session.add(asset) + await session.flush() + + result = MockActivityResult( + step_name=payload.step_name, + success=True, + asset_id=asset.id, + uri=asset.uri, + score=score, + passed=True, + message=message, + metadata=jsonable(metadata) or {}, + ) + + if finalize: + order.final_asset_id = asset.id + order.status = OrderStatus.SUCCEEDED + workflow_run.status = OrderStatus.SUCCEEDED + + step.step_status = StepStatus.SUCCEEDED + step.output_json = jsonable(result) + step.ended_at = utc_now() + await session.commit() + return result + except Exception as exc: + step.step_status = StepStatus.FAILED + step.error_message = str(exc) + step.ended_at = utc_now() + order.status = OrderStatus.FAILED + workflow_run.status = OrderStatus.FAILED + await session.commit() + raise + + +@activity.defn +async def prepare_model_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock model preparation for the pipeline.""" + + return await execute_asset_step( + payload, + AssetType.PREPARED_MODEL, + extra_metadata={ + "model_id": payload.model_id, + "pose_id": payload.pose_id, + "garment_asset_id": payload.garment_asset_id, + "scene_ref_asset_id": payload.scene_ref_asset_id, + }, + ) + + +@activity.defn +async def run_tryon_activity(payload: StepActivityInput) -> MockActivityResult: + """Mock try-on rendering.""" + + return await execute_asset_step( + payload, + AssetType.TRYON, + extra_metadata={"prepared_asset_id": payload.source_asset_id}, + ) diff --git a/app/workers/runner.py b/app/workers/runner.py new file mode 100644 index 0000000..29634c3 --- /dev/null +++ b/app/workers/runner.py @@ -0,0 +1,84 @@ +"""Temporal worker runner.""" + +import asyncio +from contextlib import AsyncExitStack + +from temporalio.client import Client +from temporalio.worker import Worker + +from app.infra.temporal.client import get_temporal_client +from app.infra.temporal.task_queues import ( + IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + IMAGE_PIPELINE_EXPORT_TASK_QUEUE, + IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, + IMAGE_PIPELINE_QC_TASK_QUEUE, +) +from app.workers.activities.export_activities import run_export_activity +from app.workers.activities.face_activities import run_face_activity +from app.workers.activities.fusion_activities import run_fusion_activity +from app.workers.activities.qc_activities import run_qc_activity +from app.workers.activities.review_activities import ( + complete_review_wait_activity, + mark_waiting_for_review_activity, + mark_workflow_failed_activity, +) +from app.workers.activities.scene_activities import run_scene_activity +from app.workers.activities.texture_activities import run_texture_activity +from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity +from app.workers.workflows.low_end_pipeline import LowEndPipelineWorkflow +from app.workers.workflows.mid_end_pipeline import MidEndPipelineWorkflow + + +def build_workers(client: Client) -> list[Worker]: + """Create the worker set needed for the task queues in this MVP.""" + + return [ + Worker( + client, + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + workflows=[LowEndPipelineWorkflow, MidEndPipelineWorkflow], + activities=[ + prepare_model_activity, + mark_waiting_for_review_activity, + complete_review_wait_activity, + mark_workflow_failed_activity, + ], + ), + Worker( + client, + task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + activities=[run_tryon_activity, run_scene_activity], + ), + Worker( + client, + task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, + activities=[run_texture_activity, run_face_activity, run_fusion_activity], + ), + Worker( + client, + task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE, + activities=[run_qc_activity], + ), + Worker( + client, + task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE, + activities=[run_export_activity], + ), + ] + + +async def run_workers() -> None: + """Start all Temporal workers and keep the process alive.""" + + client = await get_temporal_client() + workers = build_workers(client) + async with AsyncExitStack() as stack: + for worker in workers: + await stack.enter_async_context(worker) + await asyncio.Event().wait() + + +if __name__ == "__main__": + asyncio.run(run_workers()) + diff --git a/app/workers/workflows/low_end_pipeline.py b/app/workers/workflows/low_end_pipeline.py new file mode 100644 index 0000000..bdb1b4b --- /dev/null +++ b/app/workers/workflows/low_end_pipeline.py @@ -0,0 +1,152 @@ +"""Low-end image pipeline workflow.""" + +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy + +with workflow.unsafe.imports_passed_through(): + from app.domain.enums import OrderStatus, WorkflowStepName + from app.infra.temporal.task_queues import ( + IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + IMAGE_PIPELINE_EXPORT_TASK_QUEUE, + IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + IMAGE_PIPELINE_QC_TASK_QUEUE, + ) + from app.workers.activities.export_activities import run_export_activity + from app.workers.activities.qc_activities import run_qc_activity + from app.workers.activities.review_activities import mark_workflow_failed_activity + from app.workers.activities.scene_activities import run_scene_activity + from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity + from app.workers.workflows.types import ( + PipelineWorkflowInput, + StepActivityInput, + WorkflowFailureActivityInput, + ) + + +ACTIVITY_TIMEOUT = timedelta(seconds=30) +ACTIVITY_RETRY_POLICY = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_attempts=3, +) + + +@workflow.defn +class LowEndPipelineWorkflow: + """Low-end fully automated image pipeline.""" + + @workflow.run + async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]: + """Execute the low-end workflow from start to finish.""" + + current_step = WorkflowStepName.PREPARE_MODEL + try: + prepared = await workflow.execute_activity( + prepare_model_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.PREPARE_MODEL, + model_id=payload.model_id, + pose_id=payload.pose_id, + garment_asset_id=payload.garment_asset_id, + scene_ref_asset_id=payload.scene_ref_asset_id, + ), + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + current_step = WorkflowStepName.TRYON + tryon_result = await workflow.execute_activity( + run_tryon_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.TRYON, + source_asset_id=prepared.asset_id, + garment_asset_id=payload.garment_asset_id, + ), + task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + current_step = WorkflowStepName.SCENE + scene_result = await workflow.execute_activity( + run_scene_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.SCENE, + source_asset_id=tryon_result.asset_id, + scene_ref_asset_id=payload.scene_ref_asset_id, + ), + task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + current_step = WorkflowStepName.QC + qc_result = await workflow.execute_activity( + run_qc_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.QC, + source_asset_id=scene_result.asset_id, + ), + task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + if not qc_result.passed: + await self._mark_failed(payload, current_step, qc_result.message) + return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} + + current_step = WorkflowStepName.EXPORT + final_result = await workflow.execute_activity( + run_export_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.EXPORT, + source_asset_id=(qc_result.candidate_asset_ids or [scene_result.asset_id])[0], + ), + task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + return { + "order_id": payload.order_id, + "status": OrderStatus.SUCCEEDED.value, + "final_asset_id": final_result.asset_id, + } + except Exception as exc: + await self._mark_failed(payload, current_step, str(exc)) + raise + + async def _mark_failed( + self, + payload: PipelineWorkflowInput, + current_step: WorkflowStepName, + message: str, + ) -> None: + """Persist workflow failure state.""" + + await workflow.execute_activity( + mark_workflow_failed_activity, + WorkflowFailureActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + current_step=current_step, + message=message, + ), + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + diff --git a/app/workers/workflows/mid_end_pipeline.py b/app/workers/workflows/mid_end_pipeline.py new file mode 100644 index 0000000..406a8b6 --- /dev/null +++ b/app/workers/workflows/mid_end_pipeline.py @@ -0,0 +1,315 @@ +"""Mid-end image pipeline workflow with review signal support.""" + +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy + +with workflow.unsafe.imports_passed_through(): + from app.domain.enums import OrderStatus, ReviewDecision, WorkflowStepName + from app.infra.temporal.task_queues import ( + IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + IMAGE_PIPELINE_EXPORT_TASK_QUEUE, + IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, + IMAGE_PIPELINE_QC_TASK_QUEUE, + ) + from app.workers.activities.export_activities import run_export_activity + from app.workers.activities.face_activities import run_face_activity + from app.workers.activities.fusion_activities import run_fusion_activity + from app.workers.activities.qc_activities import run_qc_activity + from app.workers.activities.review_activities import ( + complete_review_wait_activity, + mark_waiting_for_review_activity, + mark_workflow_failed_activity, + ) + from app.workers.activities.scene_activities import run_scene_activity + from app.workers.activities.texture_activities import run_texture_activity + from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity + from app.workers.workflows.types import ( + MockActivityResult, + PipelineWorkflowInput, + ReviewResolutionActivityInput, + ReviewSignalPayload, + ReviewWaitActivityInput, + StepActivityInput, + WorkflowFailureActivityInput, + ) + + +ACTIVITY_TIMEOUT = timedelta(seconds=30) +ACTIVITY_RETRY_POLICY = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_attempts=3, +) + + +@workflow.defn +class MidEndPipelineWorkflow: + """Mid-end workflow that pauses for human review and supports reruns.""" + + def __init__(self) -> None: + self._review_payload: ReviewSignalPayload | None = None + + @workflow.signal + def submit_review(self, payload: ReviewSignalPayload) -> None: + """Receive a review decision from the API layer.""" + + self._review_payload = payload + + @workflow.run + async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]: + """Execute the mid-end workflow with a human review loop.""" + + current_step = WorkflowStepName.PREPARE_MODEL + try: + prepared = await workflow.execute_activity( + prepare_model_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.PREPARE_MODEL, + model_id=payload.model_id, + pose_id=payload.pose_id, + garment_asset_id=payload.garment_asset_id, + scene_ref_asset_id=payload.scene_ref_asset_id, + ), + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + current_step = WorkflowStepName.TRYON + tryon_result = await workflow.execute_activity( + run_tryon_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.TRYON, + source_asset_id=prepared.asset_id, + garment_asset_id=payload.garment_asset_id, + ), + task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + current_step = WorkflowStepName.SCENE + scene_result = await self._run_scene(payload, tryon_result.asset_id) + + current_step = WorkflowStepName.TEXTURE + texture_result = await self._run_texture(payload, scene_result.asset_id) + + current_step = WorkflowStepName.FACE + face_result = await self._run_face(payload, texture_result.asset_id) + + current_step = WorkflowStepName.FUSION + fusion_result = await self._run_fusion(payload, scene_result.asset_id, face_result.asset_id) + + current_step = WorkflowStepName.QC + qc_result = await self._run_qc(payload, fusion_result.asset_id) + if not qc_result.passed: + await self._mark_failed(payload, current_step, qc_result.message) + return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} + + while True: + current_step = WorkflowStepName.REVIEW + await workflow.execute_activity( + mark_waiting_for_review_activity, + ReviewWaitActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + candidate_asset_ids=qc_result.candidate_asset_ids, + ), + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + review_payload = await self._wait_for_review() + await workflow.execute_activity( + complete_review_wait_activity, + ReviewResolutionActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + decision=review_payload.decision, + reviewer_id=review_payload.reviewer_id, + selected_asset_id=review_payload.selected_asset_id, + comment=review_payload.comment, + ), + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + if review_payload.decision == ReviewDecision.APPROVE: + current_step = WorkflowStepName.EXPORT + export_source_id = review_payload.selected_asset_id + if export_source_id is None: + export_source_id = (qc_result.candidate_asset_ids or [fusion_result.asset_id])[0] + final_result = await workflow.execute_activity( + run_export_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.EXPORT, + source_asset_id=export_source_id, + ), + task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + return { + "order_id": payload.order_id, + "status": OrderStatus.SUCCEEDED.value, + "final_asset_id": final_result.asset_id, + } + + if review_payload.decision == ReviewDecision.REJECT: + return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} + + if review_payload.decision == ReviewDecision.RERUN_SCENE: + current_step = WorkflowStepName.SCENE + scene_result = await self._run_scene(payload, tryon_result.asset_id) + current_step = WorkflowStepName.TEXTURE + texture_result = await self._run_texture(payload, scene_result.asset_id) + current_step = WorkflowStepName.FACE + face_result = await self._run_face(payload, texture_result.asset_id) + current_step = WorkflowStepName.FUSION + fusion_result = await self._run_fusion(payload, scene_result.asset_id, face_result.asset_id) + elif review_payload.decision == ReviewDecision.RERUN_FACE: + current_step = WorkflowStepName.FACE + face_result = await self._run_face(payload, texture_result.asset_id) + current_step = WorkflowStepName.FUSION + fusion_result = await self._run_fusion(payload, scene_result.asset_id, face_result.asset_id) + elif review_payload.decision == ReviewDecision.RERUN_FUSION: + current_step = WorkflowStepName.FUSION + fusion_result = await self._run_fusion(payload, scene_result.asset_id, face_result.asset_id) + + current_step = WorkflowStepName.QC + qc_result = await self._run_qc(payload, fusion_result.asset_id) + if not qc_result.passed: + await self._mark_failed(payload, current_step, qc_result.message) + return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} + except Exception as exc: + await self._mark_failed(payload, current_step, str(exc)) + raise + + async def _wait_for_review(self) -> ReviewSignalPayload: + """Suspend the workflow until a review signal arrives.""" + + if self._review_payload is None: + await workflow.wait_condition(lambda: self._review_payload is not None) + assert self._review_payload is not None + review_payload = self._review_payload + self._review_payload = None + return review_payload + + async def _run_scene(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: + """Execute the scene activity.""" + + return await workflow.execute_activity( + run_scene_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.SCENE, + source_asset_id=source_asset_id, + scene_ref_asset_id=payload.scene_ref_asset_id, + ), + task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + async def _run_texture(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: + """Execute the texture activity.""" + + return await workflow.execute_activity( + run_texture_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.TEXTURE, + source_asset_id=source_asset_id, + ), + task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + async def _run_face(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: + """Execute the face activity.""" + + return await workflow.execute_activity( + run_face_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.FACE, + source_asset_id=source_asset_id, + ), + task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + async def _run_fusion( + self, + payload: PipelineWorkflowInput, + source_asset_id: int | None, + face_asset_id: int | None, + ) -> MockActivityResult: + """Execute the fusion activity.""" + + return await workflow.execute_activity( + run_fusion_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.FUSION, + source_asset_id=source_asset_id, + selected_asset_id=face_asset_id, + ), + task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + async def _run_qc(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: + """Execute the QC activity.""" + + return await workflow.execute_activity( + run_qc_activity, + StepActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + step_name=WorkflowStepName.QC, + source_asset_id=source_asset_id, + ), + task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) + + async def _mark_failed( + self, + payload: PipelineWorkflowInput, + current_step: WorkflowStepName, + message: str, + ) -> None: + """Persist workflow failure state.""" + + await workflow.execute_activity( + mark_workflow_failed_activity, + WorkflowFailureActivityInput( + order_id=payload.order_id, + workflow_run_id=payload.workflow_run_id, + current_step=current_step, + message=message, + ), + task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, + start_to_close_timeout=ACTIVITY_TIMEOUT, + retry_policy=ACTIVITY_RETRY_POLICY, + ) diff --git a/app/workers/workflows/types.py b/app/workers/workflows/types.py new file mode 100644 index 0000000..92a0cee --- /dev/null +++ b/app/workers/workflows/types.py @@ -0,0 +1,137 @@ +"""Shared workflow and activity payload types.""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + +from app.domain.enums import CustomerLevel, OrderStatus, ReviewDecision, ServiceMode, WorkflowStepName + + +def _coerce_enum(value: Any, enum_cls: type[Enum]) -> Any: + """Coerce raw Temporal payload values back into enum instances.""" + + if value is None or isinstance(value, enum_cls): + return value + if isinstance(value, list): + value = "".join(str(item) for item in value) + return enum_cls(value) + + +@dataclass(slots=True) +class PipelineWorkflowInput: + """Temporal workflow input for an image pipeline order.""" + + order_id: int + workflow_run_id: int + customer_level: CustomerLevel + service_mode: ServiceMode + model_id: int + pose_id: int + garment_asset_id: int + scene_ref_asset_id: int + + def __post_init__(self) -> None: + """Normalize enum-like values after Temporal deserialization.""" + + self.customer_level = _coerce_enum(self.customer_level, CustomerLevel) + self.service_mode = _coerce_enum(self.service_mode, ServiceMode) + + +@dataclass(slots=True) +class StepActivityInput: + """Input payload shared by the mock pipeline activities.""" + + order_id: int + workflow_run_id: int + step_name: WorkflowStepName + model_id: int | None = None + pose_id: int | None = None + garment_asset_id: int | None = None + scene_ref_asset_id: int | None = None + source_asset_id: int | None = None + selected_asset_id: int | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + """Normalize enum-like values after Temporal deserialization.""" + + self.step_name = _coerce_enum(self.step_name, WorkflowStepName) + + +@dataclass(slots=True) +class MockActivityResult: + """Common mock activity result structure.""" + + step_name: WorkflowStepName + success: bool + asset_id: int | None + uri: str | None + score: float | None = None + passed: bool | None = None + message: str = "mock success" + candidate_asset_ids: list[int] = field(default_factory=list) + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + """Normalize enum-like values after Temporal deserialization.""" + + self.step_name = _coerce_enum(self.step_name, WorkflowStepName) + + +@dataclass(slots=True) +class ReviewSignalPayload: + """Signal payload sent from the API to the mid-end workflow.""" + + decision: ReviewDecision + reviewer_id: int + selected_asset_id: int | None = None + comment: str | None = None + + def __post_init__(self) -> None: + """Normalize enum-like values after Temporal deserialization.""" + + self.decision = _coerce_enum(self.decision, ReviewDecision) + + +@dataclass(slots=True) +class ReviewWaitActivityInput: + """Input for marking a workflow as waiting for review.""" + + order_id: int + workflow_run_id: int + candidate_asset_ids: list[int] = field(default_factory=list) + comment: str | None = None + + +@dataclass(slots=True) +class ReviewResolutionActivityInput: + """Input for completing a waiting review state.""" + + order_id: int + workflow_run_id: int + decision: ReviewDecision + reviewer_id: int + selected_asset_id: int | None = None + comment: str | None = None + + def __post_init__(self) -> None: + """Normalize enum-like values after Temporal deserialization.""" + + self.decision = _coerce_enum(self.decision, ReviewDecision) + + +@dataclass(slots=True) +class WorkflowFailureActivityInput: + """Input for marking a workflow as failed.""" + + order_id: int + workflow_run_id: int + current_step: WorkflowStepName + message: str + status: OrderStatus = OrderStatus.FAILED + + def __post_init__(self) -> None: + """Normalize enum-like values after Temporal deserialization.""" + + self.current_step = _coerce_enum(self.current_step, WorkflowStepName) + self.status = _coerce_enum(self.status, OrderStatus) diff --git a/codex_task.md b/codex_task.md new file mode 100644 index 0000000..c89db3a --- /dev/null +++ b/codex_task.md @@ -0,0 +1,543 @@ +# CODEX 执行文档 + +## 0. 任务标题 +为图片生产系统实现 FastAPI + Temporal MVP 骨架 + +## 1. 任务目标 +在现有仓库中搭建一个可运行的最小版本,满足以下目标: + +- 提供 FastAPI 服务 +- 提供 Temporal workflow 和 worker +- 支持两条流程: + - 低端客户:全自动 `auto_basic` + - 中端客户:半自动 `semi_pro` +- 提供最小接口: + - 创建订单 + - 查询订单 + - 查询订单资产 + - 提交审核结果 +- 代码应可本地启动,结构清晰,便于后续扩展 + +## 2. 背景说明 +这是一个图片生产流水线系统,面向两类客户: + +### 低端客户 +要求批量、快速、低成本。 +流程为: + +1. 选择模特和固定姿势 +2. 换装 +3. 换场景 +4. 基础质检 +5. 导出结果 + +### 中端客户 +要求成图质量更高,允许人工轻审核。 +流程为: + +1. 选择模特和姿势 +2. 换装 +3. 换场景 +4. 衣服材质增强 +5. 面部增强 +6. 面部融合 +7. 自动质检 +8. 等待人工审核 +9. 通过后导出,或从指定步骤重跑 + +## 3. 本次范围 +只实现 **MVP 骨架**,不要求接入真实 AI 平台。 + +本次只需要: + +- FastAPI 项目结构 +- Temporal workflow / activity / worker 结构 +- Pydantic 请求响应模型 +- 先用 mock activity 代替真实图像处理 +- 数据层先可用 SQLite + SQLAlchemy +- 资产先用本地路径或假 URI +- review signal 可跑通 + +本次不需要: + +- 真正的换装算法 +- 真正的图像增强 +- 真实对象存储 +- 权限系统 +- 完整后台页面 +- 第三方平台集成 + +## 4. 技术栈要求 +- Python 3.11+ +- FastAPI +- Uvicorn +- SQLAlchemy 2.x +- Pydantic v2 +- Temporal Python SDK +- Alembic +- pytest + +## 5. 目录结构要求 +严格按以下目录创建: + +```text +app/ + main.py + config/ + settings.py + api/ + routers/ + health.py + orders.py + assets.py + reviews.py + workflows.py + schemas/ + order.py + asset.py + review.py + workflow.py + application/ + services/ + order_service.py + workflow_service.py + review_service.py + asset_service.py + domain/ + enums.py + models/ + order.py + asset.py + review_task.py + workflow_run.py + workflow_step.py + infra/ + db/ + base.py + session.py + models/ + order.py + asset.py + review_task.py + workflow_run.py + workflow_step.py + temporal/ + client.py + task_queues.py + workers/ + runner.py + workflows/ + types.py + low_end_pipeline.py + mid_end_pipeline.py + activities/ + tryon_activities.py + scene_activities.py + texture_activities.py + face_activities.py + fusion_activities.py + qc_activities.py + export_activities.py + review_activities.py +tests/ +``` + +## 6. 代码风格要求 +- 使用类型注解 +- 使用 async 风格 +- 保持模块边界清晰 +- route 中不要写业务逻辑 +- workflow 中不要直接访问数据库 +- activity 中允许 I/O 和 mock 处理 +- 所有核心函数写 docstring +- 所有枚举统一放在 `domain/enums.py` + +## 7. 核心业务枚举 +实现以下枚举: + +- `CustomerLevel`: `low`, `mid` +- `ServiceMode`: `auto_basic`, `semi_pro` +- `OrderStatus`: `created`, `running`, `waiting_review`, `succeeded`, `failed`, `cancelled` +- `WorkflowStepName`: `prepare_model`, `tryon`, `scene`, `texture`, `face`, `fusion`, `qc`, `export`, `review` +- `ReviewDecision`: `approve`, `rerun_scene`, `rerun_face`, `rerun_fusion`, `reject` + +## 8. 数据模型要求 + +### orders +字段至少包含: +- id +- customer_level +- service_mode +- status +- model_id +- pose_id +- garment_asset_id +- scene_ref_asset_id +- final_asset_id +- created_at +- updated_at + +### assets +字段至少包含: +- id +- order_id +- asset_type +- step_name +- uri +- metadata_json +- created_at + +### review_tasks +字段至少包含: +- id +- order_id +- status +- decision +- reviewer_id +- selected_asset_id +- comment +- created_at +- updated_at + +### workflow_runs +字段至少包含: +- id +- order_id +- workflow_id +- workflow_type +- status +- current_step +- created_at +- updated_at + +### workflow_steps +字段至少包含: +- id +- workflow_run_id +- step_name +- step_status +- input_json +- output_json +- error_message +- started_at +- ended_at + +## 9. API 需求 + +### 9.1 健康检查 +`GET /healthz` +返回: +```json +{"status":"ok"} +``` + +### 9.2 创建订单 +`POST /api/v1/orders` + +请求体: +```json +{ + "customer_level": "low", + "service_mode": "auto_basic", + "model_id": 101, + "pose_id": 3, + "garment_asset_id": 9001, + "scene_ref_asset_id": 8001 +} +``` + +行为: +- 创建订单 +- 创建 workflow_run +- 启动 Temporal workflow +- 返回 order_id 和 workflow_id + +### 9.3 查询订单详情 +`GET /api/v1/orders/{order_id}` + +返回: +- 基本订单信息 +- 当前状态 +- 当前步骤 +- 最终资产(如果有) + +### 9.4 查询订单资产 +`GET /api/v1/orders/{order_id}/assets` + +返回该订单相关所有资产列表。 + +### 9.5 查询待审核列表 +`GET /api/v1/reviews/pending` + +返回所有 `waiting_review` 的订单。 + +### 9.6 提交审核结果 +`POST /api/v1/reviews/{order_id}/submit` + +请求体示例: +```json +{ + "decision": "approve", + "reviewer_id": 77, + "selected_asset_id": 1001, + "comment": "通过" +} +``` + +或: +```json +{ + "decision": "rerun_face", + "reviewer_id": 77, + "comment": "面部不自然,重跑 face" +} +``` + +行为: +- 记录 review_task +- 向对应 workflow 发送 signal + +### 9.7 查询工作流状态 +`GET /api/v1/workflows/{order_id}` + +返回: +- workflow_id +- workflow_status +- current_step +- step 列表 + +## 10. Workflow 需求 + +### 10.1 低端流程 `LowEndPipelineWorkflow` +顺序如下: + +1. prepare_model +2. tryon +3. scene +4. qc +5. export + +要求: +- 每一步都调用 activity +- 每一步都记录 step 状态 +- qc 不通过时可直接标记失败 +- export 完成后写订单成功状态 + +### 10.2 中端流程 `MidEndPipelineWorkflow` +顺序如下: + +1. prepare_model +2. tryon +3. scene +4. texture +5. face +6. fusion +7. qc +8. wait for review signal +9. approve 后 export +10. rerun_face 时回到 face +11. rerun_fusion 时回到 fusion +12. rerun_scene 时回到 scene + +要求: +- 使用 workflow signal 实现人工审核回流 +- 在等待审核时,订单状态应更新为 `waiting_review` +- 最终通过后导出 + +## 11. Activity 需求 +先全部实现为 mock 版本,统一返回假结果。 + +### tryon +输入: +- order_id +- source asset refs + +输出: +- 新 asset uri +- step result + +### scene +输入: +- 上一步结果 + +输出: +- 场景替换后 asset + +### texture +输入: +- 上一步结果 + +输出: +- 材质增强 asset + +### face +输入: +- 上一步结果 + +输出: +- 面部增强 asset + +### fusion +输入: +- 主图 + face 输出 + +输出: +- 融合后的 asset + +### qc +输入: +- 当前图 + +输出: +- score +- pass/fail +- candidate assets 可先只返回一张 + +### export +输入: +- 最终通过的 asset + +输出: +- final asset + +## 12. Temporal 实现要求 +- workflow 输入使用 dataclass +- workflow id 使用 `order-{order_id}` +- task queue 至少拆成: + - `image-pipeline-control` + - `image-pipeline-image-gen` + - `image-pipeline-post-process` + - `image-pipeline-qc` + - `image-pipeline-export` +- activity 配置合理的 `start_to_close_timeout` +- 为常见 activity 加 retry policy + +## 13. Mock 策略 +为了让系统可运行,所有 activity 先返回固定结构,例如: + +```json +{ + "step_name": "tryon", + "success": true, + "asset_id": 10001, + "uri": "mock://orders/1/tryon/result.png", + "score": 0.95, + "message": "mock success" +} +``` + +资产可直接写入数据库,不要求真实文件存在。 + +## 14. 验收标准 +完成后应满足: + +1. `uvicorn app.main:app --reload` 可以启动 +2. Temporal worker 可以启动 +3. 创建低端订单后,workflow 能跑到成功 +4. 创建中端订单后,workflow 能停在 `waiting_review` +5. 提交 `approve` 后,中端流程可以继续到成功 +6. 提交 `rerun_face` 后,中端流程会回到 `face` +7. 所有 API 有基础错误处理 +8. 代码能通过基础 lint 和测试 + +## 15. 输出要求 +请按以下顺序产出: + +1. 先创建项目目录和核心文件 +2. 再实现 domain enums 和 db models +3. 再实现 API schemas 和 routers +4. 再实现 application services +5. 再实现 Temporal client / workflows / activities / worker +6. 最后补充 README,写明本地启动方法 + +## 16. README 至少包含 +- 环境要求 +- 安装依赖 +- 启动 FastAPI +- 启动 Temporal server +- 启动 worker +- 调用示例 +- 中端审核 signal 示例 + +## 17. 不要做的事 +- 不要擅自改目录结构 +- 不要引入额外复杂框架 +- 不要接入真实第三方 AI 平台 +- 不要写前端页面 +- 不要把所有逻辑塞到一个文件 +- 不要省略测试 +- 不要跳过中端审核 signal + +## 18. 开发优先级 +优先级从高到低: + +1. 项目骨架 +2. 低端 workflow 跑通 +3. 中端 workflow + review signal 跑通 +4. API 完整 +5. README 和测试 + +## 19. 建议实现顺序 +建议你按以下顺序提交代码: + +- 第 1 批:目录、配置、枚举、数据库模型 +- 第 2 批:订单 API、创建订单 service、workflow 启动 +- 第 3 批:低端 workflow + mock activities +- 第 4 批:中端 workflow + signal +- 第 5 批:审核 API、README、测试 + +## 20. 交付物 +最终需要包含: + +- 可运行源码 +- Alembic 初始迁移 +- 示例 `.env.example` +- README +- 基础测试 + +--- + +## 给 Codex 的附加执行指令 + +请按以下工作方式执行: + +1. 先阅读整个任务文档,再开始编码。 +2. 先输出一个简短实施计划,再开始改动文件。 +3. 每完成一个阶段,就更新一次当前进展。 +4. 遇到不明确处时,优先采用最小可运行方案,不要过度设计。 +5. 保持每个提交块职责单一,便于审查。 +6. 若某一步无法完整实现,先给出可运行的 stub,不要阻塞整体流程。 +7. 所有 mock 返回值结构保持一致。 +8. 所有关键函数都加类型注解。 +9. 尽量保持文件短小,单文件不要过度膨胀。 +10. 先保证能跑通,再考虑优雅性。 + +--- + +## 适合直接发给 Codex 的首条消息 + +```md +请在当前仓库中实现一个 FastAPI + Temporal 的 MVP 图片流水线系统。 + +目标: +- 支持低端全自动 auto_basic +- 支持中端半自动 semi_pro +- 提供创建订单、查询订单、查询资产、提交审核、查询 workflow 状态接口 +- 使用 SQLite + SQLAlchemy + FastAPI + Temporal Python SDK +- 真实图像处理先全部 mock + +要求: +- 严格按我提供的目录结构创建文件 +- route 不写业务逻辑 +- workflow 只做编排 +- activity 做 mock 执行 +- 中端流程必须支持 review signal,并支持 rerun_face / rerun_fusion / rerun_scene +- 输出 README、.env.example、alembic 初始迁移、基础测试 + +交付标准: +- API 可启动 +- worker 可启动 +- 低端流程可跑通 +- 中端流程可停在 waiting_review,审核后可继续 +``` + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..fac0051 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,31 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "temporal-demo" +version = "0.1.0" +description = "FastAPI + Temporal MVP image pipeline demo" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "aiosqlite>=0.20,<1.0", + "alembic>=1.13,<2.0", + "fastapi>=0.115,<1.0", + "httpx>=0.27,<1.0", + "pydantic>=2.8,<3.0", + "pydantic-settings>=2.4,<3.0", + "pytest>=8.3,<9.0", + "pytest-asyncio>=0.24,<1.0", + "sqlalchemy>=2.0,<3.0", + "temporalio>=1.7,<2.0", + "uvicorn[standard]>=0.30,<1.0", +] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + +[tool.setuptools.packages.find] +include = ["app*"] +namespaces = true diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c462711 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,44 @@ +"""Test fixtures for the Temporal demo.""" + +from contextlib import AsyncExitStack + +import pytest_asyncio +from httpx import ASGITransport, AsyncClient +from temporalio.testing import WorkflowEnvironment + +from app.config.settings import get_settings +from app.infra.db.session import dispose_database, init_database +from app.infra.temporal.client import set_temporal_client +from app.main import create_app +from app.workers.runner import build_workers + + +@pytest_asyncio.fixture +async def api_runtime(tmp_path, monkeypatch): + """Provide an API client and in-memory Temporal test environment.""" + + db_path = tmp_path / "test.db" + monkeypatch.setenv("DATABASE_URL", f"sqlite+aiosqlite:///{db_path.as_posix()}") + monkeypatch.setenv("AUTO_CREATE_TABLES", "true") + + get_settings.cache_clear() + await dispose_database() + await init_database() + + app = create_app() + + async with await WorkflowEnvironment.start_time_skipping() as env: + set_temporal_client(env.client) + async with AsyncExitStack() as stack: + for worker in build_workers(env.client): + await stack.enter_async_context(worker) + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + yield client, env + + set_temporal_client(None) + await dispose_database() + get_settings.cache_clear() + diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..dcd6ac2 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,178 @@ +"""Integration tests for the FastAPI + Temporal MVP.""" + +import asyncio + +import pytest + + +async def wait_for_workflow_status(client, order_id: int, expected_status: str, attempts: int = 120): + """Poll the workflow status endpoint until it reaches a target status.""" + + last_payload = None + for _ in range(attempts): + response = await client.get(f"/api/v1/workflows/{order_id}") + if response.status_code == 200: + last_payload = response.json() + if last_payload["workflow_status"] == expected_status: + return last_payload + await asyncio.sleep(0.05) + raise AssertionError(f"Workflow {order_id} never reached status {expected_status!r}: {last_payload}") + + +async def wait_for_step_count(client, order_id: int, step_name: str, minimum_count: int, attempts: int = 120): + """Poll until a workflow step has been recorded a minimum number of times.""" + + last_payload = None + for _ in range(attempts): + response = await client.get(f"/api/v1/workflows/{order_id}") + if response.status_code == 200: + last_payload = response.json() + count = sum(1 for step in last_payload["steps"] if step["step_name"] == step_name) + if count >= minimum_count: + return last_payload + await asyncio.sleep(0.05) + raise AssertionError( + f"Workflow {order_id} never recorded step {step_name!r} {minimum_count} times: {last_payload}" + ) + + +@pytest.mark.asyncio +async def test_healthcheck(api_runtime): + """The health endpoint should always respond successfully.""" + + client, _ = api_runtime + response = await client.get("/healthz") + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +@pytest.mark.asyncio +async def test_low_end_order_completes(api_runtime): + """Low-end orders should run through the full automated pipeline.""" + + client, env = api_runtime + response = await client.post( + "/api/v1/orders", + json={ + "customer_level": "low", + "service_mode": "auto_basic", + "model_id": 101, + "pose_id": 3, + "garment_asset_id": 9001, + "scene_ref_asset_id": 8001, + }, + ) + + assert response.status_code == 201 + payload = response.json() + assert payload["workflow_id"] == f"order-{payload['order_id']}" + + handle = env.client.get_workflow_handle(payload["workflow_id"]) + result = await handle.result() + + assert result["status"] == "succeeded" + + order_response = await client.get(f"/api/v1/orders/{payload['order_id']}") + assert order_response.status_code == 200 + assert order_response.json()["status"] == "succeeded" + + assets_response = await client.get(f"/api/v1/orders/{payload['order_id']}/assets") + assert assets_response.status_code == 200 + assert any(asset["asset_type"] == "final" for asset in assets_response.json()) + + workflow_response = await client.get(f"/api/v1/workflows/{payload['order_id']}") + assert workflow_response.status_code == 200 + assert workflow_response.json()["workflow_status"] == "succeeded" + + +@pytest.mark.asyncio +async def test_mid_end_order_waits_review_then_approves(api_runtime): + """Mid-end orders should pause for review and continue after approval.""" + + client, env = api_runtime + response = await client.post( + "/api/v1/orders", + json={ + "customer_level": "mid", + "service_mode": "semi_pro", + "model_id": 101, + "pose_id": 3, + "garment_asset_id": 9001, + "scene_ref_asset_id": 8001, + }, + ) + + assert response.status_code == 201 + payload = response.json() + + await wait_for_workflow_status(client, payload["order_id"], "waiting_review") + + pending_response = await client.get("/api/v1/reviews/pending") + assert pending_response.status_code == 200 + assert any(item["order_id"] == payload["order_id"] for item in pending_response.json()) + + review_response = await client.post( + f"/api/v1/reviews/{payload['order_id']}/submit", + json={"decision": "approve", "reviewer_id": 77, "comment": "通过"}, + ) + assert review_response.status_code == 200 + + handle = env.client.get_workflow_handle(payload["workflow_id"]) + result = await handle.result() + assert result["status"] == "succeeded" + + order_response = await client.get(f"/api/v1/orders/{payload['order_id']}") + assert order_response.status_code == 200 + assert order_response.json()["status"] == "succeeded" + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("decision", "expected_step"), + [ + ("rerun_scene", "scene"), + ("rerun_face", "face"), + ("rerun_fusion", "fusion"), + ], +) +async def test_mid_end_rerun_paths_return_to_review(api_runtime, decision: str, expected_step: str): + """Each rerun decision should branch back to the correct step and pause again for review.""" + + client, env = api_runtime + response = await client.post( + "/api/v1/orders", + json={ + "customer_level": "mid", + "service_mode": "semi_pro", + "model_id": 101, + "pose_id": 3, + "garment_asset_id": 9001, + "scene_ref_asset_id": 8001, + }, + ) + + assert response.status_code == 201 + payload = response.json() + + await wait_for_workflow_status(client, payload["order_id"], "waiting_review") + + review_response = await client.post( + f"/api/v1/reviews/{payload['order_id']}/submit", + json={"decision": decision, "reviewer_id": 77, "comment": f"trigger {decision}"}, + ) + assert review_response.status_code == 200 + + workflow_payload = await wait_for_step_count(client, payload["order_id"], expected_step, 2) + workflow_payload = await wait_for_step_count(client, payload["order_id"], "review", 2) + assert workflow_payload["workflow_status"] == "waiting_review" + + approve_response = await client.post( + f"/api/v1/reviews/{payload['order_id']}/submit", + json={"decision": "approve", "reviewer_id": 77, "comment": "批准最终结果"}, + ) + assert approve_response.status_code == 200 + + handle = env.client.get_workflow_handle(payload["workflow_id"]) + result = await handle.result() + assert result["status"] == "succeeded"