BaseAgent
Standard implementation of the Agent protocol.
Provides built-in state management, message handling, and task lifecycle support.
Extend this class and override run_task to implement your agent logic.
BaseAgent handles:
- JSON-RPC message routing
- State persistence with Pydantic validation
- Task creation from run_task results
- Graceful shutdown via SIGINT/SIGTERM
For background or asynchronous task execution, use BackgroundTaskAgent instead.
Args:
- name: Agent name. Defaults to the snake_case form of the class name.
- state_model: State model class for typed state management.
- state_namespace: Namespace for state storage. Defaults to the agent name.
- state_key: Key under which state is stored. Defaults to 'state'.
- state_factory: Factory function that creates the initial state.
Example:

    # Imports of BaseAgent and BaseAgentState are omitted here.
    class CounterState(BaseAgentState):
        count: int = 0

    class CounterAgent(BaseAgent[CounterState]):
        STATE_MODEL = CounterState

        async def run_task(self, payload, id):
            # Exclusive access to the state while the block runs.
            async with self.state as s:
                s.count += 1
                return s.count

    # The await below must run inside a coroutine.
    agent = CounterAgent("counter")
    await agent.aserve("fame://counter")

Extends: Agent, Generic[StateT]
Constructor
def __init__(name: str | None = None, state_model: type[BaseModel] | None = None, state_namespace: str | None = None, state_key: str = 'state', state_factory = None)
Creates a new BaseAgent.
Args:
- name: Agent name. Defaults to the snake_case form of the class name.
- state_model: State model class for typed state management.
- state_namespace: Namespace for state storage. Defaults to the agent name.
- state_key: Key under which state is stored. Defaults to 'state'.
- state_factory: Callable that creates the default state.
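The default for name is described as the snake_case form of the class name. The exact conversion the library applies is not shown on this page; the following is a minimal sketch of one common approach, where `to_snake_case` is a hypothetical helper and not part of the documented API.

```python
import re


def to_snake_case(class_name: str) -> str:
    """Convert a CamelCase class name to snake_case (illustrative helper only)."""
    # Put an underscore between a lowercase letter or digit and a following capital.
    partial = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", class_name)
    # Split a run of capitals from a following capitalized word, e.g. "HTTPAgent".
    partial = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", partial)
    return partial.lower()
```

Under this sketch, a class named CounterAgent would get the default name counter_agent. Acronym handling in particular may differ in the real implementation.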
Attributes
| Attribute | Type | Description |
|---|---|---|
| STATE_MODEL | `type[BaseModel] \| None` | |
| address | Optional[FameAddress] | The address this agent is registered at. |
| capabilities | Any | Capabilities advertised by this agent. |
| name | Optional[str] | The agent’s name. |
| spec | Dict | Returns metadata about this agent. |
| state | AsyncContextManager[StateT] | Async context manager for exclusive state access. |
| storage_provider | Optional[StorageProvider] | Storage provider for state persistence. |
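The state attribute is described as an async context manager that grants exclusive access. How BaseAgent implements this is not shown here; the sketch below illustrates the general pattern with an asyncio.Lock. `StateGuard` and `bump` are hypothetical names, a dataclass stands in for the Pydantic model, and persistence is left out.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class CounterState:
    count: int = 0


class StateGuard:
    """Hypothetical stand-in: serializes access to one state object."""

    def __init__(self, initial: CounterState) -> None:
        self._state = initial
        self._lock = asyncio.Lock()

    @property
    def state(self):
        # Each access returns a fresh context manager, so callers can
        # write `async with guard.state as s:`.
        return self._exclusive()

    @asynccontextmanager
    async def _exclusive(self):
        # Only one coroutine at a time may hold the state.
        async with self._lock:
            yield self._state
            # A real implementation would validate and persist here.


async def bump(guard: StateGuard) -> None:
    async with guard.state as s:
        current = s.count
        await asyncio.sleep(0)  # give other coroutines a chance to run
        s.count = current + 1


async def run_demo(workers: int = 50) -> int:
    guard = StateGuard(CounterState())
    await asyncio.gather(*(bump(guard) for _ in range(workers)))
    return guard._state.count
```

Without the lock, the read and write in `bump` could interleave across coroutines and lose updates; with it, every increment is applied.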
Methods
aserve
def aserve(address: FameAddress | str, log_level: str | int | None = None, **kwargs)
Start the agent and register it at the given address.
This is the main entry point for running an agent. It handles signal registration for graceful shutdown.
Args:
- address: Fame address to register at (e.g., 'fame://my-agent').
- log_level: Optional logging level override.
- **kwargs: Additional arguments passed to the parent aserve.
Returns: Async context manager for the serving agent.
authenticate
def authenticate(credentials: AuthenticationInfo) -> bool
Validate authentication credentials.
Override to implement custom authentication logic.
Args: credentials: Authentication info from the caller.
Returns: True if authentication succeeds, False otherwise.
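As an illustration of a custom authenticate override, here is a minimal sketch that checks a shared bearer token. The fields of AuthenticationInfo are not listed on this page, so `AuthInfoSketch` (with `schemes` and `credentials`) is an assumed stand-in, and `TokenAuthSketch` is a hypothetical class rather than a BaseAgent subclass.

```python
import hmac
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class AuthInfoSketch:
    """Assumed shape of the caller's credentials; the real type may differ."""
    schemes: List[str] = field(default_factory=list)
    credentials: Optional[str] = None


class TokenAuthSketch:
    """Hypothetical agent-like class that accepts one shared bearer token."""

    def __init__(self, expected_token: str) -> None:
        self._expected = expected_token

    def authenticate(self, credentials: AuthInfoSketch) -> bool:
        # Reject callers that do not offer the bearer scheme.
        if "bearer" not in (s.lower() for s in credentials.schemes):
            return False
        if credentials.credentials is None:
            return False
        # Constant-time comparison avoids leaking the token through timing.
        return hmac.compare_digest(
            credentials.credentials.encode(), self._expected.encode()
        )
```

In a real agent the same check would live in the overridden method on your BaseAgent subclass, reading whichever fields AuthenticationInfo actually exposes.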
cancel_task
def cancel_task(params: TaskIdParams) -> Task
Cancel a running task.
Args: params: Parameters with the task id.
Raises: TaskNotCancelableException: Always (not implemented in base).
clear_state
def clear_state() -> None
Deletes all persisted state for this agent.
get_agent_card
def get_agent_card() -> AgentCard
Get the agent’s metadata card.
Raises: UnsupportedOperationException: Always (not implemented in base).
get_push_notification_config
def get_push_notification_config(params: TaskIdParams) -> TaskPushNotificationConfig
Get push notification configuration for a task.
Args: params: Parameters with the task id.
Raises: PushNotificationNotSupportedException: Always (not implemented).
get_state
def get_state() -> StateT
Retrieves a snapshot of the current state.
Returns a point-in-time copy. For modifications, use the state context manager instead.
Returns: The current state instance.
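The practical consequence of a point-in-time copy is that changes made to the returned object do not reach the stored state. The sketch below shows that behavior with a deep copy; whether the library copies deeply is an assumption, and `SnapshotStore` and `NotesState` are hypothetical names.

```python
import copy
from dataclasses import dataclass, field
from typing import List


@dataclass
class NotesState:
    notes: List[str] = field(default_factory=list)


class SnapshotStore:
    """Hypothetical stand-in that shows snapshot semantics only."""

    def __init__(self) -> None:
        self._state = NotesState()

    def get_state(self) -> NotesState:
        # Hand out a deep copy so callers cannot mutate the stored state.
        return copy.deepcopy(self._state)

    def append_note(self, note: str) -> None:
        # Stands in for a modification made through the state context manager.
        self._state.notes.append(note)
```

A snapshot taken before `append_note` keeps its old contents, and appending to the snapshot leaves the store untouched.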
get_task_status
def get_task_status(params: TaskQueryParams) -> Task
Get the current status of a task.
Args: params: Query parameters with the task id.
Raises: UnsupportedOperationException: Always (not implemented in base).
handle_message
def handle_message(envelope: FameEnvelope, context: Optional[FameDeliveryContext] = None) -> Optional[FameMessageResponse | AsyncIterator[FameMessageResponse]]
Internal: Process an incoming Fame envelope.
on_message
def on_message(message: Any) -> Optional[FameMessageResponse]
Override to handle non-RPC messages.
Called when the agent receives a message that is not a JSON-RPC request. The default implementation logs a warning and returns None.
Args: message: The decoded message payload.
Returns: Optional response to send back.
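How BaseAgent decides that a message is not a JSON-RPC request is not stated here. Under the JSON-RPC 2.0 specification, a request object carries "jsonrpc": "2.0" and a string method, so a dispatcher could look roughly like the sketch below. `SketchAgent`, `dispatch`, `handle_rpc`, and `is_jsonrpc_request` are hypothetical names, and plain functions are used for brevity even though the real hooks may be coroutines.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("agent_sketch")


def is_jsonrpc_request(message: Any) -> bool:
    """True if the message has the shape of a JSON-RPC 2.0 request object."""
    return (
        isinstance(message, dict)
        and message.get("jsonrpc") == "2.0"
        and isinstance(message.get("method"), str)
    )


class SketchAgent:
    """Hypothetical stand-in for the routing; not the library's code."""

    def on_message(self, message: Any) -> Optional[Any]:
        # Mirrors the documented default: log a warning and return None.
        logger.warning("Unhandled non-RPC message: %r", message)
        return None

    def handle_rpc(self, request: dict) -> dict:
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": f"handled {request['method']}",
        }

    def dispatch(self, message: Any) -> Optional[Any]:
        if is_jsonrpc_request(message):
            return self.handle_rpc(message)
        return self.on_message(message)


class EchoAgent(SketchAgent):
    def on_message(self, message: Any) -> Optional[Any]:
        # Custom handling for anything that is not a JSON-RPC request.
        return {"echo": message}
```

Overriding on_message, as EchoAgent does, changes only the non-RPC path; JSON-RPC requests continue through the regular handlers.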
register_push_endpoint
def register_push_endpoint(config: TaskPushNotificationConfig) -> TaskPushNotificationConfig
Register a push notification endpoint.
Args: config: Push notification configuration.
Raises: PushNotificationNotSupportedException: Always (not implemented).
run_task
def run_task(payload: dict[str, Any] | str | None, id: str | None) -> Any
Execute a task synchronously and return the result.
Override this method to implement your agent’s core logic. The return value becomes the task’s completion message.
Args:
- payload: Input data from the task request.
- id: Task identifier.
Returns: Result to include in the task completion message.
Raises: UnsupportedOperationException: If not overridden.
start_task
def start_task(params: TaskSendParams) -> Task
Start a task and return its initial status.
Routes to the appropriate implementation:
- Subclass override of start_task
- Fallback to run_task for synchronous execution
Args: params: Task parameters including id and input message.
Returns: Task object with status.
Raises: NotImplementedError: If neither start_task nor run_task is implemented.
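The routing above can be sketched with ordinary method overriding plus a check for whether run_task has been replaced. This is an illustration under assumptions, not the library's code: `SketchBase` is a hypothetical stand-in, and plain dicts take the place of TaskSendParams and Task.

```python
import asyncio
from typing import Any, Optional


class SketchBase:
    """Hypothetical stand-in for the start_task fallback logic."""

    async def run_task(self, payload: Any, id: Optional[str]) -> Any:
        raise NotImplementedError("run_task is not overridden")

    async def start_task(self, params: dict) -> dict:
        # A subclass that defines its own start_task never reaches this method.
        if type(self).run_task is SketchBase.run_task:
            raise NotImplementedError("override start_task or run_task")
        # Fallback: run the task to completion and wrap the result.
        result = await self.run_task(params.get("payload"), params.get("id"))
        return {"id": params.get("id"), "state": "completed", "result": result}


class RunOnly(SketchBase):
    async def run_task(self, payload: Any, id: Optional[str]) -> Any:
        return f"ran {payload}"


class CustomStart(SketchBase):
    async def start_task(self, params: dict) -> dict:
        # Takes full control, for example to start long-running work.
        return {"id": params.get("id"), "state": "working", "result": None}
```

RunOnly gets a completed task built from its return value, CustomStart bypasses the fallback entirely, and a class that overrides neither method raises NotImplementedError.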
subscribe_to_task_updates
def subscribe_to_task_updates(params: TaskSendParams) -> AsyncIterator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent]
Subscribe to task status updates.
Default implementation polls get_task_status every 500ms until a terminal state is reached.
Args: params: Parameters with the task id.
Yields: TaskStatusUpdateEvent on each state change.
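A polling subscription of this kind can be sketched as an async generator that yields only when the observed state changes. The names `poll_status` and `TERMINAL_STATES` are hypothetical, the set of terminal states is an assumption, and plain strings stand in for TaskStatusUpdateEvent.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional

# Assumed terminal states; the real set is defined by the library.
TERMINAL_STATES = {"completed", "failed", "canceled"}


async def poll_status(
    get_status: Callable[[], Awaitable[str]],
    interval: float = 0.5,
) -> AsyncIterator[str]:
    """Yield each new state until a terminal one is observed."""
    last: Optional[str] = None
    while True:
        state = await get_status()
        if state != last:
            # Emit only on change, matching "on each state change".
            yield state
            last = state
        if state in TERMINAL_STATES:
            return
        await asyncio.sleep(interval)
```

Polling is simple but adds up to one interval of latency per update, so a subclass with a real event source may prefer to override the method and push updates directly.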
unsubscribe_task
def unsubscribe_task(params: TaskIdParams) -> Any
Unsubscribe from task updates.
Args: params: Parameters with the task id.
Raises: UnsupportedOperationException: Always (not implemented in base).