import surrealdb
import time
import logging
import re
import urllib.parse
from typing import Dict, Optional, Any, Type, Union, Protocol, runtime_checkable, List, Tuple, Callable
from abc import ABC, abstractmethod
from queue import Queue, Empty
from threading import Lock, Event
import asyncio
from .schemaless import SurrealEngine
# Set up logging
logger = logging.getLogger(__name__)
[docs]
class ConnectionPoolBase(ABC):
    """Abstract foundation shared by the sync and async connection pools.

    The constructor stores the connection settings, clamps every numeric
    tuning option to a safe minimum and zeroes the usage counters. The
    connection lifecycle itself is left to concrete subclasses.

    Attributes:
        url: The URL of the SurrealDB server
        namespace: The namespace to use
        database: The database to use
        username: The username for authentication
        password: The password for authentication
        pool_size: Maximum number of connections in the pool (at least 1)
        max_idle_time: Maximum idle time in seconds before a connection is closed (at least 0)
        connect_timeout: Timeout in seconds for establishing a connection (at least 1)
        operation_timeout: Timeout in seconds for operations (at least 1)
        retry_limit: Maximum number of retries for failed operations (at least 0)
        retry_delay: Initial delay in seconds between retries (at least 0.1)
        retry_backoff: Backoff multiplier for retry delay (at least 1.0)
        validate_on_borrow: Whether to validate connections when borrowing from the pool
        created_connections: Number of connections created so far
        borrowed_connections: Number of connections handed out so far
        returned_connections: Number of connections given back so far
        discarded_connections: Number of connections closed so far
    """

    def __init__(self,
                 url: str,
                 namespace: Optional[str] = None,
                 database: Optional[str] = None,
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 pool_size: int = 10,
                 max_idle_time: int = 60,
                 connect_timeout: int = 30,
                 operation_timeout: int = 30,
                 retry_limit: int = 3,
                 retry_delay: float = 1.0,
                 retry_backoff: float = 2.0,
                 validate_on_borrow: bool = True) -> None:
        """Store the pool configuration and reset the statistics.

        Args:
            url: The URL of the SurrealDB server
            namespace: The namespace to use
            database: The database to use
            username: The username for authentication
            password: The password for authentication
            pool_size: Maximum number of connections in the pool
            max_idle_time: Maximum time in seconds a connection can be idle before being closed
            connect_timeout: Timeout in seconds for establishing a connection
            operation_timeout: Timeout in seconds for operations
            retry_limit: Maximum number of retries for failed operations
            retry_delay: Initial delay in seconds between retries
            retry_backoff: Backoff multiplier for retry delay
            validate_on_borrow: Whether to validate connections when borrowing from the pool
        """
        # Usage counters all start from zero.
        self.created_connections = 0
        self.borrowed_connections = 0
        self.returned_connections = 0
        self.discarded_connections = 0

        # Where to connect and how to authenticate; stored as given.
        self.url = url
        self.namespace = namespace
        self.database = database
        self.username = username
        self.password = password

        # Tuning options; each one is raised to its minimum when too small.
        self.pool_size = max(1, pool_size)
        self.max_idle_time = max(0, max_idle_time)
        self.connect_timeout = max(1, connect_timeout)
        self.operation_timeout = max(1, operation_timeout)
        self.retry_limit = max(0, retry_limit)
        self.retry_delay = max(0.1, retry_delay)
        self.retry_backoff = max(1.0, retry_backoff)
        self.validate_on_borrow = validate_on_borrow

    @abstractmethod
    def create_connection(self) -> Any:
        """Open and return a brand-new connection.

        Returns:
            A new connection
        """

    @abstractmethod
    def validate_connection(self, connection: Any) -> bool:
        """Check whether a connection is still usable.

        Args:
            connection: The connection to validate

        Returns:
            True if the connection is valid, False otherwise
        """

    @abstractmethod
    def close_connection(self, connection: Any) -> None:
        """Close a single connection.

        Args:
            connection: The connection to close
        """

    @abstractmethod
    def get_connection(self) -> Any:
        """Borrow a connection from the pool.

        Returns:
            A connection from the pool
        """

    @abstractmethod
    def return_connection(self, connection: Any) -> None:
        """Give a borrowed connection back to the pool.

        Args:
            connection: The connection to return
        """

    @abstractmethod
    def close(self) -> None:
        """Shut the pool down and close every connection."""
[docs]
class SyncConnectionPool(ConnectionPoolBase):
    """Synchronous connection pool for SurrealDB.

    This class manages a pool of synchronous connections to a SurrealDB database.
    It handles connection creation, validation, and reuse, and provides methods
    for acquiring and releasing connections.

    The connections returned by this pool are SurrealEngineSyncConnection
    objects, which can be used with the Document class and other SurrealEngine
    functionality that expects a SurrealEngineSyncConnection.

    Attributes:
        pool: Queue of idle connections ready to be borrowed
        in_use: Mapping of borrowed connections to the time they were borrowed
        lock: Lock guarding ``in_use`` and the borrow/return counters
        closed: Whether the pool is closed
    """

    def __init__(self,
                 url: str,
                 namespace: Optional[str] = None,
                 database: Optional[str] = None,
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 pool_size: int = 10,
                 max_idle_time: int = 60,
                 connect_timeout: int = 30,
                 operation_timeout: int = 30,
                 retry_limit: int = 3,
                 retry_delay: float = 1.0,
                 retry_backoff: float = 2.0,
                 validate_on_borrow: bool = True) -> None:
        """Initialize a new SyncConnectionPool.

        Args:
            url: The URL of the SurrealDB server
            namespace: The namespace to use
            database: The database to use
            username: The username for authentication
            password: The password for authentication
            pool_size: Maximum number of connections in the pool
            max_idle_time: Maximum time in seconds a connection can be idle before being closed
            connect_timeout: Timeout in seconds for establishing a connection
            operation_timeout: Timeout in seconds for operations
            retry_limit: Maximum number of retries for failed operations
            retry_delay: Initial delay in seconds between retries
            retry_backoff: Backoff multiplier for retry delay
            validate_on_borrow: Whether to validate connections when borrowing from the pool
        """
        super().__init__(
            url, namespace, database, username, password,
            pool_size, max_idle_time, connect_timeout, operation_timeout,
            retry_limit, retry_delay, retry_backoff, validate_on_borrow
        )
        # Idle connections live in the queue; borrowed ones are tracked in in_use.
        self.pool: Queue = Queue(maxsize=self.pool_size)
        self.in_use: Dict['SurrealEngineSyncConnection', float] = {}  # Connection -> timestamp when borrowed
        self.lock = Lock()
        self.closed = False

    def create_connection(self) -> 'SurrealEngineSyncConnection':
        """Create and connect a new connection.

        Returns:
            A new, connected SurrealEngineSyncConnection

        Raises:
            Exception: If the connection cannot be created (logged, then re-raised)
        """
        try:
            connection = SurrealEngineSyncConnection(
                url=self.url,
                namespace=self.namespace,
                database=self.database,
                username=self.username,
                password=self.password
            )
            connection.connect()
            self.created_connections += 1
            logger.debug(f"Created new connection (total created: {self.created_connections})")
            return connection
        except Exception as e:
            logger.error(f"Failed to create connection: {str(e)}")
            raise

    def validate_connection(self, connection: 'SurrealEngineSyncConnection') -> bool:
        """Validate a connection by running a probe query on its client.

        Args:
            connection: The connection to validate (SurrealEngineSyncConnection)

        Returns:
            True if the connection has a client and the probe query did not
            raise, False otherwise
        """
        try:
            if connection.client:
                # NOTE(review): this probe is SQL-style; confirm SurrealDB
                # handles `information_schema.tables` as intended here.
                connection.client.query("SELECT 1 FROM information_schema.tables LIMIT 1")
                return True
            return False
        except Exception as e:
            logger.warning(f"Connection validation failed: {str(e)}")
            return False

    def close_connection(self, connection: 'SurrealEngineSyncConnection') -> None:
        """Close a connection, logging (not raising) any failure.

        Args:
            connection: The connection to close (SurrealEngineSyncConnection)
        """
        try:
            connection.disconnect()
            self.discarded_connections += 1
            logger.debug(f"Closed connection (total discarded: {self.discarded_connections})")
        except Exception as e:
            logger.warning(f"Failed to close connection: {str(e)}")

    def _replace_if_invalid(self, connection: 'SurrealEngineSyncConnection') -> 'SurrealEngineSyncConnection':
        """Return ``connection``, or a fresh one if borrow-time validation fails."""
        if self.validate_on_borrow and not self.validate_connection(connection):
            self.close_connection(connection)
            connection = self.create_connection()
        return connection

    def _register_borrow(self, connection: 'SurrealEngineSyncConnection') -> None:
        """Record ``connection`` as borrowed; the caller must hold ``self.lock``."""
        self.in_use[connection] = time.time()
        self.borrowed_connections += 1
        logger.debug(f"Borrowed connection (total borrowed: {self.borrowed_connections})")

    def get_connection(self) -> 'SurrealEngineSyncConnection':
        """Get a connection from the pool.

        An idle connection is reused when available. Otherwise a new one is
        created while the pool is below ``pool_size``; once the pool is
        exhausted the call waits up to ``connect_timeout`` seconds for a
        connection to be returned.

        Returns:
            A SurrealEngineSyncConnection from the pool

        Raises:
            RuntimeError: If the pool is closed or the wait times out
            Exception: If a connection cannot be created
        """
        if self.closed:
            raise RuntimeError("Connection pool is closed")

        try:
            connection = self.pool.get(block=False)
        except Empty:
            connection = None

        if connection is None:
            with self.lock:
                if len(self.in_use) < self.pool_size:
                    # Create and register under one lock hold so concurrent
                    # borrowers cannot both pass the capacity check.
                    connection = self.create_connection()
                    self._register_borrow(connection)
                    return connection
            # Pool exhausted: wait WITHOUT holding the lock. return_connection()
            # needs the lock before it can hand a connection back, so waiting
            # while holding it could only ever end in a timeout.
            try:
                connection = self.pool.get(timeout=self.connect_timeout)
            except Empty:
                raise RuntimeError("Timeout waiting for a connection") from None

        connection = self._replace_if_invalid(connection)
        with self.lock:
            self._register_borrow(connection)
        return connection

    def return_connection(self, connection: 'SurrealEngineSyncConnection') -> None:
        """Return a connection to the pool.

        Connections that were not borrowed from this pool are ignored with a
        warning. Invalid connections, and connections that do not fit in the
        idle queue, are closed instead of being pooled.

        Args:
            connection: The connection to return (SurrealEngineSyncConnection)
        """
        from queue import Full  # local import: the module only imports Queue and Empty

        if self.closed:
            # Pool is closed, just close the connection
            self.close_connection(connection)
            return

        with self.lock:
            if connection not in self.in_use:
                logger.warning("Attempted to return a connection that wasn't borrowed from this pool")
                return
            del self.in_use[connection]
            self.returned_connections += 1
            logger.debug(f"Returned connection (total returned: {self.returned_connections})")

        # Validation happens outside the lock so other borrowers are not
        # blocked behind the probe query.
        if self.validate_on_borrow and not self.validate_connection(connection):
            self.close_connection(connection)
            return

        try:
            self.pool.put(connection, block=False)
        except Full:
            # Idle queue is already at capacity, so drop this connection.
            self.close_connection(connection)

    def close(self) -> None:
        """Close the pool and all connections, idle and borrowed alike."""
        if self.closed:
            return
        self.closed = True

        # Drain and close every idle connection.
        while True:
            try:
                connection = self.pool.get(block=False)
            except Empty:
                break
            self.close_connection(connection)

        # Borrowed connections are closed as well; their holders are left
        # with a disconnected connection.
        with self.lock:
            for connection in list(self.in_use):
                self.close_connection(connection)
            self.in_use.clear()

        logger.info(f"Connection pool closed. Stats: created={self.created_connections}, "
                    f"borrowed={self.borrowed_connections}, returned={self.returned_connections}, "
                    f"discarded={self.discarded_connections}")
[docs]
class AsyncConnectionPool(ConnectionPoolBase):
    """Asynchronous connection pool for SurrealDB.

    This class manages a pool of asynchronous connections to a SurrealDB database.
    It handles connection creation, validation, and reuse, and provides methods
    for acquiring and releasing connections.

    The connections returned by this pool are SurrealEngineAsyncConnection
    objects, which can be used with the Document class and other SurrealEngine
    functionality that expects a SurrealEngineAsyncConnection.

    Attributes:
        pool: List of idle connections ready to be borrowed
        in_use: Dictionary of connections currently in use and their borrow timestamps
        lock: Asyncio lock serializing changes to the pool state
        closed: Whether the pool is closed
        connection_waiters: Futures of callers waiting for a connection to be returned
    """

    def __init__(self,
                 url: str,
                 namespace: Optional[str] = None,
                 database: Optional[str] = None,
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 pool_size: int = 10,
                 max_idle_time: int = 60,
                 connect_timeout: int = 30,
                 operation_timeout: int = 30,
                 retry_limit: int = 3,
                 retry_delay: float = 1.0,
                 retry_backoff: float = 2.0,
                 validate_on_borrow: bool = True) -> None:
        """Initialize a new AsyncConnectionPool.

        Args:
            url: The URL of the SurrealDB server
            namespace: The namespace to use
            database: The database to use
            username: The username for authentication
            password: The password for authentication
            pool_size: Maximum number of connections in the pool
            max_idle_time: Maximum time in seconds a connection can be idle before being closed
            connect_timeout: Timeout in seconds for establishing a connection
            operation_timeout: Timeout in seconds for operations
            retry_limit: Maximum number of retries for failed operations
            retry_delay: Initial delay in seconds between retries
            retry_backoff: Backoff multiplier for retry delay
            validate_on_borrow: Whether to validate connections when borrowing from the pool
        """
        super().__init__(
            url, namespace, database, username, password,
            pool_size, max_idle_time, connect_timeout, operation_timeout,
            retry_limit, retry_delay, retry_backoff, validate_on_borrow
        )
        self.pool: List['SurrealEngineAsyncConnection'] = []
        self.in_use: Dict['SurrealEngineAsyncConnection', float] = {}  # Connection -> timestamp when borrowed
        self.lock = asyncio.Lock()
        self.closed = False
        self.connection_waiters: List[asyncio.Future] = []

    async def create_connection(self) -> 'SurrealEngineAsyncConnection':
        """Create and connect a new connection.

        Returns:
            A new, connected SurrealEngineAsyncConnection

        Raises:
            Exception: If the connection cannot be created (logged, then re-raised)
        """
        try:
            connection = SurrealEngineAsyncConnection(
                url=self.url,
                namespace=self.namespace,
                database=self.database,
                username=self.username,
                password=self.password
            )
            await connection.connect()
            self.created_connections += 1
            logger.debug(f"Created new async connection (total created: {self.created_connections})")
            return connection
        except Exception as e:
            logger.error(f"Failed to create async connection: {str(e)}")
            raise

    async def validate_connection(self, connection: 'SurrealEngineAsyncConnection') -> bool:
        """Validate a connection by running a probe query on its client.

        Args:
            connection: The connection to validate (SurrealEngineAsyncConnection)

        Returns:
            True if the connection has a client and the probe query did not
            raise, False otherwise
        """
        try:
            if connection.client:
                # NOTE(review): this probe is SQL-style; confirm SurrealDB
                # handles `information_schema.tables` as intended here.
                await connection.client.query("SELECT 1 FROM information_schema.tables LIMIT 1")
                return True
            return False
        except Exception as e:
            logger.warning(f"Async connection validation failed: {str(e)}")
            return False

    async def close_connection(self, connection: 'SurrealEngineAsyncConnection') -> None:
        """Close a connection, logging (not raising) any failure.

        Args:
            connection: The connection to close (SurrealEngineAsyncConnection)
        """
        try:
            await connection.disconnect()
            self.discarded_connections += 1
            logger.debug(f"Closed async connection (total discarded: {self.discarded_connections})")
        except Exception as e:
            logger.warning(f"Failed to close async connection: {str(e)}")

    def _register_borrow(self, connection: 'SurrealEngineAsyncConnection') -> None:
        """Record ``connection`` as borrowed; the caller must hold ``self.lock``."""
        self.in_use[connection] = time.time()
        self.borrowed_connections += 1
        logger.debug(f"Borrowed async connection (total borrowed: {self.borrowed_connections})")

    async def get_connection(self) -> 'SurrealEngineAsyncConnection':
        """Get a connection from the pool.

        An idle connection is reused when available. Otherwise a new one is
        created while fewer than ``pool_size`` connections are in use; once
        that limit is reached the caller waits up to ``connect_timeout``
        seconds for another task to return a connection.

        Returns:
            A SurrealEngineAsyncConnection from the pool

        Raises:
            RuntimeError: If the pool is closed or the wait times out
            Exception: If a connection cannot be created
        """
        if self.closed:
            raise RuntimeError("Async connection pool is closed")

        async with self.lock:
            if self.pool:
                connection = self.pool.pop()
                if self.validate_on_borrow and not await self.validate_connection(connection):
                    # Stale connection: discard it and open a replacement.
                    await self.close_connection(connection)
                    connection = await self.create_connection()
                self._register_borrow(connection)
                return connection

            if len(self.in_use) < self.pool_size:
                connection = await self.create_connection()
                self._register_borrow(connection)
                return connection

            # At capacity: queue a future that return_connection() resolves.
            waiter = asyncio.get_running_loop().create_future()
            self.connection_waiters.append(waiter)

        # The lock is released here by leaving the `async with` block, rather
        # than by a manual release/acquire pair, so cancellation at any await
        # below cannot leave the lock in an inconsistent state.
        try:
            connection = await asyncio.wait_for(waiter, timeout=self.connect_timeout)
        except asyncio.TimeoutError:
            raise RuntimeError("Timeout waiting for an async connection") from None
        finally:
            # Covers timeout, cancellation and pool shutdown. On success the
            # waiter was already popped by return_connection(). No await
            # happens here, so the list cannot change underneath us.
            if waiter in self.connection_waiters:
                self.connection_waiters.remove(waiter)

        if self.validate_on_borrow and not await self.validate_connection(connection):
            await self.close_connection(connection)
            async with self.lock:
                connection = await self.create_connection()

        async with self.lock:
            self._register_borrow(connection)
        return connection

    async def return_connection(self, connection: 'SurrealEngineAsyncConnection') -> None:
        """Return a connection to the pool.

        The connection goes straight to the oldest waiter that is still
        pending, if any. Otherwise it is validated and stored as idle, or
        closed when invalid or when the idle list is full.

        Args:
            connection: The connection to return (SurrealEngineAsyncConnection)
        """
        if self.closed:
            # Pool is closed, just close the connection
            await self.close_connection(connection)
            return

        async with self.lock:
            if connection not in self.in_use:
                logger.warning("Attempted to return an async connection that wasn't borrowed from this pool")
                return
            del self.in_use[connection]
            self.returned_connections += 1
            logger.debug(f"Returned async connection (total returned: {self.returned_connections})")

            # Skip waiters that already finished (cancelled or timed out) so a
            # dead entry cannot make us bypass a task that is still waiting.
            while self.connection_waiters:
                waiter = self.connection_waiters.pop(0)
                if not waiter.done():
                    # The receiving task validates the connection itself.
                    waiter.set_result(connection)
                    return

            if self.validate_on_borrow and not await self.validate_connection(connection):
                await self.close_connection(connection)
                return

            if len(self.pool) < self.pool_size:
                self.pool.append(connection)
            else:
                # Idle list is full, close the connection
                await self.close_connection(connection)

    async def close(self) -> None:
        """Close the pool, fail all pending waiters and close all connections."""
        if self.closed:
            return

        async with self.lock:
            self.closed = True

            # Wake every pending borrower with an error.
            for waiter in self.connection_waiters:
                if not waiter.done():
                    waiter.set_exception(RuntimeError("Async connection pool is closed"))
            self.connection_waiters.clear()

            for connection in self.pool:
                await self.close_connection(connection)
            self.pool.clear()

            # Borrowed connections are closed as well; their holders are left
            # with a disconnected connection.
            for connection in list(self.in_use):
                await self.close_connection(connection)
            self.in_use.clear()

        logger.info(f"Async connection pool closed. Stats: created={self.created_connections}, "
                    f"borrowed={self.borrowed_connections}, returned={self.returned_connections}, "
                    f"discarded={self.discarded_connections}")
[docs]
class ConnectionEvent:
    """Names of the events a connection can emit.

    Each constant is the plain string passed as ``event_type`` to listeners.
    """

    # Initial connection lifecycle
    CONNECTED = "connected"
    CONNECTION_FAILED = "connection_failed"

    # Loss of an established connection
    DISCONNECTED = "disconnected"
    CONNECTION_CLOSED = "connection_closed"

    # Automatic reconnection lifecycle
    RECONNECTING = "reconnecting"
    RECONNECTED = "reconnected"
    RECONNECTION_FAILED = "reconnection_failed"
[docs]
class ConnectionEventListener:
    """Base class for objects that want to observe connection events.

    The default handler ignores every event; subclasses override
    ``on_event`` to react.
    """

    def on_event(self, event_type: str, connection: Any, **kwargs) -> None:
        """React to a connection event (no-op by default).

        Args:
            event_type: The type of event
            connection: The connection that emitted the event
            **kwargs: Additional event data
        """
        return None
[docs]
class ConnectionEventEmitter:
    """Emitter for connection events.

    Keeps a list of listeners and forwards each emitted event to all of them.

    Attributes:
        listeners: Registered event listeners, in registration order
    """

    def __init__(self) -> None:
        """Initialize a new ConnectionEventEmitter with no listeners."""
        self.listeners: List['ConnectionEventListener'] = []

    def add_listener(self, listener: 'ConnectionEventListener') -> None:
        """Add a listener for connection events.

        Adding a listener that is already registered has no effect.

        Args:
            listener: The listener to add
        """
        if listener not in self.listeners:
            self.listeners.append(listener)

    def remove_listener(self, listener: 'ConnectionEventListener') -> None:
        """Remove a listener for connection events.

        Removing a listener that is not registered has no effect.

        Args:
            listener: The listener to remove
        """
        if listener in self.listeners:
            self.listeners.remove(listener)

    def emit_event(self, event_type: str, connection: Any, **kwargs) -> None:
        """Emit a connection event to every registered listener.

        An exception raised by one listener is logged and does not stop
        delivery to the others.

        Args:
            event_type: The type of event
            connection: The connection that emitted the event
            **kwargs: Additional event data
        """
        # Iterate over a snapshot: a listener may add or remove listeners
        # (including itself) from inside on_event, and mutating the list
        # being iterated would silently skip the next listener.
        for listener in tuple(self.listeners):
            try:
                listener.on_event(event_type, connection, **kwargs)
            except Exception as e:
                logger.warning(f"Error in connection event listener: {str(e)}")
[docs]
class OperationQueue:
    """Holding area for operations issued while a connection is reconnecting.

    Callers queue operations during an outage and replay them, in the order
    they were queued, once the connection is back. Two events (one for
    threads, one for asyncio) let callers block until reconnection ends.

    Attributes:
        sync_operations: Queue of synchronous operations
        async_operations: Queue of asynchronous operations
        is_reconnecting: Whether the connection is currently reconnecting
        reconnection_event: Event that is set when reconnection is complete
        async_reconnection_event: Asyncio event that is set when reconnection is complete
    """

    def __init__(self) -> None:
        """Create an empty queue that is not in the reconnecting state."""
        self.is_reconnecting = False
        self.sync_operations: List[Tuple[Callable, List, Dict]] = []
        self.async_operations: List[Tuple[Callable, List, Dict]] = []
        self.reconnection_event = Event()
        self.async_reconnection_event = asyncio.Event()

    def start_reconnection(self) -> None:
        """Enter the reconnecting state.

        Sets the reconnecting flag and clears both reconnection events so
        that waiters block until ``end_reconnection`` is called.
        """
        self.is_reconnecting = True
        self.reconnection_event.clear()
        self.async_reconnection_event.clear()

    def end_reconnection(self) -> None:
        """Leave the reconnecting state.

        Clears the reconnecting flag and sets both reconnection events,
        releasing anything that is waiting on them.
        """
        self.is_reconnecting = False
        self.reconnection_event.set()
        self.async_reconnection_event.set()

    def queue_operation(self, operation: Callable, args: List = None, kwargs: Dict = None) -> None:
        """Queue a synchronous operation for later execution.

        Args:
            operation: The operation to queue
            args: The positional arguments for the operation
            kwargs: The keyword arguments for the operation
        """
        positional = [] if args is None else args
        keyword = {} if kwargs is None else kwargs
        self.sync_operations.append((operation, positional, keyword))

    def queue_async_operation(self, operation: Callable, args: List = None, kwargs: Dict = None) -> None:
        """Queue an asynchronous operation for later execution.

        Args:
            operation: The operation to queue
            args: The positional arguments for the operation
            kwargs: The keyword arguments for the operation
        """
        positional = [] if args is None else args
        keyword = {} if kwargs is None else kwargs
        self.async_operations.append((operation, positional, keyword))

    def execute_queued_operations(self) -> None:
        """Run every queued synchronous operation in queue order.

        The queue is emptied before anything runs; a failing operation is
        logged and the remaining ones still execute.
        """
        pending, self.sync_operations = self.sync_operations, []
        for func, positional, keyword in pending:
            try:
                func(*positional, **keyword)
            except Exception as e:
                logger.error(f"Error executing queued operation: {str(e)}")

    async def execute_queued_async_operations(self) -> None:
        """Await every queued asynchronous operation in queue order.

        The queue is emptied before anything runs; a failing operation is
        logged and the remaining ones still execute.
        """
        pending, self.async_operations = self.async_operations, []
        for func, positional, keyword in pending:
            try:
                await func(*positional, **keyword)
            except Exception as e:
                logger.error(f"Error executing queued async operation: {str(e)}")

    def wait_for_reconnection(self, timeout: Optional[float] = None) -> bool:
        """Block the calling thread until reconnection completes.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if reconnection completed, False if timed out
        """
        if self.is_reconnecting:
            return self.reconnection_event.wait(timeout)
        # Nothing to wait for.
        return True

    async def wait_for_async_reconnection(self, timeout: Optional[float] = None) -> bool:
        """Suspend the calling task until reconnection completes.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if reconnection completed, False if timed out
        """
        if not self.is_reconnecting:
            return True
        try:
            if timeout is None:
                await self.async_reconnection_event.wait()
            else:
                await asyncio.wait_for(self.async_reconnection_event.wait(), timeout)
        except asyncio.TimeoutError:
            return False
        return True
[docs]
class ReconnectionStrategy:
    """Strategy for reconnecting to the database.

    Holds the retry budget and the exponential-backoff parameters used to
    space out reconnection attempts.

    Attributes:
        max_attempts: Maximum number of reconnection attempts (at least 1)
        initial_delay: Initial delay in seconds between reconnection attempts (at least 0.1)
        max_delay: Maximum delay in seconds between reconnection attempts
            (never below ``initial_delay``)
        backoff_factor: Backoff multiplier for reconnection delay (at least 1.0)
    """

    def __init__(self, max_attempts: int = 10, initial_delay: float = 1.0,
                 max_delay: float = 60.0, backoff_factor: float = 2.0) -> None:
        """Initialize a new ReconnectionStrategy.

        Values that are too small are raised to their documented minimums.

        Args:
            max_attempts: Maximum number of reconnection attempts
            initial_delay: Initial delay in seconds between reconnection attempts
            max_delay: Maximum delay in seconds between reconnection attempts
            backoff_factor: Backoff multiplier for reconnection delay
        """
        self.max_attempts = max(1, max_attempts)
        self.initial_delay = max(0.1, initial_delay)
        self.max_delay = max(self.initial_delay, max_delay)
        self.backoff_factor = max(1.0, backoff_factor)

    def get_delay(self, attempt: int) -> float:
        """Get the delay for a reconnection attempt.

        The delay is ``initial_delay * backoff_factor ** attempt``, capped at
        ``max_delay``.

        Args:
            attempt: The reconnection attempt number (0-based)

        Returns:
            The delay in seconds for the reconnection attempt
        """
        try:
            delay = self.initial_delay * (self.backoff_factor ** attempt)
        except OverflowError:
            # The exponential exceeded what a float can hold; the uncapped
            # delay is certainly above the cap, so the cap is the answer.
            return self.max_delay
        return min(delay, self.max_delay)
[docs]
class RetryStrategy:
    """Retry policy with exponential backoff for failing operations.

    Bundles the retry budget and backoff parameters with helpers that run
    a synchronous or asynchronous callable until it succeeds or the budget
    is used up.

    Attributes:
        retry_limit: Maximum number of retries
        retry_delay: Initial delay in seconds between retries
        retry_backoff: Backoff multiplier for retry delay
    """

    def __init__(self, retry_limit: int = 3, retry_delay: float = 1.0, retry_backoff: float = 2.0) -> None:
        """Store the retry settings, raising too-small values to their minimums.

        Args:
            retry_limit: Maximum number of retries
            retry_delay: Initial delay in seconds between retries
            retry_backoff: Backoff multiplier for retry delay
        """
        self.retry_backoff = max(1.0, retry_backoff)
        self.retry_delay = max(0.1, retry_delay)
        self.retry_limit = max(0, retry_limit)

    def get_retry_delay(self, attempt: int) -> float:
        """Compute how long to wait after a failed attempt.

        Args:
            attempt: The retry attempt number (0-based)

        Returns:
            The delay in seconds for the retry attempt
        """
        growth = self.retry_backoff ** attempt
        return self.retry_delay * growth

    def should_retry(self, attempt: int, exception: Exception) -> bool:
        """Decide whether a failed attempt may be retried.

        Every exception is currently considered retryable; only the retry
        budget limits further attempts.

        Args:
            attempt: The retry attempt number (0-based)
            exception: The exception that caused the operation to fail

        Returns:
            True if the operation should be retried, False otherwise
        """
        budget_exhausted = attempt >= self.retry_limit
        return not budget_exhausted

    def execute_with_retry(self, operation: Callable[[], Any]) -> Any:
        """Call ``operation`` until it succeeds or retries run out.

        Args:
            operation: The operation to execute

        Returns:
            The result of the operation

        Raises:
            Exception: The last error raised by the operation once retrying stops
        """
        failure = None
        attempt = 0
        attempts_allowed = self.retry_limit + 1
        while attempt < attempts_allowed:
            try:
                return operation()
            except Exception as exc:
                failure = exc
                if not self.should_retry(attempt, exc):
                    break
                # Back off before the next try.
                pause = self.get_retry_delay(attempt)
                logger.warning(f"Operation failed: {str(exc)}. Retrying in {pause:.2f} seconds (attempt {attempt + 1}/{self.retry_limit + 1})...")
                time.sleep(pause)
            attempt += 1

        if failure:
            logger.error(f"Operation failed after {self.retry_limit + 1} attempts: {str(failure)}")
            raise failure
        # Defensive fallback: not expected to be reached.
        raise RuntimeError("Operation failed for unknown reason")

    async def execute_with_retry_async(self, operation: Callable[[], Any]) -> Any:
        """Await ``operation()`` until it succeeds or retries run out.

        Args:
            operation: The asynchronous operation to execute

        Returns:
            The result of the operation

        Raises:
            Exception: The last error raised by the operation once retrying stops
        """
        failure = None
        attempt = 0
        attempts_allowed = self.retry_limit + 1
        while attempt < attempts_allowed:
            try:
                return await operation()
            except Exception as exc:
                failure = exc
                if not self.should_retry(attempt, exc):
                    break
                # Back off before the next try without blocking the loop.
                pause = self.get_retry_delay(attempt)
                logger.warning(f"Async operation failed: {str(exc)}. Retrying in {pause:.2f} seconds (attempt {attempt + 1}/{self.retry_limit + 1})...")
                await asyncio.sleep(pause)
            attempt += 1

        if failure:
            logger.error(f"Async operation failed after {self.retry_limit + 1} attempts: {str(failure)}")
            raise failure
        # Defensive fallback: not expected to be reached.
        raise RuntimeError("Async operation failed for unknown reason")
[docs]
def _split_host_port(hostport: str) -> Tuple[str, Optional[int]]:
    """Split ``host[:port]`` into host and port, keeping IPv6 brackets on the host."""
    if hostport.startswith('['):
        # Bracketed IPv6 literal: colons inside the brackets belong to the host.
        closing = hostport.find(']')
        if closing == -1:
            raise ValueError(f"Invalid IPv6 host: {hostport}")
        host, remainder = hostport[:closing + 1], hostport[closing + 1:]
        if not remainder:
            return host, None
        if not remainder.startswith(':'):
            raise ValueError(f"Invalid host: {hostport}")
        port_str = remainder[1:]
    else:
        host, separator, port_str = hostport.partition(':')
        if not separator:
            return host, None
    try:
        return host, int(port_str)
    except ValueError:
        raise ValueError(f"Invalid port number: {port_str}") from None


def _coerce_query_value(value: str) -> Any:
    """Convert a query-string value to bool, int or float when it looks like one."""
    lowered = value.lower()
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    # Regexes rather than str.isdigit(): isdigit() accepts characters such
    # as superscripts that int() rejects, and it misses negative integers.
    if re.fullmatch(r'-?\d+', value):
        return int(value)
    if re.fullmatch(r'-?\d+\.\d+', value):
        return float(value)
    return value


def parse_connection_string(connection_string: str) -> Dict[str, Any]:
    """Parse a connection string into a dictionary of connection parameters.

    Supports the following formats:
        - surrealdb://user:pass@host:port/namespace/database?param1=value1&param2=value2 (maps to ws://)
        - wss://user:pass@host:port/namespace/database?param1=value1&param2=value2
        - ws://user:pass@host:port/namespace/database?param1=value1&param2=value2
        - http://user:pass@host:port/namespace/database?param1=value1&param2=value2
        - https://user:pass@host:port/namespace/database?param1=value1&param2=value2

    The host may be a bracketed IPv6 literal (``ws://[::1]:8000/ns/db``).
    Username and password are percent-decoded; the credentials end at the
    last ``@`` before the host.

    Connection string parameters:
        - pool_size: Maximum number of connections in the pool (default: 10)
        - max_idle_time: Maximum time in seconds a connection can be idle before being closed (default: 60)
        - connect_timeout: Timeout in seconds for establishing a connection (default: 30)
        - operation_timeout: Timeout in seconds for operations (default: 30)
        - retry_limit: Maximum number of retries for failed operations (default: 3)
        - retry_delay: Initial delay in seconds between retries (default: 1)
        - retry_backoff: Backoff multiplier for retry delay (default: 2)
        - validate_on_borrow: Whether to validate connections when borrowing from the pool (default: true)

    Query values are converted to ``bool`` (``true``/``false``, case
    insensitive), ``int`` or ``float`` when they look like one; a key that
    appears several times maps to the list of its raw string values. The
    defaults listed above are not filled in by this function: only
    parameters present in the string appear in the result.

    Args:
        connection_string: The connection string to parse

    Returns:
        A dictionary with the keys ``url``, ``namespace``, ``database``,
        ``username`` and ``password`` plus one entry per query parameter

    Raises:
        ValueError: If the connection string is empty, uses an unsupported
            protocol, or cannot be parsed
    """
    if not connection_string:
        raise ValueError("Connection string cannot be empty")

    supported_protocols = ("surrealdb://", "wss://", "ws://", "http://", "https://")
    if not connection_string.startswith(supported_protocols):
        raise ValueError(f"Connection string must start with one of: {', '.join(supported_protocols)}")

    try:
        parsed_url = urllib.parse.urlparse(connection_string)
        scheme = parsed_url.scheme

        # Split on the LAST '@' so an unescaped '@' inside the password does
        # not get mistaken for the start of the host.
        username = None
        password = None
        auth, at_sign, hostport = parsed_url.netloc.rpartition('@')
        if at_sign:
            user_part, colon, secret = auth.partition(':')
            username = urllib.parse.unquote(user_part)
            if colon:
                password = urllib.parse.unquote(secret)

        host, port = _split_host_port(hostport)

        # First two path segments are namespace and database; empty ones stay None.
        path_parts = parsed_url.path.strip('/').split('/')
        namespace = path_parts[0] if path_parts[0] else None
        database = path_parts[1] if len(path_parts) >= 2 and path_parts[1] else None

        params = {}
        for key, values in urllib.parse.parse_qs(parsed_url.query).items():
            params[key] = _coerce_query_value(values[0]) if len(values) == 1 else values

        # The surrealdb scheme is an alias for a plain WebSocket connection.
        if scheme == "surrealdb":
            scheme = "ws"
        url = f"{scheme}://{host}"
        if port:
            url += f":{port}"

        return {
            "url": url,
            "namespace": namespace,
            "database": database,
            "username": username,
            "password": password,
            **params
        }
    except Exception as e:
        raise ValueError(f"Failed to parse connection string: {str(e)}") from e
[docs]
@runtime_checkable
class BaseSurrealEngineConnection(Protocol):
    """Structural interface shared by SurrealDB connection objects.

    Both the synchronous and the asynchronous connection classes are
    expected to expose the attributes below and a ``db`` property.
    """

    # Connection target and credentials; every one of them may be unset.
    url: Optional[str]
    namespace: Optional[str]
    database: Optional[str]
    username: Optional[str]
    password: Optional[str]

    # Underlying client object; its concrete type depends on the implementation.
    client: Any

    @property
    def db(self) -> SurrealEngine:
        """Return the dynamic table accessor for this connection."""
        ...
[docs]
class ConnectionPoolClient:
    """Pool-backed stand-in for a SurrealDB client.

    Every operation borrows a connection from the pool, forwards the call to
    that connection's ``client`` and hands the connection back afterwards,
    whether or not the call succeeded.

    Attributes:
        pool: The connection pool connections are borrowed from
    """

    def __init__(self, pool: 'AsyncConnectionPool') -> None:
        """Bind the client to a connection pool.

        Args:
            pool: The connection pool to borrow connections from
        """
        self.pool = pool

    async def _with_connection(self, operation: Callable[[Any], Any]) -> Any:
        """Run one client call on a borrowed connection.

        Args:
            operation: Callable that receives the borrowed connection's
                client and returns an awaitable

        Returns:
            Whatever the awaited operation produces
        """
        borrowed = await self.pool.get_connection()
        try:
            return await operation(borrowed.client)
        finally:
            # Always give the connection back, even when the call raised.
            await self.pool.return_connection(borrowed)

    async def create(self, collection: str, data: Dict[str, Any]) -> Any:
        """Create a record.

        Args:
            collection: The collection to create the record in
            data: The content of the new record

        Returns:
            The created record
        """
        return await self._with_connection(
            lambda client: client.create(collection, data)
        )

    async def update(self, id: str, data: Dict[str, Any]) -> Any:
        """Update a record.

        Args:
            id: The ID of the record to update
            data: The new content for the record

        Returns:
            The updated record
        """
        return await self._with_connection(
            lambda client: client.update(id, data)
        )

    async def delete(self, id: str) -> Any:
        """Delete a record.

        Args:
            id: The ID of the record to delete

        Returns:
            The result of the delete operation
        """
        return await self._with_connection(lambda client: client.delete(id))

    async def select(self, id: str) -> Any:
        """Fetch a record.

        Args:
            id: The ID of the record to select

        Returns:
            The selected record
        """
        return await self._with_connection(lambda client: client.select(id))

    async def query(self, query: str) -> Any:
        """Run a query.

        Args:
            query: The query text to execute

        Returns:
            The result of the query
        """
        return await self._with_connection(lambda client: client.query(query))

    async def insert(self, collection: str, data: List[Dict[str, Any]]) -> Any:
        """Insert several records at once.

        Args:
            collection: The collection to insert the records into
            data: The records to insert

        Returns:
            The inserted records
        """
        return await self._with_connection(
            lambda client: client.insert(collection, data)
        )

    async def signin(self, credentials: Dict[str, str]) -> Any:
        """Sign in on a borrowed connection.

        Args:
            credentials: The credentials to sign in with

        Returns:
            The result of the sign-in operation
        """
        return await self._with_connection(
            lambda client: client.signin(credentials)
        )

    async def use(self, namespace: str, database: str) -> Any:
        """Select a namespace and database on a borrowed connection.

        Args:
            namespace: The namespace to use
            database: The database to use

        Returns:
            The result of the use operation
        """
        return await self._with_connection(
            lambda client: client.use(namespace, database)
        )

    async def close(self) -> None:
        """Close the underlying connection pool."""
        await self.pool.close()
[docs]
class ConnectionRegistry:
    """Global connection registry for multi-backend support.

    This class provides a centralized registry for managing database connections
    for multiple backends (SurrealDB, ClickHouse, etc.). It allows setting default
    connections per backend and registering named connections that can be retrieved
    throughout the application.

    All state lives in class attributes, so it is shared process-wide.

    Attributes:
        _default_async_connection: The default async connection to use when none is specified (legacy)
        _default_sync_connection: The default sync connection to use when none is specified (legacy)
        _async_connections: Dictionary of named async connections (legacy)
        _sync_connections: Dictionary of named sync connections (legacy)
        _backend_connections: Dictionary of backend -> {connection_name -> connection}
        _default_backend_connections: Dictionary of backend -> name of its default connection
    """

    # Legacy attributes for backward compatibility
    _default_async_connection: Optional['SurrealEngineAsyncConnection'] = None
    _default_sync_connection: Optional['SurrealEngineSyncConnection'] = None
    _async_connections: Dict[str, 'SurrealEngineAsyncConnection'] = {}
    _sync_connections: Dict[str, 'SurrealEngineSyncConnection'] = {}

    # New multi-backend attributes
    _backend_connections: Dict[str, Dict[str, Any]] = {}
    _default_backend_connections: Dict[str, Any] = {}

    # ------------------------------------------------------------------
    # Legacy SurrealDB defaults
    # ------------------------------------------------------------------

    @classmethod
    def set_default_async_connection(cls, connection: 'SurrealEngineAsyncConnection') -> None:
        """Set the default async connection.

        Args:
            connection: The async connection to set as default
        """
        cls._default_async_connection = connection

    @classmethod
    def set_default_sync_connection(cls, connection: 'SurrealEngineSyncConnection') -> None:
        """Set the default sync connection.

        Args:
            connection: The sync connection to set as default
        """
        cls._default_sync_connection = connection

    @classmethod
    def set_default_connection(cls, connection: Union['SurrealEngineAsyncConnection', 'SurrealEngineSyncConnection']) -> None:
        """Set the default connection based on its type.

        Args:
            connection: The connection to set as default

        Raises:
            TypeError: If the connection is neither an async nor a sync
                SurrealEngine connection
        """
        if isinstance(connection, SurrealEngineAsyncConnection):
            cls.set_default_async_connection(connection)
        elif isinstance(connection, SurrealEngineSyncConnection):
            cls.set_default_sync_connection(connection)
        else:
            raise TypeError(f"Unsupported connection type: {type(connection)}")

    @classmethod
    def get_default_async_connection(cls) -> 'SurrealEngineAsyncConnection':
        """Get the default async connection.

        Returns:
            The default async connection

        Raises:
            RuntimeError: If no default async connection has been set
        """
        if cls._default_async_connection is None:
            raise RuntimeError("No default async connection has been set. Call set_default_async_connection() first.")
        return cls._default_async_connection

    @classmethod
    def get_default_sync_connection(cls) -> 'SurrealEngineSyncConnection':
        """Get the default sync connection.

        Returns:
            The default sync connection

        Raises:
            RuntimeError: If no default sync connection has been set
        """
        if cls._default_sync_connection is None:
            raise RuntimeError("No default sync connection has been set. Call set_default_sync_connection() first.")
        return cls._default_sync_connection

    @classmethod
    def get_default_connection(cls, backend: Union[str, bool] = 'surrealdb',
                               async_mode: Optional[bool] = None) -> Any:
        """Get a default connection, by backend or by legacy async/sync mode.

        This single method serves both historical call styles. Previously the
        class defined ``get_default_connection`` twice, so the backend-based
        definition silently replaced the ``async_mode`` one and legacy calls
        failed.

        Args:
            backend: The backend type. For backward compatibility a ``bool``
                passed here positionally is interpreted as ``async_mode``.
            async_mode: When given, selects the legacy SurrealDB default
                async (``True``) or sync (``False``) connection; ``backend``
                is not consulted in that case.

        Returns:
            The default connection for the backend, or the legacy default
            connection of the requested mode

        Raises:
            ValueError: If no default connection is set for the backend
            RuntimeError: If the legacy default of the requested mode is unset
        """
        # Legacy positional call: get_default_connection(True / False).
        if isinstance(backend, bool):
            async_mode = backend
        if async_mode is not None:
            if async_mode:
                return cls.get_default_async_connection()
            return cls.get_default_sync_connection()

        if backend not in cls._default_backend_connections:
            raise ValueError(f"No default connection set for backend '{backend}'")
        connection_name = cls._default_backend_connections[backend]
        if backend not in cls._backend_connections or connection_name not in cls._backend_connections[backend]:
            raise ValueError(f"Default connection '{connection_name}' not found for backend '{backend}'")
        return cls._backend_connections[backend][connection_name]

    # ------------------------------------------------------------------
    # Legacy SurrealDB named connections
    # ------------------------------------------------------------------

    @classmethod
    def add_async_connection(cls, name: str, connection: 'SurrealEngineAsyncConnection') -> None:
        """Add a named async connection to the registry.

        Args:
            name: The name to register the connection under
            connection: The async connection to register
        """
        cls._async_connections[name] = connection

    @classmethod
    def add_sync_connection(cls, name: str, connection: 'SurrealEngineSyncConnection') -> None:
        """Add a named sync connection to the registry.

        Args:
            name: The name to register the connection under
            connection: The sync connection to register
        """
        cls._sync_connections[name] = connection

    @classmethod
    def add_connection(cls, name: str, connection: Union['SurrealEngineAsyncConnection', 'SurrealEngineSyncConnection']) -> None:
        """Add a named connection to the registry based on its type.

        Args:
            name: The name to register the connection under
            connection: The connection to register

        Raises:
            TypeError: If the connection is neither an async nor a sync
                SurrealEngine connection
        """
        if isinstance(connection, SurrealEngineAsyncConnection):
            cls.add_async_connection(name, connection)
        elif isinstance(connection, SurrealEngineSyncConnection):
            cls.add_sync_connection(name, connection)
        else:
            raise TypeError(f"Unsupported connection type: {type(connection)}")

    @classmethod
    def get_async_connection(cls, name: str) -> 'SurrealEngineAsyncConnection':
        """Get a named async connection from the registry.

        Args:
            name: The name of the async connection to retrieve

        Returns:
            The requested async connection

        Raises:
            KeyError: If no async connection with the given name exists
        """
        if name not in cls._async_connections:
            raise KeyError(f"No async connection named '{name}' exists.")
        return cls._async_connections[name]

    @classmethod
    def get_sync_connection(cls, name: str) -> 'SurrealEngineSyncConnection':
        """Get a named sync connection from the registry.

        Args:
            name: The name of the sync connection to retrieve

        Returns:
            The requested sync connection

        Raises:
            KeyError: If no sync connection with the given name exists
        """
        if name not in cls._sync_connections:
            raise KeyError(f"No sync connection named '{name}' exists.")
        return cls._sync_connections[name]

    @classmethod
    def get_connection(cls, name: str, async_mode: bool = True) -> Union['SurrealEngineAsyncConnection', 'SurrealEngineSyncConnection']:
        """Get a named connection from the registry based on the mode.

        Args:
            name: The name of the connection to retrieve
            async_mode: Whether to get an async or sync connection

        Returns:
            The requested connection of the requested type

        Raises:
            KeyError: If no connection of the requested type with the given name exists
        """
        if async_mode:
            return cls.get_async_connection(name)
        return cls.get_sync_connection(name)

    # ------------------------------------------------------------------
    # Multi-backend connection management
    # ------------------------------------------------------------------

    @classmethod
    def register(cls, name: str, connection: Any, backend: str = 'surrealdb') -> None:
        """Register a connection for a specific backend.

        Args:
            name: The name to register the connection under
            connection: The connection object
            backend: The backend type ('surrealdb', 'clickhouse', etc.)
        """
        cls._backend_connections.setdefault(backend, {})[name] = connection

    @classmethod
    def set_default(cls, backend: str, connection_name: str) -> None:
        """Set the default connection for a backend.

        Only the connection *name* is stored; the connection object is looked
        up again each time the default is requested.

        Args:
            backend: The backend type
            connection_name: The name of the connection to set as default

        Raises:
            ValueError: If the named connection is not registered for the backend
        """
        if backend not in cls._backend_connections or connection_name not in cls._backend_connections[backend]:
            raise ValueError(f"Connection '{connection_name}' not found for backend '{backend}'")
        cls._default_backend_connections[backend] = connection_name

    @classmethod
    def get_connection_by_backend(cls, name: str, backend: str = 'surrealdb') -> Any:
        """Get a named connection for a specific backend.

        Args:
            name: The name of the connection
            backend: The backend type

        Returns:
            The requested connection

        Raises:
            ValueError: If the connection is not found
        """
        if backend not in cls._backend_connections or name not in cls._backend_connections[backend]:
            raise ValueError(f"Connection '{name}' not found for backend '{backend}'")
        return cls._backend_connections[backend][name]

    @classmethod
    def list_backends(cls) -> List[str]:
        """List all registered backends.

        Returns:
            List of backend names
        """
        return list(cls._backend_connections)

    @classmethod
    def list_connections(cls, backend: str) -> List[str]:
        """List all connections for a backend.

        Args:
            backend: The backend type

        Returns:
            List of connection names for the backend (empty if the backend
            has no registered connections)
        """
        return list(cls._backend_connections.get(backend, {}))
[docs]
class SurrealEngineAsyncConnection:
    """Asynchronous connection manager for SurrealDB.

    This class manages the asynchronous connection to a SurrealDB database, providing methods
    for connecting, disconnecting, and executing transactions. It also provides
    access to the database through the db property.

    Attributes:
        url: The URL of the SurrealDB server
        namespace: The namespace to use
        database: The database to use
        username: The username for authentication
        password: The password for authentication
        client: The SurrealDB async client instance or ConnectionPoolClient
        use_pool: Whether to use a connection pool
        pool: The connection pool if use_pool is True
        pool_size: The size of the connection pool
        max_idle_time: Maximum time in seconds a connection can be idle before being closed
        connect_timeout: Timeout in seconds for establishing a connection
        operation_timeout: Timeout in seconds for operations
        retry_limit: Maximum number of retries for failed operations
        retry_delay: Initial delay in seconds between retries
        retry_backoff: Backoff multiplier for retry delay
        validate_on_borrow: Whether to validate connections when borrowing from the pool
    """

    def __init__(self, url: Optional[str] = None, namespace: Optional[str] = None,
                 database: Optional[str] = None, username: Optional[str] = None,
                 password: Optional[str] = None, name: Optional[str] = None,
                 make_default: bool = False, use_pool: bool = False,
                 pool_size: int = 10, max_idle_time: int = 60,
                 connect_timeout: int = 30, operation_timeout: int = 30,
                 retry_limit: int = 3, retry_delay: float = 1.0,
                 retry_backoff: float = 2.0, validate_on_borrow: bool = True) -> None:
        """Initialize a new SurrealEngineAsyncConnection.

        No network activity happens here; call :meth:`connect` (or use the
        instance as an async context manager) to open the connection.

        Args:
            url: The URL of the SurrealDB server
            namespace: The namespace to use
            database: The database to use
            username: The username for authentication
            password: The password for authentication
            name: The name to register this connection under in the registry
            make_default: Whether to set this connection as the default
            use_pool: Whether to use a connection pool
            pool_size: The size of the connection pool
            max_idle_time: Maximum time in seconds a connection can be idle before being closed
            connect_timeout: Timeout in seconds for establishing a connection
            operation_timeout: Timeout in seconds for operations
            retry_limit: Maximum number of retries for failed operations
            retry_delay: Initial delay in seconds between retries
            retry_backoff: Backoff multiplier for retry delay
            validate_on_borrow: Whether to validate connections when borrowing from the pool
        """
        self.url = url
        self.namespace = namespace
        self.database = database
        self.username = username
        self.password = password
        self.client = None
        self.use_pool = use_pool
        self.pool = None
        self.pool_size = pool_size
        self.max_idle_time = max_idle_time
        self.connect_timeout = connect_timeout
        self.operation_timeout = operation_timeout
        self.retry_limit = retry_limit
        self.retry_delay = retry_delay
        self.retry_backoff = retry_backoff
        self.validate_on_borrow = validate_on_borrow

        if name:
            ConnectionRegistry.add_async_connection(name, self)
        # An unnamed connection becomes the default even without make_default.
        if make_default or name is None:
            ConnectionRegistry.set_default_async_connection(self)

    async def __aenter__(self) -> 'SurrealEngineAsyncConnection':
        """Enter the async context manager.

        Returns:
            The connection instance, after connecting
        """
        await self.connect()
        return self

    async def __aexit__(self, exc_type: Optional[Type[BaseException]],
                        exc_val: Optional[BaseException],
                        exc_tb: Optional[Any]) -> None:
        """Exit the async context manager and disconnect.

        Args:
            exc_type: The exception type, if an exception was raised
            exc_val: The exception value, if an exception was raised
            exc_tb: The exception traceback, if an exception was raised
        """
        await self.disconnect()

    @property
    def db(self) -> 'SurrealEngine':
        """Get dynamic table accessor.

        Returns:
            A SurrealEngine instance for accessing tables dynamically
        """
        return SurrealEngine(self)

    async def connect(self) -> Any:
        """Connect to the database.

        This method creates a new client if one doesn't exist. If use_pool is True,
        it creates a connection pool and a ConnectionPoolClient. Otherwise, it creates
        a direct connection to the database. When a client already exists it is
        returned unchanged.

        Returns:
            The SurrealDB client instance or ConnectionPoolClient
        """
        if self.client:
            return self.client

        if self.use_pool:
            self.pool = AsyncConnectionPool(
                url=self.url,
                namespace=self.namespace,
                database=self.database,
                username=self.username,
                password=self.password,
                pool_size=self.pool_size,
                max_idle_time=self.max_idle_time,
                connect_timeout=self.connect_timeout,
                operation_timeout=self.operation_timeout,
                retry_limit=self.retry_limit,
                retry_delay=self.retry_delay,
                retry_backoff=self.retry_backoff,
                validate_on_borrow=self.validate_on_borrow
            )
            # Create a client that uses the pool
            self.client = ConnectionPoolClient(self.pool)
        else:
            # Create the client directly
            self.client = surrealdb.AsyncSurreal(self.url)

        # Sign in only when both credentials are provided
        if self.username and self.password:
            await self.client.signin({"username": self.username, "password": self.password})
        # Select namespace and database only when both are provided
        if self.namespace and self.database:
            await self.client.use(self.namespace, self.database)
        return self.client

    async def disconnect(self) -> None:
        """Disconnect from the database.

        This method closes the client connection if one exists. If use_pool is True,
        it closes the connection pool instead of the client.
        """
        if not self.client:
            return
        if self.use_pool and self.pool:
            await self.pool.close()
            self.pool = None
        else:
            await self.client.close()
        self.client = None

    async def transaction(self, coroutines: list) -> list:
        """Execute multiple operations in a transaction.

        Sends ``BEGIN TRANSACTION;``, awaits the coroutines one after another,
        then sends ``COMMIT TRANSACTION;``. If anything raises, it sends
        ``CANCEL TRANSACTION;`` and re-raises the original error.

        NOTE(review): when ``use_pool`` is True the client is a
        ConnectionPoolClient, which borrows a connection per call, so the
        BEGIN/COMMIT statements are not pinned to a single connection —
        confirm whether pooled transactions are supported.

        Args:
            coroutines: List of coroutines to execute in the transaction

        Returns:
            List of results from the coroutines, in order

        Raises:
            Exception: If any operation in the transaction fails
        """
        await self.client.query("BEGIN TRANSACTION;")
        queued: list = []
        try:
            queued = list(coroutines)
            results = []
            for coro in queued:
                results.append(await coro)
            await self.client.query("COMMIT TRANSACTION;")
            return results
        except Exception:
            try:
                await self.client.query("CANCEL TRANSACTION;")
            except Exception:
                # A failed rollback must not hide the error that caused it.
                logger.exception("Failed to cancel transaction after an error")
            raise
        finally:
            # Coroutines that never ran would otherwise trigger
            # "coroutine ... was never awaited" warnings; closing a finished
            # coroutine is a no-op.
            for leftover in queued:
                if asyncio.iscoroutine(leftover):
                    leftover.close()
[docs]
def _register_backend_connection(connection: Any, name: Optional[str],
                                 make_default: bool, backend: str) -> None:
    """Record a connection in the unified backend registry.

    A named connection is registered under its name. It becomes the backend
    default when ``make_default`` is set; a connection created without a name
    (``name is None``) is always registered as ``'<backend>_default'`` and
    made the default.
    """
    if name:
        ConnectionRegistry.register(name, connection, backend)
    if make_default or name is None:
        if name:
            ConnectionRegistry.set_default(backend, name)
        else:
            # Create a default name for the unified registry
            default_name = f'{backend}_default'
            ConnectionRegistry.register(default_name, connection, backend)
            ConnectionRegistry.set_default(backend, default_name)


def create_connection(url: Optional[str] = None, namespace: Optional[str] = None,
                      database: Optional[str] = None, username: Optional[str] = None,
                      password: Optional[str] = None, name: Optional[str] = None,
                      make_default: bool = False, async_mode: bool = True,
                      use_pool: bool = False, pool_size: int = 10,
                      max_idle_time: int = 60, connect_timeout: int = 30,
                      operation_timeout: int = 30, retry_limit: int = 3,
                      retry_delay: float = 1.0, retry_backoff: float = 2.0,
                      validate_on_borrow: bool = True, auto_connect: bool = False,
                      backend: str = 'surrealdb', **backend_kwargs) -> Any:
    """Factory function to create a connection for the specified backend.

    Args:
        url: The URL of the database server (for SurrealDB) or host (for ClickHouse)
        namespace: The namespace to use (SurrealDB only)
        database: The database to use
        username: The username for authentication
        password: The password for authentication
        name: The name to register this connection under in the registry
        make_default: Whether to set this connection as the default
        async_mode: Whether to create an async or sync connection (SurrealDB only)
        use_pool: Whether to use a connection pool (SurrealDB async_mode only)
        pool_size: The size of the connection pool
        max_idle_time: Maximum time in seconds a connection can be idle before being closed
        connect_timeout: Timeout in seconds for establishing a connection
        operation_timeout: Timeout in seconds for operations
        retry_limit: Maximum number of retries for failed operations
        retry_delay: Initial delay in seconds between retries
        retry_backoff: Backoff multiplier for retry delay
        validate_on_borrow: Whether to validate connections when borrowing from the pool
        auto_connect: Whether to automatically connect the connection. Only
            honoured for synchronous SurrealDB connections; an async
            connection is returned unconnected and the caller must await
            ``connection.connect()``.
        backend: The backend type ('surrealdb', 'clickhouse', etc.)
        **backend_kwargs: Additional backend-specific connection parameters

    Returns:
        A connection instance for the specified backend

    Raises:
        ImportError: If the requested backend's dependencies are missing
        ValueError: If the backend is unknown, or known but connection
            creation is not implemented for it

    Examples:
        SurrealDB connection (default)::

            surrealdb_conn = create_connection(
                url="ws://localhost:8000/rpc",
                namespace="test", database="test",
                username="root", password="root"
            )

        ClickHouse connection::

            clickhouse_conn = create_connection(
                url="localhost", database="default",
                username="default", password="",
                backend="clickhouse", port=8123
            )
    """
    if backend == 'surrealdb':
        # Legacy SurrealDB connection creation
        if async_mode:
            connection = SurrealEngineAsyncConnection(
                url=url,
                namespace=namespace,
                database=database,
                username=username,
                password=password,
                name=name,
                make_default=make_default,
                use_pool=use_pool,
                pool_size=pool_size,
                max_idle_time=max_idle_time,
                connect_timeout=connect_timeout,
                operation_timeout=operation_timeout,
                retry_limit=retry_limit,
                retry_delay=retry_delay,
                retry_backoff=retry_backoff,
                validate_on_borrow=validate_on_borrow
            )
        else:
            connection = SurrealEngineSyncConnection(
                url=url,
                namespace=namespace,
                database=database,
                username=username,
                password=password,
                name=name,
                make_default=make_default
            )

        # Register in the unified backend system as well
        _register_backend_connection(connection, name, make_default, backend)

        # We can't await here, so an async connection is returned without
        # connecting; only the sync connection can be opened eagerly.
        if auto_connect and not async_mode:
            connection.connect()
        return connection

    if backend == 'clickhouse':
        # ClickHouse connection creation using modular backend system
        from .backends import BackendRegistry

        if not BackendRegistry.is_backend_available('clickhouse'):
            # This will raise ImportError with helpful message
            BackendRegistry.get_backend('clickhouse')

        try:
            import clickhouse_connect
        except ImportError as exc:
            # This should be caught by the BackendRegistry, but just in case
            raise ImportError("ClickHouse backend requires 'clickhouse-connect'. Install with: pip install quantumengine[clickhouse]") from exc

        clickhouse_params = {
            'host': url or 'localhost',
            'database': database or 'default',
            'username': username or 'default',
            'password': password or '',
            **backend_kwargs  # port, secure, etc.
        }
        connection = clickhouse_connect.get_client(**clickhouse_params)

        _register_backend_connection(connection, name, make_default, backend)
        return connection

    # Generic backend connection creation using BackendRegistry
    from .backends import BackendRegistry

    try:
        BackendRegistry.get_backend(backend)
    except (ImportError, ValueError) as e:
        # Re-raise with installation hints for backends that failed to load
        failed = BackendRegistry.list_failed_backends()
        error_msg = str(e)
        if failed:
            error_msg += "\n\nAvailable installation commands:"
            error_msg += "".join(f"\n - {install_msg}" for install_msg in failed.values())
        if isinstance(e, ImportError):
            raise ImportError(error_msg) from e
        raise ValueError(error_msg) from e

    # For future backends, we would create the connection here
    # For now, raise an error for unsupported backends
    raise ValueError(f"Backend '{backend}' found but connection creation not yet implemented")
[docs]
class SurrealEngineSyncConnection:
    """Synchronous connection manager for SurrealDB.

    This class manages the synchronous connection to a SurrealDB database, providing methods
    for connecting, disconnecting, and executing transactions. It also provides
    access to the database through the db property.

    Attributes:
        url: The URL of the SurrealDB server
        namespace: The namespace to use
        database: The database to use
        username: The username for authentication
        password: The password for authentication
        client: The SurrealDB sync client instance
    """

    def __init__(self, url: Optional[str] = None, namespace: Optional[str] = None,
                 database: Optional[str] = None, username: Optional[str] = None,
                 password: Optional[str] = None, name: Optional[str] = None,
                 make_default: bool = False) -> None:
        """Initialize a new SurrealEngineSyncConnection.

        No network activity happens here; call :meth:`connect` (or use the
        instance as a context manager) to open the connection.

        Args:
            url: The URL of the SurrealDB server
            namespace: The namespace to use
            database: The database to use
            username: The username for authentication
            password: The password for authentication
            name: The name to register this connection under in the registry
            make_default: Whether to set this connection as the default
        """
        self.url = url
        self.namespace = namespace
        self.database = database
        self.username = username
        self.password = password
        self.client = None

        if name:
            ConnectionRegistry.add_sync_connection(name, self)
        # An unnamed connection becomes the default even without make_default.
        if make_default or name is None:
            ConnectionRegistry.set_default_sync_connection(self)

    def __enter__(self) -> 'SurrealEngineSyncConnection':
        """Enter the sync context manager.

        Returns:
            The connection instance, after connecting
        """
        self.connect()
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[Any]) -> None:
        """Exit the sync context manager and disconnect.

        Args:
            exc_type: The exception type, if an exception was raised
            exc_val: The exception value, if an exception was raised
            exc_tb: The exception traceback, if an exception was raised
        """
        self.disconnect()

    @property
    def db(self) -> 'SurrealEngine':
        """Get dynamic table accessor.

        Returns:
            A SurrealEngine instance for accessing tables dynamically
        """
        return SurrealEngine(self)

    def connect(self) -> Any:
        """Connect to the database.

        This method creates a new client if one doesn't exist, signs in if
        credentials are provided, and sets the namespace and database. When a
        client already exists it is returned unchanged.

        Returns:
            The SurrealDB client instance
        """
        if self.client:
            return self.client

        # Create the client directly
        self.client = surrealdb.Surreal(self.url)
        # Sign in only when both credentials are provided
        if self.username and self.password:
            self.client.signin({"username": self.username, "password": self.password})
        # Select namespace and database only when both are provided
        if self.namespace and self.database:
            self.client.use(self.namespace, self.database)
        return self.client

    def disconnect(self) -> None:
        """Disconnect from the database.

        This method closes the client connection if one exists.
        """
        if self.client:
            self.client.close()
            self.client = None

    def transaction(self, callables: list) -> list:
        """Execute multiple operations in a transaction.

        Sends ``BEGIN TRANSACTION;``, invokes the callables one after another,
        then sends ``COMMIT TRANSACTION;``. If anything raises, it sends
        ``CANCEL TRANSACTION;`` and re-raises the original error.

        Args:
            callables: List of zero-argument callables to execute in the transaction

        Returns:
            List of results from the callables, in order

        Raises:
            Exception: If any operation in the transaction fails
        """
        self.client.query("BEGIN TRANSACTION;")
        try:
            results = [func() for func in callables]
            self.client.query("COMMIT TRANSACTION;")
            return results
        except Exception:
            try:
                self.client.query("CANCEL TRANSACTION;")
            except Exception:
                # A failed rollback must not hide the error that caused it.
                logger.exception("Failed to cancel transaction after an error")
            raise