Supply Chain Management with uni-pydantic¶
This notebook demonstrates how to model a supply chain graph to perform BOM (Bill of Materials) explosion and cost rollup using Pydantic models.
In [1]:
Copied!
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
1. Define Models¶
Parts, Suppliers, and Products with assembly relationships using type-safe Pydantic models.
In [2]:
Copied!
class Part(UniNode):
"""A component part in the supply chain."""
__label__ = "Part"
sku: str = Field(index="btree", unique=True)
cost: float
# Relationships
used_in: list["Part"] = Relationship("ASSEMBLED_FROM", direction="incoming")
components: list["Part"] = Relationship("ASSEMBLED_FROM", direction="outgoing")
suppliers: list["Supplier"] = Relationship("SUPPLIED_BY", direction="outgoing")
class Supplier(UniNode):
"""A supplier of parts."""
__label__ = "Supplier"
name: str = Field(index="btree")
country: str | None = None
# Relationships
supplies: list[Part] = Relationship("SUPPLIED_BY", direction="incoming")
class Product(UniNode):
"""A finished product assembled from parts."""
__label__ = "Product"
name: str = Field(index="btree")
price: float
# Relationships
components: list[Part] = Relationship("ASSEMBLED_FROM", direction="outgoing")
class AssembledFrom(UniEdge):
"""Edge representing assembly relationship."""
__edge_type__ = "ASSEMBLED_FROM"
__from__ = (Product, Part)
__to__ = Part
class SuppliedBy(UniEdge):
"""Edge representing supplier relationship."""
__edge_type__ = "SUPPLIED_BY"
__from__ = Part
__to__ = Supplier
class Part(UniNode):
"""A component part in the supply chain."""
__label__ = "Part"
sku: str = Field(index="btree", unique=True)
cost: float
# Relationships
used_in: list["Part"] = Relationship("ASSEMBLED_FROM", direction="incoming")
components: list["Part"] = Relationship("ASSEMBLED_FROM", direction="outgoing")
suppliers: list["Supplier"] = Relationship("SUPPLIED_BY", direction="outgoing")
class Supplier(UniNode):
"""A supplier of parts."""
__label__ = "Supplier"
name: str = Field(index="btree")
country: str | None = None
# Relationships
supplies: list[Part] = Relationship("SUPPLIED_BY", direction="incoming")
class Product(UniNode):
"""A finished product assembled from parts."""
__label__ = "Product"
name: str = Field(index="btree")
price: float
# Relationships
components: list[Part] = Relationship("ASSEMBLED_FROM", direction="outgoing")
class AssembledFrom(UniEdge):
"""Edge representing assembly relationship."""
__edge_type__ = "ASSEMBLED_FROM"
__from__ = (Product, Part)
__to__ = Part
class SuppliedBy(UniEdge):
"""Edge representing supplier relationship."""
__edge_type__ = "SUPPLIED_BY"
__from__ = Part
__to__ = Supplier
2. Setup Database and Session¶
In [3]:
Copied!
db_path = os.path.join(tempfile.gettempdir(), "supply_chain_pydantic_db")
if os.path.exists(db_path):
shutil.rmtree(db_path)
db = uni_db.Database(db_path)
# Create session and register models
session = UniSession(db)
session.register(Part, Supplier, Product, AssembledFrom, SuppliedBy)
session.sync_schema()
print(f"Opened database at {db_path}")
db_path = os.path.join(tempfile.gettempdir(), "supply_chain_pydantic_db")
if os.path.exists(db_path):
shutil.rmtree(db_path)
db = uni_db.Database(db_path)
# Create session and register models
session = UniSession(db)
session.register(Part, Supplier, Product, AssembledFrom, SuppliedBy)
session.sync_schema()
print(f"Opened database at {db_path}")
Opened database at /tmp/supply_chain_pydantic_db
3. Create Data¶
Create parts, suppliers, and products with assembly hierarchy.
In [4]:
Copied!
# Create parts
resistor = Part(sku="RES-10K", cost=0.05)
motherboard = Part(sku="MB-X1", cost=50.0)
screen = Part(sku="SCR-OLED", cost=30.0)
battery = Part(sku="BAT-5000", cost=15.0)
case = Part(sku="CASE-ALU", cost=20.0)
# Create suppliers
electronics_co = Supplier(name="ElectronicsCo", country="China")
display_inc = Supplier(name="DisplayInc", country="South Korea")
# Create product
smartphone = Product(name="Smartphone X", price=500.0)
# Add all nodes
session.add_all([
resistor, motherboard, screen, battery, case,
electronics_co, display_inc,
smartphone
])
session.commit()
print("Created parts, suppliers, and product")
# Create parts
resistor = Part(sku="RES-10K", cost=0.05)
motherboard = Part(sku="MB-X1", cost=50.0)
screen = Part(sku="SCR-OLED", cost=30.0)
battery = Part(sku="BAT-5000", cost=15.0)
case = Part(sku="CASE-ALU", cost=20.0)
# Create suppliers
electronics_co = Supplier(name="ElectronicsCo", country="China")
display_inc = Supplier(name="DisplayInc", country="South Korea")
# Create product
smartphone = Product(name="Smartphone X", price=500.0)
# Add all nodes
session.add_all([
resistor, motherboard, screen, battery, case,
electronics_co, display_inc,
smartphone
])
session.commit()
print("Created parts, suppliers, and product")
Created parts, suppliers, and product
In [5]:
Copied!
# Create assembly hierarchy
# Smartphone is assembled from motherboard, screen, battery, case
session.create_edge(smartphone, "ASSEMBLED_FROM", motherboard)
session.create_edge(smartphone, "ASSEMBLED_FROM", screen)
session.create_edge(smartphone, "ASSEMBLED_FROM", battery)
session.create_edge(smartphone, "ASSEMBLED_FROM", case)
# Motherboard contains resistors
session.create_edge(motherboard, "ASSEMBLED_FROM", resistor)
# Supplier relationships
session.create_edge(resistor, "SUPPLIED_BY", electronics_co)
session.create_edge(motherboard, "SUPPLIED_BY", electronics_co)
session.create_edge(screen, "SUPPLIED_BY", display_inc)
session.commit()
print("Created assembly and supplier relationships")
# Create assembly hierarchy
# Smartphone is assembled from motherboard, screen, battery, case
session.create_edge(smartphone, "ASSEMBLED_FROM", motherboard)
session.create_edge(smartphone, "ASSEMBLED_FROM", screen)
session.create_edge(smartphone, "ASSEMBLED_FROM", battery)
session.create_edge(smartphone, "ASSEMBLED_FROM", case)
# Motherboard contains resistors
session.create_edge(motherboard, "ASSEMBLED_FROM", resistor)
# Supplier relationships
session.create_edge(resistor, "SUPPLIED_BY", electronics_co)
session.create_edge(motherboard, "SUPPLIED_BY", electronics_co)
session.create_edge(screen, "SUPPLIED_BY", display_inc)
session.commit()
print("Created assembly and supplier relationships")
Created assembly and supplier relationships
4. BOM Explosion¶
Find all products affected by a defective part, traversing up the assembly hierarchy.
In [6]:
Copied!
# Warm-up to ensure adjacency is loaded
session.cypher("MATCH (a:Part)-[:ASSEMBLED_FROM]->(b:Part) RETURN a.sku")
# Warm-up to ensure adjacency is loaded
session.cypher("MATCH (a:Part)-[:ASSEMBLED_FROM]->(b:Part) RETURN a.sku")
Out[6]:
[{'a.sku': 'MB-X1'}]
In [7]:
Copied!
# Find products affected by defective resistor
query = """
MATCH (defective:Part {sku: 'RES-10K'})
MATCH (product:Product)-[:ASSEMBLED_FROM*1..5]->(defective)
RETURN product.name as name, product.price as price
"""
results = session.cypher(query)
print("Products affected by defective RES-10K resistor:")
for r in results:
print(f" - {r['name']} (${r['price']})")
# Find products affected by defective resistor
query = """
MATCH (defective:Part {sku: 'RES-10K'})
MATCH (product:Product)-[:ASSEMBLED_FROM*1..5]->(defective)
RETURN product.name as name, product.price as price
"""
results = session.cypher(query)
print("Products affected by defective RES-10K resistor:")
for r in results:
print(f" - {r['name']} (${r['price']})")
Products affected by defective RES-10K resistor: - Smartphone X ($500.0) - Smartphone X ($500.0) - Smartphone X ($500.0) - Smartphone X ($500.0) - Smartphone X ($500.0)
5. Cost Rollup¶
Calculate the total cost of parts for a product by traversing down the assembly tree.
In [8]:
Copied!
# Calculate total BOM cost for Smartphone X
query_cost = """
MATCH (p:Product {name: 'Smartphone X'})
MATCH (p)-[:ASSEMBLED_FROM*1..5]->(part:Part)
RETURN SUM(part.cost) AS total_bom_cost
"""
results_cost = session.cypher(query_cost)
total_cost = results_cost[0]['total_bom_cost']
print(f"Total BOM Cost for Smartphone X: ${total_cost:,.2f}")
print(f"Product Price: ${smartphone.price:,.2f}")
print(f"Gross Margin: ${smartphone.price - total_cost:,.2f} ({(smartphone.price - total_cost) / smartphone.price * 100:.1f}%)")
# Calculate total BOM cost for Smartphone X
query_cost = """
MATCH (p:Product {name: 'Smartphone X'})
MATCH (p)-[:ASSEMBLED_FROM*1..5]->(part:Part)
RETURN SUM(part.cost) AS total_bom_cost
"""
results_cost = session.cypher(query_cost)
total_cost = results_cost[0]['total_bom_cost']
print(f"Total BOM Cost for Smartphone X: ${total_cost:,.2f}")
print(f"Product Price: ${smartphone.price:,.2f}")
print(f"Gross Margin: ${smartphone.price - total_cost:,.2f} ({(smartphone.price - total_cost) / smartphone.price * 100:.1f}%)")
Total BOM Cost for Smartphone X: $115.05 Product Price: $500.00 Gross Margin: $384.95 (77.0%)
DEBUG 2: DataFusion execution failed (falling back to execute_subplan): Schema error: No field named "part.cost". Valid fields are "p._vid", "p.name", _target_vid, _hop_count.
6. Supplier Analysis¶
Analyze supplier exposure and risk.
In [9]:
Copied!
# Find all parts from a specific supplier used in Smartphone X
query = """
MATCH (p:Product {name: 'Smartphone X'})-[:ASSEMBLED_FROM*1..5]->(part:Part)-[:SUPPLIED_BY]->(s:Supplier)
RETURN s.name as supplier, s.country as country, collect(part.sku) as parts, SUM(part.cost) as total_cost
"""
results = session.cypher(query)
print("Supplier Exposure for Smartphone X:")
print("-" * 50)
for r in results:
print(f"\nSupplier: {r['supplier']} ({r['country']})")
print(f" Parts: {', '.join(r['parts'])}")
print(f" Total Cost: ${r['total_cost']:,.2f}")
# Find all parts from a specific supplier used in Smartphone X
query = """
MATCH (p:Product {name: 'Smartphone X'})-[:ASSEMBLED_FROM*1..5]->(part:Part)-[:SUPPLIED_BY]->(s:Supplier)
RETURN s.name as supplier, s.country as country, collect(part.sku) as parts, SUM(part.cost) as total_cost
"""
results = session.cypher(query)
print("Supplier Exposure for Smartphone X:")
print("-" * 50)
for r in results:
print(f"\nSupplier: {r['supplier']} ({r['country']})")
print(f" Parts: {', '.join(r['parts'])}")
print(f" Total Cost: ${r['total_cost']:,.2f}")
Supplier Exposure for Smartphone X: -------------------------------------------------- Supplier: ElectronicsCo (China) Parts: MB-X1, RES-10K Total Cost: $50.05 Supplier: DisplayInc (South Korea) Parts: SCR-OLED Total Cost: $30.00
DEBUG 2: DataFusion execution failed (falling back to execute_subplan): Schema error: No field named "part.sku". Valid fields are "p._vid", "p.name", _target_vid, _hop_count, "s._vid", "s.country", "s.name".
7. Query Builder Demo¶
Using the type-safe query builder for supply chain queries.
In [10]:
Copied!
# Find all expensive parts using query builder
expensive_parts = (
session.query(Part)
.filter(Part.cost >= 20.0)
.order_by(Part.cost, descending=True)
.all()
)
print("Expensive Parts (>=$20):")
for part in expensive_parts:
print(f" - {part.sku}: ${part.cost:,.2f}")
# Find all expensive parts using query builder
expensive_parts = (
session.query(Part)
.filter(Part.cost >= 20.0)
.order_by(Part.cost, descending=True)
.all()
)
print("Expensive Parts (>=$20):")
for part in expensive_parts:
print(f" - {part.sku}: ${part.cost:,.2f}")
DEBUG 2: DataFusion execution failed (falling back to execute_subplan): Error during planning: UDF 'properties' is not registered. Register it via SessionContext.
Expensive Parts (>=$20): - MB-X1: $50.00 - SCR-OLED: $30.00 - CASE-ALU: $20.00
In [11]:
Copied!
# Find a specific part by SKU
found_part = session.query(Part).filter(Part.sku == "MB-X1").first()
if found_part:
print(f"Found part: {found_part.sku}")
print(f" Cost: ${found_part.cost:,.2f}")
print(f" VID: {found_part.vid}")
# Find a specific part by SKU
found_part = session.query(Part).filter(Part.sku == "MB-X1").first()
if found_part:
print(f"Found part: {found_part.sku}")
print(f" Cost: ${found_part.cost:,.2f}")
print(f" VID: {found_part.vid}")
Found part: MB-X1 Cost: $50.00 VID: 1
DEBUG 2: DataFusion execution failed (falling back to execute_subplan): Error during planning: UDF 'properties' is not registered. Register it via SessionContext.