Skip to content

Fraud Detection with uni-pydantic

Detecting money laundering rings (3-cycles) and shared device anomalies using Pydantic models.

import os
import shutil
import tempfile

import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship

1. Define Models

Named users with risk scores, devices, and financial transaction edges.

class User(UniNode):
    """A user account node in the fraud detection graph.

    Stored under the ``User`` label. In addition to its scalar properties,
    the model declares relationship fields over the ``SENT_MONEY`` and
    ``USED_DEVICE`` edge types.
    """

    __label__ = "User"

    # Scalar properties. risk_score is optional and defaults to None.
    name: str
    email: str
    risk_score: float | None = Field(default=None)

    # Relationships
    # sent_to / received_from are the outgoing and incoming sides of the same
    # SENT_MONEY edge type; devices is the outgoing side of USED_DEVICE.
    # "User" and "Device" are quoted because they are forward references
    # (Device is defined after this class).
    sent_to: list["User"] = Relationship("SENT_MONEY", direction="outgoing")
    received_from: list["User"] = Relationship("SENT_MONEY", direction="incoming")
    devices: list["Device"] = Relationship("USED_DEVICE", direction="outgoing")


class Device(UniNode):
    """A device node, stored under the ``Device`` label.

    Carries a single string identifier and a relationship field over the
    ``USED_DEVICE`` edge type.
    """

    __label__ = "Device"

    device_id: str

    # Relationships
    # Incoming side of USED_DEVICE, i.e. the counterpart of User.devices.
    users: list[User] = Relationship("USED_DEVICE", direction="incoming")


class SentMoney(UniEdge):
    """Edge model for a money transfer from one ``User`` to another.

    Registered under the ``SENT_MONEY`` edge type, the same type name used by
    the ``User.sent_to`` / ``User.received_from`` relationship fields.
    """

    __edge_type__ = "SENT_MONEY"
    # Both endpoints are User nodes.
    __from__ = User
    __to__ = User

    # Transferred amount (currency/unit is not specified by the model).
    amount: float


class UsedDevice(UniEdge):
    """Edge model linking a ``User`` to a ``Device`` it has used.

    Registered under the ``USED_DEVICE`` edge type and declares no properties
    of its own.
    """

    __edge_type__ = "USED_DEVICE"
    # Directed from the user to the device.
    __from__ = User
    __to__ = Device

2. Setup Database and Session

# The database lives at a fixed path under the system temp directory. Anything
# already at that path is removed before opening (presumably so that every run
# starts from an empty database).
db_path = os.path.join(tempfile.gettempdir(), "fraud_pydantic_db")
if os.path.exists(db_path):
    shutil.rmtree(db_path)
db = uni_db.Uni.open(db_path)

# Create session and register models
# All four models (two node types, two edge types) are registered before
# sync_schema() is called.
session = UniSession(db)
session.register(User, Device, SentMoney, UsedDevice)
session.sync_schema()

print(f"Opened database at {db_path}")
Opened database at /tmp/fraud_pydantic_db

3. Create Data

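Five named users, three devices, a three-user money ring plus one suspicious transfer between the two high-risk users, and devices shared between ring members and high-risk users.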

# Five accounts. Alice, Bob and Carlos carry low risk scores and are the three
# ring members; Dana and Eve are the two high-risk accounts.
alice = User(name="Alice", email="alice@example.com", risk_score=0.10)
bob = User(name="Bob", email="bob@example.com", risk_score=0.15)
carlos = User(name="Carlos", email="carlos@example.com", risk_score=0.20)
dana = User(name="Dana", email="dana@example.com", risk_score=0.92)
eve = User(name="Eve", email="eve@example.com", risk_score=0.88)

# Three devices, identified by device_id.
device_a = Device(device_id="device_A")
device_b = Device(device_id="device_B")
device_c = Device(device_id="device_C")

# Hand the session one list, users first and devices after, followed by a
# single commit.
session.add_all(
    [alice, bob, carlos, dana, eve]
    + [device_a, device_b, device_c]
)
session.commit()
print("Users and devices created")
Users and devices created
# Transfers, created in this order: the three legs of the
# Alice -> Bob -> Carlos -> Alice ring, then the Dana -> Eve transfer
# (the suspicious one).
for sender, recipient, amount in (
    (alice, bob, 9500.0),
    (bob, carlos, 9000.0),
    (carlos, alice, 8750.0),
    (dana, eve, 15000.0),
):
    session.create_edge(sender, "SENT_MONEY", recipient, {"amount": amount})

# Device usage: Alice and Dana share device_A, Bob and Eve share device_B,
# and Carlos is the only user of device_C.
for account, device in (
    (alice, device_a),
    (dana, device_a),
    (bob, device_b),
    (eve, device_b),
    (carlos, device_c),
):
    session.create_edge(account, "USED_DEVICE", device)

session.commit()
print("Edges created")
Edges created

4. Ring Detection

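Find 3-cycles in the money transfer graph. Deduplication: requiring a._vid < b._vid AND a._vid < c._vid keeps only the rotation that starts at the ring member with the lowest _vid, so each ring is reported once instead of 3 times (once per starting user).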

# Match directed 3-cycles over SENT_MONEY edges. The WHERE clause keeps only
# the match in which `a` has the smallest _vid of the three users, so a ring
# is not returned once per starting user. COUNT(*) is returned alongside the
# three names, giving a match count per (a, b, c) combination.
query_ring = """
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
           COUNT(*) AS rings
"""
results = session.cypher(query_ring)
print("Money laundering rings detected:")
# Each result row is indexed by the column aliases from the RETURN clause.
for r in results:
    print(
        f"  Ring: {r['user_a']} | {r['user_b']} | {r['user_c']} ({r['rings']} ring(s))"
    )
# Sanity check against the sample data, which contains exactly one ring.
assert len(results) == 1, f"Expected 1 ring, got {len(results)}"
Money laundering rings detected:
  Ring: Alice | Bob | Carlos (1 ring(s))

5. Ring with Transfer Amounts

Same pattern, but also retrieve edge properties to show total cycled money.

# Same 3-cycle pattern and deduplication as the ring query, but the three
# edges are bound (r1, r2, r3) so that each leg's amount and their sum can be
# returned.
query_amounts = """
    MATCH (a:User)-[r1:SENT_MONEY]->(b:User)-[r2:SENT_MONEY]->(c:User)-[r3:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
           r1.amount AS leg1, r2.amount AS leg2, r3.amount AS leg3,
           r1.amount + r2.amount + r3.amount AS total_cycled
"""
results = session.cypher(query_amounts)
for r in results:
    # user_a is printed at both ends to show the cycle closing.
    print(f"Ring: {r['user_a']} -> {r['user_b']} -> {r['user_c']} -> {r['user_a']}")
    # Amounts are formatted without decimals; the total also gets a thousands
    # separator.
    print(f"  Leg amounts: ${r['leg1']:.0f}, ${r['leg2']:.0f}, ${r['leg3']:.0f}")
    print(f"  Total cycled: ${r['total_cycled']:,.0f}")
Ring: Alice -> Bob -> Carlos -> Alice
  Leg amounts: $9500, $9000, $8750
  Total cycled: $27,250

6. Shared Device Risk

Find users who share a device with a high-risk user (risk > 0.8). Carlos should NOT appear, because he is the only user of device_C.

# Two users attached to the same device via USED_DEVICE, where the second one
# (`fraudster`) has risk_score above 0.8. The _vid inequality stops a user
# from being paired with themselves. Rows are ordered by the reported user's
# name.
query_shared = """
    MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
    WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
    RETURN u.name AS user, d.device_id AS device, fraudster.name AS flagged_contact
    ORDER BY user
"""
results = session.cypher(query_shared)
print("Users sharing device with high-risk account:")
for r in results:
    print(f"  {r['user']} shares {r['device']} with {r['flagged_contact']}")

# Sanity check against the sample data: Carlos is the only user of his device,
# so he must not be reported.
names = [r["user"] for r in results]
assert "Carlos" not in names, f"Carlos should not appear, got {names}"
Users sharing device with high-risk account:
  Alice shares device_A with Dana
  Bob shares device_B with Eve

7. Combined Alert: Ring + Device Sharing

Users appearing in BOTH a money ring AND sharing a device with a fraudster are the highest-priority targets.

# Ring members
# The same deduplicated 3-cycle match is repeated in three UNION branches;
# each branch returns a different position of the cycle (a, b, c) under the
# shared alias `n`, so every member of a ring ends up in the result.
ring_query = """
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN a.name AS n UNION
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN b.name AS n UNION
    MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
    WHERE a._vid < b._vid AND a._vid < c._vid
    RETURN c.name AS n
"""
# Collected into a set of names so it can be intersected below.
ring_members = {r["n"] for r in session.cypher(ring_query)}

# Device-sharing users
# Users who share a device with another user whose risk_score exceeds 0.8
# (same pattern as the shared-device query, returning only the user's name).
device_query = """
    MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
    WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
    RETURN u.name AS n
"""
device_risk = {r["n"] for r in session.cypher(device_query)}

# Set intersection: names that appear in both signals. Note that users are
# matched by name here, not by _vid.
combined = ring_members & device_risk
print(f"Ring members: {sorted(ring_members)}")
print(f"Device-sharing users: {sorted(device_risk)}")
print(f"HIGH PRIORITY (both signals): {sorted(combined)}")
# Sanity check against the sample data: Alice is in the ring and shares a
# device with Dana.
assert "Alice" in combined, f"Alice should be in combined alert, got {combined}"
Ring members: ['Alice', 'Bob', 'Carlos']
Device-sharing users: ['Alice', 'Bob']
HIGH PRIORITY (both signals): ['Alice', 'Bob']

8. Query Builder Demo

Using the type-safe query builder to find high-risk users.

# Find all high-risk users using the query builder
# The filter is expressed on the model attribute (User.risk_score) rather than
# in a Cypher string.
# NOTE(review): the cutoff here is >= 0.5, whereas the Cypher queries in this
# file treat an account as high-risk only when risk_score > 0.8. The sample
# data gives the same two users either way -- confirm which threshold is
# intended.
high_risk_users = session.query(User).filter(User.risk_score >= 0.5).all()

print(f"High-risk users found: {len(high_risk_users)}")
# Results are accessed through model attributes (user.name, user.risk_score).
for user in high_risk_users:
    print(f"  {user.name} (risk={user.risk_score})")
High-risk users found: 2
  Dana (risk=0.92)
  Eve (risk=0.88)