Locy Flagship #2: Pharma Batch Genealogy Decisioning¶
This notebook uses real pharmaceutical process and laboratory data (Figshare / Scientific Data) and demonstrates:
- ALONG: carry batch risk through campaign genealogy paths.
- FOLD: aggregate impact across derived paths.
- BEST BY: choose intervention strategies by risk first, cost second.
- DERIVE: materialize propagation edges.
- ASSUME: evaluate temporary containment scenarios.
- ABDUCE: search minimal changes that alter decisions.
- EXPLAIN RULE: inspect derivation evidence.
It is schema-first (recommended) and written for both first-time and advanced Locy readers.
How To Read This Notebook¶
- Every code section includes intent and expected outputs.
- Data is a deterministic focus slice for stable docs/CI runtime.
- The flow is: load facts -> derive risk paths -> optimize decisions -> simulate and explain.
1) Setup and Data Discovery¶
What this does: Load helpers, locate prepared pharma data, and create an isolated temporary database.
What to expect:
Printed DATA_DIR and DB_DIR paths.
from pathlib import Path
from pprint import pprint
import csv
import os
import shutil
import tempfile
import uni_db
def _read_csv(path: Path) -> list[dict[str, str]]:
with path.open('r', encoding='utf-8', newline='') as f:
return list(csv.DictReader(f))
def _esc(value: str) -> str:
return str(value).replace('\\', '\\\\').replace("'", "\\'")
def _f(value: str) -> float:
return float(value) if value not in ('', None) else 0.0
def _norm_key(key: object) -> str:
s = str(key)
if s.startswith('Variable("') and s.endswith('")'):
return s[len('Variable("'):-2]
return s
def _norm_rows(rows: list[dict[object, object]]) -> list[dict[str, object]]:
return [{_norm_key(k): v for k, v in row.items()} for row in rows]
_default_candidates = [
Path('docs/examples/data/locy_pharma_batch_genealogy'),
Path('website/docs/examples/data/locy_pharma_batch_genealogy'),
Path('examples/data/locy_pharma_batch_genealogy'),
Path('../data/locy_pharma_batch_genealogy'),
]
if 'LOCY_DATA_DIR' in os.environ:
DATA_DIR = Path(os.environ['LOCY_DATA_DIR']).resolve()
else:
DATA_DIR = next(
(p.resolve() for p in _default_candidates if (p / 'pharma_batches.csv').exists()),
_default_candidates[0].resolve(),
)
if not (DATA_DIR / 'pharma_batches.csv').exists():
raise FileNotFoundError(
'Expected data under docs/examples/data/locy_pharma_batch_genealogy. '
'Run from website/ (or repo root) or set LOCY_DATA_DIR.'
)
DB_DIR = tempfile.mkdtemp(prefix='uni_locy_pharma_')
db = uni_db.Database(DB_DIR)
print('DATA_DIR:', DATA_DIR)
print('DB_DIR:', DB_DIR)
DATA_DIR: /home/runner/work/uni-db/uni-db/website/docs/examples/data/locy_pharma_batch_genealogy
DB_DIR: /tmp/uni_locy_pharma_5t6in59e
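The small helpers defined above are load-bearing for the rest of the notebook: `_esc` guards string literals interpolated into queries, `_f` coerces blank CSV cells to `0.0`, and `_norm_key` strips the `Variable("…")` wrapper from result keys. As a quick illustration, here is a self-contained copy of their logic with sanity assertions (no database needed; these are illustrative checks, not part of the pipeline):

```python
# Self-contained copies of the helpers above, with sanity checks.
def esc(value: str) -> str:
    # Escape backslashes first, then single quotes, so the value is
    # safe inside a single-quoted query string literal.
    return str(value).replace('\\', '\\\\').replace("'", "\\'")

def f(value):
    # Blank CSV cells become 0.0 instead of raising ValueError.
    return float(value) if value not in ('', None) else 0.0

def norm_key(key) -> str:
    # Strip the Variable("...") wrapper that some result keys carry.
    s = str(key)
    if s.startswith('Variable("') and s.endswith('")'):
        return s[len('Variable("'):-2]
    return s

assert esc("O'Brien") == "O\\'Brien"      # quote escaped
assert f('') == 0.0 and f('1.5') == 1.5   # blank cell tolerated
assert norm_key('Variable("batch_id")') == 'batch_id'
assert norm_key('batch_id') == 'batch_id'  # plain keys pass through
print('helper checks passed')
```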
2) Load Real Data and Build a Focus Cohort¶
What this does: Loads batches, material lots, campaign links, and intervention plans; selects a deterministic cohort for fast execution.
What to expect: Counts for focus batches, deviation batches, material lots, genealogy edges, and action plans.
batches = _read_csv(DATA_DIR / 'pharma_batches.csv')
materials = _read_csv(DATA_DIR / 'pharma_material_lots.csv')
usage_edges = _read_csv(DATA_DIR / 'pharma_usage_edges.csv')
campaign_edges = _read_csv(DATA_DIR / 'pharma_campaign_edges.csv')
actions = _read_csv(DATA_DIR / 'pharma_action_plans.csv')
notebook_cases = _read_csv(DATA_DIR / 'pharma_notebook_cases.csv')
focus_deviation_ids = [r['batch_id'] for r in notebook_cases[:24]]
in_spec_ids = [r['batch_id'] for r in batches if r['quality_state'] == 'IN_SPEC'][:72]
focus_ids = set(focus_deviation_ids + in_spec_ids)
focus_batches = [r for r in batches if r['batch_id'] in focus_ids]
focus_usage = [r for r in usage_edges if r['batch_id'] in focus_ids]
material_ids = {r['material_lot_id'] for r in focus_usage}
focus_materials = [r for r in materials if r['material_lot_id'] in material_ids]
focus_campaign = [
r for r in campaign_edges
if r['src_batch_id'] in focus_ids and r['dst_batch_id'] in focus_ids
]
focus_actions = [r for r in actions if r['batch_id'] in focus_ids]
print('focus batches:', len(focus_batches))
print('focus deviation batches:', sum(1 for r in focus_batches if r['quality_state'] == 'DEVIATION'))
print('focus material lots:', len(focus_materials))
print('focus usage edges:', len(focus_usage))
print('focus campaign edges:', len(focus_campaign))
print('focus action plans:', len(focus_actions))
focus batches: 96
focus deviation batches: 24
focus material lots: 71
focus usage edges: 384
focus campaign edges: 84
focus action plans: 72
3) Define Schema (Recommended)¶
What this does: Defines typed nodes and relationships before ingest.
What to expect:
A single Schema created confirmation.
(
db.schema()
.label('Batch')
.property('batch_id', 'string')
.property('product_code', 'string')
.property('quality_state', 'string')
.property('deviation_score', 'float64')
.property('process_risk', 'float64')
.property('dissolution_min', 'float64')
.property('residual_solvent', 'float64')
.property('impurities_total', 'float64')
.done()
.label('MaterialLot')
.property('material_lot_id', 'string')
.property('material_type', 'string')
.property('intrinsic_risk', 'float64')
.property('batches_seen', 'int64')
.done()
.label('ActionPlan')
.property('action_id', 'string')
.property('batch_id', 'string')
.property('action_type', 'string')
.property('cost_index', 'float64')
.property('downtime_hours', 'float64')
.property('mitigation_factor', 'float64')
.done()
.edge_type('USED_IN', ['MaterialLot'], ['Batch'])
.property('criticality_weight', 'float64')
.done()
.edge_type('NEXT_BATCH', ['Batch'], ['Batch'])
.property('carry_risk', 'float64')
.done()
.edge_type('CANDIDATE_FOR', ['Batch'], ['ActionPlan'])
.done()
.edge_type('PROPAGATES_TO', ['Batch'], ['Batch'])
.done()
.edge_type('CONTAINED_BY', ['Batch'], ['ActionPlan'])
.done()
.apply()
)
print('Schema created')
Schema created
4) Ingest the Pharma Genealogy Graph¶
What this does: Creates Batch, MaterialLot, and ActionPlan nodes, then connects material usage and campaign carryover edges.
What to expect: Graph counts for key node and edge types.
for row in focus_batches:
db.execute(
f"CREATE (:Batch {{batch_id: '{_esc(row['batch_id'])}', product_code: '{_esc(row['product_code'])}', "
f"quality_state: '{_esc(row['quality_state'])}', deviation_score: {_f(row['deviation_score'])}, "
f"process_risk: {_f(row['process_risk'])}, dissolution_min: {_f(row['dissolution_min'])}, "
f"residual_solvent: {_f(row['residual_solvent'])}, impurities_total: {_f(row['impurities_total'])}}})"
)
for row in focus_materials:
db.execute(
f"CREATE (:MaterialLot {{material_lot_id: '{_esc(row['material_lot_id'])}', material_type: '{_esc(row['material_type'])}', "
f"intrinsic_risk: {_f(row['intrinsic_risk'])}, batches_seen: {int(float(row['batches_seen']))}}})"
)
for row in focus_actions:
db.execute(
f"CREATE (:ActionPlan {{action_id: '{_esc(row['action_id'])}', batch_id: '{_esc(row['batch_id'])}', "
f"action_type: '{_esc(row['action_type'])}', cost_index: {_f(row['cost_index'])}, "
f"downtime_hours: {_f(row['downtime_hours'])}, mitigation_factor: {_f(row['mitigation_factor'])}}})"
)
for row in focus_usage:
db.execute(
f"MATCH (m:MaterialLot {{material_lot_id: '{_esc(row['material_lot_id'])}'}}), "
f"(b:Batch {{batch_id: '{_esc(row['batch_id'])}'}}) "
f"CREATE (m)-[:USED_IN {{criticality_weight: {_f(row['criticality_weight'])}}}]->(b)"
)
for row in focus_campaign:
db.execute(
f"MATCH (s:Batch {{batch_id: '{_esc(row['src_batch_id'])}'}}), "
f"(d:Batch {{batch_id: '{_esc(row['dst_batch_id'])}'}}) "
f"CREATE (s)-[:NEXT_BATCH {{carry_risk: {_f(row['carry_risk'])}}}]->(d)"
)
for row in focus_actions:
db.execute(
f"MATCH (b:Batch {{batch_id: '{_esc(row['batch_id'])}'}}), "
f"(a:ActionPlan {{action_id: '{_esc(row['action_id'])}'}}) "
"CREATE (b)-[:CANDIDATE_FOR]->(a)"
)
counts = db.query("""
MATCH (b:Batch) WITH count(*) AS batches
MATCH (m:MaterialLot) WITH batches, count(*) AS materials
MATCH ()-[u:USED_IN]->() WITH batches, materials, count(*) AS usage_edges
MATCH ()-[n:NEXT_BATCH]->() WITH batches, materials, usage_edges, count(*) AS campaign_edges
MATCH (a:ActionPlan)
RETURN batches, materials, usage_edges, campaign_edges, count(*) AS action_plans
""")
print('Graph counts:')
pprint(counts[0])
Graph counts:
{'action_plans': 72,
'batches': 96,
'campaign_edges': 84,
'materials': 71,
'usage_edges': 384}
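The ingest loops above repeat the same quote-and-format pattern for every node type. One way to factor that out is a small property-map renderer that reuses the escaping logic; the `props` helper below is a hypothetical refactor sketch, not part of the `uni_db` API:

```python
# Hypothetical refactor sketch: render a Cypher-style {k: v, ...} property
# map from a CSV row dict, applying the same escaping/coercion rules as
# the ingest loops above.
def esc(value: str) -> str:
    # Same escaping as the notebook's _esc helper.
    return str(value).replace('\\', '\\\\').replace("'", "\\'")

def props(row, strings=(), floats=(), ints=()):
    parts = []
    for k in strings:
        parts.append(f"{k}: '{esc(row[k])}'")
    for k in floats:
        # Blank cells become 0.0, matching the _f helper.
        parts.append(f"{k}: {float(row[k]) if row[k] not in ('', None) else 0.0}")
    for k in ints:
        parts.append(f"{k}: {int(float(row[k]))}")
    return '{' + ', '.join(parts) + '}'

row = {'batch_id': 'B17-0001', 'process_risk': '0.25', 'batches_seen': '3'}
print(props(row, strings=('batch_id',), floats=('process_risk',), ints=('batches_seen',)))
# -> {batch_id: 'B17-0001', process_risk: 0.25, batches_seen: 3}
```

With a helper like this, each `CREATE` statement collapses to one `f"CREATE (:Batch {props(row, ...)})"` call, keeping the escaping rules in a single place.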
5) Baseline Locy Program (DERIVE + ALONG + FOLD + BEST BY)¶
What this does: Builds recursive campaign propagation, aggregates impact with FOLD, materializes propagation edges with DERIVE, and chooses the best action per deviating batch with BEST BY.
What to expect:
- propagation path rows (source_batch, impacted_batch, path_risk, hops)
- derive affected count
- best action rows (batch_id, action_type, residual_risk, plan_cost)
program_baseline = r'''
CREATE RULE deviation_batch AS
MATCH (b:Batch)
WHERE b.quality_state = 'DEVIATION'
YIELD KEY b
CREATE RULE propagation_path AS
MATCH (src:Batch)-[e:NEXT_BATCH]->(dst:Batch)
WHERE src IS deviation_batch
ALONG path_risk = src.process_risk + e.carry_risk, hops = 1
BEST BY path_risk DESC, hops ASC
YIELD KEY src, KEY dst, path_risk, hops
CREATE RULE propagation_path AS
MATCH (src:Batch)-[e:NEXT_BATCH]->(mid:Batch)
WHERE mid IS propagation_path TO dst
ALONG path_risk = prev.path_risk + e.carry_risk, hops = prev.hops + 1
BEST BY path_risk DESC, hops ASC
YIELD KEY src, KEY dst, path_risk, hops
CREATE RULE propagation_summary AS
MATCH (src:Batch)
WHERE src IS propagation_path TO dst
FOLD impacted_batches = COUNT(dst), total_path_risk = SUM(path_risk), max_hops = MAX(hops)
YIELD KEY src, impacted_batches, total_path_risk, max_hops
CREATE RULE derive_propagation AS
MATCH (src:Batch)-[:NEXT_BATCH]->(dst:Batch)
WHERE src IS deviation_batch
DERIVE (src)-[:PROPAGATES_TO]->(dst)
CREATE RULE best_action AS
MATCH (b:Batch)-[:CANDIDATE_FOR]->(a:ActionPlan)
WHERE b IS deviation_batch
ALONG residual_risk = b.process_risk * (1.0 - a.mitigation_factor), plan_cost = a.cost_index
BEST BY residual_risk ASC, plan_cost ASC
YIELD KEY b, a, residual_risk, plan_cost
QUERY propagation_path WHERE src = src RETURN src.batch_id AS source_batch, dst.batch_id AS impacted_batch, path_risk, hops
DERIVE derive_propagation
QUERY best_action WHERE b = b RETURN b.batch_id AS batch_id, a.action_type AS action_type, residual_risk, plan_cost
'''
baseline_out = db.locy_evaluate(program_baseline, {'max_iterations': 400, 'timeout': 60.0, 'max_abduce_candidates': 80, 'max_abduce_results': 12})
stats = baseline_out['stats']
print('Iterations:', stats.total_iterations)
print('Strata:', stats.strata_evaluated)
print('Queries executed:', stats.queries_executed)
propagation_rows = []
best_plan_rows = []
propagation_path_rows = []
for i, cmd in enumerate(baseline_out['command_results'], start=1):
print(f"\nCommand #{i}:", cmd.get('type'))
if cmd.get('type') in ('query', 'cypher'):
rows = _norm_rows(cmd.get('rows', []))
print('rows:', len(rows))
pprint(rows[:5])
if rows and 'impacted_batch' in rows[0]:
propagation_path_rows = rows
if rows and 'action_type' in rows[0]:
best_plan_rows = rows
elif cmd.get('type') == 'derive':
print('affected:', cmd.get('affected'))
source_rollup = {}
for row in propagation_path_rows:
source = str(row.get('source_batch', ''))
impacted = str(row.get('impacted_batch', ''))
info = source_rollup.setdefault(source, {'source_batch': source, 'impacted': set(), 'downstream_risk': 0.0, 'max_hops': 0})
if impacted:
info['impacted'].add(impacted)
info['downstream_risk'] += _f(str(row.get('path_risk', '0')))
info['max_hops'] = max(int(info['max_hops']), int(_f(str(row.get('hops', '0')))))
propagation_rows = [
{
'source_batch': v['source_batch'],
'impacted_batches': len(v['impacted']),
'downstream_risk': v['downstream_risk'],
'max_hops': v['max_hops'],
}
for v in source_rollup.values()
]
propagation_rows = sorted(
propagation_rows,
key=lambda r: (-int(r.get('impacted_batches', 0)), -_f(str(r.get('downstream_risk', '0'))), str(r.get('source_batch', ''))),
)
best_plan_rows = sorted(
best_plan_rows,
key=lambda r: (_f(str(r.get('residual_risk', '0'))), _f(str(r.get('plan_cost', '0'))), str(r.get('batch_id', ''))),
)
if not propagation_rows:
raise RuntimeError('Expected non-empty propagation summary rows')
if not best_plan_rows:
raise RuntimeError('Expected non-empty best action rows')
focus_source_batch = str(propagation_rows[0]['source_batch'])
focus_plan_batch = str(best_plan_rows[0]['batch_id'])
print('\nTop propagation source batch:', focus_source_batch)
print('Top selected action batch:', focus_plan_batch)
Iterations: 16
Strata: 5
Queries executed: 34
Command #1: query
rows: 95
[{'hops': 1,
'impacted_batch': 'B17-0225',
'path_risk': 1.476875,
'source_batch': 'B17-0224'},
{'hops': 1,
'impacted_batch': 'B17-0372',
'path_risk': 1.40574812,
'source_batch': 'B17-0370'},
{'hops': 1,
'impacted_batch': 'B17-0371',
'path_risk': 1.8283375,
'source_batch': 'B17-0370'},
{'hops': 1,
'impacted_batch': 'B17-0375',
'path_risk': 1.3443,
'source_batch': 'B17-0374'},
{'hops': 1,
'impacted_batch': 'B17-0376',
'path_risk': 1.93,
'source_batch': 'B17-0375'}]
Command #2: derive
affected: 84
Command #3: query
rows: 72
[{'action_type': 'targeted_retest',
'batch_id': 'B17-0224',
'plan_cost': 6.0,
'residual_risk': 0.3445},
{'action_type': 'release_with_sampling',
'batch_id': 'B17-0224',
'plan_cost': 2.5,
'residual_risk': 0.49024999999999996},
{'action_type': 'deep_clean_hold',
'batch_id': 'B17-0224',
'plan_cost': 14.0,
'residual_risk': 0.1855},
{'action_type': 'targeted_retest',
'batch_id': 'B17-0370',
'plan_cost': 6.0,
'residual_risk': 0.46241},
{'action_type': 'release_with_sampling',
'batch_id': 'B17-0370',
'plan_cost': 2.5,
'residual_risk': 0.658045}]
Top propagation source batch: B17-0192
Top selected action batch: B17-0674
6) Explain One Propagation Path (EXPLAIN RULE)¶
What this does: Produces a derivation tree for recursive propagation from one high-risk source batch.
What to expect: A non-empty tree with rule/clause and supporting child derivations.
program_explain = f'''
CREATE RULE deviation_batch AS
MATCH (b:Batch)
WHERE b.quality_state = 'DEVIATION'
YIELD KEY b
CREATE RULE propagation_path AS
MATCH (src:Batch)-[e:NEXT_BATCH]->(dst:Batch)
WHERE src IS deviation_batch
ALONG path_risk = src.process_risk + e.carry_risk, hops = 1
BEST BY path_risk DESC, hops ASC
YIELD KEY src, KEY dst, path_risk, hops
CREATE RULE propagation_path AS
MATCH (src:Batch)-[e:NEXT_BATCH]->(mid:Batch)
WHERE mid IS propagation_path TO dst
ALONG path_risk = prev.path_risk + e.carry_risk, hops = prev.hops + 1
BEST BY path_risk DESC, hops ASC
YIELD KEY src, KEY dst, path_risk, hops
EXPLAIN RULE propagation_path WHERE src.batch_id = '{focus_source_batch}' RETURN dst
'''
explain_out = db.locy_evaluate(program_explain)
explain_cmd = next(cmd for cmd in explain_out['command_results'] if cmd.get('type') == 'explain')
tree = explain_cmd['tree']
def _print_tree(node, depth=0, max_depth=3, max_children=3):
indent = ' ' * depth
print(f"{indent}- rule={node.get('rule')}, clause={node.get('clause_index')}, bindings={node.get('bindings', {})}")
if depth >= max_depth:
return
children = node.get('children', [])
for child in children[:max_children]:
_print_tree(child, depth + 1, max_depth=max_depth, max_children=max_children)
if len(children) > max_children:
print(f"{indent} ... {len(children) - max_children} more child derivations")
print('Explain tree for source batch:', focus_source_batch)
_print_tree(tree)
Explain tree for source batch: B17-0192
- rule=propagation_path, clause=0, bindings={}
- rule=propagation_path, clause=1, bindings={'hops': 4, 'e': {'_id': 384, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.38745}, 'src': {'_id': '0', '_labels': ['Batch'], 'deviation_score': 0.0, 'dissolution_min': 83.0, 'quality_state': 'IN_SPEC', 'impurities_total': 0.14, 'residual_solvent': 0.05, 'process_risk': 0.1, 'batch_id': 'B17-0192', 'product_code': '17'}, 'mid': {'_id': '1', '_labels': ['Batch'], 'process_risk': 0.2, 'dissolution_min': 85.0, 'quality_state': 'IN_SPEC', 'residual_solvent': 0.03, 'batch_id': 'B17-0193', 'deviation_score': 0.8, 'impurities_total': 0.1, 'product_code': '17'}, 'dst': {'_id': '4', '_labels': ['Batch'], 'batch_id': 'B17-0225', 'product_code': '17', 'quality_state': 'IN_SPEC', 'impurities_total': 0.13, 'dissolution_min': 82.0, 'deviation_score': 2.2, 'process_risk': 0.375, 'residual_solvent': 0.02}, 'path_risk': 2.900325}
- rule=propagation_path, clause=1, bindings={'src': {'_id': '1', '_labels': ['Batch'], 'residual_solvent': 0.03, 'quality_state': 'IN_SPEC', 'process_risk': 0.2, 'deviation_score': 0.8, 'product_code': '17', 'impurities_total': 0.1, 'dissolution_min': 85.0, 'batch_id': 'B17-0193'}, 'path_risk': 2.512875, 'dst': {'_id': '4', '_labels': ['Batch'], 'batch_id': 'B17-0225', 'product_code': '17', 'quality_state': 'IN_SPEC', 'impurities_total': 0.13, 'dissolution_min': 82.0, 'deviation_score': 2.2, 'process_risk': 0.375, 'residual_solvent': 0.02}, 'hops': 3, 'mid': {'_id': '2', '_labels': ['Batch'], 'process_risk': 0.2, 'dissolution_min': 85.0, 'impurities_total': 0.05, 'deviation_score': 0.8, 'quality_state': 'IN_SPEC', 'residual_solvent': 0.04, 'batch_id': 'B17-0194', 'product_code': '17'}, 'e': {'_id': 385, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.56}}
- rule=propagation_path, clause=1, bindings={'src': {'_id': '2', '_labels': ['Batch'], 'residual_solvent': 0.04, 'batch_id': 'B17-0194', 'product_code': '17', 'dissolution_min': 85.0, 'quality_state': 'IN_SPEC', 'deviation_score': 0.8, 'process_risk': 0.2, 'impurities_total': 0.05}, 'dst': {'_id': '4', '_labels': ['Batch'], 'batch_id': 'B17-0225', 'product_code': '17', 'quality_state': 'IN_SPEC', 'impurities_total': 0.13, 'dissolution_min': 82.0, 'deviation_score': 2.2, 'process_risk': 0.375, 'residual_solvent': 0.02}, 'path_risk': 1.952875, 'mid': {'_id': '3', '_labels': ['Batch'], 'dissolution_min': 84.0, 'impurities_total': 0.28, 'process_risk': 0.6625, 'residual_solvent': 0.02, 'deviation_score': 4.5, 'batch_id': 'B17-0224', 'product_code': '17', 'quality_state': 'DEVIATION'}, 'hops': 2, 'e': {'_id': 386, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.476}}
- rule=propagation_path, clause=1, bindings={'hops': 9, 'mid': {'_id': '1', '_labels': ['Batch'], 'batch_id': 'B17-0193', 'impurities_total': 0.1, 'quality_state': 'IN_SPEC', 'residual_solvent': 0.03, 'process_risk': 0.2, 'deviation_score': 0.8, 'dissolution_min': 85.0, 'product_code': '17'}, 'e': {'_id': 384, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.38745}, 'dst': {'_id': '10', '_labels': ['Batch'], 'deviation_score': 0.8, 'dissolution_min': 83.0, 'residual_solvent': 0.09, 'batch_id': 'B17-0371', 'process_risk': 0.2, 'product_code': '17', 'quality_state': 'IN_SPEC', 'impurities_total': 0.05}, 'path_risk': 5.8137, 'src': {'_id': '0', '_labels': ['Batch'], 'quality_state': 'IN_SPEC', 'dissolution_min': 83.0, 'process_risk': 0.1, 'deviation_score': 0.0, 'batch_id': 'B17-0192', 'product_code': '17', 'residual_solvent': 0.05, 'impurities_total': 0.14}}
- rule=propagation_path, clause=1, bindings={'hops': 8, 'mid': {'_id': '2', '_labels': ['Batch'], 'batch_id': 'B17-0194', 'dissolution_min': 85.0, 'residual_solvent': 0.04, 'quality_state': 'IN_SPEC', 'impurities_total': 0.05, 'product_code': '17', 'process_risk': 0.2, 'deviation_score': 0.8}, 'dst': {'_id': '10', '_labels': ['Batch'], 'deviation_score': 0.8, 'dissolution_min': 83.0, 'residual_solvent': 0.09, 'batch_id': 'B17-0371', 'process_risk': 0.2, 'product_code': '17', 'quality_state': 'IN_SPEC', 'impurities_total': 0.05}, 'path_risk': 5.42625, 'e': {'_id': 385, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.56}, 'src': {'_id': '1', '_labels': ['Batch'], 'process_risk': 0.2, 'dissolution_min': 85.0, 'batch_id': 'B17-0193', 'impurities_total': 0.1, 'residual_solvent': 0.03, 'deviation_score': 0.8, 'product_code': '17', 'quality_state': 'IN_SPEC'}}
- rule=propagation_path, clause=1, bindings={'dst': {'_id': '10', '_labels': ['Batch'], 'deviation_score': 0.8, 'dissolution_min': 83.0, 'residual_solvent': 0.09, 'batch_id': 'B17-0371', 'process_risk': 0.2, 'product_code': '17', 'quality_state': 'IN_SPEC', 'impurities_total': 0.05}, 'e': {'_id': 386, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.476}, 'src': {'_id': '2', '_labels': ['Batch'], 'dissolution_min': 85.0, 'quality_state': 'IN_SPEC', 'impurities_total': 0.05, 'deviation_score': 0.8, 'product_code': '17', 'process_risk': 0.2, 'residual_solvent': 0.04, 'batch_id': 'B17-0194'}, 'mid': {'_id': '3', '_labels': ['Batch'], 'batch_id': 'B17-0224', 'dissolution_min': 84.0, 'impurities_total': 0.28, 'product_code': '17', 'deviation_score': 4.5, 'residual_solvent': 0.02, 'quality_state': 'DEVIATION', 'process_risk': 0.6625}, 'hops': 7, 'path_risk': 4.86625}
- rule=propagation_path, clause=1, bindings={'e': {'_id': 384, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.38745}, 'mid': {'_id': '1', '_labels': ['Batch'], 'batch_id': 'B17-0193', 'deviation_score': 0.8, 'dissolution_min': 85.0, 'impurities_total': 0.1, 'product_code': '17', 'process_risk': 0.2, 'residual_solvent': 0.03, 'quality_state': 'IN_SPEC'}, 'path_risk': 5.39111062, 'dst': {'_id': '11', '_labels': ['Batch'], 'deviation_score': 3.0, 'dissolution_min': 82.0, 'product_code': '17', 'process_risk': 0.475, 'quality_state': 'IN_SPEC', 'batch_id': 'B17-0372', 'residual_solvent': 0.05, 'impurities_total': 0.05}, 'hops': 9, 'src': {'_id': '0', '_labels': ['Batch'], 'deviation_score': 0.0, 'impurities_total': 0.14, 'product_code': '17', 'quality_state': 'IN_SPEC', 'batch_id': 'B17-0192', 'dissolution_min': 83.0, 'residual_solvent': 0.05, 'process_risk': 0.1}}
- rule=propagation_path, clause=1, bindings={'e': {'_id': 385, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.56}, 'hops': 8, 'mid': {'_id': '2', '_labels': ['Batch'], 'quality_state': 'IN_SPEC', 'batch_id': 'B17-0194', 'process_risk': 0.2, 'product_code': '17', 'deviation_score': 0.8, 'impurities_total': 0.05, 'residual_solvent': 0.04, 'dissolution_min': 85.0}, 'dst': {'_id': '11', '_labels': ['Batch'], 'deviation_score': 3.0, 'dissolution_min': 82.0, 'product_code': '17', 'process_risk': 0.475, 'quality_state': 'IN_SPEC', 'batch_id': 'B17-0372', 'residual_solvent': 0.05, 'impurities_total': 0.05}, 'src': {'_id': '1', '_labels': ['Batch'], 'quality_state': 'IN_SPEC', 'impurities_total': 0.1, 'process_risk': 0.2, 'residual_solvent': 0.03, 'deviation_score': 0.8, 'dissolution_min': 85.0, 'batch_id': 'B17-0193', 'product_code': '17'}, 'path_risk': 5.00366062}
- rule=propagation_path, clause=1, bindings={'dst': {'_id': '11', '_labels': ['Batch'], 'deviation_score': 3.0, 'dissolution_min': 82.0, 'product_code': '17', 'process_risk': 0.475, 'quality_state': 'IN_SPEC', 'batch_id': 'B17-0372', 'residual_solvent': 0.05, 'impurities_total': 0.05}, 'path_risk': 4.44366062, 'mid': {'_id': '3', '_labels': ['Batch'], 'product_code': '17', 'impurities_total': 0.28, 'quality_state': 'DEVIATION', 'dissolution_min': 84.0, 'batch_id': 'B17-0224', 'residual_solvent': 0.02, 'process_risk': 0.6625, 'deviation_score': 4.5}, 'e': {'_id': 386, '_type': 'NEXT_BATCH', '_src': '0', '_dst': '0', 'carry_risk': 0.476}, 'src': {'_id': '2', '_labels': ['Batch'], 'impurities_total': 0.05, 'quality_state': 'IN_SPEC', 'deviation_score': 0.8, 'product_code': '17', 'residual_solvent': 0.04, 'process_risk': 0.2, 'dissolution_min': 85.0, 'batch_id': 'B17-0194'}, 'hops': 7}
... 2 more child derivations
7) Counterfactual Containment (ASSUME)¶
What this does: Temporarily applies deep-clean containment for high-risk deviating batches and compares contained vs residual deviations.
What to expect: Contained batch rows from the hypothetical world; rollback check should show zero persisted edges.
assume_program = '''
ASSUME {
MATCH (b:Batch {quality_state: 'DEVIATION'})-[:CANDIDATE_FOR]->(a:ActionPlan {action_type: 'deep_clean_hold'})
WHERE b.process_risk >= 0.55
CREATE (b)-[:CONTAINED_BY]->(a)
} THEN {
MATCH (b:Batch {quality_state: 'DEVIATION'})-[:CONTAINED_BY]->(a:ActionPlan)
RETURN b.batch_id AS batch_id, a.action_type AS action_type
}
'''
assume_out = db.locy_evaluate(assume_program)
assume_cmd = next(cmd for cmd in assume_out['command_results'] if cmd.get('type') == 'assume')
contained_rows = assume_cmd.get('rows', [])
contained_batch_ids = sorted({r['batch_id'] for r in contained_rows})
total_deviating_batches = sum(1 for r in focus_batches if r['quality_state'] == 'DEVIATION')
contained_deviations = len(contained_batch_ids)
residual_deviations = total_deviating_batches - contained_deviations
abduce_target_batch = contained_batch_ids[0] if contained_batch_ids else focus_plan_batch
print('Total deviation batches:', total_deviating_batches)
print('Contained deviation batches:', contained_deviations)
print('Residual deviation batches:', residual_deviations)
print('ABDUCE target batch:', abduce_target_batch)
print('\nContained sample:')
pprint(contained_rows[:10])
rollback_check = db.query("MATCH (:Batch)-[r:CONTAINED_BY]->(:ActionPlan) RETURN count(r) AS c")
print('\nRollback check (should be 0):', rollback_check[0]['c'])
Total deviation batches: 24
Contained deviation batches: 24
Residual deviation batches: 0
ABDUCE target batch: B17-0224
Contained sample:
[{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0224'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0370'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0374'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0375'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0378'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0488'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0491'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0580'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0581'},
{'action_type': 'deep_clean_hold', 'batch_id': 'B17-0583'}]
Rollback check (should be 0): 0
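The key contract here is that ASSUME applies the hypothetical writes, evaluates the THEN block against that world, and then rolls everything back, which is why the persisted-edge count above is zero. A minimal pure-Python analogy of that apply/query/rollback contract (illustrative only, not the uni_db implementation):

```python
# Illustrative analogy of ASSUME ... THEN ... semantics: stage writes,
# answer the query against the hypothetical state, then discard the writes.
class TinyGraph:
    def __init__(self):
        self.edges = set()

    def assume(self, hypothetical_edges, then):
        staged = set(hypothetical_edges) - self.edges
        self.edges |= staged            # apply the counterfactual writes
        try:
            return then(self)           # evaluate THEN in that world
        finally:
            self.edges -= staged        # always roll back, even on error

g = TinyGraph()
rows = g.assume({('B17-0224', 'CONTAINED_BY', 'AP-1')},
                lambda graph: sorted(graph.edges))
print('rows in hypothetical world:', rows)
print('persisted edges after rollback:', len(g.edges))  # 0, as in the check above
```

The `try/finally` mirrors the rollback guarantee: the hypothetical state is visible only inside the THEN callback, never afterwards.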
8) Minimal Change Search (ABDUCE)¶
What this does: Finds minimal graph/program changes that would remove a target batch from the deep-clean requirement.
What to expect: At least one validated candidate modification.
program_abduce = f'''
CREATE RULE needs_deep_clean AS
MATCH (b:Batch)-[:CANDIDATE_FOR]->(a:ActionPlan)
WHERE b.quality_state = 'DEVIATION', a.action_type = 'deep_clean_hold'
YIELD KEY b
ABDUCE NOT needs_deep_clean WHERE b.batch_id = '{abduce_target_batch}' RETURN b
'''
abduce_out = db.locy_evaluate(program_abduce, {'max_abduce_candidates': 120, 'max_abduce_results': 12, 'timeout': 60.0})
abduce_cmd = next(cmd for cmd in abduce_out['command_results'] if cmd.get('type') == 'abduce')
mods = abduce_cmd.get('modifications', [])
print('ABDUCE target batch:', abduce_target_batch)
print('Abduced modifications:', len(mods))
for i, item in enumerate(mods[:8], start=1):
print(f"\nCandidate #{i}")
pprint(item)
ABDUCE target batch: B17-0224
Abduced modifications: 1
Candidate #1
{'cost': 1.0,
'modification': {'edge_type': 'CANDIDATE_FOR',
'edge_var': '',
'match_properties': {},
'source_var': 'b',
'target_var': 'a',
'type': 'remove_edge'},
'validated': True}
9) What To Expect¶
- Propagation summary should show non-empty impacted-batch counts from ALONG recursion.
- Best action rows should pick one action per batch using BEST BY (risk first, cost second).
- ASSUME should contain at least one deviation batch.
- Residual deviation count should be lower than total deviation count.
- ABDUCE NOT should return at least one validated candidate.
- EXPLAIN RULE should return a derivation tree with children.
10) Build-Time Assertions¶
These assertions keep the notebook self-validating in CI/docs builds.
assert propagation_rows, 'Expected non-empty propagation rows'
assert best_plan_rows, 'Expected non-empty best action rows'
assert total_deviating_batches > 0, 'Expected deviation batches in focus cohort'
assert contained_deviations > 0, 'Expected ASSUME containment to affect at least one batch'
assert residual_deviations < total_deviating_batches, 'Expected residual deviations to decrease'
assert mods, 'Expected ABDUCE to produce modifications'
assert any(item.get('validated') for item in mods), 'Expected at least one validated ABDUCE candidate'
assert tree.get('children'), 'Expected EXPLAIN RULE tree to include child derivations'
print('Notebook assertions passed.')
Notebook assertions passed.
11) Cleanup¶
Deletes the temporary on-disk database for this notebook run.
shutil.rmtree(DB_DIR, ignore_errors=True)
print('Cleaned up', DB_DIR)
Cleaned up /tmp/uni_locy_pharma_5t6in59e