Rust API Reference

Complete reference for the Uni 1.0.0 embedded graph database Rust API.

Quick Start

use uni_db::Uni;

#[tokio::main]
async fn main() -> uni_db::Result<()> {
    // Open or create a database
    let db = Uni::open("./my-graph").build().await?;

    // Define schema
    db.schema()
        .label("Person")
            .property("name", uni_db::DataType::String)
            .property("age", uni_db::DataType::Int64)
        .edge_type("KNOWS", &["Person"], &["Person"])
            .property("since", uni_db::DataType::Date)
        .apply()
        .await?;

    // All data access goes through Sessions
    let session = db.session();

    // Read with Cypher
    let results = session.query("MATCH (p:Person) RETURN p.name, p.age").await?;
    for row in results.rows() {
        let name: String = row.get("p.name")?;
        println!("{name}");
    }

    // Write through Transactions
    let tx = session.tx().await?;
    tx.execute("CREATE (:Person {name: 'Alice', age: 30})").await?;
    let result = tx.commit().await?;
    println!("Committed {} mutations at version {}", result.mutations_committed, result.version);

    db.shutdown().await?;
    Ok(())
}

Architecture Overview

Uni 1.0.0 uses a three-tier access model:

Uni  (lifecycle / admin)
 |
 +-- Session  (read scope, cheap to create)
      |
      +-- Transaction  (write scope, ACID)

  • Uni -- lifecycle handle. Creates sessions, manages schema, provides database-level metrics. Does not execute queries directly.
  • Session -- primary read scope. Executes Cypher queries, evaluates Locy programs, holds scoped parameters and rule registries, creates transactions. Cheap, synchronous, infallible to create.
  • Transaction -- explicit write scope. Executes mutations in a private L0 buffer with commit-time serialization. Changes are isolated until commit().
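
The isolation described in the last bullet can be pictured with a small standard-library model. This is a sketch under stated assumptions, not Uni's storage engine: ToyDb, ToyTx, and the key/value shape are hypothetical names invented for illustration, and the model assumes each commit advances the version by one.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for committed state (hypothetical, not Uni's storage engine).
#[derive(Default)]
struct ToyDb {
    version: u64,
    committed: BTreeMap<String, i64>,
}

/// Toy write scope: mutations accumulate in a private buffer until commit.
struct ToyTx {
    buffer: BTreeMap<String, i64>,
}

impl ToyDb {
    fn begin(&self) -> ToyTx {
        ToyTx { buffer: BTreeMap::new() }
    }

    /// Readers outside a transaction see committed data only.
    fn read(&self, key: &str) -> Option<i64> {
        self.committed.get(key).copied()
    }

    /// Merge the private buffer into committed state and bump the version.
    /// Returns (new version, number of buffered mutations merged).
    fn commit(&mut self, tx: ToyTx) -> (u64, usize) {
        let merged = tx.buffer.len();
        self.committed.extend(tx.buffer);
        self.version += 1;
        (self.version, merged)
    }
}

impl ToyTx {
    fn write(&mut self, key: &str, value: i64) {
        self.buffer.insert(key.to_string(), value);
    }

    /// Reads inside the transaction check its own writes first, then committed data.
    fn read(&self, db: &ToyDb, key: &str) -> Option<i64> {
        self.buffer.get(key).copied().or_else(|| db.read(key))
    }
}
```

In this model a write is visible through ToyTx::read immediately, while ToyDb::read keeps returning the old state until commit() merges the buffer.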

Uni

The top-level database handle. Created via builder methods.

Opening a Database

// Open or create at path
let db = Uni::open("./my-graph").build().await?;

// Open existing only (fails if missing)
let db = Uni::open_existing("./my-graph").build().await?;

// Create new (fails if already exists)
let db = Uni::create("./new-graph").build().await?;

// In-memory / temporary database
let db = Uni::in_memory().build().await?;
let db = Uni::temporary().build().await?;

Factory Methods

Method Returns Description
session() Session Create a new session. Cheap, sync, infallible.
session_template() SessionTemplateBuilder Create a pre-configured session factory.
schema() SchemaBuilder Start building schema changes.
rules() RuleRegistry Access global Locy rule registry.

Admin / Lifecycle

Method Returns Description
metrics() DatabaseMetrics Snapshot database-level metrics (sync, cheap).
flush() Result<()> Flush L0 buffer to persistent L1 storage.
create_snapshot(name) Result<String> Create a named point-in-time snapshot. Returns snapshot ID.
list_snapshots() Result<Vec<SnapshotManifest>> List all available snapshots.
restore_snapshot(id) Result<()> Restore database to a specific snapshot.
shutdown() Result<()> Graceful shutdown: flush data, stop background tasks.
config() &UniConfig Get current configuration.
write_lease() Option<&WriteLease> Get write lease configuration, if any.
compaction() Compaction Access compaction operations.
indexes() Indexes Access index management operations.
functions() Functions Access custom Cypher function management.

Schema Introspection

Method Returns Description
label_exists(name) Result<bool> Check if a label exists in the schema.
edge_type_exists(name) Result<bool> Check if an edge type exists.
list_labels() Result<Vec<String>> List all label names (schema + data).
list_edge_types() Result<Vec<String>> List all edge type names.
get_label_info(name) Result<Option<LabelInfo>> Detailed label info (properties, indexes, constraints, count).
get_edge_type_info(name) Result<Option<EdgeTypeInfo>> Detailed edge type info.
load_schema(path) Result<()> Load schema from a JSON file.
save_schema(path) Result<()> Save current schema to a JSON file.

UniBuilder

Returned by Uni::open(), Uni::create(), etc. Configures the database before opening.

let db = Uni::open("./my-graph")
    .schema_file("schema.json")
    .config(UniConfig { /* ... */ })
    .read_only()
    .write_lease(WriteLease::Local)
    .remote_storage("s3://bucket/data", cloud_config)
    .build()
    .await?;

Method Description
schema_file(path) Load schema from JSON file on initialization.
config(config) Set UniConfig options.
read_only() Open in read-only mode (no writer created).
write_lease(lease) Set write lease strategy for multi-agent access.
xervo_catalog(catalog) Set Uni-Xervo model catalog.
remote_storage(url, config) Configure hybrid local+remote storage.
build() async -- Open the database. Returns Result<Uni>.

Session

The primary read scope. Created via db.session(). Cheap, synchronous, infallible. Implements Clone (clones share the plan cache).

session.query() is read-only — mutation clauses (CREATE, SET, DELETE, MERGE, REMOVE) return an error. Use session.tx() for writes.

let session = db.session();

// Queries (read-only)
let rows = session.query("MATCH (n:Person) RETURN n.name").await?;

// Parameterized queries
let rows = session.query_with("MATCH (n) WHERE n.age > $min RETURN n")
    .param("min", 21)
    .timeout(Duration::from_secs(5))
    .fetch_all()
    .await?;

// Transactions for writes
let tx = session.tx().await?;
tx.execute("CREATE (:Person {name: 'Bob'})").await?;
tx.commit().await?;

// Cloning (shares plan cache, independent params/metrics)
let session2 = session.clone();

Cypher Reads

Method Returns Description
query(cypher) Result<QueryResult> Execute a read-only Cypher query. Rejects mutations with an error. Uses transparent plan cache.
query_with(cypher) QueryBuilder Build a parameterized read query with .param(), .timeout(), .max_memory().

Locy Evaluation

Method Returns Description
locy(program) Result<LocyResult> Evaluate a Locy program with default configuration.
locy_with(program) LocyBuilder Build a parameterized Locy evaluation.
compile_locy(program) Result<CompiledProgram> Compile without executing (uses session's rule registry).

Rule Management

Method Returns Description
rules() RuleRegistry Access the session-scoped rule registry.

Transaction Factory

Method Returns Description
tx() Result<Transaction> Create a new transaction. Fails if session is pinned or another write context is active.
tx_with() TransactionBuilder Build a transaction with .timeout() and .isolation() options.

Scoped Parameters

let session = db.session();
session.params().set("tenant", 42);
session.params().set("region", "us-east");

// Parameters are auto-merged into queries as $session.tenant, $session.region
let rows = session.query("MATCH (n) WHERE n.tenant = $session.tenant RETURN n").await?;

Method Returns Description
params() Params Access the session-scoped parameter store.

Params methods:

Method Description
set(key, value) Set a parameter.
get(key) Get a parameter value. Returns Option<Value>.
unset(key) Remove a parameter. Returns previous value.
get_all() Snapshot all parameters as HashMap<String, Value>.
set_all(iter) Set multiple parameters from an iterator.
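
One way to picture the auto-merge is a prefixing step that runs before a query executes. The helper below is a hypothetical sketch using only the standard library; merge_params is not part of the Uni API, and plain strings stand in for Value.

```rust
use std::collections::HashMap;

/// Merge session-scoped parameters into per-query parameters.
/// Session keys are exposed under a `session.` prefix, mirroring the
/// `$session.tenant` form used above. Hypothetical helper, not Uni's API.
fn merge_params(
    session: &HashMap<String, String>,
    query: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged: HashMap<String, String> = session
        .iter()
        .map(|(k, v)| (format!("session.{k}"), v.clone()))
        .collect();
    // Per-query bindings are added as-is; the prefix keeps the two namespaces apart.
    merged.extend(query.iter().map(|(k, v)| (k.clone(), v.clone())));
    merged
}
```

Because session keys always carry the prefix, a per-query parameter named tenant would not collide with the session-scoped one in this sketch.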

Version Pinning

let mut session = db.session();

// Pin to a named snapshot (read-only after pinning)
session.pin_to_version("snapshot_20240101").await?;

// Pin to a timestamp
session.pin_to_timestamp(chrono::Utc::now() - chrono::Duration::hours(1)).await?;

// Unpin and return to live state
session.refresh().await?;

Method Description
pin_to_version(snapshot_id) Pin session to a specific snapshot. Writes rejected while pinned.
pin_to_timestamp(ts) Pin to the closest snapshot at or before the given timestamp.
refresh() Unpin and return to the live database state.
is_pinned() Returns true if pinned.
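
The "closest snapshot at or before the given timestamp" rule for pin_to_timestamp can be sketched with an ordered map. This is an illustration only: snapshot_at_or_before is a hypothetical helper, and timestamps are modelled as plain u64 seconds rather than the chrono types used above.

```rust
use std::collections::BTreeMap;

/// Return the ID of the latest snapshot taken at or before `ts`.
/// Keys are snapshot timestamps in seconds (an assumption of this sketch).
fn snapshot_at_or_before(snapshots: &BTreeMap<u64, String>, ts: u64) -> Option<&str> {
    snapshots
        .range(..=ts) // every snapshot with timestamp <= ts, in ascending order
        .next_back() // the latest of those
        .map(|(_, id)| id.as_str())
}
```

A timestamp earlier than every snapshot yields None in this model; how Uni reports that case is not stated in this reference.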

Prepared Statements

Method Returns Description
prepare(cypher) Result<PreparedQuery> Prepare a Cypher query for repeated execution.
prepare_locy(program) Result<PreparedLocy> Prepare a Locy program for repeated evaluation.

Notifications

Method Returns Description
watch() CommitStream Watch for all commit notifications.
watch_with() WatchBuilder Build a filtered commit notification stream.

Hooks

Method Description
add_hook(name, hook) Add a named session hook. Takes &mut self.
remove_hook(name) Remove a hook by name. Returns true if it existed.
list_hooks() List names of all registered hooks.
clear_hooks() Remove all hooks.

Lifecycle & Observability

Method Returns Description
id() &str Session ID (UUID).
metrics() SessionMetrics Snapshot session-level metrics.
capabilities() SessionCapabilities Query what the session can do in its current mode.
cancel() () Cancel all in-flight queries. Session remains usable after.
cancellation_token() CancellationToken Get a clone of the session's cancellation token.

QueryBuilder

Fluent builder for parameterized Cypher queries within a session. Created by session.query_with(cypher).

let result = session.query_with("MATCH (n:Person) WHERE n.age > $min RETURN n LIMIT $limit")
    .param("min", 25)
    .param("limit", 100)
    .timeout(Duration::from_secs(10))
    .max_memory(512 * 1024 * 1024) // 512 MB
    .fetch_all()
    .await?;

Method Returns Description
param(name, value) Self Bind a named parameter (no $ prefix).
params(iter) Self Bind multiple parameters from an iterator.
timeout(duration) Self Set maximum execution time.
max_memory(bytes) Self Set maximum memory per query.
cancellation_token(token) Self Attach a cancellation token.
fetch_all() Result<QueryResult> Execute and return all rows.
fetch_one() Result<Option<Row>> Execute and return first row.
cursor() Result<QueryCursor> Execute and return a streaming cursor.
explain() Result<ExplainOutput> Explain the query plan without executing.
profile() Result<(QueryResult, ProfileOutput)> Execute with profiling.

Transaction

The explicit write scope. Created via session.tx(). Provides ACID guarantees with commit-time serialization.

Each transaction owns a private L0 buffer. Reads within the transaction see both committed data and the transaction's own uncommitted writes. Changes are isolated from other transactions and sessions until commit().

let tx = session.tx().await?;

// Reads see uncommitted writes
tx.execute("CREATE (:Person {name: 'Alice', age: 30})").await?;
let rows = tx.query("MATCH (p:Person {name: 'Alice'}) RETURN p.age").await?;

// Parameterized mutations
tx.execute_with("CREATE (:Person {name: $name, age: $age})")
    .param("name", "Bob")
    .param("age", 25)
    .run()
    .await?;

let result = tx.commit().await?;
println!("Version: {}, Mutations: {}", result.version, result.mutations_committed);

Cypher Reads (sees uncommitted writes)

Method Returns Description
query(cypher) Result<QueryResult> Execute a Cypher query within the transaction.
query_with(cypher) TxQueryBuilder Build a parameterized query.

TxQueryBuilder methods:

Method Returns Description
param(name, value) Self Bind a parameter.
cancellation_token(token) Self Attach a cancellation token.
timeout(duration) Self Set maximum execution time.
execute() Result<ExecuteResult> Execute as mutation; returns affected rows.
fetch_all() Result<QueryResult> Execute as query; returns rows.
fetch_one() Result<Option<Row>> Execute and return first row.
cursor() Result<QueryCursor> Execute and return a streaming cursor.

Cypher Writes

Method Returns Description
execute(cypher) Result<ExecuteResult> Execute a Cypher mutation.
execute_with(cypher) ExecuteBuilder Build a parameterized mutation.

ExecuteBuilder methods:

Method Returns Description
param(key, value) Self Bind a parameter.
params(iter) Self Bind multiple parameters.
timeout(duration) Self Set maximum execution time.
run() Result<ExecuteResult> Execute the mutation.

DerivedFactSet Application

Apply Locy DERIVE results to the transaction.

// Evaluate a Locy DERIVE at session level (collects derived facts)
let result = session.locy("
    reachable(X, Y) :- knows(X, Y).
    reachable(X, Z) :- reachable(X, Y), knows(Y, Z).
    ?DERIVE reachable(X, Y).
").await?;

// Apply the derived facts inside a transaction
let tx = session.tx().await?;
let apply_result = tx.apply(result.derived().unwrap().clone()).await?;
println!("Applied {} facts, version gap: {}", apply_result.facts_applied, apply_result.version_gap);
tx.commit().await?;

Method Returns Description
apply(derived) Result<ApplyResult> Apply a DerivedFactSet to this transaction.
apply_with(derived) ApplyBuilder Build with staleness controls.

ApplyBuilder methods:

Method Description
require_fresh() Reject if any commits occurred between evaluation and apply.
max_version_gap(n) Reject if gap exceeds n versions.
run() Execute the apply operation. Returns Result<ApplyResult>.

ApplyResult:

Field Type Description
facts_applied usize Number of mutation queries replayed.
version_gap u64 Versions committed between DERIVE and apply. 0 = fresh.
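
The staleness controls on ApplyBuilder amount to a check on the version gap. The following is a minimal sketch of that decision, assuming the gap is the difference between the current version and the version at evaluation time; Staleness and check_staleness are hypothetical names, not Uni types.

```rust
/// Staleness policy for applying previously derived facts (toy model).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Staleness {
    /// Accept any gap (plain apply).
    Any,
    /// Reject if any commit happened since evaluation (like require_fresh()).
    RequireFresh,
    /// Reject if the gap exceeds `n` versions (like max_version_gap(n)).
    MaxGap(u64),
}

/// Returns Ok(gap) if the apply may proceed, or Err(gap) if it is too stale.
fn check_staleness(evaluated_at: u64, current: u64, policy: Staleness) -> Result<u64, u64> {
    let gap = current.saturating_sub(evaluated_at);
    let ok = match policy {
        Staleness::Any => true,
        Staleness::RequireFresh => gap == 0,
        Staleness::MaxGap(n) => gap <= n,
    };
    if ok { Ok(gap) } else { Err(gap) }
}
```

With MaxGap(3), for example, a gap of exactly 3 is still accepted, matching the "exceeds n" wording above.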

Locy Evaluation (within Transaction)

DERIVE commands auto-apply to the transaction's private L0.

Method Returns Description
locy(program) Result<LocyResult> Evaluate a Locy program. DERIVE auto-applies to tx.
locy_with(program) TxLocyBuilder Build a parameterized Locy evaluation.

Rule Management

Method Returns Description
rules() RuleRegistry Access the transaction-scoped rule registry. Rules promoted to session on commit.

Bulk Loading (within Transaction)

Method Returns Description
bulk_writer() BulkWriterBuilder Create a bulk writer that writes directly to storage.
appender(label) AppenderBuilder Create a streaming appender for a single label.
bulk_insert_vertices(label, props) Result<Vec<Vid>> Bulk insert vertices to the tx L0. Returns allocated VIDs.
bulk_insert_edges(type, edges) Result<()> Bulk insert edges to the tx L0.

Prepared Statements

Method Returns Description
prepare(cypher) Result<PreparedQuery> Prepare a Cypher query.
prepare_locy(program) Result<PreparedLocy> Prepare a Locy program.

Lifecycle

Method Returns Description
commit() Result<CommitResult> Commit all changes. Consumes the transaction.
rollback() () Rollback all changes. Consumes the transaction. Infallible.
is_dirty() bool Whether the transaction has uncommitted changes.
id() &str Transaction ID (UUID).
started_at_version() u64 Database version when the transaction was created.
cancel() () Cancel all in-flight queries in this transaction.

Drop Behavior

If a transaction is dropped without calling commit() or rollback(), its private L0 buffer is discarded and none of its changes are applied. A warning is logged if the transaction was dirty.
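
The drop behavior can be modelled with a Drop implementation that checks a dirty flag. This is a sketch rather than Uni's code: GuardedTx is a hypothetical type, and a shared counter stands in for the logged warning so the effect is observable.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy transaction that counts how often a dirty instance is dropped
/// without commit. The counter stands in for a log warning.
struct GuardedTx {
    dirty: bool,
    finished: bool,
    warnings: Rc<Cell<u32>>,
}

impl GuardedTx {
    fn new(warnings: Rc<Cell<u32>>) -> Self {
        GuardedTx { dirty: false, finished: false, warnings }
    }

    /// Any write marks the transaction dirty.
    fn write(&mut self) {
        self.dirty = true;
    }

    /// Consumes the transaction, like commit() in the reference above.
    fn commit(mut self) {
        self.finished = true;
    }
}

impl Drop for GuardedTx {
    fn drop(&mut self) {
        if !self.finished && self.dirty {
            // A real implementation would log here; the buffer is simply discarded.
            self.warnings.set(self.warnings.get() + 1);
        }
    }
}
```

Only the dirty, unfinished case bumps the counter; a clean transaction or a committed one drops without a warning.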


CommitResult

Returned by Transaction::commit().

let result = tx.commit().await?;
println!("Version: {}", result.version);
println!("Mutations: {}", result.mutations_committed);
println!("Duration: {:?}", result.duration);
println!("Version gap: {}", result.version_gap());

Field Type Description
mutations_committed usize Number of mutations committed.
rules_promoted usize Number of rules promoted from tx to session.
version u64 Database version after commit.
started_at_version u64 Database version when the transaction started.
wal_lsn u64 WAL log sequence number (0 when no WAL).
duration Duration Time for the commit operation (lock + WAL + merge).
rule_promotion_errors Vec<RulePromotionError> Errors from best-effort rule promotion.

Method Returns Description
version_gap() u64 Number of concurrent commits between tx start and commit. 0 = no contention.

IsolationLevel

pub enum IsolationLevel {
    Serialized, // default — commit-time serialization with private L0 per tx
}

Used with session.tx_with().isolation(IsolationLevel::Serialized).start().await?.


Schema Management

SchemaBuilder

Define labels, edge types, properties, and indexes with a fluent API. Changes are batched and applied atomically.

db.schema()
    .label("Person")
        .property("name", DataType::String)
        .property("age", DataType::Int64)
        .property_nullable("email", DataType::String)
        .vector("embedding", 1536)
        .index("name", IndexType::Scalar(ScalarType::BTree))
    .label("Document")
        .property("title", DataType::String)
        .index("title", IndexType::FullText)
    .edge_type("KNOWS", &["Person"], &["Person"])
        .property("since", DataType::Date)
        .property_nullable("weight", DataType::Float64)
    .apply()
    .await?;

SchemaBuilder methods:

Method Returns Description
label(name) LabelBuilder Start defining a label.
edge_type(name, from, to) EdgeTypeBuilder Start defining an edge type.
current() Arc<Schema> Get the current schema (read-only snapshot).
with_changes(changes) Self Add pre-built schema changes.
apply() Result<()> Apply all batched changes atomically.

LabelBuilder methods:

Method Returns Description
property(name, data_type) Self Add a required property.
property_nullable(name, data_type) Self Add a nullable property.
vector(name, dimensions) Self Add a vector property (shorthand for DataType::Vector).
index(property, index_type) Self Add an index on a property.
done() SchemaBuilder Return to the parent builder.
label(name) LabelBuilder Chain to another label (calls done() first).
edge_type(name, from, to) EdgeTypeBuilder Chain to an edge type.
apply() Result<()> Apply (calls done() then apply()).

EdgeTypeBuilder methods:

Method Returns Description
property(name, data_type) Self Add a required property.
property_nullable(name, data_type) Self Add a nullable property.
done() SchemaBuilder Return to the parent builder.
label(name) LabelBuilder Chain to a label.
edge_type(name, from, to) EdgeTypeBuilder Chain to another edge type.
apply() Result<()> Apply.

DataType Enum

pub enum DataType {
    String,
    Int64,
    Float64,
    Boolean,
    Date,
    DateTime,
    Duration,
    Bytes,
    Json,
    Vector { dimensions: usize },
    // ... additional variants
}

IndexType Enum

pub enum IndexType {
    Vector(VectorIndexCfg),
    FullText,
    Scalar(ScalarType),
    Inverted(InvertedIndexConfig),
}

pub enum ScalarType {
    BTree,
    Hash,
    Bitmap,
    LabelList,
}

pub struct VectorIndexCfg {
    pub algorithm: VectorAlgo,
    pub metric: VectorMetric,
    pub embedding: Option<EmbeddingCfg>,
}

pub enum VectorAlgo {
    Flat,
    IvfFlat { partitions: u32 },
    IvfPq { partitions: u32, sub_vectors: u32 },
    IvfSq { partitions: u32 },
    IvfRq { partitions: u32, num_bits: Option<u8> },
    Hnsw { m: u32, ef_construction: u32, partitions: Option<u32> },      // alias for HnswSq
    HnswFlat { m: u32, ef_construction: u32, partitions: Option<u32> },
    HnswSq { m: u32, ef_construction: u32, partitions: Option<u32> },
    HnswPq { m: u32, ef_construction: u32, sub_vectors: u32, partitions: Option<u32> },
}

pub enum VectorMetric {
    Cosine,
    L2,
    Dot,
}

Schema Info Types

LabelInfo:

Field Type
name String
count usize
properties Vec<PropertyInfo>
indexes Vec<IndexInfo>
constraints Vec<ConstraintInfo>

EdgeTypeInfo:

Field Type
name String
count usize
source_labels Vec<String>
target_labels Vec<String>
properties Vec<PropertyInfo>
indexes Vec<IndexInfo>
constraints Vec<ConstraintInfo>

PropertyInfo: name: String, data_type: String, nullable: bool, is_indexed: bool

IndexInfo: name: String, index_type: String, properties: Vec<String>, status: String

ConstraintInfo: name: String, constraint_type: String, properties: Vec<String>, enabled: bool


Bulk Loading

Two bulk loading mechanisms are available on Transaction:

BulkWriter

High-throughput bulk writer that writes directly to storage, bypassing the L0 buffer. Supports deferred index building, constraint validation, progress callbacks, and automatic checkpointing.

let tx = session.tx().await?;

let mut writer = tx.bulk_writer()
    .batch_size(10_000)
    .defer_vector_indexes(true)
    .async_indexes(false)
    .validate_constraints(true)
    .max_buffer_size_bytes(1_073_741_824) // 1 GB
    .on_progress(|p| println!("{:?}", p.phase))
    .build()?;

let vids = writer.insert_vertices("Person", vec![
    HashMap::from([("name".into(), Value::from("Alice")), ("age".into(), Value::from(30))]),
    HashMap::from([("name".into(), Value::from("Bob")), ("age".into(), Value::from(25))]),
]).await?;

writer.insert_edges("KNOWS", vec![
    EdgeData::new(vids[0], vids[1], HashMap::new()),
]).await?;

let stats = writer.commit().await?;
println!("Inserted {} vertices, {} edges", stats.vertices_inserted, stats.edges_inserted);

tx.commit().await?;

BulkWriterBuilder methods:

Method Description
batch_size(n) Rows to buffer before flush (default: 10,000).
defer_vector_indexes(bool) Defer vector index building until commit (default: true).
defer_scalar_indexes(bool) Defer scalar index building until commit (default: true).
async_indexes(bool) Build indexes asynchronously after commit (default: false).
validate_constraints(bool) Validate NOT NULL, UNIQUE, CHECK constraints (default: true).
max_buffer_size_bytes(n) Trigger checkpoint when buffer exceeds this (default: 1 GB).
on_progress(callback) Set progress callback.
build() Build the BulkWriter. Returns Result<BulkWriter>.

BulkWriter methods:

Method Returns Description
insert_vertices(label, data) Result<Vec<Vid>> Insert vertices. Accepts Vec<HashMap> or RecordBatch.
insert_edges(type, edges) Result<Vec<Eid>> Insert edges via Vec<EdgeData>.
commit() Result<BulkStats> Flush and commit. Rebuilds deferred indexes.
abort() Result<()> Discard all changes and roll back.
stats() &BulkStats Current statistics snapshot.

BulkStats:

Field Type Description
vertices_inserted usize Total vertices inserted.
edges_inserted usize Total edges inserted.
indexes_rebuilt usize Indexes rebuilt at commit.
duration Duration Total elapsed time.
index_build_duration Duration Time spent rebuilding indexes.
index_task_ids Vec<String> Background index task IDs (async mode).
indexes_pending bool True if index builds are running in background.

StreamingAppender

Buffered, single-label row-by-row data loading.

let tx = session.tx().await?;

let mut appender = tx.appender("Person")
    .batch_size(5000)
    .defer_vector_indexes(true)
    .build()?;

appender.append(HashMap::from([("name".into(), Value::from("Alice"))])).await?;
appender.append(HashMap::from([("name".into(), Value::from("Bob"))])).await?;

// Also supports Arrow RecordBatch
// appender.write_batch(&record_batch).await?;

let stats = appender.finish().await?;

tx.commit().await?;

AppenderBuilder methods:

Method Description
batch_size(n) Auto-flush threshold (default: 5000).
defer_vector_indexes(bool) Defer vector index building (default: true).
max_buffer_size_bytes(n) Max buffer size before checkpoint.
build() Build the StreamingAppender. Returns Result<StreamingAppender>.

StreamingAppender methods:

Method Returns Description
append(properties) Result<()> Append a single row. Auto-flushes when buffer is full.
write_batch(record_batch) Result<()> Append an Arrow RecordBatch.
finish() Result<BulkStats> Flush remaining rows and commit. Consumes self.
abort() () Discard all data. Consumes self.
buffered_count() usize Number of rows currently buffered.
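
The auto-flush behavior of append() can be sketched as a buffer with a threshold. ToyAppender below is a hypothetical standard-library model; rows are plain strings and "storage" is just a list of flushed batches.

```rust
/// Toy buffered appender: rows accumulate until `batch_size`, then flush.
struct ToyAppender {
    batch_size: usize,
    buffer: Vec<String>,
    flushed: Vec<Vec<String>>, // stands in for batches written to storage
}

impl ToyAppender {
    fn new(batch_size: usize) -> Self {
        ToyAppender { batch_size, buffer: Vec::new(), flushed: Vec::new() }
    }

    /// Append one row; flush automatically once the threshold is reached.
    fn append(&mut self, row: &str) {
        self.buffer.push(row.to_string());
        if self.buffer.len() >= self.batch_size {
            self.flush();
        }
    }

    /// Move the buffered rows into a new flushed batch (no-op when empty).
    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            self.flushed.push(std::mem::take(&mut self.buffer));
        }
    }

    fn buffered_count(&self) -> usize {
        self.buffer.len()
    }

    /// Flush the remainder and return the total number of rows written.
    fn finish(mut self) -> usize {
        self.flush();
        self.flushed.iter().map(Vec::len).sum()
    }
}
```

Taking self by value in finish() mirrors the "Consumes self" note in the table: the appender cannot be used after the final flush.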

Locy Integration

LocyBuilder (Session-level)

Created by session.locy_with(program). Fluent builder for Locy evaluation with parameters.

let result = session.locy_with("
    edge_rule(X, Y) :- knows(X, Y).
    ?edge_rule(X, Y).
")
    .param("threshold", 0.5)
    .timeout(Duration::from_secs(30))
    .max_iterations(1000)
    .run()
    .await?;

for row in result.rows() {
    println!("{:?}", row);
}

Method Returns Description
param(name, value) Self Bind a parameter.
params(iter) Self Bind multiple parameters.
params_map(map) Self Bind from a HashMap.
timeout(duration) Self Override evaluation timeout.
max_iterations(n) Self Override max fixpoint iterations.
cancellation_token(token) Self Attach a cancellation token.
with_config(config) Self Apply a full LocyConfig.
run() Result<LocyResult> Evaluate and return results.
explain() Result<LocyExplainOutput> Explain without executing.

TxLocyBuilder (Transaction-level)

Created by tx.locy_with(program). Same API as LocyBuilder but sees the transaction's uncommitted writes. DERIVE commands auto-apply to the private L0.

LocyResult

Wraps uni_locy::LocyResult with additional accessors.

let result = session.locy("?QUERY knows(X, Y).").await?;

// Access rows (via Deref to inner LocyResult)
for row in result.rows() { /* ... */ }

// Execution metrics
let metrics = result.metrics();
println!("Duration: {:?}", metrics.duration);

// Derived facts (from DERIVE commands)
if let Some(derived) = result.derived() {
    let tx = session.tx().await?;
    tx.apply(derived.clone()).await?;
    tx.commit().await?;
}

// Warnings (from inner LocyResult via Deref)
for warning in result.warnings() {
    println!("Warning: {}", warning.message);
}

Method Returns Description
metrics() &QueryMetrics Execution metrics (timing, row counts).
derived() Option<&DerivedFactSet> Derived facts from DERIVE commands.
into_inner() uni_locy::LocyResult Unwrap, discarding metrics.
into_parts() (LocyResult, QueryMetrics) Decompose into parts.
Deref uni_locy::LocyResult All inner fields/methods (rows(), stats, warnings(), etc.).

LocyExplainOutput

Returned by session.locy_with(program).explain().

Field Type Description
plan_text String Human-readable evaluation plan.
strata_count usize Number of evaluation strata.
rule_names Vec<String> Names of rules in the program.
has_recursive_strata bool Whether fixpoint iteration is needed.
warnings Vec<String> Compiler warnings from static analysis.
command_count usize Number of commands (QUERY, DERIVE, ASSUME, etc.).

For full Locy language documentation, see the Locy Overview.


Prepared Statements

Prepared statements cache parse and plan results for repeated execution. They are re-planned automatically when the schema changes.
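
A plan cache that re-plans on schema change can be sketched as a map keyed by query text, with each entry tagged by the schema version it was built against. PlanCache and ToyPlan are hypothetical names; how Uni actually detects schema changes is not described in this reference.

```rust
use std::collections::HashMap;

/// Toy "plan": records which schema version it was built against.
#[derive(Debug, Clone, PartialEq)]
struct ToyPlan {
    schema_version: u64,
    text: String,
}

#[derive(Default)]
struct PlanCache {
    plans: HashMap<String, ToyPlan>,
    replans: usize, // how many times a plan had to be built or rebuilt
}

impl PlanCache {
    /// Return a cached plan, rebuilding it if missing or built for another schema version.
    fn get(&mut self, query: &str, schema_version: u64) -> &ToyPlan {
        let stale = self
            .plans
            .get(query)
            .map_or(true, |p| p.schema_version != schema_version);
        if stale {
            self.replans += 1;
            self.plans.insert(
                query.to_string(),
                ToyPlan { schema_version, text: query.to_string() },
            );
        }
        &self.plans[query]
    }
}
```

Repeated calls with an unchanged schema version hit the cache; bumping the version forces exactly one rebuild per query text.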

PreparedQuery

let prepared = session.prepare("MATCH (n:Person) WHERE n.age > $min RETURN n").await?;

// Execute with positional params
let result = prepared.execute(&[("min", Value::from(21))]).await?;

// Or use the fluent binder
let result = prepared.bind()
    .param("min", 21)
    .execute()
    .await?;

// Can be shared across threads via Arc
let shared = Arc::new(prepared);

Method Returns Description
execute(params) Result<QueryResult> Execute with &[(&str, Value)] params.
bind() PreparedQueryBinder Fluent parameter binder.
query_text() &str Original query text.

PreparedLocy

let prepared = session.prepare_locy("edge_rule(X, Y) :- knows(X, Y). ?edge_rule(X, Y).").await?;

let result = prepared.execute(&[("threshold", Value::from(0.5))]).await?;

let result = prepared.bind()
    .param("threshold", 0.5)
    .execute()
    .await?;

Method Returns Description
execute(params) Result<LocyResult> Execute with &[(&str, Value)] params.
bind() PreparedLocyBinder Fluent parameter binder.
program_text() &str Original program text.

Notifications

Sessions can subscribe to commit events and react to database changes as they happen.

CommitNotification

Field Type Description
version u64 Database version after commit.
mutation_count usize Number of mutations in the transaction.
labels_affected Vec<String> Vertex labels affected.
edge_types_affected Vec<String> Edge types affected.
rules_promoted usize Rules promoted from tx to session.
timestamp DateTime<Utc> Commit timestamp.
tx_id String Transaction ID.
session_id String Session ID that committed.
causal_version u64 Version when the transaction started (for causal ordering).

CommitStream

Async stream of commit notifications.

let mut stream = session.watch();
while let Some(notif) = stream.next().await {
    println!("Version {} committed with {} mutations", notif.version, notif.mutation_count);
}

Method Returns Description
next() Option<CommitNotification> Wait for the next matching notification. None if channel closed.

WatchBuilder

Build a filtered commit stream. Created by session.watch_with().

let mut stream = session.watch_with()
    .labels(&["Person", "Document"])
    .edge_types(&["KNOWS"])
    .exclude_session(session.id())
    .debounce(Duration::from_millis(100))
    .build();

Method Returns Description
labels(labels) Self Only receive notifications affecting these labels.
edge_types(types) Self Only receive notifications affecting these edge types.
exclude_session(id) Self Exclude notifications from a specific session.
debounce(interval) Self Collapse notifications within interval.
build() CommitStream Build the filtered stream.
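
The filtering options can be pictured as a predicate over each notification. The sketch below rests on assumptions this reference does not confirm: an empty filter list means "no restriction", and when both labels and edge types are configured a notification passes if it matches either. Notif and WatchFilter are hypothetical types, and debounce is not modelled.

```rust
/// Minimal stand-in for a commit notification (subset of the fields above).
struct Notif {
    session_id: String,
    labels_affected: Vec<String>,
    edge_types_affected: Vec<String>,
}

/// Toy filter. Empty `labels` and `edge_types` mean "no restriction".
#[derive(Default)]
struct WatchFilter {
    labels: Vec<String>,
    edge_types: Vec<String>,
    exclude_session: Option<String>,
}

impl WatchFilter {
    fn matches(&self, n: &Notif) -> bool {
        // Notifications from the excluded session never pass.
        if self.exclude_session.as_deref() == Some(n.session_id.as_str()) {
            return false;
        }
        let label_hit = self.labels.iter().any(|l| n.labels_affected.contains(l));
        let edge_hit = self.edge_types.iter().any(|t| n.edge_types_affected.contains(t));
        // No filters: everything passes. Otherwise any configured filter may match
        // (OR semantics are an assumption of this sketch).
        (self.labels.is_empty() && self.edge_types.is_empty()) || label_hit || edge_hit
    }
}
```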

Session Hooks

Intercept queries and commits at the session level. Useful for audit logging, authorization, and metrics.

SessionHook Trait

pub trait SessionHook: Send + Sync {
    /// Called before a query. Return Err to reject.
    fn before_query(&self, ctx: &HookContext) -> Result<()> { Ok(()) }

    /// Called after a query completes. Panics are caught and logged.
    fn after_query(&self, ctx: &HookContext, metrics: &QueryMetrics) {}

    /// Called before commit. Return Err to reject.
    fn before_commit(&self, ctx: &CommitHookContext) -> Result<()> { Ok(()) }

    /// Called after commit succeeds. Panics are caught and logged.
    fn after_commit(&self, ctx: &CommitHookContext, result: &CommitResult) {}
}

Hook Context Types

HookContext:

Field Type
session_id String
query_text String
query_type QueryType (Cypher, Locy, Execute)
params HashMap<String, Value>

CommitHookContext:

Field Type
session_id String
tx_id String
mutation_count usize

Usage Example

struct AuditHook;

impl SessionHook for AuditHook {
    fn before_query(&self, ctx: &HookContext) -> uni_db::Result<()> {
        tracing::info!(session = %ctx.session_id, query = %ctx.query_text, "Query started");
        Ok(())
    }

    fn before_commit(&self, ctx: &CommitHookContext) -> uni_db::Result<()> {
        if ctx.mutation_count > 10_000 {
            return Err(uni_db::UniError::HookRejected {
                message: "Too many mutations in one commit".into(),
            });
        }
        Ok(())
    }
}

let mut session = db.session();
session.add_hook("audit", AuditHook);
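
How a session might drive its hooks can be sketched with a simplified trait. This is an illustration, not Uni's dispatcher: ToyHook uses plain strings instead of HookContext and Result<()>, and the ordering and panic handling shown are assumptions based on the doc comments of the SessionHook trait.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Simplified hook trait for this sketch: plain string context and error.
trait ToyHook {
    fn before_query(&self, _query: &str) -> Result<(), String> {
        Ok(())
    }
    fn after_query(&self, _query: &str) {}
}

/// Run `before_query` on every hook in order; the first Err rejects the query.
fn run_before(hooks: &[Box<dyn ToyHook>], query: &str) -> Result<(), String> {
    for hook in hooks {
        hook.before_query(query)?;
    }
    Ok(())
}

/// Run `after_query` on every hook; a panicking hook is counted, not propagated.
fn run_after(hooks: &[Box<dyn ToyHook>], query: &str) -> usize {
    hooks
        .iter()
        .filter(|hook| catch_unwind(AssertUnwindSafe(|| hook.after_query(query))).is_err())
        .count()
}

/// Example hook that rejects any query text containing DELETE.
struct RejectDeletes;
impl ToyHook for RejectDeletes {
    fn before_query(&self, query: &str) -> Result<(), String> {
        if query.contains("DELETE") {
            Err("DELETE not allowed".into())
        } else {
            Ok(())
        }
    }
}

/// Example hook with a bug in its after-callback.
struct Panicky;
impl ToyHook for Panicky {
    fn after_query(&self, _query: &str) {
        panic!("hook bug");
    }
}
```

The before-callbacks short-circuit on the first error, while the after-callbacks all run even if one of them panics. Note that catch_unwind only helps when the binary is built with unwinding panics, which is the default.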

Multi-Agent Access

Coordinate write access across multiple processes sharing the same database.

WriteLease

pub enum WriteLease {
    Local,                          // Single-process lock (default)
    DynamoDB { table: String },     // DynamoDB-based distributed lease
    Custom(Box<dyn WriteLeaseProvider>),  // Custom provider
}

WriteLeaseProvider Trait

#[async_trait]
pub trait WriteLeaseProvider: Send + Sync {
    async fn acquire(&self) -> Result<LeaseGuard>;
    async fn heartbeat(&self, guard: &LeaseGuard) -> Result<()>;
    async fn release(&self, guard: LeaseGuard) -> Result<()>;
}

LeaseGuard:

Field Type Description
lease_id String Unique lease acquisition ID.
expires_at DateTime<Utc> Expiration time (renew via heartbeat before this).
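
The acquire/heartbeat contract can be sketched with an in-memory lease driven by a logical clock. ToyLease is a hypothetical model, not a WriteLeaseProvider implementation: it is synchronous, uses integer ticks instead of DateTime<Utc>, and assumes a lease counts as expired once the clock reaches expires_at.

```rust
/// Toy in-memory lease table driven by a caller-supplied logical clock.
struct ToyLease {
    ttl: u64,
    holder: Option<(u64, u64)>, // (lease_id, expires_at)
    next_id: u64,
}

impl ToyLease {
    fn new(ttl: u64) -> Self {
        ToyLease { ttl, holder: None, next_id: 1 }
    }

    /// Acquire succeeds if no lease is held or the current one has expired.
    fn acquire(&mut self, now: u64) -> Option<u64> {
        match self.holder {
            Some((_, expires_at)) if now < expires_at => None,
            _ => {
                let id = self.next_id;
                self.next_id += 1;
                self.holder = Some((id, now + self.ttl));
                Some(id)
            }
        }
    }

    /// Heartbeat extends the expiry, but only for the current, unexpired holder.
    fn heartbeat(&mut self, lease_id: u64, now: u64) -> bool {
        match self.holder {
            Some((id, expires_at)) if id == lease_id && now < expires_at => {
                self.holder = Some((id, now + self.ttl));
                true
            }
            _ => false,
        }
    }
}
```

A writer that misses its heartbeat window loses the lease to the next caller of acquire(), and its later heartbeats are rejected.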

Configuration

let db = Uni::open("./shared-db")
    .write_lease(WriteLease::Local)
    .build()
    .await?;

// Or with read-only mode (no writer)
let db = Uni::open("./shared-db")
    .read_only()
    .build()
    .await?;

Synchronous Wrappers

Blocking API wrappers for non-async contexts. Each wraps the corresponding async type with a Tokio runtime.

UniSync

let db = UniSync::in_memory()?;
let session = db.session();
let rows = session.query("MATCH (n) RETURN count(n)")?;
db.shutdown()?;

Method Returns Description
UniSync::new(uni) Result<Self> Wrap an existing Uni in a blocking runtime.
UniSync::in_memory() Result<Self> Open an in-memory database (blocking).
session() SessionSync Create a sync session.
schema() SchemaBuilderSync Access schema builder (sync).
schema_meta() Arc<Schema> Get current schema snapshot.
shutdown() Result<()> Graceful shutdown (blocking).

SessionSync

Mirrors the Session API, with all async methods wrapped in block_on().

Method | Returns | Notes
query(cypher) | Result<QueryResult> |
query_with(cypher) | QueryBuilderSync | .param().fetch_all()
locy(program) | Result<LocyResult> |
locy_with(program) | LocyBuilderSync | .param().run()
rules() | RuleRegistry |
compile_locy(program) | Result<CompiledProgram> |
tx() | Result<TransactionSync> |
tx_with() | TransactionBuilderSync |
watch() | CommitStream |
watch_with() | WatchBuilder |
add_hook(name, hook) | | Takes &mut self
remove_hook(name) | bool |
pin_to_version(id) | Result<()> | Takes &mut self
pin_to_timestamp(ts) | Result<()> | Takes &mut self
refresh() | Result<()> | Takes &mut self
prepare(cypher) | Result<PreparedQuery> |
prepare_locy(program) | Result<PreparedLocy> |
params() | Params |
id() | &str |
capabilities() | SessionCapabilities |
metrics() | SessionMetrics |
cancel() | |

TransactionSync

| Method | Returns | Notes |
| --- | --- | --- |
| query(cypher) | Result<QueryResult> | |
| query_with(cypher) | TxQueryBuilderSync | |
| execute(cypher) | Result<ExecuteResult> | |
| execute_with(cypher) | ExecuteBuilderSync | .param().run() |
| locy(program) | Result<LocyResult> | |
| locy_with(program) | TxLocyBuilderSync | .param().run() |
| apply(derived) | Result<ApplyResult> | |
| apply_with(derived) | ApplyBuilderSync | |
| prepare(cypher) | Result<PreparedQuery> | |
| prepare_locy(program) | Result<PreparedLocy> | |
| bulk_writer() | BulkWriterBuilder | |
| appender(label) | AppenderBuilder | |
| bulk_insert_vertices(label, props) | Result<Vec<Vid>> | |
| bulk_insert_edges(type, edges) | Result<()> | |
| commit() | Result<CommitResult> | Consumes self |
| rollback() | () | Consumes self |
| is_dirty() | bool | |
| id() | &str | |

Session Templates

Pre-configured session factories for per-request session creation.

let template = db.session_template()
    .param("tenant", 42)
    .rules("edge_rule(X, Y) :- knows(X, Y).")?
    .hook("audit", AuditHook)
    .query_timeout(Duration::from_secs(30))
    .transaction_timeout(Duration::from_secs(60))
    .build()?;

// Cheap per-request session creation (sync, no recompilation)
let session = template.create();

SessionTemplateBuilder methods:

| Method | Returns | Description |
| --- | --- | --- |
| param(key, value) | Self | Bind a parameter for all sessions. |
| rules(program) | Result<Self> | Pre-compile Locy rules (eager compilation). |
| hook(name, hook) | Self | Attach a named session hook. |
| query_timeout(duration) | Self | Default query timeout. |
| transaction_timeout(duration) | Self | Default transaction timeout. |
| build() | Result<SessionTemplate> | Build the template. |

SessionTemplate methods:

| Method | Returns | Description |
| --- | --- | --- |
| create() | Session | Create a new session. Cheap: rules cloned, params cloned, hooks Arc-cloned. |
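
Why `create()` can be cheap is easiest to see in a toy model. The sketch below is not Uni's code: `TemplateSketch`, `SessionSketch`, the `Hook` trait, and the field types are hypothetical stand-ins that only mirror the description above (rules and params cloned, hooks shared through `Arc`).

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical hook trait standing in for Uni's session hooks.
pub trait Hook: Send + Sync {
    fn name(&self) -> &str;
}

/// Hypothetical hook used for illustration only.
pub struct AuditHook;

impl Hook for AuditHook {
    fn name(&self) -> &str {
        "audit"
    }
}

/// Everything expensive is prepared once, when the template is built.
pub struct TemplateSketch {
    pub params: HashMap<String, i64>,
    /// Stands in for rules that were compiled when the template was built.
    pub rules: Vec<String>,
    pub hooks: Vec<Arc<dyn Hook>>,
}

/// Per-request state handed out by the template.
pub struct SessionSketch {
    pub params: HashMap<String, i64>,
    pub rules: Vec<String>,
    pub hooks: Vec<Arc<dyn Hook>>,
}

impl TemplateSketch {
    /// Synchronous and infallible: only clones, no compilation or I/O.
    pub fn create(&self) -> SessionSketch {
        SessionSketch {
            params: self.params.clone(),
            rules: self.rules.clone(),
            // Cloning a Vec of Arcs copies pointers and bumps reference counts;
            // the hook objects themselves are shared, not duplicated.
            hooks: self.hooks.clone(),
        }
    }
}
```

Each session gets its own parameter map, so a per-request override does not leak back into the template or into sibling sessions.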

Database Metrics

Snapshot of database-level metrics. Returned by db.metrics().

let m = db.metrics();
println!("L0 mutations: {}", m.l0_mutation_count);
println!("Active sessions: {}", m.active_sessions);
println!("Uptime: {:?}", m.uptime);
println!("Total queries: {}", m.total_queries);

| Field | Type | Description |
| --- | --- | --- |
| l0_mutation_count | usize | Cumulative L0 mutations since last flush. |
| l0_estimated_size_bytes | usize | Estimated L0 buffer size. |
| schema_version | u64 | Current schema version number. |
| uptime | Duration | Time since database instance was created. |
| active_sessions | usize | Currently active sessions. |
| l1_run_count | usize | L1 compaction runs completed. |
| write_throttle_pressure | ThrottlePressure | Write back-pressure (0.0 to 1.0). |
| compaction_status | CompactionStatus | Current compaction state. |
| wal_size_bytes | u64 | WAL size in bytes. |
| wal_lsn | u64 | Highest flushed WAL log sequence number. |
| total_queries | u64 | Total queries across all sessions. |
| total_commits | u64 | Total committed transactions. |

SessionMetrics

Returned by session.metrics().

| Field | Type | Description |
| --- | --- | --- |
| session_id | String | The session ID. |
| active_since | Instant | When the session was created. |
| queries_executed | u64 | Number of queries. |
| locy_evaluations | u64 | Number of Locy evaluations. |
| total_query_time | Duration | Cumulative query execution time. |
| transactions_committed | u64 | Committed transactions. |
| transactions_rolled_back | u64 | Rolled-back transactions. |
| total_rows_returned | u64 | Rows returned across all queries. |
| total_rows_scanned | u64 | Rows scanned. |
| plan_cache_hits | u64 | Plan cache hits. |
| plan_cache_misses | u64 | Plan cache misses. |
| plan_cache_size | usize | Current plan cache entries. |
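
The raw counters are most useful once turned into ratios. The helpers below are a sketch: `MetricsSample` is a hypothetical stand-in that copies only the four fields it needs from the table above, and neither function is part of the Uni API.

```rust
use std::time::Duration;

/// Stand-in carrying only the `SessionMetrics` fields used below.
pub struct MetricsSample {
    pub queries_executed: u64,
    pub total_query_time: Duration,
    pub plan_cache_hits: u64,
    pub plan_cache_misses: u64,
}

/// Fraction of plan lookups served from the cache; `None` before any lookup.
pub fn plan_cache_hit_ratio(m: &MetricsSample) -> Option<f64> {
    let lookups = m.plan_cache_hits + m.plan_cache_misses;
    if lookups == 0 {
        None
    } else {
        Some(m.plan_cache_hits as f64 / lookups as f64)
    }
}

/// Mean execution time per query; `None` if no query has run yet.
pub fn mean_query_time(m: &MetricsSample) -> Option<Duration> {
    if m.queries_executed == 0 {
        return None;
    }
    let mean_secs = m.total_query_time.as_secs_f64() / m.queries_executed as f64;
    Some(Duration::from_secs_f64(mean_secs))
}
```

Returning `Option` keeps a fresh session (all counters zero) from reporting a misleading 0% hit ratio or dividing by zero.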

SessionCapabilities

Returned by session.capabilities().

| Field | Type | Description |
| --- | --- | --- |
| can_write | bool | Can create transactions and execute writes. |
| can_pin | bool | Supports version pinning. |
| isolation | IsolationLevel | Isolation level for transactions. |
| has_notifications | bool | Commit notifications available. |
| write_lease | Option<WriteLeaseSummary> | Write lease strategy summary. |

Result Types

QueryResult

Returned by session.query(), tx.query(), and the terminal methods of the query builders (for example fetch_all()).

let result = session.query("MATCH (n:Person) RETURN n.name, n.age").await?;

// Iterate rows
for row in result.rows() {
    let name: String = row.get("n.name")?;
    let age: i64 = row.get("n.age")?;
}

// Metadata
println!("Columns: {:?}", result.columns());
println!("Row count: {}", result.len());
println!("Empty: {}", result.is_empty());

// Metrics
let metrics = result.metrics();
println!("Duration: {:?}", metrics.duration);

// Consume into owned rows
let rows: Vec<Row> = result.into_rows();

ExecuteResult

Returned by tx.execute() and ExecuteBuilder::run().

let result = tx.execute("CREATE (:Person {name: 'Alice'})").await?;
println!("Affected rows: {}", result.affected_rows);
println!("Nodes created: {}", result.nodes_created);
println!("Relationships created: {}", result.relationships_created);
println!("Properties set: {}", result.properties_set);

Row

// Panics if the result set is empty
let row: &Row = result.rows().first().expect("query returned no rows");

// Typed access by column name
let name: String = row.get("n.name")?;
let age: i64 = row.get("n.age")?;
let maybe_email: Option<String> = row.get("n.email").ok();

// Access by index
let first_value: &Value = row.get_by_index(0)?;
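
The difference between propagating an error with `?` and discarding it with `.ok()` can be shown on a toy row type. Everything below is a hypothetical model, not Uni's `Row`: the cut-down `Value`, the `FromValue` trait, the `String` error type, and the choice to treat `Null` as a type mismatch are assumptions made for illustration.

```rust
/// Cut-down stand-in for `Value`.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Int(i64),
    String(String),
}

/// Hypothetical conversion trait behind typed access.
pub trait FromValue: Sized {
    fn from_value(v: &Value) -> Option<Self>;
}

impl FromValue for i64 {
    fn from_value(v: &Value) -> Option<Self> {
        match v {
            Value::Int(i) => Some(*i),
            _ => None,
        }
    }
}

impl FromValue for String {
    fn from_value(v: &Value) -> Option<Self> {
        match v {
            Value::String(s) => Some(s.clone()),
            _ => None,
        }
    }
}

/// Toy row: parallel column names and values.
pub struct Row {
    pub columns: Vec<String>,
    pub values: Vec<Value>,
}

impl Row {
    /// Typed access by column name; fails on unknown column or wrong type.
    pub fn get<T: FromValue>(&self, column: &str) -> Result<T, String> {
        let idx = self
            .columns
            .iter()
            .position(|c| c == column)
            .ok_or_else(|| format!("no column named {column}"))?;
        T::from_value(&self.values[idx])
            .ok_or_else(|| format!("column {column} has an unexpected type"))
    }

    /// Untyped access by position.
    pub fn get_by_index(&self, index: usize) -> Result<&Value, String> {
        self.values
            .get(index)
            .ok_or_else(|| format!("index {index} out of range"))
    }
}
```

In this model `.ok()` maps a null value, a misspelled column name, and a type mismatch to the same `None`, so it is best reserved for columns that are known to exist.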

QueryCursor

Streaming cursor for large result sets.

let mut cursor = session.query_with("MATCH (n) RETURN n")
    .cursor()
    .await?;

while let Some(row) = cursor.next().await? {
    // Process row without loading entire result set
}

Value Types

The Value enum represents all data types in Uni:

pub enum Value {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    String(String),
    Bytes(Vec<u8>),
    List(Vec<Value>),
    Map(HashMap<String, Value>),
    Vector(Vec<f32>),
    Node { id: Vid, labels: Vec<String>, properties: HashMap<String, Value> },
    Edge { id: Eid, src: Vid, dst: Vid, edge_type: String, properties: HashMap<String, Value> },
    Path { nodes: Vec<Value>, edges: Vec<Value> },
    // ... additional variants (Date, DateTime, Duration)
}

Conversions from Rust types are provided via From implementations, so both Value::from(x) and x.into() work wherever an Into<Value> is accepted:

Value::from(42_i64)        // Int
Value::from(3.14_f64)      // Float
Value::from("hello")       // String
Value::from(true)          // Bool
Value::from(vec![1.0_f32, 2.0, 3.0])  // Vector
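
How `From` implementations make `Into<Value>` arguments work can be sketched with a cut-down enum. This is an illustration only: the `Value` below carries a subset of the variants listed above, and `ParamBag` with its `bind` method is a hypothetical helper, not Uni's `Params` type.

```rust
use std::collections::HashMap;

/// Cut-down stand-in for `Value` (subset of variants).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Bool(bool),
    Int(i64),
    Float(f64),
    String(String),
    Vector(Vec<f32>),
}

impl From<bool> for Value {
    fn from(v: bool) -> Self {
        Value::Bool(v)
    }
}

impl From<i64> for Value {
    fn from(v: i64) -> Self {
        Value::Int(v)
    }
}

impl From<f64> for Value {
    fn from(v: f64) -> Self {
        Value::Float(v)
    }
}

impl From<&str> for Value {
    fn from(v: &str) -> Self {
        Value::String(v.to_owned())
    }
}

impl From<Vec<f32>> for Value {
    fn from(v: Vec<f32>) -> Self {
        Value::Vector(v)
    }
}

/// Hypothetical parameter bag: anything convertible to `Value` can be bound.
#[derive(Default)]
pub struct ParamBag(HashMap<String, Value>);

impl ParamBag {
    /// The standard library derives `Into<Value>` from each `From` impl above.
    pub fn bind(mut self, key: &str, value: impl Into<Value>) -> Self {
        self.0.insert(key.to_owned(), value.into());
        self
    }

    pub fn get(&self, key: &str) -> Option<&Value> {
        self.0.get(key)
    }
}
```

With this sketch's impls, an unsuffixed integer literal such as `42` would be inferred as `i32` and rejected, which is why the examples spell out `42_i64`.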

Compaction & Index Management

Compaction

Accessed via db.compaction().

// Compact a specific label or edge type
let stats = db.compaction().compact("Person").await?;

// Wait for all background compaction
db.compaction().wait().await?;

| Method | Returns | Description |
| --- | --- | --- |
| compact(name) | Result<CompactionStats> | Compact data for a label or edge type. |
| wait() | Result<()> | Wait for all background compaction. |

Index Management

Accessed via db.indexes().

// List all indexes
let all_indexes = db.indexes().list(None);

// List indexes for a label
let person_indexes = db.indexes().list(Some("Person"));

// Rebuild indexes and wait for completion (background = false)
db.indexes().rebuild("Person", false).await?;

// Rebuild indexes in the background; yields a task ID
let task_id = db.indexes().rebuild("Person", true).await?;

// Check rebuild status
let status = db.indexes().rebuild_status().await?;

// Retry failed rebuilds
let retried = db.indexes().retry_failed().await?;

| Method | Returns | Description |
| --- | --- | --- |
| list(label) | Vec<IndexDefinition> | List index definitions. None for all indexes. |
| rebuild(label, background) | Result<Option<String>> | Rebuild indexes. Returns task ID if background. |
| rebuild_status() | Result<Vec<IndexRebuildTask>> | Status of all rebuild tasks. |
| retry_failed() | Result<Vec<String>> | Retry failed rebuild tasks. Returns retried IDs. |

Snapshots

Named point-in-time snapshots for time travel and backup.

// Create a snapshot
let snapshot_id = db.create_snapshot("before_migration").await?;

// List snapshots
let snapshots = db.list_snapshots().await?;
for s in &snapshots {
    println!("{}: {:?}", s.id, s.created_at);
}

// Pin a session to a snapshot
let mut session = db.session();
session.pin_to_version(&snapshot_id).await?;
let old_data = session.query("MATCH (n) RETURN count(n)").await?;
session.refresh().await?;  // Return to live state

// Restore to a snapshot (requires restart to fully take effect)
db.restore_snapshot(&snapshot_id).await?;

Rule Registry

Manage pre-compiled Locy rules at database, session, or transaction scope. Rules registered at the database level are cloned into every new session.

// Database-level (global)
db.rules().register("edge_rule(X, Y) :- knows(X, Y).")?;

// Session-level
let session = db.session();
session.rules().register("path_rule(X, Z) :- edge_rule(X, Y), edge_rule(Y, Z).")?;

// Query registered rules
let names = session.rules().list();
let info = session.rules().get("edge_rule");
println!("Rules: {:?}, count: {}", names, session.rules().count());

// Remove and clear
session.rules().remove("edge_rule")?;
session.rules().clear();

| Method | Returns | Description |
| --- | --- | --- |
| register(program) | Result<()> | Compile and register rules from a Locy program. |
| remove(name) | Result<bool> | Remove a rule by name. Recompiles remaining rules. |
| list() | Vec<String> | List all rule names. |
| get(name) | Option<RuleInfo> | Get metadata about a rule. |
| count() | usize | Number of registered rules. |
| clear() | () | Clear all rules. |

RuleInfo:

| Field | Type | Description |
| --- | --- | --- |
| name | String | Rule name. |
| clause_count | usize | Number of clauses. |
| is_recursive | bool | Whether the rule is recursive. |