"""ClickHouse backend implementation for SurrealEngine."""
import asyncio
import uuid
from typing import Any, Dict, List, Optional, Type
import clickhouse_connect
from .base import BaseBackend
class ClickHouseBackend(BaseBackend):
"""ClickHouse backend implementation using clickhouse-connect."""
def __init__(self, connection: Any) -> None:
"""Initialize the ClickHouse backend.
Args:
connection: ClickHouse client connection
"""
super().__init__(connection)
        # clickhouse-connect clients are synchronous; async behaviour is
        # provided by running their calls in a thread pool (see _execute,
        # _query and _execute_insert below)
        self.client = connection
async def create_table(self, document_class: Type, **kwargs) -> None:
"""Create a table for the document class with advanced ClickHouse features.
Args:
document_class: The document class to create a table for
**kwargs: Backend-specific options (override Meta settings):
- engine: ClickHouse table engine (default: MergeTree)
- engine_params: Parameters for the engine (e.g., ['date_collected'] for ReplacingMergeTree)
- order_by: Order by columns (default: ['id'])
- partition_by: Partition by expression
- primary_key: Primary key columns
- settings: Additional table settings
- ttl: TTL expression for data lifecycle
"""
table_name = document_class._meta.get('table_name')
meta = document_class._meta
# Get engine configuration from Meta or kwargs
engine = kwargs.get('engine', meta.get('engine', 'MergeTree'))
engine_params = kwargs.get('engine_params', meta.get('engine_params', []))
order_by = kwargs.get('order_by', meta.get('order_by'))
partition_by = kwargs.get('partition_by', meta.get('partition_by'))
primary_key = kwargs.get('primary_key', meta.get('primary_key'))
settings = kwargs.get('settings', meta.get('settings', {}))
ttl = kwargs.get('ttl', meta.get('ttl'))
# ClickHouse-specific ORDER BY intelligence
if not order_by:
order_by = self._determine_smart_order_by(document_class)
# Build column definitions
columns = []
materialized_columns = []
for field_name, field in document_class._fields.items():
field_type = self.get_field_type(field)
# Check for materialized columns
if hasattr(field, 'materialized') and field.materialized:
columns.append(f"`{field_name}` {field_type} MATERIALIZED ({field.materialized})")
materialized_columns.append(field_name)
elif field.required and field_name not in materialized_columns:
columns.append(f"`{field_name}` {field_type}")
elif field_name not in materialized_columns:
# Handle special ClickHouse type restrictions
if field_type.startswith('LowCardinality('):
# LowCardinality cannot be inside Nullable - it handles nulls natively
columns.append(f"`{field_name}` {field_type}")
elif hasattr(field, 'codec') and field.codec:
# Handle codec fields specially - CODEC cannot be inside Nullable()
if ' CODEC(' in field_type:
base_type, codec_part = field_type.split(' CODEC(', 1)
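                        # codec_part retains the trailing ')' from the split,
                        # so the f-string below intentionally omits a closing
                        # parenthesis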
columns.append(f"`{field_name}` Nullable({base_type}) CODEC({codec_part}")
else:
columns.append(f"`{field_name}` Nullable({field_type})")
else:
columns.append(f"`{field_name}` Nullable({field_type})")
# Build CREATE TABLE query
query = f"CREATE TABLE IF NOT EXISTS {table_name} (\n"
query += ",\n".join(f" {col}" for col in columns)
# Add engine with parameters
if engine_params:
params_str = ", ".join(f"`{p}`" if isinstance(p, str) else str(p) for p in engine_params)
query += f"\n) ENGINE = {engine}({params_str})"
else:
query += f"\n) ENGINE = {engine}()"
# Add partition by
if partition_by:
query += f"\nPARTITION BY {partition_by}"
# Add primary key
if primary_key:
if isinstance(primary_key, list):
primary_key = ", ".join(f"`{pk}`" for pk in primary_key)
query += f"\nPRIMARY KEY ({primary_key})"
        # Add order by (quote plain column names; leave expressions such as
        # tuple() or toYYYYMM(...) unquoted)
        if isinstance(order_by, list):
            order_by_str = ", ".join(
                col if '(' in col or col.startswith('`') else f"`{col}`"
                for col in order_by
            )
        else:
            order_by_str = order_by if '(' in order_by or order_by.startswith('`') else f"`{order_by}`"
        query += f"\nORDER BY ({order_by_str})"
# Add TTL
if ttl:
query += f"\nTTL {ttl}"
# Add settings
if settings:
settings_str = ", ".join(f"{k}={v}" for k, v in settings.items())
query += f"\nSETTINGS {settings_str}"
# Debug: Print the generated query
print("Generated ClickHouse SQL:")
print(query)
print("=" * 60)
# Execute table creation
await self._execute(query)
# Create indexes if specified
await self._create_indexes(document_class, table_name)
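    # Illustrative sketch of create_table input and output. The document class
    # below is hypothetical (its field names and Meta attributes are assumptions
    # that exercise the options documented above):
    #
    #     class PriceSnapshot(Document):
    #         date_collected = DateTimeField(required=True)
    #         seller_id = LowCardinalityField(required=True)
    #         price = FloatField()
    #
    #         class Meta:
    #             table_name = 'price_snapshots'
    #             engine = 'ReplacingMergeTree'
    #             engine_params = ['date_collected']
    #             partition_by = 'toYYYYMM(date_collected)'
    #             ttl = 'date_collected + INTERVAL 90 DAY'
    #
    # would generate roughly (assuming LowCardinalityField maps to
    # LowCardinality(String)):
    #
    #     CREATE TABLE IF NOT EXISTS price_snapshots (
    #         `date_collected` DateTime64(3),
    #         `seller_id` LowCardinality(String),
    #         `price` Nullable(Float64)
    #     ) ENGINE = ReplacingMergeTree(`date_collected`)
    #     PARTITION BY toYYYYMM(date_collected)
    #     ORDER BY (`date_collected`, `seller_id`)
    #     TTL date_collected + INTERVAL 90 DAY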
async def _create_indexes(self, document_class: Type, table_name: str) -> None:
"""Create indexes for the table based on field specifications.
Args:
document_class: The document class
table_name: The table name
"""
for field_name, field in document_class._fields.items():
if hasattr(field, 'indexes') and field.indexes:
for index_spec in field.indexes:
await self._create_single_index(table_name, field_name, index_spec)
async def _create_single_index(self, table_name: str, field_name: str, index_spec: Dict[str, Any]) -> None:
"""Create a single index based on specification.
Args:
table_name: The table name
field_name: The field name
index_spec: Index specification dictionary
"""
index_type = index_spec.get('type', 'bloom_filter')
granularity = index_spec.get('granularity', 3)
# Generate index name
index_name = f"idx_{table_name}_{field_name}_{index_type}"
        if index_type == 'bloom_filter':
            false_positive_rate = index_spec.get('false_positive_rate', 0.01)
            query = (f"ALTER TABLE {table_name} "
                     f"ADD INDEX {index_name} `{field_name}` "
                     f"TYPE bloom_filter({false_positive_rate}) GRANULARITY {granularity}")
        elif index_type == 'set':
            max_values = index_spec.get('max_values', 100)
            query = (f"ALTER TABLE {table_name} "
                     f"ADD INDEX {index_name} `{field_name}` "
                     f"TYPE set({max_values}) GRANULARITY {granularity}")
        elif index_type == 'minmax':
            query = (f"ALTER TABLE {table_name} "
                     f"ADD INDEX {index_name} `{field_name}` "
                     f"TYPE minmax GRANULARITY {granularity}")
        else:
            # Custom index type - use as-is
            query = (f"ALTER TABLE {table_name} "
                     f"ADD INDEX {index_name} `{field_name}` "
                     f"TYPE {index_type} GRANULARITY {granularity}")
try:
await self._execute(query)
except Exception as e:
# Log index creation failure but don't fail table creation
print(f"Warning: Failed to create index {index_name}: {e}")
def _determine_smart_order_by(self, document_class: Type) -> List[str]:
"""Intelligently determine ORDER BY clause for ClickHouse tables.
ClickHouse requires an ORDER BY clause, but unlike traditional databases,
it doesn't need an artificial 'id' field. This method analyzes the document
fields to choose the most appropriate ORDER BY strategy.
Priority order:
1. Time-based fields (common in analytics workloads)
2. Required categorical fields (user_id, product_id, etc.)
3. Any required non-nullable fields
4. Auto-generate a simple ordering field if needed
Args:
document_class: The document class to analyze
Returns:
List of field names for ORDER BY clause
"""
fields = document_class._fields
time_fields = []
categorical_fields = []
required_fields = []
# Import field types for analysis
from ..fields import DateTimeField, StringField
from ..fields.clickhouse import LowCardinalityField
for field_name, field in fields.items():
# Skip materialized columns from ORDER BY consideration
if hasattr(field, 'materialized') and field.materialized:
continue
# Analyze field patterns
field_name_lower = field_name.lower()
# Priority 1: Time-based fields
if isinstance(field, DateTimeField):
priority = 0
# Give higher priority to common timestamp field names
if any(keyword in field_name_lower for keyword in
['created', 'updated', 'collected', 'timestamp', 'time', 'date']):
priority = -1
time_fields.append((priority, field_name, field))
# Priority 2: Categorical identifier fields
elif (isinstance(field, (StringField, LowCardinalityField)) and
field.required and
any(keyword in field_name_lower for keyword in
['id', 'key', 'name', 'code', 'type', 'category', 'brand', 'seller'])):
# Lower cardinality fields get higher priority
priority = 0 if isinstance(field, LowCardinalityField) else 1
categorical_fields.append((priority, field_name, field))
# Priority 3: Any required fields that could work for ordering
elif field.required:
required_fields.append((field_name, field))
# Build ORDER BY strategy
order_by = []
# Add best time field (most common pattern in ClickHouse)
if time_fields:
time_fields.sort(key=lambda x: x[0]) # Sort by priority
best_time_field = time_fields[0][1]
order_by.append(best_time_field)
# Add best categorical field to improve sorting
if categorical_fields and len(order_by) < 3:
categorical_fields.sort(key=lambda x: x[0])
best_categorical = categorical_fields[0][1]
order_by.append(best_categorical)
# If no time fields, start with categorical fields
elif categorical_fields:
categorical_fields.sort(key=lambda x: x[0])
# Add up to 2 categorical fields for compound sorting
for i, (_, field_name, _) in enumerate(categorical_fields[:2]):
order_by.append(field_name)
# If no good categorical fields, use any required field
elif required_fields:
order_by.append(required_fields[0][0])
# Last resort: auto-generate a simple ordering field
if not order_by:
print("Warning: No suitable fields found for ORDER BY. "
"Consider adding a timestamp or identifier field, "
"or specify order_by explicitly in Meta class.")
            # Fall back to the first few non-materialized fields; if there are
            # none at all, use tuple() (no sort key)
            available_fields = [name for name, field in fields.items()
                                if not (hasattr(field, 'materialized') and field.materialized)]
            if available_fields:
                order_by = available_fields[:3]  # Use first few fields
            else:
                # Absolute last resort - this shouldn't happen in practice
                order_by = ['tuple()']
return order_by
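    # Sketch of the heuristic on a hypothetical field set: given fields
    # {'created_at': DateTimeField(), 'user_id': StringField(required=True),
    # 'payload': DictField()}, this returns ['created_at', 'user_id'] - the
    # timestamp first (boosted by the 'created' keyword), then the required
    # identifier-like string field.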
async def insert(self, table_name: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Insert a single document.
Args:
table_name: The table name
data: The document data to insert
Returns:
The inserted document with generated id if not provided
"""
# Generate ID only if the table has an id column and it's not provided
# First check if the table has an id column by describing it
try:
describe_result = await self._query(f"DESCRIBE {table_name}")
column_names = [row[0] for row in describe_result] if describe_result else []
if 'id' in column_names and ('id' not in data or not data['id']):
data['id'] = str(uuid.uuid4())
except Exception:
# If we can't describe the table, fall back to the old behavior
if 'id' not in data or not data['id']:
data['id'] = str(uuid.uuid4())
columns = list(data.keys())
values = [data[col] for col in columns]
await self._execute_insert(table_name, [values], columns)
return data
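    # Usage sketch (assumes an initialized backend and an existing `users`
    # table that has an `id` column):
    #
    #     doc = await backend.insert('users', {'name': 'Ada'})
    #     doc['id']  # populated with a generated UUID string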
async def insert_many(self, table_name: str, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Insert multiple documents efficiently.
Args:
table_name: The table name
data: List of documents to insert
Returns:
List of inserted documents
"""
if not data:
return []
# Check if the table has an id column
try:
describe_result = await self._query(f"DESCRIBE {table_name}")
column_names = [row[0] for row in describe_result] if describe_result else []
table_has_id = 'id' in column_names
except Exception:
# If we can't describe the table, assume it has an id column
table_has_id = True
# Ensure all documents have IDs only if the table has an id column
for doc in data:
if table_has_id and ('id' not in doc or not doc['id']):
doc['id'] = str(uuid.uuid4())
        # Column order comes from the first document; later documents are read
        # with doc.get(), so missing keys insert as None and extra keys are ignored
        columns = list(data[0].keys())
# Prepare values
values = []
for doc in data:
row = [doc.get(col) for col in columns]
values.append(row)
await self._execute_insert(table_name, values, columns)
return data
async def select(self, table_name: str, conditions: List[str],
fields: Optional[List[str]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
order_by: Optional[List[tuple[str, str]]] = None) -> List[Dict[str, Any]]:
"""Select documents from a table.
Args:
table_name: The table name
conditions: List of condition strings
fields: List of fields to return (None for all)
limit: Maximum number of results
offset: Number of results to skip
order_by: List of (field, direction) tuples
Returns:
List of matching documents
"""
# Build SELECT clause
if fields:
select_clause = ", ".join(f"`{field}`" for field in fields)
else:
select_clause = "*"
query = f"SELECT {select_clause} FROM {table_name}"
# Add WHERE clause
if conditions:
query += f" WHERE {' AND '.join(conditions)}"
# Add ORDER BY clause
if order_by:
order_parts = []
for field, direction in order_by:
order_parts.append(f"`{field}` {direction.upper()}")
query += f" ORDER BY {', '.join(order_parts)}"
# Add LIMIT and OFFSET
if limit:
query += f" LIMIT {limit}"
if offset:
query += f" OFFSET {offset}"
result = await self._query(query)
if not result:
return []
        # Determine column names for converting rows to dicts
        if fields:
            # A projection was requested; DESCRIBE would return the full table
            # schema, whose names would not line up with the selected values
            column_names = list(fields)
        else:
            columns_result = await self._query(f"DESCRIBE {table_name}")
            column_names = [row[0] for row in columns_result] if columns_result else None
        # Convert to list of dicts
        if column_names:
            return [dict(zip(column_names, row)) for row in result]
        # Fallback: use generic column names
        column_count = len(result[0])
        column_names = [f"col_{i}" for i in range(column_count)]
        return [dict(zip(column_names, row)) for row in result]
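    # Usage sketch: select('events', ["`status` = 'ok'"], fields=['id', 'ts'],
    # limit=10, order_by=[('ts', 'desc')]) issues roughly
    #
    #     SELECT `id`, `ts` FROM events WHERE `status` = 'ok'
    #     ORDER BY `ts` DESC LIMIT 10
    #
    # and returns each row as a dict keyed by the requested fields.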
async def count(self, table_name: str, conditions: List[str]) -> int:
"""Count documents matching conditions.
Args:
table_name: The table name
conditions: List of condition strings
Returns:
Number of matching documents
"""
query = f"SELECT count(*) FROM {table_name}"
if conditions:
query += f" WHERE {' AND '.join(conditions)}"
result = await self._query(query)
if result and result[0]:
return result[0][0]
return 0
async def update(self, table_name: str, conditions: List[str],
data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Update documents matching conditions.
Note: ClickHouse uses ALTER TABLE UPDATE which is asynchronous
and doesn't immediately return updated rows.
Args:
table_name: The table name
conditions: List of condition strings
data: The fields to update
Returns:
List of documents that will be updated
"""
# First, get the documents that will be updated
docs_to_update = await self.select(table_name, conditions)
if not docs_to_update:
return []
# Build UPDATE query
set_clauses = []
for key, value in data.items():
set_clauses.append(f"`{key}` = {self.format_value(value)}")
query = f"ALTER TABLE {table_name} UPDATE {', '.join(set_clauses)}"
if conditions:
query += f" WHERE {' AND '.join(conditions)}"
await self._execute(query)
# Return the documents with updates applied
# Note: In real ClickHouse, the update is asynchronous
for doc in docs_to_update:
doc.update(data)
return docs_to_update
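    # Note: ALTER TABLE ... UPDATE is a ClickHouse mutation that is applied
    # asynchronously in the background, so a SELECT issued immediately after
    # update() may still observe the old values.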
async def delete(self, table_name: str, conditions: List[str]) -> int:
"""Delete documents matching conditions.
Note: ClickHouse uses ALTER TABLE DELETE which is asynchronous.
Args:
table_name: The table name
conditions: List of condition strings
Returns:
Number of documents that will be deleted
"""
# Count documents before deletion
count = await self.count(table_name, conditions)
if count == 0:
return 0
query = f"ALTER TABLE {table_name} DELETE"
if conditions:
query += f" WHERE {' AND '.join(conditions)}"
await self._execute(query)
return count
async def drop_table(self, table_name: str, if_exists: bool = True) -> None:
"""Drop a table using ClickHouse's DROP TABLE statement.
Args:
table_name: The table name to drop
if_exists: Whether to use IF EXISTS clause to avoid errors if table doesn't exist
"""
if if_exists:
query = f"DROP TABLE IF EXISTS {table_name}"
else:
query = f"DROP TABLE {table_name}"
await self._execute(query)
async def execute_raw(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""Execute a raw query.
Args:
query: The raw query string
params: Optional query parameters
Returns:
Query result
"""
        if params:
            # Naive client-side substitution of :name placeholders; values are
            # escaped via format_value, but prefer server-side parameters for
            # untrusted input
            for key, value in params.items():
                query = query.replace(f":{key}", self.format_value(value))
return await self._query(query)
def build_condition(self, field: str, operator: str, value: Any) -> str:
"""Build a condition string for ClickHouse SQL.
Args:
field: The field name
operator: The operator
value: The value to compare against
Returns:
A condition string in ClickHouse SQL
"""
field = f"`{field}`"
if operator == '=':
return f"{field} = {self.format_value(value)}"
elif operator == '!=':
return f"{field} != {self.format_value(value)}"
elif operator in ['>', '<', '>=', '<=']:
return f"{field} {operator} {self.format_value(value)}"
elif operator == 'in':
if isinstance(value, list):
formatted_values = [self.format_value(v) for v in value]
return f"{field} IN ({', '.join(formatted_values)})"
return f"{field} IN {self.format_value(value)}"
elif operator == 'not in':
if isinstance(value, list):
formatted_values = [self.format_value(v) for v in value]
return f"{field} NOT IN ({', '.join(formatted_values)})"
return f"{field} NOT IN {self.format_value(value)}"
elif operator == 'like':
return f"{field} LIKE {self.format_value(value)}"
elif operator == 'ilike':
return f"{field} ILIKE {self.format_value(value)}"
elif operator == 'contains':
# For string contains (LIKE with wildcards) or array contains
# Check if this is likely a string field by the value type
if isinstance(value, str):
# String contains - use LIKE with wildcards
escaped_value = value.replace('%', '\\%').replace('_', '\\_')
return f"{field} LIKE {self.format_value(f'%{escaped_value}%')}"
else:
# Array contains - use has()
return f"has({field}, {self.format_value(value)})"
elif operator == 'is null':
return f"{field} IS NULL"
elif operator == 'is not null':
return f"{field} IS NOT NULL"
else:
return f"{field} {operator} {self.format_value(value)}"
def get_field_type(self, field: Any) -> str:
"""Get the ClickHouse field type for a QuantumORM field.
Args:
field: A QuantumORM field instance
Returns:
The corresponding ClickHouse field type
"""
# Import here to avoid circular imports
from ..fields import (
StringField, IntField, FloatField, BooleanField,
DateTimeField, UUIDField, DictField, DecimalField
)
from ..fields.clickhouse import (
LowCardinalityField, FixedStringField, EnumField,
CompressedStringField, CompressedLowCardinalityField
)
# Check for ClickHouse-specific fields first
if hasattr(field, 'get_clickhouse_type'):
return field.get_clickhouse_type()
# Handle standard fields
if isinstance(field, StringField):
if hasattr(field, 'max_length') and field.max_length:
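                # Note: FixedString(N) is exactly N bytes (null-padded), which
                # suits fixed-width codes more than general bounded strings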
return f"FixedString({field.max_length})"
return "String"
elif isinstance(field, IntField):
return "Int64"
elif isinstance(field, FloatField):
return "Float64"
elif isinstance(field, BooleanField):
return "UInt8" # ClickHouse uses UInt8 for booleans
elif isinstance(field, DateTimeField):
return "DateTime64(3)" # Millisecond precision
elif isinstance(field, UUIDField):
return "UUID"
elif isinstance(field, DecimalField):
return "Decimal(38, 18)" # High precision decimal
elif isinstance(field, DictField):
return "String" # Store JSON as string
else:
return "String" # Default to string
# Transaction support (limited in ClickHouse)
async def begin_transaction(self) -> Any:
"""Begin a transaction.
Note: ClickHouse has limited transaction support.
"""
# ClickHouse doesn't support traditional transactions
# Return None to indicate no transaction
return None
async def commit_transaction(self, transaction: Any) -> None:
"""Commit a transaction.
Note: No-op for ClickHouse.
"""
pass
async def rollback_transaction(self, transaction: Any) -> None:
"""Rollback a transaction.
Note: No-op for ClickHouse.
"""
pass
def supports_transactions(self) -> bool:
"""ClickHouse has limited transaction support."""
return False
def supports_references(self) -> bool:
"""ClickHouse doesn't support references between tables."""
return False
def supports_graph_relations(self) -> bool:
"""ClickHouse doesn't support graph relations."""
return False
def supports_direct_record_access(self) -> bool:
"""ClickHouse doesn't support direct record access syntax."""
return False
def supports_explain(self) -> bool:
"""ClickHouse supports EXPLAIN queries."""
return True
def supports_indexes(self) -> bool:
"""ClickHouse supports indexes."""
return True
def supports_full_text_search(self) -> bool:
"""ClickHouse has limited full-text search support."""
return False
def supports_bulk_operations(self) -> bool:
"""ClickHouse excels at bulk operations."""
return True
def get_optimized_methods(self) -> Dict[str, str]:
"""Get ClickHouse-specific optimization methods."""
return {
'bulk_insert': 'INSERT INTO table VALUES (...)',
'analytical_functions': 'groupArray(), uniq(), quantile()',
'array_functions': 'has(), arrayFilter(), arrayMap()',
'columnar_storage': 'Optimized for analytical workloads',
}
# Materialized view support
async def create_materialized_view(self, materialized_document_class: Type) -> None:
"""Create a ClickHouse materialized view.
Args:
materialized_document_class: The MaterializedDocument class
"""
view_name = materialized_document_class._meta.get('view_name') or \
materialized_document_class._meta.get('table_name') or \
materialized_document_class.__name__.lower()
meta = materialized_document_class._meta
# Build the source query
source_query = materialized_document_class._build_source_query()
# Get ClickHouse-specific configuration
engine = meta.get('engine', 'AggregatingMergeTree')
engine_params = meta.get('engine_params', [])
order_by = meta.get('order_by', [])
partition_by = meta.get('partition_by')
# If no ORDER BY specified, use dimension fields
if not order_by:
order_by = list(materialized_document_class._dimension_fields.keys())
# Build CREATE MATERIALIZED VIEW query
if engine_params:
params_str = ", ".join(f"`{p}`" if isinstance(p, str) else str(p) for p in engine_params)
engine_clause = f"ENGINE = {engine}({params_str})"
else:
engine_clause = f"ENGINE = {engine}()"
# Build ORDER BY clause
if isinstance(order_by, list):
order_by_str = ", ".join(f"`{col}`" for col in order_by)
else:
order_by_str = f"`{order_by}`" if not order_by.startswith('`') else order_by
order_by_clause = f"ORDER BY ({order_by_str})"
# Add partition clause if specified
partition_clause = f"PARTITION BY {partition_by}" if partition_by else ""
# Build the complete query
query = f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS {view_name}
{engine_clause}
{partition_clause}
{order_by_clause}
AS {source_query}
""".strip()
# Debug: Print the generated query
print("Generated ClickHouse Materialized View SQL:")
print(query)
print("=" * 60)
await self._execute(query)
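    # Hedged sketch of a MaterializedDocument (class, field names, and the
    # dimension/metric split are assumptions; the Meta options mirror those
    # read above):
    #
    #     class DailySales(MaterializedDocument):
    #         day = DateTimeField()    # dimension -> default ORDER BY
    #         total = FloatField()     # aggregated metric
    #
    #         class Meta:
    #             view_name = 'daily_sales_mv'
    #             engine = 'AggregatingMergeTree'
    #             partition_by = 'toYYYYMM(day)'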
async def drop_materialized_view(self, materialized_document_class: Type) -> None:
"""Drop a ClickHouse materialized view.
Args:
materialized_document_class: The MaterializedDocument class
"""
view_name = materialized_document_class._meta.get('view_name') or \
materialized_document_class._meta.get('table_name') or \
materialized_document_class.__name__.lower()
query = f"DROP VIEW IF EXISTS {view_name}"
await self._execute(query)
async def refresh_materialized_view(self, materialized_document_class: Type) -> None:
"""Refresh a ClickHouse materialized view.
Note: ClickHouse materialized views update automatically as data arrives.
This is a no-op for ClickHouse.
Args:
materialized_document_class: The MaterializedDocument class
"""
# ClickHouse materialized views refresh automatically
pass
# Helper methods for async execution
    async def _execute(self, query: str) -> None:
        """Execute a query without returning results."""
        # clickhouse-connect clients are synchronous, so run the call in a
        # thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.client.command, query)

    async def _query(self, query: str) -> List[Any]:
        """Execute a query and return the result rows."""
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, self.client.query, query)
        return result.result_rows if result else []
    async def _execute_insert(self, table_name: str, data: List[List[Any]], column_names: List[str]) -> None:
        """Execute an INSERT with multiple rows."""
        from functools import partial
        # Bind keyword arguments, then run the synchronous insert in a thread pool
        insert_func = partial(
            self.client.insert,
            table_name,
            data,
            column_names=column_names,
        )
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, insert_func)
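    # On Python 3.9+ these thread-pool helpers could equivalently use
    # asyncio.to_thread, e.g.:
    #
    #     result = await asyncio.to_thread(self.client.query, query)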