# NOTE(review): the lines below look like file-viewer header residue, not code.
# Files
#
# 72 lines
# 1.9 KiB
# Python

"""Workflow API schemas."""
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict
from app.domain.enums import OrderStatus, ReviewTaskStatus, StepStatus, WorkflowStepName
class WorkflowStepRead(BaseModel):
    """Read-side schema for a single workflow step record.

    ``from_attributes=True`` lets Pydantic build this model from an
    arbitrary object by reading its attributes, not only from a mapping.
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity of the step and of the run it is attached to.
    id: int
    workflow_run_id: int

    # Which step this is and its status.
    step_name: WorkflowStepName
    step_status: StepStatus

    # Nullable fields without a default: the value may be ``None``,
    # but it still has to be supplied when the model is validated.
    input_json: dict[str, Any] | None
    output_json: dict[str, Any] | None
    error_message: str | None

    # Timing; ``ended_at`` may be ``None``.
    started_at: datetime
    ended_at: datetime | None
class WorkflowStatusResponse(BaseModel):
    """Detailed view of one workflow run, including its step records.

    Fields declared with ``= None`` / ``= 0`` / ``= False`` are optional on
    input and fall back to those defaults; every other field is required.
    """

    # Required identification and status fields.
    order_id: int
    workflow_id: str
    workflow_type: str
    workflow_status: OrderStatus
    current_step: WorkflowStepName | None  # nullable, but must be supplied

    # Optional revision bookkeeping (defaults apply when omitted).
    current_revision_asset_id: int | None = None
    current_revision_version: int | None = None
    latest_revision_asset_id: int | None = None
    latest_revision_version: int | None = None
    revision_count: int = 0

    # Optional review / confirmation state.
    review_task_status: ReviewTaskStatus | None = None
    pending_manual_confirm: bool = False

    # Required: serialized step records and the run's timestamps.
    steps: list[WorkflowStepRead]
    created_at: datetime
    updated_at: datetime
class WorkflowListItemResponse(BaseModel):
    """One row of the workflow list used by workflow home screens.

    Carries a summary of a workflow; unlike the detailed status response it
    has no ``steps`` collection and no ``created_at`` timestamp.
    """

    # Required summary fields.
    order_id: int
    workflow_id: str
    workflow_type: str
    workflow_status: OrderStatus
    current_step: WorkflowStepName | None  # nullable, but must be supplied
    updated_at: datetime
    failure_count: int

    # Optional fields; the defaults below are used when they are omitted.
    review_task_status: ReviewTaskStatus | None = None
    latest_revision_asset_id: int | None = None
    latest_revision_version: int | None = None
    revision_count: int = 0
    pending_manual_confirm: bool = False
class WorkflowListResponse(BaseModel):
    """Paginated envelope around a list of workflow list items.

    All fields are required; none declares a default.
    """

    # Pagination metadata.
    page: int
    limit: int
    total: int
    total_pages: int

    # The workflow rows carried by this response.
    items: list[WorkflowListItemResponse]