diff --git a/.gitignore b/.gitignore index 97c6293..7389eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,12 @@ .venv/ __pycache__/ .pytest_cache/ +.claude/ +.superpowers/ +CLAUDE.md +.DS_Store +docs/superpowers/previews/ +docs/superpowers/.DS_Store *.pyc *.pyo *.pyd diff --git a/README.md b/README.md index 4a3d266..082fcf2 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,11 @@ curl http://127.0.0.1:8000/api/v1/workflows/1 pytest ``` +说明: + +- 首次运行测试时,`temporalio` 会自动下载 Temporal test server 二进制,需要可用外网;下载完成后会复用本地缓存。 +- 如果是在全新环境中安装依赖,请优先使用 `python -m pip install -e .`,确保 `greenlet` 等运行时依赖一并安装。 + 覆盖范围: - 健康检查 diff --git a/alembic/versions/20260327_0002_manual_revision_schema.py b/alembic/versions/20260327_0002_manual_revision_schema.py new file mode 100644 index 0000000..e85a89e --- /dev/null +++ b/alembic/versions/20260327_0002_manual_revision_schema.py @@ -0,0 +1,58 @@ +"""manual revision schema support + +Revision ID: 20260327_0002 +Revises: 20260326_0001 +Create Date: 2026-03-27 21:55:00.000000 +""" + +from collections.abc import Sequence + +from alembic import op +import sqlalchemy as sa + + +revision: str = "20260327_0002" +down_revision: str | None = "20260326_0001" +branch_labels: Sequence[str] | None = None +depends_on: Sequence[str] | None = None + + +def upgrade() -> None: + """Add manual revision columns required by the live schema.""" + + with op.batch_alter_table("review_tasks") as batch_op: + batch_op.add_column(sa.Column("latest_revision_asset_id", sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column("resume_asset_id", sa.Integer(), nullable=True)) + + with op.batch_alter_table("assets") as batch_op: + batch_op.add_column(sa.Column("parent_asset_id", sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column("root_asset_id", sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column("version_no", sa.Integer(), nullable=False, server_default="0")) + batch_op.add_column( + sa.Column("is_current_version", sa.Boolean(), nullable=False, server_default=sa.false()) + ) + batch_op.create_index("ix_assets_parent_asset_id", ["parent_asset_id"], unique=False) + batch_op.create_index("ix_assets_root_asset_id", ["root_asset_id"], unique=False) + + op.execute("UPDATE assets SET version_no = 0 WHERE version_no IS NULL") + op.execute("UPDATE assets SET is_current_version = 0 WHERE is_current_version IS NULL") + + with op.batch_alter_table("assets") as batch_op: + batch_op.alter_column("version_no", server_default=None) + batch_op.alter_column("is_current_version", server_default=None) + + +def downgrade() -> None: + """Remove manual revision schema additions.""" + + with op.batch_alter_table("assets") as batch_op: + batch_op.drop_index("ix_assets_root_asset_id") + batch_op.drop_index("ix_assets_parent_asset_id") + batch_op.drop_column("is_current_version") + batch_op.drop_column("version_no") + batch_op.drop_column("root_asset_id") + batch_op.drop_column("parent_asset_id") + + with op.batch_alter_table("review_tasks") as batch_op: + batch_op.drop_column("resume_asset_id") + batch_op.drop_column("latest_revision_asset_id") diff --git a/app/api/routers/orders.py b/app/api/routers/orders.py index 0b5b5d7..135b98a 100644 --- a/app/api/routers/orders.py +++ b/app/api/routers/orders.py @@ -1,10 +1,16 @@ """Order routes.""" -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, Depends, Query, status from sqlalchemy.ext.asyncio import AsyncSession -from app.api.schemas.order import CreateOrderRequest, CreateOrderResponse, OrderDetailResponse +from app.api.schemas.order import ( + CreateOrderRequest, + CreateOrderResponse, + OrderDetailResponse, + OrderListResponse, +) from app.application.services.order_service import OrderService +from app.domain.enums import OrderStatus from app.infra.db.session import get_db_session router = APIRouter(prefix="/orders", tags=["orders"]) @@ -21,6 +27,27 @@ async def create_order( return await order_service.create_order(session, payload) +@router.get("", response_model=OrderListResponse) +async def list_orders( + page: int = Query(default=1, ge=1), + limit: int = Query(default=20, ge=1, le=100), + query: str | None = Query(default=None, min_length=1), + status_filter: OrderStatus | None = Query(default=None, alias="status"), + order_id: int | None = Query(default=None, ge=1), + session: AsyncSession = Depends(get_db_session), +) -> OrderListResponse: + """Fetch recent orders for dashboard overview pages.""" + + return await order_service.list_orders( + session, + page=page, + limit=limit, + query=query, + status_filter=status_filter, + order_id=order_id, + ) + + @router.get("/{order_id}", response_model=OrderDetailResponse) async def get_order( order_id: int, @@ -29,4 +56,3 @@ async def get_order( """Fetch order details.""" return await order_service.get_order(session, order_id) - diff --git a/app/api/routers/reviews.py b/app/api/routers/reviews.py index 1208697..d97bd73 100644 --- a/app/api/routers/reviews.py +++ b/app/api/routers/reviews.py @@ -3,6 +3,7 @@ from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession +from app.api.schemas.revision import ConfirmRevisionRequest, ConfirmRevisionResponse from app.api.schemas.review import PendingReviewResponse, SubmitReviewRequest, SubmitReviewResponse from app.application.services.review_service import ReviewService from app.infra.db.session import get_db_session @@ -30,3 +31,13 @@ async def submit_review( return await review_service.submit_review(session, order_id, payload) + +@router.post("/{order_id}/confirm-revision", response_model=ConfirmRevisionResponse) +async def confirm_revision( + order_id: int, + payload: ConfirmRevisionRequest, + session: AsyncSession = Depends(get_db_session), +) -> ConfirmRevisionResponse: + """Confirm a manual revision and resume the workflow.""" + + return await review_service.confirm_revision_continue(session, order_id, payload) diff --git a/app/api/routers/revisions.py b/app/api/routers/revisions.py new file mode 100644 index 0000000..3e64e12 --- /dev/null +++ b/app/api/routers/revisions.py @@ -0,0 +1,40 @@ +"""Manual revision routes.""" + +from fastapi import APIRouter, Depends, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.revision import ( + RegisterRevisionRequest, + RegisterRevisionResponse, + RevisionChainResponse, +) +from app.application.services.revision_service import RevisionService +from app.infra.db.session import get_db_session + +router = APIRouter(prefix="/orders", tags=["revisions"]) +revision_service = RevisionService() + + +@router.post( + "/{order_id}/revisions", + response_model=RegisterRevisionResponse, + status_code=status.HTTP_201_CREATED, +) +async def register_revision( + order_id: int, + payload: RegisterRevisionRequest, + session: AsyncSession = Depends(get_db_session), +) -> RegisterRevisionResponse: + """Register an offline manual revision asset for an order.""" + + return await revision_service.register_revision(session, order_id, payload) + + +@router.get("/{order_id}/revisions", response_model=RevisionChainResponse) +async def list_revisions( + order_id: int, + session: AsyncSession = Depends(get_db_session), +) -> RevisionChainResponse: + """List the single-line manual revision chain for an order.""" + + return await revision_service.list_revision_chain(session, order_id) diff --git a/app/api/routers/workflows.py b/app/api/routers/workflows.py index 5d8c1aa..a57a19c 100644 --- a/app/api/routers/workflows.py +++ b/app/api/routers/workflows.py @@ -1,16 +1,38 @@ """Workflow routes.""" -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Query from sqlalchemy.ext.asyncio import AsyncSession -from app.api.schemas.workflow import WorkflowStatusResponse +from app.api.schemas.workflow import WorkflowListResponse, WorkflowStatusResponse from app.application.services.workflow_service import WorkflowService +from app.domain.enums import OrderStatus from app.infra.db.session import get_db_session router = APIRouter(prefix="/workflows", tags=["workflows"]) workflow_service = WorkflowService() +@router.get("", response_model=WorkflowListResponse) +async def list_workflows( + page: int = Query(default=1, ge=1), + limit: int = Query(default=20, ge=1, le=100), + query: str | None = Query(default=None, min_length=1), + status_filter: OrderStatus | None = Query(default=None, alias="status"), + order_id: int | None = Query(default=None, ge=1), + session: AsyncSession = Depends(get_db_session), +) -> WorkflowListResponse: + """Fetch recent workflow runs for workflow lookup pages.""" + + return await workflow_service.list_workflows( + session, + page=page, + limit=limit, + query=query, + status_filter=status_filter, + order_id=order_id, + ) + + @router.get("/{order_id}", response_model=WorkflowStatusResponse) async def get_workflow_status( order_id: int, @@ -19,4 +41,3 @@ async def get_workflow_status( """Fetch persisted workflow status for an order.""" return await workflow_service.get_workflow_status(session, order_id) - diff --git a/app/api/schemas/asset.py b/app/api/schemas/asset.py index 1ffa77b..c074b7a 100644 --- a/app/api/schemas/asset.py +++ b/app/api/schemas/asset.py @@ -17,7 +17,10 @@ class AssetRead(BaseModel): order_id: int asset_type: AssetType step_name: WorkflowStepName | None + parent_asset_id: int | None = None + root_asset_id: int | None = None + version_no: int = 0 + is_current_version: bool = False uri: str metadata_json: dict[str, Any] | None created_at: datetime - diff --git a/app/api/schemas/order.py b/app/api/schemas/order.py index f03943a..de4f252 100644 --- a/app/api/schemas/order.py +++ b/app/api/schemas/order.py @@ -5,7 +5,7 @@ from datetime import datetime from pydantic import BaseModel from app.api.schemas.asset import AssetRead -from app.domain.enums import CustomerLevel, OrderStatus, ServiceMode, WorkflowStepName +from app.domain.enums import CustomerLevel, OrderStatus, ReviewTaskStatus, ServiceMode, WorkflowStepName class CreateOrderRequest(BaseModel): @@ -41,7 +41,41 @@ class OrderDetailResponse(BaseModel): final_asset_id: int | None workflow_id: str | None current_step: WorkflowStepName | None + current_revision_asset_id: int | None = None + current_revision_version: int | None = None + latest_revision_asset_id: int | None = None + latest_revision_version: int | None = None + revision_count: int = 0 + review_task_status: ReviewTaskStatus | None = None + pending_manual_confirm: bool = False final_asset: AssetRead | None created_at: datetime updated_at: datetime + +class OrderListItemResponse(BaseModel): + """Order list item response for overview screens.""" + + order_id: int + workflow_id: str | None + customer_level: CustomerLevel + service_mode: ServiceMode + status: OrderStatus + current_step: WorkflowStepName | None + updated_at: datetime + final_asset_id: int | None + review_task_status: ReviewTaskStatus | None = None + latest_revision_asset_id: int | None = None + latest_revision_version: int | None = None + revision_count: int = 0 + pending_manual_confirm: bool = False + + +class OrderListResponse(BaseModel): + """Order list response.""" + + page: int + limit: int + total: int + total_pages: int + items: list[OrderListItemResponse] diff --git a/app/api/schemas/review.py b/app/api/schemas/review.py index 0f9ece6..7345090 100644 --- a/app/api/schemas/review.py +++ b/app/api/schemas/review.py @@ -4,7 +4,7 @@ from datetime import datetime from pydantic import BaseModel -from app.domain.enums import ReviewDecision, WorkflowStepName +from app.domain.enums import ReviewDecision, ReviewTaskStatus, WorkflowStepName class SubmitReviewRequest(BaseModel): @@ -32,5 +32,10 @@ class PendingReviewResponse(BaseModel): order_id: int workflow_id: str current_step: WorkflowStepName | None + review_task_status: ReviewTaskStatus = ReviewTaskStatus.PENDING + latest_revision_asset_id: int | None = None + current_revision_asset_id: int | None = None + latest_revision_version: int | None = None + revision_count: int = 0 + pending_manual_confirm: bool = False created_at: datetime - diff --git a/app/api/schemas/revision.py b/app/api/schemas/revision.py new file mode 100644 index 0000000..88252fc --- /dev/null +++ b/app/api/schemas/revision.py @@ -0,0 +1,69 @@ +"""Revision API schemas.""" + +from datetime import datetime + +from pydantic import BaseModel + +from app.domain.enums import ReviewDecision, ReviewTaskStatus + + +class RegisterRevisionRequest(BaseModel): + """Request payload for registering a manual revision asset.""" + + parent_asset_id: int + uploaded_uri: str + reviewer_id: int + comment: str | None = None + + +class RegisterRevisionResponse(BaseModel): + """Response returned after a manual revision has been registered.""" + + order_id: int + workflow_id: str + asset_id: int + parent_asset_id: int + root_asset_id: int + version_no: int + review_task_status: ReviewTaskStatus + latest_revision_asset_id: int + revision_count: int + + +class RevisionChainItem(BaseModel): + """One item in the manual revision chain.""" + + asset_id: int + order_id: int + parent_asset_id: int | None + root_asset_id: int | None + version_no: int + is_current_version: bool + uri: str + created_at: datetime + + +class RevisionChainResponse(BaseModel): + """Response returned when listing a revision chain.""" + + order_id: int + latest_revision_asset_id: int | None = None + revision_count: int = 0 + items: list[RevisionChainItem] + + +class ConfirmRevisionRequest(BaseModel): + """Request payload for confirming a manual revision.""" + + reviewer_id: int + comment: str | None = None + + +class ConfirmRevisionResponse(BaseModel): + """Response returned after confirming a manual revision.""" + + order_id: int + workflow_id: str + revision_asset_id: int + decision: ReviewDecision + status: str diff --git a/app/api/schemas/workflow.py b/app/api/schemas/workflow.py index 43ab140..7b5d633 100644 --- a/app/api/schemas/workflow.py +++ b/app/api/schemas/workflow.py @@ -5,7 +5,7 @@ from typing import Any from pydantic import BaseModel, ConfigDict -from app.domain.enums import OrderStatus, StepStatus, WorkflowStepName +from app.domain.enums import OrderStatus, ReviewTaskStatus, StepStatus, WorkflowStepName class WorkflowStepRead(BaseModel): @@ -32,7 +32,40 @@ class WorkflowStatusResponse(BaseModel): workflow_type: str workflow_status: OrderStatus current_step: WorkflowStepName | None + current_revision_asset_id: int | None = None + current_revision_version: int | None = None + latest_revision_asset_id: int | None = None + latest_revision_version: int | None = None + revision_count: int = 0 + review_task_status: ReviewTaskStatus | None = None + pending_manual_confirm: bool = False steps: list[WorkflowStepRead] created_at: datetime updated_at: datetime + +class WorkflowListItemResponse(BaseModel): + """Workflow list item response for workflow home screens.""" + + order_id: int + workflow_id: str + workflow_type: str + workflow_status: OrderStatus + current_step: WorkflowStepName | None + updated_at: datetime + failure_count: int + review_task_status: ReviewTaskStatus | None = None + latest_revision_asset_id: int | None = None + latest_revision_version: int | None = None + revision_count: int = 0 + pending_manual_confirm: bool = False + + +class WorkflowListResponse(BaseModel): + """Workflow list response.""" + + page: int + limit: int + total: int + total_pages: int + items: list[WorkflowListItemResponse] diff --git a/app/application/services/order_service.py b/app/application/services/order_service.py index 42ad39d..c68e80a 100644 --- a/app/application/services/order_service.py +++ b/app/application/services/order_service.py @@ -1,11 +1,20 @@ """Order application service.""" from fastapi import HTTPException, status -from sqlalchemy import select +from math import ceil + +from sqlalchemy import String, cast, func, or_, select from sqlalchemy.orm import selectinload from app.api.schemas.asset import AssetRead -from app.api.schemas.order import CreateOrderRequest, CreateOrderResponse, OrderDetailResponse +from app.api.schemas.order import ( + CreateOrderRequest, + CreateOrderResponse, + OrderDetailResponse, + OrderListItemResponse, + OrderListResponse, +) +from app.application.services.revision_service import RevisionService from app.application.services.workflow_service import WorkflowService from app.domain.enums import CustomerLevel, OrderStatus, ServiceMode from app.infra.db.models.order import OrderORM @@ -18,6 +27,7 @@ class OrderService: def __init__(self) -> None: self.workflow_service = WorkflowService() + self.revision_service = RevisionService() async def create_order(self, session, payload: CreateOrderRequest) -> CreateOrderResponse: """Create an order, persist a workflow run, and start Temporal execution.""" @@ -87,6 +97,7 @@ class OrderService: workflow_run = order.workflow_runs[0] if order.workflow_runs else None final_asset = next((asset for asset in order.assets if asset.id == order.final_asset_id), None) + snapshot = await self.revision_service.get_revision_snapshot(session, order_id) return OrderDetailResponse( order_id=order.id, @@ -100,11 +111,96 @@ class OrderService: final_asset_id=order.final_asset_id, workflow_id=workflow_run.workflow_id if workflow_run else None, current_step=workflow_run.current_step if workflow_run else None, + current_revision_asset_id=snapshot.current_revision_asset_id, + current_revision_version=snapshot.current_revision_version, + latest_revision_asset_id=snapshot.latest_revision_asset_id, + latest_revision_version=snapshot.latest_revision_version, + revision_count=snapshot.revision_count, + review_task_status=snapshot.review_task_status, + pending_manual_confirm=snapshot.pending_manual_confirm, final_asset=AssetRead.model_validate(final_asset) if final_asset else None, created_at=order.created_at, updated_at=order.updated_at, ) + async def list_orders( + self, + session, + *, + page: int = 1, + limit: int = 20, + query: str | None = None, + status_filter: OrderStatus | None = None, + order_id: int | None = None, + ) -> OrderListResponse: + """Return recent orders for dashboard overview pages.""" + + filters = [] + + if status_filter is not None: + filters.append(OrderORM.status == status_filter) + + if order_id is not None: + filters.append(OrderORM.id == order_id) + + if query: + search_term = query.strip() + if search_term: + filters.append( + or_( + cast(OrderORM.id, String).ilike(f"{search_term}%"), + OrderORM.workflow_runs.any( + WorkflowRunORM.workflow_id.ilike(f"%{search_term}%") + ), + ) + ) + + query = select(OrderORM).options(selectinload(OrderORM.workflow_runs)) + count_query = select(func.count()).select_from(OrderORM) + + if filters: + query = query.where(*filters) + count_query = count_query.where(*filters) + + total = (await session.execute(count_query)).scalar_one() + total_pages = ceil(total / limit) if total else 0 + offset = (page - 1) * limit + + query = query.order_by(OrderORM.updated_at.desc(), OrderORM.id.desc()).offset(offset).limit(limit) + + result = await session.execute(query) + orders = result.scalars().all() + + items = [] + for order in orders: + workflow_run = order.workflow_runs[0] if order.workflow_runs else None + snapshot = await self.revision_service.get_revision_snapshot(session, order.id) + items.append( + OrderListItemResponse( + order_id=order.id, + workflow_id=workflow_run.workflow_id if workflow_run else None, + customer_level=order.customer_level, + service_mode=order.service_mode, + status=order.status, + current_step=workflow_run.current_step if workflow_run else None, + updated_at=order.updated_at, + final_asset_id=order.final_asset_id, + review_task_status=snapshot.review_task_status, + latest_revision_asset_id=snapshot.latest_revision_asset_id, + latest_revision_version=snapshot.latest_revision_version, + revision_count=snapshot.revision_count, + pending_manual_confirm=snapshot.pending_manual_confirm, + ) + ) + + return OrderListResponse( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + items=items, + ) + @staticmethod def _validate_mode(customer_level: CustomerLevel, service_mode: ServiceMode) -> None: """Validate the allowed customer-level and service-mode combinations.""" @@ -119,4 +215,3 @@ class OrderService: status_code=status.HTTP_400_BAD_REQUEST, detail="Mid-level customers only support semi_pro", ) - diff --git a/app/application/services/review_service.py b/app/application/services/review_service.py index 01cc846..2841b05 100644 --- a/app/application/services/review_service.py +++ b/app/application/services/review_service.py @@ -3,9 +3,11 @@ from fastapi import HTTPException, status from sqlalchemy import select +from app.api.schemas.revision import ConfirmRevisionRequest, ConfirmRevisionResponse from app.api.schemas.review import PendingReviewResponse, SubmitReviewRequest, SubmitReviewResponse +from app.application.services.revision_service import RevisionService from app.application.services.workflow_service import WorkflowService -from app.domain.enums import OrderStatus, ReviewTaskStatus +from app.domain.enums import OrderStatus, ReviewDecision, ReviewTaskStatus from app.infra.db.models.asset import AssetORM from app.infra.db.models.order import OrderORM from app.infra.db.models.review_task import ReviewTaskORM @@ -18,6 +20,7 @@ class ReviewService: def __init__(self) -> None: self.workflow_service = WorkflowService() + self.revision_service = RevisionService() async def list_pending_reviews(self, session) -> list[PendingReviewResponse]: """Return all pending review tasks.""" @@ -25,20 +28,29 @@ class ReviewService: result = await session.execute( select(ReviewTaskORM, WorkflowRunORM) .join(WorkflowRunORM, WorkflowRunORM.order_id == ReviewTaskORM.order_id) - .where(ReviewTaskORM.status == ReviewTaskStatus.PENDING) + .where(ReviewTaskORM.status.in_([ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED])) .order_by(ReviewTaskORM.created_at.asc()) ) - return [ - PendingReviewResponse( - review_task_id=review_task.id, - order_id=review_task.order_id, - workflow_id=workflow_run.workflow_id, - current_step=workflow_run.current_step, - created_at=review_task.created_at, + pending_reviews = [] + for review_task, workflow_run in result.all(): + snapshot = await self.revision_service.get_revision_snapshot(session, review_task.order_id) + pending_reviews.append( + PendingReviewResponse( + review_task_id=review_task.id, + order_id=review_task.order_id, + workflow_id=workflow_run.workflow_id, + current_step=workflow_run.current_step, + review_task_status=review_task.status, + latest_revision_asset_id=snapshot.latest_revision_asset_id, + current_revision_asset_id=snapshot.current_revision_asset_id, + latest_revision_version=snapshot.latest_revision_version, + revision_count=snapshot.revision_count, + pending_manual_confirm=snapshot.pending_manual_confirm, + created_at=review_task.created_at, + ) ) - for review_task, workflow_run in result.all() - ] + return pending_reviews async def submit_review(self, session, order_id: int, payload: SubmitReviewRequest) -> SubmitReviewResponse: """Persist the review submission and signal the Temporal workflow.""" @@ -71,7 +83,7 @@ class ReviewService: select(ReviewTaskORM) .where( ReviewTaskORM.order_id == order_id, - ReviewTaskORM.status == ReviewTaskStatus.PENDING, + ReviewTaskORM.status.in_([ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED]), ) .order_by(ReviewTaskORM.created_at.desc()) ) @@ -80,10 +92,22 @@ class ReviewService: review_task = ReviewTaskORM(order_id=order_id, status=ReviewTaskStatus.SUBMITTED) session.add(review_task) + current_task_status = review_task.status + selected_asset_id = payload.selected_asset_id + if selected_asset_id is None and current_task_status == ReviewTaskStatus.REVISION_UPLOADED: + selected_asset_id = review_task.resume_asset_id or review_task.latest_revision_asset_id + if selected_asset_id is not None: + asset = await session.get(AssetORM, selected_asset_id) + if asset is None or asset.order_id != order_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Selected asset does not belong to the order", + ) + review_task.status = ReviewTaskStatus.SUBMITTED review_task.decision = payload.decision review_task.reviewer_id = payload.reviewer_id - review_task.selected_asset_id = payload.selected_asset_id + review_task.selected_asset_id = selected_asset_id review_task.comment = payload.comment await session.commit() @@ -93,7 +117,7 @@ class ReviewService: ReviewSignalPayload( decision=payload.decision, reviewer_id=payload.reviewer_id, - selected_asset_id=payload.selected_asset_id, + selected_asset_id=selected_asset_id, comment=payload.comment, ), ) @@ -110,3 +134,88 @@ class ReviewService: status="submitted", ) + async def confirm_revision_continue( + self, + session, + order_id: int, + payload: ConfirmRevisionRequest, + ) -> ConfirmRevisionResponse: + """Confirm the latest revision and resume the existing workflow.""" + + order = await session.get(OrderORM, order_id) + if order is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found") + if order.status != OrderStatus.WAITING_REVIEW: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Order is not waiting for review", + ) + + workflow_result = await session.execute( + select(WorkflowRunORM).where(WorkflowRunORM.order_id == order_id) + ) + workflow_run = workflow_result.scalar_one_or_none() + if workflow_run is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found") + + review_result = await session.execute( + select(ReviewTaskORM) + .where( + ReviewTaskORM.order_id == order_id, + ReviewTaskORM.status.in_([ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED]), + ) + .order_by(ReviewTaskORM.created_at.desc(), ReviewTaskORM.id.desc()) + ) + review_task = review_result.scalars().first() + if review_task is None: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="No active review task") + if review_task.status != ReviewTaskStatus.REVISION_UPLOADED: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="No uploaded revision to confirm", + ) + + revision_asset_id = review_task.resume_asset_id or review_task.latest_revision_asset_id + if revision_asset_id is None: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="No uploaded revision to confirm", + ) + + revision_asset = await session.get(AssetORM, revision_asset_id) + if revision_asset is None or revision_asset.order_id != order_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Revision asset does not belong to the order", + ) + + review_task.status = ReviewTaskStatus.SUBMITTED + review_task.decision = ReviewDecision.APPROVE + review_task.reviewer_id = payload.reviewer_id + review_task.selected_asset_id = revision_asset_id + review_task.comment = payload.comment + await session.commit() + + try: + await self.workflow_service.signal_review( + workflow_run.workflow_id, + ReviewSignalPayload( + decision=ReviewDecision.APPROVE, + reviewer_id=payload.reviewer_id, + selected_asset_id=revision_asset_id, + comment=payload.comment, + ), + ) + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Failed to signal Temporal workflow: {exc}", + ) from exc + + return ConfirmRevisionResponse( + order_id=order_id, + workflow_id=workflow_run.workflow_id, + revision_asset_id=revision_asset_id, + decision=ReviewDecision.APPROVE, + status="submitted", + ) diff --git a/app/application/services/revision_service.py b/app/application/services/revision_service.py new file mode 100644 index 0000000..8076063 --- /dev/null +++ b/app/application/services/revision_service.py @@ -0,0 +1,251 @@ +"""Manual revision application service.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from fastapi import HTTPException, status +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.schemas.revision import ( + RegisterRevisionRequest, + RegisterRevisionResponse, + RevisionChainItem, + RevisionChainResponse, +) +from app.domain.enums import AssetType, OrderStatus, ReviewTaskStatus, WorkflowStepName +from app.infra.db.models.asset import AssetORM +from app.infra.db.models.order import OrderORM +from app.infra.db.models.review_task import ReviewTaskORM +from app.infra.db.models.workflow_run import WorkflowRunORM + + +@dataclass(slots=True) +class RevisionSnapshot: + """Revision summary used by the order, queue, and workflow responses.""" + + current_revision_asset_id: int | None + current_revision_version: int | None + latest_revision_asset_id: int | None + latest_revision_version: int | None + revision_count: int + review_task_status: ReviewTaskStatus | None + pending_manual_confirm: bool + root_asset_id: int | None + + +class RevisionService: + """Application service for manual revision registration and lookup.""" + + async def get_revision_snapshot(self, session: AsyncSession, order_id: int) -> RevisionSnapshot: + """Return the current revision summary for an order.""" + + await self._require_order(session, order_id) + + revision_result = await session.execute( + select(AssetORM) + .where( + AssetORM.order_id == order_id, + AssetORM.asset_type == AssetType.MANUAL_REVISION, + ) + .order_by(AssetORM.version_no.asc(), AssetORM.created_at.asc(), AssetORM.id.asc()) + ) + revisions = revision_result.scalars().all() + latest_revision = revisions[-1] if revisions else None + + review_result = await session.execute( + select(ReviewTaskORM) + .where(ReviewTaskORM.order_id == order_id) + .order_by(ReviewTaskORM.created_at.desc(), ReviewTaskORM.id.desc()) + ) + latest_review_task = review_result.scalars().first() + + return RevisionSnapshot( + current_revision_asset_id=latest_revision.id if latest_revision else None, + current_revision_version=latest_revision.version_no if latest_revision else None, + latest_revision_asset_id=latest_revision.id if latest_revision else None, + latest_revision_version=latest_revision.version_no if latest_revision else None, + revision_count=len(revisions), + review_task_status=latest_review_task.status if latest_review_task else None, + pending_manual_confirm=bool( + latest_review_task and latest_review_task.status == ReviewTaskStatus.REVISION_UPLOADED + ), + root_asset_id=latest_revision.root_asset_id if latest_revision else None, + ) + + async def register_revision( + self, + session: AsyncSession, + order_id: int, + payload: RegisterRevisionRequest, + ) -> RegisterRevisionResponse: + """Register an offline manual revision asset for a waiting-review order.""" + + order = await self._require_order(session, order_id) + if order.status != OrderStatus.WAITING_REVIEW: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Order is not waiting for review", + ) + + workflow_run = await self._require_workflow_run(session, order_id) + review_task = await self._get_active_review_task(session, order_id) + + parent_asset = await session.get(AssetORM, payload.parent_asset_id) + if parent_asset is None or parent_asset.order_id != order_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Parent asset does not belong to the order", + ) + + root_asset_id = parent_asset.root_asset_id or parent_asset.id + version_result = await session.execute( + select(func.max(AssetORM.version_no)).where( + AssetORM.order_id == order_id, + AssetORM.asset_type == AssetType.MANUAL_REVISION, + AssetORM.root_asset_id == root_asset_id, + ) + ) + latest_version = version_result.scalar_one() + if parent_asset.asset_type != AssetType.MANUAL_REVISION and latest_version: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Parent asset is not the current revision version", + ) + if parent_asset.asset_type == AssetType.MANUAL_REVISION and not parent_asset.is_current_version: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Parent asset is not the current revision version", + ) + + version_no = (latest_version or 0) + 1 + current_version_result = await session.execute( + select(AssetORM).where( + AssetORM.order_id == order_id, + AssetORM.asset_type == AssetType.MANUAL_REVISION, + AssetORM.root_asset_id == root_asset_id, + AssetORM.is_current_version.is_(True), + ) + ) + current_version_asset = current_version_result.scalar_one_or_none() + if current_version_asset is not None: + current_version_asset.is_current_version = False + + asset = AssetORM( + order_id=order_id, + asset_type=AssetType.MANUAL_REVISION, + step_name=WorkflowStepName.REVIEW, + parent_asset_id=payload.parent_asset_id, + root_asset_id=root_asset_id, + version_no=version_no, + is_current_version=True, + uri=payload.uploaded_uri, + metadata_json={ + "reviewer_id": payload.reviewer_id, + "comment": payload.comment, + "source": "manual_revision", + }, + ) + session.add(asset) + await session.flush() + + review_task.status = ReviewTaskStatus.REVISION_UPLOADED + review_task.latest_revision_asset_id = asset.id + review_task.resume_asset_id = asset.id + review_task.selected_asset_id = asset.id + review_task.reviewer_id = payload.reviewer_id + review_task.comment = payload.comment + await session.commit() + + revision_count = await self.count_manual_revisions(session, order_id) + return RegisterRevisionResponse( + order_id=order_id, + workflow_id=workflow_run.workflow_id, + asset_id=asset.id, + parent_asset_id=asset.parent_asset_id or payload.parent_asset_id, + root_asset_id=asset.root_asset_id or root_asset_id, + version_no=asset.version_no, + review_task_status=review_task.status, + latest_revision_asset_id=asset.id, + revision_count=revision_count, + ) + + async def list_revision_chain( + self, + session: AsyncSession, + order_id: int, + ) -> RevisionChainResponse: + """List all manual revisions for an order in version order.""" + + await self._require_order(session, order_id) + + result = await session.execute( + select(AssetORM) + .where( + AssetORM.order_id == order_id, + AssetORM.asset_type == AssetType.MANUAL_REVISION, + ) + .order_by(AssetORM.version_no.asc(), AssetORM.created_at.asc(), AssetORM.id.asc()) + ) + assets = result.scalars().all() + snapshot = await self.get_revision_snapshot(session, order_id) + + return RevisionChainResponse( + order_id=order_id, + latest_revision_asset_id=snapshot.latest_revision_asset_id, + revision_count=len(assets), + items=[ + RevisionChainItem( + asset_id=asset.id, + order_id=asset.order_id, + parent_asset_id=asset.parent_asset_id, + root_asset_id=asset.root_asset_id, + version_no=asset.version_no, + is_current_version=asset.is_current_version, + uri=asset.uri, + created_at=asset.created_at, + ) + for asset in assets + ], + ) + + async def count_manual_revisions(self, session: AsyncSession, order_id: int) -> int: + """Return the number of manual revision assets for an order.""" + + result = await session.execute( + select(func.count(AssetORM.id)).where( + AssetORM.order_id == order_id, + AssetORM.asset_type == AssetType.MANUAL_REVISION, + ) + ) + return result.scalar_one() + + async def _require_order(self, session: AsyncSession, order_id: int) -> OrderORM: + order = await session.get(OrderORM, order_id) + if order is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found") + return order + + async def _require_workflow_run(self, session: AsyncSession, order_id: int) -> WorkflowRunORM: + result = await session.execute(select(WorkflowRunORM).where(WorkflowRunORM.order_id == order_id)) + workflow_run = result.scalar_one_or_none() + if workflow_run is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found") + return workflow_run + + async def _get_active_review_task(self, session: AsyncSession, order_id: int) -> ReviewTaskORM: + result = await session.execute( + select(ReviewTaskORM) + .where( + ReviewTaskORM.order_id == order_id, + ReviewTaskORM.status.in_( + [ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED] + ), + ) + .order_by(ReviewTaskORM.created_at.desc(), ReviewTaskORM.id.desc()) + ) + review_task = result.scalars().first() + if review_task is None: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="No active review task") + return review_task diff --git a/app/application/services/workflow_service.py b/app/application/services/workflow_service.py index cb4a7b6..0d6e908 100644 --- a/app/application/services/workflow_service.py +++ b/app/application/services/workflow_service.py @@ -7,12 +7,20 @@ """ from datetime import timedelta +from math import ceil from fastapi import HTTPException, status -from sqlalchemy import select +from sqlalchemy import String, cast, func, or_, select from sqlalchemy.orm import selectinload -from app.api.schemas.workflow import WorkflowStatusResponse, WorkflowStepRead +from app.api.schemas.workflow import ( + WorkflowListItemResponse, + WorkflowListResponse, + WorkflowStatusResponse, + WorkflowStepRead, +) +from app.domain.enums import OrderStatus +from app.application.services.revision_service import RevisionService from app.domain.enums import ServiceMode from app.infra.db.models.workflow_run import WorkflowRunORM from app.infra.temporal.client import get_temporal_client @@ -25,6 +33,9 @@ from app.workers.workflows.types import PipelineWorkflowInput, ReviewSignalPaylo class WorkflowService: """Temporal 编排服务。""" + def __init__(self) -> None: + self.revision_service = RevisionService() + @staticmethod def workflow_type_for_mode(service_mode: ServiceMode) -> str: """根据服务模式返回对应的 workflow 类型名。""" @@ -81,13 +92,101 @@ class WorkflowService: if workflow_run is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found") + snapshot = await self.revision_service.get_revision_snapshot(session, order_id) + return WorkflowStatusResponse( order_id=workflow_run.order_id, workflow_id=workflow_run.workflow_id, workflow_type=workflow_run.workflow_type, workflow_status=workflow_run.status, current_step=workflow_run.current_step, + current_revision_asset_id=snapshot.current_revision_asset_id, + current_revision_version=snapshot.current_revision_version, + latest_revision_asset_id=snapshot.latest_revision_asset_id, + latest_revision_version=snapshot.latest_revision_version, + revision_count=snapshot.revision_count, + review_task_status=snapshot.review_task_status, + pending_manual_confirm=snapshot.pending_manual_confirm, steps=[WorkflowStepRead.model_validate(step) for step in workflow_run.steps], created_at=workflow_run.created_at, updated_at=workflow_run.updated_at, ) + + async def list_workflows( + self, + session, + *, + page: int = 1, + limit: int = 20, + query: str | None = None, + status_filter: OrderStatus | None = None, + order_id: int | None = None, + ) -> WorkflowListResponse: + """Return recent workflow runs for dashboard lookup pages.""" + + filters = [] + + if status_filter is not None: + filters.append(WorkflowRunORM.status == status_filter) + + if order_id is not None: + filters.append(WorkflowRunORM.order_id == order_id) + + if query: + search_term = query.strip() + if search_term: + filters.append( + or_( + cast(WorkflowRunORM.order_id, String).ilike(f"{search_term}%"), + WorkflowRunORM.workflow_id.ilike(f"%{search_term}%"), + ) + ) + + query = select(WorkflowRunORM).options(selectinload(WorkflowRunORM.steps)) + count_query = select(func.count()).select_from(WorkflowRunORM) + + if filters: + query = query.where(*filters) + count_query = count_query.where(*filters) + + total = (await session.execute(count_query)).scalar_one() + total_pages = ceil(total / limit) if total else 0 + offset = (page - 1) * limit + + query = query.order_by(WorkflowRunORM.updated_at.desc(), WorkflowRunORM.id.desc()).offset(offset).limit(limit) + + result = await session.execute(query) + workflow_runs = result.scalars().all() + + items = [] + for workflow_run in workflow_runs: + snapshot = await self.revision_service.get_revision_snapshot( + session, + workflow_run.order_id, + ) + items.append( + WorkflowListItemResponse( + order_id=workflow_run.order_id, + workflow_id=workflow_run.workflow_id, + workflow_type=workflow_run.workflow_type, + workflow_status=workflow_run.status, + current_step=workflow_run.current_step, + updated_at=workflow_run.updated_at, + failure_count=sum( + 1 for step in workflow_run.steps if step.step_status.value == "failed" + ), + review_task_status=snapshot.review_task_status, + latest_revision_asset_id=snapshot.latest_revision_asset_id, + latest_revision_version=snapshot.latest_revision_version, + revision_count=snapshot.revision_count, + pending_manual_confirm=snapshot.pending_manual_confirm, + ) + ) + + return WorkflowListResponse( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + items=items, + ) diff --git a/app/domain/enums.py b/app/domain/enums.py index c8569a2..bde6a8b 100644 --- a/app/domain/enums.py +++ b/app/domain/enums.py @@ -66,6 +66,7 @@ class ReviewTaskStatus(str, Enum): """Status of a human review task.""" PENDING = "pending" + REVISION_UPLOADED = "revision_uploaded" SUBMITTED = "submitted" @@ -79,5 +80,5 @@ class AssetType(str, Enum): FACE = "face" FUSION = "fusion" QC_CANDIDATE = "qc_candidate" + MANUAL_REVISION = "manual_revision" FINAL = "final" - diff --git a/app/infra/db/models/asset.py b/app/infra/db/models/asset.py index 854c28c..b695988 100644 --- a/app/infra/db/models/asset.py +++ b/app/infra/db/models/asset.py @@ -1,8 +1,10 @@ """Asset ORM model.""" +from __future__ import annotations + from typing import Any -from sqlalchemy import Enum, ForeignKey, Integer, JSON, String +from sqlalchemy import Boolean, Enum, ForeignKey, Integer, JSON, String from sqlalchemy.orm import Mapped, mapped_column, relationship from app.domain.enums import AssetType, WorkflowStepName @@ -24,8 +26,15 @@ class AssetORM(TimestampMixin, Base): Enum(WorkflowStepName, native_enum=False), nullable=True, ) + parent_asset_id: Mapped[int | None] = mapped_column( + ForeignKey("assets.id"), + nullable=True, + index=True, + ) + root_asset_id: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True) + version_no: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + is_current_version: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) uri: Mapped[str] = mapped_column(String(500), nullable=False) metadata_json: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) order = relationship("OrderORM", back_populates="assets") - diff --git a/app/infra/db/models/review_task.py b/app/infra/db/models/review_task.py index f6d9351..c96037f 100644 --- a/app/infra/db/models/review_task.py +++ b/app/infra/db/models/review_task.py @@ -25,7 +25,8 @@ class ReviewTaskORM(TimestampMixin, Base): ) reviewer_id: Mapped[int | None] = mapped_column(Integer, nullable=True) selected_asset_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + latest_revision_asset_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + resume_asset_id: Mapped[int | None] = mapped_column(Integer, nullable=True) comment: Mapped[str | None] = mapped_column(Text, nullable=True) order = relationship("OrderORM", back_populates="review_tasks") - diff --git a/app/main.py b/app/main.py index e53f0ed..dd2ab7e 100644 --- a/app/main.py +++ b/app/main.py @@ -7,6 +7,7 @@ from fastapi import FastAPI from app.api.routers.assets import router as assets_router from app.api.routers.health import router as health_router from app.api.routers.orders import router as orders_router +from app.api.routers.revisions import router as revisions_router from app.api.routers.reviews import router as reviews_router from app.api.routers.workflows import router as workflows_router from app.config.settings import get_settings @@ -31,6 +32,7 @@ def create_app() -> FastAPI: app.include_router(health_router) app.include_router(orders_router, prefix=settings.api_prefix) app.include_router(assets_router, prefix=settings.api_prefix) + app.include_router(revisions_router, prefix=settings.api_prefix) app.include_router(reviews_router, prefix=settings.api_prefix) app.include_router(workflows_router, prefix=settings.api_prefix) return app diff --git a/docs/superpowers/plans/2026-03-27-manual-revision-backend.md b/docs/superpowers/plans/2026-03-27-manual-revision-backend.md new file mode 100644 index 0000000..86f36ac --- /dev/null +++ b/docs/superpowers/plans/2026-03-27-manual-revision-backend.md @@ -0,0 +1,401 @@ +# Manual Revision Backend Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add backend support for the mid-end manual revision flow: export current asset, register offline-edited revision assets as a single version chain, and require an explicit confirm action before the waiting workflow continues. + +**Architecture:** Reuse the existing `waiting_review` Temporal pause in `MidEndPipelineWorkflow` instead of inventing a second workflow state machine. Model manual revisions as first-class asset versions plus a richer review-task status, then implement dedicated HTTP endpoints for revision registration, revision-chain queries, and confirm-continue. The final confirm endpoint will bridge back into the existing review signal by approving the latest revision asset explicitly. + +**Tech Stack:** FastAPI, Pydantic, SQLAlchemy async ORM, SQLite, Temporal Python SDK, pytest + +--- + +### Task 1: Lock the desired backend behavior with failing integration tests + +**Files:** +- Modify: `tests/test_api.py` + +- [ ] **Step 1: Add a failing test for registering a manual revision asset while the order is waiting for review** + +Add a new integration test next to the existing mid-end review tests that: +- creates a `semi_pro` order +- waits for `waiting_review` +- calls the new revision registration endpoint +- asserts the response returns a new asset id, version number, parent asset id, and review task state `revision_uploaded` + +- [ ] **Step 2: Add a failing test for confirming the revision and letting the existing workflow finish** + +Add a second integration test that: +- creates a `semi_pro` order +- waits for `waiting_review` +- registers a revision asset +- calls the new confirm endpoint +- waits for the workflow result +- asserts the workflow succeeds and the order final asset is derived from the revision asset rather than the original QC candidate + +- [ ] **Step 3: Add a failing test for listing the single-line revision chain** + +Add an integration test that: +- creates a `semi_pro` order +- waits for `waiting_review` +- registers two manual revisions in sequence +- calls the revision chain query endpoint +- asserts the chain order is `v1 -> v2 -> v3` with correct parent-child relationships + +- [ ] **Step 4: Run the three new tests and verify they fail for missing routes and schema** + +Run: + +```bash +pytest tests/test_api.py -k "manual_revision or revision_chain or confirm_revision" -v +``` + +Expected: +- FastAPI returns `404` or `422` +- no revision assets are created +- current codebase does not satisfy the new flow yet + +- [ ] **Step 5: Commit the failing tests** + +```bash +git add tests/test_api.py +git commit -m "test: cover manual revision review flow" +``` + +### Task 2: Add persistence for revision assets and review-task substate + +**Files:** +- Modify: `app/domain/enums.py` +- Modify: `app/infra/db/models/asset.py` +- Modify: `app/infra/db/models/review_task.py` +- Modify: `app/infra/db/models/order.py` +- Modify: `app/infra/db/session.py` + +- [ ] **Step 1: Extend enums for manual revision support** + +Update `app/domain/enums.py` with: +- a new asset type such as `MANUAL_REVISION` +- a new review-task status such as `REVISION_UPLOADED` + +Keep `OrderStatus.WAITING_REVIEW` unchanged so the workflow can stay paused in the same state. + +- [ ] **Step 2: Add version-chain columns to `AssetORM`** + +Update `app/infra/db/models/asset.py` to add: +- `parent_asset_id: int | None` +- `root_asset_id: int | None` +- `version_no: int` +- `is_current_version: bool` + +Add a self-referential relationship only if it stays simple; otherwise keep reads explicit in services to avoid ORM complexity. + +- [ ] **Step 3: Add review-task fields for the current revision under review** + +Update `app/infra/db/models/review_task.py` to add: +- `latest_revision_asset_id: int | None` +- `resume_asset_id: int | None` + +The open review task should remain the single source of truth for: +- whether the order is still in `waiting_review` +- whether a revision was uploaded but not yet confirmed +- which asset should be used on confirm + +- [ ] **Step 4: Keep the model changes compatible with the current bootstrapping approach** + +Update `app/infra/db/session.py` only if imports need to change. Do not introduce Alembic in this MVP plan; this repo currently uses `Base.metadata.create_all`, so keep the first implementation aligned with the existing bootstrap model. + +- [ ] **Step 5: Run the focused tests again and verify failure has moved from schema absence to route/service absence** + +Run: + +```bash +pytest tests/test_api.py -k "manual_revision or revision_chain or confirm_revision" -v +``` + +Expected: +- tables boot successfully in test setup +- failures now point at missing service logic or missing endpoints + +- [ ] **Step 6: Commit the persistence changes** + +```bash +git add app/domain/enums.py app/infra/db/models/asset.py app/infra/db/models/review_task.py app/infra/db/models/order.py app/infra/db/session.py +git commit -m "feat: add persistence for manual revision state" +``` + +### Task 3: Add revision registration and revision-chain query APIs + +**Files:** +- Create: `app/api/schemas/revision.py` +- Create: `app/application/services/revision_service.py` +- Create: `app/api/routers/revisions.py` +- Modify: `app/main.py` +- Modify: `app/api/schemas/asset.py` +- Modify: `app/application/services/asset_service.py` + +- [ ] **Step 1: Define revision request and response schemas** + +Create `app/api/schemas/revision.py` with: +- `RegisterRevisionRequest` +- `RegisterRevisionResponse` +- `RevisionChainItem` +- `RevisionChainResponse` +- `ConfirmRevisionResponse` + +For the MVP, use JSON fields instead of multipart upload: +- `parent_asset_id` +- `uploaded_uri` +- `reviewer_id` +- `comment` + +This keeps the current mock-backed architecture coherent. Real object storage upload can be a later phase. + +- [ ] **Step 2: Implement `RevisionService.register_revision`** + +Create `app/application/services/revision_service.py` with logic that: +- loads the order and verifies it is `waiting_review` +- loads the active pending review task +- validates `parent_asset_id` belongs to the order +- creates a new `AssetORM` row with `asset_type=MANUAL_REVISION` +- computes `root_asset_id` and `version_no` +- marks previous revision asset as `is_current_version=False` +- updates the active review task to `REVISION_UPLOADED` +- sets `latest_revision_asset_id` and `resume_asset_id` to the new asset + +- [ ] **Step 3: Implement `RevisionService.list_revision_chain`** + +Query all order assets that belong to the same root chain, ordered by `version_no ASC`, and serialize them for the UI. + +- [ ] **Step 4: Expose the revision routes** + +Create `app/api/routers/revisions.py` with: +- `POST /api/v1/orders/{order_id}/revisions` +- `GET /api/v1/orders/{order_id}/revisions` + +Wire the router in `app/main.py`. + +- [ ] **Step 5: Extend asset serialization for the UI** + +Update `app/api/schemas/asset.py` to expose: +- `parent_asset_id` +- `root_asset_id` +- `version_no` +- `is_current_version` + +Update `app/application/services/asset_service.py` if ordering needs to prefer `version_no` over raw `created_at`. + +- [ ] **Step 6: Run the revision registration and chain tests** + +Run: + +```bash +pytest tests/test_api.py -k "manual_revision or revision_chain" -v +``` + +Expected: +- registration test passes +- chain test passes +- confirm test still fails because continue logic is not implemented yet + +- [ ] **Step 7: Commit the revision API work** + +```bash +git add app/api/schemas/revision.py app/application/services/revision_service.py app/api/routers/revisions.py app/main.py app/api/schemas/asset.py app/application/services/asset_service.py tests/test_api.py +git commit -m "feat: add manual revision registration and chain queries" +``` + +### Task 4: Add explicit confirm-continue that reuses the existing review signal + +**Files:** +- Modify: `app/api/schemas/review.py` +- Modify: `app/api/routers/reviews.py` +- Modify: `app/application/services/review_service.py` +- Modify: `app/workers/workflows/types.py` + +- [ ] **Step 1: Add confirm request and response models** + +Extend `app/api/schemas/review.py` with: +- `ConfirmRevisionRequest` +- `ConfirmRevisionResponse` + +Fields should include: +- `reviewer_id` +- `comment` + +Do not ask the caller for `selected_asset_id`; the backend should derive that from the active review task’s `resume_asset_id`. + +- [ ] **Step 2: Implement `confirm_revision_continue` in `ReviewService`** + +Add a new service method that: +- verifies the order is still `waiting_review` +- loads the active review task +- rejects the call unless task status is `REVISION_UPLOADED` +- verifies `resume_asset_id` is present +- marks the task as `SUBMITTED` +- reuses `WorkflowService.signal_review(...)` with: + - `decision=APPROVE` + - `selected_asset_id=resume_asset_id` + - `comment` prefixed or structured to indicate manual revision confirmation + +This is the key MVP simplification: the workflow does not need a new signal type because it already knows how to export an explicitly selected asset. + +- [ ] **Step 3: Expose a dedicated confirm route** + +Add a route such as: + +```text +POST /api/v1/reviews/{order_id}/confirm-revision +``` + +Keep it separate from `/submit` so the API remains clear and the front-end does not need to fake a normal approve call. + +- [ ] **Step 4: Normalize the Temporal payload type if needed** + +Update `app/workers/workflows/types.py` only if the review payload needs an optional metadata field such as `source="manual_revision_confirm"`. Skip this if current payload is already sufficient. + +- [ ] **Step 5: Run the confirm-flow test** + +Run: + +```bash +pytest tests/test_api.py -k "confirm_revision" -v +``` + +Expected: +- workflow resumes from the existing `waiting_review` +- export uses the revision asset id +- order finishes as `succeeded` + +- [ ] **Step 6: Commit the confirm flow** + +```bash +git add app/api/schemas/review.py app/api/routers/reviews.py app/application/services/review_service.py app/workers/workflows/types.py tests/test_api.py +git commit -m "feat: confirm manual revision and resume workflow" +``` + +### Task 5: Surface revision state in order, queue, and workflow responses + +**Files:** +- Modify: `app/api/schemas/order.py` +- Modify: `app/api/schemas/review.py` +- Modify: `app/api/schemas/workflow.py` +- Modify: `app/application/services/order_service.py` +- Modify: `app/application/services/review_service.py` +- Modify: `app/application/services/workflow_service.py` + +- [ ] **Step 1: Extend order detail response** + +Update `OrderDetailResponse` to include: +- `current_revision_asset_id` +- `current_revision_version` +- `revision_count` +- `review_task_status` + +- [ ] **Step 2: Extend pending review response** + +Update `PendingReviewResponse` to include: +- `review_status` +- `latest_revision_asset_id` +- `revision_count` + +This is what the new queue UI needs for labels like `可人工介入` and `待确认回流`. + +- [ ] **Step 3: Extend workflow/status response only with revision summary, not duplicated chain detail** + +Update `WorkflowStatusResponse` or add a nested summary object with: +- `latest_revision_asset_id` +- `latest_revision_version` +- `pending_manual_confirm: bool` + +Do not duplicate the full revision chain here; the dedicated revision-chain endpoint already covers that. + +- [ ] **Step 4: Implement the response assembly in services** + +Update: +- `OrderService.get_order` +- `ReviewService.list_pending_reviews` +- `WorkflowService.get_workflow_status` + +Use the current open review task plus current-version asset rows to compute the response fields. + +- [ ] **Step 5: Add or update tests for enriched responses** + +Extend `tests/test_api.py` assertions so the new endpoints and existing endpoints expose the fields required by the designed UI. + +- [ ] **Step 6: Run the full API test module** + +Run: + +```bash +pytest tests/test_api.py -v +``` + +Expected: +- existing approve/rerun tests still pass +- new manual revision tests pass +- no regression in low-end flow + +- [ ] **Step 7: Commit the response-shape changes** + +```bash +git add app/api/schemas/order.py app/api/schemas/review.py app/api/schemas/workflow.py app/application/services/order_service.py app/application/services/review_service.py app/application/services/workflow_service.py tests/test_api.py +git commit -m "feat: expose manual revision state in API responses" +``` + +### Task 6: Documentation and final verification + +**Files:** +- Modify: `README.md` +- Modify: `docs/superpowers/specs/2026-03-27-review-workbench-design.md` + +- [ ] **Step 1: Document the manual revision API flow** + +Update `README.md` with: +- revision registration endpoint +- revision chain endpoint +- confirm-revision endpoint +- the fact that current MVP uses URI registration instead of real binary upload + +- [ ] **Step 2: Sync the spec wording with the implemented API names** + +Update the design spec only where route names or payload names need to match the code. + +- [ ] **Step 3: Run the complete test suite** + +Run: + +```bash +pytest -q +``` + +Expected: +- all existing tests pass +- new manual revision flow is covered + +- [ ] **Step 4: Do a quick endpoint smoke pass** + +Run: + +```bash +pytest tests/test_api.py::test_mid_end_order_waits_review_then_approves -v +pytest tests/test_api.py -k "manual_revision or confirm_revision" -v +``` + +Expected: +- baseline approve flow still works +- manual revision register/confirm flow works + +- [ ] **Step 5: Commit docs and verification updates** + +```bash +git add README.md docs/superpowers/specs/2026-03-27-review-workbench-design.md +git commit -m "docs: describe manual revision backend flow" +``` + +## Notes for the Implementer + +- Keep the first implementation scoped to `semi_pro` and the existing single review pause. +- Do not add a second Temporal workflow or a second review wait state in this MVP. +- Do not implement real file storage upload in this pass; register an uploaded URI or mock URI first. +- Keep the version chain single-line. Reject requests that try to branch from a non-current version. +- If a persistent SQLite database already exists locally, schema changes may require deleting the dev DB before rerunning because this repo currently has no migration system. diff --git a/docs/superpowers/specs/2026-03-27-review-workbench-design.md b/docs/superpowers/specs/2026-03-27-review-workbench-design.md new file mode 100644 index 0000000..e3f4ca0 --- /dev/null +++ b/docs/superpowers/specs/2026-03-27-review-workbench-design.md @@ -0,0 +1,312 @@ +# 审核优先运营台设计文档 + +日期:2026-03-27 + +## 1. 背景 + +当前项目是一个 `FastAPI + Temporal + SQLite + SQLAlchemy` 的图片流水线 MVP,支持: + +- 低端自动流程 `auto_basic` +- 中端半自动流程 `semi_pro` +- 订单创建、订单详情、资产查询、待审核列表、审核提交、workflow 状态查询 +- `approve / rerun_scene / rerun_face / rerun_fusion` 审核信号 +- 面向人工审核节点的扩展设计:人工导出、离线修订、上传新副本并确认回流 + +本次目标不是实现完整前端系统,而是为当前后端能力设计一个可落地的桌面端运营页面原型,服务内部审核人员。 + +## 2. 设计目标 + +- 优先支持审核员处理 `waiting_review` 订单 +- 让审核员先看图,再结合流程信息做决策 +- 在单页内完成:选单、看图、流程判断、提交审核 +- 在人工审核节点支持“导出原图 -> 离线修改 -> 上传新副本 -> 确认继续流水线” +- 保留新建订单入口,但不干扰审核主任务 + +## 3. 用户与使用场景 + +目标用户是内部运营/审核人员,主要在桌面端使用。 + +核心任务链路: + +1. 从待审核队列中找到当前需要处理的订单 +2. 查看候选图、最终图和历史版本 +3. 结合 workflow 当前步骤、历史步骤与异常信息判断结果 +4. 提交 `approve` 或 `rerun_*` +5. 进入下一单继续审核 + +扩展任务链路: + +1. 在人工审核节点导出当前候选图 +2. 线下修订后上传为新的副本版本 +3. 在页面内检查新副本是否正确 +4. 手动点击“确认继续流水线”,从审核后的下一段继续 +5. 如需再次修订,则基于最新副本继续形成单线版本链 + +## 4. 信息架构 + +页面采用三栏结构,强调“详情审核”而不是“总览监控”。 + +### 左侧:待审核队列 + +- 搜索 +- 快速筛选:全部、待审核、超时、高优先级、最近更新 +- 订单卡片列表 + +每条卡片至少展示: + +- `order_id` +- `service_mode` +- 当前步骤 +- 等待时长 +- 异常标记 +- 人工介入标签:`可人工介入`、`待确认回流`、`修订中` + +左侧只负责选单,不直接执行审核动作。 + +### 中央:大图审核区 + +中央是页面视觉核心,采用看图优先布局。 + +包括: + +- 顶部状态条 +- 主图预览区 +- 版本链带:`原候选 -> 修订 v2 -> 修订 v3 -> 当前版本` +- 候选图 / 最终图 / 历史版本切换 +- 缩略图带 +- 局部放大检查 + +这个区域必须拿到页面最大面积,用于判断面部、纹理、融合边缘等细节。 + +### 右侧:审核与流程侧栏 + +右侧分为两块: + +- 上半区:审核动作面板 +- 下半区:压缩版流程时间线 + +审核动作包括: + +- `approve` +- `rerun_scene` +- `rerun_face` +- `rerun_fusion` +- 审核备注 +- `导出原图` +- `上传修订稿` +- `确认继续流水线` + +流程侧栏展示: + +- 当前步骤 +- 关键 step 历史 +- 最近重跑节点 +- 错误信息 / 异常状态 +- 人工修订记录:版本号、备注、上传时间、确认继续时间 + +### 顶部:订单状态条 + +顶部只保留最关键的信息: + +- 订单号 +- 客户层级 +- 服务模式 +- 当前步骤 +- 状态 +- SLA / 异常提醒 +- 当前版本号 +- 人工修订链次数 + +### 新建订单入口 + +保留在页面右上角,以按钮进入弹窗或二级页,不占据首页主区域。 + +## 5. 交互设计 + +### 主交互节奏 + +`左侧选单 -> 中央看图 -> 右侧做决定 -> 队列进入下一单或返回列表` + +### 队列交互 + +- 点击订单卡片后,中央与右侧联动刷新 +- 不打开新页面,不跳转 +- 当前选中卡片需要强视觉高亮 + +### 图片查看交互 + +- 默认展示主图 + 缩略图带 +- 主图和版本链联动,支持查看原候选、最新修订版、历史修订版 +- 支持在候选图、最终图、历史版本之间切换 +- 支持局部放大检查 +- 每张图需要有状态标签,例如:`QC 候选`、`历史版本`、`当前选中` + +### 人工修订交互 + +- 仅在中高端流程的人工审核节点展示 +- 点击 `导出原图` 后导出当前选中版本用于线下修订 +- 点击 `上传修订稿` 后创建一个新的副本版本,而不是覆盖原候选 +- 上传成功后不自动开跑,订单进入 `待确认回流` +- 审核员确认新副本无误后,点击 `确认继续流水线` +- 回流从人工审核后的下一段继续,不再让审核员选择 `rerun_*` +- 多轮人工修订采用单线版本链,只保留一个继续流转的最新版本 + +### 审核动作交互 + +- `approve` 是唯一主 CTA +- `rerun_*` 是次级动作,与通过操作明显区分 +- 执行 `rerun_*` 时要求填写备注 +- 如果存在多张候选图,`approve` 前必须明确选中提交对象 + +### 流程查看交互 + +- 右侧默认展示摘要版时间线 +- 重点体现:当前节点、失败节点、重跑来源 +- 在人工修订模式下补充“修订记录”摘要 +- 需要时可展开完整 step 历史 + +### 新建订单交互 + +- 从顶部按钮打开 +- 不抢首页视觉焦点 +- 表单字段直接对应当前后端 `CreateOrderRequest` + +## 6. 视觉方向 + +页面风格应为:`专业、冷静、偏高密度` 的内部工作台。 + +### 视觉原则 + +- 中央主图区优先级最高 +- 使用中性色为主,搭配单一强调色和语义状态色 +- 通过分区背景、边框和留白建立左右区域层次 +- 不使用花哨装饰和重营销化语言 + +### 状态表达 + +- `approve` 使用唯一主强调色 +- `rerun_*` 使用次级语义动作样式 +- 异常、超时、失败步骤在队列和顶部状态条中直接标红提示 +- step 时间线使用统一状态编码:已完成、处理中、等待审核、失败、重跑来源 + +### 动效原则 + +- 仅保留短过渡反馈 +- 不使用重动画 +- 重点反馈:切换订单、切换图片、审核提交状态变化 +- 在人工修订链中,版本切换和“待确认回流”需要有显式状态反馈 + +## 7. 数据映射 + +页面建立在当前已有 API 能力之上。 + +### 左侧队列 + +来源: + +- 待审核列表接口 + +### 顶部状态条 + +来源组合: + +- 订单详情 +- workflow 当前状态 +- 最新副本版本元数据 + +### 中央图片区 + +来源: + +- 订单资产接口 +- 人工修订版本链接口 / 版本元数据 + +### 右侧流程区 + +来源: + +- workflow 状态接口 + +### 审核动作 + +来源: + +- 审核提交接口 +- 导出资产接口 +- 修订稿上传接口 +- 人工确认继续接口 + +`approve` 与 `rerun_*` 复用同一提交接口,仅决策参数不同。 + +人工修订能力需要把“资产版本”和“流程推进”拆成两类操作: + +- 资产操作:导出、上传新副本、查看版本链 +- 流程操作:确认继续流水线 + +## 8. 异常与边界处理 + +- 资产为空时,显示“暂无候选图 / 等待流程产出” +- workflow 拉取失败时,右侧流程区单独报错,不阻塞中央图片区 +- 审核提交失败时,在右侧动作区就近展示错误 +- 订单异常或失败时,队列卡片与顶部状态条同步显式标记 +- 切换订单时,如果备注已编辑未提交,需要提示是否放弃 +- 触发 `rerun_*` 后,页面进入“已发起重跑,等待流程推进”的过渡状态 +- 导出失败时,仅动作区报错,不影响当前订单浏览 +- 上传失败时,不创建新版本节点,仍停留在当前版本 +- 上传成功但未确认继续时,队列中明确标记 `待确认回流` +- 确认继续失败时,保留最新副本版本,允许重试,不回退版本链 +- 同一订单支持多轮人工修订,但只保留一条线性继续链,不允许并行分支 + +## 9. 本次原型范围 + +本次原型只覆盖一个桌面端单页: + +- 待审核队列 +- 详情审核区 +- 流程时间线 +- 资产预览 +- 新建订单入口 +- 人工修订回流模式下的版本链与动作区 + +不包含: + +- 登录 +- 多页后台结构 +- 真实前端联调 +- 响应式移动端适配 + +## 10. 原型验收标准 + +原型应回答以下问题: + +1. 审核员能否快速找到待处理订单 +2. 进入详情后能否高效看图并做决策 +3. 流程与异常信息是否足够支持审核判断 +4. 页面主视觉是否明确体现“看图优先、审核优先” +5. 人工修订后能否清楚看见版本链、待确认状态和继续入口 + +## 11. 推荐原型结构 + +推荐使用单页桌面运营台结构: + +- 左 280px:队列栏 +- 中 1.25x 主区:大图审核区 +- 右 280-320px:审核动作 + 压缩流程区 + +这是当前最匹配项目能力与用户偏好的结构。 + +## 12. 人工修订状态机 + +建议将人工介入抽象成 4 个显式状态: + +- `可人工介入` +- `修订上传完成` +- `待确认回流` +- `已回流继续` + +状态规则: + +- `可人工介入` 时允许导出原图和上传修订稿 +- `修订上传完成 / 待确认回流` 时不允许再上传并行版本 +- `待确认回流` 时右侧主按钮固定为 `确认继续流水线` +- `已回流继续` 后保留完整版本链供回看,后续若再次进入人工审核,则从最新版本继续生成下一版 diff --git a/pyproject.toml b/pyproject.toml index fac0051..4f3a2e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "aiosqlite>=0.20,<1.0", "alembic>=1.13,<2.0", "fastapi>=0.115,<1.0", + "greenlet>=3.1,<4.0", "httpx>=0.27,<1.0", "pydantic>=2.8,<3.0", "pydantic-settings>=2.4,<3.0", diff --git a/tests/test_api.py b/tests/test_api.py index dcd6ac2..c0896d7 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -36,6 +36,25 @@ async def wait_for_step_count(client, order_id: int, step_name: str, minimum_cou ) +async def create_mid_end_order(client): + """Create a standard semi-pro order for review-path tests.""" + + response = await client.post( + "/api/v1/orders", + json={ + "customer_level": "mid", + "service_mode": "semi_pro", + "model_id": 101, + "pose_id": 3, + "garment_asset_id": 9001, + "scene_ref_asset_id": 8001, + }, + ) + + assert response.status_code == 201 + return response.json() + + @pytest.mark.asyncio async def test_healthcheck(api_runtime): """The health endpoint should always respond successfully.""" @@ -91,20 +110,7 @@ async def test_mid_end_order_waits_review_then_approves(api_runtime): """Mid-end orders should pause for review and continue after approval.""" client, env = api_runtime - response = await client.post( - "/api/v1/orders", - json={ - "customer_level": "mid", - "service_mode": "semi_pro", - "model_id": 101, - "pose_id": 3, - "garment_asset_id": 9001, - "scene_ref_asset_id": 8001, - }, - ) - - assert response.status_code == 201 - payload = response.json() + payload = await create_mid_end_order(client) await wait_for_workflow_status(client, payload["order_id"], "waiting_review") @@ -140,20 +146,7 @@ async def test_mid_end_rerun_paths_return_to_review(api_runtime, decision: str, """Each rerun decision should branch back to the correct step and pause again for review.""" client, env = api_runtime - response = await client.post( - "/api/v1/orders", - json={ - "customer_level": "mid", - "service_mode": "semi_pro", - "model_id": 101, - "pose_id": 3, - "garment_asset_id": 9001, - "scene_ref_asset_id": 8001, - }, - ) - - assert response.status_code == 201 - payload = response.json() + payload = await create_mid_end_order(client) await wait_for_workflow_status(client, payload["order_id"], "waiting_review") @@ -163,8 +156,9 @@ async def test_mid_end_rerun_paths_return_to_review(api_runtime, decision: str, ) assert review_response.status_code == 200 - workflow_payload = await wait_for_step_count(client, payload["order_id"], expected_step, 2) - workflow_payload = await wait_for_step_count(client, payload["order_id"], "review", 2) + await wait_for_step_count(client, payload["order_id"], expected_step, 2) + await wait_for_step_count(client, payload["order_id"], "review", 2) + workflow_payload = await wait_for_workflow_status(client, payload["order_id"], "waiting_review") assert workflow_payload["workflow_status"] == "waiting_review" approve_response = await client.post( @@ -176,3 +170,303 @@ async def test_mid_end_rerun_paths_return_to_review(api_runtime, decision: str, handle = env.client.get_workflow_handle(payload["workflow_id"]) result = await handle.result() assert result["status"] == "succeeded" + + +@pytest.mark.asyncio +async def test_mid_end_order_registers_manual_revision_and_updates_pending_queue(api_runtime): + """Registering a manual revision should keep the workflow paused and mark the queue item accordingly.""" + + client, _ = api_runtime + payload = await create_mid_end_order(client) + + workflow_payload = await wait_for_workflow_status(client, payload["order_id"], "waiting_review") + parent_asset_id = workflow_payload["steps"][-2]["output_json"]["candidate_asset_ids"][0] + + register_response = await client.post( + f"/api/v1/orders/{payload['order_id']}/revisions", + json={ + "parent_asset_id": parent_asset_id, + "uploaded_uri": "mock://manual-revision-v1", + "reviewer_id": 88, + "comment": "人工修订第一版", + }, + ) + + assert register_response.status_code == 201 + register_payload = register_response.json() + assert register_payload["order_id"] == payload["order_id"] + assert register_payload["parent_asset_id"] == parent_asset_id + assert register_payload["root_asset_id"] == parent_asset_id + assert register_payload["version_no"] == 1 + assert register_payload["review_task_status"] == "revision_uploaded" + + pending_response = await client.get("/api/v1/reviews/pending") + assert pending_response.status_code == 200 + queue_item = next(item for item in pending_response.json() if item["order_id"] == payload["order_id"]) + assert queue_item["review_task_status"] == "revision_uploaded" + assert queue_item["latest_revision_asset_id"] == register_payload["asset_id"] + assert queue_item["revision_count"] == 1 + + +@pytest.mark.asyncio +async def test_mid_end_order_lists_single_line_revision_chain(api_runtime): + """Listing revisions should return the uploaded manual revision chain in version order.""" + + client, _ = api_runtime + payload = await create_mid_end_order(client) + + workflow_payload = await wait_for_workflow_status(client, payload["order_id"], "waiting_review") + root_asset_id = workflow_payload["steps"][-2]["output_json"]["candidate_asset_ids"][0] + + first_response = await client.post( + f"/api/v1/orders/{payload['order_id']}/revisions", + json={ + "parent_asset_id": root_asset_id, + "uploaded_uri": "mock://manual-revision-v1", + "reviewer_id": 88, + "comment": "人工修订第一版", + }, + ) + assert first_response.status_code == 201 + first_payload = first_response.json() + + second_response = await client.post( + f"/api/v1/orders/{payload['order_id']}/revisions", + json={ + "parent_asset_id": first_payload["asset_id"], + "uploaded_uri": "mock://manual-revision-v2", + "reviewer_id": 88, + "comment": "人工修订第二版", + }, + ) + assert second_response.status_code == 201 + second_payload = second_response.json() + + chain_response = await client.get(f"/api/v1/orders/{payload['order_id']}/revisions") + assert chain_response.status_code == 200 + chain_payload = chain_response.json() + + assert chain_payload["order_id"] == payload["order_id"] + assert [item["asset_id"] for item in chain_payload["items"]] == [ + first_payload["asset_id"], + second_payload["asset_id"], + ] + assert [item["version_no"] for item in chain_payload["items"]] == [1, 2] + assert chain_payload["items"][0]["parent_asset_id"] == root_asset_id + assert chain_payload["items"][1]["parent_asset_id"] == first_payload["asset_id"] + assert chain_payload["items"][-1]["is_current_version"] is True + + +@pytest.mark.asyncio +async def test_mid_end_order_confirms_manual_revision_and_exports_revision_asset(api_runtime): + """Confirming a manual revision should resume the workflow and export the revision asset.""" + + client, env = api_runtime + payload = await create_mid_end_order(client) + + workflow_payload = await wait_for_workflow_status(client, payload["order_id"], "waiting_review") + parent_asset_id = workflow_payload["steps"][-2]["output_json"]["candidate_asset_ids"][0] + + register_response = await client.post( + f"/api/v1/orders/{payload['order_id']}/revisions", + json={ + "parent_asset_id": parent_asset_id, + "uploaded_uri": "mock://manual-revision-v1", + "reviewer_id": 88, + "comment": "人工修订第一版", + }, + ) + assert register_response.status_code == 201 + register_payload = register_response.json() + + confirm_response = await client.post( + f"/api/v1/reviews/{payload['order_id']}/confirm-revision", + json={ + "reviewer_id": 88, + "comment": "确认继续流水线", + }, + ) + + assert confirm_response.status_code == 200 + confirm_payload = confirm_response.json() + assert confirm_payload["revision_asset_id"] == register_payload["asset_id"] + assert confirm_payload["decision"] == "approve" + assert confirm_payload["status"] == "submitted" + + handle = env.client.get_workflow_handle(payload["workflow_id"]) + result = await handle.result() + assert result["status"] == "succeeded" + assert result["final_asset_id"] is not None + + order_response = await client.get(f"/api/v1/orders/{payload['order_id']}") + assert order_response.status_code == 200 + order_payload = order_response.json() + assert order_payload["status"] == "succeeded" + assert order_payload["final_asset"]["asset_type"] == "final" + assert order_payload["final_asset"]["metadata_json"]["source_asset_id"] == register_payload["asset_id"] + + +@pytest.mark.asyncio +async def test_orders_list_returns_recent_orders_with_revision_summary(api_runtime): + """Orders list should expose pagination metadata, filtering, and revision summary fields.""" + + client, env = api_runtime + + low_order = await client.post( + "/api/v1/orders", + json={ + "customer_level": "low", + "service_mode": "auto_basic", + "model_id": 201, + "pose_id": 11, + "garment_asset_id": 9101, + "scene_ref_asset_id": 8101, + }, + ) + assert low_order.status_code == 201 + low_payload = low_order.json() + await env.client.get_workflow_handle(low_payload["workflow_id"]).result() + + mid_payload = await create_mid_end_order(client) + workflow_payload = await wait_for_workflow_status(client, mid_payload["order_id"], "waiting_review") + parent_asset_id = workflow_payload["steps"][-2]["output_json"]["candidate_asset_ids"][0] + + register_response = await client.post( + f"/api/v1/orders/{mid_payload['order_id']}/revisions", + json={ + "parent_asset_id": parent_asset_id, + "uploaded_uri": "mock://manual-revision-v1", + "reviewer_id": 99, + "comment": "人工修订第一版", + }, + ) + assert register_response.status_code == 201 + + list_response = await client.get("/api/v1/orders", params={"page": 1, "limit": 1}) + assert list_response.status_code == 200 + first_page = list_response.json() + + assert first_page["page"] == 1 + assert first_page["limit"] == 1 + assert first_page["total"] == 2 + assert first_page["total_pages"] == 2 + assert [item["order_id"] for item in first_page["items"]] == [mid_payload["order_id"]] + assert first_page["items"][0]["workflow_id"] == mid_payload["workflow_id"] + assert first_page["items"][0]["review_task_status"] == "revision_uploaded" + assert first_page["items"][0]["latest_revision_version"] == 1 + assert first_page["items"][0]["revision_count"] == 1 + assert first_page["items"][0]["pending_manual_confirm"] is True + + second_page_response = await client.get("/api/v1/orders", params={"page": 2, "limit": 1}) + assert second_page_response.status_code == 200 + second_page = second_page_response.json() + assert second_page["page"] == 2 + assert second_page["limit"] == 1 + assert second_page["total"] == 2 + assert second_page["total_pages"] == 2 + assert [item["order_id"] for item in second_page["items"]] == [low_payload["order_id"]] + assert second_page["items"][0]["status"] == "succeeded" + + filtered_response = await client.get( + "/api/v1/orders", params={"page": 1, "limit": 10, "status": "waiting_review"} + ) + assert filtered_response.status_code == 200 + filtered_payload = filtered_response.json() + assert filtered_payload["page"] == 1 + assert filtered_payload["limit"] == 10 + assert filtered_payload["total"] == 1 + assert filtered_payload["total_pages"] == 1 + assert [item["order_id"] for item in filtered_payload["items"]] == [mid_payload["order_id"]] + + query_response = await client.get( + "/api/v1/orders", + params={"page": 1, "limit": 10, "query": mid_payload["workflow_id"]}, + ) + assert query_response.status_code == 200 + query_payload = query_response.json() + assert query_payload["total"] == 1 + assert [item["order_id"] for item in query_payload["items"]] == [mid_payload["order_id"]] + + +@pytest.mark.asyncio +async def test_workflows_list_returns_recent_runs_with_failure_count(api_runtime): + """Workflow list should expose pagination metadata, filtering, and revision summary.""" + + client, env = api_runtime + + low_order = await client.post( + "/api/v1/orders", + json={ + "customer_level": "low", + "service_mode": "auto_basic", + "model_id": 301, + "pose_id": 21, + "garment_asset_id": 9201, + "scene_ref_asset_id": 8201, + }, + ) + assert low_order.status_code == 201 + low_payload = low_order.json() + await env.client.get_workflow_handle(low_payload["workflow_id"]).result() + + mid_payload = await create_mid_end_order(client) + workflow_payload = await wait_for_workflow_status(client, mid_payload["order_id"], "waiting_review") + parent_asset_id = workflow_payload["steps"][-2]["output_json"]["candidate_asset_ids"][0] + + register_response = await client.post( + f"/api/v1/orders/{mid_payload['order_id']}/revisions", + json={ + "parent_asset_id": parent_asset_id, + "uploaded_uri": "mock://manual-revision-v1", + "reviewer_id": 77, + "comment": "人工修订第一版", + }, + ) + assert register_response.status_code == 201 + + list_response = await client.get("/api/v1/workflows", params={"page": 1, "limit": 1}) + assert list_response.status_code == 200 + first_page = list_response.json() + + assert first_page["page"] == 1 + assert first_page["limit"] == 1 + assert first_page["total"] == 2 + assert first_page["total_pages"] == 2 + assert [item["order_id"] for item in first_page["items"]] == [mid_payload["order_id"]] + assert first_page["items"][0]["workflow_id"] == mid_payload["workflow_id"] + assert first_page["items"][0]["workflow_status"] == "waiting_review" + assert first_page["items"][0]["review_task_status"] == "revision_uploaded" + assert first_page["items"][0]["latest_revision_version"] == 1 + assert first_page["items"][0]["revision_count"] == 1 + assert first_page["items"][0]["pending_manual_confirm"] is True + assert first_page["items"][0]["failure_count"] == 0 + + second_page_response = await client.get("/api/v1/workflows", params={"page": 2, "limit": 1}) + assert second_page_response.status_code == 200 + second_page = second_page_response.json() + assert second_page["page"] == 2 + assert second_page["limit"] == 1 + assert second_page["total"] == 2 + assert second_page["total_pages"] == 2 + assert [item["order_id"] for item in second_page["items"]] == [low_payload["order_id"]] + assert second_page["items"][0]["workflow_status"] == "succeeded" + + filtered_response = await client.get( + "/api/v1/workflows", params={"page": 1, "limit": 10, "status": "waiting_review"} + ) + assert filtered_response.status_code == 200 + filtered_payload = filtered_response.json() + assert filtered_payload["page"] == 1 + assert filtered_payload["limit"] == 10 + assert filtered_payload["total"] == 1 + assert filtered_payload["total_pages"] == 1 + assert [item["order_id"] for item in filtered_payload["items"]] == [mid_payload["order_id"]] + + query_response = await client.get( + "/api/v1/workflows", + params={"page": 1, "limit": 10, "query": str(low_payload["order_id"])}, + ) + assert query_response.status_code == 200 + query_payload = query_response.json() + assert query_payload["total"] == 1 + assert [item["order_id"] for item in query_payload["items"]] == [low_payload["order_id"]]