
API Gap Implementation Plan

This document outlines the plan to expose internal Uni features that currently have no public access path (neither Rust API nor Cypher).

Gap Summary

Gap                    Current State             Priority  Effort
Background Compaction  Manual only, not exposed  P1        5-6 days
S3/GCS Storage         std::fs in metadata ops   P1        7-10 days
FTS Queries            Index creation only       P2        3-4 days
Snapshot Management    Internal only             P3        2-3 days

1. Background Compaction

Current State

  • L0 → L1 Flush: ✅ Automatic (triggers at 10K mutations via check_flush(); see the sketch after this list)
  • L1 → L2 Compaction: ❌ Manual only, not exposed in public API
  • L1 runs accumulate without automatic compaction
  • No background thread for compaction
  • No write throttling when L1 grows too large
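
For context, the existing L0 → L1 trigger described above presumably has roughly this shape (a hypothetical sketch; the real check_flush() lives in the internal writer):

fn check_flush(&mut self) -> bool {
    // Hypothetical: flush L0 once mutations since the last flush
    // reach the threshold (10K by default, per the bullet above)
    self.mutations_since_flush >= self.flush_threshold
}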

Industry Comparison

Database       Compaction Model
RocksDB        Background threads, automatic, write stalling
LanceDB Cloud  Automatic background compaction
LanceDB OSS    Manual via table.optimize()
SQLite         auto_vacuum pragma or manual VACUUM
Uni (current)  Manual only, not exposed

Proposed Solution

Phase 1: Background Compaction Thread

pub struct CompactionConfig {
    /// Enable background compaction (default: true)
    pub enabled: bool,

    /// Max L1 runs before triggering compaction (default: 4)
    pub max_l1_runs: usize,

    /// Max L1 size in bytes before compaction (default: 256MB)
    pub max_l1_size_bytes: u64,

    /// Max age of oldest L1 run before compaction (default: 1 hour)
    pub max_l1_age: Duration,

    /// Background check interval (default: 30s)
    pub check_interval: Duration,

    /// Number of compaction worker threads (default: 1)
    pub worker_threads: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            max_l1_runs: 4,
            max_l1_size_bytes: 256 * 1024 * 1024,
            max_l1_age: Duration::from_secs(3600),
            check_interval: Duration::from_secs(30),
            worker_threads: 1,
        }
    }
}
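
A builder hook for wiring this config in might look like the following (a sketch; the compaction() builder method and config field are assumptions, not current API):

impl UniBuilder {
    /// Override the default compaction policy (hypothetical builder method)
    pub fn compaction(mut self, config: CompactionConfig) -> Self {
        self.config.compaction = config;
        self
    }
}

// Usage: compact less eagerly than the defaults
let db = Uni::open("./data")
    .compaction(CompactionConfig {
        max_l1_runs: 8,
        ..Default::default()
    })
    .build()
    .await?;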

Phase 2: Compaction Scheduler

impl Uni {
    /// Starts background compaction worker (called internally on build)
    fn start_background_compaction(&self) {
        if !self.config.compaction.enabled {
            return;
        }

        let uni = self.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(uni.config.compaction.check_interval);
            loop {
                interval.tick().await;

                if let Some(task) = uni.pick_compaction_task() {
                    if let Err(e) = uni.execute_compaction(task).await {
                        log::error!("Compaction failed: {}", e);
                    }
                }
            }
        });
    }

    fn pick_compaction_task(&self) -> Option<CompactionTask> {
        let status = self.compaction_status();

        // Check triggers in priority order
        if status.l1_runs >= self.config.compaction.max_l1_runs {
            return Some(CompactionTask::ByRunCount);
        }
        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
            return Some(CompactionTask::BySize);
        }
        if status.oldest_l1_age >= self.config.compaction.max_l1_age {
            return Some(CompactionTask::ByAge);
        }
        None
    }
}
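
CompactionTask and execute_compaction() are referenced above but not yet specified; a minimal sketch of the task type, mirroring the three triggers:

/// Why a compaction was scheduled (sketch)
pub enum CompactionTask {
    /// L1 run count reached max_l1_runs
    ByRunCount,
    /// Total L1 size reached max_l1_size_bytes
    BySize,
    /// Oldest L1 run exceeded max_l1_age
    ByAge,
}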

Phase 3: Write Throttling (Backpressure)

As in RocksDB, slow down writes when compaction can't keep up:

pub struct WriteThrottleConfig {
    /// L1 run count to start throttling (default: 8)
    pub soft_limit: usize,

    /// L1 run count to stop writes entirely (default: 16)
    pub hard_limit: usize,

    /// Base delay when throttling (default: 10ms)
    pub base_delay: Duration,
}

impl Writer {
    async fn check_write_pressure(&self) -> Result<()> {
        let l1_runs = self.storage.l1_run_count();

        if l1_runs >= self.config.throttle.hard_limit {
            // Block until compaction catches up
            log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
            self.wait_for_compaction().await?;
        } else if l1_runs >= self.config.throttle.soft_limit {
            // Progressive delay
            let delay = self.calculate_backpressure_delay(l1_runs);
            log::debug!("Write throttled: {}ms delay", delay.as_millis());
            tokio::time::sleep(delay).await;
        }
        Ok(())
    }

    fn calculate_backpressure_delay(&self, l1_runs: usize) -> Duration {
        let excess = l1_runs - self.config.throttle.soft_limit;
        let multiplier = 2_u32.pow(excess as u32);
        self.config.throttle.base_delay * multiplier
    }
}
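
With the defaults above (soft_limit = 8, base_delay = 10ms), the delay doubles for every run past the soft limit: 8 runs → 10ms, 9 → 20ms, 12 → 160ms, 15 → 1.28s; at 16 runs the hard limit stalls writes entirely until compaction catches up.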

Phase 4: Public API

impl Uni {
    /// Trigger manual compaction (all labels, all edge types)
    pub async fn compact(&self) -> Result<CompactionStats>;

    /// Compact specific label
    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats>;

    /// Compact specific edge type
    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats>;

    /// Get compaction status
    pub fn compaction_status(&self) -> CompactionStatus;

    /// Wait for all pending compactions to complete
    pub async fn wait_for_compaction(&self) -> Result<()>;
}

pub struct CompactionStats {
    pub files_compacted: usize,
    pub bytes_before: u64,
    pub bytes_after: u64,
    pub duration: Duration,
    pub crdt_merges: usize,
}

pub struct CompactionStatus {
    pub l1_runs: usize,
    pub l1_size_bytes: u64,
    pub oldest_l1_age: Duration,
    pub compaction_in_progress: bool,
    pub compaction_pending: usize,
    pub last_compaction: Option<SystemTime>,
    pub total_compactions: u64,
    pub total_bytes_compacted: u64,
}
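
Example usage of the proposed API (a sketch against the signatures above):

// Manually compact and report what happened
let stats = db.compact().await?;
println!(
    "compacted {} files: {} -> {} bytes in {:?} ({} CRDT merges)",
    stats.files_compacted, stats.bytes_before,
    stats.bytes_after, stats.duration, stats.crdt_merges,
);

// Check pressure before a bulk load and drain if needed
let status = db.compaction_status();
if status.l1_runs >= 4 {
    db.compact().await?;
    db.wait_for_compaction().await?;
}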

Cypher Procedures

// Trigger compaction
CALL db.compact()
YIELD filesCompacted, bytesBefore, bytesAfter, durationMs, crdtMerges

// Check status
CALL db.compactionStatus()
YIELD l1Runs, l1SizeBytes, compactionInProgress, lastCompaction

Tasks

Task                                         Effort
CompactionConfig struct                      0.5 day
Background compaction thread                 1 day
Compaction task picker/scheduler             1 day
Write throttling/backpressure                1 day
Public API (compact(), compaction_status())  0.5 day
Cypher procedures                            0.5 day
Tests                                        1 day

Total: 5-6 days


2. S3/GCS Storage Backend

Current State

Lance datasets support S3/GCS/Azure natively, but Uni's metadata operations use std::fs:

Component           Blocking Code                      Challenge
UniBuilder.build()  std::fs::create_dir_all()          Directory creation
SchemaManager       fs::read_to_string(), fs::write()  File I/O
SnapshotManager     fs::create_dir_all(), fs::write()  File I/O
WriteAheadLog       File::open(), append               Append-only semantics
IdAllocator         fs::rename()                       Atomic rename

Object Store Challenges

Challenge          Local FS      S3/GCS
Latency            <1ms          50-200ms
Atomic rename      fs::rename()  ❌ Copy + Delete
Cost               Free          Per-operation cost
Append             ✅ Native     ❌ Rewrite entire object
Concurrent writes  File locks    Requires coordination

Proposed Solution: Phased Approach

Phase 1: Local-Only with Background Compaction (5-6 days)

Focus on compaction first (see above). This provides immediate value for all deployments.

Phase 2: Hybrid Architecture (5-7 days)

Recommended for most cloud deployments.

┌─────────────────────────────────────────────────────────────┐
│  LOCAL (fast, atomic)                                        │
│  ├── WAL (append-only log, needs low latency)               │
│  ├── IdAllocator (atomic counter, needs atomicity)          │
│  └── L0 Buffer (in-memory, already local)                   │
├─────────────────────────────────────────────────────────────┤
│  OBJECT STORE (S3/GCS/Azure)                                │
│  ├── Lance Datasets (vertices, edges, adjacency)            │
│  ├── Schema (versioned JSON, infrequent writes)             │
│  └── Snapshots (manifest files, infrequent writes)          │
└─────────────────────────────────────────────────────────────┘

Implementation:

pub struct HybridStorageConfig {
    /// Local path for WAL and ID allocation
    pub local_path: PathBuf,

    /// Object store URL for data (s3://bucket/prefix)
    pub data_url: String,

    /// Object store credentials
    pub credentials: Option<ObjectStoreCredentials>,
}

impl Uni {
    /// Configure hybrid storage (local WAL + S3 data); returns a builder
    pub fn hybrid(
        local_path: impl AsRef<Path>,
        data_url: &str,
    ) -> UniBuilder;
}

// Usage
let db = Uni::hybrid("./local-wal", "s3://my-bucket/graphs/prod")
    .credentials(ObjectStoreCredentials::from_env())
    .build()
    .await?;

Tasks:

Task                                          Effort
HybridStorageConfig                           0.5 day
Refactor SchemaManager to use object_store    1 day
Refactor SnapshotManager to use object_store  1 day
Keep WAL and IdAllocator local                0 days (no change)
Uni::hybrid() configuration                   0.5 day
Lance dataset URL passthrough                 0.5 day
Integration tests with LocalStack/MinIO       1.5 days

Total Phase 2: 5-7 days

Phase 3: Full Object Store (Optional, 5-7 days)

For serverless/ephemeral compute where local storage isn't available.

pub struct FullObjectStoreConfig {
    /// Object store URL for everything
    pub url: String,

    /// Credentials
    pub credentials: ObjectStoreCredentials,

    /// ID allocation strategy
    pub id_strategy: IdAllocationStrategy,

    /// WAL strategy
    pub wal_strategy: WalStrategy,
}

pub enum IdAllocationStrategy {
    /// Use conditional writes with ETag (optimistic locking)
    ConditionalWrite { batch_size: u64 },

    /// Use external coordination service (DynamoDB, etcd)
    External { endpoint: String },
}

pub enum WalStrategy {
    /// Buffer locally, flush segments to object store
    BufferedSegments { segment_size: usize },

    /// Disable WAL (data loss risk on crash)
    Disabled,
}
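
Putting these together, a serverless configuration might look like this (a sketch; the field values are illustrative):

let config = FullObjectStoreConfig {
    url: "s3://my-bucket/graphs/prod".into(),
    credentials: ObjectStoreCredentials::from_env(),
    // Reserve IDs in batches to amortize the conditional-write round trip
    id_strategy: IdAllocationStrategy::ConditionalWrite { batch_size: 1000 },
    // Buffer WAL entries locally, flushing a segment every 4096 entries
    wal_strategy: WalStrategy::BufferedSegments { segment_size: 4096 },
};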

IdAllocator on S3 (conditional writes):

impl ObjectStoreIdAllocator {
    async fn allocate_batch(&self) -> Result<Range<u64>> {
        loop {
            // Read current counter
            let (current, etag) = self.read_counter_with_etag().await?;
            let new = current + self.batch_size;

            // Conditional write: fails with PreconditionFailed if the ETag changed
            match self.conditional_write(new, &etag).await {
                Ok(_) => return Ok(current..new),
                Err(Error::PreconditionFailed) => continue, // Lost the race; retry
                Err(e) => return Err(e),
            }
        }
    }
}

WAL on S3 (segment files):

impl ObjectStoreWal {
    async fn append(&mut self, entry: WalEntry) -> Result<()> {
        self.local_buffer.push(entry);

        if self.local_buffer.len() >= self.segment_size {
            // Flush segment to S3
            let segment_name = format!("wal/segment_{:016x}.bin", self.next_segment_id);
            self.store.put(&segment_name, self.serialize_buffer()).await?;
            self.local_buffer.clear();
            self.next_segment_id += 1;
        }
        Ok(())
    }
}
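
Recovery of WAL segments (a task below) would replay them in key order; a minimal sketch, assuming a store.list() that yields keys in lexicographic order and a deserialize_segment() helper:

impl ObjectStoreWal {
    async fn recover(&self) -> Result<Vec<WalEntry>> {
        let mut entries = Vec::new();
        // Zero-padded hex segment names make lexicographic order == append order
        for key in self.store.list("wal/").await? {
            let bytes = self.store.get(&key).await?;
            entries.extend(self.deserialize_segment(&bytes)?);
        }
        Ok(entries)
    }
}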

Tasks:

Task                                            Effort
ObjectStoreIdAllocator with conditional writes  1.5 days
ObjectStoreWal with segment flushing            2 days
Recovery logic for WAL segments                 1 day
Uni::object_store() configuration               0.5 day
Integration tests                               1.5 days

Total Phase 3: 5-7 days

Deployment           Recommended Config                Effort
Local development    Uni::open("./data")               Already works
Cloud (EC2/GKE/AKS)  Uni::hybrid("./wal", "s3://...")  Phase 2
Serverless (Lambda)  Uni::object_store("s3://...")     Phase 3

Start with Phase 2 (Hybrid): it covers 90% of cloud use cases with less complexity.


3. Full-Text Search Queries

Current State

  • FTS indexes can be created via DDL: CREATE FULLTEXT INDEX ... FOR (n:Label) ON EACH [n.prop]
  • Index stored using Lance's InvertedIndex
  • No query support: Parser lacks CONTAINS, STARTS WITH, ENDS WITH
  • No procedure: db.idx.fts.query() does not exist

Proposed Solution

Option A: String Predicates

Add CONTAINS, STARTS WITH, and ENDS WITH to the Cypher parser, with predicate pushdown to Lance:

// Target syntax
MATCH (p:Person)
WHERE p.bio CONTAINS 'machine learning'
RETURN p

MATCH (p:Person)
WHERE p.name STARTS WITH 'John'
RETURN p

MATCH (p:Person)
WHERE p.email ENDS WITH '@example.com'
RETURN p

Implementation:

  1. Add to Operator enum in ast.rs:

    pub enum Operator {
        // ... existing
        Contains,
        StartsWith,
        EndsWith,
    }
    

  2. Add parser support in parser.rs

  3. Add to predicate pushdown in pushdown.rs (escape_like is sketched after this list):

    // Convert to Lance SQL
    Operator::Contains => format!("{} LIKE '%{}%'", col, escape_like(value)),
    Operator::StartsWith => format!("{} LIKE '{}%'", col, escape_like(value)),
    Operator::EndsWith => format!("{} LIKE '%{}'", col, escape_like(value)),
    

  4. Lance will automatically use the inverted index if one is available
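
The escape_like() helper used in step 3 is not defined in this plan; a minimal sketch (assuming the engine treats backslash as the LIKE escape character):

fn escape_like(value: &str) -> String {
    // Escape LIKE metacharacters so user input matches literally
    value
        .replace('\\', "\\\\")
        .replace('%', "\\%")
        .replace('_', "\\_")
}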

Option B: Explicit FTS Procedure

Add a procedure similar to vector search:

CALL db.idx.fts.query('Person', 'bio', 'machine learning', 10)
YIELD node, score
RETURN node.name, score
ORDER BY score DESC

Recommendation

Start with Option A (string predicates): it is simpler, more intuitive, and follows standard Cypher.

Tasks

Task                                               Effort
Add Contains/StartsWith/EndsWith to Operator enum  0.5 day
Parser support for string predicates               1 day
Predicate pushdown to Lance                        1 day
Unit tests                                         0.5 day
Integration tests with FTS index                   1 day

Total: 3-4 days


4. Snapshot Management

Current State

  • SnapshotManager exists internally
  • Creates snapshots automatically on flush
  • Snapshots stored as JSON manifests
  • No public API to create/restore/list snapshots
  • Feature-gated behind snapshot-internals

Proposed Solution

Rust API

impl Uni {
    /// Create a named snapshot
    pub async fn create_snapshot(&self, name: &str) -> Result<SnapshotId>;

    /// List all snapshots
    pub async fn list_snapshots(&self) -> Result<Vec<SnapshotInfo>>;

    /// Get snapshot details
    pub async fn get_snapshot(&self, id: &SnapshotId) -> Result<SnapshotInfo>;

    /// Restore to a snapshot (creates new database state)
    pub async fn restore_snapshot(&self, id: &SnapshotId) -> Result<()>;

    /// Delete a snapshot
    pub async fn delete_snapshot(&self, id: &SnapshotId) -> Result<()>;

    /// Open database at specific snapshot (read-only)
    pub fn at_snapshot(&self, id: &SnapshotId) -> Result<UniSnapshot>;
}

pub struct SnapshotInfo {
    pub id: SnapshotId,
    pub name: Option<String>,
    pub created_at: SystemTime,
    pub vertex_count: u64,
    pub edge_count: u64,
    pub size_bytes: u64,
}

/// Read-only view at a snapshot
pub struct UniSnapshot {
    // ...
}

impl UniSnapshot {
    pub async fn query(&self, cypher: &str) -> Result<QueryResult>;
    // No mutation methods
}
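
A typical flow with this API (a sketch; migration_failed stands in for application logic):

// Take a named snapshot before a risky migration
let snap = db.create_snapshot("before-migration").await?;

// ... run the migration ...

if migration_failed {
    // Roll back to the pre-migration state
    db.restore_snapshot(&snap).await?;
} else {
    // Or inspect the old state without restoring
    let view = db.at_snapshot(&snap)?;
    let before = view.query("MATCH (p:Person) RETURN count(p)").await?;
}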

Cypher Procedures

// Create snapshot
CALL db.snapshot.create('before-migration')
YIELD snapshotId, createdAt

// List snapshots
CALL db.snapshot.list()
YIELD snapshotId, name, createdAt, vertexCount, edgeCount

// Restore snapshot
CALL db.snapshot.restore($snapshotId)

Tasks

Task                                      Effort
Expose create_snapshot()                  0.5 day
Expose list_snapshots() / get_snapshot()  0.5 day
Implement restore_snapshot()              0.5 day
Implement at_snapshot() read-only view    0.5 day
Cypher procedures                         0.5 day
Tests                                     0.5 day

Total: 2-3 days


Implementation Roadmap

Phase 1: Foundation (Week 1-2)

Feature                Priority  Effort            Rationale
Background Compaction  P1        5-6 days          Required for production use
Manual Compact API     P1        (included above)  Operational control

Phase 2: Cloud Support (Week 3-4)

Feature                  Priority  Effort    Rationale
Hybrid S3/GCS (Phase 2)  P1        5-7 days  Enables cloud deployment
FTS Queries              P2        3-4 days  Completes FTS feature

Phase 3: Advanced (Week 5+)

Feature                    Priority  Effort    Rationale
Full S3 Support (Phase 3)  P3        5-7 days  Serverless only
Snapshot Management        P3        2-3 days  Backup/restore

Total Effort

Phase    Features               Effort
Phase 1  Background Compaction  5-6 days
Phase 2  Hybrid S3 + FTS        8-11 days
Phase 3  Full S3 + Snapshots    7-10 days
Total    All features           20-27 days

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────────┐
│                                APPLICATION                                    │
│                                                                              │
│   let db = Uni::hybrid("./wal", "s3://bucket/data").build().await?;         │
│                                                                              │
└──────────────────────────────────┬───────────────────────────────────────────┘
┌──────────────────────────────────┼───────────────────────────────────────────┐
│                               UNI                                             │
│                                  │                                            │
│  ┌───────────────────────────────┴────────────────────────────────────────┐  │
│  │                        RUNTIME LAYER                                    │  │
│  │                                                                         │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌──────────────┐   │  │
│  │  │ L0 Buffer   │  │   Writer    │  │ Compaction  │  │  Adjacency   │   │  │
│  │  │ (in-memory) │  │             │  │  Scheduler  │  │    Cache     │   │  │
│  │  └─────────────┘  └──────┬──────┘  └──────┬──────┘  └──────────────┘   │  │
│  │                          │                │                             │  │
│  │                          │    ┌───────────┴───────────┐                 │  │
│  │                          │    │  Background Thread    │                 │  │
│  │                          │    │  - Check every 30s    │                 │  │
│  │                          │    │  - Pick compaction    │                 │  │
│  │                          │    │  - Write throttling   │                 │  │
│  │                          │    └───────────────────────┘                 │  │
│  └──────────────────────────┼──────────────────────────────────────────────┘  │
│                             │                                                  │
│  ┌──────────────────────────┴──────────────────────────────────────────────┐  │
│  │                        STORAGE LAYER                                     │  │
│  │                                                                          │  │
│  │  ┌─────────────────────────────┐  ┌─────────────────────────────────┐   │  │
│  │  │     LOCAL (./wal)           │  │     OBJECT STORE (s3://...)     │   │  │
│  │  │                             │  │                                  │   │  │
│  │  │  ├── WAL (append-only)      │  │  ├── vertices_*/  (Lance)       │   │  │
│  │  │  ├── IdAllocator (atomic)   │  │  ├── edges_*/     (Lance)       │   │  │
│  │  │  └── L0 state (optional)    │  │  ├── adjacency_*/ (Lance)       │   │  │
│  │  │                             │  │  ├── schema.json                │   │  │
│  │  │                             │  │  └── snapshots/                 │   │  │
│  │  └─────────────────────────────┘  └─────────────────────────────────┘   │  │
│  │                                                                          │  │
│  └──────────────────────────────────────────────────────────────────────────┘  │
│                                                                                │
└────────────────────────────────────────────────────────────────────────────────┘

Success Criteria

  1. Background compaction runs automatically based on configurable policies
  2. Write throttling prevents L1 from growing unbounded
  3. db.compact() triggers manual compaction
  4. db.compaction_status() returns current state
  5. Uni::hybrid("./wal", "s3://...") works for cloud deployments
  6. WHERE p.bio CONTAINS 'term' uses the FTS index automatically
  7. db.create_snapshot() / db.restore_snapshot() work
  8. All new APIs have tests and documentation
