# Background Task System

Gobstopper includes a sophisticated background task system for handling long-running operations, periodic jobs, and asynchronous work. The system features DuckDB persistence, priority queues, automatic retries, and intelligent worker management.

## Overview

The background task system consists of:

- **TaskQueue**: Main queue manager with worker coordination
- **TaskInfo**: Rich task metadata and status tracking
- **TaskStorage**: DuckDB-based persistence with intelligent cleanup
- **Priority System**: Four priority levels with per-category queue management
- **Retry Logic**: Configurable automatic retry with exponential backoff

## Core Components

### Task Models

#### TaskStatus Enum

```python
from gobstopper.tasks.models import TaskStatus

# Available task statuses
TaskStatus.PENDING    # Task queued, waiting for a worker
TaskStatus.STARTED    # Task currently executing
TaskStatus.SUCCESS    # Task completed successfully
TaskStatus.FAILED     # Task failed after all retries
TaskStatus.CANCELLED  # Task was cancelled before execution
TaskStatus.RETRY      # Task failed but will retry
```

#### TaskPriority Enum

```python
from gobstopper.tasks.models import TaskPriority

# Priority levels (higher number = higher priority)
TaskPriority.LOW       # Value: 1 - Background cleanup, maintenance
TaskPriority.NORMAL    # Value: 2 - Regular user requests
TaskPriority.HIGH      # Value: 3 - Important operations
TaskPriority.CRITICAL  # Value: 4 - System-critical tasks
```

#### TaskInfo Class

```python
from datetime import datetime

from gobstopper.tasks.models import TaskInfo, TaskPriority, TaskStatus

# A TaskInfo record contains:
task_info = TaskInfo(
    id="uuid-string",             # Unique task identifier
    name="send_email",            # Registered task name
    category="notifications",     # Task category for worker routing
    priority=TaskPriority.HIGH,   # Task priority
    status=TaskStatus.PENDING,    # Current status
    created_at=datetime.now(),    # Creation timestamp
    started_at=None,              # Execution start time
    completed_at=None,            # Completion timestamp
    elapsed_seconds=0.0,          # Execution duration
    result=None,                  # Task return value
    error=None,                   # Error message if failed
    retry_count=0,                # Current retry attempt
    max_retries=3,                # Maximum retry attempts
    args=(),                      # Positional arguments
    kwargs={},                    # Keyword arguments
    progress=0.0,                 # Progress percentage (0-100)
    progress_message=""           # Progress description
)

# Convenience properties
print(task_info.is_running)    # True if status is STARTED
print(task_info.is_completed)  # True if finished (success/failed/cancelled)
```

## Task Registration

### Basic Task Registration

```python
import asyncio
from datetime import datetime

from gobstopper import Gobstopper

app = Gobstopper()

@app.task("send_email", category="notifications")
async def send_email(to: str, subject: str, body: str):
    """Send email notification"""
    # Simulate email sending
    await asyncio.sleep(2)

    # Send via email service (email_service is an application-defined client)
    result = await email_service.send(to, subject, body)

    return {
        "message_id": result.id,
        "status": "sent",
        "to": to,
        "sent_at": datetime.utcnow().isoformat()
    }

@app.task("process_image", category="media")
async def process_image(image_path: str, operations: list):
    """Process uploaded image"""
    processed_path = f"processed_{image_path}"

    for operation in operations:
        if operation == "resize":
            await resize_image(image_path, (800, 600))
        elif operation == "watermark":
            await add_watermark(image_path)
        elif operation == "compress":
            await compress_image(image_path)

    return {
        "original": image_path,
        "processed": processed_path,
        "operations": operations
    }

# Synchronous tasks are also supported
@app.task("generate_report", category="reports")
def generate_report(user_id: int, report_type: str):
    """Generate PDF report (sync function)"""
    # Sync task - will be run in a thread executor
    data = fetch_report_data(user_id, report_type)
    pdf_path = create_pdf_report(data)
    return {"report_path": pdf_path, "user_id": user_id}
```

### Advanced Task Registration

```python
# Task with custom retry logic
@app.task("critical_operation", category="system")
async def critical_operation(data: dict):
    """Critical system operation with custom error handling"""
    try:
        # Attempt operation
        result = await perform_critical_operation(data)
        return result
    except TemporaryError as e:
        # Let the retry system handle temporary errors
        raise e
    except PermanentError as e:
        # Mark as failed immediately for permanent errors
        raise RuntimeError(f"Permanent failure: {e}")

# Task with progress reporting
@app.task("bulk_import", category="data")
async def bulk_import(file_path: str):
    """Import large dataset with progress reporting"""
    records = await load_records(file_path)
    total_records = len(records)
    imported = 0

    for i, record in enumerate(records):
        await import_record(record)
        imported += 1

        # Update progress (this would need task context in a real implementation)
        progress = (imported / total_records) * 100
        # await update_task_progress(progress, f"Imported {imported}/{total_records}")

        if i % 100 == 0:  # Yield control every 100 records
            await asyncio.sleep(0.1)

    return {
        "imported_count": imported,
        "total_records": total_records,
        "file_path": file_path
    }
```

## Task Queuing

### Basic Task Queuing

```python
from gobstopper.tasks.queue import TaskPriority

@app.post("/api/send-notification")
async def queue_notification(request):
    data = await request.json()

    # Queue high-priority email task
    task_id = await app.add_background_task(
        "send_email",
        category="notifications",
        priority=TaskPriority.HIGH,
        max_retries=3,
        to=data["email"],
        subject=data["subject"],
        body=data["message"]
    )

    return {"task_id": task_id, "status": "queued"}

@app.post("/api/process-upload")
async def process_upload(request):
    data = await request.json()

    # Queue normal-priority image processing
    task_id = await app.add_background_task(
        "process_image",
        category="media",
        priority=TaskPriority.NORMAL,
        max_retries=2,
        image_path=data["image_path"],
        operations=data["operations"]
    )

    return {"task_id": task_id}

@app.post("/api/generate-report")
async def request_report(request):
    data = await request.json()

    # Queue low-priority report generation
    task_id = await app.add_background_task(
        "generate_report",
        category="reports",
        priority=TaskPriority.LOW,
        max_retries=1,
        user_id=data["user_id"],
        report_type=data["type"]
    )

    return {"task_id": task_id, "estimated_time": "5-10 minutes"}
```

### Batch Task Queuing

```python
import uuid

@app.post("/api/bulk-operations")
async def queue_bulk_operations(request):
    data = await request.json()
    operations = data["operations"]

    task_ids = []
    for operation in operations:
        task_id = await app.add_background_task(
            operation["task_name"],
            category=operation.get("category", "default"),
            priority=TaskPriority(operation.get("priority", 2)),
            max_retries=operation.get("max_retries", 3),
            **operation["params"]
        )
        task_ids.append(task_id)

    return {
        "batch_id": str(uuid.uuid4()),
        "task_ids": task_ids,
        "total_tasks": len(task_ids)
    }

# Schedule recurring tasks
# (schedule_daily_task / schedule_hourly_task are application-defined helpers)
@app.on_startup
async def schedule_periodic_tasks():
    """Schedule periodic maintenance tasks"""
    # Daily cleanup at 2 AM
    await schedule_daily_task(
        "cleanup_old_data",
        category="maintenance",
        priority=TaskPriority.LOW,
        hour=2
    )

    # Hourly health checks
    await schedule_hourly_task(
        "system_health_check",
        category="monitoring",
        priority=TaskPriority.HIGH
    )
```
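
The `max_retries` values above cap how many attempts a task gets; as noted in the Overview, retries are spaced with exponential backoff. The queue applies this internally, but the idea can be sketched as follows (the `retry_delay` helper, base delay, and cap are illustrative assumptions, not Gobstopper API):

```python
import random

def retry_delay(retry_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before the next attempt: base * 2^retry_count, capped, with jitter."""
    delay = min(cap, base * (2 ** retry_count))
    # Full jitter spreads out retries so many failing tasks do not retry in lockstep
    return random.uniform(0, delay)

# Example: the ceiling of the wait grows with each attempt
for attempt in range(4):
    print(f"retry {attempt}: wait up to {min(60.0, 1.0 * 2 ** attempt):.0f}s")
```
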

## Worker Management

### Starting Workers

```python
@app.on_startup
async def start_background_workers():
    """Initialize background task workers"""
    # High-throughput notification workers
    await app.start_task_workers("notifications", worker_count=5)

    # CPU-intensive media processing (fewer workers)
    await app.start_task_workers("media", worker_count=2)

    # Single report generator (sequential processing)
    await app.start_task_workers("reports", worker_count=1)

    # System maintenance workers
    await app.start_task_workers("maintenance", worker_count=1)

    # General-purpose workers
    await app.start_task_workers("default", worker_count=3)

# Dynamic worker scaling
@app.post("/api/scale-workers")
async def scale_workers(request):
    data = await request.json()
    category = data["category"]
    worker_count = data["worker_count"]

    # Stop existing workers for the category (if needed)
    if category in app.task_queue.workers:
        for worker in app.task_queue.workers[category]:
            worker.cancel()
        app.task_queue.workers[category].clear()

    # Start new workers
    await app.start_task_workers(category, worker_count)

    return {
        "category": category,
        "worker_count": worker_count,
        "status": "scaled"
    }
```

### Worker Categories and Routing

```python
# Workers automatically process tasks from their assigned category.
# Category-based routing allows:
#
# 1. Specialized workers for different task types:
#    - "notifications" workers handle email, SMS, push notifications
#    - "media" workers handle image/video processing
#    - "reports" workers handle PDF generation, data exports
#    - "system" workers handle backups, cleanup, maintenance
#
# 2. Resource isolation:
#    - CPU-intensive categories get limited workers to prevent CPU overload
#    - I/O-bound categories get more workers
#    - Memory-heavy categories get controlled concurrency
#
# 3. Priority handling within categories:
#    - Tasks within the same category are processed in priority order
#      (see the sketch below)
```
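
The framework handles this ordering internally; a minimal sketch of the idea, using a standard-library heap keyed on negated priority plus insertion order (the names and structure here are illustrative, not Gobstopper internals):

```python
import heapq
import itertools

# Each entry sorts by (negated priority, insertion order): higher-priority tasks
# pop first, and FIFO order is preserved within the same priority level.
counter = itertools.count()
queue = []

def enqueue(task_id: str, priority: int):
    heapq.heappush(queue, (-priority, next(counter), task_id))

enqueue("cleanup", priority=1)        # LOW
enqueue("send-invoice", priority=2)   # NORMAL
enqueue("failover", priority=4)       # CRITICAL
enqueue("resize-avatar", priority=2)  # NORMAL

while queue:
    _, _, task_id = heapq.heappop(queue)
    print(task_id)  # failover, send-invoice, resize-avatar, cleanup
```
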

## Task Monitoring

### Task Status Queries

```python
from gobstopper.tasks.models import TaskStatus

@app.get("/api/tasks/{task_id}")
async def get_task_status(request, task_id: str):
    """Get detailed task status"""
    task_info = await app.task_queue.get_task_info(task_id)

    if not task_info:
        return {"error": "Task not found"}, 404

    return {
        "task_id": task_info.id,
        "name": task_info.name,
        "status": task_info.status.value,
        "priority": task_info.priority.value,
        "category": task_info.category,
        "progress": task_info.progress,
        "progress_message": task_info.progress_message,
        "created_at": task_info.created_at.isoformat(),
        "started_at": task_info.started_at.isoformat() if task_info.started_at else None,
        "completed_at": task_info.completed_at.isoformat() if task_info.completed_at else None,
        "elapsed_seconds": task_info.elapsed_seconds,
        "retry_count": task_info.retry_count,
        "max_retries": task_info.max_retries,
        "result": task_info.result,
        "error": task_info.error
    }

@app.get("/api/tasks")
async def list_tasks(request):
    """List recent tasks with filtering"""
    # Get query parameters
    category = request.args.get("category", [None])[0]
    status = request.args.get("status", [None])[0]
    limit = int(request.args.get("limit", ["50"])[0])
    offset = int(request.args.get("offset", ["0"])[0])

    # Parse status enum if provided
    status_enum = None
    if status:
        try:
            status_enum = TaskStatus(status)
        except ValueError:
            return {"error": f"Invalid status: {status}"}, 400

    # Query tasks from storage
    tasks = app.task_queue.storage.get_tasks(
        category=category,
        status=status_enum,
        limit=limit,
        offset=offset
    )

    return {
        "tasks": [
            {
                "id": task.id,
                "name": task.name,
                "status": task.status.value,
                "category": task.category,
                "created_at": task.created_at.isoformat(),
                "progress": task.progress
            }
            for task in tasks
        ],
        "count": len(tasks),
        "offset": offset,
        "limit": limit
    }

@app.get("/api/task-stats")
async def get_task_stats(request):
    """Get aggregate task statistics"""
    stats = await app.task_queue.get_task_stats()

    # Add queue information
    queue_info = {}
    for category, queue in app.task_queue.queues.items():
        queue_info[category] = {
            "queued_tasks": queue.qsize(),
            "worker_count": len(app.task_queue.workers.get(category, []))
        }

    return {
        "overall": stats,
        "queues": queue_info,
        "registered_tasks": list(app.task_queue.task_functions.keys())
    }
```

### Real-time Task Monitoring

```python
import asyncio
import json
from datetime import datetime

# WebSocket endpoint for real-time task updates
@app.websocket("/ws/task-monitor")
async def task_monitor(websocket):
    await websocket.accept()

    try:
        while True:
            # Send current statistics
            stats = await app.task_queue.get_task_stats()
            await websocket.send_text(json.dumps({
                "type": "stats_update",
                "data": stats,
                "timestamp": datetime.utcnow().isoformat()
            }))

            # Send running tasks
            running_tasks = [
                {
                    "id": task.id,
                    "name": task.name,
                    "progress": task.progress,
                    "progress_message": task.progress_message,
                    "elapsed": (datetime.now() - task.started_at).total_seconds()
                }
                for task in app.task_queue.running_tasks.values()
            ]
            await websocket.send_text(json.dumps({
                "type": "running_tasks",
                "data": running_tasks,
                "count": len(running_tasks)
            }))

            await asyncio.sleep(2)  # Update every 2 seconds
    except ConnectionError:
        pass

# Task completion notifications
async def notify_task_completion(task_info: TaskInfo):
    """Notify clients when tasks complete"""
    notification = {
        "type": "task_completed",
        "task_id": task_info.id,
        "name": task_info.name,
        "status": task_info.status.value,
        "result": task_info.result,
        "elapsed_seconds": task_info.elapsed_seconds
    }

    # Broadcast to monitoring clients (websocket_manager is an application-level helper)
    await websocket_manager.broadcast_to_room(
        "task_monitor",
        json.dumps(notification)
    )
```
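
A dashboard or CLI can consume this feed directly. A minimal client sketch, assuming the app is served on `localhost:8000` and using the third-party `websockets` package (both are assumptions, not part of Gobstopper):

```python
import asyncio
import json

import websockets  # third-party: pip install websockets

async def watch_tasks():
    # Connect to the monitoring endpoint defined above
    async with websockets.connect("ws://localhost:8000/ws/task-monitor") as ws:
        async for message in ws:
            event = json.loads(message)
            if event["type"] == "running_tasks":
                for task in event["data"]:
                    print(f"{task['name']}: {task['progress']:.0f}% ({task['progress_message']})")

asyncio.run(watch_tasks())
```
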

## Task Management Operations

### Task Control

```python
@app.post("/api/tasks/{task_id}/cancel")
async def cancel_task(request, task_id: str):
    """Cancel a pending task"""
    success = await app.task_queue.cancel_task(task_id)

    if success:
        return {"message": "Task cancelled successfully"}
    else:
        return {"error": "Task not found or cannot be cancelled"}, 400

@app.post("/api/tasks/{task_id}/retry")
async def retry_task(request, task_id: str):
    """Retry a failed task"""
    success = await app.task_queue.retry_task(task_id)

    if success:
        return {"message": "Task queued for retry"}
    else:
        return {"error": "Task not found or not in failed state"}, 400

@app.delete("/api/tasks/{task_id}")
async def delete_task(request, task_id: str):
    """Delete a task record"""
    # Custom deletion logic
    task_info = await app.task_queue.get_task_info(task_id)

    if not task_info:
        return {"error": "Task not found"}, 404

    if task_info.is_running:
        return {"error": "Cannot delete running task"}, 400

    # Delete from storage
    app.task_queue.storage.connection.execute(
        "DELETE FROM tasks WHERE id = ?", (task_id,)
    )

    return {"message": "Task deleted"}
```

### Bulk Operations

```python
@app.post("/api/tasks/bulk-retry")
async def bulk_retry_failed_tasks(request):
    """Retry all failed tasks in a category"""
    data = await request.json()
    category = data.get("category")

    # Get all failed tasks
    failed_tasks = app.task_queue.storage.get_tasks(
        category=category,
        status=TaskStatus.FAILED,
        limit=1000
    )

    retry_count = 0
    for task in failed_tasks:
        if await app.task_queue.retry_task(task.id):
            retry_count += 1

    return {
        "retried_count": retry_count,
        "total_failed": len(failed_tasks)
    }

@app.post("/api/tasks/cleanup")
async def cleanup_old_tasks(request):
    """Clean up old completed tasks"""
    data = await request.json()
    days = data.get("days", 30)

    deleted_count = app.task_queue.storage.cleanup_old_tasks(days=days)

    return {
        "deleted_count": deleted_count,
        "cutoff_days": days
    }
```

## Advanced Features

### Custom Task Progress Tracking

```python
# Task progress would be implemented through a context manager
# or progress callback in a full implementation
class TaskProgressTracker:
    def __init__(self, task_id: str, task_queue):
        self.task_id = task_id
        self.task_queue = task_queue

    async def update_progress(self, percentage: float, message: str = ""):
        """Update task progress"""
        if self.task_id in self.task_queue.running_tasks:
            task_info = self.task_queue.running_tasks[self.task_id]
            task_info.progress = percentage
            task_info.progress_message = message
            self.task_queue.storage.save_task(task_info)

@app.task("long_running_task", category="processing")
async def long_running_task(data_size: int):
    """Example of a task with progress tracking"""
    # In a full implementation, task context would be injected
    # progress = TaskProgressTracker(current_task_id, task_queue)

    total_items = data_size
    processed = 0

    for i in range(total_items):
        # Process item
        await process_item(i)
        processed += 1

        # Update progress every ~10%
        if processed % max(1, total_items // 10) == 0:
            percentage = (processed / total_items) * 100
            # await progress.update_progress(percentage, f"Processed {processed}/{total_items}")

        await asyncio.sleep(0.1)  # Simulate work

    return {"processed_items": processed}
```

### Task Dependencies

```python
# Task dependency system (conceptual - would need implementation)
class TaskDependency:
    def __init__(self, task_queue):
        self.task_queue = task_queue
        self.dependencies = {}  # task_id -> [dependency_task_ids]
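
    async def wait_for_task_completion(self, task_id: str, poll_interval: float = 1.0):
        # Not part of the original sketch: one possible (assumed) way to wait for a
        # parent task, polling the queue until the task reaches a terminal state.
        while True:
            task_info = await self.task_queue.get_task_info(task_id)
            if task_info and task_info.is_completed:
                return task_info
            await asyncio.sleep(poll_interval)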

    async def add_dependent_task(self, parent_task_id: str, child_task_name: str, **kwargs):
        """Add a task that depends on parent completion"""
        # Wait for parent task completion
        parent_task = await self.wait_for_task_completion(parent_task_id)

        if parent_task.status == TaskStatus.SUCCESS:
            # Parent succeeded, queue the child task
            child_task_id = await self.task_queue.add_task(
                child_task_name, **kwargs
            )
            return child_task_id
        else:
            # Parent failed, handle accordingly
            raise RuntimeError(f"Parent task {parent_task_id} failed")

# A single dependency manager instance used by the route below
dependency_manager = TaskDependency(app.task_queue)

# Usage example
@app.post("/api/workflow")
async def start_workflow(request):
    data = await request.json()

    # Step 1: Process uploaded file
    process_task_id = await app.add_background_task(
        "process_file",
        file_path=data["file_path"]
    )

    # Step 2: Generate report (depends on Step 1)
    report_task_id = await dependency_manager.add_dependent_task(
        process_task_id,
        "generate_report",
        processed_data=f"output_of_{process_task_id}"
    )

    # Step 3: Send notification (depends on Step 2)
    notify_task_id = await dependency_manager.add_dependent_task(
        report_task_id,
        "send_notification",
        message="Report ready for download"
    )

    return {
        "workflow_id": str(uuid.uuid4()),
        "tasks": [process_task_id, report_task_id, notify_task_id]
    }
```

## Storage and Persistence

### DuckDB Storage Features

```python
# The TaskStorage class provides:

# 1. Efficient indexing for fast queries
storage.connection.execute("""
    CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category);
    CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
    CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at);
    CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);
""")

# 2. JSON storage for complex task data
task_dict['result'] = json.dumps(task_result)
task_dict['args'] = json.dumps(task_arguments)
task_dict['kwargs'] = json.dumps(task_keyword_arguments)

# 3. Intelligent cleanup of old tasks
deleted_count = storage.cleanup_old_tasks(days=30)  # Remove tasks older than 30 days

# 4. Rich querying capabilities
recent_failed_tasks = storage.get_tasks(
    status=TaskStatus.FAILED,
    limit=100
)

high_priority_media_tasks = storage.connection.execute("""
    SELECT * FROM tasks
    WHERE category = 'media' AND priority >= 3
    ORDER BY created_at DESC
""").fetchall()
```

### Performance Optimization

```python
import duckdb

# Database connection pooling (for high-volume scenarios)
class PooledTaskStorage:
    def __init__(self, pool_size: int = 5):
        self.pool = []
        for _ in range(pool_size):
            conn = duckdb.connect("gobstopper_tasks.duckdb")
            self.pool.append(conn)
        self.current = 0

    def get_connection(self):
        conn = self.pool[self.current]
        self.current = (self.current + 1) % len(self.pool)
        return conn

# Batch task operations
async def bulk_queue_tasks(task_specs: list):
    """Queue multiple tasks efficiently"""
    task_ids = []

    # Prepare all tasks
    for spec in task_specs:
        task_id = await app.task_queue.add_task(**spec)
        task_ids.append(task_id)

    return task_ids
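
# One possible save_to_archive helper (an assumption, not framework API): append
# archived rows to a JSON-lines file before they are deleted from active storage.
import json
from pathlib import Path

async def save_to_archive(rows, archive_path: str = "task_archive.jsonl"):
    """Append raw task rows to a JSON-lines archive file."""
    with Path(archive_path).open("a", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(list(row), default=str) + "\n")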

# Task archival for long-term storage
async def archive_completed_tasks(cutoff_date: datetime):
    """Archive old tasks to separate storage"""
    old_tasks = app.task_queue.storage.connection.execute("""
        SELECT * FROM tasks
        WHERE completed_at < ? AND status IN ('success', 'failed')
    """, (cutoff_date.isoformat(),)).fetchall()

    # Save to archive (file, external DB, etc.)
    await save_to_archive(old_tasks)

    # Remove from active storage
    deleted_count = app.task_queue.storage.cleanup_old_tasks(
        days=(datetime.now() - cutoff_date).days
    )

    return len(old_tasks), deleted_count
```

## Integration Examples

### Complete Task Management Dashboard

```python
@app.get("/dashboard/tasks")
async def task_dashboard(request):
    """Task management dashboard"""
    # Get recent stats
    stats = await app.task_queue.get_task_stats()

    # Get running tasks
    running_tasks = list(app.task_queue.running_tasks.values())

    # Get recently completed tasks
    recent_tasks = app.task_queue.storage.get_tasks(limit=20)

    # Get failed tasks needing attention
    failed_tasks = app.task_queue.storage.get_tasks(
        status=TaskStatus.FAILED,
        limit=10
    )

    return await app.render_template("task_dashboard.html",
        stats=stats,
        running_tasks=running_tasks,
        recent_tasks=recent_tasks,
        failed_tasks=failed_tasks
    )

# API for a task management UI
@app.post("/api/admin/restart-workers")
async def restart_all_workers(request):
    """Restart all background workers"""
    # Graceful shutdown
    await app.task_queue.shutdown()

    # Restart workers
    await start_background_workers()

    return {"status": "workers_restarted"}

@app.get("/api/admin/system-health")
async def system_health(request):
    """Check task system health"""
    health = {
        "database_connection": True,
        "workers": {},
        "queue_sizes": {},
        "recent_failures": 0
    }

    try:
        # Test database connection
        app.task_queue.storage.connection.execute("SELECT 1").fetchone()
    except Exception:
        health["database_connection"] = False

    # Check worker status
    for category, workers in app.task_queue.workers.items():
        health["workers"][category] = {
            "count": len(workers),
            "healthy": sum(1 for w in workers if not w.done())
        }

    # Check queue sizes
    for category, queue in app.task_queue.queues.items():
        health["queue_sizes"][category] = queue.qsize()

    # Count failures from the last hour
    recent_failures = app.task_queue.storage.get_tasks(
        status=TaskStatus.FAILED,
        limit=100
    )
    health["recent_failures"] = len([
        t for t in recent_failures
        if t.completed_at and (datetime.now() - t.completed_at).total_seconds() < 3600
    ])

    return health
```

The background task system in Gobstopper provides enterprise-grade task management with persistence, monitoring, and operational tools for building robust asynchronous applications.