Locy Use Case: Compliance Remediation
Compute services that are both exposed and vulnerable, then emit prioritized remediation actions.
This notebook uses schema-first mode (recommended): labels, edge types, and typed properties are defined before ingest.
How To Read This Notebook
- Step 1 initializes an isolated local database.
- Step 2 defines schema (the recommended production path).
- Step 3 seeds a minimal graph for this use case.
- Step 4 declares Locy rules and query statements.
- Steps 5-6 evaluate and inspect command/query outputs.
- Step 7 tells you what to look for in the results.
- Step 8 removes the temporary database directory.
1) Setup
Creates a temporary database directory so the example is reproducible and leaves no state behind.
import os
import shutil
import tempfile
from pprint import pprint
import uni_db
DB_DIR = tempfile.mkdtemp(prefix="uni_locy_")
print("DB_DIR:", DB_DIR)
db = uni_db.Database(DB_DIR)
DB_DIR: /tmp/uni_locy_vyp5kdh9
2) Define Schema (Recommended)
Define labels, property types, and edge types before inserting data.
(
    db.schema()
    .label("Internet")
        .property("name", "string")
    .done()
    .label("Service")
        .property("name", "string")
        .property("cve_score", "float64")
    .done()
    .edge_type("EXPOSES", ["Internet"], ["Service"])
    .done()
    .edge_type("DEPENDS_ON", ["Service"], ["Service"])
    .done()
    .apply()
)
print('Schema created')
Schema created
3) Seed Graph Data
Insert only the entities/relationships needed for this scenario so rule behavior stays easy to inspect.
db.execute("CREATE (:Internet {name: 'public'})")
db.execute("CREATE (:Service {name: 'api', cve_score: 9.1})")
db.execute("CREATE (:Service {name: 'worker', cve_score: 4.0})")
db.execute("CREATE (:Service {name: 'db', cve_score: 8.4})")
db.execute("MATCH (i:Internet {name:'public'}), (s:Service {name:'api'}) CREATE (i)-[:EXPOSES]->(s)")
db.execute("MATCH (a:Service {name:'api'}), (w:Service {name:'worker'}) CREATE (a)-[:DEPENDS_ON]->(w)")
db.execute("MATCH (w:Service {name:'worker'}), (d:Service {name:'db'}) CREATE (w)-[:DEPENDS_ON]->(d)")
print('Seeded graph data')
Seeded graph data
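The seed statements above can also be generated from plain Python data, which keeps the scenario easy to extend. This is a minimal sketch: `SERVICES`, `DEPENDENCIES`, `EXPOSED`, and `seed_statements` are hypothetical names, and the code only builds statement strings. Passing them to `db.execute`, as in the cell above, is left to the caller.

```python
# Hypothetical seed data mirroring the statements above.
SERVICES = {"api": 9.1, "worker": 4.0, "db": 8.4}
DEPENDENCIES = [("api", "worker"), ("worker", "db")]
EXPOSED = ["api"]


def seed_statements():
    """Build the statements for this scenario (strings only, nothing is executed)."""
    stmts = ["CREATE (:Internet {name: 'public'})"]
    for name, score in SERVICES.items():
        stmts.append(f"CREATE (:Service {{name: '{name}', cve_score: {score}}})")
    for name in EXPOSED:
        stmts.append(
            f"MATCH (i:Internet {{name:'public'}}), (s:Service {{name:'{name}'}}) "
            "CREATE (i)-[:EXPOSES]->(s)"
        )
    for src, dst in DEPENDENCIES:
        stmts.append(
            f"MATCH (a:Service {{name:'{src}'}}), (b:Service {{name:'{dst}'}}) "
            "CREATE (a)-[:DEPENDS_ON]->(b)"
        )
    return stmts


for stmt in seed_statements():
    print(stmt)
```

String interpolation is acceptable here only because the names are trusted literals; untrusted input should not be spliced into statements this way.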
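4) Locy Program
CREATE RULE defines a derived relation. Rules that share a name (here depends_on and exposed) contribute rows to the same relation, which is how a base case and a recursive case combine. QUERY ... WHERE ... RETURN ... reads from those relations.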
program = r'''
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(b:Service)
YIELD KEY a, KEY b
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(mid:Service)
WHERE mid IS depends_on TO b
YIELD KEY a, KEY b
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(s:Service)
YIELD KEY s
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(entry:Service)
WHERE entry IS depends_on TO s
YIELD KEY s
CREATE RULE vulnerable AS
MATCH (s:Service)
WHERE s.cve_score >= 7
YIELD KEY s
CREATE RULE non_compliant AS
MATCH (s:Service)
WHERE s IS vulnerable, s IS exposed
YIELD KEY s, 'patch-now' AS action
QUERY non_compliant WHERE s.name = s.name RETURN s.name AS service, action
'''
print(program)
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(b:Service)
YIELD KEY a, KEY b
CREATE RULE depends_on AS
MATCH (a:Service)-[:DEPENDS_ON]->(mid:Service)
WHERE mid IS depends_on TO b
YIELD KEY a, KEY b
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(s:Service)
YIELD KEY s
CREATE RULE exposed AS
MATCH (i:Internet)-[:EXPOSES]->(entry:Service)
WHERE entry IS depends_on TO s
YIELD KEY s
CREATE RULE vulnerable AS
MATCH (s:Service)
WHERE s.cve_score >= 7
YIELD KEY s
CREATE RULE non_compliant AS
MATCH (s:Service)
WHERE s IS vulnerable, s IS exposed
YIELD KEY s, 'patch-now' AS action
QUERY non_compliant WHERE s.name = s.name RETURN s.name AS service, action
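To make the recursive rules concrete, here is a plain-Python sketch of the relations the program describes, computed over the seed data from step 3. It only illustrates the rule semantics as read from the program text (rule bodies with the same name are unioned, and recursion runs to a fixed point). It is not a description of how Locy evaluates rules internally, and names such as `transitive_closure` are hypothetical.

```python
# Seed data copied from step 3.
CVE = {"api": 9.1, "worker": 4.0, "db": 8.4}
DEPENDS_ON = {("api", "worker"), ("worker", "db")}
EXPOSES = {"api"}  # services the Internet node exposes directly


def transitive_closure(edges):
    """Naive fixed point: add (a, b) whenever a -> mid and mid already reaches b."""
    closure = set(edges)
    while True:
        derived = {(a, b) for (a, mid) in edges for (m, b) in closure if m == mid}
        if derived <= closure:
            return closure  # nothing new was derived, so the fixed point is reached
        closure |= derived


depends_on = transitive_closure(DEPENDS_ON)
# exposed: directly exposed services plus everything they depend on.
exposed = EXPOSES | {b for (a, b) in depends_on if a in EXPOSES}
# vulnerable: same threshold as the rule (cve_score >= 7).
vulnerable = {s for s, score in CVE.items() if score >= 7}
# non_compliant: both conditions must hold.
non_compliant = sorted(exposed & vulnerable)

print(sorted(depends_on))
print(non_compliant)
```

The sketch yields three depends_on pairs and flags api and db, which matches the relation sizes reported later in the notebook.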
5) Evaluate Locy Program
Run the program, then inspect materialization stats (iterations, strata, and executed queries).
out = db.locy_evaluate(program)
print("Derived relations:", list(out["derived"].keys()))
stats = out["stats"]
print("Iterations:", stats.total_iterations)
print("Strata:", stats.strata_evaluated)
print("Queries executed:", stats.queries_executed)
Derived relations: ['non_compliant', 'depends_on', 'exposed', 'vulnerable']
Iterations: 3
Strata: 4
Queries executed: 14
6) Inspect Command Results
Each command result can contain rows; this is the easiest way to verify your rule outputs and query projections.
print("Derived relation snapshots:")
for rel_name, rel_rows in out["derived"].items():
    # "\n" (not "\\n") so each section starts after a real blank line
    print(f"\n{rel_name}: {len(rel_rows)} row(s)")
    pprint(rel_rows)
if out["command_results"]:
    print("\nCommand results:")
    for i, cmd in enumerate(out["command_results"], start=1):
        print(f"\nCommand #{i}:", cmd.get("type"))
        rows = cmd.get("rows")
        if rows is not None:
            pprint(rows)
if not out["command_results"]:
    print("\nNo QUERY/EXPLAIN/ABDUCE command outputs in this program.")
Derived relation snapshots:

non_compliant: 6 row(s)
[{'action': None,
  's': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'}},
 {'action': None,
  's': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}},
 {'action': None,
  's': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'}},
 {'action': None,
  's': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}},
 {'action': None,
  's': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'}},
 {'action': None,
  's': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}}]

depends_on: 3 row(s)
[{'a': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'},
  'b': {'_id': '2',
        '_labels': ['Service'],
        'cve_score': 4.0,
        'name': 'worker'}},
 {'a': {'_id': '2', '_labels': ['Service'], 'cve_score': 4.0, 'name': 'worker'},
  'b': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}},
 {'a': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'},
  'b': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}}]

exposed: 3 row(s)
[{'s': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'}},
 {'s': {'_id': '2',
        '_labels': ['Service'],
        'cve_score': 4.0,
        'name': 'worker'}},
 {'s': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}}]

vulnerable: 2 row(s)
[{'s': {'_id': '1', '_labels': ['Service'], 'cve_score': 9.1, 'name': 'api'}},
 {'s': {'_id': '3', '_labels': ['Service'], 'cve_score': 8.4, 'name': 'db'}}]

Command results:

Command #1: query
[{'Variable("action")': None, 'service': 'api'},
 {'Variable("action")': None, 'service': 'db'}]
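The non_compliant snapshot above lists each service several times. When inspecting output it can help to collapse rows by node identity. The helper below is a sketch that assumes rows are dicts shaped like the ones printed above (a node dict under key `s` with an `_id` field); `unique_by_node` is a hypothetical name, not part of `uni_db`.

```python
def unique_by_node(rows, key="s"):
    """Return the first row seen for each node `_id`, preserving order."""
    seen = set()
    unique = []
    for row in rows:
        node_id = row[key]["_id"]
        if node_id not in seen:
            seen.add(node_id)
            unique.append(row)
    return unique


# Sample rows copied from the snapshot above (trimmed to two copies of each).
api = {"_id": "1", "_labels": ["Service"], "cve_score": 9.1, "name": "api"}
db_node = {"_id": "3", "_labels": ["Service"], "cve_score": 8.4, "name": "db"}
rows = [
    {"action": None, "s": api},
    {"action": None, "s": db_node},
    {"action": None, "s": api},
    {"action": None, "s": db_node},
]

print([row["s"]["name"] for row in unique_by_node(rows)])
```

In the notebook the same call would be `unique_by_node(out["derived"]["non_compliant"])`.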
7) What To Expect
Use these checks to validate output after evaluation:
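- non_compliant should include api and db (reachable from the internet and CVE score >= 7).
- worker should not appear in remediation rows because its CVE score (4.0) is below the threshold.
- The program contains one QUERY statement, so Command results should list a single query. The Queries executed stat counts more than that statement: the run above reported 14.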
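These checks can also be written as assertions. The sketch below runs against a small stand-in dict shaped like the recorded output, so it is self-contained; `check_remediation` and `sample_out` are hypothetical names, and in the notebook you would pass the real `out` from step 5 instead.

```python
def check_remediation(out):
    """Assert that the flagged services are exactly api and db."""
    flagged = {row["s"]["name"] for row in out["derived"]["non_compliant"]}
    assert flagged == {"api", "db"}, f"unexpected services: {flagged}"
    # worker is reachable from the internet but scores below the threshold of 7.
    assert "worker" not in flagged
    # The program has one QUERY statement, so expect one command result.
    assert len(out["command_results"]) == 1
    return sorted(flagged)


# Stand-in data trimmed from the recorded output above.
sample_out = {
    "derived": {
        "non_compliant": [
            {"action": None, "s": {"_id": "1", "name": "api", "cve_score": 9.1}},
            {"action": None, "s": {"_id": "3", "name": "db", "cve_score": 8.4}},
        ]
    },
    "command_results": [
        {"type": "query", "rows": [{"service": "api"}, {"service": "db"}]}
    ],
}

print(check_remediation(sample_out))
```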
8) Cleanup
Delete the temporary database directory created in setup.
shutil.rmtree(DB_DIR, ignore_errors=True)
print("Cleaned up", DB_DIR)
Cleaned up /tmp/uni_locy_vyp5kdh9
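If the whole example lives in one script rather than separate notebook cells, `tempfile.TemporaryDirectory` can replace the manual `mkdtemp`/`rmtree` pair so the directory is removed even when a step raises. This sketch uses only the standard library. The marker file stands in for the database work and is purely illustrative. Whether a `uni_db` handle must be closed before the directory is removed is not covered by this notebook.

```python
import os
import tempfile

with tempfile.TemporaryDirectory(prefix="uni_locy_") as db_dir:
    # In the notebook, the database would be opened on db_dir here.
    marker = os.path.join(db_dir, "marker.txt")
    with open(marker, "w") as fh:
        fh.write("scratch data")
    existed_inside = os.path.exists(marker)

# The directory and its contents are removed once the block exits.
print(existed_inside, os.path.exists(db_dir))
```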