Fraud Detection¶
Detecting money laundering rings (3-cycles) and shared device anomalies using graph pattern matching.
# Keep the demo database in the system temp directory so that every run
# starts from an empty store.
tmp_root = tempfile.gettempdir()
db_path = os.path.join(tmp_root, "fraud_db")

# Remove whatever an earlier run left behind before opening the database.
if os.path.exists(db_path):
    shutil.rmtree(db_path)

# Open the database, then obtain the session used by all later cells.
db = uni_db.Uni.open(db_path)
session = db.session()

print(f"Opened database at {db_path}")
Opened database at /tmp/fraud_db
1. Schema¶
# Declare the graph schema one label / edge type at a time. Each step
# continues from the builder returned by the previous .done() call, so the
# sequence of calls is the same as a single fluent chain.
schema_builder = db.schema()

# User vertices: name, email, and a risk_score that is declared nullable.
schema_builder = (
    schema_builder.label("User")
    .property("name", "string")
    .property("email", "string")
    .property_nullable("risk_score", "float32")
    .done()
)

# Device vertices carry only a device_id.
schema_builder = schema_builder.label("Device").property("device_id", "string").done()

# SENT_MONEY edges are declared with ["User"] for both label lists and carry
# the transferred amount.
schema_builder = (
    schema_builder.edge_type("SENT_MONEY", ["User"], ["User"])
    .property("amount", "float64")
    .done()
)

# USED_DEVICE edges are declared with ["User"] and ["Device"]; no properties.
schema_builder = schema_builder.edge_type("USED_DEVICE", ["User"], ["Device"]).done()

schema_builder.apply()
print("Schema created")
Schema created
2. Ingestion¶
Load 5 named users, 3 devices, a three-user money ring plus one extra large transfer, and device links that put low-risk users on the same device as high-risk accounts.
# Load the demo graph through a bulk writer obtained from a transaction:
# five User vertices, three Device vertices, four SENT_MONEY edges and
# five USED_DEVICE edges.
tx = session.tx()
with tx.bulk_writer().build() as bw:
# 5 users: Alice, Bob and Carlos form the ring (risk scores 0.10-0.20);
# Dana and Eve carry the high risk scores (0.92 and 0.88).
u_vids = bw.insert_vertices(
"User",
[
{"name": "Alice", "email": "alice@example.com", "risk_score": 0.10},
{"name": "Bob", "email": "bob@example.com", "risk_score": 0.15},
{"name": "Carlos", "email": "carlos@example.com", "risk_score": 0.20},
{"name": "Dana", "email": "dana@example.com", "risk_score": 0.92},
{"name": "Eve", "email": "eve@example.com", "risk_score": 0.88},
],
)
# The unpacking assumes insert_vertices returns one vertex ID per row, in
# input order — TODO confirm against the uni_db API.
alice, bob, carlos, dana, eve = u_vids
# 3 devices
d_vids = bw.insert_vertices(
"Device",
[
{"device_id": "device_A"},
{"device_id": "device_B"},
{"device_id": "device_C"},
],
)
# Same ordering assumption as for the users above.
device_a, device_b, device_c = d_vids
# Money ring: Alice -> Bob -> Carlos -> Alice
# Each edge is a (first vertex, second vertex, properties) tuple; the first
# vertex is the sender, as the ring comment above indicates.
bw.insert_edges(
"SENT_MONEY",
[
(alice, bob, {"amount": 9500.0}),
(bob, carlos, {"amount": 9000.0}),
(carlos, alice, {"amount": 8750.0}),
(dana, eve, {"amount": 15000.0}), # Suspicious transfer
],
)
# Device sharing: Alice+Dana on device_A, Bob+Eve on device_B, Carlos alone on device_C
# USED_DEVICE has no properties in the schema, so each edge gets an empty dict.
bw.insert_edges(
"USED_DEVICE",
[
(alice, device_a, {}),
(dana, device_a, {}),
(bob, device_b, {}),
(eve, device_b, {}),
(carlos, device_c, {}),
],
)
# Commit the bulk writer first, then the transaction it was created from.
bw.commit()
tx.commit()
print("Data ingested")
Data ingested
3. Ring Detection¶
Find 3-cycles in the money transfer graph.
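Deduplication: a directed 3-cycle matches the pattern three times, once for each node it could start from. Requiring a._vid < b._vid AND a._vid < c._vid keeps only the match that starts at the ring's lowest-ID member, so each ring is reported once.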
# Directed 3-cycle over SENT_MONEY edges. The WHERE clause keeps only the
# match in which `a` has the smallest vertex ID of the three.
query_ring = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
COUNT(*) AS rings
"""
results = session.query(query_ring)

print("Money laundering rings detected:")
for row in results:
    first, second, third = row["user_a"], row["user_b"], row["user_c"]
    print(f" Ring: {first} | {second} | {third} ({row['rings']} ring(s))")

# The sample data contains exactly one ring (Alice -> Bob -> Carlos -> Alice).
row_count = len(results)
assert row_count == 1, f"Expected 1 ring, got {row_count}"
Money laundering rings detected:
Ring: Alice | Bob | Carlos (1 ring(s))
4. Ring with Transfer Amounts¶
Same pattern, but also retrieve edge properties to show total cycled money.
# Same 3-cycle pattern, with the three edges bound to r1..r3 so their
# `amount` property can be returned per leg and summed.
query_amounts = """
MATCH (a:User)-[r1:SENT_MONEY]->(b:User)-[r2:SENT_MONEY]->(c:User)-[r3:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
r1.amount AS leg1, r2.amount AS leg2, r3.amount AS leg3,
r1.amount + r2.amount + r3.amount AS total_cycled
"""
results = session.query(query_amounts)

for row in results:
    start = row["user_a"]
    leg1, leg2, leg3 = row["leg1"], row["leg2"], row["leg3"]
    # The path is closed by repeating the starting user at the end.
    print(f"Ring: {start} -> {row['user_b']} -> {row['user_c']} -> {start}")
    print(f" Leg amounts: ${leg1:.0f}, ${leg2:.0f}, ${leg3:.0f}")
    print(f" Total cycled: ${row['total_cycled']:,.0f}")
Ring: Alice -> Bob -> Carlos -> Alice
Leg amounts: $9500, $9000, $8750
Total cycled: $27,250
5. Shared Device Risk¶
Find users who share a device with a high-risk user (risk_score > 0.8). Carlos should NOT appear — he is the only user of device_C.
# Two users attached to the same device, where the second one has a
# risk_score above 0.8. The _vid inequality stops a user matching itself.
query_shared = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u.name AS user, d.device_id AS device, fraudster.name AS flagged_contact
ORDER BY user
"""
results = session.query(query_shared)

print("Users sharing device with high-risk account:")
names = []
for row in results:
    user = row["user"]
    names.append(user)
    print(f" {user} shares {row['device']} with {row['flagged_contact']}")

# Carlos is the only user linked to device_C, so he must not be reported.
assert "Carlos" not in names, f"Carlos should not appear, got {names}"
Users sharing device with high-risk account:
Alice shares device_A with Dana
Bob shares device_B with Eve
6. Combined Alert: Ring + Device Sharing¶
Users appearing in BOTH a money ring AND sharing a device with a fraudster are the highest-priority investigation targets.
# Signal 1: every member of a money ring. The same deduplicated 3-cycle is
# matched three times and UNIONed so that a, b and c all come back in a
# single column `n`.
ring_query = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS n UNION
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN b.name AS n UNION
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN c.name AS n
"""
ring_members = set()
for row in session.query(ring_query):
    ring_members.add(row["n"])

# Signal 2: users who share a device with an account whose risk_score > 0.8.
device_query = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u.name AS n
"""
device_risk = set()
for row in session.query(device_query):
    device_risk.add(row["n"])

# Highest priority: names present in both signal sets.
combined = ring_members.intersection(device_risk)

print(f"Ring members: {sorted(ring_members)}")
print(f"Device-sharing users: {sorted(device_risk)}")
print(f"HIGH PRIORITY (both signals): {sorted(combined)}")

assert "Alice" in combined, f"Alice should be in combined alert, got {combined}"
Ring members: ['Alice', 'Bob', 'Carlos']
Device-sharing users: ['Alice', 'Bob']
HIGH PRIORITY (both signals): ['Alice', 'Bob']