Fraud Detection with uni-pydantic¶
Detecting money laundering rings (3-cycles) and shared device anomalies using Pydantic models.
In [1]:
Copied!
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
1. Define Models¶
Named users with risk scores, devices, and financial transaction edges.
In [2]:
Copied!
class User(UniNode):
    """Node model for an account in the fraud graph, stored under label ``User``.

    Properties:
        name: The user's name.
        email: The user's email address.
        risk_score: Optional float; ``None`` unless a value is supplied.

    Relationships:
        sent_to: ``User`` nodes on the far end of outgoing ``SENT_MONEY`` edges.
        received_from: ``User`` nodes on the far end of incoming ``SENT_MONEY`` edges.
        devices: ``Device`` nodes on the far end of outgoing ``USED_DEVICE`` edges.
    """

    __label__ = "User"

    # Scalar properties
    name: str
    email: str
    risk_score: float | None = Field(default=None)

    # Graph traversals ("User"/"Device" are quoted because they are forward references)
    sent_to: list["User"] = Relationship("SENT_MONEY", direction="outgoing")
    received_from: list["User"] = Relationship("SENT_MONEY", direction="incoming")
    devices: list["Device"] = Relationship("USED_DEVICE", direction="outgoing")
class Device(UniNode):
    """Node model for a device, stored under label ``Device``.

    Properties:
        device_id: String identifier of the device.

    Relationships:
        users: ``User`` nodes on the far end of incoming ``USED_DEVICE`` edges.
    """

    __label__ = "Device"

    # Scalar properties
    device_id: str

    # Graph traversals
    users: list[User] = Relationship("USED_DEVICE", direction="incoming")
class SentMoney(UniEdge):
    """Edge model of type ``SENT_MONEY``, declared from ``User`` to ``User``.

    Properties:
        amount: Transferred amount as a float.
    """

    __edge_type__ = "SENT_MONEY"

    # Endpoint node models
    __from__ = User
    __to__ = User

    amount: float
class UsedDevice(UniEdge):
    """Edge model of type ``USED_DEVICE``, declared from ``User`` to ``Device``.

    Carries no properties of its own.
    """

    __edge_type__ = "USED_DEVICE"

    # Endpoint node models
    __from__ = User
    __to__ = Device
class User(UniNode):
    """Node model for an account in the fraud graph, stored under label ``User``.

    Properties:
        name: The user's name.
        email: The user's email address.
        risk_score: Optional float; ``None`` unless a value is supplied.

    Relationships:
        sent_to: ``User`` nodes on the far end of outgoing ``SENT_MONEY`` edges.
        received_from: ``User`` nodes on the far end of incoming ``SENT_MONEY`` edges.
        devices: ``Device`` nodes on the far end of outgoing ``USED_DEVICE`` edges.
    """

    __label__ = "User"

    # Scalar properties
    name: str
    email: str
    risk_score: float | None = Field(default=None)

    # Graph traversals ("User"/"Device" are quoted because they are forward references)
    sent_to: list["User"] = Relationship("SENT_MONEY", direction="outgoing")
    received_from: list["User"] = Relationship("SENT_MONEY", direction="incoming")
    devices: list["Device"] = Relationship("USED_DEVICE", direction="outgoing")
class Device(UniNode):
    """Node model for a device, stored under label ``Device``.

    Properties:
        device_id: String identifier of the device.

    Relationships:
        users: ``User`` nodes on the far end of incoming ``USED_DEVICE`` edges.
    """

    __label__ = "Device"

    # Scalar properties
    device_id: str

    # Graph traversals
    users: list[User] = Relationship("USED_DEVICE", direction="incoming")
class SentMoney(UniEdge):
    """Edge model of type ``SENT_MONEY``, declared from ``User`` to ``User``.

    Properties:
        amount: Transferred amount as a float.
    """

    __edge_type__ = "SENT_MONEY"

    # Endpoint node models
    __from__ = User
    __to__ = User

    amount: float
class UsedDevice(UniEdge):
    """Edge model of type ``USED_DEVICE``, declared from ``User`` to ``Device``.

    Carries no properties of its own.
    """

    __edge_type__ = "USED_DEVICE"

    # Endpoint node models
    __from__ = User
    __to__ = Device
2. Setup Database and Session¶
In [3]:
Copied!
# The database lives in a fixed directory under the system temp dir; any copy
# left over from an earlier run is removed first so the notebook starts clean.
db_path = os.path.join(tempfile.gettempdir(), "fraud_pydantic_db")
if os.path.exists(db_path):
    shutil.rmtree(db_path)
db = uni_db.Database(db_path)

# Wrap the database in a session and register all four models before calling
# sync_schema() (presumably this pushes the model schema to the DB — confirm).
session = UniSession(db)
session.register(
    User,
    Device,
    SentMoney,
    UsedDevice,
)
session.sync_schema()

print(f"Opened database at {db_path}")
# The database lives in a fixed directory under the system temp dir; any copy
# left over from an earlier run is removed first so the notebook starts clean.
db_path = os.path.join(tempfile.gettempdir(), "fraud_pydantic_db")
if os.path.exists(db_path):
    shutil.rmtree(db_path)
db = uni_db.Database(db_path)

# Wrap the database in a session and register all four models before calling
# sync_schema() (presumably this pushes the model schema to the DB — confirm).
session = UniSession(db)
session.register(
    User,
    Device,
    SentMoney,
    UsedDevice,
)
session.sync_schema()

print(f"Opened database at {db_path}")
Opened database at /tmp/fraud_pydantic_db
3. Create Data¶
5 named users, 3 devices, a money ring, and suspicious cross-device links.
In [4]:
Copied!
# Ring participants: three users with low risk scores.
alice, bob, carlos = (
    User(name="Alice", email="alice@example.com", risk_score=0.10),
    User(name="Bob", email="bob@example.com", risk_score=0.15),
    User(name="Carlos", email="carlos@example.com", risk_score=0.20),
)

# Fraudsters: two users with high risk scores.
dana, eve = (
    User(name="Dana", email="dana@example.com", risk_score=0.92),
    User(name="Eve", email="eve@example.com", risk_score=0.88),
)

# Three devices.
device_a, device_b, device_c = (
    Device(device_id="device_A"),
    Device(device_id="device_B"),
    Device(device_id="device_C"),
)

# Stage all eight nodes (users first, then devices) and commit them together.
session.add_all([
    alice,
    bob,
    carlos,
    dana,
    eve,
    device_a,
    device_b,
    device_c,
])
session.commit()
print("Users and devices created")
# Ring participants: three users with low risk scores.
alice, bob, carlos = (
    User(name="Alice", email="alice@example.com", risk_score=0.10),
    User(name="Bob", email="bob@example.com", risk_score=0.15),
    User(name="Carlos", email="carlos@example.com", risk_score=0.20),
)

# Fraudsters: two users with high risk scores.
dana, eve = (
    User(name="Dana", email="dana@example.com", risk_score=0.92),
    User(name="Eve", email="eve@example.com", risk_score=0.88),
)

# Three devices.
device_a, device_b, device_c = (
    Device(device_id="device_A"),
    Device(device_id="device_B"),
    Device(device_id="device_C"),
)

# Stage all eight nodes (users first, then devices) and commit them together.
session.add_all([
    alice,
    bob,
    carlos,
    dana,
    eve,
    device_a,
    device_b,
    device_c,
])
session.commit()
print("Users and devices created")
Users and devices created
In [5]:
Copied!
# SENT_MONEY edges as (sender, receiver, amount).
# The first three rows close the ring Alice -> Bob -> Carlos -> Alice.
for sender, receiver, amount in (
    (alice, bob, 9500.0),
    (bob, carlos, 9000.0),
    (carlos, alice, 8750.0),
    (dana, eve, 15000.0),  # Suspicious
):
    session.create_edge(sender, "SENT_MONEY", receiver, {"amount": amount})

# USED_DEVICE edges as (user, device): Alice+Dana share device_A,
# Bob+Eve share device_B, and Carlos is alone on device_C.
for owner, used_device in (
    (alice, device_a),
    (dana, device_a),
    (bob, device_b),
    (eve, device_b),
    (carlos, device_c),
):
    session.create_edge(owner, "USED_DEVICE", used_device)

session.commit()
print("Edges created")
# SENT_MONEY edges as (sender, receiver, amount).
# The first three rows close the ring Alice -> Bob -> Carlos -> Alice.
for sender, receiver, amount in (
    (alice, bob, 9500.0),
    (bob, carlos, 9000.0),
    (carlos, alice, 8750.0),
    (dana, eve, 15000.0),  # Suspicious
):
    session.create_edge(sender, "SENT_MONEY", receiver, {"amount": amount})

# USED_DEVICE edges as (user, device): Alice+Dana share device_A,
# Bob+Eve share device_B, and Carlos is alone on device_C.
for owner, used_device in (
    (alice, device_a),
    (dana, device_a),
    (bob, device_b),
    (eve, device_b),
    (carlos, device_c),
):
    session.create_edge(owner, "USED_DEVICE", used_device)

session.commit()
print("Edges created")
Edges created
4. Ring Detection¶
Find 3-cycles in the money transfer graph. Deduplication: requiring a._vid < b._vid AND a._vid < c._vid pins a to the ring member with the lowest vertex ID, which prevents each ring from appearing 3 times (once per starting node).
In [6]:
Copied!
# Directed 3-cycle over SENT_MONEY. The WHERE clause keeps only the rotation
# in which `a` has the smallest _vid, so each ring is reported once.
query_ring = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
COUNT(*) AS rings
"""
results = session.cypher(query_ring)

print("Money laundering rings detected:")
for row in results:
    print(f" Ring: {row['user_a']} | {row['user_b']} | {row['user_c']} ({row['rings']} ring(s))")

# The sample data contains exactly one ring.
assert len(results) == 1, f"Expected 1 ring, got {len(results)}"
# Directed 3-cycle over SENT_MONEY. The WHERE clause keeps only the rotation
# in which `a` has the smallest _vid, so each ring is reported once.
query_ring = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
COUNT(*) AS rings
"""
results = session.cypher(query_ring)

print("Money laundering rings detected:")
for row in results:
    print(f" Ring: {row['user_a']} | {row['user_b']} | {row['user_c']} ({row['rings']} ring(s))")

# The sample data contains exactly one ring.
assert len(results) == 1, f"Expected 1 ring, got {len(results)}"
Money laundering rings detected: Ring: Alice | Bob | Carlos (1 ring(s))
5. Ring with Transfer Amounts¶
Same pattern, but also retrieve edge properties to show total cycled money.
In [7]:
Copied!
# Same 3-cycle pattern, with each edge bound (r1..r3) so the per-leg amounts
# and their sum can be returned alongside the member names.
query_amounts = """
MATCH (a:User)-[r1:SENT_MONEY]->(b:User)-[r2:SENT_MONEY]->(c:User)-[r3:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
r1.amount AS leg1, r2.amount AS leg2, r3.amount AS leg3,
r1.amount + r2.amount + r3.amount AS total_cycled
"""
results = session.cypher(query_amounts)

# Three lines per ring: the path, the individual legs, and the total.
for row in results:
    print(f"Ring: {row['user_a']} -> {row['user_b']} -> {row['user_c']} -> {row['user_a']}")
    print(f" Leg amounts: ${row['leg1']:.0f}, ${row['leg2']:.0f}, ${row['leg3']:.0f}")
    print(f" Total cycled: ${row['total_cycled']:,.0f}")
# Same 3-cycle pattern, with each edge bound (r1..r3) so the per-leg amounts
# and their sum can be returned alongside the member names.
query_amounts = """
MATCH (a:User)-[r1:SENT_MONEY]->(b:User)-[r2:SENT_MONEY]->(c:User)-[r3:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c,
r1.amount AS leg1, r2.amount AS leg2, r3.amount AS leg3,
r1.amount + r2.amount + r3.amount AS total_cycled
"""
results = session.cypher(query_amounts)

# Three lines per ring: the path, the individual legs, and the total.
for row in results:
    print(f"Ring: {row['user_a']} -> {row['user_b']} -> {row['user_c']} -> {row['user_a']}")
    print(f" Leg amounts: ${row['leg1']:.0f}, ${row['leg2']:.0f}, ${row['leg3']:.0f}")
    print(f" Total cycled: ${row['total_cycled']:,.0f}")
Ring: Alice -> Bob -> Carlos -> Alice Leg amounts: $9500, $9000, $8750 Total cycled: $27,250
6. Shared Device Risk¶
Find users who share a device with a high-risk user (risk > 0.8). Carlos should NOT appear — he is the only user of device_C.
In [8]:
Copied!
# Two users attached to the same device, where the second one has
# risk_score > 0.8. The _vid inequality stops a user matching themselves.
query_shared = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u.name AS user, d.device_id AS device, fraudster.name AS flagged_contact
ORDER BY user
"""
results = session.cypher(query_shared)

print("Users sharing device with high-risk account:")
for row in results:
    print(f" {row['user']} shares {row['device']} with {row['flagged_contact']}")

# Carlos is the only user of device_C, so he must not be reported.
names = [row["user"] for row in results]
assert "Carlos" not in names, f"Carlos should not appear, got {names}"
# Two users attached to the same device, where the second one has
# risk_score > 0.8. The _vid inequality stops a user matching themselves.
query_shared = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u.name AS user, d.device_id AS device, fraudster.name AS flagged_contact
ORDER BY user
"""
results = session.cypher(query_shared)

print("Users sharing device with high-risk account:")
for row in results:
    print(f" {row['user']} shares {row['device']} with {row['flagged_contact']}")

# Carlos is the only user of device_C, so he must not be reported.
names = [row["user"] for row in results]
assert "Carlos" not in names, f"Carlos should not appear, got {names}"
Users sharing device with high-risk account: Alice shares device_A with Dana Bob shares device_B with Eve
7. Combined Alert: Ring + Device Sharing¶
Users appearing in BOTH a money ring AND sharing a device with a fraudster are the highest-priority targets.
In [9]:
Copied!
# Ring members.
# The 3-cycle is matched once and all three participants are returned as
# columns; they are flattened into a set in Python. This replaces three
# copies of the same MATCH joined by UNION, which re-ran the identical
# pattern search only to project a different column each time.
ring_query = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c
"""
ring_members = {
    row[column]
    for row in session.cypher(ring_query)
    for column in ("user_a", "user_b", "user_c")
}

# Device-sharing users: anyone on the same device as a user whose
# risk_score is above 0.8 (excluding that user themselves).
device_query = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u.name AS n
"""
device_risk = {row['n'] for row in session.cypher(device_query)}

# Highest priority: users flagged by both signals.
combined = ring_members & device_risk

print(f'Ring members: {sorted(ring_members)}')
print(f'Device-sharing users: {sorted(device_risk)}')
print(f'HIGH PRIORITY (both signals): {sorted(combined)}')

assert 'Alice' in combined, f'Alice should be in combined alert, got {combined}'
# Carlos is in the ring but is the only user of device_C, so he carries one
# signal only and must not be escalated.
assert 'Carlos' not in combined, f'Carlos should not be in combined alert, got {combined}'
# Ring members.
# The 3-cycle is matched once and all three participants are returned as
# columns; they are flattened into a set in Python. This replaces three
# copies of the same MATCH joined by UNION, which re-ran the identical
# pattern search only to project a different column each time.
ring_query = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
WHERE a._vid < b._vid AND a._vid < c._vid
RETURN a.name AS user_a, b.name AS user_b, c.name AS user_c
"""
ring_members = {
    row[column]
    for row in session.cypher(ring_query)
    for column in ("user_a", "user_b", "user_c")
}

# Device-sharing users: anyone on the same device as a user whose
# risk_score is above 0.8 (excluding that user themselves).
device_query = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u.name AS n
"""
device_risk = {row['n'] for row in session.cypher(device_query)}

# Highest priority: users flagged by both signals.
combined = ring_members & device_risk

print(f'Ring members: {sorted(ring_members)}')
print(f'Device-sharing users: {sorted(device_risk)}')
print(f'HIGH PRIORITY (both signals): {sorted(combined)}')

assert 'Alice' in combined, f'Alice should be in combined alert, got {combined}'
# Carlos is in the ring but is the only user of device_C, so he carries one
# signal only and must not be escalated.
assert 'Carlos' not in combined, f'Carlos should not be in combined alert, got {combined}'
Ring members: ['Alice', 'Bob', 'Carlos'] Device-sharing users: ['Alice', 'Bob'] HIGH PRIORITY (both signals): ['Alice', 'Bob']
8. Query Builder Demo¶
Using the type-safe query builder to find high-risk users.
In [10]:
Copied!
# Query-builder version of a "high-risk users" lookup: filter User nodes on
# risk_score and materialise the matches with .all().
# NOTE(review): the cut-off here is >= 0.5, whereas the Cypher queries in the
# shared-device sections use risk_score > 0.8 — confirm which is intended.
high_risk_users = session.query(User).filter(User.risk_score >= 0.5).all()

print(f"High-risk users found: {len(high_risk_users)}")
for flagged in high_risk_users:
    print(f" {flagged.name} (risk={flagged.risk_score})")
# Query-builder version of a "high-risk users" lookup: filter User nodes on
# risk_score and materialise the matches with .all().
# NOTE(review): the cut-off here is >= 0.5, whereas the Cypher queries in the
# shared-device sections use risk_score > 0.8 — confirm which is intended.
high_risk_users = session.query(User).filter(User.risk_score >= 0.5).all()

print(f"High-risk users found: {len(high_risk_users)}")
for flagged in high_risk_users:
    print(f" {flagged.name} (risk={flagged.risk_score})")
High-risk users found: 2 Dana (risk=0.92) Eve (risk=0.88)