Python API Reference¶
Complete reference for the uni_db Python bindings (v1.0.0). All operations go through the Session (reads, Locy) or Transaction (writes, bulk loading) obtained from a Uni instance.
Installation¶
Or build from source:
Quick Start (Sync)¶
```python
import uni_db

# Open or create a database
db = uni_db.Uni.open("/path/to/db")

# Define schema
db.schema() \
    .label("Person").property("name", "string").property("age", "int").done() \
    .edge_type("KNOWS", ["Person"], ["Person"]).done() \
    .apply()

# Create a session for reads
session = db.session()

# Write data in a transaction
with session.tx() as tx:
    tx.execute("CREATE (a:Person {name: 'Alice', age: 30})")
    tx.execute("CREATE (b:Person {name: 'Bob', age: 25})")
    tx.execute("""
        MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'})
        CREATE (a)-[:KNOWS {since: 2020}]->(b)
    """)
    tx.commit()

# Query through the session
result = session.query("MATCH (n:Person) WHERE n.age > 20 RETURN n.name AS name, n.age AS age")
for row in result:
    print(row["name"], row["age"])

# Parameterized query
result = session.query(
    "MATCH (n:Person) WHERE n.name = $name RETURN n",
    {"name": "Alice"}
)
```
Async Quick Start¶
```python
import uni_db
import asyncio

async def main():
    async with uni_db.AsyncUni.open("/path/to/db") as db:
        session = db.session()

        # Reads work on the session
        result = await session.query("MATCH (n:Person) RETURN n.name AS name")
        for row in result:
            print(row["name"])

        # Writes go through a transaction
        tx = await session.tx()
        await tx.execute("CREATE (:Person {name: 'Carol', age: 28})")
        await tx.commit()

asyncio.run(main())
```
Core Classes¶
Uni¶
The main synchronous entry point. Exposed as uni_db.Uni in Python.
Factory Methods (static)¶
| Method | Description |
|---|---|
| Uni.open(path) | Open or create a database at the given path |
| Uni.create(path) | Create a new database (fails if it already exists) |
| Uni.open_existing(path) | Open an existing database (fails if it does not exist) |
| Uni.temporary() | Create an ephemeral in-memory database |
| Uni.in_memory() | Alias for temporary() |
| Uni.builder() | Return a UniBuilder for advanced configuration |
```python
# Simplest usage
db = uni_db.Uni.open("./my_graph")

# In-memory for tests
db = uni_db.Uni.temporary()

# Context manager (flushes on exit)
with uni_db.Uni.open("./my_graph") as db:
    session = db.session()
    # ...
```
Instance Methods¶
| Method | Returns | Description |
|---|---|---|
| db.session() | Session | Create a new query session |
| db.session_template() | SessionTemplateBuilder | Create a reusable session factory |
| db.schema() | SchemaBuilder | Start a fluent schema definition |
| db.rules() | RuleRegistry | Access the database-level Locy rule registry |
| db.xervo() | Xervo | Access embedding and generation operations |
| db.indexes() | Indexes | Access index management |
| db.compaction() | Compaction | Access compaction operations |
| db.metrics() | DatabaseMetrics | Database-wide metrics snapshot |
| db.config() | dict | Current database configuration |
| db.flush() | None | Flush uncommitted changes to persistent storage |
| db.shutdown() | None | Flush and prepare for shutdown |
| db.label_exists(name) | bool | Check if a label exists |
| db.edge_type_exists(name) | bool | Check if an edge type exists |
| db.list_labels() | list[str] | List all label names |
| db.list_edge_types() | list[str] | List all edge type names |
| db.get_label_info(name) | LabelInfo \| None | Detailed label metadata |
| db.get_edge_type_info(name) | EdgeTypeInfo \| None | Detailed edge type metadata |
| db.load_schema(path) | None | Load schema from a JSON file |
| db.save_schema(path) | None | Save schema to a JSON file |
| db.create_snapshot(name) | str | Create a named snapshot, returns snapshot ID |
| db.list_snapshots() | list[SnapshotInfo] | List all snapshots |
| db.restore_snapshot(snapshot_id) | None | Restore database to a snapshot |
| db.write_lease() | WriteLease \| None | Current write lease configuration |
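For instance, the introspection methods can be combined to check a schema before writing (a minimal sketch using only the methods listed above):

```python
import uni_db

db = uni_db.Uni.temporary()
db.schema().label("Person").property("name", "string").done().apply()

# Schema introspection
print(db.list_labels())             # all label names
print(db.label_exists("Person"))    # True after the apply above
info = db.get_label_info("Person")  # LabelInfo | None
if info is not None:
    print(info)
```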
UniBuilder¶
Fluent builder for advanced database configuration. Exposed as uni_db.UniBuilder.
Factory Methods (static)¶
| Method | Description |
|---|---|
| UniBuilder.open(path) | Open or create |
| UniBuilder.create(path) | Create new (fail if exists) |
| UniBuilder.open_existing(path) | Open existing (fail if missing) |
| UniBuilder.temporary() | Ephemeral in-memory |
| UniBuilder.in_memory() | Alias for temporary() |
Configuration Methods (chainable)¶
| Method | Description |
|---|---|
| .cache_size(bytes) | Maximum cache size in bytes |
| .parallelism(n) | Number of worker threads |
| .schema_file(path) | Load schema from JSON on init |
| .xervo_catalog_from_file(path) | Configure Xervo models from a JSON file |
| .xervo_catalog_from_str(json) | Configure Xervo models from a JSON string |
| .cloud_config(config_dict) | Cloud storage credentials (s3, gcs, azure) |
| .config(config_dict) | Database options (query_timeout, max_query_memory, etc.) |
| .batch_size(n) | I/O batch size (default 1024) |
| .wal_enabled(bool) | Enable/disable write-ahead log (default True) |
| .read_only() | Open in read-only mode |
| .write_lease(lease) | Multi-agent write coordination |
| .hybrid(local_path, remote_url) | Hybrid storage (local metadata + remote data) |
| .build() | Build and return the Uni instance |
```python
db = (
    uni_db.UniBuilder.open("/path/to/db")
    .cache_size(1024 * 1024 * 100)  # 100 MB cache
    .parallelism(4)
    .wal_enabled(True)
    .build()
)
```
Session¶
The primary scope for reads and the factory for transactions. Created via db.session().
Sessions are lightweight and can be created freely. Each session maintains its own plan cache, parameter store, rule registry, and version pin state.
session.query() is read-only — mutation clauses (CREATE, SET, DELETE, MERGE, REMOVE) return an error. Use session.tx() for writes.
Query Methods¶
| Method | Returns | Description |
|---|---|---|
| session.query(cypher, params=None) | QueryResult | Execute a read-only query. Rejects mutations with an error. |
| session.query_with(cypher) | SessionQueryBuilder | Fluent read query builder |
```python
# Simple query
result = session.query("MATCH (n:Person) RETURN n.name AS name")

# Parameterized
result = session.query(
    "MATCH (n:Person) WHERE n.age > $min RETURN n",
    {"min": 21}
)

# Builder pattern with timeout
result = (
    session.query_with("MATCH (n:Person) WHERE n.age > $min RETURN n")
    .param("min", 21)
    .timeout(5.0)
    .max_memory(50_000_000)
    .fetch_all()
)
```
Locy Methods¶
| Method | Returns | Description |
|---|---|---|
| session.locy(program, params=None) | LocyResult | Evaluate a Locy program |
| session.locy_with(program) | SessionLocyBuilder | Fluent Locy builder |
| session.compile_locy(program) | CompiledProgram | Compile without executing |
Transaction Factory¶
| Method | Returns | Description |
|---|---|---|
| session.tx() | Transaction | Create a new transaction |
| session.tx_with() | TransactionBuilder | Transaction with options (timeout, isolation) |
```python
# Default transaction
tx = session.tx()

# Configured transaction
tx = session.tx_with().timeout(30.0).isolation("serialized").start()
```
Prepared Statements¶
| Method | Returns | Description |
|---|---|---|
| session.prepare(cypher) | PreparedQuery | Prepare a Cypher query |
| session.prepare_locy(program) | PreparedLocy | Prepare a Locy program |
Registry and Parameters¶
| Method | Returns | Description |
|---|---|---|
| session.rules() | RuleRegistry | Session-scoped rule registry |
| session.params() | Params | Session-scoped parameter store |
Version Pinning¶
| Method | Description |
|---|---|
| session.pin_to_version(snapshot_id) | Pin to a specific snapshot version |
| session.pin_to_timestamp(epoch_secs) | Pin to a specific timestamp |
| session.refresh() | Unpin and refresh to latest version |
| session.is_pinned() | Check if session is pinned |
Notifications¶
| Method | Returns | Description |
|---|---|---|
| session.watch() | CommitStream | Subscribe to commit notifications |
| session.watch_with() | WatchBuilder | Filtered commit notifications |
```python
# Watch all commits
with session.watch() as stream:
    for notification in stream:
        print(f"Version {notification.version}: {notification.labels_affected}")
        break  # just one

# Watch specific labels
stream = session.watch_with().labels(["Person"]).build()
```
Hooks¶
| Method | Description |
|---|---|
| session.add_hook(name, hook_obj) | Register a named session hook |
| session.remove_hook(name) | Remove a hook by name (returns bool) |
| session.list_hooks() | List all registered hook names |
| session.clear_hooks() | Remove all hooks |
Hooks can implement before_query(ctx), after_query(ctx, metrics), before_commit(ctx), and after_commit(ctx, result).
```python
class AuditHook:
    def before_query(self, ctx):
        print(f"Query: {ctx['query_text']}")

    def after_commit(self, ctx, result):
        print(f"Committed version {result.version}")

session.add_hook("audit", AuditHook())
```
Metrics and Introspection¶
| Method | Returns | Description |
|---|---|---|
| session.metrics() | SessionMetrics | Session lifetime metrics |
| session.capabilities() | SessionCapabilities | Session capabilities snapshot |
| session.id | str | Session identifier |
| session.cancellation_token() | CancellationToken | Cooperative cancellation token |
| session.cancel() | None | Cancel in-progress operations |
Context manager support cancels in-progress operations on exit:
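A minimal sketch of that usage (assuming Session implements the standard with protocol, as described above):

```python
with db.session() as session:
    result = session.query("MATCH (n:Person) RETURN n.name AS name")
    for row in result:
        print(row["name"])
# On exit, any in-progress operations on this session are cancelled
```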
Transaction¶
ACID transactions for atomic writes. Created via session.tx(). Supports context manager protocol (auto-rollback on exception).
```python
session = db.session()

with session.tx() as tx:
    tx.execute("CREATE (n:Person {name: 'Alice', age: 30})")
    tx.execute("CREATE (n:Person {name: 'Bob', age: 25})")
    result = tx.query("MATCH (n:Person) RETURN count(n) AS cnt")
    tx.commit()

# If an exception occurs before commit(), the transaction is rolled back automatically.
```
Read Methods¶
| Method | Returns | Description |
|---|---|---|
| tx.query(cypher, params=None) | QueryResult | Read query within the transaction |
| tx.query_with(cypher) | TxQueryBuilder | Fluent query builder |
Write Methods¶
| Method | Returns | Description |
|---|---|---|
| tx.execute(cypher, params=None) | ExecuteResult | Execute a mutation |
| tx.execute_with(cypher) | TxExecuteBuilder | Fluent mutation builder |
Locy Methods¶
| Method | Returns | Description |
|---|---|---|
| tx.locy(program, params=None) | LocyResult | Evaluate Locy within the transaction |
| tx.locy_with(program) | TxLocyBuilder | Fluent Locy builder |
| tx.apply(derived_fact_set) | ApplyResult | Materialize a DerivedFactSet |
| tx.apply_with(derived_fact_set) | ApplyBuilder | Apply with options (require_fresh, max_version_gap) |
Prepared Statements¶
| Method | Returns | Description |
|---|---|---|
| tx.prepare(cypher) | PreparedQuery | Prepare a Cypher query |
| tx.prepare_locy(program) | PreparedLocy | Prepare a Locy program |
Bulk Loading¶
| Method | Returns | Description |
|---|---|---|
| tx.bulk_writer() | TxBulkWriterBuilder | High-throughput bulk data loader |
| tx.appender(label) | StreamingAppender | Streaming single-label appender |
| tx.appender_builder(label) | TxAppenderBuilder | Configurable appender builder |
Lifecycle¶
| Method | Returns | Description |
|---|---|---|
| tx.commit() | CommitResult | Commit the transaction |
| tx.rollback() | None | Rollback the transaction |
| tx.cancel() | None | Cancel in-progress operations |
| tx.rules() | RuleRegistry | Transaction-scoped rule registry |
| tx.id() | str | Transaction identifier |
| tx.started_at_version() | int | Database version when started |
| tx.is_dirty() | bool | Whether the transaction has uncommitted changes |
| tx.is_completed() | bool | Whether committed or rolled back |
| tx.cancellation_token() | CancellationToken | Cooperative cancellation token |
Builder Classes¶
SessionQueryBuilder¶
Returned by session.query_with(cypher). Supports chaining.
| Method | Returns | Description |
|---|---|---|
| .param(name, value) | self | Bind a parameter |
| .params(dict) | self | Bind multiple parameters |
| .timeout(seconds) | self | Set query timeout |
| .max_memory(bytes) | self | Set memory limit |
| .cancellation_token(token) | self | Attach cancellation token |
| .fetch_all() | QueryResult | Execute and return all rows |
| .fetch_one() | dict \| None | Execute and return first row |
| .cursor() | QueryCursor | Open a streaming cursor |
| .explain() | ExplainOutput | Explain without executing |
| .profile() | (QueryResult, ProfileOutput) | Execute with profiling |
SessionLocyBuilder¶
Returned by session.locy_with(program). Supports chaining.
| Method | Returns | Description |
|---|---|---|
| .param(name, value) | self | Bind a parameter |
| .params(dict) | self | Bind multiple parameters |
| .timeout(seconds) | self | Set evaluation timeout |
| .max_iterations(n) | self | Set fixpoint iteration limit |
| .with_config(config) | self | Apply a LocyConfig or config dict |
| .cancellation_token(token) | self | Attach cancellation token |
| .run() | LocyResult | Execute the evaluation |
| .explain() | LocyExplainOutput | Explain without executing |
TransactionBuilder¶
Returned by session.tx_with().
| Method | Returns | Description |
|---|---|---|
| .timeout(seconds) | self | Set transaction timeout |
| .isolation(level) | self | Set isolation level (currently "serialized") |
| .start() | Transaction | Start the transaction |
TxQueryBuilder¶
Returned by tx.query_with(cypher).
| Method | Returns | Description |
|---|---|---|
| .param(name, value) | self | Bind a parameter |
| .timeout(seconds) | self | Set query timeout |
| .fetch_all() | QueryResult | Execute and return all rows |
| .fetch_one() | dict \| None | Execute and return first row |
| .execute() | ExecuteResult | Execute as a mutation |
| .cursor() | QueryCursor | Open a streaming cursor |
TxExecuteBuilder¶
Returned by tx.execute_with(cypher).
| Method | Returns | Description |
|---|---|---|
| .param(name, value) | self | Bind a parameter |
| .timeout(seconds) | self | Set execution timeout |
| .run() | ExecuteResult | Execute the mutation |
TxLocyBuilder¶
Returned by tx.locy_with(program).
| Method | Returns | Description |
|---|---|---|
| .param(name, value) | self | Bind a parameter |
| .timeout(seconds) | self | Set evaluation timeout |
| .max_iterations(n) | self | Set fixpoint iteration limit |
| .with_config(config) | self | Apply a LocyConfig or config dict |
| .cancellation_token(token) | self | Attach cancellation token |
| .run() | LocyResult | Execute the evaluation |
ApplyBuilder¶
Returned by tx.apply_with(derived_fact_set).
| Method | Returns | Description |
|---|---|---|
| .require_fresh(bool) | self | Fail if version gap is non-zero |
| .max_version_gap(n) | self | Maximum allowed version gap |
| .run() | ApplyResult | Execute the apply |
Schema Management¶
SchemaBuilder¶
Returned by db.schema(). Accumulates label and edge type definitions, then applies them atomically.
```python
db.schema() \
    .label("Person") \
        .property("name", "string") \
        .property("age", "int") \
        .property_nullable("email", "string") \
        .vector("embedding", 128) \
        .index("name", "btree") \
        .done() \
    .label("Company") \
        .property("name", "string") \
        .done() \
    .edge_type("WORKS_AT", ["Person"], ["Company"]) \
        .property("since", "int") \
        .done() \
    .apply()
```
| Method | Returns | Description |
|---|---|---|
| .label(name) | LabelBuilder | Start defining a label |
| .edge_type(name, from_labels, to_labels) | EdgeTypeBuilder | Start defining an edge type |
| .current() | dict | Get the current schema as a dictionary |
| .current_typed() | Schema | Get the current schema as a typed object |
| .apply() | None | Apply all pending schema changes |
LabelBuilder¶
Returned by schema.label(name).
| Method | Returns | Description |
|---|---|---|
| .property(name, data_type) | self | Add a required property |
| .property_nullable(name, data_type) | self | Add a nullable property |
| .vector(name, dimensions) | self | Add a vector property |
| .index(property, index_type) | self | Add an index on a property |
| .done() | SchemaBuilder | Return to parent builder |
| .apply() | None | Apply schema changes immediately |
The index_type parameter can be a string ("btree", "vector", "fulltext", "inverted") or a dict with detailed configuration:
```python
# Simple index
builder.index("name", "btree")

# Vector index with HNSW configuration
builder.index("embedding", {
    "type": "vector",
    "algorithm": "hnsw",
    "m": 32,
    "ef_construction": 400,
    "metric": "cosine"
})

# Full-text index with n-gram tokenizer
builder.index("content", {
    "type": "fulltext",
    "tokenizer": "ngram",
    "ngram_min": 2,
    "ngram_max": 4
})
```
EdgeTypeBuilder¶
Returned by schema.edge_type(name, from_labels, to_labels).
| Method | Returns | Description |
|---|---|---|
| .property(name, data_type) | self | Add a required property |
| .property_nullable(name, data_type) | self | Add a nullable property |
| .done() | SchemaBuilder | Return to parent builder |
| .apply() | None | Apply schema changes immediately |
Bulk Loading¶
Bulk loading APIs are accessed through a Transaction.
BulkWriter¶
High-throughput bulk data ingestion. Created via tx.bulk_writer().build().
```python
session = db.session()
with session.tx() as tx:
    writer = tx.bulk_writer().batch_size(10000).build()

    # Insert vertices — returns list of allocated VIDs
    vids = writer.insert_vertices("Person", [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25},
    ])

    # Insert edges — (source_vid, target_vid, properties)
    writer.insert_edges("KNOWS", [
        (vids[0], vids[1], {"since": 2020}),
    ])

    # Commit bulk data and rebuild indexes
    stats = writer.commit()
    print(f"Inserted {stats.vertices_inserted} vertices, {stats.edges_inserted} edges")

    tx.commit()
```
TxBulkWriterBuilder¶
Returned by tx.bulk_writer(). All methods are chainable.
| Method | Description |
|---|---|
| .batch_size(n) | Set batch size for writes |
| .defer_vector_indexes(bool) | Defer vector index rebuilds (default True) |
| .defer_scalar_indexes(bool) | Defer scalar index rebuilds (default True) |
| .async_indexes(bool) | Build indexes asynchronously |
| .validate_constraints(bool) | Enable/disable constraint validation |
| .max_buffer_size_bytes(n) | Maximum in-memory buffer size |
| .on_progress(callback) | Register a progress callback (receives BulkProgress) |
| .build() | Build the BulkWriter |
BulkWriter Methods¶
| Method | Returns | Description |
|---|---|---|
| .insert_vertices(label, props_list) | list[int] | Insert vertices, returns VIDs |
| .insert_edges(edge_type, edges) | None | Insert edges as (src, dst, props) tuples |
| .stats() | BulkStats | Current load statistics |
| .touched_labels() | list[str] | Labels written to |
| .touched_edge_types() | list[str] | Edge types written to |
| .commit() | BulkStats | Commit data and rebuild indexes |
| .abort() | None | Discard uncommitted changes |
Context manager support (auto-aborts on exception):
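A sketch of that pattern (assuming BulkWriter implements the standard with protocol described above):

```python
with session.tx() as tx:
    with tx.bulk_writer().build() as writer:
        writer.insert_vertices("Person", [{"name": "Dana", "age": 41}])
        writer.commit()  # without this, exiting via an exception aborts the load
    tx.commit()
```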
StreamingAppender¶
Single-label streaming appender for incremental loading. Created via tx.appender(label).
```python
with session.tx() as tx:
    appender = tx.appender("Person")
    for record in data_source:
        appender.append({"name": record["name"], "age": record["age"]})
    stats = appender.finish()
    tx.commit()
```
| Method | Description |
|---|---|
| .append(properties) | Append a single row |
| .finish() | Flush remaining rows (returns BulkStats) |
TxAppenderBuilder¶
Returned by tx.appender_builder(label). Configurable variant of tx.appender().
| Method | Description |
|---|---|
| .batch_size(n) | Set batch size |
| .defer_vector_indexes(bool) | Defer vector index rebuilds |
| .max_buffer_size_bytes(n) | Maximum buffer size |
| .build() | Build the StreamingAppender |
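A sketch of the configured variant, using only the builder methods listed above (the data_source iterable is a placeholder):

```python
with session.tx() as tx:
    appender = (
        tx.appender_builder("Person")
        .batch_size(5000)
        .max_buffer_size_bytes(64 * 1024 * 1024)  # 64 MB buffer
        .build()
    )
    for record in data_source:
        appender.append({"name": record["name"], "age": record["age"]})
    stats = appender.finish()
    tx.commit()
```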
Vector Search¶
Vector similarity search is exposed via Cypher using the uni.vector.query procedure. First, define a vector property and index in the schema:
```python
db.schema() \
    .label("Document") \
        .property("title", "string") \
        .vector("embedding", 128) \
        .index("embedding", {"type": "vector", "metric": "cosine"}) \
        .done() \
    .apply()
```
Then query:
```python
session = db.session()
query_vec = [0.1, 0.2, ...]  # 128 dimensions
result = session.query(
    "CALL uni.vector.query('Document', 'embedding', $vec, 10) "
    "YIELD vid, distance RETURN vid, distance",
    {"vec": query_vec}
)
for row in result:
    print(row["vid"], row["distance"])
```
Locy Reasoning¶
Locy is Uni's built-in Datalog-based reasoning engine. Evaluate programs through a Session or Transaction.
Basic Evaluation¶
```python
session = db.session()

result = session.locy("""
    reachable(x, y) :- MATCH (a)-[:KNOWS]->(b) WHERE a.name = x AND b.name = y.
    reachable(x, z) :- reachable(x, y), reachable(y, z).

    QUERY reachable(x, y) INTO result.
""")

# Access command results (QUERY output)
rows = result.rows()
if rows:
    for row in rows:
        print(row)

# Access derived relations
derived = result.derived  # dict: rule_name -> list[dict]

# Access evaluation statistics
print(result.stats)  # LocyStats(strata=..., iterations=..., time=...)
```
Builder Pattern¶
```python
result = (
    session.locy_with(program)
    .param("start_node", "Alice")
    .timeout(30.0)
    .max_iterations(500)
    .with_config({
        "strict_probability_domain": True,
        "exact_probability": True,
        "max_bdd_variables": 1000,
    })
    .run()
)
```
Materializing Derived Facts¶
After evaluating a Locy program, materialize the results into the graph:
```python
result = session.locy(program)

with session.tx() as tx:
    apply_result = tx.apply(result.derived_fact_set)
    print(f"Applied {apply_result.facts_applied} facts")
    tx.commit()
```
For controlled application with version gap checks:
```python
with session.tx() as tx:
    apply_result = (
        tx.apply_with(result.derived_fact_set)
        .require_fresh(True)
        .max_version_gap(5)
        .run()
    )
    tx.commit()
```
LocyResult¶
| Attribute/Method | Type | Description |
|---|---|---|
| .derived | dict | Derived relations: rule_name -> list[dict] |
| .stats | LocyStats | Evaluation statistics |
| .command_results | list | QUERY/ABDUCE/EXPLAIN RULE output |
| .warnings | list | Runtime warnings |
| .approximate_groups | list | Groups with approximate probabilities |
| .derived_fact_set | DerivedFactSet | Opaque fact set for tx.apply() |
| .rows() | list \| None | Rows from the first QUERY command |
| .columns() | list[str] \| None | Column names from the first QUERY |
| .derived_facts(rule) | list \| None | Facts for a specific rule |
| .has_warning(code) | bool | Check for a specific warning code |
| .iterations | int | Total fixpoint iterations (property) |
See Locy Overview for the language reference.
Xervo (Embedding & Generation)¶
Access the configured model runtime via db.xervo(). Requires a Xervo catalog configured at database open time.
```python
db = (
    uni_db.UniBuilder.open("./graph")
    .xervo_catalog_from_file("./models.json")
    .build()
)
xervo = db.xervo()

# Check availability
if xervo.is_available():
    # Embed text -> list[list[float]]
    vectors = xervo.embed("embed/default", ["graph databases", "vector search"])

    # Generate with Message objects
    from uni_db import Message
    result = xervo.generate(
        "llm/default",
        [
            Message.system("You are a concise technical assistant."),
            Message.user("What is snapshot isolation?"),
        ],
        max_tokens=256,
        temperature=0.7,
    )
    print(result.text)   # Generated string
    print(result.usage)  # TokenUsage | None

    # Convenience wrapper -- single prompt string
    result = xervo.generate_text("llm/default", "Explain hybrid search in one sentence.")
```
Async Xervo¶
```python
db = await uni_db.AsyncUni.builder() \
    .open("./graph") \
    .xervo_catalog_from_file("./models.json") \
    .build()

xervo = db.xervo()
vectors = await xervo.embed("embed/default", ["hello"])
result = await xervo.generate_text("llm/default", "Hello!")
```
Message Constructors¶
```python
Message.user("content")       # role = "user"
Message.assistant("content")  # role = "assistant"
Message.system("content")     # role = "system"
Message(role, content)        # explicit
```
generate() also accepts plain dicts with "role" and "content" keys instead of Message objects.
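For example, the generate() call above could be written with plain dicts instead (a sketch; only the "role"/"content" dict shape is taken from the text above):

```python
# Plain-dict message form: each message is a dict with "role" and "content"
messages = [
    {"role": "system", "content": "You are a concise technical assistant."},
    {"role": "user", "content": "What is snapshot isolation?"},
]

# Passed in place of Message objects:
# result = xervo.generate("llm/default", messages, max_tokens=256)
```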
Prepared Statements¶
Prepare a query or Locy program once, then execute it many times with different parameters. Avoids repeated parsing and planning.
PreparedQuery¶
```python
session = db.session()

# Prepare once
prepared = session.prepare("MATCH (n:Person) WHERE n.name = $name RETURN n")

# Execute many times
result1 = prepared.execute({"name": "Alice"})
result2 = prepared.execute({"name": "Bob"})

# Fluent binder pattern
result = prepared.bind().param("name", "Carol").execute()

# Get the original query text
print(prepared.query_text())
```
PreparedLocy¶
```python
prepared = session.prepare_locy("""
    reachable(x, y) :- MATCH (a)-[:KNOWS]->(b) WHERE a.name = x AND b.name = y.
    QUERY reachable($start, y) INTO result.
""")

result = prepared.execute({"start": "Alice"})

# Fluent binder
result = prepared.bind().param("start", "Bob").execute()
```
Prepared statements are also available on Transactions via tx.prepare() and tx.prepare_locy().
Notifications & Hooks¶
Commit Notifications (Watch)¶
Subscribe to real-time commit notifications from any session:
```python
session = db.session()

# Watch all commits
with session.watch() as stream:
    for notification in stream:
        print(f"v{notification.version}: {notification.labels_affected}")

# Watch with filters
stream = (
    session.watch_with()
    .labels(["Person", "Company"])
    .edge_types(["WORKS_AT"])
    .build()
)
```
CommitNotification fields:
| Field | Type | Description |
|---|---|---|
| version | int | Database version after commit |
| mutation_count | int | Number of mutations |
| labels_affected | list[str] | Vertex labels affected |
| edge_types_affected | list[str] | Edge types affected |
| rules_promoted | int | Locy rules promoted |
| timestamp | str | ISO 8601 commit timestamp |
| tx_id | str | Transaction ID |
| session_id | str | Committing session ID |
| causal_version | int | Version when the transaction started |
Session Hooks¶
Register Python callbacks for query and commit lifecycle events:
```python
class MyHook:
    def before_query(self, ctx):
        """Called before each query. Raise to reject."""
        if "DROP" in ctx["query_text"]:
            raise ValueError("DROP queries are forbidden")

    def after_query(self, ctx, metrics):
        """Called after each query with metrics."""
        if metrics.total_time_ms > 1000:
            print(f"Slow query: {ctx['query_text']}")

    def before_commit(self, ctx):
        """Called before commit. Raise to reject."""
        pass

    def after_commit(self, ctx, result):
        """Called after successful commit."""
        print(f"Committed v{result.version}: {result.mutations_committed} mutations")

session.add_hook("my_hook", MyHook())
```
Data Types¶
Schema Type Strings¶
When defining properties with strings in SchemaBuilder:
| Type String | Python Type | Description |
|---|---|---|
| "string" | str | UTF-8 string |
| "int" / "int64" | int | 64-bit signed integer |
| "int32" | int | 32-bit signed integer |
| "float" / "float64" | float | 64-bit float |
| "float32" | float | 32-bit float |
| "bool" | bool | Boolean |
| "vector:N" | list[float] | N-dimensional float vector |
| "timestamp" | datetime | Timestamp |
| "date" | date | Calendar date |
| "time" | time | Time of day |
| "datetime" | datetime | Date and time |
| "duration" | timedelta | Duration |
| "json" | Any | Arbitrary JSON-like value |
DataType Class¶
For programmatic type definitions, use uni_db.DataType:
```python
from uni_db import DataType

schema.label("Person") \
    .property("name", DataType.STRING()) \
    .property("age", DataType.INT64()) \
    .property("score", DataType.FLOAT64()) \
    .property("active", DataType.BOOL()) \
    .property("embedding", DataType.vector(128)) \
    .property("tags", DataType.list(DataType.STRING())) \
    .property("metadata", DataType.map(DataType.STRING(), DataType.JSON())) \
    .done()
```
Available constructors:
| Constructor | Description |
|---|---|
| DataType.STRING() | UTF-8 string |
| DataType.INT32() | 32-bit integer |
| DataType.INT64() | 64-bit integer |
| DataType.FLOAT32() | 32-bit float |
| DataType.FLOAT64() | 64-bit float |
| DataType.BOOL() | Boolean |
| DataType.TIMESTAMP() | Timestamp |
| DataType.DATE() | Calendar date |
| DataType.TIME() | Time of day |
| DataType.DATETIME() | Date and time |
| DataType.DURATION() | Duration |
| DataType.JSON() | Arbitrary value |
| DataType.vector(dimensions) | N-dimensional float vector |
| DataType.list(element_type) | Typed list |
| DataType.map(key_type, value_type) | Typed map |
| DataType.crdt(crdt_type) | CRDT type |
Query Results¶
QueryResult¶
Returned by session.query() and tx.query(). Implements the sequence protocol.
```python
result = session.query("MATCH (n:Person) RETURN n.name AS name, n.age AS age")

# Iterate rows (each row is a dict)
for row in result:
    print(row["name"], row["age"])

# Index access
first = result[0]
last = result[-1]
subset = result[1:3]

# Length
print(len(result))

# Boolean (true if non-empty)
if result:
    print("Got results")
```
| Attribute | Type | Description |
|---|---|---|
| .rows | list[dict] | Row dictionaries |
| .columns | list[str] | Column names |
| .metrics | QueryMetrics | Performance metrics |
| .warnings | list[QueryWarning] | Execution warnings |
QueryMetrics¶
| Attribute | Type | Description |
|---|---|---|
| .parse_time_ms | float | Parse time in ms |
| .plan_time_ms | float | Planning time in ms |
| .exec_time_ms | float | Execution time in ms |
| .total_time_ms | float | Total time in ms |
| .rows_returned | int | Number of rows returned |
| .rows_scanned | int | Number of rows scanned |
| .bytes_read | int | Bytes read from storage |
| .plan_cache_hit | bool | Whether the plan was cached |
| .l0_reads | int | In-memory reads |
| .storage_reads | int | Persistent storage reads |
| .cache_hits | int | Cache hits |
ExecuteResult¶
Returned by tx.execute().
| Attribute | Type | Description |
|---|---|---|
| .affected_rows | int | Total rows affected |
| .nodes_created | int | Nodes created |
| .nodes_deleted | int | Nodes deleted |
| .relationships_created | int | Relationships created |
| .relationships_deleted | int | Relationships deleted |
| .properties_set | int | Properties set |
| .labels_added | int | Labels added |
| .labels_removed | int | Labels removed |
| .metrics | dict | Execution metrics |
CommitResult¶
Returned by tx.commit().
| Attribute | Type | Description |
|---|---|---|
| .mutations_committed | int | Number of mutations committed |
| .rules_promoted | int | Locy rules promoted |
| .version | int | Database version after commit |
| .started_at_version | int | Version when the transaction started |
| .wal_lsn | int | WAL log sequence number |
| .duration_secs | float | Commit duration in seconds |
| .rule_promotion_errors | list[RulePromotionError] | Any rule promotion errors |
| .version_gap() | int | Versions between start and commit |
ApplyResult¶
Returned by tx.apply().
| Attribute | Type | Description |
|---|---|---|
| .facts_applied | int | Number of facts materialized |
| .version_gap | int | Version gap at apply time |
Graph Element Types¶
Query results can contain graph elements with rich object types:
Node -- returned when a query returns a node variable (e.g., RETURN n):
| Attribute/Method | Type | Description |
|---|---|---|
| .id | Vid | Internal vertex identifier |
| .labels | list[str] | Node labels |
| .properties | dict | Property dictionary |
| .get(key, default=None) | Any | Get a property with default |
| .keys() / .values() / .items() | | Dict-like access |
| node["key"] | | Dict-style property access |
| "key" in node | | Membership test |
Edge -- returned when a query returns a relationship variable:
| Attribute/Method | Type | Description |
|---|---|---|
| .id | Eid | Internal edge identifier |
| .type | str | Relationship type name |
| .start_id | Vid | Source vertex ID |
| .end_id | Vid | Target vertex ID |
| .properties | dict | Property dictionary |
| .get(key, default=None) | Any | Get a property with default |
Path -- returned for path expressions:
| Attribute/Method | Type | Description |
|---|---|---|
| .nodes | list[Node] | Nodes along the path |
| .edges | list[Edge] | Edges connecting the nodes |
| .start | Node \| None | First node |
| .end | Node \| None | Last node |
| .is_empty() | bool | True if no edges |
| len(path) | int | Number of hops |
| path[i] | Node \| Edge | Interleaved access (even=node, odd=edge) |
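Putting the element types together (a sketch; the Cypher pattern and property names are illustrative, while the accessors come from the tables above):

```python
result = session.query("MATCH p = (a:Person)-[r:KNOWS]->(b:Person) RETURN a, r, p")
for row in result:
    node = row["a"]
    print(node.labels, node["name"], node.get("age", 0))
    edge = row["r"]
    print(edge.type, edge.start_id, edge.end_id)
    path = row["p"]
    print(len(path), [n.properties for n in path.nodes])
```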
QueryCursor¶
Streaming cursor for large result sets. Created via session.query_with(cypher).cursor().
```python
with session.query_with("MATCH (n) RETURN n").cursor() as cursor:
    print(cursor.columns)           # Column names
    row = cursor.fetch_one()        # Single row or None
    batch = cursor.fetch_many(100)  # Up to 100 rows
    rest = cursor.fetch_all()       # All remaining rows
```
Also supports Python iterator protocol:
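A sketch of iterating the cursor directly:

```python
with session.query_with("MATCH (n) RETURN n").cursor() as cursor:
    for row in cursor:  # rows stream one dict at a time
        print(row)
```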
EXPLAIN and PROFILE¶
Analyze query execution plans without (or with) running the query.
EXPLAIN¶
```python
explain = session.query_with("MATCH (n:Person) RETURN n").explain()

print(explain.plan_text)        # Human-readable plan
print(explain.warnings)         # Planner warnings
print(explain.cost_estimates)   # Estimated rows and cost
print(explain.index_usage)      # Index usage details
print(explain.suggestions)      # Index suggestions
```
PROFILE¶
```python
result, profile = session.query_with("MATCH (n:Person) RETURN n").profile()
print(f"Total time: {profile.total_time_ms}ms")
print(f"Peak memory: {profile.peak_memory_bytes} bytes")
print(profile.plan_text)   # Plan with actual row counts
print(profile.operators)   # Per-operator statistics
```
Locy EXPLAIN¶
```python
explain = session.locy_with(program).explain()
print(explain.plan_text)
print(f"Strata: {explain.strata_count}")
print(f"Rules: {explain.rule_names}")
print(f"Recursive: {explain.has_recursive_strata}")
print(f"Commands: {explain.command_count}")
```
Snapshots¶
Point-in-time snapshots for backup and recovery.
```python
# Create a named snapshot
snapshot_id = db.create_snapshot("before-migration")

# List all snapshots
for snap in db.list_snapshots():
    print(snap.snapshot_id, snap.name, snap.created_at, snap.version_hwm)

# Restore to a snapshot
db.restore_snapshot(snapshot_id)
```
Time-Travel Reads¶
Pin a session to a historical version:
```python
session = db.session()
session.pin_to_version(snapshot_id)

# All queries now read from the snapshot
result = session.query("MATCH (n:Person) RETURN n")

# Or pin to a timestamp (seconds since epoch)
session.pin_to_timestamp(1700000000.0)

# Unpin and return to latest
session.refresh()
```
Async Snapshots¶
```python
snapshot_id = await db.create_snapshot("pre-migration")
snapshots = await db.list_snapshots()
await db.restore_snapshot(snapshot_id)
```
SnapshotInfo fields: snapshot_id, name, created_at, version_hwm.
Error Handling¶
All exceptions inherit from uni_db.UniError, which inherits from Python's Exception.
```python
import uni_db

try:
    session = db.session()
    result = session.query("INVALID CYPHER")
except uni_db.UniParseError as e:
    print(f"Parse error: {e}")
except uni_db.UniQueryError as e:
    print(f"Query error: {e}")
except uni_db.UniError as e:
    print(f"Database error: {e}")
```
Exception Hierarchy¶
Base:
| Exception | Description |
|---|---|
| UniError | Base for all Uni database errors |
Database Lifecycle:
| Exception | Description |
|---|---|
| UniNotFoundError | Database path does not exist |
| UniDatabaseLockedError | Database is locked by another process |
Schema:
| Exception | Description |
|---|---|
| UniSchemaError | Schema definition or migration error |
| UniLabelNotFoundError | Label not found in schema |
| UniEdgeTypeNotFoundError | Edge type not found in schema |
| UniPropertyNotFoundError | Property not found on entity |
| UniIndexNotFoundError | Index not found |
| UniLabelAlreadyExistsError | Label already exists |
| UniEdgeTypeAlreadyExistsError | Edge type already exists |
| UniConstraintError | Constraint violation |
| UniInvalidIdentifierError | Invalid identifier name |
Query & Parse:
| Exception | Description |
|---|---|
| UniParseError | Cypher or Locy parse error |
| UniQueryError | Query execution error |
| UniTypeError | Type mismatch error |
Transaction:
| Exception | Description |
|---|---|
| UniTransactionError | General transaction error |
| UniTransactionConflictError | Transaction serialization conflict |
| UniTransactionAlreadyCompletedError | Transaction already committed or rolled back |
| UniTransactionExpiredError | Transaction exceeded its deadline |
| UniCommitTimeoutError | Commit timed out waiting for writer lock |
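A serialization conflict is typically transient, so a common pattern is to retry the whole transaction a bounded number of times. A minimal sketch of that retry loop (the exception class below is a local stand-in so the example is self-contained; in real code you would catch uni_db.UniTransactionConflictError):

```python
# Local stand-in for uni_db.UniTransactionConflictError.
class UniTransactionConflictError(Exception):
    pass

def run_with_retry(work, attempts=3):
    """Re-run `work()` on serialization conflicts, up to `attempts` times."""
    for attempt in range(attempts):
        try:
            return work()
        except UniTransactionConflictError:
            if attempt == attempts - 1:
                raise  # out of retries: surface the conflict

# Simulated workload: conflicts once, then succeeds.
calls = {"n": 0}
def flaky_tx():
    calls["n"] += 1
    if calls["n"] == 1:
        raise UniTransactionConflictError("serialization conflict")
    return "committed"

assert run_with_retry(flaky_tx) == "committed"
assert calls["n"] == 2
```

In real code, `work` would open a fresh transaction each attempt, since a conflicted transaction cannot be reused after rollback.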
Resource Limits:
| Exception | Description |
|---|---|
| UniMemoryLimitExceededError | Query exceeded memory limit |
| UniTimeoutError | Operation timed out |
Access Control:
| Exception | Description |
|---|---|
| UniReadOnlyError | Write on a read-only database |
| UniPermissionDeniedError | Permission denied |
Storage & I/O:
| Exception | Description |
|---|---|
| UniStorageError | Storage layer error |
| UniIOError | I/O error |
| UniInternalError | Internal error |
Snapshot:
| Exception | Description |
|---|---|
| UniSnapshotNotFoundError | Snapshot not found |
Arguments:
| Exception | Description |
|---|---|
| UniInvalidArgumentError | Invalid argument |
Concurrency:
| Exception | Description |
|---|---|
| UniWriteContextAlreadyActiveError | A write context is already active on the session |
| UniCancelledError | Operation was cancelled |
Locy:
| Exception | Description |
|---|---|
| UniStaleDerivedFactsError | Derived facts are stale relative to the current database version |
| UniRuleConflictError | Locy rule conflict during promotion |
| UniHookRejectedError | A session hook rejected the operation |
| UniLocyCompileError | Locy program compilation error |
| UniLocyRuntimeError | Locy program runtime error |
Async API¶
The async API mirrors the sync API. All classes are prefixed with Async and methods return awaitables.
| Sync | Async |
|---|---|
| Uni | AsyncUni |
| UniBuilder | AsyncUniBuilder |
| Session | AsyncSession |
| Transaction | AsyncTransaction |
```python
async with uni_db.AsyncUni.open("./graph") as db:
    session = db.session()

    # Reads
    result = await session.query("MATCH (n:Person) RETURN n.name AS name")

    # Writes
    tx = await session.tx()
    await tx.execute("CREATE (:Person {name: 'Dave', age: 35})")
    await tx.commit()

    # Locy
    result = await session.locy(program)

    # Xervo
    xervo = db.xervo()
    vectors = await xervo.embed("embed/default", ["hello world"])
```
Full API Documentation¶
See the auto-generated pdoc documentation for complete API details including all method signatures and type annotations.