
Agent API Guide

This comprehensive guide covers everything you need to know about the Naylence Agent API. You’ll learn about different ways to implement agents, how to invoke them remotely, and advanced patterns like background tasks, streaming, and multi-agent orchestration.

The Naylence Agent SDK provides consistent APIs for both TypeScript and Python. The code examples in this guide are shown in Python; the equivalent TypeScript methods use the camelCase names referenced in the section headings (runTask, startTask, onMessage).

API Reference Documentation

For detailed API documentation generated from source code, see the language-specific references for Python and TypeScript.


Overview

The Agent API is built around several core concepts:

  • BaseAgent: The foundation class for all agents, providing core message handling
  • BackgroundTaskAgent: Extended agent class for long-running tasks with progress updates
  • Agent.remoteByAddress: Create a proxy to invoke a remote agent by its address
  • Agent.remoteByCapabilities: Discover and invoke agents by their declared capabilities
  • RPC operations: Define custom operations with the @operation decorator
  • Streaming: Return async generators for streaming responses

Agent Implementation Patterns

1. Basic Agent with runTask

The simplest agent pattern implements runTask to handle each incoming request and return the result directly as the response.

import asyncio
from typing import Any

from naylence.agent import BaseAgent, configs


class EchoAgent(BaseAgent):
    async def run_task(self, payload: Any, id: Any) -> Any:
        return payload


if __name__ == "__main__":
    asyncio.run(
        EchoAgent().aserve("echo@fame.fabric", root_config=configs.NODE_CONFIG)
    )

2. Function as Agent

For simple use cases, you can wrap a plain function as an agent without creating a class:

import asyncio
from datetime import datetime, timezone
from typing import Any

from naylence.fame.core import FameFabric
from naylence.agent import Agent


async def main():
    async with FameFabric.create() as fabric:
        # Define a simple handler function
        async def time_agent(payload: Any, id: Any) -> Any:
            return datetime.now(timezone.utc).isoformat()

        # Wrap it as an agent and serve
        agent_address = await fabric.serve(Agent.from_handler(time_agent))

        # Invoke it
        remote = Agent.remote_by_address(agent_address)
        result = await remote.run_task(payload="Hello")
        print(f"Time: {result}")


if __name__ == "__main__":
    asyncio.run(main())

3. RPC Operations with Custom Methods

Use the @operation decorator to expose custom methods on your agent. This is ideal for agents that provide multiple related operations.

import asyncio

from naylence.fame.core import FameFabric
from naylence.fame.service import operation
from naylence.agent import Agent, BaseAgent


class MathAgent(BaseAgent):
    @operation  # Exposed as "add"
    async def add(self, x: int, y: int) -> int:
        return x + y

    @operation(name="multiply")  # Exposed as "multiply" (custom name)
    async def multi(self, x: int, y: int) -> int:
        return x * y


async def main():
    async with FameFabric.create() as fabric:
        agent_address = await fabric.serve(MathAgent())
        agent = Agent.remote_by_address(agent_address)

        # Call the operations
        print(await agent.add(x=3, y=4))       # 7
        print(await agent.multiply(x=6, y=7))  # 42


if __name__ == "__main__":
    asyncio.run(main())

4. Streaming Responses

For operations that produce multiple values over time (like LLM token streaming or progress updates), use async generators and mark the operation as streaming (streaming=True in Python):

import asyncio

from naylence.fame.core import FameFabric
from naylence.fame.service import operation
from naylence.agent import Agent, BaseAgent


class StreamingAgent(BaseAgent):
    @operation(name="fib_stream", streaming=True)
    async def fib(self, n: int):
        """Generate first n Fibonacci numbers as a stream."""
        a, b = 0, 1
        for _ in range(n):
            yield a
            a, b = b, a + b


async def main():
    async with FameFabric.create() as fabric:
        agent_address = await fabric.serve(StreamingAgent())
        agent = Agent.remote_by_address(agent_address)

        # Consume the stream
        async for value in await agent.fib_stream(n=10, _stream=True):
            print(value, end=" ")  # Output: 0 1 1 2 3 5 8 13 21 34


if __name__ == "__main__":
    asyncio.run(main())

5. A2A Task Model (Agent-to-Agent)

For structured task workflows, implement startTask and getTaskStatus. This follows the A2A (Agent-to-Agent) protocol:

import asyncio

from naylence.fame.core import FameFabric
from naylence.agent import (
    Agent,
    BaseAgent,
    Task,
    TaskSendParams,
    TaskState,
    make_task,
    make_task_params,
    first_text_part,
)


class GreetingAgent(BaseAgent):
    async def start_task(self, params: TaskSendParams) -> Task:
        return make_task(
            id=params.id,
            state=TaskState.COMPLETED,
            payload="Hello! 👋",
        )


async def main():
    async with FameFabric.create() as fabric:
        agent_address = await fabric.serve(GreetingAgent("greeter"))
        remote = Agent.remote_by_address(agent_address)

        # Start a task
        result = await remote.start_task(
            make_task_params(id="task-1", payload="Hi there!")
        )
        print(f"State: {result.status.state}")
        print(f"Reply: {first_text_part(result.status.message)}")


if __name__ == "__main__":
    asyncio.run(main())

6. Background Tasks with Progress Updates

For long-running operations, extend BackgroundTaskAgent. This provides automatic task state management and progress updates:

import asyncio

from naylence.agent import (
    Artifact,
    BackgroundTaskAgent,
    DataPart,
    TaskSendParams,
    configs,
)


class ProcessingAgent(BackgroundTaskAgent):
    async def run_background_task(self, params: TaskSendParams) -> None:
        """Long-running task with progress updates."""
        for i in range(1, 6):
            await asyncio.sleep(0.5)

            # Emit progress artifact
            await self.update_task_artifact(
                params.id,
                Artifact(parts=[DataPart(data={"progress": f"step {i}/5"})]),
            )

        # Task auto-completes when this method returns


if __name__ == "__main__":
    asyncio.run(
        ProcessingAgent().aserve(
            "processor@fame.fabric", root_config=configs.NODE_CONFIG
        )
    )

7. Cancellable Tasks

Background tasks can be cancelled by clients. Check the task state periodically to handle cancellation gracefully:

import asyncio

from naylence.agent import (
    Artifact,
    BackgroundTaskAgent,
    DataPart,
    TaskSendParams,
    TaskState,
    configs,
)


class CancellableAgent(BackgroundTaskAgent):
    async def run_background_task(self, params: TaskSendParams) -> None:
        for i in range(1, 11):
            # Check if task was cancelled
            task_state = await self.get_task_state(params.id)
            if task_state == TaskState.CANCELED:
                print(f"Task {params.id} was cancelled at step {i}")
                break

            progress = i / 10
            await self.update_task_artifact(
                params.id,
                Artifact(parts=[DataPart(data={"progress": progress})]),
            )
            await asyncio.sleep(0.5)


if __name__ == "__main__":
    asyncio.run(
        CancellableAgent().aserve(
            "cancellable@fame.fabric", root_config=configs.NODE_CONFIG
        )
    )
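
On the client side, cancellation is requested through the remote proxy. The sketch below is illustrative: it assumes the proxy exposes an A2A-style cancel_task method keyed by the task id; check the API reference for the exact name and parameter shape.

import asyncio

from naylence.fame.core import FameFabric, generate_id
from naylence.agent import Agent, configs, make_task_params


async def main():
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        agent = Agent.remote_by_address("cancellable@fame.fabric")
        task_id = generate_id()

        # Start the long-running task
        await agent.start_task(make_task_params(id=task_id))

        # Let it run briefly, then ask the agent to cancel it
        await asyncio.sleep(1.5)
        await agent.cancel_task(make_task_params(id=task_id))  # hypothetical method/signature


if __name__ == "__main__":
    asyncio.run(main())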

Client Invocation Patterns

1. Remote by Address

The most direct way to invoke an agent is to use its known address:

from naylence.fame.core import FameFabric
from naylence.agent import Agent, configs


async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
    remote = Agent.remote_by_address("echo@fame.fabric")
    result = await remote.run_task("Hello, World!")
    print(result)

2. Remote by Capabilities (Service Discovery)

Discover agents by their declared capabilities instead of hardcoding addresses:

from naylence.fame.core import FameFabric, AGENT_CAPABILITY
from naylence.agent import Agent, configs

MATH_CAPABILITY = "math.operations"


async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
    # Find any agent with these capabilities
    math_agent = Agent.remote_by_capabilities([
        AGENT_CAPABILITY,
        MATH_CAPABILITY,
    ])

    print(await math_agent.add(x=3, y=4))
    print(await math_agent.multiply(x=6, y=7))

Agent side: Declare capabilities by overriding the capabilities property:

from naylence.fame.core import AGENT_CAPABILITY
from naylence.agent import BaseAgent

MATH_CAPABILITY = "math.operations"


class MathAgent(BaseAgent):
    @property
    def capabilities(self):
        return [AGENT_CAPABILITY, MATH_CAPABILITY]

    # ... operations ...
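
Putting the two pieces together, a capability-declaring agent is served like any other agent. The sketch below reuses the operations from the RPC example above; the address math@fame.fabric is illustrative.

import asyncio

from naylence.fame.core import AGENT_CAPABILITY
from naylence.fame.service import operation
from naylence.agent import BaseAgent, configs

MATH_CAPABILITY = "math.operations"


class MathAgent(BaseAgent):
    @property
    def capabilities(self):
        # Advertise capabilities so clients can discover this agent
        return [AGENT_CAPABILITY, MATH_CAPABILITY]

    @operation
    async def add(self, x: int, y: int) -> int:
        return x + y

    @operation(name="multiply")
    async def multiply_numbers(self, x: int, y: int) -> int:
        return x * y


if __name__ == "__main__":
    asyncio.run(
        MathAgent().aserve("math@fame.fabric", root_config=configs.NODE_CONFIG)
    )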

3. Subscribing to Task Updates

For background tasks, subscribe to real-time status and artifact updates:

import asyncio

from naylence.fame.core import FameFabric, generate_id
from naylence.agent import (
    Agent,
    DataPart,
    TaskArtifactUpdateEvent,
    TaskStatusUpdateEvent,
    configs,
    make_task_params,
)


async def main():
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        agent = Agent.remote_by_address("processor@fame.fabric")
        task_id = generate_id()

        # Start the task
        await agent.start_task(make_task_params(id=task_id))

        # Subscribe to updates
        updates = await agent.subscribe_to_task_updates(
            make_task_params(id=task_id)
        )

        async for evt in updates:
            if isinstance(evt, TaskStatusUpdateEvent):
                print(f"[STATUS] {evt.status.state}")
            elif isinstance(evt, TaskArtifactUpdateEvent):
                part = evt.artifact.parts[0]
                if isinstance(part, DataPart):
                    print(f"[DATA] {part.data}")


if __name__ == "__main__":
    asyncio.run(main())

4. Broadcast to Multiple Agents

Send the same payload to multiple agents in parallel:

from naylence.agent import Agent

# Send to multiple agents and collect results
result = await Agent.broadcast(
    ["summarizer@fame.fabric", "sentiment@fame.fabric"],
    payload="This is the text to analyze.",
)

# result is a list of (address, response) tuples
for address, response in result:
    print(f"{address}: {response}")

Multi-Agent Patterns

Agent-to-Agent Delegation

Agents can invoke other agents to delegate work:

from naylence.agent import Agent, BaseAgent

SUMMARIZER_ADDR = "summarizer@fame.fabric"
SENTIMENT_ADDR = "sentiment@fame.fabric"


class AnalysisAgent(BaseAgent):
    async def run_task(self, payload, id):
        # Delegate to multiple specialized agents
        result = await Agent.broadcast(
            [SUMMARIZER_ADDR, SENTIMENT_ADDR],
            payload,
        )
        return {
            "summary": result[0][1],
            "sentiment": result[1][1],
        }

Ping-Pong Pattern

Agents can forward requests to each other:

from naylence.fame.core import FameAddress
from naylence.agent import (
    Agent,
    BaseAgent,
    Task,
    TaskSendParams,
    TaskState,
    make_task,
    first_text_part,
)


class PongAgent(BaseAgent):
    async def start_task(self, params: TaskSendParams) -> Task:
        incoming_text = first_text_part(params.message)
        return make_task(
            id=params.id,
            state=TaskState.COMPLETED,
            payload=f"Pong: {incoming_text}",
        )


class PingAgent(BaseAgent):
    def __init__(self, name: str, pong_address: FameAddress):
        super().__init__(name)
        self._pong_address = pong_address

    async def start_task(self, params: TaskSendParams) -> Task:
        # Forward to PongAgent
        pong_proxy = Agent.remote_by_address(self._pong_address)
        return await pong_proxy.start_task(params)

Stateful Conversations

For multi-turn conversations, track state per task ID:

from typing import Dict, List

from pydantic import BaseModel, Field

from naylence.fame.service import operation
from naylence.agent import (
    BaseAgent,
    Task,
    TaskSendParams,
    TaskState,
    make_task,
    first_data_part,
)


class ConversationState(BaseModel):
    system_prompt: str
    history: List[Dict[str, str]] = Field(default_factory=list)
    max_history_length: int = 10


class ChatAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self._states: Dict[str, ConversationState] = {}

    async def start_task(self, params: TaskSendParams) -> Task:
        """Initialize a new conversation."""
        if params.id in self._states:
            raise ValueError(f"Duplicate task: {params.id}")

        self._states[params.id] = ConversationState.model_validate(
            first_data_part(params.message)
        )
        return make_task(id=params.id, state=TaskState.WORKING, payload="")

    @operation
    async def run_turn(self, task_id: str, user_message: str) -> str:
        """Execute a single conversation turn."""
        state = self._states.get(task_id)
        if not state:
            raise ValueError(f"Invalid task: {task_id}")

        state.history.append({"role": "user", "content": user_message})

        # ... call LLM here ...
        answer = "LLM response"

        state.history.append({"role": "assistant", "content": answer})
        return answer

    @operation
    async def end_conversation(self, task_id: str):
        """Clean up conversation state."""
        self._states.pop(task_id, None)
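
For completeness, a client could drive this agent with the invocation patterns shown earlier. The sketch below assumes make_task_params accepts a dict payload (delivered to the agent as a DataPart) and that the agent is served at chat@fame.fabric; both are illustrative.

import asyncio

from naylence.fame.core import FameFabric, generate_id
from naylence.agent import Agent, configs, make_task_params


async def main():
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        chat = Agent.remote_by_address("chat@fame.fabric")  # illustrative address
        task_id = generate_id()

        # Initialize the conversation (payload becomes the ConversationState)
        await chat.start_task(
            make_task_params(
                id=task_id,
                payload={"system_prompt": "You are a helpful assistant."},
            )
        )

        # Run a couple of turns via the @operation methods
        print(await chat.run_turn(task_id=task_id, user_message="Hello!"))
        print(await chat.run_turn(task_id=task_id, user_message="Any advice?"))

        # Clean up conversation state
        await chat.end_conversation(task_id=task_id)


if __name__ == "__main__":
    asyncio.run(main())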

Push Notifications

For webhook-style notifications, agents can send updates to external URLs:

from naylence.fame.core import generate_id
from naylence.agent import (
    Agent,
    PushNotificationConfig,
    TaskPushNotificationConfig,
)

# Client: Configure push endpoint before starting task
agent = Agent.remote_by_address("processor@fame.fabric")
task_id = generate_id()

await agent.register_push_endpoint(
    TaskPushNotificationConfig(
        id=task_id,
        pushNotificationConfig=PushNotificationConfig(
            url="https://your-webhook.example.com/notifications"
        ),
    )
)

await agent.run_task(id=task_id)

Task States

The A2A task model uses the following states:

  • SUBMITTED: Task has been received but not yet started
  • WORKING: Task is actively being processed
  • INPUT_REQUIRED: Task is waiting for additional input
  • COMPLETED: Task finished successfully
  • CANCELED: Task was cancelled by the client
  • FAILED: Task encountered an error
  • UNKNOWN: Task state cannot be determined
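
On the client side, the current state is available on the Task returned by startTask (or from status update events). A minimal sketch of branching on it, building on the A2A example above:

from naylence.agent import Agent, TaskState, make_task_params

remote = Agent.remote_by_address("greeter@fame.fabric")
result = await remote.start_task(make_task_params(id="task-1", payload="Hi"))

# Branch on the state reported in the returned Task
if result.status.state == TaskState.COMPLETED:
    print("Task finished:", result.status.message)
elif result.status.state == TaskState.FAILED:
    print("Task failed")
else:
    print("Task still in progress:", result.status.state)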

Deployment Configurations

The SDK provides pre-built configurations for different deployment scenarios:

  • CLIENT_CONFIG: Lightweight client that connects to a remote sentinel
  • NODE_CONFIG: Agent node that connects to and registers with a sentinel
  • SENTINEL_CONFIG: Sentinel node that accepts connections and routes messages
from naylence.agent import configs

# Client: Connect to remote agents
configs.CLIENT_CONFIG

# Agent node: Register with sentinel
configs.NODE_CONFIG

# Sentinel: Accept connections
configs.SENTINEL_CONFIG

Message Handling

For simple fire-and-forget messaging (non-request/response), override onMessage:

from typing import Any

from naylence.agent import BaseAgent


class NotificationHandler(BaseAgent):
    async def on_message(self, message: Any) -> Any:
        print(f"Received notification: {message}")
        # No return value needed for fire-and-forget

Summary

The Naylence Agent API provides a flexible foundation for building distributed agent systems:

  • Simple agents with runTask for basic request/response
  • Function handlers for lightweight, single-purpose agents
  • RPC operations with @operation for multi-method agents
  • Streaming for real-time data generation
  • Background tasks for long-running operations with progress
  • Capability-based discovery for dynamic agent location
  • Multi-agent orchestration with broadcast and delegation
  • Stateful conversations for multi-turn interactions

Choose the pattern that best fits your use case, and combine them as needed for complex workflows.
