Rust API Reference¶
Uni provides a comprehensive Rust API for embedding the graph database directly into your application. This reference covers all public APIs.
Quick Start¶
use uni::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// Open or create database
let db = Uni::open("./my-graph")
.schema_file("schema.json")
.build()
.await?;
// Run a query
let results = db.query("MATCH (p:Paper) WHERE p.year > 2020 RETURN p.title LIMIT 10").await?;
for row in &results {
let title: String = row.get("p.title")?;
println!("{}", title);
}
Ok(())
}
Module: uni¶
The main entry point for the database.
Uni¶
/// Main database instance
pub struct Uni {
// Internal state
}
impl Uni {
/// Open or create a database at the given path
pub fn open(path: impl AsRef<Path>) -> UniBuilder;
/// Create an in-memory database
pub fn in_memory() -> UniBuilder;
/// Get current configuration
pub fn config(&self) -> &UniConfig;
/// Get current schema
pub fn current_schema(&self) -> Schema;
/// Flush uncommitted changes to storage
pub async fn flush(&self) -> Result<()>;
}
UniBuilder¶
/// Fluent builder for database instances
pub struct UniBuilder {
// Internal state
}
impl UniBuilder {
/// Set schema from JSON file
pub fn schema_file(self, path: impl AsRef<Path>) -> Self;
/// Set configuration
pub fn config(self, config: UniConfig) -> Self;
/// Set cache size in bytes
pub fn cache_size(self, bytes: usize) -> Self;
/// Set parallelism (worker threads)
pub fn parallelism(self, n: usize) -> Self;
/// Build the database instance (async)
pub async fn build(self) -> Result<Uni>;
/// Build the database instance (blocking)
pub fn build_sync(self) -> Result<Uni>;
}
// Example
let db = Uni::open("./data")
.schema_file("schema.json")
.cache_size(2 * 1024 * 1024 * 1024) // 2 GB
.parallelism(8)
.build()
.await?;
Queries¶
Basic Queries¶
impl Uni {
/// Execute a Cypher query
pub async fn query(&self, cypher: &str) -> Result<QueryResult>;
/// Execute a query with parameters
pub fn query_with(&self, cypher: &str) -> QueryBuilder<'_>;
/// Execute a mutation (CREATE, SET, DELETE, MERGE)
pub async fn execute(&self, cypher: &str) -> Result<ExecuteResult>;
}
// Simple query
let results = db.query("MATCH (p:Paper) RETURN p.title, p.year").await?;
// Query with parameters
let results = db.query_with("MATCH (p:Paper) WHERE p.year > $year RETURN p")
.param("year", 2020)
.fetch_all()
.await?;
// Mutation
let result = db.execute("CREATE (p:Paper {title: 'New Paper', year: 2024})").await?;
println!("Created {} nodes", result.affected_rows);
QueryBuilder¶
/// Builder for parameterized queries
pub struct QueryBuilder<'a> {
// Internal state
}
impl<'a> QueryBuilder<'a> {
/// Add a parameter
pub fn param<V: Into<Value>>(self, name: &str, value: V) -> Self;
/// Add multiple parameters
pub fn params(self, params: HashMap<String, Value>) -> Self;
/// Execute and fetch all results
pub async fn fetch_all(self) -> Result<QueryResult>;
}
// Example with multiple parameters
let results = db.query_with(
"MATCH (a:Author)-[:AUTHORED]->(p:Paper)
WHERE a.name = $name AND p.year >= $min_year
RETURN p.title, p.year"
)
.param("name", "Jane Smith")
.param("min_year", 2020)
.fetch_all()
.await?;
Sessions¶
Sessions provide scoped context for multi-tenant and security-aware queries.
SessionBuilder¶
impl Uni {
/// Create a session builder with scoped variables
pub fn session(&self) -> SessionBuilder<'_>;
}
/// Builder for creating query sessions
pub struct SessionBuilder<'a> {
// Internal state
}
impl<'a> SessionBuilder<'a> {
/// Set a session variable
pub fn set<K: Into<String>, V: Into<Value>>(self, key: K, value: V) -> Self;
/// Build the session (variables become immutable)
pub fn build(self) -> Session<'a>;
}
Session¶
/// A query session with scoped variables
pub struct Session<'a> {
// Internal state
}
impl<'a> Session<'a> {
/// Execute a query with session variables available
pub async fn query(&self, cypher: &str) -> Result<QueryResult>;
/// Execute a query with additional parameters
pub fn query_with(&self, cypher: &str) -> SessionQueryBuilder<'a, '_>;
/// Execute a mutation
pub async fn execute(&self, cypher: &str) -> Result<ExecuteResult>;
/// Get a session variable value
pub fn get(&self, key: &str) -> Option<&Value>;
}
Session Example¶
// Create session with tenant context
let session = db.session()
.set("tenant_id", "acme-corp")
.set("user_id", "user-123")
.set("granted_tags", vec!["public", "team:eng"])
.build();
// All queries automatically have access to $session.*
let results = session.query(
"MATCH (d:Document)
WHERE d.tenant_id = $session.tenant_id
RETURN d.title"
).await?;
// Query with additional parameters
let results = session.query_with(
"MATCH (d:Document)
WHERE d.tenant_id = $session.tenant_id
AND d.status = $status
RETURN d"
)
.param("status", "published")
.fetch_all()
.await?;
Bulk Loading¶
High-performance data loading with deferred indexing.
BulkWriter¶
impl Uni {
/// Create a bulk writer builder
pub fn bulk_writer(&self) -> BulkWriterBuilder<'_>;
}
/// Builder for bulk write operations
pub struct BulkWriterBuilder<'a> {
// Internal state
}
impl<'a> BulkWriterBuilder<'a> {
/// Defer vector index updates until commit
pub fn defer_vector_indexes(self, defer: bool) -> Self;
/// Defer scalar index updates until commit
pub fn defer_scalar_indexes(self, defer: bool) -> Self;
/// Set batch size for flushing
pub fn batch_size(self, size: usize) -> Self;
/// Build indexes asynchronously after commit
pub fn async_indexes(self, async_build: bool) -> Self;
/// Set progress callback
pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(self, f: F) -> Self;
/// Build the bulk writer
pub fn build(self) -> Result<BulkWriter<'a>>;
}
/// Bulk writer for high-performance data loading
pub struct BulkWriter<'a> {
// Internal state
}
impl<'a> BulkWriter<'a> {
/// Insert vertices in bulk
pub async fn insert_vertices(
&mut self,
label: &str,
vertices: Vec<HashMap<String, Value>>,
) -> Result<Vec<Vid>>;
/// Insert edges in bulk
pub async fn insert_edges(
&mut self,
edge_type: &str,
edges: Vec<EdgeData>,
) -> Result<Vec<Eid>>;
/// Commit all pending data and rebuild indexes
pub async fn commit(self) -> Result<BulkStats>;
/// Abort bulk operation, discarding uncommitted data
pub async fn abort(self) -> Result<()>;
}
/// Bulk loading progress information
#[derive(Debug, Clone)]
pub struct BulkProgress {
pub phase: BulkPhase,
pub rows_processed: usize,
pub total_rows: Option<usize>,
pub current_label: Option<String>,
pub elapsed: Duration,
}
/// Bulk loading phase
#[derive(Debug, Clone)]
pub enum BulkPhase {
Inserting,
RebuildingVectorIndex { label: String, property: String },
RebuildingScalarIndex { label: String, property: String },
UpdatingAdjacency,
Finalizing,
}
/// Bulk loading statistics
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
pub vertices_inserted: usize,
pub edges_inserted: usize,
pub indexes_rebuilt: usize,
pub duration: Duration,
pub index_build_duration: Duration,
}
BulkWriter Example¶
// Create bulk writer with deferred indexing
let mut bulk = db.bulk_writer()
.defer_vector_indexes(true)
.defer_scalar_indexes(true)
.batch_size(50_000)
.on_progress(|p| println!("{:?}: {} rows", p.phase, p.rows_processed))
.build()?;
// Insert 100K vertices
let vertices: Vec<HashMap<String, Value>> = (0..100_000)
    .map(|i| {
        HashMap::from([
            ("name".to_string(), Value::from(format!("item-{}", i))),
            ("embedding".to_string(), Value::from(random_vector(128))),
        ])
    })
    .collect();
let vids = bulk.insert_vertices("Item", vertices).await?;
assert_eq!(vids.len(), 100_000);
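// Edges can be inserted through the same writer before commit(). The exact fields of
// EdgeData are not documented above; this sketch assumes it exposes src, dst, and
// properties, and that a RELATED_TO edge type between Item vertices exists in the schema.
let edges: Vec<EdgeData> = vids
    .windows(2)
    .map(|pair| EdgeData {
        src: pair[0],
        dst: pair[1],
        properties: HashMap::new(),
    })
    .collect();
let eids = bulk.insert_edges("RELATED_TO", edges).await?;
println!("Inserted {} edges", eids.len());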
// Commit and rebuild indexes
let stats = bulk.commit().await?;
println!("Loaded {} vertices, rebuilt {} indexes in {:?}",
stats.vertices_inserted, stats.indexes_rebuilt, stats.duration);
Snapshots¶
Read-only access to historical database states.
Snapshot Management¶
impl Uni {
/// List all available snapshots
pub async fn list_snapshots(&self) -> Result<Vec<SnapshotInfo>>;
/// Open a read-only view at a specific snapshot
pub async fn at_snapshot(&self, snapshot_id: &str) -> Result<Uni>;
/// Restore database to a snapshot state
pub async fn restore_snapshot(&self, snapshot_id: &str) -> Result<()>;
}
/// Snapshot metadata
#[derive(Debug, Clone)]
pub struct SnapshotInfo {
pub id: String,
pub name: Option<String>,
pub created_at: DateTime<Utc>,
pub version: u64,
}
Snapshot Example¶
// List available snapshots
let snapshots = db.list_snapshots().await?;
for snap in &snapshots {
println!("{}: {} ({})", snap.id, snap.name.as_deref().unwrap_or("-"), snap.created_at);
}
// Open a read-only view at a specific snapshot
let historical = db.at_snapshot(&snapshots[0].id).await?;
// Query historical data
let old_results = historical.query("MATCH (n) RETURN count(n) AS c").await?;
println!("Count at snapshot: {}", old_results[0].get::<i64>("c")?);
// Writes fail on snapshot readers
let result = historical.execute("CREATE (n:Test)").await;
assert!(result.is_err()); // WriteOnReadOnly error
Snapshot Procedures¶
Snapshots can also be managed via Cypher:
// Create a named snapshot
CALL db.snapshot.create('before_migration')
YIELD id, name, created
// List snapshots
CALL db.snapshot.list()
YIELD id, name, created, size
// Restore to a snapshot
CALL db.snapshot.restore('before_migration')
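These procedures can also be invoked from the Rust API through the query() and execute() methods shown earlier; a minimal sketch:
// Create a named snapshot, then restore it later
let created = db.query("CALL db.snapshot.create('before_migration') YIELD id, name, created").await?;
println!("Created snapshot {}", created.rows()[0].get::<String>("id")?);
db.execute("CALL db.snapshot.restore('before_migration')").await?;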
Transactions¶
impl Uni {
/// Begin an explicit transaction
pub async fn begin(&self) -> Result<Transaction<'_>>;
/// Run a closure within a transaction
pub async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
where
F: FnOnce(Transaction<'_>) -> Fut,
Fut: Future<Output = Result<T>>;
}
/// Active transaction handle
pub struct Transaction<'a> {
// Internal state
}
impl<'a> Transaction<'a> {
/// Execute a query within the transaction
pub async fn query(&self, cypher: &str) -> Result<QueryResult>;
/// Execute a mutation within the transaction
pub async fn execute(&self, cypher: &str) -> Result<ExecuteResult>;
/// Commit the transaction
pub async fn commit(self) -> Result<()>;
/// Rollback the transaction
pub async fn rollback(self) -> Result<()>;
}
Transaction Examples¶
// Explicit transaction
let tx = db.begin().await?;
tx.execute("CREATE (p:Paper {title: 'Paper 1'})").await?;
tx.execute("CREATE (p:Paper {title: 'Paper 2'})").await?;
tx.commit().await?;
// Closure-based transaction (auto-commit on success, rollback on error)
db.transaction(|tx| async move {
tx.execute("CREATE (a:Author {name: 'Alice'})").await?;
tx.execute("CREATE (a:Author {name: 'Bob'})").await?;
Ok(())
}).await?;
// Transaction with rollback
let tx = db.begin().await?;
tx.execute("DELETE (p:Paper) WHERE p.year < 2000").await?;
// Changed our mind
tx.rollback().await?;
Query Results¶
QueryResult¶
/// Collection of result rows
pub struct QueryResult {
// Internal state
}
impl QueryResult {
/// Get column names
pub fn columns(&self) -> &[String];
/// Get number of rows
pub fn len(&self) -> usize;
/// Check if empty
pub fn is_empty(&self) -> bool;
/// Get rows as slice
pub fn rows(&self) -> &[Row];
/// Consume into owned rows
pub fn into_rows(self) -> Vec<Row>;
/// Iterate over rows
pub fn iter(&self) -> impl Iterator<Item = &Row>;
}
// Implements IntoIterator
for row in results {
// ...
}
// Or by reference
for row in &results {
// ...
}
Row¶
/// Single result row
pub struct Row {
// Internal state
}
impl Row {
/// Get typed value by column name
pub fn get<T: FromValue>(&self, column: &str) -> Result<T>;
/// Get typed value by index
pub fn get_idx<T: FromValue>(&self, index: usize) -> Result<T>;
/// Try to get value (returns None if missing or wrong type)
pub fn try_get<T: FromValue>(&self, column: &str) -> Option<T>;
/// Get raw Value by column name
pub fn value(&self, column: &str) -> Option<&Value>;
/// Convert to HashMap
pub fn as_map(&self) -> HashMap<&str, &Value>;
/// Convert to JSON
pub fn to_json(&self) -> serde_json::Value;
}
// Index access
impl Index<usize> for Row {
type Output = Value;
fn index(&self, index: usize) -> &Value;
}
// Example
for row in &results {
let title: String = row.get("p.title")?;
let year: i32 = row.get("p.year")?;
let citations: Option<i64> = row.try_get("p.citations");
println!("{} ({}) - {:?} citations", title, year, citations);
}
Node¶
/// Graph node returned from queries
pub struct Node {
// Internal state
}
impl Node {
/// Get vertex ID
pub fn id(&self) -> Vid;
/// Get label name
pub fn label(&self) -> &str;
/// Get all properties
pub fn properties(&self) -> &HashMap<String, Value>;
/// Get typed property value
pub fn get<T: FromValue>(&self, property: &str) -> Result<T>;
/// Try to get property (returns None if missing)
pub fn try_get<T: FromValue>(&self, property: &str) -> Option<T>;
}
// Example: Query returns nodes
let results = db.query("MATCH (p:Paper) RETURN p").await?;
for row in &results {
let node: Node = row.get("p")?;
println!("Label: {}, ID: {}", node.label(), node.id());
println!("Title: {}", node.get::<String>("title")?);
}
Edge¶
/// Graph edge returned from queries
pub struct Edge {
// Internal state
}
impl Edge {
/// Get edge ID
pub fn id(&self) -> Eid;
/// Get edge type name
pub fn edge_type(&self) -> &str;
/// Get source vertex ID
pub fn src(&self) -> Vid;
/// Get destination vertex ID
pub fn dst(&self) -> Vid;
/// Get all properties
pub fn properties(&self) -> &HashMap<String, Value>;
/// Get typed property value
pub fn get<T: FromValue>(&self, property: &str) -> Result<T>;
}
// Example
let results = db.query("MATCH (a:Author)-[r:AUTHORED]->(p:Paper) RETURN r").await?;
for row in &results {
let edge: Edge = row.get("r")?;
println!("Type: {}, From: {} To: {}", edge.edge_type(), edge.src(), edge.dst());
}
Path¶
/// Graph path (sequence of nodes and edges)
pub struct Path {
// Internal state
}
impl Path {
/// Get all nodes in the path
pub fn nodes(&self) -> &[Node];
/// Get all edges in the path
pub fn edges(&self) -> &[Edge];
/// Get path length (number of edges)
pub fn len(&self) -> usize;
/// Check if path is empty
pub fn is_empty(&self) -> bool;
/// Get start node
pub fn start(&self) -> &Node;
/// Get end node
pub fn end(&self) -> &Node;
}
// Example: Variable-length path query
let results = db.query(
"MATCH path = (a:Paper)-[:CITES*1..3]->(b:Paper)
WHERE a.title = 'Attention Is All You Need'
RETURN path"
).await?;
for row in &results {
let path: Path = row.get("path")?;
println!("Path length: {} hops", path.len());
println!("Start: {}", path.start().get::<String>("title")?);
println!("End: {}", path.end().get::<String>("title")?);
}
Value¶
/// Dynamic value type for properties and results
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
Null,
Bool(bool),
Int(i64),
Float(f64),
String(String),
Bytes(Vec<u8>),
List(Vec<Value>),
Map(HashMap<String, Value>),
Node(Node),
Edge(Edge),
Path(Path),
Vector(Vec<f32>),
}
impl Value {
// Type checking
pub fn is_null(&self) -> bool;
pub fn is_bool(&self) -> bool;
pub fn is_int(&self) -> bool;
pub fn is_float(&self) -> bool;
pub fn is_string(&self) -> bool;
pub fn is_list(&self) -> bool;
pub fn is_map(&self) -> bool;
pub fn is_node(&self) -> bool;
pub fn is_edge(&self) -> bool;
pub fn is_path(&self) -> bool;
pub fn is_vector(&self) -> bool;
// Accessors (return None if wrong type)
pub fn as_bool(&self) -> Option<bool>;
pub fn as_i64(&self) -> Option<i64>;
pub fn as_f64(&self) -> Option<f64>;
pub fn as_str(&self) -> Option<&str>;
pub fn as_bytes(&self) -> Option<&[u8]>;
pub fn as_list(&self) -> Option<&[Value]>;
pub fn as_map(&self) -> Option<&HashMap<String, Value>>;
pub fn as_node(&self) -> Option<&Node>;
pub fn as_edge(&self) -> Option<&Edge>;
pub fn as_path(&self) -> Option<&Path>;
pub fn as_vector(&self) -> Option<&[f32]>;
}
// From implementations for common types
impl From<bool> for Value { ... }
impl From<i32> for Value { ... }
impl From<i64> for Value { ... }
impl From<f64> for Value { ... }
impl From<String> for Value { ... }
impl From<&str> for Value { ... }
impl From<Vec<f32>> for Value { ... }
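A short example combining the From conversions and accessors above:
// Constructing and inspecting Values
let v: Value = 42i64.into();
assert!(v.is_int());
assert_eq!(v.as_i64(), Some(42));
let name: Value = "Alice".into();
assert_eq!(name.as_str(), Some("Alice"));
let embedding: Value = vec![0.1_f32, 0.2, 0.3].into();
assert_eq!(embedding.as_vector().map(|s| s.len()), Some(3));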
FromValue Trait¶
/// Trait for converting from Value
pub trait FromValue: Sized {
fn from_value(value: &Value) -> Result<Self>;
}
// Implemented for:
// - String, &str
// - bool
// - i32, i64, u32, u64
// - f32, f64
// - Vec<T> where T: FromValue
// - Option<T> where T: FromValue
// - Node, Edge, Path
// - Vid, Eid
// - Vec<f32> (vectors)
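Row::get, Node::get, and the other typed accessors are generic over this trait, so custom types can participate in result decoding. A minimal sketch for a hypothetical newtype, assuming only the as_i64 accessor and the Type error variant documented elsewhere in this reference:
// Hypothetical newtype decoded from an integer column
struct Year(i32);

impl FromValue for Year {
    fn from_value(value: &Value) -> Result<Self> {
        value
            .as_i64()
            .map(|y| Year(y as i32))
            .ok_or_else(|| UniError::Type {
                expected: "Int".to_string(),
                actual: format!("{:?}", value),
            })
    }
}

// Usage: let year: Year = row.get("p.year")?;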
Schema Management¶
Schema Builder¶
impl Uni {
/// Get schema builder for modifications
pub fn schema(&self) -> SchemaBuilder<'_>;
/// Load schema from file
pub async fn load_schema(&self, path: impl AsRef<Path>) -> Result<()>;
/// Save schema to file
pub async fn save_schema(&self, path: impl AsRef<Path>) -> Result<()>;
}
/// Fluent schema builder
pub struct SchemaBuilder<'a> {
// Internal state
}
impl<'a> SchemaBuilder<'a> {
/// Add a new label (vertex type)
pub fn label(self, name: &str) -> LabelBuilder<'a>;
/// Add a new edge type
pub fn edge_type(
self,
name: &str,
from_labels: &[&str],
to_labels: &[&str],
) -> EdgeTypeBuilder<'a>;
/// Apply all schema changes
pub async fn apply(self) -> Result<()>;
}
LabelBuilder¶
/// Builder for label definitions
pub struct LabelBuilder<'a> {
// Internal state
}
impl<'a> LabelBuilder<'a> {
/// Mark as document collection (enables JSON storage)
pub fn document(self) -> Self;
/// Add a required property
pub fn property(self, name: &str, data_type: DataType) -> Self;
/// Add a nullable property
pub fn property_nullable(self, name: &str, data_type: DataType) -> Self;
/// Add a vector property
pub fn vector(self, name: &str, dimensions: usize) -> Self;
/// Add an index on a property
pub fn index(self, property: &str, index_type: IndexType) -> Self;
/// Finish this label and return to SchemaBuilder
pub fn done(self) -> SchemaBuilder<'a>;
/// Chain to another label
pub fn label(self, name: &str) -> LabelBuilder<'a>;
/// Chain to an edge type
pub fn edge_type(
self,
name: &str,
from: &[&str],
to: &[&str],
) -> EdgeTypeBuilder<'a>;
/// Apply all schema changes
pub async fn apply(self) -> Result<()>;
}
EdgeTypeBuilder¶
/// Builder for edge type definitions
pub struct EdgeTypeBuilder<'a> {
// Internal state
}
impl<'a> EdgeTypeBuilder<'a> {
/// Add a required property
pub fn property(self, name: &str, data_type: DataType) -> Self;
/// Add a nullable property
pub fn property_nullable(self, name: &str, data_type: DataType) -> Self;
/// Finish and return to SchemaBuilder
pub fn done(self) -> SchemaBuilder<'a>;
/// Chain to a label
pub fn label(self, name: &str) -> LabelBuilder<'a>;
/// Chain to another edge type
pub fn edge_type(
self,
name: &str,
from: &[&str],
to: &[&str],
) -> EdgeTypeBuilder<'a>;
/// Apply all schema changes
pub async fn apply(self) -> Result<()>;
}
Schema Example¶
// Define schema using fluent API
db.schema()
.label("Paper")
.property("title", DataType::String)
.property("year", DataType::Int32)
.property_nullable("abstract", DataType::String)
.vector("embedding", 768)
.index("year", IndexType::Scalar(ScalarType::BTree))
.index("embedding", IndexType::Vector(VectorIndexCfg {
algorithm: VectorAlgo::Hnsw { m: 16, ef_construction: 200 },
metric: VectorMetric::Cosine,
}))
.label("Author")
.property("name", DataType::String)
.property_nullable("email", DataType::String)
.index("name", IndexType::Scalar(ScalarType::Hash))
.edge_type("AUTHORED", &["Author"], &["Paper"])
.property_nullable("position", DataType::Int32)
.edge_type("CITES", &["Paper"], &["Paper"])
.apply()
.await?;
Schema Types¶
/// Property data type
#[derive(Clone, Debug, PartialEq)]
pub enum DataType {
String,
Int32,
Int64,
Float32,
Float64,
Bool,
Timestamp,
Json,
Vector { dimensions: usize },
Crdt(CrdtType),
}
/// CRDT type variants
#[derive(Clone, Debug, PartialEq)]
pub enum CrdtType {
GCounter,
GSet,
ORSet,
LWWRegister,
LWWMap,
Rga,
}
/// Index type configuration
#[derive(Clone, Debug)]
pub enum IndexType {
Vector(VectorIndexCfg),
FullText,
Scalar(ScalarType),
}
/// Vector index configuration
#[derive(Clone, Debug)]
pub struct VectorIndexCfg {
pub algorithm: VectorAlgo,
pub metric: VectorMetric,
}
/// Vector index algorithms
#[derive(Clone, Debug)]
pub enum VectorAlgo {
/// HNSW index (fast, high recall)
Hnsw { m: u32, ef_construction: u32 },
/// IVF-PQ index (memory efficient)
IvfPq { partitions: u32, sub_vectors: u32 },
/// Flat index (exact, slow)
Flat,
}
/// Vector distance metrics
#[derive(Clone, Copy, Debug)]
pub enum VectorMetric {
Cosine,
L2,
Dot,
}
/// Scalar index types
#[derive(Clone, Copy, Debug)]
pub enum ScalarType {
BTree, // Range queries
Hash, // Equality queries
Bitmap, // Low cardinality
}
Vector Search¶
Basic Vector Search¶
impl Uni {
/// Perform vector similarity search
pub async fn vector_search(
&self,
label: &str,
property: &str,
query: Vec<f32>,
k: usize,
) -> Result<Vec<VectorMatch>>;
/// Vector search with options
pub fn vector_search_with(
&self,
label: &str,
property: &str,
query: Vec<f32>,
) -> VectorSearchBuilder<'_>;
}
/// Vector search result
#[derive(Clone, Debug)]
pub struct VectorMatch {
pub vid: Vid,
pub distance: f32,
}
VectorSearchBuilder¶
/// Fluent builder for vector searches
pub struct VectorSearchBuilder<'a> {
// Internal state
}
impl<'a> VectorSearchBuilder<'a> {
/// Set number of results to return
pub fn k(self, k: usize) -> Self;
/// Set distance threshold (filter out results above this)
pub fn threshold(self, threshold: f32) -> Self;
/// Add a filter predicate (Cypher WHERE syntax)
pub fn filter(self, filter: &str) -> Self;
/// Execute search and return matches
pub async fn search(self) -> Result<Vec<VectorMatch>>;
/// Execute search and fetch full nodes
pub async fn fetch_nodes(self) -> Result<Vec<(Node, f32)>>;
}
Vector Search Examples¶
// Simple vector search
let query_embedding = embedding_service.embed("machine learning")?;
let matches = db.vector_search("Paper", "embedding", query_embedding.clone(), 10).await?;
for m in matches {
println!("VID: {}, Distance: {:.4}", m.vid, m.distance);
}
// Vector search with filters and fetch nodes
let results = db.vector_search_with("Paper", "embedding", query_embedding.clone())
.k(20)
.threshold(0.5)
.filter("node.year >= 2020")
.fetch_nodes()
.await?;
for (node, distance) in results {
println!("{} ({:.4})", node.get::<String>("title")?, distance);
}
// Vector search via Cypher
let results = db.query_with(
"CALL db.idx.vector.query('Paper', 'embedding', $vec, 10)
YIELD node, distance
WHERE node.year >= 2020
RETURN node.title, distance
ORDER BY distance"
)
.param("vec", query_embedding)
.fetch_all()
.await?;
Graph Algorithms¶
AlgoBuilder¶
impl Uni {
/// Access algorithm builder
pub fn algo(&self) -> AlgoBuilder<'_>;
}
/// Builder for graph algorithms
pub struct AlgoBuilder<'a> {
// Internal state
}
impl<'a> AlgoBuilder<'a> {
/// PageRank centrality algorithm
pub fn pagerank(self) -> PageRankBuilder<'a>;
/// Weakly Connected Components
pub fn wcc(self) -> WccBuilder<'a>;
}
PageRank¶
/// PageRank algorithm builder
pub struct PageRankBuilder<'a> {
// Internal state
}
impl<'a> PageRankBuilder<'a> {
/// Filter to specific labels
pub fn labels(self, labels: &[&str]) -> Self;
/// Filter to specific edge types
pub fn edge_types(self, types: &[&str]) -> Self;
/// Set damping factor (default: 0.85)
pub fn damping(self, d: f64) -> Self;
/// Set maximum iterations (default: 20)
pub fn max_iterations(self, n: usize) -> Self;
/// Set convergence tolerance (default: 1e-6)
pub fn tolerance(self, tol: f64) -> Self;
/// Run the algorithm
pub async fn run(self) -> Result<Vec<(Vid, f64)>>;
}
// Example
let rankings = db.algo()
.pagerank()
.labels(&["Paper"])
.edge_types(&["CITES"])
.damping(0.85)
.max_iterations(50)
.run()
.await?;
for (vid, score) in rankings.iter().take(10) {
println!("VID: {}, PageRank: {:.6}", vid, score);
}
Weakly Connected Components¶
/// WCC algorithm builder
pub struct WccBuilder<'a> {
// Internal state
}
impl<'a> WccBuilder<'a> {
/// Filter to specific labels
pub fn labels(self, labels: &[&str]) -> Self;
/// Filter to specific edge types
pub fn edge_types(self, types: &[&str]) -> Self;
/// Run the algorithm
pub async fn run(self) -> Result<Vec<(Vid, i64)>>;
}
// Example: Find connected components
let components = db.algo()
.wcc()
.labels(&["Paper", "Author"])
.edge_types(&["AUTHORED", "CITES"])
.run()
.await?;
// Count component sizes
let mut component_sizes: HashMap<i64, usize> = HashMap::new();
for (_, component_id) in &components {
*component_sizes.entry(*component_id).or_default() += 1;
}
println!("Found {} components", component_sizes.len());
Algorithms via Cypher¶
// PageRank via Cypher CALL
let results = db.query(
"CALL algo.pageRank('Paper', 'CITES', {damping: 0.85, iterations: 20})
YIELD nodeId, score
RETURN nodeId, score
ORDER BY score DESC
LIMIT 10"
).await?;
// Shortest path
let results = db.query(
"CALL algo.shortestPath($startVid, $endVid, 'CITES')
YIELD path, distance
RETURN path, distance"
)
.param("startVid", start_vid.as_u64() as i64)
.param("endVid", end_vid.as_u64() as i64)
.fetch_all()
.await?;
// Louvain community detection
let results = db.query(
"CALL algo.louvain('Paper', 'CITES')
YIELD nodeId, communityId
RETURN communityId, COUNT(*) AS size
ORDER BY size DESC"
).await?;
Core Types¶
Vid (Vertex ID)¶
/// 64-bit vertex identifier
/// Encoding: label_id (16 bits) | local_offset (48 bits)
#[derive(Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct Vid(u64);
impl Vid {
/// Create from label ID and offset
pub fn new(label_id: u16, local_offset: u64) -> Self;
/// Extract label ID (upper 16 bits)
pub fn label_id(&self) -> u16;
/// Extract local offset (lower 48 bits)
pub fn local_offset(&self) -> u64;
/// Get raw u64 value
pub fn as_u64(&self) -> u64;
}
impl From<u64> for Vid { ... }
impl Display for Vid { ... } // Formats as "0x{:016x}"
// Example
let vid = Vid::new(1, 12345);
assert_eq!(vid.label_id(), 1);
assert_eq!(vid.local_offset(), 12345);
Eid (Edge ID)¶
/// 64-bit edge identifier
/// Encoding: type_id (16 bits) | local_offset (48 bits)
#[derive(Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct Eid(u64);
impl Eid {
/// Create from edge type ID and offset
pub fn new(type_id: u16, local_offset: u64) -> Self;
/// Extract edge type ID
pub fn type_id(&self) -> u16;
/// Extract local offset
pub fn local_offset(&self) -> u64;
/// Get raw u64 value
pub fn as_u64(&self) -> u64;
}
impl From<u64> for Eid { ... }
impl Display for Eid { ... }
UniId¶
/// Content-addressed identifier (SHA3-256 hash)
/// Used for provenance tracking and distributed sync
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct UniId([u8; 32]);
impl UniId {
/// Create from raw bytes
pub fn from_bytes(bytes: [u8; 32]) -> Self;
/// Compute from content
pub fn compute(content: &[u8]) -> Self;
/// Get as hex string
pub fn to_hex(&self) -> String;
/// Parse from hex string
pub fn from_hex(s: &str) -> Result<Self>;
/// Get raw bytes
pub fn as_bytes(&self) -> &[u8; 32];
}
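A short round trip using the methods above:
// Content-addressed ID round trip
let id = UniId::compute(b"some canonical content");
let hex = id.to_hex();
assert_eq!(UniId::from_hex(&hex)?, id);
assert_eq!(id.as_bytes().len(), 32);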
Blocking API (UniSync)¶
For applications that cannot use async/await.
/// Blocking wrapper for Uni
pub struct UniSync {
// Internal state
}
impl UniSync {
/// Open database (blocking)
pub fn open(path: impl AsRef<Path>) -> Result<Self>;
/// Create in-memory database (blocking)
pub fn in_memory() -> Result<Self>;
/// Execute query (blocking)
pub fn query(&self, cypher: &str) -> Result<QueryResult>;
/// Execute mutation (blocking)
pub fn execute(&self, cypher: &str) -> Result<ExecuteResult>;
/// Query with parameters
pub fn query_with(&self, cypher: &str) -> QueryBuilderSync<'_>;
/// Get current schema
pub fn schema_meta(&self) -> Schema;
/// Get schema builder
pub fn schema(&self) -> SchemaBuilderSync<'_>;
/// Begin transaction (blocking)
pub fn begin(&self) -> Result<TransactionSync<'_>>;
}
/// Blocking transaction
pub struct TransactionSync<'a> {
// Internal state
}
impl<'a> TransactionSync<'a> {
pub fn query(&self, cypher: &str) -> Result<QueryResult>;
pub fn execute(&self, cypher: &str) -> Result<ExecuteResult>;
pub fn commit(self) -> Result<()>;
pub fn rollback(self) -> Result<()>;
}
// Example
let db = UniSync::open("./data")?;
let results = db.query("MATCH (p:Paper) RETURN p.title LIMIT 10")?;
for row in &results {
println!("{}", row.get::<String>("p.title")?);
}
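Blocking transactions follow the same pattern as the async API, without .await:
// Blocking transaction
let tx = db.begin()?;
tx.execute("CREATE (p:Paper {title: 'Offline Paper', year: 2024})")?;
tx.execute("CREATE (p:Paper {title: 'Second Offline Paper', year: 2024})")?;
tx.commit()?;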
Configuration¶
/// Database configuration
#[derive(Clone, Debug)]
pub struct UniConfig {
/// Cache size in bytes (default: 1 GB)
pub cache_size: usize,
/// Worker thread count (default: available cores)
pub parallelism: usize,
/// Batch size for vectorized execution (default: 1024)
pub batch_size: usize,
/// Maximum frontier size for traversals (default: 1M)
pub max_frontier_size: usize,
/// Auto-flush after this many mutations (default: 10K)
pub auto_flush_threshold: usize,
/// Enable write-ahead log (default: true)
pub wal_enabled: bool,
}
impl Default for UniConfig {
fn default() -> Self {
Self {
cache_size: 1024 * 1024 * 1024, // 1 GB
parallelism: num_cpus::get(),
batch_size: 1024,
max_frontier_size: 1_000_000,
auto_flush_threshold: 10_000,
wal_enabled: true,
}
}
}
// Example
let config = UniConfig {
cache_size: 4 * 1024 * 1024 * 1024, // 4 GB
parallelism: 16,
..Default::default()
};
let db = Uni::open("./data")
.config(config)
.build()
.await?;
Error Handling¶
/// Main error type
#[derive(Debug, thiserror::Error)]
pub enum UniError {
#[error("Database not found: {path}")]
NotFound { path: PathBuf },
#[error("Schema error: {message}")]
Schema { message: String },
#[error("Parse error: {message}")]
Parse {
message: String,
position: Option<usize>,
line: Option<usize>,
column: Option<usize>,
context: Option<String>,
},
#[error("Query error: {message}")]
Query {
message: String,
query: Option<String>,
},
#[error("Transaction error: {message}")]
Transaction { message: String },
#[error("Transaction conflict: {message}")]
TransactionConflict { message: String },
#[error("Database is locked by another process")]
DatabaseLocked,
#[error("Query timeout after {timeout_ms}ms")]
Timeout { timeout_ms: u64 },
#[error("Type error: expected {expected}, got {actual}")]
Type { expected: String, actual: String },
#[error("Constraint violation: {message}")]
Constraint { message: String },
#[error("Storage error: {message}")]
Storage {
message: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Internal error: {0}")]
Internal(#[from] anyhow::Error),
}
/// Result type alias
pub type Result<T> = std::result::Result<T, UniError>;
// Error handling example
match db.query("INVALID QUERY").await {
Ok(results) => { /* ... */ }
Err(UniError::Parse { message, line, column, .. }) => {
eprintln!("Parse error at {}:{}: {}", line.unwrap_or(0), column.unwrap_or(0), message);
}
Err(UniError::Query { message, .. }) => {
eprintln!("Query failed: {}", message);
}
Err(e) => {
eprintln!("Error: {}", e);
}
}
Prelude¶
Convenient re-exports for common usage.
pub use crate::{
Uni, UniBuilder, UniSync, UniConfig, UniError, Result,
Transaction, QueryBuilder,
};
pub use crate::query::{
QueryResult, Row, Node, Edge, Path, Value, FromValue, ExecuteResult,
};
pub use crate::core::{Vid, Eid, UniId};
pub use crate::schema::{
Schema, DataType, CrdtType, IndexType, VectorIndexCfg,
VectorAlgo, VectorMetric, ScalarType,
};
pub use crate::vector::VectorMatch;
Complete Example¶
use uni::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// Create database with schema
let db = Uni::open("./academic-graph")
.cache_size(2 * 1024 * 1024 * 1024)
.build()
.await?;
// Define schema
db.schema()
.label("Paper")
.property("title", DataType::String)
.property("year", DataType::Int32)
.vector("embedding", 768)
.index("year", IndexType::Scalar(ScalarType::BTree))
.label("Author")
.property("name", DataType::String)
.edge_type("AUTHORED", &["Author"], &["Paper"])
.edge_type("CITES", &["Paper"], &["Paper"])
.apply()
.await?;
// Insert data in a transaction
db.transaction(|tx| async move {
tx.execute("CREATE (a:Author {name: 'Alice'})").await?;
tx.execute("CREATE (p:Paper {title: 'Graph Databases', year: 2024})").await?;
tx.execute(
"MATCH (a:Author {name: 'Alice'}), (p:Paper {title: 'Graph Databases'})
CREATE (a)-[:AUTHORED]->(p)"
).await?;
Ok(())
}).await?;
// Query with parameters
let results = db.query_with(
"MATCH (a:Author)-[:AUTHORED]->(p:Paper)
WHERE p.year >= $min_year
RETURN a.name AS author, p.title AS paper, p.year AS year
ORDER BY p.year DESC"
)
.param("min_year", 2020)
.fetch_all()
.await?;
println!("Found {} results:", results.len());
for row in &results {
println!(
" {} wrote '{}' ({})",
row.get::<String>("author")?,
row.get::<String>("paper")?,
row.get::<i32>("year")?
);
}
// Run PageRank on citation network
let rankings = db.algo()
.pagerank()
.labels(&["Paper"])
.edge_types(&["CITES"])
.run()
.await?;
println!("\nTop 5 papers by PageRank:");
for (vid, score) in rankings.iter().take(5) {
let result = db.query_with("MATCH (p:Paper) WHERE id(p) = $vid RETURN p.title")
.param("vid", vid.as_u64() as i64)
.fetch_all()
.await?;
if let Some(row) = result.rows().first() {
println!(" {:.6}: {}", score, row.get::<String>("p.title")?);
}
}
Ok(())
}
API Gaps & Workarounds¶
The following features exist internally but have limited or no exposure through the public Rust API. This section documents what's available and workarounds where applicable.
Feature Availability Summary¶
| Feature | Rust API | Cypher | Notes |
|---|---|---|---|
| Basic CRUD | ✅ | ✅ | query(), execute() |
| Parameterized queries | ✅ | ✅ | query_with().param() |
| Transactions | ✅ | ✅ | begin(), transaction() |
| Schema definition | ✅ | ✅ | schema() builder |
| Vector search | ✅ | ✅ | vector_search() + db.idx.vector.query() |
| PageRank / WCC | ✅ | ✅ | algo().pagerank() |
| Session variables | ✅ | ✅ | session().set().build() + $session.* |
| Bulk loading | ✅ | — | bulk_writer() with deferred indexing |
| Snapshots | ✅ | ✅ | at_snapshot(), list_snapshots(), db.snapshot.* |
| Temporal queries | — | ✅ | uni.validAt(), VALID_AT macro |
| Schema DDL procedures | — | ✅ | db.createLabel(), db.createIndex() |
| Schema introspection | ✅ | ✅ | db.labels(), db.indexes(), db.constraints() |
| EXPLAIN/PROFILE | ✅ | ✅ | explain(), profile() with index usage |
| Import/Export | ❌ | ✅ | Use COPY (see below) |
| Embedding generation | ❌ | ✅ | Auto on CREATE with schema config |
| Other algorithms | ❌ | ✅ | Use CALL algo.* |
| CRDT operations | ❌ | ✅ | Schema type + Cypher |
| S3/GCS storage | ❌ | — | Filesystem assumptions in metadata ops |
| Compaction control | ❌ | ❌ | Automatic only |
Batch Ingestion¶
There is no direct put_batch() method. For large loads, use bulk_writer() (see Bulk Loading above); for smaller batches, use Cypher UNWIND:
// Batch insert via UNWIND
let papers = vec![
json!({"title": "Paper 1", "year": 2024}),
json!({"title": "Paper 2", "year": 2023}),
json!({"title": "Paper 3", "year": 2024}),
];
db.query_with(
"UNWIND $papers AS p
CREATE (n:Paper {title: p.title, year: p.year})"
)
.param("papers", papers)
.fetch_all()
.await?;
Import/Export¶
No direct import/export methods. Use Cypher COPY:
// Import from CSV
db.execute("COPY Paper FROM 'papers.csv'").await?;
// Export to Parquet
db.execute("COPY Paper TO 'papers.parquet' WITH {format: 'parquet'}").await?;
// Export to CSV
db.execute("COPY Paper TO 'papers.csv' WITH {format: 'csv'}").await?;
Graph Algorithms¶
Only PageRank and WCC have dedicated builder APIs. Other algorithms are accessible via Cypher:
// Louvain community detection
let results = db.query(
"CALL algo.louvain('Paper', 'CITES')
YIELD nodeId, communityId
RETURN communityId, COUNT(*) AS size
ORDER BY size DESC"
).await?;
// Shortest path
let results = db.query_with(
"CALL algo.shortestPath($start, $end, 'CITES')
YIELD path, distance
RETURN path, distance"
)
.param("start", start_vid)
.param("end", end_vid)
.fetch_all()
.await?;
// Label propagation
let results = db.query(
"CALL algo.labelPropagation('Paper', 'CITES')
YIELD nodeId, communityId
RETURN communityId, COUNT(*) AS size"
).await?;
Embedding Generation¶
Embeddings are auto-generated on CREATE when configured in schema. No direct embedding API:
// Schema with embedding config (embeddings auto-generated on insert)
db.schema()
.label("Paper")
.property("title", DataType::String)
.property("abstract", DataType::String)
.vector("embedding", 384) // Dimensions match model
.apply()
.await?;
// Embedding generated automatically from configured source properties
db.execute("CREATE (p:Paper {title: 'ML Paper', abstract: 'Deep learning...'})").await?;
// Query embeddings via vector search
let results = db.query_with(
"CALL db.idx.vector.query('Paper', 'embedding', $query_vec, 10)
YIELD node, distance
RETURN node.title, distance"
)
.param("query_vec", query_embedding)
.fetch_all()
.await?;
CRDT Types¶
CRDTs can be defined in schema but manipulated only via Cypher:
// Define CRDT property
db.schema()
.label("Counter")
.property("value", DataType::Crdt(CrdtType::GCounter))
.apply()
.await?;
// Increment counter via Cypher
db.execute("MATCH (c:Counter {id: 'visits'}) SET c.value = crdt.increment(c.value, 1)").await?;
// Read counter
let results = db.query("MATCH (c:Counter {id: 'visits'}) RETURN crdt.value(c.value) AS count").await?;
Storage Backend¶
Currently only local filesystem paths are supported. While Lance (the underlying storage) supports S3/GCS/Azure natively, Uni's metadata operations (schema, snapshots, WAL, ID allocation) use std::fs directly:
// Local storage (supported)
let db = Uni::open("./my-graph").build().await?;
// S3/GCS/Azure (NOT SUPPORTED)
// Blocked by filesystem assumptions in:
// - SchemaManager (fs::read_to_string, fs::write)
// - SnapshotManager (fs::create_dir_all, fs::write)
// - WriteAheadLog (File::open)
// - IdAllocator (fs::rename for atomic writes)
Supporting object stores would require abstracting these operations over the object_store crate throughout.
Full-Text Search¶
FTS indexes can be created but querying is not yet implemented:
// Create FTS index (works)
db.execute("CREATE FULLTEXT INDEX bio_fts FOR (p:Person) ON EACH [p.bio]").await?;
// FTS query (NOT YET IMPLEMENTED)
// CONTAINS, STARTS WITH, ENDS WITH not supported in parser
// No db.idx.fts.query() procedure exists
What's NOT Accessible¶
These internal features have no public access path:
- Compaction control: No compact() method; compaction runs automatically
- Snapshot creation: No dedicated Rust method; create snapshots via CALL db.snapshot.create(...) (restore and listing are exposed as restore_snapshot() and list_snapshots())
- WAL management: Only the wal_enabled config flag; no rotation/recovery API
- ID allocation: Internal IdAllocator; no public ID control
- Subgraph extraction: Internal load_subgraph_cached(); no public method
- Property lazy-loading: Internal PropertyManager; transparent to users
Next Steps¶
- Configuration Reference — All configuration options
- Cypher Querying — Query language guide
- Vector Search — Embedding search patterns
- Troubleshooting — Common issues and solutions