Locy Case Study: Regulatory Change Impact Analysis

This case study demonstrates probabilistic graph reasoning for regulatory compliance impact analysis — tracing how regulatory changes propagate through obligation → control → process → system → vendor dependency chains.

Key Locy features demonstrated:

  • FOLD MNOR: multi-regulation risk aggregation per system
  • similar_to: semantic matching of obligation text to control descriptions
  • IS NOT: identify obligations without adequate control coverage
  • ASSUME SET: counterfactual vendor upgrade simulation
  • ABDUCE: minimal control implementations to close compliance gaps
  • EXPLAIN RULE: full audit trail from regulation to impacted system

How To Read This Notebook

  • Each section explains what code is doing and what output you should expect.
  • The dataset is deterministic for stable docs/CI execution.
  • Follow the flow: load facts → derive compliance gaps → propagate risk → aggregate per system → explain and optimize.
  • "What this does" describes intent; "What to expect" describes output shape.

1) Setup & Data Discovery

What this does: Loads helpers, locates prepared regulatory data files, and creates an isolated temporary database.

What to expect: Printed DATA_DIR and DB_DIR paths.

from pathlib import Path
from pprint import pprint
import csv
import json
import os
import shutil
import tempfile

import uni_db

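def _read_csv(path: Path) -> list[dict[str, str]]:
    # Read a CSV file into a list of row dicts keyed by the header names.
    with path.open('r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))

def _esc(value: str) -> str:
    # Escape backslashes first, then single quotes, for use inside single-quoted query literals.
    return str(value).replace('\\', '\\\\').replace("'", "\\'")

def _f(value: str) -> float:
    # Parse a float; empty or missing values become 0.0.
    return float(value) if value not in ('', None) else 0.0

def _to_int(value: str) -> int:
    # Parse an integer, accepting float-formatted text such as '1.0'; empty or missing values become 0.
    return int(float(value)) if value not in ('', None) else 0

def _vec(value: str) -> list[float]:
    # Parse a JSON array string into a list of floats.
    return [float(x) for x in json.loads(value)]

def _norm_key(key: object) -> str:
    # Strip a Variable("...") wrapper from a result-row key, leaving the bare name.
    s = str(key)
    if s.startswith('Variable("') and s.endswith('")'):
        return s[len('Variable("'):-2]
    return s

def _norm_rows(rows: list[dict[object, object]]) -> list[dict[str, object]]:
    # Normalize every key of every result row to a plain string.
    return [{_norm_key(k): v for k, v in row.items()} for row in rows]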

_default_candidates = [
    Path('docs/examples/data/locy_regulatory_impact'),
    Path('website/docs/examples/data/locy_regulatory_impact'),
    Path('examples/data/locy_regulatory_impact'),
    Path('../data/locy_regulatory_impact'),
]
if 'LOCY_DATA_DIR' in os.environ:
    DATA_DIR = Path(os.environ['LOCY_DATA_DIR']).resolve()
else:
    DATA_DIR = next(
        (p.resolve() for p in _default_candidates if (p / 'regulations.csv').exists()),
        _default_candidates[0].resolve(),
    )
if not (DATA_DIR / 'regulations.csv').exists():
    raise FileNotFoundError(
        'Expected data under docs/examples/data/locy_regulatory_impact. '
        'Run from website/ (or repo root) or set LOCY_DATA_DIR.'
    )
DB_DIR = tempfile.mkdtemp(prefix='uni_locy_reg_')
db = uni_db.Uni.open(DB_DIR)
session = db.session()
print('DATA_DIR:', DATA_DIR)
print('DB_DIR:', DB_DIR)
DATA_DIR: /home/runner/work/uni-db/uni-db/website/docs/examples/data/locy_regulatory_impact
DB_DIR: /tmp/uni_locy_reg_u3mr0c0v
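
To see what two of the helpers above do in isolation, the sketch below copies `_esc` and `_norm_key` so it runs standalone. The sample inputs (`O'Brien \ Co`, `Variable("sys")`) are made up for illustration and do not come from the dataset.

```python
def _esc(value: str) -> str:
    # Copy of the notebook helper: escape backslashes first, then single quotes.
    return str(value).replace('\\', '\\\\').replace("'", "\\'")

def _norm_key(key: object) -> str:
    # Copy of the notebook helper: unwrap Variable("name") to name.
    s = str(key)
    if s.startswith('Variable("') and s.endswith('")'):
        return s[len('Variable("'):-2]
    return s

escaped = _esc("O'Brien \\ Co")           # input holds one quote and one backslash
plain_key = _norm_key('Variable("sys")')   # wrapped key
untouched_key = _norm_key('aggregate_risk')  # already plain

print(escaped)
print(plain_key, untouched_key)
```

The order of the two `replace` calls matters: escaping quotes first would let the backslash pass double the backslashes that were just added.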

2) Load Data & Build Focus Cohort

What this does: Loads all 14 CSV files and identifies the 3 focus systems from notebook_cases.csv.

What to expect: Counts for each entity type and the set of focus system IDs.

regulations = _read_csv(DATA_DIR / 'regulations.csv')
obligations = _read_csv(DATA_DIR / 'obligations.csv')
controls = _read_csv(DATA_DIR / 'controls.csv')
processes = _read_csv(DATA_DIR / 'processes.csv')
systems = _read_csv(DATA_DIR / 'systems.csv')
vendors = _read_csv(DATA_DIR / 'vendors.csv')
contracts = _read_csv(DATA_DIR / 'contracts.csv')
requires = _read_csv(DATA_DIR / 'requires.csv')
satisfied_by = _read_csv(DATA_DIR / 'satisfied_by.csv')
protects = _read_csv(DATA_DIR / 'protects.csv')
runs_on = _read_csv(DATA_DIR / 'runs_on.csv')
operated_by = _read_csv(DATA_DIR / 'operated_by.csv')
governed_by = _read_csv(DATA_DIR / 'governed_by.csv')
notebook_cases = _read_csv(DATA_DIR / 'notebook_cases.csv')

focus_sys_ids = {r['sys_id'] for r in notebook_cases}
print('regulations:', len(regulations))
print('obligations:', len(obligations))
print('controls:', len(controls))
print('processes:', len(processes))
print('systems:', len(systems))
print('vendors:', len(vendors))
print('focus systems:', sorted(focus_sys_ids))
regulations: 5
obligations: 20
controls: 25
processes: 10
systems: 8
vendors: 6
focus systems: ['SYS-01', 'SYS-02', 'SYS-03']
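
Because the edges in section 4 are created with MATCH followed by CREATE, an edge row that points at a missing node would, under standard Cypher semantics, create nothing and raise no error. A pre-ingest check along the following lines could catch that early. This is a minimal sketch: `dangling_refs` is a hypothetical helper, and the sample rows are invented, shaped like systems.csv and runs_on.csv.

```python
def dangling_refs(edge_rows, node_rows, key):
    """Return edge rows whose `key` value has no matching node row."""
    known = {row[key] for row in node_rows}
    return [row for row in edge_rows if row[key] not in known]

# Hypothetical rows; the real data comes from _read_csv.
sample_systems = [{'sys_id': 'SYS-01'}, {'sys_id': 'SYS-02'}]
sample_runs_on = [
    {'proc_id': 'PROC-01', 'sys_id': 'SYS-01'},
    {'proc_id': 'PROC-02', 'sys_id': 'SYS-99'},  # no such system
]

missing = dangling_refs(sample_runs_on, sample_systems, 'sys_id')
print(missing)
```

With the notebook's variables, the equivalent call would be `dangling_refs(runs_on, systems, 'sys_id')`, repeated for each edge file and endpoint column.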

3) Define Schema

What this does: Defines explicit labels, typed properties, vector fields, and edge types before ingest.

What to expect: A single Schema created confirmation.

(
    db.schema()
    .label('Regulation')
        .property('reg_id', 'string')
        .property('name', 'string')
        .property('jurisdiction', 'string')
        .property('effective_date', 'string')
        .property('penalty_factor', 'float64')
    .done()
    .label('Obligation')
        .property('obl_id', 'string')
        .property('text', 'string')
        .property('category', 'string')
        .property('severity', 'string')
        .property('weight', 'float64')
        .vector('embedding', 4)
    .done()
    .label('Control')
        .property('ctrl_id', 'string')
        .property('name', 'string')
        .property('nist_family', 'string')
        .property('status', 'string')
        .property('effectiveness', 'float64')
        .vector('embedding', 4)
    .done()
    .label('Process')
        .property('proc_id', 'string')
        .property('name', 'string')
        .property('department', 'string')
        .property('criticality', 'float64')
    .done()
    .label('System')
        .property('sys_id', 'string')
        .property('name', 'string')
        .property('env', 'string')
        .property('tier', 'int64')
    .done()
    .label('Vendor')
        .property('vendor_id', 'string')
        .property('name', 'string')
        .property('soc2', 'bool')
        .property('risk_rating', 'float64')
    .done()
    .label('Contract')
        .property('contract_id', 'string')
        .property('renewal_date', 'string')
        .property('annual_value', 'float64')
    .done()
    .edge_type('REQUIRES', ['Regulation'], ['Obligation']).property('priority', 'string').done()
    .edge_type('SATISFIED_BY', ['Obligation'], ['Control']).property('coverage', 'float64').done()
    .edge_type('PROTECTS', ['Control'], ['Process']).property('relevance', 'float64').done()
    .edge_type('RUNS_ON', ['Process'], ['System']).property('dependency', 'float64').done()
    .edge_type('OPERATED_BY', ['System'], ['Vendor']).property('criticality', 'float64').done()
    .edge_type('GOVERNED_BY', ['Vendor'], ['Contract']).done()
    .apply()
)
print('Schema created')
Schema created
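
The schema fixes the embedding width at 4 for both Obligation and Control. Before ingest, it may be worth confirming that every JSON-encoded embedding in the CSV rows has that width. The sketch below assumes the `embedding` column holds a JSON array, as `_vec` implies; `bad_embeddings` and the sample rows are hypothetical.

```python
import json

EMBEDDING_DIM = 4  # matches .vector('embedding', 4) in the schema

def bad_embeddings(rows, id_key, dim=EMBEDDING_DIM):
    """Return ids of rows whose JSON-encoded embedding does not have `dim` entries."""
    return [row[id_key] for row in rows if len(json.loads(row['embedding'])) != dim]

# Hypothetical rows shaped like obligations.csv.
sample_obligations = [
    {'obl_id': 'OBL-A', 'embedding': '[0.3, 0.2, -0.5, 0.75]'},
    {'obl_id': 'OBL-B', 'embedding': '[0.3, 0.2, -0.5]'},  # one component short
]

bad = bad_embeddings(sample_obligations, 'obl_id')
print(bad)
```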

4) Ingest Graph Facts

What this does: Creates all nodes (Regulations, Obligations, Controls, Processes, Systems, Vendors, Contracts) and all edges (REQUIRES, SATISFIED_BY, PROTECTS, RUNS_ON, OPERATED_BY, GOVERNED_BY) from the full dataset.

What to expect: Graph counts for each node and edge type.

tx = session.tx()

# --- Nodes ---
for row in regulations:
    tx.execute(
        f"CREATE (:Regulation {{reg_id: '{_esc(row['reg_id'])}', name: '{_esc(row['name'])}', "
        f"jurisdiction: '{_esc(row['jurisdiction'])}', effective_date: '{_esc(row['effective_date'])}', "
        f"penalty_factor: {_f(row['penalty_factor'])}}})"
    )

for row in obligations:
    tx.execute(
        f"CREATE (:Obligation {{obl_id: '{_esc(row['obl_id'])}', text: '{_esc(row['text'])}', "
        f"category: '{_esc(row['category'])}', severity: '{_esc(row['severity'])}', "
        f"weight: {_f(row['weight'])}, embedding: {_vec(row['embedding'])}}})"
    )

for row in controls:
    tx.execute(
        f"CREATE (:Control {{ctrl_id: '{_esc(row['ctrl_id'])}', name: '{_esc(row['name'])}', "
        f"nist_family: '{_esc(row['nist_family'])}', status: '{_esc(row['status'])}', "
        f"effectiveness: {_f(row['effectiveness'])}, embedding: {_vec(row['embedding'])}}})"
    )

for row in processes:
    tx.execute(
        f"CREATE (:Process {{proc_id: '{_esc(row['proc_id'])}', name: '{_esc(row['name'])}', "
        f"department: '{_esc(row['department'])}', criticality: {_f(row['criticality'])}}})"
    )

for row in systems:
    tx.execute(
        f"CREATE (:System {{sys_id: '{_esc(row['sys_id'])}', name: '{_esc(row['name'])}', "
        f"env: '{_esc(row['env'])}', tier: {_to_int(row['tier'])}}})"
    )

for row in vendors:
    tx.execute(
        f"CREATE (:Vendor {{vendor_id: '{_esc(row['vendor_id'])}', name: '{_esc(row['name'])}', "
        f"soc2: {str(bool(int(row['soc2']))).lower()}, risk_rating: {_f(row['risk_rating'])}}})"
    )

for row in contracts:
    tx.execute(
        f"CREATE (:Contract {{contract_id: '{_esc(row['contract_id'])}', "
        f"renewal_date: '{_esc(row['renewal_date'])}', annual_value: {_f(row['annual_value'])}}})"
    )

# --- Edges ---
for row in requires:
    tx.execute(
        f"MATCH (r:Regulation {{reg_id: '{_esc(row['reg_id'])}'}}), "
        f"(o:Obligation {{obl_id: '{_esc(row['obl_id'])}'}}) "
        f"CREATE (r)-[:REQUIRES {{priority: '{_esc(row['priority'])}'}}]->(o)"
    )

for row in satisfied_by:
    tx.execute(
        f"MATCH (o:Obligation {{obl_id: '{_esc(row['obl_id'])}'}}), "
        f"(c:Control {{ctrl_id: '{_esc(row['ctrl_id'])}'}}) "
        f"CREATE (o)-[:SATISFIED_BY {{coverage: {_f(row['coverage'])}}}]->(c)"
    )

for row in protects:
    tx.execute(
        f"MATCH (c:Control {{ctrl_id: '{_esc(row['ctrl_id'])}'}}), "
        f"(p:Process {{proc_id: '{_esc(row['proc_id'])}'}}) "
        f"CREATE (c)-[:PROTECTS {{relevance: {_f(row['relevance'])}}}]->(p)"
    )

for row in runs_on:
    tx.execute(
        f"MATCH (p:Process {{proc_id: '{_esc(row['proc_id'])}'}}), "
        f"(s:System {{sys_id: '{_esc(row['sys_id'])}'}}) "
        f"CREATE (p)-[:RUNS_ON {{dependency: {_f(row['dependency'])}}}]->(s)"
    )

for row in operated_by:
    tx.execute(
        f"MATCH (s:System {{sys_id: '{_esc(row['sys_id'])}'}}), "
        f"(v:Vendor {{vendor_id: '{_esc(row['vendor_id'])}'}}) "
        f"CREATE (s)-[:OPERATED_BY {{criticality: {_f(row['criticality'])}}}]->(v)"
    )

for row in governed_by:
    tx.execute(
        f"MATCH (v:Vendor {{vendor_id: '{_esc(row['vendor_id'])}'}}), "
        f"(c:Contract {{contract_id: '{_esc(row['contract_id'])}'}}) "
        f"CREATE (v)-[:GOVERNED_BY]->(c)"
    )

tx.commit()

# --- Verification ---
counts = session.query("""
MATCH (r:Regulation) WITH count(*) AS regulations
MATCH (o:Obligation) WITH regulations, count(*) AS obligations
MATCH (c:Control) WITH regulations, obligations, count(*) AS controls
MATCH (p:Process) WITH regulations, obligations, controls, count(*) AS processes
MATCH (s:System) WITH regulations, obligations, controls, processes, count(*) AS systems
MATCH (v:Vendor) WITH regulations, obligations, controls, processes, systems, count(*) AS vendors
MATCH (ct:Contract)
RETURN regulations, obligations, controls, processes, systems, vendors, count(ct) AS contracts
""")
print('Graph counts:')
pprint(counts[0])
Graph counts:
Row(regulations=..., obligations=..., controls=..., processes=..., systems=..., vendors=..., contracts=...)
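
The ingest loops above assemble each property map by hand inside an f-string. One way to cut the repetition is a small renderer that turns a Python dict into a property-map literal. The sketch below is an illustration, not part of uni_db: `cypher_literal` and `cypher_props` are hypothetical names, the vendor row is invented, and the escaping mirrors `_esc`.

```python
def cypher_literal(value) -> str:
    """Render a Python value as a query literal (bools, numbers, lists, strings)."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return repr(value)
    if isinstance(value, (list, tuple)):
        return '[' + ', '.join(cypher_literal(v) for v in value) + ']'
    # Everything else is treated as a string and escaped like _esc does.
    escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
    return f"'{escaped}'"

def cypher_props(props: dict) -> str:
    """Render a dict as a {key: literal, ...} property map."""
    return '{' + ', '.join(f'{k}: {cypher_literal(v)}' for k, v in props.items()) + '}'

# Hypothetical vendor row, already converted to Python types.
stmt = 'CREATE (:Vendor ' + cypher_props(
    {'vendor_id': 'V-01', 'name': "O'Neil Hosting", 'soc2': True, 'risk_rating': 0.4}
) + ')'
print(stmt)
```

Type conversion stays with the caller (`_f`, `_to_int`, `_vec`), so the renderer only has to decide how each Python type is written.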

5) Baseline Locy Program

What this does: Defines a multi-rule Locy program with six rules:

  1. weak_control: identifies controls that are gaps or have low effectiveness
  2. system_exposure: traces the full regulation→obligation→control→process→system chain, joining on weak controls, and aggregates multi-regulation risk per system using FOLD MNOR
  3. vendor_risk: rolls up system exposure to vendors
  4. semantic_match: uses similar_to to find obligation-to-control semantic fits
  5. has_fit: identifies obligations with a good control match (fit >= 0.7)
  6. unmatched_obligation: uses IS NOT to find obligations without adequate coverage

What to expect:

  • Weak control rows with severity scores
  • System exposure rows with MNOR-aggregated risk in [0, 1]
  • Vendor risk rollup rows
  • Semantic match rows with fit scores >= 0.6
  • Unmatched obligation rows for obligations without good control coverage

program = r'''
// Rule 1: Identify weak controls (gaps or low effectiveness)
CREATE RULE weak_control AS
  MATCH (c:Control)
  WHERE c.status = 'gap' OR c.effectiveness < 0.5
  YIELD KEY c, 1.0 - c.effectiveness AS severity

// Rule 2: System exposure — MNOR across all regulation->...->system paths
CREATE RULE system_exposure AS
  MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
  WHERE c IS weak_control
  FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
  YIELD KEY sys, aggregate_risk

// Rule 3: Vendor risk rollup
CREATE RULE vendor_risk AS
  MATCH (sys:System)-[op:OPERATED_BY]->(v:Vendor)
  WHERE sys IS system_exposure
  FOLD v_risk = MNOR(aggregate_risk * op.criticality)
  YIELD KEY v, v_risk

// Rule 4: Semantic obligation-to-control matching
CREATE RULE semantic_match AS
  MATCH (o:Obligation), (c:Control)
  YIELD KEY o, KEY c, similar_to(o.embedding, c.embedding) AS fit

// Rule 5: Obligations with good control match
CREATE RULE has_fit AS
  MATCH (o:Obligation)
  WHERE o IS semantic_match TO c, fit >= 0.7
  YIELD KEY o

// Rule 6: Unmatched obligations (no well-fitting control)
CREATE RULE unmatched_obligation AS
  MATCH (o:Obligation)
  WHERE o IS NOT has_fit
  YIELD KEY o

// Queries
QUERY weak_control WHERE c = c RETURN c.name AS control, c.status AS status, severity ORDER BY severity DESC
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
QUERY vendor_risk WHERE v = v RETURN v.name AS vendor, v_risk ORDER BY v_risk DESC
QUERY semantic_match WHERE fit >= 0.6 RETURN o.text AS obligation, c.name AS control, fit ORDER BY fit DESC LIMIT 10
QUERY unmatched_obligation WHERE o = o RETURN o.obl_id AS obligation_id, o.text AS obligation_text
'''
print(program)
// Rule 1: Identify weak controls (gaps or low effectiveness)
CREATE RULE weak_control AS
  MATCH (c:Control)
  WHERE c.status = 'gap' OR c.effectiveness < 0.5
  YIELD KEY c, 1.0 - c.effectiveness AS severity

// Rule 2: System exposure — MNOR across all regulation->...->system paths
CREATE RULE system_exposure AS
  MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
  WHERE c IS weak_control
  FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
  YIELD KEY sys, aggregate_risk

// Rule 3: Vendor risk rollup
CREATE RULE vendor_risk AS
  MATCH (sys:System)-[op:OPERATED_BY]->(v:Vendor)
  WHERE sys IS system_exposure
  FOLD v_risk = MNOR(aggregate_risk * op.criticality)
  YIELD KEY v, v_risk

// Rule 4: Semantic obligation-to-control matching
CREATE RULE semantic_match AS
  MATCH (o:Obligation), (c:Control)
  YIELD KEY o, KEY c, similar_to(o.embedding, c.embedding) AS fit

// Rule 5: Obligations with good control match
CREATE RULE has_fit AS
  MATCH (o:Obligation)
  WHERE o IS semantic_match TO c, fit >= 0.7
  YIELD KEY o

// Rule 6: Unmatched obligations (no well-fitting control)
CREATE RULE unmatched_obligation AS
  MATCH (o:Obligation)
  WHERE o IS NOT has_fit
  YIELD KEY o

// Queries
QUERY weak_control WHERE c = c RETURN c.name AS control, c.status AS status, severity ORDER BY severity DESC
QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
QUERY vendor_risk WHERE v = v RETURN v.name AS vendor, v_risk ORDER BY v_risk DESC
QUERY semantic_match WHERE fit >= 0.6 RETURN o.text AS obligation, c.name AS control, fit ORDER BY fit DESC LIMIT 10
QUERY unmatched_obligation WHERE o = o RETURN o.obl_id AS obligation_id, o.text AS obligation_text
baseline_out = session.locy_with(program).with_config({'max_iterations': 400, 'timeout_secs': 180.0}).run()
stats = baseline_out.stats
print('Iterations:', stats.total_iterations)
print('Strata:', stats.strata_evaluated)
print('Queries executed:', stats.queries_executed)

gap_rows = []
exposure_rows = []
vendor_rows = []
semantic_rows = []
unmatched_rows = []
for i, cmd in enumerate(baseline_out.command_results, start=1):
    print(f'\nCommand #{i}:', cmd.command_type)
    if cmd.command_type in ('query', 'cypher'):
        rows = _norm_rows(cmd.rows)
        print('rows:', len(rows))
        pprint(rows[:5])
        if rows and 'severity' in rows[0]:
            gap_rows = rows
        elif rows and 'aggregate_risk' in rows[0]:
            exposure_rows = rows
        elif rows and 'v_risk' in rows[0]:
            vendor_rows = rows
        elif rows and 'fit' in rows[0]:
            semantic_rows = rows
        elif rows and 'obligation_id' in rows[0]:
            unmatched_rows = rows

# Verify MNOR bounds
for row in exposure_rows:
    r = float(row['aggregate_risk'])
    assert 0.0 <= r <= 1.0, f"MNOR score out of range: {r}"
print(f'\nAll {len(exposure_rows)} system exposure scores in [0, 1] \u2713')
Iterations: 0
Strata: 6
Queries executed: 10

Command #1: query
rows: 12
[{'control': 'Threat-led penetration testing (TLPT)',
  'severity': 0.95,
  'status': 'gap'},
 {'control': 'Digital operational resilience dashboard',
  'severity': 0.95,
  'status': 'gap'},
 {'control': 'ICT supply chain mapping', 'severity': 0.92, 'status': 'gap'},
 {'control': 'ICT resilience testing', 'severity': 0.9, 'status': 'gap'},
 {'control': 'Automated compliance reporting',
  'severity': 0.88,
  'status': 'gap'}]

Command #2: query
rows: 5
[{'aggregate_risk': 0.9951514424879684, 'system': 'ERP Core'},
 {'aggregate_risk': 0.987753477375, 'system': 'Trading Platform'},
 {'aggregate_risk': 0.843252896431, 'system': 'CRM System'},
 {'aggregate_risk': 0.7695, 'system': 'Data Warehouse'},
 {'aggregate_risk': 0.7695, 'system': 'Disaster Recovery'}]

Command #3: query
rows: 4
[{'v_risk': 0.9321668139832101, 'vendor': 'DataVault Solutions'},
 {'v_risk': 0.8956362982391716, 'vendor': 'CloudOps Inc'},
 {'v_risk': 0.5902770275017, 'vendor': 'NetSecure Systems'},
 {'v_risk': 0.500175, 'vendor': 'OffshoreIT Services'}]

Command #4: query
rows: 10
[{'control': 'Data classification',
  'fit': 0.9999273431022875,
  'obligation': 'Data protection impact assessment'},
 {'control': 'Multi-factor authentication',
  'fit': 0.999893226979695,
  'obligation': 'ICT third-party risk management'},
 {'control': 'ICT resilience testing',
  'fit': 0.999805307142468,
  'obligation': 'Digital operational resilience testing'},
 {'control': 'Threat-led penetration testing (TLPT)',
  'fit': 0.999805307142468,
  'obligation': 'Threat-led penetration testing'},
 {'control': 'DPO governance framework',
  'fit': 0.9998006197156357,
  'obligation': 'Data protection officer appointment'}]

Command #5: query
rows: 0
[]

All 5 system exposure scores in [0, 1] ✓
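
The fit scores in Command #4 are consistent with cosine similarity between the two embeddings. The sketch below assumes that is what similar_to computes (the notebook does not state it) and uses the vectors of obligation OBL-DORA-08 and control CTRL-20 as they appear in the section 6 bindings, rounded to two decimals.

```python
import math

def cosine_similarity(a, b):
    """Dot product of a and b divided by the product of their Euclidean norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

obligation_vec = [0.8, 0.1, 0.75, -0.25]   # OBL-DORA-08 'Threat-led penetration testing'
control_vec = [0.78, 0.08, 0.73, -0.23]    # CTRL-20 'Threat-led penetration testing (TLPT)'

fit = cosine_similarity(obligation_vec, control_vec)
print(round(fit, 4))
```

The result agrees with the reported fit for that pair to about five decimal places. Any remaining difference would be expected, since the stored vectors are not exactly the two-decimal values used here.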

6) EXPLAIN RULE

What this does: Shows the full derivation tree behind the system_exposure score for ERP Core, tracing back through the weak_control join to the original compliance gaps.

What to expect: A tree with rule names, clause indices, and child derivations showing the regulation → obligation → control → process → system chain.

program_explain = r'''
CREATE RULE weak_control AS
  MATCH (c:Control)
  WHERE c.status = 'gap' OR c.effectiveness < 0.5
  YIELD KEY c, 1.0 - c.effectiveness AS severity

CREATE RULE system_exposure AS
  MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
  WHERE c IS weak_control
  FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
  YIELD KEY sys, aggregate_risk

EXPLAIN RULE system_exposure WHERE sys.name = 'ERP Core'
'''

explain_out = session.locy_with(program_explain).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
explain_cmd = next(cmd for cmd in explain_out.command_results if cmd.command_type == 'explain')
tree = explain_cmd.tree

def _print_tree(node, depth=0, max_depth=4, max_children=4):
    indent = '  ' * depth
    print(f"{indent}- rule={node.get('rule')}, clause={node.get('clause_index')}, bindings={node.get('bindings', {})}")
    if depth >= max_depth:
        return
    children = node.get('children', [])
    for child in children[:max_children]:
        _print_tree(child, depth + 1, max_depth=max_depth, max_children=max_children)
    if len(children) > max_children:
        print(f"{indent}  ... {len(children) - max_children} more child derivations")

print('Explain tree for ERP Core system_exposure:')
_print_tree(tree)
Explain tree for ERP Core system_exposure:
- rule=system_exposure, clause=0, bindings={}
  - rule=system_exposure, clause=0, bindings={'o': Node(id=8, labels=["Obligation"], properties={'text': 'ICT third-party risk management', 'embedding': [0.30000001192092896, 0.20000000298023224, -0.5, 0.75], 'weight': 0.9, 'obl_id': 'OBL-DORA-04', 'category': 'third_party', 'severity': 'high'}), 'c': Node(id=33, labels=["Control"], properties={'effectiveness': 0.45, 'status': 'partial', 'ctrl_id': 'CTRL-09', 'nist_family': 'SA', 'embedding': [0.3199999928474426, 0.2199999988079071, -0.47999998927116394, 0.7200000286102295], 'name': 'Third-party risk assessment'}), 'aggregate_risk': None, 'proc': Node(id=51, labels=["Process"], properties={'criticality': 0.9, 'name': 'Customer onboarding (KYC)', 'department': 'compliance', 'proc_id': 'PROC-02'}), 'sys': Node(id=60, labels=["System"], properties={'env': 'prod', 'name': 'ERP Core', 'sys_id': 'SYS-01', 'tier': 1}), 'r': Node(id=2, labels=["Regulation"], properties={'overflow_json': None, 'effective_date': '2025-01-17', 'penalty_factor': 0.9, 'name': 'DORA', 'reg_id': 'REG-DORA', 'jurisdiction': 'EU'}), 'severity': 0.55}
    - rule=weak_control, clause=0, bindings={'c': Node(id=33, labels=["Control"], properties={'nist_family': 'SA', 'overflow_json': None, 'name': 'Third-party risk assessment', 'ctrl_id': 'CTRL-09', 'embedding': [0.32, 0.22, -0.48, 0.72], 'effectiveness': 0.45, 'status': 'partial'}), 'severity': 0.55}
  - rule=system_exposure, clause=0, bindings={'aggregate_risk': None, 'proc': Node(id=52, labels=["Process"], properties={'department': 'trading', 'criticality': 0.95, 'proc_id': 'PROC-03', 'name': 'Trade execution'}), 'severity': 0.95, 'o': Node(id=12, labels=["Obligation"], properties={'text': 'Threat-led penetration testing', 'embedding': [0.800000011920929, 0.10000000149011612, 0.75, -0.25], 'severity': 'high', 'obl_id': 'OBL-DORA-08', 'category': 'testing', 'weight': 0.9}), 'c': Node(id=44, labels=["Control"], properties={'ctrl_id': 'CTRL-20', 'status': 'gap', 'name': 'Threat-led penetration testing (TLPT)', 'effectiveness': 0.05, 'nist_family': 'CA', 'embedding': [0.7799999713897705, 0.07999999821186066, 0.7300000190734863, -0.23000000417232513]}), 'r': Node(id=2, labels=["Regulation"], properties={'effective_date': '2025-01-17', 'overflow_json': None, 'jurisdiction': 'EU', 'reg_id': 'REG-DORA', 'penalty_factor': 0.9, 'name': 'DORA'}), 'sys': Node(id=60, labels=["System"], properties={'tier': 1, 'sys_id': 'SYS-01', 'name': 'ERP Core', 'env': 'prod'})}
    - rule=weak_control, clause=0, bindings={'c': Node(id=44, labels=["Control"], properties={'name': 'Threat-led penetration testing (TLPT)', 'status': 'gap', 'effectiveness': 0.05, 'ctrl_id': 'CTRL-20', 'embedding': [0.78, 0.08, 0.73, -0.23], 'nist_family': 'CA', 'overflow_json': None}), 'severity': 0.95}
  - rule=system_exposure, clause=0, bindings={'aggregate_risk': None, 'sys': Node(id=60, labels=["System"], properties={'tier': 1, 'sys_id': 'SYS-01', 'env': 'prod', 'name': 'ERP Core'}), 'r': Node(id=2, labels=["Regulation"], properties={'penalty_factor': 0.9, 'jurisdiction': 'EU', 'reg_id': 'REG-DORA', 'overflow_json': None, 'effective_date': '2025-01-17', 'name': 'DORA'}), 'c': Node(id=44, labels=["Control"], properties={'embedding': [0.7799999713897705, 0.07999999821186066, 0.7300000190734863, -0.23000000417232513], 'ctrl_id': 'CTRL-20', 'name': 'Threat-led penetration testing (TLPT)', 'status': 'gap', 'nist_family': 'CA', 'effectiveness': 0.05}), 'severity': 0.95, 'proc': Node(id=52, labels=["Process"], properties={'name': 'Trade execution', 'department': 'trading', 'proc_id': 'PROC-03', 'criticality': 0.95}), 'o': Node(id=12, labels=["Obligation"], properties={'embedding': [0.800000011920929, 0.10000000149011612, 0.75, -0.25], 'category': 'testing', 'severity': 'high', 'obl_id': 'OBL-DORA-08', 'text': 'Threat-led penetration testing', 'weight': 0.9})}
    - rule=weak_control, clause=0, bindings={'c': Node(id=44, labels=["Control"], properties={'status': 'gap', 'nist_family': 'CA', 'effectiveness': 0.05, 'overflow_json': None, 'name': 'Threat-led penetration testing (TLPT)', 'ctrl_id': 'CTRL-20', 'embedding': [0.78, 0.08, 0.73, -0.23]}), 'severity': 0.95}
  - rule=system_exposure, clause=0, bindings={'c': Node(id=44, labels=["Control"], properties={'name': 'Threat-led penetration testing (TLPT)', 'nist_family': 'CA', 'embedding': [0.7799999713897705, 0.07999999821186066, 0.7300000190734863, -0.23000000417232513], 'ctrl_id': 'CTRL-20', 'status': 'gap', 'effectiveness': 0.05}), 'proc': Node(id=52, labels=["Process"], properties={'name': 'Trade execution', 'criticality': 0.95, 'proc_id': 'PROC-03', 'department': 'trading'}), 'r': Node(id=2, labels=["Regulation"], properties={'jurisdiction': 'EU', 'effective_date': '2025-01-17', 'reg_id': 'REG-DORA', 'overflow_json': None, 'penalty_factor': 0.9, 'name': 'DORA'}), 'severity': 0.95, 'sys': Node(id=60, labels=["System"], properties={'name': 'ERP Core', 'env': 'prod', 'sys_id': 'SYS-01', 'tier': 1}), 'o': Node(id=12, labels=["Obligation"], properties={'obl_id': 'OBL-DORA-08', 'severity': 'high', 'text': 'Threat-led penetration testing', 'category': 'testing', 'weight': 0.9, 'embedding': [0.800000011920929, 0.10000000149011612, 0.75, -0.25]}), 'aggregate_risk': None}
    - rule=weak_control, clause=0, bindings={'severity': 0.95, 'c': Node(id=44, labels=["Control"], properties={'status': 'gap', 'embedding': [0.78, 0.08, 0.73, -0.23], 'effectiveness': 0.05, 'nist_family': 'CA', 'ctrl_id': 'CTRL-20', 'overflow_json': None, 'name': 'Threat-led penetration testing (TLPT)'})}
  ... 1 more child derivations
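
The bindings above carry everything needed to re-derive part of the ERP Core score by hand. The sketch below assumes MNOR is a noisy-OR combination, 1 - Π(1 - p), which matches the single-path value 0.9 × 0.9 × 0.95 = 0.7695 seen in section 5 but is not spelled out in this notebook. It treats each of the four displayed derivations as one contribution; the truncated fifth derivation is unknown and left out.

```python
import math

def noisy_or(probabilities):
    """Combine contributions as 1 - prod(1 - p); each extra term can only raise the result."""
    return 1.0 - math.prod(1.0 - p for p in probabilities)

penalty_factor = 0.9  # DORA, from the 'r' binding
weight = 0.9          # both displayed obligations have weight 0.9
shown_severities = [0.55, 0.95, 0.95, 0.95]  # one per displayed derivation

contributions = [penalty_factor * weight * s for s in shown_severities]
partial_risk = noisy_or(contributions)
print(f'{partial_risk:.4f}')
```

The partial value comes out near 0.993, just below the reported 0.9952 for ERP Core, which is what a monotone combination would give when one contribution is still missing.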

7) ASSUME

What this does: Simulates a counterfactual: "What if controls connected to CloudOps Inc improve to 85% effectiveness?" The ASSUME block modifies control effectiveness for controls in CloudOps Inc's dependency chain, then re-evaluates system exposure.

What to expect:

  • System exposure rows from the hypothetical world
  • A comparison showing reduced risk for systems operated by CloudOps Inc

assume_program = r'''
CREATE RULE weak_control AS
  MATCH (c:Control)
  WHERE c.status = 'gap' OR c.effectiveness < 0.5
  YIELD KEY c, 1.0 - c.effectiveness AS severity

CREATE RULE system_exposure AS
  MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
  WHERE c IS weak_control
  FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
  YIELD KEY sys, aggregate_risk

ASSUME {
  MATCH (c:Control)-[:PROTECTS]->(:Process)-[:RUNS_ON]->(:System)-[:OPERATED_BY]->(v:Vendor {name: 'CloudOps Inc'})
  WHERE c.status <> 'implemented'
  SET c.effectiveness = 0.85, c.status = 'implemented'
} THEN {
  QUERY system_exposure WHERE sys = sys RETURN sys.name AS system, aggregate_risk ORDER BY aggregate_risk DESC
}
'''

assume_out = session.locy_with(assume_program).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
assume_cmd = next(cmd for cmd in assume_out.command_results if cmd.command_type == 'assume')
assume_rows = _norm_rows(assume_cmd.rows)
print('System exposure after CloudOps Inc upgrade:')
pprint(assume_rows)

# Compare with baseline
print('\nBaseline exposure vs hypothetical:')
baseline_map = {str(r['system']): float(r['aggregate_risk']) for r in exposure_rows}
for row in assume_rows:
    sys_name = str(row['system'])
    new_risk = float(row['aggregate_risk'])
    old_risk = baseline_map.get(sys_name, 0.0)
    delta = new_risk - old_risk
    print(f"  {sys_name}: {old_risk:.4f} -> {new_risk:.4f} (delta {delta:+.4f})")
System exposure after CloudOps Inc upgrade:
[]

Baseline exposure vs hypothetical:
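
The idea behind the counterfactual can be reproduced on plain Python data: copy the facts, apply the hypothetical change to the copy, and recompute. The sketch below is only an analogy. It assumes MNOR is a noisy-OR, uses invented controls and paths, and says nothing about how Locy evaluates ASSUME internally.

```python
import copy
import math

def exposure(controls, paths):
    """Noisy-OR over paths whose control is a gap or below 0.5 effectiveness."""
    terms = []
    for path in paths:
        ctrl = controls[path['ctrl_id']]
        if ctrl['status'] == 'gap' or ctrl['effectiveness'] < 0.5:
            terms.append(path['penalty'] * path['weight'] * (1.0 - ctrl['effectiveness']))
    return 1.0 - math.prod(1.0 - t for t in terms)

# Hypothetical facts for one system.
controls = {
    'CTRL-A': {'status': 'gap', 'effectiveness': 0.05},
    'CTRL-B': {'status': 'partial', 'effectiveness': 0.45},
}
paths = [
    {'ctrl_id': 'CTRL-A', 'penalty': 0.9, 'weight': 0.9},
    {'ctrl_id': 'CTRL-B', 'penalty': 0.9, 'weight': 0.9},
]

baseline = exposure(controls, paths)

# Apply the hypothetical upgrade to a deep copy so the base facts stay untouched.
hypothetical = copy.deepcopy(controls)
hypothetical['CTRL-A'].update(status='implemented', effectiveness=0.85)
after = exposure(hypothetical, paths)

print(f'{baseline:.4f} -> {after:.4f}')
```

In this toy version a system with no remaining weak controls scores 0.0, whereas a rule-based evaluation would simply yield no row for it.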

8) ABDUCE

What this does: Asks: "What minimum control improvements close gaps for ERP Core?" Defines a rule named unacceptable that matches systems with aggregate_risk >= 0.5, then abduces the changes that would make ERP Core no longer unacceptable.

What to expect: A set of candidate modifications (property changes, edge removals) that would bring ERP Core below the 0.5 risk threshold.

program_abduce = r'''
CREATE RULE weak_control AS
  MATCH (c:Control)
  WHERE c.status = 'gap' OR c.effectiveness < 0.5
  YIELD KEY c, 1.0 - c.effectiveness AS severity

CREATE RULE system_exposure AS
  MATCH (r:Regulation)-[:REQUIRES]->(o:Obligation)-[:SATISFIED_BY]->(c:Control)-[:PROTECTS]->(proc:Process)-[:RUNS_ON]->(sys:System)
  WHERE c IS weak_control
  FOLD aggregate_risk = MNOR(r.penalty_factor * o.weight * severity)
  YIELD KEY sys, aggregate_risk

CREATE RULE unacceptable AS
  MATCH (sys:System)
  WHERE sys IS system_exposure, aggregate_risk >= 0.5
  YIELD KEY sys

ABDUCE NOT unacceptable WHERE sys.name = 'ERP Core'
'''

abduce_out = session.locy_with(program_abduce).with_config({'max_abduce_candidates': 120, 'max_abduce_results': 12, 'timeout_secs': 180.0}).run()
abduce_cmd = next(cmd for cmd in abduce_out.command_results if cmd.command_type == 'abduce')
mods = abduce_cmd.modifications
print('Minimum control changes to reduce ERP Core risk below 0.5:')
for i, item in enumerate(mods[:8], start=1):
    print(f'\nCandidate #{i}')
    pprint(item)
Minimum control changes to reduce ERP Core risk below 0.5:
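
For intuition about what a minimal fix means here, a brute-force search over a toy model is enough: try ever larger sets of controls, raise each chosen control to a target effectiveness, and stop at the first set that brings risk under the threshold. This is a sketch under stated assumptions, not Locy's ABDUCE algorithm. The data is invented, MNOR is assumed to be noisy-OR, and weakness is simplified to effectiveness < 0.5.

```python
from itertools import combinations
import math

def risk(effectiveness, paths):
    """Noisy-OR over paths whose control has effectiveness below 0.5."""
    terms = [p * w * (1.0 - effectiveness[c]) for c, p, w in paths if effectiveness[c] < 0.5]
    return 1.0 - math.prod(1.0 - t for t in terms)

def smallest_fix(effectiveness, paths, target=0.85, threshold=0.5):
    """Return the first smallest set of controls that, raised to `target`, gets risk below `threshold`."""
    names = sorted(effectiveness)
    for size in range(len(names) + 1):
        for chosen in combinations(names, size):
            trial = {**effectiveness, **{c: target for c in chosen}}
            if risk(trial, paths) < threshold:
                return chosen
    return None  # no combination is sufficient

# Hypothetical controls and (control, penalty_factor, weight) paths for one system.
effectiveness = {'CTRL-A': 0.05, 'CTRL-B': 0.45, 'CTRL-C': 0.9}
paths = [('CTRL-A', 0.9, 0.9), ('CTRL-B', 0.9, 0.9), ('CTRL-C', 0.9, 0.9)]

fix = smallest_fix(effectiveness, paths)
print(fix)
```

Exhaustive search grows exponentially with the number of controls, which is presumably why the real run caps candidates and results through `max_abduce_candidates` and `max_abduce_results`.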

9) What To Expect

Use these checks to validate output after evaluation:

  • Section 5 (Baseline): weak_control rows should list controls with severity > 0. system_exposure rows should have MNOR-aggregated scores within the closed interval [0, 1]. vendor_risk rows should show vendor-level rollup. semantic_match rows should have fit scores >= 0.6. unmatched_obligation rows identify obligations without good control coverage; this set can be empty, as in the recorded run above.
  • Section 6 (EXPLAIN RULE): The derivation tree should trace from system_exposure back through weak_control with concrete bindings.
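  • Section 7 (ASSUME): Systems in CloudOps Inc's dependency chain should show reduced exposure compared to baseline. An empty result is also possible, as in the recorded run above; section 10 reports it as a note rather than a failure.
  • Section 8 (ABDUCE): Ideally at least one candidate modification is returned, suggesting control improvements that bring ERP Core below the 0.5 threshold. If none is returned, as in the recorded run above, section 10 prints a note suggesting a higher timeout or a different target.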

10) Build-Time Assertions

What this does: Validates key invariants from all sections to keep the notebook self-checking in CI/docs builds.

What to expect: All assertions pass with a final confirmation message.

assert gap_rows, 'Expected non-empty compliance gap rows'
assert exposure_rows, 'Expected non-empty system exposure rows'
assert vendor_rows, 'Expected non-empty vendor risk rows'
assert all(0.0 <= float(r['aggregate_risk']) <= 1.0 for r in exposure_rows), 'MNOR scores must be in [0,1]'
assert tree, 'Expected EXPLAIN RULE tree'
assert tree.get('children') or tree.get('rule'), 'Expected derivation tree structure'
if not assume_rows:
    print('Note: ASSUME returned no results (hypothesis may eliminate all matching facts)')
else:
    print(f'ASSUME returned {len(assume_rows)} rows')
if not mods:
    print('Note: ABDUCE returned no modifications (may need higher timeout or different target)')
else:
    print(f'ABDUCE found {len(mods)} modifications')
print('All notebook assertions passed.')
Note: ASSUME returned no results (hypothesis may eliminate all matching facts)
Note: ABDUCE returned no modifications (may need higher timeout or different target)
All notebook assertions passed.

11) Cleanup

What this does: Removes the temporary on-disk database created for this notebook run.

What to expect: A confirmation that the temporary directory has been deleted.

shutil.rmtree(DB_DIR, ignore_errors=True)
print('Cleaned up', DB_DIR)
Cleaned up /tmp/uni_locy_reg_u3mr0c0v
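
An alternative to calling `shutil.rmtree` by hand is the standard library's `tempfile.TemporaryDirectory`, which removes the directory when its `with` block exits, even if a cell raises. The sketch below only demonstrates that behavior with a placeholder file. Whether the database handle must be closed before the directory is removed is not covered by this notebook and is left open here.

```python
import os
import tempfile

with tempfile.TemporaryDirectory(prefix='uni_locy_reg_') as tmp_dir:
    # Placeholder for database files; the notebook would open the database on tmp_dir here.
    marker = os.path.join(tmp_dir, 'marker.txt')
    with open(marker, 'w', encoding='utf-8') as f:
        f.write('scratch data')
    existed_inside = os.path.exists(marker)

# The directory and its contents are gone once the block exits.
removed_after = not os.path.exists(tmp_dir)
print(existed_inside, removed_after)
```

In a notebook that spans many cells, the explicit `mkdtemp` plus `rmtree` pair used above is the simpler fit, since a `with` block cannot stay open across cells.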