From d02fc8565f8ef63a550847bfd51b8f5310e3af39 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 27 Mar 2026 00:13:54 +0800 Subject: [PATCH] Add Chinese comments for Temporal workflow flow --- app/application/services/workflow_service.py | 28 ++++++-- app/infra/temporal/client.py | 18 ++++-- app/workers/activities/review_activities.py | 29 +++++++-- app/workers/runner.py | 24 +++++-- app/workers/workflows/low_end_pipeline.py | 40 ++++++++++-- app/workers/workflows/mid_end_pipeline.py | 67 ++++++++++++++++---- app/workers/workflows/types.py | 48 +++++++++----- 7 files changed, 205 insertions(+), 49 deletions(-) diff --git a/app/application/services/workflow_service.py b/app/application/services/workflow_service.py index cc767c7..cb4a7b6 100644 --- a/app/application/services/workflow_service.py +++ b/app/application/services/workflow_service.py @@ -1,4 +1,10 @@ -"""Temporal workflow application service.""" +"""Temporal 工作流服务层。 + +这一层位于 API 和 Temporal 之间,负责: +1. 选择该启动哪个 workflow +2. 发送 signal +3. 查询已持久化的 workflow 状态 +""" from datetime import timedelta @@ -17,18 +23,22 @@ from app.workers.workflows.types import PipelineWorkflowInput, ReviewSignalPaylo class WorkflowService: - """Application service for Temporal workflow orchestration.""" + """Temporal 编排服务。""" @staticmethod def workflow_type_for_mode(service_mode: ServiceMode) -> str: - """Return the workflow class name for a service mode.""" + """根据服务模式返回对应的 workflow 类型名。""" if service_mode == ServiceMode.AUTO_BASIC: return LowEndPipelineWorkflow.__name__ return MidEndPipelineWorkflow.__name__ async def start_workflow(self, workflow_input: PipelineWorkflowInput) -> None: - """Start the appropriate Temporal workflow for an order.""" + """为订单启动对应的 Temporal workflow。 + + 这里做的只是“发起执行”: + 真正的流水线顺序仍然在 workflow 类里定义。 + """ client = await get_temporal_client() workflow_id = f"order-{workflow_input.order_id}" @@ -40,6 +50,7 @@ class WorkflowService: await client.start_workflow( workflow_callable, workflow_input, + # workflow_id 固定为 order-{order_id},方便 API 后续按订单回查。 id=workflow_id, task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, run_timeout=timedelta(minutes=30), @@ -47,14 +58,19 @@ class WorkflowService: ) async def signal_review(self, workflow_id: str, payload: ReviewSignalPayload) -> None: - """Send a review signal to a running Temporal workflow.""" + """向运行中的 workflow 发送审核 signal。""" client = await get_temporal_client() handle = client.get_workflow_handle(workflow_id=workflow_id) + # "submit_review" 对应 workflow 里用 @workflow.signal 标记的方法名。 await handle.signal("submit_review", payload) async def get_workflow_status(self, session, order_id: int) -> WorkflowStatusResponse: - """Return persisted workflow execution state for an order.""" + """返回订单对应的已持久化 workflow 状态。 + + 这里查的是我们自己数据库里的状态镜像,不是直接去 Temporal history 现查。 + 这么做更适合业务 API 对外暴露。 + """ result = await session.execute( select(WorkflowRunORM) diff --git a/app/infra/temporal/client.py b/app/infra/temporal/client.py index e471ec2..46e159b 100644 --- a/app/infra/temporal/client.py +++ b/app/infra/temporal/client.py @@ -1,4 +1,8 @@ -"""Temporal client helpers.""" +"""Temporal client 辅助函数。 + +API 和 worker 都需要连接 Temporal Server。 +这里做了一个简单的单例缓存,避免重复建立连接。 +""" import asyncio @@ -11,13 +15,17 @@ _client_lock = asyncio.Lock() async def get_temporal_client() -> Client: - """Return a cached Temporal client.""" + """返回缓存后的 Temporal Client。 + + 第一次调用时才真正连接 Temporal;后续复用同一个 client。 + """ global _client if _client is not None: return _client async with _client_lock: + # 双重检查,避免并发场景下重复 connect。 if _client is None: settings = get_settings() _client = await Client.connect( @@ -28,8 +36,10 @@ async def get_temporal_client() -> Client: def set_temporal_client(client: Client | None) -> None: - """Override the cached Temporal client, primarily for tests.""" + """覆盖缓存的 Temporal Client。 + + 主要用于测试场景,把真实连接替换成 Temporal 测试环境里的 client。 + """ global _client _client = client - diff --git a/app/workers/activities/review_activities.py b/app/workers/activities/review_activities.py index 73e5d32..4542306 100644 --- a/app/workers/activities/review_activities.py +++ b/app/workers/activities/review_activities.py @@ -1,4 +1,8 @@ -"""Review state management mock activities.""" +"""审核相关 activity。 + +这里的函数都运行在 worker 侧,可以安全地做数据库 I/O。 +workflow 本身只负责调用这些 activity,不直接写数据库。 +""" from sqlalchemy import select @@ -18,7 +22,13 @@ from app.workers.workflows.types import ( @activity.defn async def mark_waiting_for_review_activity(payload: ReviewWaitActivityInput) -> None: - """Mark a workflow as waiting for a human review.""" + """把 workflow 标记为等待人工审核。 + + 这一步会做三件事: + 1. 新增一条 review 类型的 workflow_step,状态是 waiting + 2. 新增一条 review_task,供 API 查询待审核列表 + 3. 把订单和 workflow_run 都切到 waiting_review + """ async with get_session_factory()() as session: order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) @@ -47,7 +57,12 @@ async def mark_waiting_for_review_activity(payload: ReviewWaitActivityInput) -> @activity.defn async def complete_review_wait_activity(payload: ReviewResolutionActivityInput) -> None: - """Resolve the current waiting-review step before the next branch runs.""" + """收口当前这次 waiting_review。 + + 这里的职责不是决定后续怎么跑,而是把“等待审核”这个数据库状态结束掉: + - approve / rerun -> review step 记为 succeeded + - reject -> review step 记为 failed + """ async with get_session_factory()() as session: order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) @@ -62,6 +77,8 @@ async def complete_review_wait_activity(payload: ReviewResolutionActivityInput) ) review_step = step_result.scalars().first() if review_step is not None: + # 只处理仍处于 waiting 的那条 review_step, + # 避免重复 signal 把历史 review 记录覆盖掉。 review_step.step_status = ( StepStatus.FAILED if payload.decision == ReviewDecision.REJECT else StepStatus.SUCCEEDED ) @@ -82,7 +99,11 @@ async def complete_review_wait_activity(payload: ReviewResolutionActivityInput) @activity.defn async def mark_workflow_failed_activity(payload: WorkflowFailureActivityInput) -> None: - """Mark the persisted workflow state as failed.""" + """把订单和 workflow_run 持久化为失败。 + + 这个 activity 是 workflow 的“兜底收尾器”: + 当任意步骤抛异常时,workflow 调它把数据库状态补完整。 + """ async with get_session_factory()() as session: order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id) diff --git a/app/workers/runner.py b/app/workers/runner.py index 29634c3..fbdfb38 100644 --- a/app/workers/runner.py +++ b/app/workers/runner.py @@ -1,4 +1,8 @@ -"""Temporal worker runner.""" +"""Temporal worker 启动入口。 + +可以把 worker 理解成 Temporal 的“执行器进程”: +它负责监听 task queue,然后真正执行 workflow / activity。 +""" import asyncio from contextlib import AsyncExitStack @@ -31,9 +35,18 @@ from app.workers.workflows.mid_end_pipeline import MidEndPipelineWorkflow def build_workers(client: Client) -> list[Worker]: - """Create the worker set needed for the task queues in this MVP.""" + """创建本项目需要的 worker 集合。 + + 这里按 task queue 拆 worker,目的是把不同类型的任务分开: + - control: 流程控制、review、失败收尾 + - image-gen: tryon / scene + - post-process: texture / face / fusion + - qc: 质检 + - export: 导出 + """ return [ + # control 队列负责 workflow 本体,以及少量“流程状态管理型” activity。 Worker( client, task_queue=IMAGE_PIPELINE_CONTROL_TASK_QUEUE, @@ -45,11 +58,13 @@ def build_workers(client: Client) -> list[Worker]: mark_workflow_failed_activity, ], ), + # image-gen 队列放生成类步骤,便于后续横向扩容。 Worker( client, task_queue=IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE, activities=[run_tryon_activity, run_scene_activity], ), + # post-process 队列放增强/融合类步骤。 Worker( client, task_queue=IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE, @@ -69,16 +84,17 @@ def build_workers(client: Client) -> list[Worker]: async def run_workers() -> None: - """Start all Temporal workers and keep the process alive.""" + """启动全部 worker,并保持进程常驻。""" client = await get_temporal_client() workers = build_workers(client) async with AsyncExitStack() as stack: for worker in workers: + # 逐个把 worker 注册到上下文里,退出时会自动优雅关闭。 await stack.enter_async_context(worker) + # 用一个永不触发的 Event 让进程保持存活。 await asyncio.Event().wait() if __name__ == "__main__": asyncio.run(run_workers()) - diff --git a/app/workers/workflows/low_end_pipeline.py b/app/workers/workflows/low_end_pipeline.py index bdb1b4b..35ba9f1 100644 --- a/app/workers/workflows/low_end_pipeline.py +++ b/app/workers/workflows/low_end_pipeline.py @@ -1,10 +1,18 @@ -"""Low-end image pipeline workflow.""" +"""低端图片流水线工作流。 + +这个文件里的 `workflow` 只负责“编排顺序”和“等待结果”, +不直接访问数据库、不做真实 I/O。 +真正的落库和 mock 执行都放在 activity 里完成。 +""" from datetime import timedelta from temporalio import workflow from temporalio.common import RetryPolicy +# Temporal workflow 代码需要尽量保持可重放(deterministic)。 +# 这里导入的模块会在 workflow 外部执行,所以用 imports_passed_through +# 明确告诉 SDK:这些导入不是 workflow 重放逻辑的一部分。 with workflow.unsafe.imports_passed_through(): from app.domain.enums import OrderStatus, WorkflowStepName from app.infra.temporal.task_queues import ( @@ -35,14 +43,27 @@ ACTIVITY_RETRY_POLICY = RetryPolicy( @workflow.defn class LowEndPipelineWorkflow: - """Low-end fully automated image pipeline.""" + """低端全自动工作流。 + + 它对应的是一条从头跑到尾、不需要人工介入的流水线: + prepare_model -> tryon -> scene -> qc -> export + """ @workflow.run async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]: - """Execute the low-end workflow from start to finish.""" + """执行低端工作流主流程。 + 可以把这里理解成“时序控制器”: + 1. 按顺序调 activity + 2. 把上一步产物传给下一步 + 3. 出错时统一标记 workflow 失败 + """ + + # current_step 用来在异常时把失败位置持久化到数据库。 current_step = WorkflowStepName.PREPARE_MODEL try: + # 每个步骤都通过 execute_activity 触发真正执行。 + # workflow 自己不做计算,只负责调度。 prepared = await workflow.execute_activity( prepare_model_activity, StepActivityInput( @@ -60,6 +81,7 @@ class LowEndPipelineWorkflow: ) current_step = WorkflowStepName.TRYON + # 下游步骤通过 source_asset_id 引用上一步生成的资产。 tryon_result = await workflow.execute_activity( run_tryon_activity, StepActivityInput( @@ -90,6 +112,8 @@ class LowEndPipelineWorkflow: ) current_step = WorkflowStepName.QC + # QC 是流程里的“闸门”。 + # 如果这里不通过,低端流程直接失败,不会再 export。 qc_result = await workflow.execute_activity( run_qc_activity, StepActivityInput( @@ -108,6 +132,8 @@ class LowEndPipelineWorkflow: return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} current_step = WorkflowStepName.EXPORT + # candidate_asset_ids 是 QC 推荐可导出的候选资产。 + # 当前 MVP 只会返回一个候选;如果没有,就退回 scene 结果导出。 final_result = await workflow.execute_activity( run_export_activity, StepActivityInput( @@ -126,6 +152,7 @@ class LowEndPipelineWorkflow: "final_asset_id": final_result.asset_id, } except Exception as exc: + # workflow 出异常时,额外调一个 activity 把数据库状态补齐。 await self._mark_failed(payload, current_step, str(exc)) raise @@ -135,7 +162,11 @@ class LowEndPipelineWorkflow: current_step: WorkflowStepName, message: str, ) -> None: - """Persist workflow failure state.""" + """持久化失败状态。 + + 注意这里仍然通过 activity 落库,而不是在 workflow 里直连数据库。 + 这样能保持 workflow 的职责单一:只编排,不做外部副作用。 + """ await workflow.execute_activity( mark_workflow_failed_activity, @@ -149,4 +180,3 @@ class LowEndPipelineWorkflow: start_to_close_timeout=ACTIVITY_TIMEOUT, retry_policy=ACTIVITY_RETRY_POLICY, ) - diff --git a/app/workers/workflows/mid_end_pipeline.py b/app/workers/workflows/mid_end_pipeline.py index 406a8b6..6aa9f8c 100644 --- a/app/workers/workflows/mid_end_pipeline.py +++ b/app/workers/workflows/mid_end_pipeline.py @@ -1,10 +1,18 @@ -"""Mid-end image pipeline workflow with review signal support.""" +"""中端图片流水线工作流。 + +和低端流程相比,这里最大的区别是: +1. 会在 QC 之后停在 waiting_review +2. 通过 Temporal signal 接收人工审核结果 +3. 可以按审核意见回流到 scene / face / fusion 重新跑 +""" from datetime import timedelta from temporalio import workflow from temporalio.common import RetryPolicy +# 这些导入属于 workflow 外部世界的对象,明确标记为 pass-through, +# 避免把它们当成需要重放的 workflow 逻辑一部分。 with workflow.unsafe.imports_passed_through(): from app.domain.enums import OrderStatus, ReviewDecision, WorkflowStepName from app.infra.temporal.task_queues import ( @@ -47,21 +55,36 @@ ACTIVITY_RETRY_POLICY = RetryPolicy( @workflow.defn class MidEndPipelineWorkflow: - """Mid-end workflow that pauses for human review and supports reruns.""" + """中端半自动工作流。 + + 这个 workflow 会经历“自动生成 -> 等待人工审核 -> 按审核意见继续”。 + """ def __init__(self) -> None: + # signal 到达后,先暂存在 workflow 内存里, + # 主流程再通过 wait_condition 继续往下走。 self._review_payload: ReviewSignalPayload | None = None @workflow.signal def submit_review(self, payload: ReviewSignalPayload) -> None: - """Receive a review decision from the API layer.""" + """接收 API 层发来的审核 signal。 + + 这一步不会直接继续执行,只是把审核结果写进 workflow 内存状态。 + 真正恢复主流程是在 `_wait_for_review` 里。 + """ self._review_payload = payload @workflow.run async def run(self, payload: PipelineWorkflowInput) -> dict[str, int | str | None]: - """Execute the mid-end workflow with a human review loop.""" + """执行中端工作流主流程。 + 主线是: + prepare_model -> tryon -> scene -> texture -> face -> fusion -> qc + -> waiting_review -> approve/export 或 rerun + """ + + # current_step 用于失败时记录“最后跑到哪一步”。 current_step = WorkflowStepName.PREPARE_MODEL try: prepared = await workflow.execute_activity( @@ -113,8 +136,14 @@ class MidEndPipelineWorkflow: await self._mark_failed(payload, current_step, qc_result.message) return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} + # 中端流程会一直循环到: + # 1. 审核 approve 然后 export 成功 + # 2. 审核 reject 直接结束 + # 3. rerun 后再次回到 waiting_review,继续等下一次人工输入 while True: current_step = WorkflowStepName.REVIEW + # 这里通过 activity 把数据库里的订单状态更新成 waiting_review, + # 同时创建 review_task,供 API 查询待审核列表。 await workflow.execute_activity( mark_waiting_for_review_activity, ReviewWaitActivityInput( @@ -127,7 +156,11 @@ class MidEndPipelineWorkflow: retry_policy=ACTIVITY_RETRY_POLICY, ) + # workflow 在这里“停住”,直到外部 signal 进来。 review_payload = await self._wait_for_review() + + # signal 到达后,先把 review 这一步的等待态收口成已处理, + # 这样数据库里的 review_step / review_task 状态是完整的。 await workflow.execute_activity( complete_review_wait_activity, ReviewResolutionActivityInput( @@ -145,6 +178,8 @@ class MidEndPipelineWorkflow: if review_payload.decision == ReviewDecision.APPROVE: current_step = WorkflowStepName.EXPORT + # 如果审核人显式选了资产,就导出该资产; + # 否则默认导出 QC 候选资产。 export_source_id = review_payload.selected_asset_id if export_source_id is None: export_source_id = (qc_result.candidate_asset_ids or [fusion_result.asset_id])[0] @@ -167,8 +202,11 @@ class MidEndPipelineWorkflow: } if review_payload.decision == ReviewDecision.REJECT: + # reject 不再重跑,直接结束。 return {"order_id": payload.order_id, "status": OrderStatus.FAILED.value, "final_asset_id": None} + # rerun 的核心思想是: + # 把指定节点后的链路重新跑一遍,然后再次进入 QC 和 waiting_review。 if review_payload.decision == ReviewDecision.RERUN_SCENE: current_step = WorkflowStepName.SCENE scene_result = await self._run_scene(payload, tryon_result.asset_id) @@ -197,7 +235,11 @@ class MidEndPipelineWorkflow: raise async def _wait_for_review(self) -> ReviewSignalPayload: - """Suspend the workflow until a review signal arrives.""" + """等待人工审核 signal。 + + `workflow.wait_condition` 是 Temporal 里很常见的等待方式: + workflow 会被安全地挂起,不会像普通 while + sleep 那样空转占资源。 + """ if self._review_payload is None: await workflow.wait_condition(lambda: self._review_payload is not None) @@ -207,7 +249,10 @@ class MidEndPipelineWorkflow: return review_payload async def _run_scene(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: - """Execute the scene activity.""" + """执行 scene activity。 + + 抽成私有方法后,rerun_scene 时可以直接复用,不需要复制整段 activity 调用代码。 + """ return await workflow.execute_activity( run_scene_activity, @@ -224,7 +269,7 @@ class MidEndPipelineWorkflow: ) async def _run_texture(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: - """Execute the texture activity.""" + """执行 texture activity。""" return await workflow.execute_activity( run_texture_activity, @@ -240,7 +285,7 @@ class MidEndPipelineWorkflow: ) async def _run_face(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: - """Execute the face activity.""" + """执行 face activity。""" return await workflow.execute_activity( run_face_activity, @@ -261,7 +306,7 @@ class MidEndPipelineWorkflow: source_asset_id: int | None, face_asset_id: int | None, ) -> MockActivityResult: - """Execute the fusion activity.""" + """执行 fusion activity。""" return await workflow.execute_activity( run_fusion_activity, @@ -278,7 +323,7 @@ class MidEndPipelineWorkflow: ) async def _run_qc(self, payload: PipelineWorkflowInput, source_asset_id: int | None) -> MockActivityResult: - """Execute the QC activity.""" + """执行 QC activity。""" return await workflow.execute_activity( run_qc_activity, @@ -299,7 +344,7 @@ class MidEndPipelineWorkflow: current_step: WorkflowStepName, message: str, ) -> None: - """Persist workflow failure state.""" + """持久化 workflow 失败状态。""" await workflow.execute_activity( mark_workflow_failed_activity, diff --git a/app/workers/workflows/types.py b/app/workers/workflows/types.py index 92a0cee..aee6eae 100644 --- a/app/workers/workflows/types.py +++ b/app/workers/workflows/types.py @@ -1,4 +1,8 @@ -"""Shared workflow and activity payload types.""" +"""workflow / activity 之间共享的数据类型。 + +这些 dataclass 是 Temporal 编排层最关键的“消息格式”: +workflow 把它们发给 activity,activity 再把结果返回给 workflow。 +""" from dataclasses import dataclass, field from enum import Enum @@ -8,7 +12,11 @@ from app.domain.enums import CustomerLevel, OrderStatus, ReviewDecision, Service def _coerce_enum(value: Any, enum_cls: type[Enum]) -> Any: - """Coerce raw Temporal payload values back into enum instances.""" + """把 Temporal 反序列化后的原始值转回枚举。 + + 在某些序列化场景下,枚举值可能先变成字符串,甚至被拆成字符列表。 + 这里统一做一次归一化,避免后面写数据库时类型不对。 + """ if value is None or isinstance(value, enum_cls): return value @@ -19,7 +27,10 @@ def _coerce_enum(value: Any, enum_cls: type[Enum]) -> Any: @dataclass(slots=True) class PipelineWorkflowInput: - """Temporal workflow input for an image pipeline order.""" + """工作流启动输入。 + + 这是一张订单进入 workflow 时携带的最小上下文。 + """ order_id: int workflow_run_id: int @@ -31,7 +42,7 @@ class PipelineWorkflowInput: scene_ref_asset_id: int def __post_init__(self) -> None: - """Normalize enum-like values after Temporal deserialization.""" + """在反序列化后把枚举字段修正回来。""" self.customer_level = _coerce_enum(self.customer_level, CustomerLevel) self.service_mode = _coerce_enum(self.service_mode, ServiceMode) @@ -39,7 +50,14 @@ class PipelineWorkflowInput: @dataclass(slots=True) class StepActivityInput: - """Input payload shared by the mock pipeline activities.""" + """通用 activity 输入。 + + 大多数图片处理步骤都只需要: + - 当前订单 + - 当前 workflow_run + - 这一步叫什么 + - 上一步产出的 asset_id + """ order_id: int workflow_run_id: int @@ -53,14 +71,14 @@ class StepActivityInput: metadata: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: - """Normalize enum-like values after Temporal deserialization.""" + """在反序列化后把枚举字段修正回来。""" self.step_name = _coerce_enum(self.step_name, WorkflowStepName) @dataclass(slots=True) class MockActivityResult: - """Common mock activity result structure.""" + """通用 mock activity 返回结构。""" step_name: WorkflowStepName success: bool @@ -73,14 +91,14 @@ class MockActivityResult: metadata: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: - """Normalize enum-like values after Temporal deserialization.""" + """在反序列化后把枚举字段修正回来。""" self.step_name = _coerce_enum(self.step_name, WorkflowStepName) @dataclass(slots=True) class ReviewSignalPayload: - """Signal payload sent from the API to the mid-end workflow.""" + """API 发给中端 workflow 的审核 signal 载荷。""" decision: ReviewDecision reviewer_id: int @@ -88,14 +106,14 @@ class ReviewSignalPayload: comment: str | None = None def __post_init__(self) -> None: - """Normalize enum-like values after Temporal deserialization.""" + """在反序列化后把枚举字段修正回来。""" self.decision = _coerce_enum(self.decision, ReviewDecision) @dataclass(slots=True) class ReviewWaitActivityInput: - """Input for marking a workflow as waiting for review.""" + """把流程切到 waiting_review 时传给 activity 的输入。""" order_id: int workflow_run_id: int @@ -105,7 +123,7 @@ class ReviewWaitActivityInput: @dataclass(slots=True) class ReviewResolutionActivityInput: - """Input for completing a waiting review state.""" + """审核结果到达后,用于结束 waiting_review 的输入。""" order_id: int workflow_run_id: int @@ -115,14 +133,14 @@ class ReviewResolutionActivityInput: comment: str | None = None def __post_init__(self) -> None: - """Normalize enum-like values after Temporal deserialization.""" + """在反序列化后把枚举字段修正回来。""" self.decision = _coerce_enum(self.decision, ReviewDecision) @dataclass(slots=True) class WorkflowFailureActivityInput: - """Input for marking a workflow as failed.""" + """流程失败收尾 activity 的输入。""" order_id: int workflow_run_id: int @@ -131,7 +149,7 @@ class WorkflowFailureActivityInput: status: OrderStatus = OrderStatus.FAILED def __post_init__(self) -> None: - """Normalize enum-like values after Temporal deserialization.""" + """在反序列化后把枚举字段修正回来。""" self.current_step = _coerce_enum(self.current_step, WorkflowStepName) self.status = _coerce_enum(self.status, OrderStatus)