uni_pydantic

uni-pydantic: Pydantic-based OGM for Uni Graph Database.

This package provides a type-safe Object-Graph Mapping layer on top of the Uni graph database, using Pydantic v2 for model definitions.

Example:

    >>> from uni_db import Database
    >>> from uni_pydantic import UniNode, UniSession, Field, Relationship, Vector
    >>>
    >>> class Person(UniNode):
    ...     name: str
    ...     age: int | None = None
    ...     email: str = Field(unique=True)
    ...     embedding: Vector[1536]
    ...     friends: list["Person"] = Relationship("FRIEND_OF", direction="both")
    >>>
    >>> db = Database("./my_graph")
    >>> session = UniSession(db)
    >>> session.register(Person)
    >>> session.sync_schema()
    >>>
    >>> alice = Person(name="Alice", age=30, email="alice@example.com")
    >>> session.add(alice)
    >>> session.commit()
    >>>
    >>> # Query with type safety
    >>> adults = session.query(Person).filter(Person.age >= 18).all()

  1# SPDX-License-Identifier: Apache-2.0
  2# Copyright 2024-2026 Dragonscale Team
  3
  4"""
  5uni-pydantic: Pydantic-based OGM for Uni Graph Database.
  6
  7This package provides a type-safe Object-Graph Mapping layer on top of
  8the Uni graph database, using Pydantic v2 for model definitions.
  9
 10Example:
 11    >>> from uni_db import Database
 12    >>> from uni_pydantic import UniNode, UniSession, Field, Relationship, Vector
 13    >>>
 14    >>> class Person(UniNode):
 15    ...     name: str
 16    ...     age: int | None = None
 17    ...     email: str = Field(unique=True)
 18    ...     embedding: Vector[1536]
 19    ...     friends: list["Person"] = Relationship("FRIEND_OF", direction="both")
 20    >>>
 21    >>> db = Database("./my_graph")
 22    >>> session = UniSession(db)
 23    >>> session.register(Person)
 24    >>> session.sync_schema()
 25    >>>
 26    >>> alice = Person(name="Alice", age=30, email="alice@example.com")
 27    >>> session.add(alice)
 28    >>> session.commit()
 29    >>>
 30    >>> # Query with type safety
 31    >>> adults = session.query(Person).filter(Person.age >= 18).all()
 32"""
 33
 34__version__ = "0.1.0"
 35
 36# Base classes
 37# Async support
 38from .async_query import AsyncQueryBuilder
 39from .async_session import AsyncUniSession, AsyncUniTransaction
 40from .base import UniEdge, UniNode
 41
 42# Database wrappers
 43from .database import AsyncUniDatabase, UniDatabase
 44
 45# Exceptions
 46from .exceptions import (
 47    BulkLoadError,
 48    CypherInjectionError,
 49    LazyLoadError,
 50    NotPersisted,
 51    NotRegisteredError,
 52    NotTrackedError,
 53    QueryError,
 54    RelationshipError,
 55    SchemaError,
 56    SessionError,
 57    TransactionError,
 58    TypeMappingError,
 59    UniPydanticError,
 60    ValidationError,
 61)
 62
 63# Field configuration
 64from .fields import (
 65    Direction,
 66    Field,
 67    FieldConfig,
 68    IndexType,
 69    Relationship,
 70    RelationshipConfig,
 71    RelationshipDescriptor,
 72    VectorMetric,
 73    get_field_config,
 74)
 75
 76# Lifecycle hooks
 77from .hooks import (
 78    after_create,
 79    after_delete,
 80    after_load,
 81    after_update,
 82    before_create,
 83    before_delete,
 84    before_load,
 85    before_update,
 86)
 87
 88# Query builder
 89from .query import (
 90    FilterExpr,
 91    FilterOp,
 92    ModelProxy,
 93    OrderByClause,
 94    PropertyProxy,
 95    QueryBuilder,
 96    TraversalStep,
 97    VectorSearchConfig,
 98)
 99
100# Schema generation
101from .schema import (
102    DatabaseSchema,
103    EdgeTypeSchema,
104    LabelSchema,
105    PropertySchema,
106    SchemaGenerator,
107    generate_schema,
108)
109
110# Session management
111from .session import UniSession, UniTransaction
112
113# Type utilities
114from .types import (
115    DATETIME_TYPES,
116    Vector,
117    db_to_python_value,
118    get_vector_dimensions,
119    is_list_type,
120    is_optional,
121    python_to_db_value,
122    python_type_to_uni,
123    uni_to_python_type,
124    unwrap_annotated,
125)
126
127__all__ = [
128    # Version
129    "__version__",
130    # Base classes
131    "UniNode",
132    "UniEdge",
133    # Session
134    "UniSession",
135    "UniTransaction",
136    # Async Session
137    "AsyncUniSession",
138    "AsyncUniTransaction",
139    # Fields
140    "Field",
141    "FieldConfig",
142    "Relationship",
143    "RelationshipConfig",
144    "RelationshipDescriptor",
145    "get_field_config",
146    "IndexType",
147    "Direction",
148    "VectorMetric",
149    # Types
150    "Vector",
151    "python_type_to_uni",
152    "uni_to_python_type",
153    "get_vector_dimensions",
154    "is_optional",
155    "is_list_type",
156    "unwrap_annotated",
157    "python_to_db_value",
158    "db_to_python_value",
159    "DATETIME_TYPES",
160    # Query
161    "QueryBuilder",
162    "AsyncQueryBuilder",
163    "FilterExpr",
164    "FilterOp",
165    "PropertyProxy",
166    "ModelProxy",
167    "OrderByClause",
168    "TraversalStep",
169    "VectorSearchConfig",
170    # Schema
171    "SchemaGenerator",
172    "DatabaseSchema",
173    "LabelSchema",
174    "EdgeTypeSchema",
175    "PropertySchema",
176    "generate_schema",
177    # Database
178    "UniDatabase",
179    "AsyncUniDatabase",
180    # Hooks
181    "before_create",
182    "after_create",
183    "before_update",
184    "after_update",
185    "before_delete",
186    "after_delete",
187    "before_load",
188    "after_load",
189    # Exceptions
190    "UniPydanticError",
191    "SchemaError",
192    "TypeMappingError",
193    "ValidationError",
194    "SessionError",
195    "NotRegisteredError",
196    "NotPersisted",
197    "NotTrackedError",
198    "TransactionError",
199    "QueryError",
200    "RelationshipError",
201    "LazyLoadError",
202    "BulkLoadError",
203    "CypherInjectionError",
204]
__version__ = '0.1.0'
class UniNode(pydantic.main.BaseModel):
159class UniNode(BaseModel, metaclass=UniModelMeta):
160    """
161    Base class for graph node models.
162
163    Subclass this to define your node types. Each UniNode subclass
164    represents a vertex label in the graph database.
165
166    Attributes:
167        __label__: The vertex label name. Defaults to the class name.
168        __relationships__: Dictionary of relationship configurations.
169
170    Private Attributes:
171        _vid: The vertex ID assigned by the database.
172        _uid: The unique identifier (content-addressed hash).
173        _session: Reference to the owning session.
174        _dirty: Set of modified field names.
175
176    Example:
177        >>> class Person(UniNode):
178        ...     __label__ = "Person"
179        ...
180        ...     name: str
181        ...     age: int | None = None
182        ...     email: str = Field(unique=True)
183        ...
184        ...     friends: list["Person"] = Relationship("FRIEND_OF", direction="both")
185    """
186
187    model_config = ConfigDict(
188        # Reject unknown fields so property-name typos fail fast
189        extra="forbid",
190        # Validate on assignment for dirty tracking
191        validate_assignment=True,
192        # Allow arbitrary types (for Vector, etc.)
193        arbitrary_types_allowed=True,
194        # Use enum values
195        use_enum_values=True,
196    )
197
198    # Class-level configuration
199    __label__: ClassVar[str] = ""
200    __relationships__: ClassVar[dict[str, RelationshipConfig]] = {}
201
202    # Private attributes for session tracking
203    _vid: int | None = PrivateAttr(default=None)
204    _uid: str | None = PrivateAttr(default=None)
205    _session: UniSession | None = PrivateAttr(default=None)
206    _dirty: set[str] = PrivateAttr(default_factory=set)
207    _is_new: bool = PrivateAttr(default=True)
208
209    def __init_subclass__(cls, **kwargs: Any) -> None:
210        super().__init_subclass__(**kwargs)
211        # Set default label to class name if not specified
212        if not cls.__label__:
213            cls.__label__ = cls.__name__
214
215    def model_post_init(self, __context: Any) -> None:
216        """Clear dirty tracking after construction."""
217        super().model_post_init(__context)
218        self._dirty = set()
219
220    @property
221    def vid(self) -> int | None:
222        """The vertex ID assigned by the database."""
223        return self._vid
224
225    @property
226    def uid(self) -> str | None:
227        """The unique identifier (content-addressed hash)."""
228        return self._uid
229
230    @property
231    def is_persisted(self) -> bool:
232        """Whether this node has been saved to the database."""
233        return self._vid is not None
234
235    @property
236    def is_dirty(self) -> bool:
237        """Whether this node has unsaved changes."""
238        return bool(self._dirty)
239
240    def __setattr__(self, name: str, value: Any) -> None:
241        # Track dirty fields (but not private attributes)
242        if not name.startswith("_") and hasattr(self, "_dirty"):
243            self._dirty.add(name)
244        super().__setattr__(name, value)
245
246    def _mark_clean(self) -> None:
247        """Mark all fields as clean (called after commit)."""
248        self._dirty.clear()
249        self._is_new = False
250
251    def _attach_session(
252        self, session: UniSession, vid: int, uid: str | None = None
253    ) -> None:
254        """Attach this node to a session with its database IDs."""
255        self._session = session
256        self._vid = vid
257        self._uid = uid
258        self._is_new = False
259
260    @classmethod
261    def get_property_fields(cls) -> dict[str, FieldInfo]:
262        """Get all property fields (excluding relationships)."""
263        return {
264            name: info
265            for name, info in cls.model_fields.items()
266            if name not in cls.__relationships__
267        }
268
269    @classmethod
270    def get_relationship_fields(cls) -> dict[str, RelationshipConfig]:
271        """Get all relationship field configurations."""
272        return cls.__relationships__
273
274    def to_properties(self) -> dict[str, Any]:
275        """Convert to a property dictionary for database storage.
276
277        Uses python_to_db_value() for type conversion. Includes None
278        explicitly so null-outs work.
279        """
280        return _model_to_properties(self, self.get_property_fields())
281
282    @classmethod
283    def from_properties(
284        cls,
285        props: dict[str, Any],
286        *,
287        vid: int | None = None,
288        uid: str | None = None,
289        session: UniSession | None = None,
290    ) -> UniNode:
291        """Create an instance from a property dictionary.
292
293        Accepts _id (string->int vid) and _label from uni-db node dicts.
294        Does not mutate the input dict.
295        """
296        data = dict(props)
297
298        raw_id = data.pop("_id", None)
299        if raw_id is not None and vid is None:
300            vid = int(raw_id) if not isinstance(raw_id, int) else raw_id
301        data.pop("_label", None)
302
303        converted = _convert_db_values(data, cls)
304
305        instance = cls.model_validate(converted)
306        if vid is not None:
307            instance._vid = vid
308        if uid is not None:
309            instance._uid = uid
310        if session is not None:
311            instance._session = session
312        instance._is_new = vid is None
313        instance._dirty = set()
314        return instance
315
316    def __repr__(self) -> str:
317        vid_str = f"vid={self._vid}" if self._vid else "unsaved"
318        return f"{self.__class__.__name__}({vid_str}, {super().__repr__()})"

Base class for graph node models.

Subclass this to define your node types. Each UniNode subclass represents a vertex label in the graph database.

Attributes:
    __label__: The vertex label name. Defaults to the class name.
    __relationships__: Dictionary of relationship configurations.

Private Attributes:
    _vid: The vertex ID assigned by the database.
    _uid: The unique identifier (content-addressed hash).
    _session: Reference to the owning session.
    _dirty: Set of modified field names.

Example:

    >>> class Person(UniNode):
    ...     __label__ = "Person"
    ...
    ...     name: str
    ...     age: int | None = None
    ...     email: str = Field(unique=True)
    ...
    ...     friends: list["Person"] = Relationship("FRIEND_OF", direction="both")

vid: int | None
220    @property
221    def vid(self) -> int | None:
222        """The vertex ID assigned by the database."""
223        return self._vid

The vertex ID assigned by the database.

uid: str | None
225    @property
226    def uid(self) -> str | None:
227        """The unique identifier (content-addressed hash)."""
228        return self._uid

The unique identifier (content-addressed hash).

is_persisted: bool
230    @property
231    def is_persisted(self) -> bool:
232        """Whether this node has been saved to the database."""
233        return self._vid is not None

Whether this node has been saved to the database.

is_dirty: bool
235    @property
236    def is_dirty(self) -> bool:
237        """Whether this node has unsaved changes."""
238        return bool(self._dirty)

Whether this node has unsaved changes.
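For illustration, here is a minimal sketch of the dirty-tracking behaviour, using a hypothetical City model rather than any class defined in this package:

    >>> class City(UniNode):
    ...     name: str
    ...     population: int | None = None
    >>>
    >>> c = City(name="Oslo")
    >>> c.is_dirty            # model_post_init clears the dirty set after construction
    False
    >>> c.population = 709000
    >>> c.is_dirty            # __setattr__ records the assigned field name
    True
    >>> c.is_persisted        # no vid until a session commits the node
    False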

@classmethod
def get_property_fields(cls) -> dict[str, pydantic.fields.FieldInfo]:
260    @classmethod
261    def get_property_fields(cls) -> dict[str, FieldInfo]:
262        """Get all property fields (excluding relationships)."""
263        return {
264            name: info
265            for name, info in cls.model_fields.items()
266            if name not in cls.__relationships__
267        }

Get all property fields (excluding relationships).

@classmethod
def get_relationship_fields(cls) -> dict[str, RelationshipConfig]:
269    @classmethod
270    def get_relationship_fields(cls) -> dict[str, RelationshipConfig]:
271        """Get all relationship field configurations."""
272        return cls.__relationships__

Get all relationship field configurations.
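Assuming the UniModelMeta metaclass registers relationship annotations in __relationships__ (the metaclass is not shown on this page), the two helpers split a model's fields roughly as in this sketch, using the Person model from the class docstring:

    >>> sorted(Person.get_property_fields())       # relationship fields are excluded
    ['age', 'email', 'embedding', 'name']
    >>> list(Person.get_relationship_fields())
    ['friends']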

def to_properties(self) -> dict[str, typing.Any]:
274    def to_properties(self) -> dict[str, Any]:
275        """Convert to a property dictionary for database storage.
276
277        Uses python_to_db_value() for type conversion. Includes None
278        explicitly so null-outs work.
279        """
280        return _model_to_properties(self, self.get_property_fields())

Convert to a property dictionary for database storage.

Uses python_to_db_value() for type conversion. Includes None explicitly so null-outs work.

@classmethod
def from_properties( cls, props: dict[str, typing.Any], *, vid: int | None = None, uid: str | None = None, session: UniSession | None = None) -> UniNode:
282    @classmethod
283    def from_properties(
284        cls,
285        props: dict[str, Any],
286        *,
287        vid: int | None = None,
288        uid: str | None = None,
289        session: UniSession | None = None,
290    ) -> UniNode:
291        """Create an instance from a property dictionary.
292
293        Accepts _id (string->int vid) and _label from uni-db node dicts.
294        Does not mutate the input dict.
295        """
296        data = dict(props)
297
298        raw_id = data.pop("_id", None)
299        if raw_id is not None and vid is None:
300            vid = int(raw_id) if not isinstance(raw_id, int) else raw_id
301        data.pop("_label", None)
302
303        converted = _convert_db_values(data, cls)
304
305        instance = cls.model_validate(converted)
306        if vid is not None:
307            instance._vid = vid
308        if uid is not None:
309            instance._uid = uid
310        if session is not None:
311            instance._session = session
312        instance._is_new = vid is None
313        instance._dirty = set()
314        return instance

Create an instance from a property dictionary.

Accepts _id (string->int vid) and _label from uni-db node dicts. Does not mutate the input dict.
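A sketch of the round trip between a model instance and a uni-db style property dict, again with the hypothetical City model and assuming plain str/int values pass through python_to_db_value unchanged:

    >>> c = City(name="Oslo", population=709000)
    >>> c.to_properties()
    {'name': 'Oslo', 'population': 709000}
    >>>
    >>> # _id (string or int) becomes the vid; _label is dropped; the input dict is not mutated
    >>> raw = {"_id": "42", "_label": "City", "name": "Oslo", "population": 709000}
    >>> c2 = City.from_properties(raw)
    >>> c2.vid, c2.is_persisted, "_id" in raw
    (42, True, True)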

class UniEdge(pydantic.main.BaseModel):
321class UniEdge(BaseModel, metaclass=UniModelMeta):
322    """
323    Base class for graph edge models with properties.
324
325    Subclass this to define edge types with typed properties.
326    Edges represent relationships between nodes.
327
328    Attributes:
329        __edge_type__: The edge type name.
330        __from__: The source node type(s).
331        __to__: The target node type(s).
332
333    Private Attributes:
334        _eid: The edge ID assigned by the database.
335        _src_vid: The source vertex ID.
336        _dst_vid: The destination vertex ID.
337        _session: Reference to the owning session.
338
339    Example:
340        >>> class FriendshipEdge(UniEdge):
341        ...     __edge_type__ = "FRIEND_OF"
342        ...     __from__ = Person
343        ...     __to__ = Person
344        ...
345        ...     since: date
346        ...     strength: float = 1.0
347    """
348
349    model_config = ConfigDict(
350        extra="forbid",
351        validate_assignment=True,
352        arbitrary_types_allowed=True,
353        use_enum_values=True,
354    )
355
356    # Class-level configuration
357    __edge_type__: ClassVar[str] = ""
358    __from__: ClassVar[type[UniNode] | tuple[type[UniNode], ...] | None] = None
359    __to__: ClassVar[type[UniNode] | tuple[type[UniNode], ...] | None] = None
360    __relationships__: ClassVar[dict[str, RelationshipConfig]] = {}
361
362    # Private attributes
363    _eid: int | None = PrivateAttr(default=None)
364    _src_vid: int | None = PrivateAttr(default=None)
365    _dst_vid: int | None = PrivateAttr(default=None)
366    _session: UniSession | None = PrivateAttr(default=None)
367    _is_new: bool = PrivateAttr(default=True)
368
369    def __init_subclass__(cls, **kwargs: Any) -> None:
370        super().__init_subclass__(**kwargs)
371        # Set default edge type to class name if not specified
372        if not cls.__edge_type__:
373            cls.__edge_type__ = cls.__name__
374
375    @property
376    def eid(self) -> int | None:
377        """The edge ID assigned by the database."""
378        return self._eid
379
380    @property
381    def src_vid(self) -> int | None:
382        """The source vertex ID."""
383        return self._src_vid
384
385    @property
386    def dst_vid(self) -> int | None:
387        """The destination vertex ID."""
388        return self._dst_vid
389
390    @property
391    def is_persisted(self) -> bool:
392        """Whether this edge has been saved to the database."""
393        return self._eid is not None
394
395    def _attach(
396        self,
397        session: UniSession,
398        eid: int,
399        src_vid: int,
400        dst_vid: int,
401    ) -> None:
402        """Attach this edge to a session with its database IDs."""
403        self._session = session
404        self._eid = eid
405        self._src_vid = src_vid
406        self._dst_vid = dst_vid
407        self._is_new = False
408
409    @classmethod
410    def get_from_labels(cls) -> list[str]:
411        """Get the source label names."""
412        if cls.__from__ is None:
413            return []
414        if isinstance(cls.__from__, tuple):
415            return [n.__label__ for n in cls.__from__]
416        return [cls.__from__.__label__]
417
418    @classmethod
419    def get_to_labels(cls) -> list[str]:
420        """Get the target label names."""
421        if cls.__to__ is None:
422            return []
423        if isinstance(cls.__to__, tuple):
424            return [n.__label__ for n in cls.__to__]
425        return [cls.__to__.__label__]
426
427    @classmethod
428    def get_property_fields(cls) -> dict[str, FieldInfo]:
429        """Get all property fields."""
430        return dict(cls.model_fields)
431
432    def to_properties(self) -> dict[str, Any]:
433        """Convert to a property dictionary for database storage."""
434        return _model_to_properties(self, self.get_property_fields())
435
436    @classmethod
437    def from_properties(
438        cls,
439        props: dict[str, Any],
440        *,
441        eid: int | None = None,
442        src_vid: int | None = None,
443        dst_vid: int | None = None,
444        session: UniSession | None = None,
445    ) -> UniEdge:
446        """Create an instance from a property dictionary.
447
448        Accepts _id, _type, _src, _dst from uni-db edge dicts.
449        Does not mutate the input dict.
450        """
451        data = dict(props)
452
453        raw_id = data.pop("_id", None)
454        if raw_id is not None and eid is None:
455            eid = int(raw_id) if not isinstance(raw_id, int) else raw_id
456        data.pop("_type", None)
457        raw_src = data.pop("_src", None)
458        if raw_src is not None and src_vid is None:
459            src_vid = int(raw_src) if not isinstance(raw_src, int) else raw_src
460        raw_dst = data.pop("_dst", None)
461        if raw_dst is not None and dst_vid is None:
462            dst_vid = int(raw_dst) if not isinstance(raw_dst, int) else raw_dst
463
464        converted = _convert_db_values(data, cls)
465
466        instance = cls.model_validate(converted)
467        if eid is not None:
468            instance._eid = eid
469        if src_vid is not None:
470            instance._src_vid = src_vid
471        if dst_vid is not None:
472            instance._dst_vid = dst_vid
473        if session is not None:
474            instance._session = session
475        instance._is_new = eid is None
476        return instance
477
478    @classmethod
479    def from_edge_result(
480        cls,
481        data: dict[str, Any],
482        *,
483        session: UniSession | None = None,
484    ) -> UniEdge:
485        """Create an instance from a uni-db edge result dict.
486
487        Convenience method that handles _id, _type, _src, _dst keys.
488        """
489        return cls.from_properties(data, session=session)
490
491    def __repr__(self) -> str:
492        eid_str = f"eid={self._eid}" if self._eid else "unsaved"
493        return f"{self.__class__.__name__}({eid_str}, {super().__repr__()})"

Base class for graph edge models with properties.

Subclass this to define edge types with typed properties. Edges represent relationships between nodes.

Attributes:
    __edge_type__: The edge type name.
    __from__: The source node type(s).
    __to__: The target node type(s).

Private Attributes:
    _eid: The edge ID assigned by the database.
    _src_vid: The source vertex ID.
    _dst_vid: The destination vertex ID.
    _session: Reference to the owning session.

Example:

    >>> class FriendshipEdge(UniEdge):
    ...     __edge_type__ = "FRIEND_OF"
    ...     __from__ = Person
    ...     __to__ = Person
    ...
    ...     since: date
    ...     strength: float = 1.0

eid: int | None
375    @property
376    def eid(self) -> int | None:
377        """The edge ID assigned by the database."""
378        return self._eid

The edge ID assigned by the database.

src_vid: int | None
380    @property
381    def src_vid(self) -> int | None:
382        """The source vertex ID."""
383        return self._src_vid

The source vertex ID.

dst_vid: int | None
385    @property
386    def dst_vid(self) -> int | None:
387        """The destination vertex ID."""
388        return self._dst_vid

The destination vertex ID.

is_persisted: bool
390    @property
391    def is_persisted(self) -> bool:
392        """Whether this edge has been saved to the database."""
393        return self._eid is not None

Whether this edge has been saved to the database.

@classmethod
def get_from_labels(cls) -> list[str]:
409    @classmethod
410    def get_from_labels(cls) -> list[str]:
411        """Get the source label names."""
412        if cls.__from__ is None:
413            return []
414        if isinstance(cls.__from__, tuple):
415            return [n.__label__ for n in cls.__from__]
416        return [cls.__from__.__label__]

Get the source label names.

@classmethod
def get_to_labels(cls) -> list[str]:
418    @classmethod
419    def get_to_labels(cls) -> list[str]:
420        """Get the target label names."""
421        if cls.__to__ is None:
422            return []
423        if isinstance(cls.__to__, tuple):
424            return [n.__label__ for n in cls.__to__]
425        return [cls.__to__.__label__]

Get the target label names.
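Using the FriendshipEdge model from the class docstring above, the label helpers resolve endpoint labels from __from__/__to__ (a single class or a tuple of classes); a quick sketch:

    >>> FriendshipEdge.get_from_labels()
    ['Person']
    >>> FriendshipEdge.get_to_labels()
    ['Person']
    >>> FriendshipEdge.__edge_type__
    'FRIEND_OF'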

@classmethod
def get_property_fields(cls) -> dict[str, pydantic.fields.FieldInfo]:
427    @classmethod
428    def get_property_fields(cls) -> dict[str, FieldInfo]:
429        """Get all property fields."""
430        return dict(cls.model_fields)

Get all property fields.

def to_properties(self) -> dict[str, typing.Any]:
432    def to_properties(self) -> dict[str, Any]:
433        """Convert to a property dictionary for database storage."""
434        return _model_to_properties(self, self.get_property_fields())

Convert to a property dictionary for database storage.

@classmethod
def from_properties( cls, props: dict[str, typing.Any], *, eid: int | None = None, src_vid: int | None = None, dst_vid: int | None = None, session: UniSession | None = None) -> UniEdge:
436    @classmethod
437    def from_properties(
438        cls,
439        props: dict[str, Any],
440        *,
441        eid: int | None = None,
442        src_vid: int | None = None,
443        dst_vid: int | None = None,
444        session: UniSession | None = None,
445    ) -> UniEdge:
446        """Create an instance from a property dictionary.
447
448        Accepts _id, _type, _src, _dst from uni-db edge dicts.
449        Does not mutate the input dict.
450        """
451        data = dict(props)
452
453        raw_id = data.pop("_id", None)
454        if raw_id is not None and eid is None:
455            eid = int(raw_id) if not isinstance(raw_id, int) else raw_id
456        data.pop("_type", None)
457        raw_src = data.pop("_src", None)
458        if raw_src is not None and src_vid is None:
459            src_vid = int(raw_src) if not isinstance(raw_src, int) else raw_src
460        raw_dst = data.pop("_dst", None)
461        if raw_dst is not None and dst_vid is None:
462            dst_vid = int(raw_dst) if not isinstance(raw_dst, int) else raw_dst
463
464        converted = _convert_db_values(data, cls)
465
466        instance = cls.model_validate(converted)
467        if eid is not None:
468            instance._eid = eid
469        if src_vid is not None:
470            instance._src_vid = src_vid
471        if dst_vid is not None:
472            instance._dst_vid = dst_vid
473        if session is not None:
474            instance._session = session
475        instance._is_new = eid is None
476        return instance

Create an instance from a property dictionary.

Accepts _id, _type, _src, _dst from uni-db edge dicts. Does not mutate the input dict.
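A sketch of hydrating an edge model from a uni-db edge dict, assuming the since value arrives as an ISO date string that db_to_python_value / Pydantic can parse:

    >>> raw = {"_id": "7", "_type": "FRIEND_OF", "_src": "1", "_dst": "2",
    ...        "since": "2024-01-01", "strength": 0.8}
    >>> e = FriendshipEdge.from_properties(raw)
    >>> e.eid, e.src_vid, e.dst_vid
    (7, 1, 2)
    >>> e.is_persisted
    True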

@classmethod
def from_edge_result( cls, data: dict[str, typing.Any], *, session: UniSession | None = None) -> UniEdge:
478    @classmethod
479    def from_edge_result(
480        cls,
481        data: dict[str, Any],
482        *,
483        session: UniSession | None = None,
484    ) -> UniEdge:
485        """Create an instance from a uni-db edge result dict.
486
487        Convenience method that handles _id, _type, _src, _dst keys.
488        """
489        return cls.from_properties(data, session=session)

Create an instance from a uni-db edge result dict.

Convenience method that handles _id, _type, _src, _dst keys.
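Since from_edge_result() simply forwards to from_properties(), the raw edge dict from the previous sketch works here as well:

    >>> FriendshipEdge.from_edge_result(raw).eid
    7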

class UniSession:
160class UniSession:
161    """
162    Session for interacting with the graph database using Pydantic models.
163
164    The session manages model registration, schema synchronization,
165    and provides CRUD operations and query building.
166
167    Example:
168        >>> from uni_db import Database
169        >>> from uni_pydantic import UniSession
170        >>>
171        >>> db = Database("./my_graph")
172        >>> session = UniSession(db)
173        >>> session.register(Person, Company)
174        >>> session.sync_schema()
175        >>>
176        >>> alice = Person(name="Alice", age=30)
177        >>> session.add(alice)
178        >>> session.commit()
179    """
180
181    def __init__(self, db: uni_db.Database) -> None:
182        self._db = db
183        self._schema_gen = SchemaGenerator()
184        self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = (
185            WeakValueDictionary()
186        )
187        self._pending_new: list[UniNode] = []
188        self._pending_delete: list[UniNode] = []
189
190    def __enter__(self) -> UniSession:
191        return self
192
193    def __exit__(
194        self,
195        exc_type: type[BaseException] | None,
196        exc_val: BaseException | None,
197        exc_tb: TracebackType | None,
198    ) -> None:
199        self.close()
200
201    def close(self) -> None:
202        """Close the session and clear all pending state."""
203        self._pending_new.clear()
204        self._pending_delete.clear()
205
206    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
207        """
208        Register model classes with the session.
209
210        Registered models can be used for schema generation and queries.
211
212        Args:
213            *models: UniNode or UniEdge subclasses to register.
214        """
215        self._schema_gen.register(*models)
216
217    def sync_schema(self) -> None:
218        """
219        Synchronize database schema with registered models.
220
221        Creates labels, edge types, properties, and indexes as needed.
222        This is additive-only; it won't remove existing schema elements.
223        """
224        self._schema_gen.apply_to_database(self._db)
225
226    def query(self, model: type[NodeT]) -> QueryBuilder[NodeT]:
227        """
228        Create a query builder for the given model.
229
230        Args:
231            model: The UniNode subclass to query.
232
233        Returns:
234            A QueryBuilder for constructing queries.
235        """
236        return QueryBuilder(self, model)
237
238    def add(self, entity: UniNode) -> None:
239        """
240        Add a new entity to be persisted.
241
242        The entity will be inserted on the next commit().
243        """
244        if entity.is_persisted:
245            raise SessionError(f"Entity {entity!r} is already persisted")
246        entity._session = self
247        self._pending_new.append(entity)
248
249    def add_all(self, entities: Sequence[UniNode]) -> None:
250        """Add multiple entities to be persisted."""
251        for entity in entities:
252            self.add(entity)
253
254    def delete(self, entity: UniNode) -> None:
255        """Mark an entity for deletion."""
256        if not entity.is_persisted:
257            raise NotPersisted(entity)
258        self._pending_delete.append(entity)
259
260    def get(
261        self,
262        model: type[NodeT],
263        vid: int | None = None,
264        uid: str | None = None,
265        **kwargs: Any,
266    ) -> NodeT | None:
267        """
268        Get an entity by ID or unique properties.
269
270        Args:
271            model: The model type to retrieve.
272            vid: Vertex ID to look up.
273            uid: Unique ID to look up.
274            **kwargs: Property equality filters.
275
276        Returns:
277            The model instance or None if not found.
278        """
279        # Check identity map first
280        if vid is not None:
281            cached = self._identity_map.get((model.__label__, vid))
282            if cached is not None:
283                return cached  # type: ignore[return-value]
284
285        # Build query
286        label = model.__label__
287        params: dict[str, Any] = {}
288
289        if vid is not None:
290            cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
291            params["vid"] = vid
292        elif uid is not None:
293            cypher = f"MATCH (n:{label}) WHERE n._uid = $uid RETURN {_NODE_RETURN}"
294            params["uid"] = uid
295        elif kwargs:
296            # Validate property names
297            for k in kwargs:
298                _validate_property(k, model)
299            conditions = [f"n.{k} = ${k}" for k in kwargs]
300            cypher = f"MATCH (n:{label}) WHERE {' AND '.join(conditions)} RETURN {_NODE_RETURN} LIMIT 1"
301            params.update(kwargs)
302        else:
303            raise ValueError("Must provide vid, uid, or property filters")
304
305        results = self._db.query(cypher, params)
306        if not results:
307            return None
308
309        node_data = _row_to_node_dict(results[0])
310        if node_data is None:
311            return None
312        return self._result_to_model(node_data, model)
313
314    def refresh(self, entity: UniNode) -> None:
315        """Refresh an entity's properties from the database."""
316        if not entity.is_persisted:
317            raise NotPersisted(entity)
318
319        label = entity.__class__.__label__
320        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
321        results = self._db.query(cypher, {"vid": entity._vid})
322
323        if not results:
324            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
325
326        # Update properties
327        props = _row_to_node_dict(results[0])
328        if props is None:
329            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
330        try:
331            hints = get_type_hints(type(entity))
332        except Exception:
333            hints = {}
334
335        for field_name in entity.get_property_fields():
336            if field_name in props:
337                value = props[field_name]
338                if field_name in hints:
339                    value = db_to_python_value(value, hints[field_name])
340                setattr(entity, field_name, value)
341
342        entity._mark_clean()
343
344    def commit(self) -> None:
345        """
346        Commit all pending changes to the database.
347
348        This persists new entities, updates dirty entities,
349        and deletes marked entities.
350        """
351        # Insert new entities
352        for entity in self._pending_new:
353            self._create_node(entity)
354
355        # Update dirty entities in identity map
356        for (label, vid), entity in list(self._identity_map.items()):
357            if entity.is_dirty and entity.is_persisted:
358                self._update_node(entity)
359
360        # Delete marked entities
361        for entity in self._pending_delete:
362            self._delete_node(entity)
363
364        # Flush to storage
365        self._db.flush()
366
367        # Clear pending lists
368        self._pending_new.clear()
369        self._pending_delete.clear()
370
371    def rollback(self) -> None:
372        """Discard all pending changes."""
373        # Clear pending new — detach entities
374        for entity in self._pending_new:
375            entity._session = None
376        self._pending_new.clear()
377
378        # Clear pending deletes
379        self._pending_delete.clear()
380
381        # Invalidate dirty identity map entries
382        for entity in list(self._identity_map.values()):
383            if entity.is_dirty:
384                self.refresh(entity)
385
386    @contextmanager
387    def transaction(self) -> Iterator[UniTransaction]:
388        """Create a transaction context."""
389        tx = UniTransaction(self)
390        with tx:
391            yield tx
392
393    def begin(self) -> UniTransaction:
394        """Begin a new transaction."""
395        tx = UniTransaction(self)
396        tx._tx = self._db.begin()
397        return tx
398
399    def cypher(
400        self,
401        query: str,
402        params: dict[str, Any] | None = None,
403        result_type: type[NodeT] | None = None,
404    ) -> list[NodeT] | list[dict[str, Any]]:
405        """
406        Execute a raw Cypher query.
407
408        Args:
409            query: Cypher query string.
410            params: Query parameters.
411            result_type: Optional model type for result mapping.
412
413        Returns:
414            List of results (model instances if result_type provided).
415        """
416        results = self._db.query(query, params)
417
418        if result_type is None:
419            return cast(list[dict[str, Any]], results)
420
421        # Map results to model instances
422        mapped = []
423        for row in results:
424            # Try to find node data in the row
425            for key, value in row.items():
426                if isinstance(value, dict):
427                    # Check for _id/_label keys (uni-db node dict)
428                    if "_id" in value and "_label" in value:
429                        instance = self._result_to_model(value, result_type)
430                        if instance:
431                            mapped.append(instance)
432                            break
433                    # Also check if _label matches registered model
434                    elif "_label" in value:
435                        label = value["_label"]
436                        if label in self._schema_gen._node_models:
437                            model = self._schema_gen._node_models[label]
438                            instance = self._result_to_model(value, model)
439                            if instance:
440                                mapped.append(instance)
441                                break
442            else:
443                # Try the first column
444                first_value = next(iter(row.values()), None)
445                if isinstance(first_value, dict):
446                    instance = self._result_to_model(first_value, result_type)
447                    if instance:
448                        mapped.append(instance)
449
450        return mapped
451
452    @staticmethod
453    def _validate_edge_endpoints(
454        source: UniNode, target: UniNode
455    ) -> tuple[int, int, str, str]:
456        """Validate that both endpoints are persisted and return (src_vid, dst_vid, src_label, dst_label)."""
457        if not source.is_persisted:
458            raise NotPersisted(source)
459        if not target.is_persisted:
460            raise NotPersisted(target)
461        return (
462            source._vid,
463            target._vid,
464            source.__class__.__label__,
465            target.__class__.__label__,
466        )
467
468    @staticmethod
469    def _normalize_edge_properties(
470        properties: dict[str, Any] | UniEdge | None,
471    ) -> dict[str, Any]:
472        """Normalize edge properties from dict, UniEdge, or None."""
473        if isinstance(properties, UniEdge):
474            return properties.to_properties()
475        if properties:
476            return properties
477        return {}
478
479    def create_edge(
480        self,
481        source: UniNode,
482        edge_type: str,
483        target: UniNode,
484        properties: dict[str, Any] | UniEdge | None = None,
485    ) -> None:
486        """Create an edge between two nodes."""
487        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
488            source, target
489        )
490        props = self._normalize_edge_properties(properties)
491
492        # Build CREATE edge query with labels (required by Cypher implementation)
493        props_str = ", ".join(f"{k}: ${k}" for k in props)
494        if props_str:
495            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)"
496        else:
497            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)"
498
499        params = {"src": src_vid, "dst": dst_vid, **props}
500        self._db.query(cypher, params)
501
502    def delete_edge(
503        self,
504        source: UniNode,
505        edge_type: str,
506        target: UniNode,
507    ) -> int:
508        """Delete edges between two nodes. Returns the number of deleted edges."""
509        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
510            source, target
511        )
512        cypher = (
513            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
514            f"WHERE a._vid = $src AND b._vid = $dst "
515            f"DELETE r RETURN count(r) as count"
516        )
517        results = self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
518        return cast(int, results[0]["count"]) if results else 0
519
520    def update_edge(
521        self,
522        source: UniNode,
523        edge_type: str,
524        target: UniNode,
525        properties: dict[str, Any],
526    ) -> int:
527        """Update properties on edges between two nodes. Returns the number of updated edges."""
528        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
529            source, target
530        )
531        set_parts = [f"r.{k} = ${k}" for k in properties]
532        params: dict[str, Any] = {"src": src_vid, "dst": dst_vid, **properties}
533        cypher = (
534            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
535            f"WHERE a._vid = $src AND b._vid = $dst "
536            f"SET {', '.join(set_parts)} "
537            f"RETURN count(r) as count"
538        )
539        results = self._db.query(cypher, params)
540        return cast(int, results[0]["count"]) if results else 0
541
542    def get_edge(
543        self,
544        source: UniNode,
545        edge_type: str,
546        target: UniNode,
547        edge_model: type[EdgeT] | None = None,
548    ) -> list[dict[str, Any]] | list[EdgeT]:
549        """Get edges between two nodes. Returns dicts or edge model instances."""
550        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
551            source, target
552        )
553        cypher = (
554            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
555            f"WHERE a._vid = $src AND b._vid = $dst "
556            f"RETURN properties(r) AS _props, id(r) AS _eid"
557        )
558        results = self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
559
560        if edge_model is None:
561            edge_dicts: list[dict[str, Any]] = []
562            for row in results:
563                props = row.get("_props", {})
564                if isinstance(props, dict):
565                    edge_dict = dict(props)
566                    edge_dict["_eid"] = row.get("_eid")
567                    edge_dicts.append(edge_dict)
568            return edge_dicts
569
570        edges = []
571        for row in results:
572            r_data = row.get("_props", {})
573            if isinstance(r_data, dict):
574                edge = edge_model.from_properties(
575                    r_data,
576                    src_vid=src_vid,
577                    dst_vid=dst_vid,
578                    session=self,
579                )
580                edges.append(edge)
581        return edges
582
583    def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
584        """
585        Bulk-add entities using bulk_insert_vertices for performance.
586
587        Groups entities by label and uses db.bulk_insert_vertices().
588        Returns VIDs and attaches sessions.
589
590        Args:
591            entities: Sequence of UniNode instances to bulk-insert.
592
593        Returns:
594            List of assigned vertex IDs.
595
596        Raises:
597            BulkLoadError: If bulk insertion fails.
598        """
599        if not entities:
600            return []
601
602        # Group by label
603        by_label: dict[str, list[UniNode]] = {}
604        for entity in entities:
605            label = entity.__class__.__label__
606            if label not in by_label:
607                by_label[label] = []
608            by_label[label].append(entity)
609
610        all_vids: list[int] = []
611        try:
612            for label, group in by_label.items():
613                # Run before_create hooks
614                for entity in group:
615                    run_hooks(entity, _BEFORE_CREATE)
616
617                # Convert to property dicts
618                prop_dicts = [e.to_properties() for e in group]
619
620                # Bulk insert
621                vids = self._db.bulk_insert_vertices(label, prop_dicts)
622
623                # Attach sessions and record VIDs
624                for entity, vid in zip(group, vids):
625                    entity._attach_session(self, vid)
626                    self._identity_map[(label, vid)] = entity
627                    run_hooks(entity, _AFTER_CREATE)
628                    entity._mark_clean()
629
630                all_vids.extend(vids)
631        except Exception as e:
632            raise BulkLoadError(f"Bulk insert failed: {e}") from e
633
634        return all_vids
635
636    def explain(self, cypher: str) -> dict[str, Any]:
637        """Get the query execution plan without running it."""
638        return self._db.explain(cypher)
639
640    def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
641        """Run the query with profiling and return results + stats."""
642        return self._db.profile(cypher)
643
644    def save_schema(self, path: str) -> None:
645        """Save the database schema to a file."""
646        self._db.save_schema(path)
647
648    def load_schema(self, path: str) -> None:
649        """Load a database schema from a file."""
650        self._db.load_schema(path)
651
652    # -------------------------------------------------------------------------
653    # Internal Methods
654    # -------------------------------------------------------------------------
655
656    def _create_node(self, entity: UniNode) -> None:
657        """Create a node in the database."""
658        # Run before_create hooks
659        run_hooks(entity, _BEFORE_CREATE)
660
661        label = entity.__class__.__label__
662        props = entity.to_properties()
663
664        # Build CREATE query
665        props_str = ", ".join(f"{k}: ${k}" for k in props)
666        cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid"
667
668        results = self._db.query(cypher, props)
669        if results:
670            vid = results[0]["vid"]
671            entity._attach_session(self, vid)
672
673            # Add to identity map
674            self._identity_map[(label, vid)] = entity
675
676        # Run after_create hooks
677        run_hooks(entity, _AFTER_CREATE)
678        entity._mark_clean()
679
680    def _create_node_in_tx(self, entity: UniNode, tx: uni_db.Transaction) -> None:
681        """Create a node within a transaction."""
682        run_hooks(entity, _BEFORE_CREATE)
683
684        label = entity.__class__.__label__
685        props = entity.to_properties()
686
687        props_str = ", ".join(f"{k}: ${k}" for k in props)
688        cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid"
689
690        results = tx.query(cypher, props)
691        if results:
692            vid = results[0]["vid"]
693            entity._attach_session(self, vid)
694            self._identity_map[(label, vid)] = entity
695
696        run_hooks(entity, _AFTER_CREATE)
697
698    def _create_edge_in_tx(
699        self,
700        source: UniNode,
701        edge_type: str,
702        target: UniNode,
703        properties: UniEdge | None,
704        tx: uni_db.Transaction,
705    ) -> None:
706        """Create an edge within a transaction."""
707        props = properties.to_properties() if properties else {}
708        src_label = source.__class__.__label__
709        dst_label = target.__class__.__label__
710
711        props_str = ", ".join(f"{k}: ${k}" for k in props)
712        if props_str:
713            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type} {{{props_str}}}]->(b)"
714        else:
715            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type}]->(b)"
716
717        params = {"src": source._vid, "dst": target._vid, **props}
718        tx.query(cypher, params)
719
720    def _update_node(self, entity: UniNode) -> None:
721        """Update a node in the database."""
722        run_hooks(entity, _BEFORE_UPDATE)
723
724        label = entity.__class__.__label__
725
726        # Convert dirty prop values via python_to_db_value
727        try:
728            hints = get_type_hints(type(entity))
729        except Exception:
730            hints = {}
731
732        dirty_props = {}
733        for name in entity._dirty:
734            value = getattr(entity, name)
735            if name in hints:
736                value = python_to_db_value(value, hints[name])
737            dirty_props[name] = value
738
739        if not dirty_props:
740            return
741
742        set_clause = ", ".join(f"n.{k} = ${k}" for k in dirty_props)
743        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid SET {set_clause}"
744        params = {"vid": entity._vid, **dirty_props}
745
746        self._db.query(cypher, params)
747
748        run_hooks(entity, _AFTER_UPDATE)
749        entity._mark_clean()
750
751    def _delete_node(self, entity: UniNode) -> None:
752        """Delete a node from the database."""
753        run_hooks(entity, _BEFORE_DELETE)
754
755        label = entity.__class__.__label__
756        vid = entity._vid
757
758        # DETACH DELETE to also remove connected edges
759        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid DETACH DELETE n"
760        self._db.query(cypher, {"vid": vid})
761
762        # Remove from identity map
763        if vid is not None and (label, vid) in self._identity_map:
764            del self._identity_map[(label, vid)]
765
766        # Clear entity IDs
767        entity._vid = None
768        entity._uid = None
769        entity._session = None
770
771        run_hooks(entity, _AFTER_DELETE)
772
773    def _result_to_model(
774        self,
775        data: dict[str, Any],
776        model: type[NodeT],
777    ) -> NodeT | None:
778        """Convert a query result row to a model instance.
779
780        Does not mutate the input dict.
781        """
782        if not data:
783            return None
784
785        # Work on a copy
786        data = dict(data)
787
788        # Run before_load hooks
789        data = run_class_hooks(model, _BEFORE_LOAD, data) or data
790
791        # Extract _id → vid (uni-db returns _id as string or int)
792        vid = data.pop("_id", None) or data.pop("_vid", None) or data.pop("vid", None)
793        if vid is not None and not isinstance(vid, int):
794            vid = int(vid)
795
796        # Remove _label (informational)
797        data.pop("_label", None)
798
799        try:
800            instance = cast(
801                NodeT,
802                model.from_properties(
803                    data,
804                    vid=vid,
805                    session=self,
806                ),
807            )
808        except Exception:
809            # If validation fails, return None
810            return None
811
812        # Add to identity map if we have a vid
813        if vid is not None:
814            existing = self._identity_map.get((model.__label__, vid))
815            if existing is not None:
816                return cast(NodeT, existing)
817            self._identity_map[(model.__label__, vid)] = instance
818
819        # Run after_load hooks
820        run_hooks(instance, _AFTER_LOAD)
821
822        return instance
823
824    def _load_relationship(
825        self,
826        entity: UniNode,
827        descriptor: RelationshipDescriptor[Any],
828    ) -> list[UniNode] | UniNode | None:
829        """Load a relationship for an entity."""
830        if not entity.is_persisted:
831            raise NotPersisted(entity)
832
833        config = descriptor.config
834        label = entity.__class__.__label__
835        pattern = _edge_pattern(config.edge_type, config.direction)
836
837        cypher = (
838            f"MATCH (a:{label}){pattern}(b) WHERE id(a) = $vid "
839            f"RETURN properties(b) AS _props, id(b) AS _vid, labels(b) AS _labels"
840        )
841        results = self._db.query(cypher, {"vid": entity._vid})
842
843        nodes = []
844        for row in results:
845            node_data = _row_to_node_dict(row)
846            if node_data is None:
847                continue
848            # Try to find the model for this node
849            node_label = node_data.get("_label")
850            if node_label and node_label in self._schema_gen._node_models:
851                model = self._schema_gen._node_models[node_label]
852                instance = self._result_to_model(node_data, model)
853                if instance:
854                    nodes.append(instance)
855
856        if not descriptor.is_list:
857            return nodes[0] if nodes else None
858        return nodes
859
860    def _eager_load_relationships(
861        self,
862        entities: list[NodeT],
863        relationships: list[str],
864    ) -> None:
865        """Eager load relationships for a list of entities."""
866        if not entities:
867            return
868
869        model = type(entities[0])
870        rel_configs = model.get_relationship_fields()
871
872        for rel_name in relationships:
873            if rel_name not in rel_configs:
874                continue
875
876            config = rel_configs[rel_name]
877            label = model.__label__
878            vids = [e._vid for e in entities if e._vid is not None]
879
880            if not vids:
881                continue
882
883            pattern = _edge_pattern(config.edge_type, config.direction)
884            cypher = (
885                f"MATCH (a:{label}){pattern}(b) WHERE id(a) IN $vids "
886                f"RETURN id(a) as src_vid, properties(b) AS _props, id(b) AS _vid, labels(b) AS _labels"
887            )
888            results = self._db.query(cypher, {"vids": vids})
889
890            # Group results by source vid
891            by_source: dict[int, list[Any]] = {}
892            for row in results:
893                src_vid = row["src_vid"]
894                node_data = _row_to_node_dict(row)
895                if node_data is None:
896                    continue
897                if src_vid not in by_source:
898                    by_source[src_vid] = []
899                by_source[src_vid].append(node_data)
900
901            # Set cached values on entities
902            for entity in entities:
903                if entity._vid in by_source:
904                    related = by_source[entity._vid]
905                    cache_attr = f"_rel_cache_{rel_name}"
906                    setattr(entity, cache_attr, related)

Session for interacting with the graph database using Pydantic models.

The session manages model registration, schema synchronization, and provides CRUD operations and query building.

Example:

    >>> from uni_db import Database
    >>> from uni_pydantic import UniSession
    >>>
    >>> db = Database("./my_graph")
    >>> session = UniSession(db)
    >>> session.register(Person, Company)
    >>> session.sync_schema()
    >>>
    >>> alice = Person(name="Alice", age=30)
    >>> session.add(alice)
    >>> session.commit()

UniSession(db: Database)
181    def __init__(self, db: uni_db.Database) -> None:
182        self._db = db
183        self._schema_gen = SchemaGenerator()
184        self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = (
185            WeakValueDictionary()
186        )
187        self._pending_new: list[UniNode] = []
188        self._pending_delete: list[UniNode] = []
def close(self) -> None:
201    def close(self) -> None:
202        """Close the session and clear all pending state."""
203        self._pending_new.clear()
204        self._pending_delete.clear()

Close the session and clear all pending state.

def register( self, *models: type[UniNode] | type[UniEdge]) -> None:
206    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
207        """
208        Register model classes with the session.
209
210        Registered models can be used for schema generation and queries.
211
212        Args:
213            *models: UniNode or UniEdge subclasses to register.
214        """
215        self._schema_gen.register(*models)

Register model classes with the session.

Registered models can be used for schema generation and queries.

Args: *models: UniNode or UniEdge subclasses to register.
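Node and edge models can be registered together ahead of a single sync_schema() call; a brief sketch using the models from the docstrings above:

    >>> session.register(Person, Company, FriendshipEdge)
    >>> session.sync_schema()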

def sync_schema(self) -> None:
217    def sync_schema(self) -> None:
218        """
219        Synchronize database schema with registered models.
220
221        Creates labels, edge types, properties, and indexes as needed.
222        This is additive-only; it won't remove existing schema elements.
223        """
224        self._schema_gen.apply_to_database(self._db)

Synchronize database schema with registered models.

Creates labels, edge types, properties, and indexes as needed. This is additive-only; it won't remove existing schema elements.

def query(self, model: type[~NodeT]) -> QueryBuilder[~NodeT]:
226    def query(self, model: type[NodeT]) -> QueryBuilder[NodeT]:
227        """
228        Create a query builder for the given model.
229
230        Args:
231            model: The UniNode subclass to query.
232
233        Returns:
234            A QueryBuilder for constructing queries.
235        """
236        return QueryBuilder(self, model)

Create a query builder for the given model.

Args: model: The UniNode subclass to query.

Returns: A QueryBuilder for constructing queries.

def add(self, entity: UniNode) -> None:
238    def add(self, entity: UniNode) -> None:
239        """
240        Add a new entity to be persisted.
241
242        The entity will be inserted on the next commit().
243        """
244        if entity.is_persisted:
245            raise SessionError(f"Entity {entity!r} is already persisted")
246        entity._session = self
247        self._pending_new.append(entity)

Add a new entity to be persisted.

The entity will be inserted on the next commit().

def add_all(self, entities: Sequence[UniNode]) -> None:
249    def add_all(self, entities: Sequence[UniNode]) -> None:
250        """Add multiple entities to be persisted."""
251        for entity in entities:
252            self.add(entity)

Add multiple entities to be persisted.

def delete(self, entity: UniNode) -> None:
254    def delete(self, entity: UniNode) -> None:
255        """Mark an entity for deletion."""
256        if not entity.is_persisted:
257            raise NotPersisted(entity)
258        self._pending_delete.append(entity)

Mark an entity for deletion.

def get( self, model: type[~NodeT], vid: int | None = None, uid: str | None = None, **kwargs: Any) -> Optional[~NodeT]:
260    def get(
261        self,
262        model: type[NodeT],
263        vid: int | None = None,
264        uid: str | None = None,
265        **kwargs: Any,
266    ) -> NodeT | None:
267        """
268        Get an entity by ID or unique properties.
269
270        Args:
271            model: The model type to retrieve.
272            vid: Vertex ID to look up.
273            uid: Unique ID to look up.
274            **kwargs: Property equality filters.
275
276        Returns:
277            The model instance or None if not found.
278        """
279        # Check identity map first
280        if vid is not None:
281            cached = self._identity_map.get((model.__label__, vid))
282            if cached is not None:
283                return cached  # type: ignore[return-value]
284
285        # Build query
286        label = model.__label__
287        params: dict[str, Any] = {}
288
289        if vid is not None:
290            cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
291            params["vid"] = vid
292        elif uid is not None:
293            cypher = f"MATCH (n:{label}) WHERE n._uid = $uid RETURN {_NODE_RETURN}"
294            params["uid"] = uid
295        elif kwargs:
296            # Validate property names
297            for k in kwargs:
298                _validate_property(k, model)
299            conditions = [f"n.{k} = ${k}" for k in kwargs]
300            cypher = f"MATCH (n:{label}) WHERE {' AND '.join(conditions)} RETURN {_NODE_RETURN} LIMIT 1"
301            params.update(kwargs)
302        else:
303            raise ValueError("Must provide vid, uid, or property filters")
304
305        results = self._db.query(cypher, params)
306        if not results:
307            return None
308
309        node_data = _row_to_node_dict(results[0])
310        if node_data is None:
311            return None
312        return self._result_to_model(node_data, model)

Get an entity by ID or unique properties.

Args:
    model: The model type to retrieve.
    vid: Vertex ID to look up.
    uid: Unique ID to look up.
    **kwargs: Property equality filters.

Returns: The model instance or None if not found.
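
A minimal sketch of the three lookup styles, assuming a registered Person model with an email property (uid lookups go through the internal _uid property):

# By vertex ID (checked against the identity map first)
alice = session.get(Person, vid=42)

# By unique ID
alice = session.get(Person, uid="person-001")

# By property equality filters (at most one match is returned)
alice = session.get(Person, email="alice@example.com")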

def refresh(self, entity: UniNode) -> None:
314    def refresh(self, entity: UniNode) -> None:
315        """Refresh an entity's properties from the database."""
316        if not entity.is_persisted:
317            raise NotPersisted(entity)
318
319        label = entity.__class__.__label__
320        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
321        results = self._db.query(cypher, {"vid": entity._vid})
322
323        if not results:
324            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
325
326        # Update properties
327        props = _row_to_node_dict(results[0])
328        if props is None:
329            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
330        try:
331            hints = get_type_hints(type(entity))
332        except Exception:
333            hints = {}
334
335        for field_name in entity.get_property_fields():
336            if field_name in props:
337                value = props[field_name]
338                if field_name in hints:
339                    value = db_to_python_value(value, hints[field_name])
340                setattr(entity, field_name, value)
341
342        entity._mark_clean()

Refresh an entity's properties from the database.
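
A short sketch of what this means in practice, assuming alice is a persisted Person; local, uncommitted changes are overwritten by the stored values:

alice.age = 31          # local, uncommitted change
session.refresh(alice)  # reloads the stored properties and marks the entity clean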

def commit(self) -> None:
344    def commit(self) -> None:
345        """
346        Commit all pending changes to the database.
347
348        This persists new entities, updates dirty entities,
349        and deletes marked entities.
350        """
351        # Insert new entities
352        for entity in self._pending_new:
353            self._create_node(entity)
354
355        # Update dirty entities in identity map
356        for (label, vid), entity in list(self._identity_map.items()):
357            if entity.is_dirty and entity.is_persisted:
358                self._update_node(entity)
359
360        # Delete marked entities
361        for entity in self._pending_delete:
362            self._delete_node(entity)
363
364        # Flush to storage
365        self._db.flush()
366
367        # Clear pending lists
368        self._pending_new.clear()
369        self._pending_delete.clear()

Commit all pending changes to the database.

This persists new entities, updates dirty entities, and deletes marked entities.

def rollback(self) -> None:
371    def rollback(self) -> None:
372        """Discard all pending changes."""
373        # Clear pending new — detach entities
374        for entity in self._pending_new:
375            entity._session = None
376        self._pending_new.clear()
377
378        # Clear pending deletes
379        self._pending_delete.clear()
380
381        # Invalidate dirty identity map entries
382        for entity in list(self._identity_map.values()):
383            if entity.is_dirty:
384                self.refresh(entity)

Discard all pending changes.

@contextmanager
def transaction(self) -> Iterator[UniTransaction]:
386    @contextmanager
387    def transaction(self) -> Iterator[UniTransaction]:
388        """Create a transaction context."""
389        tx = UniTransaction(self)
390        with tx:
391            yield tx

Create a transaction context.

def begin(self) -> UniTransaction:
393    def begin(self) -> UniTransaction:
394        """Begin a new transaction."""
395        tx = UniTransaction(self)
396        tx._tx = self._db.begin()
397        return tx

Begin a new transaction.
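
A sketch of explicit transaction control, assuming bob and carol are persisted Person nodes and that name is the only required Person field:

tx = session.begin()
tx.add(Person(name="Dave"))                # queued node creation
tx.create_edge(bob, "FRIEND_OF", carol)    # queued edge creation
tx.commit()                                # or tx.rollback() to discard both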

def cypher( self, query: str, params: dict[str, typing.Any] | None = None, result_type: type[~NodeT] | None = None) -> list[~NodeT] | list[dict[str, typing.Any]]:
399    def cypher(
400        self,
401        query: str,
402        params: dict[str, Any] | None = None,
403        result_type: type[NodeT] | None = None,
404    ) -> list[NodeT] | list[dict[str, Any]]:
405        """
406        Execute a raw Cypher query.
407
408        Args:
409            query: Cypher query string.
410            params: Query parameters.
411            result_type: Optional model type for result mapping.
412
413        Returns:
414            List of results (model instances if result_type provided).
415        """
416        results = self._db.query(query, params)
417
418        if result_type is None:
419            return cast(list[dict[str, Any]], results)
420
421        # Map results to model instances
422        mapped = []
423        for row in results:
424            # Try to find node data in the row
425            for key, value in row.items():
426                if isinstance(value, dict):
427                    # Check for _id/_label keys (uni-db node dict)
428                    if "_id" in value and "_label" in value:
429                        instance = self._result_to_model(value, result_type)
430                        if instance:
431                            mapped.append(instance)
432                            break
433                    # Also check if _label matches registered model
434                    elif "_label" in value:
435                        label = value["_label"]
436                        if label in self._schema_gen._node_models:
437                            model = self._schema_gen._node_models[label]
438                            instance = self._result_to_model(value, model)
439                            if instance:
440                                mapped.append(instance)
441                                break
442            else:
443                # Try the first column
444                first_value = next(iter(row.values()), None)
445                if isinstance(first_value, dict):
446                    instance = self._result_to_model(first_value, result_type)
447                    if instance:
448                        mapped.append(instance)
449
450        return mapped

Execute a raw Cypher query.

Args:
    query: Cypher query string.
    params: Query parameters.
    result_type: Optional model type for result mapping.

Returns: List of results (model instances if result_type provided).
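
A sketch of both result modes, assuming Person is registered and that whole nodes come back as dicts carrying _id/_label, which is what the mapping step looks for:

# Plain rows as dictionaries
rows = session.cypher(
    "MATCH (n:Person) WHERE n.age >= $min RETURN n.name AS name",
    {"min": 18},
)

# Rows mapped back to model instances
people = session.cypher(
    "MATCH (n:Person) WHERE n.age >= $min RETURN n",
    {"min": 18},
    result_type=Person,
)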

def create_edge( self, source: UniNode, edge_type: str, target: UniNode, properties: dict[str, typing.Any] | UniEdge | None = None) -> None:
479    def create_edge(
480        self,
481        source: UniNode,
482        edge_type: str,
483        target: UniNode,
484        properties: dict[str, Any] | UniEdge | None = None,
485    ) -> None:
486        """Create an edge between two nodes."""
487        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
488            source, target
489        )
490        props = self._normalize_edge_properties(properties)
491
492        # Build CREATE edge query with labels (required by Cypher implementation)
493        props_str = ", ".join(f"{k}: ${k}" for k in props)
494        if props_str:
495            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)"
496        else:
497            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)"
498
499        params = {"src": src_vid, "dst": dst_vid, **props}
500        self._db.query(cypher, params)

Create an edge between two nodes.
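
A minimal sketch, assuming alice and bob are persisted Person instances; per the signature, properties may also be a UniEdge instance:

# Edge without properties
session.create_edge(alice, "FRIEND_OF", bob)

# Edge with a plain property dict
session.create_edge(alice, "FRIEND_OF", bob, properties={"since": 2024})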

def delete_edge( self, source: UniNode, edge_type: str, target: UniNode) -> int:
502    def delete_edge(
503        self,
504        source: UniNode,
505        edge_type: str,
506        target: UniNode,
507    ) -> int:
508        """Delete edges between two nodes. Returns the number of deleted edges."""
509        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
510            source, target
511        )
512        cypher = (
513            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
514            f"WHERE a._vid = $src AND b._vid = $dst "
515            f"DELETE r RETURN count(r) as count"
516        )
517        results = self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
518        return cast(int, results[0]["count"]) if results else 0

Delete edges between two nodes. Returns the number of deleted edges.

def update_edge( self, source: UniNode, edge_type: str, target: UniNode, properties: dict[str, typing.Any]) -> int:
520    def update_edge(
521        self,
522        source: UniNode,
523        edge_type: str,
524        target: UniNode,
525        properties: dict[str, Any],
526    ) -> int:
527        """Update properties on edges between two nodes. Returns the number of updated edges."""
528        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
529            source, target
530        )
531        set_parts = [f"r.{k} = ${k}" for k in properties]
532        params: dict[str, Any] = {"src": src_vid, "dst": dst_vid, **properties}
533        cypher = (
534            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
535            f"WHERE a._vid = $src AND b._vid = $dst "
536            f"SET {', '.join(set_parts)} "
537            f"RETURN count(r) as count"
538        )
539        results = self._db.query(cypher, params)
540        return cast(int, results[0]["count"]) if results else 0

Update properties on edges between two nodes. Returns the number of updated edges.
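
Both update_edge and delete_edge report how many edges were touched; a short sketch, assuming alice and bob are persisted:

updated = session.update_edge(alice, "FRIEND_OF", bob, {"since": 2025})
removed = session.delete_edge(alice, "FRIEND_OF", bob)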

def get_edge( self, source: UniNode, edge_type: str, target: UniNode, edge_model: type[~EdgeT] | None = None) -> list[dict[str, typing.Any]] | list[~EdgeT]:
542    def get_edge(
543        self,
544        source: UniNode,
545        edge_type: str,
546        target: UniNode,
547        edge_model: type[EdgeT] | None = None,
548    ) -> list[dict[str, Any]] | list[EdgeT]:
549        """Get edges between two nodes. Returns dicts or edge model instances."""
550        src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
551            source, target
552        )
553        cypher = (
554            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
555            f"WHERE a._vid = $src AND b._vid = $dst "
556            f"RETURN properties(r) AS _props, id(r) AS _eid"
557        )
558        results = self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
559
560        if edge_model is None:
561            edge_dicts: list[dict[str, Any]] = []
562            for row in results:
563                props = row.get("_props", {})
564                if isinstance(props, dict):
565                    edge_dict = dict(props)
566                    edge_dict["_eid"] = row.get("_eid")
567                    edge_dicts.append(edge_dict)
568            return edge_dicts
569
570        edges = []
571        for row in results:
572            r_data = row.get("_props", {})
573            if isinstance(r_data, dict):
574                edge = edge_model.from_properties(
575                    r_data,
576                    src_vid=src_vid,
577                    dst_vid=dst_vid,
578                    session=self,
579                )
580                edges.append(edge)
581        return edges

Get edges between two nodes. Returns dicts or edge model instances.
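
A sketch of both return shapes; FriendOf is a hypothetical UniEdge subclass used only for illustration:

# Plain dicts, each carrying the edge id under "_eid"
edge_dicts = session.get_edge(alice, "FRIEND_OF", bob)

# Typed edge models, hydrated through FriendOf.from_properties()
friendships = session.get_edge(alice, "FRIEND_OF", bob, edge_model=FriendOf)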

def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
583    def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
584        """
585        Bulk-add entities using bulk_insert_vertices for performance.
586
587        Groups entities by label and uses db.bulk_insert_vertices().
588        Returns VIDs and attaches sessions.
589
590        Args:
591            entities: Sequence of UniNode instances to bulk-insert.
592
593        Returns:
594            List of assigned vertex IDs.
595
596        Raises:
597            BulkLoadError: If bulk insertion fails.
598        """
599        if not entities:
600            return []
601
602        # Group by label
603        by_label: dict[str, list[UniNode]] = {}
604        for entity in entities:
605            label = entity.__class__.__label__
606            if label not in by_label:
607                by_label[label] = []
608            by_label[label].append(entity)
609
610        all_vids: list[int] = []
611        try:
612            for label, group in by_label.items():
613                # Run before_create hooks
614                for entity in group:
615                    run_hooks(entity, _BEFORE_CREATE)
616
617                # Convert to property dicts
618                prop_dicts = [e.to_properties() for e in group]
619
620                # Bulk insert
621                vids = self._db.bulk_insert_vertices(label, prop_dicts)
622
623                # Attach sessions and record VIDs
624                for entity, vid in zip(group, vids):
625                    entity._attach_session(self, vid)
626                    self._identity_map[(label, vid)] = entity
627                    run_hooks(entity, _AFTER_CREATE)
628                    entity._mark_clean()
629
630                all_vids.extend(vids)
631        except Exception as e:
632            raise BulkLoadError(f"Bulk insert failed: {e}") from e
633
634        return all_vids

Bulk-add entities using bulk_insert_vertices for performance.

Groups entities by label and uses db.bulk_insert_vertices(). Returns VIDs and attaches sessions.

Args: entities: Sequence of UniNode instances to bulk-insert.

Returns: List of assigned vertex IDs.

Raises: BulkLoadError: If bulk insertion fails.
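
A minimal sketch, assuming a Person model whose only required field is name; mixed labels are fine because entities are grouped by label before insertion:

people = [Person(name=f"user-{i}") for i in range(1000)]
vids = session.bulk_add(people)
assert len(vids) == len(people)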

def explain(self, cypher: str) -> dict[str, typing.Any]:
636    def explain(self, cypher: str) -> dict[str, Any]:
637        """Get the query execution plan without running it."""
638        return self._db.explain(cypher)

Get the query execution plan without running it.

def profile( self, cypher: str) -> tuple[list[dict[str, typing.Any]], dict[str, typing.Any]]:
640    def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
641        """Run the query with profiling and return results + stats."""
642        return self._db.profile(cypher)

Run the query with profiling and return results + stats.
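
A short sketch covering both diagnostics; the exact shape of the plan and stats dictionaries is defined by uni-db:

plan = session.explain("MATCH (n:Person) WHERE n.age >= 18 RETURN n")
rows, stats = session.profile("MATCH (n:Person) WHERE n.age >= 18 RETURN n")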

def save_schema(self, path: str) -> None:
644    def save_schema(self, path: str) -> None:
645        """Save the database schema to a file."""
646        self._db.save_schema(path)

Save the database schema to a file.

def load_schema(self, path: str) -> None:
648    def load_schema(self, path: str) -> None:
649        """Load a database schema from a file."""
650        self._db.load_schema(path)

Load a database schema from a file.
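
A sketch of a save/load round trip; the path and file name are illustrative, and the on-disk format is defined by uni-db:

session.save_schema("./my_graph/schema.json")
# ...later, in a fresh process
session.load_schema("./my_graph/schema.json")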

class UniTransaction:
 61class UniTransaction:
 62    """
 63    Transaction context for atomic operations.
 64
 65    Provides commit/rollback semantics for a group of operations.
 66
 67    Example:
 68        >>> with session.transaction() as tx:
 69        ...     alice = Person(name="Alice")
 70        ...     tx.add(alice)
 71        ...     # Auto-commits on success, rolls back on exception
 72    """
 73
 74    def __init__(self, session: UniSession) -> None:
 75        self._session = session
 76        self._tx: uni_db.Transaction | None = None
 77        self._pending_nodes: list[UniNode] = []
 78        self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
 79        self._committed = False
 80        self._rolled_back = False
 81
 82    def __enter__(self) -> UniTransaction:
 83        self._tx = self._session._db.begin()
 84        return self
 85
 86    def __exit__(
 87        self,
 88        exc_type: type[BaseException] | None,
 89        exc_val: BaseException | None,
 90        exc_tb: TracebackType | None,
 91    ) -> None:
 92        if exc_type is not None:
 93            self.rollback()
 94            return
 95        if not self._committed and not self._rolled_back:
 96            self.commit()
 97
 98    def add(self, entity: UniNode) -> None:
 99        """Add a node to be created in this transaction."""
100        self._pending_nodes.append(entity)
101
102    def create_edge(
103        self,
104        source: UniNode,
105        edge_type: str,
106        target: UniNode,
107        properties: UniEdge | None = None,
108        **kwargs: Any,
109    ) -> None:
110        """Create an edge between two nodes in this transaction."""
111        if not source.is_persisted:
112            raise NotPersisted(source)
113        if not target.is_persisted:
114            raise NotPersisted(target)
115        self._pending_edges.append((source, edge_type, target, properties))
116
117    def commit(self) -> None:
118        """Commit the transaction."""
119        if self._committed:
120            raise TransactionError("Transaction already committed")
121        if self._rolled_back:
122            raise TransactionError("Transaction already rolled back")
123
124        if self._tx is None:
125            raise TransactionError("Transaction not started")
126
127        try:
128            # Create pending nodes
129            for node in self._pending_nodes:
130                self._session._create_node_in_tx(node, self._tx)
131
132            # Create pending edges
133            for source, edge_type, target, props in self._pending_edges:
134                self._session._create_edge_in_tx(
135                    source, edge_type, target, props, self._tx
136                )
137
138            self._tx.commit()
139            self._committed = True
140
141            # Mark nodes as clean
142            for node in self._pending_nodes:
143                node._mark_clean()
144
145        except Exception as e:
146            self.rollback()
147            raise TransactionError(f"Commit failed: {e}") from e
148
149    def rollback(self) -> None:
150        """Rollback the transaction."""
151        if self._rolled_back:
152            return
153        if self._tx is not None:
154            self._tx.rollback()
155        self._rolled_back = True
156        self._pending_nodes.clear()
157        self._pending_edges.clear()

Transaction context for atomic operations.

Provides commit/rollback semantics for a group of operations.

Example:

with session.transaction() as tx:
    alice = Person(name="Alice")
    tx.add(alice)
    # Auto-commits on success, rolls back on exception

UniTransaction(session: UniSession)
74    def __init__(self, session: UniSession) -> None:
75        self._session = session
76        self._tx: uni_db.Transaction | None = None
77        self._pending_nodes: list[UniNode] = []
78        self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
79        self._committed = False
80        self._rolled_back = False
def add(self, entity: UniNode) -> None:
 98    def add(self, entity: UniNode) -> None:
 99        """Add a node to be created in this transaction."""
100        self._pending_nodes.append(entity)

Add a node to be created in this transaction.

def create_edge( self, source: UniNode, edge_type: str, target: UniNode, properties: UniEdge | None = None, **kwargs: Any) -> None:
102    def create_edge(
103        self,
104        source: UniNode,
105        edge_type: str,
106        target: UniNode,
107        properties: UniEdge | None = None,
108        **kwargs: Any,
109    ) -> None:
110        """Create an edge between two nodes in this transaction."""
111        if not source.is_persisted:
112            raise NotPersisted(source)
113        if not target.is_persisted:
114            raise NotPersisted(target)
115        self._pending_edges.append((source, edge_type, target, properties))

Create an edge between two nodes in this transaction.

def commit(self) -> None:
117    def commit(self) -> None:
118        """Commit the transaction."""
119        if self._committed:
120            raise TransactionError("Transaction already committed")
121        if self._rolled_back:
122            raise TransactionError("Transaction already rolled back")
123
124        if self._tx is None:
125            raise TransactionError("Transaction not started")
126
127        try:
128            # Create pending nodes
129            for node in self._pending_nodes:
130                self._session._create_node_in_tx(node, self._tx)
131
132            # Create pending edges
133            for source, edge_type, target, props in self._pending_edges:
134                self._session._create_edge_in_tx(
135                    source, edge_type, target, props, self._tx
136                )
137
138            self._tx.commit()
139            self._committed = True
140
141            # Mark nodes as clean
142            for node in self._pending_nodes:
143                node._mark_clean()
144
145        except Exception as e:
146            self.rollback()
147            raise TransactionError(f"Commit failed: {e}") from e

Commit the transaction.

def rollback(self) -> None:
149    def rollback(self) -> None:
150        """Rollback the transaction."""
151        if self._rolled_back:
152            return
153        if self._tx is not None:
154            self._tx.rollback()
155        self._rolled_back = True
156        self._pending_nodes.clear()
157        self._pending_edges.clear()

Rollback the transaction.

class AsyncUniSession:
134class AsyncUniSession:
135    """
136    Async session for interacting with the graph database.
137
138    Mirrors UniSession with async methods. Uses AsyncDatabase.
139
140    Example:
141        >>> from uni_db import AsyncDatabase
142        >>> from uni_pydantic import AsyncUniSession
143        >>>
144        >>> db = await AsyncDatabase.open("./my_graph")
145        >>> async with AsyncUniSession(db) as session:
146        ...     session.register(Person)
147        ...     await session.sync_schema()
148        ...     alice = Person(name="Alice", age=30)
149        ...     session.add(alice)
150        ...     await session.commit()
151    """
152
153    def __init__(self, db: uni_db.AsyncDatabase) -> None:
154        self._db = db
155        self._schema_gen = SchemaGenerator()
156        self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = (
157            WeakValueDictionary()
158        )
159        self._pending_new: list[UniNode] = []
160        self._pending_delete: list[UniNode] = []
161
162    async def __aenter__(self) -> AsyncUniSession:
163        return self
164
165    async def __aexit__(
166        self,
167        exc_type: type[BaseException] | None,
168        exc_val: BaseException | None,
169        exc_tb: TracebackType | None,
170    ) -> None:
171        self.close()
172
173    def close(self) -> None:
174        """Close the session and clear pending state."""
175        self._pending_new.clear()
176        self._pending_delete.clear()
177
178    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
179        """Register model classes with the session (sync)."""
180        self._schema_gen.register(*models)
181
182    async def sync_schema(self) -> None:
183        """Synchronize database schema with registered models."""
184        await self._schema_gen.async_apply_to_database(self._db)
185
186    def query(self, model: type[NodeT]) -> AsyncQueryBuilder[NodeT]:
187        """Create an async query builder for the given model."""
188        return AsyncQueryBuilder(self, model)
189
190    def add(self, entity: UniNode) -> None:
191        """Add a new entity to be persisted (sync — just collects)."""
192        if entity.is_persisted:
193            raise SessionError(f"Entity {entity!r} is already persisted")
194        entity._session = self
195        self._pending_new.append(entity)
196
197    def add_all(self, entities: Sequence[UniNode]) -> None:
198        """Add multiple entities (sync — just collects)."""
199        for entity in entities:
200            self.add(entity)
201
202    def delete(self, entity: UniNode) -> None:
203        """Mark an entity for deletion (sync — just collects)."""
204        if not entity.is_persisted:
205            raise NotPersisted(entity)
206        self._pending_delete.append(entity)
207
208    async def get(
209        self,
210        model: type[NodeT],
211        vid: int | None = None,
212        uid: str | None = None,
213        **kwargs: Any,
214    ) -> NodeT | None:
215        """Get an entity by ID or unique properties."""
216        if vid is not None:
217            cached = self._identity_map.get((model.__label__, vid))
218            if cached is not None:
219                return cached  # type: ignore[return-value]
220
221        label = model.__label__
222        params: dict[str, Any] = {}
223
224        if vid is not None:
225            cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
226            params["vid"] = vid
227        elif uid is not None:
228            cypher = f"MATCH (n:{label}) WHERE n._uid = $uid RETURN {_NODE_RETURN}"
229            params["uid"] = uid
230        elif kwargs:
231            for k in kwargs:
232                _validate_property(k, model)
233            conditions = [f"n.{k} = ${k}" for k in kwargs]
234            cypher = f"MATCH (n:{label}) WHERE {' AND '.join(conditions)} RETURN {_NODE_RETURN} LIMIT 1"
235            params.update(kwargs)
236        else:
237            raise ValueError("Must provide vid, uid, or property filters")
238
239        results = await self._db.query(cypher, params)
240        if not results:
241            return None
242
243        node_data = _row_to_node_dict(results[0])
244        if node_data is None:
245            return None
246        return self._result_to_model(node_data, model)
247
248    async def refresh(self, entity: UniNode) -> None:
249        """Refresh an entity's properties from the database."""
250        if not entity.is_persisted:
251            raise NotPersisted(entity)
252
253        label = entity.__class__.__label__
254        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
255        results = await self._db.query(cypher, {"vid": entity._vid})
256
257        if not results:
258            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
259
260        props = _row_to_node_dict(results[0])
261        if props is None:
262            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
263        try:
264            hints = get_type_hints(type(entity))
265        except Exception:
266            hints = {}
267
268        for field_name in entity.get_property_fields():
269            if field_name in props:
270                value = props[field_name]
271                if field_name in hints:
272                    value = db_to_python_value(value, hints[field_name])
273                setattr(entity, field_name, value)
274
275        entity._mark_clean()
276
277    async def commit(self) -> None:
278        """Commit all pending changes."""
279        for entity in self._pending_new:
280            await self._create_node(entity)
281
282        for (label, vid), entity in list(self._identity_map.items()):
283            if entity.is_dirty and entity.is_persisted:
284                await self._update_node(entity)
285
286        for entity in self._pending_delete:
287            await self._delete_node(entity)
288
289        await self._db.flush()
290        self._pending_new.clear()
291        self._pending_delete.clear()
292
293    async def rollback(self) -> None:
294        """Discard all pending changes."""
295        for entity in self._pending_new:
296            entity._session = None
297        self._pending_new.clear()
298        self._pending_delete.clear()
299        for entity in list(self._identity_map.values()):
300            if entity.is_dirty:
301                await self.refresh(entity)
302
303    async def transaction(self) -> AsyncUniTransaction:
304        """Create an async transaction. Use as `async with await session.transaction() as tx:`."""
305        return AsyncUniTransaction(self)
306
307    async def cypher(
308        self,
309        query: str,
310        params: dict[str, Any] | None = None,
311        result_type: type[NodeT] | None = None,
312    ) -> list[NodeT] | list[dict[str, Any]]:
313        """Execute a raw Cypher query."""
314        results = await self._db.query(query, params)
315
316        if result_type is None:
317            return cast(list[dict[str, Any]], results)
318
319        mapped = []
320        for row in results:
321            for key, value in row.items():
322                if isinstance(value, dict):
323                    if "_id" in value and "_label" in value:
324                        instance = self._result_to_model(value, result_type)
325                        if instance:
326                            mapped.append(instance)
327                            break
328                    elif "_label" in value:
329                        label = value["_label"]
330                        if label in self._schema_gen._node_models:
331                            model = self._schema_gen._node_models[label]
332                            instance = self._result_to_model(value, model)
333                            if instance:
334                                mapped.append(instance)
335                                break
336            else:
337                first_value = next(iter(row.values()), None)
338                if isinstance(first_value, dict):
339                    instance = self._result_to_model(first_value, result_type)
340                    if instance:
341                        mapped.append(instance)
342
343        return mapped
344
345    async def create_edge(
346        self,
347        source: UniNode,
348        edge_type: str,
349        target: UniNode,
350        properties: dict[str, Any] | UniEdge | None = None,
351    ) -> None:
352        """Create an edge between two nodes."""
353        src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
354            source, target
355        )
356        props = UniSession._normalize_edge_properties(properties)
357
358        props_str = ", ".join(f"{k}: ${k}" for k in props)
359        if props_str:
360            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)"
361        else:
362            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)"
363
364        await self._db.query(cypher, {"src": src_vid, "dst": dst_vid, **props})
365
366    async def delete_edge(
367        self, source: UniNode, edge_type: str, target: UniNode
368    ) -> int:
369        """Delete edges between two nodes. Returns the number of deleted edges."""
370        src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
371            source, target
372        )
373        cypher = (
374            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
375            f"WHERE a._vid = $src AND b._vid = $dst "
376            f"DELETE r RETURN count(r) as count"
377        )
378        results = await self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
379        return cast(int, results[0]["count"]) if results else 0
380
381    async def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
382        """Bulk-add entities using bulk_insert_vertices."""
383        if not entities:
384            return []
385
386        by_label: dict[str, list[UniNode]] = {}
387        for entity in entities:
388            label = entity.__class__.__label__
389            if label not in by_label:
390                by_label[label] = []
391            by_label[label].append(entity)
392
393        all_vids: list[int] = []
394        try:
395            for label, group in by_label.items():
396                for entity in group:
397                    run_hooks(entity, _BEFORE_CREATE)
398                prop_dicts = [e.to_properties() for e in group]
399                vids = await self._db.bulk_insert_vertices(label, prop_dicts)
400                for entity, vid in zip(group, vids):
401                    entity._attach_session(self, vid)
402                    self._identity_map[(label, vid)] = entity
403                    run_hooks(entity, _AFTER_CREATE)
404                    entity._mark_clean()
405                all_vids.extend(vids)
406        except Exception as e:
407            raise BulkLoadError(f"Bulk insert failed: {e}") from e
408
409        return all_vids
410
411    async def explain(self, cypher: str) -> dict[str, Any]:
412        """Get the query execution plan."""
413        return await self._db.explain(cypher)
414
415    async def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
416        """Run the query with profiling."""
417        return await self._db.profile(cypher)
418
419    async def save_schema(self, path: str) -> None:
420        """Save the database schema to a file."""
421        await self._db.save_schema(path)
422
423    async def load_schema(self, path: str) -> None:
424        """Load a database schema from a file."""
425        await self._db.load_schema(path)
426
427    # ---- Internal methods ----
428
429    async def _create_node(self, entity: UniNode) -> None:
430        run_hooks(entity, _BEFORE_CREATE)
431        label = entity.__class__.__label__
432        props = entity.to_properties()
433        props_str = ", ".join(f"{k}: ${k}" for k in props)
434        cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid"
435        results = await self._db.query(cypher, props)
436        if results:
437            vid = results[0]["vid"]
438            entity._attach_session(self, vid)
439            self._identity_map[(label, vid)] = entity
440        run_hooks(entity, _AFTER_CREATE)
441        entity._mark_clean()
442
443    async def _create_node_in_tx(
444        self, entity: UniNode, tx: uni_db.AsyncTransaction
445    ) -> None:
446        run_hooks(entity, _BEFORE_CREATE)
447        label = entity.__class__.__label__
448        props = entity.to_properties()
449        props_str = ", ".join(f"{k}: ${k}" for k in props)
450        cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid"
451        results = await tx.query(cypher, props)
452        if results:
453            vid = results[0]["vid"]
454            entity._attach_session(self, vid)
455            self._identity_map[(label, vid)] = entity
456        run_hooks(entity, _AFTER_CREATE)
457
458    async def _create_edge_in_tx(
459        self,
460        source: UniNode,
461        edge_type: str,
462        target: UniNode,
463        properties: UniEdge | None,
464        tx: uni_db.AsyncTransaction,
465    ) -> None:
466        props = properties.to_properties() if properties else {}
467        src_label = source.__class__.__label__
468        dst_label = target.__class__.__label__
469        props_str = ", ".join(f"{k}: ${k}" for k in props)
470        if props_str:
471            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type} {{{props_str}}}]->(b)"
472        else:
473            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type}]->(b)"
474        params = {"src": source._vid, "dst": target._vid, **props}
475        await tx.query(cypher, params)
476
477    async def _update_node(self, entity: UniNode) -> None:
478        run_hooks(entity, _BEFORE_UPDATE)
479        label = entity.__class__.__label__
480        try:
481            hints = get_type_hints(type(entity))
482        except Exception:
483            hints = {}
484        dirty_props = {}
485        for name in entity._dirty:
486            value = getattr(entity, name)
487            if name in hints:
488                value = python_to_db_value(value, hints[name])
489            dirty_props[name] = value
490        if not dirty_props:
491            return
492        set_clause = ", ".join(f"n.{k} = ${k}" for k in dirty_props)
493        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid SET {set_clause}"
494        params = {"vid": entity._vid, **dirty_props}
495        await self._db.query(cypher, params)
496        run_hooks(entity, _AFTER_UPDATE)
497        entity._mark_clean()
498
499    async def _delete_node(self, entity: UniNode) -> None:
500        run_hooks(entity, _BEFORE_DELETE)
501        label = entity.__class__.__label__
502        vid = entity._vid
503        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid DETACH DELETE n"
504        await self._db.query(cypher, {"vid": vid})
505        if vid is not None and (label, vid) in self._identity_map:
506            del self._identity_map[(label, vid)]
507        entity._vid = None
508        entity._uid = None
509        entity._session = None
510        run_hooks(entity, _AFTER_DELETE)
511
512    def _result_to_model(
513        self,
514        data: dict[str, Any],
515        model: type[NodeT],
516    ) -> NodeT | None:
517        """Convert a query result row to a model instance (sync — pure dict processing)."""
518        if not data:
519            return None
520
521        data = dict(data)
522        data = run_class_hooks(model, _BEFORE_LOAD, data) or data
523
524        vid = data.pop("_id", None) or data.pop("_vid", None) or data.pop("vid", None)
525        if vid is not None and not isinstance(vid, int):
526            vid = int(vid)
527        data.pop("_label", None)
528
529        try:
530            instance = cast(
531                NodeT,
532                model.from_properties(data, vid=vid, session=self),
533            )
534        except Exception:
535            return None
536
537        if vid is not None:
538            existing = self._identity_map.get((model.__label__, vid))
539            if existing is not None:
540                return cast(NodeT, existing)
541            self._identity_map[(model.__label__, vid)] = instance
542
543        run_hooks(instance, _AFTER_LOAD)
544        return instance
545
546    def _load_relationship(
547        self,
548        entity: UniNode,
549        descriptor: RelationshipDescriptor[Any],
550    ) -> list[UniNode] | UniNode | None:
551        """Sync relationship loading — raises error for async session.
552        Use _async_load_relationship instead."""
553        raise SessionError(
554            "Cannot synchronously load relationships in an async session. "
555            "Use eager_load() or access relationships via async queries."
556        )
557
558    async def _async_eager_load_relationships(
559        self,
560        entities: list[NodeT],
561        relationships: list[str],
562    ) -> None:
563        """Eager load relationships for a list of entities (async)."""
564        if not entities:
565            return
566
567        model = type(entities[0])
568        rel_configs = model.get_relationship_fields()
569
570        for rel_name in relationships:
571            if rel_name not in rel_configs:
572                continue
573
574            config = rel_configs[rel_name]
575            label = model.__label__
576            vids = [e._vid for e in entities if e._vid is not None]
577
578            if not vids:
579                continue
580
581            pattern = _edge_pattern(config.edge_type, config.direction)
582            cypher = (
583                f"MATCH (a:{label}){pattern}(b) WHERE id(a) IN $vids "
584                f"RETURN id(a) as src_vid, properties(b) AS _props, id(b) AS _vid, labels(b) AS _labels"
585            )
586            results = await self._db.query(cypher, {"vids": vids})
587
588            by_source: dict[int, list[Any]] = {}
589            for row in results:
590                src_vid = row["src_vid"]
591                node_data = _row_to_node_dict(row)
592                if node_data is None:
593                    continue
594                if src_vid not in by_source:
595                    by_source[src_vid] = []
596                by_source[src_vid].append(node_data)
597
598            for entity in entities:
599                if entity._vid in by_source:
600                    related = by_source[entity._vid]
601                    cache_attr = f"_rel_cache_{rel_name}"
602                    setattr(entity, cache_attr, related)

Async session for interacting with the graph database.

Mirrors UniSession with async methods. Uses AsyncDatabase.

Example:

from uni_db import AsyncDatabase
from uni_pydantic import AsyncUniSession

db = await AsyncDatabase.open("./my_graph")
async with AsyncUniSession(db) as session:
    session.register(Person)
    await session.sync_schema()
    alice = Person(name="Alice", age=30)
    session.add(alice)
    await session.commit()

AsyncUniSession(db: AsyncDatabase)
153    def __init__(self, db: uni_db.AsyncDatabase) -> None:
154        self._db = db
155        self._schema_gen = SchemaGenerator()
156        self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = (
157            WeakValueDictionary()
158        )
159        self._pending_new: list[UniNode] = []
160        self._pending_delete: list[UniNode] = []
def close(self) -> None:
173    def close(self) -> None:
174        """Close the session and clear pending state."""
175        self._pending_new.clear()
176        self._pending_delete.clear()

Close the session and clear pending state.

def register( self, *models: type[UniNode] | type[UniEdge]) -> None:
178    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
179        """Register model classes with the session (sync)."""
180        self._schema_gen.register(*models)

Register model classes with the session (sync).

async def sync_schema(self) -> None:
182    async def sync_schema(self) -> None:
183        """Synchronize database schema with registered models."""
184        await self._schema_gen.async_apply_to_database(self._db)

Synchronize database schema with registered models.

def query( self, model: type[~NodeT]) -> AsyncQueryBuilder[~NodeT]:
186    def query(self, model: type[NodeT]) -> AsyncQueryBuilder[NodeT]:
187        """Create an async query builder for the given model."""
188        return AsyncQueryBuilder(self, model)

Create an async query builder for the given model.

def add(self, entity: UniNode) -> None:
190    def add(self, entity: UniNode) -> None:
191        """Add a new entity to be persisted (sync — just collects)."""
192        if entity.is_persisted:
193            raise SessionError(f"Entity {entity!r} is already persisted")
194        entity._session = self
195        self._pending_new.append(entity)

Add a new entity to be persisted (sync — just collects).

def add_all(self, entities: Sequence[UniNode]) -> None:
197    def add_all(self, entities: Sequence[UniNode]) -> None:
198        """Add multiple entities (sync — just collects)."""
199        for entity in entities:
200            self.add(entity)

Add multiple entities (sync — just collects).

def delete(self, entity: UniNode) -> None:
202    def delete(self, entity: UniNode) -> None:
203        """Mark an entity for deletion (sync — just collects)."""
204        if not entity.is_persisted:
205            raise NotPersisted(entity)
206        self._pending_delete.append(entity)

Mark an entity for deletion (sync — just collects).

async def get( self, model: type[~NodeT], vid: int | None = None, uid: str | None = None, **kwargs: Any) -> Optional[~NodeT]:
208    async def get(
209        self,
210        model: type[NodeT],
211        vid: int | None = None,
212        uid: str | None = None,
213        **kwargs: Any,
214    ) -> NodeT | None:
215        """Get an entity by ID or unique properties."""
216        if vid is not None:
217            cached = self._identity_map.get((model.__label__, vid))
218            if cached is not None:
219                return cached  # type: ignore[return-value]
220
221        label = model.__label__
222        params: dict[str, Any] = {}
223
224        if vid is not None:
225            cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
226            params["vid"] = vid
227        elif uid is not None:
228            cypher = f"MATCH (n:{label}) WHERE n._uid = $uid RETURN {_NODE_RETURN}"
229            params["uid"] = uid
230        elif kwargs:
231            for k in kwargs:
232                _validate_property(k, model)
233            conditions = [f"n.{k} = ${k}" for k in kwargs]
234            cypher = f"MATCH (n:{label}) WHERE {' AND '.join(conditions)} RETURN {_NODE_RETURN} LIMIT 1"
235            params.update(kwargs)
236        else:
237            raise ValueError("Must provide vid, uid, or property filters")
238
239        results = await self._db.query(cypher, params)
240        if not results:
241            return None
242
243        node_data = _row_to_node_dict(results[0])
244        if node_data is None:
245            return None
246        return self._result_to_model(node_data, model)

Get an entity by ID or unique properties.

async def refresh(self, entity: UniNode) -> None:
248    async def refresh(self, entity: UniNode) -> None:
249        """Refresh an entity's properties from the database."""
250        if not entity.is_persisted:
251            raise NotPersisted(entity)
252
253        label = entity.__class__.__label__
254        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
255        results = await self._db.query(cypher, {"vid": entity._vid})
256
257        if not results:
258            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
259
260        props = _row_to_node_dict(results[0])
261        if props is None:
262            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
263        try:
264            hints = get_type_hints(type(entity))
265        except Exception:
266            hints = {}
267
268        for field_name in entity.get_property_fields():
269            if field_name in props:
270                value = props[field_name]
271                if field_name in hints:
272                    value = db_to_python_value(value, hints[field_name])
273                setattr(entity, field_name, value)
274
275        entity._mark_clean()

Refresh an entity's properties from the database.

async def commit(self) -> None:
277    async def commit(self) -> None:
278        """Commit all pending changes."""
279        for entity in self._pending_new:
280            await self._create_node(entity)
281
282        for (label, vid), entity in list(self._identity_map.items()):
283            if entity.is_dirty and entity.is_persisted:
284                await self._update_node(entity)
285
286        for entity in self._pending_delete:
287            await self._delete_node(entity)
288
289        await self._db.flush()
290        self._pending_new.clear()
291        self._pending_delete.clear()

Commit all pending changes.

async def rollback(self) -> None:
293    async def rollback(self) -> None:
294        """Discard all pending changes."""
295        for entity in self._pending_new:
296            entity._session = None
297        self._pending_new.clear()
298        self._pending_delete.clear()
299        for entity in list(self._identity_map.values()):
300            if entity.is_dirty:
301                await self.refresh(entity)

Discard all pending changes.

async def transaction(self) -> AsyncUniTransaction:
303    async def transaction(self) -> AsyncUniTransaction:
304        """Create an async transaction. Use as `async with await session.transaction() as tx:`."""
305        return AsyncUniTransaction(self)

Create an async transaction. Use as async with await session.transaction() as tx:.
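
Because transaction() is itself a coroutine, the returned context manager must be awaited before entering it; a sketch, assuming dave and erin are persisted nodes and "KNOWS" is an illustrative edge type:

async with await session.transaction() as tx:
    tx.add(Person(name="Frank"))
    tx.create_edge(dave, "KNOWS", erin)
    # commits on success, rolls back on exception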

async def cypher( self, query: str, params: dict[str, typing.Any] | None = None, result_type: type[~NodeT] | None = None) -> list[~NodeT] | list[dict[str, typing.Any]]:
307    async def cypher(
308        self,
309        query: str,
310        params: dict[str, Any] | None = None,
311        result_type: type[NodeT] | None = None,
312    ) -> list[NodeT] | list[dict[str, Any]]:
313        """Execute a raw Cypher query."""
314        results = await self._db.query(query, params)
315
316        if result_type is None:
317            return cast(list[dict[str, Any]], results)
318
319        mapped = []
320        for row in results:
321            for key, value in row.items():
322                if isinstance(value, dict):
323                    if "_id" in value and "_label" in value:
324                        instance = self._result_to_model(value, result_type)
325                        if instance:
326                            mapped.append(instance)
327                            break
328                    elif "_label" in value:
329                        label = value["_label"]
330                        if label in self._schema_gen._node_models:
331                            model = self._schema_gen._node_models[label]
332                            instance = self._result_to_model(value, model)
333                            if instance:
334                                mapped.append(instance)
335                                break
336            else:
337                first_value = next(iter(row.values()), None)
338                if isinstance(first_value, dict):
339                    instance = self._result_to_model(first_value, result_type)
340                    if instance:
341                        mapped.append(instance)
342
343        return mapped

Execute a raw Cypher query.

async def create_edge( self, source: UniNode, edge_type: str, target: UniNode, properties: dict[str, typing.Any] | UniEdge | None = None) -> None:
345    async def create_edge(
346        self,
347        source: UniNode,
348        edge_type: str,
349        target: UniNode,
350        properties: dict[str, Any] | UniEdge | None = None,
351    ) -> None:
352        """Create an edge between two nodes."""
353        src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
354            source, target
355        )
356        props = UniSession._normalize_edge_properties(properties)
357
358        props_str = ", ".join(f"{k}: ${k}" for k in props)
359        if props_str:
360            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)"
361        else:
362            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)"
363
364        await self._db.query(cypher, {"src": src_vid, "dst": dst_vid, **props})

Create an edge between two nodes.

async def delete_edge( self, source: UniNode, edge_type: str, target: UniNode) -> int:
366    async def delete_edge(
367        self, source: UniNode, edge_type: str, target: UniNode
368    ) -> int:
369        """Delete edges between two nodes. Returns the number of deleted edges."""
370        src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
371            source, target
372        )
373        cypher = (
374            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
375            f"WHERE a._vid = $src AND b._vid = $dst "
376            f"DELETE r RETURN count(r) as count"
377        )
378        results = await self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
379        return cast(int, results[0]["count"]) if results else 0

Delete edges between two nodes. Returns the number of deleted edges.

async def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
381    async def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
382        """Bulk-add entities using bulk_insert_vertices."""
383        if not entities:
384            return []
385
386        by_label: dict[str, list[UniNode]] = {}
387        for entity in entities:
388            label = entity.__class__.__label__
389            if label not in by_label:
390                by_label[label] = []
391            by_label[label].append(entity)
392
393        all_vids: list[int] = []
394        try:
395            for label, group in by_label.items():
396                for entity in group:
397                    run_hooks(entity, _BEFORE_CREATE)
398                prop_dicts = [e.to_properties() for e in group]
399                vids = await self._db.bulk_insert_vertices(label, prop_dicts)
400                for entity, vid in zip(group, vids):
401                    entity._attach_session(self, vid)
402                    self._identity_map[(label, vid)] = entity
403                    run_hooks(entity, _AFTER_CREATE)
404                    entity._mark_clean()
405                all_vids.extend(vids)
406        except Exception as e:
407            raise BulkLoadError(f"Bulk insert failed: {e}") from e
408
409        return all_vids

Bulk-add entities using bulk_insert_vertices.
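
A sketch of bulk loading, assuming a registered Person model with name and email properties; entities are grouped by label, inserted through bulk_insert_vertices, and come back attached to the session with their new vertex ids. Lifecycle hooks still fire per entity:

    people = [Person(name=f"user{i}", email=f"user{i}@example.com") for i in range(1_000)]
    vids = await session.bulk_add(people)
    assert len(vids) == len(people)   # one vid per entity, in insertion order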

async def explain(self, cypher: str) -> dict[str, typing.Any]:
411    async def explain(self, cypher: str) -> dict[str, Any]:
412        """Get the query execution plan."""
413        return await self._db.explain(cypher)

Get the query execution plan.

async def profile( self, cypher: str) -> tuple[list[dict[str, typing.Any]], dict[str, typing.Any]]:
415    async def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
416        """Run the query with profiling."""
417        return await self._db.profile(cypher)

Run the query with profiling.
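
Both helpers delegate to the underlying database handle; the shape of the returned plan and statistics dictionaries is defined by uni_db, not by this package. A sketch:

    plan = await session.explain("MATCH (p:Person) RETURN p")
    rows, stats = await session.profile("MATCH (p:Person) RETURN count(p) AS count")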

async def save_schema(self, path: str) -> None:
419    async def save_schema(self, path: str) -> None:
420        """Save the database schema to a file."""
421        await self._db.save_schema(path)

Save the database schema to a file.

async def load_schema(self, path: str) -> None:
423    async def load_schema(self, path: str) -> None:
424        """Load a database schema from a file."""
425        await self._db.load_schema(path)

Load a database schema from a file.
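
A round-trip sketch, handy for versioning the generated schema alongside application code; the file name is illustrative and the format is whatever uni_db's save_schema/load_schema produce:

    await session.save_schema("schema.json")
    # ... later, against a fresh database ...
    await session.load_schema("schema.json")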

class AsyncUniTransaction:
 54class AsyncUniTransaction:
 55    """Async transaction context for atomic operations."""
 56
 57    def __init__(self, session: AsyncUniSession) -> None:
 58        self._session = session
 59        self._tx: uni_db.AsyncTransaction | None = None
 60        self._pending_nodes: list[UniNode] = []
 61        self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
 62        self._committed = False
 63        self._rolled_back = False
 64
 65    async def __aenter__(self) -> AsyncUniTransaction:
 66        self._tx = await self._session._db.begin()
 67        return self
 68
 69    async def __aexit__(
 70        self,
 71        exc_type: type[BaseException] | None,
 72        exc_val: BaseException | None,
 73        exc_tb: TracebackType | None,
 74    ) -> None:
 75        if exc_type is not None:
 76            await self.rollback()
 77            return
 78        if not self._committed and not self._rolled_back:
 79            await self.commit()
 80
 81    def add(self, entity: UniNode) -> None:
 82        """Add a node to be created in this transaction (sync — just collects)."""
 83        self._pending_nodes.append(entity)
 84
 85    def create_edge(
 86        self,
 87        source: UniNode,
 88        edge_type: str,
 89        target: UniNode,
 90        properties: UniEdge | None = None,
 91    ) -> None:
 92        """Create an edge between two nodes in this transaction (sync — just collects)."""
 93        if not source.is_persisted:
 94            raise NotPersisted(source)
 95        if not target.is_persisted:
 96            raise NotPersisted(target)
 97        self._pending_edges.append((source, edge_type, target, properties))
 98
 99    async def commit(self) -> None:
100        """Commit the transaction."""
101        if self._committed:
102            raise TransactionError("Transaction already committed")
103        if self._rolled_back:
104            raise TransactionError("Transaction already rolled back")
105        if self._tx is None:
106            raise TransactionError("Transaction not started")
107
108        try:
109            for node in self._pending_nodes:
110                await self._session._create_node_in_tx(node, self._tx)
111            for source, edge_type, target, props in self._pending_edges:
112                await self._session._create_edge_in_tx(
113                    source, edge_type, target, props, self._tx
114                )
115            await self._tx.commit()
116            self._committed = True
117            for node in self._pending_nodes:
118                node._mark_clean()
119        except Exception as e:
120            await self.rollback()
121            raise TransactionError(f"Commit failed: {e}") from e
122
123    async def rollback(self) -> None:
124        """Rollback the transaction."""
125        if self._rolled_back:
126            return
127        if self._tx is not None:
128            await self._tx.rollback()
129        self._rolled_back = True
130        self._pending_nodes.clear()
131        self._pending_edges.clear()

Async transaction context for atomic operations.
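
Typical usage goes through the async context manager: add() and create_edge() only collect work, commit() flushes it (and is called automatically on a clean exit), and an exception inside the block triggers rollback. A sketch, assuming alice and bob are already persisted:

    async with AsyncUniTransaction(session) as tx:
        carol = Person(name="Carol", email="carol@example.com")
        tx.add(carol)                            # queued, created on commit
        tx.create_edge(alice, "FRIEND_OF", bob)  # endpoints must already be persisted
    # leaving the block without an exception commits; raising inside rolls back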

AsyncUniTransaction(session: AsyncUniSession)
57    def __init__(self, session: AsyncUniSession) -> None:
58        self._session = session
59        self._tx: uni_db.AsyncTransaction | None = None
60        self._pending_nodes: list[UniNode] = []
61        self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
62        self._committed = False
63        self._rolled_back = False
def add(self, entity: UniNode) -> None:
81    def add(self, entity: UniNode) -> None:
82        """Add a node to be created in this transaction (sync — just collects)."""
83        self._pending_nodes.append(entity)

Add a node to be created in this transaction (sync — just collects).

def create_edge( self, source: UniNode, edge_type: str, target: UniNode, properties: UniEdge | None = None) -> None:
85    def create_edge(
86        self,
87        source: UniNode,
88        edge_type: str,
89        target: UniNode,
90        properties: UniEdge | None = None,
91    ) -> None:
92        """Create an edge between two nodes in this transaction (sync — just collects)."""
93        if not source.is_persisted:
94            raise NotPersisted(source)
95        if not target.is_persisted:
96            raise NotPersisted(target)
97        self._pending_edges.append((source, edge_type, target, properties))

Create an edge between two nodes in this transaction (sync — just collects).

async def commit(self) -> None:
 99    async def commit(self) -> None:
100        """Commit the transaction."""
101        if self._committed:
102            raise TransactionError("Transaction already committed")
103        if self._rolled_back:
104            raise TransactionError("Transaction already rolled back")
105        if self._tx is None:
106            raise TransactionError("Transaction not started")
107
108        try:
109            for node in self._pending_nodes:
110                await self._session._create_node_in_tx(node, self._tx)
111            for source, edge_type, target, props in self._pending_edges:
112                await self._session._create_edge_in_tx(
113                    source, edge_type, target, props, self._tx
114                )
115            await self._tx.commit()
116            self._committed = True
117            for node in self._pending_nodes:
118                node._mark_clean()
119        except Exception as e:
120            await self.rollback()
121            raise TransactionError(f"Commit failed: {e}") from e

Commit the transaction.

async def rollback(self) -> None:
123    async def rollback(self) -> None:
124        """Rollback the transaction."""
125        if self._rolled_back:
126            return
127        if self._tx is not None:
128            await self._tx.rollback()
129        self._rolled_back = True
130        self._pending_nodes.clear()
131        self._pending_edges.clear()

Rollback the transaction.

def Field( default: Any = Ellipsis, *, default_factory: Callable[[], typing.Any] | None = None, alias: str | None = None, title: str | None = None, description: str | None = None, examples: list[typing.Any] | None = None, exclude: bool = False, json_schema_extra: dict[str, typing.Any] | None = None, index: Optional[Literal['btree', 'hash', 'fulltext', 'vector']] = None, unique: bool = False, tokenizer: str | None = None, metric: Optional[Literal['l2', 'cosine', 'dot']] = None, generated: str | None = None) -> Any:
 69def Field(
 70    default: Any = ...,
 71    *,
 72    default_factory: Callable[[], Any] | None = None,
 73    alias: str | None = None,
 74    title: str | None = None,
 75    description: str | None = None,
 76    examples: list[Any] | None = None,
 77    exclude: bool = False,
 78    json_schema_extra: dict[str, Any] | None = None,
 79    # Uni-specific options
 80    index: IndexType | None = None,
 81    unique: bool = False,
 82    tokenizer: str | None = None,
 83    metric: VectorMetric | None = None,
 84    generated: str | None = None,
 85) -> Any:
 86    """
 87    Create a field with uni-pydantic configuration.
 88
 89    This extends Pydantic's Field with graph database options.
 90
 91    Args:
 92        default: Default value for the field.
 93        default_factory: Factory function for default value.
 94        alias: Field alias for serialization.
 95        title: Human-readable title.
 96        description: Field description.
 97        examples: Example values.
 98        exclude: Exclude from serialization.
 99        json_schema_extra: Extra JSON schema properties.
100        index: Index type ("btree", "hash", "fulltext", "vector").
101        unique: Whether to create a unique constraint.
102        tokenizer: Tokenizer for fulltext index (default: "standard").
103        metric: Distance metric for vector index ("l2", "cosine", "dot").
104        generated: Expression for generated/computed property.
105
106    Returns:
107        A Pydantic FieldInfo with uni-pydantic metadata attached.
108
109    Examples:
110        >>> class Person(UniNode):
111        ...     name: str = Field(index="btree")
112        ...     email: str = Field(unique=True)
113        ...     bio: str = Field(index="fulltext", tokenizer="standard")
114        ...     embedding: Vector[768] = Field(metric="cosine")
115    """
116    # Default tokenizer for fulltext indexes
117    if index == "fulltext" and tokenizer is None:
118        tokenizer = "standard"
119
120    # Store uni config in json_schema_extra
121    uni_config = FieldConfig(
122        index=index,
123        unique=unique,
124        tokenizer=tokenizer,
125        metric=metric,
126        generated=generated,
127        default=default,
128        default_factory=default_factory,
129        alias=alias,
130        title=title,
131        description=description,
132        examples=examples,
133        exclude=exclude,
134        json_schema_extra=json_schema_extra,
135    )
136
137    # Merge uni config into json_schema_extra
138    extra = json_schema_extra or {}
139    extra["uni_config"] = uni_config
140
141    # Create Pydantic FieldInfo
142    from pydantic.fields import FieldInfo as PydanticFieldInfo
143
144    if default_factory is not None:
145        return PydanticFieldInfo(
146            default_factory=default_factory,
147            alias=alias,
148            title=title,
149            description=description,
150            examples=examples,
151            exclude=exclude,
152            json_schema_extra=extra,
153        )
154    elif default is not ...:
155        return PydanticFieldInfo(
156            default=default,
157            alias=alias,
158            title=title,
159            description=description,
160            examples=examples,
161            exclude=exclude,
162            json_schema_extra=extra,
163        )
164    else:
165        return PydanticFieldInfo(
166            alias=alias,
167            title=title,
168            description=description,
169            examples=examples,
170            exclude=exclude,
171            json_schema_extra=extra,
172        )

Create a field with uni-pydantic configuration.

This extends Pydantic's Field with graph database options.

Args:
    default: Default value for the field.
    default_factory: Factory function for default value.
    alias: Field alias for serialization.
    title: Human-readable title.
    description: Field description.
    examples: Example values.
    exclude: Exclude from serialization.
    json_schema_extra: Extra JSON schema properties.
    index: Index type ("btree", "hash", "fulltext", "vector").
    unique: Whether to create a unique constraint.
    tokenizer: Tokenizer for fulltext index (default: "standard").
    metric: Distance metric for vector index ("l2", "cosine", "dot").
    generated: Expression for generated/computed property.

Returns: A Pydantic FieldInfo with uni-pydantic metadata attached.

Examples:

class Person(UniNode):
    name: str = Field(index="btree")
    email: str = Field(unique=True)
    bio: str = Field(index="fulltext", tokenizer="standard")
    embedding: Vector[768] = Field(metric="cosine")

@dataclass
class FieldConfig:
41@dataclass
42class FieldConfig:
43    """Configuration for a uni-pydantic field."""
44
45    # Index configuration
46    index: IndexType | None = None
47    unique: bool = False
48
49    # Fulltext index options
50    tokenizer: str | None = None
51
52    # Vector index options
53    metric: VectorMetric | None = None
54
55    # Generated/computed property
56    generated: str | None = None
57
58    # Pydantic field options (passed through)
59    default: Any = dataclass_field(default_factory=lambda: ...)
60    default_factory: Callable[[], Any] | None = None
61    alias: str | None = None
62    title: str | None = None
63    description: str | None = None
64    examples: list[Any] | None = None
65    exclude: bool = False
66    json_schema_extra: dict[str, Any] | None = None

Configuration for a uni-pydantic field.

FieldConfig( index: Optional[Literal['btree', 'hash', 'fulltext', 'vector']] = None, unique: bool = False, tokenizer: str | None = None, metric: Optional[Literal['l2', 'cosine', 'dot']] = None, generated: str | None = None, default: Any = <factory>, default_factory: Callable[[], typing.Any] | None = None, alias: str | None = None, title: str | None = None, description: str | None = None, examples: list[typing.Any] | None = None, exclude: bool = False, json_schema_extra: dict[str, typing.Any] | None = None)
index: Optional[Literal['btree', 'hash', 'fulltext', 'vector']] = None
unique: bool = False
tokenizer: str | None = None
metric: Optional[Literal['l2', 'cosine', 'dot']] = None
generated: str | None = None
default: Any
default_factory: Callable[[], typing.Any] | None = None
alias: str | None = None
title: str | None = None
description: str | None = None
examples: list[typing.Any] | None = None
exclude: bool = False
json_schema_extra: dict[str, typing.Any] | None = None
def Relationship( edge_type: str, *, direction: Literal['outgoing', 'incoming', 'both'] = 'outgoing', edge_model: type[UniEdge] | None = None, eager: bool = False, cascade_delete: bool = False) -> Any:
272def Relationship(
273    edge_type: str,
274    *,
275    direction: Direction = "outgoing",
276    edge_model: type[UniEdge] | None = None,
277    eager: bool = False,
278    cascade_delete: bool = False,
279) -> Any:
280    """
281    Declare a relationship to another node type.
282
283    Relationships are lazy-loaded by default. Use eager=True or
284    query.eager_load() to load them with the parent query.
285
286    Args:
287        edge_type: The edge type name (e.g., "FRIEND_OF", "WORKS_AT").
288        direction: Relationship direction:
289            - "outgoing": Follow edges from this node (default)
290            - "incoming": Follow edges to this node
291            - "both": Follow edges in both directions
292        edge_model: Optional UniEdge subclass for typed edge properties.
293        eager: Whether to eager-load this relationship by default.
294        cascade_delete: Whether to delete related edges when this node is deleted.
295
296    Returns:
297        A RelationshipDescriptor that will be processed during model creation.
298
299    Examples:
300        >>> class Person(UniNode):
301        ...     # Outgoing relationship (default)
302        ...     follows: list["Person"] = Relationship("FOLLOWS")
303        ...
304        ...     # Incoming relationship
305        ...     followers: list["Person"] = Relationship("FOLLOWS", direction="incoming")
306        ...
307        ...     # Single optional relationship
308        ...     manager: "Person | None" = Relationship("REPORTS_TO")
309        ...
310        ...     # Relationship with edge properties
311        ...     friendships: list[tuple["Person", FriendshipEdge]] = Relationship(
312        ...         "FRIEND_OF",
313        ...         edge_model=FriendshipEdge
314        ...     )
315    """
316    config = RelationshipConfig(
317        edge_type=edge_type,
318        direction=direction,
319        edge_model=edge_model,
320        eager=eager,
321        cascade_delete=cascade_delete,
322    )
323    # Return a marker that will be processed by the metaclass
324    return _RelationshipMarker(config)

Declare a relationship to another node type.

Relationships are lazy-loaded by default. Use eager=True or query.eager_load() to load them with the parent query.

Args:
    edge_type: The edge type name (e.g., "FRIEND_OF", "WORKS_AT").
    direction: Relationship direction:
        - "outgoing": Follow edges from this node (default)
        - "incoming": Follow edges to this node
        - "both": Follow edges in both directions
    edge_model: Optional UniEdge subclass for typed edge properties.
    eager: Whether to eager-load this relationship by default.
    cascade_delete: Whether to delete related edges when this node is deleted.

Returns: A RelationshipDescriptor that will be processed during model creation.

Examples:

class Person(UniNode):
    # Outgoing relationship (default)
    follows: list["Person"] = Relationship("FOLLOWS")

    # Incoming relationship
    followers: list["Person"] = Relationship("FOLLOWS", direction="incoming")

    # Single optional relationship
    manager: "Person | None" = Relationship("REPORTS_TO")

    # Relationship with edge properties
    friendships: list[tuple["Person", FriendshipEdge]] = Relationship(
        "FRIEND_OF",
        edge_model=FriendshipEdge,
    )

@dataclass
class RelationshipConfig:
185@dataclass
186class RelationshipConfig:
187    """Configuration for a relationship field."""
188
189    edge_type: str
190    direction: Direction = "outgoing"
191    edge_model: type[UniEdge] | None = None
192    eager: bool = False
193    cascade_delete: bool = False

Configuration for a relationship field.

RelationshipConfig( edge_type: str, direction: Literal['outgoing', 'incoming', 'both'] = 'outgoing', edge_model: type[UniEdge] | None = None, eager: bool = False, cascade_delete: bool = False)
edge_type: str
direction: Literal['outgoing', 'incoming', 'both'] = 'outgoing'
edge_model: type[UniEdge] | None = None
eager: bool = False
cascade_delete: bool = False
class RelationshipDescriptor(typing.Generic[~NodeT]):
196class RelationshipDescriptor(Generic[NodeT]):
197    """
198    Descriptor for relationship fields that enables lazy loading.
199
200    When accessed on an instance, it returns the related nodes.
201    When accessed on the class, it returns the descriptor for query building.
202    """
203
204    def __init__(
205        self,
206        config: RelationshipConfig,
207        field_name: str,
208        target_type: type[NodeT] | str | None = None,
209        is_list: bool = True,
210    ) -> None:
211        self.config = config
212        self.field_name = field_name
213        self.target_type = target_type
214        self.is_list = is_list
215        self._cache_attr = f"_rel_cache_{field_name}"
216
217    def __set_name__(self, owner: type, name: str) -> None:
218        self.field_name = name
219        self._cache_attr = f"_rel_cache_{name}"
220
221    @overload
222    def __get__(
223        self, obj: None, objtype: type[NodeT]
224    ) -> RelationshipDescriptor[NodeT]: ...
225
226    @overload
227    def __get__(
228        self, obj: NodeT, objtype: type[NodeT] | None = None
229    ) -> list[NodeT] | NodeT | None: ...
230
231    def __get__(
232        self, obj: NodeT | None, objtype: type[NodeT] | None = None
233    ) -> RelationshipDescriptor[NodeT] | list[NodeT] | NodeT | None:
234        if obj is None:
235            # Class-level access returns the descriptor
236            return self
237
238        # Instance-level access - check cache first
239        if hasattr(obj, self._cache_attr):
240            cached = getattr(obj, self._cache_attr)
241            return cast("list[NodeT] | NodeT | None", cached)
242
243        # Check if we have a session for lazy loading
244        session = getattr(obj, "_session", None)
245        if session is None:
246            from .exceptions import LazyLoadError
247
248            raise LazyLoadError(
249                self.field_name,
250                "No session attached. Use session.get() or enable eager loading.",
251            )
252
253        # Lazy load the relationship
254        result = session._load_relationship(obj, self)
255
256        # Cache the result
257        setattr(obj, self._cache_attr, result)
258        return cast("list[NodeT] | NodeT | None", result)
259
260    def __set__(self, obj: NodeT, value: list[NodeT] | NodeT | None) -> None:
261        # Allow setting the cached value (e.g., during eager loading)
262        setattr(obj, self._cache_attr, value)
263
264    def __repr__(self) -> str:
265        return f"Relationship({self.config.edge_type!r}, direction={self.config.direction!r})"

Descriptor for relationship fields that enables lazy loading.

When accessed on an instance, it returns the related nodes. When accessed on the class, it returns the descriptor for query building.

RelationshipDescriptor( config: RelationshipConfig, field_name: str, target_type: type[~NodeT] | str | None = None, is_list: bool = True)
204    def __init__(
205        self,
206        config: RelationshipConfig,
207        field_name: str,
208        target_type: type[NodeT] | str | None = None,
209        is_list: bool = True,
210    ) -> None:
211        self.config = config
212        self.field_name = field_name
213        self.target_type = target_type
214        self.is_list = is_list
215        self._cache_attr = f"_rel_cache_{field_name}"
config
field_name
target_type
is_list
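
A short sketch of the behaviour described above, assuming a Person model that declares friends: list["Person"] = Relationship("FRIEND_OF"). Once an instance is attached to a session, the same attribute access lazily loads and caches the related nodes:

    Person.friends                  # class-level access returns the descriptor

    detached = Person(name="Dana")
    try:
        detached.friends            # instance access without a session
    except LazyLoadError:
        ...                         # lazy loading needs a session or eager loading
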
def get_field_config( field_info: pydantic.fields.FieldInfo) -> FieldConfig | None:
175def get_field_config(field_info: FieldInfo) -> FieldConfig | None:
176    """Extract uni-pydantic config from a Pydantic FieldInfo."""
177    extra = field_info.json_schema_extra
178    if isinstance(extra, dict):
179        config = extra.get("uni_config")
180        if isinstance(config, FieldConfig):
181            return config
182    return None

Extract uni-pydantic config from a Pydantic FieldInfo.
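
This is how schema generation reads back the options that Field() stored in json_schema_extra. A sketch, assuming Pydantic's model_fields mapping is left intact by the model base class:

    class Document(UniNode):
        title: str = Field(index="fulltext")   # tokenizer defaults to "standard"

    cfg = get_field_config(Document.model_fields["title"])
    assert cfg is not None
    assert cfg.index == "fulltext" and cfg.tokenizer == "standard"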

IndexType = typing.Literal['btree', 'hash', 'fulltext', 'vector']
Direction = typing.Literal['outgoing', 'incoming', 'both']
VectorMetric = typing.Literal['l2', 'cosine', 'dot']
class Vector(typing.Generic[~N]):
 54class Vector(Generic[N], metaclass=VectorMeta):
 55    """
 56    A vector type with fixed dimensions for embeddings.
 57
 58    Usage:
 59        embedding: Vector[1536]  # 1536-dimensional vector
 60
 61    At runtime, vectors are stored as list[float].
 62    """
 63
 64    __dimensions__: int = 0
 65    __origin__: type | None = None
 66
 67    def __init__(self, values: list[float]) -> None:
 68        expected = self.__class__.__dimensions__
 69        if expected > 0 and len(values) != expected:
 70            raise ValueError(f"Vector expects {expected} dimensions, got {len(values)}")
 71        self._values = values
 72
 73    @property
 74    def values(self) -> list[float]:
 75        return self._values
 76
 77    def __repr__(self) -> str:
 78        dims = self.__class__.__dimensions__
 79        return (
 80            f"Vector[{dims}]({self._values[:3]}...)"
 81            if len(self._values) > 3
 82            else f"Vector[{dims}]({self._values})"
 83        )
 84
 85    def __eq__(self, other: object) -> bool:
 86        if isinstance(other, Vector):
 87            return self._values == other._values
 88        if isinstance(other, list):
 89            return self._values == other
 90        return False
 91
 92    def __len__(self) -> int:
 93        return len(self._values)
 94
 95    def __iter__(self):  # type: ignore[no-untyped-def]
 96        return iter(self._values)
 97
 98    @classmethod
 99    def __get_pydantic_core_schema__(
100        cls, source_type: Any, handler: GetCoreSchemaHandler
101    ) -> CoreSchema:
102        """Make Vector compatible with Pydantic v2."""
103        dimensions = getattr(source_type, "__dimensions__", 0)
104        vec_cls = source_type if dimensions > 0 else cls
105
106        def validate_vector(v: Any) -> Vector:  # type: ignore[type-arg]
107            if isinstance(v, Vector):
108                if dimensions > 0 and len(v) != dimensions:
109                    raise ValueError(
110                        f"Vector expects {dimensions} dimensions, got {len(v)}"
111                    )
112                return v
113            if isinstance(v, list):
114                if dimensions > 0 and len(v) != dimensions:
115                    raise ValueError(
116                        f"Vector expects {dimensions} dimensions, got {len(v)}"
117                    )
118                return vec_cls([float(x) for x in v])
119            raise TypeError(f"Expected list or Vector, got {type(v)}")
120
121        return core_schema.no_info_plain_validator_function(
122            validate_vector,
123            serialization=core_schema.plain_serializer_function_ser_schema(
124                lambda v: v.values if isinstance(v, Vector) else list(v),
125                info_arg=False,
126            ),
127        )

A vector type with fixed dimensions for embeddings.

Usage:
    embedding: Vector[1536]  # 1536-dimensional vector

At runtime, vectors are stored as list[float].

Vector(values: list[float])
67    def __init__(self, values: list[float]) -> None:
68        expected = self.__class__.__dimensions__
69        if expected > 0 and len(values) != expected:
70            raise ValueError(f"Vector expects {expected} dimensions, got {len(values)}")
71        self._values = values
values: list[float]
73    @property
74    def values(self) -> list[float]:
75        return self._values
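
A brief runtime sketch: the parametrised class records its dimension count (the same Vector[dims] subscription the converters below rely on), construction validates length, and comparison against plain lists works element-wise:

    Vec3 = Vector[3]
    v = Vec3([0.1, 0.2, 0.3])
    assert len(v) == 3 and v == [0.1, 0.2, 0.3]

    try:
        Vec3([0.1, 0.2])          # wrong dimensionality
    except ValueError:
        ...
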
def python_type_to_uni(type_hint: Any, *, nullable: bool = False) -> tuple[str, bool]:
305def python_type_to_uni(type_hint: Any, *, nullable: bool = False) -> tuple[str, bool]:
306    """
307    Convert a Python type hint to a Uni DataType string.
308
309    Args:
310        type_hint: The Python type hint to convert.
311        nullable: Whether the field is explicitly nullable.
312
313    Returns:
314        Tuple of (uni_data_type, is_nullable)
315
316    Raises:
317        TypeMappingError: If the type cannot be mapped.
318    """
319    # Unwrap Annotated if present
320    type_hint, _ = unwrap_annotated(type_hint)
321
322    # Check for optional (T | None)
323    is_opt, inner_type = is_optional(type_hint)
324    if is_opt:
325        uni_type, _ = python_type_to_uni(inner_type)
326        return uni_type, True
327
328    # Check for Vector types
329    dims = get_vector_dimensions(type_hint)
330    if dims is not None:
331        return f"vector:{dims}", nullable
332
333    # Check for list types
334    is_lst, elem_type = is_list_type(type_hint)
335    if is_lst:
336        if elem_type in (str, int, float, bool):
337            # Simple list types
338            elem_uni = TYPE_MAP.get(elem_type, "string")
339            return f"list:{elem_uni}", nullable
340        # Complex list types stored as JSON
341        return "json", nullable
342
343    # Direct type mapping
344    if type_hint in TYPE_MAP:
345        return TYPE_MAP[type_hint], nullable
346
347    # Handle generic dict types
348    origin = get_origin(type_hint)
349    if origin is dict:
350        return "json", nullable
351
352    # Handle forward references (strings)
353    if isinstance(type_hint, str):
354        # This is a forward reference, can't resolve here
355        raise TypeMappingError(
356            type_hint,
357            f"Cannot resolve forward reference {type_hint!r}. "
358            "Ensure the referenced class is defined before schema sync.",
359        )
360
361    raise TypeMappingError(type_hint)

Convert a Python type hint to a Uni DataType string.

Args:
    type_hint: The Python type hint to convert.
    nullable: Whether the field is explicitly nullable.

Returns: Tuple of (uni_data_type, is_nullable)

Raises: TypeMappingError: If the type cannot be mapped.
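
A few illustrative mappings; the exact scalar names come from TYPE_MAP, so the int64/string spellings below assume it mirrors the reverse map shown further down:

    python_type_to_uni(str)              # ("string", False)
    python_type_to_uni(int | None)       # ("int64", True), nullable via Optional
    python_type_to_uni(Vector[768])      # ("vector:768", False)
    python_type_to_uni(list[str])        # ("list:string", False)
    python_type_to_uni(dict[str, int])   # ("json", False)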

def uni_to_python_type(uni_type: str) -> type:
364def uni_to_python_type(uni_type: str) -> type:
365    """
366    Convert a Uni DataType string to a Python type.
367
368    Args:
369        uni_type: The Uni data type string.
370
371    Returns:
372        The corresponding Python type.
373    """
374    # Reverse mapping — manually constructed to avoid bytes overwriting str for "string"
375    _REVERSE_MAP: dict[str, type] = {
376        "string": str,
377        "int64": int,
378        "float64": float,
379        "bool": bool,
380        "datetime": datetime,
381        "date": date,
382        "time": time,
383        "duration": timedelta,
384        "json": dict,
385    }
386
387    # Handle vector types
388    if uni_type.startswith("vector:"):
389        return list  # Vectors are stored as list[float]
390
391    # Handle list types
392    if uni_type.startswith("list:"):
393        return list
394
395    return _REVERSE_MAP.get(uni_type.lower(), str)

Convert a Uni DataType string to a Python type.

Args:
    uni_type: The Uni data type string.

Returns: The corresponding Python type.
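
The reverse direction is intentionally lossy: vectors and typed lists both come back as list, and unrecognised type names fall back to str:

    uni_to_python_type("datetime")       # datetime
    uni_to_python_type("vector:1536")    # list
    uni_to_python_type("list:int64")     # list
    uni_to_python_type("unknown")        # str (fallback)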

def get_vector_dimensions(type_hint: Any) -> int | None:
130def get_vector_dimensions(type_hint: Any) -> int | None:
131    """Extract vector dimensions from a Vector[N] type hint."""
132    if hasattr(type_hint, "__dimensions__"):
133        dims: int = type_hint.__dimensions__
134        return dims
135    origin = get_origin(type_hint)
136    if origin is Vector:
137        args = get_args(type_hint)
138        if args and isinstance(args[0], int):
139            return args[0]
140    return None

Extract vector dimensions from a Vector[N] type hint.

def is_optional(type_hint: Any) -> tuple[bool, typing.Any]:
143def is_optional(type_hint: Any) -> tuple[bool, Any]:
144    """
145    Check if a type hint is Optional (T | None).
146
147    Returns:
148        Tuple of (is_optional, inner_type)
149    """
150    origin = get_origin(type_hint)
151
152    # Handle Union types (including T | None which is Union[T, None])
153    if origin is Union:
154        args = get_args(type_hint)
155        non_none_args = [arg for arg in args if arg is not type(None)]
156        if len(non_none_args) == 1 and type(None) in args:
157            return True, non_none_args[0]
158
159    # Python 3.10+ uses types.UnionType for X | Y syntax
160    if isinstance(type_hint, types.UnionType):
161        args = get_args(type_hint)
162        non_none_args = [arg for arg in args if arg is not type(None)]
163        if len(non_none_args) == 1 and type(None) in args:
164            return True, non_none_args[0]
165
166    return False, type_hint

Check if a type hint is Optional (T | None).

Returns: Tuple of (is_optional, inner_type)

def is_list_type(type_hint: Any) -> tuple[bool, typing.Any | None]:
169def is_list_type(type_hint: Any) -> tuple[bool, Any | None]:
170    """
171    Check if a type hint is a list type.
172
173    Returns:
174        Tuple of (is_list, element_type)
175    """
176    origin = get_origin(type_hint)
177    if origin is list:
178        args = get_args(type_hint)
179        return True, args[0] if args else Any
180    return False, None

Check if a type hint is a list type.

Returns: Tuple of (is_list, element_type)

def unwrap_annotated(type_hint: Any) -> tuple[typing.Any, tuple[typing.Any, ...]]:
183def unwrap_annotated(type_hint: Any) -> tuple[Any, tuple[Any, ...]]:
184    """
185    Unwrap an Annotated type.
186
187    Returns:
188        Tuple of (base_type, metadata_tuple)
189    """
190    origin = get_origin(type_hint)
191    if origin is Annotated:
192        args = get_args(type_hint)
193        return args[0], args[1:]
194    return type_hint, ()

Unwrap an Annotated type.

Returns: Tuple of (base_type, metadata_tuple)
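
The four helpers above compose as expected; a quick sketch of their return values:

    from typing import Annotated

    get_vector_dimensions(Vector[256])        # 256
    get_vector_dimensions(int)                # None
    is_optional(str | None)                   # (True, str)
    is_optional(str)                          # (False, str)
    is_list_type(list[int])                   # (True, int)
    is_list_type(str)                         # (False, None)
    unwrap_annotated(Annotated[int, "meta"])  # (int, ("meta",))
    unwrap_annotated(int)                     # (int, ())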

def python_to_db_value(value: Any, type_hint: Any) -> Any:
201def python_to_db_value(value: Any, type_hint: Any) -> Any:
202    """Convert a Python value to a database-compatible value.
203
204    Handles datetime→micros, date→days, time→micros, timedelta→micros,
205    Vector→list[float], and passes through everything else.
206    """
207    if value is None:
208        return None
209
210    # Unwrap Optional
211    _, inner = is_optional(type_hint)
212    if inner is not type_hint:
213        type_hint = inner
214
215    # Unwrap Annotated
216    type_hint, _ = unwrap_annotated(type_hint)
217
218    if isinstance(value, datetime):
219        return int(value.timestamp() * 1_000_000)
220    if isinstance(value, date):
221        epoch = date(1970, 1, 1)
222        return (value - epoch).days
223    if isinstance(value, time):
224        return (
225            value.hour * 3_600_000_000
226            + value.minute * 60_000_000
227            + value.second * 1_000_000
228            + value.microsecond
229        )
230    if isinstance(value, timedelta):
231        return int(value.total_seconds() * 1_000_000)
232
233    # Vector → list[float]
234    if isinstance(value, Vector):
235        return value.values
236
237    return value

Convert a Python value to a database-compatible value.

Handles datetime→micros, date→days, time→micros, timedelta→micros, Vector→list[float], and passes through everything else.

def db_to_python_value(value: Any, type_hint: Any) -> Any:
240def db_to_python_value(value: Any, type_hint: Any) -> Any:
241    """Convert a database value back to a Python value.
242
243    Converts int back to datetime/date/time/timedelta based on the model
244    field's type annotation.
245    """
246    if value is None:
247        return None
248
249    # Unwrap Optional
250    _, inner = is_optional(type_hint)
251    if inner is not type_hint:
252        type_hint = inner
253
254    # Unwrap Annotated
255    type_hint, _ = unwrap_annotated(type_hint)
256
257    if type_hint is datetime and isinstance(value, (int, float)):
258        # Microseconds since epoch → datetime
259        return datetime.fromtimestamp(value / 1_000_000)
260    elif type_hint is date and isinstance(value, (int, float)):
261        # Days since epoch → date
262        from datetime import date as date_cls
263        from datetime import timedelta as td_cls
264
265        epoch = date_cls(1970, 1, 1)
266        return epoch + td_cls(days=int(value))
267    elif type_hint is time and isinstance(value, (int, float)):
268        # Microseconds since midnight → time
269        total_us = int(value)
270        hours = total_us // 3_600_000_000
271        total_us %= 3_600_000_000
272        minutes = total_us // 60_000_000
273        total_us %= 60_000_000
274        seconds = total_us // 1_000_000
275        microseconds = total_us % 1_000_000
276        return time(hours, minutes, seconds, microseconds)
277    elif type_hint is timedelta and isinstance(value, (int, float)):
278        # Microseconds → timedelta
279        return timedelta(microseconds=int(value))
280
281    # Vector fields: list[float] → Vector
282    dims = get_vector_dimensions(type_hint)
283    if dims is not None and isinstance(value, list):
284        vec_cls = Vector[dims]
285        return vec_cls(value)
286
287    return value

Convert a database value back to a Python value.

Converts int back to datetime/date/time/timedelta based on the model field's type annotation.
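
A round-trip sketch for the datetime case; values travel as integer microseconds since the epoch, and whole-second naive datetimes round-trip exactly via the local timezone. The Vector case unwraps to a float list and back:

    from datetime import datetime

    dt = datetime(2024, 5, 1, 12, 30, 0)
    raw = python_to_db_value(dt, datetime)      # int microseconds since epoch
    assert db_to_python_value(raw, datetime) == dt

    vec = python_to_db_value(Vector[3]([1.0, 2.0, 3.0]), Vector[3])   # [1.0, 2.0, 3.0]
    assert db_to_python_value(vec, Vector[3]) == [1.0, 2.0, 3.0]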

DATETIME_TYPES = {date, time, timedelta, datetime}
class QueryBuilder(uni_pydantic.query._QueryBuilderBase[~NodeT]):
591class QueryBuilder(_QueryBuilderBase[NodeT]):
592    """
593    Immutable, type-safe query builder for graph queries.
594
595    Each method returns a **new** QueryBuilder instance. The original is
596    never mutated. Provides a fluent API for building Cypher queries
597    with type checking and IDE autocomplete support.
598
599    Example:
600        >>> adults = (
601        ...     session.query(Person)
602        ...     .filter(Person.age >= 18)
603        ...     .order_by(Person.name)
604        ...     .limit(10)
605        ...     .all()
606        ... )
607    """
608
609    def __init__(self, session: UniSession, model: type[NodeT]) -> None:
610        self._init_state(session, model)
611
612    def _execute_query(
613        self, cypher: str, params: dict[str, Any]
614    ) -> list[dict[str, Any]]:
615        """Execute a query, using query_with if timeout/max_memory is set."""
616        if self._timeout is not None or self._max_memory is not None:
617            builder = self._session._db.query_with(cypher)
618            if params:
619                builder = builder.params(params)
620            if self._timeout is not None:
621                builder = builder.timeout(self._timeout)
622            if self._max_memory is not None:
623                builder = builder.max_memory(self._max_memory)
624            return builder.fetch_all()
625        return self._session._db.query(cypher, params)
626
627    def all(self) -> list[NodeT]:
628        """Execute the query and return all results."""
629        cypher, params = self._build_cypher()
630        results = self._execute_query(cypher, params)
631        instances = self._rows_to_instances(results)
632        if self._eager_load and instances:
633            self._session._eager_load_relationships(instances, self._eager_load)
634        return instances
635
636    def first(self) -> NodeT | None:
637        """Execute the query and return the first result."""
638        clone = self._clone()
639        clone._limit = 1
640        results = clone.all()
641        return results[0] if results else None
642
643    def one(self) -> NodeT:
644        """Execute the query and return exactly one result.
645
646        Raises QueryError if no results or more than one result.
647        """
648        clone = self._clone()
649        clone._limit = 2
650        results = clone.all()
651        if not results:
652            raise QueryError("Query returned no results")
653        if len(results) > 1:
654            raise QueryError("Query returned more than one result")
655        return results[0]
656
657    def count(self) -> int:
658        """Execute the query and return the count of results."""
659        cypher, params = self._build_count_cypher()
660        results = self._execute_query(cypher, params)
661        return cast(int, results[0]["count"]) if results else 0
662
663    def exists(self) -> bool:
664        """Check if any matching records exist."""
665        cypher, params = self._build_exists_cypher()
666        results = self._execute_query(cypher, params)
667        return len(results) > 0
668
669    def delete(self) -> int:
670        """Delete all matching records (DETACH DELETE)."""
671        cypher, params = self._build_delete_cypher()
672        results = self._session._db.query(cypher, params)
673        return cast(int, results[0]["count"]) if results else 0
674
675    def update(self, **kwargs: Any) -> int:
676        """Update all matching records."""
677        cypher, params = self._build_update_cypher(**kwargs)
678        results = self._session._db.query(cypher, params)
679        return cast(int, results[0]["count"]) if results else 0

Immutable, type-safe query builder for graph queries.

Each method returns a new QueryBuilder instance. The original is never mutated. Provides a fluent API for building Cypher queries with type checking and IDE autocomplete support.

Example:

adults = (
    session.query(Person)
    .filter(Person.age >= 18)
    .order_by(Person.name)
    .limit(10)
    .all()
)

QueryBuilder(session: UniSession, model: type[~NodeT])
609    def __init__(self, session: UniSession, model: type[NodeT]) -> None:
610        self._init_state(session, model)
def all(self) -> list[~NodeT]:
627    def all(self) -> list[NodeT]:
628        """Execute the query and return all results."""
629        cypher, params = self._build_cypher()
630        results = self._execute_query(cypher, params)
631        instances = self._rows_to_instances(results)
632        if self._eager_load and instances:
633            self._session._eager_load_relationships(instances, self._eager_load)
634        return instances

Execute the query and return all results.

def first(self) -> Optional[~NodeT]:
636    def first(self) -> NodeT | None:
637        """Execute the query and return the first result."""
638        clone = self._clone()
639        clone._limit = 1
640        results = clone.all()
641        return results[0] if results else None

Execute the query and return the first result.

def one(self) -> ~NodeT:
643    def one(self) -> NodeT:
644        """Execute the query and return exactly one result.
645
646        Raises QueryError if no results or more than one result.
647        """
648        clone = self._clone()
649        clone._limit = 2
650        results = clone.all()
651        if not results:
652            raise QueryError("Query returned no results")
653        if len(results) > 1:
654            raise QueryError("Query returned more than one result")
655        return results[0]

Execute the query and return exactly one result.

Raises QueryError if no results or more than one result.

def count(self) -> int:
657    def count(self) -> int:
658        """Execute the query and return the count of results."""
659        cypher, params = self._build_count_cypher()
660        results = self._execute_query(cypher, params)
661        return cast(int, results[0]["count"]) if results else 0

Execute the query and return the count of results.

def exists(self) -> bool:
663    def exists(self) -> bool:
664        """Check if any matching records exist."""
665        cypher, params = self._build_exists_cypher()
666        results = self._execute_query(cypher, params)
667        return len(results) > 0

Check if any matching records exist.

def delete(self) -> int:
669    def delete(self) -> int:
670        """Delete all matching records (DETACH DELETE)."""
671        cypher, params = self._build_delete_cypher()
672        results = self._session._db.query(cypher, params)
673        return cast(int, results[0]["count"]) if results else 0

Delete all matching records (DETACH DELETE).

def update(self, **kwargs: Any) -> int:
675    def update(self, **kwargs: Any) -> int:
676        """Update all matching records."""
677        cypher, params = self._build_update_cypher(**kwargs)
678        results = self._session._db.query(cypher, params)
679        return cast(int, results[0]["count"]) if results else 0

Update all matching records.
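
Beyond all()/first()/one(), the terminal methods cover aggregates and bulk writes. A sketch against a sync UniSession with a registered Person model (the status property is illustrative):

    q = session.query(Person).filter(Person.age >= 18)

    q.count()                  # number of matching nodes
    q.exists()                 # True if at least one match
    q.update(status="active")  # SET on every match, returns affected count
    session.query(Person).filter(Person.name.starts_with("tmp_")).delete()  # DETACH DELETE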

class AsyncQueryBuilder(uni_pydantic.query._QueryBuilderBase[~NodeT]):
 26class AsyncQueryBuilder(_QueryBuilderBase[NodeT]):
 27    """
 28    Immutable, async query builder for graph queries.
 29
 30    Inherits all Cypher-building and immutable builder methods from
 31    ``_QueryBuilderBase``. Only the execution methods are async.
 32    """
 33
 34    def __init__(self, session: AsyncUniSession, model: type[NodeT]) -> None:
 35        self._init_state(session, model)
 36
 37    async def _execute_query(
 38        self, cypher: str, params: dict[str, Any]
 39    ) -> list[dict[str, Any]]:
 40        """Execute a query, using query_with if timeout/max_memory is set."""
 41        if self._timeout is not None or self._max_memory is not None:
 42            builder = self._session._db.query_with(cypher)
 43            if params:
 44                builder = builder.params(params)
 45            if self._timeout is not None:
 46                builder = builder.timeout(self._timeout)
 47            if self._max_memory is not None:
 48                builder = builder.max_memory(self._max_memory)
 49            return await builder.run()
 50        return await self._session._db.query(cypher, params)
 51
 52    async def all(self) -> list[NodeT]:
 53        """Execute the query and return all results."""
 54        cypher, params = self._build_cypher()
 55        results = await self._execute_query(cypher, params)
 56        instances = self._rows_to_instances(results)
 57        if self._eager_load and instances:
 58            await self._session._async_eager_load_relationships(
 59                instances, self._eager_load
 60            )
 61        return instances
 62
 63    async def first(self) -> NodeT | None:
 64        """Execute the query and return the first result."""
 65        clone = self._clone()
 66        clone._limit = 1
 67        results = await clone.all()
 68        return results[0] if results else None
 69
 70    async def one(self) -> NodeT:
 71        """Execute the query and return exactly one result.
 72
 73        Raises QueryError if no results or more than one result.
 74        """
 75        clone = self._clone()
 76        clone._limit = 2
 77        results = await clone.all()
 78        if not results:
 79            raise QueryError("Query returned no results")
 80        if len(results) > 1:
 81            raise QueryError("Query returned more than one result")
 82        return results[0]
 83
 84    async def count(self) -> int:
 85        """Execute the query and return the count of results."""
 86        cypher, params = self._build_count_cypher()
 87        results = await self._execute_query(cypher, params)
 88        return cast(int, results[0]["count"]) if results else 0
 89
 90    async def exists(self) -> bool:
 91        """Check if any matching records exist."""
 92        cypher, params = self._build_exists_cypher()
 93        results = await self._execute_query(cypher, params)
 94        return len(results) > 0
 95
 96    async def delete(self) -> int:
 97        """Delete all matching records (DETACH DELETE)."""
 98        cypher, params = self._build_delete_cypher()
 99        results = await self._session._db.query(cypher, params)
100        return cast(int, results[0]["count"]) if results else 0
101
102    async def update(self, **kwargs: Any) -> int:
103        """Update all matching records."""
104        cypher, params = self._build_update_cypher(**kwargs)
105        results = await self._session._db.query(cypher, params)
106        return cast(int, results[0]["count"]) if results else 0

Immutable, async query builder for graph queries.

Inherits all Cypher-building and immutable builder methods from _QueryBuilderBase. Only the execution methods are async.

AsyncQueryBuilder( session: AsyncUniSession, model: type[~NodeT])
34    def __init__(self, session: AsyncUniSession, model: type[NodeT]) -> None:
35        self._init_state(session, model)
async def all(self) -> list[~NodeT]:
52    async def all(self) -> list[NodeT]:
53        """Execute the query and return all results."""
54        cypher, params = self._build_cypher()
55        results = await self._execute_query(cypher, params)
56        instances = self._rows_to_instances(results)
57        if self._eager_load and instances:
58            await self._session._async_eager_load_relationships(
59                instances, self._eager_load
60            )
61        return instances

Execute the query and return all results.

async def first(self) -> Optional[~NodeT]:
63    async def first(self) -> NodeT | None:
64        """Execute the query and return the first result."""
65        clone = self._clone()
66        clone._limit = 1
67        results = await clone.all()
68        return results[0] if results else None

Execute the query and return the first result.

async def one(self) -> ~NodeT:
70    async def one(self) -> NodeT:
71        """Execute the query and return exactly one result.
72
73        Raises QueryError if no results or more than one result.
74        """
75        clone = self._clone()
76        clone._limit = 2
77        results = await clone.all()
78        if not results:
79            raise QueryError("Query returned no results")
80        if len(results) > 1:
81            raise QueryError("Query returned more than one result")
82        return results[0]

Execute the query and return exactly one result.

Raises QueryError if no results or more than one result.

async def count(self) -> int:
84    async def count(self) -> int:
85        """Execute the query and return the count of results."""
86        cypher, params = self._build_count_cypher()
87        results = await self._execute_query(cypher, params)
88        return cast(int, results[0]["count"]) if results else 0

Execute the query and return the count of results.

async def exists(self) -> bool:
90    async def exists(self) -> bool:
91        """Check if any matching records exist."""
92        cypher, params = self._build_exists_cypher()
93        results = await self._execute_query(cypher, params)
94        return len(results) > 0

Check if any matching records exist.

async def delete(self) -> int:
 96    async def delete(self) -> int:
 97        """Delete all matching records (DETACH DELETE)."""
 98        cypher, params = self._build_delete_cypher()
 99        results = await self._session._db.query(cypher, params)
100        return cast(int, results[0]["count"]) if results else 0

Delete all matching records (DETACH DELETE).

async def update(self, **kwargs: Any) -> int:
102    async def update(self, **kwargs: Any) -> int:
103        """Update all matching records."""
104        cypher, params = self._build_update_cypher(**kwargs)
105        results = await self._session._db.query(cypher, params)
106        return cast(int, results[0]["count"]) if results else 0

Update all matching records.
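
The async variant builds queries synchronously and only awaits the terminal calls. A sketch, assuming AsyncUniSession exposes a query() factory mirroring the sync session:

    q = session.query(Person).filter(Person.age >= 18).order_by(Person.name).limit(10)
    adults = await q.all()
    total = await q.count()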

@dataclass
class FilterExpr:
116@dataclass
117class FilterExpr:
118    """A filter expression for a query."""
119
120    property_name: str
121    op: FilterOp
122    value: Any = None
123
124    def to_cypher(self, node_var: str, param_name: str) -> tuple[str, dict[str, Any]]:
125        """Convert to Cypher WHERE clause fragment."""
126        prop = f"{node_var}.{self.property_name}"
127
128        if self.op == FilterOp.IS_NULL:
129            return f"{prop} IS NULL", {}
130        elif self.op == FilterOp.IS_NOT_NULL:
131            return f"{prop} IS NOT NULL", {}
132        elif self.op == FilterOp.IN:
133            return f"{prop} IN ${param_name}", {param_name: self.value}
134        elif self.op == FilterOp.NOT_IN:
135            return f"NOT {prop} IN ${param_name}", {param_name: self.value}
136        elif self.op == FilterOp.LIKE:
137            return f"{prop} =~ ${param_name}", {param_name: self.value}
138        elif self.op == FilterOp.STARTS_WITH:
139            return f"{prop} STARTS WITH ${param_name}", {param_name: self.value}
140        elif self.op == FilterOp.ENDS_WITH:
141            return f"{prop} ENDS WITH ${param_name}", {param_name: self.value}
142        elif self.op == FilterOp.CONTAINS:
143            return f"{prop} CONTAINS ${param_name}", {param_name: self.value}
144        else:
145            return f"{prop} {self.op.value} ${param_name}", {param_name: self.value}

A filter expression for a query.

FilterExpr( property_name: str, op: FilterOp, value: Any = None)
property_name: str
op: FilterOp
value: Any = None
def to_cypher( self, node_var: str, param_name: str) -> tuple[str, dict[str, typing.Any]]:
124    def to_cypher(self, node_var: str, param_name: str) -> tuple[str, dict[str, Any]]:
125        """Convert to Cypher WHERE clause fragment."""
126        prop = f"{node_var}.{self.property_name}"
127
128        if self.op == FilterOp.IS_NULL:
129            return f"{prop} IS NULL", {}
130        elif self.op == FilterOp.IS_NOT_NULL:
131            return f"{prop} IS NOT NULL", {}
132        elif self.op == FilterOp.IN:
133            return f"{prop} IN ${param_name}", {param_name: self.value}
134        elif self.op == FilterOp.NOT_IN:
135            return f"NOT {prop} IN ${param_name}", {param_name: self.value}
136        elif self.op == FilterOp.LIKE:
137            return f"{prop} =~ ${param_name}", {param_name: self.value}
138        elif self.op == FilterOp.STARTS_WITH:
139            return f"{prop} STARTS WITH ${param_name}", {param_name: self.value}
140        elif self.op == FilterOp.ENDS_WITH:
141            return f"{prop} ENDS WITH ${param_name}", {param_name: self.value}
142        elif self.op == FilterOp.CONTAINS:
143            return f"{prop} CONTAINS ${param_name}", {param_name: self.value}
144        else:
145            return f"{prop} {self.op.value} ${param_name}", {param_name: self.value}

Convert to Cypher WHERE clause fragment.
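
For example, a >= comparison renders to a parameterised fragment; the node variable and parameter names here are illustrative, the builder chooses the real ones:

    FilterExpr("age", FilterOp.GE, 18).to_cypher("n", "p0")
    # -> ("n.age >= $p0", {"p0": 18})

    FilterExpr("name", FilterOp.IS_NULL).to_cypher("n", "p1")
    # -> ("n.name IS NULL", {})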

class FilterOp(enum.Enum):
 97class FilterOp(Enum):
 98    """Filter operation types."""
 99
100    EQ = "="
101    NE = "<>"
102    LT = "<"
103    LE = "<="
104    GT = ">"
105    GE = ">="
106    IN = "IN"
107    NOT_IN = "NOT IN"
108    LIKE = "=~"
109    IS_NULL = "IS NULL"
110    IS_NOT_NULL = "IS NOT NULL"
111    STARTS_WITH = "STARTS WITH"
112    ENDS_WITH = "ENDS WITH"
113    CONTAINS = "CONTAINS"

Filter operation types.

EQ = <FilterOp.EQ: '='>
NE = <FilterOp.NE: '<>'>
LT = <FilterOp.LT: '<'>
LE = <FilterOp.LE: '<='>
GT = <FilterOp.GT: '>'>
GE = <FilterOp.GE: '>='>
IN = <FilterOp.IN: 'IN'>
NOT_IN = <FilterOp.NOT_IN: 'NOT IN'>
LIKE = <FilterOp.LIKE: '=~'>
IS_NULL = <FilterOp.IS_NULL: 'IS NULL'>
IS_NOT_NULL = <FilterOp.IS_NOT_NULL: 'IS NOT NULL'>
STARTS_WITH = <FilterOp.STARTS_WITH: 'STARTS WITH'>
ENDS_WITH = <FilterOp.ENDS_WITH: 'ENDS WITH'>
CONTAINS = <FilterOp.CONTAINS: 'CONTAINS'>
class PropertyProxy(typing.Generic[~T]):
148class PropertyProxy(Generic[T]):
149    """
150    Proxy for model properties that enables filter expressions.
151
152    Used in query builder to create type-safe filter conditions.
153
154    Example:
155        >>> query.filter(Person.age >= 18)
156        >>> query.filter(Person.name.starts_with("A"))
157    """
158
159    def __init__(self, property_name: str, model: type[UniNode]) -> None:
160        self._property_name = property_name
161        self._model = model
162
163    def __eq__(self, other: Any) -> FilterExpr:  # type: ignore[override]
164        return FilterExpr(self._property_name, FilterOp.EQ, other)
165
166    def __ne__(self, other: Any) -> FilterExpr:  # type: ignore[override]
167        return FilterExpr(self._property_name, FilterOp.NE, other)
168
169    def __lt__(self, other: Any) -> FilterExpr:
170        return FilterExpr(self._property_name, FilterOp.LT, other)
171
172    def __le__(self, other: Any) -> FilterExpr:
173        return FilterExpr(self._property_name, FilterOp.LE, other)
174
175    def __gt__(self, other: Any) -> FilterExpr:
176        return FilterExpr(self._property_name, FilterOp.GT, other)
177
178    def __ge__(self, other: Any) -> FilterExpr:
179        return FilterExpr(self._property_name, FilterOp.GE, other)
180
181    def in_(self, values: Sequence[T]) -> FilterExpr:
182        """Check if value is in a list."""
183        return FilterExpr(self._property_name, FilterOp.IN, list(values))
184
185    def not_in(self, values: Sequence[T]) -> FilterExpr:
186        """Check if value is not in a list."""
187        return FilterExpr(self._property_name, FilterOp.NOT_IN, list(values))
188
189    def like(self, pattern: str) -> FilterExpr:
190        """Match a regex pattern."""
191        return FilterExpr(self._property_name, FilterOp.LIKE, pattern)
192
193    def is_null(self) -> FilterExpr:
194        """Check if value is null."""
195        return FilterExpr(self._property_name, FilterOp.IS_NULL)
196
197    def is_not_null(self) -> FilterExpr:
198        """Check if value is not null."""
199        return FilterExpr(self._property_name, FilterOp.IS_NOT_NULL)
200
201    def starts_with(self, prefix: str) -> FilterExpr:
202        """Check if string starts with prefix."""
203        return FilterExpr(self._property_name, FilterOp.STARTS_WITH, prefix)
204
205    def ends_with(self, suffix: str) -> FilterExpr:
206        """Check if string ends with suffix."""
207        return FilterExpr(self._property_name, FilterOp.ENDS_WITH, suffix)
208
209    def contains(self, substring: str) -> FilterExpr:
210        """Check if string contains substring."""
211        return FilterExpr(self._property_name, FilterOp.CONTAINS, substring)

Proxy for model properties that enables filter expressions.

Used in query builder to create type-safe filter conditions.

Example:

query.filter(Person.age >= 18)
query.filter(Person.name.starts_with("A"))

PropertyProxy(property_name: str, model: type[UniNode])
159    def __init__(self, property_name: str, model: type[UniNode]) -> None:
160        self._property_name = property_name
161        self._model = model
def in_(self, values: Sequence[~T]) -> FilterExpr:
181    def in_(self, values: Sequence[T]) -> FilterExpr:
182        """Check if value is in a list."""
183        return FilterExpr(self._property_name, FilterOp.IN, list(values))

Check if value is in a list.

def not_in(self, values: Sequence[~T]) -> FilterExpr:
185    def not_in(self, values: Sequence[T]) -> FilterExpr:
186        """Check if value is not in a list."""
187        return FilterExpr(self._property_name, FilterOp.NOT_IN, list(values))

Check if value is not in a list.

def like(self, pattern: str) -> FilterExpr:
189    def like(self, pattern: str) -> FilterExpr:
190        """Match a regex pattern."""
191        return FilterExpr(self._property_name, FilterOp.LIKE, pattern)

Match a regex pattern.

def is_null(self) -> FilterExpr:
193    def is_null(self) -> FilterExpr:
194        """Check if value is null."""
195        return FilterExpr(self._property_name, FilterOp.IS_NULL)

Check if value is null.

def is_not_null(self) -> FilterExpr:
197    def is_not_null(self) -> FilterExpr:
198        """Check if value is not null."""
199        return FilterExpr(self._property_name, FilterOp.IS_NOT_NULL)

Check if value is not null.

def starts_with(self, prefix: str) -> FilterExpr:
201    def starts_with(self, prefix: str) -> FilterExpr:
202        """Check if string starts with prefix."""
203        return FilterExpr(self._property_name, FilterOp.STARTS_WITH, prefix)

Check if string starts with prefix.

def ends_with(self, suffix: str) -> FilterExpr:
205    def ends_with(self, suffix: str) -> FilterExpr:
206        """Check if string ends with suffix."""
207        return FilterExpr(self._property_name, FilterOp.ENDS_WITH, suffix)

Check if string ends with suffix.

def contains(self, substring: str) -> FilterExpr:
209    def contains(self, substring: str) -> FilterExpr:
210        """Check if string contains substring."""
211        return FilterExpr(self._property_name, FilterOp.CONTAINS, substring)

Check if string contains substring.

class ModelProxy(Generic[NodeT]):
    """
    Proxy for model classes that provides property proxies.

    Enables type-safe property access in query filters.

    Example:
        >>> Person.name  # Returns PropertyProxy for 'name'
        >>> query.filter(Person.age >= 18)
    """

    def __init__(self, model: type[NodeT]) -> None:
        self._model = model

    def __getattr__(self, name: str) -> PropertyProxy[Any]:
        if name.startswith("_"):
            raise AttributeError(name)
        return PropertyProxy(name, self._model)

@dataclass
class OrderByClause:
    """An ORDER BY clause."""

    property_name: str
    descending: bool = False

@dataclass
class TraversalStep:
    """A relationship traversal step."""

    edge_type: str
    direction: Literal["outgoing", "incoming", "both"]
    target_label: str | None = None
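
For illustration, a step describing one hop along FRIEND_OF edges in either direction; in normal use the query builder constructs these internally.

# Sketch: a single traversal hop (values chosen for illustration).
step = TraversalStep(edge_type="FRIEND_OF", direction="both", target_label="Person")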

@dataclass
class VectorSearchConfig:
    """Configuration for vector similarity search."""

    property_name: str
    query_vector: list[float]
    k: int
    threshold: float | None = None
    pre_filter: str | None = None
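
A hand-built configuration for a k-nearest-neighbour search over the embedding field from the package example; the query vector and the pre_filter expression below are placeholders, not values defined on this page.

# Sketch: building a VectorSearchConfig by hand. In practice the query vector
# would come from an embedding model rather than being all zeros.
config = VectorSearchConfig(
    property_name="embedding",
    query_vector=[0.0] * 1536,   # placeholder 1536-dim query embedding
    k=10,                        # return the 10 most similar nodes
    threshold=0.8,               # optional similarity cutoff
    pre_filter="age >= 18",      # optional pre-filter; exact syntax is an assumption
)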

class SchemaGenerator:
    """Generates Uni database schema from registered models."""

    def __init__(self) -> None:
        self._node_models: dict[str, type[UniNode]] = {}
        self._edge_models: dict[str, type[UniEdge]] = {}
        self._schema: DatabaseSchema | None = None

    def register_node(self, model: type[UniNode]) -> None:
        """Register a node model for schema generation."""
        label = model.__label__
        if not label:
            raise SchemaError(f"Model {model.__name__} has no __label__", model)
        self._node_models[label] = model
        self._schema = None  # Invalidate cached schema

    def register_edge(self, model: type[UniEdge]) -> None:
        """Register an edge model for schema generation."""
        edge_type = model.__edge_type__
        if not edge_type:
            raise SchemaError(f"Model {model.__name__} has no __edge_type__", model)
        self._edge_models[edge_type] = model
        self._schema = None

    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
        """Register multiple models."""
        for model in models:
            if issubclass(model, UniEdge):
                self.register_edge(model)
            elif issubclass(model, UniNode):
                self.register_node(model)
            else:
                raise SchemaError(
                    f"Model {model.__name__} must be a subclass of UniNode or UniEdge"
                )

    def _generate_property_schema(
        self,
        model: type[UniNode] | type[UniEdge],
        field_name: str,
    ) -> PropertySchema:
        """Generate schema for a single property field."""
        field_info = model.model_fields[field_name]

        # Get type hints with forward refs resolved
        try:
            hints = get_type_hints(model)
            type_hint = hints.get(field_name, field_info.annotation)
        except Exception:
            type_hint = field_info.annotation

        # Check for nullability
        is_nullable, inner_type = is_optional(type_hint)

        # Get Uni data type
        data_type, nullable = python_type_to_uni(type_hint, nullable=is_nullable)

        # Check for vector dimensions
        vec_dims = get_vector_dimensions(inner_type if is_nullable else type_hint)
        if vec_dims:
            data_type = f"vector:{vec_dims}"

        # Get field config for index settings
        config = get_field_config(field_info)
        index_type = config.index if config else None
        unique = config.unique if config else False
        tokenizer = config.tokenizer if config else None
        metric = config.metric if config else None

        # Auto-create vector index for Vector fields (regardless of Field config)
        if vec_dims and not index_type:
            index_type = "vector"

        return PropertySchema(
            name=field_name,
            data_type=data_type,
            nullable=nullable,
            index_type=index_type,
            unique=unique,
            tokenizer=tokenizer,
            metric=metric,
        )

    def _generate_label_schema(self, model: type[UniNode]) -> LabelSchema:
        """Generate schema for a node model."""
        label = model.__label__

        properties = {}
        for field_name in model.get_property_fields():
            prop_schema = self._generate_property_schema(model, field_name)
            properties[field_name] = prop_schema

        return LabelSchema(
            name=label,
            properties=properties,
        )

    def _generate_edge_type_schema(self, model: type[UniEdge]) -> EdgeTypeSchema:
        """Generate schema for an edge model."""
        edge_type = model.__edge_type__
        from_labels = model.get_from_labels()
        to_labels = model.get_to_labels()

        # If from/to not specified, allow any labels
        if not from_labels:
            from_labels = list(self._node_models.keys())
        if not to_labels:
            to_labels = list(self._node_models.keys())

        properties = {}
        for field_name in model.get_property_fields():
            prop_schema = self._generate_property_schema(model, field_name)
            properties[field_name] = prop_schema

        return EdgeTypeSchema(
            name=edge_type,
            from_labels=from_labels,
            to_labels=to_labels,
            properties=properties,
        )

    def generate(self) -> DatabaseSchema:
        """Generate the complete database schema."""
        if self._schema is not None:
            return self._schema

        schema = DatabaseSchema()

        # Generate label schemas
        for label, model in self._node_models.items():
            schema.labels[label] = self._generate_label_schema(model)

        # Generate edge type schemas
        for edge_type_name, edge_model in self._edge_models.items():
            schema.edge_types[edge_type_name] = self._generate_edge_type_schema(
                edge_model
            )

        # Also generate edge types from relationships declared on node models
        for rel_name, rel_config in (
            (name, cfg)
            for model in self._node_models.values()
            for name, cfg in model.get_relationship_fields().items()
        ):
            edge_type = rel_config.edge_type
            if edge_type not in schema.edge_types:
                # Create a minimal edge type schema
                schema.edge_types[edge_type] = EdgeTypeSchema(
                    name=edge_type,
                    from_labels=list(self._node_models.keys()),
                    to_labels=list(self._node_models.keys()),
                )

        self._schema = schema
        return schema

    def apply_to_database(self, db: uni_db.Database) -> None:
        """Apply the generated schema to a database using SchemaBuilder.

        Uses db.schema() for atomic schema application with additive-only
        semantics. Creates labels, edge types, properties, and indexes.
        """
        schema = self.generate()

        # Build the full schema using SchemaBuilder, skipping existing labels/edge types
        builder = db.schema()
        has_changes = False

        for label, label_schema in schema.labels.items():
            if db.label_exists(label):
                continue  # Additive-only: skip existing labels
            lb = builder.label(label)
            for prop in label_schema.properties.values():
                # Check for vector type
                if prop.data_type.startswith("vector:"):
                    dims = int(prop.data_type.split(":")[1])
                    lb = lb.vector(prop.name, dims)
                elif prop.nullable:
                    lb = lb.property_nullable(prop.name, prop.data_type)
                else:
                    lb = lb.property(prop.name, prop.data_type)

                # Add indexes (not vector — vector is handled by .vector())
                if prop.index_type and prop.index_type in ("btree", "hash"):
                    lb = lb.index(prop.name, prop.index_type)
            builder = lb.done()
            has_changes = True

        for edge_type, edge_schema in schema.edge_types.items():
            if db.edge_type_exists(edge_type):
                continue  # Skip existing edge types
            eb = builder.edge_type(
                edge_type, edge_schema.from_labels, edge_schema.to_labels
            )
            for prop in edge_schema.properties.values():
                if prop.nullable:
                    eb = eb.property_nullable(prop.name, prop.data_type)
                else:
                    eb = eb.property(prop.name, prop.data_type)
            builder = eb.done()
            has_changes = True

        if has_changes:
            builder.apply()

        # Create vector indexes separately (SchemaBuilder may not handle them)
        for label, label_schema in schema.labels.items():
            for prop in label_schema.properties.values():
                if prop.index_type == "vector":
                    metric = prop.metric or "l2"
                    try:
                        db.create_vector_index(label, prop.name, metric)
                    except Exception:
                        pass  # Index may already exist

        # Create fulltext indexes separately
        for label, label_schema in schema.labels.items():
            for prop in label_schema.properties.values():
                if prop.index_type == "fulltext":
                    try:
                        db.create_scalar_index(label, prop.name, "fulltext")
                    except Exception:
                        pass  # Index may already exist or not supported

    async def async_apply_to_database(self, db: uni_db.AsyncDatabase) -> None:
        """Apply the generated schema to an async database.

        Async variant of apply_to_database using AsyncSchemaBuilder.
        """
        schema = self.generate()

        # Build the full schema using AsyncSchemaBuilder, skipping existing labels/edge types
        builder = db.schema()
        has_changes = False

        for label, label_schema in schema.labels.items():
            if await db.label_exists(label):
                continue
            lb = builder.label(label)
            for prop in label_schema.properties.values():
                if prop.data_type.startswith("vector:"):
                    dims = int(prop.data_type.split(":")[1])
                    lb = lb.vector(prop.name, dims)
                elif prop.nullable:
                    lb = lb.property_nullable(prop.name, prop.data_type)
                else:
                    lb = lb.property(prop.name, prop.data_type)

                if prop.index_type and prop.index_type in ("btree", "hash"):
                    lb = lb.index(prop.name, prop.index_type)
            builder = lb.done()
            has_changes = True

        for edge_type, edge_schema in schema.edge_types.items():
            if await db.edge_type_exists(edge_type):
                continue
            eb = builder.edge_type(
                edge_type, edge_schema.from_labels, edge_schema.to_labels
            )
            for prop in edge_schema.properties.values():
                if prop.nullable:
                    eb = eb.property_nullable(prop.name, prop.data_type)
                else:
                    eb = eb.property(prop.name, prop.data_type)
            builder = eb.done()
            has_changes = True

        if has_changes:
            await builder.apply()

        # Create vector indexes separately
        for label, label_schema in schema.labels.items():
            for prop in label_schema.properties.values():
                if prop.index_type == "vector":
                    metric = prop.metric or "l2"
                    try:
                        await db.create_vector_index(label, prop.name, metric)
                    except Exception:
                        pass

        # Create fulltext indexes separately
        for label, label_schema in schema.labels.items():
            for prop in label_schema.properties.values():
                if prop.index_type == "fulltext":
                    try:
                        await db.create_scalar_index(label, prop.name, "fulltext")
                    except Exception:
                        pass
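
A minimal sketch of driving SchemaGenerator directly, assuming the Person model from the package example and an already-built uni_db.Database named db; in typical use UniSession.sync_schema() performs these steps for you.

# Sketch only: Person and db are assumed from earlier examples.
generator = SchemaGenerator()
generator.register(Person)
schema = generator.generate()        # cached until another model is registered
print(sorted(schema.labels))         # keyed by each model's __label__
generator.apply_to_database(db)      # additive-only: existing labels are skipped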

@dataclass
class DatabaseSchema:
    """Complete database schema generated from models."""

    labels: dict[str, LabelSchema] = field(default_factory=dict)
    edge_types: dict[str, EdgeTypeSchema] = field(default_factory=dict)

@dataclass
class LabelSchema:
    """Schema for a vertex label."""

    name: str
    properties: dict[str, PropertySchema] = field(default_factory=dict)

@dataclass
class EdgeTypeSchema:
    """Schema for an edge type."""

    name: str
    from_labels: list[str] = field(default_factory=list)
    to_labels: list[str] = field(default_factory=list)
    properties: dict[str, PropertySchema] = field(default_factory=dict)

@dataclass
class PropertySchema:
    """Schema for a single property."""

    name: str
    data_type: str
    nullable: bool = False
    index_type: str | None = None
    unique: bool = False
    tokenizer: str | None = None
    metric: str | None = None

def generate_schema(*models: type[UniNode] | type[UniEdge]) -> DatabaseSchema:
    """Generate a database schema from the given models."""
    generator = SchemaGenerator()
    generator.register(*models)
    return generator.generate()
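
The convenience form of the generator flow shown above, under the same assumptions:

# Sketch: one-shot schema generation for the Person model from the package example.
schema = generate_schema(Person)
print(schema.edge_types.keys())   # includes edge types implied by Relationship fields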

class UniDatabase:
    """
    Thin wrapper around uni-db DatabaseBuilder for ergonomic database creation.

    Example:
        >>> db = UniDatabase.open("./path").cache_size(1024*1024).build()
        >>> db = UniDatabase.temporary().build()
        >>> db = UniDatabase.in_memory().build()
    """

    def __init__(self, builder: uni_db.DatabaseBuilder) -> None:
        self._builder = builder

    @classmethod
    def open(cls, path: str) -> UniDatabase:
        """Open or create a database at the given path."""
        import uni_db

        return cls(uni_db.DatabaseBuilder.open(path))

    @classmethod
    def create(cls, path: str) -> UniDatabase:
        """Create a new database at the given path."""
        import uni_db

        return cls(uni_db.DatabaseBuilder.create(path))

    @classmethod
    def open_existing(cls, path: str) -> UniDatabase:
        """Open an existing database (must already exist)."""
        import uni_db

        return cls(uni_db.DatabaseBuilder.open_existing(path))

    @classmethod
    def temporary(cls) -> UniDatabase:
        """Create an ephemeral in-memory database."""
        import uni_db

        return cls(uni_db.DatabaseBuilder.temporary())

    @classmethod
    def in_memory(cls) -> UniDatabase:
        """Create a persistent in-memory database."""
        import uni_db

        return cls(uni_db.DatabaseBuilder.in_memory())

    def cache_size(self, bytes_: int) -> UniDatabase:
        """Set the cache size in bytes."""
        self._builder = self._builder.cache_size(bytes_)
        return self

    def parallelism(self, n: int) -> UniDatabase:
        """Set the parallelism level."""
        self._builder = self._builder.parallelism(n)
        return self

    def build(self) -> uni_db.Database:
        """Build and return the database instance."""
        return self._builder.build()
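
A sketch of the builder in context, handing the built database to the UniSession flow from the package example; the path and cache size are illustrative.

# Sketch: build a database with tuned settings, then open an OGM session on it.
db = (
    UniDatabase.open("./my_graph")
    .cache_size(256 * 1024 * 1024)   # 256 MiB cache (illustrative value)
    .parallelism(4)
    .build()
)
session = UniSession(db)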

class AsyncUniDatabase:
    """
    Thin wrapper around uni-db AsyncDatabaseBuilder for ergonomic async database creation.

    Example:
        >>> db = await AsyncUniDatabase.open("./path").build()
        >>> db = await AsyncUniDatabase.temporary().build()
    """

    def __init__(self, builder: uni_db.AsyncDatabaseBuilder) -> None:
        self._builder = builder

    @classmethod
    def open(cls, path: str) -> AsyncUniDatabase:
        """Open or create a database at the given path."""
        import uni_db

        return cls(uni_db.AsyncDatabaseBuilder.open(path))

    @classmethod
    def temporary(cls) -> AsyncUniDatabase:
        """Create an ephemeral in-memory database."""
        import uni_db

        return cls(uni_db.AsyncDatabaseBuilder.temporary())

    @classmethod
    def in_memory(cls) -> AsyncUniDatabase:
        """Create a persistent in-memory database."""
        import uni_db

        return cls(uni_db.AsyncDatabaseBuilder.in_memory())

    def cache_size(self, bytes_: int) -> AsyncUniDatabase:
        """Set the cache size in bytes."""
        self._builder = self._builder.cache_size(bytes_)
        return self

    def parallelism(self, n: int) -> AsyncUniDatabase:
        """Set the parallelism level."""
        self._builder = self._builder.parallelism(n)
        return self

    async def build(self) -> uni_db.AsyncDatabase:
        """Build and return the async database instance."""
        return await self._builder.build()
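
The async counterpart as a sketch; AsyncUniSession is assumed to accept the built AsyncDatabase the same way UniSession accepts a Database, which this page does not confirm.

import asyncio

async def main() -> None:
    # Sketch: async builder chain; only build() is awaited.
    db = await AsyncUniDatabase.open("./my_graph").parallelism(4).build()
    session = AsyncUniSession(db)   # constructor shape assumed to mirror UniSession
    ...

asyncio.run(main())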

def before_create(func: F) -> F:
    """
    Mark a method to be called before the entity is created in the database.

    The method is called after validation but before the INSERT operation.
    Useful for setting timestamps, generating IDs, or final validation.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...     created_at: datetime | None = None
        ...
        ...     @before_create
        ...     def set_created_at(self):
        ...         self.created_at = datetime.now()
    """
    return _mark_hook(_BEFORE_CREATE)(func)

def after_create(func: F) -> F:
    """
    Mark a method to be called after the entity is created in the database.

    The method is called after the INSERT operation completes successfully.
    The entity will have its vid/eid assigned at this point.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @after_create
        ...     def log_creation(self):
        ...         logger.info(f"Created person {self.name} with vid={self.vid}")
    """
    return _mark_hook(_AFTER_CREATE)(func)

def before_update(func: F) -> F:
    """
    Mark a method to be called before the entity is updated in the database.

    The method is called before the UPDATE operation.
    Useful for validation or updating timestamps.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...     updated_at: datetime | None = None
        ...
        ...     @before_update
        ...     def validate_and_timestamp(self):
        ...         if not self.name:
        ...             raise ValueError("Name cannot be empty")
        ...         self.updated_at = datetime.now()
    """
    return _mark_hook(_BEFORE_UPDATE)(func)

def after_update(func: F) -> F:
    """
    Mark a method to be called after the entity is updated in the database.

    The method is called after the UPDATE operation completes successfully.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @after_update
        ...     def notify_change(self):
        ...         events.emit("person_updated", self.vid)
    """
    return _mark_hook(_AFTER_UPDATE)(func)

def before_delete(func: F) -> F:
    """
    Mark a method to be called before the entity is deleted from the database.

    The method is called before the DELETE operation.
    Useful for cleanup or validation.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @before_delete
        ...     def cleanup(self):
        ...         # Remove related data
        ...         pass
    """
    return _mark_hook(_BEFORE_DELETE)(func)

def after_delete(func: F) -> F:
    """
    Mark a method to be called after the entity is deleted from the database.

    The method is called after the DELETE operation completes successfully.
    The entity's vid/eid will be cleared at this point.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @after_delete
        ...     def log_deletion(self):
        ...         logger.info(f"Deleted person {self.name}")
    """
    return _mark_hook(_AFTER_DELETE)(func)

def before_load(func: F) -> F:
    """
    Mark a method to be called before the entity is loaded from the database.

    This is a class method that receives the raw property dictionary.
    Can be used to transform data before model instantiation.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @classmethod
        ...     @before_load
        ...     def transform_data(cls, props: dict) -> dict:
        ...         # Normalize name
        ...         if 'name' in props:
        ...             props['name'] = props['name'].strip()
        ...         return props
    """
    return _mark_hook(_BEFORE_LOAD)(func)

def after_load(func: F) -> F:
    """
    Mark a method to be called after the entity is loaded from the database.

    The method is called after the entity is instantiated from database data.
    Useful for computing derived values or initializing non-persisted state.

    Example:
        >>> class Person(UniNode):
        ...     first_name: str
        ...     last_name: str
        ...     full_name: str | None = None
        ...
        ...     @after_load
        ...     def compute_full_name(self):
        ...         self.full_name = f"{self.first_name} {self.last_name}"
    """
    return _mark_hook(_AFTER_LOAD)(func)
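
Hooks compose on a single model. A hedged sketch of the resulting lifecycle, assuming the session flow from the package example; the firing points are taken from the docstrings above, not verified against the session implementation.

# Sketch: two hooks on one model and the commit that triggers them.
from datetime import datetime

class Article(UniNode):
    title: str
    created_at: datetime | None = None

    @before_create
    def stamp(self):
        self.created_at = datetime.now()

    @after_create
    def announce(self):
        print(f"stored {self.title!r} as vid={self.vid}")

session.register(Article)
session.sync_schema()
session.add(Article(title="Hello"))
session.commit()   # runs stamp(), performs the INSERT, then runs announce()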

class UniPydanticError(Exception):
    """Base exception for all uni-pydantic errors."""

class SchemaError(UniPydanticError):
    """Error related to schema definition or generation."""

    def __init__(self, message: str, model: type | None = None) -> None:
        self.model = model
        super().__init__(message)

class TypeMappingError(SchemaError):
    """Error mapping Python type to Uni DataType."""

    def __init__(self, python_type: Any, message: str | None = None) -> None:
        self.python_type = python_type
        msg = message or f"Cannot map Python type {python_type!r} to Uni DataType"
        super().__init__(msg)

class ValidationError(UniPydanticError):
    """Validation error for model instances."""

class SessionError(UniPydanticError):
    """Error related to session operations."""

class NotRegisteredError(SessionError):
    """Model type not registered with session."""

    def __init__(self, model: type[UniNode] | type[UniEdge]) -> None:
        self.model = model
        super().__init__(
            f"Model {model.__name__!r} is not registered with this session. "
            f"Call session.register({model.__name__}) first."
        )

class NotPersisted(SessionError):
    """Operation requires a persisted entity."""

    def __init__(self, entity: UniNode | UniEdge) -> None:
        self.entity = entity
        super().__init__(
            f"Entity {entity!r} is not persisted. Call session.add() and commit() first."
        )

class NotTrackedError(SessionError):
    """Entity is not tracked by this session."""

class TransactionError(SessionError):
    """Error related to transaction operations."""

class QueryError(UniPydanticError):
    """Error executing a query."""

class RelationshipError(UniPydanticError):
    """Error related to relationship operations."""

class LazyLoadError(RelationshipError):
    """Error lazy-loading a relationship."""

    def __init__(self, field_name: str, reason: str) -> None:
        self.field_name = field_name
        super().__init__(f"Cannot lazy-load relationship '{field_name}': {reason}")

class BulkLoadError(UniPydanticError):
    """Error during bulk loading operations."""

class CypherInjectionError(QueryError):
    """Property name validation failure — potential Cypher injection."""

    def __init__(self, name: str, reason: str | None = None) -> None:
        self.name = name
        msg = reason or f"Invalid property name {name!r}: possible Cypher injection"
        super().__init__(msg)
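
Because every error derives from UniPydanticError, callers can catch narrowly or broadly. A sketch, assuming the session and Person model from the package example; which call actually raises NotRegisteredError is an assumption based on its message.

# Sketch: narrow catch for a known failure, broad catch for everything else.
try:
    session.add(Person(name="Eve", email="eve@example.com"))
    session.commit()
except NotRegisteredError as exc:
    # Person was never passed to session.register()
    print(exc)
except UniPydanticError as exc:
    # base class: catches any other OGM-level failure
    print(f"graph operation failed: {exc}")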
