Background Task System¶
Gobstopper includes a sophisticated background task system for handling long-running operations, periodic jobs, and asynchronous work. The system features DuckDB persistence, priority queues, automatic retries, and intelligent worker management.
Overview¶
The background task system consists of:
TaskQueue: Main queue manager with worker coordination
TaskInfo: Rich task metadata and status tracking
TaskStorage: DuckDB-based persistence with intelligent cleanup
Priority System: Four-level priority system with queue management
Retry Logic: Configurable automatic retry with exponential backoff
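Before looking at each component in detail, the following sketch shows the typical end-to-end flow using only the APIs documented later on this page: register a task, start workers for its category, and queue work from a request handler. The resize_avatar task itself is illustrative.

import asyncio

from gobstopper import Gobstopper
from gobstopper.tasks.models import TaskPriority

app = Gobstopper()

# 1. Register a task (the task body here is illustrative)
@app.task("resize_avatar", category="media")
async def resize_avatar(user_id: int, size: int = 256):
    await asyncio.sleep(1)  # stand-in for real image work
    return {"user_id": user_id, "size": size}

# 2. Start workers for the category at application startup
@app.on_startup
async def start_workers():
    await app.start_task_workers("media", worker_count=2)

# 3. Queue work from a request handler and return immediately
@app.post("/api/avatars/resize")
async def resize_endpoint(request):
    data = await request.json()
    task_id = await app.add_background_task(
        "resize_avatar",
        category="media",
        priority=TaskPriority.NORMAL,
        user_id=data["user_id"],
    )
    return {"task_id": task_id, "status": "queued"}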
Core Components¶
Task Models¶
TaskStatus Enum¶
from gobstopper.tasks.models import TaskStatus
# Available task statuses
TaskStatus.PENDING # Task queued, waiting for worker
TaskStatus.STARTED # Task currently executing
TaskStatus.SUCCESS # Task completed successfully
TaskStatus.FAILED # Task failed after all retries
TaskStatus.CANCELLED # Task was cancelled before execution
TaskStatus.RETRY # Task failed but will retry
TaskPriority Enum¶
from gobstopper.tasks.models import TaskPriority
# Priority levels (higher number = higher priority)
TaskPriority.LOW # Value: 1 - Background cleanup, maintenance
TaskPriority.NORMAL # Value: 2 - Regular user requests
TaskPriority.HIGH # Value: 3 - Important operations
TaskPriority.CRITICAL # Value: 4 - System-critical tasks
TaskInfo Class¶
from datetime import datetime

from gobstopper.tasks.models import TaskInfo, TaskPriority, TaskStatus

# A TaskInfo record carries the full lifecycle of a task:
task_info = TaskInfo(
id="uuid-string", # Unique task identifier
name="send_email", # Registered task name
category="notifications", # Task category for worker routing
priority=TaskPriority.HIGH, # Task priority
status=TaskStatus.PENDING, # Current status
created_at=datetime.now(), # Creation timestamp
started_at=None, # Execution start time
completed_at=None, # Completion timestamp
elapsed_seconds=0.0, # Execution duration
result=None, # Task return value
error=None, # Error message if failed
retry_count=0, # Current retry attempt
max_retries=3, # Maximum retry attempts
args=(), # Positional arguments
kwargs={}, # Keyword arguments
progress=0.0, # Progress percentage (0-100)
progress_message="" # Progress description
)
# Convenience properties
print(task_info.is_running) # True if status is STARTED
print(task_info.is_completed) # True if finished (success/failed/cancelled)
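As a small example of how these fields are typically consumed, the helper below serializes a TaskInfo into a JSON-safe dict for API responses; it assumes only the fields and properties listed above.

def task_info_to_dict(task_info: TaskInfo) -> dict:
    """Serialize the fields shown above into a JSON-friendly dict (sketch)"""
    return {
        "id": task_info.id,
        "name": task_info.name,
        "status": task_info.status.value,
        "priority": task_info.priority.value,
        "progress": task_info.progress,
        "is_running": task_info.is_running,
        "is_completed": task_info.is_completed,
        "created_at": task_info.created_at.isoformat(),
        "error": task_info.error,
    }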
Task Registration¶
Basic Task Registration¶
import asyncio
from datetime import datetime

from gobstopper import Gobstopper

app = Gobstopper()
@app.task("send_email", category="notifications")
async def send_email(to: str, subject: str, body: str):
"""Send email notification"""
# Simulate email sending
await asyncio.sleep(2)
# Send via email service
result = await email_service.send(to, subject, body)
return {
"message_id": result.id,
"status": "sent",
"to": to,
"sent_at": datetime.utcnow().isoformat()
}
@app.task("process_image", category="media")
async def process_image(image_path: str, operations: list):
"""Process uploaded image"""
processed_path = f"processed_{image_path}"
for operation in operations:
if operation == "resize":
await resize_image(image_path, (800, 600))
elif operation == "watermark":
await add_watermark(image_path)
elif operation == "compress":
await compress_image(image_path)
return {
"original": image_path,
"processed": processed_path,
"operations": operations
}
# Synchronous tasks are also supported
@app.task("generate_report", category="reports")
def generate_report(user_id: int, report_type: str):
"""Generate PDF report (sync function)"""
# Sync task - will be run in thread executor
data = fetch_report_data(user_id, report_type)
pdf_path = create_pdf_report(data)
return {"report_path": pdf_path, "user_id": user_id}
Advanced Task Registration¶
# Task with custom retry logic
@app.task("critical_operation", category="system")
async def critical_operation(data: dict):
"""Critical system operation with custom error handling"""
try:
# Attempt operation
result = await perform_critical_operation(data)
return result
except TemporaryError as e:
# Let retry system handle temporary errors
raise e
except PermanentError as e:
# Mark as failed immediately for permanent errors
raise RuntimeError(f"Permanent failure: {e}")
# Task with progress reporting
@app.task("bulk_import", category="data")
async def bulk_import(file_path: str):
"""Import large dataset with progress reporting"""
records = await load_records(file_path)
total_records = len(records)
imported = 0
for i, record in enumerate(records):
await import_record(record)
imported += 1
# Update progress (this would need task context in a real implementation)
progress = (imported / total_records) * 100
# await update_task_progress(progress, f"Imported {imported}/{total_records}")
if i % 100 == 0: # Update every 100 records
await asyncio.sleep(0.1) # Yield control
return {
"imported_count": imported,
"total_records": total_records,
"file_path": file_path
}
Task Queuing¶
Basic Task Queuing¶
from gobstopper.tasks.models import TaskPriority
@app.post("/api/send-notification")
async def queue_notification(request):
data = await request.json()
# Queue high-priority email task
task_id = await app.add_background_task(
"send_email",
category="notifications",
priority=TaskPriority.HIGH,
max_retries=3,
to=data["email"],
subject=data["subject"],
body=data["message"]
)
return {"task_id": task_id, "status": "queued"}
@app.post("/api/process-upload")
async def process_upload(request):
data = await request.json()
# Queue normal priority image processing
task_id = await app.add_background_task(
"process_image",
category="media",
priority=TaskPriority.NORMAL,
max_retries=2,
image_path=data["image_path"],
operations=data["operations"]
)
return {"task_id": task_id}
@app.post("/api/generate-report")
async def request_report(request):
data = await request.json()
# Queue low priority report generation
task_id = await app.add_background_task(
"generate_report",
category="reports",
priority=TaskPriority.LOW,
max_retries=1,
user_id=data["user_id"],
report_type=data["type"]
)
return {"task_id": task_id, "estimated_time": "5-10 minutes"}
Batch Task Queuing¶
@app.post("/api/bulk-operations")
async def queue_bulk_operations(request):
data = await request.json()
operations = data["operations"]
task_ids = []
for operation in operations:
task_id = await app.add_background_task(
operation["task_name"],
category=operation.get("category", "default"),
priority=TaskPriority(operation.get("priority", 2)),
max_retries=operation.get("max_retries", 3),
**operation["params"]
)
task_ids.append(task_id)
return {
"batch_id": str(uuid.uuid4()),
"task_ids": task_ids,
"total_tasks": len(task_ids)
}
# Schedule recurring tasks
@app.on_startup
async def schedule_periodic_tasks():
"""Schedule periodic maintenance tasks"""
# Daily cleanup at 2 AM
await schedule_daily_task(
"cleanup_old_data",
category="maintenance",
priority=TaskPriority.LOW,
hour=2
)
# Hourly health checks
await schedule_hourly_task(
"system_health_check",
category="monitoring",
priority=TaskPriority.HIGH
)
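schedule_daily_task and schedule_hourly_task are not defined elsewhere in these docs; the following is one possible sketch of the daily variant, an asyncio loop that sleeps until the target hour and then queues the task. Treat it as an assumption about how such a helper could work, not a built-in API.

import asyncio
from datetime import datetime, timedelta

async def schedule_daily_task(task_name: str, *, category: str,
                              priority: TaskPriority, hour: int, **kwargs):
    """Queue task_name once per day at the given hour (hypothetical helper)"""
    async def _runner():
        while True:
            now = datetime.now()
            next_run = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)
            await asyncio.sleep((next_run - now).total_seconds())
            await app.add_background_task(
                task_name, category=category, priority=priority, **kwargs
            )
    # Keep a reference so the scheduler task is not garbage collected
    # (stored on the app object here purely for simplicity)
    app._daily_schedules = getattr(app, "_daily_schedules", [])
    app._daily_schedules.append(asyncio.create_task(_runner()))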
Worker Management¶
Starting Workers¶
@app.on_startup
async def start_background_workers():
"""Initialize background task workers"""
# High-throughput notification workers
await app.start_task_workers("notifications", worker_count=5)
# CPU-intensive media processing (fewer workers)
await app.start_task_workers("media", worker_count=2)
# Single report generator (sequential processing)
await app.start_task_workers("reports", worker_count=1)
# System maintenance workers
await app.start_task_workers("maintenance", worker_count=1)
# General purpose workers
await app.start_task_workers("default", worker_count=3)
# Dynamic worker scaling
@app.post("/api/scale-workers")
async def scale_workers(request):
data = await request.json()
category = data["category"]
worker_count = data["worker_count"]
# Stop existing workers for category (if needed)
if category in app.task_queue.workers:
for worker in app.task_queue.workers[category]:
worker.cancel()
app.task_queue.workers[category].clear()
# Start new workers
await app.start_task_workers(category, worker_count)
return {
"category": category,
"worker_count": worker_count,
"status": "scaled"
}
Worker Categories and Routing¶
Workers automatically pull tasks from the category they were started for. Category-based routing enables:

1. Specialized workers for different task types: a notifications category for email, SMS, and push notifications; media for image and video processing; reports for PDF generation and data exports; maintenance for backups, cleanup, and other system chores.
2. Resource isolation: fewer workers for CPU-intensive categories to prevent overload, more workers for I/O-bound categories, and capped concurrency for memory-heavy work.
3. Priority handling within a category: tasks in the same category are dequeued in priority order, so CRITICAL runs before HIGH, NORMAL, and LOW.
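One way to apply these guidelines is a configuration-driven startup hook; the category-to-count mapping below is illustrative and should be tuned per deployment.

# Illustrative category -> worker-count mapping
WORKER_COUNTS = {
    "notifications": 5,   # I/O bound: many concurrent workers
    "media": 2,           # CPU bound: keep concurrency low
    "reports": 1,         # sequential, memory heavy
    "maintenance": 1,
    "default": 3,
}

@app.on_startup
async def start_category_workers():
    for category, count in WORKER_COUNTS.items():
        await app.start_task_workers(category, worker_count=count)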
Task Monitoring¶
Task Status Queries¶
@app.get("/api/tasks/{task_id}")
async def get_task_status(request, task_id: str):
"""Get detailed task status"""
task_info = await app.task_queue.get_task_info(task_id)
if not task_info:
return {"error": "Task not found"}, 404
return {
"task_id": task_info.id,
"name": task_info.name,
"status": task_info.status.value,
"priority": task_info.priority.value,
"category": task_info.category,
"progress": task_info.progress,
"progress_message": task_info.progress_message,
"created_at": task_info.created_at.isoformat(),
"started_at": task_info.started_at.isoformat() if task_info.started_at else None,
"completed_at": task_info.completed_at.isoformat() if task_info.completed_at else None,
"elapsed_seconds": task_info.elapsed_seconds,
"retry_count": task_info.retry_count,
"max_retries": task_info.max_retries,
"result": task_info.result,
"error": task_info.error
}
@app.get("/api/tasks")
async def list_tasks(request):
"""List recent tasks with filtering"""
# Get query parameters
category = request.args.get("category", [None])[0]
status = request.args.get("status", [None])[0]
limit = int(request.args.get("limit", ["50"])[0])
offset = int(request.args.get("offset", ["0"])[0])
# Parse status enum if provided
status_enum = None
if status:
try:
status_enum = TaskStatus(status)
except ValueError:
return {"error": f"Invalid status: {status}"}, 400
# Query tasks from storage
tasks = app.task_queue.storage.get_tasks(
category=category,
status=status_enum,
limit=limit,
offset=offset
)
return {
"tasks": [
{
"id": task.id,
"name": task.name,
"status": task.status.value,
"category": task.category,
"created_at": task.created_at.isoformat(),
"progress": task.progress
}
for task in tasks
],
"count": len(tasks),
"offset": offset,
"limit": limit
}
@app.get("/api/task-stats")
async def get_task_stats(request):
"""Get aggregate task statistics"""
stats = await app.task_queue.get_task_stats()
# Add queue information
queue_info = {}
for category, queue in app.task_queue.queues.items():
queue_info[category] = {
"queued_tasks": queue.qsize(),
"worker_count": len(app.task_queue.workers.get(category, []))
}
return {
"overall": stats,
"queues": queue_info,
"registered_tasks": list(app.task_queue.task_functions.keys())
}
Real-time Task Monitoring¶
import asyncio
import json
from datetime import datetime

# WebSocket endpoint for real-time task updates
@app.websocket("/ws/task-monitor")
async def task_monitor(websocket):
await websocket.accept()
try:
while True:
# Send current statistics
stats = await app.task_queue.get_task_stats()
await websocket.send_text(json.dumps({
"type": "stats_update",
"data": stats,
"timestamp": datetime.utcnow().isoformat()
}))
# Send running tasks
running_tasks = [
{
"id": task.id,
"name": task.name,
"progress": task.progress,
"progress_message": task.progress_message,
"elapsed": (datetime.now() - task.started_at).total_seconds()
}
for task in app.task_queue.running_tasks.values()
]
await websocket.send_text(json.dumps({
"type": "running_tasks",
"data": running_tasks,
"count": len(running_tasks)
}))
await asyncio.sleep(2) # Update every 2 seconds
except ConnectionError:
pass
# Task completion notifications
async def notify_task_completion(task_info: TaskInfo):
"""Notify clients when tasks complete"""
notification = {
"type": "task_completed",
"task_id": task_info.id,
"name": task_info.name,
"status": task_info.status.value,
"result": task_info.result,
"elapsed_seconds": task_info.elapsed_seconds
}
# Broadcast to monitoring clients
await websocket_manager.broadcast_to_room(
"task_monitor",
json.dumps(notification)
)
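The websocket_manager used above is assumed rather than provided by the task system; a minimal in-memory room broadcaster could look like the following sketch.

class WebSocketRoomManager:
    """Minimal in-memory room broadcaster (sketch, not a Gobstopper API)"""

    def __init__(self):
        self.rooms: dict[str, set] = {}

    def join(self, room: str, websocket) -> None:
        self.rooms.setdefault(room, set()).add(websocket)

    def leave(self, room: str, websocket) -> None:
        self.rooms.get(room, set()).discard(websocket)

    async def broadcast_to_room(self, room: str, message: str) -> None:
        # Send to every connected client; drop sockets that have gone away
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_text(message)
            except Exception:
                self.leave(room, ws)

websocket_manager = WebSocketRoomManager()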
Task Management Operations¶
Task Control¶
@app.post("/api/tasks/{task_id}/cancel")
async def cancel_task(request, task_id: str):
"""Cancel a pending task"""
success = await app.task_queue.cancel_task(task_id)
if success:
return {"message": "Task cancelled successfully"}
else:
return {"error": "Task not found or cannot be cancelled"}, 400
@app.post("/api/tasks/{task_id}/retry")
async def retry_task(request, task_id: str):
"""Retry a failed task"""
success = await app.task_queue.retry_task(task_id)
if success:
return {"message": "Task queued for retry"}
else:
return {"error": "Task not found or not in failed state"}, 400
@app.delete("/api/tasks/{task_id}")
async def delete_task(request, task_id: str):
"""Delete task record"""
# Custom deletion logic
task_info = await app.task_queue.get_task_info(task_id)
if not task_info:
return {"error": "Task not found"}, 404
if task_info.is_running:
return {"error": "Cannot delete running task"}, 400
# Delete from storage
app.task_queue.storage.connection.execute(
"DELETE FROM tasks WHERE id = ?", (task_id,)
)
return {"message": "Task deleted"}
Bulk Operations¶
@app.post("/api/tasks/bulk-retry")
async def bulk_retry_failed_tasks(request):
"""Retry all failed tasks in a category"""
data = await request.json()
category = data.get("category")
# Get all failed tasks
failed_tasks = app.task_queue.storage.get_tasks(
category=category,
status=TaskStatus.FAILED,
limit=1000
)
retry_count = 0
for task in failed_tasks:
if await app.task_queue.retry_task(task.id):
retry_count += 1
return {
"retried_count": retry_count,
"total_failed": len(failed_tasks)
}
@app.post("/api/tasks/cleanup")
async def cleanup_old_tasks(request):
"""Clean up old completed tasks"""
data = await request.json()
days = data.get("days", 30)
deleted_count = app.task_queue.storage.cleanup_old_tasks(days=days)
return {
"deleted_count": deleted_count,
"cutoff_days": days
}
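The same cleanup can also be registered as a background task, so the daily cleanup_old_data schedule shown earlier has something to run. A short sketch using the storage API from this page:

@app.task("cleanup_old_data", category="maintenance")
async def cleanup_old_data(days: int = 30):
    """Remove completed tasks older than `days` (sketch using cleanup_old_tasks)"""
    deleted_count = app.task_queue.storage.cleanup_old_tasks(days=days)
    return {"deleted_count": deleted_count, "cutoff_days": days}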
Advanced Features¶
Custom Task Progress Tracking¶
# Task progress would be implemented through a context manager
# or progress callback in a full implementation
class TaskProgressTracker:
def __init__(self, task_id: str, task_queue):
self.task_id = task_id
self.task_queue = task_queue
async def update_progress(self, percentage: float, message: str = ""):
"""Update task progress"""
if self.task_id in self.task_queue.running_tasks:
task_info = self.task_queue.running_tasks[self.task_id]
task_info.progress = percentage
task_info.progress_message = message
self.task_queue.storage.save_task(task_info)
@app.task("long_running_task", category="processing")
async def long_running_task(data_size: int):
"""Example of task with progress tracking"""
# In a full implementation, task context would be injected
# progress = TaskProgressTracker(current_task_id, task_queue)
total_items = data_size
processed = 0
for i in range(total_items):
# Process item
await process_item(i)
processed += 1
# Update progress roughly every 10% (guard against very small datasets)
if processed % max(total_items // 10, 1) == 0:
percentage = (processed / total_items) * 100
# await progress.update_progress(percentage, f"Processed {processed}/{total_items}")
await asyncio.sleep(0.1) # Simulate work
return {"processed_items": processed}
Task Dependencies¶
# Task dependency system (conceptual - would need implementation)
class TaskDependency:
def __init__(self, task_queue):
self.task_queue = task_queue
self.dependencies = {} # task_id -> [dependency_task_ids]
async def add_dependent_task(self, parent_task_id: str, child_task_name: str, **kwargs):
"""Add task that depends on parent completion"""
# Wait for parent task completion
parent_task = await self.wait_for_task_completion(parent_task_id)
if parent_task.status == TaskStatus.SUCCESS:
# Parent succeeded, queue child task
child_task_id = await self.task_queue.add_task(
child_task_name, **kwargs
)
return child_task_id
else:
# Parent failed, handle accordingly
raise RuntimeError(f"Parent task {parent_task_id} failed")
# Usage example
@app.post("/api/workflow")
async def start_workflow(request):
data = await request.json()
# Step 1: Process uploaded file
process_task_id = await app.add_background_task(
"process_file",
file_path=data["file_path"]
)
# Step 2: Generate report (depends on Step 1)
report_task_id = await dependency_manager.add_dependent_task(
process_task_id,
"generate_report",
processed_data=f"output_of_{process_task_id}"
)
# Step 3: Send notification (depends on Step 2)
notify_task_id = await dependency_manager.add_dependent_task(
report_task_id,
"send_notification",
message="Report ready for download"
)
return {
"workflow_id": str(uuid.uuid4()),
"tasks": [process_task_id, report_task_id, notify_task_id]
}
Storage and Persistence¶
DuckDB Storage Features¶
# The TaskStorage class provides:
# 1. Efficient indexing for fast queries
storage.connection.execute("""
CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at);
CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority);
""")
# 2. JSON storage for complex task data
task_dict['result'] = json.dumps(task_result)
task_dict['args'] = json.dumps(task_arguments)
task_dict['kwargs'] = json.dumps(task_keyword_arguments)
# 3. Intelligent cleanup of old tasks
deleted_count = storage.cleanup_old_tasks(days=30) # Remove tasks older than 30 days
# 4. Rich querying capabilities
recent_failed_tasks = storage.get_tasks(
status=TaskStatus.FAILED,
limit=100
)
high_priority_media_tasks = storage.connection.execute("""
SELECT * FROM tasks
WHERE category = 'media' AND priority >= 3
ORDER BY created_at DESC
""").fetchall()
Performance Optimization¶
# Database connection pooling (for high-volume scenarios)
import duckdb

class PooledTaskStorage:
def __init__(self, pool_size: int = 5):
self.pool = []
for _ in range(pool_size):
conn = duckdb.connect("gobstopper_tasks.duckdb")
self.pool.append(conn)
self.current = 0
def get_connection(self):
conn = self.pool[self.current]
self.current = (self.current + 1) % len(self.pool)
return conn
# Batch task operations
async def bulk_queue_tasks(task_specs: list):
"""Queue multiple tasks efficiently"""
task_ids = []
# Queue each task sequentially; add_task returns the new task id
for spec in task_specs:
task_id = await app.task_queue.add_task(**spec)
task_ids.append(task_id)
return task_ids
# Task archival for long-term storage
async def archive_completed_tasks(cutoff_date: datetime):
"""Archive old tasks to separate storage"""
old_tasks = app.task_queue.storage.connection.execute("""
SELECT * FROM tasks
WHERE completed_at < ? AND status IN ('success', 'failed')
""", (cutoff_date.isoformat(),)).fetchall()
# Save to archive (file, external DB, etc.)
await save_to_archive(old_tasks)
# Remove from active storage
deleted_count = app.task_queue.storage.cleanup_old_tasks(
days=(datetime.now() - cutoff_date).days
)
return len(old_tasks), deleted_count
Integration Examples¶
Complete Task Management Dashboard¶
@app.get("/dashboard/tasks")
async def task_dashboard(request):
"""Task management dashboard"""
# Get recent stats
stats = await app.task_queue.get_task_stats()
# Get running tasks
running_tasks = list(app.task_queue.running_tasks.values())
# Get recent completed tasks
recent_tasks = app.task_queue.storage.get_tasks(limit=20)
# Get failed tasks needing attention
failed_tasks = app.task_queue.storage.get_tasks(
status=TaskStatus.FAILED,
limit=10
)
return await app.render_template("task_dashboard.html",
stats=stats,
running_tasks=running_tasks,
recent_tasks=recent_tasks,
failed_tasks=failed_tasks
)
# API for task management UI
@app.post("/api/admin/restart-workers")
async def restart_all_workers(request):
"""Restart all background workers"""
# Graceful shutdown
await app.task_queue.shutdown()
# Restart workers
await start_background_workers()
return {"status": "workers_restarted"}
@app.get("/api/admin/system-health")
async def system_health(request):
"""Check task system health"""
health = {
"database_connection": True,
"workers": {},
"queue_sizes": {},
"recent_failures": 0
}
try:
# Test database connection
app.task_queue.storage.connection.execute("SELECT 1").fetchone()
except Exception:
health["database_connection"] = False
# Check worker status
for category, workers in app.task_queue.workers.items():
health["workers"][category] = {
"count": len(workers),
"healthy": sum(1 for w in workers if not w.done())
}
# Check queue sizes
for category, queue in app.task_queue.queues.items():
health["queue_sizes"][category] = queue.qsize()
# Count recent failures
recent_failures = app.task_queue.storage.get_tasks(
status=TaskStatus.FAILED,
limit=100
)
health["recent_failures"] = len([
t for t in recent_failures
if t.completed_at and
(datetime.now() - t.completed_at).total_seconds() < 3600 # Last hour
])
return health
The background task system in Gobstopper provides enterprise-grade task management with persistence, monitoring, and operational tools for building robust asynchronous applications.