BackgroundTaskAgent
Base for agents that run long-running background tasks.
This class schedules work in a separate coroutine and streams status events back to subscribers. Tasks are tracked until completion and cached briefly so late subscribers can still retrieve final status.
Override :meth:run_background_task to implement your agent’s logic.
Args: max_queue_size: Maximum queued events per task (default 1000). max_task_lifetime_ms: Optional hard timeout after which tasks auto-cancel. completed_cache_size: Number of completed tasks to keep in memory. completed_cache_ttl_sec: TTL in seconds for cached completed tasks.
Example:
class SumAgent(BackgroundTaskAgent):
async def run_background_task(self, params):
await asyncio.sleep(1) # simulate work
return sum(params.message.parts[0].data)Extends: BaseAgent, ABC
Constructor
def __init__(args = (), max_queue_size: int = _DEFAULT_EVENT_QUEUE_SIZE, max_task_lifetime_ms: Optional[int] = None, completed_cache_size: int = 100, completed_cache_ttl_sec: float = 300.0)Methods
cancel_task
def cancel_task(params: TaskIdParams) -> TaskCancel a running task.
Sets the task state to CANCELED and returns the updated task.
Args: params: Parameters with the task id.
Returns: Task object with CANCELED status.
get_task_state
def get_task_state(task_id: str) -> TaskStateGet the current state of a task.
Args: task_id: The task identifier.
Returns: The task’s current state (WORKING, COMPLETED, FAILED, etc.).
get_task_status
def get_task_status(params: TaskQueryParams) -> TaskRetrieve the full status of a task.
Checks active tasks first, then the completed cache.
Args: params: Query parameters with the task id.
Returns: Task object with current status.
Raises: ValueError: If task is unknown or has expired from cache.
run_background_task
def run_background_task(params: TaskSendParams) -> AnyExecute the agent’s main logic.
Override this method to implement your background task. The return value becomes the task’s completion message. Exceptions cause the task to fail with the error message.
Args: params: Task parameters with id, message, and metadata.
Returns: Result to include in the task’s completion message.
Raises: Exception: Any exception marks the task as failed.
start_task
def start_task(params: TaskSendParams) -> TaskSchedule a background task and return immediately.
The task runs asynchronously. Use :meth:subscribe_to_task_updates
to stream status and artifact events.
Args: params: Task parameters including id and input message.
Returns: Task object with initial WORKING status.
subscribe_to_task_updates
def subscribe_to_task_updates(params: TaskSendParams) -> AsyncIterator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent]Subscribe to live status and artifact updates for a task.
Returns an async iterator that yields events until the task reaches a terminal state. Safe to call after task completion if still in cache.
Args: params: Parameters with the task id.
Yields: TaskStatusUpdateEvent or TaskArtifactUpdateEvent objects.
unsubscribe_task
def unsubscribe_task(params: TaskIdParams) -> AnyStop receiving updates for a task.
Cleans up the event queue for this task.
Args: params: Parameters with the task id.
update_task_artifact
def update_task_artifact(task_id: str, artifact: Artifact)Push an artifact update to task subscribers.
Use this to stream intermediate results (e.g., partial LLM output) while the task is still running.
Args: task_id: The task identifier. artifact: Artifact data to send to subscribers.
update_task_state
def update_task_state(task_id: str, state: TaskState, message: Optional[Message] = None) -> boolUpdate a task’s state and notify subscribers.
Terminal states (COMPLETED, FAILED, CANCELED) move the task to the completed cache and cannot be updated further.
Args: task_id: The task identifier. state: New state to set. message: Optional message with additional context.
Returns: True if the state was updated, False if task is already terminal.