import json
import logging
from typing import Any, Dict, List, Optional, Union, Callable
from .exceptions import DoesNotExist, MultipleObjectsReturned
from surrealdb import RecordID
from .base_query import BaseQuerySet
# Set up logging
logger = logging.getLogger(__name__)
class SchemalessQuerySet(BaseQuerySet):
"""QuerySet for schemaless operations.
This class provides a query builder for tables without a predefined schema.
It extends BaseQuerySet to provide methods for querying and manipulating
documents in a schemaless manner.
Attributes:
table_name: The name of the table to query
connection: The database connection to use for queries
"""
def __init__(self, table_name: str, connection: Any) -> None:
"""Initialize a new SchemalessQuerySet.
Args:
table_name: The name of the table to query
connection: The database connection to use for queries
"""
super().__init__(connection)
self.table_name = table_name
async def all(self) -> List[Any]:
"""Execute the query and return all results asynchronously.
This method builds and executes the query, then processes the results
based on whether a matching document class is found. If a matching
document class is found, the results are converted to instances of that
class. Otherwise, they are converted to SimpleNamespace objects.
Returns:
List of results, either document instances or SimpleNamespace objects
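Example (illustrative; assumes an open ``connection`` and a hypothetical ``users`` table):
    qs = SchemalessQuerySet("users", connection)
    people = await qs.filter(active=True).limit(10).all()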
"""
query = self._build_query()
results = await self.connection.client.query(query)
if not results or not results[0]:
return []
# If we have a document class in the connection's database mapping, use it
from .document import Document  # Imported locally to avoid a circular import
doc_class = None
# Find matching document class
for cls in Document.__subclasses__():
if hasattr(cls, '_meta') and cls._meta.get('collection') == self.table_name:
doc_class = cls
break
# Process results based on whether we found a matching document class
processed_results = []
if doc_class:
for doc_data in results:  # each item is a record returned by the query
instance = doc_class.from_db(doc_data)
processed_results.append(instance)
else:
# If no matching document class, create dynamic objects
from types import SimpleNamespace
for doc_data in results:
# Check if doc_data is a dictionary, if not try to convert or skip
if isinstance(doc_data, dict):
instance = SimpleNamespace(**doc_data)
else:
# If it's a string, try to use it as a name attribute
instance = SimpleNamespace(name=str(doc_data))
processed_results.append(instance)
return processed_results
def all_sync(self) -> List[Any]:
"""Execute the query and return all results synchronously.
This method builds and executes the query, then processes the results
based on whether a matching document class is found. If a matching
document class is found, the results are converted to instances of that
class. Otherwise, they are converted to SimpleNamespace objects.
Returns:
List of results, either document instances or SimpleNamespace objects
"""
query = self._build_query()
results = self.connection.client.query(query)
if not results or not results[0]:
return []
# If we have a document class in the connection's database mapping, use it
from .document import Document  # Imported locally to avoid a circular import
doc_class = None
# Find matching document class
for cls in Document.__subclasses__():
if hasattr(cls, '_meta') and cls._meta.get('collection') == self.table_name:
doc_class = cls
break
# Process results based on whether we found a matching document class
processed_results = []
if doc_class:
for doc_data in results:  # each item is a record returned by the query
instance = doc_class.from_db(doc_data)
processed_results.append(instance)
else:
# If no matching document class, create dynamic objects
from types import SimpleNamespace
for doc_data in results:
# Check if doc_data is a dictionary, if not try to convert or skip
if isinstance(doc_data, dict):
instance = SimpleNamespace(**doc_data)
else:
# If it's a string, try to use it as a name attribute
instance = SimpleNamespace(name=str(doc_data))
processed_results.append(instance)
return processed_results
async def get(self, **kwargs: Any) -> Any:
"""Get a single document matching the query asynchronously.
This method provides special handling for ID-based lookups, using the
direct select method with RecordID. For non-ID lookups, it falls back
to the base class implementation.
Args:
**kwargs: Field names and values to filter by
Returns:
The matching document
Raises:
DoesNotExist: If no matching document is found
MultipleObjectsReturned: If multiple matching documents are found
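Example (illustrative; the ``users`` table and ids are placeholders):
    qs = SchemalessQuerySet("users", connection)
    user = await qs.get(id="users:tobie")  # full record id
    user = await qs.get(id="tobie")        # short id; the table name is inferred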
"""
# Special handling for ID-based lookup
if len(kwargs) == 1 and 'id' in kwargs:
id_value = kwargs['id']
# Handle both full and short ID formats
if ':' in str(id_value):
record_id = str(id_value).split(':', 1)[1]
else:
record_id = id_value
# Use direct select with RecordID
result = await self.connection.client.select(RecordID(self.table_name, record_id))
if not result or result == self.table_name: # Check for the table name response
raise DoesNotExist(f"Object in table '{self.table_name}' matching query does not exist.")
# Handle the result appropriately
if isinstance(result, list):
return result[0] if result else None
return result
# For non-ID lookups, use the base class implementation
return await super().get(**kwargs)
def get_sync(self, **kwargs: Any) -> Any:
"""Get a single document matching the query synchronously.
This method provides special handling for ID-based lookups, using the
direct select method with RecordID. For non-ID lookups, it falls back
to the base class implementation.
Args:
**kwargs: Field names and values to filter by
Returns:
The matching document
Raises:
DoesNotExist: If no matching document is found
MultipleObjectsReturned: If multiple matching documents are found
"""
# Special handling for ID-based lookup
if len(kwargs) == 1 and 'id' in kwargs:
id_value = kwargs['id']
# Handle both full and short ID formats
if ':' in str(id_value):
record_id = str(id_value).split(':', 1)[1]
else:
record_id = id_value
# Use direct select with RecordID
result = self.connection.client.select(RecordID(self.table_name, record_id))
if not result or result == self.table_name: # Check for the table name response
raise DoesNotExist(f"Object in table '{self.table_name}' matching query does not exist.")
# Handle the result appropriately
if isinstance(result, list):
return result[0] if result else None
return result
# For non-ID lookups, use the base class implementation
return super().get_sync(**kwargs)
def _build_query(self) -> str:
"""Build the query string.
This method builds the query string for the schemaless query, handling
special cases for ID fields. It processes the query_parts to handle
both full and short ID formats.
Returns:
The query string
"""
query = f"SELECT * FROM {self.table_name}"
if self.query_parts:
# Process special ID handling first
processed_query_parts = []
for field, op, value in self.query_parts:
if field == 'id' and isinstance(value, str):
# Handle record IDs specially
if ':' in value:
# Full record ID format (table:id)
processed_query_parts.append(('id', '=', value))
else:
# Short ID format (just id)
processed_query_parts.append(('id', '=', f'{self.table_name}:{value}'))
else:
processed_query_parts.append((field, op, value))
# Save the original query_parts
original_query_parts = self.query_parts
# Use the processed query_parts for building conditions
self.query_parts = processed_query_parts
conditions = self._build_conditions()
# Restore the original query_parts
self.query_parts = original_query_parts
query += f" WHERE {' AND '.join(conditions)}"
# Add other clauses from _build_clauses
clauses = self._build_clauses()
for clause_name, clause_sql in clauses.items():
if clause_name != 'WHERE': # WHERE clause is already handled
query += f" {clause_sql}"
return query
async def bulk_create(self, documents: List[Dict[str, Any]], batch_size: int = 1000,
return_documents: bool = True) -> Union[List[Any], int]:
"""Create multiple documents in a single operation asynchronously.
This method creates multiple documents in a single operation, processing
them in batches for better performance. It can optionally return the created documents.
Args:
documents: List of dictionaries representing documents to create
batch_size: Number of documents per batch (default: 1000)
return_documents: Whether to return created documents (default: True)
Returns:
List of created documents with their IDs set if return_documents=True,
otherwise returns the count of created documents
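Example (illustrative; the ``users`` table and fields are placeholders):
    qs = SchemalessQuerySet("users", connection)
    docs = [{"name": "Alice"}, {"name": "Bob"}]
    created = await qs.bulk_create(docs, batch_size=500)
    count = await qs.bulk_create(docs, return_documents=False)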
"""
if not documents:
return [] if return_documents else 0
total_created = 0
created_docs = [] if return_documents else None
# Process in batches
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
# Construct optimized bulk insert query
query = f"INSERT INTO {self.table_name} {json.dumps(batch)};"
# Execute batch insert
try:
result = await self.connection.client.query(query)
if return_documents and result and result[0]:
# Process results if needed
from types import SimpleNamespace
batch_docs = []
for doc_data in result[0]:
if isinstance(doc_data, dict):
instance = SimpleNamespace(**doc_data)
else:
instance = SimpleNamespace(name=str(doc_data))
batch_docs.append(instance)
created_docs.extend(batch_docs)
total_created += len(batch_docs)
elif result and result[0]:
total_created += len(result[0])
except Exception as e:
# Log error and continue with next batch
logger.error(f"Error in bulk create batch: {str(e)}")
continue
return created_docs if return_documents else total_created
def bulk_create_sync(self, documents: List[Dict[str, Any]], batch_size: int = 1000,
return_documents: bool = True) -> Union[List[Any], int]:
"""Create multiple documents in a single operation synchronously.
This method creates multiple documents in a single operation, processing
them in batches for better performance. It can optionally return the created documents.
Args:
documents: List of dictionaries representing documents to create
batch_size: Number of documents per batch (default: 1000)
return_documents: Whether to return created documents (default: True)
Returns:
List of created documents with their IDs set if return_documents=True,
otherwise returns the count of created documents
"""
if not documents:
return [] if return_documents else 0
total_created = 0
created_docs = [] if return_documents else None
# Process in batches
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
# Construct optimized bulk insert query
query = f"INSERT INTO {self.table_name} {json.dumps(batch)};"
# Execute batch insert
try:
result = self.connection.client.query(query)
if return_documents and result and result[0]:
# Process results if needed
from types import SimpleNamespace
batch_docs = []
for doc_data in result[0]:
if isinstance(doc_data, dict):
instance = SimpleNamespace(**doc_data)
else:
instance = SimpleNamespace(name=str(doc_data))
batch_docs.append(instance)
created_docs.extend(batch_docs)
total_created += len(batch_docs)
elif result and result[0]:
total_created += len(result[0])
except Exception as e:
# Log error and continue with next batch
logger.error(f"Error in bulk create batch: {str(e)}")
continue
return created_docs if return_documents else total_created
class SchemalessTable:
"""Dynamic table accessor.
This class provides access to a specific table in the database without
requiring a predefined schema. It allows querying the table using the
objects property or by calling the instance directly with filters.
Attributes:
name: The name of the table
connection: The database connection to use for queries
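Example (illustrative; the table and field names are placeholders):
    users = SchemalessTable("users", connection)
    adults = await users(age=30, limit=10)
    queryset = users.objects.filter(name="Alice")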
"""
def __init__(self, name: str, connection: Any) -> None:
"""Initialize a new SchemalessTable.
Args:
name: The name of the table
connection: The database connection to use for queries
"""
self.name = name
self.connection = connection
async def relate(self, from_id: Union[str, RecordID], relation: str,
to_id: Union[str, RecordID], **attrs: Any) -> Optional[Any]:
"""Create a relation between two records asynchronously.
This method creates a relation between two records in the database.
It constructs a RELATE query with the given relation name and attributes.
Args:
from_id: The ID of the record to create the relation from
relation: The name of the relation
to_id: The ID of the record to create the relation to
**attrs: Attributes to set on the relation
Returns:
The created relation record or None if creation failed
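Example (illustrative; the record ids and the ``wrote`` relation are placeholders):
    users = SchemalessTable("users", connection)
    rel = await users.relate("users:alice", "wrote", "posts:123", year=2024)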
"""
# Handle both string and RecordID types for from_id
if isinstance(from_id, RecordID):
from_record = from_id
else:
# If it's a string, check if it includes the table name
if ':' in from_id:
table, id_part = from_id.split(':', 1)
from_record = RecordID(table, id_part)
else:
# Assume it's from the current table
from_record = RecordID(self.name, from_id)
# Handle both string and RecordID types for to_id
if isinstance(to_id, RecordID):
to_record = to_id
else:
# If it's a string, check if it includes the table name
if ':' in to_id:
table, id_part = to_id.split(':', 1)
to_record = RecordID(table, id_part)
else:
# Assume it's from the current table
to_record = RecordID(self.name, to_id)
# Construct the relation query using the RecordID objects
query = f"RELATE {from_record}->{relation}->{to_record}"
# Add attributes if provided
if attrs:
attrs_str = ", ".join([f"{k}: {json.dumps(v)}" for k, v in attrs.items()])
query += f" CONTENT {{ {attrs_str} }}"
result = await self.connection.client.query(query)
# Return the relation record
if result and result[0]:
return result[0]
return None
def relate_sync(self, from_id: Union[str, RecordID], relation: str,
to_id: Union[str, RecordID], **attrs: Any) -> Optional[Any]:
"""Create a relation between two records synchronously.
This method creates a relation between two records in the database.
It constructs a RELATE query with the given relation name and attributes.
Args:
from_id: The ID of the record to create the relation from
relation: The name of the relation
to_id: The ID of the record to create the relation to
**attrs: Attributes to set on the relation
Returns:
The created relation record or None if creation failed
"""
# Handle both string and RecordID types for from_id
if isinstance(from_id, RecordID):
from_record = from_id
else:
# If it's a string, check if it includes the table name
if ':' in from_id:
table, id_part = from_id.split(':', 1)
from_record = RecordID(table, id_part)
else:
# Assume it's from the current table
from_record = RecordID(self.name, from_id)
# Handle both string and RecordID types for to_id
if isinstance(to_id, RecordID):
to_record = to_id
else:
# If it's a string, check if it includes the table name
if ':' in to_id:
table, id_part = to_id.split(':', 1)
to_record = RecordID(table, id_part)
else:
# Assume it's from the current table
to_record = RecordID(self.name, to_id)
# Construct the relation query using the RecordID objects
query = f"RELATE {from_record}->{relation}->{to_record}"
# Add attributes if provided
if attrs:
attrs_str = ", ".join([f"{k}: {json.dumps(v)}" for k, v in attrs.items()])
query += f" CONTENT {{ {attrs_str} }}"
result = self.connection.client.query(query)
# Return the relation record
if result and result[0]:
return result[0]
return None
async def update_relation(self, from_id: Union[str, RecordID], relation: str,
to_id: Union[str, RecordID], **attrs: Any) -> Optional[Any]:
"""Update an existing relation asynchronously.
This method updates an existing relation between two records in the database.
If the relation doesn't exist, it creates it.
Args:
from_id: The ID of the record the relation is from
relation: The name of the relation
to_id: The ID of the record the relation is to
**attrs: Attributes to update on the relation
Returns:
The updated relation record or None if update failed
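Example (illustrative; assumes ``users = SchemalessTable("users", connection)``; ids, relation and attributes are placeholders):
    rel = await users.update_relation("users:alice", "wrote", "posts:123", role="editor")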
"""
# Handle both string and RecordID types for from_id
if isinstance(from_id, RecordID):
from_record = str(from_id)
else:
# If it's a string, check if it includes the table name
if ':' in from_id:
from_record = from_id
else:
# Assume it's from the current table
from_record = f"{self.name}:{from_id}"
# Handle both string and RecordID types for to_id
if isinstance(to_id, RecordID):
to_record = str(to_id)
else:
# If it's a string, check if it includes the table name
if ':' in to_id:
to_record = to_id
else:
# Assume it's from the current table
to_record = f"{self.name}:{to_id}"
# Query the relation first
relation_query = f"SELECT id FROM {relation} WHERE in = {json.dumps(from_record)} AND out = {json.dumps(to_record)}"
relation_result = await self.connection.client.query(relation_query)
if not relation_result or not relation_result[0]:
# If relation doesn't exist, create it
return await self.relate(from_id, relation, to_id, **attrs)
# Get relation ID and update
relation_id = relation_result[0][0]['id']
update_query = f"UPDATE {relation_id} SET"
# Add attributes
updates = []
for key, value in attrs.items():
updates.append(f" {key} = {json.dumps(value)}")
update_query += ",".join(updates)
result = await self.connection.client.query(update_query)
if result and result[0]:
return result[0][0]
return None
def update_relation_sync(self, from_id: Union[str, RecordID], relation: str,
to_id: Union[str, RecordID], **attrs: Any) -> Optional[Any]:
"""Update an existing relation synchronously.
This method updates an existing relation between two records in the database.
If the relation doesn't exist, it creates it.
Args:
from_id: The ID of the record the relation is from
relation: The name of the relation
to_id: The ID of the record the relation is to
**attrs: Attributes to update on the relation
Returns:
The updated relation record or None if update failed
"""
# Handle both string and RecordID types for from_id
if isinstance(from_id, RecordID):
from_record = str(from_id)
else:
# If it's a string, check if it includes the table name
if ':' in from_id:
from_record = from_id
else:
# Assume it's from the current table
from_record = f"{self.name}:{from_id}"
# Handle both string and RecordID types for to_id
if isinstance(to_id, RecordID):
to_record = str(to_id)
else:
# If it's a string, check if it includes the table name
if ':' in to_id:
to_record = to_id
else:
# Assume it's from the current table
to_record = f"{self.name}:{to_id}"
# Query the relation first
relation_query = f"SELECT id FROM {relation} WHERE in = {json.dumps(from_record)} AND out = {json.dumps(to_record)}"
relation_result = self.connection.client.query(relation_query)
if not relation_result or not relation_result[0]:
# If relation doesn't exist, create it
return self.relate_sync(from_id, relation, to_id, **attrs)
# Get relation ID and update
relation_id = relation_result[0][0]['id']
update_query = f"UPDATE {relation_id} SET"
# Add attributes
updates = []
for key, value in attrs.items():
updates.append(f" {key} = {json.dumps(value)}")
update_query += ",".join(updates)
result = self.connection.client.query(update_query)
if result and result[0]:
return result[0][0]
return None
async def delete_relation(self, from_id: Union[str, RecordID], relation: str,
to_id: Optional[Union[str, RecordID]] = None) -> int:
"""Delete a relation asynchronously.
This method deletes a relation between two records in the database.
If to_id is not provided, it deletes all relations from from_id.
Args:
from_id: The ID of the record the relation is from
relation: The name of the relation
to_id: The ID of the record the relation is to (optional)
Returns:
Number of deleted relations
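Example (illustrative; assumes ``users = SchemalessTable("users", connection)``; ids and relation name are placeholders):
    deleted = await users.delete_relation("users:alice", "wrote", "posts:123")
    deleted = await users.delete_relation("users:alice", "wrote")  # all "wrote" edges from the record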
"""
# Handle both string and RecordID types for from_id
if isinstance(from_id, RecordID):
from_record = str(from_id)
else:
# If it's a string, check if it includes the table name
if ':' in from_id:
from_record = from_id
else:
# Assume it's from the current table
from_record = f"{self.name}:{from_id}"
# Construct the delete query
if to_id:
# Handle both string and RecordID types for to_id
if isinstance(to_id, RecordID):
to_record = str(to_id)
else:
# If it's a string, check if it includes the table name
if ':' in to_id:
to_record = to_id
else:
# Assume it's from the current table
to_record = f"{self.name}:{to_id}"
# Delete specific relation
query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_record)} AND out = {json.dumps(to_record)}"
else:
# Delete all relations from this record
query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_record)}"
result = await self.connection.client.query(query)
if result and result[0]:
return len(result[0])
return 0
def delete_relation_sync(self, from_id: Union[str, RecordID], relation: str,
to_id: Optional[Union[str, RecordID]] = None) -> int:
"""Delete a relation synchronously.
This method deletes a relation between two records in the database.
If to_id is not provided, it deletes all relations from from_id.
Args:
from_id: The ID of the record the relation is from
relation: The name of the relation
to_id: The ID of the record the relation is to (optional)
Returns:
Number of deleted relations
"""
# Handle both string and RecordID types for from_id
if isinstance(from_id, RecordID):
from_record = str(from_id)
else:
# If it's a string, check if it includes the table name
if ':' in from_id:
from_record = from_id
else:
# Assume it's from the current table
from_record = f"{self.name}:{from_id}"
# Construct the delete query
if to_id:
# Handle both string and RecordID types for to_id
if isinstance(to_id, RecordID):
to_record = str(to_id)
else:
# If it's a string, check if it includes the table name
if ':' in to_id:
to_record = to_id
else:
# Assume it's from the current table
to_record = f"{self.name}:{to_id}"
# Delete specific relation
query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_record)} AND out = {json.dumps(to_record)}"
else:
# Delete all relations from this record
query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_record)}"
result = self.connection.client.query(query)
if result and result[0]:
return len(result[0])
return 0
async def create_index(self, index_name: str, fields: List[str], unique: bool = False,
search: bool = False, analyzer: Optional[str] = None,
comment: Optional[str] = None) -> None:
"""Create an index on this table asynchronously.
Args:
index_name: Name of the index
fields: List of field names to include in the index
unique: Whether the index should enforce uniqueness
search: Whether the index is a search index
analyzer: Analyzer to use for search indexes
comment: Optional comment for the index
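Example (illustrative; assumes ``users = SchemalessTable("users", connection)``; index, field and analyzer names are placeholders):
    await users.create_index("idx_email", ["email"], unique=True)
    await users.create_index("idx_bio", ["bio"], search=True, analyzer="text_analyzer")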
"""
fields_str = ", ".join(fields)
# Build the index definition
query = f"DEFINE INDEX {index_name} ON {self.name} FIELDS {fields_str}"
# Add index type
if unique:
query += " UNIQUE"
elif search and analyzer:
query += f" SEARCH ANALYZER {analyzer}"
# Add comment if provided
if comment:
query += f" COMMENT '{comment}'"
# Execute the query
await self.connection.client.query(query)
def create_index_sync(self, index_name: str, fields: List[str], unique: bool = False,
search: bool = False, analyzer: Optional[str] = None,
comment: Optional[str] = None) -> None:
"""Create an index on this table synchronously.
Args:
index_name: Name of the index
fields: List of field names to include in the index
unique: Whether the index should enforce uniqueness
search: Whether the index is a search index
analyzer: Analyzer to use for search indexes
comment: Optional comment for the index
"""
fields_str = ", ".join(fields)
# Build the index definition
query = f"DEFINE INDEX {index_name} ON {self.name} FIELDS {fields_str}"
# Add index type
if unique:
query += " UNIQUE"
elif search and analyzer:
query += f" SEARCH ANALYZER {analyzer}"
# Add comment if provided
if comment:
query += f" COMMENT '{comment}'"
# Execute the query
self.connection.client.query(query)
@property
def objects(self) -> SchemalessQuerySet:
"""Get a query set for this table.
Returns:
A SchemalessQuerySet for querying this table
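Example (illustrative; assumes ``users`` is a SchemalessTable; the field name is a placeholder):
    queryset = users.objects.filter(active=True)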
"""
return SchemalessQuerySet(self.name, self.connection)
async def __call__(self, limit: Optional[int] = None, start: Optional[int] = None,
page: Optional[tuple] = None, **kwargs: Any) -> List[Any]:
"""Query the table with filters asynchronously.
This method allows calling the table instance directly with filters
to query the table. It supports pagination through limit and start parameters
or the page parameter. It returns the results as SimpleNamespace objects
if they aren't already Document instances.
Args:
limit: Maximum number of results to return (for pagination)
start: Number of results to skip (for pagination)
page: Tuple of (page_number, page_size) for pagination
**kwargs: Field names and values to filter by
Returns:
List of results, either document instances or SimpleNamespace objects
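Example (illustrative; assumes ``users`` is a SchemalessTable; field names are placeholders):
    results = await users(active=True, limit=20, start=40)
    page_two = await users(page=(2, 20), active=True)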
"""
queryset = SchemalessQuerySet(self.name, self.connection)
# Apply filters
queryset = queryset.filter(**kwargs)
# Apply pagination
if page is not None:
page_number, page_size = page
queryset = queryset.page(page_number, page_size)
else:
if limit is not None:
queryset = queryset.limit(limit)
if start is not None:
queryset = queryset.start(start)
# Execute query
results = await queryset.all()
# Convert results to SimpleNamespace objects if they aren't already Document instances
if results and not hasattr(results[0], '_data'): # Check if it's not a Document instance
from types import SimpleNamespace
results = [SimpleNamespace(**result) if isinstance(result, dict) else result
for result in results]
return results
def call_sync(self, limit: Optional[int] = None, start: Optional[int] = None,
page: Optional[tuple] = None, **kwargs: Any) -> List[Any]:
"""Query the table with filters synchronously.
This is the synchronous counterpart of calling the table instance directly
with filters. It supports pagination through limit and start parameters
or the page parameter. It returns the results as SimpleNamespace objects
if they aren't already Document instances.
Args:
limit: Maximum number of results to return (for pagination)
start: Number of results to skip (for pagination)
page: Tuple of (page_number, page_size) for pagination
**kwargs: Field names and values to filter by
Returns:
List of results, either document instances or SimpleNamespace objects
"""
queryset = SchemalessQuerySet(self.name, self.connection)
# Apply filters
queryset = queryset.filter(**kwargs)
# Apply pagination
if page is not None:
page_number, page_size = page
queryset = queryset.page(page_number, page_size)
else:
if limit is not None:
queryset = queryset.limit(limit)
if start is not None:
queryset = queryset.start(start)
# Execute query
results = queryset.all_sync()
# Convert results to SimpleNamespace objects if they aren't already Document instances
if results and not hasattr(results[0], '_data'): # Check if it's not a Document instance
from types import SimpleNamespace
results = [SimpleNamespace(**result) if isinstance(result, dict) else result
for result in results]
return results
async def transaction(self, coroutines: List[Callable]) -> List[Any]:
"""Execute multiple operations in a transaction asynchronously.
This method executes a list of coroutines within a transaction,
committing the transaction if all operations succeed or canceling
it if any operation fails.
Args:
coroutines: List of coroutines to execute in the transaction
Returns:
List of results from the coroutines
Raises:
Exception: If any operation in the transaction fails
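Example (illustrative; assumes ``users`` is a SchemalessTable; the documents are placeholders):
    results = await users.transaction([
        users.objects.bulk_create([{"name": "Alice"}]),
        users.objects.bulk_create([{"name": "Bob"}]),
    ])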
"""
await self.connection.client.query("BEGIN TRANSACTION;")
try:
results = []
for coro in coroutines:
result = await coro
results.append(result)
await self.connection.client.query("COMMIT TRANSACTION;")
return results
except Exception as e:
await self.connection.client.query("CANCEL TRANSACTION;")
raise e
def transaction_sync(self, callables: List[Callable]) -> List[Any]:
"""Execute multiple operations in a transaction synchronously.
This method executes a list of callables within a transaction,
committing the transaction if all operations succeed or canceling
it if any operation fails.
Args:
callables: List of callables to execute in the transaction
Returns:
List of results from the callables
Raises:
Exception: If any operation in the transaction fails
"""
self.connection.client.query("BEGIN TRANSACTION;")
try:
results = []
for func in callables:
result = func()
results.append(result)
self.connection.client.query("COMMIT TRANSACTION;")
return results
except Exception as e:
self.connection.client.query("CANCEL TRANSACTION;")
raise e
async def bulk_create(self, documents: List[Dict[str, Any]], batch_size: int = 1000,
return_documents: bool = True) -> Union[List[Any], int]:
"""Create multiple documents in a single operation asynchronously.
This method creates multiple documents in a single operation, processing
them in batches for better performance. It can optionally return the created documents.
Args:
documents: List of dictionaries representing documents to create
batch_size: Number of documents per batch (default: 1000)
return_documents: Whether to return created documents (default: True)
Returns:
List of created documents with their IDs set if return_documents=True,
otherwise returns the count of created documents
"""
return await self.objects.bulk_create(documents, batch_size, return_documents)
def bulk_create_sync(self, documents: List[Dict[str, Any]], batch_size: int = 1000,
return_documents: bool = True) -> Union[List[Any], int]:
"""Create multiple documents in a single operation synchronously.
This method creates multiple documents in a single operation, processing
them in batches for better performance. It can optionally return the created documents.
Args:
documents: List of dictionaries representing documents to create
batch_size: Number of documents per batch (default: 1000)
return_documents: Whether to return created documents (default: True)
Returns:
List of created documents with their IDs set if return_documents=True,
otherwise returns the count of created documents
"""
return self.objects.bulk_create_sync(documents, batch_size, return_documents)
class SurrealEngine:
"""Dynamic database accessor.
This class provides dynamic access to tables in the database without
requiring predefined schemas. It allows accessing tables as attributes
of the instance.
Attributes:
connection: The database connection to use for queries
is_async: Whether the connection is asynchronous
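Example (illustrative; assumes an async ``connection`` and a hypothetical ``users`` table):
    db = SurrealEngine(connection)
    users = db.users  # SchemalessTable created dynamically via __getattr__
    people = await users(active=True)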
"""
def __init__(self, connection: Any) -> None:
"""Initialize a new SurrealEngine.
Args:
connection: The database connection to use for queries
"""
self.connection = connection
# Determine if the connection is async or sync
from .connection import SurrealEngineAsyncConnection
self.is_async = isinstance(connection, SurrealEngineAsyncConnection)
def __getattr__(self, name: str) -> SchemalessTable:
"""Get a table accessor for the given table name.
This method allows accessing tables as attributes of the instance.
Args:
name: The name of the table
Returns:
A SchemalessTable for accessing the table
"""
return SchemalessTable(name, self.connection)