101 lines
3.6 KiB
Python
101 lines
3.6 KiB
Python
"""Temporal worker 启动入口。
|
||
|
||
可以把 worker 理解成 Temporal 的“执行器进程”:
|
||
它负责监听 task queue,然后真正执行 workflow / activity。
|
||
"""
|
||
|
||
import asyncio
|
||
from contextlib import AsyncExitStack
|
||
|
||
from temporalio.client import Client
|
||
from temporalio.worker import Worker
|
||
|
||
from app.infra.temporal.client import get_temporal_client
|
||
from app.infra.temporal.task_queues import (
|
||
IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
|
||
IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
|
||
IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
|
||
IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE,
|
||
IMAGE_PIPELINE_QC_TASK_QUEUE,
|
||
)
|
||
from app.workers.activities.export_activities import run_export_activity
|
||
from app.workers.activities.face_activities import run_face_activity
|
||
from app.workers.activities.fusion_activities import run_fusion_activity
|
||
from app.workers.activities.qc_activities import run_qc_activity
|
||
from app.workers.activities.review_activities import (
|
||
complete_review_wait_activity,
|
||
mark_waiting_for_review_activity,
|
||
mark_workflow_failed_activity,
|
||
)
|
||
from app.workers.activities.scene_activities import run_scene_activity
|
||
from app.workers.activities.texture_activities import run_texture_activity
|
||
from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity
|
||
from app.workers.workflows.low_end_pipeline import LowEndPipelineWorkflow
|
||
from app.workers.workflows.mid_end_pipeline import MidEndPipelineWorkflow
|
||
|
||
|
||
def build_workers(client: Client) -> list[Worker]:
    """Build the set of workers this project runs.

    One worker is created per task queue so that different kinds of work
    stay separated:

    - control: workflow orchestration, review bookkeeping, failure cleanup
    - image-gen: tryon / scene
    - post-process: texture / face / fusion
    - qc: quality check
    - export: export

    Args:
        client: Temporal client that every worker is bound to.

    Returns:
        The workers in the order control, image-gen, post-process, qc, export.
    """
    # The control queue hosts the workflows themselves plus the handful of
    # activities that only manage pipeline state.
    control_worker = Worker(
        client,
        task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
        workflows=[LowEndPipelineWorkflow, MidEndPipelineWorkflow],
        activities=[
            prepare_model_activity,
            mark_waiting_for_review_activity,
            complete_review_wait_activity,
            mark_workflow_failed_activity,
        ],
    )

    # Every remaining queue runs activities only, so describe them as data:
    # (task queue, activities served on that queue).
    activity_only_queues = (
        # Generation steps get their own queue so they can be scaled out later.
        (
            IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
            [run_tryon_activity, run_scene_activity],
        ),
        # Enhancement / fusion steps.
        (
            IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE,
            [run_texture_activity, run_face_activity, run_fusion_activity],
        ),
        (IMAGE_PIPELINE_QC_TASK_QUEUE, [run_qc_activity]),
        (IMAGE_PIPELINE_EXPORT_TASK_QUEUE, [run_export_activity]),
    )

    return [
        control_worker,
        *(
            Worker(client, task_queue=queue_name, activities=queue_activities)
            for queue_name, queue_activities in activity_only_queues
        ),
    ]
|
||
|
||
|
||
async def run_workers() -> None:
    """Start all workers and keep the process running.

    Obtains the Temporal client, builds the workers via ``build_workers`` and
    enters each one as an async context manager on a shared
    ``AsyncExitStack``. It then blocks on an event that is never set, so the
    coroutine only ends when it is cancelled or an exception propagates.
    """
    temporal_client = await get_temporal_client()
    pending_workers = build_workers(temporal_client)

    async with AsyncExitStack() as exit_stack:
        # Register the workers one at a time; when the stack unwinds it exits
        # each worker's async context, which is what shuts the worker down.
        for pending in pending_workers:
            await exit_stack.enter_async_context(pending)

        # Nothing ever sets this event: waiting on it just keeps the process
        # (and therefore the workers) alive.
        keep_alive = asyncio.Event()
        await keep_alive.wait()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the worker coroutine on a fresh event loop.
    worker_main = run_workers()
    asyncio.run(worker_main)
|