Skip to content

Fraud Detection

Detecting money laundering rings (3-cycles) and shared device anomalies using graph pattern matching.

import os
import shutil
import tempfile

import uni_db
# Keep the database in a fixed directory under the system temp dir and wipe
# any copy left by a previous run, so the example always starts empty.
db_path = os.path.join(tempfile.gettempdir(), "fraud_db")
stale_copy_exists = os.path.exists(db_path)
if stale_copy_exists:
    shutil.rmtree(db_path)

# Open the database, then get the session used for every query below.
db = uni_db.Uni.open(db_path)
session = db.session()

print(f"Opened database at {db_path}")
Opened database at /tmp/fraud_db

1. Schema

# Declare the graph schema one element at a time. Each step ends with
# .done(), whose result is used to start the next label or edge type.
schema_builder = db.schema()

# User vertices: name and email are required, risk_score may be null.
schema_builder = (
    schema_builder.label("User")
    .property("name", "string")
    .property("email", "string")
    .property_nullable("risk_score", "float32")
    .done()
)

# Device vertices carry only their identifier.
schema_builder = (
    schema_builder.label("Device")
    .property("device_id", "string")
    .done()
)

# User -> User money transfers, with the transferred amount.
schema_builder = (
    schema_builder.edge_type("SENT_MONEY", ["User"], ["User"])
    .property("amount", "float64")
    .done()
)

# User -> Device usage links; no edge properties.
schema_builder = schema_builder.edge_type("USED_DEVICE", ["User"], ["Device"]).done()

schema_builder.apply()

print("Schema created")
Schema created

2. Ingestion

5 named users, 3 devices, a money ring, and suspicious cross-device links.

# 5 users: Alice, Bob and Carlos form the ring; Dana and Eve carry the
# high risk scores.
user_rows = [
    {"name": "Alice", "email": "alice@example.com", "risk_score": 0.10},
    {"name": "Bob", "email": "bob@example.com", "risk_score": 0.15},
    {"name": "Carlos", "email": "carlos@example.com", "risk_score": 0.20},
    {"name": "Dana", "email": "dana@example.com", "risk_score": 0.92},
    {"name": "Eve", "email": "eve@example.com", "risk_score": 0.88},
]

# 3 devices
device_rows = [
    {"device_id": "device_A"},
    {"device_id": "device_B"},
    {"device_id": "device_C"},
]

tx = session.tx()
with tx.bulk_writer().build() as bw:
    # Insert vertices first; the returned ids are unpacked in row order and
    # used as edge endpoints below.
    u_vids = bw.insert_vertices("User", user_rows)
    alice, bob, carlos, dana, eve = u_vids

    d_vids = bw.insert_vertices("Device", device_rows)
    device_a, device_b, device_c = d_vids

    # Money ring: Alice -> Bob -> Carlos -> Alice, plus one suspicious
    # transfer from Dana to Eve that is not part of the ring.
    money_transfers = [
        (alice, bob, {"amount": 9500.0}),
        (bob, carlos, {"amount": 9000.0}),
        (carlos, alice, {"amount": 8750.0}),
        (dana, eve, {"amount": 15000.0}),
    ]
    bw.insert_edges("SENT_MONEY", money_transfers)

    # Device sharing: Alice+Dana on device_A, Bob+Eve on device_B,
    # Carlos alone on device_C. Each edge gets its own empty property dict.
    device_pairs = [
        (alice, device_a),
        (dana, device_a),
        (bob, device_b),
        (eve, device_b),
        (carlos, device_c),
    ]
    bw.insert_edges(
        "USED_DEVICE",
        [(user, device, {}) for user, device in device_pairs],
    )

    # Commit the bulk writer, then the transaction after the with-block exits.
    bw.commit()
tx.commit()
print("Data ingested")
Data ingested

3. Ring Detection

Find 3-cycles in the money transfer graph. Deduplication: requiring a._vid < b._vid AND a._vid < c._vid keeps only the match that starts at the ring member with the lowest _vid, which prevents each ring from appearing 3 times (once per starting node).

# Directed 3-cycle over SENT_MONEY edges. The WHERE clause only accepts
# matches where `a` has the smallest _vid of the three users.
query_ring = """
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
           COUNT(*) AS rings
"""
results = session.query(query_ring)

print("Money laundering rings detected:")
for r in results:
    ring_line = (
        f"  Ring: {r['user_a']} | {r['user_b']} | {r['user_c']} ({r['rings']} ring(s))"
    )
    print(ring_line)

# The sample data contains exactly one ring (Alice -> Bob -> Carlos -> Alice).
ring_rows = len(results)
assert ring_rows == 1, f"Expected 1 ring, got {len(results)}"
Money laundering rings detected:
  Ring: Alice | Bob | Carlos (1 ring(s))

4. Ring with Transfer Amounts

Same pattern, but also retrieve edge properties to show total cycled money.

# Same 3-cycle pattern as the ring query, with the three edges bound
# (r1, r2, r3) so each leg's amount and their sum can be returned.
query_amounts = """
    MATCH (a:User)-[r1:SENT_MONEY]->(b:User)-[r2:SENT_MONEY]->(c:User)-[r3:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
           r1.amount AS leg1, r2.amount AS leg2, r3.amount AS leg3,
           r1.amount + r2.amount + r3.amount AS total_cycled
"""
results = session.query(query_amounts)

for r in results:
    # Format and print one line at a time: the path, each leg, then the total.
    path_line = f"Ring: {r['user_a']} -> {r['user_b']} -> {r['user_c']} -> {r['user_a']}"
    print(path_line)
    legs_line = f"  Leg amounts: ${r['leg1']:.0f}, ${r['leg2']:.0f}, ${r['leg3']:.0f}"
    print(legs_line)
    total_line = f"  Total cycled: ${r['total_cycled']:,.0f}"
    print(total_line)
Ring: Alice -> Bob -> Carlos -> Alice
  Leg amounts: $9500, $9000, $8750
  Total cycled: $27,250

5. Shared Device Risk

Find users who share a device with a high-risk user (risk_score > 0.8). Carlos should NOT appear — he is the only user of device_C.

# Two users linked to the same Device via USED_DEVICE, where the second one
# has risk_score above 0.8. The _vid inequality excludes a user being
# matched against themselves.
query_shared = """
    MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
    WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
    RETURN u.name AS user, d.device_id AS device, fraudster.name AS flagged_contact
    ORDER BY user
"""
results = session.query(query_shared)

print("Users sharing device with high-risk account:")
for r in results:
    share_line = f"  {r['user']} shares {r['device']} with {r['flagged_contact']}"
    print(share_line)

# Carlos is the only user linked to device_C, so he must not be flagged.
names = [r["user"] for r in results]
carlos_flagged = "Carlos" in names
assert not carlos_flagged, f"Carlos should not appear, got {names}"
Users sharing device with high-risk account:
  Alice shares device_A with Dana
  Bob shares device_B with Eve

6. Combined Alert: Ring + Device Sharing

Users appearing in BOTH a money ring AND sharing a device with a fraudster are the highest-priority investigation targets.

# Signal 1: ring members. The same 3-cycle match is repeated three times and
# UNIONed, returning the name at position a, b and c respectively.
ring_query = """
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN a.name AS n UNION
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN b.name AS n UNION
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN c.name AS n
"""
ring_rows = session.query(ring_query)
ring_members = {r["n"] for r in ring_rows}

# Signal 2: users sharing a device with an account whose risk_score > 0.8.
device_query = """
    MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
    WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
    RETURN u.name AS n
"""
device_rows = session.query(device_query)
device_risk = {r["n"] for r in device_rows}

# Users present in both sets are the high-priority targets.
# NOTE(review): the sets are keyed on `name`, which the schema shown here does
# not declare unique — two users with the same name would be conflated.
combined = ring_members.intersection(device_risk)

print(f"Ring members: {sorted(ring_members)}")
print(f"Device-sharing users: {sorted(device_risk)}")
print(f"HIGH PRIORITY (both signals): {sorted(combined)}")

alice_flagged = "Alice" in combined
assert alice_flagged, f"Alice should be in combined alert, got {combined}"
Ring members: ['Alice', 'Bob', 'Carlos']
Device-sharing users: ['Alice', 'Bob']
HIGH PRIORITY (both signals): ['Alice', 'Bob']