Fraud Detection with Uni (Rust)
Detecting money laundering rings (cycles) and shared device anomalies using Uni's native Rust API.
In [ ]:
Copied!
:dep uni-db = { path = "../../../crates/uni" }
:dep tokio = { version = "1", features = ["full"] }
:dep serde_json = "1"
:dep uni-db = { path = "../../../crates/uni" }
:dep tokio = { version = "1", features = ["full"] }
:dep serde_json = "1"
In [ ]:
Copied!
use uni::{Uni, DataType, IndexType, ScalarType, VectorMetric, VectorAlgo, VectorIndexCfg};
use std::collections::HashMap;
use serde_json::json;
// Helper macro to run async code in evcxr
macro_rules! run {
($e:expr) => {
tokio::runtime::Runtime::new().unwrap().block_on($e)
};
}
use uni::{Uni, DataType, IndexType, ScalarType, VectorMetric, VectorAlgo, VectorIndexCfg};
use std::collections::HashMap;
use serde_json::json;
// Helper macro to run async code in evcxr
macro_rules! run {
($e:expr) => {
tokio::runtime::Runtime::new().unwrap().block_on($e)
};
}
In [ ]:
Copied!
let db_path = "./fraud_db";

// Clean up any existing database so the notebook always starts from an empty
// store and the query results below are reproducible across runs.
if std::path::Path::new(db_path).exists() {
    std::fs::remove_dir_all(db_path).unwrap();
}

// Open the database exactly once. The cleanup above must never run again
// while `db` is alive: it would remove the directory backing the open handle.
let db = run!(Uni::open(db_path).build()).unwrap();
println!("Opened database at {}", db_path);
1. Schema
In [ ]:
Copied!
// Declare the whole graph schema in one builder chain and apply it once:
//   - label `User` with a nullable Float32 `risk_score`
//   - label `Device` (no properties declared)
//   - edge type `SENT_MONEY` between `User` and `User`, with a Float64 `amount`
//   - edge type `USED_DEVICE` between `User` and `Device`
// NOTE(review): each `property*` call presumably attaches to the label or edge
// type declared immediately before it — confirm against the schema builder API.
run!(async {
    db.schema()
        .label("User")
        .property_nullable("risk_score", DataType::Float32)
        .label("Device")
        .edge_type("SENT_MONEY", &["User"], &["User"])
        .property("amount", DataType::Float64)
        .edge_type("USED_DEVICE", &["User"], &["Device"])
        .apply()
        .await
}).unwrap();
println!("Fraud detection schema created");
2. Ingestion
Creating a cycle A->B->C->A and a shared device scenario.
In [ ]:
Copied!
// Ingest the demo graph exactly once. Inserting it a second time would create
// a second ring and a second shared device, skewing the counts reported by
// the queries that follow.

// Users with risk scores
let users = vec![
    HashMap::from([("risk_score".to_string(), json!(0.1))]), // A
    HashMap::from([("risk_score".to_string(), json!(0.2))]), // B
    HashMap::from([("risk_score".to_string(), json!(0.3))]), // C
    HashMap::from([("risk_score".to_string(), json!(0.9))]), // D (Fraudster)
];
let user_vids = run!(db.bulk_insert_vertices("User", users)).unwrap();
// The returned ids are read positionally, i.e. assumed to follow input order.
let (ua, ub, uc, ud) = (user_vids[0], user_vids[1], user_vids[2], user_vids[3]);

// Device: a single vertex with no properties
let devices = vec![HashMap::new()];
let device_vids = run!(db.bulk_insert_vertices("Device", devices)).unwrap();
let d1 = device_vids[0];

// Money transfer cycle: A -> B -> C -> A
run!(db.bulk_insert_edges("SENT_MONEY", vec![
    (ua, ub, HashMap::from([("amount".to_string(), json!(5000.0))])),
    (ub, uc, HashMap::from([("amount".to_string(), json!(5000.0))])),
    (uc, ua, HashMap::from([("amount".to_string(), json!(5000.0))])),
])).unwrap();

// Shared device: User A and Fraudster D share device
run!(db.bulk_insert_edges("USED_DEVICE", vec![
    (ua, d1, HashMap::new()),
    (ud, d1, HashMap::new()),
])).unwrap();

// Flush before querying.
// NOTE(review): presumably this persists buffered writes so the queries below
// see them — confirm against the Uni API.
run!(db.flush()).unwrap();
println!("Fraud data ingested");
3. Cycle Detection
Identifying circular money flow.
In [ ]:
Copied!
// Look for 3-hop money loops: a user whose transfer comes back to them after
// passing through two other users.
// NOTE(review): the pattern binds `a` to every vertex on a ring, so under
// standard Cypher matching a single A->B->C->A ring is presumably reported
// once per starting vertex (count 3, not 1) — confirm against the engine.
let query_cycle = r#"
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
RETURN count(*) as count
"#;
let results = run!(db.query(query_cycle)).unwrap();
// The query returns a single aggregate, so only the first row is printed.
println!("Cycles detected: {:?}", results.rows[0]);
4. Shared Device Analysis
Identifying users who share devices with high-risk users.
In [ ]:
Copied!
// Find users linked by `USED_DEVICE` to the same device as a high-risk user
// (risk_score > 0.8). The `_vid` inequality stops a high-risk user from
// matching themselves through their own device.
let query_shared = r#"
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u._vid as uid
"#;
let results = run!(db.query(query_shared)).unwrap();

// Report every match rather than indexing `rows[0]`: the query may return no
// rows (indexing would panic) or several rows (the rest would be hidden).
if results.rows.is_empty() {
    println!("No users share a device with a high-risk user");
}
for row in &results.rows {
    println!("User sharing device with fraudster: {:?}", row);
}