Fraud Detection¶
Detecting money laundering rings (cycles) and shared device anomalies.
In [1]:
Copied!
import os
import shutil
import tempfile
import uni_db
import os
import shutil
import tempfile
import uni_db
In [2]:
Copied!
# Open the demo database at a fixed path under the system temp directory.
db_path = os.path.join(tempfile.gettempdir(), "fraud_db")
if os.path.exists(db_path):
    # Remove whatever an earlier run left at this path before reopening it.
    shutil.rmtree(db_path)
db = uni_db.Database(db_path)
print(f"Opened database at {db_path}")
Opened database at /tmp/fraud_db
1. Schema¶
In [3]:
Copied!
# Vertex labels used by this notebook.
db.create_label("User")
db.create_label("Device")

# Edge types. The two lists are presumably the allowed source and target
# labels (User -> User, User -> Device) — confirm against the uni_db API.
db.create_edge_type("SENT_MONEY", ["User"], ["User"])
db.create_edge_type("USED_DEVICE", ["User"], ["Device"])

# Typed properties read by the queries and inserts below.
# NOTE(review): the trailing boolean looks like a nullable flag — confirm.
db.add_property("SENT_MONEY", "amount", "float64", False)
db.add_property("User", "risk_score", "float32", True)
2. Ingestion¶
Creating a money-transfer cycle A->B->C->A and a shared-device scenario in which users A and D (the high-risk user) use the same device.
In [4]:
Copied!
# Four users; the trailing comments give the letter each one is referred to by.
u_vids = db.bulk_insert_vertices('User', [
    {'risk_score': 0.1},  # A
    {'risk_score': 0.2},  # B
    {'risk_score': 0.3},  # C
    {'risk_score': 0.9},  # D (Fraudster)
])
# Assumes the ids come back in insertion order — TODO confirm against the uni_db API.
ua, ub, uc, ud = u_vids

# A single device with no properties.
d_vids = db.bulk_insert_vertices('Device', [{}])
d1 = d_vids[0]

# Money ring A -> B -> C -> A, 5000.0 per hop.
db.bulk_insert_edges('SENT_MONEY', [
    (ua, ub, {'amount': 5000.0}),
    (ub, uc, {'amount': 5000.0}),
    (uc, ua, {'amount': 5000.0}),
])

# Users A and D both used the same device.
db.bulk_insert_edges('USED_DEVICE', [(ua, d1, {}), (ud, d1, {})])

# NOTE(review): presumably makes the bulk-inserted rows visible to the queries
# that follow — confirm against the uni_db API.
db.flush()
3. Cycle Detection¶
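Identifying circular money flow. The pattern can start at any user, so the single three-user ring A->B->C->A matches once per starting user and is reported as 3.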
In [5]:
Copied!
# Three-hop directed pattern that returns to its starting user.
# The pattern is not anchored to one starting user, so a single ring
# A -> B -> C -> A matches once per rotation (a=A, a=B, a=C): the count printed
# below is the number of matches, i.e. 3 for the one ring inserted above.
query_cycle = (
    "MATCH (a:User)-[:SENT_MONEY]->(b:User)-[:SENT_MONEY]->(c:User)-[:SENT_MONEY]->(a) "
    "RETURN count(*) as count"
)
results = db.query(query_cycle)
print(f"Cycles detected: {results[0]['count']}")
Cycles detected: 3
4. Shared Device Analysis¶
Identifying users who share a device with a high-risk user (risk_score > 0.8).
In [6]:
Copied!
# Users attached to the same Device as a user whose risk_score exceeds 0.8.
# The `u._vid <> fraudster._vid` clause stops the high-risk user from matching
# itself through its own device.
query_shared = (
    "MATCH (u:User)-[:USED_DEVICE]->(d:Device)<-[:USED_DEVICE]-(fraudster:User) "
    "WHERE fraudster.risk_score > 0.8 AND u._vid <> fraudster._vid "
    "RETURN u._vid as uid"
)
results = db.query(query_shared)
# Report every match instead of only the first row; an empty result prints
# nothing rather than raising IndexError.
for row in results:
    print(f"User sharing device with fraudster: {row['uid']}")
User sharing device with fraudster: 0