Locy Case Study: Regulatory Change Impact Analysis
This case study demonstrates probabilistic graph reasoning for regulatory compliance impact analysis — tracing how regulatory changes propagate through obligation → control → process → system → vendor dependency chains.
Key Locy features demonstrated:
- FOLD MNOR — multi-regulation risk aggregation per system
- similar_to — semantic matching of obligation text to control descriptions
- IS NOT — identify obligations without adequate control coverage
- ASSUME SET — counterfactual vendor upgrade simulation
- ABDUCE — minimal control implementations to close compliance gaps
- EXPLAIN RULE — full audit trail from regulation to impacted system
How To Read This Notebook
- Each section explains what code is doing and what output you should expect.
- The dataset is deterministic for stable docs/CI execution.
- Follow the flow: load facts → derive compliance gaps → propagate risk → aggregate per system → explain and optimize.
- "What this does" describes intent; "What to expect" describes output shape.
1) Setup & Data Discovery
What this does: Loads helpers, locates prepared regulatory data files, and creates an isolated temporary database.
What to expect:
Printed DATA_DIR and DB_DIR paths.
from pathlib import Path
from pprint import pprint
import csv
import json
import os
import shutil
import tempfile
import uni_db
def _read_csv(path: Path) -> list[dict[str, str]]:
    with path.open('r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))
def _esc(value: str) -> str:
    return str(value).replace('\\', '\\\\').replace("'", "\\'")
def _f(value: str) -> float:
    return float(value) if value not in ('', None) else 0.0
def _to_int(value: str) -> int:
    return int(float(value)) if value not in ('', None) else 0
def _vec(value: str) -> list[float]:
    return [float(x) for x in json.loads(value)]
def _norm_key(key: object) -> str:
    s = str(key)
    if s.startswith('Variable("') and s.endswith('")'):
        return s[len('Variable("'):-2]
    return s
def _norm_rows(rows: list[dict[object, object]]) -> list[dict[str, object]]:
    return [{_norm_key(k): v for k, v in row.items()} for row in rows]
_default_candidates = [
    Path('docs/examples/data/locy_regulatory_impact'),
    Path('website/docs/examples/data/locy_regulatory_impact'),
    Path('examples/data/locy_regulatory_impact'),
    Path('../data/locy_regulatory_impact'),
]
if 'LOCY_DATA_DIR' in os.environ:
    DATA_DIR = Path(os.environ['LOCY_DATA_DIR']).resolve()
else:
    DATA_DIR = next(
        (p.resolve() for p in _default_candidates if (p / 'regulations.csv').exists()),
        _default_candidates[0].resolve(),
    )
if not (DATA_DIR / 'regulations.csv').exists():
    raise FileNotFoundError(
        'Expected data under docs/examples/data/locy_regulatory_impact. '
        'Run from website/ (or repo root) or set LOCY_DATA_DIR.'
    )
DB_DIR = tempfile.mkdtemp(prefix='uni_locy_reg_')
db = uni_db.Uni.open(DB_DIR)
session = db.session()
print('DATA_DIR:', DATA_DIR)
print('DB_DIR:', DB_DIR)
DATA_DIR: /home/runner/work/uni-db/uni-db/website/docs/examples/data/locy_regulatory_impact
DB_DIR: /tmp/uni_locy_reg_u3mr0c0v
2) Load Data & Build Focus Cohort
What this does:
Loads all 14 CSV files and identifies the 3 focus systems from notebook_cases.csv.
What to expect: Counts for each entity type and the set of focus system IDs.
regulations = _read_csv(DATA_DIR / 'regulations.csv')
obligations = _read_csv(DATA_DIR / 'obligations.csv')
controls = _read_csv(DATA_DIR / 'controls.csv')
processes = _read_csv(DATA_DIR / 'processes.csv')
systems = _read_csv(DATA_DIR / 'systems.csv')
vendors = _read_csv(DATA_DIR / 'vendors.csv')
contracts = _read_csv(DATA_DIR / 'contracts.csv')
requires = _read_csv(DATA_DIR / 'requires.csv')
satisfied_by = _read_csv(DATA_DIR / 'satisfied_by.csv')
protects = _read_csv(DATA_DIR / 'protects.csv')
runs_on = _read_csv(DATA_DIR / 'runs_on.csv')
operated_by = _read_csv(DATA_DIR / 'operated_by.csv')
governed_by = _read_csv(DATA_DIR / 'governed_by.csv')
notebook_cases = _read_csv(DATA_DIR / 'notebook_cases.csv')
focus_sys_ids = {r['sys_id'] for r in notebook_cases}
print('regulations:', len(regulations))
print('obligations:', len(obligations))
print('controls:', len(controls))
print('processes:', len(processes))
print('systems:', len(systems))
print('vendors:', len(vendors))
print('focus systems:', sorted(focus_sys_ids))
regulations: 5
obligations: 20
controls: 25
processes: 10
systems: 8
vendors: 6
focus systems: ['SYS-01', 'SYS-02', 'SYS-03']
3) Define Schema
What this does: Defines explicit labels, typed properties, vector fields, and edge types before ingest.
What to expect:
A single Schema created confirmation.
(
db.schema()
.label('Regulation')
.property('reg_id', 'string')
.property('name', 'string')
.property('jurisdiction', 'string')
.property('effective_date', 'string')
.property('penalty_factor', 'float64')
.done()
.label('Obligation')
.property('obl_id', 'string')
.property('text', 'string')
.property('category', 'string')
.property('severity', 'string')
.property('weight', 'float64')
.vector('embedding', 4)
.done()
.label('Control')
.property('ctrl_id', 'string')
.property('name', 'string')
.property('nist_family', 'string')
.property('status', 'string')
.property('effectiveness', 'float64')
.vector('embedding', 4)
.done()
.label('Process')
.property('proc_id', 'string')
.property('name', 'string')
.property('department', 'string')
.property('criticality', 'float64')
.done()
.label('System')
.property('sys_id', 'string')
.property('name', 'string')
.property('env', 'string')
.property('tier', 'int64')
.done()
.label('Vendor')
.property('vendor_id', 'string')
.property('name', 'string')
.property('soc2', 'bool')
.property('risk_rating', 'float64')
.done()
.label('Contract')
.property('contract_id', 'string')
.property('renewal_date', 'string')
.property('annual_value', 'float64')
.done()
.edge_type('REQUIRES', ['Regulation'], ['Obligation']).property('priority', 'string').done()
.edge_type('SATISFIED_BY', ['Obligation'], ['Control']).property('coverage', 'float64').done()
.edge_type('PROTECTS', ['Control'], ['Process']).property('relevance', 'float64').done()
.edge_type('RUNS_ON', ['Process'], ['System']).property('dependency', 'float64').done()
.edge_type('OPERATED_BY', ['System'], ['Vendor']).property('criticality', 'float64').done()
.edge_type('GOVERNED_BY', ['Vendor'], ['Contract']).done()
.apply()
)
print('Schema created')
Schema created
4) Ingest Graph Facts
What this does: Creates all nodes (Regulations, Obligations, Controls, Processes, Systems, Vendors, Contracts) and all edges (REQUIRES, SATISFIED_BY, PROTECTS, RUNS_ON, OPERATED_BY, GOVERNED_BY) from the full dataset.
What to expect: Graph counts for each node and edge type.
tx = session.tx()
# --- Nodes ---
for row in regulations:
    tx.execute(
        f"CREATE (:Regulation {{reg_id: '{_esc(row['reg_id'])}', name: '{_esc(row['name'])}', "
        f"jurisdiction: '{_esc(row['jurisdiction'])}', effective_date: '{_esc(row['effective_date'])}', "
        f"penalty_factor: {_f(row['penalty_factor'])}}})"
    )
for row in obligations:
    tx.execute(
        f"CREATE (:Obligation {{obl_id: '{_esc(row['obl_id'])}', text: '{_esc(row['text'])}', "
        f"category: '{_esc(row['category'])}', severity: '{_esc(row['severity'])}', "
        f"weight: {_f(row['weight'])}, embedding: {_vec(row['embedding'])}}})"
    )
for row in controls:
    tx.execute(
        f"CREATE (:Control {{ctrl_id: '{_esc(row['ctrl_id'])}', name: '{_esc(row['name'])}', "
        f"nist_family: '{_esc(row['nist_family'])}', status: '{_esc(row['status'])}', "
        f"effectiveness: {_f(row['effectiveness'])}, embedding: {_vec(row['embedding'])}}})"
    )
for row in processes:
    tx.execute(
        f"CREATE (:Process {{proc_id: '{_esc(row['proc_id'])}', name: '{_esc(row['name'])}', "
        f"department: '{_esc(row['department'])}', criticality: {_f(row['criticality'])}}})"
    )
for row in systems:
    tx.execute(
        f"CREATE (:System {{sys_id: '{_esc(row['sys_id'])}', name: '{_esc(row['name'])}', "
        f"env: '{_esc(row['env'])}', tier: {_to_int(row['tier'])}}})"
    )
for row in vendors:
    tx.execute(
        f"CREATE (:Vendor {{vendor_id: '{_esc(row['vendor_id'])}', name: '{_esc(row['name'])}', "
        f"soc2: {str(bool(int(row['soc2']))).lower()}, risk_rating: {_f(row['risk_rating'])}}})"
    )
for row in contracts:
    tx.execute(
        f"CREATE (:Contract {{contract_id: '{_esc(row['contract_id'])}', "
        f"renewal_date: '{_esc(row['renewal_date'])}', annual_value: {_f(row['annual_value'])}}})"
    )
# --- Edges ---
for row in requires:
    tx.execute(
        f"MATCH (r:Regulation {{reg_id: '{_esc(row['reg_id'])}'}}), "
        f"(o:Obligation {{obl_id: '{_esc(row['obl_id'])}'}}) "
        f"CREATE (r)-[:REQUIRES {{priority: '{_esc(row['priority'])}'}}]->(o)"
    )
for row in satisfied_by:
    tx.execute(
        f"MATCH (o:Obligation {{obl_id: '{_esc(row['obl_id'])}'}}), "
        f"(c:Control {{ctrl_id: '{_esc(row['ctrl_id'])}'}}) "
        f"CREATE (o)-[:SATISFIED_BY {{coverage: {_f(row['coverage'])}}}]->(c)"
    )
for row in protects:
    tx.execute(
        f"MATCH (c:Control {{ctrl_id: '{_esc(row['ctrl_id'])}'}}), "
        f"(p:Process {{proc_id: '{_esc(row['proc_id'])}'}}) "
        f"CREATE (c)-[:PROTECTS {{relevance: {_f(row['relevance'])}}}]->(p)"
    )
for row in runs_on:
    tx.execute(
        f"MATCH (p:Process {{proc_id: '{_esc(row['proc_id'])}'}}), "
        f"(s:System {{sys_id: '{_esc(row['sys_id'])}'}}) "
        f"CREATE (p)-[:RUNS_ON {{dependency: {_f(row['dependency'])}}}]->(s)"
    )
for row in operated_by:
    tx.execute(
        f"MATCH (s:System {{sys_id: '{_esc(row['sys_id'])}'}}), "
        f"(v:Vendor {{vendor_id: '{_esc(row['vendor_id'])}'}}) "
        f"CREATE (s)-[:OPERATED_BY {{criticality: {_f(row['criticality'])}}}]->(v)"
    )
for row in governed_by:
    tx.execute(
        f"MATCH (v:Vendor {{vendor_id: '{_esc(row['vendor_id'])}'}}), "
        f"(c:Contract {{contract_id: '{_esc(row['contract_id'])}'}}) "
        f"CREATE (v)-[:GOVERNED_BY]->(c)"
    )
tx.commit()
# --- Verification ---
counts = session.query("""
MATCH (r:Regulation) WITH count(*) AS regulations
MATCH (o:Obligation) WITH regulations, count(*) AS obligations
MATCH (c:Control) WITH regulations, obligations, count(*) AS controls
MATCH (p:Process) WITH regulations, obligations, controls, count(*) AS processes
MATCH (s:System) WITH regulations, obligations, controls, processes, count(*) AS systems
MATCH (v:Vendor) WITH regulations, obligations, controls, processes, systems, count(*) AS vendors
MATCH (ct:Contract)
RETURN regulations, obligations, controls, processes, systems, vendors, count(ct) AS contracts
""")
print('Graph counts:')
pprint(counts[0])
Graph counts:
Row(regulations=..., obligations=..., controls=..., processes=..., systems=..., vendors=..., contracts=...)
5) Baseline Locy Program
What this does:
Defines a multi-rule Locy program that:
1. weak_control — identifies controls that are gaps or have low effectiveness
2. system_exposure — traces the full regulation→obligation→control→process→system chain,
joining on weak controls, and aggregates multi-regulation risk per system using FOLD MNOR
3. vendor_risk — rolls up system exposure to vendors
4. semantic_match — uses similar_to to find obligation-to-control semantic fits
5. has_fit — identifies obligations with a good control match (fit >= 0.7)
6. unmatched_obligation — uses IS NOT to find obligations without adequate coverage
What to expect:
- Weak control rows with severity scores
- System exposure rows with MNOR-aggregated risk in [0, 1]
- Vendor risk rollup rows
- Semantic match rows with fit scores >= 0.6
- Unmatched obligation rows for obligations without good control coverage
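Rules 4 to 6 can be pictured in plain Python. The sketch below assumes that similar_to behaves like cosine similarity; the notebook does not define it, although the recorded fit of about 0.9998 between "Threat-led penetration testing" and its TLPT control is consistent with that reading. The embeddings for OBL-DORA-04, OBL-DORA-08, CTRL-09, and CTRL-20 are the ones shown in the EXPLAIN output in Section 6. OBL-HYPOTHETICAL and the `cosine` helper are made up for illustration.

```python
from math import sqrt


def cosine(a, b):
    # Cosine similarity: dot product divided by the product of the norms.
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))


obligations = {
    'OBL-DORA-04': [0.3, 0.2, -0.5, 0.75],
    'OBL-DORA-08': [0.8, 0.1, 0.75, -0.25],
    'OBL-HYPOTHETICAL': [-0.5, 0.9, 0.0, 0.0],  # invented, not in the dataset
}
controls = {
    'CTRL-09': [0.32, 0.22, -0.48, 0.72],
    'CTRL-20': [0.78, 0.08, 0.73, -0.23],
}

# semantic_match: one fit score per (obligation, control) pair.
fit = {
    (o, c): cosine(o_vec, c_vec)
    for o, o_vec in obligations.items()
    for c, c_vec in controls.items()
}

# has_fit: obligations with at least one control at fit >= 0.7.
has_fit = {o for (o, _c), score in fit.items() if score >= 0.7}

# unmatched_obligation: IS NOT has_fit, i.e. a set difference.
unmatched = sorted(set(obligations) - has_fit)
print(unmatched)
```

Seen this way, IS NOT is an anti-join: an obligation is reported only when no control clears the 0.7 threshold.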
program = r'''
// Rule 1: Identify weak controls (gaps or low effectiveness)
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
// Rule 2: System exposure — MNOR across all regulation->...->system paths
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
// Rule 3: Vendor risk rollup
CREATE RULE vendor_risk AS
MATCH (sys:System)-[op:OPERATED_BY]->(v:Vendor)
WHERE sys IS system_exposure
FOLD v_risk = MNOR(aggregate_risk * op.criticality)
YIELD KEY v, v_risk
// Rule 4: Semantic obligation-to-control matching
CREATE RULE semantic_match AS
MATCH (o:Obligation), (c:Control)
YIELD KEY o, KEY c, similar_to(o.embedding, c.embedding) AS fit
// Rule 5: Obligations with good control match
CREATE RULE has_fit AS
MATCH (o:Obligation)
WHERE o IS semantic_match TO c, fit >= 0.7
YIELD KEY o
// Rule 6: Unmatched obligations (no well-fitting control)
CREATE RULE unmatched_obligation AS
MATCH (o:Obligation)
WHERE o IS NOT has_fit
YIELD KEY o
// Queries
QUERY weak_control WHERE c = c RETURN c.name AS control, c.status AS status, severity ORDER BY severity DESC
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
QUERY vendor_risk WHERE v = v RETURN v.name AS vendor, v_risk ORDER BY v_risk DESC
QUERY semantic_match WHERE fit >= 0.6 RETURN o.text AS obligation, c.name AS control, fit ORDER BY fit DESC LIMIT 10
QUERY unmatched_obligation WHERE o = o RETURN o.obl_id AS obligation_id, o.text AS obligation_text
'''
print(program)
baseline_out = session.locy_with(program).with_config({'max_iterations': 400, 'timeout_secs': 180.0}).run()
stats = baseline_out.stats
print('Iterations:', stats.total_iterations)
print('Strata:', stats.strata_evaluated)
print('Queries executed:', stats.queries_executed)
gap_rows = []
exposure_rows = []
vendor_rows = []
semantic_rows = []
unmatched_rows = []
for i, cmd in enumerate(baseline_out.command_results, start=1):
    print(f'\nCommand #{i}:', cmd.command_type)
    if cmd.command_type in ('query', 'cypher'):
        rows = _norm_rows(cmd.rows)
        print('rows:', len(rows))
        pprint(rows[:5])
        if rows and 'severity' in rows[0]:
            gap_rows = rows
        elif rows and 'aggregate_risk' in rows[0]:
            exposure_rows = rows
        elif rows and 'v_risk' in rows[0]:
            vendor_rows = rows
        elif rows and 'fit' in rows[0]:
            semantic_rows = rows
        elif rows and 'obligation_id' in rows[0]:
            unmatched_rows = rows
# Verify MNOR bounds
for row in exposure_rows:
    r = float(row['aggregate_risk'])
    assert 0.0 <= r <= 1.0, f"MNOR score out of range: {r}"
print(f'\nAll {len(exposure_rows)} system exposure scores in [0, 1] \u2713')
Iterations: 0
Strata: 6
Queries executed: 10
Command #1: query
rows: 12
[{'control': 'Threat-led penetration testing (TLPT)',
'severity': 0.95,
'status': 'gap'},
{'control': 'Digital operational resilience dashboard',
'severity': 0.95,
'status': 'gap'},
{'control': 'ICT supply chain mapping', 'severity': 0.92, 'status': 'gap'},
{'control': 'ICT resilience testing', 'severity': 0.9, 'status': 'gap'},
{'control': 'Automated compliance reporting',
'severity': 0.88,
'status': 'gap'}]
Command #2: query
rows: 5
[{'aggregate_risk': 0.9951514424879684, 'system': 'ERP Core'},
{'aggregate_risk': 0.987753477375, 'system': 'Trading Platform'},
{'aggregate_risk': 0.843252896431, 'system': 'CRM System'},
{'aggregate_risk': 0.7695, 'system': 'Data Warehouse'},
{'aggregate_risk': 0.7695, 'system': 'Disaster Recovery'}]
Command #3: query
rows: 4
[{'v_risk': 0.9321668139832101, 'vendor': 'DataVault Solutions'},
{'v_risk': 0.8956362982391716, 'vendor': 'CloudOps Inc'},
{'v_risk': 0.5902770275017, 'vendor': 'NetSecure Systems'},
{'v_risk': 0.500175, 'vendor': 'OffshoreIT Services'}]
Command #4: query
rows: 10
[{'control': 'Data classification',
'fit': 0.9999273431022875,
'obligation': 'Data protection impact assessment'},
{'control': 'Multi-factor authentication',
'fit': 0.999893226979695,
'obligation': 'ICT third-party risk management'},
{'control': 'ICT resilience testing',
'fit': 0.999805307142468,
'obligation': 'Digital operational resilience testing'},
{'control': 'Threat-led penetration testing (TLPT)',
'fit': 0.999805307142468,
'obligation': 'Threat-led penetration testing'},
{'control': 'DPO governance framework',
'fit': 0.9998006197156357,
'obligation': 'Data protection officer appointment'}]
Command #5: query
rows: 0
[]
All 5 system exposure scores in [0, 1] ✓
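The recorded system_exposure values are consistent with MNOR acting as a noisy-OR over the per-path products, that is, 1 - Π(1 - p_i). This reading is inferred from the numbers above and is not a definition taken from the Locy documentation. A minimal sketch, with `noisy_or` as a hypothetical helper name:

```python
from math import prod


def noisy_or(probabilities):
    # Probability that at least one independent path "fires": 1 - prod(1 - p).
    return 1.0 - prod(1.0 - p for p in probabilities)


# One regulation-to-system path through the TLPT gap:
# penalty_factor 0.9 * obligation weight 0.9 * severity 0.95.
path_risk = 0.9 * 0.9 * 0.95

single = noisy_or([path_risk])       # a system reached by one such path
triple = noisy_or([path_risk] * 3)   # a system reached by three such paths
print(round(single, 4), round(triple, 12))
```

Under this assumption, a single path reproduces the 0.7695 reported for Data Warehouse and Disaster Recovery, and three identical paths reproduce the 0.987753477375 reported for Trading Platform. Adding paths can only raise the score, and the result never leaves [0, 1], which is the bound the cell above asserts.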
6) EXPLAIN RULE
What this does: Shows the full derivation tree behind the system_exposure score for ERP Core, tracing back through the weak_control join to the original compliance gaps.
What to expect: A tree with rule names, clause indices, and child derivations showing the regulation → obligation → control → process → system chain.
program_explain = r'''
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
EXPLAIN RULE system_exposure WHERE sys.name = 'ERP Core'
'''
explain_out = session.locy_with(program_explain).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
explain_cmd = next(cmd for cmd in explain_out.command_results if cmd.command_type == 'explain')
tree = explain_cmd.tree
def _print_tree(node, depth=0, max_depth=4, max_children=4):
    indent = ' ' * depth
    print(f"{indent}- rule={node.get('rule')}, clause={node.get('clause_index')}, bindings={node.get('bindings', {})}")
    if depth >= max_depth:
        return
    children = node.get('children', [])
    for child in children[:max_children]:
        _print_tree(child, depth + 1, max_depth=max_depth, max_children=max_children)
    if len(children) > max_children:
        print(f"{indent} ... {len(children) - max_children} more child derivations")

print('Explain tree for ERP Core system_exposure:')
_print_tree(tree)
Explain tree for ERP Core system_exposure:
- rule=system_exposure, clause=0, bindings={}
- rule=system_exposure, clause=0, bindings={'o': Node(id=8, labels=["Obligation"], properties={'text': 'ICT third-party risk management', 'embedding': [0.30000001192092896, 0.20000000298023224, -0.5, 0.75], 'weight': 0.9, 'obl_id': 'OBL-DORA-04', 'category': 'third_party', 'severity': 'high'}), 'c': Node(id=33, labels=["Control"], properties={'effectiveness': 0.45, 'status': 'partial', 'ctrl_id': 'CTRL-09', 'nist_family': 'SA', 'embedding': [0.3199999928474426, 0.2199999988079071, -0.47999998927116394, 0.7200000286102295], 'name': 'Third-party risk assessment'}), 'aggregate_risk': None, 'proc': Node(id=51, labels=["Process"], properties={'criticality': 0.9, 'name': 'Customer onboarding (KYC)', 'department': 'compliance', 'proc_id': 'PROC-02'}), 'sys': Node(id=60, labels=["System"], properties={'env': 'prod', 'name': 'ERP Core', 'sys_id': 'SYS-01', 'tier': 1}), 'r': Node(id=2, labels=["Regulation"], properties={'overflow_json': None, 'effective_date': '2025-01-17', 'penalty_factor': 0.9, 'name': 'DORA', 'reg_id': 'REG-DORA', 'jurisdiction': 'EU'}), 'severity': 0.55}
- rule=weak_control, clause=0, bindings={'c': Node(id=33, labels=["Control"], properties={'nist_family': 'SA', 'overflow_json': None, 'name': 'Third-party risk assessment', 'ctrl_id': 'CTRL-09', 'embedding': [0.32, 0.22, -0.48, 0.72], 'effectiveness': 0.45, 'status': 'partial'}), 'severity': 0.55}
- rule=system_exposure, clause=0, bindings={'aggregate_risk': None, 'proc': Node(id=52, labels=["Process"], properties={'department': 'trading', 'criticality': 0.95, 'proc_id': 'PROC-03', 'name': 'Trade execution'}), 'severity': 0.95, 'o': Node(id=12, labels=["Obligation"], properties={'text': 'Threat-led penetration testing', 'embedding': [0.800000011920929, 0.10000000149011612, 0.75, -0.25], 'severity': 'high', 'obl_id': 'OBL-DORA-08', 'category': 'testing', 'weight': 0.9}), 'c': Node(id=44, labels=["Control"], properties={'ctrl_id': 'CTRL-20', 'status': 'gap', 'name': 'Threat-led penetration testing (TLPT)', 'effectiveness': 0.05, 'nist_family': 'CA', 'embedding': [0.7799999713897705, 0.07999999821186066, 0.7300000190734863, -0.23000000417232513]}), 'r': Node(id=2, labels=["Regulation"], properties={'effective_date': '2025-01-17', 'overflow_json': None, 'jurisdiction': 'EU', 'reg_id': 'REG-DORA', 'penalty_factor': 0.9, 'name': 'DORA'}), 'sys': Node(id=60, labels=["System"], properties={'tier': 1, 'sys_id': 'SYS-01', 'name': 'ERP Core', 'env': 'prod'})}
- rule=weak_control, clause=0, bindings={'c': Node(id=44, labels=["Control"], properties={'name': 'Threat-led penetration testing (TLPT)', 'status': 'gap', 'effectiveness': 0.05, 'ctrl_id': 'CTRL-20', 'embedding': [0.78, 0.08, 0.73, -0.23], 'nist_family': 'CA', 'overflow_json': None}), 'severity': 0.95}
- rule=system_exposure, clause=0, bindings={'aggregate_risk': None, 'sys': Node(id=60, labels=["System"], properties={'tier': 1, 'sys_id': 'SYS-01', 'env': 'prod', 'name': 'ERP Core'}), 'r': Node(id=2, labels=["Regulation"], properties={'penalty_factor': 0.9, 'jurisdiction': 'EU', 'reg_id': 'REG-DORA', 'overflow_json': None, 'effective_date': '2025-01-17', 'name': 'DORA'}), 'c': Node(id=44, labels=["Control"], properties={'embedding': [0.7799999713897705, 0.07999999821186066, 0.7300000190734863, -0.23000000417232513], 'ctrl_id': 'CTRL-20', 'name': 'Threat-led penetration testing (TLPT)', 'status': 'gap', 'nist_family': 'CA', 'effectiveness': 0.05}), 'severity': 0.95, 'proc': Node(id=52, labels=["Process"], properties={'name': 'Trade execution', 'department': 'trading', 'proc_id': 'PROC-03', 'criticality': 0.95}), 'o': Node(id=12, labels=["Obligation"], properties={'embedding': [0.800000011920929, 0.10000000149011612, 0.75, -0.25], 'category': 'testing', 'severity': 'high', 'obl_id': 'OBL-DORA-08', 'text': 'Threat-led penetration testing', 'weight': 0.9})}
- rule=weak_control, clause=0, bindings={'c': Node(id=44, labels=["Control"], properties={'status': 'gap', 'nist_family': 'CA', 'effectiveness': 0.05, 'overflow_json': None, 'name': 'Threat-led penetration testing (TLPT)', 'ctrl_id': 'CTRL-20', 'embedding': [0.78, 0.08, 0.73, -0.23]}), 'severity': 0.95}
- rule=system_exposure, clause=0, bindings={'c': Node(id=44, labels=["Control"], properties={'name': 'Threat-led penetration testing (TLPT)', 'nist_family': 'CA', 'embedding': [0.7799999713897705, 0.07999999821186066, 0.7300000190734863, -0.23000000417232513], 'ctrl_id': 'CTRL-20', 'status': 'gap', 'effectiveness': 0.05}), 'proc': Node(id=52, labels=["Process"], properties={'name': 'Trade execution', 'criticality': 0.95, 'proc_id': 'PROC-03', 'department': 'trading'}), 'r': Node(id=2, labels=["Regulation"], properties={'jurisdiction': 'EU', 'effective_date': '2025-01-17', 'reg_id': 'REG-DORA', 'overflow_json': None, 'penalty_factor': 0.9, 'name': 'DORA'}), 'severity': 0.95, 'sys': Node(id=60, labels=["System"], properties={'name': 'ERP Core', 'env': 'prod', 'sys_id': 'SYS-01', 'tier': 1}), 'o': Node(id=12, labels=["Obligation"], properties={'obl_id': 'OBL-DORA-08', 'severity': 'high', 'text': 'Threat-led penetration testing', 'category': 'testing', 'weight': 0.9, 'embedding': [0.800000011920929, 0.10000000149011612, 0.75, -0.25]}), 'aggregate_risk': None}
- rule=weak_control, clause=0, bindings={'severity': 0.95, 'c': Node(id=44, labels=["Control"], properties={'status': 'gap', 'embedding': [0.78, 0.08, 0.73, -0.23], 'effectiveness': 0.05, 'nist_family': 'CA', 'ctrl_id': 'CTRL-20', 'overflow_json': None, 'name': 'Threat-led penetration testing (TLPT)'})}
... 1 more child derivations
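Because `_print_tree` reads each node with `node.get`, the derivation tree can also be summarised instead of printed in full. The sketch below counts derivation nodes per rule on a small hand-built tree. Both the `toy_tree` literal and the `count_rules` helper are hypothetical stand-ins and are not output from uni_db.

```python
from collections import Counter


def count_rules(node, counts=None):
    # Depth-first walk that tallies how many derivation nodes each rule has.
    counts = Counter() if counts is None else counts
    counts[node.get('rule')] += 1
    for child in node.get('children', []):
        count_rules(child, counts)
    return counts


# Hand-built tree with the same keys that _print_tree reads.
toy_tree = {
    'rule': 'system_exposure', 'clause_index': 0, 'bindings': {},
    'children': [
        {
            'rule': 'system_exposure', 'clause_index': 0,
            'bindings': {'severity': 0.55},
            'children': [
                {'rule': 'weak_control', 'clause_index': 0,
                 'bindings': {'severity': 0.55}},
            ],
        },
        {
            'rule': 'system_exposure', 'clause_index': 0,
            'bindings': {'severity': 0.95},
            'children': [
                {'rule': 'weak_control', 'clause_index': 0,
                 'bindings': {'severity': 0.95}},
            ],
        },
    ],
}

summary = count_rules(toy_tree)
print(dict(summary))
```

A per-rule count like this is a compact way to check that every system_exposure derivation is backed by a weak_control child, without reading the full bindings.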
7) ASSUME
What this does: Simulates a counterfactual: "What if controls connected to CloudOps Inc improve to 85% effectiveness?" The ASSUME block modifies control effectiveness for controls in CloudOps Inc's dependency chain, then re-evaluates system exposure.
What to expect:
- System exposure rows from the hypothetical world
- A comparison showing reduced risk for systems operated by CloudOps Inc
assume_program = r'''
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
ASSUME {
MATCH (c:Control)-[:PROTECTS]->(:Process)-[:RUNS_ON]->(:System)-[:OPERATED_BY]->(v:Vendor {name: 'CloudOps Inc'})
WHERE c.status <> 'implemented'
SET c.effectiveness = 0.85, c.status = 'implemented'
} THEN {
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
}
'''
assume_out = session.locy_with(assume_program).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
assume_cmd = next(cmd for cmd in assume_out.command_results if cmd.command_type == 'assume')
assume_rows = _norm_rows(assume_cmd.rows)
print('System exposure after CloudOps Inc upgrade:')
pprint(assume_rows)
# Compare with baseline
print('\nBaseline exposure vs hypothetical:')
baseline_map = {str(r['system']): float(r['aggregate_risk']) for r in exposure_rows}
for row in assume_rows:
    sys_name = str(row['system'])
    new_risk = float(row['aggregate_risk'])
    old_risk = baseline_map.get(sys_name, 0.0)
    delta = new_risk - old_risk
    print(f" {sys_name}: {old_risk:.4f} -> {new_risk:.4f} (delta {delta:+.4f})")
System exposure after CloudOps Inc upgrade:
[]
Baseline exposure vs hypothetical:
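The idea behind the counterfactual can be sketched without the engine: copy the control facts, apply the SET, and recompute exposure. The sketch assumes MNOR is a noisy-OR and uses a two-path toy version of ERP Core. The CTRL-09 and CTRL-20 values come from the EXPLAIN output in Section 6, while the `paths` list and the `exposure` helper are hypothetical.

```python
from copy import deepcopy
from math import prod

controls = {
    'CTRL-09': {'status': 'partial', 'effectiveness': 0.45},
    'CTRL-20': {'status': 'gap', 'effectiveness': 0.05},
}
# (penalty_factor, obligation weight, control id) for each path into the system.
paths = [(0.9, 0.9, 'CTRL-09'), (0.9, 0.9, 'CTRL-20')]


def exposure(paths, controls):
    risks = []
    for penalty, weight, ctrl_id in paths:
        ctrl = controls[ctrl_id]
        # Same condition as the weak_control rule.
        if ctrl['status'] == 'gap' or ctrl['effectiveness'] < 0.5:
            risks.append(penalty * weight * (1.0 - ctrl['effectiveness']))
    # No weak control on any path means no exposure row at all.
    return 1.0 - prod(1.0 - r for r in risks) if risks else None


baseline = exposure(paths, controls)

# Hypothetical world: mutate a copy so the base facts stay untouched.
hypothetical = deepcopy(controls)
hypothetical['CTRL-20'].update(status='implemented', effectiveness=0.85)
after = exposure(paths, hypothetical)

print(f'{baseline:.4f} -> {after:.4f}')
```

If every path into a system loses its weak control, `exposure` returns None for that system. That is one way a hypothetical world can yield no rows, in line with the Section 10 note that a hypothesis may eliminate all matching facts.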
8) ABDUCE
What this does:
Asks: "What minimum control improvements close gaps for ERP Core?" Defines a rule
unacceptable for systems with aggregate_risk >= 0.5, then abduces what changes
would make ERP Core no longer unacceptable.
What to expect: A set of candidate modifications (property changes, edge removals) that would bring ERP Core below the 0.5 risk threshold.
program_abduce = r'''
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
CREATE RULE unacceptable AS
MATCH (sys:System)
WHERE sys IS system_exposure, aggregate_risk >= 0.5
YIELD KEY sys
ABDUCE NOT unacceptable WHERE sys.name = 'ERP Core'
'''
abduce_out = session.locy_with(program_abduce).with_config({'max_abduce_candidates': 120, 'max_abduce_results': 12, 'timeout_secs': 180.0}).run()
abduce_cmd = next(cmd for cmd in abduce_out.command_results if cmd.command_type == 'abduce')
mods = abduce_cmd.modifications
print('Minimum control changes to reduce ERP Core risk below 0.5:')
for i, item in enumerate(mods[:8], start=1):
    print(f'\nCandidate #{i}')
    pprint(item)
Minimum control changes to reduce ERP Core risk below 0.5:
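Abduction can be pictured as a search for the smallest set of changes that makes a target fact stop holding. The brute-force sketch below is not a description of how Locy implements ABDUCE, which the notebook does not state. It assumes the noisy-OR reading of MNOR, a two-path toy version of ERP Core, and an upgrade target of 0.85 effectiveness borrowed from Section 7. All helper names are hypothetical.

```python
from itertools import combinations
from math import prod

# Control id -> (status, effectiveness); values as shown in the EXPLAIN output.
controls = {
    'CTRL-09': ('partial', 0.45),
    'CTRL-20': ('gap', 0.05),
}
# (penalty_factor, obligation weight, control id) for each path into the system.
paths = [(0.9, 0.9, 'CTRL-09'), (0.9, 0.9, 'CTRL-20')]
THRESHOLD = 0.5   # the "unacceptable" cut-off
UPGRADED = 0.85   # assumed effectiveness after an upgrade


def exposure(upgraded):
    risks = []
    for penalty, weight, ctrl_id in paths:
        status, eff = controls[ctrl_id]
        if ctrl_id in upgraded:
            status, eff = 'implemented', UPGRADED
        if status == 'gap' or eff < 0.5:
            risks.append(penalty * weight * (1.0 - eff))
    return 1.0 - prod(1.0 - r for r in risks) if risks else 0.0


def minimal_upgrades():
    # Try sets of increasing size; stop at the first size with any solution.
    for size in range(len(controls) + 1):
        hits = [
            set(combo)
            for combo in combinations(sorted(controls), size)
            if exposure(set(combo)) < THRESHOLD
        ]
        if hits:
            return hits
    return []


candidates = minimal_upgrades()
print(candidates)
```

In this toy setting, upgrading CTRL-20 alone is enough, while upgrading only CTRL-09 leaves exposure at 0.7695. The exhaustive search grows exponentially with the number of controls, which is a plausible reason for limits such as `max_abduce_candidates` in the configuration above.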
9) What To Expect
Use these checks to validate output after evaluation:
- Section 5 (Baseline):
  - weak_control rows should list controls with severity > 0.
  - system_exposure rows should have MNOR-aggregated scores in the closed interval [0, 1].
  - vendor_risk rows should show the vendor-level rollup.
  - semantic_match rows should have fit scores >= 0.6.
  - unmatched_obligation rows identify obligations without good control coverage.
- Section 6 (EXPLAIN RULE): The derivation tree should trace from system_exposure back through weak_control with concrete bindings.
- Section 7 (ASSUME): Systems in CloudOps Inc's dependency chain should show reduced exposure compared to baseline. The recorded run returned no rows, which Section 10 reports as a note rather than a failure.
- Section 8 (ABDUCE): At least one candidate modification should be returned, suggesting control improvements that bring ERP Core below the 0.5 threshold. The recorded run returned none, which Section 10 also reports as a note.
10) Build-Time Assertions
What this does: Asserts key invariants from the baseline and EXPLAIN RULE sections to keep the notebook self-checking in CI/docs builds. ASSUME and ABDUCE results are reported but not enforced.
What to expect: All assertions pass with a final confirmation message.
assert gap_rows, 'Expected non-empty compliance gap rows'
assert exposure_rows, 'Expected non-empty system exposure rows'
assert vendor_rows, 'Expected non-empty vendor risk rows'
assert all(0.0 <= float(r['aggregate_risk']) <= 1.0 for r in exposure_rows), 'MNOR scores must be in [0,1]'
assert tree, 'Expected EXPLAIN RULE tree'
assert tree.get('children') or tree.get('rule'), 'Expected derivation tree structure'
if not assume_rows:
    print('Note: ASSUME returned no results (hypothesis may eliminate all matching facts)')
else:
    print(f'ASSUME returned {len(assume_rows)} rows')
if not mods:
    print('Note: ABDUCE returned no modifications (may need higher timeout or different target)')
else:
    print(f'ABDUCE found {len(mods)} modifications')
print('All notebook assertions passed.')
Note: ASSUME returned no results (hypothesis may eliminate all matching facts)
Note: ABDUCE returned no modifications (may need higher timeout or different target)
All notebook assertions passed.
11) Cleanup
What this does: Removes the temporary on-disk database created for this notebook run.
What to expect: A confirmation that the temporary directory has been deleted.
Cleaned up /tmp/uni_locy_reg_u3mr0c0v
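The source of the cleanup cell is not shown above, only its output. A minimal sketch consistent with that output, assuming the cell relies on `shutil.rmtree` (shutil is imported in Section 1), might look like the following. The `db_dir` variable is a stand-in for the notebook's DB_DIR so that the sketch runs on its own.

```python
import shutil
import tempfile
from pathlib import Path

# Stand-in for DB_DIR: a fresh temporary directory with the same prefix.
db_dir = tempfile.mkdtemp(prefix='uni_locy_reg_')

# ignore_errors=True keeps cleanup from failing if the directory is already gone.
shutil.rmtree(db_dir, ignore_errors=True)
removed = not Path(db_dir).exists()
print('Cleaned up', db_dir)
```

In the notebook itself, any open database handle would presumably need to be released before the directory is removed; how that is done depends on the uni_db API and is not shown here.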