Files
auto-virtual-tryon/app/workers/workflows/low_end_pipeline.py

195 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""低端图片流水线工作流。
这个文件里的 `workflow` 只负责“编排顺序”和“等待结果”,
不直接访问数据库、不做真实 I/O。
真正的落库和 mock 执行都放在 activity 里完成。
"""
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
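# Temporal workflow code must stay replay-safe (deterministic).
# The modules imported below run outside the workflow itself, so the imports are wrapped in
# imports_passed_through to tell the SDK explicitly that they are not part of the workflow's replay logic.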
with workflow.unsafe.imports_passed_through():
from app.domain.enums import OrderStatus, WorkflowStepName
from app.infra.temporal.task_queues import (
IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
IMAGE_PIPELINE_QC_TASK_QUEUE,
)
from app.workers.activities.export_activities import run_export_activity
from app.workers.activities.qc_activities import run_qc_activity
from app.workers.activities.review_activities import mark_workflow_failed_activity
from app.workers.activities.scene_activities import run_scene_activity
from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity
from app.workers.workflows.timeout_policy import DEFAULT_ACTIVITY_TIMEOUT, activity_timeout_for_task_queue
from app.workers.workflows.types import (
PipelineWorkflowInput,
StepActivityInput,
WorkflowFailureActivityInput,
)
# Retry policy shared by the activity calls in this file: at most 3 attempts,
# starting with a 1-second retry interval and a backoff coefficient of 2.
ACTIVITY_RETRY_POLICY = RetryPolicy(
    maximum_attempts=3,
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
)
@workflow.defn
class LowEndPipelineWorkflow:
    """Fully automatic low-end image pipeline workflow.

    Runs a single pass from start to finish with no human intervention:
    prepare_model -> tryon -> scene (optional) -> qc -> export
    """

    @workflow.run
    async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]:
        """Execute the main flow of the low-end workflow.

        This method is only a sequencer:
        1. call the activities in order,
        2. pass the asset produced by one step to the next step,
        3. on any error, record the workflow as failed in one place.

        Args:
            payload: Order and asset identifiers for this workflow run.

        Returns:
            A dict with the keys ``order_id``, ``status`` and ``final_asset_id``.
            ``final_asset_id`` is ``None`` when QC rejects the result.

        Raises:
            Exception: Any exception raised while running the steps is re-raised
                after an attempt to persist the failure.
        """
        # current_step is what gets persisted as the failure location if something raises.
        current_step = WorkflowStepName.PREPARE_MODEL
        try:
            # Every step is triggered through execute_activity; the workflow itself
            # performs no computation and only schedules.

            # prepare_model turns the model resource plus the order context into a base
            # person image for later steps. Its prepared.asset_id becomes tryon's source_asset_id.
            prepared = await workflow.execute_activity(
                prepare_model_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.PREPARE_MODEL,
                    model_id=payload.model_id,
                    pose_id=payload.pose_id,
                    garment_asset_id=payload.garment_asset_id,
                    scene_ref_asset_id=payload.scene_ref_asset_id,
                ),
                task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
                start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_CONTROL_TASK_QUEUE),
                retry_policy=ACTIVITY_RETRY_POLICY,
            )

            current_step = WorkflowStepName.TRYON
            # Downstream steps reference the previous step's asset through source_asset_id.
            # tryon is the main outfit-swap step: it combines the prepared model image with
            # garment_asset_id. Its tryon_result.asset_id is the base input for scene / qc.
            tryon_result = await workflow.execute_activity(
                run_tryon_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.TRYON,
                    source_asset_id=prepared.asset_id,
                    garment_asset_id=payload.garment_asset_id,
                ),
                task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
                start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE),
                retry_policy=ACTIVITY_RETRY_POLICY,
            )

            # Without a scene reference the tryon result flows straight on to QC.
            scene_source_asset_id = tryon_result.asset_id
            if payload.scene_ref_asset_id is not None:
                current_step = WorkflowStepName.SCENE
                # scene is optional: when a scene reference exists, the tryon result is
                # composited onto the target background identified by scene_ref_asset_id.
                scene_result = await workflow.execute_activity(
                    run_scene_activity,
                    StepActivityInput(
                        order_id=payload.order_id,
                        workflow_run_id=payload.workflow_run_id,
                        step_name=WorkflowStepName.SCENE,
                        source_asset_id=tryon_result.asset_id,
                        scene_ref_asset_id=payload.scene_ref_asset_id,
                    ),
                    task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
                    start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE),
                    retry_policy=ACTIVITY_RETRY_POLICY,
                )
                scene_source_asset_id = scene_result.asset_id

            current_step = WorkflowStepName.QC
            # QC is the gate of the pipeline: if it does not pass, the low-end flow fails
            # right here and never exports. source_asset_id is the image closest to the
            # final product: the scene result when there is one, otherwise the tryon result.
            qc_result = await workflow.execute_activity(
                run_qc_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.QC,
                    source_asset_id=scene_source_asset_id,
                ),
                task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE,
                start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_QC_TASK_QUEUE),
                retry_policy=ACTIVITY_RETRY_POLICY,
            )
            if not qc_result.passed:
                await self._mark_failed(payload, current_step, qc_result.message)
                return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None}

            current_step = WorkflowStepName.EXPORT
            # candidate_asset_ids are the assets QC recommends for export. Only the first
            # candidate is used; when QC returns none, the image that was sent to QC is
            # exported instead.
            # NOTE(review): per the original author's note, the export activity also writes
            # the order status as succeeded - confirm against run_export_activity.
            final_result = await workflow.execute_activity(
                run_export_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.EXPORT,
                    source_asset_id=(qc_result.candidate_asset_ids or [scene_source_asset_id])[0],
                ),
                task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
                start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_EXPORT_TASK_QUEUE),
                retry_policy=ACTIVITY_RETRY_POLICY,
            )
            return {
                "order_id": payload.order_id,
                "status": OrderStatus.SUCCEEDED.value,
                "final_asset_id": final_result.asset_id,
            }
        except Exception as exc:
            # On failure, run one extra activity so the database reflects the failed state.
            # That bookkeeping call is guarded: if it fails too, its error is logged and the
            # original exception below is still the one that propagates.
            try:
                await self._mark_failed(payload, current_step, self._describe_failure(exc))
            except Exception:
                workflow.logger.exception(
                    "Failed to persist failure state for workflow run %s at step %s",
                    payload.workflow_run_id,
                    current_step,
                )
            raise

    @staticmethod
    def _describe_failure(exc: BaseException) -> str:
        """Build a failure message from an exception and its ``__cause__`` chain.

        A failed activity surfaces as a wrapper exception whose own message is
        generic; the underlying reason is attached as ``__cause__``. Joining the
        chain keeps that reason in the persisted message. For an exception with a
        non-empty message and no cause the result equals ``str(exc)``.

        Args:
            exc: The exception caught by the workflow.

        Returns:
            The distinct, non-empty messages along the cause chain joined with
            ``": "``, or the exception's class name when every message is empty.
        """
        messages: list[str] = []
        visited: set[int] = set()
        link: BaseException | None = exc
        # The visited set only guards against a cyclic cause chain.
        while link is not None and id(link) not in visited:
            visited.add(id(link))
            text = str(link)
            if text and text not in messages:
                messages.append(text)
            link = link.__cause__
        return ": ".join(messages) or type(exc).__name__

    async def _mark_failed(
        self,
        payload: PipelineWorkflowInput,
        current_step: WorkflowStepName,
        message: str,
    ) -> None:
        """Persist the failed state of this workflow run.

        The write goes through an activity rather than a direct database call
        from the workflow, so the workflow only orchestrates and performs no
        external side effects itself.

        Args:
            payload: Workflow input; supplies ``order_id`` and ``workflow_run_id``.
            current_step: The step that was running when the failure happened.
            message: Human-readable failure reason to store.
        """
        await workflow.execute_activity(
            mark_workflow_failed_activity,
            WorkflowFailureActivityInput(
                order_id=payload.order_id,
                workflow_run_id=payload.workflow_run_id,
                current_step=current_step,
                message=message,
            ),
            task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
            start_to_close_timeout=DEFAULT_ACTIVITY_TIMEOUT,
            retry_policy=ACTIVITY_RETRY_POLICY,
        )