Locy Use Case: Compliance Remediation¶
Compute exposed + vulnerable services and emit prioritized remediation actions.
This notebook uses schema-first mode (recommended): labels, edge types, and typed properties are defined before ingest.
How To Read This Notebook¶
- Step 1 initializes an isolated local database.
- Step 2 defines schema (the recommended production path).
- Step 3 seeds a minimal graph for this use case.
- Step 4 declares Locy rules and query statements.
- Steps 5-6 evaluate and inspect command/query outputs.
- Step 7 tells you what to look for in the results.
1) Setup¶
Creates a temporary database directory so the example is reproducible and leaves no state behind.
import os
import shutil
import tempfile
from pprint import pprint
import uni_db
DB_DIR = tempfile.mkdtemp(prefix="uni_locy_")
print("DB_DIR:", DB_DIR)
db = uni_db.Uni.open(DB_DIR)
session = db.session()
DB_DIR: /tmp/uni_locy_0fej5s_j
2) Define Schema (Recommended)¶
Define labels, property types, and edge types before inserting data.
(
db.schema()
.label("Internet")
.property("name", "string")
.done()
.label("Service")
.property("name", "string")
.property("cve_score", "float64")
.done()
.edge_type("EXPOSES", ["Internet"], ["Service"])
.done()
.edge_type("DEPENDS_ON", ["Service"], ["Service"])
.done()
.apply()
)
print('Schema created')
Schema created
3) Seed Graph Data¶
Insert only the entities/relationships needed for this scenario so rule behavior stays easy to inspect.
tx = session.tx()
tx.execute("CREATE (:Internet {name: 'public'})")
tx.execute("CREATE (:Service {name: 'api', cve_score: 9.1})")
tx.execute("CREATE (:Service {name: 'worker', cve_score: 4.0})")
tx.execute("CREATE (:Service {name: 'db', cve_score: 8.4})")
tx.execute("MATCH (i:Internet {name:'public'}), (s:Service {name:'api'}) CREATE (i)-[:EXPOSES]->(s)")
tx.execute("MATCH (a:Service {name:'api'}), (w:Service {name:'worker'}) CREATE (a)-[:DEPENDS_ON]->(w)")
tx.execute("MATCH (w:Service {name:'worker'}), (d:Service {name:'db'}) CREATE (w)-[:DEPENDS_ON]->(d)")
tx.commit()
print('Seeded graph data')
Seeded graph data
4) Locy Program¶
CREATE RULE defines derived relations. QUERY ... WHERE ... RETURN ... reads from those relations.
program = r'''
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(b:Service)
YIELD KEY a, KEY b
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(mid:Service)
WHERE mid IS depends_on TO b
YIELD KEY a, KEY b
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(s:Service)
YIELD KEY s
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(entry:Service)
WHERE entry IS depends_on TO s
YIELD KEY s
CREATE RULE vulnerable AS
MATCH (s:Service)
WHERE s.cve_score >= 7
YIELD KEY s
CREATE RULE non_compliant AS
MATCH (s:Service)
WHERE s IS vulnerable, s IS exposed
YIELD KEY s, 'patch-now' AS action
QUERY non_compliant WHERE s.name = s.name RETURN s.name AS service, action
'''
print(program)
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(b:Service)
YIELD KEY a, KEY b
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(mid:Service)
WHERE mid IS depends_on TO b
YIELD KEY a, KEY b
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(s:Service)
YIELD KEY s
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(entry:Service)
WHERE entry IS depends_on TO s
YIELD KEY s
CREATE RULE vulnerable AS
MATCH (s:Service)
WHERE s.cve_score >= 7
YIELD KEY s
CREATE RULE non_compliant AS
MATCH (s:Service)
WHERE s IS vulnerable, s IS exposed
YIELD KEY s, 'patch-now' AS action
QUERY non_compliant WHERE s.name = s.name RETURN s.name AS service, action
5) Evaluate Locy Program¶
Run the program, then inspect materialization stats (iterations, strata, and executed queries).
out = session.locy(program)
print("Derived relations:", list(out.derived.keys()))
stats = out.stats
print("Iterations:", stats.total_iterations)
print("Strata:", stats.strata_evaluated)
print("Queries executed:", stats.queries_executed)
Derived relations: ['depends_on', 'exposed', 'vulnerable', 'non_compliant']
Iterations: 3
Strata: 4
Queries executed: 14
6) Inspect Command Results¶
Each command result can contain rows; this is the easiest way to verify your rule outputs and query projections.
print("Derived relation snapshots:")
for rel_name, rel_rows in out.derived.items():
print(f"\\n{rel_name}: {len(rel_rows)} row(s)")
pprint(rel_rows)
if out.command_results:
print("\\nCommand results:")
for i, cmd in enumerate(out.command_results, start=1):
print(f"\\nCommand #{i}:", cmd.command_type)
rows = getattr(cmd, 'rows', None)
if rows is not None:
pprint(rows)
if not out.command_results:
print("\\nNo QUERY/EXPLAIN/ABDUCE command outputs in this program.")
Derived relation snapshots:
\ndepends_on: 3 row(s)
[{'a': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'}),
'b': Node(id=2, labels=["Service"], properties={'name': 'worker', 'cve_score': 4.0})},
{'a': Node(id=2, labels=["Service"], properties={'cve_score': 4.0, 'name': 'worker'}),
'b': Node(id=3, labels=["Service"], properties={'cve_score': 8.4, 'name': 'db'})},
{'a': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'}),
'b': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nexposed: 3 row(s)
[{'s': Node(id=1, labels=["Service"], properties={'name': 'api', 'cve_score': 9.1})},
{'s': Node(id=2, labels=["Service"], properties={'cve_score': 4.0, 'name': 'worker'})},
{'s': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nvulnerable: 2 row(s)
[{'s': Node(id=1, labels=["Service"], properties={'name': 'api', 'cve_score': 9.1})},
{'s': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nnon_compliant: 6 row(s)
[{'action': None,
's': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'})},
{'action': None,
's': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})},
{'action': None,
's': Node(id=1, labels=["Service"], properties={'name': 'api', 'cve_score': 9.1})},
{'action': None,
's': Node(id=1, labels=["Service"], properties={'cve_score': 9.1, 'name': 'api'})},
{'action': None,
's': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})},
{'action': None,
's': Node(id=3, labels=["Service"], properties={'name': 'db', 'cve_score': 8.4})}]
\nCommand results:
\nCommand #1: query
[{'action': 'patch-now', 'service': 'api'},
{'action': 'patch-now', 'service': 'db'}]
7) What To Expect¶
Use these checks to validate output after evaluation:
- non_compliant should include api and db (reachable from internet and CVE >= 7).
- worker should not appear in remediation rows because its CVE score is below threshold.
- Queries executed should be 1 for this program.
8) Cleanup¶
Delete the temporary database directory created in setup.
Cleaned up /tmp/uni_locy_0fej5s_j