Agent API Guide
This guide covers the Naylence Agent API: the different ways to implement agents, how to invoke them remotely, and advanced patterns such as background tasks, streaming, and multi-agent orchestration.
The Naylence Agent SDK provides consistent APIs for both TypeScript and Python. The examples in this guide are shown in Python. Where the prose names a method in camelCase (for example runTask), the Python examples use the snake_case form (run_task).
API Reference Documentation
For detailed API documentation generated from source code, see the language-specific references:
Overview
The Agent API is built around several core concepts:
| Concept | Description |
|---|---|
| BaseAgent | The foundation class for all agents, providing core message handling |
| BackgroundTaskAgent | Extended agent class for long-running tasks with progress updates |
| Agent.remoteByAddress | Create a proxy to invoke a remote agent by its address |
| Agent.remoteByCapabilities | Discover and invoke agents by their declared capabilities |
| RPC Operations | Define custom operations with the @operation decorator |
| Streaming | Return async generators for streaming responses |
Agent Implementation Patterns
1. Basic Agent with runTask
The simplest agent pattern implements runTask (run_task in Python) to handle an incoming request and return the response directly to the caller.
import asyncio
from typing import Any

from naylence.agent import BaseAgent, configs

class EchoAgent(BaseAgent):
    async def run_task(self, payload: Any, id: Any) -> Any:
        return payload

if __name__ == "__main__":
    asyncio.run(
        EchoAgent().aserve("echo@fame.fabric", root_config=configs.NODE_CONFIG)
    )

2. Function as Agent
For simple use cases, you can wrap a plain function as an agent without creating a class:
import asyncio
from datetime import datetime, timezone
from typing import Any

from naylence.fame.core import FameFabric
from naylence.agent import Agent

async def main():
    async with FameFabric.create() as fabric:
        # Define a simple handler function
        async def time_agent(payload: Any, id: Any) -> Any:
            return datetime.now(timezone.utc).isoformat()

        # Wrap it as an agent and serve
        agent_address = await fabric.serve(Agent.from_handler(time_agent))

        # Invoke it
        remote = Agent.remote_by_address(agent_address)
        result = await remote.run_task(payload="Hello")
        print(f"Time: {result}")

if __name__ == "__main__":
    asyncio.run(main())

3. RPC Operations with Custom Methods
Use the @operation decorator to expose custom methods on your agent. This is ideal for agents that provide multiple related operations.
import asyncio

from naylence.fame.core import FameFabric
from naylence.fame.service import operation
from naylence.agent import Agent, BaseAgent

class MathAgent(BaseAgent):
    @operation  # Exposed as "add"
    async def add(self, x: int, y: int) -> int:
        return x + y

    @operation(name="multiply")  # Exposed as "multiply" (custom name)
    async def multi(self, x: int, y: int) -> int:
        return x * y

async def main():
    async with FameFabric.create() as fabric:
        agent_address = await fabric.serve(MathAgent())
        agent = Agent.remote_by_address(agent_address)

        # Call the operations
        print(await agent.add(x=3, y=4))  # 7
        print(await agent.multiply(x=6, y=7))  # 42

if __name__ == "__main__":
    asyncio.run(main())

4. Streaming Responses
For operations that produce multiple values over time (like LLM token streaming or progress updates), write the operation as an async generator and mark it as streaming (streaming=True in the Python decorator):
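As a warm-up before the agent version, the sketch below shows the underlying Python mechanism in isolation. It uses only asyncio and no Naylence imports; the names fib_stream and take_until are hypothetical and chosen for illustration. It demonstrates one benefit of streaming: the consumer can stop early, and the generator never computes the values that were not requested.

```python
import asyncio
from typing import AsyncIterator, List


async def fib_stream(n: int) -> AsyncIterator[int]:
    """Yield the first n Fibonacci numbers one at a time."""
    a, b = 0, 1
    for _ in range(n):
        yield a
        await asyncio.sleep(0)  # hand control back to the event loop between items
        a, b = b, a + b


async def take_until(limit: int, n: int) -> List[int]:
    """Collect streamed values, stopping at the first value above `limit`."""
    seen: List[int] = []
    async for value in fib_stream(n):
        if value > limit:
            break  # the generator is not advanced any further
        seen.append(value)
    return seen


if __name__ == "__main__":
    print(asyncio.run(take_until(limit=10, n=50)))
```

The agent example that follows applies the same generator shape, with the fabric carrying each yielded value to the remote caller.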
import asyncio

from naylence.fame.core import FameFabric
from naylence.fame.service import operation
from naylence.agent import Agent, BaseAgent

class StreamingAgent(BaseAgent):
    @operation(name="fib_stream", streaming=True)
    async def fib(self, n: int):
        """Generate first n Fibonacci numbers as a stream."""
        a, b = 0, 1
        for _ in range(n):
            yield a
            a, b = b, a + b

async def main():
    async with FameFabric.create() as fabric:
        agent_address = await fabric.serve(StreamingAgent())
        agent = Agent.remote_by_address(agent_address)

        # Consume the stream
        async for value in await agent.fib_stream(n=10, _stream=True):
            print(value, end=" ")
        # Output: 0 1 1 2 3 5 8 13 21 34

if __name__ == "__main__":
    asyncio.run(main())

5. A2A Task Model (Agent-to-Agent)
For structured task workflows, implement startTask and getTaskStatus. This follows the A2A (Agent-to-Agent) protocol:
import asyncio

from naylence.fame.core import FameFabric
from naylence.agent import (
    Agent,
    BaseAgent,
    Task,
    TaskSendParams,
    TaskState,
    make_task,
    make_task_params,
    first_text_part,
)

class GreetingAgent(BaseAgent):
    async def start_task(self, params: TaskSendParams) -> Task:
        return make_task(
            id=params.id,
            state=TaskState.COMPLETED,
            payload="Hello! 👋",
        )

async def main():
    async with FameFabric.create() as fabric:
        agent_address = await fabric.serve(GreetingAgent("greeter"))
        remote = Agent.remote_by_address(agent_address)

        # Start a task
        result = await remote.start_task(
            make_task_params(id="task-1", payload="Hi there!")
        )
        print(f"State: {result.status.state}")
        print(f"Reply: {first_text_part(result.status.message)}")

if __name__ == "__main__":
    asyncio.run(main())

6. Background Tasks with Progress Updates
For long-running operations, extend BackgroundTaskAgent. This provides automatic task state management and progress updates:
import asyncio

from naylence.agent import (
    Artifact,
    BackgroundTaskAgent,
    DataPart,
    TaskSendParams,
    configs,
)

class ProcessingAgent(BackgroundTaskAgent):
    async def run_background_task(self, params: TaskSendParams) -> None:
        """Long-running task with progress updates."""
        for i in range(1, 6):
            await asyncio.sleep(0.5)
            # Emit progress artifact
            await self.update_task_artifact(
                params.id,
                Artifact(parts=[DataPart(data={"progress": f"step {i}/5"})]),
            )
        # Task auto-completes when this method returns

if __name__ == "__main__":
    asyncio.run(
        ProcessingAgent().aserve(
            "processor@fame.fabric", root_config=configs.NODE_CONFIG
        )
    )

7. Cancellable Tasks
Background tasks can be cancelled by clients. Check the task state periodically to handle cancellation gracefully:
import asyncio

from naylence.agent import (
    Artifact,
    BackgroundTaskAgent,
    DataPart,
    TaskSendParams,
    TaskState,
    configs,
)

class CancellableAgent(BackgroundTaskAgent):
    async def run_background_task(self, params: TaskSendParams) -> None:
        for i in range(1, 11):
            # Check if task was cancelled
            task_state = await self.get_task_state(params.id)
            if task_state == TaskState.CANCELED:
                print(f"Task {params.id} was cancelled at step {i}")
                break

            progress = i / 10
            await self.update_task_artifact(
                params.id,
                Artifact(parts=[DataPart(data={"progress": progress})]),
            )
            await asyncio.sleep(0.5)

if __name__ == "__main__":
    asyncio.run(
        CancellableAgent().aserve(
            "cancellable@fame.fabric", root_config=configs.NODE_CONFIG
        )
    )

Client Invocation Patterns
1. Remote by Address
The most direct way to invoke an agent is to use its known address:
import asyncio

from naylence.fame.core import FameFabric
from naylence.agent import Agent, configs

async def main():
    # `async with` must run inside a coroutine, so wrap the client in main()
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        remote = Agent.remote_by_address("echo@fame.fabric")
        result = await remote.run_task("Hello, World!")
        print(result)

if __name__ == "__main__":
    asyncio.run(main())

2. Remote by Capabilities (Service Discovery)
Discover agents by their declared capabilities instead of hardcoding addresses:
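One way to picture capability lookup is as a subset match: an agent qualifies when its declared capabilities include every capability the caller asks for. The sketch below illustrates that idea with a plain dictionary. It is an assumption made for illustration, not a description of how the fabric actually resolves capabilities. The registry, the find_by_capabilities helper, and the "agent" string (a placeholder for AGENT_CAPABILITY, whose real value is not shown in this guide) are all hypothetical.

```python
from typing import Dict, Iterable, List, Set

# Hypothetical in-memory registry: address -> declared capabilities.
# "agent" is a placeholder string, not the SDK's AGENT_CAPABILITY value.
REGISTRY: Dict[str, Set[str]] = {
    "math@fame.fabric": {"agent", "math.operations"},
    "echo@fame.fabric": {"agent"},
}


def find_by_capabilities(required: Iterable[str]) -> List[str]:
    """Return addresses whose declared capabilities include every required one."""
    wanted = set(required)
    return sorted(addr for addr, caps in REGISTRY.items() if wanted <= caps)


if __name__ == "__main__":
    print(find_by_capabilities(["agent", "math.operations"]))
    print(find_by_capabilities(["agent"]))
```

With the SDK, the caller never sees a registry like this; the example that follows only passes the capability list and receives a proxy.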
import asyncio

from naylence.fame.core import FameFabric, AGENT_CAPABILITY
from naylence.agent import Agent, configs

MATH_CAPABILITY = "math.operations"

async def main():
    # `async with` must run inside a coroutine, so wrap the client in main()
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        # Find any agent with these capabilities
        math_agent = Agent.remote_by_capabilities([
            AGENT_CAPABILITY,
            MATH_CAPABILITY,
        ])
        print(await math_agent.add(x=3, y=4))
        print(await math_agent.multiply(x=6, y=7))

if __name__ == "__main__":
    asyncio.run(main())

Agent side: Declare capabilities by overriding the capabilities property:
from naylence.fame.core import AGENT_CAPABILITY
from naylence.agent import BaseAgent

MATH_CAPABILITY = "math.operations"

class MathAgent(BaseAgent):
    @property
    def capabilities(self):
        return [AGENT_CAPABILITY, MATH_CAPABILITY]

    # ... operations ...

3. Subscribing to Task Updates
For background tasks, subscribe to real-time status and artifact updates:
import asyncio

from naylence.fame.core import FameFabric, generate_id
from naylence.agent import (
    Agent,
    DataPart,
    TaskArtifactUpdateEvent,
    TaskStatusUpdateEvent,
    configs,
    make_task_params,
)

async def main():
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        agent = Agent.remote_by_address("processor@fame.fabric")
        task_id = generate_id()

        # Start the task
        await agent.start_task(make_task_params(id=task_id))

        # Subscribe to updates
        updates = await agent.subscribe_to_task_updates(
            make_task_params(id=task_id)
        )
        async for evt in updates:
            if isinstance(evt, TaskStatusUpdateEvent):
                print(f"[STATUS] {evt.status.state}")
            elif isinstance(evt, TaskArtifactUpdateEvent):
                part = evt.artifact.parts[0]
                if isinstance(part, DataPart):
                    print(f"[DATA] {part.data}")

if __name__ == "__main__":
    asyncio.run(main())

4. Broadcast to Multiple Agents
Send the same payload to multiple agents in parallel:
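The fan-out idea, and the shape of the result (a list of address and response pairs), can be sketched with asyncio.gather alone. This is an illustration of the pattern under stated assumptions, not a claim about how Agent.broadcast is implemented. The handlers, the addresses, and the fan_out helper are hypothetical local stand-ins for remote agents.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[Any], Awaitable[Any]]


async def upper_case(text: str) -> str:
    return text.upper()


async def word_count(text: str) -> int:
    return len(text.split())


# Hypothetical local stand-ins for remote agents, keyed by address
HANDLERS: Dict[str, Handler] = {
    "upper@fame.fabric": upper_case,
    "counter@fame.fabric": word_count,
}


async def fan_out(addresses: List[str], payload: Any) -> List[Tuple[str, Any]]:
    """Call every handler concurrently and pair each result with its address."""
    # gather returns results in the order the awaitables were passed in
    responses = await asyncio.gather(*(HANDLERS[a](payload) for a in addresses))
    return list(zip(addresses, responses))


if __name__ == "__main__":
    pairs = asyncio.run(
        fan_out(["upper@fame.fabric", "counter@fame.fabric"], "one two three")
    )
    for address, response in pairs:
        print(f"{address}: {response}")
```

Because the results keep the order of the address list, a caller can index into them positionally. The SDK call is shown next.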
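import asyncio

from naylence.fame.core import FameFabric
from naylence.agent import Agent, configs

async def main():
    # `await` must run inside a coroutine, so wrap the client in main()
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        # Send to multiple agents and collect results
        result = await Agent.broadcast(
            ["summarizer@fame.fabric", "sentiment@fame.fabric"],
            payload="This is the text to analyze.",
        )
        # result is a list of (address, response) tuples
        for address, response in result:
            print(f"{address}: {response}")

if __name__ == "__main__":
    asyncio.run(main())

Multi-Agent Patterns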
Agent-to-Agent Delegation
Agents can invoke other agents to delegate work:
from naylence.agent import Agent, BaseAgent

SUMMARIZER_ADDR = "summarizer@fame.fabric"
SENTIMENT_ADDR = "sentiment@fame.fabric"

class AnalysisAgent(BaseAgent):
    async def run_task(self, payload, id):
        # Delegate to multiple specialized agents
        result = await Agent.broadcast(
            [SUMMARIZER_ADDR, SENTIMENT_ADDR],
            payload,
        )
        return {
            "summary": result[0][1],
            "sentiment": result[1][1],
        }

Ping-Pong Pattern
Agents can forward requests to each other:
from naylence.fame.core import FameAddress
from naylence.agent import (
    Agent,
    BaseAgent,
    Task,
    TaskSendParams,
    TaskState,
    make_task,
    first_text_part,
)

class PongAgent(BaseAgent):
    async def start_task(self, params: TaskSendParams) -> Task:
        incoming_text = first_text_part(params.message)
        return make_task(
            id=params.id,
            state=TaskState.COMPLETED,
            payload=f"Pong: {incoming_text}",
        )

class PingAgent(BaseAgent):
    def __init__(self, name: str, pong_address: FameAddress):
        super().__init__(name)
        self._pong_address = pong_address

    async def start_task(self, params: TaskSendParams) -> Task:
        # Forward to PongAgent
        pong_proxy = Agent.remote_by_address(self._pong_address)
        return await pong_proxy.start_task(params)

Stateful Conversations
For multi-turn conversations, track state per task ID:
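The ConversationState model in the agent example below declares max_history_length, but the example does not show the cap being enforced. A minimal sketch of one possible way to apply it follows. It uses a standard-library dataclass as a stand-in for the pydantic model, and append_turn is a hypothetical helper introduced here for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ConversationState:
    """Dataclass stand-in for the pydantic model used in the agent example."""

    system_prompt: str
    history: List[Dict[str, str]] = field(default_factory=list)
    max_history_length: int = 10


def append_turn(state: ConversationState, role: str, content: str) -> None:
    """Record one message, then drop the oldest entries beyond the cap."""
    state.history.append({"role": role, "content": content})
    overflow = len(state.history) - state.max_history_length
    if overflow > 0:
        del state.history[:overflow]


# One state object per task ID, mirroring the agent's _states dictionary
states: Dict[str, ConversationState] = {}
states["task-1"] = ConversationState(system_prompt="Be brief.", max_history_length=4)
for i in range(3):
    append_turn(states["task-1"], "user", f"question {i}")
    append_turn(states["task-1"], "assistant", f"answer {i}")

print([m["content"] for m in states["task-1"].history])
```

In the agent itself, run_turn could call a helper like this instead of appending to state.history directly.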
from typing import Dict, List

from pydantic import BaseModel, Field
from naylence.fame.service import operation
from naylence.agent import (
    BaseAgent,
    Task,
    TaskSendParams,
    TaskState,
    make_task,
    first_data_part,
)

class ConversationState(BaseModel):
    system_prompt: str
    history: List[Dict[str, str]] = Field(default_factory=list)
    max_history_length: int = 10

class ChatAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self._states: Dict[str, ConversationState] = {}

    async def start_task(self, params: TaskSendParams) -> Task:
        """Initialize a new conversation."""
        if params.id in self._states:
            raise ValueError(f"Duplicate task: {params.id}")
        self._states[params.id] = ConversationState.model_validate(
            first_data_part(params.message)
        )
        return make_task(id=params.id, state=TaskState.WORKING, payload="")

    @operation
    async def run_turn(self, task_id: str, user_message: str) -> str:
        """Execute a single conversation turn."""
        state = self._states.get(task_id)
        if not state:
            raise ValueError(f"Invalid task: {task_id}")
        state.history.append({"role": "user", "content": user_message})
        # ... call LLM here ...
        answer = "LLM response"
        state.history.append({"role": "assistant", "content": answer})
        return answer

    @operation
    async def end_conversation(self, task_id: str):
        """Clean up conversation state."""
        self._states.pop(task_id, None)

Push Notifications
For webhook-style notifications, agents can send updates to external URLs:
import asyncio

from naylence.fame.core import FameFabric, generate_id
from naylence.agent import (
    Agent,
    PushNotificationConfig,
    TaskPushNotificationConfig,
    configs,
)

async def main():
    async with FameFabric.create(root_config=configs.CLIENT_CONFIG):
        # Client: Configure push endpoint before starting task
        agent = Agent.remote_by_address("processor@fame.fabric")
        task_id = generate_id()
        await agent.register_push_endpoint(
            TaskPushNotificationConfig(
                id=task_id,
                pushNotificationConfig=PushNotificationConfig(
                    url="https://your-webhook.example.com/notifications"
                ),
            )
        )
        await agent.run_task(id=task_id)

if __name__ == "__main__":
    asyncio.run(main())

Task States
The A2A task model uses the following states:
| State | Description |
|---|---|
| SUBMITTED | Task has been received but not yet started |
| WORKING | Task is actively being processed |
| INPUT_REQUIRED | Task is waiting for additional input |
| COMPLETED | Task finished successfully |
| CANCELED | Task was cancelled by the client |
| FAILED | Task encountered an error |
| UNKNOWN | Task state cannot be determined |
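Client code often needs to know whether a task can still change. The sketch below models the states from the table as a local enum and adds a hypothetical is_terminal helper. The enum is a stand-in defined here, not the SDK's TaskState; the string values and the choice of which states count as final are assumptions for illustration.

```python
from enum import Enum


class TaskState(str, Enum):
    """Local stand-in for illustration; not imported from the SDK."""

    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"
    UNKNOWN = "unknown"


# Assumption: these three states are final; the others may still change.
TERMINAL_STATES = frozenset(
    {TaskState.COMPLETED, TaskState.CANCELED, TaskState.FAILED}
)


def is_terminal(state: TaskState) -> bool:
    """Return True when no further updates are expected for the task."""
    return state in TERMINAL_STATES


if __name__ == "__main__":
    for state in TaskState:
        print(f"{state.name:<15} terminal={is_terminal(state)}")
```

A polling or subscription loop could use a check like this to decide when to stop waiting for updates.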
Deployment Configurations
The SDK provides pre-built configurations for different deployment scenarios:
| Config | Use Case |
|---|---|
| CLIENT_CONFIG | Lightweight client that connects to a remote sentinel |
| NODE_CONFIG | Agent node that connects to and registers with a sentinel |
| SENTINEL_CONFIG | Sentinel node that accepts connections and routes messages |
from naylence.agent import configs

# Client: Connect to remote agents
configs.CLIENT_CONFIG

# Agent node: Register with sentinel
configs.NODE_CONFIG

# Sentinel: Accept connections
configs.SENTINEL_CONFIG

Message Handling
For simple fire-and-forget messaging, where the sender does not wait for a response, override onMessage (on_message in Python):
from typing import Any

from naylence.agent import BaseAgent

class NotificationHandler(BaseAgent):
    async def on_message(self, message: Any) -> Any:
        print(f"Received notification: {message}")
        # No return value needed for fire-and-forget

Summary
The Naylence Agent API provides a flexible foundation for building distributed agent systems:
- Simple agents with runTask for basic request/response
- Function handlers for lightweight, single-purpose agents
- RPC operations with @operation for multi-method agents
- Streaming for real-time data generation
- Background tasks for long-running operations with progress
- Capability-based discovery for dynamic agent location
- Multi-agent orchestration with broadcast and delegation
- Stateful conversations for multi-turn interactions
Choose the pattern that best fits your use case, and combine them as needed for complex workflows.