Locy Case Study: Regulatory Change Impact Analysis
This case study demonstrates probabilistic graph reasoning for regulatory compliance impact analysis — tracing how regulatory changes propagate through obligation → control → process → system → vendor dependency chains.
Key Locy features demonstrated:
- FOLD MNOR — multi-regulation risk aggregation per system
- similar_to — semantic matching of obligation text to control descriptions
- IS NOT — identify obligations without adequate control coverage
- ASSUME SET — counterfactual vendor upgrade simulation
- ABDUCE — minimal control implementations to close compliance gaps
- EXPLAIN RULE — full audit trail from regulation to impacted system
How To Read This Notebook
- Each section explains what code is doing and what output you should expect.
- The dataset is deterministic for stable docs/CI execution.
- Follow the flow: load facts → derive compliance gaps → propagate risk → aggregate per system → explain and optimize.
- "What this does" describes intent; "What to expect" describes output shape.
1) Setup & Data Discovery
What this does: Loads helpers, locates prepared regulatory data files, and creates an isolated temporary database.
What to expect:
Printed DATA_DIR and DB_DIR paths.
from pathlib import Path
from pprint import pprint
import csv
import json
import os
import shutil
import tempfile
import uni_db
def _read_csv(path: Path) -> list[dict[str, str]]:
    with path.open('r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))

def _esc(value: str) -> str:
    return str(value).replace('\\', '\\\\').replace("'", "\\'")

def _f(value: str) -> float:
    return float(value) if value not in ('', None) else 0.0

def _to_int(value: str) -> int:
    return int(float(value)) if value not in ('', None) else 0

def _vec(value: str) -> list[float]:
    return [float(x) for x in json.loads(value)]

def _norm_key(key: object) -> str:
    s = str(key)
    if s.startswith('Variable("') and s.endswith('")'):
        return s[len('Variable("'):-2]
    return s

def _norm_rows(rows: list[dict[object, object]]) -> list[dict[str, object]]:
    return [{_norm_key(k): v for k, v in row.items()} for row in rows]
_default_candidates = [
Path('docs/examples/data/locy_regulatory_impact'),
Path('website/docs/examples/data/locy_regulatory_impact'),
Path('examples/data/locy_regulatory_impact'),
Path('../data/locy_regulatory_impact'),
]
if 'LOCY_DATA_DIR' in os.environ:
    DATA_DIR = Path(os.environ['LOCY_DATA_DIR']).resolve()
else:
    DATA_DIR = next(
        (p.resolve() for p in _default_candidates if (p / 'regulations.csv').exists()),
        _default_candidates[0].resolve(),
    )
if not (DATA_DIR / 'regulations.csv').exists():
    raise FileNotFoundError(
        'Expected data under docs/examples/data/locy_regulatory_impact. '
        'Run from website/ (or repo root) or set LOCY_DATA_DIR.'
    )
DB_DIR = tempfile.mkdtemp(prefix='uni_locy_reg_')
db = uni_db.Uni.open(DB_DIR)
session = db.session()
print('DATA_DIR:', DATA_DIR)
print('DB_DIR:', DB_DIR)
DATA_DIR: /home/runner/work/uni-db/uni-db/website/docs/examples/data/locy_regulatory_impact
DB_DIR: /tmp/uni_locy_reg_f8m8qsvb
2) Load Data & Build Focus Cohort
What this does:
Loads all 14 CSV files and identifies the 3 focus systems from notebook_cases.csv.
What to expect: Counts for each entity type and the set of focus system IDs.
regulations = _read_csv(DATA_DIR / 'regulations.csv')
obligations = _read_csv(DATA_DIR / 'obligations.csv')
controls = _read_csv(DATA_DIR / 'controls.csv')
processes = _read_csv(DATA_DIR / 'processes.csv')
systems = _read_csv(DATA_DIR / 'systems.csv')
vendors = _read_csv(DATA_DIR / 'vendors.csv')
contracts = _read_csv(DATA_DIR / 'contracts.csv')
requires = _read_csv(DATA_DIR / 'requires.csv')
satisfied_by = _read_csv(DATA_DIR / 'satisfied_by.csv')
protects = _read_csv(DATA_DIR / 'protects.csv')
runs_on = _read_csv(DATA_DIR / 'runs_on.csv')
operated_by = _read_csv(DATA_DIR / 'operated_by.csv')
governed_by = _read_csv(DATA_DIR / 'governed_by.csv')
notebook_cases = _read_csv(DATA_DIR / 'notebook_cases.csv')
focus_sys_ids = {r['sys_id'] for r in notebook_cases}
print('regulations:', len(regulations))
print('obligations:', len(obligations))
print('controls:', len(controls))
print('processes:', len(processes))
print('systems:', len(systems))
print('vendors:', len(vendors))
print('focus systems:', sorted(focus_sys_ids))
regulations: 5
obligations: 20
controls: 25
processes: 10
systems: 8
vendors: 6
focus systems: ['SYS-01', 'SYS-02', 'SYS-03']
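The focus cohort is a set comprehension over the `sys_id` column, so repeated system IDs in `notebook_cases.csv` collapse to one entry each. A small sketch with an in-memory CSV shows the effect; the `note` column and the sample rows are hypothetical, only the `sys_id` column name comes from the cell above.

```python
import csv
import io

# Hypothetical stand-in for notebook_cases.csv: three rows, two distinct systems.
sample_csv = "sys_id,note\nSYS-02,a\nSYS-01,b\nSYS-02,c\n"

rows = list(csv.DictReader(io.StringIO(sample_csv)))
focus = {r['sys_id'] for r in rows}   # duplicates collapse in the set

print('rows:', len(rows))
print('focus systems:', sorted(focus))
```

Sorting before printing, as the notebook does, keeps the output stable across runs because set iteration order is not guaranteed.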
3) Define Schema
What this does: Defines explicit labels, typed properties, vector fields, and edge types before ingest.
What to expect:
A single Schema created confirmation.
(
db.schema()
.label('Regulation')
.property('reg_id', 'string')
.property('name', 'string')
.property('jurisdiction', 'string')
.property('effective_date', 'string')
.property('penalty_factor', 'float64')
.done()
.label('Obligation')
.property('obl_id', 'string')
.property('text', 'string')
.property('category', 'string')
.property('severity', 'string')
.property('weight', 'float64')
.vector('embedding', 4)
.done()
.label('Control')
.property('ctrl_id', 'string')
.property('name', 'string')
.property('nist_family', 'string')
.property('status', 'string')
.property('effectiveness', 'float64')
.vector('embedding', 4)
.done()
.label('Process')
.property('proc_id', 'string')
.property('name', 'string')
.property('department', 'string')
.property('criticality', 'float64')
.done()
.label('System')
.property('sys_id', 'string')
.property('name', 'string')
.property('env', 'string')
.property('tier', 'int64')
.done()
.label('Vendor')
.property('vendor_id', 'string')
.property('name', 'string')
.property('soc2', 'bool')
.property('risk_rating', 'float64')
.done()
.label('Contract')
.property('contract_id', 'string')
.property('renewal_date', 'string')
.property('annual_value', 'float64')
.done()
.edge_type('REQUIRES', ['Regulation'], ['Obligation']).property('priority', 'string').done()
.edge_type('SATISFIED_BY', ['Obligation'], ['Control']).property('coverage', 'float64').done()
.edge_type('PROTECTS', ['Control'], ['Process']).property('relevance', 'float64').done()
.edge_type('RUNS_ON', ['Process'], ['System']).property('dependency', 'float64').done()
.edge_type('OPERATED_BY', ['System'], ['Vendor']).property('criticality', 'float64').done()
.edge_type('GOVERNED_BY', ['Vendor'], ['Contract']).done()
.apply()
)
print('Schema created')
Schema created
4) Ingest Graph Facts
What this does: Creates all nodes (Regulations, Obligations, Controls, Processes, Systems, Vendors, Contracts) and all edges (REQUIRES, SATISFIED_BY, PROTECTS, RUNS_ON, OPERATED_BY, GOVERNED_BY) from the full dataset.
What to expect: Graph counts for each node and edge type.
tx = session.tx()

# --- Nodes ---
for row in regulations:
    tx.execute(
        f"CREATE (:Regulation {{reg_id: '{_esc(row['reg_id'])}', name: '{_esc(row['name'])}', "
        f"jurisdiction: '{_esc(row['jurisdiction'])}', effective_date: '{_esc(row['effective_date'])}', "
        f"penalty_factor: {_f(row['penalty_factor'])}}})"
    )
for row in obligations:
    tx.execute(
        f"CREATE (:Obligation {{obl_id: '{_esc(row['obl_id'])}', text: '{_esc(row['text'])}', "
        f"category: '{_esc(row['category'])}', severity: '{_esc(row['severity'])}', "
        f"weight: {_f(row['weight'])}, embedding: {_vec(row['embedding'])}}})"
    )
for row in controls:
    tx.execute(
        f"CREATE (:Control {{ctrl_id: '{_esc(row['ctrl_id'])}', name: '{_esc(row['name'])}', "
        f"nist_family: '{_esc(row['nist_family'])}', status: '{_esc(row['status'])}', "
        f"effectiveness: {_f(row['effectiveness'])}, embedding: {_vec(row['embedding'])}}})"
    )
for row in processes:
    tx.execute(
        f"CREATE (:Process {{proc_id: '{_esc(row['proc_id'])}', name: '{_esc(row['name'])}', "
        f"department: '{_esc(row['department'])}', criticality: {_f(row['criticality'])}}})"
    )
for row in systems:
    tx.execute(
        f"CREATE (:System {{sys_id: '{_esc(row['sys_id'])}', name: '{_esc(row['name'])}', "
        f"env: '{_esc(row['env'])}', tier: {_to_int(row['tier'])}}})"
    )
for row in vendors:
    tx.execute(
        f"CREATE (:Vendor {{vendor_id: '{_esc(row['vendor_id'])}', name: '{_esc(row['name'])}', "
        f"soc2: {str(bool(int(row['soc2']))).lower()}, risk_rating: {_f(row['risk_rating'])}}})"
    )
for row in contracts:
    tx.execute(
        f"CREATE (:Contract {{contract_id: '{_esc(row['contract_id'])}', "
        f"renewal_date: '{_esc(row['renewal_date'])}', annual_value: {_f(row['annual_value'])}}})"
    )

# --- Edges ---
for row in requires:
    tx.execute(
        f"MATCH (r:Regulation {{reg_id: '{_esc(row['reg_id'])}'}}), "
        f"(o:Obligation {{obl_id: '{_esc(row['obl_id'])}'}}) "
        f"CREATE (r)-[:REQUIRES {{priority: '{_esc(row['priority'])}'}}]->(o)"
    )
for row in satisfied_by:
    tx.execute(
        f"MATCH (o:Obligation {{obl_id: '{_esc(row['obl_id'])}'}}), "
        f"(c:Control {{ctrl_id: '{_esc(row['ctrl_id'])}'}}) "
        f"CREATE (o)-[:SATISFIED_BY {{coverage: {_f(row['coverage'])}}}]->(c)"
    )
for row in protects:
    tx.execute(
        f"MATCH (c:Control {{ctrl_id: '{_esc(row['ctrl_id'])}'}}), "
        f"(p:Process {{proc_id: '{_esc(row['proc_id'])}'}}) "
        f"CREATE (c)-[:PROTECTS {{relevance: {_f(row['relevance'])}}}]->(p)"
    )
for row in runs_on:
    tx.execute(
        f"MATCH (p:Process {{proc_id: '{_esc(row['proc_id'])}'}}), "
        f"(s:System {{sys_id: '{_esc(row['sys_id'])}'}}) "
        f"CREATE (p)-[:RUNS_ON {{dependency: {_f(row['dependency'])}}}]->(s)"
    )
for row in operated_by:
    tx.execute(
        f"MATCH (s:System {{sys_id: '{_esc(row['sys_id'])}'}}), "
        f"(v:Vendor {{vendor_id: '{_esc(row['vendor_id'])}'}}) "
        f"CREATE (s)-[:OPERATED_BY {{criticality: {_f(row['criticality'])}}}]->(v)"
    )
for row in governed_by:
    tx.execute(
        f"MATCH (v:Vendor {{vendor_id: '{_esc(row['vendor_id'])}'}}), "
        f"(c:Contract {{contract_id: '{_esc(row['contract_id'])}'}}) "
        f"CREATE (v)-[:GOVERNED_BY]->(c)"
    )
tx.commit()
# --- Verification ---
counts = session.query("""
MATCH (r:Regulation) WITH count(*) AS regulations
MATCH (o:Obligation) WITH regulations, count(*) AS obligations
MATCH (c:Control) WITH regulations, obligations, count(*) AS controls
MATCH (p:Process) WITH regulations, obligations, controls, count(*) AS processes
MATCH (s:System) WITH regulations, obligations, controls, processes, count(*) AS systems
MATCH (v:Vendor) WITH regulations, obligations, controls, processes, systems, count(*) AS vendors
MATCH (ct:Contract)
RETURN regulations, obligations, controls, processes, systems, vendors, count(ct) AS contracts
""")
print('Graph counts:')
pprint(counts[0])
Graph counts:
Row(regulations=..., obligations=..., controls=..., processes=..., systems=..., vendors=..., contracts=...)
5) Baseline Locy Program
What this does:
Defines a multi-rule Locy program that:
1. weak_control — identifies controls that are gaps or have low effectiveness
2. system_exposure — traces the full regulation→obligation→control→process→system chain,
joining on weak controls, and aggregates multi-regulation risk per system using FOLD MNOR
3. vendor_risk — rolls up system exposure to vendors
4. semantic_match — uses similar_to to find obligation-to-control semantic fits
5. has_fit — identifies obligations with a good control match (fit >= 0.7)
6. unmatched_obligation — uses IS NOT to find obligations without adequate coverage
What to expect:
- Weak control rows with severity scores
- System exposure rows with MNOR-aggregated risk in [0, 1]
- Vendor risk rollup rows
- Semantic match rows with fit scores >= 0.6
- Unmatched obligation rows for obligations without good control coverage
program = r'''
// Rule 1: Identify weak controls (gaps or low effectiveness)
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
// Rule 2: System exposure — MNOR across all regulation->...->system paths
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
// Rule 3: Vendor risk rollup
CREATE RULE vendor_risk AS
MATCH (sys:System)-[op:OPERATED_BY]->(v:Vendor)
WHERE sys IS system_exposure
FOLD v_risk = MNOR(aggregate_risk * op.criticality)
YIELD KEY v, v_risk
// Rule 4: Semantic obligation-to-control matching
CREATE RULE semantic_match AS
MATCH (o:Obligation), (c:Control)
YIELD KEY o, KEY c, similar_to(o.embedding, c.embedding) AS fit
// Rule 5: Obligations with good control match
CREATE RULE has_fit AS
MATCH (o:Obligation)
WHERE o IS semantic_match TO c, fit >= 0.7
YIELD KEY o
// Rule 6: Unmatched obligations (no well-fitting control)
CREATE RULE unmatched_obligation AS
MATCH (o:Obligation)
WHERE o IS NOT has_fit
YIELD KEY o
// Queries
QUERY weak_control WHERE c = c RETURN c.name AS control, c.status AS status, severity ORDER BY severity DESC
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
QUERY vendor_risk WHERE v = v RETURN v.name AS vendor, v_risk ORDER BY v_risk DESC
QUERY semantic_match WHERE fit >= 0.6 RETURN o.text AS obligation, c.name AS control, fit ORDER BY fit DESC LIMIT 10
QUERY unmatched_obligation WHERE o = o RETURN o.obl_id AS obligation_id, o.text AS obligation_text
'''
print(program)
// Rule 1: Identify weak controls (gaps or low effectiveness)
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
// Rule 2: System exposure — MNOR across all regulation->...->system paths
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
// Rule 3: Vendor risk rollup
CREATE RULE vendor_risk AS
MATCH (sys:System)-[op:OPERATED_BY]->(v:Vendor)
WHERE sys IS system_exposure
FOLD v_risk = MNOR(aggregate_risk * op.criticality)
YIELD KEY v, v_risk
// Rule 4: Semantic obligation-to-control matching
CREATE RULE semantic_match AS
MATCH (o:Obligation), (c:Control)
YIELD KEY o, KEY c, similar_to(o.embedding, c.embedding) AS fit
// Rule 5: Obligations with good control match
CREATE RULE has_fit AS
MATCH (o:Obligation)
WHERE o IS semantic_match TO c, fit >= 0.7
YIELD KEY o
// Rule 6: Unmatched obligations (no well-fitting control)
CREATE RULE unmatched_obligation AS
MATCH (o:Obligation)
WHERE o IS NOT has_fit
YIELD KEY o
// Queries
QUERY weak_control WHERE c = c RETURN c.name AS control, c.status AS status, severity ORDER BY severity DESC
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
QUERY vendor_risk WHERE v = v RETURN v.name AS vendor, v_risk ORDER BY v_risk DESC
QUERY semantic_match WHERE fit >= 0.6 RETURN o.text AS obligation, c.name AS control, fit ORDER BY fit DESC LIMIT 10
QUERY unmatched_obligation WHERE o = o RETURN o.obl_id AS obligation_id, o.text AS obligation_text
baseline_out = session.locy_with(program).with_config({'max_iterations': 400, 'timeout_secs': 180.0}).run()
stats = baseline_out.stats
print('Iterations:', stats.total_iterations)
print('Strata:', stats.strata_evaluated)
print('Queries executed:', stats.queries_executed)
gap_rows = []
exposure_rows = []
vendor_rows = []
semantic_rows = []
unmatched_rows = []
for i, cmd in enumerate(baseline_out.command_results, start=1):
    print(f'\nCommand #{i}:', cmd.command_type)
    if cmd.command_type in ('query', 'cypher'):
        rows = _norm_rows(cmd.rows)
        print('rows:', len(rows))
        pprint(rows[:5])
        if rows and 'severity' in rows[0]:
            gap_rows = rows
        elif rows and 'aggregate_risk' in rows[0]:
            exposure_rows = rows
        elif rows and 'v_risk' in rows[0]:
            vendor_rows = rows
        elif rows and 'fit' in rows[0]:
            semantic_rows = rows
        elif rows and 'obligation_id' in rows[0]:
            unmatched_rows = rows

# Verify MNOR bounds
for row in exposure_rows:
    r = float(row['aggregate_risk'])
    assert 0.0 <= r <= 1.0, f"MNOR score out of range: {r}"
print(f'\nAll {len(exposure_rows)} system exposure scores in [0, 1] \u2713')
Iterations: 6
Strata: 6
Queries executed: 10
Command #1: query
rows: 12
[{'control': 'Threat-led penetration testing (TLPT)',
'severity': 0.95,
'status': 'gap'},
{'control': 'Digital operational resilience dashboard',
'severity': 0.95,
'status': 'gap'},
{'control': 'ICT supply chain mapping', 'severity': 0.92, 'status': 'gap'},
{'control': 'ICT resilience testing', 'severity': 0.9, 'status': 'gap'},
{'control': 'Automated compliance reporting',
'severity': 0.88,
'status': 'gap'}]
Command #2: query
rows: 5
[{'aggregate_risk': 0.9951514424879684, 'system': 'ERP Core'},
{'aggregate_risk': 0.987753477375, 'system': 'Trading Platform'},
{'aggregate_risk': 0.843252896431, 'system': 'CRM System'},
{'aggregate_risk': 0.7695, 'system': 'Disaster Recovery'},
{'aggregate_risk': 0.7695, 'system': 'Data Warehouse'}]
Command #3: query
rows: 4
[{'v_risk': 0.9321668139832101, 'vendor': 'DataVault Solutions'},
{'v_risk': 0.8956362982391716, 'vendor': 'CloudOps Inc'},
{'v_risk': 0.5902770275017, 'vendor': 'NetSecure Systems'},
{'v_risk': 0.500175, 'vendor': 'OffshoreIT Services'}]
Command #4: query
rows: 10
[{'control': 'Data classification',
'fit': 0.9999273431022875,
'obligation': 'Data protection impact assessment'},
{'control': 'Multi-factor authentication',
'fit': 0.999893226979695,
'obligation': 'ICT third-party risk management'},
{'control': 'ICT resilience testing',
'fit': 0.999805307142468,
'obligation': 'Digital operational resilience testing'},
{'control': 'Threat-led penetration testing (TLPT)',
'fit': 0.999805307142468,
'obligation': 'Threat-led penetration testing'},
{'control': 'DPO governance framework',
'fit': 0.9998006197156357,
'obligation': 'Data protection officer appointment'}]
Command #5: query
rows: 0
[]
All 5 system exposure scores in [0, 1] ✓
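Rules 4 to 6 form an anti-join: `semantic_match` scores every obligation/control pair, `has_fit` keeps obligations with at least one pair at or above 0.7, and `unmatched_obligation` is everything left over. The sketch below mirrors that logic in plain Python. It assumes `similar_to` behaves like cosine similarity, which the notebook does not state, and the IDs and 4-dimensional embeddings are hypothetical.

```python
import math

def cosine(a, b):
    # Cosine similarity; used here as an assumed stand-in for similar_to.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical embeddings, not taken from the dataset.
obligations = {'OBL-A': [1.0, 0.0, 0.0, 0.0], 'OBL-B': [0.0, 0.0, 1.0, 1.0]}
controls = {'CTRL-X': [0.9, 0.1, 0.0, 0.0], 'CTRL-Y': [0.0, 1.0, 0.0, 0.0]}

# Rule 4: score every obligation/control pair.
semantic_match = {
    (o, c): cosine(o_vec, c_vec)
    for o, o_vec in obligations.items()
    for c, c_vec in controls.items()
}
# Rule 5: obligations with at least one control at fit >= 0.7.
has_fit = {o for (o, _c), fit in semantic_match.items() if fit >= 0.7}
# Rule 6: the IS NOT step is a set difference.
unmatched = set(obligations) - has_fit

print('has_fit:', sorted(has_fit))
print('unmatched:', sorted(unmatched))
```

Read this way, the empty result for Command #5 above means every obligation in the dataset has at least one control scoring 0.7 or higher.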
6) EXPLAIN RULE
What this does: Shows the full derivation tree behind the system_exposure score for ERP Core, tracing back through the weak_control join to the original compliance gaps.
What to expect: A tree with rule names, clause indices, and child derivations showing the regulation → obligation → control → process → system chain.
program_explain = r'''
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
EXPLAIN RULE system_exposure WHERE sys.name = 'ERP Core'
'''
explain_out = session.locy_with(program_explain).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
explain_cmd = next(cmd for cmd in explain_out.command_results if cmd.command_type == 'explain')
tree = explain_cmd.tree
def _print_tree(node, depth=0, max_depth=4, max_children=4):
    indent = '  ' * depth
    print(f"{indent}- rule={node.get('rule')}, clause={node.get('clause_index')}, bindings={node.get('bindings', {})}")
    if depth >= max_depth:
        return
    children = node.get('children', [])
    for child in children[:max_children]:
        _print_tree(child, depth + 1, max_depth=max_depth, max_children=max_children)
    if len(children) > max_children:
        print(f"{indent}  ... {len(children) - max_children} more child derivations")
print('Explain tree for ERP Core system_exposure:')
_print_tree(tree)
Explain tree for ERP Core system_exposure:
- rule=system_exposure, clause=0, bindings={}
- rule=system_exposure, clause=0, bindings={'sys': Node(id=60, labels=["System"], properties={'sys_id': 'SYS-01', 'env': 'prod', 'name': 'ERP Core', 'tier': 1}), 'aggregate_risk': 0.44550000000000006}
- rule=system_exposure, clause=0, bindings={'sys': Node(id=60, labels=["System"], properties={'sys_id': 'SYS-01', 'env': 'prod', 'tier': 1, 'name': 'ERP Core'}), 'aggregate_risk': 0.7695}
- rule=system_exposure, clause=0, bindings={'aggregate_risk': 0.28600000000000003, 'sys': Node(id=60, labels=["System"], properties={'tier': 1, 'env': 'prod', 'name': 'ERP Core', 'sys_id': 'SYS-01'})}
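When the printed tree gets long, it can help to pull out just the numeric bindings. The sketch below walks a dictionary with the same keys `_print_tree` reads (`rule`, `clause_index`, `bindings`, `children`) and collects every `aggregate_risk` it finds. The `collect_risks` helper and the sample tree are hypothetical; the real tree returned by EXPLAIN RULE may carry additional fields.

```python
def collect_risks(node):
    """Depth-first collection of aggregate_risk bindings from a derivation tree."""
    found = []
    risk = node.get('bindings', {}).get('aggregate_risk')
    if risk is not None:
        found.append(float(risk))
    for child in node.get('children', []):
        found.extend(collect_risks(child))
    return found

# Hypothetical tree in the shape _print_tree expects.
sample_tree = {
    'rule': 'system_exposure', 'clause_index': 0, 'bindings': {},
    'children': [
        {'rule': 'system_exposure', 'clause_index': 0,
         'bindings': {'aggregate_risk': 0.25}, 'children': []},
        {'rule': 'system_exposure', 'clause_index': 0,
         'bindings': {'aggregate_risk': 0.5}, 'children': []},
    ],
}

risks = collect_risks(sample_tree)
print(risks)
```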
7) ASSUME
What this does:
Simulates a counterfactual: "What if CloudOps Inc remediates its weakest gap control,
ICT resilience testing (a control on the path to ERP Core, which CloudOps operates),
by raising its effectiveness to 85%?" The ASSUME block flips that one control to
implemented, then re-evaluates system exposure.
What to expect:
- System exposure rows from the hypothetical world (still non-empty: other weak controls remain)
- Reduced (but non-zero) risk for the systems that depend on that control, including CloudOps' ERP Core
assume_program = r'''
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
ASSUME {
MATCH (c:Control {name: 'ICT resilience testing'})
SET c.effectiveness = 0.85, c.status = 'implemented'
} THEN {
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
}
'''
assume_out = session.locy_with(assume_program).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
assume_cmd = next(cmd for cmd in assume_out.command_results if cmd.command_type == 'assume')
assume_rows = _norm_rows(assume_cmd.rows)
print('System exposure after CloudOps Inc upgrade:')
pprint(assume_rows)
# Compare with baseline
print('\nBaseline exposure vs hypothetical:')
baseline_map = {str(r['system']): float(r['aggregate_risk']) for r in exposure_rows}
for row in assume_rows:
    sys_name = str(row['system'])
    new_risk = float(row['aggregate_risk'])
    old_risk = baseline_map.get(sys_name, 0.0)
    delta = new_risk - old_risk
    print(f"  {sys_name}: {old_risk:.4f} -> {new_risk:.4f} (delta {delta:+.4f})")
System exposure after CloudOps Inc upgrade:
[{'aggregate_risk': 0.94686975, 'system': 'Trading Platform'},
{'aggregate_risk': 0.9087420535, 'system': 'ERP Core'},
{'aggregate_risk': 0.843252896431, 'system': 'CRM System'},
{'aggregate_risk': 0.7695, 'system': 'Data Warehouse'}]
Baseline exposure vs hypothetical:
Trading Platform: 0.9878 -> 0.9469 (delta -0.0409)
ERP Core: 0.9952 -> 0.9087 (delta -0.0864)
CRM System: 0.8433 -> 0.8433 (delta +0.0000)
Data Warehouse: 0.7695 -> 0.7695 (delta +0.0000)
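The notebook does not define MNOR, but the printed numbers are consistent with a noisy-OR combination, 1 - Π(1 - p_i). As a worked check under that assumption, the sketch below combines the three `aggregate_risk` values shown in the Section 6 explain tree; the result agrees with ERP Core's hypothetical exposure above (0.9087420535) to within floating-point tolerance. The `noisy_or` function is an illustrative helper, not part of the Locy API.

```python
import math

def noisy_or(probs):
    # Probability that at least one independent risk event fires:
    # 1 minus the product of the complements.
    return 1.0 - math.prod(1.0 - p for p in probs)

# The three aggregate_risk bindings printed in the Section 6 explain tree.
tree_values = [0.44550000000000006, 0.7695, 0.28600000000000003]

combined = noisy_or(tree_values)
print(f'{combined:.4f}')
```

Two properties of this form match the behaviour seen in the notebook: the result stays in [0, 1] whenever every input does, and adding another positive contribution can only raise it, which is why removing one weak control lowers the score without clearing it.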
8) ABDUCE
What this does:
Asks: "What minimal changes would clear ERP Core's compliance exposure?" Defines a rule
unacceptable over the full regulation -> obligation -> control -> process -> system
chain (so the abducer can reason about the chain edges), then abduces what removals make
ERP Core no longer unacceptable.
What to expect: At least one validated candidate modification -- each a chain edge whose removal (decoupling an obligation, control, or process) breaks ERP Core's exposure path.
program_abduce = r'''
CREATE RULE weak_control AS
MATCH (c:Control)
WHERE c.status = 'gap' OR c.effectiveness < 0.5
YIELD KEY c, 1.0 - c.effectiveness AS severity
CREATE RULE system_exposure AS
MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control
FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
YIELD KEY sys, aggregate_risk
CREATE RULE unacceptable AS
MATCH (reg:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
WHERE c IS weak_control, sys IS system_exposure, aggregate_risk >= 0.5
YIELD KEY sys
ABDUCE NOT unacceptable WHERE sys.name = 'ERP Core'
'''
abduce_out = session.locy_with(program_abduce).with_config({'max_abduce_candidates': 120, 'max_abduce_results': 12, 'timeout_secs': 180.0}).run()
abduce_cmd = next(cmd for cmd in abduce_out.command_results if cmd.command_type == 'abduce')
mods = abduce_cmd.modifications
print('Minimum control changes to reduce ERP Core risk below 0.5:')
for i, item in enumerate(mods[:8], start=1):
    print(f'\nCandidate #{i}')
    pprint(item)
Minimum control changes to reduce ERP Core risk below 0.5:
Candidate #1
{'cost': 1.0,
'modification': {'edge_type': 'REQUIRES',
'edge_var': '',
'match_properties': {},
'source_var': 'reg',
'target_var': '',
'type': 'remove_edge'},
'validated': True}
Candidate #2
{'cost': 1.0,
'modification': {'edge_type': 'SATISFIED_BY',
'edge_var': '',
'match_properties': {},
'source_var': 'o',
'target_var': '',
'type': 'remove_edge'},
'validated': True}
Candidate #3
{'cost': 1.0,
'modification': {'edge_type': 'PROTECTS',
'edge_var': '',
'match_properties': {},
'source_var': 'c',
'target_var': '',
'type': 'remove_edge'},
'validated': True}
Candidate #4
{'cost': 1.0,
'modification': {'edge_type': 'RUNS_ON',
'edge_var': '',
'match_properties': {},
'source_var': 'proc',
'target_var': 'o',
'type': 'remove_edge'},
'validated': True}
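Each candidate above is a dictionary with `cost`, `modification`, and `validated` keys, so ordinary Python is enough to shortlist them. The sketch below keeps validated candidates and orders them by cost. The sample list only imitates the output shape: the unvalidated entry and its cost of 2.0 are hypothetical, added to show the filter doing something.

```python
# Sample candidates shaped like the ABDUCE output above (trimmed fields).
sample_mods = [
    {'cost': 1.0, 'validated': True,
     'modification': {'type': 'remove_edge', 'edge_type': 'REQUIRES'}},
    {'cost': 2.0, 'validated': False,   # hypothetical unvalidated candidate
     'modification': {'type': 'remove_edge', 'edge_type': 'PROTECTS'}},
    {'cost': 1.0, 'validated': True,
     'modification': {'type': 'remove_edge', 'edge_type': 'RUNS_ON'}},
]

# Keep validated candidates; sorted() is stable, so ties keep their order.
shortlist = sorted(
    (m for m in sample_mods if m.get('validated')),
    key=lambda m: m['cost'],
)
summary = [
    (m['modification']['type'], m['modification']['edge_type'], m['cost'])
    for m in shortlist
]
for entry in summary:
    print(entry)
```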
9) What To Expect
Use these checks to validate output after evaluation:
- Section 5 (Baseline):
  - weak_control rows should list controls with severity > 0.
  - system_exposure rows should have MNOR-aggregated scores in the closed interval [0, 1].
  - vendor_risk rows should show the vendor-level rollup.
  - semantic_match rows should have fit scores >= 0.6.
  - unmatched_obligation rows identify obligations without good control coverage. An empty result, as in the run shown in Section 5, means every obligation has at least one control with fit >= 0.7.
- Section 6 (EXPLAIN RULE): The derivation tree should trace from system_exposure back through weak_control with concrete bindings.
- Section 7 (ASSUME): Remediating the ICT resilience testing control should reduce (but not zero out) exposure for the systems that depend on it, including CloudOps' ERP Core.
- Section 8 (ABDUCE): At least one validated candidate modification should be returned: a chain edge whose removal clears ERP Core's exposure path.
10) Build-Time Assertions
What this does: Validates key invariants from all sections to keep the notebook self-checking in CI/docs builds.
What to expect: All assertions pass with a final confirmation message.
assert gap_rows, 'Expected non-empty compliance gap rows'
assert exposure_rows, 'Expected non-empty system exposure rows'
assert vendor_rows, 'Expected non-empty vendor risk rows'
assert all(0.0 <= float(r['aggregate_risk']) <= 1.0 for r in exposure_rows), 'MNOR scores must be in [0,1]'
assert tree, 'Expected EXPLAIN RULE tree'
assert tree.get('children') or tree.get('rule'), 'Expected derivation tree structure'
assert assume_rows, 'Expected non-empty ASSUME system-exposure rows'
assert any(float(r['aggregate_risk']) < baseline_map.get(str(r['system']), 1.0) for r in assume_rows), \
'Expected at least one system with reduced risk under the ASSUME scenario'
print(f'ASSUME returned {len(assume_rows)} rows (>=1 with reduced risk)')
assert mods, 'Expected at least one ABDUCE candidate modification'
assert any(m.get('validated') for m in mods), 'Expected at least one validated ABDUCE modification'
print(f'ABDUCE found {len(mods)} modifications')
print('All notebook assertions passed.')
ASSUME returned 4 rows (>=1 with reduced risk)
ABDUCE found 4 modifications
All notebook assertions passed.
11) Cleanup
What this does: Removes the temporary on-disk database created for this notebook run.
What to expect: A confirmation that the temporary directory has been deleted.
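The cleanup cell itself is not reproduced here. One way to remove a directory created with `tempfile.mkdtemp` is `shutil.rmtree`, which Section 1 already imports; the sketch below demonstrates that on a throwaway directory. It is an assumption about the approach, not the notebook's actual cell, and it does not show whether the database handle needs to be released first.

```python
import os
import shutil
import tempfile

# Throwaway directory standing in for DB_DIR (the prefix is hypothetical).
demo_dir = tempfile.mkdtemp(prefix='uni_locy_reg_demo_')
existed_before = os.path.isdir(demo_dir)

# ignore_errors=True keeps cleanup from failing a docs/CI build
# if the directory has already been removed.
shutil.rmtree(demo_dir, ignore_errors=True)
exists_after = os.path.exists(demo_dir)

print('Cleaned up', demo_dir)
```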
Cleaned up /tmp/uni_locy_reg_f8m8qsvb