Messaging¶
The messaging system in Rustic AI enables asynchronous, topic-based communication between agents and guilds. It is designed for scalability, flexibility, and extensibility.
Purpose¶
- Facilitate communication between agents, guilds, and external systems
- Support topic-based publish/subscribe messaging
- Enable routing, filtering, and transformation of messages
Key Concepts¶
- Topics: Channels for message delivery. Agents and guilds subscribe to topics to receive relevant messages.
- Messages: Structured data packets exchanged between agents. Support for custom formats and priorities.
- Routing Slip: An ordered list of routing rules that define a multi-step processing pipeline.
- Delivery Semantics: At-least-once by default, with configurable exactly-once support when the broker provides deduplication.
- Clients: Agents interact with the messaging system via client interfaces.
Message Schema¶
Every Message
instance serialises to the following canonical JSON structure:
{
"id": "8e25e068-...",
"sender": "agent-1",
"recipient": "agent-2", // optional for broadcast
"topic": "updates",
"format": "application/json", // MIME-type of payload
"payload": { /* arbitrary JSON */ },
"routing_slip": [ // optional
{ "agent": "AgentB", "method": "handle" }
],
"priority": 5, // 0 (lowest) .. 9 (highest)
"timestamp": "2025-05-18T10:20:04Z",
"headers": {
"trace_id": "a1b2c3",
"auth": "jwt-token",
"correlation_id": "..."
}
}
The concrete wire-format (Avro, Protobuf, etc.) is pluggable via the serialization layer.
Routing Slip Deep-Dive¶
Routing slips unlock complex workflows by allowing a message to carry its itinerary inside itself.
- Each element is a
RoutingRule
containing the target agent, handler method, and optional timeouts. - When an agent calls
ctx.send(payload)
, the first rule is popped; the resulting message is delivered accordingly. - Agents may modify or append rules mid-flight, enabling dynamic branching.
flowchart TD
subgraph Slip [Routing Slip]
step1[AgentA.handle_request]
step2[AgentB.process]
step3[AgentC.finalise]
end
step1 --> step2 --> step3
Error Routing¶
Errors raised inside handlers are wrapped in a MessageError
payload and sent to either:
* The on_send_error
fixtures registered on the emitting agent, or
* A designated error topic (default: errors.<guild_id>
).
This allows centralised error dashboards or compensating actions.
Broker Integrations¶
Rustic AI provides reference clients for:
Broker | Library | Guarantees |
---|---|---|
In-Memory | Built-in | At-least-once (process-local) |
RabbitMQ | aio-pika |
At-least-once / ordered per queue |
Kafka | aiokafka |
At-least-once / partition-ordered |
NATS | async-nats |
At-most-once / streaming optional |
You can create a new integration by implementing the BaseBrokerClient
ABC.
Delivery Guarantees & Retries¶
- Each message carries a delivery attempt counter.
- Clients can apply exponential back-off and dead-letter routing by overriding
on_send_failure
.
Monitoring & Tracing¶
Messages propagate OpenTelemetry context by default, enabling end-to-end distributed tracing.
Message Lifecycle¶
- Creation: An agent or guild creates a message.
- Routing: The message is routed to the appropriate topic(s) and recipients.
- Delivery: Subscribed agents receive and process the message.
- Acknowledgement: Optionally, agents can acknowledge or respond to messages.
Example: Sending and Receiving Messages¶
from rustic_ai.core.messaging import Message, Client
# Agent sends a message
message = Message(
sender="agent-1",
recipient="agent-2",
topic="updates",
payload={"text": "Hello!"},
)
client.send(message)
# Agent receives a message
@agent.on_message
def handle_message(self, message):
print(f"Received: {message.payload}")
Advanced Features¶
- Priority Messaging: Messages can have priorities for ordering and delivery.
- Routing Rules: Define custom routing logic for complex workflows.
- Message Transformation: Transform message payloads in transit.
- State Updates: Messages can trigger state changes in agents or guilds.
Custom Message Formats¶
Agents can define and handle custom message formats for domain-specific communication.
See the Agents and Guilds sections for how messaging integrates with agent and guild lifecycles.