Regional Sales Analytics with uni-pydantic¶
Combining Graph Traversal with Columnar Aggregation using Pydantic models.
In [1]:
Copied!
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
import os
import shutil
import tempfile
import uni_db
from uni_pydantic import UniNode, UniEdge, UniSession, Field, Relationship
1. Define Models¶
Orders shipped to regions with type-safe Pydantic models.
In [2]:
Copied!
class Region(UniNode):
"""A geographic region for sales tracking."""
__label__ = "Region"
name: str = Field(index="btree")
# Relationships
orders: list["Order"] = Relationship("SHIPPED_TO", direction="incoming")
class Order(UniNode):
"""A sales order."""
__label__ = "Order"
amount: float
# Relationships
region: "Region | None" = Relationship("SHIPPED_TO", direction="outgoing")
class ShippedTo(UniEdge):
"""Edge representing order shipped to region."""
__edge_type__ = "SHIPPED_TO"
__from__ = Order
__to__ = Region
class Region(UniNode):
"""A geographic region for sales tracking."""
__label__ = "Region"
name: str = Field(index="btree")
# Relationships
orders: list["Order"] = Relationship("SHIPPED_TO", direction="incoming")
class Order(UniNode):
"""A sales order."""
__label__ = "Order"
amount: float
# Relationships
region: "Region | None" = Relationship("SHIPPED_TO", direction="outgoing")
class ShippedTo(UniEdge):
"""Edge representing order shipped to region."""
__edge_type__ = "SHIPPED_TO"
__from__ = Order
__to__ = Region
2. Setup Database and Session¶
In [3]:
Copied!
db_path = os.path.join(tempfile.gettempdir(), "sales_pydantic_db")
if os.path.exists(db_path):
shutil.rmtree(db_path)
db = uni_db.Database(db_path)
# Create session and register models
session = UniSession(db)
session.register(Region, Order, ShippedTo)
session.sync_schema()
print(f"Opened database at {db_path}")
db_path = os.path.join(tempfile.gettempdir(), "sales_pydantic_db")
if os.path.exists(db_path):
shutil.rmtree(db_path)
db = uni_db.Database(db_path)
# Create session and register models
session = UniSession(db)
session.register(Region, Order, ShippedTo)
session.sync_schema()
print(f"Opened database at {db_path}")
Opened database at /tmp/sales_pydantic_db
3. Create Data¶
Create regions and orders using Pydantic models.
In [4]:
Copied!
# Create regions
north = Region(name="North")
south = Region(name="South")
east = Region(name="East")
west = Region(name="West")
session.add_all([north, south, east, west])
session.commit()
print("Created regions: North, South, East, West")
# Create regions
north = Region(name="North")
south = Region(name="South")
east = Region(name="East")
west = Region(name="West")
session.add_all([north, south, east, west])
session.commit()
print("Created regions: North, South, East, West")
Created regions: North, South, East, West
In [5]:
Copied!
# Create 100 orders for North region
orders = [Order(amount=10.0 * (i + 1)) for i in range(100)]
session.add_all(orders)
session.commit()
# Ship all orders to North
for order in orders:
session.create_edge(order, "SHIPPED_TO", north)
# Create some orders for other regions
south_orders = [Order(amount=50.0 * (i + 1)) for i in range(20)]
session.add_all(south_orders)
session.commit()
for order in south_orders:
session.create_edge(order, "SHIPPED_TO", south)
session.commit()
print("Created 100 orders for North and 20 orders for South")
# Create 100 orders for North region
orders = [Order(amount=10.0 * (i + 1)) for i in range(100)]
session.add_all(orders)
session.commit()
# Ship all orders to North
for order in orders:
session.create_edge(order, "SHIPPED_TO", north)
# Create some orders for other regions
south_orders = [Order(amount=50.0 * (i + 1)) for i in range(20)]
session.add_all(south_orders)
session.commit()
for order in south_orders:
session.create_edge(order, "SHIPPED_TO", south)
session.commit()
print("Created 100 orders for North and 20 orders for South")
Created 100 orders for North and 20 orders for South
4. Analytical Queries¶
Aggregation queries combining graph traversal with columnar analytics.
In [6]:
Copied!
# Sum of amounts for orders in North region
query = """
MATCH (r:Region {name: 'North'})<-[:SHIPPED_TO]-(o:Order)
RETURN SUM(o.amount) as total
"""
results = session.cypher(query)
print(f"Total Sales for North Region: ${results[0]['total']:,.2f}")
# Sum of amounts for orders in North region
query = """
MATCH (r:Region {name: 'North'})<-[:SHIPPED_TO]-(o:Order)
RETURN SUM(o.amount) as total
"""
results = session.cypher(query)
print(f"Total Sales for North Region: ${results[0]['total']:,.2f}")
Total Sales for North Region: $50,500.00
In [7]:
Copied!
# Sum of amounts for orders in South region
query = """
MATCH (r:Region {name: 'South'})<-[:SHIPPED_TO]-(o:Order)
RETURN SUM(o.amount) as total, COUNT(o) as order_count, AVG(o.amount) as avg_order
"""
results = session.cypher(query)
r = results[0]
print("South Region Analytics:")
print(f" Total Sales: ${r['total']:,.2f}")
print(f" Order Count: {r['order_count']}")
print(f" Average Order: ${r['avg_order']:,.2f}")
# Sum of amounts for orders in South region
query = """
MATCH (r:Region {name: 'South'})<-[:SHIPPED_TO]-(o:Order)
RETURN SUM(o.amount) as total, COUNT(o) as order_count, AVG(o.amount) as avg_order
"""
results = session.cypher(query)
r = results[0]
print("South Region Analytics:")
print(f" Total Sales: ${r['total']:,.2f}")
print(f" Order Count: {r['order_count']}")
print(f" Average Order: ${r['avg_order']:,.2f}")
South Region Analytics: Total Sales: $10,500.00 Order Count: 20 Average Order: $525.00
5. Query Builder Demo¶
Using the type-safe query builder for analytics.
In [8]:
Copied!
# Find high-value orders using query builder
high_value_orders = (
session.query(Order)
.filter(Order.amount >= 500.0)
.order_by(Order.amount, descending=True)
.limit(10)
.all()
)
print("Top 10 High-Value Orders (>=$500):")
for i, order in enumerate(high_value_orders, 1):
print(f" {i}. Order vid={order.vid}: ${order.amount:,.2f}")
# Find high-value orders using query builder
high_value_orders = (
session.query(Order)
.filter(Order.amount >= 500.0)
.order_by(Order.amount, descending=True)
.limit(10)
.all()
)
print("Top 10 High-Value Orders (>=$500):")
for i, order in enumerate(high_value_orders, 1):
print(f" {i}. Order vid={order.vid}: ${order.amount:,.2f}")
Top 10 High-Value Orders (>=$500): 1. Order vid=103: $1,000.00 2. Order vid=123: $1,000.00 3. Order vid=102: $990.00 4. Order vid=101: $980.00 5. Order vid=100: $970.00 6. Order vid=99: $960.00 7. Order vid=98: $950.00 8. Order vid=122: $950.00 9. Order vid=97: $940.00 10. Order vid=96: $930.00
DEBUG 2: DataFusion execution failed (falling back to execute_subplan): Error during planning: UDF 'properties' is not registered. Register it via SessionContext.
In [9]:
Copied!
# Count orders by value range
small_orders = session.query(Order).filter(Order.amount < 100).count()
medium_orders = session.query(Order).filter(Order.amount >= 100).filter(Order.amount < 500).count()
large_orders = session.query(Order).filter(Order.amount >= 500).count()
print("Order Distribution:")
print(f" Small (<$100): {small_orders}")
print(f" Medium ($100-499): {medium_orders}")
print(f" Large (>=$500): {large_orders}")
# Count orders by value range
small_orders = session.query(Order).filter(Order.amount < 100).count()
medium_orders = session.query(Order).filter(Order.amount >= 100).filter(Order.amount < 500).count()
large_orders = session.query(Order).filter(Order.amount >= 500).count()
print("Order Distribution:")
print(f" Small (<$100): {small_orders}")
print(f" Medium ($100-499): {medium_orders}")
print(f" Large (>=$500): {large_orders}")
Order Distribution: Small (<$100): 10 Medium ($100-499): 48 Large (>=$500): 62