Source code for gobstopper.tasks.queue

"""Task queue management for Gobstopper framework.

This module provides the core task queue system for Gobstopper, featuring:
- Priority-based task queuing with category separation
- Async worker pool management with graceful shutdown
- Automatic retry logic with exponential backoff
- Persistent storage using DuckDB (optional)
- Feature flag control via environment variable

The task system is disabled by default and must be explicitly enabled
via the WOPR_TASKS_ENABLED environment variable or constructor parameter.
This prevents accidental database creation and resource usage.

Classes:
    NoopStorage: Placeholder storage when tasks are disabled.
    TaskQueue: Main task queue with worker management and execution.

Example:
    Basic task queue usage::

        from gobstopper.tasks.queue import TaskQueue
        from gobstopper.tasks.models import TaskPriority
        import os

        # Enable tasks
        os.environ["WOPR_TASKS_ENABLED"] = "1"

        # Create queue
        queue = TaskQueue(enabled=True)

        # Register task functions
        @queue.register_task("send_email", category="notifications")
        async def send_email(to: str, subject: str):
            # Send email logic
            return {"sent": True, "to": to}

        # Start workers for category
        await queue.start_workers("notifications", worker_count=3)

        # Queue a task (task-function arguments are passed as keyword
        # arguments so they are not confused with the queue options)
        task_id = await queue.add_task(
            "send_email",
            category="notifications",
            priority=TaskPriority.HIGH,
            max_retries=3,
            to="user@example.com",
            subject="Welcome!"
        )

        # Check task status
        task_info = await queue.get_task_info(task_id)
        print(f"Status: {task_info.status}")

        # Shutdown gracefully
        await queue.shutdown()

Environment Variables:
    WOPR_TASKS_ENABLED: Set to "1", "true", "True", or "yes" to enable
        the task system. Defaults to disabled.

Note:
    - Tasks are disabled by default to prevent unintended side effects
    - Storage (DuckDB) is created lazily on first use
    - Each category has its own queue and worker pool
    - Tasks within a category are ordered by priority, then by task id
      (a UUID, so same-priority ordering is not strictly FIFO)
    - shutdown() signals workers and waits for them to unwind; a task that
      is mid-execution may be interrupted by cancellation
"""

import asyncio
import os
import uuid
from collections import defaultdict
from datetime import datetime
from typing import Callable, Dict, List, Optional

from .models import TaskInfo, TaskStatus, TaskPriority

# Feature flag: tasks disabled by default unless explicitly enabled
_DEFAULT_ENABLED = os.getenv("WOPR_TASKS_ENABLED", "0") in {"1", "true", "True", "yes"}
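
# Illustrative note (not part of the original module): the flag match above
# is an exact string comparison, so casing and whitespace matter. For example:
#
#     WOPR_TASKS_ENABLED=1      -> enabled
#     WOPR_TASKS_ENABLED=true   -> enabled
#     WOPR_TASKS_ENABLED=TRUE   -> still disabled (not in the accepted set)
#     WOPR_TASKS_ENABLED=" 1"   -> still disabled (whitespace is not stripped)
#
# The value is read once at import time, so it must be set before this module
# is imported (or pass enabled=True to TaskQueue directly).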


class NoopStorage:
    """A storage implementation that does nothing (used when tasks are disabled).

    This class provides a null-object pattern implementation of the storage
    interface, allowing the task queue to function without raising errors
    when tasks are disabled. All methods are no-ops that return safe defaults.

    This prevents database file creation and avoids import errors when
    DuckDB is not installed but tasks are disabled anyway.

    Example:
        NoopStorage is used automatically when tasks are disabled::

            queue = TaskQueue(enabled=False)
            # queue.storage will be NoopStorage()
            # No database file created, no errors raised

    Note:
        - All save operations are silently ignored
        - All query operations return None or empty lists
        - No exceptions are raised
        - Used internally by TaskQueue when enabled=False
    """

    def save_task(self, task_info: TaskInfo):
        """No-op: task is not saved."""
        return

    def get_task(self, task_id: str) -> Optional[TaskInfo]:
        """No-op: always returns None."""
        return None

    def get_tasks(self, **kwargs):
        """No-op: always returns an empty list."""
        return []

    def cleanup_old_tasks(self, **kwargs):
        """No-op: always returns 0 deleted."""
        return 0
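
# A minimal usage sketch of the null-object behavior (illustrative, not part
# of the original module): with tasks disabled, every storage call degrades
# to a safe default, so callers never need None checks on the backend itself.
#
#     storage = TaskQueue(enabled=False).storage   # -> NoopStorage()
#     storage.save_task(some_task_info)            # silently ignored
#     storage.get_task("missing-id")               # -> None
#     storage.get_tasks(limit=10)                  # -> []
#     storage.cleanup_old_tasks(days=30)           # -> 0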


class TaskQueue:
    """Priority-based background task queue with worker pool management.

    TaskQueue provides a complete async task processing system with:

    - Category-based task organization (separate queues per category)
    - Priority ordering within each category queue
    - Configurable worker pools per category
    - Automatic retry logic with exponential backoff
    - Persistent storage using DuckDB (optional)
    - Shutdown support across all worker pools
    - Task cancellation and manual retry support

    The queue system is disabled by default and must be explicitly enabled
    via environment variable (WOPR_TASKS_ENABLED=1) or constructor parameter.
    Storage is created lazily on first use to avoid unnecessary database files.

    Attributes:
        enabled: Whether the task system is enabled.
        queues: Dict mapping category names to asyncio.PriorityQueue objects.
        running_tasks: Dict of currently executing tasks by task_id.
        workers: Dict mapping category names to lists of worker tasks.
        shutdown_event: asyncio.Event signaling worker shutdown.
        task_functions: Dict mapping task names to their registered functions.

    Example:
        Complete task queue workflow::

            import asyncio
            from gobstopper.tasks.queue import TaskQueue
            from gobstopper.tasks.models import TaskPriority, TaskStatus

            # Enable and create queue
            queue = TaskQueue(enabled=True)

            # Register task functions
            @queue.register_task("send_email", category="notifications")
            async def send_email(to: str, subject: str, body: str):
                # Email sending logic
                await asyncio.sleep(1)  # Simulate work
                return {"sent": True, "to": to}

            @queue.register_task("process_data", category="analytics")
            def process_data(data: dict):  # Sync functions work too
                # Processing logic
                return {"records": len(data)}

            # Start workers
            await queue.start_workers("notifications", worker_count=3)
            await queue.start_workers("analytics", worker_count=5)

            # Queue tasks with different priorities (task-function
            # arguments are passed as keyword arguments)
            task1 = await queue.add_task(
                "send_email",
                category="notifications",
                priority=TaskPriority.HIGH,
                max_retries=3,
                to="user@example.com",
                subject="Welcome",
                body="Thanks for signing up!"
            )

            task2 = await queue.add_task(
                "process_data",
                category="analytics",
                priority=TaskPriority.NORMAL,
                data={"users": 1000}
            )

            # Monitor task status
            while True:
                info = await queue.get_task_info(task1)
                if info.is_completed:
                    if info.status == TaskStatus.SUCCESS:
                        print(f"Result: {info.result}")
                    else:
                        print(f"Failed: {info.error}")
                    break
                await asyncio.sleep(0.5)

            # Get statistics
            stats = await queue.get_task_stats()
            print(f"Total tasks: {stats['total']}")
            print(f"Running: {stats['running']}")
            print(f"By status: {stats['by_status']}")

            # Shut down workers
            await queue.shutdown()

    Note:
        - Each category has its own queue and worker pool
        - Tasks are ordered by priority (descending), then by task id
        - Both async and sync task functions are supported
        - Sync functions run in a thread pool executor to avoid blocking
        - Retry delay uses exponential backoff: min(2**retry_count, 60) seconds
        - shutdown() cancels workers and waits for them to unwind
    """

    def __init__(self, enabled: Optional[bool] = None, storage_factory=None):
        """Initialize a new TaskQueue.

        Args:
            enabled: Whether to enable the task system. If None, reads from
                the WOPR_TASKS_ENABLED environment variable (evaluated at
                module import time). Defaults to disabled.
            storage_factory: Optional factory function to create custom storage.
                If None, uses TaskStorage with DuckDB. Useful for testing.

        Example:
            Creating queues with different configurations::

                # Use environment variable (must be set before this module
                # is imported, since the default is read at import time)
                import os
                os.environ["WOPR_TASKS_ENABLED"] = "1"
                queue1 = TaskQueue()  # enabled=True from env

                # Explicitly enable
                queue2 = TaskQueue(enabled=True)

                # Explicitly disable
                queue3 = TaskQueue(enabled=False)

                # Custom storage for testing
                from unittest.mock import Mock
                mock_storage = Mock()
                queue4 = TaskQueue(
                    enabled=True,
                    storage_factory=lambda: mock_storage
                )
        """
        self.enabled = _DEFAULT_ENABLED if enabled is None else enabled
        self.queues: Dict[str, asyncio.PriorityQueue] = {}
        self.running_tasks: Dict[str, TaskInfo] = {}
        self.workers: Dict[str, List[asyncio.Task]] = {}
        self.shutdown_event = asyncio.Event()
        self.task_functions: Dict[str, Callable] = {}
        # Lazy storage: created on first access, and only if enabled
        self._storage = None
        self._storage_factory = storage_factory

    @property
    def storage(self):
        """Get the storage backend, creating it lazily if needed.

        Returns NoopStorage if tasks are disabled; otherwise creates and
        caches a TaskStorage instance (or custom storage from the factory).

        Returns:
            Storage object implementing save_task, get_task, get_tasks,
            and cleanup_old_tasks methods.

        Note:
            - Storage is created only on first access (lazy initialization)
            - Returns NoopStorage if enabled=False
            - Uses storage_factory if provided, otherwise creates TaskStorage
            - DuckDB is imported only when storage is actually needed
        """
        if not self.enabled:
            return NoopStorage()
        if self._storage is None:
            # Local import to avoid importing duckdb unless needed
            if self._storage_factory:
                self._storage = self._storage_factory()
            else:
                from .storage import TaskStorage  # local import
                self._storage = TaskStorage()
        return self._storage
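
    # Testing sketch (illustrative, not part of the original source): the
    # storage_factory hook lets tests substitute an in-memory double without
    # importing DuckDB. The factory is called once, then the result is cached:
    #
    #     class FakeStorage:
    #         def __init__(self): self.saved = []
    #         def save_task(self, t): self.saved.append(t)
    #         def get_task(self, task_id): return None
    #         def get_tasks(self, **kw): return []
    #         def cleanup_old_tasks(self, **kw): return 0
    #
    #     q = TaskQueue(enabled=True, storage_factory=FakeStorage)
    #     assert isinstance(q.storage, FakeStorage)  # created lazily
    #     assert q.storage is q.storage              # cached after first access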

    def register_task(self, name: str, category: str = "default"):
        """Decorator to register a task function with the queue.

        Registers a function (sync or async) to be available for task
        execution. The function can then be queued by name using add_task().
        Both regular functions and coroutine functions are supported.

        Args:
            name: Unique name to identify this task function.
            category: Default category for organizing tasks. Tasks in the
                same category share a queue and worker pool. Defaults to
                "default".

        Returns:
            Decorator function that registers and returns the original function.

        Example:
            Registering task functions::

                queue = TaskQueue(enabled=True)

                # Register async task
                @queue.register_task("send_email", category="notifications")
                async def send_email(to: str, subject: str):
                    # Async email logic
                    return {"sent": True}

                # Register sync task
                @queue.register_task("process_file", category="analytics")
                def process_file(filename: str):
                    # Sync file processing
                    return {"lines": 1000}

                # Register with default category
                @queue.register_task("cleanup")
                def cleanup():
                    # Cleanup logic
                    pass

        Note:
            - Task name must be unique across all categories
            - The category argument here is currently informational only;
              the queue and workers that handle a task are chosen by the
              category passed to add_task()
            - Both sync and async functions are supported
            - The original function is returned unchanged (can be called directly)
        """
        def decorator(func):
            self.task_functions[name] = func
            return func
        return decorator

    async def add_task(self, name: str, category: str = "default",
                       priority: TaskPriority = TaskPriority.NORMAL,
                       max_retries: int = 0, *args, **kwargs) -> str:
        """Add a task to the queue for execution.

        Creates a TaskInfo object with a unique ID, saves it to storage,
        and adds it to the appropriate category queue. The task will be
        picked up by a worker when available.

        Args:
            name: Name of the registered task function to execute.
            category: Category queue to add the task to. Defaults to "default".
            priority: Task priority for queue ordering. Higher priority tasks
                execute first. Defaults to TaskPriority.NORMAL.
            max_retries: Maximum number of retry attempts on failure.
                Defaults to 0 (no retries).
            *args: Positional arguments to pass to the task function. Because
                these follow category, priority, and max_retries in the
                signature, they can only be supplied once those are also
                given positionally; prefer keyword arguments.
            **kwargs: Keyword arguments to pass to the task function.

        Returns:
            str: Unique task ID (UUID4) for tracking the task.

        Raises:
            RuntimeError: If the task system is disabled (enabled=False).
            ValueError: If the task name is not registered.

        Example:
            Adding tasks with different configurations::

                queue = TaskQueue(enabled=True)

                # Simple task with no args
                task_id1 = await queue.add_task("cleanup")

                # Task-function arguments passed as keyword arguments
                task_id2 = await queue.add_task(
                    "send_email",
                    category="notifications",
                    to="user@example.com",
                    subject="Welcome to our service"
                )

                # High-priority task with retries
                task_id3 = await queue.add_task(
                    "process_order",
                    category="orders",
                    priority=TaskPriority.HIGH,
                    max_retries=3,
                    order_id=12345,
                    customer_id=67890
                )

                # Critical task, queued ahead of everything else
                task_id4 = await queue.add_task(
                    "send_alert",
                    category="alerts",
                    priority=TaskPriority.CRITICAL,
                    alert_type="security_breach"
                )

        Note:
            - Task function must be registered before adding to queue
            - Tasks are stored persistently before queueing
            - Priority queue ensures higher priority tasks execute first
            - Category queue is created automatically if it doesn't exist
            - Workers must be started for the category to process tasks
        """
        if not self.enabled:
            raise RuntimeError("TaskQueue is disabled. Set WOPR_TASKS_ENABLED=1 to enable.")
        if name not in self.task_functions:
            raise ValueError(f"Task '{name}' not registered")

        task_info = TaskInfo(
            id=str(uuid.uuid4()),
            name=name,
            category=category,
            priority=priority,
            status=TaskStatus.PENDING,
            created_at=datetime.now(),
            max_retries=max_retries,
            args=args,
            kwargs=kwargs
        )

        self.storage.save_task(task_info)

        if category not in self.queues:
            self.queues[category] = asyncio.PriorityQueue()
        # Negative priority value so higher-priority tasks sort (and pop) first
        await self.queues[category].put((-priority.value, task_info.id, task_info))
        return task_info.id
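
    # Ordering sketch (illustrative, not part of the original source):
    # asyncio.PriorityQueue pops the smallest tuple first, so negating
    # priority.value makes higher-priority tasks pop first. Assuming
    # TaskPriority values LOW=1 < NORMAL=2 < HIGH=3 < CRITICAL=4, queued
    # entries sort like:
    #
    #     (-4, "...", task)   # CRITICAL, popped first
    #     (-2, "...", task)   # NORMAL
    #     (-1, "...", task)   # LOW, popped last
    #
    # Ties on priority fall through to the task id (a UUID4 string), so
    # same-priority ordering is effectively arbitrary rather than strict FIFO.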

    async def get_task_info(self, task_id: str) -> Optional[TaskInfo]:
        """Retrieve task information and current status.

        Checks running tasks first (in-memory), then queries storage for
        completed or pending tasks. Provides real-time task status.

        Args:
            task_id: Unique identifier of the task to retrieve.

        Returns:
            TaskInfo object if found, None if the task doesn't exist or
            tasks are disabled.

        Example:
            Monitoring task progress::

                task_id = await queue.add_task("long_process")

                # Poll for completion
                while True:
                    info = await queue.get_task_info(task_id)
                    if info:
                        print(f"Status: {info.status.value}")
                        print(f"Progress: {info.progress}%")
                        print(f"Message: {info.progress_message}")

                        if info.is_completed:
                            if info.status == TaskStatus.SUCCESS:
                                print(f"Result: {info.result}")
                                print(f"Duration: {info.elapsed_seconds}s")
                            else:
                                print(f"Error: {info.error}")
                            break
                    await asyncio.sleep(1)

        Note:
            - Returns None if tasks are disabled
            - Checks running tasks first for immediate status
            - Falls back to storage for persistent lookup
            - Real-time progress tracking if the task updates its progress field
        """
        if not self.enabled:
            return None

        # Check running tasks first
        if task_id in self.running_tasks:
            return self.running_tasks[task_id]

        # Check storage
        return self.storage.get_task(task_id)

    async def cancel_task(self, task_id: str) -> bool:
        """Cancel a pending task before execution.

        Marks a task as CANCELLED if it is still pending. Cannot cancel
        tasks that are already running or completed.

        Args:
            task_id: Unique identifier of the task to cancel.

        Returns:
            bool: True if the task was successfully cancelled, False if the
                task doesn't exist, is not pending, or tasks are disabled.

        Example:
            Cancelling tasks conditionally::

                # Queue a low priority task
                task_id = await queue.add_task(
                    "generate_report",
                    priority=TaskPriority.LOW
                )

                # Later, decide to cancel it
                if await queue.cancel_task(task_id):
                    print("Task cancelled successfully")
                else:
                    print("Task already started or completed")

        Note:
            - Only PENDING tasks can be cancelled
            - Running tasks cannot be cancelled this way
            - Cancelled tasks are marked with a completed_at timestamp
            - Returns False if tasks are disabled
        """
        if not self.enabled:
            return False

        task_info = await self.get_task_info(task_id)
        if not task_info:
            return False

        if task_info.status == TaskStatus.PENDING:
            task_info.status = TaskStatus.CANCELLED
            task_info.completed_at = datetime.now()
            self.storage.save_task(task_info)
            return True
        return False

    async def retry_task(self, task_id: str) -> bool:
        """Manually retry a failed task.

        Resets a failed task's status and requeues it for execution. Useful
        for retrying tasks that failed due to transient issues after the
        automatic retry limit was reached.

        Args:
            task_id: Unique identifier of the task to retry.

        Returns:
            bool: True if the task was successfully requeued, False if the
                task doesn't exist, is not failed, or tasks are disabled.

        Example:
            Manual retry after investigation::

                # Check failed task
                task = await queue.get_task_info(task_id)
                if task and task.status == TaskStatus.FAILED:
                    print(f"Task failed: {task.error}")

                    # Fix underlying issue (e.g., restore network)
                    # ...

                    # Retry the task
                    if await queue.retry_task(task_id):
                        print("Task requeued for retry")
                    else:
                        print("Could not retry task")

        Note:
            - Only FAILED tasks can be manually retried
            - Resets task status to PENDING
            - Clears error, timing, and progress information
            - Does not increment retry_count
            - Task is added back to its original category queue
            - Returns False if tasks are disabled
        """
        if not self.enabled:
            return False

        task_info = await self.get_task_info(task_id)
        if not task_info or task_info.status != TaskStatus.FAILED:
            return False

        # Reset task status and re-queue
        task_info.status = TaskStatus.PENDING
        task_info.started_at = None
        task_info.completed_at = None
        task_info.error = None
        task_info.progress = 0.0
        task_info.progress_message = ""

        self.storage.save_task(task_info)

        if task_info.category not in self.queues:
            self.queues[task_info.category] = asyncio.PriorityQueue()
        await self.queues[task_info.category].put(
            (-task_info.priority.value, task_info.id, task_info)
        )
        return True

    async def get_task_stats(self) -> dict:
        """Get aggregate statistics about tasks across all categories.

        Queries storage for recent tasks and computes statistics including
        counts by status, counts by category, running tasks, and queued tasks.

        Returns:
            dict: Statistics dictionary with keys:

            - total: Total number of tasks in storage (up to 1000 recent)
            - by_status: Dict mapping status values to counts
            - by_category: Dict mapping category names to counts
            - running: Number of currently executing tasks
            - queued: Number of tasks waiting in all queues

        Example:
            Monitoring queue health::

                stats = await queue.get_task_stats()

                print(f"Total tasks: {stats['total']}")
                print(f"Currently running: {stats['running']}")
                print(f"Queued: {stats['queued']}")

                print("\nBy status:")
                for status, count in stats['by_status'].items():
                    print(f"  {status}: {count}")

                print("\nBy category:")
                for category, count in stats['by_category'].items():
                    print(f"  {category}: {count}")

        Note:
            - Returns zero/empty stats if tasks are disabled
            - Limited to the 1000 most recent tasks for performance
            - by_status uses status string values, not enums
            - The running count comes from in-memory tracking
            - The queued count is the sum across all category queues
        """
        if not self.enabled:
            return {"total": 0, "by_status": {}, "by_category": {}, "running": 0, "queued": 0}

        all_tasks = self.storage.get_tasks(limit=1000)  # Get recent tasks

        stats = {
            "total": len(all_tasks),
            "by_status": defaultdict(int),
            "by_category": defaultdict(int),
            "running": len(self.running_tasks),
            "queued": sum(queue.qsize() for queue in self.queues.values())
        }

        for task in all_tasks:
            stats["by_status"][task.status.value] += 1
            stats["by_category"][task.category] += 1

        return dict(stats)

    async def start_workers(self, category: str, worker_count: int = 1):
        """Start worker tasks to process tasks in a category queue.

        Creates and starts async worker tasks that continuously poll the
        category queue and execute tasks. Workers run until shutdown is
        called or they are cancelled.

        Args:
            category: Name of the category queue to start workers for.
            worker_count: Number of worker tasks to create. Defaults to 1.

        Example:
            Starting workers for different workloads::

                queue = TaskQueue(enabled=True)

                # Start 3 workers for email notifications
                await queue.start_workers("notifications", worker_count=3)

                # Start 10 workers for heavy processing
                await queue.start_workers("analytics", worker_count=10)

                # Start 1 worker for low-volume admin tasks
                await queue.start_workers("admin", worker_count=1)

                # Now tasks in these categories will be processed

        Note:
            - Does nothing if tasks are disabled
            - Category queue is created automatically if needed
            - Workers are tracked in self.workers[category]
            - Multiple calls add more workers to the existing pool
            - Workers process tasks by priority, then by task id
            - Each worker handles one task at a time
            - Workers stop when queue.shutdown() is called
        """
        if not self.enabled:
            return

        if category not in self.workers:
            self.workers[category] = []

        for i in range(worker_count):
            worker = asyncio.create_task(self._worker(category, i))
            self.workers[category].append(worker)

    async def shutdown(self):
        """Shut down all workers and wait for them to exit.

        Sets the shutdown event, cancels all worker tasks, and waits for
        them to unwind. Workers idling on an empty queue exit promptly; a
        task that is mid-execution may be interrupted by the cancellation
        (its state is still persisted by _execute_task's finally block).

        Example:
            Application shutdown::

                queue = TaskQueue(enabled=True)

                # Register tasks and start workers
                # ...

                try:
                    # Run application
                    await app.run()
                finally:
                    # Ensure workers shut down cleanly
                    await queue.shutdown()
                    print("All workers stopped")

        Note:
            - Does nothing if tasks are disabled
            - Sets shutdown_event to signal workers to stop
            - Cancels all worker tasks across all categories
            - Waits for workers to unwind after cancellation
            - Safe to call multiple times (idempotent)
            - Exceptions during worker shutdown are suppressed
        """
        if not self.enabled:
            return

        self.shutdown_event.set()

        # Cancel all workers
        for workers in self.workers.values():
            for worker in workers:
                worker.cancel()

        # Wait for workers to finish unwinding
        for workers in self.workers.values():
            await asyncio.gather(*workers, return_exceptions=True)

    async def _worker(self, category: str, worker_id: int):
        """Worker coroutine that continuously processes tasks from a category queue.

        Internal method that runs in a loop, polling the category queue for
        tasks and executing them. Handles queue timeouts, shutdown signals,
        and exceptions during task execution.

        Args:
            category: Name of the category queue to process tasks from.
            worker_id: Unique identifier for this worker within the category.

        Note:
            - Internal method, not meant to be called directly
            - Created and managed by start_workers()
            - Runs until shutdown_event is set or the worker is cancelled
            - Polls the queue with a 1-second timeout to check shutdown periodically
            - Delegates actual task execution to _execute_task()
            - Prints errors to stdout (should use logging in production)
            - Exits cleanly on CancelledError

        Worker Lifecycle:
            1. Wait for a task from the queue (with 1s timeout)
            2. If timeout, check the shutdown event and loop
            3. If a task is received, execute it via _execute_task()
            4. Handle any exceptions and continue
            5. Exit when shutdown_event is set or the worker is cancelled
        """
        if category not in self.queues:
            self.queues[category] = asyncio.PriorityQueue()

        queue = self.queues[category]

        while not self.shutdown_event.is_set():
            try:
                try:
                    priority, task_id, task_info = await asyncio.wait_for(
                        queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue

                await self._execute_task(task_info)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Worker {category}-{worker_id} error: {e}")

    async def _execute_task(self, task_info: TaskInfo):
        """Execute a single task with retry logic and error handling.

        Internal method that handles the complete task execution lifecycle:

        - Updates status to STARTED and saves to storage
        - Executes the task function (async or sync)
        - Handles success, failure, and retry logic
        - Updates timing and progress information
        - Implements exponential backoff for retries

        Args:
            task_info: TaskInfo object containing task metadata and parameters.

        Task Execution Flow:
            1. Verify the task function is registered
            2. Mark the task as STARTED, save to storage
            3. Add to the running_tasks dict for tracking
            4. Execute the task function with the provided args/kwargs
            5. On success:
               - Mark as SUCCESS
               - Store the result
               - Set elapsed time and 100% progress
            6. On failure:
               - Increment retry_count
               - If retries remain:
                 - Mark as RETRY
                 - Sleep with exponential backoff
                 - Requeue the task
               - If retries are exhausted:
                 - Mark as FAILED
                 - Set elapsed time
            7. Remove from running_tasks
            8. Save the final state to storage

        Retry Logic:
            - Retries only if retry_count <= max_retries
            - Backoff delay: min(2**retry_count, 60) seconds
            - Examples:
              - 1st retry: 2 seconds
              - 2nd retry: 4 seconds
              - 3rd retry: 8 seconds
              - 6th+ retry: 60 seconds (capped)

        Note:
            - Internal method, called by workers
            - Handles both async and sync task functions
            - Sync functions run in a thread pool executor
            - All state changes are persisted to storage
            - Running tasks are tracked in-memory for quick lookup
            - A missing task function is treated as an immediate failure
            - The backoff sleep runs inside the worker, so that worker is
              blocked for the duration of the retry delay
        """
        task_func = self.task_functions.get(task_info.name)
        if not task_func:
            task_info.status = TaskStatus.FAILED
            task_info.error = f"Task function '{task_info.name}' not found"
            task_info.completed_at = datetime.now()
            self.storage.save_task(task_info)
            return

        # Skip tasks that were cancelled while still queued: cancel_task()
        # updates the stored record, but the queued TaskInfo object is a
        # separate reference and would otherwise still be executed
        stored = self.storage.get_task(task_info.id)
        if stored is not None and stored.status == TaskStatus.CANCELLED:
            return

        task_info.status = TaskStatus.STARTED
        task_info.started_at = datetime.now()
        self.running_tasks[task_info.id] = task_info
        self.storage.save_task(task_info)

        try:
            if asyncio.iscoroutinefunction(task_func):
                result = await task_func(*task_info.args, **task_info.kwargs)
            else:
                # Run sync functions in the default thread pool executor
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None, lambda: task_func(*task_info.args, **task_info.kwargs)
                )

            task_info.status = TaskStatus.SUCCESS
            task_info.result = result
            task_info.completed_at = datetime.now()
            task_info.elapsed_seconds = (
                task_info.completed_at - task_info.started_at
            ).total_seconds()
            task_info.progress = 100.0

        except Exception as e:
            task_info.error = str(e)
            task_info.retry_count += 1

            if task_info.retry_count <= task_info.max_retries:
                task_info.status = TaskStatus.RETRY
                # Exponential backoff, capped at 60 seconds
                await asyncio.sleep(min(2 ** task_info.retry_count, 60))
                queue = self.queues[task_info.category]
                await queue.put((-task_info.priority.value, task_info.id, task_info))
            else:
                task_info.status = TaskStatus.FAILED
                task_info.completed_at = datetime.now()
                task_info.elapsed_seconds = (
                    task_info.completed_at - task_info.started_at
                ).total_seconds()

        finally:
            if task_info.id in self.running_tasks:
                del self.running_tasks[task_info.id]
            self.storage.save_task(task_info)
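

# ---------------------------------------------------------------------------
# Illustrative end-to-end sketch (not part of the original module). A minimal
# run assuming the accompanying models define TaskPriority.HIGH and the
# TaskStatus members used above; with the default storage, running it will
# create the DuckDB-backed store on first use, as documented.

async def _demo():  # hypothetical helper, for illustration only
    queue = TaskQueue(enabled=True)

    @queue.register_task("greet", category="demo")
    async def greet(who: str):
        return {"greeting": f"Hello, {who}!"}

    await queue.start_workers("demo", worker_count=1)
    task_id = await queue.add_task(
        "greet", category="demo", priority=TaskPriority.HIGH, who="Ada"
    )

    # Poll until the task reaches a terminal state
    while True:
        info = await queue.get_task_info(task_id)
        if info and info.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
            print(info.status.value, info.result or info.error)
            break
        await asyncio.sleep(0.1)

    await queue.shutdown()


if __name__ == "__main__":
    asyncio.run(_demo())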