"""低端图片流水线工作流。 这个文件里的 `workflow` 只负责“编排顺序”和“等待结果”, 不直接访问数据库、不做真实 I/O。 真正的落库和 mock 执行都放在 activity 里完成。 """ from datetime import timedelta from temporalio import workflow from temporalio.common import RetryPolicy # Temporal workflow 代码需要尽量保持可重放(deterministic)。 # 这里导入的模块会在 workflow 外部执行,所以用 imports_passed_through # 明确告诉 SDK:这些导入不是 workflow 重放逻辑的一部分。 with workflow.unsafe.imports_passed_through(): from app.domain.enums import OrderStatus, WorkflowStepName from app.infra.temporal.task_queues import ( IMAGE_PIPELINE_CONTROL_TASK_QUEUE, IMAGE_PIPELINE_EXPORT_TASK_QUEUE, IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, IMAGE_PIPELINE_QC_TASK_QUEUE, ) from app.workers.activities.export_activities import run_export_activity from app.workers.activities.qc_activities import run_qc_activity from app.workers.activities.review_activities import mark_workflow_failed_activity from app.workers.activities.scene_activities import run_scene_activity from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity from app.workers.workflows.timeout_policy import DEFAULT_ACTIVITY_TIMEOUT, activity_timeout_for_task_queue from app.workers.workflows.types import ( PipelineWorkflowInput, StepActivityInput, WorkflowFailureActivityInput, ) ACTIVITY_RETRY_POLICY = RetryPolicy( initial_interval=timedelta(seconds=1), backoff_coefficient=2.0, maximum_attempts=3, ) @workflow.defn class LowEndPipelineWorkflow: """低端全自动工作流。 它对应的是一条从头跑到尾、不需要人工介入的流水线: prepare_model -> tryon -> scene -> qc -> export """ @workflow.run async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]: """执行低端工作流主流程。 可以把这里理解成“时序控制器”: 1. 按顺序调 activity 2. 把上一步产物传给下一步 3. 出错时统一标记 workflow 失败 """ # current_step 用来在异常时把失败位置持久化到数据库。 current_step = WorkflowStepName.PREPARE_MODEL try: # 每个步骤都通过 execute_activity 触发真正执行。 # workflow 自己不做计算,只负责调度。 # prepare_model 的职责是把“模特资源 + 订单上下文”整理成后续可消费的人物底图。 # 这一步产出的 prepared.asset_id 会作为 tryon 的 source_asset_id。 prepared = await workflow.execute_activity( prepare_model_activity, StepActivityInput( order_id=payload.order_id, workflow_run_id=payload.workflow_run_id, step_name=WorkflowStepName.PREPARE_MODEL, model_id=payload.model_id, pose_id=payload.pose_id, garment_asset_id=payload.garment_asset_id, scene_ref_asset_id=payload.scene_ref_asset_id, ), task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_CONTROL_TASK_QUEUE), retry_policy=ACTIVITY_RETRY_POLICY, ) current_step = WorkflowStepName.TRYON # 下游步骤通过 source_asset_id 引用上一步生成的资产。 # tryon 是换装主步骤:把 prepared 模特图和 garment_asset_id 组合成试衣结果。 # 它产出的 tryon_result.asset_id 是后续 scene / qc 的基础输入。 tryon_result = await workflow.execute_activity( run_tryon_activity, StepActivityInput( order_id=payload.order_id, workflow_run_id=payload.workflow_run_id, step_name=WorkflowStepName.TRYON, source_asset_id=prepared.asset_id, garment_asset_id=payload.garment_asset_id, ), task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE), retry_policy=ACTIVITY_RETRY_POLICY, ) scene_source_asset_id = tryon_result.asset_id if payload.scene_ref_asset_id is not None: current_step = WorkflowStepName.SCENE # scene 是可选步骤。 # 有场景图时,用 scene_ref_asset_id 把 tryon 结果合成到目标背景; # 没有场景图时,直接沿用 tryon 结果继续往下走。 scene_result = await workflow.execute_activity( run_scene_activity, StepActivityInput( order_id=payload.order_id, workflow_run_id=payload.workflow_run_id, step_name=WorkflowStepName.SCENE, source_asset_id=tryon_result.asset_id, scene_ref_asset_id=payload.scene_ref_asset_id, ), task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE), retry_policy=ACTIVITY_RETRY_POLICY, ) scene_source_asset_id = scene_result.asset_id current_step = WorkflowStepName.QC # QC 是流程里的“闸门”。 # 如果这里不通过,低端流程直接失败,不会再 export。 # source_asset_id 指向当前链路里“最接近最终成品”的那张图: # 有场景时是 scene 结果,没有场景时就是 tryon 结果。 qc_result = await workflow.execute_activity( run_qc_activity, StepActivityInput( order_id=payload.order_id, workflow_run_id=payload.workflow_run_id, step_name=WorkflowStepName.QC, source_asset_id=scene_source_asset_id, ), task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE, start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_QC_TASK_QUEUE), retry_policy=ACTIVITY_RETRY_POLICY, ) if not qc_result.passed: await self._mark_failed(payload, current_step, qc_result.message) return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} current_step = WorkflowStepName.EXPORT # candidate_asset_ids 是 QC 推荐可导出的候选资产。 # 当前 MVP 只会返回一个候选;如果没有,就退回 scene 结果导出。 # export 是最后的收口步骤:生成最终交付资产,并把订单状态写成 succeeded。 final_result = await workflow.execute_activity( run_export_activity, StepActivityInput( order_id=payload.order_id, workflow_run_id=payload.workflow_run_id, step_name=WorkflowStepName.EXPORT, source_asset_id=(qc_result.candidate_asset_ids or [scene_source_asset_id])[0], ), task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE, start_to_close_timeout=activity_timeout_for_task_queue(IMAGE_PIPELINE_EXPORT_TASK_QUEUE), retry_policy=ACTIVITY_RETRY_POLICY, ) return { "order_id": payload.order_id, "status": OrderStatus.SUCCEEDED.value, "final_asset_id": final_result.asset_id, } except Exception as exc: # workflow 出异常时,额外调一个 activity 把数据库状态补齐。 await self._mark_failed(payload, current_step, str(exc)) raise async def _mark_failed( self, payload: PipelineWorkflowInput, current_step: WorkflowStepName, message: str, ) -> None: """持久化失败状态。 注意这里仍然通过 activity 落库,而不是在 workflow 里直连数据库。 这样能保持 workflow 的职责单一:只编排,不做外部副作用。 """ await workflow.execute_activity( mark_workflow_failed_activity, WorkflowFailureActivityInput( order_id=payload.order_id, workflow_run_id=payload.workflow_run_id, current_step=current_step, message=message, ), task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, start_to_close_timeout=DEFAULT_ACTIVITY_TIMEOUT, retry_policy=ACTIVITY_RETRY_POLICY, )