Tutorial

This tutorial will guide you through QuantumEngine’s advanced features step by step.

Note

This tutorial assumes you’ve completed the Quick Start Guide.

Advanced Field Types

QuantumEngine provides specialized field types for different use cases:

ClickHouse-Specific Fields

from quantumengine.fields.clickhouse import (
    LowCardinalityField, ArrayField, FixedStringField
)

# Example document built from the ClickHouse-specific field types imported above.
# NOTE(review): Document and StringField are used below but are not part of the
# import shown in this snippet — presumably imported elsewhere; confirm.
class ProductCatalog(Document):
    # LowCardinality for enum-like values
    category = LowCardinalityField(required=True)  # More efficient than StringField
    brand = LowCardinalityField()

    # Arrays with type constraints
    # (the inner field instance determines the element type of the array)
    tags = ArrayField(LowCardinalityField())  # Array(LowCardinality(String))
    colors = ArrayField(StringField())        # Array(String)

    # Fixed-length strings
    product_code = FixedStringField(length=10)  # FixedString(10)

    # Meta selects the backend and the collection name for this document.
    class Meta:
        backend = 'clickhouse'
        collection = 'products'

Schema Management

QuantumEngine supports both strict and flexible schemas:

SCHEMAFULL vs SCHEMALESS

# NOTE: the two calls below use top-level 'await', so they must run inside an
# async function (or an asyncio-aware REPL).
# SCHEMAFULL - Strict schema enforcement
await User.create_table(schemafull=True)

# SCHEMALESS - Flexible schema
await User.create_table(schemafull=False)

# Hybrid approach - some fields always defined
# The per-field define_schema=True flag is what separates the two groups below.
class Product(Document):
    # Always defined in schema
    name = StringField(required=True, define_schema=True)
    price = FloatField(define_schema=True)

    # Only defined in SCHEMAFULL tables
    # (no define_schema flag on these fields)
    description = StringField()
    metadata = DictField()

Index Management

# User document whose indexes are declared in Meta.indexes.
class User(Document):
    username = StringField(required=True)
    email = StringField(required=True)
    age = IntField()

    class Meta:
        collection = "users"
        # One dict per index: its "name", the indexed "fields", and an
        # optional "unique" flag.
        indexes = [
            {"name": "user_username_idx", "fields": ["username"], "unique": True},
            {"name": "user_email_idx", "fields": ["email"], "unique": True},
            {"name": "user_age_idx", "fields": ["age"]}  # no "unique" key on this one
        ]

# Create indexes
# (top-level 'await': run this inside an async function)
await User.create_indexes()

Relationships (SurrealDB)

QuantumEngine supports rich relationship modeling in SurrealDB:

Document References

from quantumengine.fields import ReferenceField, ListField

# Post document that links to other documents through reference fields.
# NOTE(review): Category is not defined in the visible part of this tutorial —
# presumably another Document subclass; confirm.
class Post(Document):
    title = StringField(required=True)
    author = ReferenceField(User, required=True)  # single reference to a User
    categories = ListField(field_type=ReferenceField(Category))  # list of Category references

    class Meta:
        collection = "posts"
        backend = "surrealdb"

Graph Relations

# Create relations between documents
# author1 / author2 are placeholders for existing document instances.
# "collaborated_with" is the relation name; project="Novel" is an extra keyword
# argument (presumably stored as data on the relation — confirm).
await author1.relate_to("collaborated_with", author2, project="Novel")

# Fetch relations
# NOTE(review): presumably returns the relation records themselves rather than
# the documents they point to — confirm against the API reference.
collaborators = await author1.fetch_relation("collaborated_with")

# Resolve relations (get related documents)
related_authors = await author1.resolve_relation("collaborated_with")

Relation Documents

from quantumengine import RelationDocument

# Relation document: a relation that carries its own typed fields.
class AuthorCollaboration(RelationDocument):
    project_name = StringField(required=True)
    start_date = DateTimeField()
    contribution_percent = FloatField()

    class Meta:
        # Same name as the relation used with relate_to() in the graph example.
        collection = "collaborated_with"

# Create relation with metadata
# The two positional arguments are the documents being related; the keyword
# arguments match the fields declared on AuthorCollaboration.
# NOTE(review): 'datetime' is not imported in this snippet
# (needs `from datetime import datetime`), and datetime.now() without a tz
# argument returns a naive local timestamp.
relation = await AuthorCollaboration.create_relation(
    author1, author2,
    project_name="Science Fiction Novel",
    start_date=datetime.now(),
    contribution_percent=60.0
)

Advanced Querying

Complex Query Building

from quantumengine import Q, QueryExpression

# Complex filtering with Q objects
# Q objects are combined with '&' and negated with '~'; field lookups use the
# double-underscore suffix form (age__gte).
active_adults = await User.objects.filter(
    Q(age__gte=18) & Q(active=True) & ~Q(status="banned")
).all()

# QueryExpressions with FETCH (SurrealDB)
# The Q filter is wrapped in a QueryExpression so .fetch("author") can be
# chained onto it before it is passed to filter().
posts_with_authors = await Post.objects.filter(
    QueryExpression(where=Q(published=True)).fetch("author")
).all()

# Ordering and limiting
# NOTE(review): the leading '-' on "-created_at" presumably means descending
# order — confirm against the API reference.
recent_posts = await Post.objects.filter(
    published=True
).order_by("-created_at", "title").limit(10).all()

Aggregations

from quantumengine.aggregation import Count, Avg, Sum, Max, Min

# Count users by status
# Each keyword argument names one output value; Count accepts an optional
# filter=Q(...) argument.
status_counts = await User.objects.aggregate(
    total=Count('*'),
    active_count=Count('*', filter=Q(active=True)),
    avg_age=Avg('age')
)

# ClickHouse-specific aggregations
# 'yesterday' is a placeholder the reader must define; AnalyticsEvent is not
# defined in the visible part of this tutorial.
# group_by('event_type') is applied before aggregate().
daily_stats = await AnalyticsEvent.objects.filter(
    timestamp__gte=yesterday
).group_by('event_type').aggregate(
    event_count=Count('*'),
    total_value=Sum('value'),
    avg_value=Avg('value')
)

Performance Optimization

Direct Record Access

# Optimized ID-based queries use direct record access
# IDs are given here as 'table:id' strings.
users = await User.objects.filter(id__in=['user:1', 'user:2']).all()

# Convenience methods for ID operations
# Here the IDs are plain integers; each line rebinds 'users', so the three
# queries are independent examples.
users = await User.objects.get_many([1, 2, 3]).all()
users = await User.objects.get_range(100, 200, inclusive=True).all()

Query Analysis

# Explain query execution plan
plan = await User.objects.filter(age__gt=25).explain()

# Get index suggestions
# NOTE(review): unlike explain() above, this call is not awaited — confirm
# that suggest_indexes() is a synchronous method.
suggestions = User.objects.filter(age__lt=30).suggest_indexes()

Bulk Operations

# Bulk updates
# update() is applied to every document matched by the filter.
updated = await User.objects.filter(active=False).update(status="inactive")

# Bulk deletes
# 'cutoff_date' is a placeholder the reader must define.
deleted_count = await User.objects.filter(last_login__lt=cutoff_date).delete()

Connection Management

Multiple Named Connections

# NOTE(review): create_connection is not imported in this snippet — confirm
# the import path.
# Main transactional database
main_db = create_connection(
    name="main_db",
    url="ws://localhost:8000/rpc",
    backend="surrealdb",
    make_default=True
)

# Analytics database
# (no make_default here, so only main_db is flagged as the default)
analytics_db = create_connection(
    name="analytics_db",
    url="https://analytics.clickhouse.cloud",
    backend="clickhouse"
)

# Use specific connection
# The connection object is passed explicitly via the connection= keyword.
await User.create_table(connection=main_db)
await AnalyticsEvent.create_table(connection=analytics_db)

Connection Pooling

# Connection with pooling
# NOTE(review): no backend= is given here, unlike the named-connection
# examples — confirm which backend is used by default.
# NOTE(review): pool_timeout / pool_recycle are presumably in seconds — confirm.
connection = create_connection(
    url="ws://localhost:8000/rpc",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=3600
)

Async/Sync API

Synchronous Operations

# Create sync connection
# async_mode=False requests the synchronous API.
connection = create_connection(
    url="ws://localhost:8000/rpc",
    namespace="test_ns",
    database="test_db",
    async_mode=False
)

# The connection is used as a context manager around the synchronous calls.
with connection:
    # Synchronous operations
    # The *_sync methods are called without 'await'.
    User.create_table_sync(schemafull=True)

    user = User(username="alice", email="alice@example.com")
    user.save_sync()

    users = User.objects.all_sync()
    # NOTE(review): 'user_id' is never defined in this snippet — presumably the
    # ID of the user saved above; confirm.
    user = User.objects.get_sync(id=user_id)
    user.delete_sync()

Migration and Schema Evolution

Generating Migrations

from quantumengine import generate_migration_statements

# Two versions of the same document; they differ only by the added 'active' field.
class UserV1(Document):
    username = StringField(required=True)
    email = StringField(required=True)

class UserV2(Document):
    username = StringField(required=True)
    email = StringField(required=True)
    active = BooleanField(default=True)  # New field

# Build the migration from the old document version to the new one, then print
# the 'up' statements followed by the 'down' statements.
migration = generate_migration_statements(UserV1, UserV2)
for heading, direction in (("UP migration:", 'up'), ("DOWN migration:", 'down')):
    print(heading)
    for statement in migration[direction]:
        print(f"  {statement}")
        # blank line between the two sections, as before
    if direction == 'up':
        print()

Drop Table Operations

from quantumengine import (
    generate_drop_statements,
    drop_tables_from_module
)

# Drop individual table
await User.drop_table(if_exists=True)

# Generate drop statements for scripts
# This call is not awaited and nothing in this snippet executes the result.
drop_statements = generate_drop_statements(User)

# Drop all tables in a module
# The module is given by its dotted import path as a string.
await drop_tables_from_module('myapp.models')

Best Practices

  1. Use appropriate backends: SurrealDB for transactional data, ClickHouse for analytics

  2. Leverage modular installation: Only install backends you need

  3. Use LowCardinalityField for enum-like values in ClickHouse

  4. Index frequently queried fields

  5. Use Q objects for complex queries

  6. Handle backend availability gracefully

  7. Use connection pooling for high-traffic applications

Next Steps

  • Explore the API Reference for detailed API documentation

  • Check out the Examples for real-world usage patterns

  • Learn about backend-specific features in Backends

  • Contribute to the project via Contributing