Files

222 lines
9.4 KiB
Python

"""Review application service."""
from fastapi import HTTPException, status
from sqlalchemy import select
from app.api.schemas.revision import ConfirmRevisionRequest, ConfirmRevisionResponse
from app.api.schemas.review import PendingReviewResponse, SubmitReviewRequest, SubmitReviewResponse
from app.application.services.revision_service import RevisionService
from app.application.services.workflow_service import WorkflowService
from app.domain.enums import OrderStatus, ReviewDecision, ReviewTaskStatus
from app.infra.db.models.asset import AssetORM
from app.infra.db.models.order import OrderORM
from app.infra.db.models.review_task import ReviewTaskORM
from app.infra.db.models.workflow_run import WorkflowRunORM
from app.workers.workflows.types import ReviewSignalPayload
class ReviewService:
"""Application service for review flows."""
def __init__(self) -> None:
self.workflow_service = WorkflowService()
self.revision_service = RevisionService()
async def list_pending_reviews(self, session) -> list[PendingReviewResponse]:
"""Return all pending review tasks."""
result = await session.execute(
select(ReviewTaskORM, WorkflowRunORM)
.join(WorkflowRunORM, WorkflowRunORM.order_id == ReviewTaskORM.order_id)
.where(ReviewTaskORM.status.in_([ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED]))
.order_by(ReviewTaskORM.created_at.asc())
)
pending_reviews = []
for review_task, workflow_run in result.all():
snapshot = await self.revision_service.get_revision_snapshot(session, review_task.order_id)
pending_reviews.append(
PendingReviewResponse(
review_task_id=review_task.id,
order_id=review_task.order_id,
workflow_id=workflow_run.workflow_id,
current_step=workflow_run.current_step,
review_task_status=review_task.status,
latest_revision_asset_id=snapshot.latest_revision_asset_id,
current_revision_asset_id=snapshot.current_revision_asset_id,
latest_revision_version=snapshot.latest_revision_version,
revision_count=snapshot.revision_count,
pending_manual_confirm=snapshot.pending_manual_confirm,
created_at=review_task.created_at,
)
)
return pending_reviews
async def submit_review(self, session, order_id: int, payload: SubmitReviewRequest) -> SubmitReviewResponse:
"""Persist the review submission and signal the Temporal workflow."""
order = await session.get(OrderORM, order_id)
if order is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found")
if order.status != OrderStatus.WAITING_REVIEW:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Order is not waiting for review",
)
workflow_result = await session.execute(
select(WorkflowRunORM).where(WorkflowRunORM.order_id == order_id)
)
workflow_run = workflow_result.scalar_one_or_none()
if workflow_run is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found")
if payload.selected_asset_id is not None:
asset = await session.get(AssetORM, payload.selected_asset_id)
if asset is None or asset.order_id != order_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Selected asset does not belong to the order",
)
pending_result = await session.execute(
select(ReviewTaskORM)
.where(
ReviewTaskORM.order_id == order_id,
ReviewTaskORM.status.in_([ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED]),
)
.order_by(ReviewTaskORM.created_at.desc())
)
review_task = pending_result.scalars().first()
if review_task is None:
review_task = ReviewTaskORM(order_id=order_id, status=ReviewTaskStatus.SUBMITTED)
session.add(review_task)
current_task_status = review_task.status
selected_asset_id = payload.selected_asset_id
if selected_asset_id is None and current_task_status == ReviewTaskStatus.REVISION_UPLOADED:
selected_asset_id = review_task.resume_asset_id or review_task.latest_revision_asset_id
if selected_asset_id is not None:
asset = await session.get(AssetORM, selected_asset_id)
if asset is None or asset.order_id != order_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Selected asset does not belong to the order",
)
review_task.status = ReviewTaskStatus.SUBMITTED
review_task.decision = payload.decision
review_task.reviewer_id = payload.reviewer_id
review_task.selected_asset_id = selected_asset_id
review_task.comment = payload.comment
await session.commit()
try:
await self.workflow_service.signal_review(
workflow_run.workflow_id,
ReviewSignalPayload(
decision=payload.decision,
reviewer_id=payload.reviewer_id,
selected_asset_id=selected_asset_id,
comment=payload.comment,
),
)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to signal Temporal workflow: {exc}",
) from exc
return SubmitReviewResponse(
order_id=order_id,
workflow_id=workflow_run.workflow_id,
decision=payload.decision,
status="submitted",
)
async def confirm_revision_continue(
self,
session,
order_id: int,
payload: ConfirmRevisionRequest,
) -> ConfirmRevisionResponse:
"""Confirm the latest revision and resume the existing workflow."""
order = await session.get(OrderORM, order_id)
if order is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found")
if order.status != OrderStatus.WAITING_REVIEW:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Order is not waiting for review",
)
workflow_result = await session.execute(
select(WorkflowRunORM).where(WorkflowRunORM.order_id == order_id)
)
workflow_run = workflow_result.scalar_one_or_none()
if workflow_run is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Workflow not found")
review_result = await session.execute(
select(ReviewTaskORM)
.where(
ReviewTaskORM.order_id == order_id,
ReviewTaskORM.status.in_([ReviewTaskStatus.PENDING, ReviewTaskStatus.REVISION_UPLOADED]),
)
.order_by(ReviewTaskORM.created_at.desc(), ReviewTaskORM.id.desc())
)
review_task = review_result.scalars().first()
if review_task is None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="No active review task")
if review_task.status != ReviewTaskStatus.REVISION_UPLOADED:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="No uploaded revision to confirm",
)
revision_asset_id = review_task.resume_asset_id or review_task.latest_revision_asset_id
if revision_asset_id is None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="No uploaded revision to confirm",
)
revision_asset = await session.get(AssetORM, revision_asset_id)
if revision_asset is None or revision_asset.order_id != order_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Revision asset does not belong to the order",
)
review_task.status = ReviewTaskStatus.SUBMITTED
review_task.decision = ReviewDecision.APPROVE
review_task.reviewer_id = payload.reviewer_id
review_task.selected_asset_id = revision_asset_id
review_task.comment = payload.comment
await session.commit()
try:
await self.workflow_service.signal_review(
workflow_run.workflow_id,
ReviewSignalPayload(
decision=ReviewDecision.APPROVE,
reviewer_id=payload.reviewer_id,
selected_asset_id=revision_asset_id,
comment=payload.comment,
),
)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to signal Temporal workflow: {exc}",
) from exc
return ConfirmRevisionResponse(
order_id=order_id,
workflow_id=workflow_run.workflow_id,
revision_asset_id=revision_asset_id,
decision=ReviewDecision.APPROVE,
status="submitted",
)