Agents¶
Agents are the fundamental, autonomous, and message-driven entities in Rustic AI. They encapsulate specific logic, maintain their own state, communicate with other agents via asynchronous messages, and collaborate within a Guild to achieve complex tasks. Agents can represent automated processes (bots) or act as proxies for human users.
Prerequisites: Familiarity with Guilds, Messaging, State Management, and Dependencies is recommended.
Table of Contents¶
- Core Concepts
- Defining an Agent Class
- Agent Specification (AgentSpec)
- The AgentMetaclass and its Magic
- Message Handling
- Agent Fixtures and Modifiers
- Dependency Injection in Handlers
- Agent State Management
- Error Handling Strategies
- Testing Agents in Isolation
- Agent Mixins: Behind-the-Scenes Support
- Advanced Topics
Core Concepts¶
- Encapsulation: Agents bundle their own logic, configuration, and state.
- Message-Driven: Agents react to incoming messages, process them, and can send new messages.
- Stateful: Agents can maintain internal state across message interactions. See State Management.
- Collaborative: Agents operate within Guilds, allowing for coordinated multi-agent workflows.
- Configurable: Agent behavior is defined by their Python class and configured via an
AgentSpec
.
Agent Types and Modes¶
Rustic AI provides two enumerations that define fundamental characteristics of agents:
AgentType¶
AgentType
specifies the nature of the agent:
from rustic_ai.core.guild import AgentType
# Usage:
class MyAgent(Agent[BaseAgentProps]):
def __init__(self, agent_spec: AgentSpec[BaseAgentProps]):
super().__init__(agent_spec, agent_type=AgentType.BOT) # Default is BOT
Value | Description |
---|---|
AgentType.BOT |
A fully automated agent that operates without human intervention (default) |
AgentType.HUMAN |
Represents a human user or requires human participation |
AgentMode¶
AgentMode
defines how the agent is executed by the execution engine:
from rustic_ai.core.guild import AgentMode
# Usage:
class MyAgent(Agent[BaseAgentProps]):
def __init__(self, agent_spec: AgentSpec[BaseAgentProps]):
super().__init__(agent_spec, agent_mode=AgentMode.LOCAL) # Default is LOCAL
Value | Description |
---|---|
AgentMode.LOCAL |
Runs in the same process as the Guild (default) |
AgentMode.REMOTE |
Runs in a separate process (potentially distributed) |
These values are used by the execution engine to determine how to instantiate and run the agent, especially in distributed or multi-process environments.
Defining an Agent Class¶
To create a custom agent, you define a Python class that inherits from rustic_ai.core.guild.Agent
. This base class, along with the powerful AgentMetaclass
, provides the core machinery for message handling, dependency injection, and lifecycle management.
Agent Properties (BaseAgentProps
)¶
Each agent can have its own set of configurable properties. These are defined in a Pydantic model that inherits from rustic_ai.core.guild.dsl.BaseAgentProps
.
from rustic_ai.core.guild import Agent, agent # agent provides the @agent.processor decorator
from rustic_ai.core.guild.dsl import BaseAgentProps, AgentSpec
# 1. Define messages models
class GreetRequest(BaseModel):
name: Optional[str]
class GreetResponse(BaseModel):
greeting: str
count: int
# 1. Define Properties Model (Optional but Recommended)
class MyGreeterAgentProps(BaseAgentProps):
greeting_prefix: str = "Hello"
default_name: str = "World"
# 2. Define the Agent Class
class GreeterAgent(Agent[MyGreeterAgentProps]): # Generic type specifies the props model
def __init__(self, agent_spec: AgentSpec[MyGreeterAgentProps]):
super().__init__(agent_spec)
# Access configured properties:
self.greeting_prefix = self.get_spec().props.greeting_prefix
self.default_name = self.get_spec().props.default_name
self.greet_count = 0
@agent.processor(clz=GreetRequest) # Handles raw string payloads
def handle_name(self, ctx: agent.ProcessContext[GreetRequest]):
name_to_greet = ctx.payload.name if ctx.payload.name else self.default_name
response = f"{self.greeting_prefix}, {name_to_greet}!"
self.greet_count += 1
# Update agent's own state (illustrative, actual state updates are more structured)
# self._state["greet_count"] = self.greet_count
ctx.send_dict(GreetResponse(greeting = response, count=count))
# To use this agent, you'd create an AgentSpec for it, often via AgentBuilder.
# from rustic_ai.core.guild.builders import AgentBuilder
# greeter_spec = AgentBuilder(GreeterAgent) \\
# .set_name("MyGreeter") \\
# .set_description("A friendly greeter agent.") \\
# .set_properties(MyGreeterAgentProps(greeting_prefix="Greetings")) \\
# .build_spec()
Agent[MyAgentPropsType]
. This tells the system about the expected structure of its properties
.
- The __init__
method must call super().__init__(agent_spec)
.
- The agent_spec
(an instance of AgentSpec[MyAgentPropsType]
) provides access to configured properties via self.get_spec().props
.
Agent Specification (AgentSpec
)¶
As introduced in Guilds, an AgentSpec
is a data structure (typically created via AgentBuilder
or loaded from YAML/JSON) that defines an agent's configuration. Key fields relevant from an agent's perspective:
id
(str): Unique identifier.name
(str): Human-readable name.description
(str): Purpose of the agent.class_name
(str): The fully qualified Python class name of the agent (e.g.,"my_project.agents.GreeterAgent"
).properties
(Pydantic Model | Dict): An instance of the agent's properties model (e.g.,MyGreeterAgentProps
) or a dictionary that can be validated into it. This is how you customize an agent instance.additional_topics
(List[str]): Specific message topics the agent subscribes to, beyond the default guild topic.listen_to_default_topic
(bool): IfTrue
(default), the agent listens to messages on the guild's default topic.dependency_map
(Dict[str, DependencySpec]): Agent-specific dependencies. These can override or supplement guild-level dependencies.act_only_when_tagged
(bool): IfTrue
, the agent will only process messages where itsAgentTag
(ID or name) is explicitly included in the message'srecipient_list
. This is useful for targeted communication in busy topics.-
predicates
(Dict[str, SimpleRuntimePredicate]): A dictionary mapping method names (of@processor
decorated methods) toSimpleRuntimePredicate
objects. A predicate contains a JSONata expression that is evaluated against the incoming message, agent state, and guild state. The handler method is only invoked if the predicate evaluates to true.// Example predicate in AgentSpec (conceptual) "predicates": { "handle_urgent_task": { "expression": "message.priority > 7 and agent_state.is_available" } }
The AgentMetaclass
and its Magic¶
The Agent
base class uses AgentMetaclass
. This metaclass works behind the scenes during agent class definition to automate several setup tasks:
- Handler Registration: It inspects the agent class for methods decorated with
@agent.processor(...)
and registers them as message handlers. - Fixture Registration: It finds methods decorated with
@AgentFixtures.*
and registers them as lifecycle hooks or message modifiers. - Properties Type Inference: It determines the agent's specific properties type (e.g.,
MyGreeterAgentProps
) from the generic type hint (Agent[MyGreeterAgentProps]
). - Dependency Analysis: It can analyze
@processor
methods fordepends_on
arguments to understand their dependencies. - Mixin Injection: It automatically includes default mixins (e.g.,
StateRefresherMixin
,HealthMixin
,TelemetryMixin
) that provide common cross-cutting concerns.
This automation reduces boilerplate and allows developers to focus on the agent's core logic.
Message Handling¶
Agents are fundamentally message-driven. The process of an agent receiving and handling a message involves several steps, orchestrated by the AgentMetaclass
and the Agent
base class.
The @agent.processor
Decorator¶
Methods that handle incoming messages are decorated with @agent.processor
.
from pydantic import BaseModel
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.messaging.core import JsonDict # For raw JSON
class MyData(BaseModel):
value: str
count: int
class MyOtherAgent(Agent[BaseAgentProps]):
# Handles messages where payload is MyData
@agent.processor(clz=MyData)
def handle_my_data(self, ctx: ProcessContext[MyData]):
data_object = ctx.payload # data_object is an instance of MyData
# ... process data_object ...
ctx.send(AnotherMessage(...))
# Handles messages where payload is a raw JSON dictionary
@agent.processor(clz=JsonDict)
def handle_any_json(self, ctx: ProcessContext[JsonDict]):
raw_payload = ctx.payload # raw_payload is a dict
# ... process raw_payload ...
ctx.send_dict({"status": "processed raw json"})
# Handles messages of MyData type only on "essential" topics,
# and also injects a 'db_connection' dependency.
@agent.processor(clz=MyData, handle_essential=True, depends_on=["db_connection"])
def handle_essential_data(self, ctx: ProcessContext[MyData], db_connection: Any):
# This handler will run for MyData messages on system/status topics
# even if the agent doesn't explicitly subscribe to them.
# db_connection is resolved and injected.
pass
clz
: Specifies the expected Pydantic model for the message payload. If the incoming message'sformat
matches the qualified name of this model, the payload is automatically parsed and validated into an instance of this model. UseJsonDict
to receive the payload as a raw dictionary without parsing.handle_essential
(bool, defaultFalse
): IfTrue
, this handler can process messages on essential guild topics (like status or system topics), even if the agent isn't explicitly subscribed to them or if the message isn't directly addressed to it.depends_on
(List[str | AgentDependency], optional): A list of dependency keys. These dependencies are resolved at runtime and injected as arguments into the handler method. See Dependency Injection in Handlers.predicate
(Callable[[Agent, Message], bool], optional, less common): An older style of predicate, a callable that takes the agent instance and the raw message and returnsTrue
if the handler should run. PreferAgentSpec.predicates
for declarative JSONata-based predicates.
Asynchronous Handlers¶
from aiohttp import ClientSession
class AsyncAgent(Agent[BaseAgentProps]):
@agent.processor(clz=SearchQuery)
async def search_web(self, ctx: ProcessContext[SearchQuery]):
query = ctx.payload.query
# Perform asynchronous operations
async with ClientSession() as session:
async with session.get(f"https://api.example.com/search?q={query}") as response:
result = await response.json()
# Process results and respond
ctx.send(SearchResult(results=result["items"]))
When an async handler is invoked:
1. The framework detects it's an async function via inspect.iscoroutinefunction()
2. The ProcessorHelper.run_coroutine()
method handles scheduling:
- If in an already running event loop, it creates a new task
- Otherwise, it calls asyncio.run()
to run the coroutine to completion
This allows your agent to perform non-blocking I/O operations like network requests or database queries without blocking the entire agent/guild.
Incoming Message Flow¶
- Delivery: A message is delivered to the agent's internal
_on_message(self, message: Message)
method by the messaging system. - Handler Discovery:
- The
Agent
base class, using handler maps prepared byAgentMetaclass
, identifies applicable@processor
methods. - This matching considers:
- The
format
string in theMessage
against theclz
argument of@processor
. - If
agent_spec.act_only_when_tagged
isTrue
, the message must explicitly tag the agent. - The
handle_essential
flag of the processor if the message is on an essential topic.
- The
- The
- Processing Each Applicable Handler: For each matched handler method:
- Runtime Predicate Check: If a predicate is defined for this handler in
agent_spec.predicates
, it's evaluated. If it returnsFalse
, the handler is skipped. - Dependency Resolution: Dependencies specified in
@processor(depends_on=[...])
are resolved. ProcessContext
Creation: AProcessContext
instance (ctx
) is created, providing access to the message, payload, agent state, and sending capabilities.before_process
Fixtures: Any methods decorated with@AgentFixtures.before_process
are executed.- Handler Invocation: The handler method is called with
self
(the agent instance),ctx
(theProcessContext
), and any resolved dependencies as arguments. after_process
Fixtures: Any methods decorated with@AgentFixtures.after_process
are executed.- Error Handling: If the handler raises an unhandled exception, it's caught, wrapped in an
ErrorMessage
, and sent out (triggeringon_send_error
fixtures). The exception might also be bubbled to the execution engine.
- Runtime Predicate Check: If a predicate is defined for this handler in
The ProcessContext
(ctx
)¶
The ProcessContext
is passed as the second argument (after self
) to every message handler and provides essential tools for message processing:
ctx.payload
->MDT
(MessageDataType):- If the
@processor
specified a Pydantic model (e.g.,clz=MyData
),ctx.payload
is an instance of that model, automatically parsed and validated from the message's payload. - If
clz=JsonDict
,ctx.payload
is the raw dictionary payload.
- If the
ctx.message
->Message
: The full incomingMessage
object, including headers, sender/recipient info, topic,routing_slip
, etc.ctx.agent
->Agent
: The instance of the agent processing the message.ctx.method_name
->str
: The name of the handler method currently being executed.ctx.update_context(updates: JsonDict)
: Updates a temporary, per-message-processing-flow, session-like state. This context is passed along if messages are sent viactx.send()
and the routing rules dictate context propagation.ctx.get_context() -> JsonDict
: Retrieves the current per-message-flow context.ctx.add_routing_step(routing_entry: RoutingRule)
: Adds a newRoutingRule
to theRoutingSlip
of the original incoming message. This can dynamically alter the message's future path if it's part of a multi-step workflow.-
Sending Methods:
ctx.send(payload: BaseModel, new_thread: bool = False, forwarding: bool = False) -> List[GemstoneID]
: Sends a new message with a PydanticBaseModel
as its payload.new_thread
: IfTrue
, starts a new message thread ID.forwarding
: IfTrue
, indicates the message is being forwarded, which can affect routing rule application (e.g.,mark_forwarded
).
ctx.send_dict(payload: JsonDict, format: str = MessageConstants.RAW_JSON_FORMAT, new_thread: bool = False, forwarding: bool = False) -> List[GemstoneID]
: Sends a new message with a raw dictionary as its payload. Theformat
string should indicate the type of the payload.ctx.send_error(payload: BaseModel) -> List[GemstoneID]
: Sends a new message marked as an error, typically using anErrorMessage
or a custom error model.
When
ctx.send*
is called: 1. Registered@AgentFixtures.on_send
(or@AgentFixtures.on_send_error
forsend_error
) fixtures are executed. 2. The routing logic (detailed in Guilds and Messaging) determines the next step(s) based on the incoming message'sRoutingSlip
or guild-level routes. 3. For each routing step, a newMessage
is constructed. 4. Registered@AgentFixtures.outgoing_message_modifier
fixtures can alter the new message before it's published. 5. The message is published via the agent's messaging client. 6. IDs of the sent messages are returned.
End-to-End Handler Example¶
from pydantic import BaseModel
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps
class UserQuery(BaseModel):
text: str
user_id: str
class QueryResponse(BaseModel):
answer: str
source: str
class MyQnAAgent(Agent[BaseAgentProps]):
@agent.processor(clz=UserQuery)
def handle_query(self, ctx: ProcessContext[UserQuery]):
query_text = ctx.payload.text # Accessing typed payload
user = ctx.payload.user_id
# Illustrative: update per-message context
ctx.update_context({"current_user_id": user, "query_received_at": "timestamp"})
print(f"Agent {self.name} received query: '{query_text}' from {user}")
print(f"Original message ID: {ctx.message.id}, Topic: {ctx.message.topic_published_to}")
# Simulate processing
answer = f"Answer to '{query_text}'"
response_payload = QueryResponse(answer=answer, source=self.name)
# Send the response
# Routing will be determined by incoming message's slip or guild routes
sent_ids = ctx.send(response_payload)
print(f"Sent response message(s) with IDs: {sent_ids}")
Agent Fixtures and Modifiers (@AgentFixtures
)¶
Fixtures allow you to inject logic at various points in an agent's message processing lifecycle or when messages are sent. They are methods in your agent class decorated with specific decorators from rustic_ai.core.guild.agent.AgentFixtures
.
@AgentFixtures.before_process
:- Signature:
def my_fixture(self, ctx: ProcessContext[MDT])
- Called before each message handler (
@processor
) method is executed for a message that the handler matches.MDT
is the type the handler expects.
- Signature:
@AgentFixtures.after_process
:- Signature:
def my_fixture(self, ctx: ProcessContext[MDT])
- Called after each message handler method completes (normally or with an exception that was handled by the system).
- Signature:
@AgentFixtures.on_send
:- Signature:
def my_fixture(self, ctx: ProcessContext[Any])
- Called when
ctx.send()
orctx.send_dict()
is invoked (but notctx.send_error()
), before the routing logic determines the final outgoing message(s). Thectx
here is the context of the handler that initiated the send.
- Signature:
@AgentFixtures.on_send_error
:- Signature:
def my_fixture(self, ctx: ProcessContext[Any])
- Called when
ctx.send_error()
is invoked, before the routing logic.
- Signature:
@AgentFixtures.outgoing_message_modifier
:- Signature:
def my_fixture(self, ctx: ProcessContext[Any], message_to_be_sent: Message)
- Called for each message that is about to be published via the agent's client (resulting from
ctx.send*
calls). Allows direct modification of themessage_to_be_sent
object (e.g., adding custom headers) before it goes out.
- Signature:
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import AgentFixtures, ProcessContext, Message
from rustic_ai.core.guild.dsl import BaseAgentProps
from pydantic import BaseModel
class LoggableEvent(BaseModel):
event_data: str
class FixtureDemoAgent(Agent[BaseAgentProps]):
def __init__(self, agent_spec):
super().__init__(agent_spec)
self.processed_count = 0
self.sent_count = 0
@agent.processor(LoggableEvent)
def log_event(self, ctx: ProcessContext[LoggableEvent]):
print(f"HANDLER: Processing event: {ctx.payload.event_data}")
ctx.send(LoggableEvent(event_data="Response to " + ctx.payload.event_data))
@AgentFixtures.before_process
def count_incoming(self, ctx: ProcessContext[LoggableEvent]):
self.processed_count +=1
print(f"BEFORE_PROCESS: Message #{self.processed_count} for {ctx.method_name}. Payload: {ctx.payload}")
@AgentFixtures.after_process
def confirm_processed(self, ctx: ProcessContext[LoggableEvent]):
print(f"AFTER_PROCESS: Finished processing for {ctx.method_name}.")
@AgentFixtures.on_send
def count_outgoing(self, ctx: ProcessContext[LoggableEvent]): # Context of the calling handler
self.sent_count += 1
print(f"ON_SEND: Agent {self.name} is sending message #{self.sent_count} (initiated by {ctx.method_name}).")
@AgentFixtures.outgoing_message_modifier
def add_custom_header(self, ctx: ProcessContext[LoggableEvent], message_to_be_sent: Message):
print(f"OUTGOING_MODIFIER: Modifying message ID {message_to_be_sent.id}")
if message_to_be_sent.payload: # Ensure payload exists
message_to_be_sent.payload["modified_by"] = self.name # Example modification
Dependency Injection in Handlers¶
Message handlers can declare dependencies that are automatically resolved and injected by the framework when the handler is called. This is done using the depends_on
argument of the @agent.processor
decorator.
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from rustic_ai.core.guild.dsl import BaseAgentProps, AgentSpec, DependencySpec
# Assume MyDatabaseService and MyApiClient are defined elsewhere
# Example Dependency Resolvers (simplified, see dependencies.md for full structure)
class MyDatabaseResolver: # Implements rustic_ai.core.guild.agent_ext.depends.DependencyResolver
def __init__(self, connection_string: str):
super().__init__()
self.conn_str = connection_string
def resolve(self, guild_id, agent_id=None): return f"DBConnection({self.conn_str})" # Returns actual service
class MyApiClientResolver:
def __init__(self, api_key: str):
super().__init__()
self.api_key = api_key
def resolve(self, guild_id, agent_id=None): return f"ApiClient(key={self.api_key})"
class OrderRequest(BaseModel):
item_id: str
quantity: int
class OrderProcessorAgent(Agent[BaseAgentProps]):
@agent.processor(clz=OrderRequest, depends_on=["db", "api_client"])
def process_order(self, ctx: ProcessContext[OrderRequest], db: str, api_client: str):
# 'db' will be an instance of MyDatabaseService (or whatever the resolver returns)
# 'api_client' will be an instance of MyApiClient
print(f"Processing order for item {ctx.payload.item_id} using DB: {db} and API: {api_client}")
# ... use db and api_client ...
ctx.send_dict({"status": "order processed"})
# In AgentSpec for OrderProcessorAgent:
# "dependency_map": {
# "db": {
# "class_name": "my_project.resolvers.MyDatabaseResolver",
# "properties": {"connection_string": "prod_db_uri"}
# },
# "api_client": {
# "class_name": "my_project.resolvers.MyApiClientResolver",
# "properties": {"api_key": "secret_key"}
# }
# }
# Or these could be defined at the Guild level and inherited/overridden.
- The
depends_on
list contains string keys. - These keys must match entries in the agent's own
dependency_map
or thedependency_map
of its Guild. - The corresponding
DependencySpec
defines a resolver class and its configuration. - The resolver's
resolve()
method is called to provide the dependency instance. - The names of the parameters in the handler method must match the keys in
depends_on
.
Refer to the Dependencies documentation for a full explanation of dependency resolvers.
Agent State Management¶
Agents are often stateful. Rustic AI provides mechanisms for managing agent state persistently.
- Internal State Access: Within an agent,
self._state
holds a dictionary representing the agent's current state.self._guild_state
holds the guild's state. Direct modification of these is generally discouraged for persistence. StateRefresherMixin
: This default mixin, added byAgentMetaclass
, handles messages likeStateFetchResponse
andStateUpdateResponse
to keepself._state
andself._guild_state
synchronized with the authoritativeStateManager
.- Requesting State Changes: To persist state changes, an agent typically sends a
StateUpdateRequest
message (often to a system topic likeGuildTopics.GUILD_STATUS_TOPIC
). TheStateManager
(via a system agent likeGuildManagerAgent
or a dedicated state agent) processes this request. ProcessContext
and State: Whilectx.update_context()
modifies a per-message-flow temporary context, it does not directly persist to the agent's long-term state.
# Conceptual example of an agent requesting a state update
# (Actual implementation details may vary based on system setup)
from rustic_ai.core.state.models import StateOwner, StateUpdateRequest
# Inside a handler:
# new_state_values = {"last_processed_id": ctx.message.id, "item_count": self.item_count + 1}
# update_request = StateUpdateRequest(
# state_owner=StateOwner.AGENT,
# guild_id=self.guild_id,
# agent_id=self.id,
# update_format="MERGE_DICT", # or "REPLACE_DICT", "JMESPATH_UPDATE"
# state_update=new_state_values
# )
# # Send this request, typically to a system topic
# ctx.send_dict(payload=update_request.model_dump(), format=StateUpdateRequest.get_qualified_class_name(), ...)
For detailed information, see the State Management documentation.
Error Handling Strategies¶
Robust agents need to handle errors gracefully.
-
Standard Exceptions in Handlers:
- If a
@processor
method raises an unhandled Python exception:- The system catches the exception.
- It wraps the error details into an
ErrorMessage
(or similar). ctx.send_error()
is implicitly called with thisErrorMessage
.- This triggers any
@AgentFixtures.on_send_error
hooks. - The
ErrorMessage
is then published, often to a designated error topic or back to the sender based on routing. - The original exception might also be logged and potentially bubbled up to the
ExecutionEngine
, which could have policies for restarting the agent.
- If a
-
Custom Typed Errors (
AgentError
):- For business logic errors or expected fault conditions, it's good practice to define custom exception classes inheriting from
rustic_ai.core.guild.AgentError
(or a more specific base error if available). - Raise these custom errors from your handlers.
- Other agents can then have
@processor
methods specifically for these custom error types, allowing for targeted error handling workflows.
- For business logic errors or expected fault conditions, it's good practice to define custom exception classes inheriting from
# Create your own error class to support domain-specific error handling
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.agent import ProcessContext
from pydantic import BaseModel
# Define a custom error class based on exception or a framework base
class AgentError(Exception):
"""Base class for agent-specific exceptions"""
pass
class InsufficientStockError(AgentError):
item_id: str
requested_qty: int
available_qty: int
def __init__(self, item_id: str, requested_qty: int, available_qty: int, message: str = "Insufficient stock"):
super().__init__(message)
self.item_id = item_id
self.requested_qty = requested_qty
self.available_qty = available_qty
class OrderRequest(BaseModel):
item_id: str
quantity: int
class StockCheckerAgent(Agent[BaseAgentProps]):
@agent.processor(OrderRequest)
def check_stock(self, ctx: ProcessContext[OrderRequest]):
# ... logic to check stock ...
available = 5 # simplified
if ctx.payload.quantity > available:
raise InsufficientStockError(
item_id=ctx.payload.item_id,
requested_qty=ctx.payload.quantity,
available_qty=available
)
# ... proceed with order ...
ctx.send_dict({"status": "stock_ok"})
class ErrorHandlerAgent(Agent[BaseAgentProps]):
@agent.processor(InsufficientStockError) # Handles the specific typed error
def handle_stock_error(self, ctx: ProcessContext[InsufficientStockError]):
error_data = ctx.payload # error_data is an InsufficientStockError instance
print(f"Stock error: Item {error_data.item_id}, wanted {error_data.requested_qty}, have {error_data.available_qty}")
# ... notify user, log, etc. ...
Testing Agents in Isolation¶
Rustic AI promotes robust unit and integration testing of agents. The framework provides utilities, most notably wrap_agent_for_testing
from rustic_ai.testing.helpers
, to simplify the setup and execution of agent tests.
The wrap_agent_for_testing
helper typically handles:
- Agent Instantiation: Taking an agent instance (usually created via AgentBuilder(...).build()
).
- Dependency Setup: Allowing you to provide DependencySpec
s for the agent's dependencies, enabling the use of mock resolvers or test-specific configurations.
- Message ID Generation: Often takes a generator
(e.g., a GemstoneGenerator
instance) for creating unique message IDs during the test.
- Outgoing Message Capture: It returns the configured agent instance and a results
list, which automatically collects all messages sent by the agent under test.
Unit Testing with wrap_agent_for_testing
¶
Here's how you typically test an agent:
import pytest
import asyncio # For testing async handlers
from pydantic import BaseModel # For defining message payloads
from typing import Any, Callable # For type hinting mock callables
from rustic_ai.core.guild import Agent, agent
from rustic_ai.core.guild.builders import AgentBuilder
from rustic_ai.core.guild.dsl import AgentSpec, BaseAgentProps, DependencySpec
from rustic_ai.core.guild.agent import ProcessContext # For type hinting if needed
from rustic_ai.core.messaging.core.message import Message, AgentTag, MessageConstants
from rustic_ai.core.utils.basic_class_utils import get_qualified_class_name
from rustic_ai.core.utils.priority import Priority
from rustic_ai.core.utils.gemstone_id import GemstoneGenerator, GemstoneID # For ID generation in tests
from rustic_ai.testing.helpers import wrap_agent_for_testing # The real helper for testing agents
Example Test Setup¶
# Agent under test
class GreeterAgentProps(BaseAgentProps):
greeting: str = "Hello"
class GreetingResponse(BaseModel):
message: str
class GreeterAgent(Agent[GreeterAgentProps]):
def __init__(self, agent_spec: AgentSpec[GreeterAgentProps]):
super().__init__(agent_spec)
@agent.processor(clz=str)
def greet(self, ctx: ProcessContext[str]):
name = ctx.payload or "World"
ctx.send(GreetingResponse(message=f"{self.get_spec().props.greeting}, {name}!"))
# Test fixture
@pytest.fixture
def greeter_test_setup():
# Create agent instance
agent_instance = AgentBuilder(GreeterAgent)\
.set_name("TestGreeter")\
.set_properties(GreeterAgentProps(greeting="Greetings"))\
.build()
# Set up for testing
id_generator = GemstoneGenerator(machine_id=1)
test_agent, results = wrap_agent_for_testing(agent_instance, id_generator)
return test_agent, results, id_generator
# Actual test
def test_greeter(greeter_test_setup):
agent, results, id_generator = greeter_test_setup
# Create test message
msg = Message(
id_obj=id_generator.get_id(Priority.NORMAL),
topics=["test_topic"],
sender=AgentTag(id="test_sender", name="Tester"),
payload="Friend",
format="str" # Matches @agent.processor(clz=str)
)
# Deliver message
agent._on_message(msg)
# Verify response
assert len(results) == 1
response = GreetingResponse.model_validate(results[0].payload)
assert response.message == "Greetings, Friend!"
Key Aspects of Testing with wrap_agent_for_testing
¶
- Agent Instantiation: Pass an instance of your agent (usually from
AgentBuilder(...).build()
) towrap_agent_for_testing
. - Dependency Management:
- Provide a
dependencies
dictionary towrap_agent_for_testing
. The keys are dependency names (as used in@agent.processor(depends_on=[...])
) and values areDependencySpec
objects. - Your
DependencySpec
in the test should point to a mock resolver or a real resolver configured for a test environment. The mock resolver'sresolve()
method should return the mock object/function your agent expects. - Simulating Input: Craft
Message
objects with appropriatepayload
,format
,sender
,id_obj
(using the providedid_generator
), etc., to correctly trigger your target handler. - Invoking Handler: Call
agent_instance._on_message(input_message)
to simulate message arrival. TheAgentMetaclass
and baseAgent
logic will then find and invoke the correct@processor
. - Capturing & Asserting Output: The
results
list returned bywrap_agent_for_testing
collects allMessage
objects sent by the agent. Assert theirpayload
,format
,recipient_list
,in_response_to
field, etc.
Mocking External Calls with unittest.mock.patch
¶
If an agent makes direct external calls that are not managed via the dependency_map
(e.g., using a globally imported library or a method on an unmanaged object), you can use Python's standard unittest.mock.patch
:
from unittest.mock import patch
# Inside your test function or class:
# @patch('my_agent_module.some_external_library.some_function')
# def test_agent_with_patched_call(mock_some_function, my_agent_test_setup):
# agent_uut, captured_messages, id_generator = my_agent_test_setup
# mock_some_function.return_value = "mocked_external_data"
#
# # ... create input message and call agent_uut._on_message(...) ...
#
# mock_some_function.assert_called_once_with(...)
# # ... assert on captured_messages ...
Testing Asynchronous Handlers¶
If your agent's @processor
methods are async def
:
- The test function itself might need to be async
(e.g., using pytest-asyncio
and decorating the test with @pytest.mark.asyncio
).
- The Agent
's _on_message
method and the @agent.processor
decorator handle the execution of the async handler correctly.
- If your assertions depend on the completion of asynchronous tasks that the handler might have initiated (e.g., messages sent after an await
within the handler, or background tasks), you may need to use await asyncio.sleep()
in your test to allow these operations time to complete before making assertions on the captured_messages
list or other side effects. The Playwright agent tests demonstrate this pattern by looping with await asyncio.sleep()
while checking the results
list.
# Conceptual snippet for testing an async handler
# @pytest.mark.asyncio
# async def test_async_agent_handler(my_async_agent_test_setup):
# agent_uut, captured_messages, id_generator = my_async_agent_test_setup
# # ... prepare incoming_message ...
# agent_uut._on_message(incoming_message) # This will schedule the async handler
# # Allow time for async operations within the handler to complete
# await asyncio.sleep(0.1)
# # ... assertions on captured_messages ...
General Testing Guidelines:¶
- Focus: Test the specific logic within your agent's handlers and fixtures.
- Isolation: Use dependency injection (via
wrap_agent_for_testing
andDependencySpec
) and standard mocking techniques (likeunittest.mock.patch
) to isolate your agent from external systems for unit tests. - Variety: Test with different valid and invalid input message payloads and formats to ensure robustness.
- Error Conditions: Simulate errors from dependencies (e.g., make mock services raise exceptions or return error indicators) and verify your agent's error handling logic.
- State Changes: If your agent is stateful, assert that its internal state is updated correctly after message processing. For persistent state, you might need more integrated tests involving a test
StateManager
. - Agent-to-Agent Interactions: For testing how multiple agents interact within a workflow, consider setting up a minimal test
Guild
(e.g., usingGuildBuilder().launch(add_probe=True)
or by manually adding aProbeAgent
) to capture messages on relevant topics or observe interactions.
Agent Mixins: Behind-the-Scenes Support¶
When you define an agent class, the AgentMetaclass
automatically incorporates several default "mixin" classes into its structure. These mixins operate largely behind the scenes, providing your agent with essential cross-cutting functionalities without requiring explicit code in your agent logic.
The primary default mixins include:
-
StateRefresherMixin
: This is crucial for stateful agents. It listens for system messages (likeStateFetchResponse
andStateUpdateResponse
) from theStateManager
and ensures that the agent's local cached copies of its own state (self._state
) and the guild's state (self._guild_state
) are kept synchronized with the authoritative state. This allows you to access reasonably up-to-date state within your agent without manually fetching it on every message.This mixin provides helpful methods for state management that you can safely use in your handlers:
# Get state from the state manager self.request_state(ctx) # Request agent's own state self.request_guild_state(ctx) # Request guild state # Update state persistently self.update_state( ctx, update_format=StateUpdateFormat.MERGE_DICT, # or REPLACE_DICT, JMESPATH_UPDATE update={"counter": self.counter} # New state values ) self.update_guild_state( ctx, update_format=StateUpdateFormat.MERGE_DICT, update={"last_active_agent": self.id} )
-
HealthMixin
: This mixin provides a basic health check capability. It defines a handler for a system "ping" message (Heartbeat
), allowing the execution environment or guild management services to verify that the agent is alive and responsive. The default implementation responds withHeartbeatResponse
and status "OK". -
TelemetryMixin
: This mixin integrates the agent with the broader observability infrastructure. It helps in propagating tracing contexts across messages and emitting standard metrics, which are invaluable for monitoring and debugging distributed multi-agent interactions. It automatically adds tracing information to outgoing messages and captures spans for message processing.
Interaction and Customization:
For most agent development, you don't need to interact directly with these mixins or even be explicitly aware of their detailed implementation. They are designed to provide foundational capabilities transparently.
However, if you are an advanced user looking to deeply customize these core behaviors (e.g., changing how state is refreshed or implementing a very custom health check), you would then need to understand the specific mixin's interface and potentially override its methods or provide alternative implementations. For such advanced scenarios, referring to the source code of these mixins (typically found in rustic_ai.core.guild.agent_ext.mixins.*
) would be necessary.
Advanced Topics¶
In this section, we cover more advanced topics related to agent development and usage.
Custom Mixins¶
You can create your own mixins and let the AgentMetaclass auto-inject them. To do this:
- Define your mixin class with functionality you want to add to agents
- Add your mixin to the
DEFAULT_MIXINS
list in a custom metaclass extendingAgentMetaclass
- Use your custom metaclass when defining agent classes
Dynamic Agent Deployment¶
The GuildManagerAgent
provides capabilities for dynamic agent deployment:
- Hot-reloading agents with updated implementation or configuration
- Blue-green deployment strategies for zero-downtime updates
- Dynamic scaling by adding/removing agent instances
Performance Considerations¶
For high-performance agent systems:
- Message priority control using the
Priority
enum - Message batching for reducing overhead
- Efficient state management strategies for agents with large state
- Gemstone ID sharding for distributed systems
The @agent.processor
decorator also supports asynchronous functions with async def
. The framework will automatically detect and properly schedule these handlers:
from aiohttp import ClientSession
class AsyncAgent(Agent[BaseAgentProps]):
@agent.processor(clz=SearchQuery)
async def search_web(self, ctx: ProcessContext[SearchQuery]):
query = ctx.payload.query
# Perform asynchronous operations
async with ClientSession() as session:
async with session.get(f"https://api.example.com/search?q={query}") as response:
result = await response.json()
# Process results and respond
ctx.send(SearchResult(results=result["items"]))
When an async handler is invoked:
1. The framework detects it's an async function via inspect.iscoroutinefunction()
2. The ProcessorHelper.run_coroutine()
method handles scheduling:
- If in an already running event loop, it creates a new task
- Otherwise, it calls asyncio.run()
to run the coroutine to completion
This allows your agent to perform non-blocking I/O operations like network requests or database queries without blocking the entire agent/guild.
Asynchronous Handlers¶
The @agent.processor
decorator also supports asynchronous functions with async def
. The framework will automatically detect and properly schedule these handlers:
from aiohttp import ClientSession
class AsyncAgent(Agent[BaseAgentProps]):
@agent.processor(clz=SearchQuery)
async def search_web(self, ctx: ProcessContext[SearchQuery]):
query = ctx.payload.query
# Perform asynchronous operations
async with ClientSession() as session:
async with session.get(f"https://api.example.com/search?q={query}") as response:
result = await response.json()
# Process results and respond
ctx.send(SearchResult(results=result["items"]))
When an async handler is invoked:
1. The framework detects it's an async function via inspect.iscoroutinefunction()
2. The ProcessorHelper.run_coroutine()
method handles scheduling:
- If in an already running event loop, it creates a new task
- Otherwise, it calls asyncio.run()
to run the coroutine to completion
This allows your agent to perform non-blocking I/O operations like network requests or database queries without blocking the entire agent/guild.
clz
: Specifies the expected Pydantic model for the message payload. If the incoming message'sformat
matches the qualified name of this model, the payload is automatically parsed and validated into an instance of this model. UseJsonDict
to receive the payload as a raw dictionary without parsing.