Messaging

The messaging system in Rustic AI enables asynchronous, topic-based communication between agents within guilds. It provides a layered architecture with pluggable backends for different deployment scenarios, from development to distributed production systems.

Purpose

  • Facilitate communication between agents within and across guilds
  • Support topic-based publish/subscribe messaging with namespace isolation
  • Enable message persistence, retrieval, and real-time notifications
  • Provide pluggable storage backends for different deployment needs

Architecture Overview

The messaging system consists of three main layers:

┌─────────────────┐    ┌────────────────────┐    ┌─────────────────┐
│     Clients     │───▶│ MessagingInterface │───▶│ MessagingBackend│
│   (Agents)      │    │   (Message Bus)    │    │   (Storage)     │
└─────────────────┘    └────────────────────┘    └─────────────────┘

Key Concepts

Messages

Structured data packets exchanged between agents with the following key properties:

  • Unique IDs: Generated using GemstoneID with embedded timestamp and priority
  • Topics: Support single or multiple topic publishing
  • Threading: Maintain conversation threads and message history
  • Namespacing: Automatic namespace isolation per guild

Topics

Channels for message delivery with automatic namespace prefixing:

  • Guild Isolation: Each guild gets its own namespace (e.g., guild-123:topic-name)
  • Subscription Management: Agents subscribe to topics to receive relevant messages
  • Multi-Topic Publishing: A single message can be published to multiple topics

Clients

Agents interact with the messaging system via client interfaces:

  • MessageTrackingClient: Default client with message ordering and tracking
  • SimpleClient: Basic client for simple use cases
  • Specialized Clients: Pipeline, filtering, throttling, and other decorators

Backends

Pluggable storage implementations:

  • InMemoryMessagingBackend: Fast, in-memory storage for development/testing
  • EmbeddedMessagingBackend: Embedded messaging for testing without external dependencies
  • RedisMessagingBackend: Persistent, distributed storage for production
  • Custom Backends: Implement the MessagingBackend interface for specialized needs

Message Schema

Every Message instance has the following structure:

class Message(BaseModel):
    # Core fields
    sender: AgentTag                    # Who sent the message
    topics: Union[str, List[str]]       # Target topic(s)
    payload: JsonDict                   # Message content
    format: str                         # Message format (default: "generic_json")

    # Optional fields
    recipient_list: List[AgentTag]      # Tagged recipients
    in_response_to: Optional[int]       # Reply to message ID
    thread: List[int]                   # Conversation thread
    conversation_id: Optional[int]      # Conversation grouping
    forward_header: Optional[ForwardHeader]  # Forwarding information
    routing_slip: Optional[RoutingSlip] # Multi-step routing
    message_history: List[ProcessEntry] # Processing history
    ttl: Optional[int]                  # Time to live

    # Computed fields (read-only)
    id: int                            # Unique message ID
    priority: Priority                 # Message priority (0-9)
    timestamp: float                   # Creation timestamp

JSON Serialization Example

{
  "id": 123456789,
  "sender": {"id": "agent-1", "name": "Agent One"},
  "topics": "updates",
  "payload": {"text": "Hello, World!"},
  "format": "generic_json",
  "priority": 4,
  "timestamp": 1640995200.123,
  "thread": [123456789],
  "recipient_list": [],
  "in_response_to": null,
  "conversation_id": null,
  "forward_header": null,
  "routing_slip": null,
  "message_history": [],
  "ttl": null,
  "is_error_message": false,
  "traceparent": null,
  "session_state": null,
  "topic_published_to": "updates",
  "enrich_with_history": 0
}

MessagingInterface: The Message Bus

The MessagingInterface serves as the central message bus with the following responsibilities:

Client Management

# Register agent as messaging client
messaging.register_client(client)

# Unregister when agent shuts down
messaging.unregister_client(client)

Topic Subscription

# Subscribe agent to topic
messaging.subscribe("updates", client)

# Unsubscribe from topic
messaging.unsubscribe("updates", client)

Message Publishing

# Publish message to topic(s)
messaging.publish(sender_client, message)

Message Retrieval

# Get all messages for a topic
messages = messaging.get_messages("updates")

# Get messages since specific ID
recent_messages = messaging.get_messages_for_topic_since("updates", last_id)

# Get messages for a specific client across all subscribed topics
client_messages = messaging.get_messages_for_client_since(client_id, last_id)
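
Taken together, these calls cover the basic client lifecycle. The sketch below ties them into a minimal end-to-end flow; the import layout, the SimpleClient constructor arguments, and the GemstoneID generator (generator) are assumptions based on the snippets on this page and may differ from the actual API.

# Minimal end-to-end sketch (import layout and constructor arguments are assumed)
from rustic_ai.core.messaging import (  # assumed module layout
    AgentTag,
    Message,
    MessagingConfig,
    MessagingInterface,
    Priority,
    SimpleClient,
)

config = MessagingConfig(
    backend_module="rustic_ai.core.messaging.backend",
    backend_class="InMemoryMessagingBackend",
    backend_config={},
)
messaging = MessagingInterface(namespace="guild-123", messaging_config=config)

# Register two clients and subscribe the receiver to a topic
sender = SimpleClient(id="agent-a", name="Agent A", message_handler=lambda m: None)
receiver = SimpleClient(id="agent-b", name="Agent B", message_handler=lambda m: print(m.payload))
for client in (sender, receiver):
    messaging.register_client(client)
messaging.subscribe("updates", receiver)

# Publish a message (`generator` is an assumed GemstoneID generator instance)
message = Message(
    id_obj=generator.get_id(Priority.NORMAL),
    sender=AgentTag(id="agent-a", name="Agent A"),
    topics="updates",
    payload={"text": "Hello, Agent B!"},
)
messaging.publish(sender, message)

# Pull-based retrieval is also available
print(messaging.get_messages("updates"))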

Advanced Features

Namespace Isolation

  • Automatic topic prefixing: topic → guild-123:topic (illustrated below)
  • Prevents message leakage between guilds
  • Transparent to agents
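
The prefixing itself is simple string manipulation; the following illustrates the idea (it is not the actual implementation):

# Illustrative only: deriving and stripping a namespaced topic
def namespaced(namespace: str, topic: str) -> str:
    return f"{namespace}:{topic}"

def strip_namespace(namespace: str, full_topic: str) -> str:
    return full_topic.removeprefix(f"{namespace}:")

assert namespaced("guild-123", "updates") == "guild-123:updates"
assert strip_namespace("guild-123", "guild-123:updates") == "updates"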

Message History Enrichment

# Messages can request conversation history
message.enrich_with_history = 5  # Include last 5 messages
# History automatically added to message.session_state["enriched_history"]

Smart Subscription Management

  • Lazy backend subscription (only when first client subscribes)
  • Reference counting (unsubscribe from the backend when the last client leaves; see the sketch below)
  • Automatic cleanup on client disconnect
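
A simplified illustration of the lazy-subscription and reference-counting behavior described above (not the actual MessagingInterface implementation):

# Simplified sketch: subscribe to the backend only while at least one client listens
from collections import defaultdict
from typing import Callable

class SubscriptionManager:
    def __init__(self, backend_subscribe: Callable[[str], None], backend_unsubscribe: Callable[[str], None]):
        self._subscribers = defaultdict(set)  # topic -> set of client IDs
        self._backend_subscribe = backend_subscribe
        self._backend_unsubscribe = backend_unsubscribe

    def subscribe(self, topic: str, client_id: str) -> None:
        if not self._subscribers[topic]:
            self._backend_subscribe(topic)  # first client: lazy backend subscription
        self._subscribers[topic].add(client_id)

    def unsubscribe(self, topic: str, client_id: str) -> None:
        self._subscribers[topic].discard(client_id)
        if not self._subscribers[topic]:
            self._backend_unsubscribe(topic)  # last client left: release the backend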

MessagingBackend: Storage Layer

The MessagingBackend abstract base class defines the interface for message storage:

Core Interface

class MessagingBackend(ABC):
    @abstractmethod
    def store_message(self, namespace: str, topic: str, message: Message) -> None:
        """Store a message in the backend"""

    @abstractmethod
    def get_messages_for_topic(self, topic: str) -> List[Message]:
        """Retrieve all messages for a topic"""

    @abstractmethod
    def get_messages_for_topic_since(self, topic: str, msg_id_since: int) -> List[Message]:
        """Retrieve messages since a specific ID"""

    @abstractmethod
    def subscribe(self, topic: str, handler: Callable[[Message], None]) -> None:
        """Subscribe to real-time notifications"""

    @abstractmethod
    def get_messages_by_id(self, namespace: str, msg_ids: List[int]) -> List[Message]:
        """Batch retrieve messages by ID"""

Available Backends

InMemoryMessagingBackend

# Configuration
MessagingConfig(
    backend_module="rustic_ai.core.messaging.backend",
    backend_class="InMemoryMessagingBackend",
    backend_config={}
)

Features:

  • Performance: Extremely fast in-memory operations
  • Real-time: Immediate callback-based notifications
  • Singleton: Shared state across multiple interface instances
  • Development: Perfect for testing and development

Limitations:

  • Not Persistent: Data lost on restart
  • Single Process: Cannot share across processes
  • Memory Bound: Limited by available RAM

EmbeddedMessagingBackend

# Configuration with auto-start server
MessagingConfig(
    backend_module="rustic_ai.core.messaging.backend.embedded_backend",
    backend_class="EmbeddedMessagingBackend",
    backend_config={}
)

# Configuration with external server
MessagingConfig(
    backend_module="rustic_ai.core.messaging.backend.embedded_backend",
    backend_class="EmbeddedMessagingBackend",
    backend_config={
        "port": 31134,
        "auto_start_server": False
    }
)

Features:

  • Cross-Process: Enables communication between separate processes
  • Socket-Based: Fast TCP socket communication with asyncio
  • No External Dependencies: Uses only Python standard library
  • Real-time Subscriptions: True push delivery with asyncio
  • Message TTL: Automatic message expiration
  • Auto-cleanup: Automatic resource management and cleanup
  • Back-pressure Handling: Per-connection queues with overflow protection
  • Testing Focused: Ideal for testing multiprocess scenarios

Use Cases:

  • Multiprocess Testing: Test agents running in separate processes
  • Development: Fast messaging without external setup
  • CI/CD: Testing distributed scenarios without infrastructure
  • Process Isolation: When you need true process separation

Limitations:

  • Memory Only: Data not persisted to disk
  • Single Server: No clustering or high availability
  • Testing Scope: Designed for testing rather than production

RedisMessagingBackend

# Configuration
MessagingConfig(
    backend_module="rustic_ai.redis.messaging.backend",
    backend_class="RedisMessagingBackend",
    backend_config={
        "redis_client": {
            "host": "localhost",
            "port": 6379,
            "ssl": False
        }
    }
)

Features:

  • Persistent: Data survives restarts
  • Distributed: Shared across processes and machines
  • Scalable: Redis clustering support
  • Real-time: Redis pub/sub for notifications
  • TTL Support: Configurable message expiration
  • Efficient: Pipeline operations for batch processing

Storage Strategy (illustrated in the sketch below):

  • Sorted Sets: Messages stored with timestamp scores for ordering
  • Secondary Index: Direct ID lookup via msg:namespace:id keys
  • Pub/Sub: Real-time notifications via Redis pub/sub
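
A simplified illustration of this strategy using redis-py; key names and semantics are condensed, and this is not the backend's actual code:

# Illustrative only: sorted set + secondary index + pub/sub, as described above
import json
import redis

r = redis.Redis(host="localhost", port=6379)

namespace, topic = "guild-123", "updates"
msg = {"id": 123456789, "payload": {"text": "Hello"}, "timestamp": 1640995200.123}

# Sorted set keyed by topic, scored by timestamp, for ordered range queries
r.zadd(f"{namespace}:{topic}", {json.dumps(msg): msg["timestamp"]})

# Secondary index for direct lookup by message ID
r.set(f"msg:{namespace}:{msg['id']}", json.dumps(msg))

# Pub/sub channel for real-time notification of subscribers
r.publish(f"{namespace}:{topic}", json.dumps(msg))

# Retrieval: all messages for the topic in timestamp order
messages = r.zrange(f"{namespace}:{topic}", 0, -1)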

Configuration

MessagingConfig

class MessagingConfig(BaseModel):
    backend_module: str      # Python module containing backend
    backend_class: str       # Backend class name
    backend_config: Dict     # Backend-specific configuration

Dynamic Backend Creation

# MessagingConfig automatically creates backend instances
config = MessagingConfig(
    backend_module="rustic_ai.core.messaging.backend",
    backend_class="InMemoryMessagingBackend",
    backend_config={}
)

# Helper for creating an embedded messaging backend config
from rustic_ai.core.messaging.backend.embedded_backend import create_embedded_messaging_config
embedded_config = create_embedded_messaging_config()

# MessagingInterface uses config to create backend
messaging = MessagingInterface(namespace="guild-123", messaging_config=config)
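
Conceptually, this dynamic creation boils down to importing the configured module and instantiating the named class; a simplified illustration (not the library's actual code):

# Illustrative only: creating a backend instance from module/class/config values
import importlib

def create_backend(backend_module: str, backend_class: str, backend_config: dict):
    module = importlib.import_module(backend_module)
    cls = getattr(module, backend_class)
    return cls(**backend_config)

backend = create_backend("rustic_ai.core.messaging.backend", "InMemoryMessagingBackend", {})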

Message Flow Example

# 1. Agent A sends message
message = Message(
    id_obj=generator.get_id(Priority.NORMAL),
    sender=AgentTag(id="agent-a", name="Agent A"),
    topics="updates",
    payload={"text": "Hello, Agent B!"}
)

# 2. Publish via messaging interface
messaging_interface.publish(client_a, message)

# 3. MessagingInterface processes:
#    - Adds namespace prefix: "updates" → "guild-123:updates"
#    - Stores in backend
#    - Notifies subscribers

# 4. Backend stores and notifies:
#    - InMemory: Immediate callback
#    - Embedded: Socket push delivery
#    - Redis: Pub/sub notification

# 5. Agent B receives message:
#    - Client notified via callback
#    - Message delivered to agent's message handler

Cross-Process Messaging Example

# Process 1: Setup embedded messaging backend with MultiProcessExecutionEngine
from rustic_ai.core.messaging.backend.embedded_backend import create_embedded_messaging_config
from rustic_ai.core.guild.execution.multiprocess import MultiProcessExecutionEngine

# Create messaging config for cross-process communication
messaging_config = create_embedded_messaging_config()

# Create guild with multiprocess execution
guild = Guild(
    guild_id="distributed-guild",
    execution_engine=MultiProcessExecutionEngine(guild_id="distributed-guild"),
    messaging_config=messaging_config
)

# Launch agents in separate processes
for i in range(3):
    agent_spec = create_worker_agent_spec(f"worker-{i}")
    guild.launch_agent(agent_spec)

# Agents can now communicate across processes via the embedded messaging backend
# - No Redis setup required
# - Process isolation maintained
# - Real-time messaging via socket push delivery

Client Types

MessageTrackingClient (Default)

class MessageTrackingClient(Client):
    """Tracks message processing with ordering and deduplication"""
  • Message Ordering: Processes messages in ID order
  • Tracking: Tracks last processed message ID
  • Threading: Uses events and heaps for efficient processing

SimpleClient

class SimpleClient(Client):
    """Basic client for simple messaging needs"""
- Lightweight: Minimal overhead - Direct: Immediate message processing

Specialized Clients

  • PipelineClient: Chain multiple processing steps
  • FilteringClient: Filter messages based on criteria (see the sketch below)
  • ThrottlingClient: Rate-limit message processing
  • RetryingClient: Automatic retry on failures
  • LoggingClient: Add logging to message processing
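
The decorator idea behind these clients can be illustrated with a small, purely hypothetical filtering wrapper; the real classes and their constructors may look different:

# Hypothetical sketch of the decorator pattern used by specialized clients
from typing import Callable

class FilteringHandler:
    """Wraps a message handler and drops messages that fail a predicate."""

    def __init__(self, handler: Callable, predicate: Callable) -> None:
        self._handler = handler
        self._predicate = predicate

    def __call__(self, message) -> None:
        if self._predicate(message):
            self._handler(message)

# Example: only pass messages with the default format to the underlying handler
filtered_handler = FilteringHandler(
    handler=lambda m: print(m.payload),
    predicate=lambda m: m.format == "generic_json",
)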

Integration with Execution Engines

Execution Engine Compatibility

Different execution engines work optimally with different messaging backends:

Execution Engine            | Recommended Backend                                | Reason
SyncExecutionEngine         | InMemoryMessagingBackend                           | Fast, single-process operation
MultiThreadedEngine         | InMemoryMessagingBackend or RedisMessagingBackend  | Thread-safe with shared memory
MultiProcessExecutionEngine | EmbeddedMessagingBackend or RedisMessagingBackend  | Cross-process communication
RayExecutionEngine          | RedisMessagingBackend                              | Distributed messaging

The EmbeddedMessagingBackend is particularly well-suited for the MultiProcessExecutionEngine because:

  • No External Dependencies: Works without Redis setup
  • Process Isolation: Each process gets proper isolation while maintaining communication
  • Testing Friendly: Ideal for testing distributed scenarios in CI/CD
  • Redis-like Features: Provides coordination primitives for process synchronization

Execution Engine Responsibilities

  • Configuration: Provide MessagingConfig to agent wrappers
  • Lifecycle: Coordinate messaging setup during agent initialization
  • Isolation: Each agent gets its own messaging client

Agent Wrapper Integration

class AgentWrapper:
    def initialize_agent(self):
        # Create MessagingInterface from config
        self.messaging = MessagingInterface(self.agent.guild_id, self.messaging_config)

        # Create and register client
        client = self.client_type(
            id=self.agent.id,
            name=self.agent.name,
            message_handler=self.agent._on_message
        )
        self.messaging.register_client(client)

        # Subscribe to agent's topics
        for topic in self.agent.subscribed_topics:
            self.messaging.subscribe(topic, client)

Performance Characteristics

MessagingInterface Performance

  • Client Lookup: O(1) by client ID
  • Topic Subscription: O(1) for subscription management
  • Message Copying: Deep copies for isolation (memory overhead)
  • Namespace Processing: O(1) string operations

Backend Performance Comparison

Feature               | InMemoryBackend    | EmbeddedMessagingBackend | RedisBackend
Latency               | ~1μs               | ~0.1-1ms                 | ~1-10ms
Throughput            | Very High          | Very High                | High
Persistence           | None               | None                     | Full
Scalability           | Single Process     | Multi-Process            | Distributed
Memory Usage          | High (all in RAM)  | Medium (socket overhead) | Low (Redis manages)
Real-time             | Immediate          | Push delivery            | Near real-time
Cross-Process         | No                 | Yes                      | Yes
External Dependencies | None               | None                     | Redis Server

Best Practices

Development

  • Use InMemoryMessagingBackend for fast iteration and single-process testing
  • Use EmbeddedMessagingBackend for testing multiprocess scenarios and process isolation
  • Enable debug logging for message flow visibility
  • Use SyncExecutionEngine for deterministic testing

Testing

  • Use EmbeddedMessagingBackend with MultiProcessExecutionEngine for realistic testing
  • Leverage embedded messaging backend's message-based operations for test coordination
  • Use real-time subscriptions for responsive test monitoring
  • Take advantage of automatic cleanup for test isolation

Production

  • Use RedisMessagingBackend for persistence and scalability
  • Configure appropriate message TTL to manage storage
  • Use MultiThreadedEngine or RayExecutionEngine for concurrency
  • Monitor Redis memory usage and performance

Message Design

  • Keep payloads reasonably sized (< 1MB recommended)
  • Use meaningful topic names with clear hierarchy
  • Include correlation IDs for request/response patterns
  • Leverage message threading for conversation tracking (see the sketch below)
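
A sketch of the threading and correlation patterns above, assuming the names from the earlier examples (Message, AgentTag, Priority, generator) and an incoming message named request; whether the framework fills thread automatically is an implementation detail, so it is set explicitly here only for illustration:

# Sketch: building a threaded reply to an earlier message (`request` is assumed)
reply = Message(
    id_obj=generator.get_id(Priority.NORMAL),
    sender=AgentTag(id="agent-b", name="Agent B"),
    topics="updates",
    payload={"text": "Got it!", "correlation_id": "req-42"},  # correlation ID carried in the payload
    in_response_to=request.id,             # reply to the original message ID
    thread=request.thread,                 # continue the conversation thread
    conversation_id=request.conversation_id,
)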

Error Handling

Message Validation

  • Pydantic-based validation for message structure
  • Automatic type checking and conversion
  • Clear error messages for invalid data (see the sketch below)
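
A quick sketch of what validation looks like in practice, reusing names from the earlier examples and assuming payload is required, as the schema above indicates:

# Sketch: a missing required field raises a Pydantic ValidationError with a clear message
from pydantic import ValidationError

try:
    Message(
        id_obj=generator.get_id(Priority.NORMAL),
        sender=AgentTag(id="agent-a", name="Agent A"),
        topics="updates",
        # payload omitted -> validation fails
    )
except ValidationError as e:
    print(e)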

Backend Failures

  • MessagingInterface handles backend initialization errors
  • Graceful degradation when backend unavailable
  • Proper cleanup on shutdown

Client Disconnection

  • Automatic cleanup of subscriptions
  • Resource cleanup on client disconnect
  • Graceful handling of client failures

Monitoring and Observability

Built-in Logging

  • Structured logging for message flow
  • Debug-level logging for detailed tracing
  • Error logging for failure scenarios

OpenTelemetry Support

  • Trace context propagation via the traceparent field (see the sketch below)
  • Distributed tracing across agent interactions
  • Integration with observability platforms
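
As a sketch, the current trace context can be injected into an outgoing message using the standard OpenTelemetry propagation API; how Rustic AI itself populates and consumes traceparent may differ:

# Sketch: carry the active trace context on the message's traceparent field
from opentelemetry.propagate import inject

carrier: dict = {}
inject(carrier)  # writes the W3C "traceparent" header into the dict when a span is active
message.traceparent = carrier.get("traceparent")  # `message` as in earlier examples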

Metrics

  • Message throughput per topic
  • Client subscription counts
  • Backend performance metrics
  • Error rates and types

See the Execution section for how messaging integrates with execution engines, Embedded Messaging Backend for detailed information about cross-process messaging, and Agents for agent-specific messaging patterns.