Skip to content

Locy Use Case: Compliance Remediation

Compute exposed + vulnerable services and emit prioritized remediation actions.

This notebook uses schema-first mode (recommended): labels, edge types, and typed properties are defined before ingest.

How To Read This Notebook

  • Step 1 initializes an isolated local database.
  • Step 2 defines schema (the recommended production path).
  • Step 3 seeds a minimal graph for this use case.
  • Step 4 declares Locy rules and query statements.
  • Steps 5-6 evaluate and inspect command/query outputs.
  • Step 7 tells you what to look for in the results.

1) Setup

Creates a temporary database directory so the example is reproducible and leaves no state behind.

import os
import shutil
import tempfile
from pprint import pprint

import uni_db

DB_DIR = tempfile.mkdtemp(prefix="uni_locy_")
print("DB_DIR:", DB_DIR)

db = uni_db.Uni.open(DB_DIR)
session = db.session()
DB_DIR: /tmp/uni_locy_0fej5s_j

Define labels, property types, and edge types before inserting data.

(
    db.schema()
    .label("Internet")
        .property("name", "string")
    .done()
    .label("Service")
        .property("name", "string")
        .property("cve_score", "float64")
    .done()
    .edge_type("EXPOSES", ["Internet"], ["Service"])
    .done()
    .edge_type("DEPENDS_ON", ["Service"], ["Service"])
    .done()
    .apply()
)

print('Schema created')
Schema created

3) Seed Graph Data

Insert only the entities/relationships needed for this scenario so rule behavior stays easy to inspect.

tx = session.tx()
tx.execute("CREATE (:Internet {name: 'public'})")
tx.execute("CREATE (:Service {name: 'api', cve_score: 9.1})")
tx.execute("CREATE (:Service {name: 'worker', cve_score: 4.0})")
tx.execute("CREATE (:Service {name: 'db', cve_score: 8.4})")
tx.execute("MATCH (i:Internet {name:'public'}), (s:Service {name:'api'}) CREATE (i)-[:EXPOSES]->(s)")
tx.execute("MATCH (a:Service {name:'api'}), (w:Service {name:'worker'}) CREATE (a)-[:DEPENDS_ON]->(w)")
tx.execute("MATCH (w:Service {name:'worker'}), (d:Service {name:'db'}) CREATE (w)-[:DEPENDS_ON]->(d)")
tx.commit()
print('Seeded graph data')
Seeded graph data

4) Locy Program

CREATE RULE defines derived relations. QUERY ... WHERE ... RETURN ... reads from those relations.

program = r'''
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(b:Service)
YIELD KEY a, KEY b

CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(mid:Service)
WHERE mid IS depends_on TO b
YIELD KEY a, KEY b

CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(s:Service)
YIELD KEY s

CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(entry:Service)
WHERE entry IS depends_on TO s
YIELD KEY s

CREATE RULE vulnerable AS
MATCH (s:Service)
WHERE s.cve_score >= 7
YIELD KEY s

CREATE RULE non_compliant AS
MATCH (s:Service)
WHERE s IS vulnerable, s IS exposed
YIELD KEY s, 'patch-now' AS action

QUERY non_compliant WHERE s.name = s.name RETURN s.name AS service, action
'''
print(program)
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(b:Service)
YIELD KEY a, KEY b

CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(mid:Service)
WHERE mid IS depends_on TO b
YIELD KEY a, KEY b

CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(s:Service)
YIELD KEY s

CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(entry:Service)
WHERE entry IS depends_on TO s
YIELD KEY s

CREATE RULE vulnerable AS
MATCH (s:Service)
WHERE s.cve_score >= 7
YIELD KEY s

CREATE RULE non_compliant AS
MATCH (s:Service)
WHERE s IS vulnerable, s IS exposed
YIELD KEY s, 'patch-now' AS action

QUERY non_compliant WHERE s.name = s.name RETURN s.name AS service, action

5) Evaluate Locy Program

Run the program, then inspect materialization stats (iterations, strata, and executed queries).

out = session.locy(program)

print("Derived relations:", list(out.derived.keys()))
stats = out.stats
print("Iterations:", stats.total_iterations)
print("Strata:", stats.strata_evaluated)
print("Queries executed:", stats.queries_executed)
Derived relations: ['depends_on', 'exposed', 'vulnerable', 'non_compliant']
Iterations: 3
Strata: 4
Queries executed: 14

6) Inspect Command Results

Each command result can contain rows; this is the easiest way to verify your rule outputs and query projections.

print("Derived relation snapshots:")
for rel_name, rel_rows in out.derived.items():
    print(f"\\n{rel_name}: {len(rel_rows)} row(s)")
    pprint(rel_rows)

if out.command_results:
    print("\\nCommand results:")
for i, cmd in enumerate(out.command_results, start=1):
    print(f"\\nCommand #{i}:", cmd.command_type)
    rows = getattr(cmd, 'rows', None)
    if rows is not None:
        pprint(rows)
if not out.command_results:
    print("\\nNo QUERY/EXPLAIN/ABDUCE command outputs in this program.")
Derived relation snapshots:
\ndepends_on: 3 row(s)
[{'a': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'}),
  'b': Node(id=2, labels=["Service"], properties={'name': 'worker', 'cve_score': 4.0})},
 {'a': Node(id=2, labels=["Service"], properties={'cve_score': 4.0, 'name': 'worker'}),
  'b': Node(id=3, labels=["Service"], properties={'cve_score': 8.4, 'name': 'db'})},
 {'a': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'}),
  'b': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nexposed: 3 row(s)
[{'s': Node(id=1, labels=["Service"], properties={'name': 'api', 'cve_score': 9.1})},
 {'s': Node(id=2, labels=["Service"], properties={'cve_score': 4.0, 'name': 'worker'})},
 {'s': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nvulnerable: 2 row(s)
[{'s': Node(id=1, labels=["Service"], properties={'name': 'api', 'cve_score': 9.1})},
 {'s': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nnon_compliant: 6 row(s)
[{'action': None,
  's': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'})},
 {'action': None,
  's': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})},
 {'action': None,
  's': Node(id=1, labels=["Service"], properties={'name': 'api', 'cve_score': 9.1})},
 {'action': None,
  's': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'})},
 {'action': None,
  's': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})},
 {'action': None,
  's': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nCommand results:
\nCommand #1: query
[{'action': 'patch-now', 'service': 'api'},
 {'action': 'patch-now', 'service': 'db'}]

7) What To Expect

Use these checks to validate output after evaluation: - non_compliant should include api and db (reachable from internet and CVE >= 7). - worker should not appear in remediation rows because its CVE score is below threshold. - Queries executed should be 1 for this program.

8) Cleanup

Delete the temporary database directory created in setup.

shutil.rmtree(DB_DIR, ignore_errors=True)
print("Cleaned up", DB_DIR)
Cleaned up /tmp/uni_locy_0fej5s_j