Files

101 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Temporal worker 启动入口。
可以把 worker 理解成 Temporal 的“执行器进程”:
它负责监听 task queue然后真正执行 workflow / activity。
"""
import asyncio
from contextlib import AsyncExitStack
from temporalio.client import Client
from temporalio.worker import Worker
from app.infra.temporal.client import get_temporal_client
from app.infra.temporal.task_queues import (
IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE,
IMAGE_PIPELINE_QC_TASK_QUEUE,
)
from app.workers.activities.export_activities import run_export_activity
from app.workers.activities.face_activities import run_face_activity
from app.workers.activities.fusion_activities import run_fusion_activity
from app.workers.activities.qc_activities import run_qc_activity
from app.workers.activities.review_activities import (
complete_review_wait_activity,
mark_waiting_for_review_activity,
mark_workflow_failed_activity,
)
from app.workers.activities.scene_activities import run_scene_activity
from app.workers.activities.texture_activities import run_texture_activity
from app.workers.activities.tryon_activities import prepare_model_activity, run_tryon_activity
from app.workers.workflows.low_end_pipeline import LowEndPipelineWorkflow
from app.workers.workflows.mid_end_pipeline import MidEndPipelineWorkflow
def build_workers(client: Client) -> list[Worker]:
"""创建本项目需要的 worker 集合。
这里按 task queue 拆 worker目的是把不同类型的任务分开
- control: 流程控制、review、失败收尾
- image-gen: tryon / scene
- post-process: texture / face / fusion
- qc: 质检
- export: 导出
"""
return [
# control 队列负责 workflow 本体,以及少量“流程状态管理型” activity。
Worker(
client,
task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
workflows=[LowEndPipelineWorkflow, MidEndPipelineWorkflow],
activities=[
prepare_model_activity,
mark_waiting_for_review_activity,
complete_review_wait_activity,
mark_workflow_failed_activity,
],
),
# image-gen 队列放生成类步骤,便于后续横向扩容。
Worker(
client,
task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
activities=[run_tryon_activity, run_scene_activity],
),
# post-process 队列放增强/融合类步骤。
Worker(
client,
task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE,
activities=[run_texture_activity, run_face_activity, run_fusion_activity],
),
Worker(
client,
task_queue=IMAGE_PIPELINE_QC_TASK_QUEUE,
activities=[run_qc_activity],
),
Worker(
client,
task_queue=IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
activities=[run_export_activity],
),
]
async def run_workers() -> None:
"""启动全部 worker并保持进程常驻。"""
client = await get_temporal_client()
workers = build_workers(client)
async with AsyncExitStack() as stack:
for worker in workers:
# 逐个把 worker 注册到上下文里,退出时会自动优雅关闭。
await stack.enter_async_context(worker)
# 用一个永不触发的 Event 让进程保持存活。
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(run_workers())