Files
auto-virtual-tryon/tests/test_workflow_timeouts.py

34 lines
1.4 KiB
Python

"""Tests for workflow activity timeout policies."""
from datetime import timedelta
from app.infra.temporal.task_queues import (
IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE,
IMAGE_PIPELINE_QC_TASK_QUEUE,
)
from app.workers.workflows.timeout_policy import (
DEFAULT_ACTIVITY_TIMEOUT,
LONG_RUNNING_ACTIVITY_TIMEOUT,
activity_timeout_for_task_queue,
)
def test_activity_timeout_for_task_queue_uses_long_timeout_for_image_work():
    """Check that the image-gen and post-process queues map to the long timeout.

    The test also pins the concrete values of both timeout constants
    (30 seconds for the default, 5 minutes for the long-running one) so an
    accidental change to either constant fails here.
    """
    # Pin the policy constants themselves before checking the mapping.
    expected_default = timedelta(seconds=30)
    expected_long_running = timedelta(minutes=5)
    assert DEFAULT_ACTIVITY_TIMEOUT == expected_default
    assert LONG_RUNNING_ACTIVITY_TIMEOUT == expected_long_running

    # Queues that are expected to resolve to the long-running timeout.
    heavy_queues = (
        IMAGE_PIPELINE_IMAGE_GEN_TASK_QUEUE,
        IMAGE_PIPELINE_POST_PROCESS_TASK_QUEUE,
    )
    for queue in heavy_queues:
        assert activity_timeout_for_task_queue(queue) == LONG_RUNNING_ACTIVITY_TIMEOUT
def test_activity_timeout_for_task_queue_keeps_short_timeout_for_light_steps():
    """Check that the control, QC, and export queues map to the default timeout."""
    # Queues that are expected to stay on the short default timeout.
    light_queues = (
        IMAGE_PIPELINE_CONTROL_TASK_QUEUE,
        IMAGE_PIPELINE_QC_TASK_QUEUE,
        IMAGE_PIPELINE_EXPORT_TASK_QUEUE,
    )
    for queue in light_queues:
        assert activity_timeout_for_task_queue(queue) == DEFAULT_ACTIVITY_TIMEOUT