Agent
Abstract base class for all agents.
Agents are addressable services that handle tasks over the Fame fabric. This class defines the core protocol methods every agent must implement.
Do not extend Agent directly. Instead:
- Extend :class:
BaseAgentfor standard request-response agents. - Extend :class:
BackgroundTaskAgentfor long-running background work.
Use :meth:Agent.remote or :meth:Agent.remote_by_address to create proxies
for communicating with remote agents.
Example:
from naylence.agent import BaseAgent, Payload
class EchoAgent(BaseAgent):
async def run_task(self, payload: Payload, id: str | None) -> Payload:
return payload
agent = EchoAgent("echo")
await agent.aserve("fame://echo")Extends: RpcMixin, FameService, ABC
Attributes
| Attribute | Type | Description |
|---|---|---|
name | Optional[str] | The agent’s name, used for logging and identification. |
spec | dict[str, Any] | Returns metadata about this agent (address, capabilities, etc.). |
Methods
aserve
def aserve(address: FameAddress | str, log_level: str | int | None = None, kwargs = {})Starts serving this agent at the given address (async version).
Listens for SIGINT/SIGTERM to shut down gracefully. The method returns when the agent stops serving.
Args: address: The address to serve at (e.g., ‘fame://my-agent’). log_level: Optional log level for the agent. **kwargs: Additional options passed to FameFabric.get_or_create().
authenticate
def authenticate(credentials: AuthenticationInfo) -> boolValidates authentication credentials.
Args: credentials: The credentials to validate.
Returns: True if authentication succeeds.
broadcast
def broadcast(addresses: list[FameAddress | str], payload: Payload = None, kw = {}) -> list[tuple[str, Any | Exception]]Sends the same payload to multiple agents.
Args:
addresses: List of agent addresses.
payload: Payload to send to all agents.
**kw: Additional options passed to :meth:run_many.
Returns: List of (address, result_or_error) tuples.
cancel_task
def cancel_task(params: TaskIdParams) -> TaskCancel a running task.
Args: params: Parameters with the task id.
Returns: Task object with CANCELED status.
from_handler
def from_handler(handler: Callable[[dict[str, Any] | str | None, str | None], Awaitable[Any]]) -> 'Agent'Creates an agent from a simple handler function.
Useful for quick prototyping without defining a full agent class.
Args: handler: Async function that processes task payloads.
Returns: A new agent instance wrapping the handler.
get_agent_card
def get_agent_card() -> AgentCardReturns the agent’s card describing its capabilities and metadata.
get_push_notification_config
def get_push_notification_config(params: TaskIdParams) -> TaskPushNotificationConfigRetrieves the push notification config for a task.
Args: params: Parameters including the task ID.
Returns: The push notification configuration.
get_task_status
def get_task_status(params: TaskQueryParams) -> TaskGet the current status of a task.
Args: params: Query parameters with the task id.
Returns: Task object with current status.
register_push_endpoint
def register_push_endpoint(config: TaskPushNotificationConfig) -> TaskPushNotificationConfigRegisters a push notification endpoint for task updates.
Args: config: Push notification configuration.
Returns: The registered configuration.
remote
def remote(address: Optional[FameAddress | str] = None, capabilities: Optional[list[str]] = None, fabric: Optional[FameFabric] = None, kwargs = {}) -> 'AgentProxy[TAgent]'Creates a proxy for communicating with a remote agent.
Provide exactly one of address or capabilities.
Args: address: Direct address of the target agent. capabilities: Required capabilities for discovery. fabric: Fabric instance to use. Defaults to the current fabric.
Returns: A proxy for the remote agent.
Raises: ValueError: If both or neither of address/capabilities are provided.
Example:
proxy = Agent.remote(address="fame://echo")
result = await proxy.run_task("hello")remote_by_address
def remote_by_address(address: FameAddress | str, fabric: Optional[FameFabric] = None, kwargs = {}) -> 'AgentProxy[TAgent]'Creates a proxy for a remote agent by its address.
Args: address: The target agent’s address. fabric: Optional fabric configuration.
Returns: A proxy for the remote agent.
remote_by_capabilities
def remote_by_capabilities(capabilities: list[str], fabric: Optional[FameFabric] = None, kwargs = {}) -> 'AgentProxy[TAgent]'Creates a proxy for a remote agent by required capabilities.
Args: capabilities: Required capabilities for discovery. fabric: Optional fabric configuration.
Returns: A proxy for a matching remote agent.
run_many
def run_many(targets: Targets, fabric: FameFabric | None = None, gather_exceptions: bool = True) -> list[tuple[str, Any | Exception]]Runs tasks on multiple agents with individual payloads.
Args: targets: Iterable of (address, payload) pairs. fabric: Fabric instance to use. Defaults to the current fabric. gather_exceptions: If True (default), errors are collected alongside results. If False, the first error raises immediately.
Returns: List of (address, result_or_error) tuples, ordered like targets.
run_task
def run_task(payload: dict[str, Any] | str | None, id: str | None) -> AnyExecute a task synchronously and return the result.
Args: payload: Input data from the task request. id: Task identifier.
Returns: Result to include in the task completion message.
serve
def serve(address: FameAddress | str, kwargs: Any = {})Starts serving this agent at the given address (sync entry point).
If there is already an event loop running, schedules the coroutine on it and returns a Task. Otherwise, blocks in asyncio.run() until finished.
Args:
address: The address to serve at.
**kwargs: Options passed to :meth:aserve.
Returns: An asyncio.Task if a loop is running, otherwise None.
start_task
def start_task(params: TaskSendParams) -> TaskInitiates a new task.
Args: params: Task parameters including message and optional metadata.
Returns: The created task with its initial status.
subscribe_to_task_updates
def subscribe_to_task_updates(params: TaskSendParams) -> AsyncIterator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent]Subscribe to live task status and artifact updates.
Args: params: Parameters with the task id.
Yields: TaskStatusUpdateEvent or TaskArtifactUpdateEvent objects.
unsubscribe_task
def unsubscribe_task(params: TaskIdParams) -> AnyStop receiving updates for a task.
Args: params: Parameters with the task id.
Returns: Server acknowledgment.