AgentProxy

Remote proxy for invoking agent methods over the network.

Created via Agent.remote or Agent.remote_by_address. Supports both synchronous task execution (run_task) and async streaming (subscribe_to_task_updates).

The proxy can target agents by:

  • Direct address (fame://agent-name)
  • Capability matching (finds any agent advertising the capability)
  • Natural language intent (future)

Args:

  • address: Direct Fame address of the agent.
  • capabilities: List of capabilities to match.
  • intent_nl: Natural language description (future).
  • fabric: Fame fabric for network communication.

Raises: ValueError: If not exactly one of address/capabilities/intent_nl is set.

Example:

    proxy = await MyAgent.remote(fabric)
    result = await proxy.run_task({"x": 1, "y": 2})

Extends: Agent, RpcProxy, Generic[TAgent]

Constructor

def __init__(address: Optional[FameAddress] = None, capabilities: Optional[list[str]] = None, intent_nl: Optional[str] = None, fabric: FameFabric) -> None

Create an agent proxy.

Args:

  • address: Direct Fame address of the target agent.
  • capabilities: Capability list for capability-based routing.
  • intent_nl: Natural language intent for semantic routing (future).
  • fabric: Fame fabric instance for network calls.

Raises: ValueError: If not exactly one routing method is specified.
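
The "exactly one routing method" rule can be illustrated with a small standalone check. This is only a sketch of the idea, not the library's implementation: the helper name check_exactly_one_routing is hypothetical, addresses are plain strings rather than FameAddress, and it assumes an option counts as "set" whenever it is not None (so an empty capability list would count, which the real constructor may treat differently).

```python
from typing import Optional


def check_exactly_one_routing(
    address: Optional[str] = None,
    capabilities: Optional[list[str]] = None,
    intent_nl: Optional[str] = None,
) -> str:
    """Return the name of the single routing option that is set.

    Hypothetical helper for illustration; not part of the documented API.
    """
    provided = {
        "address": address,
        "capabilities": capabilities,
        "intent_nl": intent_nl,
    }
    # Assumption: an option counts as set when it is anything other than None.
    chosen = [name for name, value in provided.items() if value is not None]
    if len(chosen) != 1:
        raise ValueError(
            "expected exactly one of address/capabilities/intent_nl, "
            f"got {chosen or 'none'}"
        )
    return chosen[0]
```

Passing only address="fame://agent-name" selects address-based routing, while passing nothing, or two options at once, raises ValueError.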

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| address | Optional[FameAddress] | - |
| name | str | - |
| spec | dict[str, Any] | - |

Methods

authenticate

def authenticate(credentials: AuthenticationInfo) -> bool

Authenticate with the remote agent.

Args: credentials: Authentication credentials.

Raises: NotImplementedError: Proxy authentication is not supported.

cancel_task

def cancel_task(params: TaskIdParams) -> Task

Cancel a task on the remote agent.

Args: params: Parameters with the task id.

Returns: Task object with CANCELED status.

get_agent_card

def get_agent_card() -> AgentCard

Fetch the remote agent’s metadata card.

Raises: NotImplementedError: Not yet implemented.

get_push_notification_config

def get_push_notification_config(params: TaskIdParams) -> TaskPushNotificationConfig

Get the push notification configuration for a task.

Args: params: Parameters with the task id.

Returns: Current push notification configuration.

get_task_status

def get_task_status(params: TaskQueryParams) -> Task

Query the status of a task on the remote agent.

Args: params: Query parameters with the task id.

Returns: Task object with current status.
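
Status queries are typically used in a polling loop. The sketch below shows that pattern against a stand-in object, since the shape of Task and TaskQueryParams is not described here. Everything in it is an assumption for illustration: StubProxy, StubTask, the state names ("working", "completed", "failed", "canceled"), the bare task id argument, and the method being awaitable.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubTask:
    """Hypothetical stand-in for the Task returned by get_task_status."""
    id: str
    state: str


class StubProxy:
    """Hypothetical stand-in for AgentProxy: 'working' twice, then 'completed'."""

    def __init__(self) -> None:
        self._states = ["working", "working", "completed"]

    async def get_task_status(self, task_id: str) -> StubTask:
        # The documented method takes a TaskQueryParams object; a bare id
        # keeps this sketch short.
        state = self._states.pop(0) if len(self._states) > 1 else self._states[0]
        return StubTask(id=task_id, state=state)


async def wait_until_terminal(proxy, task_id, interval_s=0.01, max_polls=10):
    """Poll until the task reaches a terminal state or the poll budget runs out."""
    terminal = {"completed", "failed", "canceled"}  # assumed state names
    for _ in range(max_polls):
        task = await proxy.get_task_status(task_id)
        if task.state in terminal:
            return task
        await asyncio.sleep(interval_s)
    raise TimeoutError(f"task {task_id} still running after {max_polls} polls")
```

Bounding the loop with max_polls keeps a stuck task from blocking the caller forever; for live progress, subscribe_to_task_updates avoids polling altogether.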

register_push_endpoint

def register_push_endpoint(config: TaskPushNotificationConfig) -> TaskPushNotificationConfig

Register a push notification endpoint for task updates.

Args: config: Push notification configuration with endpoint URL.

Returns: Confirmed push notification configuration.

run_task

def run_task(payload: dict[str, Any] | str | None = None, id: str | None = None) -> Any

Execute a task on the remote agent and wait for completion.

Starts a task, subscribes to updates, and returns the final result. For long-running tasks, consider using start_task and subscribe_to_task_updates separately.

Args:

  • payload: Input data as a dict or string.
  • id: Optional task id (auto-generated if omitted).

Returns: The task result extracted from the completion message.

Raises: ValueError: If the task fails or returns an unrecognized part type.
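
The start, subscribe, and return-the-final-result flow described above can be sketched with a stand-in agent. This is not the library's code: StubAgent, the dict-shaped events, and the "state"/"final"/"result" keys are all assumptions chosen to keep the example self-contained.

```python
import asyncio
from typing import Any, AsyncIterator


class StubAgent:
    """Hypothetical stand-in for a remote agent that adds payload['x'] and payload['y']."""

    def __init__(self) -> None:
        self._payloads: dict[str, dict[str, Any]] = {}

    async def start_task(self, task_id: str, payload: dict[str, Any]) -> str:
        # Remember the payload so the update stream can compute a result.
        self._payloads[task_id] = payload
        return task_id

    async def subscribe_to_task_updates(self, task_id: str) -> AsyncIterator[dict[str, Any]]:
        payload = self._payloads[task_id]
        yield {"state": "working", "final": False}
        await asyncio.sleep(0)  # simulate work between events
        yield {"state": "completed", "final": True, "result": payload["x"] + payload["y"]}


async def run_to_completion(agent: StubAgent, payload: dict[str, Any], task_id: str = "task-1") -> Any:
    """Start a task, consume its update stream, and return the final result."""
    await agent.start_task(task_id, payload)
    async for event in agent.subscribe_to_task_updates(task_id):
        if event["state"] == "failed":
            raise ValueError(f"task {task_id} failed")
        if event["final"]:
            return event["result"]
    raise ValueError(f"stream for task {task_id} ended without a final event")
```

Here asyncio.run(run_to_completion(StubAgent(), {"x": 1, "y": 2})) returns 3. Splitting the two steps, as suggested for long-running tasks, lets the caller act on intermediate events instead of waiting for the last one.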

start_task

def start_task(args: Any = (), kwargs: Any = {}) -> Task

subscribe_to_task_updates

def subscribe_to_task_updates(params: TaskSendParams, timeout_ms: Optional[int] = None, max_items: Optional[int] = None) -> AsyncIterator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

Subscribe to live task updates from the remote agent.

Streams status and artifact events until the task completes or the timeout/max_items limit is reached.

Args:

  • params: Parameters with the task id.
  • timeout_ms: Optional timeout in milliseconds between events.
  • max_items: Optional maximum number of events to receive.

Yields: TaskStatusUpdateEvent or TaskArtifactUpdateEvent objects.
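
To make the two limits concrete, the sketch below applies a per-event timeout and an item cap to any async iterator. It illustrates the semantics as described above rather than how the proxy implements them: fake_updates and take_updates are hypothetical names, and events are plain strings instead of TaskStatusUpdateEvent or TaskArtifactUpdateEvent objects.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def fake_updates(delays_s: list[float]) -> AsyncIterator[str]:
    """Hypothetical event source: waits for each delay, then yields one label."""
    for i, delay in enumerate(delays_s):
        await asyncio.sleep(delay)
        yield f"event-{i}"


async def take_updates(
    stream: AsyncIterator[Any],
    timeout_ms: Optional[int] = None,
    max_items: Optional[int] = None,
) -> list[Any]:
    """Collect events until the stream ends, a gap exceeds timeout_ms, or max_items is hit."""
    received: list[Any] = []
    iterator = stream.__aiter__()
    timeout_s = None if timeout_ms is None else timeout_ms / 1000
    while max_items is None or len(received) < max_items:
        try:
            # With timeout_s=None, wait_for waits indefinitely for the next event.
            event = await asyncio.wait_for(iterator.__anext__(), timeout_s)
        except StopAsyncIteration:
            break  # stream finished normally
        except asyncio.TimeoutError:
            break  # no event arrived within the per-event timeout
        received.append(event)
    return received
```

Because the timeout applies to the gap between events rather than to the whole stream, a slow but steadily reporting task is not cut off, while a stalled one is.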

unsubscribe_task

def unsubscribe_task(params: TaskIdParams) -> Any

Stop receiving updates for a task.

Args: params: Parameters with the task id.

Returns: Server acknowledgment.
