API Gap Implementation Plan¶
This document outlines the plan to expose internal Uni features that currently have no public access path (neither Rust API nor Cypher).
Gap Summary¶
| Gap | Current State | Priority | Effort |
|---|---|---|---|
| Background Compaction | Manual only, not exposed | P1 | 5-6 days |
| S3/GCS Storage | std::fs in metadata ops | P1 | 7-10 days |
| FTS Queries | Index creation only | P2 | 3-4 days |
| Snapshot Management | Internal only | P3 | 2-3 days |
1. Background Compaction¶
Current State¶
- L0 → L1 Flush: ✅ Automatic (triggers at 10K mutations via check_flush())
- L1 → L2 Compaction: ❌ Manual only, not exposed in public API
- L1 runs accumulate without automatic compaction
- No background thread for compaction
- No write throttling when L1 grows too large
Industry Comparison¶
| Database | Compaction Model |
|---|---|
| RocksDB | Background threads, automatic, write stalling |
| LanceDB Cloud | Automatic background compaction |
| LanceDB OSS | Manual via table.optimize() |
| SQLite | auto_vacuum pragma or manual VACUUM |
| Uni (current) | Manual only, not exposed |
Proposed Solution¶
Phase 1: Background Compaction Thread¶
pub struct CompactionConfig {
/// Enable background compaction (default: true)
pub enabled: bool,
/// Max L1 runs before triggering compaction (default: 4)
pub max_l1_runs: usize,
/// Max L1 size in bytes before compaction (default: 256MB)
pub max_l1_size_bytes: u64,
/// Max age of oldest L1 run before compaction (default: 1 hour)
pub max_l1_age: Duration,
/// Background check interval (default: 30s)
pub check_interval: Duration,
/// Number of compaction worker threads (default: 1)
pub worker_threads: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
enabled: true,
max_l1_runs: 4,
max_l1_size_bytes: 256 * 1024 * 1024,
max_l1_age: Duration::from_secs(3600),
check_interval: Duration::from_secs(30),
worker_threads: 1,
}
}
}
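For scale, here is a usage sketch of how this config might be supplied when opening a database. The UniBuilder::new() constructor and .compaction() setter are assumptions for illustration; this plan only defines the config struct itself.
// Sketch only: `UniBuilder::new()` and `.compaction()` are hypothetical.
let config = CompactionConfig {
    max_l1_runs: 8,                          // tolerate more L1 runs before compacting
    check_interval: Duration::from_secs(10), // check more often than the default 30s
    ..Default::default()
};

let db = UniBuilder::new("./data")
    .compaction(config)
    .build()
    .await?;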
Phase 2: Compaction Scheduler¶
impl Uni {
/// Starts background compaction worker (called internally on build)
fn start_background_compaction(&self) {
if !self.config.compaction.enabled {
return;
}
let uni = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(uni.config.compaction.check_interval);
loop {
interval.tick().await;
if let Some(task) = uni.pick_compaction_task() {
if let Err(e) = uni.execute_compaction(task).await {
log::error!("Compaction failed: {}", e);
}
}
}
});
}
fn pick_compaction_task(&self) -> Option<CompactionTask> {
let status = self.compaction_status();
// Check triggers in priority order
if status.l1_runs >= self.config.compaction.max_l1_runs {
return Some(CompactionTask::ByRunCount);
}
if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
return Some(CompactionTask::BySize);
}
if status.oldest_l1_age >= self.config.compaction.max_l1_age {
return Some(CompactionTask::ByAge);
}
None
}
}
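pick_compaction_task() returns a CompactionTask that is not defined elsewhere in this plan; a minimal sketch of its intended shape, with one variant per trigger:
/// Why a compaction was scheduled (sketch; mirrors the triggers checked above).
#[derive(Debug, Clone, Copy)]
pub enum CompactionTask {
    /// L1 run count reached max_l1_runs
    ByRunCount,
    /// Total L1 size reached max_l1_size_bytes
    BySize,
    /// Oldest L1 run exceeded max_l1_age
    ByAge,
}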
Phase 3: Write Throttling (Backpressure)¶
Like RocksDB, slow down writes when compaction can't keep up:
pub struct WriteThrottleConfig {
/// L1 run count to start throttling (default: 8)
pub soft_limit: usize,
/// L1 run count to stop writes entirely (default: 16)
pub hard_limit: usize,
/// Base delay when throttling (default: 10ms)
pub base_delay: Duration,
}
impl Writer {
async fn check_write_pressure(&self) -> Result<()> {
let l1_runs = self.storage.l1_run_count();
if l1_runs >= self.config.throttle.hard_limit {
// Block until compaction catches up
log::warn!("Write stalled: L1 runs ({}) at hard limit", l1_runs);
self.wait_for_compaction().await?;
} else if l1_runs >= self.config.throttle.soft_limit {
// Progressive delay
let delay = self.calculate_backpressure_delay(l1_runs);
log::debug!("Write throttled: {}ms delay", delay.as_millis());
tokio::time::sleep(delay).await;
}
Ok(())
}
fn calculate_backpressure_delay(&self, l1_runs: usize) -> Duration {
let excess = l1_runs - self.config.throttle.soft_limit;
let multiplier = 2_u32.pow(excess as u32);
self.config.throttle.base_delay * multiplier
}
}
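With the defaults above (soft_limit = 8, hard_limit = 16, base_delay = 10ms), the delay doubles for each L1 run beyond the soft limit:
// delay = base_delay * 2^(l1_runs - soft_limit)
//   8 L1 runs  -> 10ms * 2^0 = 10ms
//   10 L1 runs -> 10ms * 2^2 = 40ms
//   12 L1 runs -> 10ms * 2^4 = 160ms
//   16 L1 runs -> hard limit: writes stall until compaction catches up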
Phase 4: Public API¶
impl Uni {
/// Trigger manual compaction (all labels, all edge types)
pub async fn compact(&self) -> Result<CompactionStats>;
/// Compact specific label
pub async fn compact_label(&self, label: &str) -> Result<CompactionStats>;
/// Compact specific edge type
pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats>;
/// Get compaction status
pub fn compaction_status(&self) -> CompactionStatus;
/// Wait for all pending compactions to complete
pub async fn wait_for_compaction(&self) -> Result<()>;
}
pub struct CompactionStats {
pub files_compacted: usize,
pub bytes_before: u64,
pub bytes_after: u64,
pub duration: Duration,
pub crdt_merges: usize,
}
pub struct CompactionStatus {
pub l1_runs: usize,
pub l1_size_bytes: u64,
pub oldest_l1_age: Duration,
pub compaction_in_progress: bool,
pub compaction_pending: usize,
pub last_compaction: Option<SystemTime>,
pub total_compactions: u64,
pub total_bytes_compacted: u64,
}
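A usage sketch of the manual API above, e.g. from an operational script or maintenance job:
// Force a compaction when L1 has accumulated too many runs (sketch).
let status = db.compaction_status();
if status.l1_runs >= 8 {
    let stats = db.compact().await?;
    println!(
        "compacted {} files: {} -> {} bytes in {:?} ({} CRDT merges)",
        stats.files_compacted,
        stats.bytes_before,
        stats.bytes_after,
        stats.duration,
        stats.crdt_merges,
    );
}
// Or block until background compactions finish, e.g. before taking a backup.
db.wait_for_compaction().await?;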
Cypher Procedures¶
-- Trigger compaction
CALL db.compact()
YIELD filesCompacted, bytesBefore, bytesAfter, durationMs, crdtMerges
-- Check status
CALL db.compactionStatus()
YIELD l1Runs, l1SizeBytes, compactionInProgress, lastCompaction
Tasks¶
| Task | Effort |
|---|---|
| CompactionConfig struct | 0.5 day |
| Background compaction thread | 1 day |
| Compaction task picker/scheduler | 1 day |
| Write throttling/backpressure | 1 day |
| Public API (compact(), compaction_status()) | 0.5 day |
| Cypher procedures | 0.5 day |
| Tests | 1 day |
Total: 5-6 days
2. S3/GCS Storage Backend¶
Current State¶
Lance datasets support S3/GCS/Azure natively, but Uni's metadata operations use std::fs:
| Component | Blocking Code | Challenge |
|---|---|---|
| UniBuilder.build() | std::fs::create_dir_all() | Directory creation |
| SchemaManager | fs::read_to_string(), fs::write() | File I/O |
| SnapshotManager | fs::create_dir_all(), fs::write() | File I/O |
| WriteAheadLog | File::open(), append | Append-only semantics |
| IdAllocator | fs::rename() | Atomic rename |
Object Store Challenges¶
| Challenge | Local FS | S3/GCS |
|---|---|---|
| Latency | <1ms | 50-200ms |
| Atomic rename | ✅ fs::rename() | ❌ Copy + Delete |
| Cost | Free | Per-operation cost |
| Append | ✅ Native | ❌ Rewrite entire object |
| Concurrent writes | File locks | Requires coordination |
Proposed Solution: Phased Approach¶
Phase 1: Local-Only with Background Compaction (5-6 days)¶
Focus on compaction first (see above). This provides immediate value for all deployments.
Phase 2: Hybrid Architecture (5-7 days)¶
Recommended for most cloud deployments.
┌─────────────────────────────────────────────────────────────┐
│ LOCAL (fast, atomic) │
│ ├── WAL (append-only log, needs low latency) │
│ ├── IdAllocator (atomic counter, needs atomicity) │
│ └── L0 Buffer (in-memory, already local) │
├─────────────────────────────────────────────────────────────┤
│ OBJECT STORE (S3/GCS/Azure) │
│ ├── Lance Datasets (vertices, edges, adjacency) │
│ ├── Schema (versioned JSON, infrequent writes) │
│ └── Snapshots (manifest files, infrequent writes) │
└─────────────────────────────────────────────────────────────┘
Implementation:
pub struct HybridStorageConfig {
/// Local path for WAL and ID allocation
pub local_path: PathBuf,
/// Object store URL for data (s3://bucket/prefix)
pub data_url: String,
/// Object store credentials
pub credentials: Option<ObjectStoreCredentials>,
}
impl UniBuilder {
/// Configure hybrid storage (local WAL + S3 data)
pub fn hybrid(
self,
local_path: impl AsRef<Path>,
data_url: &str,
) -> Self;
}
// Usage
let db = Uni::hybrid("./local-wal", "s3://my-bucket/graphs/prod")
.credentials(ObjectStoreCredentials::from_env())
.build()
.await?;
Tasks:
| Task | Effort |
|---|---|
| HybridStorageConfig | 0.5 day |
| Refactor SchemaManager to use object_store | 1 day |
| Refactor SnapshotManager to use object_store | 1 day |
| Keep WAL and IdAllocator local | 0 days (no change) |
| UniBuilder::hybrid() configuration | 0.5 day |
| Lance dataset URL passthrough | 0.5 day |
| Integration tests with LocalStack/MinIO | 1.5 days |
Total Phase 2: 5-7 days
Phase 3: Full Object Store (Optional, 5-7 days)¶
For serverless/ephemeral compute where local storage isn't available.
pub struct FullObjectStoreConfig {
/// Object store URL for everything
pub url: String,
/// Credentials
pub credentials: ObjectStoreCredentials,
/// ID allocation strategy
pub id_strategy: IdAllocationStrategy,
/// WAL strategy
pub wal_strategy: WalStrategy,
}
pub enum IdAllocationStrategy {
/// Use conditional writes with ETag (optimistic locking)
ConditionalWrite { batch_size: u64 },
/// Use external coordination service (DynamoDB, etcd)
External { endpoint: String },
}
pub enum WalStrategy {
/// Buffer locally, flush segments to object store
BufferedSegments { segment_size: usize },
/// Disable WAL (data loss risk on crash)
Disabled,
}
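For illustration, a usage sketch for the full object-store mode. Uni::object_store() and the setter methods mirror the hybrid builder above but are assumptions; only the config types are defined in this plan.
// Sketch only: builder methods are hypothetical.
let db = Uni::object_store("s3://my-bucket/graphs/prod")
    .credentials(ObjectStoreCredentials::from_env())
    .id_strategy(IdAllocationStrategy::ConditionalWrite { batch_size: 1_000 })
    .wal_strategy(WalStrategy::BufferedSegments { segment_size: 4096 })
    .build()
    .await?;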
IdAllocator on S3 (conditional writes):
impl ObjectStoreIdAllocator {
async fn allocate_batch(&self) -> Result<Range<u64>> {
loop {
// Read current counter
let (current, etag) = self.read_counter_with_etag().await?;
let new = current + self.batch_size;
// Conditional write (fails if etag changed)
match self.conditional_write(new, &etag).await {
Ok(_) => return Ok(current..new),
Err(PreconditionFailed) => continue, // Another writer won the race: retry
Err(e) => return Err(e),
}
}
}
}
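The helpers used above are not spelled out. Below is a minimal sketch assuming the object_store crate (0.10+) and its ETag-based conditional put (PutMode::Update); the counter path, encoding, and error mapping are assumptions for illustration.
use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion};

impl ObjectStoreIdAllocator {
    /// Read the counter object and remember its ETag for the conditional write.
    async fn read_counter_with_etag(&self) -> Result<(u64, String)> {
        let get = self.store.get(&Path::from("ids/counter")).await?;
        let etag = get.meta.e_tag.clone().expect("backend must return ETags");
        let bytes = get.bytes().await?;
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&bytes); // counter stored as 8 big-endian bytes (assumed)
        Ok((u64::from_be_bytes(buf), etag))
    }

    /// Write the new counter value only if the object still carries `etag`.
    /// A precondition failure here is what allocate_batch() retries on.
    async fn conditional_write(&self, new: u64, etag: &str) -> Result<()> {
        let opts = PutOptions {
            mode: PutMode::Update(UpdateVersion {
                e_tag: Some(etag.to_string()),
                version: None,
            }),
            ..Default::default()
        };
        self.store
            .put_opts(
                &Path::from("ids/counter"),
                PutPayload::from(new.to_be_bytes().to_vec()),
                opts,
            )
            .await?;
        Ok(())
    }
}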
WAL on S3 (segment files):
impl ObjectStoreWal {
async fn append(&mut self, entry: WalEntry) -> Result<()> {
self.local_buffer.push(entry);
if self.local_buffer.len() >= self.segment_size {
// Flush segment to S3
let segment_name = format!("wal/segment_{:016x}.bin", self.next_segment_id);
self.store.put(&segment_name, self.serialize_buffer()).await?;
self.local_buffer.clear();
self.next_segment_id += 1;
}
Ok(())
}
}
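Recovery of flushed segments (covered in the tasks below) would list and replay them in order on startup. A rough sketch, assuming object_store listing under the wal/ prefix and a deserialize_segment() counterpart to serialize_buffer(); both helpers are hypothetical:
use futures::TryStreamExt;
use object_store::path::Path;

impl ObjectStoreWal {
    /// Replay all flushed WAL segments in order. Entries still sitting in the
    /// local buffer at crash time are lost unless they are also spilled locally.
    async fn recover(&mut self) -> Result<Vec<WalEntry>> {
        // Segment names are zero-padded hex, so lexicographic order == id order.
        let mut segments: Vec<_> = self.store.list(Some(&Path::from("wal"))).try_collect().await?;
        segments.sort_by(|a, b| a.location.cmp(&b.location));

        let mut entries = Vec::new();
        for meta in &segments {
            let bytes = self.store.get(&meta.location).await?.bytes().await?;
            entries.extend(self.deserialize_segment(&bytes)?); // hypothetical helper
        }
        self.next_segment_id = segments.len() as u64;
        Ok(entries)
    }
}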
Tasks:
| Task | Effort |
|---|---|
| ObjectStoreIdAllocator with conditional writes | 1.5 days |
| ObjectStoreWal with segment flushing | 2 days |
| Recovery logic for WAL segments | 1 day |
| UniBuilder::object_store() configuration | 0.5 day |
| Integration tests | 1.5 days |
Total Phase 3: 5-7 days
Recommended Approach¶
| Deployment | Recommended Config | Effort |
|---|---|---|
| Local development | Uni::open("./data") | Already works |
| Cloud (EC2/GKE/AKS) | Uni::hybrid("./wal", "s3://...") | Phase 2 |
| Serverless (Lambda) | Uni::object_store("s3://...") | Phase 3 |
Start with Phase 2 (Hybrid): it covers roughly 90% of cloud use cases with far less complexity than a full object-store backend.
3. Full-Text Search Queries¶
Current State¶
- FTS indexes can be created via DDL: CREATE FULLTEXT INDEX ... FOR (n:Label) ON EACH [n.prop]
- Index stored using Lance's InvertedIndex
- No query support: the parser lacks CONTAINS, STARTS WITH, ENDS WITH
- No procedure: no db.idx.fts.query() exists
Proposed Solution¶
Option A: Cypher String Predicates (Recommended)¶
Add string predicates to the Cypher parser and predicate pushdown:
-- Target syntax
MATCH (p:Person)
WHERE p.bio CONTAINS 'machine learning'
RETURN p
MATCH (p:Person)
WHERE p.name STARTS WITH 'John'
RETURN p
MATCH (p:Person)
WHERE p.email ENDS WITH '@example.com'
RETURN p
Implementation:
- Add Contains / StartsWith / EndsWith to the Operator enum in ast.rs (see the sketch below)
- Add parser support in parser.rs
- Add the new operators to predicate pushdown in pushdown.rs
- Lance will automatically use the inverted index if available
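A rough sketch of the enum additions and the pushdown mapping referenced in the list above. The Operator variants and the filter translation follow this plan's file names, but their exact shapes, and whether Lance's planner actually routes a LIKE-style filter through the inverted index, are assumptions:
// ast.rs (sketch): new string-predicate operators
pub enum Operator {
    // ...existing variants (Eq, Ne, Lt, Gt, ...)
    Contains,
    StartsWith,
    EndsWith,
}

// pushdown.rs (sketch): translate a string predicate into a Lance filter
// expression. Escaping of LIKE wildcards (%, _) is elided here.
fn string_predicate_to_filter(prop: &str, op: &Operator, value: &str) -> Option<String> {
    let v = value.replace('\'', "''");
    match op {
        Operator::Contains => Some(format!("{prop} LIKE '%{v}%'")),
        Operator::StartsWith => Some(format!("{prop} LIKE '{v}%'")),
        Operator::EndsWith => Some(format!("{prop} LIKE '%{v}'")),
        _ => None,
    }
}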
Option B: Explicit FTS Procedure¶
Add a procedure similar to vector search:
CALL db.idx.fts.query('Person', 'bio', 'machine learning', 10)
YIELD node, score
RETURN node.name, score
ORDER BY score DESC
Recommendation¶
Start with Option A (string predicates): it is simpler, more intuitive, and follows the Cypher standard.
Tasks¶
| Task | Effort |
|---|---|
| Add Contains/StartsWith/EndsWith to Operator enum | 0.5 day |
| Parser support for string predicates | 1 day |
| Predicate pushdown to Lance | 1 day |
| Unit tests | 0.5 day |
| Integration tests with FTS index | 1 day |
Total: 3-4 days
4. Snapshot Management¶
Current State¶
- SnapshotManager exists internally
- Creates snapshots automatically on flush
- Snapshots stored as JSON manifests
- No public API to create/restore/list snapshots
- Feature-gated behind snapshot-internals
Proposed Solution¶
Rust API¶
impl Uni {
/// Create a named snapshot
pub async fn create_snapshot(&self, name: &str) -> Result<SnapshotId>;
/// List all snapshots
pub async fn list_snapshots(&self) -> Result<Vec<SnapshotInfo>>;
/// Get snapshot details
pub async fn get_snapshot(&self, id: &SnapshotId) -> Result<SnapshotInfo>;
/// Restore to a snapshot (creates new database state)
pub async fn restore_snapshot(&self, id: &SnapshotId) -> Result<()>;
/// Delete a snapshot
pub async fn delete_snapshot(&self, id: &SnapshotId) -> Result<()>;
/// Open database at specific snapshot (read-only)
pub fn at_snapshot(&self, id: &SnapshotId) -> Result<UniSnapshot>;
}
pub struct SnapshotInfo {
pub id: SnapshotId,
pub name: Option<String>,
pub created_at: SystemTime,
pub vertex_count: u64,
pub edge_count: u64,
pub size_bytes: u64,
}
/// Read-only view at a snapshot
pub struct UniSnapshot {
// ...
}
impl UniSnapshot {
pub async fn query(&self, cypher: &str) -> Result<QueryResult>;
// No mutation methods
}
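For illustration, a backup-before-migration flow against the API above; run_migration() is a hypothetical application function:
// Take a named snapshot before a risky migration and roll back if it fails.
let snap = db.create_snapshot("before-migration").await?;

if let Err(e) = run_migration(&db).await {
    log::error!("migration failed, restoring snapshot: {e}");
    db.restore_snapshot(&snap).await?;
}

// Historical, read-only queries against the snapshot remain possible either way.
let view = db.at_snapshot(&snap)?;
let count = view.query("MATCH (p:Person) RETURN count(p)").await?;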
Cypher Procedures¶
-- Create snapshot
CALL db.snapshot.create('before-migration')
YIELD snapshotId, createdAt
-- List snapshots
CALL db.snapshot.list()
YIELD snapshotId, name, createdAt, vertexCount, edgeCount
-- Restore snapshot
CALL db.snapshot.restore($snapshotId)
Tasks¶
| Task | Effort |
|---|---|
| Expose create_snapshot() | 0.5 day |
| Expose list_snapshots() / get_snapshot() | 0.5 day |
| Implement restore_snapshot() | 0.5 day |
| Implement at_snapshot() read-only view | 0.5 day |
| Cypher procedures | 0.5 day |
| Tests | 0.5 day |
Total: 2-3 days
Implementation Roadmap¶
Phase 1: Foundation (Week 1-2)¶
| Feature | Priority | Effort | Rationale |
|---|---|---|---|
| Background Compaction | P1 | 5-6 days | Required for production use |
| Manual Compact API | P1 | (included above) | Operational control |
Phase 2: Cloud Support (Week 3-4)¶
| Feature | Priority | Effort | Rationale |
|---|---|---|---|
| Hybrid S3/GCS (Phase 2) | P1 | 5-7 days | Enables cloud deployment |
| FTS Queries | P2 | 3-4 days | Completes FTS feature |
Phase 3: Advanced (Week 5+)¶
| Feature | Priority | Effort | Rationale |
|---|---|---|---|
| Full S3 Support (Phase 3) | P3 | 5-7 days | Serverless only |
| Snapshot Management | P3 | 2-3 days | Backup/restore |
Total Effort¶
| Phase | Features | Effort |
|---|---|---|
| Phase 1 | Background Compaction | 5-6 days |
| Phase 2 | Hybrid S3 + FTS | 8-11 days |
| Phase 3 | Full S3 + Snapshots | 7-10 days |
| Total | All features | 20-27 days |
Architecture Diagram¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ APPLICATION │
│ │
│ let db = Uni::hybrid("./wal", "s3://bucket/data").build().await?; │
│ │
└──────────────────────────────────┬───────────────────────────────────────────┘
│
┌──────────────────────────────────┼───────────────────────────────────────────┐
│ UNI │
│ │ │
│ ┌───────────────────────────────┴────────────────────────────────────────┐ │
│ │ RUNTIME LAYER │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ L0 Buffer │ │ Writer │ │ Compaction │ │ Adjacency │ │ │
│ │ │ (in-memory) │ │ │ │ Scheduler │ │ Cache │ │ │
│ │ └─────────────┘ └──────┬──────┘ └──────┬──────┘ └──────────────┘ │ │
│ │ │ │ │ │
│ │ │ ┌───────────┴───────────┐ │ │
│ │ │ │ Background Thread │ │ │
│ │ │ │ - Check every 30s │ │ │
│ │ │ │ - Pick compaction │ │ │
│ │ │ │ - Write throttling │ │ │
│ │ │ └───────────────────────┘ │ │
│ └──────────────────────────┼──────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┴──────────────────────────────────────────────┐ │
│ │ STORAGE LAYER │ │
│ │ │ │
│ │ ┌─────────────────────────────┐ ┌─────────────────────────────────┐ │ │
│ │ │ LOCAL (./wal) │ │ OBJECT STORE (s3://...) │ │ │
│ │ │ │ │ │ │ │
│ │ │ ├── WAL (append-only) │ │ ├── vertices_*/ (Lance) │ │ │
│ │ │ ├── IdAllocator (atomic) │ │ ├── edges_*/ (Lance) │ │ │
│ │ │ └── L0 state (optional) │ │ ├── adjacency_*/ (Lance) │ │ │
│ │ │ │ │ ├── schema.json │ │ │
│ │ │ │ │ └── snapshots/ │ │ │
│ │ └─────────────────────────────┘ └─────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Success Criteria¶
- ✅ Background compaction runs automatically based on configurable policies
- ✅ Write throttling prevents L1 from growing unbounded
- ✅ db.compact() triggers manual compaction
- ✅ db.compaction_status() returns current state
- ✅ Uni::hybrid("./wal", "s3://...") works for cloud deployments
- ✅ WHERE p.bio CONTAINS 'term' uses the FTS index automatically
- ✅ db.create_snapshot() / db.restore_snapshot() work
- ✅ All new APIs have tests and documentation