Skip to Content
Naylence Docs are in active development. Share feedback in Discord.

BaseAgent

Standard implementation of the :class:Agent protocol.

Provides built-in state management, message handling, and task lifecycle support. Extend this class and override :meth:run_task to implement your agent logic.

BaseAgent handles:

  • JSON-RPC message routing
  • State persistence with Pydantic validation
  • Task creation from run_task results
  • Graceful shutdown via SIGINT/SIGTERM

For background/async task execution, use :class:BackgroundTaskAgent instead.

Args: name: Agent name. Defaults to snake_case of the class name. state_model: State model class for typed state management. state_namespace: Namespace for state storage. Defaults to agent name. state_key: Key under which state is stored. Defaults to ‘state’. state_factory: Factory function to create initial state.

Example:

class CounterState(BaseAgentState): count: int = 0 class CounterAgent(BaseAgent[CounterState]): STATE_MODEL = CounterState async def run_task(self, payload, id): async with self.state as s: s.count += 1 return s.count agent = CounterAgent("counter") await agent.aserve("fame://counter")

Extends: Agent, Generic[StateT]

Constructor

def __init__(name: str | None = None, state_model: type[BaseModel] | None = None, state_namespace: str | None = None, state_key: str = 'state', state_factory = None)

Creates a new BaseAgent.

Args: name: Agent name. Defaults to snake_case of the class name. state_model: State model class for typed state management. state_namespace: Namespace for state storage. Defaults to agent name. state_key: Key under which state is stored. Defaults to ‘state’. state_factory: Callable that creates default state.

Attributes

AttributeTypeDescription
STATE_MODEL`type[BaseModel]None`
addressOptional[FameAddress]The address this agent is registered at.
capabilitiesAnyCapabilities advertised by this agent.
nameOptional[str]The agent’s name.
specDictReturns metadata about this agent.
stateAsyncContextManager[StateT]Async context manager for exclusive state access.
storage_providerOptional[StorageProvider]Storage provider for state persistence.

Methods

aserve

def aserve(address: FameAddress | str, log_level: str | int | None = None, kwargs = {})

Start the agent and register it at the given address.

This is the main entry point for running an agent. It handles signal registration for graceful shutdown.

Args: address: Fame address to register at (e.g., ‘fame://my-agent’). log_level: Optional logging level override. **kwargs: Additional arguments passed to parent aserve.

Returns: Async context manager for the serving agent.

authenticate

def authenticate(credentials: AuthenticationInfo) -> bool

Validate authentication credentials.

Override to implement custom authentication logic.

Args: credentials: Authentication info from the caller.

Returns: True if authentication succeeds, False otherwise.

cancel_task

def cancel_task(params: TaskIdParams) -> Task

Cancel a running task.

Args: params: Parameters with the task id.

Raises: TaskNotCancelableException: Always (not implemented in base).

clear_state

def clear_state() -> None

Deletes all persisted state for this agent.

get_agent_card

def get_agent_card() -> AgentCard

Get the agent’s metadata card.

Raises: UnsupportedOperationException: Always (not implemented in base).

get_push_notification_config

def get_push_notification_config(params: TaskIdParams) -> TaskPushNotificationConfig

Get push notification configuration for a task.

Args: params: Parameters with the task id.

Raises: PushNotificationNotSupportedException: Always (not implemented).

get_state

def get_state() -> StateT

Retrieves a snapshot of the current state.

Returns a point-in-time copy. For modifications, use the :attr:state context manager instead.

Returns: The current state instance.

get_task_status

def get_task_status(params: TaskQueryParams) -> Task

Get the current status of a task.

Args: params: Query parameters with the task id.

Raises: UnsupportedOperationException: Always (not implemented in base).

handle_message

def handle_message(envelope: FameEnvelope, context: Optional[FameDeliveryContext] = None) -> Optional[FameMessageResponse | AsyncIterator[FameMessageResponse]]

Internal: Process an incoming Fame envelope.

on_message

def on_message(message: Any) -> Optional[FameMessageResponse]

Override to handle non-RPC messages.

Called when the agent receives a message that is not a JSON-RPC request. The default implementation logs a warning and returns None.

Args: message: The decoded message payload.

Returns: Optional response to send back.

register_push_endpoint

def register_push_endpoint(config: TaskPushNotificationConfig) -> TaskPushNotificationConfig

Register a push notification endpoint.

Args: config: Push notification configuration.

Raises: PushNotificationNotSupportedException: Always (not implemented).

run_task

def run_task(payload: dict[str, Any] | str | None, id: str | None) -> Any

Execute a task synchronously and return the result.

Override this method to implement your agent’s core logic. The return value becomes the task’s completion message.

Args: payload: Input data from the task request. id: Task identifier.

Returns: Result to include in the task completion message.

Raises: UnsupportedOperationException: If not overridden.

start_task

def start_task(params: TaskSendParams) -> Task

Start a task and return its initial status.

Routes to the appropriate implementation:

  1. Subclass override of start_task
  2. Fallback to run_task for synchronous execution

Args: params: Task parameters including id and input message.

Returns: Task object with status.

Raises: NotImplementedError: If neither start_task nor run_task is implemented.

subscribe_to_task_updates

def subscribe_to_task_updates(params: TaskSendParams) -> AsyncIterator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

Subscribe to task status updates.

Default implementation polls :meth:get_task_status every 500ms until a terminal state is reached.

Args: params: Parameters with the task id.

Yields: TaskStatusUpdateEvent on each state change.

unsubscribe_task

def unsubscribe_task(params: TaskIdParams) -> Any

Unsubscribe from task updates.

Args: params: Parameters with the task id.

Raises: UnsupportedOperationException: Always (not implemented in base).

Last updated on