Source code for quantumengine.query.base

from ..base_query import BaseQuerySet
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast, TypeVar, Generic
from ..exceptions import MultipleObjectsReturned, DoesNotExist
from ..fields import ReferenceField
from ..types import IdType, DocumentType
from surrealdb import RecordID
import json
import asyncio
import logging

# Type variable for QuerySet generic constraint
T = TypeVar('T', bound='Document')

# Set up logging
logger = logging.getLogger(__name__)


class QuerySet(BaseQuerySet, Generic[T]):
    """Query builder for SurrealDB with generic type safety.

    This class provides a query builder for document classes with a predefined
    schema. It extends BaseQuerySet to provide methods for querying and
    manipulating documents of a specific document class.

    Type Parameters:
        T: The document class type that this QuerySet operates on

    Attributes:
        document_class: The document class to query
        connection: The database connection to use for queries
    """

    def __init__(self, document_class: Type[T], connection: Any = None) -> None:
        """Initialize a new QuerySet.

        Args:
            document_class: The document class to query
            connection: The database connection to use for queries
                (optional, will use document's backend if None)
        """
        self.document_class = document_class
        self.backend = self._get_backend()

        # Use backend connection if no specific connection provided
        if connection is None:
            connection = self.backend.connection

        super().__init__(connection)

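    # Usage sketch, not part of the original module: QuerySets are normally obtained
    # through a Document subclass (the `User` class and `my_conn` connection below are
    # hypothetical) rather than constructed directly.
    #
    #     qs = QuerySet(User)                       # uses User's configured backend connection
    #     qs = QuerySet(User, connection=my_conn)   # or pin an explicit connection
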
    async def join(self, field_name: str, target_fields: Optional[List[str]] = None,
                   dereference: bool = True, dereference_depth: int = 1) -> List[Any]:
        """Perform a JOIN-like operation on a reference field using FETCH.

        This method performs a JOIN-like operation on a reference field by using
        SurrealDB's FETCH clause to efficiently resolve references in a single query.

        Args:
            field_name: The name of the reference field to join on
            target_fields: Optional list of fields to select from the target document
            dereference: Whether to dereference references in the joined documents (default: True)
            dereference_depth: Maximum depth of reference resolution (default: 1)

        Returns:
            List of documents with joined data

        Raises:
            ValueError: If the field is not a ReferenceField
        """
        # Ensure field_name is a ReferenceField
        field = self.document_class._fields.get(field_name)
        if not field or not isinstance(field, ReferenceField):
            raise ValueError(f"{field_name} is not a ReferenceField")

        if not dereference:
            # If no dereferencing needed, just return regular results
            return await self.all()

        # Use FETCH to join in a single query
        queryset = self._clone()
        queryset.fetch_fields.append(field_name)

        try:
            documents = await queryset.all()

            # If dereference_depth > 1, recursively resolve deeper references
            if dereference_depth > 1:
                for doc in documents:
                    referenced_doc = getattr(doc, field_name, None)
                    if referenced_doc and hasattr(referenced_doc, 'resolve_references'):
                        await referenced_doc.resolve_references(depth=dereference_depth - 1)

            return documents
        except Exception:
            # Fall back to manual resolution if FETCH fails
            documents = await self.all()
            target_document_class = field.document_type

            for doc in documents:
                if getattr(doc, field_name, None):
                    ref_value = getattr(doc, field_name)
                    ref_id = None

                    if isinstance(ref_value, str) and ':' in ref_value:
                        ref_id = ref_value
                    elif hasattr(ref_value, 'id'):
                        ref_id = ref_value.id

                    if ref_id:
                        referenced_doc = await target_document_class.get(
                            id=ref_id,
                            dereference=dereference,
                            dereference_depth=dereference_depth
                        )
                        setattr(doc, field_name, referenced_doc)

            return documents

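    # Usage sketch, not part of the original module; `Order` and its `customer`
    # ReferenceField are hypothetical:
    #
    #     orders = await Order.objects.filter(status='open').join('customer')
    #     print(orders[0].customer.name)   # reference resolved via FETCH in one query
    #
    #     # deeper reference chains can be resolved with dereference_depth
    #     orders = await Order.objects.join('customer', dereference_depth=2)
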
    def join_sync(self, field_name: str, target_fields: Optional[List[str]] = None,
                  dereference: bool = True, dereference_depth: int = 1) -> List[Any]:
        """Perform a JOIN-like operation on a reference field synchronously using FETCH.

        This method performs a JOIN-like operation on a reference field by using
        SurrealDB's FETCH clause to efficiently resolve references in a single query.

        Args:
            field_name: The name of the reference field to join on
            target_fields: Optional list of fields to select from the target document
            dereference: Whether to dereference references in the joined documents (default: True)
            dereference_depth: Maximum depth of reference resolution (default: 1)

        Returns:
            List of documents with joined data

        Raises:
            ValueError: If the field is not a ReferenceField
        """
        # Ensure field_name is a ReferenceField
        field = self.document_class._fields.get(field_name)
        if not field or not isinstance(field, ReferenceField):
            raise ValueError(f"{field_name} is not a ReferenceField")

        if not dereference:
            # If no dereferencing needed, just return regular results
            return self.all_sync()

        # Use FETCH to join in a single query
        queryset = self._clone()
        queryset.fetch_fields.append(field_name)

        try:
            documents = queryset.all_sync()

            # If dereference_depth > 1, recursively resolve deeper references
            if dereference_depth > 1:
                for doc in documents:
                    referenced_doc = getattr(doc, field_name, None)
                    if referenced_doc and hasattr(referenced_doc, 'resolve_references_sync'):
                        referenced_doc.resolve_references_sync(depth=dereference_depth - 1)

            return documents
        except Exception:
            # Fall back to manual resolution if FETCH fails
            documents = self.all_sync()
            target_document_class = field.document_type

            for doc in documents:
                if getattr(doc, field_name, None):
                    ref_value = getattr(doc, field_name)
                    ref_id = None

                    if isinstance(ref_value, str) and ':' in ref_value:
                        ref_id = ref_value
                    elif hasattr(ref_value, 'id'):
                        ref_id = ref_value.id

                    if ref_id:
                        referenced_doc = target_document_class.get_sync(
                            id=ref_id,
                            dereference=dereference,
                            dereference_depth=dereference_depth
                        )
                        setattr(doc, field_name, referenced_doc)

            return documents

    def _build_query(self) -> str:
        """Build the query string with performance optimizations.

        This method builds the query string for the document class query.
        It automatically uses optimized direct record access when possible.

        Returns:
            The optimized query string
        """
        # Try to build optimized direct record access query first
        optimized_query = self._build_direct_record_query()
        if optimized_query:
            return optimized_query

        # Fall back to regular query building
        query = f"SELECT * FROM {self.document_class._get_collection_name()}"

        if self.query_parts:
            conditions = self._build_conditions()
            query += f" WHERE {' AND '.join(conditions)}"

        # Add other clauses from _build_clauses
        clauses = self._build_clauses()
        for clause_name, clause_sql in clauses.items():
            if clause_name != 'WHERE':  # WHERE clause is already handled
                query += f" {clause_sql}"

        return query

    async def all(self, dereference: bool = False) -> List[T]:
        """Execute the query and return all results asynchronously.

        This method builds and executes the query, then converts the results
        to instances of the document class. Includes automatic retry on
        transient failures.

        Args:
            dereference: Whether to dereference references (default: False)

        Returns:
            List of document instances
        """
        async def _execute_query():
            table_name = self.document_class._get_collection_name()

            # Handle bulk ID selection optimization
            if self._bulk_id_selection:
                # Use direct record access for bulk ID queries if supported
                if self.backend.supports_direct_record_access() and hasattr(self.backend, 'select_by_ids'):
                    results = await self.backend.select_by_ids(table_name, self._bulk_id_selection)
                else:
                    # Fallback: convert to IN condition
                    condition = self.backend.build_condition('id', 'in', self._bulk_id_selection)
                    results = await self.backend.select(
                        table_name=table_name,
                        conditions=[condition],
                        limit=self.limit_value,
                        offset=self.start_value,
                        order_by=[self.order_by_value] if self.order_by_value else None
                    )
            else:
                # Build conditions using the backend
                conditions = []
                for field, op, value in self.query_parts:
                    condition = self.backend.build_condition(field, op, value)
                    conditions.append(condition)

                # Use backend.select for querying
                results = await self.backend.select(
                    table_name=table_name,
                    conditions=conditions,
                    limit=self.limit_value,
                    offset=self.start_value,
                    order_by=[self.order_by_value] if self.order_by_value else None
                )

            if not results:
                return []

            # Create one instance per result document
            processed_results = [self.document_class.from_db(doc, dereference=dereference) for doc in results]
            return processed_results

        # Execute with retry mechanism
        return await self._execute_with_retry("query_all", _execute_query)

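    # Usage sketch, not part of the original module; `User` is a hypothetical
    # Document subclass:
    #
    #     users = await User.objects.filter(age__gte=18).all()
    #     users = await User.objects.all(dereference=True)   # also resolve references
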
    def all_sync(self, dereference: bool = False) -> List[T]:
        """Execute the query and return all results synchronously.

        This method builds and executes the query, then converts the results
        to instances of the document class. Includes automatic retry on
        transient failures.

        Args:
            dereference: Whether to dereference references (default: False)

        Returns:
            List of document instances
        """
        def _execute_query():
            # For sync operations, we need to handle the backend differently
            # since most backend methods are async. For now, fall back to a direct query.
            query = self._build_query()
            results = self.connection.client.query(query)

            if not results or not results[0]:
                return []

            # Create one instance per result document (results[0] holds the
            # documents for the first statement, matching the guard above)
            processed_results = [self.document_class.from_db(doc, dereference=dereference) for doc in results[0]]
            return processed_results

        # Execute with retry mechanism
        return self._execute_with_retry_sync("query_all_sync", _execute_query)

    async def count(self) -> int:
        """Count documents matching the query asynchronously.

        This method builds and executes a count query to count the number of
        documents matching the query. Includes automatic retry on transient failures.

        Returns:
            Number of matching documents
        """
        async def _execute_count():
            table_name = self.document_class._get_collection_name()

            # Build conditions using the backend
            conditions = []
            for field, op, value in self.query_parts:
                condition = self.backend.build_condition(field, op, value)
                conditions.append(condition)

            # Use backend.count for counting
            return await self.backend.count(table_name, conditions)

        # Execute with retry mechanism
        return await self._execute_with_retry("query_count", _execute_count)

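    # Usage sketch, not part of the original module:
    #
    #     n_minors = await User.objects.filter(age__lt=18).count()
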
    def count_sync(self) -> int:
        """Count documents matching the query synchronously.

        This method builds and executes a count query to count the number of
        documents matching the query. Includes automatic retry on transient failures.

        Returns:
            Number of matching documents
        """
        def _execute_count():
            count_query = f"SELECT count() FROM {self.document_class._get_collection_name()}"

            if self.query_parts:
                conditions = self._build_conditions()
                count_query += f" WHERE {' AND '.join(conditions)}"

            result = self.connection.client.query(count_query)
            if not result or not result[0]:
                return 0

            # count() without GROUP returns one row per matching record,
            # so the count is the size of the first result set
            return len(result[0])

        # Execute with retry mechanism
        return self._execute_with_retry_sync("query_count_sync", _execute_count)

    async def get(self, dereference: bool = False, **kwargs: Any) -> T:
        """Get a single document matching the query asynchronously.

        This method applies filters and ensures that exactly one document is returned.

        Args:
            dereference: Whether to dereference references (default: False)
            **kwargs: Field names and values to filter by

        Returns:
            The matching document

        Raises:
            DoesNotExist: If no matching document is found
            MultipleObjectsReturned: If multiple matching documents are found
        """
        self.filter(**kwargs)
        self.limit_value = 2  # Get 2 to check for multiple

        results = await self.all(dereference=dereference)

        if not results:
            raise DoesNotExist(f"{self.document_class.__name__} matching query does not exist.")
        if len(results) > 1:
            raise MultipleObjectsReturned(f"Multiple {self.document_class.__name__} objects returned instead of one")

        return results[0]

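    # Usage sketch, not part of the original module; the `email` field is hypothetical:
    #
    #     try:
    #         user = await User.objects.get(email='ada@example.com')
    #     except DoesNotExist:
    #         user = await User.objects.create(email='ada@example.com')
    #     except MultipleObjectsReturned:
    #         ...  # the filter matched more than one record
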
    def get_sync(self, dereference: bool = False, **kwargs: Any) -> T:
        """Get a single document matching the query synchronously.

        This method applies filters and ensures that exactly one document is returned.

        Args:
            dereference: Whether to dereference references (default: False)
            **kwargs: Field names and values to filter by

        Returns:
            The matching document

        Raises:
            DoesNotExist: If no matching document is found
            MultipleObjectsReturned: If multiple matching documents are found
        """
        self.filter(**kwargs)
        self.limit_value = 2  # Get 2 to check for multiple

        results = self.all_sync(dereference=dereference)

        if not results:
            raise DoesNotExist(f"{self.document_class.__name__} matching query does not exist.")
        if len(results) > 1:
            raise MultipleObjectsReturned(f"Multiple {self.document_class.__name__} objects returned instead of one")

        return results[0]

    async def create(self, **kwargs: Any) -> T:
        """Create a new document asynchronously.

        This method creates a new document with the given field values.
        Includes automatic retry on transient failures.

        Args:
            **kwargs: Field names and values for the new document

        Returns:
            The created document
        """
        async def _execute_create():
            document = self.document_class(**kwargs)
            document.validate()

            # Convert to DB format
            data = document.to_db()

            # Use backend for insertion
            table_name = self.document_class._get_collection_name()
            result = await self.backend.insert(table_name, data)

            # Return new document instance from result
            return self.document_class.from_db(result)

        # Execute with retry mechanism
        return await self._execute_with_retry("query_create", _execute_create)

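    # Usage sketch, not part of the original module; field names are hypothetical:
    #
    #     user = await User.objects.create(name='Ada', age=36)
    #     print(user.id)   # populated from the backend insert result
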
    def create_sync(self, **kwargs: Any) -> T:
        """Create a new document synchronously.

        This method creates a new document with the given field values.
        Includes automatic retry on transient failures.

        Args:
            **kwargs: Field names and values for the new document

        Returns:
            The created document
        """
        def _execute_create():
            document = self.document_class(**kwargs)
            return document.save_sync(self.connection)

        # Execute with retry mechanism
        return self._execute_with_retry_sync("query_create_sync", _execute_create)

    async def update(self, **kwargs: Any) -> List[T]:
        """Update documents matching the query asynchronously with performance optimizations.

        This method updates documents matching the query with the given field values.
        Uses direct record access for bulk ID operations for better performance.

        Args:
            **kwargs: Field names and values to update

        Returns:
            List of updated documents
        """
        # PERFORMANCE OPTIMIZATION: Use direct record access for bulk operations
        if self._bulk_id_selection or self._id_range_selection:
            # For bulk operations, use a subquery with direct record access for better performance
            optimized_query = self._build_direct_record_query()
            if optimized_query:
                # Convert SELECT to subquery for UPDATE
                subquery = optimized_query.replace("SELECT *", "SELECT id")
                update_query = f"UPDATE ({subquery}) SET {', '.join(f'{k} = {json.dumps(v)}' for k, v in kwargs.items())}"
                result = await self.connection.client.query(update_query)

                if not result:
                    return []

                # Handle different result structures
                if isinstance(result[0], dict):
                    # Subquery UPDATE case: result is a flat list of documents
                    return [self.document_class.from_db(doc) for doc in result]
                elif isinstance(result[0], list):
                    # Normal case: result[0] is a list of document dictionaries
                    return [self.document_class.from_db(doc) for doc in result[0]]
                else:
                    return []

        # Use backend for regular update operations
        table_name = self.document_class._get_collection_name()

        # Build conditions using the backend
        conditions = []
        for field, op, value in self.query_parts:
            condition = self.backend.build_condition(field, op, value)
            conditions.append(condition)

        # Convert field values using field.to_db() if available
        backend_name = self.document_class._meta.get('backend', 'surrealdb')
        update_data = {}
        for field_name, value in kwargs.items():
            if field_name in self.document_class._fields:
                field = self.document_class._fields[field_name]
                if hasattr(field, 'to_db'):
                    # Pass backend parameter if supported
                    if 'backend' in field.to_db.__code__.co_varnames:
                        update_data[field_name] = field.to_db(value, backend=backend_name)
                    else:
                        update_data[field_name] = field.to_db(value)
                else:
                    update_data[field_name] = value
            else:
                update_data[field_name] = value

        # Use backend.update for updating
        results = await self.backend.update(table_name, conditions, update_data)
        return [self.document_class.from_db(doc) for doc in results]

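    # Usage sketch, not part of the original module; `city` and `active` are hypothetical fields:
    #
    #     updated = await User.objects.filter(city='NYC').update(active=False)
    #     print(f"{len(updated)} documents updated")
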
    def update_sync(self, **kwargs: Any) -> List[T]:
        """Update documents matching the query synchronously with performance optimizations.

        This method updates documents matching the query with the given field values.
        Uses direct record access for bulk ID operations for better performance.

        Args:
            **kwargs: Field names and values to update

        Returns:
            List of updated documents
        """
        # PERFORMANCE OPTIMIZATION: Use direct record access for bulk operations
        if self._bulk_id_selection or self._id_range_selection:
            # For bulk operations, use a subquery with direct record access for better performance
            optimized_query = self._build_direct_record_query()
            if optimized_query:
                # Convert SELECT to subquery for UPDATE
                subquery = optimized_query.replace("SELECT *", "SELECT id")
                update_query = f"UPDATE ({subquery}) SET {', '.join(f'{k} = {json.dumps(v)}' for k, v in kwargs.items())}"
                result = self.connection.client.query(update_query)

                if not result:
                    return []

                # Handle different result structures
                if isinstance(result[0], dict):
                    # Subquery UPDATE case: result is a flat list of documents
                    return [self.document_class.from_db(doc) for doc in result]
                elif isinstance(result[0], list):
                    # Normal case: result[0] is a list of document dictionaries
                    return [self.document_class.from_db(doc) for doc in result[0]]
                else:
                    return []

        # Fall back to regular update query (SET must precede WHERE in SurrealQL)
        update_query = f"UPDATE {self.document_class._get_collection_name()}"
        update_query += f" SET {', '.join(f'{k} = {json.dumps(v)}' for k, v in kwargs.items())}"

        if self.query_parts:
            conditions = self._build_conditions()
            update_query += f" WHERE {' AND '.join(conditions)}"

        result = self.connection.client.query(update_query)
        if not result or not result[0]:
            return []

        return [self.document_class.from_db(doc) for doc in result[0]]

    async def delete(self) -> int:
        """Delete documents matching the query asynchronously with performance optimizations.

        This method deletes documents matching the query. Uses direct record access
        for bulk ID operations for better performance.

        Returns:
            Number of deleted documents
        """
        # PERFORMANCE OPTIMIZATION: Use direct record access for bulk operations
        if self._bulk_id_selection:
            # Use direct record deletion syntax for bulk ID operations
            record_ids = [self._format_record_id(id_val) for id_val in self._bulk_id_selection]
            delete_query = f"DELETE {', '.join(record_ids)}"
            result = await self.connection.client.query(delete_query)

            # Direct record deletion returns an empty list on success,
            # so return the count of IDs we attempted to delete
            return len(record_ids)
        elif self._id_range_selection:
            # For range operations, use optimized query with subquery
            optimized_query = self._build_direct_record_query()
            if optimized_query:
                # Convert SELECT to subquery for DELETE
                subquery = optimized_query.replace("SELECT *", "SELECT id")
                delete_query = f"DELETE ({subquery})"
                result = await self.connection.client.query(delete_query)

                if not result or not result[0]:
                    return 0
                return len(result[0])

        # Use backend for regular delete operations
        table_name = self.document_class._get_collection_name()

        # Build conditions using the backend
        conditions = []
        for field, op, value in self.query_parts:
            condition = self.backend.build_condition(field, op, value)
            conditions.append(condition)

        # Use backend.delete for deleting
        return await self.backend.delete(table_name, conditions)

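    # Usage sketch, not part of the original module:
    #
    #     deleted_count = await User.objects.filter(active=False).delete()
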
    def delete_sync(self) -> int:
        """Delete documents matching the query synchronously with performance optimizations.

        This method deletes documents matching the query. Uses direct record access
        for bulk ID operations for better performance.

        Returns:
            Number of deleted documents
        """
        # PERFORMANCE OPTIMIZATION: Use direct record access for bulk operations
        if self._bulk_id_selection:
            # Use direct record deletion syntax for bulk ID operations
            record_ids = [self._format_record_id(id_val) for id_val in self._bulk_id_selection]
            delete_query = f"DELETE {', '.join(record_ids)}"
            result = self.connection.client.query(delete_query)

            # Direct record deletion returns an empty list on success,
            # so return the count of IDs we attempted to delete
            return len(record_ids)
        elif self._id_range_selection:
            # For range operations, use optimized query with subquery
            optimized_query = self._build_direct_record_query()
            if optimized_query:
                # Convert SELECT to subquery for DELETE
                subquery = optimized_query.replace("SELECT *", "SELECT id")
                delete_query = f"DELETE ({subquery})"
                result = self.connection.client.query(delete_query)

                if not result or not result[0]:
                    return 0
                return len(result[0])

        # Fall back to regular delete query
        delete_query = f"DELETE FROM {self.document_class._get_collection_name()}"

        if self.query_parts:
            conditions = self._build_conditions()
            delete_query += f" WHERE {' AND '.join(conditions)}"

        result = self.connection.client.query(delete_query)
        if not result or not result[0]:
            return 0
        return len(result[0])

    async def bulk_create(self, documents: List[T], batch_size: int = 1000,
                          validate: bool = True, return_documents: bool = True) -> Union[List[T], int]:
        """Create multiple documents in a single operation asynchronously.

        This method creates multiple documents in a single operation, processing them
        in batches for better performance. It can optionally validate the documents
        and return the created documents.

        Args:
            documents: List of Document instances to create
            batch_size: Number of documents per batch (default: 1000)
            validate: Whether to validate documents (default: True)
            return_documents: Whether to return created documents (default: True)

        Returns:
            List of created documents with their IDs set if return_documents=True,
            otherwise returns the count of created documents
        """
        if not documents:
            return [] if return_documents else 0

        collection = self.document_class._get_collection_name()
        total_created = 0
        created_docs = [] if return_documents else None

        # Process in batches
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # Validate batch if required
            if validate:
                # Sequential validation since validate() is synchronous
                for doc in batch:
                    doc.validate()

            # Separate documents with and without explicit IDs
            docs_without_ids = []
            docs_with_ids = []
            for doc in batch:
                if doc.id:
                    docs_with_ids.append(doc)
                else:
                    docs_without_ids.append(doc)

            # Handle documents without IDs using backend bulk insert
            if docs_without_ids:
                data = [doc.to_db() for doc in docs_without_ids]
                try:
                    results = await self.backend.insert_many(collection, data)
                    if return_documents and results:
                        batch_docs = [self.document_class.from_db(doc_data) for doc_data in results]
                        created_docs.extend(batch_docs)
                        total_created += len(batch_docs)
                    elif results:
                        total_created += len(results)
                except Exception as e:
                    logger.error(f"Error in bulk create batch (no IDs): {str(e)}")

            # Handle documents with explicit IDs using backend insert
            if docs_with_ids:
                docs_with_ids_data = [doc.to_db() for doc in docs_with_ids]
                try:
                    results = await self.backend.insert_many(collection, docs_with_ids_data)
                    if return_documents and results:
                        batch_docs = [self.document_class.from_db(doc_data) for doc_data in results]
                        if created_docs is not None:
                            created_docs.extend(batch_docs)
                        total_created += len(batch_docs)
                    elif results:
                        total_created += len(results)
                except Exception as e:
                    logger.error(f"Error creating documents with IDs: {str(e)}")

        return created_docs if return_documents else total_created

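    # Usage sketch, not part of the original module; `User` and its `name` field are hypothetical:
    #
    #     users = [User(name=f"user{i}") for i in range(10_000)]
    #     created = await User.objects.bulk_create(users, batch_size=500)
    #     count = await User.objects.bulk_create(users, return_documents=False)  # count only
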
    def bulk_create_sync(self, documents: List[T], batch_size: int = 1000,
                         validate: bool = True, return_documents: bool = True) -> Union[List[T], int]:
        """Create multiple documents in a single operation synchronously.

        This method creates multiple documents in a single operation, processing them
        in batches for better performance. It can optionally validate the documents
        and return the created documents.

        Args:
            documents: List of Document instances to create
            batch_size: Number of documents per batch (default: 1000)
            validate: Whether to validate documents (default: True)
            return_documents: Whether to return created documents (default: True)

        Returns:
            List of created documents with their IDs set if return_documents=True,
            otherwise returns the count of created documents
        """
        if not documents:
            return [] if return_documents else 0

        collection = self.document_class._get_collection_name()
        total_created = 0
        created_docs = [] if return_documents else None

        # Process in batches
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # Validate batch if required
            if validate:
                # Sequential validation for sync version
                for doc in batch:
                    doc.validate()

            # Convert batch to DB representation
            data = [doc.to_db() for doc in batch]

            # Construct optimized bulk insert query
            query = f"INSERT INTO {collection} {json.dumps(data)};"

            # Execute batch insert
            try:
                result = self.connection.client.query(query)

                if return_documents and result and result[0]:
                    # Process results if needed
                    batch_docs = [self.document_class.from_db(doc_data) for doc_data in result[0]]
                    created_docs.extend(batch_docs)
                    total_created += len(batch_docs)
                elif result and result[0]:
                    total_created += len(result[0])
            except Exception as e:
                # Log error and continue with next batch
                logger.error(f"Error in bulk create batch: {str(e)}")
                continue

        return created_docs if return_documents else total_created

    async def explain(self) -> List[Dict[str, Any]]:
        """Get query execution plan for performance analysis.

        This method appends EXPLAIN to the query to show how the database will
        execute it, helping identify performance bottlenecks.

        Returns:
            List of execution plan steps with details

        Example:
            plan = await User.objects.filter(age__lt=18).explain()
            print(f"Query will use: {plan[0]['operation']}")

        Raises:
            NotImplementedError: If backend doesn't support EXPLAIN queries
        """
        if not self.backend.supports_explain():
            raise NotImplementedError(f"EXPLAIN queries not supported by {self.backend.__class__.__name__}")

        # For explain, we still use the raw query since it's backend-specific
        query = self._build_query() + " EXPLAIN"
        result = await self.backend.execute_raw(query)
        return result[0] if result and result[0] else []

    def explain_sync(self) -> List[Dict[str, Any]]:
        """Get query execution plan for performance analysis synchronously.

        Returns:
            List of execution plan steps with details
        """
        query = self._build_query() + " EXPLAIN"
        result = self.connection.client.query(query)
        return result[0] if result and result[0] else []

    def suggest_indexes(self) -> List[str]:
        """Suggest indexes based on current query patterns.

        Analyzes the current query conditions and suggests optimal indexes
        that could improve performance.

        Returns:
            List of suggested DEFINE INDEX statements

        Example:
            >>> suggestions = User.objects.filter(age__lt=18, city="NYC").suggest_indexes()
            >>> for suggestion in suggestions:
            ...     print(f"Consider: {suggestion}")
        """
        suggestions = []
        collection_name = self.document_class._get_collection_name()

        # Analyze filter conditions
        analyzed_fields = set()
        for field, op, value in self.query_parts:
            if field != 'id' and field not in analyzed_fields:  # ID doesn't need indexing
                analyzed_fields.add(field)
                if op in ('=', '!=', '>', '<', '>=', '<=', 'INSIDE', 'NOT INSIDE'):
                    suggestions.append(
                        f"DEFINE INDEX idx_{collection_name}_{field} ON {collection_name} FIELDS {field}"
                    )

        # Suggest compound indexes for multiple conditions
        if len(analyzed_fields) > 1:
            field_list = ', '.join(sorted(analyzed_fields))
            suggestions.append(
                f"DEFINE INDEX idx_{collection_name}_compound ON {collection_name} FIELDS {field_list}"
            )

        # Suggest order by indexes
        if self.order_by_value:
            order_field, _ = self.order_by_value
            if order_field not in analyzed_fields:
                suggestions.append(
                    f"DEFINE INDEX idx_{collection_name}_{order_field} ON {collection_name} FIELDS {order_field}"
                )

        return list(set(suggestions))  # Remove duplicates

    def _get_backend(self):
        """Get the backend instance for this queryset's document class.

        Returns:
            Backend instance configured for this document
        """
        backend_name = self.document_class._meta.get('backend', 'surrealdb')

        from ..backends import BackendRegistry
        from ..connection import ConnectionRegistry

        # Get the backend class
        backend_class = BackendRegistry.get_backend(backend_name)

        # Get the connection for this backend
        connection = ConnectionRegistry.get_default_connection(backend_name)

        # Return backend instance
        return backend_class(connection)