"""Temporal worker 启动入口。 可以把 worker 理解成 Temporal 的“执行器进程”: 它负责监听 task queue,然后真正执行 workflow / activity。 """ import asyncio from contextlib import AsyncExitStack from temporalio.client import Client from temporalio.worker import Worker from app.infra.temporal.client import get_temporal_client from app.infra.temporal.task_queues import ( IMAGE_PIPELINE_CONTROL_TASK_QUEUE, IMAGE_PIPELINE_EXPORT_TASK_QUEUE, IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, IMAGE_PIPELINE_QC_TASK_QUEUE, ) from app.workers.activities.export_activities import run_export_activity from app.workers.activities.face_activities import run_face_activity from app.workers.activities.fusion_activities import run_fusion_activity from app.workers.activities.qc_activities import run_qc_activity from app.workers.activities.review_activities import ( complete_review_wait_activity, mark_waiting_for_review_activity, mark_workflow_failed_activity, ) from app.workers.activities.scene_activities import run_scene_activity from app.workers.activities.texture_activities import run_texture_activity from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity from app.workers.workflows.low_end_pipeline import LowEndPipelineWorkflow from app.workers.workflows.mid_end_pipeline import MidEndPipelineWorkflow def build_workers(client: Client) -> list[Worker]: """创建本项目需要的 worker 集合。 这里按 task queue 拆 worker,目的是把不同类型的任务分开: - control: 流程控制、review、失败收尾 - image-gen: tryon / scene - post-process: texture / face / fusion - qc: 质检 - export: 导出 """ return [ # control 队列负责 workflow 本体,以及少量“流程状态管理型” activity。 Worker( client, task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, workflows=[LowEndPipelineWorkflow, MidEndPipelineWorkflow], activities=[ prepare_model_activity, mark_waiting_for_review_activity, complete_review_wait_activity, mark_workflow_failed_activity, ], ), # image-gen 队列放生成类步骤,便于后续横向扩容。 Worker( client, task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, activities=[run_tryon_activity, run_scene_activity], ), # post-process 队列放增强/融合类步骤。 Worker( client, task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, activities=[run_texture_activity, run_face_activity, run_fusion_activity], ), Worker( client, task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE, activities=[run_qc_activity], ), Worker( client, task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE, activities=[run_export_activity], ), ] async def run_workers() -> None: """启动全部 worker,并保持进程常驻。""" client = await get_temporal_client() workers = build_workers(client) async with AsyncExitStack() as stack: for worker in workers: # 逐个把 worker 注册到上下文里,退出时会自动优雅关闭。 await stack.enter_async_context(worker) # 用一个永不触发的 Event 让进程保持存活。 await asyncio.Event().wait() if __name__ == "__main__": asyncio.run(run_workers())