# 0.4.0 → 0.4.1 — AG-UI Frontend Tools (HITL)

## Summary
The AG-UI protocol lets the frontend declare its own tools in `RunAgentInput.tools` — tools that the LLM can "call" but whose actual execution happens on the client side (widgets, UI pickers, local API calls, user input prompts…). This release adds native support for these frontend tools in `digitalkin.community.agno` via a Human-In-The-Loop (HITL) runner that:
- registers frontend tools as `Function(external_execution=True)` on the Agno agent,
- pauses the run when the LLM calls a frontend tool, streams the `ToolCallStart/Args/End` events to the client, and persists the `RunOutput` via `StorageStrategy`,
- emits an AG-UI `RunFinished` with `result.status="awaiting_tool_result"` to tell the frontend it must execute the tool and send the result back,
- automatically resumes the run on the next request as soon as a matching `ToolMessage` arrives in the message history, regardless of which replica handles the request (process-stateless).
No breaking change: everything is additive and opt-in.
## What changes

### 1. `AgnoStreamAdapter` — `RunEvent.run_paused` support
Agno has a non-trivial behavior here: for tools marked `external_execution=True`, it does not emit `RunEvent.tool_call_started` or `RunEvent.tool_call_completed`. It yields a `RunPausedEvent` directly, carrying the list `tools: list[ToolExecution]` (see agno/models/base.py:2585). Consequence: without dedicated handling, the frontend never sees `ToolCallStart/Args/End` events for frontend tools — it only receives an orphan `RunFinished`.

`AgnoStreamAdapter` now handles `RunEvent.run_paused` and synthesizes one `ToolCallStartedEvent` + `ToolCallCompletedEvent(content=None)` pair per pending tool, reconstructed from `RunPausedEvent.tools[*]` (`tool_call_id`, `tool_name`, `tool_args`). The downstream `AgUiMixin` bridge then emits `ToolCallStart` + `ToolCallArgs` + `ToolCallEnd` normally, without a `ToolCallResult` (guarded by `if result_content:` — passing `content=None` disables it, which is the correct behavior since the tool is not executed server-side).
The adapter additionally exposes three properties that consumers can read after streaming to detect the pause:
| Property | Type | Description |
|---|---|---|
| `is_paused` | `bool` | `True` if a `RunPausedEvent` was seen during the last streaming session |
| `paused_tool_executions` | `list[ToolExecution]` | Tools awaiting external execution |
| `paused_requirements` | `list[RunRequirement]` | Matching Agno `RunRequirement` objects |
Important: this fix is additive and benefits all consumers of `AgnoStreamAdapter`, not just HITL. Any module that uses `external_execution=True` with the adapter will now see the `ToolCall*` events correctly.
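The synthesis can be pictured with a stripped-down sketch (stand-in dataclasses, not the real agno/digitalkin event types, and only the fields the text mentions):

```python
from dataclasses import dataclass


# Stand-ins for the Agno / adapter types (simplified, hypothetical shapes).
@dataclass
class ToolExecution:
    tool_call_id: str
    tool_name: str
    tool_args: dict


@dataclass
class ToolCallStartedEvent:
    tool_call_id: str
    tool_name: str
    tool_args: dict


@dataclass
class ToolCallCompletedEvent:
    tool_call_id: str
    tool_name: str
    content: object  # None → no ToolCallResult is emitted downstream


def synthesize_pause_events(pending: list[ToolExecution]) -> list[object]:
    """One Started + Completed(content=None) pair per pending tool."""
    events: list[object] = []
    for tool in pending:
        events.append(
            ToolCallStartedEvent(tool.tool_call_id, tool.tool_name, tool.tool_args)
        )
        # content=None keeps the downstream `if result_content:` guard closed,
        # so no ToolCallResult is produced for a tool executed client-side.
        events.append(ToolCallCompletedEvent(tool.tool_call_id, tool.tool_name, None))
    return events
```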
### 1b. `AgnoStreamAdapter` — `RunEvent.reasoning_step` auto-wrapping
Agno's `ReasoningTools` toolkit (the `think` / `analyze` tools) emits `RunEvent.reasoning_step` events without the surrounding `reasoning_started` / `reasoning_completed` lifecycle that the AG-UI protocol requires. Previously, the adapter forwarded these as orphaned `REASONING_MESSAGE_CONTENT` events — no `REASONING_START` before, no `REASONING_END` after, and an empty `messageId`. The frontend had no way to render them correctly.

The adapter now auto-opens a reasoning sequence when a `reasoning_step` arrives while `_reasoning_active` is `False`:
- Closes any active text message (`_close_content`).
- Generates a fresh `reasoning_id`, sets `_reasoning_active = True`.
- Emits `ReasoningStartedEvent` (→ `REASONING_START` + `REASONING_MESSAGE_START` via `AgUiMixin`).
- Then emits the `ReasoningStepEvent` content as usual (→ `REASONING_MESSAGE_CONTENT`).
The reasoning sequence is auto-closed by the next non-reasoning event (`_handle_run_content`, `_handle_tool_call_started`, `_handle_run_completed`) or by `flush()`, which emit `REASONING_MESSAGE_END` + `REASONING_END`.
Result: the frontend now receives a proper lifecycle for tool-based reasoning:

```
TOOL_CALL_RESULT (analyze)
REASONING_START            ← auto-opened
REASONING_MESSAGE_START
REASONING_MESSAGE_CONTENT  ← the step content (no longer orphaned)
REASONING_MESSAGE_END      ← auto-closed by next event
REASONING_END
TEXT_MESSAGE_START
```
This fix benefits any module using Agno's `ReasoningTools` (`think`, `analyze`) — not just HITL.
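The open/close bookkeeping is essentially a small state machine over the event stream. A minimal sketch (event names only; the `REASONING_MESSAGE_START`/`END` sub-events are omitted for brevity, and `wrap_reasoning_steps` is an invented illustration, not the adapter's actual code):

```python
def wrap_reasoning_steps(events: list[str]) -> list[str]:
    """Insert REASONING_START/END around runs of reasoning_step events.

    `events` is a flat list of Agno-level event names; any event that is not
    "reasoning_step" closes an active reasoning sequence, and a trailing
    sequence is closed at the end (the flush() case).
    """
    out: list[str] = []
    active = False
    for ev in events:
        if ev == "reasoning_step":
            if not active:  # auto-open on the first step of a run
                out.append("REASONING_START")
                active = True
            out.append("REASONING_MESSAGE_CONTENT")
        else:
            if active:  # auto-close on the next non-reasoning event
                out.append("REASONING_END")
                active = False
            out.append(ev)
    if active:  # flush() closes a trailing sequence
        out.append("REASONING_END")
    return out
```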
### 2. New module `community/agno/agui_tools.py`

Two helpers to wire frontend tools onto an Agno agent:

```python
from digitalkin.community.agno import agui_tool_to_external_function, make_tools_factory
```

| Function | Role |
|---|---|
| `agui_tool_to_external_function(tool: AgUiTool) -> Function` | Wraps an `ag_ui.core.types.Tool` into an `agno.tools.function.Function` marked `external_execution=True`, passing the AG-UI JSON Schema through as `parameters`. |
| `make_tools_factory(base_tools, dependency_key="agui_tools") -> Callable[[RunContext], list]` | Builds a factory for the Agent's `tools=` parameter. On every run, the factory reads `run_context.dependencies[dependency_key]` (the list of `AgUiTool` passed via `agent.arun(dependencies={...})`), converts each entry with `agui_tool_to_external_function`, and concatenates the result with `base_tools`. |
⚠️ Critical requirement: the `Agent` using this factory must be built with `cache_callables=False`. Otherwise Agno caches the result of the first factory call and subsequent runs never see new frontend tools (agno/utils/callables.py:260-284).

Why use `dependencies` as a transport channel? It's a repurposed but intentional use: the tools factory receives the `RunContext`, which inherits the dependencies passed to `arun(dependencies=...)` before tool resolution (verified at agno/agent/_run.py:2563). It's the only Agno-native mechanism that lets us inject per-run tools without mutating the `Agent` instance.
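The per-run merge the factory performs can be sketched in plain Python (a dict stands in for Agno's `RunContext`, and the per-entry conversion through `agui_tool_to_external_function` is skipped):

```python
from typing import Any, Callable


def make_tools_factory_sketch(
    base_tools: list[Any],
    dependency_key: str = "agui_tools",
) -> Callable[[dict], list[Any]]:
    """Simplified model of make_tools_factory.

    The returned factory is what would be passed as `tools=` to the Agent.
    On every call it re-reads the run's dependencies, so new frontend tools
    are picked up per run (hence the cache_callables=False requirement).
    """

    def factory(run_context: dict) -> list[Any]:
        frontend_tools = (run_context.get("dependencies") or {}).get(dependency_key) or []
        # The real helper converts each entry with agui_tool_to_external_function;
        # here the entries are passed through untouched.
        return [*base_tools, *frontend_tools]

    return factory
```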
### 3. New module `community/agno/hitl.py`

The full HITL runtime. Everything is re-exported from `digitalkin.community.agno`.

| Symbol | Type | Role |
|---|---|---|
| `PausedRunRecord` | `BaseModel` | Pydantic schema for the `paused_runs` storage collection. Holds `thread_id`, `run_id`, `pending_tool_call_ids`, `payload` (= `RunOutput.to_dict()`). |
| `HITL_STORAGE_CONFIG` | `dict` | Ready-to-merge config fragment for `services_config_params["storage"]["config"]` — registers `{"paused_runs": PausedRunRecord}`. |
| `PauseInfo` | `dataclass` | Returned by the runner when a run pauses. Holds `thread_id`, `run_id`, `pending_tool_call_ids`. |
| `PausedRunStore` | class | Typed wrapper around `StorageStrategy`: `save(run_output, thread_id)`, `load(thread_id)`, `delete(thread_id)`. Handles serialization via `RunOutput.to_dict()` / `RunOutput.from_dict()`. |
| `AgnoHitlRunner` | class | Main runner. Three-level API: `run()` / `continue_paused_run()` (low-level), `try_resume()` (mid-level), `handle_agui_input()` (all-in-one for triggers). |
| `emit_awaiting_tool_result()` | async func | Helper that emits the AG-UI `RunFinished` with `result={"status": "awaiting_tool_result", "pending_tool_call_ids": [...]}` via `context.callbacks.send_message`. |
The runner supports all five HITL cases:

- Fresh run: extracts the last `UserMessage` from `input_data.messages`, launches `agent.arun()` with `dependencies={"agui_tools": input_data.tools}`.
- Resume: detects a `ToolMessage` whose `tool_call_id` matches a stored `pending_tool_call_id`, loads the paused `RunOutput`, injects the result directly onto `run_output.tools[i].result`, and resumes via `agent.acontinue_run()`.
- Partial resume (guard): if only k/N pending tool results are provided, the runner emits a `RUN_STARTED` + `RUN_ERROR` with `error_type="partial_tool_results"` and a message listing the missing `tool_call_ids`. The paused record is preserved so the client can retry with all results. This prevents a cryptic Agno retry loop.
- Abandon: if the frontend sends a new `UserMessage` while a tool is pending, the paused record is dropped and a fresh run starts.
- Cascade: if the agent calls another frontend tool after a resume, the runner re-persists and emits a new `awaiting_tool_result`.
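The dispatch between these cases reduces to comparing the incoming messages with the stored pending ids. A hedged sketch (plain dicts for AG-UI messages, `classify_request` is an invented name, not the runner's actual code):

```python
def classify_request(
    messages: list[dict],
    pending_tool_call_ids: set[str],
) -> tuple[str, set[str]]:
    """Classify an incoming AG-UI request against a (possibly) paused run.

    Returns one of "fresh", "resume", "partial", "abandon", plus the set of
    matched tool_call_ids. Simplified model of the runner's dispatch.
    """
    if not pending_tool_call_ids:
        return "fresh", set()
    last = messages[-1] if messages else {}
    if last.get("role") == "user":
        # A new UserMessage during a pause drops the paused record.
        return "abandon", set()
    provided = {
        m["toolCallId"]
        for m in messages
        if m.get("role") == "tool" and m.get("toolCallId") in pending_tool_call_ids
    }
    if provided == pending_tool_call_ids:
        return "resume", provided
    if provided:
        # Only k/N results → RUN_ERROR(partial_tool_results), record preserved.
        return "partial", provided
    return "fresh", set()
```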
#### Agno quirks handled transparently
Two Agno behaviors required workarounds in the runner (invisible to consumers):
`RunOutput.to_dict()` / `from_dict()` breaks object identity. After a storage round-trip, `run_output.tools[i]` and `run_output.requirements[i].tool_execution` are two different `ToolExecution` instances with the same data. Agno's `handle_tool_call_updates` (_tools.py:762) iterates `run_response.tools`, not `requirements`. The runner writes `tool.result` directly onto `run_output.tools[i]` — not via `RunRequirement.set_external_execution_result()` — so the injected results are visible to Agno at resume time.

`acontinue_run` emits `RunContinued`, not `RunStarted`. The AG-UI protocol requires every run to begin with a `RUN_STARTED` event. Agno's resume path emits `RunContinued` instead, and the `AgnoStreamAdapter` has no handler for it. The runner emits a synthetic `RunStartedEvent` (with the current AG-UI `run_id` and `thread_id`) before draining the Agno stream — `continue_paused_run` accepts an optional `run_id` parameter for this purpose.
## Architecture

```
Front (AG-UI) → RunAgentInput(tools=[...], messages=[...])
        ↓
Trigger.handle → AgnoHitlRunner.handle_agui_input(input_data, send, context)
        ↓
  ┌─ Resume? (a ToolMessage matches a pending_tool_call_id)
  │     ↓ yes                      ↓ no
  │  continue_paused_run        run(message, agui_tools)
  │     ↓                          ↓
  │  Agno.acontinue_run         Agno.arun(dependencies={"agui_tools": [...]})
  │     ↓                          ↓
  │  factory resolves tools     factory resolves base_tools + external tools
  │     ↓                          ↓
  │  AgnoStreamAdapter drains the stream
  │   ├── normal events → send() → AgUiMixin → front
  │   └── RunPausedEvent → synthesize ToolCallStart/End + is_paused=True
  │     ↓
  │  if adapter.is_paused:
  │      PausedRunStore.save(RunOutput, thread_id) via StorageStrategy
  │     ↓
  └─ emit_awaiting_tool_result(context, ...)
       → AG-UI RunFinished(result.status="awaiting_tool_result")
```
## Front ↔ back protocol (pause / resume)

Pause signal — the backend emits an AG-UI `RunFinished` with a non-empty `result`:

```json
{
  "type": "RUN_FINISHED",
  "threadId": "thread-123",
  "runId": "run-abc",
  "result": {
    "status": "awaiting_tool_result",
    "pending_tool_call_ids": ["toolu_01AnKEg33agtt..."]
  }
}
```
Resume — the frontend sends a new `RunAgentInput` with:

- the same `threadId` (the resume key on the backend),
- `tools` = the same list as the previous turn (the schemas must stay declared),
- `messages` = history + one `ToolMessage` per pending `tool_call_id`:

```json
{
  "id": "uuid-front",
  "role": "tool",
  "toolCallId": "toolu_01AnKEg33agtt...",
  "content": "{\"temp_c\": 14, \"conditions\": \"cloudy\"}"
}
```
The `content` must be a string (JSON-stringified if structured) — that's the standard AG-UI / OpenAI constraint on `ToolMessage`.
Abandon — if the frontend sends a new `UserMessage` as the last entry (without a `ToolMessage`), the backend drops the paused record and starts a normal run. No error.

Cascade — if the LLM calls another frontend tool after a resume, a new `TOOL_CALL_*` + `RUN_FINISHED(awaiting_tool_result)` sequence is emitted with new `pending_tool_call_ids`. The frontend loops again.
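For illustration, a resume payload can be assembled with plain dicts and `json.dumps` (field names follow the JSON examples above; `build_resume_input` is an invented helper, not part of the SDK):

```python
import json


def build_resume_input(
    thread_id: str,
    tools: list[dict],
    history: list[dict],
    results: dict[str, dict],
) -> dict:
    """Append one ToolMessage per pending tool_call_id, keeping tools declared."""
    tool_messages = [
        {
            "id": f"msg-{tool_call_id}",  # any unique message id works
            "role": "tool",
            "toolCallId": tool_call_id,
            # AG-UI / OpenAI constraint: content must be a string.
            "content": json.dumps(result),
        }
        for tool_call_id, result in results.items()
    ]
    return {
        "threadId": thread_id,  # same threadId = resume key on the backend
        "tools": tools,         # same schemas as the previous turn
        "messages": [*history, *tool_messages],
    }
```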
## Tutorial — integrating into an archetype
Five steps to wire HITL into an Agno module (example inspired by template-archetype).
### 1. Register the storage collection

In the `Module` that inherits from `ArchetypeModule`, add `HITL_STORAGE_CONFIG` to the storage collections mapping:
```python
# src/my_module/module.py
from typing import ClassVar

from digitalkin.community.agno import HITL_STORAGE_CONFIG
from digitalkin.modules.archetype_module import ArchetypeModule

from my_module.models import AgnoSession, ChatHistory, MyInput, MyOutput, MySecret, MySetup


class MyModule(ArchetypeModule[MyInput, MyOutput, MySetup, MySecret]):
    name = "MyModule"
    # ...
    services_config_params: ClassVar[dict] = {
        "storage": {
            "config": {
                "agno_sessions": AgnoSession,
                "chat_history": ChatHistory,
                **HITL_STORAGE_CONFIG,
            },
            "client_config": client_config,  # defined elsewhere in the module
        },
        # ... other services
    }
```
`HITL_STORAGE_CONFIG` registers the `paused_runs` collection with the `PausedRunRecord` schema.
### 2. Inject the tools factory into the Agno agent

Wherever you build the `Agent`, replace `tools=[...]` with `tools=make_tools_factory(...)` and add `cache_callables=False`:
```python
# src/my_module/agents/helpers.py
from agno.agent import Agent
from agno.models.litellm import LiteLLM

from digitalkin.community.agno import make_tools_factory


def create_my_agent(config, storage, cost, *, session_id=None) -> Agent:
    base_tools = [AsyncDuckDuckGoTools(), MyCustomToolkit()]
    return Agent(
        model=LiteLLM(...),
        tools=make_tools_factory(base_tools),
        cache_callables=False,  # CRITICAL: otherwise the factory is cached
        instructions=config.instructions,
        # ... rest of the config
    )
```
The `base_tools` stay available at all times. Frontend tools sent by the client are dynamically added on each run.
### 3. Instantiate the `AgnoHitlRunner`

In the module's agent wrapper, keep a reference to the runner and expose a `handle_agui_input` method that delegates to it:
```python
# src/my_module/agents/my_agent.py
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from digitalkin.community.agno import AgnoHitlRunner, PauseInfo

from my_module.agents.helpers import create_my_agent

if TYPE_CHECKING:
    from collections.abc import Callable, Coroutine

    from agno.agent import Agent
    from digitalkin.models.events import BaseAgentRunEvent
    from digitalkin.models.module import ModuleContext
    from digitalkin.services.storage import StorageStrategy


class MyAgent:
    def __init__(self, agent: Agent, storage: StorageStrategy) -> None:
        self.agent = agent
        self.runner = AgnoHitlRunner(agent=agent, storage=storage)

    @classmethod
    async def create(cls, config, storage, cost) -> "MyAgent":
        agent = create_my_agent(config, storage, cost, session_id=config.session.mission_id)
        return cls(agent=agent, storage=storage)

    async def handle_agui_input(
        self,
        input_data: Any,
        *,
        send: Callable[[BaseAgentRunEvent], Coroutine[Any, Any, None]],
        context: ModuleContext,
    ) -> PauseInfo | None:
        return await self.runner.handle_agui_input(
            input_data=input_data,
            send=send,
            context=context,
        )
```
### 4. Wire the trigger

The `TriggerHandler` listening on the `agui_stream` protocol becomes minimal — all logic lives in the runner:
```python
# src/my_module/triggers/message_trigger.py
import logging

from digitalkin.models.events import BaseAgentRunEvent
from digitalkin.models.module import ModuleContext
from digitalkin.models.module.ag_ui import AgUiOutput
from digitalkin.modules.trigger_handler import TriggerHandler

from my_module.models import AgUiStreamInput, MySetup
from my_module.module import MyModule

logger = logging.getLogger(__name__)


@MyModule.register
class MessageTrigger(TriggerHandler):
    protocol = "agui_stream"
    input_format = AgUiStreamInput
    output_format = AgUiOutput

    async def handle(
        self,
        input_data: AgUiStreamInput,
        setup_data: MySetup,  # noqa: ARG002
        context: ModuleContext,
    ) -> None:
        agent = context.state.my_agent

        async def send_event(event: BaseAgentRunEvent) -> None:
            await self.send_message(context, event)

        try:
            await agent.handle_agui_input(
                input_data=input_data,
                send=send_event,
                context=context,
            )
        except Exception:
            logger.exception("Error during agent invocation")
            raise
```
That's it. `handle_agui_input` takes care of:

- detecting whether it's a resume (a `ToolMessage` matches a `pending_tool_call_id`) or a fresh run (last `UserMessage`),
- dropping orphan records on HITL abandon,
- streaming digitalkin events via `send_event` (themselves translated to AG-UI by `AgUiMixin`),
- persisting the paused `RunOutput` in the `paused_runs` collection,
- emitting the `RunFinished` with `awaiting_tool_result` on pause (fresh or cascade).
### 5. End-to-end verification

With an AG-UI frontend that sends at least one test tool in `tools` (e.g. `get_weather(city: string)`):
- Pause: user prompt "show me the weather in Lyon" → the stream contains `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END`, then a `RUN_FINISHED` whose `result.status === "awaiting_tool_result"` and `pending_tool_call_ids` is non-empty.
- Persistence: `await context.storage.read("paused_runs", thread_id)` returns a `PausedRunRecord` with the correct `thread_id` and `pending_tool_call_ids`.
- Resume: send a new `RunAgentInput` with the same `thread_id`, identical `tools`, and `messages` enriched with a `ToolMessage(tool_call_id, content="{\"temp\":14}")` → the agent resumes, emits the final `TEXT_MESSAGE_*`, then a standard `RUN_FINISHED` (without `result.status`).
- Cleanup: after a successful resume, `storage.read("paused_runs", thread_id)` returns `None`.
- Abandon: during a pause, send a new `UserMessage` without a `ToolMessage` → the record is dropped and a fresh run starts.
- Cascade: if the agent calls another frontend tool after a resume, confirm a new `RUN_FINISHED(awaiting_tool_result)` is emitted with the new IDs and the record is re-persisted.
## Public API exported

All these symbols are available from `digitalkin.community.agno`:

| Symbol | Type | Primary use |
|---|---|---|
| `AgnoStreamAdapter` | class | Agno → DigitalKin event conversion (unchanged, except the new `run_paused` handler) |
| `agui_tool_to_external_function` | func | Convert a single `AgUiTool` — advanced use |
| `make_tools_factory` | func | Build the `tools=` factory for `Agent` |
| `AgnoHitlRunner` | class | High-level HITL runner — the one to use in modules |
| `PausedRunStore` | class | `StorageStrategy` wrapper — advanced use |
| `PausedRunRecord` | `BaseModel` | Storage schema — rarely imported directly |
| `HITL_STORAGE_CONFIG` | `dict` | Ready-to-merge fragment for `services_config_params` |
| `PauseInfo` | `dataclass` | Return type of the runner on pause |
| `emit_awaiting_tool_result` | async func | Manual emission of the pause `RunFinished` — advanced use |
## Design notes
Why use `dependencies` as a transport channel for frontend tools? The `tools=` factory receives the `RunContext` at the moment Agno resolves tools for the run (`aresolve_callable_tools`). The `RunContext` carries the dependencies passed to `arun(dependencies=...)`, and they are populated before tool resolution (agno/agent/_run.py:2563). It's therefore the only per-run injection point supported natively by Agno without mutating the `Agent` instance. The actual tool registration still goes through the factory (i.e. the `tools=` parameter), not through `dependencies`, which only conveys the list.

Why is the storage key `thread_id` and not `tool_call_id`? One pause per thread at a time, which is the standard HITL model. If several frontend tools are called in the same run, they are all gathered in the same `RunPausedEvent` and stored under a single record. Nested pauses (a thread with several pauses stacked in parallel) are not supported — this is not an Agno limitation, it's a deliberate simplicity choice.
Why `cache_callables=False` is non-negotiable. `aresolve_callable_tools` caches the factory result by default (agno/utils/callables.py:260-284) — without explicitly disabling it, the first call freezes the tools for the entire lifetime of the `Agent` instance and subsequent runs never see different frontend tools. The factory is designed to be cheap (just a concatenation), so there's no penalty in disabling the cache.

Process-stateless by design. All the paused state lives in `StorageStrategy` (an external gRPC service). Any replica can handle the resume request for the same `thread_id` — no need for sticky routing or in-process caching.
## No breaking change

The `AgnoStreamAdapter` patch is additive: modules that don't use `external_execution=True` see no difference in the streamed events. The new `agui_tools` and `hitl` modules are opt-in — you have to explicitly import them and wire the `AgnoHitlRunner` to enable HITL. No deprecation.
## Migration from a homegrown HITL module

If you had already implemented frontend-tools support with local code (custom `PausedRun` schema, custom converter, resume logic in the trigger), you can now remove:
- The local Pydantic model for the `paused_runs` collection → use `PausedRunRecord` via `HITL_STORAGE_CONFIG`.
- Any local `AgUiTool` → `Function` conversion file → use `make_tools_factory`.
- The `_try_resume` / `_maybe_emit_awaiting` logic in your trigger → use `handle_agui_input`.
- The manual `RunPausedEvent` detection in your agent wrapper → the SDK adapter handles it now.
Concrete example of the refactor on `template-archetype` (before/after):

| Before | After |
|---|---|
| `models/storage.py::PausedRun` (local model) | Removed — replaced by `HITL_STORAGE_CONFIG` |
| `agents/frontend_tools.py` (local converter) | Removed — replaced by `make_tools_factory` |
| `agents/agno_agent.py`, ~250 lines with homemade pause detection | ~60 lines, just a wrapper around `AgnoHitlRunner` |
| `triggers/message_trigger.py`, ~210 lines with `_try_resume` / `_maybe_emit_awaiting` | ~55 lines, just a call to `agent.handle_agui_input(...)` |