Skip to content

Locy Case Study: Patent Freedom-to-Operate Analysis

This case study demonstrates probabilistic graph reasoning for patent FTO (freedom-to-operate) analysis — determining whether a product infringes existing patents before market launch.

Key Locy features demonstrated: - FOLD MPROD — all-elements claim infringement (conjunction: ALL elements must read on product) - FOLD MNOR — multi-claim patent risk (disjunction: ANY independent claim infringed) - Recursive IS — dependent claims inherit parent claim elements - similar_to — prior art search via semantic embedding similarity - IS NOT — identify claims with no strong prior art (hardest to invalidate) - ASSUME DELETE — design-around simulation (remove feature-element mappings) - ABDUCE — minimal design changes to eliminate infringement - EXPLAIN RULE — computational claim chart generation

How To Read This Notebook

  • Each code cell is preceded by a markdown cell explaining what it does and what to expect.
  • The dataset covers patents, claims, claim elements, products, features, and prior art publications.
  • Commands are grouped: facts -> inference -> explanation -> counterfactual -> abduction.
  • MPROD models conjunction (ALL elements must match); MNOR models disjunction (ANY claim suffices).
  • Recursive claim_elements propagates parent claim elements down to dependent claims.

1) Setup & Data Discovery

What this does: Initialize helper utilities, locate prepared data files, and create an isolated temporary database.

What to expect: Printed DATA_DIR and DB_DIR paths.

from pathlib import Path
from pprint import pprint
import csv
import json
import shutil
import tempfile
import os

import uni_db

def _read_csv(path: Path) -> list[dict[str, str]]:
    with path.open('r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))

def _esc(value: str) -> str:
    return str(value).replace('\\', '\\\\').replace("'", "\\'")

def _f(value: str) -> float:
    return float(value) if value not in ('', None) else 0.0

def _vec(value: str) -> list[float]:
    return [float(x) for x in json.loads(value)]

def _norm_key(key: object) -> str:
    s = str(key)
    if s.startswith('Variable("') and s.endswith('")'):
        return s[len('Variable("'):-2]
    return s

def _norm_rows(rows: list[dict[object, object]]) -> list[dict[str, object]]:
    return [{_norm_key(k): v for k, v in row.items()} for row in rows]

def _print_tree(node, depth=0, max_depth=4, max_children=4):
    indent = '  ' * depth
    print(f"{indent}- rule={node.get('rule')}, clause={node.get('clause_index')}, bindings={node.get('bindings', {})}")
    if depth >= max_depth:
        return
    for child in node.get('children', [])[:max_children]:
        _print_tree(child, depth + 1, max_depth=max_depth, max_children=max_children)

_default_candidates = [
    Path('docs/examples/data/locy_patent_fto'),
    Path('website/docs/examples/data/locy_patent_fto'),
    Path('examples/data/locy_patent_fto'),
    Path('../data/locy_patent_fto'),
]
if 'LOCY_DATA_DIR' in os.environ:
    DATA_DIR = Path(os.environ['LOCY_DATA_DIR']).resolve()
else:
    DATA_DIR = next(
        (p.resolve() for p in _default_candidates if (p / 'patents.csv').exists()),
        _default_candidates[0].resolve(),
    )
if not (DATA_DIR / 'patents.csv').exists():
    raise FileNotFoundError(
        'Expected dataset under docs/examples/data/locy_patent_fto. '
        'Run from website/ (or repo root) or set LOCY_DATA_DIR to the dataset path.'
    )
DB_DIR = tempfile.mkdtemp(prefix='uni_locy_patent_')
db = uni_db.Uni.open(DB_DIR)
session = db.session()

print('DATA_DIR:', DATA_DIR)
print('DB_DIR:', DB_DIR)
DATA_DIR: /home/runner/work/uni-db/uni-db/website/docs/examples/data/locy_patent_fto
DB_DIR: /tmp/uni_locy_patent_f0c94wr5

2) Load Data & Build Focus Cohort

What this does: Loads all 13 CSVs and identifies the focus product from notebook_cases.csv.

What to expect: Counts for focus products, patents, claims, claim elements, features, reads_on mappings, and publications.

patents = _read_csv(DATA_DIR / 'patents.csv')
claims = _read_csv(DATA_DIR / 'claims.csv')
claim_elements = _read_csv(DATA_DIR / 'claim_elements.csv')
products = _read_csv(DATA_DIR / 'products.csv')
features = _read_csv(DATA_DIR / 'features.csv')
publications = _read_csv(DATA_DIR / 'publications.csv')
has_claim = _read_csv(DATA_DIR / 'has_claim.csv')
depends_on = _read_csv(DATA_DIR / 'depends_on.csv')
has_element = _read_csv(DATA_DIR / 'has_element.csv')
has_feature = _read_csv(DATA_DIR / 'has_feature.csv')
reads_on = _read_csv(DATA_DIR / 'reads_on.csv')
prior_art_for = _read_csv(DATA_DIR / 'prior_art_for.csv')
notebook_cases = _read_csv(DATA_DIR / 'notebook_cases.csv')

focus_product_ids = {r['product_id'] for r in notebook_cases}
focus_products = [r for r in products if r['product_id'] in focus_product_ids]
focus_feature_ids = {r['feature_id'] for r in has_feature if r['product_id'] in focus_product_ids}
focus_features = [r for r in features if r['feature_id'] in focus_feature_ids]
focus_reads_on = [r for r in reads_on if r['feature_id'] in focus_feature_ids]

print('focus products:', len(focus_products))
print('patents:', len(patents))
print('claims:', len(claims))
print('claim elements:', len(claim_elements))
print('focus features:', len(focus_features))
print('focus reads_on mappings:', len(focus_reads_on))
print('publications:', len(publications))
focus products: 1
patents: 8
claims: 18
claim elements: 48
focus features: 6
focus reads_on mappings: 18
publications: 10

3) Define Schema

What this does: Creates explicit labels, typed properties, vector dimensions, and edge types before ingest.

What to expect: A single Schema created confirmation.

(
    db.schema()
    .label('Patent')
        .property('patent_id', 'string')
        .property('title', 'string')
        .property('assignee', 'string')
        .property('priority_date', 'string')
        .property('status', 'string')
        .property('jurisdiction', 'string')
    .done()
    .label('Claim')
        .property('claim_id', 'string')
        .property('claim_type', 'string')
        .property('claim_text', 'string')
        .property('parent_claim_id', 'string')
        .vector('embedding', 4)
    .done()
    .label('ClaimElement')
        .property('element_id', 'string')
        .property('element_text', 'string')
    .done()
    .label('Product')
        .property('product_id', 'string')
        .property('name', 'string')
        .property('description', 'string')
    .done()
    .label('Feature')
        .property('feature_id', 'string')
        .property('name', 'string')
        .property('description', 'string')
    .done()
    .label('Publication')
        .property('pub_id', 'string')
        .property('title', 'string')
        .property('pub_date', 'string')
        .vector('embedding', 4)
    .done()
    .edge_type('HAS_CLAIM', ['Patent'], ['Claim']).done()
    .edge_type('DEPENDS_ON', ['Claim'], ['Claim']).done()
    .edge_type('HAS_ELEMENT', ['Claim'], ['ClaimElement']).done()
    .edge_type('HAS_FEATURE', ['Product'], ['Feature']).done()
    .edge_type('READS_ON', ['Feature'], ['ClaimElement'])
        .property('confidence', 'float64')
    .done()
    .edge_type('PRIOR_ART_FOR', ['Publication'], ['Patent'])
        .property('relevance', 'float64')
    .done()
    .apply()
)
print('Schema created')
Schema created

4) Ingest Graph Facts

What this does: Creates all nodes (Patents, Claims, ClaimElements, Products, Features, Publications) and all edges (HAS_CLAIM, DEPENDS_ON, HAS_ELEMENT, HAS_FEATURE, READS_ON, PRIOR_ART_FOR).

What to expect: Node and edge counts confirming the full graph was loaded.

tx = session.tx()

# --- Nodes ---
for row in patents:
    tx.execute(
        f"CREATE (:Patent {{patent_id: '{_esc(row['patent_id'])}', title: '{_esc(row['title'])}', "
        f"assignee: '{_esc(row['assignee'])}', priority_date: '{_esc(row['priority_date'])}', "
        f"status: '{_esc(row['status'])}', jurisdiction: '{_esc(row['jurisdiction'])}'}})"
    )

for row in claims:
    emb = _vec(row['embedding'])
    tx.execute(
        f"CREATE (:Claim {{claim_id: '{_esc(row['claim_id'])}', claim_type: '{_esc(row['claim_type'])}', "
        f"claim_text: '{_esc(row['claim_text'])}', parent_claim_id: '{_esc(row['parent_claim_id'])}', "
        f"embedding: {emb}}})"
    )

for row in claim_elements:
    tx.execute(
        f"CREATE (:ClaimElement {{element_id: '{_esc(row['element_id'])}', "
        f"element_text: '{_esc(row['element_text'])}'}})"
    )

for row in products:
    tx.execute(
        f"CREATE (:Product {{product_id: '{_esc(row['product_id'])}', name: '{_esc(row['name'])}', "
        f"description: '{_esc(row['description'])}'}})"
    )

for row in features:
    tx.execute(
        f"CREATE (:Feature {{feature_id: '{_esc(row['feature_id'])}', name: '{_esc(row['name'])}', "
        f"description: '{_esc(row['description'])}'}})"
    )

for row in publications:
    emb = _vec(row['embedding'])
    tx.execute(
        f"CREATE (:Publication {{pub_id: '{_esc(row['pub_id'])}', title: '{_esc(row['title'])}', "
        f"pub_date: '{_esc(row['pub_date'])}', embedding: {emb}}})"
    )

# --- Edges ---
for row in has_claim:
    tx.execute(
        f"MATCH (p:Patent {{patent_id: '{_esc(row['patent_id'])}'}}), "
        f"(c:Claim {{claim_id: '{_esc(row['claim_id'])}'}}) "
        "CREATE (p)-[:HAS_CLAIM]->(c)"
    )

for row in depends_on:
    tx.execute(
        f"MATCH (c:Claim {{claim_id: '{_esc(row['claim_id'])}'}}), "
        f"(parent:Claim {{claim_id: '{_esc(row['parent_claim_id'])}'}}) "
        "CREATE (c)-[:DEPENDS_ON]->(parent)"
    )

for row in has_element:
    tx.execute(
        f"MATCH (c:Claim {{claim_id: '{_esc(row['claim_id'])}'}}), "
        f"(ce:ClaimElement {{element_id: '{_esc(row['element_id'])}'}}) "
        "CREATE (c)-[:HAS_ELEMENT]->(ce)"
    )

for row in has_feature:
    tx.execute(
        f"MATCH (p:Product {{product_id: '{_esc(row['product_id'])}'}}), "
        f"(f:Feature {{feature_id: '{_esc(row['feature_id'])}'}}) "
        "CREATE (p)-[:HAS_FEATURE]->(f)"
    )

for row in reads_on:
    conf = _f(row['confidence'])
    tx.execute(
        f"MATCH (f:Feature {{feature_id: '{_esc(row['feature_id'])}'}}), "
        f"(ce:ClaimElement {{element_id: '{_esc(row['element_id'])}'}}) "
        f"CREATE (f)-[:READS_ON {{confidence: {conf}}}]->(ce)"
    )

for row in prior_art_for:
    rel = _f(row['relevance'])
    tx.execute(
        f"MATCH (pub:Publication {{pub_id: '{_esc(row['pub_id'])}'}}), "
        f"(pat:Patent {{patent_id: '{_esc(row['patent_id'])}'}}) "
        f"CREATE (pub)-[:PRIOR_ART_FOR {{relevance: {rel}}}]->(pat)"
    )

tx.commit()

# --- Verify counts ---
counts = session.query("""
MATCH (pat:Patent)
WITH count(pat) AS patents
MATCH (c:Claim)
WITH patents, count(c) AS claims
MATCH (ce:ClaimElement)
WITH patents, claims, count(ce) AS elements
MATCH (p:Product)
WITH patents, claims, elements, count(p) AS products
MATCH (f:Feature)
WITH patents, claims, elements, products, count(f) AS features
MATCH (pub:Publication)
RETURN patents, claims, elements, products, features, count(pub) AS publications
""")
print('Graph node counts:')
pprint(counts[0])

edge_counts = session.query("""
MATCH ()-[r:READS_ON]->()
RETURN count(r) AS reads_on_edges
""")
print('\nREADS_ON edges:', edge_counts[0]['reads_on_edges'])
Graph node counts:
Row(patents=..., claims=..., elements=..., products=..., features=..., publications=...)

READS_ON edges: 26

5) Baseline Locy Program

What this does: Defines 7 rules that model the patent FTO reasoning pipeline: 1. element_mapped — base feature-to-claim-element mapping with confidence scores 2. claim_elements — direct + recursive (dependent claims inherit parent elements) 3. claim_infringed — MPROD conjunction: ALL elements must be mapped for claim infringement 4. patent_risk — MNOR disjunction: ANY independent claim infringed creates patent risk 5. prior_art_match — semantic similarity between claims and publications 6. has_prior_art / no_prior_art — claims with/without strong prior art

What to expect: - claim_infringed rows with MPROD scores in [0, 1] - patent_risk rows with MNOR scores in [0, 1] - prior_art_match rows with similarity scores - no_prior_art rows listing claims hardest to invalidate

program = r'''
CREATE RULE element_mapped AS
  MATCH (p:Product)-[:HAS_FEATURE]->(f:Feature)-[r:READS_ON]->(ce:ClaimElement)
  YIELD KEY p, KEY ce, r.confidence AS mapping_conf

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:HAS_ELEMENT]->(ce:ClaimElement)
  YIELD KEY c, KEY ce

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:DEPENDS_ON]->(parent:Claim)
  WHERE parent IS claim_elements TO ce
  YIELD KEY c, KEY ce

CREATE RULE claim_size AS
  MATCH (c:Claim)
  WHERE c IS claim_elements TO ce
  FOLD n_total = COUNT(ce)
  YIELD KEY c, n_total

CREATE RULE pc_mapped AS
  MATCH (p:Product), (c:Claim)
  WHERE c IS claim_elements TO ce, p IS element_mapped TO ce
  FOLD n_mapped = COUNT(ce), infringement = MPROD(mapping_conf)
  YIELD KEY p, KEY c, n_mapped, infringement

CREATE RULE claim_infringed AS
  MATCH (p:Product), (c:Claim)
  WHERE (p, c) IS pc_mapped, c IS claim_size TO n_total, n_mapped = n_total
  YIELD KEY p, KEY c, infringement

CREATE RULE patent_risk AS
  MATCH (p:Product), (pat:Patent)-[:HAS_CLAIM]->(c:Claim)
  WHERE c.claim_type = 'independent', p IS claim_infringed TO c
  FOLD risk = MNOR(infringement)
  YIELD KEY p, KEY pat, risk

CREATE RULE prior_art_match AS
  MATCH (c:Claim), (pub:Publication)
  YIELD KEY c, KEY pub, similar_to(c.embedding, pub.embedding) AS relevance

CREATE RULE has_prior_art AS
  MATCH (c:Claim)
  WHERE c IS prior_art_match TO pub, relevance >= 0.7
  YIELD KEY c

CREATE RULE no_prior_art AS
  MATCH (c:Claim)
  WHERE c IS NOT has_prior_art
  YIELD KEY c

QUERY claim_infringed WHERE p = p RETURN p.name AS product, c.claim_id AS claim, infringement ORDER BY infringement DESC
QUERY patent_risk WHERE p = p RETURN p.name AS product, pat.title AS patent, risk ORDER BY risk DESC
QUERY prior_art_match WHERE relevance >= 0.5 RETURN c.claim_id AS claim, pub.title AS publication, relevance ORDER BY relevance DESC LIMIT 10
QUERY no_prior_art WHERE c = c RETURN c.claim_id AS claim, c.claim_text AS text
'''

baseline_out = session.locy_with(program).with_config({'max_iterations': 400, 'timeout_secs': 180.0}).run()
stats = baseline_out.stats
print('Iterations:', stats.total_iterations)
print('Strata:', stats.strata_evaluated)
print('Queries executed:', stats.queries_executed)

infringement_rows = []
risk_rows = []
prior_art_rows = []
no_prior_art_rows = []
for i, cmd in enumerate(baseline_out.command_results, start=1):
    print(f'\nCommand #{i}:', cmd.command_type)
    rows = _norm_rows(cmd.rows)
    print('rows:', len(rows))
    pprint(rows[:5])
    if rows and 'infringement' in rows[0]:
        infringement_rows = rows
    elif rows and 'risk' in rows[0]:
        risk_rows = rows
    elif rows and 'relevance' in rows[0]:
        prior_art_rows = rows
    elif rows and 'text' in rows[0]:
        no_prior_art_rows = rows

# Verify MPROD scores
for row in infringement_rows:
    score = float(row['infringement'])
    assert 0.0 <= score <= 1.0, f"MPROD score out of range: {score}"
print(f'\nAll {len(infringement_rows)} claim infringement scores in [0, 1]')
Iterations: 11
Strata: 9
Queries executed: 10

Command #1: query
rows: 6
[{'claim': 'Pat1-C1', 'infringement': 0.7866, 'product': 'SensorHub Pro'},
 {'claim': 'Pat1-C3',
  'infringement': 0.6966960000000001,
  'product': 'SensorHub Pro'},
 {'claim': 'Pat1-C4',
  'infringement': 0.4129649999999999,
  'product': 'SensorHub Pro'},
 {'claim': 'Pat6-C1',
  'infringement': 0.36585120000000004,
  'product': 'SensorHub Pro'},
 {'claim': 'Pat6-C2',
  'infringement': 0.24877881600000004,
  'product': 'SensorHub Pro'}]

Command #2: query
rows: 3
[{'patent': 'Wireless Sensor Network with Adaptive Mesh Routing and Low-Power '
            'Operation',
  'product': 'SensorHub Pro',
  'risk': 0.9352749264},
 {'patent': 'Enhanced IoT Data Aggregation with Machine Learning Inference at '
            'the Edge',
  'product': 'SensorHub Pro',
  'risk': 0.36585120000000004},
 {'patent': 'Wireless Sensor Network with Adaptive Mesh Routing and Low-Power '
            'Operation',
  'product': 'MeshBridge',
  'risk': 0.22312500000000002}]

Command #3: query
rows: 10
[{'claim': 'Pat7-C1',
  'publication': 'Kalman Filter Approaches for Multi-Sensor Fusion in IoT '
                 'Systems',
  'relevance': 0.9999078738474938},
 {'claim': 'Pat4-C1',
  'publication': 'BLE Mesh Networking for Industrial IoT: A Survey',
  'relevance': 0.9999019249499046},
 {'claim': 'Pat5-C1',
  'publication': 'Kalman Filter Approaches for Multi-Sensor Fusion in IoT '
                 'Systems',
  'relevance': 0.9998418302861133},
 {'claim': 'Pat3-C1',
  'publication': 'BLE Mesh Networking for Industrial IoT: A Survey',
  'relevance': 0.999832012458472},
 {'claim': 'Pat1-C1',
  'publication': 'BLE Mesh Networking for Industrial IoT: A Survey',
  'relevance': 0.9997095010522546}]

Command #4: query
rows: 0
[]

All 6 claim infringement scores in [0, 1]

6) EXPLAIN RULE — Claim Chart Generation

What this does: Generates a derivation tree showing why the focus product has patent risk against a specific patent. This is equivalent to a computational claim chart.

What to expect: A tree-like printout with rule names, clause indices, and variable bindings tracing the proof path.

# Pick the focus product and highest-risk patent from baseline results
focus_product_name = focus_products[0]['name']
focus_patent_id = patents[0]['patent_id']

explain_program = f'''
CREATE RULE element_mapped AS
  MATCH (p:Product)-[:HAS_FEATURE]->(f:Feature)-[r:READS_ON]->(ce:ClaimElement)
  YIELD KEY p, KEY ce, r.confidence AS mapping_conf

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:HAS_ELEMENT]->(ce:ClaimElement)
  YIELD KEY c, KEY ce

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:DEPENDS_ON]->(parent:Claim)
  WHERE parent IS claim_elements TO ce
  YIELD KEY c, KEY ce

CREATE RULE claim_size AS
  MATCH (c:Claim)
  WHERE c IS claim_elements TO ce
  FOLD n_total = COUNT(ce)
  YIELD KEY c, n_total

CREATE RULE pc_mapped AS
  MATCH (p:Product), (c:Claim)
  WHERE c IS claim_elements TO ce, p IS element_mapped TO ce
  FOLD n_mapped = COUNT(ce), infringement = MPROD(mapping_conf)
  YIELD KEY p, KEY c, n_mapped, infringement

CREATE RULE claim_infringed AS
  MATCH (p:Product), (c:Claim)
  WHERE (p, c) IS pc_mapped, c IS claim_size TO n_total, n_mapped = n_total
  YIELD KEY p, KEY c, infringement

CREATE RULE patent_risk AS
  MATCH (p:Product), (pat:Patent)-[:HAS_CLAIM]->(c:Claim)
  WHERE c.claim_type = \'independent\', p IS claim_infringed TO c
  FOLD risk = MNOR(infringement)
  YIELD KEY p, KEY pat, risk

EXPLAIN RULE patent_risk WHERE p.name = \'{_esc(focus_product_name)}\' AND pat.patent_id = \'{_esc(focus_patent_id)}\'
'''

explain_out = session.locy_with(explain_program).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
explain_cmd = next(cmd for cmd in explain_out.command_results if cmd.command_type == 'explain')
tree = explain_cmd.tree

print(f'Explain tree for {focus_product_name} vs {focus_patent_id}:')
_print_tree(tree)
Explain tree for SensorHub Pro vs US-11234567:
- rule=patent_risk, clause=0, bindings={}
  - rule=patent_risk, clause=0, bindings={'pat': Node(id=0, labels=["Patent"], properties={'assignee': 'SensorTech Corp', 'jurisdiction': 'US', 'priority_date': '2021-06-15', 'status': 'active', 'title': 'Wireless Sensor Network with Adaptive Mesh Routing and Low-Power Operation', 'patent_id': 'US-11234567'}), 'p': Node(id=74, labels=["Product"], properties={'name': 'SensorHub Pro', 'product_id': 'PROD-001', 'description': 'Full-featured IoT gateway with BLE mesh networking, sensor fusion, data aggregation, and edge ML inference capabilities for industrial wireless sensor deployments.'}), 'risk': 0.6966960000000001}
  - rule=patent_risk, clause=0, bindings={'p': Node(id=74, labels=["Product"], properties={'product_id': 'PROD-001', 'description': 'Full-featured IoT gateway with BLE mesh networking, sensor fusion, data aggregation, and edge ML inference capabilities for industrial wireless sensor deployments.', 'name': 'SensorHub Pro'}), 'pat': Node(id=0, labels=["Patent"], properties={'status': 'active', 'priority_date': '2021-06-15', 'jurisdiction': 'US', 'title': 'Wireless Sensor Network with Adaptive Mesh Routing and Low-Power Operation', 'patent_id': 'US-11234567', 'assignee': 'SensorTech Corp'}), 'risk': 0.7866}

7) ASSUME DELETE — Design-Around Simulation

What this does: Simulates a design-around scenario: "What if we redesign the BLE Radio Module so it no longer reads on the wireless transceiver element (CE-Pat1-C1-01)?" Uses ASSUME { DELETE } to temporarily remove the READS_ON edge, then re-evaluates patent risk.

What to expect: - Patent risk scores that may decrease (or disappear) after the design change. - The original edge still exists after evaluation (ASSUME is hypothetical, rollback is automatic).

assume_program = r'''
CREATE RULE element_mapped AS
  MATCH (p:Product)-[:HAS_FEATURE]->(f:Feature)-[r:READS_ON]->(ce:ClaimElement)
  YIELD KEY p, KEY ce, r.confidence AS mapping_conf

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:HAS_ELEMENT]->(ce:ClaimElement)
  YIELD KEY c, KEY ce

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:DEPENDS_ON]->(parent:Claim)
  WHERE parent IS claim_elements TO ce
  YIELD KEY c, KEY ce

CREATE RULE claim_size AS
  MATCH (c:Claim)
  WHERE c IS claim_elements TO ce
  FOLD n_total = COUNT(ce)
  YIELD KEY c, n_total

CREATE RULE pc_mapped AS
  MATCH (p:Product), (c:Claim)
  WHERE c IS claim_elements TO ce, p IS element_mapped TO ce
  FOLD n_mapped = COUNT(ce), infringement = MPROD(mapping_conf)
  YIELD KEY p, KEY c, n_mapped, infringement

CREATE RULE claim_infringed AS
  MATCH (p:Product), (c:Claim)
  WHERE (p, c) IS pc_mapped, c IS claim_size TO n_total, n_mapped = n_total
  YIELD KEY p, KEY c, infringement

CREATE RULE patent_risk AS
  MATCH (p:Product), (pat:Patent)-[:HAS_CLAIM]->(c:Claim)
  WHERE c.claim_type = 'independent', p IS claim_infringed TO c
  FOLD risk = MNOR(infringement)
  YIELD KEY p, KEY pat, risk

ASSUME {
  MATCH (f:Feature {name: 'BLE Radio Module'})-[r:READS_ON]->(ce:ClaimElement {element_id: 'CE-Pat1-C1-01'})
  DELETE r
} THEN {
  QUERY patent_risk WHERE p.name = 'SensorHub Pro' RETURN pat.title AS patent, risk ORDER BY risk DESC
}
'''

assume_out = session.locy_with(assume_program).with_config({'max_iterations': 200, 'timeout_secs': 60.0}).run()
assume_cmd = next(cmd for cmd in assume_out.command_results if cmd.command_type == 'assume')
assume_rows = assume_cmd.rows
print('Patent risk after design-around (BLE Radio Module redesign):')
pprint(_norm_rows(assume_rows))

# Verify rollback — the edge should still exist in the real graph
rollback = session.query(
    "MATCH (f:Feature {name: 'BLE Radio Module'})-[r:READS_ON]->(ce:ClaimElement {element_id: 'CE-Pat1-C1-01'}) "
    "RETURN count(r) AS c"
)
print('Rollback check (should be 1 -- edge still exists):', rollback[0]['c'])
Patent risk after design-around (BLE Radio Module redesign):
[{'patent': 'Wireless Sensor Network with Adaptive Mesh Routing and Low-Power '
            'Operation',
  'risk': 0.6966960000000001},
 {'patent': 'Enhanced IoT Data Aggregation with Machine Learning Inference at '
            'the Edge',
  'risk': 0.36585120000000004},
 {'patent': 'Wireless Sensor Network with Adaptive Mesh Routing and Low-Power '
            'Operation',
  'risk': 0.22312500000000002}]
Rollback check (should be 1 -- edge still exists): 1

8) ABDUCE — Minimal Design Changes to Eliminate Infringement

What this does: Asks: "What minimum feature redesigns eliminate infringement on the focus patent?" ABDUCE searches for the smallest set of fact removals (READS_ON edges) that would make patent_risk no longer hold for the focus product and patent.

What to expect: A list of candidate modifications (edge removals) that break the MPROD chain for at least one independent claim, thereby eliminating patent risk.

program_abduce = f'''
CREATE RULE element_mapped AS
  MATCH (p:Product)-[:HAS_FEATURE]->(f:Feature)-[r:READS_ON]->(ce:ClaimElement)
  YIELD KEY p, KEY ce, r.confidence AS mapping_conf

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:HAS_ELEMENT]->(ce:ClaimElement)
  YIELD KEY c, KEY ce

CREATE RULE claim_elements AS
  MATCH (c:Claim)-[:DEPENDS_ON]->(parent:Claim)
  WHERE parent IS claim_elements TO ce
  YIELD KEY c, KEY ce

CREATE RULE claim_size AS
  MATCH (c:Claim)
  WHERE c IS claim_elements TO ce
  FOLD n_total = COUNT(ce)
  YIELD KEY c, n_total

CREATE RULE pc_mapped AS
  MATCH (p:Product), (c:Claim)
  WHERE c IS claim_elements TO ce, p IS element_mapped TO ce
  FOLD n_mapped = COUNT(ce), infringement = MPROD(mapping_conf)
  YIELD KEY p, KEY c, n_mapped, infringement

CREATE RULE claim_infringed AS
  MATCH (p:Product), (c:Claim)
  WHERE (p, c) IS pc_mapped, c IS claim_size TO n_total, n_mapped = n_total
  YIELD KEY p, KEY c, infringement

CREATE RULE patent_risk AS
  MATCH (p:Product), (pat:Patent)-[:HAS_CLAIM]->(c:Claim)
  WHERE c.claim_type = \'independent\', p IS claim_infringed TO c
  FOLD risk = MNOR(infringement)
  YIELD KEY p, KEY pat, risk

ABDUCE NOT patent_risk WHERE p.name = \'{_esc(focus_product_name)}\' AND pat.patent_id = \'{_esc(focus_patent_id)}\'
'''

abduce_out = session.locy_with(program_abduce).with_config({'max_abduce_candidates': 120, 'max_abduce_results': 12, 'timeout_secs': 180.0}).run()
abduce_cmd = next(cmd for cmd in abduce_out.command_results if cmd.command_type == 'abduce')
mods = abduce_cmd.modifications
print(f'Minimum design changes to clear {focus_patent_id}:')
for i, item in enumerate(mods[:8], start=1):
    print(f'\nCandidate #{i}')
    pprint(item)
Minimum design changes to clear US-11234567:

Candidate #1
{'cost': 1.0,
 'modification': {'edge_type': 'HAS_CLAIM',
                  'edge_var': '',
                  'match_properties': {},
                  'source_var': 'pat',
                  'target_var': 'c',
                  'type': 'remove_edge'},
 'validated': True}

Candidate #2
{'cost': 1.0,
 'modification': {'edge_type': 'HAS_CLAIM',
                  'edge_var': '',
                  'match_properties': {},
                  'source_var': 'pat',
                  'target_var': 'c',
                  'type': 'remove_edge'},
 'validated': True}

Candidate #3
{'cost': 0.5,
 'modification': {'element_var': 'risk',
                  'new_value': 0.0,
                  'old_value': 0.696696,
                  'property': 'risk',
                  'type': 'change_property'},
 'validated': False}

Candidate #4
{'cost': 0.5,
 'modification': {'element_var': 'risk',
                  'new_value': 0.0,
                  'old_value': 0.7866,
                  'property': 'risk',
                  'type': 'change_property'},
 'validated': False}

9) What To Expect

  • MPROD scores: Each claim's infringement probability is the product of all element mapping confidences. Missing ANY element drops the score significantly (all-elements conjunction rule).
  • MNOR scores: Patent risk aggregates across independent claims. Even if one claim has low infringement, other claims may create risk (any-claim disjunction).
  • Recursive claim_elements: Dependent claims inherit all parent elements, making them harder to infringe (more elements required in the MPROD product).
  • Design-around (ASSUME DELETE): Removing one feature-element mapping can break the MPROD chain for a claim, potentially zeroing out patent risk for that claim.
  • ABDUCE: Returns the minimum set of READS_ON edge removals that break all independent claim infringement, guiding engineering design-around decisions.

10) Build-Time Assertions

What this does: Self-validates the notebook outputs so CI/docs builds catch regressions.

What to expect: All assertions pass with a confirmation message.

assert infringement_rows, 'Expected non-empty claim infringement rows'
assert risk_rows, 'Expected non-empty patent risk rows'
assert all(0.0 <= float(r['infringement']) <= 1.0 for r in infringement_rows), 'MPROD scores must be in [0,1]'
assert all(0.0 <= float(r['risk']) <= 1.0 for r in risk_rows), 'MNOR scores must be in [0,1]'
assert prior_art_rows, 'Expected non-empty prior art matches'
assert tree, 'Expected EXPLAIN RULE tree'
assert tree.get('children') or tree.get('rule'), 'Expected derivation tree structure'
if not assume_rows:
    print('Note: ASSUME returned no results (hypothesis may eliminate all matching facts)')
else:
    print(f'ASSUME returned {len(assume_rows)} rows')
print('Rollback check:', rollback[0]['c'])
if not mods:
    print('Note: ABDUCE returned no modifications (may need higher timeout or different target)')
else:
    print(f'ABDUCE found {len(mods)} modifications')
print('All notebook assertions passed.')
ASSUME returned 3 rows
Rollback check: 1
ABDUCE found 4 modifications
All notebook assertions passed.

11) Cleanup

What this does: Deletes the temporary on-disk database used for this notebook run.

What to expect: Confirmation that the temporary directory was removed.

shutil.rmtree(DB_DIR, ignore_errors=True)
print('Cleaned up', DB_DIR)
Cleaned up /tmp/uni_locy_patent_f0c94wr5