"""Task storage with DuckDB backend for Gobstopper framework.
This module provides persistent storage for task metadata using DuckDB,
an embedded analytical database. Tasks are stored with full metadata including
status, timing, results, and error information for tracking and debugging.
The storage system features:
- Lazy database initialization (no file created until first use)
- Automatic table and index creation
- Efficient querying with category and status filters
- Intelligent cleanup of old completed tasks
- JSON serialization for complex data types
Classes:
TaskStorage: DuckDB-based persistent task storage with indexing.
Example:
Basic storage operations::
from gobstopper.tasks.storage import TaskStorage
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
from datetime import datetime
# Create storage instance
storage = TaskStorage("my_tasks.duckdb")
# Save a task
task = TaskInfo(
id="task-123",
name="process_data",
category="analytics",
priority=TaskPriority.NORMAL,
status=TaskStatus.PENDING,
created_at=datetime.now()
)
storage.save_task(task)
# Retrieve task
retrieved = storage.get_task("task-123")
print(f"Status: {retrieved.status}")
# Query by category
analytics_tasks = storage.get_tasks(category="analytics", limit=50)
# Cleanup old completed tasks
deleted = storage.cleanup_old_tasks(days=30)
print(f"Deleted {deleted} old tasks")
Note:
DuckDB is required for task storage. Install with: uv add duckdb
The database file is created lazily on first use.
"""
import json
from dataclasses import asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Union
from .models import TaskInfo, TaskStatus, TaskPriority
try:
    import duckdb
    DUCKDB_AVAILABLE = True
except ImportError:
    DUCKDB_AVAILABLE = False
    duckdb = None
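
# duckdb is an optional dependency: the import is guarded above so this
# module can be imported without it, and TaskStorage raises ImportError
# at construction time when DuckDB is missing.
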
class TaskStorage:
"""DuckDB-based task storage with intelligent cleanup.
Provides persistent storage for TaskInfo objects using DuckDB as the backend.
The database is created lazily on first use, with automatic schema creation
and index optimization for common query patterns.
The storage layer handles:
- Conversion between TaskInfo objects and database rows
- JSON serialization of complex fields (args, kwargs, result)
- Datetime string conversion for database compatibility
- Efficient indexing on category, status, created_at, and priority
- Safe concurrent access through DuckDB's transaction handling
Attributes:
db_path: Path to the DuckDB database file.
connection: DuckDB connection object (created lazily).
Example:
Creating and using storage::
from gobstopper.tasks.storage import TaskStorage
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
from datetime import datetime
# Initialize storage (no database file created yet)
storage = TaskStorage("tasks.duckdb")
# Create and save a task
task = TaskInfo(
id="550e8400-e29b-41d4-a716-446655440000",
name="send_email",
category="notifications",
priority=TaskPriority.HIGH,
status=TaskStatus.PENDING,
created_at=datetime.now(),
args=("user@example.com",),
kwargs={"subject": "Welcome"}
)
storage.save_task(task) # Database file created here
# Update task status
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
storage.save_task(task) # Updates existing record
# Query tasks
pending = storage.get_tasks(status=TaskStatus.PENDING, limit=100)
email_tasks = storage.get_tasks(category="notifications")
Raises:
ImportError: If DuckDB is not installed.
Note:
- Database initialization is deferred until first actual use
- The database file is created with full schema on first write
- All datetime values are stored as ISO 8601 strings
- JSON fields (args, kwargs, result) support any JSON-serializable data
"""
    def __init__(self, db_path: Union[str, Path] = "wopr_tasks.duckdb"):
        """Initialize TaskStorage with a database path.

        Args:
            db_path: Path to the DuckDB database file. Can be relative or
                absolute. Defaults to "wopr_tasks.duckdb" in the current
                directory.

        Raises:
            ImportError: If the DuckDB package is not installed.

        Note:
            The database file is not created until the first operation that
            requires it. This allows TaskStorage to be instantiated without
            side effects.
        """
        if not DUCKDB_AVAILABLE:
            raise ImportError("DuckDB is required for task storage. Install: uv add duckdb")
        self.db_path = Path(db_path)
        self.connection = None  # Lazy connection; do not touch DB until first use
    def _init_database(self):
        """Initialize database connection, schema, and indexes.

        Creates the DuckDB connection and sets up the tasks table with all
        necessary columns and indexes. This method is called automatically
        on the first database operation (lazy initialization).

        The tasks table includes:

        - Primary key on id (UUID)
        - Columns for all TaskInfo fields
        - JSON columns for args, kwargs, and result
        - Indexes on category, status, created_at, and priority

        Note:
            This method is idempotent - calling it multiple times is safe.
            If the connection already exists, it returns immediately.
            Uses CREATE TABLE IF NOT EXISTS and CREATE INDEX IF NOT EXISTS.
        """
        if self.connection is not None:
            return
        self.connection = duckdb.connect(str(self.db_path))
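        # duckdb.connect() creates the database file on disk if it does not
        # already exist, so this is where lazy file creation actually happens.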
self.connection.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id VARCHAR PRIMARY KEY,
name VARCHAR NOT NULL,
category VARCHAR NOT NULL,
priority INTEGER NOT NULL,
status VARCHAR NOT NULL,
created_at TIMESTAMP NOT NULL,
started_at TIMESTAMP,
completed_at TIMESTAMP,
elapsed_seconds DOUBLE,
result JSON,
error TEXT,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 0,
args JSON,
kwargs JSON,
progress DOUBLE DEFAULT 0.0,
progress_message VARCHAR DEFAULT ''
)
""")
# Create indexes for performance
indexes = [
"CREATE INDEX IF NOT EXISTS idx_tasks_category ON tasks(category)",
"CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)",
"CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at)",
"CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)"
]
for idx in indexes:
self.connection.execute(idx)
    def save_task(self, task_info: TaskInfo) -> None:
        """Save or update task information in the database.

        Performs an upsert operation (INSERT OR REPLACE) to either create
        a new task record or update an existing one. All TaskInfo fields
        are serialized appropriately for database storage.

        Args:
            task_info: TaskInfo object to save or update.

        Example:
            Saving and updating a task::

                task = TaskInfo(
                    id="task-123",
                    name="process_data",
                    category="analytics",
                    priority=TaskPriority.NORMAL,
                    status=TaskStatus.PENDING,
                    created_at=datetime.now()
                )

                # Initial save
                storage.save_task(task)

                # Update after execution
                task.status = TaskStatus.SUCCESS
                task.completed_at = datetime.now()
                task.elapsed_seconds = 5.2
                task.result = {"records_processed": 1000}
                storage.save_task(task)  # Updates existing record

        Note:
            - Datetime objects are converted to ISO 8601 strings
            - TaskPriority and TaskStatus enums are stored as their values
            - args, kwargs, and result are JSON-serialized
            - The database is initialized automatically if needed
        """
        self._init_database()
        task_dict = asdict(task_info)
        # Convert datetime objects to ISO strings
        for key in ['created_at', 'started_at', 'completed_at']:
            if task_dict[key]:
                task_dict[key] = task_dict[key].isoformat()
        task_dict['priority'] = task_info.priority.value
        task_dict['status'] = task_info.status.value
        task_dict['args'] = json.dumps(task_dict['args'])
        task_dict['kwargs'] = json.dumps(task_dict['kwargs'])
        task_dict['result'] = json.dumps(task_dict['result'])
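        # NOTE: the VALUES tuple below relies on TaskInfo's dataclass field
        # order matching the column order of the tasks table exactly; adding
        # or reordering TaskInfo fields requires updating the schema to match.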
self.connection.execute("""
INSERT OR REPLACE INTO tasks VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
""", tuple(task_dict.values()))
    def get_task(self, task_id: str) -> Optional[TaskInfo]:
        """Retrieve a single task by its unique ID.

        Args:
            task_id: Unique identifier of the task to retrieve.

        Returns:
            TaskInfo object if found, None if no task exists with that ID.

        Example:
            Retrieving and checking a task::

                task = storage.get_task("550e8400-e29b-41d4-a716-446655440000")
                if task:
                    print(f"Task: {task.name}")
                    print(f"Status: {task.status.value}")
                    if task.status == TaskStatus.SUCCESS:
                        print(f"Result: {task.result}")
                else:
                    print("Task not found")

        Note:
            Returns None rather than raising an exception if the task
            doesn't exist.
        """
        self._init_database()
        result = self.connection.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        return self._row_to_task_info(result) if result else None
    def get_tasks(self,
                  category: Optional[str] = None,
                  status: Optional[TaskStatus] = None,
                  limit: int = 100,
                  offset: int = 0) -> List[TaskInfo]:
        """Query tasks with optional filtering and pagination.

        Retrieves multiple tasks from storage with optional filters on category
        and status. Results are ordered by creation time (newest first) and
        support pagination via limit/offset.

        Args:
            category: Filter by task category (e.g., "email", "reports").
                If None, returns tasks from all categories.
            status: Filter by TaskStatus enum value. If None, returns tasks
                in any status.
            limit: Maximum number of tasks to return. Defaults to 100.
            offset: Number of tasks to skip (for pagination). Defaults to 0.

        Returns:
            List of TaskInfo objects matching the filters, ordered by
            created_at DESC (newest first). Empty list if no matches.

        Example:
            Querying tasks with filters::

                # Get all pending email tasks
                pending_emails = storage.get_tasks(
                    category="email",
                    status=TaskStatus.PENDING,
                    limit=50
                )

                # Get the next page of results
                next_page = storage.get_tasks(
                    category="email",
                    status=TaskStatus.PENDING,
                    limit=50,
                    offset=50
                )

                # Get recent failed tasks across all categories
                failed = storage.get_tasks(
                    status=TaskStatus.FAILED,
                    limit=20
                )

                # Get all tasks in the analytics category
                analytics = storage.get_tasks(category="analytics")

        Note:
            - Results are always ordered by created_at DESC
            - Queries are efficient due to indexes on category and status
            - Both category and status filters are optional
        """
        self._init_database()
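        # Build the query incrementally; the "WHERE 1=1" base clause lets
        # each optional filter be appended uniformly as an AND condition.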
query = "SELECT * FROM tasks WHERE 1=1"
params = []
if category:
query += " AND category = ?"
params.append(category)
if status:
query += " AND status = ?"
params.append(status.value)
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
results = self.connection.execute(query, params).fetchall()
return [self._row_to_task_info(row) for row in results]
    def cleanup_old_tasks(self,
                          days: Optional[int] = None,
                          months: Optional[int] = None,
                          years: Optional[int] = None) -> int:
        """Delete old completed tasks to manage database size.

        Removes tasks that completed before a calculated cutoff date. Only
        deletes tasks in terminal states (SUCCESS, FAILED, CANCELLED) to
        avoid removing active or pending tasks.

        Args:
            days: Number of days to retain. Tasks older than this are deleted.
            months: Number of months to retain (approximated as 30-day periods).
            years: Number of years to retain (approximated as 365-day periods).

        Returns:
            int: Number of task records deleted. Returns 0 if no time
            period is specified.

        Example:
            Cleaning up old tasks::

                # Delete tasks completed over 30 days ago
                deleted = storage.cleanup_old_tasks(days=30)
                print(f"Cleaned up {deleted} old tasks")

                # Delete tasks completed over 3 months ago
                deleted = storage.cleanup_old_tasks(months=3)

                # Delete tasks completed over 1 year ago
                deleted = storage.cleanup_old_tasks(years=1)

                # Combine periods (90 days + 6 months)
                deleted = storage.cleanup_old_tasks(days=90, months=6)

        Note:
            - Only deletes tasks in SUCCESS, FAILED, or CANCELLED status
            - PENDING and STARTED tasks are never deleted
            - Time periods are cumulative if multiple are specified
            - Safe to run periodically (e.g., from a daily cron job)
        """
        if not any([days, months, years]):
            return 0
        self._init_database()
        cutoff_date = datetime.now()
        if days:
            cutoff_date -= timedelta(days=days)
        if months:
            cutoff_date -= timedelta(days=months * 30)
        if years:
            cutoff_date -= timedelta(days=years * 365)
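        # The literal status strings below assume TaskStatus values are the
        # lowercase names ('success', 'failed', 'cancelled'), matching how
        # save_task stores status via .value; they must stay in sync.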
        result = self.connection.execute("""
            DELETE FROM tasks
            WHERE completed_at < ?
            AND status IN ('success', 'failed', 'cancelled')
        """, (cutoff_date.isoformat(),))
        # DuckDB reports the number of rows affected by a DML statement as a
        # single-row result; fetching it is more reliable than DB-API rowcount,
        # which DuckDB does not populate for DELETE statements.
        deleted = result.fetchone()
        return deleted[0] if deleted else 0
    def _row_to_task_info(self, row) -> TaskInfo:
        """Convert a database row tuple to a TaskInfo object.

        Deserializes all fields from database representation back to Python
        objects, including JSON deserialization for complex fields and datetime
        parsing for timestamp fields.

        Args:
            row: Tuple from a DuckDB query result containing all task fields
                in the order defined by the table schema.

        Returns:
            TaskInfo object reconstructed from the database row.

        Note:
            - Handles both string and datetime objects for timestamp fields
            - JSON fields (args, kwargs, result) are deserialized from strings
            - Enum fields (priority, status) are converted from stored values
            - The helper function _parse_datetime handles flexible datetime parsing
        """
        def _parse_datetime(dt_value):
            """Parse a datetime value that might be a string or datetime object.

            Args:
                dt_value: Either a datetime object, an ISO 8601 string, or None.

            Returns:
                datetime object or None.
            """
            if dt_value is None:
                return None
            if isinstance(dt_value, datetime):
                return dt_value
            return datetime.fromisoformat(dt_value)
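
        # Positional indices below mirror the tasks table column order:
        # (id, name, category, priority, status, created_at, started_at,
        # completed_at, elapsed_seconds, result, error, retry_count,
        # max_retries, args, kwargs, progress, progress_message).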
        return TaskInfo(
            id=row[0], name=row[1], category=row[2],
            priority=TaskPriority(row[3]), status=TaskStatus(row[4]),
            created_at=_parse_datetime(row[5]),
            started_at=_parse_datetime(row[6]),
            completed_at=_parse_datetime(row[7]),
            elapsed_seconds=row[8] or 0.0,
            result=json.loads(row[9]) if row[9] else None,
            error=row[10], retry_count=row[11], max_retries=row[12],
            args=tuple(json.loads(row[13])) if row[13] else (),
            kwargs=json.loads(row[14]) if row[14] else {},
            progress=row[15] or 0.0, progress_message=row[16] or ""
        )
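

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API: exercises the full
    # save/get/query/cleanup round trip described in the docstrings above.
    # Run with `python -m gobstopper.tasks.storage` (the relative import at
    # the top of this module means it cannot run as a standalone script).
    # The "demo_tasks.duckdb" file name is arbitrary, and DuckDB must be
    # installed for this to work.
    storage = TaskStorage("demo_tasks.duckdb")
    demo = TaskInfo(
        id="demo-1",
        name="demo_task",
        category="demo",
        priority=TaskPriority.NORMAL,
        status=TaskStatus.PENDING,
        created_at=datetime.now(),
    )
    storage.save_task(demo)

    fetched = storage.get_task("demo-1")
    print(f"Status: {fetched.status.value}")
    print(f"Demo tasks: {len(storage.get_tasks(category='demo', limit=10))}")
    print(f"Cleaned up: {storage.cleanup_old_tasks(days=30)} old tasks")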