import json
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from surrealdb import RecordID
[docs]
class RelationQuerySet:
    """Query set specifically for graph relations.

    This class provides methods for querying and manipulating graph relations
    between documents in the database. It allows creating, retrieving, updating,
    and deleting relations between documents.

    Every public operation exists in an async flavour and a ``*_sync`` flavour.
    The shared validation, ID normalisation and query building live in private
    helpers so both flavours stay in step.

    Attributes:
        from_document: The document class the relation is from
        connection: The database connection to use for queries
        relation: The name of the relation
        query_parts: List of query parts
        backend: Backend instance resolved from the document's ``_meta``
    """

    def __init__(self, from_document: Type, connection: Any, relation: Optional[str] = None) -> None:
        """Initialize a new RelationQuerySet.

        Args:
            from_document: The document class the relation is from
            connection: The database connection to use for queries
            relation: The name of the relation
        """
        self.from_document = from_document
        self.connection = connection
        self.relation = relation
        self.query_parts: List[Any] = []
        self.backend = self._get_backend()

    def _get_backend(self) -> Any:
        """Get the backend instance for this relation's document class.

        The backend name is read from ``from_document._meta`` (defaulting to
        ``'surrealdb'``). The backend is built on the registry's *default*
        connection for that backend name, not on ``self.connection``.

        Returns:
            Backend instance configured for this document
        """
        backend_name = self.from_document._meta.get('backend', 'surrealdb')
        # Imported lazily, as in the original code (presumably to avoid an
        # import cycle between the query layer and the registries).
        from ..backends import BackendRegistry
        from ..connection import ConnectionRegistry

        backend_class = BackendRegistry.get_backend(backend_name)
        connection = ConnectionRegistry.get_default_connection(backend_name)
        return backend_class(connection)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _split_record_id(raw_id: Any, document_class: Any) -> Tuple[str, str]:
        """Split an instance ID into ``(collection, bare_id)``.

        Args:
            raw_id: A string ID (optionally prefixed with ``table:``), a
                ``RecordID``, or any other value convertible with ``str``
            document_class: Document class providing ``_get_collection_name()``
                for IDs that do not carry their own table

        Returns:
            Tuple of collection name and the ID part without the table prefix
        """
        if isinstance(raw_id, str):
            # Split on the first colon only, so an ID part that itself
            # contains ':' is kept whole instead of being truncated.
            _, sep, tail = raw_id.partition(':')
            return document_class._get_collection_name(), (tail if sep else raw_id)
        if isinstance(raw_id, RecordID):
            return raw_id.table_name, str(raw_id).partition(':')[2]
        # Any other ID type (e.g. an int) is treated as a bare identifier.
        return document_class._get_collection_name(), str(raw_id)

    @classmethod
    def _qualified_id(cls, raw_id: Any, document_class: Any) -> str:
        """Return the ``table:id`` string for an instance ID.

        String IDs that already carry a ``table:`` prefix are not prefixed a
        second time; the prefix is handled the same way ``relate`` handles it.
        """
        if not isinstance(raw_id, str) and isinstance(raw_id, RecordID):
            return str(raw_id)
        collection, bare_id = cls._split_record_id(raw_id, document_class)
        return f"{collection}:{bare_id}"

    def _require_relation(self) -> str:
        """Return the relation name or raise ``ValueError`` if it is unset."""
        if not self.relation:
            raise ValueError("Relation name must be specified")
        return self.relation

    def _relate_endpoints(self, from_instance: Any, to_instance: Any) -> Tuple[str, str, str, str]:
        """Validate both instances and return their collections and bare IDs.

        Returns:
            ``(from_collection, from_id, to_collection, to_id)``

        Raises:
            ValueError: If either instance has no ID
        """
        if not from_instance.id:
            raise ValueError(f"Cannot create relation from unsaved {self.from_document.__name__}")
        to_class = to_instance.__class__
        if not to_instance.id:
            raise ValueError(f"Cannot create relation to unsaved {to_class.__name__}")
        from_collection, from_id = self._split_record_id(from_instance.id, self.from_document)
        to_collection, to_id = self._split_record_id(to_instance.id, to_class)
        return from_collection, from_id, to_collection, to_id

    def _update_lookup_query(self, from_instance: Any, to_instance: Any) -> str:
        """Validate inputs and build the SELECT that finds an existing relation.

        Raises:
            ValueError: If either instance has no ID or no relation name is set
        """
        if not from_instance.id or not to_instance.id:
            raise ValueError("Cannot update relation between unsaved documents")
        relation = self._require_relation()
        from_id = self._qualified_id(from_instance.id, self.from_document)
        to_id = self._qualified_id(to_instance.id, to_instance.__class__)
        # NOTE(review): `in`/`out` are compared against JSON strings here;
        # confirm the targeted SurrealDB version matches record links against
        # such strings.
        return f"SELECT id FROM {relation} WHERE in = {json.dumps(from_id)} AND out = {json.dumps(to_id)}"

    @staticmethod
    def _build_update_query(relation_id: Any, attrs: Dict[str, Any]) -> str:
        """Build ``UPDATE <id> SET k = v, ...`` for a non-empty ``attrs``."""
        assignments = ", ".join(f"{key} = {json.dumps(value)}" for key, value in attrs.items())
        return f"UPDATE {relation_id} SET {assignments}"

    def _build_delete_query(self, from_instance: Any, to_instance: Optional[Any]) -> str:
        """Validate inputs and build the DELETE statement.

        Raises:
            ValueError: If from_instance is not saved, if to_instance is provided
                but not saved, or if no relation name is specified
        """
        if not from_instance.id:
            raise ValueError(f"Cannot delete relation for unsaved {self.from_document.__name__}")
        relation = self._require_relation()
        from_id = self._qualified_id(from_instance.id, self.from_document)
        query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_id)}"
        # Compare against None explicitly: a document instance that happens to
        # be falsy must not silently widen the delete to *all* relations.
        if to_instance is not None:
            if not to_instance.id:
                raise ValueError("Cannot delete relation to unsaved document")
            to_id = self._qualified_id(to_instance.id, to_instance.__class__)
            query += f" AND out = {json.dumps(to_id)}"
        # DELETE returns an empty array by default in SurrealDB; ask for the
        # removed records so the caller can count them.
        return query + " RETURN BEFORE"

    @staticmethod
    def _count_deleted(result: Any) -> int:
        """Count the records in the first statement result, 0 if there are none."""
        if result and result[0]:
            return len(result[0])
        return 0

    # ------------------------------------------------------------------
    # Create
    # ------------------------------------------------------------------

    async def relate(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]:
        """Create a relation between two instances asynchronously.

        The relation is created through the backend's ``create_relation``.

        Args:
            from_instance: The instance to create the relation from
            to_instance: The instance to create the relation to
            **attrs: Attributes to set on the relation

        Returns:
            Whatever the backend's ``create_relation`` returns

        Raises:
            ValueError: If either instance is not saved or if no relation name is specified
            NotImplementedError: If the backend does not support graph relations
        """
        from_collection, from_id, to_collection, to_id = self._relate_endpoints(
            from_instance, to_instance
        )
        relation = self._require_relation()
        if not self.backend.supports_graph_relations():
            raise NotImplementedError(f"Backend {self.backend.__class__.__name__} does not support graph relations")
        return await self.backend.create_relation(
            from_collection, from_id, relation, to_collection, to_id, attrs
        )

    def relate_sync(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]:
        """Create a relation between two instances synchronously.

        Builds a ``RELATE`` statement (with a ``CONTENT`` clause when attributes
        are given) and runs it on ``self.connection.client``.

        Args:
            from_instance: The instance to create the relation from
            to_instance: The instance to create the relation to
            **attrs: Attributes to set on the relation

        Returns:
            The first entry of the query result, or None if the result is empty

        Raises:
            ValueError: If either instance is not saved or if no relation name is specified
        """
        from_collection, from_id, to_collection, to_id = self._relate_endpoints(
            from_instance, to_instance
        )
        relation = self._require_relation()
        # RecordID objects are used purely for their string form in the query.
        from_record = RecordID(from_collection, from_id)
        to_record = RecordID(to_collection, to_id)
        query = f"RELATE {from_record}->{relation}->{to_record}"
        if attrs:
            attrs_str = ", ".join(f"{k}: {json.dumps(v)}" for k, v in attrs.items())
            query += f" CONTENT {{ {attrs_str} }}"
        result = self.connection.client.query(query)
        if result and result[0]:
            return result[0]
        return None

    # ------------------------------------------------------------------
    # Update
    # ------------------------------------------------------------------

    async def update_relation(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]:
        """Update an existing relation asynchronously.

        If the relation doesn't exist, it is created via ``relate``. If it
        exists and no attributes are given, no UPDATE is sent and the looked-up
        row (which only contains ``id``) is returned.

        Args:
            from_instance: The instance the relation is from
            to_instance: The instance the relation is to
            **attrs: Attributes to update on the relation

        Returns:
            The updated relation record or None if update failed

        Raises:
            ValueError: If either instance is not saved or if no relation name is specified
        """
        lookup = self._update_lookup_query(from_instance, to_instance)
        found = await self.connection.client.query(lookup)
        if not found or not found[0]:
            return await self.relate(from_instance, to_instance, **attrs)
        existing = found[0][0]
        if not attrs:
            # "UPDATE <id> SET" with nothing after it is not a valid statement.
            return existing
        result = await self.connection.client.query(
            self._build_update_query(existing['id'], attrs)
        )
        if result and result[0]:
            return result[0][0]
        return None

    def update_relation_sync(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]:
        """Update an existing relation synchronously.

        If the relation doesn't exist, it is created via ``relate_sync``. If it
        exists and no attributes are given, no UPDATE is sent and the looked-up
        row (which only contains ``id``) is returned.

        Args:
            from_instance: The instance the relation is from
            to_instance: The instance the relation is to
            **attrs: Attributes to update on the relation

        Returns:
            The updated relation record or None if update failed

        Raises:
            ValueError: If either instance is not saved or if no relation name is specified
        """
        lookup = self._update_lookup_query(from_instance, to_instance)
        found = self.connection.client.query(lookup)
        if not found or not found[0]:
            return self.relate_sync(from_instance, to_instance, **attrs)
        existing = found[0][0]
        if not attrs:
            # "UPDATE <id> SET" with nothing after it is not a valid statement.
            return existing
        result = self.connection.client.query(
            self._build_update_query(existing['id'], attrs)
        )
        if result and result[0]:
            return result[0][0]
        return None

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------

    async def delete_relation(self, from_instance: Any, to_instance: Optional[Any] = None) -> int:
        """Delete a relation asynchronously.

        If to_instance is None, all relations of this name from from_instance
        are deleted.

        Args:
            from_instance: The instance the relation is from
            to_instance: The instance the relation is to (optional)

        Returns:
            Number of deleted relations

        Raises:
            ValueError: If from_instance is not saved, if to_instance is provided but not saved,
                or if no relation name is specified
        """
        query = self._build_delete_query(from_instance, to_instance)
        return self._count_deleted(await self.connection.client.query(query))

    def delete_relation_sync(self, from_instance: Any, to_instance: Optional[Any] = None) -> int:
        """Delete a relation synchronously.

        If to_instance is None, all relations of this name from from_instance
        are deleted.

        Args:
            from_instance: The instance the relation is from
            to_instance: The instance the relation is to (optional)

        Returns:
            Number of deleted relations

        Raises:
            ValueError: If from_instance is not saved, if to_instance is provided but not saved,
                or if no relation name is specified
        """
        query = self._build_delete_query(from_instance, to_instance)
        return self._count_deleted(self.connection.client.query(query))