Source code for quantumengine.query.relation

import json
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from surrealdb import RecordID


[docs] class RelationQuerySet: """Query set specifically for graph relations. This class provides methods for querying and manipulating graph relations between documents in the database. It allows creating, retrieving, updating, and deleting relations between documents. Attributes: from_document: The document class the relation is from connection: The database connection to use for queries relation: The name of the relation query_parts: List of query parts """
[docs] def __init__(self, from_document: Type, connection: Any, relation: Optional[str] = None) -> None: """Initialize a new RelationQuerySet. Args: from_document: The document class the relation is from connection: The database connection to use for queries relation: The name of the relation """ self.from_document = from_document self.connection = connection self.relation = relation self.query_parts: List[Any] = [] self.backend = self._get_backend()
def _get_backend(self): """Get the backend instance for this relation's document class. Returns: Backend instance configured for this document """ backend_name = self.from_document._meta.get('backend', 'surrealdb') from ..backends import BackendRegistry from ..connection import ConnectionRegistry # Get the backend class backend_class = BackendRegistry.get_backend(backend_name) # Get the connection for this backend connection = ConnectionRegistry.get_default_connection(backend_name) # Return backend instance return backend_class(connection)
[docs] async def relate(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]: """Create a relation between two instances asynchronously. This method creates a relation between two document instances in the database. It constructs a RELATE query with the given relation name and attributes. Args: from_instance: The instance to create the relation from to_instance: The instance to create the relation to **attrs: Attributes to set on the relation Returns: The created relation record or None if creation failed Raises: ValueError: If either instance is not saved or if no relation name is specified """ if not from_instance.id: raise ValueError(f"Cannot create relation from unsaved {self.from_document.__name__}") to_class = to_instance.__class__ if not to_instance.id: raise ValueError(f"Cannot create relation to unsaved {to_class.__name__}") # Handle both string and RecordID types for IDs if isinstance(from_instance.id, RecordID): from_id = str(from_instance.id).split(':')[1] from_collection = from_instance.id.table_name else: from_id = from_instance.id.split(':')[1] if ':' in from_instance.id else from_instance.id from_collection = self.from_document._get_collection_name() if isinstance(to_instance.id, RecordID): to_id = str(to_instance.id).split(':')[1] to_collection = to_instance.id.table_name else: to_id = to_instance.id.split(':')[1] if ':' in to_instance.id else to_instance.id to_collection = to_class._get_collection_name() # Create RecordID objects with the correct collection names and IDs from_record = RecordID(from_collection, from_id) to_record = RecordID(to_collection, to_id) relation = self.relation if not relation: raise ValueError("Relation name must be specified") # Check if backend supports graph relations if not self.backend.supports_graph_relations(): raise NotImplementedError(f"Backend {self.backend.__class__.__name__} does not support graph relations") # Use backend to create relation result_data = await self.backend.create_relation( from_collection, from_id, relation, to_collection, to_id, attrs ) # Return the relation record data directly return result_data
[docs] def relate_sync(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]: """Create a relation between two instances synchronously. This method creates a relation between two document instances in the database. It constructs a RELATE query with the given relation name and attributes. Args: from_instance: The instance to create the relation from to_instance: The instance to create the relation to **attrs: Attributes to set on the relation Returns: The created relation record or None if creation failed Raises: ValueError: If either instance is not saved or if no relation name is specified """ if not from_instance.id: raise ValueError(f"Cannot create relation from unsaved {self.from_document.__name__}") to_class = to_instance.__class__ if not to_instance.id: raise ValueError(f"Cannot create relation to unsaved {to_class.__name__}") # Handle both string and RecordID types for IDs if isinstance(from_instance.id, RecordID): from_id = str(from_instance.id).split(':')[1] from_collection = from_instance.id.table_name else: from_id = from_instance.id.split(':')[1] if ':' in from_instance.id else from_instance.id from_collection = self.from_document._get_collection_name() if isinstance(to_instance.id, RecordID): to_id = str(to_instance.id).split(':')[1] to_collection = to_instance.id.table_name else: to_id = to_instance.id.split(':')[1] if ':' in to_instance.id else to_instance.id to_collection = to_class._get_collection_name() # Create RecordID objects with the correct collection names and IDs from_record = RecordID(from_collection, from_id) to_record = RecordID(to_collection, to_id) relation = self.relation if not relation: raise ValueError("Relation name must be specified") # Construct the relation query using the RecordID objects query = f"RELATE {from_record}->{relation}->{to_record}" # Add attributes if provided if attrs: attrs_str = ", ".join([f"{k}: {json.dumps(v)}" for k, v in attrs.items()]) query += f" CONTENT {{ {attrs_str} }}" result = self.connection.client.query(query) # Return the relation record if result and result[0]: return result[0] return None
[docs] async def update_relation(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]: """Update an existing relation asynchronously. This method updates an existing relation between two document instances in the database. If the relation doesn't exist, it creates it. Args: from_instance: The instance the relation is from to_instance: The instance the relation is to **attrs: Attributes to update on the relation Returns: The updated relation record or None if update failed Raises: ValueError: If either instance is not saved or if no relation name is specified """ if not from_instance.id or not to_instance.id: raise ValueError("Cannot update relation between unsaved documents") relation = self.relation if not relation: raise ValueError("Relation name must be specified") # Handle both string and RecordID types for IDs if isinstance(from_instance.id, RecordID): from_id = str(from_instance.id) else: from_id = f"{self.from_document._get_collection_name()}:{from_instance.id}" to_class = to_instance.__class__ if isinstance(to_instance.id, RecordID): to_id = str(to_instance.id) else: to_id = f"{to_class._get_collection_name()}:{to_instance.id}" # Query the relation first relation_query = f"SELECT id FROM {relation} WHERE in = {json.dumps(from_id)} AND out = {json.dumps(to_id)}" relation_result = await self.connection.client.query(relation_query) if not relation_result or not relation_result[0]: return await self.relate(from_instance, to_instance, **attrs) # Get relation ID and update relation_id = relation_result[0][0]['id'] update_query = f"UPDATE {relation_id} SET" # Add attributes updates = [] for key, value in attrs.items(): updates.append(f" {key} = {json.dumps(value)}") update_query += ",".join(updates) result = await self.connection.client.query(update_query) if result and result[0]: return result[0][0] return None
[docs] def update_relation_sync(self, from_instance: Any, to_instance: Any, **attrs: Any) -> Optional[Any]: """Update an existing relation synchronously. This method updates an existing relation between two document instances in the database. If the relation doesn't exist, it creates it. Args: from_instance: The instance the relation is from to_instance: The instance the relation is to **attrs: Attributes to update on the relation Returns: The updated relation record or None if update failed Raises: ValueError: If either instance is not saved or if no relation name is specified """ if not from_instance.id or not to_instance.id: raise ValueError("Cannot update relation between unsaved documents") relation = self.relation if not relation: raise ValueError("Relation name must be specified") # Handle both string and RecordID types for IDs if isinstance(from_instance.id, RecordID): from_id = str(from_instance.id) else: from_id = f"{self.from_document._get_collection_name()}:{from_instance.id}" to_class = to_instance.__class__ if isinstance(to_instance.id, RecordID): to_id = str(to_instance.id) else: to_id = f"{to_class._get_collection_name()}:{to_instance.id}" # Query the relation first relation_query = f"SELECT id FROM {relation} WHERE in = {json.dumps(from_id)} AND out = {json.dumps(to_id)}" relation_result = self.connection.client.query(relation_query) if not relation_result or not relation_result[0]: return self.relate_sync(from_instance, to_instance, **attrs) # Get relation ID and update relation_id = relation_result[0][0]['id'] update_query = f"UPDATE {relation_id} SET" # Add attributes updates = [] for key, value in attrs.items(): updates.append(f" {key} = {json.dumps(value)}") update_query += ",".join(updates) result = self.connection.client.query(update_query) if result and result[0]: return result[0][0] return None
[docs] async def delete_relation(self, from_instance: Any, to_instance: Optional[Any] = None) -> int: """Delete a relation asynchronously. This method deletes a relation between two document instances in the database. If to_instance is not provided, it deletes all relations from from_instance. Args: from_instance: The instance the relation is from to_instance: The instance the relation is to (optional) Returns: Number of deleted relations Raises: ValueError: If from_instance is not saved, if to_instance is provided but not saved, or if no relation name is specified """ if not from_instance.id: raise ValueError(f"Cannot delete relation for unsaved {self.from_document.__name__}") relation = self.relation if not relation: raise ValueError("Relation name must be specified") # Handle both string and RecordID types for from_instance ID if isinstance(from_instance.id, RecordID): from_id = str(from_instance.id) else: from_id = f"{self.from_document._get_collection_name()}:{from_instance.id}" # Construct the delete query if to_instance: if not to_instance.id: raise ValueError("Cannot delete relation to unsaved document") # Handle both string and RecordID types for to_instance ID to_class = to_instance.__class__ if isinstance(to_instance.id, RecordID): to_id = str(to_instance.id) else: to_id = f"{to_class._get_collection_name()}:{to_instance.id}" # Delete specific relation query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_id)} AND out = {json.dumps(to_id)}" else: # Delete all relations from this instance query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_id)}" result = await self.connection.client.query(query) if result and result[0]: return len(result[0]) return 0
[docs] def delete_relation_sync(self, from_instance: Any, to_instance: Optional[Any] = None) -> int: """Delete a relation synchronously. This method deletes a relation between two document instances in the database. If to_instance is not provided, it deletes all relations from from_instance. Args: from_instance: The instance the relation is from to_instance: The instance the relation is to (optional) Returns: Number of deleted relations Raises: ValueError: If from_instance is not saved, if to_instance is provided but not saved, or if no relation name is specified """ if not from_instance.id: raise ValueError(f"Cannot delete relation for unsaved {self.from_document.__name__}") relation = self.relation if not relation: raise ValueError("Relation name must be specified") # Handle both string and RecordID types for from_instance ID if isinstance(from_instance.id, RecordID): from_id = str(from_instance.id) else: from_id = f"{self.from_document._get_collection_name()}:{from_instance.id}" # Construct the delete query if to_instance: if not to_instance.id: raise ValueError("Cannot delete relation to unsaved document") # Handle both string and RecordID types for to_instance ID to_class = to_instance.__class__ if isinstance(to_instance.id, RecordID): to_id = str(to_instance.id) else: to_id = f"{to_class._get_collection_name()}:{to_instance.id}" # Delete specific relation query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_id)} AND out = {json.dumps(to_id)}" else: # Delete all relations from this instance query = f"DELETE FROM {relation} WHERE in = {json.dumps(from_id)}" result = self.connection.client.query(query) if result and result[0]: return len(result[0]) return 0