Background Task System

Gobstopper includes a sophisticated background task system for handling long-running operations, periodic jobs, and asynchronous work. The system features DuckDB persistence, priority queues, automatic retries with exponential backoff, and category-based worker management.

Overview

The background task system consists of:

  • TaskQueue: Main queue manager with worker coordination

  • TaskInfo: Rich task metadata and status tracking

  • TaskStorage: DuckDB-based persistence with age-based cleanup of old task records

  • Priority System: Four priority levels (LOW through CRITICAL) with per-category queues

  • Retry Logic: Configurable automatic retry with exponential backoff
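
Taken together, a typical lifecycle is: register a task, start workers for its category, queue work from a handler, then poll status. A minimal sketch using the APIs documented on this page (the route, task name, and payload are illustrative):

import asyncio

from gobstopper import Gobstopper
from gobstopper.tasks.models import TaskPriority

app = Gobstopper()

@app.task("resize_avatar", category="media")              # 1. register a task
async def resize_avatar(user_id: int):
    await asyncio.sleep(1)                                # stand-in for real work
    return {"user_id": user_id, "resized": True}

@app.on_startup
async def boot():
    await app.start_task_workers("media", worker_count=2) # 2. start workers

@app.post("/api/avatar")
async def upload_avatar(request):
    task_id = await app.add_background_task(              # 3. queue work
        "resize_avatar", category="media",
        priority=TaskPriority.NORMAL, user_id=42,
    )
    return {"task_id": task_id}                           # 4. poll with get_task_info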

Core Components

Task Models

TaskStatus Enum

from gobstopper.tasks.models import TaskStatus

# Available task statuses
TaskStatus.PENDING    # Task queued, waiting for worker
TaskStatus.STARTED    # Task currently executing
TaskStatus.SUCCESS    # Task completed successfully
TaskStatus.FAILED     # Task failed after all retries
TaskStatus.CANCELLED  # Task was cancelled before execution
TaskStatus.RETRY      # Task failed but will retry

TaskPriority Enum

from gobstopper.tasks.models import TaskPriority

# Priority levels (higher number = higher priority)
TaskPriority.LOW      # Value: 1 - Background cleanup, maintenance
TaskPriority.NORMAL   # Value: 2 - Regular user requests
TaskPriority.HIGH     # Value: 3 - Important operations
TaskPriority.CRITICAL # Value: 4 - System-critical tasks

TaskInfo Class

from datetime import datetime

from gobstopper.tasks.models import TaskInfo, TaskPriority, TaskStatus

# Task information contains:
task_info = TaskInfo(
    id="uuid-string",              # Unique task identifier
    name="send_email",             # Registered task name
    category="notifications",       # Task category for worker routing
    priority=TaskPriority.HIGH,    # Task priority
    status=TaskStatus.PENDING,     # Current status
    created_at=datetime.now(),     # Creation timestamp
    started_at=None,               # Execution start time
    completed_at=None,             # Completion timestamp
    elapsed_seconds=0.0,           # Execution duration
    result=None,                   # Task return value
    error=None,                    # Error message if failed
    retry_count=0,                 # Current retry attempt
    max_retries=3,                 # Maximum retry attempts
    args=(),                       # Positional arguments
    kwargs={},                     # Keyword arguments
    progress=0.0,                  # Progress percentage (0-100)
    progress_message=""            # Progress description
)

# Convenience properties
print(task_info.is_running)      # True if status is STARTED
print(task_info.is_completed)    # True if finished (success/failed/cancelled)

Task Registration

Basic Task Registration

import asyncio
from datetime import datetime

from gobstopper import Gobstopper

app = Gobstopper()

@app.task("send_email", category="notifications")
async def send_email(to: str, subject: str, body: str):
    """Send email notification"""
    # Simulate email sending
    await asyncio.sleep(2)
    
    # Send via email service
    result = await email_service.send(to, subject, body)
    
    return {
        "message_id": result.id,
        "status": "sent",
        "to": to,
        "sent_at": datetime.utcnow().isoformat()
    }

@app.task("process_image", category="media")
async def process_image(image_path: str, operations: list):
    """Process uploaded image"""
    processed_path = f"processed_{image_path}"
    
    for operation in operations:
        if operation == "resize":
            await resize_image(image_path, (800, 600))
        elif operation == "watermark":
            await add_watermark(image_path)
        elif operation == "compress":
            await compress_image(image_path)
    
    return {
        "original": image_path,
        "processed": processed_path,
        "operations": operations
    }

# Synchronous tasks are also supported
@app.task("generate_report", category="reports")
def generate_report(user_id: int, report_type: str):
    """Generate PDF report (sync function)"""
    # Sync task - will be run in a thread executor
    data = fetch_report_data(user_id, report_type)
    pdf_path = create_pdf_report(data)
    return {"report_path": pdf_path, "user_id": user_id}

Advanced Task Registration

# Task with custom retry logic
@app.task("critical_operation", category="system")
async def critical_operation(data: dict):
    """Critical system operation with custom error handling"""
    try:
        # Attempt operation
        result = await perform_critical_operation(data)
        return result
    except TemporaryError as e:
        # Let retry system handle temporary errors
        raise e
    except PermanentError as e:
        # Mark as failed immediately for permanent errors
        raise RuntimeError(f"Permanent failure: {e}")

# Task with progress reporting
@app.task("bulk_import", category="data")
async def bulk_import(file_path: str):
    """Import large dataset with progress reporting"""
    records = await load_records(file_path)
    total_records = len(records)
    
    imported = 0
    for i, record in enumerate(records):
        await import_record(record)
        imported += 1
        
        # Update progress (this would need task context in real implementation)
        progress = (imported / total_records) * 100
        # await update_task_progress(progress, f"Imported {imported}/{total_records}")
        
        if i % 100 == 0:  # Update every 100 records
            await asyncio.sleep(0.1)  # Yield control
    
    return {
        "imported_count": imported,
        "total_records": total_records,
        "file_path": file_path
    }
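
When a task raises, the queue retries it up to max_retries times with exponential backoff (per the Overview above). The exact delay policy isn't documented in this section; a common scheme, and a reasonable mental model, is a base delay that doubles on each attempt. An illustrative sketch (the constants are assumptions, not Gobstopper's actual values):

import asyncio

async def run_with_retries(func, *args, max_retries: int = 3,
                           base_delay: float = 1.0, **kwargs):
    """Illustrative retry loop with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(max_retries + 1):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted; the task would be marked FAILED
            await asyncio.sleep(base_delay * (2 ** attempt))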

Task Queuing

Basic Task Queuing

from gobstopper.tasks.models import TaskPriority

@app.post("/api/send-notification")
async def queue_notification(request):
    data = await request.json()
    
    # Queue high-priority email task
    task_id = await app.add_background_task(
        "send_email",
        category="notifications",
        priority=TaskPriority.HIGH,
        max_retries=3,
        to=data["email"],
        subject=data["subject"],
        body=data["message"]
    )
    
    return {"task_id": task_id, "status": "queued"}

@app.post("/api/process-upload")
async def process_upload(request):
    data = await request.json()
    
    # Queue normal priority image processing
    task_id = await app.add_background_task(
        "process_image",
        category="media",
        priority=TaskPriority.NORMAL,
        max_retries=2,
        image_path=data["image_path"],
        operations=data["operations"]
    )
    
    return {"task_id": task_id}

@app.post("/api/generate-report")
async def request_report(request):
    data = await request.json()
    
    # Queue low priority report generation
    task_id = await app.add_background_task(
        "generate_report",
        category="reports",
        priority=TaskPriority.LOW,
        max_retries=1,
        user_id=data["user_id"],
        report_type=data["type"]
    )
    
    return {"task_id": task_id, "estimated_time": "5-10 minutes"}

Batch Task Queuing

@app.post("/api/bulk-operations")
async def queue_bulk_operations(request):
    data = await request.json()
    operations = data["operations"]
    
    task_ids = []
    for operation in operations:
        task_id = await app.add_background_task(
            operation["task_name"],
            category=operation.get("category", "default"),
            priority=TaskPriority(operation.get("priority", 2)),
            max_retries=operation.get("max_retries", 3),
            **operation["params"]
        )
        task_ids.append(task_id)
    
    return {
        "batch_id": str(uuid.uuid4()),
        "task_ids": task_ids,
        "total_tasks": len(task_ids)
    }

# Schedule recurring tasks
@app.on_startup
async def schedule_periodic_tasks():
    """Schedule periodic maintenance tasks"""
    
    # Daily cleanup at 2 AM
    await schedule_daily_task(
        "cleanup_old_data", 
        category="maintenance",
        priority=TaskPriority.LOW,
        hour=2
    )
    
    # Hourly health checks
    await schedule_hourly_task(
        "system_health_check",
        category="monitoring", 
        priority=TaskPriority.HIGH
    )
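
schedule_daily_task and schedule_hourly_task above are application-level helpers, not Gobstopper APIs. A minimal sketch of the daily variant, assuming only the add_background_task method shown earlier:

import asyncio
from datetime import datetime, timedelta

async def schedule_daily_task(name: str, category: str, priority, hour: int, **kwargs):
    """Queue `name` once a day at the given hour (illustrative helper)."""
    async def loop():
        while True:
            now = datetime.now()
            run_at = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if run_at <= now:
                run_at += timedelta(days=1)  # next occurrence is tomorrow
            await asyncio.sleep((run_at - now).total_seconds())
            await app.add_background_task(name, category=category,
                                          priority=priority, **kwargs)
    asyncio.create_task(loop())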

Worker Management

Starting Workers

@app.on_startup
async def start_background_workers():
    """Initialize background task workers"""
    
    # High-throughput notification workers
    await app.start_task_workers("notifications", worker_count=5)
    
    # CPU-intensive media processing (fewer workers)
    await app.start_task_workers("media", worker_count=2)
    
    # Single report generator (sequential processing)
    await app.start_task_workers("reports", worker_count=1)
    
    # System maintenance workers
    await app.start_task_workers("maintenance", worker_count=1)
    
    # General purpose workers
    await app.start_task_workers("default", worker_count=3)

# Dynamic worker scaling
@app.post("/api/scale-workers")
async def scale_workers(request):
    data = await request.json()
    
    category = data["category"]
    worker_count = data["worker_count"]
    
    # Stop existing workers for category (if needed)
    if category in app.task_queue.workers:
        for worker in app.task_queue.workers[category]:
            worker.cancel()
        app.task_queue.workers[category].clear()
    
    # Start new workers
    await app.start_task_workers(category, worker_count)
    
    return {
        "category": category,
        "worker_count": worker_count,
        "status": "scaled"
    }

Worker Categories and Routing

Workers automatically process tasks from their assigned category. Category-based routing enables:

  • Specialized workers for different task types: a notifications category for email, SMS, and push notifications; media for image and video processing; reports for PDF generation and data exports; system for backups, cleanup, and maintenance

  • Resource isolation: CPU-intensive categories can run few workers to prevent overload, I/O-bound categories can run many, and memory-heavy categories can be kept small to control memory usage

  • Priority handling within categories: tasks in the same category are dequeued in priority order (sketched below)

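Internally, per-category priority ordering can be modeled as a priority queue keyed on negated priority, so higher values dequeue first. A sketch of the idea (not necessarily Gobstopper's exact implementation):

import asyncio
import itertools

from gobstopper.tasks.models import TaskPriority

queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
counter = itertools.count()  # tie-breaker keeps FIFO order within a priority level

async def enqueue(task_info, priority: TaskPriority):
    # Negate the value so CRITICAL (4) sorts ahead of LOW (1)
    await queue.put((-priority.value, next(counter), task_info))

async def worker():
    while True:
        _, _, task_info = await queue.get()
        # ... execute the task, update status/progress ...
        queue.task_done()
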
Task Monitoring

Task Status Queries

@app.get("/api/tasks/{task_id}")
async def get_task_status(request, task_id: str):
    """Get detailed task status"""
    task_info = await app.task_queue.get_task_info(task_id)
    
    if not task_info:
        return {"error": "Task not found"}, 404
    
    return {
        "task_id": task_info.id,
        "name": task_info.name,
        "status": task_info.status.value,
        "priority": task_info.priority.value,
        "category": task_info.category,
        "progress": task_info.progress,
        "progress_message": task_info.progress_message,
        "created_at": task_info.created_at.isoformat(),
        "started_at": task_info.started_at.isoformat() if task_info.started_at else None,
        "completed_at": task_info.completed_at.isoformat() if task_info.completed_at else None,
        "elapsed_seconds": task_info.elapsed_seconds,
        "retry_count": task_info.retry_count,
        "max_retries": task_info.max_retries,
        "result": task_info.result,
        "error": task_info.error
    }

@app.get("/api/tasks")
async def list_tasks(request):
    """List recent tasks with filtering"""
    # Get query parameters
    category = request.args.get("category", [None])[0]
    status = request.args.get("status", [None])[0]
    limit = int(request.args.get("limit", ["50"])[0])
    offset = int(request.args.get("offset", ["0"])[0])
    
    # Parse status enum if provided
    status_enum = None
    if status:
        try:
            status_enum = TaskStatus(status)
        except ValueError:
            return {"error": f"Invalid status: {status}"}, 400
    
    # Query tasks from storage
    tasks = app.task_queue.storage.get_tasks(
        category=category,
        status=status_enum,
        limit=limit,
        offset=offset
    )
    
    return {
        "tasks": [
            {
                "id": task.id,
                "name": task.name,
                "status": task.status.value,
                "category": task.category,
                "created_at": task.created_at.isoformat(),
                "progress": task.progress
            }
            for task in tasks
        ],
        "count": len(tasks),
        "offset": offset,
        "limit": limit
    }

@app.get("/api/task-stats")
async def get_task_stats(request):
    """Get aggregate task statistics"""
    stats = await app.task_queue.get_task_stats()
    
    # Add queue information
    queue_info = {}
    for category, queue in app.task_queue.queues.items():
        queue_info[category] = {
            "queued_tasks": queue.qsize(),
            "worker_count": len(app.task_queue.workers.get(category, []))
        }
    
    return {
        "overall": stats,
        "queues": queue_info,
        "registered_tasks": list(app.task_queue.task_functions.keys())
    }

Real-time Task Monitoring

import asyncio
import json
from datetime import datetime

# WebSocket endpoint for real-time task updates
@app.websocket("/ws/task-monitor")
async def task_monitor(websocket):
    await websocket.accept()
    
    try:
        while True:
            # Send current statistics
            stats = await app.task_queue.get_task_stats()
            await websocket.send_text(json.dumps({
                "type": "stats_update",
                "data": stats,
                "timestamp": datetime.utcnow().isoformat()
            }))
            
            # Send running tasks
            running_tasks = [
                {
                    "id": task.id,
                    "name": task.name,
                    "progress": task.progress,
                    "progress_message": task.progress_message,
                    "elapsed": (datetime.now() - task.started_at).total_seconds()
                }
                for task in app.task_queue.running_tasks.values()
            ]
            
            await websocket.send_text(json.dumps({
                "type": "running_tasks",
                "data": running_tasks,
                "count": len(running_tasks)
            }))
            
            await asyncio.sleep(2)  # Update every 2 seconds
            
    except ConnectionError:
        pass

# Task completion notifications
async def notify_task_completion(task_info: TaskInfo):
    """Notify clients when tasks complete"""
    notification = {
        "type": "task_completed",
        "task_id": task_info.id,
        "name": task_info.name,
        "status": task_info.status.value,
        "result": task_info.result,
        "elapsed_seconds": task_info.elapsed_seconds
    }
    
    # Broadcast to monitoring clients
    await websocket_manager.broadcast_to_room(
        "task_monitor", 
        json.dumps(notification)
    )

Task Management Operations

Task Control

@app.post("/api/tasks/{task_id}/cancel")
async def cancel_task(request, task_id: str):
    """Cancel a pending task"""
    success = await app.task_queue.cancel_task(task_id)
    
    if success:
        return {"message": "Task cancelled successfully"}
    else:
        return {"error": "Task not found or cannot be cancelled"}, 400

@app.post("/api/tasks/{task_id}/retry")
async def retry_task(request, task_id: str):
    """Retry a failed task"""
    success = await app.task_queue.retry_task(task_id)
    
    if success:
        return {"message": "Task queued for retry"}
    else:
        return {"error": "Task not found or not in failed state"}, 400

@app.delete("/api/tasks/{task_id}")
async def delete_task(request, task_id: str):
    """Delete task record"""
    # Custom deletion logic
    task_info = await app.task_queue.get_task_info(task_id)
    
    if not task_info:
        return {"error": "Task not found"}, 404
    
    if task_info.is_running:
        return {"error": "Cannot delete running task"}, 400
    
    # Delete from storage
    app.task_queue.storage.connection.execute(
        "DELETE FROM tasks WHERE id = ?", (task_id,)
    )
    
    return {"message": "Task deleted"}

Bulk Operations

@app.post("/api/tasks/bulk-retry")
async def bulk_retry_failed_tasks(request):
    """Retry all failed tasks in a category"""
    data = await request.json()
    category = data.get("category")
    
    # Get all failed tasks
    failed_tasks = app.task_queue.storage.get_tasks(
        category=category,
        status=TaskStatus.FAILED,
        limit=1000
    )
    
    retry_count = 0
    for task in failed_tasks:
        if await app.task_queue.retry_task(task.id):
            retry_count += 1
    
    return {
        "retried_count": retry_count,
        "total_failed": len(failed_tasks)
    }

@app.post("/api/tasks/cleanup")
async def cleanup_old_tasks(request):
    """Clean up old completed tasks"""
    data = await request.json()
    
    days = data.get("days", 30)
    deleted_count = app.task_queue.storage.cleanup_old_tasks(days=days)
    
    return {
        "deleted_count": deleted_count,
        "cutoff_days": days
    }

Advanced Features

Custom Task Progress Tracking

# Task progress would be implemented through a context manager
# or progress callback in a full implementation

class TaskProgressTracker:
    def __init__(self, task_id: str, task_queue):
        self.task_id = task_id
        self.task_queue = task_queue
    
    async def update_progress(self, percentage: float, message: str = ""):
        """Update task progress"""
        if self.task_id in self.task_queue.running_tasks:
            task_info = self.task_queue.running_tasks[self.task_id]
            task_info.progress = percentage
            task_info.progress_message = message
            self.task_queue.storage.save_task(task_info)

@app.task("long_running_task", category="processing")
async def long_running_task(data_size: int):
    """Example of task with progress tracking"""
    # In a full implementation, task context would be injected
    # progress = TaskProgressTracker(current_task_id, task_queue)
    
    total_items = data_size
    processed = 0
    
    for i in range(total_items):
        # Process item
        await process_item(i)
        processed += 1
        
        # Update progress every ~10%; max(1, ...) guards against tiny datasets
        if processed % max(1, total_items // 10) == 0:
            percentage = (processed / total_items) * 100
            # await progress.update_progress(percentage, f"Processed {processed}/{total_items}")
        
        await asyncio.sleep(0.1)  # Simulate work
    
    return {"processed_items": processed}

Task Dependencies

# Task dependency system (conceptual - would need implementation)
import asyncio

class TaskDependency:
    def __init__(self, task_queue):
        self.task_queue = task_queue
        self.dependencies = {}  # task_id -> [dependency_task_ids]

    async def wait_for_task_completion(self, task_id: str, poll_interval: float = 1.0):
        """Poll storage until the task reaches a terminal state"""
        while True:
            task_info = await self.task_queue.get_task_info(task_id)
            if task_info and task_info.is_completed:
                return task_info
            await asyncio.sleep(poll_interval)

    async def add_dependent_task(self, parent_task_id: str, child_task_name: str, **kwargs):
        """Add task that depends on parent completion"""
        # Wait for parent task completion
        parent_task = await self.wait_for_task_completion(parent_task_id)
        
        if parent_task.status == TaskStatus.SUCCESS:
            # Parent succeeded, queue child task
            child_task_id = await self.task_queue.add_task(
                child_task_name, **kwargs
            )
            return child_task_id
        else:
            # Parent failed, handle accordingly
            raise RuntimeError(f"Parent task {parent_task_id} failed")

# Usage example
dependency_manager = TaskDependency(app.task_queue)

@app.post("/api/workflow")
async def start_workflow(request):
    data = await request.json()
    
    # Step 1: Process uploaded file
    process_task_id = await app.add_background_task(
        "process_file",
        file_path=data["file_path"]
    )
    
    # Step 2: Generate report (depends on Step 1)
    report_task_id = await dependency_manager.add_dependent_task(
        process_task_id,
        "generate_report",
        processed_data=f"output_of_{process_task_id}"
    )
    
    # Step 3: Send notification (depends on Step 2)  
    notify_task_id = await dependency_manager.add_dependent_task(
        report_task_id,
        "send_notification",
        message="Report ready for download"
    )
    
    return {
        "workflow_id": str(uuid.uuid4()),
        "tasks": [process_task_id, report_task_id, notify_task_id]
    }

Storage and Persistence

DuckDB Storage Features

# The TaskStorage class provides:

# 1. Efficient indexing for fast queries
storage.connection.execute("""
    CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category);
    CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
    CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at);
    CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);
""")

# 2. JSON storage for complex task data
task_dict['result'] = json.dumps(task_result)
task_dict['args'] = json.dumps(task_arguments)
task_dict['kwargs'] = json.dumps(task_keyword_arguments)

# 3. Intelligent cleanup of old tasks
deleted_count = storage.cleanup_old_tasks(days=30)  # Remove tasks older than 30 days

# 4. Rich querying capabilities
recent_failed_tasks = storage.get_tasks(
    status=TaskStatus.FAILED,
    limit=100
)

high_priority_media_tasks = storage.connection.execute("""
    SELECT * FROM tasks 
    WHERE category = 'media' AND priority >= 3
    ORDER BY created_at DESC
""").fetchall()

Performance Optimization

# Cursor pooling (for high-volume scenarios)
# DuckDB's Python API provides per-thread cursors via connection.cursor();
# pooling cursors over one shared connection avoids file-lock contention.
import duckdb

class PooledTaskStorage:
    def __init__(self, pool_size: int = 5):
        self.connection = duckdb.connect("gobstopper_tasks.duckdb")
        self.pool = [self.connection.cursor() for _ in range(pool_size)]
        self.current = 0
    
    def get_connection(self):
        cursor = self.pool[self.current]
        self.current = (self.current + 1) % len(self.pool)
        return cursor

# Batch task operations
async def bulk_queue_tasks(task_specs: list):
    """Queue multiple tasks efficiently"""
    task_ids = []
    
    # Prepare all tasks
    for spec in task_specs:
        task_id = await app.task_queue.add_task(**spec)
        task_ids.append(task_id)
    
    return task_ids

# Task archival for long-term storage
async def archive_completed_tasks(cutoff_date: datetime):
    """Archive old tasks to separate storage"""
    old_tasks = app.task_queue.storage.connection.execute("""
        SELECT * FROM tasks 
        WHERE completed_at < ? AND status IN ('success', 'failed')
    """, (cutoff_date.isoformat(),)).fetchall()
    
    # Save to archive (file, external DB, etc.)
    await save_to_archive(old_tasks)
    
    # Remove from active storage
    deleted_count = app.task_queue.storage.cleanup_old_tasks(
        days=(datetime.now() - cutoff_date).days
    )
    
    return len(old_tasks), deleted_count

Integration Examples

Complete Task Management Dashboard

@app.get("/dashboard/tasks")
async def task_dashboard(request):
    """Task management dashboard"""
    # Get recent stats
    stats = await app.task_queue.get_task_stats()
    
    # Get running tasks
    running_tasks = list(app.task_queue.running_tasks.values())
    
    # Get recent completed tasks
    recent_tasks = app.task_queue.storage.get_tasks(limit=20)
    
    # Get failed tasks needing attention
    failed_tasks = app.task_queue.storage.get_tasks(
        status=TaskStatus.FAILED,
        limit=10
    )
    
    return await app.render_template("task_dashboard.html",
        stats=stats,
        running_tasks=running_tasks,
        recent_tasks=recent_tasks,
        failed_tasks=failed_tasks
    )

# API for task management UI
@app.post("/api/admin/restart-workers")
async def restart_all_workers(request):
    """Restart all background workers"""
    # Graceful shutdown
    await app.task_queue.shutdown()
    
    # Restart workers
    await start_background_workers()
    
    return {"status": "workers_restarted"}

@app.get("/api/admin/system-health")
async def system_health(request):
    """Check task system health"""
    health = {
        "database_connection": True,
        "workers": {},
        "queue_sizes": {},
        "recent_failures": 0
    }
    
    try:
        # Test database connection
        app.task_queue.storage.connection.execute("SELECT 1").fetchone()
    except Exception:
        health["database_connection"] = False
    
    # Check worker status
    for category, workers in app.task_queue.workers.items():
        health["workers"][category] = {
            "count": len(workers),
            "healthy": sum(1 for w in workers if not w.done())
        }
    
    # Check queue sizes
    for category, queue in app.task_queue.queues.items():
        health["queue_sizes"][category] = queue.qsize()
    
    # Count recent failures
    recent_failures = app.task_queue.storage.get_tasks(
        status=TaskStatus.FAILED,
        limit=100
    )
    health["recent_failures"] = len([
        t for t in recent_failures 
        if t.completed_at and 
        (datetime.now() - t.completed_at).total_seconds() < 3600  # Last hour
    ])
    
    return health

The background task system in Gobstopper provides enterprise-grade task management with persistence, monitoring, and operational tools for building robust asynchronous applications.