Agents¶
Agents are the fundamental, autonomous, and message-driven entities in Rustic AI. They encapsulate specific logic, maintain their own state, communicate with other agents via asynchronous messages, and collaborate within a Guild to achieve complex tasks. Agents can represent automated processes (bots) or act as proxies for human users.
Prerequisites: Familiarity with Guilds, Messaging, State Management, and Dependencies is recommended.
Table of Contents¶
- Core Concepts
- Defining an Agent Class
- Agent Specification (AgentSpec)
- The AgentMetaclass and its Magic
- Message Handling
- Agent Fixtures and Modifiers
- Dependency Injection in Handlers
- Agent State Management
- Error Handling Strategies
- Testing Agents in Isolation
- Agent Mixins: Behind-the-Scenes Support
- Advanced Topics
Core Concepts¶
- Encapsulation: Agents bundle their own logic, configuration, and state.
- Message-Driven: Agents react to incoming messages, process them, and can send new messages.
- Stateful: Agents can maintain internal state across message interactions. See State Management.
- Collaborative: Agents operate within Guilds, allowing for coordinated multi-agent workflows.
- Configurable: Agent behavior is defined by their Python class and configured via an AgentSpec.
Agent Types and Modes¶
Rustic AI provides two enumerations that define fundamental characteristics of agents:
AgentType¶
AgentType specifies the nature of the agent:
from rustic_ai.core.guild import Agent, AgentType
from rustic_ai.core.guild.dsl import AgentSpec, BaseAgentProps

# Usage:
class MyAgent(Agent[BaseAgentProps]):
    def __init__(self, agent_spec: AgentSpec[BaseAgentProps]):
        super().__init__(agent_spec, agent_type=AgentType.BOT)  # Default is BOT
| Value | Description |
|---|---|
| AgentType.BOT | A fully automated agent that operates without human intervention (default) |
| AgentType.HUMAN | Represents a human user or requires human participation |
AgentMode¶
AgentMode defines how the agent is executed by the execution engine:
from rustic_ai.core.guild import Agent, AgentMode
from rustic_ai.core.guild.dsl import AgentSpec, BaseAgentProps

# Usage:
class MyAgent(Agent[BaseAgentProps]):
    def __init__(self, agent_spec: AgentSpec[BaseAgentProps]):
        super().__init__(agent_spec, agent_mode=AgentMode.LOCAL)  # Default is LOCAL
| Value | Description |
|---|---|
| AgentMode.LOCAL | Runs in the same process as the Guild (default) |
| AgentMode.REMOTE | Runs in a separate process (potentially distributed) |
These values are used by the execution engine to determine how to instantiate and run the agent, especially in distributed or multi-process environments.
Defining an Agent Class¶
To create a custom agent, you define a Python class that inherits from rustic_ai.core.guild.Agent. This base class, along with the powerful AgentMetaclass, provides the core machinery for message handling, dependency injection, and lifecycle management.
Agent Properties (BaseAgentProps)¶
Each agent can have its own set of configurable properties. These are defined in a Pydantic model that inherits from rustic_ai.core.guild.dsl.BaseAgentProps.
from typing import Optional

from pydantic import BaseModel

from rustic_ai.core.guild import Agent, agent  # agent provides the @agent.processor decorator
from rustic_ai.core.guild.dsl import BaseAgentProps, AgentSpec

# 1. Define message models
class GreetRequest(BaseModel):
    name: Optional[str] = None

class GreetResponse(BaseModel):
    greeting: str
    count: int

# 2. Define Properties Model (Optional but Recommended)
class MyGreeterAgentProps(BaseAgentProps):
    greeting_prefix: str = "Hello"
    default_name: str = "World"

# 3. Define the Agent Class
class GreeterAgent(Agent[MyGreeterAgentProps]):  # Generic type specifies the props model
    def __init__(self, agent_spec: AgentSpec[MyGreeterAgentProps]):
        super().__init__(agent_spec)
        # Access configured properties:
        self.greeting_prefix = self.get_spec().props.greeting_prefix
        self.default_name = self.get_spec().props.default_name
        self.greet_count = 0

    @agent.processor(clz=GreetRequest)  # Handles GreetRequest payloads
    def handle_name(self, ctx: agent.ProcessContext[GreetRequest]):
        name_to_greet = ctx.payload.name if ctx.payload.name else self.default_name
        response = f"{self.greeting_prefix}, {name_to_greet}!"
        self.greet_count += 1
        # Update agent's own state (illustrative, actual state updates are more structured)
        # self._state["greet_count"] = self.greet_count
        # ctx.send() takes a Pydantic model; ctx.send_dict() is for raw dictionaries
        ctx.send(GreetResponse(greeting=response, count=self.greet_count))

# To use this agent, you'd create an AgentSpec for it, often via AgentBuilder.
# from rustic_ai.core.guild.builders import AgentBuilder
# greeter_spec = AgentBuilder(GreeterAgent) \
#     .set_name("MyGreeter") \
#     .set_description("A friendly greeter agent.") \
#     .set_properties(MyGreeterAgentProps(greeting_prefix="Greetings")) \
#     .build_spec()
- The agent class inherits from Agent[MyAgentPropsType]. This tells the system about the expected structure of its properties.
- The __init__ method must call super().__init__(agent_spec).
- The agent_spec (an instance of AgentSpec[MyAgentPropsType]) provides access to configured properties via self.get_spec().props.
Agent Specification (AgentSpec)¶
As introduced in Guilds, an AgentSpec is a data structure (typically created via AgentBuilder or loaded from YAML/JSON) that defines an agent's configuration. Key fields relevant from an agent's perspective:
- id (str): Unique identifier.
- name (str): Human-readable name.
- description (str): Purpose of the agent.
- class_name (str): The fully qualified Python class name of the agent (e.g., "my_project.agents.GreeterAgent").
- properties (Pydantic Model | Dict): An instance of the agent's properties model (e.g., MyGreeterAgentProps) or a dictionary that can be validated into it. This is how you customize an agent instance.
- additional_topics (List[str]): Specific message topics the agent subscribes to, beyond the default guild topic.
- listen_to_default_topic (bool): If True (default), the agent listens to messages on the guild's default topic.
- dependency_map (Dict[str, DependencySpec]): Agent-specific dependencies. These can override or supplement guild-level dependencies.
- act_only_when_tagged (bool): If True, the agent will only process messages where its AgentTag (ID or name) is explicitly included in the message's recipient_list. This is useful for targeted communication in busy topics.
- predicates (Dict[str, SimpleRuntimePredicate]): A dictionary mapping method names (of @processor decorated methods) to SimpleRuntimePredicate objects. A predicate contains a JSONata expression that is evaluated against the incoming message, agent state, and guild state. The handler method is only invoked if the predicate evaluates to true.

// Example predicate in AgentSpec (conceptual)
"predicates": {
    "handle_urgent_task": {
        "expression": "message.priority > 7 and agent_state.is_available"
    }
}
The AgentMetaclass and its Magic¶
The Agent base class uses AgentMetaclass. This metaclass works behind the scenes during agent class definition to automate several setup tasks:
- Handler Registration: It inspects the agent class for methods decorated with @agent.processor(...) and registers them as message handlers.
- Fixture Registration: It finds methods decorated with @AgentFixtures.* and registers them as lifecycle hooks or message modifiers.
- Properties Type Inference: It determines the agent's specific properties type (e.g., MyGreeterAgentProps) from the generic type hint (Agent[MyGreeterAgentProps]).
- Dependency Analysis: It can analyze @processor methods for depends_on arguments to understand their dependencies.
- Mixin Injection: It automatically includes default mixins (e.g., StateRefresherMixin, HealthMixin, TelemetryMixin) that provide common cross-cutting concerns.
This automation reduces boilerplate and allows developers to focus on the agent's core logic.
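To make the handler-registration step concrete, here is a minimal, framework-free sketch of how a metaclass can collect decorated methods at class-definition time. It is not the actual AgentMetaclass: the names processor, RegisteringMeta, SketchAgent, and dispatch are hypothetical stand-ins, and keying handlers by qualified class name is an assumption modeled on the format matching described later in this page.

```python
from typing import Any, Callable, Dict


def processor(clz: type) -> Callable:
    """Hypothetical stand-in for @agent.processor: tags a method with its payload type."""
    def decorate(func: Callable) -> Callable:
        func._handles = clz  # marker read later by the metaclass
        return func
    return decorate


class RegisteringMeta(type):
    """Collects tagged methods into a per-class handler map when the class is defined."""

    def __new__(mcls, name, bases, namespace):
        cls = super().__new__(mcls, name, bases, namespace)
        handlers: Dict[str, str] = {}
        # Start from handlers already registered on base classes.
        for base in bases:
            handlers.update(getattr(base, "_handlers", {}))
        for attr_name, attr in namespace.items():
            clz = getattr(attr, "_handles", None)
            if clz is not None:
                # Key by qualified class name (assumed to mirror the message format).
                handlers[f"{clz.__module__}.{clz.__qualname__}"] = attr_name
        cls._handlers = handlers
        return cls


class SketchAgent(metaclass=RegisteringMeta):
    def dispatch(self, fmt: str, payload: Any) -> Any:
        method_name = self._handlers.get(fmt)
        if method_name is None:
            return None  # no handler registered for this format
        return getattr(self, method_name)(payload)


class Ping:
    def __init__(self, text: str):
        self.text = text


class EchoAgent(SketchAgent):
    @processor(clz=Ping)
    def on_ping(self, payload: Ping) -> str:
        return f"pong: {payload.text}"


fmt = f"{Ping.__module__}.{Ping.__qualname__}"
reply = EchoAgent().dispatch(fmt, Ping("hi"))
```

The point of the pattern is that EchoAgent never registers on_ping explicitly; the decorator leaves a marker and the metaclass builds the lookup table once, when the class body is evaluated.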
Message Handling¶
Agents are fundamentally message-driven. The process of an agent receiving and handling a message involves several steps, orchestrated by the AgentMetaclass and the Agent base class.
The @agent.processor Decorator¶
Methods that handle incoming messages are decorated with @agent.processor.
from typing import Any

from pydantic import BaseModel

from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps
from rustic_ai.core.messaging.core import JsonDict  # For raw JSON

class MyData(BaseModel):
    value: str
    count: int

class MyOtherAgent(Agent[BaseAgentProps]):
    # Handles messages where payload is MyData
    @agent.processor(clz=MyData)
    def handle_my_data(self, ctx: ProcessContext[MyData]):
        data_object = ctx.payload  # data_object is an instance of MyData
        # ... process data_object ...
        # AnotherMessage is a placeholder for any Pydantic model defined elsewhere
        ctx.send(AnotherMessage(...))

    # Handles messages where payload is a raw JSON dictionary
    @agent.processor(clz=JsonDict)
    def handle_any_json(self, ctx: ProcessContext[JsonDict]):
        raw_payload = ctx.payload  # raw_payload is a dict
        # ... process raw_payload ...
        ctx.send_dict({"status": "processed raw json"})

    # Handles messages of MyData type, including on "essential" topics,
    # and also injects a 'db_connection' dependency.
    @agent.processor(clz=MyData, handle_essential=True, depends_on=["db_connection"])
    def handle_essential_data(self, ctx: ProcessContext[MyData], db_connection: Any):
        # This handler will run for MyData messages on system/status topics
        # even if the agent doesn't explicitly subscribe to them.
        # db_connection is resolved and injected.
        pass
- clz: Specifies the expected Pydantic model for the message payload. If the incoming message's format matches the qualified name of this model, the payload is automatically parsed and validated into an instance of this model. Use JsonDict to receive the payload as a raw dictionary without parsing.
- handle_essential (bool, default False): If True, this handler can process messages on essential guild topics (like status or system topics), even if the agent isn't explicitly subscribed to them or if the message isn't directly addressed to it.
- depends_on (List[str | AgentDependency], optional): A list of dependency keys. These dependencies are resolved at runtime and injected as arguments into the handler method. See Dependency Injection in Handlers.
- predicate (Callable[[Agent, Message], bool], optional, less common): An older style of predicate, a callable that takes the agent instance and the raw message and returns True if the handler should run. Prefer AgentSpec.predicates for declarative JSONata-based predicates.
Asynchronous Handlers¶
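Handlers can also be declared with async def, which lets them await I/O without blocking:
from aiohttp import ClientSession

from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps

# SearchQuery and SearchResult are Pydantic models assumed to be defined elsewhere
class AsyncAgent(Agent[BaseAgentProps]):
    @agent.processor(clz=SearchQuery)
    async def search_web(self, ctx: ProcessContext[SearchQuery]):
        query = ctx.payload.query
        # Perform asynchronous operations; params= URL-encodes the query value
        async with ClientSession() as session:
            async with session.get("https://api.example.com/search", params={"q": query}) as response:
                result = await response.json()
        # Process results and respond
        ctx.send(SearchResult(results=result["items"]))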
When an async handler is invoked:
1. The framework detects it's an async function via inspect.iscoroutinefunction()
2. The ProcessorHelper.run_coroutine() method handles scheduling:
- If in an already running event loop, it creates a new task
- Otherwise, it calls asyncio.run() to run the coroutine to completion
This allows your agent to perform non-blocking I/O operations like network requests or database queries without blocking the entire agent/guild.
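The scheduling rule described above can be sketched with the standard library alone. This is a simplified stand-in written for illustration, not the source of ProcessorHelper.run_coroutine(); the function body, the handler coroutine, and the results list are all assumptions.

```python
import asyncio
from typing import Any, Coroutine, List, Optional


def run_coroutine(coro: Coroutine[Any, Any, Any]) -> Optional["asyncio.Task[Any]"]:
    """Schedule on the running loop if one exists; otherwise run to completion."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: block until the coroutine finishes.
        asyncio.run(coro)
        return None
    # A loop is already running: schedule the coroutine and return immediately.
    return loop.create_task(coro)


results: List[str] = []


async def handler(tag: str) -> None:
    await asyncio.sleep(0)  # stand-in for non-blocking I/O
    results.append(tag)


# Case 1: called from synchronous code, so the coroutine runs to completion.
run_coroutine(handler("sync-caller"))


# Case 2: called from inside a running loop, so a task is created and awaited later.
async def main() -> None:
    task = run_coroutine(handler("async-caller"))
    assert task is not None
    await task


asyncio.run(main())
```

Note the consequence for callers: in the second case the handler has only been scheduled when run_coroutine returns, so anything that depends on its side effects has to wait for the task to finish.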
Incoming Message Flow¶
- Delivery: A message is delivered to the agent's internal _on_message(self, message: Message) method by the messaging system.
- Handler Discovery:
  * The Agent base class, using handler maps prepared by AgentMetaclass, identifies applicable @processor methods.
  * This matching considers:
    * The format string in the Message against the clz argument of @processor.
    * If agent_spec.act_only_when_tagged is True, the message must explicitly tag the agent.
    * The handle_essential flag of the processor if the message is on an essential topic.
- Processing Each Applicable Handler: For each matched handler method:
  * Runtime Predicate Check: If a predicate is defined for this handler in agent_spec.predicates, it's evaluated. If it returns False, the handler is skipped.
  * Dependency Resolution: Dependencies specified in @processor(depends_on=[...]) are resolved.
  * ProcessContext Creation: A ProcessContext instance (ctx) is created, providing access to the message, payload, agent state, and sending capabilities.
  * before_process Fixtures: Any methods decorated with @AgentFixtures.before_process are executed.
  * Handler Invocation: The handler method is called with self (the agent instance), ctx (the ProcessContext), and any resolved dependencies as arguments.
  * after_process Fixtures: Any methods decorated with @AgentFixtures.after_process are executed.
  * Error Handling: If the handler raises an unhandled exception, it's caught, wrapped in an ErrorMessage, and sent out (triggering on_send_error fixtures). The exception might also be bubbled to the execution engine.
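The per-handler sequence above (predicate check, before_process, invocation, after_process, error wrapping) can be modeled in a few lines. The sketch below is a toy written for illustration only: Msg, Handler, SketchAgent, and the error dictionary are hypothetical, predicates are plain callables rather than JSONata expressions, and dependency resolution and ProcessContext creation are left out.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Msg:
    format: str
    payload: Dict[str, Any]


@dataclass
class Handler:
    format: str
    func: Callable[[Dict[str, Any]], None]
    predicate: Optional[Callable[[Msg], bool]] = None


@dataclass
class SketchAgent:
    handlers: List[Handler]
    log: List[str] = field(default_factory=list)
    errors: List[Dict[str, str]] = field(default_factory=list)

    def on_message(self, msg: Msg) -> None:
        for h in self.handlers:
            if h.format != msg.format:
                continue  # handler discovery: format must match
            if h.predicate is not None and not h.predicate(msg):
                self.log.append(f"skipped:{h.func.__name__}")
                continue  # runtime predicate returned False
            self.log.append("before_process")
            try:
                h.func(msg.payload)
                self.log.append(f"handled:{h.func.__name__}")
            except Exception as exc:
                # Wrap the failure instead of letting it escape the agent.
                self.errors.append({"error_type": type(exc).__name__, "detail": str(exc)})
            finally:
                self.log.append("after_process")


def urgent(payload: Dict[str, Any]) -> None:
    pass


def fragile(payload: Dict[str, Any]) -> None:
    raise ValueError("bad task")


agent = SketchAgent(handlers=[
    Handler("task", urgent, predicate=lambda m: m.payload["priority"] > 7),
    Handler("task", fragile),
])
agent.on_message(Msg("task", {"priority": 3}))
```

With a priority of 3, the first handler is skipped by its predicate, while the second runs, fails, and has its exception recorded in agent.errors with the after_process step still executing.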
The ProcessContext (ctx)¶
The ProcessContext is passed as the second argument (after self) to every message handler and provides essential tools for message processing:
- ctx.payload -> MDT (MessageDataType):
  * If the @processor specified a Pydantic model (e.g., clz=MyData), ctx.payload is an instance of that model, automatically parsed and validated from the message's payload.
  * If clz=JsonDict, ctx.payload is the raw dictionary payload.
- ctx.message -> Message: The full incoming Message object, including headers, sender/recipient info, topic, routing_slip, etc.
- ctx.agent -> Agent: The instance of the agent processing the message.
- ctx.method_name -> str: The name of the handler method currently being executed.
- ctx.update_context(updates: JsonDict): Updates a temporary, per-message-processing-flow, session-like state. This context is passed along if messages are sent via ctx.send() and the routing rules dictate context propagation.
- ctx.get_context() -> JsonDict: Retrieves the current per-message-flow context.
- ctx.add_routing_step(routing_entry: RoutingRule): Adds a new RoutingRule to the RoutingSlip of the original incoming message. This can dynamically alter the message's future path if it's part of a multi-step workflow.
- Sending Methods:
  * ctx.send(payload: BaseModel, new_thread: bool = False, forwarding: bool = False) -> List[GemstoneID]: Sends a new message with a Pydantic BaseModel as its payload.
    - new_thread: If True, starts a new message thread ID.
    - forwarding: If True, indicates the message is being forwarded, which can affect routing rule application (e.g., mark_forwarded).
  * ctx.send_dict(payload: JsonDict, format: str = MessageConstants.RAW_JSON_FORMAT, new_thread: bool = False, forwarding: bool = False) -> List[GemstoneID]: Sends a new message with a raw dictionary as its payload. The format string should indicate the type of the payload.
  * ctx.send_error(payload: BaseModel) -> List[GemstoneID]: Sends a new message marked as an error, typically using an ErrorMessage or a custom error model.

When ctx.send* is called:
1. Registered @AgentFixtures.on_send (or @AgentFixtures.on_send_error for send_error) fixtures are executed.
2. The routing logic (detailed in Guilds and Messaging) determines the next step(s) based on the incoming message's RoutingSlip or guild-level routes.
3. For each routing step, a new Message is constructed.
4. Registered @AgentFixtures.outgoing_message_modifier fixtures can alter the new message before it's published.
5. The message is published via the agent's messaging client.
6. IDs of the sent messages are returned.
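The six steps that run when ctx.send* is called can be sketched as a small pipeline. Everything here is hypothetical and simplified: SendPipeline is not a framework class, messages are plain dictionaries, IDs are integers rather than GemstoneIDs, and each routing step is reduced to a destination topic name.

```python
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


class SendPipeline:
    def __init__(self, routes: List[str]):
        self.routes = routes  # hypothetical: one destination topic per routing step
        self.on_send: List[Callable[[], None]] = []
        self.modifiers: List[Callable[[Message], None]] = []
        self.published: List[Message] = []
        self._next_id = 0

    def send(self, payload: Dict[str, Any]) -> List[int]:
        for hook in self.on_send:           # 1. on_send fixtures run once per send
            hook()
        sent_ids: List[int] = []
        for topic in self.routes:           # 2. routing decides the next step(s)
            self._next_id += 1
            message: Message = {            # 3. one new message per routing step
                "id": self._next_id,
                "topic": topic,
                "payload": dict(payload),
            }
            for modify in self.modifiers:   # 4. modifiers may alter the message
                modify(message)
            self.published.append(message)  # 5. publish
            sent_ids.append(message["id"])
        return sent_ids                     # 6. IDs of the sent messages


pipeline = SendPipeline(routes=["audit", "replies"])
calls: List[str] = []
pipeline.on_send.append(lambda: calls.append("on_send"))
pipeline.modifiers.append(lambda m: m["payload"].update(modified_by="sketch"))
ids = pipeline.send({"status": "ok"})
```

One send call with two routing steps yields two published messages: the on_send hook fires once, while the modifier runs once per outgoing message.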
End-to-End Handler Example¶
from pydantic import BaseModel

from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps

class UserQuery(BaseModel):
    text: str
    user_id: str

class QueryResponse(BaseModel):
    answer: str
    source: str

class MyQnAAgent(Agent[BaseAgentProps]):
    @agent.processor(clz=UserQuery)
    def handle_query(self, ctx: ProcessContext[UserQuery]):
        query_text = ctx.payload.text  # Accessing typed payload
        user = ctx.payload.user_id

        # Illustrative: update per-message context
        ctx.update_context({"current_user_id": user, "query_received_at": "timestamp"})

        print(f"Agent {self.name} received query: '{query_text}' from {user}")
        print(f"Original message ID: {ctx.message.id}, Topic: {ctx.message.topic_published_to}")

        # Simulate processing
        answer = f"Answer to '{query_text}'"
        response_payload = QueryResponse(answer=answer, source=self.name)

        # Send the response
        # Routing will be determined by incoming message's slip or guild routes
        sent_ids = ctx.send(response_payload)
        print(f"Sent response message(s) with IDs: {sent_ids}")
Agent Fixtures and Modifiers (@AgentFixtures)¶
Fixtures allow you to inject logic at various points in an agent's message processing lifecycle or when messages are sent. They are methods in your agent class decorated with specific decorators from rustic_ai.core.guild.agent.AgentFixtures.
- @AgentFixtures.before_process:
  * Signature: def my_fixture(self, ctx: ProcessContext[MDT])
  * Called before each message handler (@processor) method is executed for a message that the handler matches. MDT is the type the handler expects.
- @AgentFixtures.after_process:
  * Signature: def my_fixture(self, ctx: ProcessContext[MDT])
  * Called after each message handler method completes (normally or with an exception that was handled by the system).
- @AgentFixtures.on_send:
  * Signature: def my_fixture(self, ctx: ProcessContext[Any])
  * Called when ctx.send() or ctx.send_dict() is invoked (but not ctx.send_error()), before the routing logic determines the final outgoing message(s). The ctx here is the context of the handler that initiated the send.
- @AgentFixtures.on_send_error:
  * Signature: def my_fixture(self, ctx: ProcessContext[Any])
  * Called when ctx.send_error() is invoked, before the routing logic.
- @AgentFixtures.outgoing_message_modifier:
  * Signature: def my_fixture(self, ctx: ProcessContext[Any], message_to_be_sent: Message)
  * Called for each message that is about to be published via the agent's client (resulting from ctx.send* calls). Allows direct modification of the message_to_be_sent object (e.g., adding custom headers) before it goes out.
from pydantic import BaseModel

from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import AgentFixtures, ProcessContext, Message
from rustic_ai.core.guild.dsl import BaseAgentProps

class LoggableEvent(BaseModel):
    event_data: str

class FixtureDemoAgent(Agent[BaseAgentProps]):
    def __init__(self, agent_spec):
        super().__init__(agent_spec)
        self.processed_count = 0
        self.sent_count = 0

    @agent.processor(LoggableEvent)
    def log_event(self, ctx: ProcessContext[LoggableEvent]):
        print(f"HANDLER: Processing event: {ctx.payload.event_data}")
        ctx.send(LoggableEvent(event_data="Response to " + ctx.payload.event_data))

    @AgentFixtures.before_process
    def count_incoming(self, ctx: ProcessContext[LoggableEvent]):
        self.processed_count += 1
        print(f"BEFORE_PROCESS: Message #{self.processed_count} for {ctx.method_name}. Payload: {ctx.payload}")

    @AgentFixtures.after_process
    def confirm_processed(self, ctx: ProcessContext[LoggableEvent]):
        print(f"AFTER_PROCESS: Finished processing for {ctx.method_name}.")

    @AgentFixtures.on_send
    def count_outgoing(self, ctx: ProcessContext[LoggableEvent]):  # Context of the calling handler
        self.sent_count += 1
        print(f"ON_SEND: Agent {self.name} is sending message #{self.sent_count} (initiated by {ctx.method_name}).")

    @AgentFixtures.outgoing_message_modifier
    def add_custom_header(self, ctx: ProcessContext[LoggableEvent], message_to_be_sent: Message):
        print(f"OUTGOING_MODIFIER: Modifying message ID {message_to_be_sent.id}")
        if message_to_be_sent.payload:  # Ensure payload exists
            message_to_be_sent.payload["modified_by"] = self.name  # Example modification
Dependency Injection in Handlers¶
Message handlers can declare dependencies that are automatically resolved and injected by the framework when the handler is called. This is done using the depends_on argument of the @agent.processor decorator.
from pydantic import BaseModel

from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps, AgentSpec, DependencySpec

# Assume MyDatabaseService and MyApiClient are defined elsewhere

# Example Dependency Resolvers (simplified, see dependencies.md for full structure)
class MyDatabaseResolver:  # Implements rustic_ai.core.guild.agent_ext.depends.DependencyResolver
    def __init__(self, connection_string: str):
        super().__init__()
        self.conn_str = connection_string

    def resolve(self, guild_id, agent_id=None):
        return f"DBConnection({self.conn_str})"  # A real resolver returns the actual service

class MyApiClientResolver:
    def __init__(self, api_key: str):
        super().__init__()
        self.api_key = api_key

    def resolve(self, guild_id, agent_id=None):
        return f"ApiClient(key={self.api_key})"

class OrderRequest(BaseModel):
    item_id: str
    quantity: int

class OrderProcessorAgent(Agent[BaseAgentProps]):
    @agent.processor(clz=OrderRequest, depends_on=["db", "api_client"])
    def process_order(self, ctx: ProcessContext[OrderRequest], db: str, api_client: str):
        # 'db' is whatever MyDatabaseResolver.resolve() returns (a string in this simplified example)
        # 'api_client' is whatever MyApiClientResolver.resolve() returns
        print(f"Processing order for item {ctx.payload.item_id} using DB: {db} and API: {api_client}")
        # ... use db and api_client ...
        ctx.send_dict({"status": "order processed"})

# In AgentSpec for OrderProcessorAgent:
# "dependency_map": {
#     "db": {
#         "class_name": "my_project.resolvers.MyDatabaseResolver",
#         "properties": {"connection_string": "prod_db_uri"}
#     },
#     "api_client": {
#         "class_name": "my_project.resolvers.MyApiClientResolver",
#         "properties": {"api_key": "secret_key"}
#     }
# }
# Or these could be defined at the Guild level and inherited/overridden.
- The depends_on list contains string keys.
- These keys must match entries in the agent's own dependency_map or the dependency_map of its Guild.
- The corresponding DependencySpec defines a resolver class and its configuration.
- The resolver's resolve() method is called to provide the dependency instance.
- The names of the parameters in the handler method must match the keys in depends_on.
Refer to the Dependencies documentation for a full explanation of dependency resolvers.
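As a rough model of the lookup rules listed above, the sketch below merges a guild-level map with an agent-level map (agent entries win), calls resolve() on each resolver, and passes the results to the handler by parameter name. StaticResolver and inject are hypothetical helpers written for this illustration; the real framework builds resolvers from DependencySpec entries and may order or cache resolution differently.

```python
import inspect
from typing import Any, Callable, Dict, List


class StaticResolver:
    """Hypothetical resolver that returns a fixed value from resolve()."""

    def __init__(self, value: Any):
        self.value = value

    def resolve(self, guild_id: str, agent_id: str) -> Any:
        return self.value


def inject(
    handler: Callable[..., Any],
    depends_on: List[str],
    agent_map: Dict[str, StaticResolver],
    guild_map: Dict[str, StaticResolver],
    guild_id: str = "g1",
    agent_id: str = "a1",
) -> Dict[str, Any]:
    """Resolve each key in depends_on into a keyword argument for the handler."""
    merged = {**guild_map, **agent_map}  # agent-level entries override guild-level ones
    params = inspect.signature(handler).parameters
    kwargs: Dict[str, Any] = {}
    for key in depends_on:
        if key not in merged:
            raise KeyError(f"no dependency configured for '{key}'")
        if key not in params:
            raise TypeError(f"handler has no parameter named '{key}'")
        kwargs[key] = merged[key].resolve(guild_id, agent_id)
    return kwargs


def process_order(ctx: str, db: str, api_client: str) -> str:
    return f"{ctx} via {db} and {api_client}"


guild_deps = {"db": StaticResolver("guild-db"), "api_client": StaticResolver("guild-api")}
agent_deps = {"db": StaticResolver("agent-db")}
kwargs = inject(process_order, ["db", "api_client"], agent_deps, guild_deps)
result = process_order("order-1", **kwargs)
```

Here db comes from the agent-level map because it overrides the guild entry, while api_client falls back to the guild-level resolver.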
Agent State Management¶
Agents are often stateful. Rustic AI provides mechanisms for managing agent state persistently.
- Internal State Access: Within an agent, self._state holds a dictionary representing the agent's current state. self._guild_state holds the guild's state. Direct modification of these is generally discouraged for persistence.
- StateRefresherMixin: This default mixin, added by AgentMetaclass, handles messages like StateFetchResponse and StateUpdateResponse to keep self._state and self._guild_state synchronized with the authoritative StateManager.
- Requesting State Changes: To persist state changes, an agent typically sends a StateUpdateRequest message (often to a system topic like GuildTopics.GUILD_STATUS_TOPIC). The StateManager (via a system agent like GuildManagerAgent or a dedicated state agent) processes this request.
- ProcessContext and State: While ctx.update_context() modifies a per-message-flow temporary context, it does not directly persist to the agent's long-term state.
# Conceptual example of an agent requesting a state update
# (Actual implementation details may vary based on system setup)
from rustic_ai.core.state.models import StateOwner, StateUpdateRequest
# Inside a handler:
# new_state_values = {"last_processed_id": ctx.message.id, "item_count": self.item_count + 1}
# update_request = StateUpdateRequest(
# state_owner=StateOwner.AGENT,
# guild_id=self.guild_id,
# agent_id=self.id,
# update_format="MERGE_DICT", # or "REPLACE_DICT", "JMESPATH_UPDATE"
# state_update=new_state_values
# )
# # Send this request, typically to a system topic
# ctx.send_dict(payload=update_request.model_dump(), format=StateUpdateRequest.get_qualified_class_name(), ...)
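To illustrate how the update_format values named in the snippet above might differ, here is a small sketch of applying an update to a state dictionary. The semantics are assumptions made for illustration: MERGE_DICT is modeled as a shallow merge and REPLACE_DICT as a full overwrite, apply_update is a hypothetical helper rather than a StateManager method, and JMESPATH_UPDATE is not modeled.

```python
from typing import Any, Dict


def apply_update(state: Dict[str, Any], update_format: str, state_update: Dict[str, Any]) -> Dict[str, Any]:
    """Return the new state without mutating the old one (assumed semantics)."""
    if update_format == "MERGE_DICT":
        # Shallow merge: keys in the update win, other keys are kept.
        return {**state, **state_update}
    if update_format == "REPLACE_DICT":
        # The update becomes the entire state.
        return dict(state_update)
    raise ValueError(f"unsupported update_format: {update_format}")


current = {"item_count": 4, "owner": "greeter"}
merged = apply_update(current, "MERGE_DICT", {"item_count": 5, "last_processed_id": "m-1"})
replaced = apply_update(current, "REPLACE_DICT", {"item_count": 5})
```

Under these assumptions a merge keeps the untouched owner key, whereas a replace drops everything not present in the update.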
For detailed information, see the State Management documentation.
Error Handling Strategies¶
Robust agents need to handle errors gracefully.
- Standard Exceptions in Handlers:
  * If a @processor method raises an unhandled Python exception:
    1. The system catches the exception.
    2. It wraps the error details into an ErrorMessage (or similar).
    3. ctx.send_error() is implicitly called with this ErrorMessage.
    4. This triggers any @AgentFixtures.on_send_error hooks.
    5. The ErrorMessage is then published, often to a designated error topic or back to the sender based on routing.
    6. The original exception might also be logged and potentially bubbled up to the ExecutionEngine, which could have policies for restarting the agent.
- Custom Typed Errors (AgentError):
  * For business logic errors or expected fault conditions, it's good practice to define custom exception classes inheriting from rustic_ai.core.guild.AgentError (or a more specific base error if available).
  * Raise these custom errors from your handlers.
  * Other agents can then have @processor methods specifically for these custom error types, allowing for targeted error handling workflows.
# Create your own error class to support domain-specific error handling
from pydantic import BaseModel

from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps

# Local base class for illustration; inherit from the framework's base error instead if available
class AgentError(Exception):
    """Base class for agent-specific exceptions"""
    pass

class InsufficientStockError(AgentError):
    item_id: str
    requested_qty: int
    available_qty: int

    def __init__(self, item_id: str, requested_qty: int, available_qty: int, message: str = "Insufficient stock"):
        super().__init__(message)
        self.item_id = item_id
        self.requested_qty = requested_qty
        self.available_qty = available_qty

class OrderRequest(BaseModel):
    item_id: str
    quantity: int

class StockCheckerAgent(Agent[BaseAgentProps]):
    @agent.processor(OrderRequest)
    def check_stock(self, ctx: ProcessContext[OrderRequest]):
        # ... logic to check stock ...
        available = 5  # simplified
        if ctx.payload.quantity > available:
            raise InsufficientStockError(
                item_id=ctx.payload.item_id,
                requested_qty=ctx.payload.quantity,
                available_qty=available
            )
        # ... proceed with order ...
        ctx.send_dict({"status": "stock_ok"})

class ErrorHandlerAgent(Agent[BaseAgentProps]):
    @agent.processor(InsufficientStockError)  # Handles the specific typed error
    def handle_stock_error(self, ctx: ProcessContext[InsufficientStockError]):
        error_data = ctx.payload  # error_data is an InsufficientStockError instance
        print(f"Stock error: Item {error_data.item_id}, wanted {error_data.requested_qty}, have {error_data.available_qty}")
        # ... notify user, log, etc. ...
Testing Agents in Isolation¶
Rustic AI promotes robust unit and integration testing of agents. The framework provides utilities, most notably wrap_agent_for_testing from rustic_ai.testing.helpers, to simplify the setup and execution of agent tests.
The wrap_agent_for_testing helper typically handles:
- Agent Spec: Taking an agent spec (usually created via AgentBuilder(...).build_spec()).
- Dependency Setup: Allowing you to provide DependencySpecs for the agent's dependencies, enabling the use of mock resolvers or test-specific configurations.
- Message ID Generation: Often takes a generator (e.g., a GemstoneGenerator instance) for creating unique message IDs during the test.
- Outgoing Message Capture: It returns the configured agent instance and a results list, which automatically collects all messages sent by the agent under test.
Unit Testing with wrap_agent_for_testing¶
Here's how you typically test an agent:
import pytest
import asyncio # For testing async handlers
from pydantic import BaseModel # For defining message payloads
from typing import Any, Callable # For type hinting mock callables
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.builders import AgentBuilder
from rustic_ai.core.guild.dsl import AgentSpec, BaseAgentProps, DependencySpec
from rustic_ai.core.guild.agent import ProcessContext # For type hinting if needed
from rustic_ai.core.messaging.core.message import Message, AgentTag, MessageConstants
from rustic_ai.core.utils.basic_class_utils import get_qualified_class_name
from rustic_ai.core.utils.priority import Priority
from rustic_ai.core.utils.gemstone_id import GemstoneGenerator, GemstoneID # For ID generation in tests
from rustic_ai.testing.helpers import wrap_agent_for_testing # The real helper for testing agents
Example Test Setup¶
# Agent under test
class GreeterAgentProps(BaseAgentProps):
    greeting: str = "Hello"

class GreetingResponse(BaseModel):
    message: str

class GreeterAgent(Agent[GreeterAgentProps]):
    def __init__(self, agent_spec: AgentSpec[GreeterAgentProps]):
        super().__init__(agent_spec)

    @agent.processor(clz=str)
    def greet(self, ctx: ProcessContext[str]):
        name = ctx.payload or "World"
        ctx.send(GreetingResponse(message=f"{self.get_spec().props.greeting}, {name}!"))

# Test fixture
@pytest.fixture
def greeter_test_setup():
    # Build the agent spec
    agent_spec = AgentBuilder(GreeterAgent)\
        .set_name("TestGreeter")\
        .set_properties(GreeterAgentProps(greeting="Greetings"))\
        .build_spec()
    # ID generator for test messages; the argument is assumed to be a machine ID,
    # and 1 is an arbitrary test value
    id_generator = GemstoneGenerator(1)
    test_agent, results = wrap_agent_for_testing(agent_spec)
    return test_agent, results, id_generator

# Actual test
def test_greeter(greeter_test_setup):
    # Named test_agent to avoid shadowing the imported `agent` module
    test_agent, results, id_generator = greeter_test_setup

    # Create test message
    msg = Message(
        id_obj=id_generator.get_id(Priority.NORMAL),
        topics=["test_topic"],
        sender=AgentTag(id="test_sender", name="Tester"),
        payload="Friend",
        format="str"  # Matches @agent.processor(clz=str)
    )

    # Deliver message
    test_agent._on_message(msg)

    # Verify response
    assert len(results) == 1
    response = GreetingResponse.model_validate(results[0].payload)
    assert response.message == "Greetings, Friend!"
Key Aspects of Testing with wrap_agent_for_testing¶
- Agent Spec: Pass a spec for your agent (usually from AgentBuilder(...).build_spec()) to wrap_agent_for_testing.
- Dependency Management:
  - Provide a dependencies dictionary to wrap_agent_for_testing. The keys are dependency names (as used in @agent.processor(depends_on=[...])) and values are DependencySpec objects.
  - Your DependencySpec in the test should point to a mock resolver or a real resolver configured for a test environment. The mock resolver's resolve() method should return the mock object/function your agent expects.
- Simulating Input: Craft Message objects with appropriate payload, format, sender, id_obj (using the provided id_generator), etc., to correctly trigger your target handler.
- Invoking Handler: Call agent_instance._on_message(input_message) to simulate message arrival. The AgentMetaclass and base Agent logic will then find and invoke the correct @processor.
- Capturing & Asserting Output: The results list returned by wrap_agent_for_testing collects all Message objects sent by the agent. Assert their payload, format, recipient_list, in_response_to field, etc.
Mocking External Calls with unittest.mock.patch¶
If an agent makes direct external calls that are not managed via the dependency_map (e.g., using a globally imported library or a method on an unmanaged object), you can use Python's standard unittest.mock.patch:
from unittest.mock import patch
# Inside your test function or class:
# @patch('my_agent_module.some_external_library.some_function')
# def test_agent_with_patched_call(mock_some_function, my_agent_test_setup):
# agent_uut, captured_messages, id_generator = my_agent_test_setup
# mock_some_function.return_value = "mocked_external_data"
#
# # ... create input message and call agent_uut._on_message(...) ...
#
# mock_some_function.assert_called_once_with(...)
# # ... assert on captured_messages ...
Testing Asynchronous Handlers¶
If your agent's @processor methods are async def:
- The test function itself might need to be async (e.g., using pytest-asyncio and decorating the test with @pytest.mark.asyncio).
- The Agent's _on_message method and the @agent.processor decorator handle the execution of the async handler correctly.
- If your assertions depend on the completion of asynchronous tasks that the handler might have initiated (e.g., messages sent after an await within the handler, or background tasks), you may need to use await asyncio.sleep() in your test to allow these operations time to complete before making assertions on the captured_messages list or other side effects. The Playwright agent tests demonstrate this pattern by looping with await asyncio.sleep() while checking the results list.
# Conceptual snippet for testing an async handler
# @pytest.mark.asyncio
# async def test_async_agent_handler(my_async_agent_test_setup):
#     agent_uut, captured_messages, id_generator = my_async_agent_test_setup
#     # ... prepare incoming_message ...
#     agent_uut._on_message(incoming_message)  # This will schedule the async handler
#     # Allow time for async operations within the handler to complete
#     await asyncio.sleep(0.1)
#     # ... assertions on captured_messages ...
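A fixed sleep can be flaky on slow machines, so the polling approach mentioned above is often preferable. The sketch below shows one way to write it with the standard library alone. `wait_for_messages` is a hypothetical helper, not part of Rustic AI, and `slow_handler` merely simulates a handler that sends its reply after awaiting I/O.

```python
import asyncio


async def wait_for_messages(
    results: list, expected: int, timeout: float = 2.0, interval: float = 0.01
) -> bool:
    """Poll `results` until it holds `expected` items or `timeout` seconds elapse."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while len(results) < expected:
        if loop.time() >= deadline:
            return False  # timed out before enough messages arrived
        await asyncio.sleep(interval)  # yield so scheduled handlers can run
    return True


async def demo() -> list:
    results: list = []  # stands in for the captured results list

    async def slow_handler() -> None:
        # Simulates a handler that sends its reply after awaiting I/O
        await asyncio.sleep(0.05)
        results.append("reply")

    task = asyncio.create_task(slow_handler())  # stands in for _on_message(...)
    arrived = await wait_for_messages(results, expected=1)
    await task
    assert arrived
    return results
```

In a pytest-asyncio test, the body of `demo` would live directly in the `async def test_...` function, with the assertions made after `wait_for_messages` returns.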
General Testing Guidelines:¶
- Focus: Test the specific logic within your agent's handlers and fixtures.
- Isolation: Use dependency injection (via wrap_agent_for_testing and DependencySpec) and standard mocking techniques (like unittest.mock.patch) to isolate your agent from external systems for unit tests.
- Variety: Test with different valid and invalid input message payloads and formats to ensure robustness.
- Error Conditions: Simulate errors from dependencies (e.g., make mock services raise exceptions or return error indicators) and verify your agent's error handling logic.
- State Changes: If your agent is stateful, assert that its internal state is updated correctly after message processing. For persistent state, you might need more integrated tests involving a test StateManager.
- Agent-to-Agent Interactions: For testing how multiple agents interact within a workflow, consider setting up a minimal test Guild (e.g., using GuildBuilder().launch(add_probe=True) or by manually adding a ProbeAgent) to capture messages on relevant topics or observe interactions.
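To illustrate the "Error Conditions" guideline, here is a small sketch that makes a mock dependency raise and then checks the error path. It assumes nothing about Rustic AI: `LookupHandler`, its `service` argument, and the `sent` list are hypothetical stand-ins for an agent handler, an injected dependency, and the captured results. `Mock.side_effect` is the standard `unittest.mock` mechanism for raising from a mocked call.

```python
from unittest.mock import Mock


class LookupHandler:
    """Hypothetical stand-in for an agent handler with an injected dependency."""

    def __init__(self, service):
        self.service = service
        self.sent = []  # plays the role of the captured results list

    def on_message(self, payload: dict) -> None:
        try:
            value = self.service.lookup(payload["key"])
            self.sent.append({"status": "ok", "value": value})
        except ConnectionError as exc:
            # Error path under test: report the failure instead of crashing
            self.sent.append({"status": "error", "reason": str(exc)})


def run_error_case() -> list:
    failing_service = Mock()
    # Every call to lookup() now raises, simulating an unavailable backend
    failing_service.lookup.side_effect = ConnectionError("backend unavailable")
    handler = LookupHandler(failing_service)
    handler.on_message({"key": "abc"})
    failing_service.lookup.assert_called_once_with("abc")
    return handler.sent
```

The same structure applies when the dependency returns an error indicator rather than raising: set `return_value` instead of `side_effect` and assert on the resulting message.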
Agent Mixins: Behind-the-Scenes Support¶
When you define an agent class, the AgentMetaclass automatically incorporates several default "mixin" classes into its structure. These mixins operate largely behind the scenes, providing your agent with essential cross-cutting functionalities without requiring explicit code in your agent logic.
The primary default mixins include:
StateRefresherMixin: This is crucial for stateful agents. It listens for system messages (like StateFetchResponse and StateUpdateResponse) from the StateManager and ensures that the agent's local cached copies of its own state (self._state) and the guild's state (self._guild_state) are kept synchronized with the authoritative state. This allows you to access reasonably up-to-date state within your agent without manually fetching it on every message.
This mixin provides helpful methods for state management that you can safely use in your handlers:
# Get state from the state manager
self.request_state(ctx)  # Request agent's own state
self.request_guild_state(ctx)  # Request guild state

# Update state persistently
self.update_state(
    ctx,
    update_format=StateUpdateFormat.MERGE_DICT,  # or REPLACE_DICT, JMESPATH_UPDATE
    update={"counter": self.counter},  # New state values
)
self.update_guild_state(
    ctx,
    update_format=StateUpdateFormat.MERGE_DICT,
    update={"last_active_agent": self.id},
)
HealthMixin: This mixin provides a basic health check capability. It defines a handler for a system "ping" message (Heartbeat), allowing the execution environment or guild management services to verify that the agent is alive and responsive. The default implementation responds with HeartbeatResponse and status "OK".
TelemetryMixin: This mixin integrates the agent with the broader observability infrastructure. It helps in propagating tracing contexts across messages and emitting standard metrics, which are invaluable for monitoring and debugging distributed multi-agent interactions. It automatically adds tracing information to outgoing messages and captures spans for message processing.
Interaction and Customization:
For most agent development, you don't need to interact directly with these mixins or even be explicitly aware of their detailed implementation. They are designed to provide foundational capabilities transparently.
However, if you are an advanced user looking to deeply customize these core behaviors (e.g., changing how state is refreshed or implementing a very custom health check), you would then need to understand the specific mixin's interface and potentially override its methods or provide alternative implementations. For such advanced scenarios, referring to the source code of these mixins (typically found in rustic_ai.core.guild.agent_ext.mixins.*) would be necessary.
Advanced Topics¶
In this section, we cover more advanced topics related to agent development and usage.
Custom Mixins¶
You can create your own mixins and let the AgentMetaclass auto-inject them. To do this:
- Define your mixin class with functionality you want to add to agents
- Add your mixin to the DEFAULT_MIXINS list in a custom metaclass extending AgentMetaclass
- Use your custom metaclass when defining agent classes
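The steps above can be sketched in plain Python. This is an illustration of the general metaclass technique only: `MixinInjectingMeta` is a hypothetical stand-in for AgentMetaclass, whose real injection logic may differ, and `AuditMixin`, `AuditingMeta`, `BaseWorker`, and `ReportWorker` are invented names.

```python
class MixinInjectingMeta(type):
    """Hypothetical metaclass that prepends default mixins to a class's bases."""

    DEFAULT_MIXINS: list = []

    def __new__(mcs, name, bases, namespace, **kwargs):
        # Skip mixins a base class already inherits, to keep the MRO consistent
        missing = tuple(
            m for m in mcs.DEFAULT_MIXINS
            if not any(issubclass(b, m) for b in bases)
        )
        return super().__new__(mcs, name, missing + bases, namespace, **kwargs)


class AuditMixin:
    """Example mixin adding a cross-cutting capability."""

    def audit(self, event: str) -> str:
        return f"[{type(self).__name__}] {event}"


class AuditingMeta(MixinInjectingMeta):
    # Extend, rather than mutate, the parent's list
    DEFAULT_MIXINS = MixinInjectingMeta.DEFAULT_MIXINS + [AuditMixin]


class BaseWorker(metaclass=AuditingMeta):
    pass


class ReportWorker(BaseWorker):
    pass
```

Building a new list in the subclass, instead of appending to the inherited one, keeps the custom mixin from leaking into classes that use the original metaclass.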
Dynamic Agent Deployment¶
The GuildManagerAgent provides capabilities for dynamic agent deployment:
- Hot-reloading agents with updated implementation or configuration
- Blue-green deployment strategies for zero-downtime updates
- Dynamic scaling by adding/removing agent instances
Performance Considerations¶
For high-performance agent systems:
- Message priority control using the Priority enum
- Message batching for reducing overhead
- Efficient state management strategies for agents with large state
- Gemstone ID sharding for distributed systems
Asynchronous Handlers¶
The @agent.processor decorator also supports asynchronous functions with async def. The framework will automatically detect and properly schedule these handlers:
from aiohttp import ClientSession

class AsyncAgent(Agent[BaseAgentProps]):
    @agent.processor(clz=SearchQuery)
    async def search_web(self, ctx: ProcessContext[SearchQuery]):
        query = ctx.payload.query
        # Perform asynchronous operations
        async with ClientSession() as session:
            async with session.get(f"https://api.example.com/search?q={query}") as response:
                result = await response.json()
        # Process results and respond
        ctx.send(SearchResult(results=result["items"]))
When an async handler is invoked:
1. The framework detects it's an async function via inspect.iscoroutinefunction()
2. The ProcessorHelper.run_coroutine() method handles scheduling:
   - If in an already running event loop, it creates a new task
   - Otherwise, it calls asyncio.run() to run the coroutine to completion
This allows your agent to perform non-blocking I/O operations like network requests or database queries without blocking the entire agent/guild.
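The dispatch logic described in the two steps above can be approximated with the standard library. The sketch below is not the actual ProcessorHelper.run_coroutine() implementation. `run_handler` is a hypothetical function that follows the same decision: detect a coroutine function, then either create a task on the running loop or fall back to asyncio.run().

```python
import asyncio
import inspect


def run_handler(handler, *args):
    """Invoke `handler`, scheduling it appropriately if it is a coroutine function."""
    if not inspect.iscoroutinefunction(handler):
        return handler(*args)  # plain function: call it directly
    coro = handler(*args)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: run the coroutine to completion
        return asyncio.run(coro)
    # A loop is already running: schedule a task and return it without blocking
    return loop.create_task(coro)


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


def triple(x: int) -> int:
    return x * 3


async def from_running_loop() -> int:
    task = run_handler(double, 5)  # an asyncio.Task, since a loop is running
    return await task
```

Note the asymmetry: outside a loop the caller gets the handler's result, while inside a loop the caller gets a task that has not finished yet. This is why tests for async handlers may need to wait before asserting on sent messages.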
clz: Specifies the expected Pydantic model for the message payload. If the incoming message's format matches the qualified name of this model, the payload is automatically parsed and validated into an instance of this model. Use JsonDict to receive the payload as a raw dictionary without parsing.