183 lines
7.5 KiB
Python
183 lines
7.5 KiB
Python
"""低端图片流水线工作流。
|
||
|
||
这个文件里的 `workflow` 只负责“编排顺序”和“等待结果”,
|
||
不直接访问数据库、不做真实 I/O。
|
||
真正的落库和 mock 执行都放在 activity 里完成。
|
||
"""
|
||
|
||
from datetime import timedelta
|
||
|
||
from temporalio import workflow
|
||
from temporalio.common import RetryPolicy
|
||
|
||
# Temporal workflow 代码需要尽量保持可重放(deterministic)。
|
||
# 这里导入的模块会在 workflow 外部执行,所以用 imports_passed_through
|
||
# 明确告诉 SDK:这些导入不是 workflow 重放逻辑的一部分。
|
||
with workflow.unsafe.imports_passed_through():
|
||
from app.domain.enums import OrderStatus, WorkflowStepName
|
||
from app.infra.temporal.task_queues import (
|
||
IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
|
||
IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
|
||
IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
|
||
IMAGE_PIPELINE_QC_TASK_QUEUE,
|
||
)
|
||
from app.workers.activities.export_activities import run_export_activity
|
||
from app.workers.activities.qc_activities import run_qc_activity
|
||
from app.workers.activities.review_activities import mark_workflow_failed_activity
|
||
from app.workers.activities.scene_activities import run_scene_activity
|
||
from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity
|
||
from app.workers.workflows.types import (
|
||
PipelineWorkflowInput,
|
||
StepActivityInput,
|
||
WorkflowFailureActivityInput,
|
||
)
|
||
|
||
|
||
# Shared execution settings reused by every activity call in this module.

# Value passed as ``start_to_close_timeout`` for each activity invocation.
ACTIVITY_TIMEOUT = timedelta(seconds=30)

# Retry settings: up to 3 attempts in total, with a 1 second initial interval
# and a backoff coefficient of 2.0 between attempts.
ACTIVITY_RETRY_POLICY = RetryPolicy(
    maximum_attempts=3,
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
)
|
||
|
||
|
||
@workflow.defn
class LowEndPipelineWorkflow:
    """Fully automatic low-end image pipeline workflow.

    Runs one pipeline from start to finish with no human step:
    prepare_model -> tryon -> scene -> qc -> export

    The workflow only sequences activities and passes results between them;
    every side effect (including persisting failure state) is delegated to an
    activity.
    """

    @workflow.run
    async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]:
        """Execute the pipeline steps in order and report the outcome.

        Args:
            payload: Order / run identifiers and the asset references that the
                individual steps need.

        Returns:
            A dict with the keys ``order_id``, ``status`` and
            ``final_asset_id``. ``status`` is ``OrderStatus.SUCCEEDED.value``
            with the exported asset id on success, or
            ``OrderStatus.FAILED.value`` with ``final_asset_id`` set to
            ``None`` when QC rejects the result.

        Raises:
            Exception: Any exception raised by a step is re-raised after the
                failure has been recorded through ``_mark_failed``. If
                recording a QC rejection fails, that error propagates as is.
        """
        # Tracks the step in flight so a failure can be attributed to it.
        current_step = WorkflowStepName.PREPARE_MODEL
        try:
            prepared = await workflow.execute_activity(
                prepare_model_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.PREPARE_MODEL,
                    model_id=payload.model_id,
                    pose_id=payload.pose_id,
                    garment_asset_id=payload.garment_asset_id,
                    scene_ref_asset_id=payload.scene_ref_asset_id,
                ),
                task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
                start_to_close_timeout=ACTIVITY_TIMEOUT,
                retry_policy=ACTIVITY_RETRY_POLICY,
            )

            current_step = WorkflowStepName.TRYON
            # Each downstream step references the previous step's asset via
            # source_asset_id.
            tryon_result = await workflow.execute_activity(
                run_tryon_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.TRYON,
                    source_asset_id=prepared.asset_id,
                    garment_asset_id=payload.garment_asset_id,
                ),
                task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
                start_to_close_timeout=ACTIVITY_TIMEOUT,
                retry_policy=ACTIVITY_RETRY_POLICY,
            )

            current_step = WorkflowStepName.SCENE
            scene_result = await workflow.execute_activity(
                run_scene_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.SCENE,
                    source_asset_id=tryon_result.asset_id,
                    scene_ref_asset_id=payload.scene_ref_asset_id,
                ),
                task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
                start_to_close_timeout=ACTIVITY_TIMEOUT,
                retry_policy=ACTIVITY_RETRY_POLICY,
            )

            current_step = WorkflowStepName.QC
            # QC is the gate: export only happens when it passes.
            qc_result = await workflow.execute_activity(
                run_qc_activity,
                StepActivityInput(
                    order_id=payload.order_id,
                    workflow_run_id=payload.workflow_run_id,
                    step_name=WorkflowStepName.QC,
                    source_asset_id=scene_result.asset_id,
                ),
                task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE,
                start_to_close_timeout=ACTIVITY_TIMEOUT,
                retry_policy=ACTIVITY_RETRY_POLICY,
            )

            if qc_result.passed:
                current_step = WorkflowStepName.EXPORT
                # Export the first QC candidate; fall back to the scene
                # result when QC returned no candidates.
                export_source_id = (
                    qc_result.candidate_asset_ids or [scene_result.asset_id]
                )[0]
                final_result = await workflow.execute_activity(
                    run_export_activity,
                    StepActivityInput(
                        order_id=payload.order_id,
                        workflow_run_id=payload.workflow_run_id,
                        step_name=WorkflowStepName.EXPORT,
                        source_asset_id=export_source_id,
                    ),
                    task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
                    start_to_close_timeout=ACTIVITY_TIMEOUT,
                    retry_policy=ACTIVITY_RETRY_POLICY,
                )
                return {
                    "order_id": payload.order_id,
                    "status": OrderStatus.SUCCEEDED.value,
                    "final_asset_id": final_result.asset_id,
                }
        except Exception as exc:
            # A step raised: record where and why, then let the original
            # exception propagate.
            await self._mark_failed(payload, current_step, self._failure_message(exc))
            raise

        # Only reachable when QC ran and rejected the result. This is handled
        # outside the try block on purpose: if persisting the rejection fails,
        # the handler above must not run a second time and replace the QC
        # message with the persistence error's text.
        await self._mark_failed(payload, current_step, qc_result.message)
        return {
            "order_id": payload.order_id,
            "status": OrderStatus.FAILED.value,
            "final_asset_id": None,
        }

    @staticmethod
    def _failure_message(exc: BaseException) -> str:
        """Return the most specific non-empty message in ``exc``'s cause chain.

        Errors surfaced by ``workflow.execute_activity`` are wrapper
        exceptions whose own text is generic; the underlying error is linked
        through ``__cause__``. Walking that chain keeps the real reason in the
        persisted failure record. Falls back to ``str(exc)`` when there is no
        cause or no cause carries a message.
        """
        message = str(exc)
        visited = [exc]
        cause = exc.__cause__
        # The identity check guards against a (pathological) cyclic chain.
        while cause is not None and not any(cause is seen for seen in visited):
            visited.append(cause)
            text = str(cause)
            if text:
                message = text
            cause = cause.__cause__
        return message

    async def _mark_failed(
        self,
        payload: PipelineWorkflowInput,
        current_step: WorkflowStepName,
        message: str,
    ) -> None:
        """Persist the failed state of this workflow run through an activity.

        The write goes through ``mark_workflow_failed_activity`` rather than
        touching storage from workflow code, so the workflow itself stays
        limited to orchestration.

        Args:
            payload: Workflow input; supplies ``order_id`` and
                ``workflow_run_id`` for the failure record.
            current_step: The step that was running when the failure occurred.
            message: Human-readable failure reason to store.
        """
        await workflow.execute_activity(
            mark_workflow_failed_activity,
            WorkflowFailureActivityInput(
                order_id=payload.order_id,
                workflow_run_id=payload.workflow_run_id,
                current_step=current_step,
                message=message,
            ),
            task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
            start_to_close_timeout=ACTIVITY_TIMEOUT,
            retry_policy=ACTIVITY_RETRY_POLICY,
        )
|