Source code for surrealengine.query.descriptor

from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from ..connection import ConnectionRegistry
from .base import QuerySet
from ..pagination import PaginationResult


[docs] class QuerySetDescriptor: """Descriptor that provides QuerySet access through Document.objects. This class is a descriptor that provides access to a QuerySet through the Document.objects attribute. It allows querying documents of a specific document class using the Document.objects attribute. Attributes: owner: The document class that owns this descriptor connection: The database connection to use for queries """
[docs] def __init__(self) -> None: """Initialize a new QuerySetDescriptor.""" self.owner: Optional[Type] = None self.connection: Optional[Any] = None
[docs] def __get__(self, obj: Any, owner: Type) -> 'QuerySetDescriptor': """Get the descriptor for the given owner. This method is called when the descriptor is accessed through an attribute of a class or instance. Args: obj: The instance the descriptor is accessed through, or None owner: The class the descriptor is accessed through Returns: The descriptor instance """ self.owner = owner # Don't set a default connection here, let each method get the appropriate connection self.connection = None return self
[docs] async def __call__(self, query=None, limit: Optional[int] = None, start: Optional[int] = None, page: Optional[tuple] = None, **kwargs: Any) -> List[Any]: """Allow direct filtering through call syntax asynchronously. This method allows calling the descriptor directly with filters or query objects to query the document class. It supports pagination through limit and start parameters or the page parameter. Args: query: Q object or QueryExpression for complex queries limit: Maximum number of results to return (for pagination) start: Number of results to skip (for pagination) page: Tuple of (page_number, page_size) for pagination **kwargs: Field names and values to filter by Returns: List of matching documents """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) # Apply query object if provided if query is not None: queryset = queryset.filter(query) # Apply filters and pagination if kwargs: queryset = queryset.filter(**kwargs) if page is not None: page_number, page_size = page queryset = queryset.page(page_number, page_size) else: if limit is not None: queryset = queryset.limit(limit) if start is not None: queryset = queryset.start(start) # Return results return await queryset.all()
[docs] def call_sync(self, query=None, limit: Optional[int] = None, start: Optional[int] = None, page: Optional[tuple] = None, **kwargs: Any) -> List[Any]: """Allow direct filtering through call syntax synchronously. This method allows calling the descriptor directly with filters or query objects to query the document class. It supports pagination through limit and start parameters or the page parameter. Args: query: Q object or QueryExpression for complex queries limit: Maximum number of results to return (for pagination) start: Number of results to skip (for pagination) page: Tuple of (page_number, page_size) for pagination **kwargs: Field names and values to filter by Returns: List of matching documents """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) # Apply query object if provided if query is not None: queryset = queryset.filter(query) # Apply filters and pagination if kwargs: queryset = queryset.filter(**kwargs) if page is not None: page_number, page_size = page queryset = queryset.page(page_number, page_size) else: if limit is not None: queryset = queryset.limit(limit) if start is not None: queryset = queryset.start(start) # Return results return queryset.all_sync()
[docs] async def get(self, **kwargs: Any) -> Any: """Allow direct get operation asynchronously. This method allows getting a single document matching the given filters. Args: **kwargs: Field names and values to filter by Returns: The matching document Raises: DoesNotExist: If no matching document is found MultipleObjectsReturned: If multiple matching documents are found """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return await queryset.get(**kwargs)
[docs] def get_sync(self, **kwargs: Any) -> Any: """Allow direct get operation synchronously. This method allows getting a single document matching the given filters. Args: **kwargs: Field names and values to filter by Returns: The matching document Raises: DoesNotExist: If no matching document is found MultipleObjectsReturned: If multiple matching documents are found """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.get_sync(**kwargs)
[docs] def filter(self, query=None, **kwargs: Any) -> QuerySet: """Create a QuerySet with filters using the default async connection. This method creates a new QuerySet with the given filters using the default async connection. Args: query: Q object or QueryExpression for complex queries **kwargs: Field names and values to filter by Returns: A QuerySet with the given filters """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.filter(query=query, **kwargs)
[docs] def filter_sync(self, query=None, **kwargs: Any) -> QuerySet: """Create a QuerySet with filters using the default sync connection. This method creates a new QuerySet with the given filters using the default sync connection. Args: query: Q object or QueryExpression for complex queries **kwargs: Field names and values to filter by Returns: A QuerySet with the given filters """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.filter(query=query, **kwargs)
[docs] def limit(self, value: int) -> QuerySet: """Set the maximum number of results to return. Args: value: Maximum number of results Returns: A QuerySet with the limit applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.limit(value)
[docs] def limit_sync(self, value: int) -> QuerySet: """Set the maximum number of results to return using the default sync connection. Args: value: Maximum number of results Returns: A QuerySet with the limit applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.limit(value)
[docs] def start(self, value: int) -> QuerySet: """Set the number of results to skip (for pagination). Args: value: Number of results to skip Returns: A QuerySet with the start applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.start(value)
[docs] def start_sync(self, value: int) -> QuerySet: """Set the number of results to skip (for pagination) using the default sync connection. Args: value: Number of results to skip Returns: A QuerySet with the start applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.start(value)
[docs] def order_by(self, field: str, direction: str = 'ASC') -> QuerySet: """Set the field and direction to order results by. Args: field: Field name to order by direction: Direction to order by ('ASC' or 'DESC') Returns: A QuerySet with the order by applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.order_by(field, direction)
[docs] def order_by_sync(self, field: str, direction: str = 'ASC') -> QuerySet: """Set the field and direction to order results by using the default sync connection. Args: field: Field name to order by direction: Direction to order by ('ASC' or 'DESC') Returns: A QuerySet with the order by applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.order_by(field, direction)
[docs] def group_by(self, *fields: str) -> QuerySet: """Group the results by the specified fields. This method sets the fields to group the results by using the GROUP BY clause. Args: *fields: Field names to group by Returns: A QuerySet with the group by applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.group_by(*fields)
[docs] def group_by_sync(self, *fields: str) -> QuerySet: """Group the results by the specified fields using the default sync connection. This method sets the fields to group the results by using the GROUP BY clause. Args: *fields: Field names to group by Returns: A QuerySet with the group by applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.group_by(*fields)
[docs] def split(self, *fields: str) -> QuerySet: """Split the results by the specified fields. This method sets the fields to split the results by using the SPLIT clause. Args: *fields: Field names to split by Returns: A QuerySet with the split applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.split(*fields)
[docs] def split_sync(self, *fields: str) -> QuerySet: """Split the results by the specified fields using the default sync connection. This method sets the fields to split the results by using the SPLIT clause. Args: *fields: Field names to split by Returns: A QuerySet with the split applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.split(*fields)
[docs] def fetch(self, *fields: str) -> QuerySet: """Fetch related records for the specified fields. This method sets the fields to fetch related records for using the FETCH clause. Args: *fields: Field names to fetch related records for Returns: A QuerySet with the fetch applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.fetch(*fields)
[docs] def fetch_sync(self, *fields: str) -> QuerySet: """Fetch related records for the specified fields using the default sync connection. This method sets the fields to fetch related records for using the FETCH clause. Args: *fields: Field names to fetch related records for Returns: A QuerySet with the fetch applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.fetch(*fields)
[docs] async def first(self) -> Any: """Get the first result from the query asynchronously. Returns: The first matching document or None if no matches Raises: DoesNotExist: If no matching document is found """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return await queryset.first()
[docs] def first_sync(self) -> Any: """Get the first result from the query synchronously. Returns: The first matching document or None if no matches Raises: DoesNotExist: If no matching document is found """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.first_sync()
[docs] def page(self, number: int, size: int) -> QuerySet: """Set pagination parameters using page number and size. Args: number: Page number (1-based, first page is 1) size: Number of items per page Returns: A QuerySet with pagination applied """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.page(number, size)
[docs] def page_sync(self, number: int, size: int) -> QuerySet: """Set pagination parameters using page number and size using the default sync connection. Args: number: Page number (1-based, first page is 1) size: Number of items per page Returns: A QuerySet with pagination applied """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.page(number, size)
[docs] async def paginate(self, page: int, per_page: int) -> 'PaginationResult': """Get a page of results with pagination metadata asynchronously. This method gets a page of results along with metadata about the pagination, such as the total number of items, the number of pages, and whether there are next or previous pages. Args: page: The page number (1-based) per_page: The number of items per page Returns: A PaginationResult containing the items and pagination metadata """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) # Return the paginated results return await queryset.paginate(page, per_page)
[docs] def paginate_sync(self, page: int, per_page: int) -> 'PaginationResult': """Get a page of results with pagination metadata synchronously. This method gets a page of results along with metadata about the pagination, such as the total number of items, the number of pages, and whether there are next or previous pages. Args: page: The page number (1-based) per_page: The number of items per page Returns: A PaginationResult containing the items and pagination metadata """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) # Return the paginated results return queryset.paginate_sync(page, per_page)
[docs] def aggregate(self): """Create an aggregation pipeline from this query. This method returns an AggregationPipeline instance that can be used to build and execute complex aggregation queries with multiple stages. Returns: An AggregationPipeline instance for building and executing aggregation queries. """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.aggregate()
[docs] def aggregate_sync(self): """Create an aggregation pipeline from this query using the default sync connection. This method returns an AggregationPipeline instance that can be used to build and execute complex aggregation queries with multiple stages. Returns: An AggregationPipeline instance for building and executing aggregation queries. """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.aggregate()
[docs] async def join(self, field_name: str, target_fields: Optional[List[str]] = None, dereference: bool = True, dereference_depth: int = 1) -> List[Any]: """Perform a JOIN-like operation on a reference field. This method performs a JOIN-like operation on a reference field by using SurrealDB's graph traversal capabilities. It retrieves the referenced documents and replaces the reference IDs with the actual documents. Args: field_name: The name of the reference field to join on target_fields: Optional list of fields to select from the target document dereference: Whether to dereference references in the joined documents (default: True) dereference_depth: Maximum depth of reference resolution (default: 1) Returns: List of documents with joined data Raises: ValueError: If the field is not a ReferenceField """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return await queryset.join(field_name, target_fields, dereference=dereference, dereference_depth=dereference_depth)
[docs] def join_sync(self, field_name: str, target_fields: Optional[List[str]] = None, dereference: bool = True, dereference_depth: int = 1) -> List[Any]: """Perform a JOIN-like operation on a reference field synchronously. This method performs a JOIN-like operation on a reference field by using SurrealDB's graph traversal capabilities. It retrieves the referenced documents and replaces the reference IDs with the actual documents. Args: field_name: The name of the reference field to join on target_fields: Optional list of fields to select from the target document dereference: Whether to dereference references in the joined documents (default: True) dereference_depth: Maximum depth of reference resolution (default: 1) Returns: List of documents with joined data Raises: ValueError: If the field is not a ReferenceField """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.join_sync(field_name, target_fields, dereference=dereference, dereference_depth=dereference_depth)
[docs] def get_many(self, ids: List[Union[str, Any]]) -> QuerySet: """Get multiple records by IDs using optimized direct record access. This method uses SurrealDB's direct record selection syntax for better performance compared to WHERE clause filtering. Args: ids: List of record IDs (can be strings or other ID types) Returns: The query set instance configured for direct record access """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.get_many(ids)
[docs] def get_many_sync(self, ids: List[Union[str, Any]]) -> QuerySet: """Get multiple records by IDs using optimized direct record access synchronously. Args: ids: List of record IDs (can be strings or other ID types) Returns: The query set instance configured for direct record access """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.get_many(ids)
[docs] def get_range(self, start_id: Union[str, Any], end_id: Union[str, Any], inclusive: bool = True) -> QuerySet: """Get a range of records by ID using optimized range syntax. This method uses SurrealDB's range selection syntax for better performance compared to WHERE clause filtering. Args: start_id: Starting ID of the range end_id: Ending ID of the range inclusive: Whether the range is inclusive (default: True) Returns: The query set instance configured for range access """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return queryset.get_range(start_id, end_id, inclusive)
[docs] def get_range_sync(self, start_id: Union[str, Any], end_id: Union[str, Any], inclusive: bool = True) -> QuerySet: """Get a range of records by ID using optimized range syntax synchronously. Args: start_id: Starting ID of the range end_id: Ending ID of the range inclusive: Whether the range is inclusive (default: True) Returns: The query set instance configured for range access """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.get_range(start_id, end_id, inclusive)
[docs] async def bulk_create(self, documents: List[Any], batch_size: int = 1000, validate: bool = True, return_documents: bool = True) -> Union[List[Any], int]: """Create multiple documents in a single operation asynchronously. This method creates multiple documents in a single operation, processing them in batches for better performance. Args: documents: List of Document instances to create batch_size: Number of documents per batch (default: 1000) validate: Whether to validate documents (default: True) return_documents: Whether to return created documents (default: True) Returns: List of created documents with their IDs set if return_documents=True, otherwise returns the count of created documents """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return await queryset.bulk_create(documents, batch_size, validate, return_documents)
[docs] def bulk_create_sync(self, documents: List[Any], batch_size: int = 1000, validate: bool = True, return_documents: bool = True) -> Union[List[Any], int]: """Create multiple documents in a single operation synchronously. Args: documents: List of Document instances to create batch_size: Number of documents per batch (default: 1000) validate: Whether to validate documents (default: True) return_documents: Whether to return created documents (default: True) Returns: List of created documents with their IDs set if return_documents=True, otherwise returns the count of created documents """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.bulk_create_sync(documents, batch_size, validate, return_documents)
[docs] async def all(self) -> List[Any]: """Execute the query and return all results asynchronously. Returns: List of all documents matching any implicit query """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return await queryset.all()
[docs] def all_sync(self) -> List[Any]: """Execute the query and return all results synchronously. Returns: List of all documents matching any implicit query """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.all_sync()
[docs] async def count(self) -> int: """Count all documents asynchronously. Returns: Number of documents """ # Get the default async connection connection = ConnectionRegistry.get_default_connection(async_mode=True) queryset = QuerySet(self.owner, connection) return await queryset.count()
[docs] def count_sync(self) -> int: """Count all documents synchronously. Returns: Number of documents """ # Get the default sync connection connection = ConnectionRegistry.get_default_connection(async_mode=False) queryset = QuerySet(self.owner, connection) return queryset.count_sync()