Fraud Detection with uni-pydantic¶
Detecting money laundering rings (cycles) and shared device anomalies using Pydantic models.
In [1]:
Copied!
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
1. Define Models¶
Using Pydantic models to define our graph schema with type safety.
In [2]:
Copied!
class User(UniNode):
"""A user in the fraud detection system."""
__label__ = "User"
risk_score: float | None = Field(default=None)
# Relationships
sent_to: list["User"] = Relationship("SENT_MONEY", direction="outgoing")
received_from: list["User"] = Relationship("SENT_MONEY", direction="incoming")
devices: list["Device"] = Relationship("USED_DEVICE", direction="outgoing")
class Device(UniNode):
"""A device used by users."""
__label__ = "Device"
# Relationships
users: list[User] = Relationship("USED_DEVICE", direction="incoming")
class SentMoney(UniEdge):
"""Edge representing money transfer between users."""
__edge_type__ = "SENT_MONEY"
__from__ = User
__to__ = User
amount: float
class UsedDevice(UniEdge):
"""Edge representing user-device association."""
__edge_type__ = "USED_DEVICE"
__from__ = User
__to__ = Device
class User(UniNode):
"""A user in the fraud detection system."""
__label__ = "User"
risk_score: float | None = Field(default=None)
# Relationships
sent_to: list["User"] = Relationship("SENT_MONEY", direction="outgoing")
received_from: list["User"] = Relationship("SENT_MONEY", direction="incoming")
devices: list["Device"] = Relationship("USED_DEVICE", direction="outgoing")
class Device(UniNode):
"""A device used by users."""
__label__ = "Device"
# Relationships
users: list[User] = Relationship("USED_DEVICE", direction="incoming")
class SentMoney(UniEdge):
"""Edge representing money transfer between users."""
__edge_type__ = "SENT_MONEY"
__from__ = User
__to__ = User
amount: float
class UsedDevice(UniEdge):
"""Edge representing user-device association."""
__edge_type__ = "USED_DEVICE"
__from__ = User
__to__ = Device
2. Setup Database and Session¶
In [3]:
Copied!
db_path = os.path.join(tempfile.gettempdir(), "fraud_pydantic_db")
if os.path.exists(db_path):
shutil.rmtree(db_path)
db = uni_db.Database(db_path)
# Create session and register models
session = UniSession(db)
session.register(User, Device, SentMoney, UsedDevice)
session.sync_schema()
print(f"Opened database at {db_path}")
db_path = os.path.join(tempfile.gettempdir(), "fraud_pydantic_db")
if os.path.exists(db_path):
shutil.rmtree(db_path)
db = uni_db.Database(db_path)
# Create session and register models
session = UniSession(db)
session.register(User, Device, SentMoney, UsedDevice)
session.sync_schema()
print(f"Opened database at {db_path}")
Opened database at /tmp/fraud_pydantic_db
3. Create Data¶
Creating a cycle A->B->C->A and a shared device scenario using Pydantic models.
In [4]:
Copied!
# Create users with type-safe models
user_a = User(risk_score=0.1)
user_b = User(risk_score=0.2)
user_c = User(risk_score=0.3)
fraudster = User(risk_score=0.9) # High risk user
# Create device
device = Device()
# Add all nodes to session
session.add_all([user_a, user_b, user_c, fraudster, device])
session.commit()
print(f"Created users: A(vid={user_a.vid}), B(vid={user_b.vid}), C(vid={user_c.vid}), Fraudster(vid={fraudster.vid})")
print(f"Created device: vid={device.vid}")
# Create users with type-safe models
user_a = User(risk_score=0.1)
user_b = User(risk_score=0.2)
user_c = User(risk_score=0.3)
fraudster = User(risk_score=0.9) # High risk user
# Create device
device = Device()
# Add all nodes to session
session.add_all([user_a, user_b, user_c, fraudster, device])
session.commit()
print(f"Created users: A(vid={user_a.vid}), B(vid={user_b.vid}), C(vid={user_c.vid}), Fraudster(vid={fraudster.vid})")
print(f"Created device: vid={device.vid}")
Created users: A(vid=0), B(vid=1), C(vid=2), Fraudster(vid=3) Created device: vid=4
In [5]:
Copied!
# Create money transfer cycle: A -> B -> C -> A
session.create_edge(user_a, "SENT_MONEY", user_b, {"amount": 5000.0})
session.create_edge(user_b, "SENT_MONEY", user_c, {"amount": 5000.0})
session.create_edge(user_c, "SENT_MONEY", user_a, {"amount": 5000.0})
# Create shared device scenario: User A and Fraudster share device
session.create_edge(user_a, "USED_DEVICE", device)
session.create_edge(fraudster, "USED_DEVICE", device)
session.commit()
print("Created money transfer cycle and shared device relationships")
# Create money transfer cycle: A -> B -> C -> A
session.create_edge(user_a, "SENT_MONEY", user_b, {"amount": 5000.0})
session.create_edge(user_b, "SENT_MONEY", user_c, {"amount": 5000.0})
session.create_edge(user_c, "SENT_MONEY", user_a, {"amount": 5000.0})
# Create shared device scenario: User A and Fraudster share device
session.create_edge(user_a, "USED_DEVICE", device)
session.create_edge(fraudster, "USED_DEVICE", device)
session.commit()
print("Created money transfer cycle and shared device relationships")
Created money transfer cycle and shared device relationships
4. Cycle Detection¶
Identifying circular money flow using Cypher queries.
In [6]:
Copied!
# Detect cycles using raw Cypher
query_cycle = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
RETURN count(*) as count
"""
results = session.cypher(query_cycle)
print(f"Cycles detected: {results[0]['count']}")
# Detect cycles using raw Cypher
query_cycle = """
MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a)
RETURN count(*) as count
"""
results = session.cypher(query_cycle)
print(f"Cycles detected: {results[0]['count']}")
Cycles detected: 3
5. Shared Device Analysis¶
Identifying users who share devices with high-risk users.
In [7]:
Copied!
# Find users sharing devices with fraudsters
query_shared = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u._vid as uid, u.risk_score as risk_score
"""
results = session.cypher(query_shared)
print("Users sharing device with fraudster:")
for r in results:
print(f" User vid={r['uid']}, risk_score={r['risk_score']}")
# Find users sharing devices with fraudsters
query_shared = """
MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User)
WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid
RETURN u._vid as uid, u.risk_score as risk_score
"""
results = session.cypher(query_shared)
print("Users sharing device with fraudster:")
for r in results:
print(f" User vid={r['uid']}, risk_score={r['risk_score']}")
Users sharing device with fraudster: User vid=0, risk_score=0.1
6. Query Builder Demo¶
Using the type-safe query builder to find high-risk users.
In [8]:
Copied!
# Find all high-risk users using the query builder
high_risk_users = (
session.query(User)
.filter(User.risk_score >= 0.5)
.all()
)
print(f"High-risk users found: {len(high_risk_users)}")
for user in high_risk_users:
print(f" User vid={user.vid}, risk_score={user.risk_score}")
# Find all high-risk users using the query builder
high_risk_users = (
session.query(User)
.filter(User.risk_score >= 0.5)
.all()
)
print(f"High-risk users found: {len(high_risk_users)}")
for user in high_risk_users:
print(f" User vid={user.vid}, risk_score={user.risk_score}")
High-risk users found: 1 User vid=3, risk_score=0.9
DEBUG 2: DataFusion execution failed (falling back to execute_subplan): Error during planning: UDF 'properties' is not registered. Register it via SessionContext.