Files
auto-virtual-tryon/app/workers/activities/qc_activities.py

81 lines
3.3 KiB
Python

"""Quality-control mock activity."""
from temporalio import activity
from app.domain.enums import AssetType, OrderStatus, StepStatus
from app.infra.db.models.asset import AssetORM
from app.infra.db.session import get_session_factory
from app.workers.activities.tryon_activities import create_step_record, jsonable, load_order_and_run, mock_uri, utc_now
from app.workers.workflows.types import MockActivityResult, StepActivityInput
@activity.defn
async def run_qc_activity(payload: StepActivityInput) -> MockActivityResult:
"""Mock automated quality control."""
async with get_session_factory()() as session:
order, workflow_run = await load_order_and_run(session, payload.order_id, payload.workflow_run_id)
step = create_step_record(payload)
session.add(step)
order.status = OrderStatus.RUNNING
workflow_run.status = OrderStatus.RUNNING
workflow_run.current_step = payload.step_name
await session.flush()
try:
passed = not payload.metadata.get("force_fail", False)
candidate_asset_ids: list[int] = []
candidate_uri: str | None = None
if passed:
if payload.source_asset_id is None:
raise ValueError("run_qc_activity requires source_asset_id")
source_asset = await session.get(AssetORM, payload.source_asset_id)
if source_asset is None:
raise ValueError(f"Source asset {payload.source_asset_id} not found")
if source_asset.order_id != payload.order_id:
raise ValueError(
f"Source asset {payload.source_asset_id} does not belong to order {payload.order_id}"
)
candidate = AssetORM(
order_id=payload.order_id,
asset_type=AssetType.QC_CANDIDATE,
step_name=payload.step_name,
uri=source_asset.uri,
metadata_json=jsonable({"source_asset_id": payload.source_asset_id}),
)
session.add(candidate)
await session.flush()
candidate_asset_ids = [candidate.id]
candidate_uri = candidate.uri
result = MockActivityResult(
step_name=payload.step_name,
success=True,
asset_id=candidate_asset_ids[0] if candidate_asset_ids else None,
uri=candidate_uri,
score=0.95 if passed else 0.35,
passed=passed,
message="mock success" if passed else "mock qc rejected",
candidate_asset_ids=candidate_asset_ids,
metadata={"source_asset_id": payload.source_asset_id},
)
step.step_status = StepStatus.SUCCEEDED if passed else StepStatus.FAILED
step.output_json = jsonable(result)
step.error_message = None if passed else "QC rejected the asset"
step.ended_at = utc_now()
await session.commit()
return result
except Exception as exc:
step.step_status = StepStatus.FAILED
step.error_message = str(exc)
step.ended_at = utc_now()
order.status = OrderStatus.FAILED
workflow_run.status = OrderStatus.FAILED
await session.commit()
raise