Aggregation Pipeline¶
This module provides data aggregation utilities for building complex aggregation pipelines and queries.
Core Classes¶
- class surrealengine.aggregation.AggregationPipeline(query_set)[source]¶
Bases: object
Pipeline for building and executing aggregation queries.
This class provides a fluent interface for building complex aggregation pipelines with multiple stages, similar to MongoDB’s aggregation framework.
- __init__(query_set)[source]¶
Initialize a new AggregationPipeline.
- Parameters:
query_set (QuerySet) – The QuerySet to build the pipeline from
- group(by_fields=None, **aggregations)[source]¶
Group by fields and apply aggregations.
- Parameters:
by_fields – Field or list of fields to group by
**aggregations – Named aggregation functions to apply
- Returns:
The pipeline instance for method chaining
- project(**fields)[source]¶
Select or compute fields to include in output.
- Parameters:
**fields – Field mappings for projection
- Returns:
The pipeline instance for method chaining
- sort(**fields)[source]¶
Sort results by fields.
- Parameters:
**fields – Field names and sort directions ('ASC' or 'DESC')
- Returns:
The pipeline instance for method chaining
- limit(count)[source]¶
Limit number of results.
- Parameters:
count – Maximum number of results to return
- Returns:
The pipeline instance for method chaining
- skip(count)[source]¶
Skip number of results.
- Parameters:
count – Number of results to skip
- Returns:
The pipeline instance for method chaining
- with_index(index)[source]¶
Use the specified index for the query.
- Parameters:
index – Name of the index to use
- Returns:
The pipeline instance for method chaining
- match(**conditions)[source]¶
Filter documents before aggregation (similar to WHERE clause).
This method adds filtering conditions that are applied before any aggregation operations. Multiple conditions are combined with AND.
- Parameters:
**conditions – Field-value pairs for filtering (e.g., status='active')
- Returns:
The pipeline instance for method chaining
Example
pipeline.match(status='completed', price__gt=100)
- having(**conditions)[source]¶
Filter aggregated results (similar to HAVING clause).
This method adds filtering conditions that are applied after aggregation operations. Use this to filter based on aggregated values.
- Parameters:
**conditions – Field-value pairs for filtering aggregated results
- Returns:
The pipeline instance for method chaining
Example
pipeline.group(by_fields='category', total=Sum('price')).having(total__gt=1000)
- build_query()[source]¶
Build the SurrealQL query from the pipeline stages.
- Returns:
The SurrealQL query string
Usage Examples¶
Basic Aggregation Pipeline¶
from surrealengine.aggregation import AggregationPipeline
from surrealengine.materialized_view import Sum, Count, Mean
class Order(Document):
    customer_id = StringField(required=True)
    amount = DecimalField(required=True)
    status = StringField(required=True)
    created_at = DateTimeField(auto_now_add=True)
# Create aggregation pipeline
pipeline = AggregationPipeline(Order) \
    .match(status="completed") \
    .group_by("customer_id") \
    .aggregate(
        total_spent=Sum("amount"),
        order_count=Count(),
        avg_order_value=Mean("amount")
    ) \
    .sort("-total_spent") \
    .limit(10)
# Execute pipeline
results = await pipeline.execute()
Complex Aggregations¶
from datetime import datetime, timedelta
# Multi-stage aggregation
monthly_stats = AggregationPipeline(Order) \
    .match(
        created_at__gte=datetime.now() - timedelta(days=365),
        status__in=["completed", "shipped"]
    ) \
    .add_fields(
        year_month="CONCAT(YEAR(created_at), '-', MONTH(created_at))"
    ) \
    .group_by("year_month") \
    .aggregate(
        monthly_revenue=Sum("amount"),
        monthly_orders=Count(),
        unique_customers=Count("customer_id", distinct=True),
        avg_order_value=Mean("amount")
    ) \
    .sort("year_month")
results = await monthly_stats.execute()
Window Functions¶
# Running totals and rankings
customer_rankings = AggregationPipeline(Order) \
    .group_by("customer_id") \
    .aggregate(
        total_spent=Sum("amount"),
        order_count=Count()
    ) \
    .add_fields(
        rank="RANK() OVER (ORDER BY total_spent DESC)",
        running_total="SUM(total_spent) OVER (ORDER BY total_spent DESC)"
    ) \
    .sort("rank")
top_customers = await customer_rankings.execute()
Conditional Aggregations¶
# Conditional aggregations using CASE expressions
order_analysis = AggregationPipeline(Order) \
    .group_by("customer_id") \
    .aggregate(
        total_orders=Count(),
        high_value_orders=Count(
            condition="amount > 100"
        ),
        revenue_this_month=Sum(
            "amount",
            condition="created_at >= DATE_SUB(NOW(), INTERVAL 1 MONTH)"
        ),
        avg_order_size=Mean("amount")
    ) \
    .add_fields(
        high_value_ratio="high_value_orders / total_orders",
        customer_tier="""
            CASE
                WHEN total_orders > 50 THEN 'VIP'
                WHEN total_orders > 20 THEN 'Premium'
                WHEN total_orders > 5 THEN 'Regular'
                ELSE 'New'
            END
        """
    )
customer_tiers = await order_analysis.execute()
Geographic Aggregations¶
class Store(Document):
    name = StringField(required=True)
    location = GeometryField()  # Point geometry
    region = StringField()
# Geographic aggregations
regional_performance = AggregationPipeline(Order) \
    .lookup(
        from_collection="stores",
        local_field="store_id",
        foreign_field="id",
        as_field="store_info"
    ) \
    .unwind("store_info") \
    .group_by("store_info.region") \
    .aggregate(
        total_revenue=Sum("amount"),
        total_orders=Count(),
        unique_stores=Count("store_id", distinct=True),
        avg_revenue_per_store=Mean("amount")
    )
regional_stats = await regional_performance.execute()
Time Series Aggregations¶
# Daily time series with moving averages
daily_metrics = AggregationPipeline(Order) \
    .match(
        created_at__gte=datetime.now() - timedelta(days=90)
    ) \
    .group_by("DATE(created_at)") \
    .aggregate(
        daily_revenue=Sum("amount"),
        daily_orders=Count(),
        daily_customers=Count("customer_id", distinct=True)
    ) \
    .sort("DATE(created_at)") \
    .add_fields(
        revenue_7day_avg="""
            AVG(daily_revenue) OVER (
                ORDER BY DATE(created_at)
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            )
        """,
        revenue_trend="""
            CASE
                WHEN daily_revenue > LAG(daily_revenue, 7) OVER (ORDER BY DATE(created_at))
                THEN 'UP'
                ELSE 'DOWN'
            END
        """
    )
time_series = await daily_metrics.execute()
Pipeline Composition¶
# Reusable pipeline components
def base_order_pipeline():
    return AggregationPipeline(Order) \
        .match(status="completed") \
        .match(created_at__gte=datetime.now() - timedelta(days=365))

def customer_aggregation(pipeline):
    return pipeline \
        .group_by("customer_id") \
        .aggregate(
            total_spent=Sum("amount"),
            order_count=Count(),
            first_order=Min("created_at"),
            last_order=Max("created_at")
        )
# Compose pipelines
customer_lifetime_value = customer_aggregation(base_order_pipeline()) \
    .add_fields(
        days_active="DATEDIFF(last_order, first_order)",
        avg_days_between_orders="days_active / (order_count - 1)"
    ) \
    .match(total_spent__gte=500)
valuable_customers = await customer_lifetime_value.execute()
Export and Materialization¶
# Export pipeline results
pipeline = AggregationPipeline(Order) \
    .group_by("customer_id") \
    .aggregate(total_spent=Sum("amount"))
# Export to different formats
await pipeline.export_to_csv("customer_totals.csv")
await pipeline.export_to_json("customer_totals.json")
# Create materialized view from pipeline
materialized_view = pipeline.materialize(
view_name="customer_totals_mv",
refresh_interval="1h"
)
await materialized_view.create()
Performance Optimization¶
# Optimized pipeline with proper indexing hints
optimized_pipeline = AggregationPipeline(Order) \
    .use_index("status_created_at_idx") \
    .match(
        status="completed",
        created_at__gte=datetime.now() - timedelta(days=30)
    ) \
    .group_by("customer_id") \
    .aggregate(
        monthly_spent=Sum("amount"),
        monthly_orders=Count()
    ) \
    .explain()  # Get execution plan
# Execute with performance metrics
results, metrics = await optimized_pipeline.execute_with_metrics()
print(f"Execution time: {metrics['duration']}ms")
print(f"Documents examined: {metrics['documents_examined']}")
print(f"Documents returned: {metrics['documents_returned']}")