uni_pydantic
uni-pydantic: Pydantic-based OGM for Uni Graph Database.
This package provides a type-safe Object-Graph Mapping layer on top of the Uni graph database, using Pydantic v2 for model definitions.
Example:
from uni_db import Database from uni_pydantic import UniNode, UniSession, Field, Relationship, Vector
class Person(UniNode): ... name: str ... age: int | None = None ... email: str = Field(unique=True) ... embedding: Vector[1536] ... friends: list["Person"] = Relationship("FRIEND_OF", direction="both")
db = Database("./my_graph") session = UniSession(db) session.register(Person) session.sync_schema()
alice = Person(name="Alice", age=30, email="alice@example.com") session.add(alice) session.commit()
Query with type safety
adults = session.query(Person).filter(Person.age >= 18).all()
1# SPDX-License-Identifier: Apache-2.0 2# Copyright 2024-2026 Dragonscale Team 3 4""" 5uni-pydantic: Pydantic-based OGM for Uni Graph Database. 6 7This package provides a type-safe Object-Graph Mapping layer on top of 8the Uni graph database, using Pydantic v2 for model definitions. 9 10Example: 11 >>> from uni_db import Database 12 >>> from uni_pydantic import UniNode, UniSession, Field, Relationship, Vector 13 >>> 14 >>> class Person(UniNode): 15 ... name: str 16 ... age: int | None = None 17 ... email: str = Field(unique=True) 18 ... embedding: Vector[1536] 19 ... friends: list["Person"] = Relationship("FRIEND_OF", direction="both") 20 >>> 21 >>> db = Database("./my_graph") 22 >>> session = UniSession(db) 23 >>> session.register(Person) 24 >>> session.sync_schema() 25 >>> 26 >>> alice = Person(name="Alice", age=30, email="alice@example.com") 27 >>> session.add(alice) 28 >>> session.commit() 29 >>> 30 >>> # Query with type safety 31 >>> adults = session.query(Person).filter(Person.age >= 18).all() 32""" 33 34__version__ = "0.1.0" 35 36# Base classes 37# Async support 38from .async_query import AsyncQueryBuilder 39from .async_session import AsyncUniSession, AsyncUniTransaction 40from .base import UniEdge, UniNode 41 42# Database wrappers 43from .database import AsyncUniDatabase, UniDatabase 44 45# Exceptions 46from .exceptions import ( 47 BulkLoadError, 48 CypherInjectionError, 49 LazyLoadError, 50 NotPersisted, 51 NotRegisteredError, 52 NotTrackedError, 53 QueryError, 54 RelationshipError, 55 SchemaError, 56 SessionError, 57 TransactionError, 58 TypeMappingError, 59 UniPydanticError, 60 ValidationError, 61) 62 63# Field configuration 64from .fields import ( 65 Direction, 66 Field, 67 FieldConfig, 68 IndexType, 69 Relationship, 70 RelationshipConfig, 71 RelationshipDescriptor, 72 VectorMetric, 73 get_field_config, 74) 75 76# Lifecycle hooks 77from .hooks import ( 78 after_create, 79 after_delete, 80 after_load, 81 after_update, 82 before_create, 83 before_delete, 
84 before_load, 85 before_update, 86) 87 88# Query builder 89from .query import ( 90 FilterExpr, 91 FilterOp, 92 ModelProxy, 93 OrderByClause, 94 PropertyProxy, 95 QueryBuilder, 96 TraversalStep, 97 VectorSearchConfig, 98) 99 100# Schema generation 101from .schema import ( 102 DatabaseSchema, 103 EdgeTypeSchema, 104 LabelSchema, 105 PropertySchema, 106 SchemaGenerator, 107 generate_schema, 108) 109 110# Session management 111from .session import UniSession, UniTransaction 112 113# Type utilities 114from .types import ( 115 DATETIME_TYPES, 116 Vector, 117 db_to_python_value, 118 get_vector_dimensions, 119 is_list_type, 120 is_optional, 121 python_to_db_value, 122 python_type_to_uni, 123 uni_to_python_type, 124 unwrap_annotated, 125) 126 127__all__ = [ 128 # Version 129 "__version__", 130 # Base classes 131 "UniNode", 132 "UniEdge", 133 # Session 134 "UniSession", 135 "UniTransaction", 136 # Async Session 137 "AsyncUniSession", 138 "AsyncUniTransaction", 139 # Fields 140 "Field", 141 "FieldConfig", 142 "Relationship", 143 "RelationshipConfig", 144 "RelationshipDescriptor", 145 "get_field_config", 146 "IndexType", 147 "Direction", 148 "VectorMetric", 149 # Types 150 "Vector", 151 "python_type_to_uni", 152 "uni_to_python_type", 153 "get_vector_dimensions", 154 "is_optional", 155 "is_list_type", 156 "unwrap_annotated", 157 "python_to_db_value", 158 "db_to_python_value", 159 "DATETIME_TYPES", 160 # Query 161 "QueryBuilder", 162 "AsyncQueryBuilder", 163 "FilterExpr", 164 "FilterOp", 165 "PropertyProxy", 166 "ModelProxy", 167 "OrderByClause", 168 "TraversalStep", 169 "VectorSearchConfig", 170 # Schema 171 "SchemaGenerator", 172 "DatabaseSchema", 173 "LabelSchema", 174 "EdgeTypeSchema", 175 "PropertySchema", 176 "generate_schema", 177 # Database 178 "UniDatabase", 179 "AsyncUniDatabase", 180 # Hooks 181 "before_create", 182 "after_create", 183 "before_update", 184 "after_update", 185 "before_delete", 186 "after_delete", 187 "before_load", 188 "after_load", 189 # Exceptions 
190 "UniPydanticError", 191 "SchemaError", 192 "TypeMappingError", 193 "ValidationError", 194 "SessionError", 195 "NotRegisteredError", 196 "NotPersisted", 197 "NotTrackedError", 198 "TransactionError", 199 "QueryError", 200 "RelationshipError", 201 "LazyLoadError", 202 "BulkLoadError", 203 "CypherInjectionError", 204]
class UniNode(BaseModel, metaclass=UniModelMeta):
    """
    Base class for graph node models.

    Subclass this to define your node types. Each UniNode subclass
    represents a vertex label in the graph database.

    Attributes:
        __label__: The vertex label name. Defaults to the class name.
        __relationships__: Dictionary of relationship configurations.

    Private Attributes:
        _vid: The vertex ID assigned by the database.
        _uid: The unique identifier (content-addressed hash).
        _session: Reference to the owning session.
        _dirty: Set of modified field names.
        _is_new: True until the node is attached to a session, marked
            clean, or loaded with a vertex ID.

    Example:
        >>> class Person(UniNode):
        ...     __label__ = "Person"
        ...
        ...     name: str
        ...     age: int | None = None
        ...     email: str = Field(unique=True)
        ...
        ...     friends: list["Person"] = Relationship("FRIEND_OF", direction="both")
    """

    model_config = ConfigDict(
        # Reject fields that are not declared on the model
        extra="forbid",
        # Validate on assignment for dirty tracking
        validate_assignment=True,
        # Allow arbitrary types (for Vector, etc.)
        arbitrary_types_allowed=True,
        # Use enum values
        use_enum_values=True,
    )

    # Class-level configuration
    __label__: ClassVar[str] = ""
    __relationships__: ClassVar[dict[str, RelationshipConfig]] = {}

    # Private attributes for session tracking
    _vid: int | None = PrivateAttr(default=None)
    _uid: str | None = PrivateAttr(default=None)
    _session: UniSession | None = PrivateAttr(default=None)
    _dirty: set[str] = PrivateAttr(default_factory=set)
    _is_new: bool = PrivateAttr(default=True)

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Default ``__label__`` to the subclass name when it is left empty."""
        super().__init_subclass__(**kwargs)
        # Set default label to class name if not specified
        if not cls.__label__:
            cls.__label__ = cls.__name__

    def model_post_init(self, __context: Any) -> None:
        """Clear dirty tracking after construction."""
        super().model_post_init(__context)
        self._dirty = set()

    @property
    def vid(self) -> int | None:
        """The vertex ID assigned by the database."""
        return self._vid

    @property
    def uid(self) -> str | None:
        """The unique identifier (content-addressed hash)."""
        return self._uid

    @property
    def is_persisted(self) -> bool:
        """Whether this node has been saved to the database."""
        return self._vid is not None

    @property
    def is_dirty(self) -> bool:
        """Whether this node has unsaved changes."""
        return bool(self._dirty)

    def __setattr__(self, name: str, value: Any) -> None:
        """Assign an attribute and record public names as dirty.

        The name is recorded only after the assignment has succeeded, so a
        value rejected during assignment does not leave the field flagged
        as modified.
        """
        super().__setattr__(name, value)
        # Track dirty fields (but not private attributes). The hasattr guard
        # skips tracking while ``_dirty`` is not available on the instance.
        if not name.startswith("_") and hasattr(self, "_dirty"):
            self._dirty.add(name)

    def _mark_clean(self) -> None:
        """Mark all fields as clean (called after commit)."""
        self._dirty.clear()
        self._is_new = False

    def _attach_session(
        self, session: UniSession, vid: int, uid: str | None = None
    ) -> None:
        """Attach this node to a session with its database IDs."""
        self._session = session
        self._vid = vid
        self._uid = uid
        self._is_new = False

    @classmethod
    def get_property_fields(cls) -> dict[str, FieldInfo]:
        """Get all property fields (excluding relationships)."""
        return {
            name: info
            for name, info in cls.model_fields.items()
            if name not in cls.__relationships__
        }

    @classmethod
    def get_relationship_fields(cls) -> dict[str, RelationshipConfig]:
        """Get all relationship field configurations."""
        return cls.__relationships__

    def to_properties(self) -> dict[str, Any]:
        """Convert to a property dictionary for database storage.

        Uses python_to_db_value() for type conversion. Includes None
        explicitly so null-outs work.
        """
        return _model_to_properties(self, self.get_property_fields())

    @classmethod
    def from_properties(
        cls,
        props: dict[str, Any],
        *,
        vid: int | None = None,
        uid: str | None = None,
        session: UniSession | None = None,
    ) -> UniNode:
        """Create an instance from a property dictionary.

        Accepts _id (string->int vid) and _label from uni-db node dicts.
        Does not mutate the input dict.

        Args:
            props: Property dictionary; may carry ``_id`` and ``_label`` keys.
            vid: Explicit vertex ID. Takes precedence over ``props["_id"]``.
            uid: Unique identifier to store on the instance.
            session: Session to store on the instance.

        Returns:
            The validated instance, with empty dirty tracking.
        """
        # Copy so the pops below never touch the caller's dict.
        data = dict(props)

        raw_id = data.pop("_id", None)
        if raw_id is not None and vid is None:
            vid = int(raw_id) if not isinstance(raw_id, int) else raw_id
        data.pop("_label", None)

        converted = _convert_db_values(data, cls)

        instance = cls.model_validate(converted)
        if vid is not None:
            instance._vid = vid
        if uid is not None:
            instance._uid = uid
        if session is not None:
            instance._session = session
        # An instance that carries a vertex ID is not treated as new.
        instance._is_new = vid is None
        instance._dirty = set()
        return instance

    def __repr__(self) -> str:
        # Compare against None (as is_persisted does) so a vertex ID of 0 is
        # not reported as "unsaved".
        vid_str = f"vid={self._vid}" if self._vid is not None else "unsaved"
        return f"{self.__class__.__name__}({vid_str}, {super().__repr__()})"
Base class for graph node models.
Subclass this to define your node types. Each UniNode subclass represents a vertex label in the graph database.
Attributes: __label__: The vertex label name. Defaults to the class name. __relationships__: Dictionary of relationship configurations.
Private Attributes: _vid: The vertex ID assigned by the database. _uid: The unique identifier (content-addressed hash). _session: Reference to the owning session. _dirty: Set of modified field names.
Example:
class Person(UniNode): ... __label__ = "Person" ... ... name: str ... age: int | None = None ... email: str = Field(unique=True) ... ... friends: list["Person"] = Relationship("FRIEND_OF", direction="both")
220 @property 221 def vid(self) -> int | None: 222 """The vertex ID assigned by the database.""" 223 return self._vid
The vertex ID assigned by the database.
225 @property 226 def uid(self) -> str | None: 227 """The unique identifier (content-addressed hash).""" 228 return self._uid
The unique identifier (content-addressed hash).
@property
def is_persisted(self) -> bool:
    """Report whether a vertex ID is set on this node."""
    if self._vid is None:
        return False
    return True
Whether this node has been saved to the database.
@property
def is_dirty(self) -> bool:
    """Report whether any field name is recorded in ``_dirty``."""
    return True if self._dirty else False
Whether this node has unsaved changes.
@classmethod
def get_property_fields(cls) -> dict[str, FieldInfo]:
    """Return the model fields that are not registered as relationships."""
    relationship_names = cls.__relationships__
    property_fields: dict[str, FieldInfo] = {}
    for field_name, field_info in cls.model_fields.items():
        if field_name in relationship_names:
            continue
        property_fields[field_name] = field_info
    return property_fields
Get all property fields (excluding relationships).
@classmethod
def get_relationship_fields(cls) -> dict[str, RelationshipConfig]:
    """Return the class's ``__relationships__`` mapping (the same object, not a copy)."""
    relationships = cls.__relationships__
    return relationships
Get all relationship field configurations.
def to_properties(self) -> dict[str, Any]:
    """Build the property dictionary used for database storage.

    Delegates to ``_model_to_properties`` with this node's property
    fields (relationship fields are excluded by ``get_property_fields``).
    """
    property_fields = self.get_property_fields()
    return _model_to_properties(self, property_fields)
Convert to a property dictionary for database storage.
Uses python_to_db_value() for type conversion. Includes None explicitly so null-outs work.
@classmethod
def from_properties(
    cls,
    props: dict[str, Any],
    *,
    vid: int | None = None,
    uid: str | None = None,
    session: UniSession | None = None,
) -> UniNode:
    """Build a node instance from a property dictionary.

    The ``_id`` and ``_label`` keys of a uni-db node dict are stripped
    before validation; ``_id`` supplies the vertex ID (converted to int)
    when ``vid`` is not passed. The input dict is left unmodified.
    """
    # Shallow copy: the pops below must not affect the caller's dict.
    payload = dict(props)

    db_id = payload.pop("_id", None)
    payload.pop("_label", None)
    if vid is None and db_id is not None:
        vid = db_id if isinstance(db_id, int) else int(db_id)

    node = cls.model_validate(_convert_db_values(payload, cls))

    if vid is not None:
        node._vid = vid
    if uid is not None:
        node._uid = uid
    if session is not None:
        node._session = session

    # A node without a vertex ID counts as new; loading starts clean.
    node._is_new = vid is None
    node._dirty = set()
    return node
Create an instance from a property dictionary.
Accepts _id (string->int vid) and _label from uni-db node dicts. Does not mutate the input dict.
class UniEdge(BaseModel, metaclass=UniModelMeta):
    """
    Base class for graph edge models with properties.

    Subclass this to define edge types with typed properties.
    Edges represent relationships between nodes.

    Attributes:
        __edge_type__: The edge type name. Defaults to the class name.
        __from__: The source node type(s).
        __to__: The target node type(s).

    Private Attributes:
        _eid: The edge ID assigned by the database.
        _src_vid: The source vertex ID.
        _dst_vid: The destination vertex ID.
        _session: Reference to the owning session.
        _is_new: True until the edge is attached or loaded with an edge ID.

    Example:
        >>> class FriendshipEdge(UniEdge):
        ...     __edge_type__ = "FRIEND_OF"
        ...     __from__ = Person
        ...     __to__ = Person
        ...
        ...     since: date
        ...     strength: float = 1.0
    """

    model_config = ConfigDict(
        extra="forbid",
        validate_assignment=True,
        arbitrary_types_allowed=True,
        use_enum_values=True,
    )

    # Class-level configuration
    __edge_type__: ClassVar[str] = ""
    __from__: ClassVar[type[UniNode] | tuple[type[UniNode], ...] | None] = None
    __to__: ClassVar[type[UniNode] | tuple[type[UniNode], ...] | None] = None
    __relationships__: ClassVar[dict[str, RelationshipConfig]] = {}

    # Private attributes
    _eid: int | None = PrivateAttr(default=None)
    _src_vid: int | None = PrivateAttr(default=None)
    _dst_vid: int | None = PrivateAttr(default=None)
    _session: UniSession | None = PrivateAttr(default=None)
    _is_new: bool = PrivateAttr(default=True)

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Default ``__edge_type__`` to the subclass name when it is left empty."""
        super().__init_subclass__(**kwargs)
        # Set default edge type to class name if not specified
        if not cls.__edge_type__:
            cls.__edge_type__ = cls.__name__

    @property
    def eid(self) -> int | None:
        """The edge ID assigned by the database."""
        return self._eid

    @property
    def src_vid(self) -> int | None:
        """The source vertex ID."""
        return self._src_vid

    @property
    def dst_vid(self) -> int | None:
        """The destination vertex ID."""
        return self._dst_vid

    @property
    def is_persisted(self) -> bool:
        """Whether this edge has been saved to the database."""
        return self._eid is not None

    def _attach(
        self,
        session: UniSession,
        eid: int,
        src_vid: int,
        dst_vid: int,
    ) -> None:
        """Attach this edge to a session with its database IDs."""
        self._session = session
        self._eid = eid
        self._src_vid = src_vid
        self._dst_vid = dst_vid
        self._is_new = False

    @classmethod
    def get_from_labels(cls) -> list[str]:
        """Get the source label names."""
        if cls.__from__ is None:
            return []
        if isinstance(cls.__from__, tuple):
            return [n.__label__ for n in cls.__from__]
        return [cls.__from__.__label__]

    @classmethod
    def get_to_labels(cls) -> list[str]:
        """Get the target label names."""
        if cls.__to__ is None:
            return []
        if isinstance(cls.__to__, tuple):
            return [n.__label__ for n in cls.__to__]
        return [cls.__to__.__label__]

    @classmethod
    def get_property_fields(cls) -> dict[str, FieldInfo]:
        """Get all property fields."""
        return dict(cls.model_fields)

    def to_properties(self) -> dict[str, Any]:
        """Convert to a property dictionary for database storage."""
        return _model_to_properties(self, self.get_property_fields())

    @classmethod
    def from_properties(
        cls,
        props: dict[str, Any],
        *,
        eid: int | None = None,
        src_vid: int | None = None,
        dst_vid: int | None = None,
        session: UniSession | None = None,
    ) -> UniEdge:
        """Create an instance from a property dictionary.

        Accepts _id, _type, _src, _dst from uni-db edge dicts.
        Does not mutate the input dict.

        Args:
            props: Property dictionary; may carry ``_id``, ``_type``,
                ``_src`` and ``_dst`` keys.
            eid: Explicit edge ID. Takes precedence over ``props["_id"]``.
            src_vid: Explicit source vertex ID. Takes precedence over
                ``props["_src"]``.
            dst_vid: Explicit destination vertex ID. Takes precedence over
                ``props["_dst"]``.
            session: Session to store on the instance.

        Returns:
            The validated edge instance.
        """
        # Copy so the pops below never touch the caller's dict.
        data = dict(props)

        raw_id = data.pop("_id", None)
        if raw_id is not None and eid is None:
            eid = int(raw_id) if not isinstance(raw_id, int) else raw_id
        data.pop("_type", None)
        raw_src = data.pop("_src", None)
        if raw_src is not None and src_vid is None:
            src_vid = int(raw_src) if not isinstance(raw_src, int) else raw_src
        raw_dst = data.pop("_dst", None)
        if raw_dst is not None and dst_vid is None:
            dst_vid = int(raw_dst) if not isinstance(raw_dst, int) else raw_dst

        converted = _convert_db_values(data, cls)

        instance = cls.model_validate(converted)
        if eid is not None:
            instance._eid = eid
        if src_vid is not None:
            instance._src_vid = src_vid
        if dst_vid is not None:
            instance._dst_vid = dst_vid
        if session is not None:
            instance._session = session
        # An instance that carries an edge ID is not treated as new.
        instance._is_new = eid is None
        return instance

    @classmethod
    def from_edge_result(
        cls,
        data: dict[str, Any],
        *,
        session: UniSession | None = None,
    ) -> UniEdge:
        """Create an instance from a uni-db edge result dict.

        Convenience method that handles _id, _type, _src, _dst keys.
        """
        return cls.from_properties(data, session=session)

    def __repr__(self) -> str:
        # Compare against None (as is_persisted does) so an edge ID of 0 is
        # not reported as "unsaved".
        eid_str = f"eid={self._eid}" if self._eid is not None else "unsaved"
        return f"{self.__class__.__name__}({eid_str}, {super().__repr__()})"
Base class for graph edge models with properties.
Subclass this to define edge types with typed properties. Edges represent relationships between nodes.
Attributes: __edge_type__: The edge type name. __from__: The source node type(s). __to__: The target node type(s).
Private Attributes: _eid: The edge ID assigned by the database. _src_vid: The source vertex ID. _dst_vid: The destination vertex ID. _session: Reference to the owning session.
Example:
class FriendshipEdge(UniEdge): ... __edge_type__ = "FRIEND_OF" ... __from__ = Person ... __to__ = Person ... ... since: date ... strength: float = 1.0
375 @property 376 def eid(self) -> int | None: 377 """The edge ID assigned by the database.""" 378 return self._eid
The edge ID assigned by the database.
380 @property 381 def src_vid(self) -> int | None: 382 """The source vertex ID.""" 383 return self._src_vid
The source vertex ID.
385 @property 386 def dst_vid(self) -> int | None: 387 """The destination vertex ID.""" 388 return self._dst_vid
The destination vertex ID.
@property
def is_persisted(self) -> bool:
    """Report whether an edge ID is set on this edge."""
    if self._eid is None:
        return False
    return True
Whether this edge has been saved to the database.
@classmethod
def get_from_labels(cls) -> list[str]:
    """Return the ``__label__`` of each source node type.

    ``__from__`` may be ``None`` (empty list), a single node class, or a
    tuple of node classes.
    """
    source = cls.__from__
    if source is None:
        return []
    if not isinstance(source, tuple):
        return [source.__label__]
    return [node_cls.__label__ for node_cls in source]
Get the source label names.
@classmethod
def get_to_labels(cls) -> list[str]:
    """Return the ``__label__`` of each target node type.

    ``__to__`` may be ``None`` (empty list), a single node class, or a
    tuple of node classes.
    """
    target = cls.__to__
    if target is None:
        return []
    if not isinstance(target, tuple):
        return [target.__label__]
    return [node_cls.__label__ for node_cls in target]
Get the target label names.
@classmethod
def get_property_fields(cls) -> dict[str, FieldInfo]:
    """Return a new dict copy of the model's fields."""
    fields_copy = dict(cls.model_fields)
    return fields_copy
Get all property fields.
def to_properties(self) -> dict[str, Any]:
    """Build the property dictionary used for database storage.

    Delegates to ``_model_to_properties`` with this edge's property fields.
    """
    property_fields = self.get_property_fields()
    return _model_to_properties(self, property_fields)
Convert to a property dictionary for database storage.
@classmethod
def from_properties(
    cls,
    props: dict[str, Any],
    *,
    eid: int | None = None,
    src_vid: int | None = None,
    dst_vid: int | None = None,
    session: UniSession | None = None,
) -> UniEdge:
    """Build an edge instance from a property dictionary.

    The ``_id``, ``_type``, ``_src`` and ``_dst`` keys of a uni-db edge
    dict are stripped before validation. ``_id``, ``_src`` and ``_dst``
    supply the edge and endpoint IDs (converted to int) when the matching
    keyword argument is not passed. The input dict is left unmodified.
    """

    def _as_int(raw: Any) -> int:
        # Values that are already ints pass through untouched.
        return raw if isinstance(raw, int) else int(raw)

    # Shallow copy: the pops below must not affect the caller's dict.
    payload = dict(props)

    db_id = payload.pop("_id", None)
    if eid is None and db_id is not None:
        eid = _as_int(db_id)

    payload.pop("_type", None)

    db_src = payload.pop("_src", None)
    if src_vid is None and db_src is not None:
        src_vid = _as_int(db_src)

    db_dst = payload.pop("_dst", None)
    if dst_vid is None and db_dst is not None:
        dst_vid = _as_int(db_dst)

    edge = cls.model_validate(_convert_db_values(payload, cls))

    if eid is not None:
        edge._eid = eid
    if src_vid is not None:
        edge._src_vid = src_vid
    if dst_vid is not None:
        edge._dst_vid = dst_vid
    if session is not None:
        edge._session = session

    # An edge without an edge ID counts as new.
    edge._is_new = eid is None
    return edge
Create an instance from a property dictionary.
Accepts _id, _type, _src, _dst from uni-db edge dicts. Does not mutate the input dict.
@classmethod
def from_edge_result(
    cls,
    data: dict[str, Any],
    *,
    session: UniSession | None = None,
) -> UniEdge:
    """Build an edge instance from a uni-db edge result dict.

    Thin wrapper that forwards ``data`` and ``session`` to
    ``from_properties``, which handles the _id, _type, _src, _dst keys.
    """
    edge = cls.from_properties(data, session=session)
    return edge
Create an instance from a uni-db edge result dict.
Convenience method that handles _id, _type, _src, _dst keys.
160class UniSession: 161 """ 162 Session for interacting with the graph database using Pydantic models. 163 164 The session manages model registration, schema synchronization, 165 and provides CRUD operations and query building. 166 167 Example: 168 >>> from uni_db import Database 169 >>> from uni_pydantic import UniSession 170 >>> 171 >>> db = Database("./my_graph") 172 >>> session = UniSession(db) 173 >>> session.register(Person, Company) 174 >>> session.sync_schema() 175 >>> 176 >>> alice = Person(name="Alice", age=30) 177 >>> session.add(alice) 178 >>> session.commit() 179 """ 180 181 def __init__(self, db: uni_db.Database) -> None: 182 self._db = db 183 self._schema_gen = SchemaGenerator() 184 self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = ( 185 WeakValueDictionary() 186 ) 187 self._pending_new: list[UniNode] = [] 188 self._pending_delete: list[UniNode] = [] 189 190 def __enter__(self) -> UniSession: 191 return self 192 193 def __exit__( 194 self, 195 exc_type: type[BaseException] | None, 196 exc_val: BaseException | None, 197 exc_tb: TracebackType | None, 198 ) -> None: 199 self.close() 200 201 def close(self) -> None: 202 """Close the session and clear all pending state.""" 203 self._pending_new.clear() 204 self._pending_delete.clear() 205 206 def register(self, *models: type[UniNode] | type[UniEdge]) -> None: 207 """ 208 Register model classes with the session. 209 210 Registered models can be used for schema generation and queries. 211 212 Args: 213 *models: UniNode or UniEdge subclasses to register. 214 """ 215 self._schema_gen.register(*models) 216 217 def sync_schema(self) -> None: 218 """ 219 Synchronize database schema with registered models. 220 221 Creates labels, edge types, properties, and indexes as needed. 222 This is additive-only; it won't remove existing schema elements. 
223 """ 224 self._schema_gen.apply_to_database(self._db) 225 226 def query(self, model: type[NodeT]) -> QueryBuilder[NodeT]: 227 """ 228 Create a query builder for the given model. 229 230 Args: 231 model: The UniNode subclass to query. 232 233 Returns: 234 A QueryBuilder for constructing queries. 235 """ 236 return QueryBuilder(self, model) 237 238 def add(self, entity: UniNode) -> None: 239 """ 240 Add a new entity to be persisted. 241 242 The entity will be inserted on the next commit(). 243 """ 244 if entity.is_persisted: 245 raise SessionError(f"Entity {entity!r} is already persisted") 246 entity._session = self 247 self._pending_new.append(entity) 248 249 def add_all(self, entities: Sequence[UniNode]) -> None: 250 """Add multiple entities to be persisted.""" 251 for entity in entities: 252 self.add(entity) 253 254 def delete(self, entity: UniNode) -> None: 255 """Mark an entity for deletion.""" 256 if not entity.is_persisted: 257 raise NotPersisted(entity) 258 self._pending_delete.append(entity) 259 260 def get( 261 self, 262 model: type[NodeT], 263 vid: int | None = None, 264 uid: str | None = None, 265 **kwargs: Any, 266 ) -> NodeT | None: 267 """ 268 Get an entity by ID or unique properties. 269 270 Args: 271 model: The model type to retrieve. 272 vid: Vertex ID to look up. 273 uid: Unique ID to look up. 274 **kwargs: Property equality filters. 275 276 Returns: 277 The model instance or None if not found. 
278 """ 279 # Check identity map first 280 if vid is not None: 281 cached = self._identity_map.get((model.__label__, vid)) 282 if cached is not None: 283 return cached # type: ignore[return-value] 284 285 # Build query 286 label = model.__label__ 287 params: dict[str, Any] = {} 288 289 if vid is not None: 290 cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}" 291 params["vid"] = vid 292 elif uid is not None: 293 cypher = f"MATCH (n:{label}) WHERE n._uid = $uid RETURN {_NODE_RETURN}" 294 params["uid"] = uid 295 elif kwargs: 296 # Validate property names 297 for k in kwargs: 298 _validate_property(k, model) 299 conditions = [f"n.{k} = ${k}" for k in kwargs] 300 cypher = f"MATCH (n:{label}) WHERE {' AND '.join(conditions)} RETURN {_NODE_RETURN} LIMIT 1" 301 params.update(kwargs) 302 else: 303 raise ValueError("Must provide vid, uid, or property filters") 304 305 results = self._db.query(cypher, params) 306 if not results: 307 return None 308 309 node_data = _row_to_node_dict(results[0]) 310 if node_data is None: 311 return None 312 return self._result_to_model(node_data, model) 313 314 def refresh(self, entity: UniNode) -> None: 315 """Refresh an entity's properties from the database.""" 316 if not entity.is_persisted: 317 raise NotPersisted(entity) 318 319 label = entity.__class__.__label__ 320 cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}" 321 results = self._db.query(cypher, {"vid": entity._vid}) 322 323 if not results: 324 raise SessionError(f"Entity with vid={entity._vid} no longer exists") 325 326 # Update properties 327 props = _row_to_node_dict(results[0]) 328 if props is None: 329 raise SessionError(f"Entity with vid={entity._vid} no longer exists") 330 try: 331 hints = get_type_hints(type(entity)) 332 except Exception: 333 hints = {} 334 335 for field_name in entity.get_property_fields(): 336 if field_name in props: 337 value = props[field_name] 338 if field_name in hints: 339 value = db_to_python_value(value, 
hints[field_name]) 340 setattr(entity, field_name, value) 341 342 entity._mark_clean() 343 344 def commit(self) -> None: 345 """ 346 Commit all pending changes to the database. 347 348 This persists new entities, updates dirty entities, 349 and deletes marked entities. 350 """ 351 # Insert new entities 352 for entity in self._pending_new: 353 self._create_node(entity) 354 355 # Update dirty entities in identity map 356 for (label, vid), entity in list(self._identity_map.items()): 357 if entity.is_dirty and entity.is_persisted: 358 self._update_node(entity) 359 360 # Delete marked entities 361 for entity in self._pending_delete: 362 self._delete_node(entity) 363 364 # Flush to storage 365 self._db.flush() 366 367 # Clear pending lists 368 self._pending_new.clear() 369 self._pending_delete.clear() 370 371 def rollback(self) -> None: 372 """Discard all pending changes.""" 373 # Clear pending new — detach entities 374 for entity in self._pending_new: 375 entity._session = None 376 self._pending_new.clear() 377 378 # Clear pending deletes 379 self._pending_delete.clear() 380 381 # Invalidate dirty identity map entries 382 for entity in list(self._identity_map.values()): 383 if entity.is_dirty: 384 self.refresh(entity) 385 386 @contextmanager 387 def transaction(self) -> Iterator[UniTransaction]: 388 """Create a transaction context.""" 389 tx = UniTransaction(self) 390 with tx: 391 yield tx 392 393 def begin(self) -> UniTransaction: 394 """Begin a new transaction.""" 395 tx = UniTransaction(self) 396 tx._tx = self._db.begin() 397 return tx 398 399 def cypher( 400 self, 401 query: str, 402 params: dict[str, Any] | None = None, 403 result_type: type[NodeT] | None = None, 404 ) -> list[NodeT] | list[dict[str, Any]]: 405 """ 406 Execute a raw Cypher query. 407 408 Args: 409 query: Cypher query string. 410 params: Query parameters. 411 result_type: Optional model type for result mapping. 412 413 Returns: 414 List of results (model instances if result_type provided). 
415 """ 416 results = self._db.query(query, params) 417 418 if result_type is None: 419 return cast(list[dict[str, Any]], results) 420 421 # Map results to model instances 422 mapped = [] 423 for row in results: 424 # Try to find node data in the row 425 for key, value in row.items(): 426 if isinstance(value, dict): 427 # Check for _id/_label keys (uni-db node dict) 428 if "_id" in value and "_label" in value: 429 instance = self._result_to_model(value, result_type) 430 if instance: 431 mapped.append(instance) 432 break 433 # Also check if _label matches registered model 434 elif "_label" in value: 435 label = value["_label"] 436 if label in self._schema_gen._node_models: 437 model = self._schema_gen._node_models[label] 438 instance = self._result_to_model(value, model) 439 if instance: 440 mapped.append(instance) 441 break 442 else: 443 # Try the first column 444 first_value = next(iter(row.values()), None) 445 if isinstance(first_value, dict): 446 instance = self._result_to_model(first_value, result_type) 447 if instance: 448 mapped.append(instance) 449 450 return mapped 451 452 @staticmethod 453 def _validate_edge_endpoints( 454 source: UniNode, target: UniNode 455 ) -> tuple[int, int, str, str]: 456 """Validate that both endpoints are persisted and return (src_vid, dst_vid, src_label, dst_label).""" 457 if not source.is_persisted: 458 raise NotPersisted(source) 459 if not target.is_persisted: 460 raise NotPersisted(target) 461 return ( 462 source._vid, 463 target._vid, 464 source.__class__.__label__, 465 target.__class__.__label__, 466 ) 467 468 @staticmethod 469 def _normalize_edge_properties( 470 properties: dict[str, Any] | UniEdge | None, 471 ) -> dict[str, Any]: 472 """Normalize edge properties from dict, UniEdge, or None.""" 473 if isinstance(properties, UniEdge): 474 return properties.to_properties() 475 if properties: 476 return properties 477 return {} 478 479 def create_edge( 480 self, 481 source: UniNode, 482 edge_type: str, 483 target: UniNode, 484 
properties: dict[str, Any] | UniEdge | None = None, 485 ) -> None: 486 """Create an edge between two nodes.""" 487 src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints( 488 source, target 489 ) 490 props = self._normalize_edge_properties(properties) 491 492 # Build CREATE edge query with labels (required by Cypher implementation) 493 props_str = ", ".join(f"{k}: ${k}" for k in props) 494 if props_str: 495 cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)" 496 else: 497 cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)" 498 499 params = {"src": src_vid, "dst": dst_vid, **props} 500 self._db.query(cypher, params) 501 502 def delete_edge( 503 self, 504 source: UniNode, 505 edge_type: str, 506 target: UniNode, 507 ) -> int: 508 """Delete edges between two nodes. Returns the number of deleted edges.""" 509 src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints( 510 source, target 511 ) 512 cypher = ( 513 f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) " 514 f"WHERE a._vid = $src AND b._vid = $dst " 515 f"DELETE r RETURN count(r) as count" 516 ) 517 results = self._db.query(cypher, {"src": src_vid, "dst": dst_vid}) 518 return cast(int, results[0]["count"]) if results else 0 519 520 def update_edge( 521 self, 522 source: UniNode, 523 edge_type: str, 524 target: UniNode, 525 properties: dict[str, Any], 526 ) -> int: 527 """Update properties on edges between two nodes. 
Returns the number of updated edges.""" 528 src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints( 529 source, target 530 ) 531 set_parts = [f"r.{k} = ${k}" for k in properties] 532 params: dict[str, Any] = {"src": src_vid, "dst": dst_vid, **properties} 533 cypher = ( 534 f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) " 535 f"WHERE a._vid = $src AND b._vid = $dst " 536 f"SET {', '.join(set_parts)} " 537 f"RETURN count(r) as count" 538 ) 539 results = self._db.query(cypher, params) 540 return cast(int, results[0]["count"]) if results else 0 541 542 def get_edge( 543 self, 544 source: UniNode, 545 edge_type: str, 546 target: UniNode, 547 edge_model: type[EdgeT] | None = None, 548 ) -> list[dict[str, Any]] | list[EdgeT]: 549 """Get edges between two nodes. Returns dicts or edge model instances.""" 550 src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints( 551 source, target 552 ) 553 cypher = ( 554 f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) " 555 f"WHERE a._vid = $src AND b._vid = $dst " 556 f"RETURN properties(r) AS _props, id(r) AS _eid" 557 ) 558 results = self._db.query(cypher, {"src": src_vid, "dst": dst_vid}) 559 560 if edge_model is None: 561 edge_dicts: list[dict[str, Any]] = [] 562 for row in results: 563 props = row.get("_props", {}) 564 if isinstance(props, dict): 565 edge_dict = dict(props) 566 edge_dict["_eid"] = row.get("_eid") 567 edge_dicts.append(edge_dict) 568 return edge_dicts 569 570 edges = [] 571 for row in results: 572 r_data = row.get("_props", {}) 573 if isinstance(r_data, dict): 574 edge = edge_model.from_properties( 575 r_data, 576 src_vid=src_vid, 577 dst_vid=dst_vid, 578 session=self, 579 ) 580 edges.append(edge) 581 return edges 582 583 def bulk_add(self, entities: Sequence[UniNode]) -> list[int]: 584 """ 585 Bulk-add entities using bulk_insert_vertices for performance. 586 587 Groups entities by label and uses db.bulk_insert_vertices(). 588 Returns VIDs and attaches sessions. 
589 590 Args: 591 entities: Sequence of UniNode instances to bulk-insert. 592 593 Returns: 594 List of assigned vertex IDs. 595 596 Raises: 597 BulkLoadError: If bulk insertion fails. 598 """ 599 if not entities: 600 return [] 601 602 # Group by label 603 by_label: dict[str, list[UniNode]] = {} 604 for entity in entities: 605 label = entity.__class__.__label__ 606 if label not in by_label: 607 by_label[label] = [] 608 by_label[label].append(entity) 609 610 all_vids: list[int] = [] 611 try: 612 for label, group in by_label.items(): 613 # Run before_create hooks 614 for entity in group: 615 run_hooks(entity, _BEFORE_CREATE) 616 617 # Convert to property dicts 618 prop_dicts = [e.to_properties() for e in group] 619 620 # Bulk insert 621 vids = self._db.bulk_insert_vertices(label, prop_dicts) 622 623 # Attach sessions and record VIDs 624 for entity, vid in zip(group, vids): 625 entity._attach_session(self, vid) 626 self._identity_map[(label, vid)] = entity 627 run_hooks(entity, _AFTER_CREATE) 628 entity._mark_clean() 629 630 all_vids.extend(vids) 631 except Exception as e: 632 raise BulkLoadError(f"Bulk insert failed: {e}") from e 633 634 return all_vids 635 636 def explain(self, cypher: str) -> dict[str, Any]: 637 """Get the query execution plan without running it.""" 638 return self._db.explain(cypher) 639 640 def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]: 641 """Run the query with profiling and return results + stats.""" 642 return self._db.profile(cypher) 643 644 def save_schema(self, path: str) -> None: 645 """Save the database schema to a file.""" 646 self._db.save_schema(path) 647 648 def load_schema(self, path: str) -> None: 649 """Load a database schema from a file.""" 650 self._db.load_schema(path) 651 652 # ------------------------------------------------------------------------- 653 # Internal Methods 654 # ------------------------------------------------------------------------- 655 656 def _create_node(self, entity: 
UniNode) -> None: 657 """Create a node in the database.""" 658 # Run before_create hooks 659 run_hooks(entity, _BEFORE_CREATE) 660 661 label = entity.__class__.__label__ 662 props = entity.to_properties() 663 664 # Build CREATE query 665 props_str = ", ".join(f"{k}: ${k}" for k in props) 666 cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid" 667 668 results = self._db.query(cypher, props) 669 if results: 670 vid = results[0]["vid"] 671 entity._attach_session(self, vid) 672 673 # Add to identity map 674 self._identity_map[(label, vid)] = entity 675 676 # Run after_create hooks 677 run_hooks(entity, _AFTER_CREATE) 678 entity._mark_clean() 679 680 def _create_node_in_tx(self, entity: UniNode, tx: uni_db.Transaction) -> None: 681 """Create a node within a transaction.""" 682 run_hooks(entity, _BEFORE_CREATE) 683 684 label = entity.__class__.__label__ 685 props = entity.to_properties() 686 687 props_str = ", ".join(f"{k}: ${k}" for k in props) 688 cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid" 689 690 results = tx.query(cypher, props) 691 if results: 692 vid = results[0]["vid"] 693 entity._attach_session(self, vid) 694 self._identity_map[(label, vid)] = entity 695 696 run_hooks(entity, _AFTER_CREATE) 697 698 def _create_edge_in_tx( 699 self, 700 source: UniNode, 701 edge_type: str, 702 target: UniNode, 703 properties: UniEdge | None, 704 tx: uni_db.Transaction, 705 ) -> None: 706 """Create an edge within a transaction.""" 707 props = properties.to_properties() if properties else {} 708 src_label = source.__class__.__label__ 709 dst_label = target.__class__.__label__ 710 711 props_str = ", ".join(f"{k}: ${k}" for k in props) 712 if props_str: 713 cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type} {{{props_str}}}]->(b)" 714 else: 715 cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type}]->(b)" 716 717 params = {"src": 
source._vid, "dst": target._vid, **props} 718 tx.query(cypher, params) 719 720 def _update_node(self, entity: UniNode) -> None: 721 """Update a node in the database.""" 722 run_hooks(entity, _BEFORE_UPDATE) 723 724 label = entity.__class__.__label__ 725 726 # Convert dirty prop values via python_to_db_value 727 try: 728 hints = get_type_hints(type(entity)) 729 except Exception: 730 hints = {} 731 732 dirty_props = {} 733 for name in entity._dirty: 734 value = getattr(entity, name) 735 if name in hints: 736 value = python_to_db_value(value, hints[name]) 737 dirty_props[name] = value 738 739 if not dirty_props: 740 return 741 742 set_clause = ", ".join(f"n.{k} = ${k}" for k in dirty_props) 743 cypher = f"MATCH (n:{label}) WHERE id(n) = $vid SET {set_clause}" 744 params = {"vid": entity._vid, **dirty_props} 745 746 self._db.query(cypher, params) 747 748 run_hooks(entity, _AFTER_UPDATE) 749 entity._mark_clean() 750 751 def _delete_node(self, entity: UniNode) -> None: 752 """Delete a node from the database.""" 753 run_hooks(entity, _BEFORE_DELETE) 754 755 label = entity.__class__.__label__ 756 vid = entity._vid 757 758 # DETACH DELETE to also remove connected edges 759 cypher = f"MATCH (n:{label}) WHERE id(n) = $vid DETACH DELETE n" 760 self._db.query(cypher, {"vid": vid}) 761 762 # Remove from identity map 763 if vid is not None and (label, vid) in self._identity_map: 764 del self._identity_map[(label, vid)] 765 766 # Clear entity IDs 767 entity._vid = None 768 entity._uid = None 769 entity._session = None 770 771 run_hooks(entity, _AFTER_DELETE) 772 773 def _result_to_model( 774 self, 775 data: dict[str, Any], 776 model: type[NodeT], 777 ) -> NodeT | None: 778 """Convert a query result row to a model instance. 779 780 Does not mutate the input dict. 
781 """ 782 if not data: 783 return None 784 785 # Work on a copy 786 data = dict(data) 787 788 # Run before_load hooks 789 data = run_class_hooks(model, _BEFORE_LOAD, data) or data 790 791 # Extract _id → vid (uni-db returns _id as string or int) 792 vid = data.pop("_id", None) or data.pop("_vid", None) or data.pop("vid", None) 793 if vid is not None and not isinstance(vid, int): 794 vid = int(vid) 795 796 # Remove _label (informational) 797 data.pop("_label", None) 798 799 try: 800 instance = cast( 801 NodeT, 802 model.from_properties( 803 data, 804 vid=vid, 805 session=self, 806 ), 807 ) 808 except Exception: 809 # If validation fails, return None 810 return None 811 812 # Add to identity map if we have a vid 813 if vid is not None: 814 existing = self._identity_map.get((model.__label__, vid)) 815 if existing is not None: 816 return cast(NodeT, existing) 817 self._identity_map[(model.__label__, vid)] = instance 818 819 # Run after_load hooks 820 run_hooks(instance, _AFTER_LOAD) 821 822 return instance 823 824 def _load_relationship( 825 self, 826 entity: UniNode, 827 descriptor: RelationshipDescriptor[Any], 828 ) -> list[UniNode] | UniNode | None: 829 """Load a relationship for an entity.""" 830 if not entity.is_persisted: 831 raise NotPersisted(entity) 832 833 config = descriptor.config 834 label = entity.__class__.__label__ 835 pattern = _edge_pattern(config.edge_type, config.direction) 836 837 cypher = ( 838 f"MATCH (a:{label}){pattern}(b) WHERE id(a) = $vid " 839 f"RETURN properties(b) AS _props, id(b) AS _vid, labels(b) AS _labels" 840 ) 841 results = self._db.query(cypher, {"vid": entity._vid}) 842 843 nodes = [] 844 for row in results: 845 node_data = _row_to_node_dict(row) 846 if node_data is None: 847 continue 848 # Try to find the model for this node 849 node_label = node_data.get("_label") 850 if node_label and node_label in self._schema_gen._node_models: 851 model = self._schema_gen._node_models[node_label] 852 instance = 
self._result_to_model(node_data, model) 853 if instance: 854 nodes.append(instance) 855 856 if not descriptor.is_list: 857 return nodes[0] if nodes else None 858 return nodes 859 860 def _eager_load_relationships( 861 self, 862 entities: list[NodeT], 863 relationships: list[str], 864 ) -> None: 865 """Eager load relationships for a list of entities.""" 866 if not entities: 867 return 868 869 model = type(entities[0]) 870 rel_configs = model.get_relationship_fields() 871 872 for rel_name in relationships: 873 if rel_name not in rel_configs: 874 continue 875 876 config = rel_configs[rel_name] 877 label = model.__label__ 878 vids = [e._vid for e in entities if e._vid is not None] 879 880 if not vids: 881 continue 882 883 pattern = _edge_pattern(config.edge_type, config.direction) 884 cypher = ( 885 f"MATCH (a:{label}){pattern}(b) WHERE id(a) IN $vids " 886 f"RETURN id(a) as src_vid, properties(b) AS _props, id(b) AS _vid, labels(b) AS _labels" 887 ) 888 results = self._db.query(cypher, {"vids": vids}) 889 890 # Group results by source vid 891 by_source: dict[int, list[Any]] = {} 892 for row in results: 893 src_vid = row["src_vid"] 894 node_data = _row_to_node_dict(row) 895 if node_data is None: 896 continue 897 if src_vid not in by_source: 898 by_source[src_vid] = [] 899 by_source[src_vid].append(node_data) 900 901 # Set cached values on entities 902 for entity in entities: 903 if entity._vid in by_source: 904 related = by_source[entity._vid] 905 cache_attr = f"_rel_cache_{rel_name}" 906 setattr(entity, cache_attr, related)
Session for interacting with the graph database using Pydantic models.
The session manages model registration, schema synchronization, and provides CRUD operations and query building.
Example:
from uni_db import Database
from uni_pydantic import UniSession

db = Database("./my_graph")
session = UniSession(db)
session.register(Person, Company)
session.sync_schema()

alice = Person(name="Alice", age=30)
session.add(alice)
session.commit()
def __init__(self, db: uni_db.Database) -> None:
    """Bind the session to *db* and start with empty tracking state.

    Args:
        db: Database handle that every query issued by this session goes to.
    """
    self._db = db
    # Entities queued for insertion / deletion by the next commit().
    self._pending_new: list[UniNode] = []
    self._pending_delete: list[UniNode] = []
    # Loaded entities keyed by (label, vid). Values are held weakly, so an
    # entry disappears once nothing else references the entity.
    self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = (
        WeakValueDictionary()
    )
    self._schema_gen = SchemaGenerator()
def close(self) -> None:
    """Close the session, discarding every queued insert and delete."""
    for queue in (self._pending_new, self._pending_delete):
        queue.clear()
Close the session and clear all pending state.
def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
    """
    Make model classes known to this session.

    The classes are forwarded unchanged to the session's schema generator.

    Args:
        *models: UniNode or UniEdge subclasses to register.
    """
    schema_gen = self._schema_gen
    schema_gen.register(*models)
Register model classes with the session.
Registered models can be used for schema generation and queries.
Args: *models: UniNode or UniEdge subclasses to register.
def sync_schema(self) -> None:
    """
    Bring the database schema in line with the registered models.

    Delegates to the schema generator, which creates labels, edge types,
    properties, and indexes as needed. The operation is additive-only:
    existing schema elements are never removed.
    """
    database = self._db
    self._schema_gen.apply_to_database(database)
Synchronize database schema with registered models.
Creates labels, edge types, properties, and indexes as needed. This is additive-only; it won't remove existing schema elements.
def query(self, model: type[NodeT]) -> QueryBuilder[NodeT]:
    """
    Start building a query against *model*.

    Args:
        model: The UniNode subclass to query.

    Returns:
        A QueryBuilder bound to this session and *model*.
    """
    builder = QueryBuilder(self, model)
    return builder
Create a query builder for the given model.
Args: model: The UniNode subclass to query.
Returns: A QueryBuilder for constructing queries.
def add(self, entity: UniNode) -> None:
    """
    Queue a new entity for insertion on the next commit().

    Adding the same object again while it is still queued is a no-op, so a
    single commit() never tries to create it twice.

    Args:
        entity: The not-yet-persisted entity to insert.

    Raises:
        SessionError: If the entity is already persisted.
    """
    if entity.is_persisted:
        raise SessionError(f"Entity {entity!r} is already persisted")
    # Identity check rather than ==: two distinct objects that happen to
    # compare equal must both be insertable.
    if any(queued is entity for queued in self._pending_new):
        return
    entity._session = self
    self._pending_new.append(entity)
Add a new entity to be persisted.
The entity will be inserted on the next commit().
def add_all(self, entities: Sequence[UniNode]) -> None:
    """Queue every entity in *entities* via add(), preserving order."""
    enqueue = self.add
    for item in entities:
        enqueue(item)
Add multiple entities to be persisted.
def delete(self, entity: UniNode) -> None:
    """
    Mark a persisted entity for deletion on the next commit().

    Marking the same object again while it is still queued is a no-op, so a
    single commit() never runs the delete (and its hooks) twice for it.

    Args:
        entity: The persisted entity to delete.

    Raises:
        NotPersisted: If the entity has not been persisted.
    """
    if not entity.is_persisted:
        raise NotPersisted(entity)
    # Identity check rather than ==: only the very same object is a duplicate.
    if any(queued is entity for queued in self._pending_delete):
        return
    self._pending_delete.append(entity)
Mark an entity for deletion.
def get(
    self,
    model: type[NodeT],
    vid: int | None = None,
    uid: str | None = None,
    **kwargs: Any,
) -> NodeT | None:
    """
    Fetch a single entity by vertex ID, unique ID, or property equality.

    The lookup keys are tried in that order; the first one supplied wins and
    the others are ignored. A ``vid`` lookup is answered from the identity
    map when the entity is already loaded.

    Args:
        model: The model type to retrieve.
        vid: Vertex ID to look up.
        uid: Unique ID to look up.
        **kwargs: Property equality filters (all must match).

    Returns:
        The model instance, or None if nothing matched.

    Raises:
        ValueError: If neither vid, uid, nor any property filter is given.
    """
    label = model.__label__
    params: dict[str, Any]

    if vid is not None:
        # Identity map first: avoids a round trip for loaded entities.
        hit = self._identity_map.get((label, vid))
        if hit is not None:
            return hit  # type: ignore[return-value]
        where, params, tail = "id(n) = $vid", {"vid": vid}, ""
    elif uid is not None:
        where, params, tail = "n._uid = $uid", {"uid": uid}, ""
    elif kwargs:
        # Property names are interpolated into the query, so validate first.
        for prop in kwargs:
            _validate_property(prop, model)
        where = " AND ".join(f"n.{prop} = ${prop}" for prop in kwargs)
        params, tail = dict(kwargs), " LIMIT 1"
    else:
        raise ValueError("Must provide vid, uid, or property filters")

    cypher = f"MATCH (n:{label}) WHERE {where} RETURN {_NODE_RETURN}{tail}"
    rows = self._db.query(cypher, params)
    if not rows:
        return None

    node_data = _row_to_node_dict(rows[0])
    return None if node_data is None else self._result_to_model(node_data, model)
Get an entity by ID or unique properties.
Args: model: The model type to retrieve. vid: Vertex ID to look up. uid: Unique ID to look up. **kwargs: Property equality filters.
Returns: The model instance or None if not found.
def refresh(self, entity: UniNode) -> None:
    """
    Reload an entity's property fields from the database.

    Only fields present in the returned row are overwritten; afterwards the
    entity is marked clean.

    Raises:
        NotPersisted: If the entity has not been persisted.
        SessionError: If the node can no longer be read from the database.
    """
    if not entity.is_persisted:
        raise NotPersisted(entity)

    label = entity.__class__.__label__
    rows = self._db.query(
        f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}",
        {"vid": entity._vid},
    )

    # An empty result and an unusable row are reported the same way.
    props = _row_to_node_dict(rows[0]) if rows else None
    if props is None:
        raise SessionError(f"Entity with vid={entity._vid} no longer exists")

    # Type hints drive value conversion; if they cannot be resolved the raw
    # database values are assigned as-is.
    try:
        hints = get_type_hints(type(entity))
    except Exception:
        hints = {}

    for field_name in entity.get_property_fields():
        if field_name not in props:
            continue
        raw = props[field_name]
        converted = (
            db_to_python_value(raw, hints[field_name])
            if field_name in hints
            else raw
        )
        setattr(entity, field_name, converted)

    entity._mark_clean()
Refresh an entity's properties from the database.
def commit(self) -> None:
    """
    Commit all pending changes to the database.

    Runs three phases in order: insert queued entities, update dirty
    entities found in the identity map, delete queued entities. The
    database is then flushed and both queues are cleared.

    If any phase raises, the queues are left untouched so commit() can be
    retried; the per-entity guards below keep a retry from repeating work
    that already succeeded.
    """
    # Insert new entities
    for entity in self._pending_new:
        # add() only queues unpersisted entities, so one that is persisted
        # by now was already created (queued twice, or created by an earlier
        # commit() that failed part-way). Creating it again would duplicate
        # the node.
        if entity.is_persisted:
            continue
        self._create_node(entity)

    # Update dirty entities in the identity map. Iterate over a snapshot:
    # the map holds weak references and may change while we work.
    for entity in list(self._identity_map.values()):
        if entity.is_dirty and entity.is_persisted:
            self._update_node(entity)

    # Delete marked entities
    for entity in self._pending_delete:
        # delete() only queues persisted entities, so one that is no longer
        # persisted has already been deleted.
        if not entity.is_persisted:
            continue
        self._delete_node(entity)

    # Flush to storage
    self._db.flush()

    # Clear pending lists
    self._pending_new.clear()
    self._pending_delete.clear()
Commit all pending changes to the database.
This persists new entities, updates dirty entities, and deletes marked entities.
def rollback(self) -> None:
    """
    Discard all pending changes.

    Queued inserts are detached from the session, queued deletes are
    dropped, and every dirty entity in the identity map is reloaded from the
    database via refresh().
    """
    queued_new = self._pending_new
    for entity in queued_new:
        entity._session = None
    queued_new.clear()

    self._pending_delete.clear()

    # Snapshot the map before iterating: it holds weak references.
    for entity in tuple(self._identity_map.values()):
        if not entity.is_dirty:
            continue
        self.refresh(entity)
Discard all pending changes.
@contextmanager
def transaction(self) -> Iterator[UniTransaction]:
    """
    Create a transaction context.

    Yields a UniTransaction that has been entered as a context manager, so
    its own exit handling decides between commit and rollback when the
    ``with`` block ends.
    """
    with UniTransaction(self) as tx:
        yield tx
Create a transaction context.
def begin(self) -> UniTransaction:
    """
    Begin a new transaction and return it.

    The returned object is not entered as a context manager here, so the
    caller is responsible for calling commit() or rollback() on it.
    """
    handle = UniTransaction(self)
    handle._tx = self._db.begin()
    return handle
Begin a new transaction.
def cypher(
    self,
    query: str,
    params: dict[str, Any] | None = None,
    result_type: type[NodeT] | None = None,
) -> list[NodeT] | list[dict[str, Any]]:
    """
    Execute a raw Cypher query.

    Without ``result_type`` the rows are returned exactly as the database
    produced them. With ``result_type`` each row contributes at most one
    model instance; rows from which no instance can be built are dropped,
    so the result may be shorter than the number of rows.

    Args:
        query: Cypher query string.
        params: Query parameters.
        result_type: Optional model type for result mapping.

    Returns:
        List of results (model instances if result_type provided).
    """
    results = self._db.query(query, params)

    if result_type is None:
        return cast(list[dict[str, Any]], results)

    # Map results to model instances
    mapped = []
    for row in results:
        # Try to find node data in the row: scan the columns in order and
        # stop at the first one that is handled as a node dict.
        for key, value in row.items():
            if isinstance(value, dict):
                # Check for _id/_label keys (uni-db node dict). Such a column
                # is always mapped to result_type, whatever its _label says.
                if "_id" in value and "_label" in value:
                    instance = self._result_to_model(value, result_type)
                    if instance:
                        mapped.append(instance)
                    break
                # Also check if _label matches registered model. Here the
                # registered model for that label is used, not result_type.
                elif "_label" in value:
                    label = value["_label"]
                    if label in self._schema_gen._node_models:
                        model = self._schema_gen._node_models[label]
                        instance = self._result_to_model(value, model)
                        if instance:
                            mapped.append(instance)
                        break
        else:
            # for/else: reached only when no column above triggered a break.
            # Try the first column
            first_value = next(iter(row.values()), None)
            if isinstance(first_value, dict):
                instance = self._result_to_model(first_value, result_type)
                if instance:
                    mapped.append(instance)

    return mapped
Execute a raw Cypher query.
Args: query: Cypher query string. params: Query parameters. result_type: Optional model type for result mapping.
Returns: List of results (model instances if result_type provided).
def create_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
    properties: dict[str, Any] | UniEdge | None = None,
) -> None:
    """
    Create an edge from *source* to *target*.

    Args:
        source: Persisted node the edge starts at.
        edge_type: Edge type name. It is interpolated into the query text
            verbatim, so it must come from trusted code.
        target: Persisted node the edge points to.
        properties: Edge properties as a dict or a UniEdge instance; None
            creates an edge without properties. Property names are likewise
            interpolated into the query text.

    Raises:
        NotPersisted: If either endpoint has not been persisted.
    """
    src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
        source, target
    )
    props = self._normalize_edge_properties(properties)

    # Property values travel under a "prop_" prefix so that an edge property
    # named "src" or "dst" cannot overwrite the endpoint parameters.
    params: dict[str, Any] = {"src": src_vid, "dst": dst_vid}
    params.update({f"prop_{key}": value for key, value in props.items()})

    # Build CREATE edge query with labels (required by Cypher implementation)
    props_str = ", ".join(f"{key}: $prop_{key}" for key in props)
    edge = f"[r:{edge_type} {{{props_str}}}]" if props_str else f"[r:{edge_type}]"
    cypher = (
        f"MATCH (a:{src_label}), (b:{dst_label}) "
        f"WHERE a._vid = $src AND b._vid = $dst "
        f"CREATE (a)-{edge}->(b)"
    )
    self._db.query(cypher, params)
Create an edge between two nodes.
def delete_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
) -> int:
    """
    Delete every *edge_type* edge going from *source* to *target*.

    Returns:
        The count reported by the database, or 0 when the query returns no
        rows.

    Raises:
        NotPersisted: If either endpoint has not been persisted.
    """
    a_vid, b_vid, a_label, b_label = self._validate_edge_endpoints(source, target)
    statement = " ".join(
        [
            f"MATCH (a:{a_label})-[r:{edge_type}]->(b:{b_label})",
            "WHERE a._vid = $src AND b._vid = $dst",
            "DELETE r RETURN count(r) as count",
        ]
    )
    rows = self._db.query(statement, {"src": a_vid, "dst": b_vid})
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Delete edges between two nodes. Returns the number of deleted edges.
def update_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
    properties: dict[str, Any],
) -> int:
    """
    Update properties on every *edge_type* edge from *source* to *target*.

    Args:
        source: Persisted node the edges start at.
        edge_type: Edge type name; interpolated into the query text verbatim.
        target: Persisted node the edges point to.
        properties: Property names and their new values. Names are
            interpolated into the query text; values are sent as parameters.
            An empty dict updates nothing.

    Returns:
        The count reported by the database; 0 when *properties* is empty or
        the query returns no rows.

    Raises:
        NotPersisted: If either endpoint has not been persisted.
    """
    src_vid, dst_vid, src_label, dst_label = self._validate_edge_endpoints(
        source, target
    )
    if not properties:
        # A SET clause needs at least one assignment; with nothing to assign
        # there is nothing to update, so skip the query entirely.
        return 0

    # Property values travel under a "prop_" prefix so that a property named
    # "src" or "dst" cannot overwrite the endpoint parameters.
    params: dict[str, Any] = {"src": src_vid, "dst": dst_vid}
    params.update({f"prop_{key}": value for key, value in properties.items()})
    set_clause = ", ".join(f"r.{key} = $prop_{key}" for key in properties)
    cypher = (
        f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
        f"WHERE a._vid = $src AND b._vid = $dst "
        f"SET {set_clause} "
        f"RETURN count(r) as count"
    )
    results = self._db.query(cypher, params)
    return cast(int, results[0]["count"]) if results else 0
Update properties on edges between two nodes. Returns the number of updated edges.
def get_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
    edge_model: type[EdgeT] | None = None,
) -> list[dict[str, Any]] | list[EdgeT]:
    """
    Fetch the *edge_type* edges going from *source* to *target*.

    Rows whose property payload is not a dict are skipped.

    Args:
        source: Persisted node the edges start at.
        edge_type: Edge type name; interpolated into the query text verbatim.
        target: Persisted node the edges point to.
        edge_model: Optional edge model used to build the results.

    Returns:
        Without *edge_model*: one dict per edge holding its properties plus
        an ``_eid`` entry. With *edge_model*: instances built through
        ``edge_model.from_properties``.

    Raises:
        NotPersisted: If either endpoint has not been persisted.
    """
    a_vid, b_vid, a_label, b_label = self._validate_edge_endpoints(source, target)
    statement = " ".join(
        [
            f"MATCH (a:{a_label})-[r:{edge_type}]->(b:{b_label})",
            "WHERE a._vid = $src AND b._vid = $dst",
            "RETURN properties(r) AS _props, id(r) AS _eid",
        ]
    )
    rows = self._db.query(statement, {"src": a_vid, "dst": b_vid})

    if edge_model is None:
        # Copy the payload so the caller never mutates the raw result row.
        return [
            {**payload, "_eid": row.get("_eid")}
            for row in rows
            if isinstance(payload := row.get("_props", {}), dict)
        ]

    return [
        edge_model.from_properties(
            payload,
            src_vid=a_vid,
            dst_vid=b_vid,
            session=self,
        )
        for row in rows
        if isinstance(payload := row.get("_props", {}), dict)
    ]
Get edges between two nodes. Returns dicts or edge model instances.
def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
    """
    Bulk-add entities using bulk_insert_vertices for performance.

    Entities are grouped by label and each group is inserted with one
    db.bulk_insert_vertices() call. Every inserted entity is attached to
    this session, stored in the identity map, and marked clean. Create
    hooks run around the insert of each group.

    Args:
        entities: Sequence of UniNode instances to bulk-insert.

    Returns:
        List of assigned vertex IDs. The IDs are ordered group by group
        (labels in order of first appearance), which differs from the input
        order when entities of different labels are interleaved.

    Raises:
        BulkLoadError: If anything fails during insertion, including a hook
            error or the database returning a different number of vertex
            IDs than entities submitted.
    """
    if not entities:
        return []

    # Group by label, keeping first-appearance order of the labels.
    by_label: dict[str, list[UniNode]] = {}
    for entity in entities:
        by_label.setdefault(entity.__class__.__label__, []).append(entity)

    all_vids: list[int] = []
    try:
        for label, group in by_label.items():
            # Run before_create hooks
            for entity in group:
                run_hooks(entity, _BEFORE_CREATE)

            # Convert to property dicts
            prop_dicts = [e.to_properties() for e in group]

            # Bulk insert
            vids = list(self._db.bulk_insert_vertices(label, prop_dicts))

            # zip() would silently drop the unmatched tail and leave some
            # entities unattached; fail loudly before attaching any of them.
            if len(vids) != len(group):
                raise ValueError(
                    f"expected {len(group)} vertex IDs for label {label!r}, "
                    f"got {len(vids)}"
                )

            # Attach sessions and record VIDs
            for entity, vid in zip(group, vids):
                entity._attach_session(self, vid)
                self._identity_map[(label, vid)] = entity
                run_hooks(entity, _AFTER_CREATE)
                entity._mark_clean()

            all_vids.extend(vids)
    except Exception as e:
        raise BulkLoadError(f"Bulk insert failed: {e}") from e

    return all_vids
Bulk-add entities using bulk_insert_vertices for performance.
Groups entities by label and uses db.bulk_insert_vertices(). Returns VIDs and attaches sessions.
Args: entities: Sequence of UniNode instances to bulk-insert.
Returns: List of assigned vertex IDs.
Raises: BulkLoadError: If bulk insertion fails.
def explain(self, cypher: str) -> dict[str, Any]:
    """Return the database's execution plan for *cypher* (via ``db.explain``)."""
    plan = self._db.explain(cypher)
    return plan
Get the query execution plan without running it.
def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Run *cypher* with profiling and return ``(results, stats)`` from the database."""
    outcome = self._db.profile(cypher)
    return outcome
Run the query with profiling and return results + stats.
class UniTransaction:
    """
    Transaction context for atomic operations.

    Provides commit/rollback semantics for a group of operations. Nodes and
    edges are only queued by add() / create_edge(); the database is written
    when commit() runs.

    Example:
        >>> with session.transaction() as tx:
        ...     alice = Person(name="Alice")
        ...     tx.add(alice)
        ...     # Auto-commits on success, rolls back on exception
    """

    def __init__(self, session: UniSession) -> None:
        self._session = session
        # Underlying database transaction; set by __enter__ (or by the
        # session's begin()).
        self._tx: uni_db.Transaction | None = None
        self._pending_nodes: list[UniNode] = []
        self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
        self._committed = False
        self._rolled_back = False

    def __enter__(self) -> UniTransaction:
        self._tx = self._session._db.begin()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # The block may already have finished the transaction explicitly.
        # In particular, a committed transaction must not be rolled back
        # just because later code in the block raised.
        if self._committed or self._rolled_back:
            return
        if exc_type is not None:
            self.rollback()
            return
        self.commit()

    def add(self, entity: UniNode) -> None:
        """Add a node to be created in this transaction."""
        self._pending_nodes.append(entity)

    def create_edge(
        self,
        source: UniNode,
        edge_type: str,
        target: UniNode,
        properties: UniEdge | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Queue an edge between two persisted nodes for creation on commit().

        Extra keyword arguments are accepted but not used by this method.

        Raises:
            NotPersisted: If either endpoint has not been persisted.
        """
        if not source.is_persisted:
            raise NotPersisted(source)
        if not target.is_persisted:
            raise NotPersisted(target)
        self._pending_edges.append((source, edge_type, target, properties))

    def commit(self) -> None:
        """
        Create all queued nodes and edges, then commit the transaction.

        Raises:
            TransactionError: If the transaction was already committed or
                rolled back, was never started, or if creating the queued
                items or committing fails (the transaction is rolled back
                first in that last case).
        """
        if self._committed:
            raise TransactionError("Transaction already committed")
        if self._rolled_back:
            raise TransactionError("Transaction already rolled back")

        if self._tx is None:
            raise TransactionError("Transaction not started")

        try:
            # Create pending nodes
            for node in self._pending_nodes:
                self._session._create_node_in_tx(node, self._tx)

            # Create pending edges
            for source, edge_type, target, props in self._pending_edges:
                self._session._create_edge_in_tx(
                    source, edge_type, target, props, self._tx
                )

            self._tx.commit()
        except Exception as e:
            self.rollback()
            raise TransactionError(f"Commit failed: {e}") from e

        self._committed = True

        # Mark nodes as clean. This runs outside the try block on purpose:
        # the data is committed by now, so a failure here must neither roll
        # back nor be reported as a failed commit.
        for node in self._pending_nodes:
            node._mark_clean()

    def rollback(self) -> None:
        """Roll back the transaction and drop queued items; repeat calls are no-ops."""
        if self._rolled_back:
            return
        if self._tx is not None:
            self._tx.rollback()
        self._rolled_back = True
        self._pending_nodes.clear()
        self._pending_edges.clear()
Transaction context for atomic operations.
Provides commit/rollback semantics for a group of operations.
Example:
with session.transaction() as tx: ... alice = Person(name="Alice") ... tx.add(alice) ... # Auto-commits on success, rolls back on exception
def __init__(self, session: UniSession) -> None:
    """Create an unstarted transaction bound to *session*.

    Args:
        session: Session whose database and node/edge creation helpers the
            transaction uses.
    """
    self._session = session
    # Completion flags; both start cleared.
    self._committed = False
    self._rolled_back = False
    # Work queued until commit().
    self._pending_nodes: list[UniNode] = []
    self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
    # Underlying database transaction; None until one is started.
    self._tx: uni_db.Transaction | None = None
def add(self, entity: UniNode) -> None:
    """Queue *entity* so that commit() creates it inside this transaction."""
    queue = self._pending_nodes
    queue.append(entity)
Add a node to be created in this transaction.
def create_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
    properties: UniEdge | None = None,
    **kwargs: Any,
) -> None:
    """
    Queue an edge between two persisted nodes for creation on commit.

    Extra keyword arguments are accepted but not used by this method.

    Raises:
        NotPersisted: If either endpoint has not been persisted (the source
            is checked first).
    """
    for endpoint in (source, target):
        if not endpoint.is_persisted:
            raise NotPersisted(endpoint)
    queued = (source, edge_type, target, properties)
    self._pending_edges.append(queued)
Create an edge between two nodes in this transaction.
def commit(self) -> None:
    """
    Create all queued nodes and edges, then commit the transaction.

    Raises:
        TransactionError: If the transaction was already committed or rolled
            back, was never started, or if creating the queued items or
            committing fails (the transaction is rolled back first in that
            last case).
    """
    if self._committed:
        raise TransactionError("Transaction already committed")
    if self._rolled_back:
        raise TransactionError("Transaction already rolled back")

    if self._tx is None:
        raise TransactionError("Transaction not started")

    try:
        # Create pending nodes
        for node in self._pending_nodes:
            self._session._create_node_in_tx(node, self._tx)

        # Create pending edges
        for source, edge_type, target, props in self._pending_edges:
            self._session._create_edge_in_tx(
                source, edge_type, target, props, self._tx
            )

        self._tx.commit()
    except Exception as e:
        self.rollback()
        raise TransactionError(f"Commit failed: {e}") from e

    self._committed = True

    # Mark nodes as clean. This runs outside the try block on purpose: the
    # data is committed by now, so a failure here must neither roll back nor
    # be reported as a failed commit.
    for node in self._pending_nodes:
        node._mark_clean()
Commit the transaction.
def rollback(self) -> None:
    """Roll back the database transaction and drop all queued work.

    Calling this again after a rollback is a no-op.
    """
    if not self._rolled_back:
        handle = self._tx
        if handle is not None:
            handle.rollback()
        self._rolled_back = True
        for queue in (self._pending_nodes, self._pending_edges):
            queue.clear()
Rollback the transaction.
class AsyncUniSession:
    """
    Async session for interacting with the graph database.

    Mirrors UniSession with async methods. Uses AsyncDatabase.

    Example:
        >>> from uni_db import AsyncDatabase
        >>> from uni_pydantic import AsyncUniSession
        >>>
        >>> db = await AsyncDatabase.open("./my_graph")
        >>> async with AsyncUniSession(db) as session:
        ...     session.register(Person)
        ...     await session.sync_schema()
        ...     alice = Person(name="Alice", age=30)
        ...     session.add(alice)
        ...     await session.commit()
    """

    def __init__(self, db: uni_db.AsyncDatabase) -> None:
        """Create a session on top of *db* with empty pending queues."""
        self._db = db
        self._schema_gen = SchemaGenerator()
        # Loaded entities keyed by (label, vid); weak values, so the map does
        # not keep entities alive by itself.
        self._identity_map: WeakValueDictionary[tuple[str, int], UniNode] = (
            WeakValueDictionary()
        )
        self._pending_new: list[UniNode] = []
        self._pending_delete: list[UniNode] = []

    async def __aenter__(self) -> AsyncUniSession:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the context only clears pending state; it does not commit.
        self.close()

    def close(self) -> None:
        """Close the session and clear pending state."""
        self._pending_new.clear()
        self._pending_delete.clear()

    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
        """Register model classes with the session (sync)."""
        self._schema_gen.register(*models)

    async def sync_schema(self) -> None:
        """Synchronize database schema with registered models."""
        await self._schema_gen.async_apply_to_database(self._db)

    def query(self, model: type[NodeT]) -> AsyncQueryBuilder[NodeT]:
        """Create an async query builder for the given model."""
        return AsyncQueryBuilder(self, model)

    def add(self, entity: UniNode) -> None:
        """Add a new entity to be persisted (sync — just collects).

        Raises:
            SessionError: if the entity is already persisted.
        """
        if entity.is_persisted:
            raise SessionError(f"Entity {entity!r} is already persisted")
        entity._session = self
        self._pending_new.append(entity)

    def add_all(self, entities: Sequence[UniNode]) -> None:
        """Add multiple entities (sync — just collects)."""
        for entity in entities:
            self.add(entity)

    def delete(self, entity: UniNode) -> None:
        """Mark an entity for deletion (sync — just collects).

        Raises:
            NotPersisted: if the entity has not been persisted.
        """
        if not entity.is_persisted:
            raise NotPersisted(entity)
        self._pending_delete.append(entity)

    async def get(
        self,
        model: type[NodeT],
        vid: int | None = None,
        uid: str | None = None,
        **kwargs: Any,
    ) -> NodeT | None:
        """Get an entity by ID or unique properties.

        Lookup precedence is ``vid``, then ``uid``, then the keyword property
        filters. A ``vid`` lookup is answered from the identity map when the
        entity is already loaded.

        Raises:
            ValueError: if no vid, uid, or property filter is given.
        """
        if vid is not None:
            cached = self._identity_map.get((model.__label__, vid))
            if cached is not None:
                return cached  # type: ignore[return-value]

        label = model.__label__
        params: dict[str, Any] = {}

        if vid is not None:
            cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
            params["vid"] = vid
        elif uid is not None:
            cypher = f"MATCH (n:{label}) WHERE n._uid = $uid RETURN {_NODE_RETURN}"
            params["uid"] = uid
        elif kwargs:
            # Property names are interpolated into the query text, so each
            # one is validated against the model first.
            for k in kwargs:
                _validate_property(k, model)
            conditions = [f"n.{k} = ${k}" for k in kwargs]
            cypher = f"MATCH (n:{label}) WHERE {' AND '.join(conditions)} RETURN {_NODE_RETURN} LIMIT 1"
            params.update(kwargs)
        else:
            raise ValueError("Must provide vid, uid, or property filters")

        results = await self._db.query(cypher, params)
        if not results:
            return None

        node_data = _row_to_node_dict(results[0])
        if node_data is None:
            return None
        return self._result_to_model(node_data, model)

    async def refresh(self, entity: UniNode) -> None:
        """Refresh an entity's properties from the database.

        Raises:
            NotPersisted: if the entity has not been persisted.
            SessionError: if the entity can no longer be found.
        """
        if not entity.is_persisted:
            raise NotPersisted(entity)

        label = entity.__class__.__label__
        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}"
        results = await self._db.query(cypher, {"vid": entity._vid})

        if not results:
            raise SessionError(f"Entity with vid={entity._vid} no longer exists")

        props = _row_to_node_dict(results[0])
        if props is None:
            raise SessionError(f"Entity with vid={entity._vid} no longer exists")
        try:
            hints = get_type_hints(type(entity))
        except Exception:
            # Best effort: without resolvable hints the raw values are used.
            hints = {}

        for field_name in entity.get_property_fields():
            if field_name in props:
                value = props[field_name]
                if field_name in hints:
                    value = db_to_python_value(value, hints[field_name])
                setattr(entity, field_name, value)

        entity._mark_clean()

    async def commit(self) -> None:
        """Commit all pending changes.

        Order: create pending entities, update dirty persisted entities from
        the identity map, delete entities marked for deletion, then flush.
        """
        for entity in self._pending_new:
            await self._create_node(entity)

        # Snapshot the values: the identity map holds weak references and
        # may change while awaiting.
        for entity in list(self._identity_map.values()):
            if entity.is_dirty and entity.is_persisted:
                await self._update_node(entity)

        for entity in self._pending_delete:
            await self._delete_node(entity)

        await self._db.flush()
        self._pending_new.clear()
        self._pending_delete.clear()

    async def rollback(self) -> None:
        """Discard all pending changes and reload dirty loaded entities."""
        for entity in self._pending_new:
            entity._session = None
        self._pending_new.clear()
        self._pending_delete.clear()
        for entity in list(self._identity_map.values()):
            if entity.is_dirty:
                await self.refresh(entity)

    async def transaction(self) -> AsyncUniTransaction:
        """Create an async transaction.

        This method is a coroutine, so it must be awaited before the result
        can be used as a context manager:
        `async with await session.transaction() as tx:`.
        """
        return AsyncUniTransaction(self)

    async def cypher(
        self,
        query: str,
        params: dict[str, Any] | None = None,
        result_type: type[NodeT] | None = None,
    ) -> list[NodeT] | list[dict[str, Any]]:
        """Execute a raw Cypher query.

        Without ``result_type`` the raw rows are returned. With it, at most
        one model instance is produced per row: the first dict value carrying
        ``_label`` that converts successfully, otherwise the row's first
        value if that is a dict.
        """
        results = await self._db.query(query, params)

        if result_type is None:
            return cast(list[dict[str, Any]], results)

        mapped = []
        for row in results:
            for key, value in row.items():
                if isinstance(value, dict):
                    if "_id" in value and "_label" in value:
                        instance = self._result_to_model(value, result_type)
                        if instance:
                            mapped.append(instance)
                            break
                    elif "_label" in value:
                        label = value["_label"]
                        if label in self._schema_gen._node_models:
                            model = self._schema_gen._node_models[label]
                            instance = self._result_to_model(value, model)
                            if instance:
                                mapped.append(instance)
                                break
            else:
                # No value in this row was mapped above; fall back to the
                # first column.
                first_value = next(iter(row.values()), None)
                if isinstance(first_value, dict):
                    instance = self._result_to_model(first_value, result_type)
                    if instance:
                        mapped.append(instance)

        return mapped

    async def create_edge(
        self,
        source: UniNode,
        edge_type: str,
        target: UniNode,
        properties: dict[str, Any] | UniEdge | None = None,
    ) -> None:
        """Create an edge between two nodes."""
        src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
            source, target
        )
        props = UniSession._normalize_edge_properties(properties)

        # Bind property values under prefixed parameter names so an edge
        # property called "src" or "dst" cannot overwrite the endpoint
        # parameters.
        prop_params = {f"prop_{k}": v for k, v in props.items()}
        props_str = ", ".join(f"{k}: $prop_{k}" for k in props)
        if props_str:
            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)"
        else:
            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)"

        await self._db.query(cypher, {"src": src_vid, "dst": dst_vid, **prop_params})

    async def delete_edge(
        self, source: UniNode, edge_type: str, target: UniNode
    ) -> int:
        """Delete edges between two nodes. Returns the number of deleted edges."""
        src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
            source, target
        )
        cypher = (
            f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
            f"WHERE a._vid = $src AND b._vid = $dst "
            f"DELETE r RETURN count(r) as count"
        )
        results = await self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
        return cast(int, results[0]["count"]) if results else 0

    async def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
        """Bulk-add entities using bulk_insert_vertices.

        Entities are grouped by label and inserted one group at a time.
        Returns the new vids, grouped by label in first-seen order.

        Raises:
            BulkLoadError: if anything fails while inserting.
        """
        if not entities:
            return []

        by_label: dict[str, list[UniNode]] = {}
        for entity in entities:
            by_label.setdefault(entity.__class__.__label__, []).append(entity)

        all_vids: list[int] = []
        try:
            for label, group in by_label.items():
                for entity in group:
                    run_hooks(entity, _BEFORE_CREATE)
                prop_dicts = [e.to_properties() for e in group]
                vids = await self._db.bulk_insert_vertices(label, prop_dicts)
                for entity, vid in zip(group, vids):
                    entity._attach_session(self, vid)
                    self._identity_map[(label, vid)] = entity
                    run_hooks(entity, _AFTER_CREATE)
                    entity._mark_clean()
                all_vids.extend(vids)
        except Exception as e:
            raise BulkLoadError(f"Bulk insert failed: {e}") from e

        return all_vids

    async def explain(self, cypher: str) -> dict[str, Any]:
        """Get the query execution plan."""
        return await self._db.explain(cypher)

    async def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
        """Run the query with profiling."""
        return await self._db.profile(cypher)

    async def save_schema(self, path: str) -> None:
        """Save the database schema to a file."""
        await self._db.save_schema(path)

    async def load_schema(self, path: str) -> None:
        """Load a database schema from a file."""
        await self._db.load_schema(path)

    # ---- Internal methods ----

    async def _create_node(self, entity: UniNode) -> None:
        """Create *entity* in the database and attach it to this session."""
        run_hooks(entity, _BEFORE_CREATE)
        label = entity.__class__.__label__
        props = entity.to_properties()
        props_str = ", ".join(f"{k}: ${k}" for k in props)
        cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid"
        results = await self._db.query(cypher, props)
        if results:
            vid = results[0]["vid"]
            entity._attach_session(self, vid)
            self._identity_map[(label, vid)] = entity
            run_hooks(entity, _AFTER_CREATE)
        entity._mark_clean()

    async def _create_node_in_tx(
        self, entity: UniNode, tx: uni_db.AsyncTransaction
    ) -> None:
        """Create *entity* through transaction *tx*.

        Unlike ``_create_node`` this does not mark the entity clean; the
        transaction's commit does that.
        """
        run_hooks(entity, _BEFORE_CREATE)
        label = entity.__class__.__label__
        props = entity.to_properties()
        props_str = ", ".join(f"{k}: ${k}" for k in props)
        cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN id(n) as vid"
        results = await tx.query(cypher, props)
        if results:
            vid = results[0]["vid"]
            entity._attach_session(self, vid)
            self._identity_map[(label, vid)] = entity
            run_hooks(entity, _AFTER_CREATE)

    async def _create_edge_in_tx(
        self,
        source: UniNode,
        edge_type: str,
        target: UniNode,
        properties: UniEdge | None,
        tx: uni_db.AsyncTransaction,
    ) -> None:
        """Create an edge from *source* to *target* through transaction *tx*."""
        props = properties.to_properties() if properties else {}
        src_label = source.__class__.__label__
        dst_label = target.__class__.__label__
        # Prefixed parameter names keep a property called "src" or "dst"
        # from overwriting the endpoint parameters.
        prop_params = {f"prop_{k}": v for k, v in props.items()}
        props_str = ", ".join(f"{k}: $prop_{k}" for k in props)
        if props_str:
            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type} {{{props_str}}}]->(b)"
        else:
            cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[:{edge_type}]->(b)"
        params = {"src": source._vid, "dst": target._vid, **prop_params}
        await tx.query(cypher, params)

    async def _update_node(self, entity: UniNode) -> None:
        """Write the entity's dirty properties back to the database."""
        run_hooks(entity, _BEFORE_UPDATE)
        label = entity.__class__.__label__
        try:
            hints = get_type_hints(type(entity))
        except Exception:
            # Best effort: without resolvable hints the raw values are sent.
            hints = {}
        dirty_props = {}
        for name in entity._dirty:
            value = getattr(entity, name)
            if name in hints:
                value = python_to_db_value(value, hints[name])
            dirty_props[name] = value
        if not dirty_props:
            return
        set_clause = ", ".join(f"n.{k} = ${k}" for k in dirty_props)
        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid SET {set_clause}"
        params = {"vid": entity._vid, **dirty_props}
        await self._db.query(cypher, params)
        run_hooks(entity, _AFTER_UPDATE)
        entity._mark_clean()

    async def _delete_node(self, entity: UniNode) -> None:
        """Detach-delete the entity and reset its persistence state."""
        run_hooks(entity, _BEFORE_DELETE)
        label = entity.__class__.__label__
        vid = entity._vid
        cypher = f"MATCH (n:{label}) WHERE id(n) = $vid DETACH DELETE n"
        await self._db.query(cypher, {"vid": vid})
        if vid is not None and (label, vid) in self._identity_map:
            del self._identity_map[(label, vid)]
        entity._vid = None
        entity._uid = None
        entity._session = None
        run_hooks(entity, _AFTER_DELETE)

    @staticmethod
    def _extract_vid(data: dict[str, Any]) -> int | None:
        """Pop the vertex id from *data*, trying ``_id``, ``_vid``, then ``vid``.

        Keys are popped in that order until one holds a non-None value. Only
        None counts as missing, so a vid of 0 is kept.
        """
        for key in ("_id", "_vid", "vid"):
            value = data.pop(key, None)
            if value is not None:
                return value if isinstance(value, int) else int(value)
        return None

    def _result_to_model(
        self,
        data: dict[str, Any],
        model: type[NodeT],
    ) -> NodeT | None:
        """Convert a query result row to a model instance (sync — pure dict processing).

        Returns None for empty data or when the model cannot be built from
        it. If an entity with the same label and vid is already in the
        identity map, that existing instance is returned instead.
        """
        if not data:
            return None

        data = dict(data)
        data = run_class_hooks(model, _BEFORE_LOAD, data) or data

        vid = self._extract_vid(data)
        data.pop("_label", None)

        try:
            instance = cast(
                NodeT,
                model.from_properties(data, vid=vid, session=self),
            )
        except Exception:
            # Deliberate best effort: rows that do not fit the model are skipped.
            return None

        if vid is not None:
            existing = self._identity_map.get((model.__label__, vid))
            if existing is not None:
                return cast(NodeT, existing)
            self._identity_map[(model.__label__, vid)] = instance

        run_hooks(instance, _AFTER_LOAD)
        return instance

    def _load_relationship(
        self,
        entity: UniNode,
        descriptor: RelationshipDescriptor[Any],
    ) -> list[UniNode] | UniNode | None:
        """Sync relationship loading — always raises for an async session.

        Relationships have to be loaded eagerly instead (see
        ``_async_eager_load_relationships``).
        """
        raise SessionError(
            "Cannot synchronously load relationships in an async session. "
            "Use eager_load() or access relationships via async queries."
        )

    async def _async_eager_load_relationships(
        self,
        entities: list[NodeT],
        relationships: list[str],
    ) -> None:
        """Eager load relationships for a list of entities (async).

        The model is taken from the first entity. Unknown relationship names
        are skipped. Related node data is stored on each entity under
        ``_rel_cache_<name>``; entities with no related rows are left
        untouched.
        """
        if not entities:
            return

        model = type(entities[0])
        rel_configs = model.get_relationship_fields()

        for rel_name in relationships:
            if rel_name not in rel_configs:
                continue

            config = rel_configs[rel_name]
            label = model.__label__
            vids = [e._vid for e in entities if e._vid is not None]

            if not vids:
                continue

            pattern = _edge_pattern(config.edge_type, config.direction)
            cypher = (
                f"MATCH (a:{label}){pattern}(b) WHERE id(a) IN $vids "
                f"RETURN id(a) as src_vid, properties(b) AS _props, id(b) AS _vid, labels(b) AS _labels"
            )
            results = await self._db.query(cypher, {"vids": vids})

            by_source: dict[int, list[Any]] = {}
            for row in results:
                src_vid = row["src_vid"]
                node_data = _row_to_node_dict(row)
                if node_data is None:
                    continue
                by_source.setdefault(src_vid, []).append(node_data)

            for entity in entities:
                if entity._vid in by_source:
                    related = by_source[entity._vid]
                    cache_attr = f"_rel_cache_{rel_name}"
                    setattr(entity, cache_attr, related)
Async session for interacting with the graph database.
Mirrors UniSession with async methods. Uses AsyncDatabase.
Example:
from uni_db import AsyncDatabase from uni_pydantic import AsyncUniSession
db = await AsyncDatabase.open("./my_graph") async with AsyncUniSession(db) as session: ... session.register(Person) ... await session.sync_schema() ... alice = Person(name="Alice", age=30) ... session.add(alice) ... await session.commit()
def __init__(self, db: uni_db.AsyncDatabase) -> None:
    """Create a session on top of *db* with empty pending queues."""
    # Entities queued by add() / delete() until the next commit().
    self._pending_new: list[UniNode] = []
    self._pending_delete: list[UniNode] = []

    # Loaded entities keyed by (label, vid); values are weakly referenced.
    identity_map: WeakValueDictionary[tuple[str, int], UniNode] = WeakValueDictionary()
    self._identity_map = identity_map

    self._schema_gen = SchemaGenerator()
    self._db = db
def close(self) -> None:
    """Drop everything queued for creation or deletion."""
    for queue in (self._pending_new, self._pending_delete):
        queue.clear()
Close the session and clear pending state.
def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
    """Hand the given model classes to the session's schema generator (sync)."""
    generator = self._schema_gen
    generator.register(*models)
Register model classes with the session (sync).
async def sync_schema(self) -> None:
    """Apply the registered models' schema to the session's database."""
    generator = self._schema_gen
    await generator.async_apply_to_database(self._db)
Synchronize database schema with registered models.
def query(self, model: type[NodeT]) -> AsyncQueryBuilder[NodeT]:
    """Return a new AsyncQueryBuilder bound to this session and *model*."""
    builder = AsyncQueryBuilder(self, model)
    return builder
Create an async query builder for the given model.
def add(self, entity: UniNode) -> None:
    """Queue a not-yet-persisted entity for creation at the next commit.

    The entity's ``_session`` is pointed at this session immediately; no
    database work happens here.

    Raises:
        SessionError: if the entity is already persisted.
    """
    if not entity.is_persisted:
        entity._session = self
        self._pending_new.append(entity)
        return
    raise SessionError(f"Entity {entity!r} is already persisted")
Add a new entity to be persisted (sync — just collects).
def add_all(self, entities: Sequence[UniNode]) -> None:
    """Queue every entity in *entities*, in order, via add()."""
    enqueue = self.add
    for item in entities:
        enqueue(item)
Add multiple entities (sync — just collects).
def delete(self, entity: UniNode) -> None:
    """Queue a persisted entity for deletion at the next commit.

    Raises:
        NotPersisted: if the entity has not been persisted.
    """
    if entity.is_persisted:
        self._pending_delete.append(entity)
        return
    raise NotPersisted(entity)
Mark an entity for deletion (sync — just collects).
async def get(
    self,
    model: type[NodeT],
    vid: int | None = None,
    uid: str | None = None,
    **kwargs: Any,
) -> NodeT | None:
    """Fetch a single entity of *model*.

    Lookup precedence is ``vid``, then ``uid``, then keyword property
    filters. A ``vid`` that is already in the identity map is answered
    without querying the database. Returns None when nothing matches.

    Raises:
        ValueError: if no vid, uid, or property filter is given.
    """
    label = model.__label__

    if vid is not None:
        hit = self._identity_map.get((label, vid))
        if hit is not None:
            return hit  # type: ignore[return-value]
        predicate, tail = "id(n) = $vid", ""
        params: dict[str, Any] = {"vid": vid}
    elif uid is not None:
        predicate, tail = "n._uid = $uid", ""
        params = {"uid": uid}
    elif kwargs:
        # Property names end up in the query text, so validate them first.
        for prop_name in kwargs:
            _validate_property(prop_name, model)
        predicate = " AND ".join(f"n.{prop_name} = ${prop_name}" for prop_name in kwargs)
        tail = " LIMIT 1"
        params = dict(kwargs)
    else:
        raise ValueError("Must provide vid, uid, or property filters")

    cypher = f"MATCH (n:{label}) WHERE {predicate} RETURN {_NODE_RETURN}{tail}"
    rows = await self._db.query(cypher, params)
    if not rows:
        return None

    node_data = _row_to_node_dict(rows[0])
    return None if node_data is None else self._result_to_model(node_data, model)
Get an entity by ID or unique properties.
async def refresh(self, entity: UniNode) -> None:
    """Reload a persisted entity's property fields from the database.

    Fields missing from the returned row are left untouched. The entity is
    marked clean afterwards.

    Raises:
        NotPersisted: if the entity has not been persisted.
        SessionError: if the entity can no longer be found.
    """
    if not entity.is_persisted:
        raise NotPersisted(entity)

    label = entity.__class__.__label__
    rows = await self._db.query(
        f"MATCH (n:{label}) WHERE id(n) = $vid RETURN {_NODE_RETURN}",
        {"vid": entity._vid},
    )

    # A missing row and an unusable row are reported the same way.
    props = _row_to_node_dict(rows[0]) if rows else None
    if props is None:
        raise SessionError(f"Entity with vid={entity._vid} no longer exists")

    try:
        hints = get_type_hints(type(entity))
    except Exception:
        # Best effort: without resolvable hints the raw values are assigned.
        hints = {}

    for field_name in entity.get_property_fields():
        if field_name not in props:
            continue
        raw = props[field_name]
        converted = (
            db_to_python_value(raw, hints[field_name]) if field_name in hints else raw
        )
        setattr(entity, field_name, converted)

    entity._mark_clean()
Refresh an entity's properties from the database.
async def commit(self) -> None:
    """Persist all pending work, then flush the database.

    Order: create queued entities, update dirty persisted entities found in
    the identity map, delete queued entities, flush, clear both queues.
    """
    for fresh in self._pending_new:
        await self._create_node(fresh)

    # Snapshot first: the map may change while awaiting.
    tracked = list(self._identity_map.items())
    for _key, loaded in tracked:
        if loaded.is_dirty and loaded.is_persisted:
            await self._update_node(loaded)

    for doomed in self._pending_delete:
        await self._delete_node(doomed)

    await self._db.flush()
    for queue in (self._pending_new, self._pending_delete):
        queue.clear()
Commit all pending changes.
async def rollback(self) -> None:
    """Discard queued work and reload every dirty entity in the identity map.

    Entities that were queued for creation are detached from the session.
    """
    for orphan in self._pending_new:
        orphan._session = None
    for queue in (self._pending_new, self._pending_delete):
        queue.clear()

    # Snapshot first: refresh() awaits, and the map may change meanwhile.
    tracked = list(self._identity_map.values())
    for loaded in tracked:
        if not loaded.is_dirty:
            continue
        await self.refresh(loaded)
Discard all pending changes.
async def transaction(self) -> AsyncUniTransaction:
    """Create an async transaction bound to this session.

    This method is a coroutine, so it must be awaited before the result can
    be used as a context manager:
    `async with await session.transaction() as tx:`.
    Writing `async with session.transaction() as tx:` fails, because the
    un-awaited coroutine object is not an async context manager.
    """
    return AsyncUniTransaction(self)
Create an async transaction. This method is a coroutine, so await it before entering the context: `async with await session.transaction() as tx:`.
async def cypher(
    self,
    query: str,
    params: dict[str, Any] | None = None,
    result_type: type[NodeT] | None = None,
) -> list[NodeT] | list[dict[str, Any]]:
    """Run a raw Cypher query, optionally mapping rows to model instances.

    Without ``result_type`` the raw rows are returned. With it, at most one
    instance is produced per row: the first dict value carrying ``_label``
    that converts successfully (using ``result_type`` when ``_id`` is also
    present, otherwise the model registered for that label); failing that,
    the row's first value if it is a dict.
    """
    rows = await self._db.query(query, params)
    if result_type is None:
        return cast(list[dict[str, Any]], rows)

    instances = []
    for row in rows:
        chosen = None
        for value in row.values():
            if not isinstance(value, dict) or "_label" not in value:
                continue
            if "_id" in value:
                target = result_type
            else:
                registry = self._schema_gen._node_models
                label = value["_label"]
                if label not in registry:
                    continue
                target = registry[label]
            candidate = self._result_to_model(value, target)
            if candidate:
                chosen = candidate
                break

        if chosen is None:
            # Nothing in the row was mapped; fall back to the first column.
            first_value = next(iter(row.values()), None)
            if isinstance(first_value, dict):
                candidate = self._result_to_model(first_value, result_type)
                if candidate:
                    chosen = candidate

        if chosen is not None:
            instances.append(chosen)

    return instances
Execute a raw Cypher query.
async def create_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
    properties: dict[str, Any] | UniEdge | None = None,
) -> None:
    """Create an edge of *edge_type* from *source* to *target*.

    Endpoint validation and property normalization are delegated to
    ``UniSession._validate_edge_endpoints`` and
    ``UniSession._normalize_edge_properties``.
    """
    src_vid, dst_vid, src_label, dst_label = UniSession._validate_edge_endpoints(
        source, target
    )
    props = UniSession._normalize_edge_properties(properties)

    # Bind property values under prefixed parameter names so an edge property
    # called "src" or "dst" cannot overwrite the endpoint parameters.
    prop_params = {f"prop_{k}": v for k, v in props.items()}
    props_str = ", ".join(f"{k}: $prop_{k}" for k in props)
    if props_str:
        cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type} {{{props_str}}}]->(b)"
    else:
        cypher = f"MATCH (a:{src_label}), (b:{dst_label}) WHERE a._vid = $src AND b._vid = $dst CREATE (a)-[r:{edge_type}]->(b)"

    await self._db.query(cypher, {"src": src_vid, "dst": dst_vid, **prop_params})
Create an edge between two nodes.
async def delete_edge(
    self, source: UniNode, edge_type: str, target: UniNode
) -> int:
    """Delete all *edge_type* edges going from *source* to *target*.

    Returns the count reported by the database, or 0 when the query
    returns no rows.
    """
    endpoints = UniSession._validate_edge_endpoints(source, target)
    src_vid, dst_vid, src_label, dst_label = endpoints

    match_part = f"MATCH (a:{src_label})-[r:{edge_type}]->(b:{dst_label}) "
    where_part = f"WHERE a._vid = $src AND b._vid = $dst "
    delete_part = f"DELETE r RETURN count(r) as count"
    cypher = match_part + where_part + delete_part

    rows = await self._db.query(cypher, {"src": src_vid, "dst": dst_vid})
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Delete edges between two nodes. Returns the number of deleted edges.
async def bulk_add(self, entities: Sequence[UniNode]) -> list[int]:
    """Insert many entities with one ``bulk_insert_vertices`` call per label.

    Entities are grouped by label (first-seen order). Each inserted entity
    is attached to the session, added to the identity map and marked clean.
    Returns the new vids in group order; an empty input returns ``[]``.

    Raises:
        BulkLoadError: if anything fails while inserting.
    """
    if not entities:
        return []

    groups: dict[str, list[UniNode]] = {}
    for item in entities:
        groups.setdefault(item.__class__.__label__, []).append(item)

    created: list[int] = []
    try:
        for label, members in groups.items():
            # Hooks run for the whole group before any properties are read.
            for member in members:
                run_hooks(member, _BEFORE_CREATE)
            payload = [member.to_properties() for member in members]
            vids = await self._db.bulk_insert_vertices(label, payload)
            for member, vid in zip(members, vids):
                member._attach_session(self, vid)
                self._identity_map[(label, vid)] = member
                run_hooks(member, _AFTER_CREATE)
                member._mark_clean()
            created.extend(vids)
    except Exception as e:
        raise BulkLoadError(f"Bulk insert failed: {e}") from e

    return created
Bulk-add entities using bulk_insert_vertices.
async def explain(self, cypher: str) -> dict[str, Any]:
    """Return whatever the database's ``explain`` yields for *cypher*."""
    plan = await self._db.explain(cypher)
    return plan
Get the query execution plan.
async def profile(self, cypher: str) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Return whatever the database's ``profile`` yields for *cypher*."""
    outcome = await self._db.profile(cypher)
    return outcome
Run the query with profiling.
class AsyncUniTransaction:
    """Async transaction context for atomic operations.

    Nodes and edges are only collected by ``add`` / ``create_edge``; they are
    written when ``commit`` runs. Leaving the ``async with`` block without an
    exception commits automatically unless the transaction was already
    committed or rolled back; leaving it with an exception rolls back.
    """

    def __init__(self, session: AsyncUniSession) -> None:
        self._session = session
        # Set by __aenter__; commit() refuses to run while this is None.
        self._tx: uni_db.AsyncTransaction | None = None
        self._pending_nodes: list[UniNode] = []
        self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []
        self._committed = False
        self._rolled_back = False

    async def __aenter__(self) -> AsyncUniTransaction:
        self._tx = await self._session._db.begin()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if exc_type is not None:
            # If the body committed explicitly and raised afterwards, there
            # is nothing left to roll back; skipping keeps the state flags
            # consistent and lets the original exception propagate.
            if not self._committed:
                await self.rollback()
            return
        if not self._committed and not self._rolled_back:
            await self.commit()

    def add(self, entity: UniNode) -> None:
        """Add a node to be created in this transaction (sync — just collects)."""
        self._pending_nodes.append(entity)

    def create_edge(
        self,
        source: UniNode,
        edge_type: str,
        target: UniNode,
        properties: UniEdge | None = None,
    ) -> None:
        """Create an edge between two nodes in this transaction (sync — just collects).

        Raises:
            NotPersisted: if either endpoint has not been persisted.
        """
        if not source.is_persisted:
            raise NotPersisted(source)
        if not target.is_persisted:
            raise NotPersisted(target)
        self._pending_edges.append((source, edge_type, target, properties))

    async def commit(self) -> None:
        """Commit the transaction.

        Raises:
            TransactionError: if the transaction was already committed or
                rolled back, was never started, or if creating the queued
                items or the database commit fails (the transaction is
                rolled back first in that last case).
        """
        if self._committed:
            raise TransactionError("Transaction already committed")
        if self._rolled_back:
            raise TransactionError("Transaction already rolled back")
        if self._tx is None:
            raise TransactionError("Transaction not started")

        try:
            for node in self._pending_nodes:
                await self._session._create_node_in_tx(node, self._tx)
            for source, edge_type, target, props in self._pending_edges:
                await self._session._create_edge_in_tx(
                    source, edge_type, target, props, self._tx
                )
            await self._tx.commit()
        except Exception as e:
            await self.rollback()
            raise TransactionError(f"Commit failed: {e}") from e

        # The database commit has succeeded. Bookkeeping is kept outside the
        # block above so a failure in it cannot trigger a rollback of an
        # already-committed transaction.
        self._committed = True
        try:
            for node in self._pending_nodes:
                node._mark_clean()
        except Exception as e:
            raise TransactionError(
                f"Transaction committed, but marking nodes clean failed: {e}"
            ) from e

    async def rollback(self) -> None:
        """Rollback the transaction. Repeated calls are no-ops."""
        if self._rolled_back:
            return
        if self._tx is not None:
            await self._tx.rollback()
        self._rolled_back = True
        self._pending_nodes.clear()
        self._pending_edges.clear()
Async transaction context for atomic operations.
def __init__(self, session: AsyncUniSession) -> None:
    """Bind the transaction to *session* and start with empty work queues."""
    # Lifecycle flags: a fresh transaction is neither committed nor rolled back.
    self._committed = False
    self._rolled_back = False

    # Work collected by add() / create_edge(); consumed by commit().
    self._pending_nodes: list[UniNode] = []
    self._pending_edges: list[tuple[UniNode, str, UniNode, UniEdge | None]] = []

    # Underlying database transaction handle; commit() refuses to run
    # while this is still None.
    self._tx: uni_db.AsyncTransaction | None = None
    self._session = session
def add(self, entity: UniNode) -> None:
    """Queue *entity* for creation in this transaction (sync — no I/O)."""
    queue = self._pending_nodes
    queue.append(entity)
Add a node to be created in this transaction (sync — just collects).
def create_edge(
    self,
    source: UniNode,
    edge_type: str,
    target: UniNode,
    properties: UniEdge | None = None,
) -> None:
    """Queue an edge from *source* to *target* for this transaction (sync — no I/O).

    Both endpoints must already be persisted; the first one that is not
    (source is checked before target) is reported via ``NotPersisted``.
    """
    for endpoint in (source, target):
        if not endpoint.is_persisted:
            raise NotPersisted(endpoint)
    record = (source, edge_type, target, properties)
    self._pending_edges.append(record)
Create an edge between two nodes in this transaction (sync — just collects).
async def commit(self) -> None:
    """Write all queued nodes and edges, then commit the database transaction.

    Raises:
        TransactionError: if the transaction was already committed or rolled
            back, was never started, or if creating the queued items or the
            database commit fails (the transaction is rolled back first in
            that last case).
    """
    if self._committed:
        raise TransactionError("Transaction already committed")
    if self._rolled_back:
        raise TransactionError("Transaction already rolled back")
    if self._tx is None:
        raise TransactionError("Transaction not started")

    try:
        for node in self._pending_nodes:
            await self._session._create_node_in_tx(node, self._tx)
        for source, edge_type, target, props in self._pending_edges:
            await self._session._create_edge_in_tx(
                source, edge_type, target, props, self._tx
            )
        await self._tx.commit()
    except Exception as e:
        await self.rollback()
        raise TransactionError(f"Commit failed: {e}") from e

    # The database commit has succeeded. Bookkeeping is kept outside the
    # block above so a failure in it cannot trigger a rollback of an
    # already-committed transaction.
    self._committed = True
    try:
        for node in self._pending_nodes:
            node._mark_clean()
    except Exception as e:
        raise TransactionError(
            f"Transaction committed, but marking nodes clean failed: {e}"
        ) from e
Commit the transaction.
async def rollback(self) -> None:
    """Roll back the database transaction and drop all queued work.

    Calling this again after a rollback is a no-op.
    """
    if not self._rolled_back:
        handle = self._tx
        if handle is not None:
            await handle.rollback()
        self._rolled_back = True
        for queue in (self._pending_nodes, self._pending_edges):
            queue.clear()
Rollback the transaction.
def Field(
    default: Any = ...,
    *,
    default_factory: Callable[[], Any] | None = None,
    alias: str | None = None,
    title: str | None = None,
    description: str | None = None,
    examples: list[Any] | None = None,
    exclude: bool = False,
    json_schema_extra: dict[str, Any] | None = None,
    # Uni-specific options
    index: IndexType | None = None,
    unique: bool = False,
    tokenizer: str | None = None,
    metric: VectorMetric | None = None,
    generated: str | None = None,
) -> Any:
    """
    Create a field with uni-pydantic configuration.

    This extends Pydantic's Field with graph database options. The options
    are collected in a ``FieldConfig`` that is stored under the
    ``"uni_config"`` key of the returned FieldInfo's ``json_schema_extra``.
    The caller's ``json_schema_extra`` dict is copied, never modified.

    Args:
        default: Default value for the field.
        default_factory: Factory function for default value. Takes
            precedence over ``default`` when both are given.
        alias: Field alias for serialization.
        title: Human-readable title.
        description: Field description.
        examples: Example values.
        exclude: Exclude from serialization.
        json_schema_extra: Extra JSON schema properties.
        index: Index type ("btree", "hash", "fulltext", "vector").
        unique: Whether to create a unique constraint.
        tokenizer: Tokenizer for fulltext index (default: "standard").
        metric: Distance metric for vector index ("l2", "cosine", "dot").
        generated: Expression for generated/computed property.

    Returns:
        A Pydantic FieldInfo with uni-pydantic metadata attached.

    Examples:
        >>> class Person(UniNode):
        ...     name: str = Field(index="btree")
        ...     email: str = Field(unique=True)
        ...     bio: str = Field(index="fulltext", tokenizer="standard")
        ...     embedding: Vector[768] = Field(metric="cosine")
    """
    # Default tokenizer for fulltext indexes
    if index == "fulltext" and tokenizer is None:
        tokenizer = "standard"

    uni_config = FieldConfig(
        index=index,
        unique=unique,
        tokenizer=tokenizer,
        metric=metric,
        generated=generated,
        default=default,
        default_factory=default_factory,
        alias=alias,
        title=title,
        description=description,
        examples=examples,
        exclude=exclude,
        json_schema_extra=json_schema_extra,
    )

    # Work on a copy: writing "uni_config" into the caller's dict would leak
    # this field's config into every other field sharing that dict, and would
    # make uni_config.json_schema_extra contain uni_config itself.
    extra = dict(json_schema_extra) if json_schema_extra else {}
    extra["uni_config"] = uni_config

    # Create Pydantic FieldInfo
    from pydantic.fields import FieldInfo as PydanticFieldInfo

    field_kwargs: dict[str, Any] = {
        "alias": alias,
        "title": title,
        "description": description,
        "examples": examples,
        "exclude": exclude,
        "json_schema_extra": extra,
    }
    # Pass at most one of default_factory / default; passing neither leaves
    # the field without a default.
    if default_factory is not None:
        field_kwargs["default_factory"] = default_factory
    elif default is not ...:
        field_kwargs["default"] = default

    return PydanticFieldInfo(**field_kwargs)
Create a field with uni-pydantic configuration.
This extends Pydantic's Field with graph database options.
Args: default: Default value for the field. default_factory: Factory function for default value. alias: Field alias for serialization. title: Human-readable title. description: Field description. examples: Example values. exclude: Exclude from serialization. json_schema_extra: Extra JSON schema properties. index: Index type ("btree", "hash", "fulltext", "vector"). unique: Whether to create a unique constraint. tokenizer: Tokenizer for fulltext index (default: "standard"). metric: Distance metric for vector index ("l2", "cosine", "dot"). generated: Expression for generated/computed property.
Returns: A Pydantic FieldInfo with uni-pydantic metadata attached.
Examples:
class Person(UniNode): ... name: str = Field(index="btree") ... email: str = Field(unique=True) ... bio: str = Field(index="fulltext", tokenizer="standard") ... embedding: Vector[768] = Field(metric="cosine")
@dataclass
class FieldConfig:
    """Graph-specific and pass-through settings recorded for one model field.

    ``Field()`` builds one of these and stores it under the ``"uni_config"``
    key of the Pydantic ``json_schema_extra`` mapping, where
    ``get_field_config()`` can find it again.
    """

    # --- Indexing -----------------------------------------------------
    # Kind of index to create for the property (None = no index).
    index: IndexType | None = None
    # Whether a unique constraint is requested.
    unique: bool = False

    # --- Fulltext index options ----------------------------------------
    tokenizer: str | None = None

    # --- Vector index options ------------------------------------------
    metric: VectorMetric | None = None

    # --- Generated/computed property expression -------------------------
    generated: str | None = None

    # --- Options forwarded unchanged to Pydantic ------------------------
    # Ellipsis is the "no default supplied" sentinel.
    default: Any = dataclass_field(default_factory=lambda: ...)
    default_factory: Callable[[], Any] | None = None
    alias: str | None = None
    title: str | None = None
    description: str | None = None
    examples: list[Any] | None = None
    exclude: bool = False
    json_schema_extra: dict[str, Any] | None = None
Configuration for a uni-pydantic field.
def Relationship(
    edge_type: str,
    *,
    direction: Direction = "outgoing",
    edge_model: type[UniEdge] | None = None,
    eager: bool = False,
    cascade_delete: bool = False,
) -> Any:
    """
    Declare a relationship to another node type.

    Relationships are lazy-loaded by default. Use eager=True or
    query.eager_load() to load them with the parent query.

    Args:
        edge_type: The edge type name (e.g., "FRIEND_OF", "WORKS_AT").
        direction: Relationship direction:
            - "outgoing": Follow edges from this node (default)
            - "incoming": Follow edges to this node
            - "both": Follow edges in both directions
        edge_model: Optional UniEdge subclass for typed edge properties.
        eager: Whether to eager-load this relationship by default.
        cascade_delete: Whether to delete related edges when this node is deleted.

    Returns:
        A relationship marker wrapping a RelationshipConfig built from the
        arguments; the marker is processed later, during model creation.

    Examples:
        >>> class Person(UniNode):
        ...     # Outgoing relationship (default)
        ...     follows: list["Person"] = Relationship("FOLLOWS")
        ...
        ...     # Incoming relationship
        ...     followers: list["Person"] = Relationship("FOLLOWS", direction="incoming")
        ...
        ...     # Single optional relationship
        ...     manager: "Person | None" = Relationship("REPORTS_TO")
        ...
        ...     # Relationship with edge properties
        ...     friendships: list[tuple["Person", FriendshipEdge]] = Relationship(
        ...         "FRIEND_OF",
        ...         edge_model=FriendshipEdge
        ...     )
    """
    # The marker carries nothing but the configuration; the metaclass
    # picks it up when the model class is built.
    return _RelationshipMarker(
        RelationshipConfig(
            edge_type=edge_type,
            direction=direction,
            edge_model=edge_model,
            eager=eager,
            cascade_delete=cascade_delete,
        )
    )
Declare a relationship to another node type.
Relationships are lazy-loaded by default. Use eager=True or query.eager_load() to load them with the parent query.
Args: edge_type: The edge type name (e.g., "FRIEND_OF", "WORKS_AT"). direction: Relationship direction: - "outgoing": Follow edges from this node (default) - "incoming": Follow edges to this node - "both": Follow edges in both directions edge_model: Optional UniEdge subclass for typed edge properties. eager: Whether to eager-load this relationship by default. cascade_delete: Whether to delete related edges when this node is deleted.
Returns: A relationship marker wrapping the RelationshipConfig; the marker is processed during model creation.
Examples:
class Person(UniNode): ... # Outgoing relationship (default) ... follows: list["Person"] = Relationship("FOLLOWS") ... ... # Incoming relationship ... followers: list["Person"] = Relationship("FOLLOWS", direction="incoming") ... ... # Single optional relationship ... manager: "Person | None" = Relationship("REPORTS_TO") ... ... # Relationship with edge properties ... friendships: list[tuple["Person", FriendshipEdge]] = Relationship( ... "FRIEND_OF", ... edge_model=FriendshipEdge ... )
@dataclass
class RelationshipConfig:
    """Settings captured by a ``Relationship(...)`` declaration."""

    # Edge type name; the only required setting.
    edge_type: str
    # Traversal direction; new relationships default to "outgoing".
    direction: Direction = "outgoing"
    # Optional UniEdge subclass describing typed edge properties.
    edge_model: type[UniEdge] | None = None
    # Load together with the parent query instead of lazily.
    eager: bool = False
    # Delete related edges when the owning node is deleted.
    cascade_delete: bool = False
Configuration for a relationship field.
class RelationshipDescriptor(Generic[NodeT]):
    """
    Descriptor that resolves a relationship field on first access.

    Class-level access yields the descriptor itself (for query building);
    instance-level access yields the related node(s), loading them through
    the instance's session when no cached value is present.
    """

    def __init__(
        self,
        config: RelationshipConfig,
        field_name: str,
        target_type: type[NodeT] | str | None = None,
        is_list: bool = True,
    ) -> None:
        self.config = config
        self.target_type = target_type
        self.is_list = is_list
        self.field_name = field_name
        # Per-instance attribute under which the loaded value is cached.
        self._cache_attr = f"_rel_cache_{field_name}"

    def __set_name__(self, owner: type, name: str) -> None:
        # Keep the field name and cache slot aligned with the attribute name
        # the descriptor was actually assigned to.
        self.field_name = name
        self._cache_attr = f"_rel_cache_{name}"

    @overload
    def __get__(
        self, obj: None, objtype: type[NodeT]
    ) -> RelationshipDescriptor[NodeT]: ...

    @overload
    def __get__(
        self, obj: NodeT, objtype: type[NodeT] | None = None
    ) -> list[NodeT] | NodeT | None: ...

    def __get__(
        self, obj: NodeT | None, objtype: type[NodeT] | None = None
    ) -> RelationshipDescriptor[NodeT] | list[NodeT] | NodeT | None:
        """Return the descriptor (class access) or the related nodes.

        Raises:
            LazyLoadError: If nothing is cached and the instance has no
                ``_session`` to load the relationship with.
        """
        if obj is None:
            # Accessed on the class
            return self

        # A private sentinel distinguishes "nothing cached" from a cached None.
        not_cached = object()
        cached = getattr(obj, self._cache_attr, not_cached)
        if cached is not not_cached:
            return cast("list[NodeT] | NodeT | None", cached)

        session = getattr(obj, "_session", None)
        if session is None:
            from .exceptions import LazyLoadError

            raise LazyLoadError(
                self.field_name,
                "No session attached. Use session.get() or enable eager loading.",
            )

        # Load once through the session, then remember the value on the instance.
        loaded = session._load_relationship(obj, self)
        setattr(obj, self._cache_attr, loaded)
        return cast("list[NodeT] | NodeT | None", loaded)

    def __set__(self, obj: NodeT, value: list[NodeT] | NodeT | None) -> None:
        # Assignment just fills the cache slot (e.g., during eager loading).
        setattr(obj, self._cache_attr, value)

    def __repr__(self) -> str:
        cfg = self.config
        return f"Relationship({cfg.edge_type!r}, direction={cfg.direction!r})"
Descriptor for relationship fields that enables lazy loading.
When accessed on an instance, it returns the related nodes. When accessed on the class, it returns the descriptor for query building.
def __init__(
    self,
    config: RelationshipConfig,
    field_name: str,
    target_type: type[NodeT] | str | None = None,
    is_list: bool = True,
) -> None:
    """Store the relationship settings and derive the cache attribute name.

    Args:
        config: Relationship configuration to keep on the descriptor.
        field_name: Name of the model field this descriptor serves.
        target_type: Related node class, its name, or None.
        is_list: Stored as-is on the descriptor; defaults to True.
    """
    self.config = config
    self.target_type = target_type
    self.is_list = is_list
    self.field_name = field_name
    # Per-instance attribute under which the loaded value is cached.
    self._cache_attr = f"_rel_cache_{field_name}"
def get_field_config(field_info: FieldInfo) -> FieldConfig | None:
    """Return the FieldConfig stored on a Pydantic FieldInfo, if any.

    The config is looked up under the ``"uni_config"`` key of
    ``field_info.json_schema_extra``. None is returned when that attribute
    is not a dict or the stored value is not a FieldConfig.
    """
    extra = field_info.json_schema_extra
    if not isinstance(extra, dict):
        return None
    candidate = extra.get("uni_config")
    return candidate if isinstance(candidate, FieldConfig) else None
Extract uni-pydantic config from a Pydantic FieldInfo.
class Vector(Generic[N], metaclass=VectorMeta):
    """
    Fixed-dimension vector type for embeddings.

    Usage:
        embedding: Vector[1536]  # 1536-dimensional vector

    The values are held as a plain ``list[float]``. A class-level
    ``__dimensions__`` of 0 means the length is not checked.
    """

    __dimensions__: int = 0
    __origin__: type | None = None

    def __init__(self, values: list[float]) -> None:
        """Wrap *values*, enforcing the class dimension when it is set.

        Raises:
            ValueError: If the class declares a positive dimension and
                ``len(values)`` differs from it.
        """
        expected = self.__class__.__dimensions__
        if expected > 0 and len(values) != expected:
            raise ValueError(f"Vector expects {expected} dimensions, got {len(values)}")
        self._values = values

    @property
    def values(self) -> list[float]:
        return self._values

    def __repr__(self) -> str:
        dims = self.__class__.__dimensions__
        # Long vectors are abbreviated to their first three components.
        if len(self._values) > 3:
            return f"Vector[{dims}]({self._values[:3]}...)"
        return f"Vector[{dims}]({self._values})"

    def __eq__(self, other: object) -> bool:
        # Comparable with other vectors and with plain lists of values.
        if isinstance(other, Vector):
            return self._values == other._values
        if isinstance(other, list):
            return self._values == other
        return False

    def __len__(self) -> int:
        return len(self._values)

    def __iter__(self):  # type: ignore[no-untyped-def]
        return iter(self._values)

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Make Vector compatible with Pydantic v2.

        Validation accepts a Vector (returned unchanged) or a list (its items
        are converted with ``float`` and wrapped); serialization emits the
        underlying list.
        """
        dimensions = getattr(source_type, "__dimensions__", 0)
        # Build instances of the parameterized class when one was supplied.
        vec_cls = source_type if dimensions > 0 else cls

        def validate_vector(v: Any) -> Vector:  # type: ignore[type-arg]
            if not isinstance(v, (Vector, list)):
                raise TypeError(f"Expected list or Vector, got {type(v)}")
            if dimensions > 0 and len(v) != dimensions:
                raise ValueError(
                    f"Vector expects {dimensions} dimensions, got {len(v)}"
                )
            if isinstance(v, Vector):
                return v
            return vec_cls([float(x) for x in v])

        serializer = core_schema.plain_serializer_function_ser_schema(
            lambda v: v.values if isinstance(v, Vector) else list(v),
            info_arg=False,
        )
        return core_schema.no_info_plain_validator_function(
            validate_vector,
            serialization=serializer,
        )
A vector type with fixed dimensions for embeddings.
Usage: embedding: Vector[1536] # 1536-dimensional vector
At runtime, vectors are stored as list[float].
def python_type_to_uni(type_hint: Any, *, nullable: bool = False) -> tuple[str, bool]:
    """
    Map a Python type hint onto a Uni DataType string.

    Args:
        type_hint: The Python type hint to convert.
        nullable: Whether the field is explicitly nullable.

    Returns:
        Tuple of (uni_data_type, is_nullable). ``T | None`` hints always
        report ``True``; all other hints echo *nullable*.

    Raises:
        TypeMappingError: If the type cannot be mapped.
    """
    hint, _ = unwrap_annotated(type_hint)

    # T | None -> map T and force nullable
    optional, inner = is_optional(hint)
    if optional:
        return python_type_to_uni(inner)[0], True

    # Vector[N]
    dims = get_vector_dimensions(hint)
    if dims is not None:
        return f"vector:{dims}", nullable

    # list[...]: scalar element types get a typed list, the rest become JSON
    is_lst, elem_type = is_list_type(hint)
    if is_lst:
        if elem_type in (str, int, float, bool):
            elem_uni = TYPE_MAP.get(elem_type, "string")
            return f"list:{elem_uni}", nullable
        return "json", nullable

    # Direct lookup
    if hint in TYPE_MAP:
        return TYPE_MAP[hint], nullable

    # dict[...] generics
    if get_origin(hint) is dict:
        return "json", nullable

    # A bare string is an unresolved forward reference
    if isinstance(hint, str):
        raise TypeMappingError(
            hint,
            f"Cannot resolve forward reference {hint!r}. "
            "Ensure the referenced class is defined before schema sync.",
        )

    raise TypeMappingError(hint)
Convert a Python type hint to a Uni DataType string.
Args: type_hint: The Python type hint to convert. nullable: Whether the field is explicitly nullable.
Returns: Tuple of (uni_data_type, is_nullable)
Raises: TypeMappingError: If the type cannot be mapped.
def uni_to_python_type(uni_type: str) -> type:
    """
    Convert a Uni DataType string to a Python type.

    Matching is case-insensitive for both scalar names and the
    ``vector:`` / ``list:`` prefixes.

    Args:
        uni_type: The Uni data type string.

    Returns:
        The corresponding Python type. ``vector:N`` and ``list:T`` both map
        to ``list``; unrecognized names fall back to ``str``.
    """
    # Normalize once so prefix checks and the scalar lookup agree on case.
    normalized = uni_type.lower()

    # Vectors are stored as list[float]; typed lists are plain lists too.
    if normalized.startswith(("vector:", "list:")):
        return list

    # Reverse mapping — manually constructed to avoid bytes overwriting str for "string"
    reverse_map: dict[str, type] = {
        "string": str,
        "int64": int,
        "float64": float,
        "bool": bool,
        "datetime": datetime,
        "date": date,
        "time": time,
        "duration": timedelta,
        "json": dict,
    }
    return reverse_map.get(normalized, str)
Convert a Uni DataType string to a Python type.
Args: uni_type: The Uni data type string.
Returns: The corresponding Python type.
def get_vector_dimensions(type_hint: Any) -> int | None:
    """Return the dimension count carried by a Vector[N] type hint.

    A ``__dimensions__`` attribute on the hint wins; otherwise a generic
    alias whose origin is ``Vector`` and whose first argument is an int is
    accepted. Anything else yields None.
    """
    try:
        declared: int = type_hint.__dimensions__
    except AttributeError:
        pass
    else:
        return declared

    if get_origin(type_hint) is Vector:
        params = get_args(type_hint)
        if params and isinstance(params[0], int):
            return params[0]
    return None
Extract vector dimensions from a Vector[N] type hint.
143def is_optional(type_hint: Any) -> tuple[bool, Any]: 144 """ 145 Check if a type hint is Optional (T | None). 146 147 Returns: 148 Tuple of (is_optional, inner_type) 149 """ 150 origin = get_origin(type_hint) 151 152 # Handle Union types (including T | None which is Union[T, None]) 153 if origin is Union: 154 args = get_args(type_hint) 155 non_none_args = [arg for arg in args if arg is not type(None)] 156 if len(non_none_args) == 1 and type(None) in args: 157 return True, non_none_args[0] 158 159 # Python 3.10+ uses types.UnionType for X | Y syntax 160 if isinstance(type_hint, types.UnionType): 161 args = get_args(type_hint) 162 non_none_args = [arg for arg in args if arg is not type(None)] 163 if len(non_none_args) == 1 and type(None) in args: 164 return True, non_none_args[0] 165 166 return False, type_hint
Check if a type hint is Optional (T | None).
Returns: Tuple of (is_optional, inner_type)
def is_list_type(type_hint: Any) -> tuple[bool, Any | None]:
    """
    Detect generic list hints such as ``list[int]``.

    Returns:
        Tuple of (is_list, element_type). The element type is ``Any`` when
        the list hint carries no arguments and ``None`` when the hint is
        not a list at all.
    """
    if get_origin(type_hint) is not list:
        return False, None
    params = get_args(type_hint)
    element = params[0] if params else Any
    return True, element
Check if a type hint is a list type.
Returns: Tuple of (is_list, element_type)
def unwrap_annotated(type_hint: Any) -> tuple[Any, tuple[Any, ...]]:
    """
    Split an ``Annotated[T, ...]`` hint into its parts.

    Returns:
        Tuple of (base_type, metadata_tuple). Hints that are not Annotated
        come back unchanged with an empty metadata tuple.
    """
    if get_origin(type_hint) is not Annotated:
        return type_hint, ()
    base, *metadata = get_args(type_hint)
    return base, tuple(metadata)
Unwrap an Annotated type.
Returns: Tuple of (base_type, metadata_tuple)
def python_to_db_value(value: Any, type_hint: Any) -> Any:
    """Convert a Python value to a database-compatible value.

    Handles datetime→micros, date→days, time→micros, timedelta→micros,
    Vector→list[float], and passes through everything else.

    Temporal conversions use exact integer arithmetic, so no microsecond
    is lost to float rounding.

    Args:
        value: The Python value to convert; ``None`` is returned as-is.
        type_hint: The field's type annotation. The conversion is driven by
            the runtime type of *value*, so the hint is not consulted; the
            parameter is kept for interface compatibility.

    Returns:
        Microseconds since the Unix epoch for ``datetime`` (naive values are
        interpreted in the system timezone, as ``datetime.timestamp()``
        does), days since 1970-01-01 for ``date``, microseconds since
        midnight for ``time``, total microseconds for ``timedelta``, the
        underlying list for ``Vector``, otherwise *value* unchanged.
    """
    if value is None:
        return None

    one_microsecond = timedelta(microseconds=1)

    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(value, datetime):
        from datetime import timezone

        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        # astimezone() presumes a naive value is in the system timezone.
        return (value.astimezone(timezone.utc) - epoch) // one_microsecond
    if isinstance(value, date):
        return (value - date(1970, 1, 1)).days
    if isinstance(value, time):
        return (
            value.hour * 3_600_000_000
            + value.minute * 60_000_000
            + value.second * 1_000_000
            + value.microsecond
        )
    if isinstance(value, timedelta):
        # timedelta // timedelta is integer division: exact, no float step.
        return value // one_microsecond

    # Vector → list[float]
    if isinstance(value, Vector):
        return value.values

    return value
Convert a Python value to a database-compatible value.
Handles datetime→micros, date→days, time→micros, timedelta→micros, Vector→list[float], and passes through everything else.
def db_to_python_value(value: Any, type_hint: Any) -> Any:
    """Convert a database value back to a Python value.

    Numeric values are turned back into datetime/date/time/timedelta
    according to the model field's type annotation, and lists are wrapped
    in the matching Vector class for vector fields. Everything else is
    returned unchanged.
    """
    if value is None:
        return None

    # Strip Optional, then Annotated, to reach the underlying type.
    _, type_hint = is_optional(type_hint)
    type_hint, _ = unwrap_annotated(type_hint)

    if isinstance(value, (int, float)):
        if type_hint is datetime:
            # Microseconds since epoch → datetime
            return datetime.fromtimestamp(value / 1_000_000)
        if type_hint is date:
            # Days since epoch → date
            return date(1970, 1, 1) + timedelta(days=int(value))
        if type_hint is time:
            # Microseconds since midnight → time
            hours, rest = divmod(int(value), 3_600_000_000)
            minutes, rest = divmod(rest, 60_000_000)
            seconds, microseconds = divmod(rest, 1_000_000)
            return time(hours, minutes, seconds, microseconds)
        if type_hint is timedelta:
            # Microseconds → timedelta
            return timedelta(microseconds=int(value))

    # Vector fields: list[float] → Vector
    dims = get_vector_dimensions(type_hint)
    if dims is not None and isinstance(value, list):
        return Vector[dims](value)

    return value
Convert a database value back to a Python value.
Converts int back to datetime/date/time/timedelta based on the model field's type annotation.
class QueryBuilder(_QueryBuilderBase[NodeT]):
    """
    Immutable, type-safe query builder for graph queries.

    Builder methods return a **new** QueryBuilder instance and never mutate
    the original; the methods defined here execute the built Cypher against
    the session's database.

    Example:
        >>> adults = (
        ...     session.query(Person)
        ...     .filter(Person.age >= 18)
        ...     .order_by(Person.name)
        ...     .limit(10)
        ...     .all()
        ... )
    """

    def __init__(self, session: UniSession, model: type[NodeT]) -> None:
        self._init_state(session, model)

    def _execute_query(
        self, cypher: str, params: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Execute a query, using query_with if timeout/max_memory is set."""
        db = self._session._db
        if self._timeout is None and self._max_memory is None:
            return db.query(cypher, params)

        # Limits are only expressible through the query_with builder.
        pending = db.query_with(cypher)
        if params:
            pending = pending.params(params)
        if self._timeout is not None:
            pending = pending.timeout(self._timeout)
        if self._max_memory is not None:
            pending = pending.max_memory(self._max_memory)
        return pending.fetch_all()

    def all(self) -> list[NodeT]:
        """Execute the query and return all results."""
        cypher, params = self._build_cypher()
        rows = self._execute_query(cypher, params)
        nodes = self._rows_to_instances(rows)
        if not (self._eager_load and nodes):
            return nodes
        self._session._eager_load_relationships(nodes, self._eager_load)
        return nodes

    def first(self) -> NodeT | None:
        """Execute the query and return the first result."""
        limited = self._clone()
        limited._limit = 1
        rows = limited.all()
        if rows:
            return rows[0]
        return None

    def one(self) -> NodeT:
        """Execute the query and return exactly one result.

        Raises QueryError if no results or more than one result.
        """
        # Fetching two rows is enough to detect an ambiguous match.
        probe = self._clone()
        probe._limit = 2
        matches = probe.all()
        if not matches:
            raise QueryError("Query returned no results")
        if len(matches) > 1:
            raise QueryError("Query returned more than one result")
        return matches[0]

    def count(self) -> int:
        """Execute the query and return the count of results."""
        cypher, params = self._build_count_cypher()
        rows = self._execute_query(cypher, params)
        if not rows:
            return 0
        return cast(int, rows[0]["count"])

    def exists(self) -> bool:
        """Check if any matching records exist."""
        cypher, params = self._build_exists_cypher()
        return len(self._execute_query(cypher, params)) > 0

    def delete(self) -> int:
        """Delete all matching records (DETACH DELETE)."""
        cypher, params = self._build_delete_cypher()
        # NOTE(review): goes straight to db.query, so timeout/max_memory
        # are not applied here — confirm this is intentional.
        rows = self._session._db.query(cypher, params)
        if not rows:
            return 0
        return cast(int, rows[0]["count"])

    def update(self, **kwargs: Any) -> int:
        """Update all matching records."""
        cypher, params = self._build_update_cypher(**kwargs)
        # NOTE(review): goes straight to db.query, so timeout/max_memory
        # are not applied here — confirm this is intentional.
        rows = self._session._db.query(cypher, params)
        if not rows:
            return 0
        return cast(int, rows[0]["count"])
Immutable, type-safe query builder for graph queries.
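Each builder method returns a new QueryBuilder instance; the original is never mutated. Execution methods such as all(), first(), and count() run the query and return results instead. Provides a fluent API for building Cypher queries with type checking and IDE autocomplete support.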
Example:
adults = ( ... session.query(Person) ... .filter(Person.age >= 18) ... .order_by(Person.name) ... .limit(10) ... .all() ... )
def all(self) -> list[NodeT]:
    """Run the query and return every matching instance.

    When eager loading was requested and at least one instance came back,
    the session loads those relationships before the list is returned.
    """
    cypher, params = self._build_cypher()
    rows = self._execute_query(cypher, params)
    nodes = self._rows_to_instances(rows)
    if not (self._eager_load and nodes):
        return nodes
    self._session._eager_load_relationships(nodes, self._eager_load)
    return nodes
Execute the query and return all results.
def first(self) -> NodeT | None:
    """Run the query with a limit of one and return that row, or None."""
    limited = self._clone()
    limited._limit = 1
    rows = limited.all()
    if rows:
        return rows[0]
    return None
Execute the query and return the first result.
def one(self) -> NodeT:
    """Run the query and return its single result.

    Raises QueryError if no results or more than one result.
    """
    # Fetching two rows is enough to detect an ambiguous match.
    probe = self._clone()
    probe._limit = 2
    matches = probe.all()
    if not matches:
        raise QueryError("Query returned no results")
    if len(matches) > 1:
        raise QueryError("Query returned more than one result")
    return matches[0]
Execute the query and return exactly one result.
Raises QueryError if no results or more than one result.
def count(self) -> int:
    """Run the count query and return its ``count`` column (0 if no rows)."""
    cypher, params = self._build_count_cypher()
    rows = self._execute_query(cypher, params)
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Execute the query and return the count of results.
def exists(self) -> bool:
    """Return True when the exists query yields at least one row."""
    cypher, params = self._build_exists_cypher()
    rows = self._execute_query(cypher, params)
    return len(rows) > 0
Check if any matching records exist.
def delete(self) -> int:
    """Delete all matching records (DETACH DELETE).

    Returns the ``count`` column of the first result row, or 0 when the
    database returns no rows.
    """
    cypher, params = self._build_delete_cypher()
    # NOTE(review): goes straight to db.query, so timeout/max_memory are
    # not applied here — confirm this is intentional.
    rows = self._session._db.query(cypher, params)
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Delete all matching records (DETACH DELETE).
def update(self, **kwargs: Any) -> int:
    """Update all matching records.

    The keyword arguments are handed to ``_build_update_cypher``. Returns
    the ``count`` column of the first result row, or 0 when the database
    returns no rows.
    """
    cypher, params = self._build_update_cypher(**kwargs)
    # NOTE(review): goes straight to db.query, so timeout/max_memory are
    # not applied here — confirm this is intentional.
    rows = self._session._db.query(cypher, params)
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Update all matching records.
class AsyncQueryBuilder(_QueryBuilderBase[NodeT]):
    """
    Immutable, async query builder for graph queries.

    The Cypher-building and immutable builder methods come from
    ``_QueryBuilderBase``; this class only adds awaitable execution methods.
    """

    def __init__(self, session: AsyncUniSession, model: type[NodeT]) -> None:
        self._init_state(session, model)

    async def _execute_query(
        self, cypher: str, params: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Execute a query, using query_with if timeout/max_memory is set."""
        db = self._session._db
        if self._timeout is None and self._max_memory is None:
            return await db.query(cypher, params)

        # Limits are only expressible through the query_with builder.
        pending = db.query_with(cypher)
        if params:
            pending = pending.params(params)
        if self._timeout is not None:
            pending = pending.timeout(self._timeout)
        if self._max_memory is not None:
            pending = pending.max_memory(self._max_memory)
        return await pending.run()

    async def all(self) -> list[NodeT]:
        """Execute the query and return all results."""
        cypher, params = self._build_cypher()
        rows = await self._execute_query(cypher, params)
        nodes = self._rows_to_instances(rows)
        if not (self._eager_load and nodes):
            return nodes
        await self._session._async_eager_load_relationships(
            nodes, self._eager_load
        )
        return nodes

    async def first(self) -> NodeT | None:
        """Execute the query and return the first result."""
        limited = self._clone()
        limited._limit = 1
        rows = await limited.all()
        if rows:
            return rows[0]
        return None

    async def one(self) -> NodeT:
        """Execute the query and return exactly one result.

        Raises QueryError if no results or more than one result.
        """
        # Fetching two rows is enough to detect an ambiguous match.
        probe = self._clone()
        probe._limit = 2
        matches = await probe.all()
        if not matches:
            raise QueryError("Query returned no results")
        if len(matches) > 1:
            raise QueryError("Query returned more than one result")
        return matches[0]

    async def count(self) -> int:
        """Execute the query and return the count of results."""
        cypher, params = self._build_count_cypher()
        rows = await self._execute_query(cypher, params)
        if not rows:
            return 0
        return cast(int, rows[0]["count"])

    async def exists(self) -> bool:
        """Check if any matching records exist."""
        cypher, params = self._build_exists_cypher()
        rows = await self._execute_query(cypher, params)
        return len(rows) > 0

    async def delete(self) -> int:
        """Delete all matching records (DETACH DELETE)."""
        cypher, params = self._build_delete_cypher()
        # NOTE(review): goes straight to db.query, so timeout/max_memory
        # are not applied here — confirm this is intentional.
        rows = await self._session._db.query(cypher, params)
        if not rows:
            return 0
        return cast(int, rows[0]["count"])

    async def update(self, **kwargs: Any) -> int:
        """Update all matching records."""
        cypher, params = self._build_update_cypher(**kwargs)
        # NOTE(review): goes straight to db.query, so timeout/max_memory
        # are not applied here — confirm this is intentional.
        rows = await self._session._db.query(cypher, params)
        if not rows:
            return 0
        return cast(int, rows[0]["count"])
Immutable, async query builder for graph queries.
Inherits all Cypher-building and immutable builder methods from
_QueryBuilderBase. Only the execution methods are async.
async def all(self) -> list[NodeT]:
    """Run the query and return every matching instance.

    When eager loading was requested and at least one instance came back,
    the session's async eager loader is awaited before returning.
    """
    cypher, params = self._build_cypher()
    rows = await self._execute_query(cypher, params)
    nodes = self._rows_to_instances(rows)
    if not (self._eager_load and nodes):
        return nodes
    await self._session._async_eager_load_relationships(nodes, self._eager_load)
    return nodes
Execute the query and return all results.
async def first(self) -> NodeT | None:
    """Run the query with a limit of one and return that row, or None."""
    limited = self._clone()
    limited._limit = 1
    rows = await limited.all()
    if rows:
        return rows[0]
    return None
Execute the query and return the first result.
async def one(self) -> NodeT:
    """Run the query and return its single result.

    Raises QueryError if no results or more than one result.
    """
    # Fetching two rows is enough to detect an ambiguous match.
    probe = self._clone()
    probe._limit = 2
    matches = await probe.all()
    if not matches:
        raise QueryError("Query returned no results")
    if len(matches) > 1:
        raise QueryError("Query returned more than one result")
    return matches[0]
Execute the query and return exactly one result.
Raises QueryError if no results or more than one result.
async def count(self) -> int:
    """Run the count query and return its ``count`` column (0 if no rows)."""
    cypher, params = self._build_count_cypher()
    rows = await self._execute_query(cypher, params)
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Execute the query and return the count of results.
async def exists(self) -> bool:
    """Return True when the exists query yields at least one row."""
    cypher, params = self._build_exists_cypher()
    rows = await self._execute_query(cypher, params)
    return len(rows) > 0
Check if any matching records exist.
async def delete(self) -> int:
    """Delete all matching records (DETACH DELETE).

    Returns the ``count`` column of the first result row, or 0 when the
    database returns no rows.
    """
    cypher, params = self._build_delete_cypher()
    # NOTE(review): goes straight to db.query, so timeout/max_memory are
    # not applied here — confirm this is intentional.
    rows = await self._session._db.query(cypher, params)
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Delete all matching records (DETACH DELETE).
async def update(self, **kwargs: Any) -> int:
    """Update all matching records.

    The keyword arguments are handed to ``_build_update_cypher``. Returns
    the ``count`` column of the first result row, or 0 when the database
    returns no rows.
    """
    cypher, params = self._build_update_cypher(**kwargs)
    # NOTE(review): goes straight to db.query, so timeout/max_memory are
    # not applied here — confirm this is intentional.
    rows = await self._session._db.query(cypher, params)
    if not rows:
        return 0
    return cast(int, rows[0]["count"])
Update all matching records.
@dataclass
class FilterExpr:
    """One ``property <op> value`` condition of a query filter."""

    # Name of the node property being tested.
    property_name: str
    # Comparison or predicate to apply.
    op: FilterOp
    # Right-hand operand; unused by the IS NULL / IS NOT NULL predicates.
    value: Any = None

    def to_cypher(self, node_var: str, param_name: str) -> tuple[str, dict[str, Any]]:
        """Render the condition as a Cypher WHERE fragment.

        Args:
            node_var: Cypher variable the property is read from.
            param_name: Query parameter name that will carry the value.

        Returns:
            Tuple of (fragment, parameters). The value is always passed as
            a parameter rather than inlined; null checks bind nothing.
        """
        prop = f"{node_var}.{self.property_name}"

        # Null checks take no operand, hence no parameter.
        if self.op in (FilterOp.IS_NULL, FilterOp.IS_NOT_NULL):
            return f"{prop} {self.op.value}", {}

        bound = {param_name: self.value}
        # NOT IN is the only operator not written as "<prop> <op> $param".
        if self.op == FilterOp.NOT_IN:
            return f"NOT {prop} IN ${param_name}", bound
        return f"{prop} {self.op.value} ${param_name}", bound
A filter expression for a query.
def to_cypher(self, node_var: str, param_name: str) -> tuple[str, dict[str, Any]]:
    """Render the condition as a Cypher WHERE fragment.

    Args:
        node_var: Cypher variable the property is read from.
        param_name: Query parameter name that will carry the value.

    Returns:
        Tuple of (fragment, parameters). The value is always passed as a
        parameter rather than inlined; null checks bind nothing.
    """
    prop = f"{node_var}.{self.property_name}"

    # Null checks take no operand, hence no parameter.
    if self.op in (FilterOp.IS_NULL, FilterOp.IS_NOT_NULL):
        return f"{prop} {self.op.value}", {}

    bound = {param_name: self.value}
    # NOT IN is the only operator not written as "<prop> <op> $param".
    if self.op == FilterOp.NOT_IN:
        return f"NOT {prop} IN ${param_name}", bound
    return f"{prop} {self.op.value} ${param_name}", bound
Convert to Cypher WHERE clause fragment.
class FilterOp(Enum):
    """Operators available to filter expressions.

    Each member's value is the operator's Cypher spelling.
    """

    # Comparisons
    EQ = "="
    NE = "<>"
    LT = "<"
    LE = "<="
    GT = ">"
    GE = ">="

    # Membership
    IN = "IN"
    NOT_IN = "NOT IN"

    # Regular-expression match
    LIKE = "=~"

    # Null checks (take no operand)
    IS_NULL = "IS NULL"
    IS_NOT_NULL = "IS NOT NULL"

    # String predicates
    STARTS_WITH = "STARTS WITH"
    ENDS_WITH = "ENDS WITH"
    CONTAINS = "CONTAINS"
Filter operation types.
class PropertyProxy(Generic[T]):
    """
    Stand-in for a model property that turns comparisons into filters.

    Comparison operators and the helper methods below each produce a
    FilterExpr for the proxied property instead of evaluating anything.

    Example:
        >>> query.filter(Person.age >= 18)
        >>> query.filter(Person.name.starts_with("A"))
    """

    def __init__(self, property_name: str, model: type[UniNode]) -> None:
        self._property_name = property_name
        self._model = model

    def _expr(self, op: FilterOp, value: Any = None) -> FilterExpr:
        # Single construction point for every filter this proxy emits.
        return FilterExpr(self._property_name, op, value)

    def __eq__(self, other: Any) -> FilterExpr:  # type: ignore[override]
        return self._expr(FilterOp.EQ, other)

    def __ne__(self, other: Any) -> FilterExpr:  # type: ignore[override]
        return self._expr(FilterOp.NE, other)

    def __lt__(self, other: Any) -> FilterExpr:
        return self._expr(FilterOp.LT, other)

    def __le__(self, other: Any) -> FilterExpr:
        return self._expr(FilterOp.LE, other)

    def __gt__(self, other: Any) -> FilterExpr:
        return self._expr(FilterOp.GT, other)

    def __ge__(self, other: Any) -> FilterExpr:
        return self._expr(FilterOp.GE, other)

    def in_(self, values: Sequence[T]) -> FilterExpr:
        """Check if value is in a list."""
        return self._expr(FilterOp.IN, list(values))

    def not_in(self, values: Sequence[T]) -> FilterExpr:
        """Check if value is not in a list."""
        return self._expr(FilterOp.NOT_IN, list(values))

    def like(self, pattern: str) -> FilterExpr:
        """Match a regex pattern."""
        return self._expr(FilterOp.LIKE, pattern)

    def is_null(self) -> FilterExpr:
        """Check if value is null."""
        return self._expr(FilterOp.IS_NULL)

    def is_not_null(self) -> FilterExpr:
        """Check if value is not null."""
        return self._expr(FilterOp.IS_NOT_NULL)

    def starts_with(self, prefix: str) -> FilterExpr:
        """Check if string starts with prefix."""
        return self._expr(FilterOp.STARTS_WITH, prefix)

    def ends_with(self, suffix: str) -> FilterExpr:
        """Check if string ends with suffix."""
        return self._expr(FilterOp.ENDS_WITH, suffix)

    def contains(self, substring: str) -> FilterExpr:
        """Check if string contains substring."""
        return self._expr(FilterOp.CONTAINS, substring)
Proxy for model properties that enables filter expressions.
Used in query builder to create type-safe filter conditions.
Example:
>>> query.filter(Person.age >= 18)
>>> query.filter(Person.name.starts_with("A"))
def in_(self, values: Sequence[T]) -> FilterExpr:
    """Build an IN condition: the property value is one of ``values``."""
    # Copy into a list so any sequence type is accepted.
    candidates = list(values)
    return FilterExpr(self._property_name, FilterOp.IN, candidates)
Check if value is in a list.
def not_in(self, values: Sequence[T]) -> FilterExpr:
    """Build a NOT IN condition: the property value is none of ``values``."""
    # Copy into a list so any sequence type is accepted.
    excluded = list(values)
    return FilterExpr(self._property_name, FilterOp.NOT_IN, excluded)
Check if value is not in a list.
def like(self, pattern: str) -> FilterExpr:
    """Build a LIKE condition matching the property against the regex ``pattern``."""
    prop = self._property_name
    return FilterExpr(prop, FilterOp.LIKE, pattern)
Match a regex pattern.
def is_null(self) -> FilterExpr:
    """Build a condition that holds when the property value is null."""
    prop = self._property_name
    # Unary operator: no comparison value is passed.
    return FilterExpr(prop, FilterOp.IS_NULL)
Check if value is null.
def is_not_null(self) -> FilterExpr:
    """Build a condition that holds when the property value is not null."""
    prop = self._property_name
    # Unary operator: no comparison value is passed.
    return FilterExpr(prop, FilterOp.IS_NOT_NULL)
Check if value is not null.
def starts_with(self, prefix: str) -> FilterExpr:
    """Build a condition that holds when the string value begins with ``prefix``."""
    prop = self._property_name
    return FilterExpr(prop, FilterOp.STARTS_WITH, prefix)
Check if string starts with prefix.
def ends_with(self, suffix: str) -> FilterExpr:
    """Build a condition that holds when the string value ends with ``suffix``."""
    prop = self._property_name
    return FilterExpr(prop, FilterOp.ENDS_WITH, suffix)
Check if string ends with suffix.
class ModelProxy(Generic[NodeT]):
    """
    Wrapper around a model class that hands out ``PropertyProxy`` objects.

    Attribute access on the proxy yields a ``PropertyProxy`` for that name,
    which can then be used to build query filter expressions.

    Example:
        >>> Person.name  # Returns PropertyProxy for 'name'
        >>> query.filter(Person.age >= 18)
    """

    def __init__(self, model: type[NodeT]) -> None:
        self._model = model

    def __getattr__(self, name: str) -> PropertyProxy[Any]:
        # __getattr__ only runs when normal lookup fails. Names that start
        # with an underscore are never proxied; they raise AttributeError.
        if not name.startswith("_"):
            return PropertyProxy(name, self._model)
        raise AttributeError(name)
Proxy for model classes that provides property proxies.
Enables type-safe property access in query filters.
Example:
>>> Person.name  # Returns PropertyProxy for 'name'
>>> query.filter(Person.age >= 18)
@dataclass
class OrderByClause:
    """One ORDER BY term: a property name plus a descending flag."""

    # Name of the property to sort on.
    property_name: str
    # True requests descending order; the default (False) does not.
    descending: bool = False
An ORDER BY clause.
@dataclass
class TraversalStep:
    """One hop of a relationship traversal."""

    # Edge type to follow.
    edge_type: str
    # Which way the edge is followed relative to the current node.
    direction: Literal["outgoing", "incoming", "both"]
    # Optional label for the node reached by this hop; None when unspecified.
    target_label: str | None = None
A relationship traversal step.
@dataclass
class VectorSearchConfig:
    """Parameters describing a vector similarity search."""

    # Name of the property searched.
    property_name: str
    # The query vector.
    query_vector: list[float]
    # Number of results requested.
    k: int
    # Optional threshold; None means no threshold was given.
    threshold: float | None = None
    # Optional pre-filter string; None means no pre-filter was given.
    pre_filter: str | None = None
Configuration for vector similarity search.
class SchemaGenerator:
    """Build a Uni database schema from registered node and edge models.

    Models are registered under their label / edge type. ``generate()`` turns
    them into a ``DatabaseSchema`` (cached until the next registration), and
    ``apply_to_database`` / ``async_apply_to_database`` push that schema to a
    database.
    """

    # Index kinds declared inline on the label builder. "vector" and
    # "fulltext" indexes are created by separate database calls afterwards.
    _INLINE_INDEX_TYPES = ("btree", "hash")

    def __init__(self) -> None:
        self._node_models: dict[str, type[UniNode]] = {}
        self._edge_models: dict[str, type[UniEdge]] = {}
        self._schema: DatabaseSchema | None = None

    def register_node(self, model: type[UniNode]) -> None:
        """Register a node model under its ``__label__``.

        Raises:
            SchemaError: If the model has an empty ``__label__``.
        """
        label = model.__label__
        if not label:
            raise SchemaError(f"Model {model.__name__} has no __label__", model)
        self._node_models[label] = model
        self._schema = None  # Invalidate cached schema

    def register_edge(self, model: type[UniEdge]) -> None:
        """Register an edge model under its ``__edge_type__``.

        Raises:
            SchemaError: If the model has an empty ``__edge_type__``.
        """
        edge_type = model.__edge_type__
        if not edge_type:
            raise SchemaError(f"Model {model.__name__} has no __edge_type__", model)
        self._edge_models[edge_type] = model
        self._schema = None  # Invalidate cached schema

    def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
        """Register several models, dispatching on their base class.

        ``UniEdge`` is tested before ``UniNode``.

        Raises:
            SchemaError: If a model is neither a ``UniEdge`` nor a ``UniNode``
                subclass, or lacks its label / edge type.
        """
        for model in models:
            if issubclass(model, UniEdge):
                self.register_edge(model)
            elif issubclass(model, UniNode):
                self.register_node(model)
            else:
                # Pass the offending model along, as the sibling methods do.
                raise SchemaError(
                    f"Model {model.__name__} must be a subclass of UniNode or UniEdge",
                    model,
                )

    def _generate_property_schema(
        self,
        model: type[UniNode] | type[UniEdge],
        field_name: str,
    ) -> PropertySchema:
        """Generate the schema entry for a single property field."""
        field_info = model.model_fields[field_name]

        # Prefer resolved type hints (forward refs evaluated). Fall back to the
        # raw field annotation if resolution fails for any reason.
        try:
            type_hint = get_type_hints(model).get(field_name, field_info.annotation)
        except Exception:
            type_hint = field_info.annotation

        is_nullable, inner_type = is_optional(type_hint)
        data_type, nullable = python_type_to_uni(type_hint, nullable=is_nullable)

        # Vector fields override the mapped data type with "vector:<dims>".
        vec_dims = get_vector_dimensions(inner_type if is_nullable else type_hint)
        if vec_dims:
            data_type = f"vector:{vec_dims}"

        config = get_field_config(field_info)
        index_type = config.index if config else None
        # Vector fields get a vector index even without an explicit Field config.
        if vec_dims and not index_type:
            index_type = "vector"

        return PropertySchema(
            name=field_name,
            data_type=data_type,
            nullable=nullable,
            index_type=index_type,
            unique=config.unique if config else False,
            tokenizer=config.tokenizer if config else None,
            metric=config.metric if config else None,
        )

    def _generate_label_schema(self, model: type[UniNode]) -> LabelSchema:
        """Generate the label schema for a node model."""
        properties = {
            field_name: self._generate_property_schema(model, field_name)
            for field_name in model.get_property_fields()
        }
        return LabelSchema(name=model.__label__, properties=properties)

    def _generate_edge_type_schema(self, model: type[UniEdge]) -> EdgeTypeSchema:
        """Generate the edge type schema for an edge model.

        When the model does not restrict its endpoints, every registered node
        label is allowed on that side.
        """
        from_labels = model.get_from_labels() or list(self._node_models)
        to_labels = model.get_to_labels() or list(self._node_models)
        properties = {
            field_name: self._generate_property_schema(model, field_name)
            for field_name in model.get_property_fields()
        }
        return EdgeTypeSchema(
            name=model.__edge_type__,
            from_labels=from_labels,
            to_labels=to_labels,
            properties=properties,
        )

    def generate(self) -> DatabaseSchema:
        """Generate (or return the cached) complete database schema."""
        if self._schema is not None:
            return self._schema

        schema = DatabaseSchema()

        for label, model in self._node_models.items():
            schema.labels[label] = self._generate_label_schema(model)

        for edge_type_name, edge_model in self._edge_models.items():
            schema.edge_types[edge_type_name] = self._generate_edge_type_schema(
                edge_model
            )

        # Relationships declared on node models may name edge types that have
        # no registered edge model. Give those a minimal, property-less edge
        # type that allows every registered label on both ends.
        for model in self._node_models.values():
            for rel_config in model.get_relationship_fields().values():
                edge_type = rel_config.edge_type
                if edge_type not in schema.edge_types:
                    schema.edge_types[edge_type] = EdgeTypeSchema(
                        name=edge_type,
                        from_labels=list(self._node_models),
                        to_labels=list(self._node_models),
                    )

        self._schema = schema
        return schema

    # ------------------------------------------------------------------
    # Helpers shared by the sync and async apply methods. The builder calls
    # themselves are synchronous in both variants; only the existence checks,
    # ``apply()`` and index creation differ.
    # ------------------------------------------------------------------

    @classmethod
    def _add_label(cls, builder, label, label_schema):
        """Declare one label and its properties on ``builder``; return the builder."""
        lb = builder.label(label)
        for prop in label_schema.properties.values():
            if prop.data_type.startswith("vector:"):
                dims = int(prop.data_type.split(":")[1])
                lb = lb.vector(prop.name, dims)
            elif prop.nullable:
                lb = lb.property_nullable(prop.name, prop.data_type)
            else:
                lb = lb.property(prop.name, prop.data_type)

            if prop.index_type in cls._INLINE_INDEX_TYPES:
                lb = lb.index(prop.name, prop.index_type)
        return lb.done()

    @staticmethod
    def _add_edge_type(builder, edge_type, edge_schema):
        """Declare one edge type and its properties on ``builder``; return the builder."""
        eb = builder.edge_type(
            edge_type, edge_schema.from_labels, edge_schema.to_labels
        )
        for prop in edge_schema.properties.values():
            if prop.nullable:
                eb = eb.property_nullable(prop.name, prop.data_type)
            else:
                eb = eb.property(prop.name, prop.data_type)
        return eb.done()

    @staticmethod
    def _indexed_properties(schema, index_type):
        """Yield ``(label, prop)`` for every label property with ``index_type``."""
        for label, label_schema in schema.labels.items():
            for prop in label_schema.properties.values():
                if prop.index_type == index_type:
                    yield label, prop

    @staticmethod
    def _log_index_skip(kind, label, prop_name, exc):
        """Record a failed best-effort index creation instead of hiding it."""
        import logging

        logging.getLogger(__name__).debug(
            "Skipped %s index on %s.%s: %s", kind, label, prop_name, exc
        )

    def apply_to_database(self, db: uni_db.Database) -> None:
        """Apply the generated schema to a database using its schema builder.

        Additive-only: a label or edge type that already exists is skipped
        entirely, so its properties are not revisited. Vector and fulltext
        indexes are then created with separate calls on a best-effort basis;
        a failure is logged at debug level and otherwise ignored.

        NOTE(review): ``unique`` and ``tokenizer`` from ``PropertySchema`` are
        not read anywhere in this method.
        """
        schema = self.generate()
        builder = db.schema()
        has_changes = False

        for label, label_schema in schema.labels.items():
            if db.label_exists(label):
                continue
            builder = self._add_label(builder, label, label_schema)
            has_changes = True

        for edge_type, edge_schema in schema.edge_types.items():
            if db.edge_type_exists(edge_type):
                continue
            builder = self._add_edge_type(builder, edge_type, edge_schema)
            has_changes = True

        if has_changes:
            builder.apply()

        for label, prop in self._indexed_properties(schema, "vector"):
            try:
                db.create_vector_index(label, prop.name, prop.metric or "l2")
            except Exception as exc:  # index may already exist
                self._log_index_skip("vector", label, prop.name, exc)

        for label, prop in self._indexed_properties(schema, "fulltext"):
            try:
                db.create_scalar_index(label, prop.name, "fulltext")
            except Exception as exc:  # index may already exist or be unsupported
                self._log_index_skip("fulltext", label, prop.name, exc)

    async def async_apply_to_database(self, db: uni_db.AsyncDatabase) -> None:
        """Apply the generated schema to an async database.

        Async variant of ``apply_to_database`` with the same additive-only and
        best-effort index semantics. Existence checks, ``apply()`` and index
        creation are awaited.
        """
        schema = self.generate()
        builder = db.schema()
        has_changes = False

        for label, label_schema in schema.labels.items():
            if await db.label_exists(label):
                continue
            builder = self._add_label(builder, label, label_schema)
            has_changes = True

        for edge_type, edge_schema in schema.edge_types.items():
            if await db.edge_type_exists(edge_type):
                continue
            builder = self._add_edge_type(builder, edge_type, edge_schema)
            has_changes = True

        if has_changes:
            await builder.apply()

        for label, prop in self._indexed_properties(schema, "vector"):
            try:
                await db.create_vector_index(label, prop.name, prop.metric or "l2")
            except Exception as exc:  # index may already exist
                self._log_index_skip("vector", label, prop.name, exc)

        for label, prop in self._indexed_properties(schema, "fulltext"):
            try:
                await db.create_scalar_index(label, prop.name, "fulltext")
            except Exception as exc:  # index may already exist or be unsupported
                self._log_index_skip("fulltext", label, prop.name, exc)
Generates Uni database schema from registered models.
def register_node(self, model: type[UniNode]) -> None:
    """Add a node model to the generator, keyed by its ``__label__``.

    Raises:
        SchemaError: If the model's ``__label__`` is empty.
    """
    node_label = model.__label__
    if node_label:
        self._node_models[node_label] = model
        # A new model makes any previously generated schema stale.
        self._schema = None
        return
    raise SchemaError(f"Model {model.__name__} has no __label__", model)
Register a node model for schema generation.
def register_edge(self, model: type[UniEdge]) -> None:
    """Add an edge model to the generator, keyed by its ``__edge_type__``.

    Raises:
        SchemaError: If the model's ``__edge_type__`` is empty.
    """
    type_name = model.__edge_type__
    if type_name:
        self._edge_models[type_name] = model
        # A new model makes any previously generated schema stale.
        self._schema = None
        return
    raise SchemaError(f"Model {model.__name__} has no __edge_type__", model)
Register an edge model for schema generation.
def register(self, *models: type[UniNode] | type[UniEdge]) -> None:
    """Register several models, dispatching on their base class.

    ``UniEdge`` is tested before ``UniNode``. Models are processed in order,
    so those before a failing one stay registered.

    Raises:
        SchemaError: If a model is neither a ``UniEdge`` nor a ``UniNode``
            subclass, or if the delegated registration rejects it.
    """
    for model in models:
        if issubclass(model, UniEdge):
            self.register_edge(model)
        elif issubclass(model, UniNode):
            self.register_node(model)
        else:
            # Pass the offending model along, matching how register_node and
            # register_edge construct their SchemaError.
            raise SchemaError(
                f"Model {model.__name__} must be a subclass of UniNode or UniEdge",
                model,
            )
Register multiple models.
def generate(self) -> DatabaseSchema:
    """Generate the complete database schema.

    The result is cached on the instance and returned as-is on later calls
    until the cache is cleared.

    Returns:
        A ``DatabaseSchema`` holding one label schema per registered node
        model and one edge type schema per registered edge model, plus a
        minimal edge type for every relationship whose edge type has no
        registered edge model.
    """
    if self._schema is not None:
        return self._schema

    schema = DatabaseSchema()

    for label, model in self._node_models.items():
        schema.labels[label] = self._generate_label_schema(model)

    for edge_type_name, edge_model in self._edge_models.items():
        schema.edge_types[edge_type_name] = self._generate_edge_type_schema(
            edge_model
        )

    # Relationships declared on node models may name edge types that have no
    # registered edge model. Give those a minimal, property-less edge type
    # that allows every registered label on both ends.
    for model in self._node_models.values():
        # Only the relationship configs are needed, not the field names.
        for rel_config in model.get_relationship_fields().values():
            edge_type = rel_config.edge_type
            if edge_type not in schema.edge_types:
                schema.edge_types[edge_type] = EdgeTypeSchema(
                    name=edge_type,
                    # Separate list objects so the two sides never alias.
                    from_labels=list(self._node_models),
                    to_labels=list(self._node_models),
                )

    self._schema = schema
    return schema
Generate the complete database schema.
def apply_to_database(self, db: uni_db.Database) -> None:
    """Apply the generated schema to a database using its schema builder.

    Additive-only: a label or edge type that already exists is skipped
    entirely, so its properties are not revisited. The accumulated builder is
    applied once, and only if something new was declared. Vector and fulltext
    indexes are then created with separate calls on a best-effort basis; a
    failure is logged at debug level and otherwise ignored.

    NOTE(review): ``unique`` and ``tokenizer`` from ``PropertySchema`` are not
    read anywhere in this method.
    """
    import logging

    log = logging.getLogger(__name__)
    schema = self.generate()

    builder = db.schema()
    has_changes = False

    for label, label_schema in schema.labels.items():
        if db.label_exists(label):
            continue  # Additive-only: skip existing labels
        lb = builder.label(label)
        for prop in label_schema.properties.values():
            if prop.data_type.startswith("vector:"):
                # Data type is encoded as "vector:<dims>".
                dims = int(prop.data_type.split(":")[1])
                lb = lb.vector(prop.name, dims)
            elif prop.nullable:
                lb = lb.property_nullable(prop.name, prop.data_type)
            else:
                lb = lb.property(prop.name, prop.data_type)

            # Only btree/hash indexes are declared inline; vector and
            # fulltext indexes are created further down.
            if prop.index_type in ("btree", "hash"):
                lb = lb.index(prop.name, prop.index_type)
        builder = lb.done()
        has_changes = True

    for edge_type, edge_schema in schema.edge_types.items():
        if db.edge_type_exists(edge_type):
            continue  # Additive-only: skip existing edge types
        eb = builder.edge_type(
            edge_type, edge_schema.from_labels, edge_schema.to_labels
        )
        for prop in edge_schema.properties.values():
            if prop.nullable:
                eb = eb.property_nullable(prop.name, prop.data_type)
            else:
                eb = eb.property(prop.name, prop.data_type)
        builder = eb.done()
        has_changes = True

    if has_changes:
        builder.apply()

    # Vector indexes: attempted for every label, including pre-existing ones.
    for label, label_schema in schema.labels.items():
        for prop in label_schema.properties.values():
            if prop.index_type != "vector":
                continue
            try:
                db.create_vector_index(label, prop.name, prop.metric or "l2")
            except Exception as exc:  # index may already exist
                log.debug(
                    "Skipped vector index on %s.%s: %s", label, prop.name, exc
                )

    # Fulltext indexes, likewise best-effort.
    for label, label_schema in schema.labels.items():
        for prop in label_schema.properties.values():
            if prop.index_type != "fulltext":
                continue
            try:
                db.create_scalar_index(label, prop.name, "fulltext")
            except Exception as exc:  # may already exist or be unsupported
                log.debug(
                    "Skipped fulltext index on %s.%s: %s", label, prop.name, exc
                )
Apply the generated schema to a database using SchemaBuilder.
Uses db.schema() for atomic schema application with additive-only semantics. Creates labels, edge types, properties, and indexes.
async def async_apply_to_database(self, db: uni_db.AsyncDatabase) -> None:
    """Apply the generated schema to an async database.

    Additive-only: a label or edge type that already exists is skipped
    entirely. The accumulated builder is applied once, and only if something
    new was declared. Vector and fulltext indexes are then created with
    separate awaited calls on a best-effort basis; a failure is logged at
    debug level and otherwise ignored.

    NOTE(review): ``unique`` and ``tokenizer`` from ``PropertySchema`` are not
    read anywhere in this method.
    """
    import logging

    log = logging.getLogger(__name__)
    schema = self.generate()

    builder = db.schema()
    has_changes = False

    for label, label_schema in schema.labels.items():
        if await db.label_exists(label):
            continue  # Additive-only: skip existing labels
        lb = builder.label(label)
        for prop in label_schema.properties.values():
            if prop.data_type.startswith("vector:"):
                # Data type is encoded as "vector:<dims>".
                dims = int(prop.data_type.split(":")[1])
                lb = lb.vector(prop.name, dims)
            elif prop.nullable:
                lb = lb.property_nullable(prop.name, prop.data_type)
            else:
                lb = lb.property(prop.name, prop.data_type)

            # Only btree/hash indexes are declared inline.
            if prop.index_type in ("btree", "hash"):
                lb = lb.index(prop.name, prop.index_type)
        builder = lb.done()
        has_changes = True

    for edge_type, edge_schema in schema.edge_types.items():
        if await db.edge_type_exists(edge_type):
            continue  # Additive-only: skip existing edge types
        eb = builder.edge_type(
            edge_type, edge_schema.from_labels, edge_schema.to_labels
        )
        for prop in edge_schema.properties.values():
            if prop.nullable:
                eb = eb.property_nullable(prop.name, prop.data_type)
            else:
                eb = eb.property(prop.name, prop.data_type)
        builder = eb.done()
        has_changes = True

    if has_changes:
        await builder.apply()

    # Vector indexes: attempted for every label, including pre-existing ones.
    for label, label_schema in schema.labels.items():
        for prop in label_schema.properties.values():
            if prop.index_type != "vector":
                continue
            try:
                await db.create_vector_index(label, prop.name, prop.metric or "l2")
            except Exception as exc:  # index may already exist
                log.debug(
                    "Skipped vector index on %s.%s: %s", label, prop.name, exc
                )

    # Fulltext indexes, likewise best-effort.
    for label, label_schema in schema.labels.items():
        for prop in label_schema.properties.values():
            if prop.index_type != "fulltext":
                continue
            try:
                await db.create_scalar_index(label, prop.name, "fulltext")
            except Exception as exc:  # may already exist or be unsupported
                log.debug(
                    "Skipped fulltext index on %s.%s: %s", label, prop.name, exc
                )
Apply the generated schema to an async database.
Async variant of apply_to_database using AsyncSchemaBuilder.
@dataclass
class DatabaseSchema:
    """Top-level schema container produced from the registered models."""

    # Label name -> schema of that label; a fresh dict per instance.
    labels: dict[str, LabelSchema] = field(default_factory=dict)
    # Edge type name -> schema of that edge type; a fresh dict per instance.
    edge_types: dict[str, EdgeTypeSchema] = field(default_factory=dict)
Complete database schema generated from models.
@dataclass
class LabelSchema:
    """Schema of a single vertex label."""

    # The label name.
    name: str
    # Property name -> property schema; a fresh dict per instance.
    properties: dict[str, PropertySchema] = field(default_factory=dict)
Schema for a vertex label.
@dataclass
class EdgeTypeSchema:
    """Schema of a single edge type."""

    # The edge type name.
    name: str
    # Labels allowed at the source end; a fresh list per instance.
    from_labels: list[str] = field(default_factory=list)
    # Labels allowed at the target end; a fresh list per instance.
    to_labels: list[str] = field(default_factory=list)
    # Property name -> property schema; a fresh dict per instance.
    properties: dict[str, PropertySchema] = field(default_factory=dict)
Schema for an edge type.
@dataclass
class PropertySchema:
    """Schema of a single property."""

    # Property name.
    name: str
    # Data type name, as a string.
    data_type: str
    # Whether the property may be null.
    nullable: bool = False
    # Index kind for this property, or None for no index.
    index_type: str | None = None
    # Uniqueness flag.
    unique: bool = False
    # Tokenizer name, or None when not configured.
    tokenizer: str | None = None
    # Metric name, or None when not configured.
    metric: str | None = None
Schema for a single property.
def generate_schema(*models: type[UniNode] | type[UniEdge]) -> DatabaseSchema:
    """Build a ``DatabaseSchema`` for ``models`` with a throwaway generator.

    Convenience wrapper: creates a new ``SchemaGenerator``, registers every
    given model on it and returns the result of ``generate()``.
    """
    one_shot = SchemaGenerator()
    one_shot.register(*models)
    schema = one_shot.generate()
    return schema
Generate a database schema from the given models.
class UniDatabase:
    """
    Fluent wrapper around a uni-db ``DatabaseBuilder``.

    The classmethods pick how the database is opened, the configuration
    methods return ``self`` so calls can be chained, and ``build()`` produces
    the database.

    Example:
        >>> db = UniDatabase.open("./path").cache_size(1024*1024).build()
        >>> db = UniDatabase.temporary().build()
        >>> db = UniDatabase.in_memory().build()
    """

    def __init__(self, builder: uni_db.DatabaseBuilder) -> None:
        self._builder = builder

    # uni_db is imported inside each constructor rather than at module level.

    @classmethod
    def open(cls, path: str) -> UniDatabase:
        """Open or create a database at the given path."""
        from uni_db import DatabaseBuilder

        return cls(DatabaseBuilder.open(path))

    @classmethod
    def create(cls, path: str) -> UniDatabase:
        """Create a new database at the given path."""
        from uni_db import DatabaseBuilder

        return cls(DatabaseBuilder.create(path))

    @classmethod
    def open_existing(cls, path: str) -> UniDatabase:
        """Open an existing database (must already exist)."""
        from uni_db import DatabaseBuilder

        return cls(DatabaseBuilder.open_existing(path))

    @classmethod
    def temporary(cls) -> UniDatabase:
        """Wrap ``DatabaseBuilder.temporary()`` (an ephemeral in-memory database)."""
        from uni_db import DatabaseBuilder

        return cls(DatabaseBuilder.temporary())

    @classmethod
    def in_memory(cls) -> UniDatabase:
        """Wrap ``DatabaseBuilder.in_memory()`` (a persistent in-memory database)."""
        from uni_db import DatabaseBuilder

        return cls(DatabaseBuilder.in_memory())

    def cache_size(self, bytes_: int) -> UniDatabase:
        """Set the cache size in bytes; returns ``self`` for chaining."""
        # The underlying builder returns a builder, so keep whatever it hands back.
        configured = self._builder.cache_size(bytes_)
        self._builder = configured
        return self

    def parallelism(self, n: int) -> UniDatabase:
        """Set the parallelism level; returns ``self`` for chaining."""
        configured = self._builder.parallelism(n)
        self._builder = configured
        return self

    def build(self) -> uni_db.Database:
        """Build and return the database instance."""
        database = self._builder.build()
        return database
Thin wrapper around uni-db DatabaseBuilder for ergonomic database creation.
Example:
>>> db = UniDatabase.open("./path").cache_size(1024*1024).build()
>>> db = UniDatabase.temporary().build()
>>> db = UniDatabase.in_memory().build()
@classmethod
def open(cls, path: str) -> UniDatabase:
    """Return a wrapper that opens or creates the database at ``path``."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.DatabaseBuilder.open(path)
    return cls(builder)
Open or create a database at the given path.
@classmethod
def create(cls, path: str) -> UniDatabase:
    """Return a wrapper that creates a new database at ``path``."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.DatabaseBuilder.create(path)
    return cls(builder)
Create a new database at the given path.
@classmethod
def open_existing(cls, path: str) -> UniDatabase:
    """Return a wrapper that opens the database at ``path``, which must already exist."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.DatabaseBuilder.open_existing(path)
    return cls(builder)
Open an existing database (must already exist).
@classmethod
def temporary(cls) -> UniDatabase:
    """Return a wrapper around ``DatabaseBuilder.temporary()`` (an ephemeral in-memory database)."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.DatabaseBuilder.temporary()
    return cls(builder)
Create an ephemeral in-memory database.
@classmethod
def in_memory(cls) -> UniDatabase:
    """Return a wrapper around ``DatabaseBuilder.in_memory()`` (a persistent in-memory database)."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.DatabaseBuilder.in_memory()
    return cls(builder)
Create a persistent in-memory database.
def cache_size(self, bytes_: int) -> UniDatabase:
    """Set the cache size in bytes and return ``self`` for chaining."""
    # Keep whatever builder object the underlying call hands back.
    configured = self._builder.cache_size(bytes_)
    self._builder = configured
    return self
Set the cache size in bytes.
class AsyncUniDatabase:
    """
    Fluent wrapper around a uni-db ``AsyncDatabaseBuilder``.

    The classmethods pick how the database is opened, the configuration
    methods return ``self`` so calls can be chained, and the awaitable
    ``build()`` produces the async database.

    Example:
        >>> db = await AsyncUniDatabase.open("./path").build()
        >>> db = await AsyncUniDatabase.temporary().build()
    """

    def __init__(self, builder: uni_db.AsyncDatabaseBuilder) -> None:
        self._builder = builder

    # uni_db is imported inside each constructor rather than at module level.

    @classmethod
    def open(cls, path: str) -> AsyncUniDatabase:
        """Open or create a database at the given path."""
        from uni_db import AsyncDatabaseBuilder

        return cls(AsyncDatabaseBuilder.open(path))

    @classmethod
    def temporary(cls) -> AsyncUniDatabase:
        """Wrap ``AsyncDatabaseBuilder.temporary()`` (an ephemeral in-memory database)."""
        from uni_db import AsyncDatabaseBuilder

        return cls(AsyncDatabaseBuilder.temporary())

    @classmethod
    def in_memory(cls) -> AsyncUniDatabase:
        """Wrap ``AsyncDatabaseBuilder.in_memory()`` (a persistent in-memory database)."""
        from uni_db import AsyncDatabaseBuilder

        return cls(AsyncDatabaseBuilder.in_memory())

    def cache_size(self, bytes_: int) -> AsyncUniDatabase:
        """Set the cache size in bytes; returns ``self`` for chaining."""
        configured = self._builder.cache_size(bytes_)
        self._builder = configured
        return self

    def parallelism(self, n: int) -> AsyncUniDatabase:
        """Set the parallelism level; returns ``self`` for chaining."""
        configured = self._builder.parallelism(n)
        self._builder = configured
        return self

    async def build(self) -> uni_db.AsyncDatabase:
        """Build and return the async database instance."""
        database = await self._builder.build()
        return database
Thin wrapper around uni-db AsyncDatabaseBuilder for ergonomic async database creation.
Example:
>>> db = await AsyncUniDatabase.open("./path").build()
>>> db = await AsyncUniDatabase.temporary().build()
@classmethod
def open(cls, path: str) -> AsyncUniDatabase:
    """Return a wrapper that opens or creates the database at ``path``."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.AsyncDatabaseBuilder.open(path)
    return cls(builder)
Open or create a database at the given path.
@classmethod
def temporary(cls) -> AsyncUniDatabase:
    """Return a wrapper around ``AsyncDatabaseBuilder.temporary()`` (an ephemeral in-memory database)."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.AsyncDatabaseBuilder.temporary()
    return cls(builder)
Create an ephemeral in-memory database.
@classmethod
def in_memory(cls) -> AsyncUniDatabase:
    """Return a wrapper around ``AsyncDatabaseBuilder.in_memory()`` (a persistent in-memory database)."""
    # uni_db is imported at call time rather than at module level.
    import uni_db

    builder = uni_db.AsyncDatabaseBuilder.in_memory()
    return cls(builder)
Create a persistent in-memory database.
def cache_size(self, bytes_: int) -> AsyncUniDatabase:
    """Set the cache size in bytes and return ``self`` for chaining."""
    # Keep whatever builder object the underlying call hands back.
    configured = self._builder.cache_size(bytes_)
    self._builder = configured
    return self
Set the cache size in bytes.
def before_create(func: F) -> F:
    """
    Flag a method as a before-create lifecycle hook.

    The hook is called after validation but before the INSERT operation,
    which makes it a good place for timestamps, generated IDs or final
    validation.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...     created_at: datetime | None = None
        ...
        ...     @before_create
        ...     def set_created_at(self):
        ...         self.created_at = datetime.now()
    """
    mark = _mark_hook(_BEFORE_CREATE)
    return mark(func)
Mark a method to be called before the entity is created in the database.
The method is called after validation but before the INSERT operation. Useful for setting timestamps, generating IDs, or final validation.
Example:
>>> class Person(UniNode):
...     name: str
...     created_at: datetime | None = None
...
...     @before_create
...     def set_created_at(self):
...         self.created_at = datetime.now()
def after_create(func: F) -> F:
    """
    Flag a method as an after-create lifecycle hook.

    The hook is called once the INSERT operation has completed successfully;
    the entity has its vid/eid assigned by then.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @after_create
        ...     def log_creation(self):
        ...         logger.info(f"Created person {self.name} with vid={self.vid}")
    """
    mark = _mark_hook(_AFTER_CREATE)
    return mark(func)
Mark a method to be called after the entity is created in the database.
The method is called after the INSERT operation completes successfully. The entity will have its vid/eid assigned at this point.
Example:
>>> class Person(UniNode):
...     name: str
...
...     @after_create
...     def log_creation(self):
...         logger.info(f"Created person {self.name} with vid={self.vid}")
def before_update(func: F) -> F:
    """
    Flag a method as a before-update lifecycle hook.

    The hook is called before the UPDATE operation, which makes it a good
    place for validation or refreshing timestamps.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...     updated_at: datetime | None = None
        ...
        ...     @before_update
        ...     def validate_and_timestamp(self):
        ...         if not self.name:
        ...             raise ValueError("Name cannot be empty")
        ...         self.updated_at = datetime.now()
    """
    mark = _mark_hook(_BEFORE_UPDATE)
    return mark(func)
Mark a method to be called before the entity is updated in the database.
The method is called before the UPDATE operation. Useful for validation or updating timestamps.
Example:
>>> class Person(UniNode):
...     name: str
...     updated_at: datetime | None = None
...
...     @before_update
...     def validate_and_timestamp(self):
...         if not self.name:
...             raise ValueError("Name cannot be empty")
...         self.updated_at = datetime.now()
def after_update(func: F) -> F:
    """
    Flag a method as an after-update lifecycle hook.

    The hook is called once the UPDATE operation has completed successfully.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @after_update
        ...     def notify_change(self):
        ...         events.emit("person_updated", self.vid)
    """
    mark = _mark_hook(_AFTER_UPDATE)
    return mark(func)
Mark a method to be called after the entity is updated in the database.
The method is called after the UPDATE operation completes successfully.
Example:
>>> class Person(UniNode):
...     name: str
...
...     @after_update
...     def notify_change(self):
...         events.emit("person_updated", self.vid)
def before_delete(func: F) -> F:
    """
    Flag a method as a before-delete lifecycle hook.

    The hook is called before the DELETE operation, which makes it a good
    place for cleanup or validation.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @before_delete
        ...     def cleanup(self):
        ...         # Remove related data
        ...         pass
    """
    mark = _mark_hook(_BEFORE_DELETE)
    return mark(func)
Mark a method to be called before the entity is deleted from the database.
The method is called before the DELETE operation. Useful for cleanup or validation.
Example:
>>> class Person(UniNode):
...     name: str
...
...     @before_delete
...     def cleanup(self):
...         # Remove related data
...         pass
def after_delete(func: F) -> F:
    """
    Flag a method as an after-delete lifecycle hook.

    The hook is called once the DELETE operation has completed successfully;
    the entity's vid/eid will be cleared at this point.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @after_delete
        ...     def log_deletion(self):
        ...         logger.info(f"Deleted person {self.name}")
    """
    mark = _mark_hook(_AFTER_DELETE)
    return mark(func)
Mark a method to be called after the entity is deleted from the database.
The method is called after the DELETE operation completes successfully. The entity's vid/eid will be cleared at this point.
Example:
class Person(UniNode): ... name: str ... ... @after_delete ... def log_deletion(self): ... logger.info(f"Deleted person {self.name}")
def before_load(func: F) -> F:
    """
    Register ``func`` as a pre-load lifecycle hook.

    The hook is a class method that receives the raw property dictionary,
    so it can transform the data before the model is instantiated.

    Args:
        func: The method to mark as a before-load hook.

    Returns:
        Whatever the hook marker returns for ``func``.

    Example:
        >>> class Person(UniNode):
        ...     name: str
        ...
        ...     @classmethod
        ...     @before_load
        ...     def transform_data(cls, props: dict) -> dict:
        ...         # Normalize name
        ...         if 'name' in props:
        ...             props['name'] = props['name'].strip()
        ...         return props
    """
    # Build the marker for this hook kind first, then apply it to the method.
    marker = _mark_hook(_BEFORE_LOAD)
    return marker(func)
Mark a method to be called before the entity is loaded from the database.
This is a class method that receives the raw property dictionary. It can be used to transform data before model instantiation.
Example:
class Person(UniNode): ... name: str ... ... @classmethod ... @before_load ... def transform_data(cls, props: dict) -> dict: ... # Normalize name ... if 'name' in props: ... props['name'] = props['name'].strip() ... return props
def after_load(func: F) -> F:
    """
    Register ``func`` as a post-load lifecycle hook.

    The decorated method is invoked after the entity has been instantiated
    from database data. Typical uses are computing derived values or
    initializing non-persisted state.

    Args:
        func: The method to mark as an after-load hook.

    Returns:
        Whatever the hook marker returns for ``func``.

    Example:
        >>> class Person(UniNode):
        ...     first_name: str
        ...     last_name: str
        ...     full_name: str | None = None
        ...
        ...     @after_load
        ...     def compute_full_name(self):
        ...         self.full_name = f"{self.first_name} {self.last_name}"
    """
    # Build the marker for this hook kind first, then apply it to the method.
    marker = _mark_hook(_AFTER_LOAD)
    return marker(func)
Mark a method to be called after the entity is loaded from the database.
The method is called after the entity is instantiated from database data. It is useful for computing derived values or initializing non-persisted state.
Example:
class Person(UniNode): ... first_name: str ... last_name: str ... full_name: str | None = None ... ... @after_load ... def compute_full_name(self): ... self.full_name = f"{self.first_name} {self.last_name}"
Base exception for all uni-pydantic errors.
class SchemaError(UniPydanticError):
    """Raised for problems with schema definition or generation."""

    def __init__(self, message: str, model: type | None = None) -> None:
        # Keep the offending model (if any) available to handlers.
        self.model = model
        super().__init__(message)
Error related to schema definition or generation.
class TypeMappingError(SchemaError):
    """Raised when a Python type cannot be mapped to a Uni DataType."""

    def __init__(self, python_type: Any, message: str | None = None) -> None:
        self.python_type = python_type
        # A missing (or empty) message falls back to the generic description.
        if message:
            text = message
        else:
            text = f"Cannot map Python type {python_type!r} to Uni DataType"
        super().__init__(text)
Error mapping Python type to Uni DataType.
Validation error for model instances.
Error related to session operations.
class NotRegisteredError(SessionError):
    """Raised when a model type has not been registered with the session."""

    def __init__(self, model: type[UniNode] | type[UniEdge]) -> None:
        self.model = model
        # The class name appears twice in the message: once quoted via repr,
        # once verbatim inside the suggested register() call.
        model_name = model.__name__
        super().__init__(
            f"Model {model_name!r} is not registered with this session. "
            f"Call session.register({model_name}) first."
        )
Model type not registered with session.
class NotPersisted(SessionError):
    """Raised when an operation needs an entity that is already persisted."""

    def __init__(self, entity: UniNode | UniEdge) -> None:
        self.entity = entity
        # Build the message up front; the literal parts concatenate into the
        # same single sentence pair as before.
        detail = (
            f"Entity {entity!r} is not persisted. "
            "Call session.add() and commit() first."
        )
        super().__init__(detail)
Operation requires a persisted entity.
Entity is not tracked by this session.
Error related to transaction operations.
Error executing a query.
Error related to relationship operations.
class LazyLoadError(RelationshipError):
    """Raised when a relationship cannot be lazy-loaded."""

    def __init__(self, field_name: str, reason: str) -> None:
        self.field_name = field_name
        # Only the field name is stored; the reason lives in the message.
        description = f"Cannot lazy-load relationship '{field_name}': {reason}"
        super().__init__(description)
Error lazy-loading a relationship.
Error during bulk loading operations.
class CypherInjectionError(QueryError):
    """Raised when a property name fails validation (possible Cypher injection)."""

    def __init__(self, name: str, reason: str | None = None) -> None:
        self.name = name
        # A missing (or empty) reason falls back to the generic warning.
        if reason:
            text = reason
        else:
            text = f"Invalid property name {name!r}: possible Cypher injection"
        super().__init__(text)
Property name validation failure — potential Cypher injection.