0.4.0 → 0.4.1: AG-UI Frontend Tools (HITL)

Summary

The AG-UI protocol lets the frontend declare its own tools in RunAgentInput.tools: tools that the LLM can "call" but whose actual execution happens on the client side (widgets, UI pickers, local API calls, user input prompts…). This release adds native support for these frontend tools in digitalkin.community.agno through a Human-In-The-Loop (HITL) runner that:

  • registers frontend tools as Function(external_execution=True) on the Agno agent,
  • pauses the run when the LLM calls a frontend tool, streams the ToolCallStart/Args/End events to the client, and persists the RunOutput via StorageStrategy,
  • emits an AG-UI RunFinished with result.status="awaiting_tool_result" to tell the frontend that it must execute the tool and send the result back,
  • automatically resumes the run on the next request as soon as a matching ToolMessage arrives in the message history, regardless of which replica handles the request (process-stateless).

No breaking change: everything is additive and opt-in.

What changes

1. AgnoStreamAdapter: RunEvent.run_paused support

Agno has a non-obvious behavior: for tools marked external_execution=True, it emits neither RunEvent.tool_call_started nor RunEvent.tool_call_completed. Instead it yields a RunPausedEvent directly, whose tools field (list[ToolExecution]) lists the pending calls (see agno/models/base.py:2585). Consequence: without dedicated handling, the frontend never sees ToolCallStart/Args/End events for frontend tools; it only receives an orphan RunFinished.

AgnoStreamAdapter now handles RunEvent.run_paused and synthesizes one ToolCallStartedEvent + ToolCallCompletedEvent(content=None) pair per pending tool, reconstructed from RunPausedEvent.tools[*] (tool_call_id, tool_name, tool_args). The downstream AgUiMixin bridge then emits ToolCallStart + ToolCallArgs + ToolCallEnd as usual, but no ToolCallResult: that event is guarded by if result_content:, so content=None suppresses it. This is the correct behavior, since the tool is not executed server-side.

The adapter additionally exposes three properties that consumers can read after streaming to detect the pause:

| Property | Type | Description |
|---|---|---|
| is_paused | bool | True if a RunPausedEvent was seen during the last streaming session |
| paused_tool_executions | list[ToolExecution] | Tools awaiting external execution |
| paused_requirements | list[RunRequirement] | Matching Agno RunRequirement objects |

Important: this fix is additive and benefits all consumers of AgnoStreamAdapter, not just HITL. Any module that uses external_execution=True with the adapter will now see the ToolCall* events correctly.
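To make the synthesis concrete, here is a minimal plain-Python model of it. It is a sketch under stated assumptions, not the SDK code: ToolExecution, PauseAwareAdapterSketch and bridge_to_agui are hypothetical stand-ins, and mapping TOOL_CALL_START/ARGS to the synthesized started event and TOOL_CALL_END to the completed event is an assumption about how the bridge splits its work.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolExecution:
    # Hypothetical stand-in for agno's ToolExecution, reduced to the fields the adapter reads.
    tool_call_id: str
    tool_name: str
    tool_args: dict[str, Any]


@dataclass
class PauseAwareAdapterSketch:
    # Toy model of the run_paused handling; not the SDK's AgnoStreamAdapter.
    is_paused: bool = False
    paused_tool_executions: list[ToolExecution] = field(default_factory=list)

    def handle_run_paused(self, tools: list[ToolExecution]) -> list[tuple[str, dict[str, Any]]]:
        """Record the pause and synthesize one started/completed pair per pending tool."""
        self.is_paused = True
        self.paused_tool_executions = list(tools)
        events: list[tuple[str, dict[str, Any]]] = []
        for tool in tools:
            events.append(
                ("tool_call_started", {"id": tool.tool_call_id, "name": tool.tool_name, "args": tool.tool_args})
            )
            # content=None marks "not executed server-side".
            events.append(("tool_call_completed", {"id": tool.tool_call_id, "content": None}))
        return events


def bridge_to_agui(events: list[tuple[str, dict[str, Any]]]) -> list[str]:
    """Toy bridge: map the synthesized events to AG-UI event types."""
    out: list[str] = []
    for kind, data in events:
        if kind == "tool_call_started":
            out += ["TOOL_CALL_START", "TOOL_CALL_ARGS"]
        elif kind == "tool_call_completed":
            out.append("TOOL_CALL_END")
            result_content = data["content"]
            if result_content:  # None is falsy, so no TOOL_CALL_RESULT is emitted
                out.append("TOOL_CALL_RESULT")
    return out


adapter = PauseAwareAdapterSketch()
events = adapter.handle_run_paused([ToolExecution("call_1", "get_weather", {"city": "Lyon"})])
print(adapter.is_paused, bridge_to_agui(events))
# True ['TOOL_CALL_START', 'TOOL_CALL_ARGS', 'TOOL_CALL_END']
```

The sketch relies on one detail only: because content is None, the truthiness guard keeps the bridge from emitting TOOL_CALL_RESULT for a tool that never ran server-side.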

1b. AgnoStreamAdapter: RunEvent.reasoning_step auto-wrapping

Agno's ReasoningTools toolkit (the think / analyze tools) emits RunEvent.reasoning_step events without the surrounding reasoning_started / reasoning_completed lifecycle that the AG-UI protocol requires. Previously, the adapter forwarded these as orphaned REASONING_MESSAGE_CONTENT events (no REASONING_START before, no REASONING_END after, empty messageId), so the frontend had no way to render them correctly.

The adapter now auto-opens a reasoning sequence when a reasoning_step arrives while _reasoning_active is False:

  1. Closes any active text message (_close_content).
  2. Generates a fresh reasoning_id, sets _reasoning_active = True.
  3. Emits ReasoningStartedEvent (→ REASONING_START + REASONING_MESSAGE_START via AgUiMixin).
  4. Then emits the ReasoningStepEvent content as usual (→ REASONING_MESSAGE_CONTENT).

The reasoning sequence is auto-closed by the next non-reasoning event (_handle_run_content, _handle_tool_call_started, _handle_run_completed) or by flush(), which emit REASONING_MESSAGE_END + REASONING_END.

Result: the frontend now receives a proper lifecycle for tool-based reasoning:

TOOL_CALL_RESULT    (analyze)
REASONING_START                    ← auto-opened
REASONING_MESSAGE_START
REASONING_MESSAGE_CONTENT          ← the step content (no longer orphaned)
REASONING_MESSAGE_END              ← auto-closed by next event
REASONING_END
TEXT_MESSAGE_START

This fix benefits any module using Agno's ReasoningTools (think, analyze) — not just HITL.
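The open/close rules above behave like a small state machine. The sketch below models them with hypothetical method names (on_reasoning_step, on_run_content, flush); it only uses run content as the closing event, whereas the text also lists tool_call_started and run_completed, and it is not the adapter's actual code.

```python
import uuid


class ReasoningWrapSketch:
    """Toy model of reasoning_step auto-wrapping; method names are illustrative, not the SDK's."""

    def __init__(self) -> None:
        self.emitted: list[str] = []
        self.reasoning_id = ""
        self._reasoning_active = False
        self._text_open = False

    def _close_content(self) -> None:
        # Close any open text message before a reasoning sequence starts.
        if self._text_open:
            self.emitted.append("TEXT_MESSAGE_END")
            self._text_open = False

    def _close_reasoning(self) -> None:
        # Auto-close an open reasoning sequence.
        if self._reasoning_active:
            self.emitted += ["REASONING_MESSAGE_END", "REASONING_END"]
            self._reasoning_active = False

    def on_reasoning_step(self, content: str) -> None:
        if not self._reasoning_active:
            # Steps 1-3: close text, new reasoning_id, open the sequence.
            self._close_content()
            self.reasoning_id = str(uuid.uuid4())
            self._reasoning_active = True
            self.emitted += ["REASONING_START", "REASONING_MESSAGE_START"]
        # Step 4: the step content itself.
        self.emitted.append("REASONING_MESSAGE_CONTENT")

    def on_run_content(self, text: str) -> None:
        # A non-reasoning event closes reasoning first.
        self._close_reasoning()
        if not self._text_open:
            self.emitted.append("TEXT_MESSAGE_START")
            self._text_open = True
        self.emitted.append("TEXT_MESSAGE_CONTENT")

    def flush(self) -> None:
        self._close_reasoning()
        self._close_content()


w = ReasoningWrapSketch()
w.on_reasoning_step("The user wants the weather in Lyon.")
w.on_reasoning_step("Call get_weather with city=Lyon.")
w.on_run_content("Here is the forecast.")
w.flush()
```

Consecutive steps share one reasoning sequence; only the first step of a run of steps opens it.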

2. New module community/agno/agui_tools.py

Two helpers to wire frontend tools onto an Agno agent:

from digitalkin.community.agno import agui_tool_to_external_function, make_tools_factory
| Function | Role |
|---|---|
| agui_tool_to_external_function(tool: AgUiTool) -> Function | Wraps an ag_ui.core.types.Tool into an agno.tools.function.Function marked external_execution=True, passing the AG-UI JSON Schema through as parameters. |
| make_tools_factory(base_tools, dependency_key="agui_tools") -> Callable[[RunContext], list] | Builds a factory for the Agent's tools= parameter. On every run, the factory reads run_context.dependencies[dependency_key] (the list of AgUiTool passed via agent.arun(dependencies={...})), converts each entry with agui_tool_to_external_function, and concatenates the result with base_tools. |

⚠️ Critical requirement: the Agent using this factory must be built with cache_callables=False. Otherwise Agno caches the result of the first factory call and subsequent runs will never see new frontend tools (agno/utils/callables.py:260-284).

Why use dependencies as a transport channel? It is a repurposed but deliberate use: the tools factory receives the RunContext, and the dependencies passed to arun(dependencies=...) are already populated on it when tools are resolved (verified at agno/agent/_run.py:2563). It is the only Agno-native mechanism that lets us inject per-run tools without mutating the Agent instance.
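A rough model of what the factory does on each run, assuming AG-UI tools arrive as plain dicts with name, description and parameters keys. RunContextSketch, ExternalFunctionSketch and make_tools_factory_sketch are hypothetical stand-ins for RunContext, Function(external_execution=True) and make_tools_factory; putting base tools first is also an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class RunContextSketch:
    # Hypothetical stand-in for agno's RunContext: only per-run dependencies matter here.
    dependencies: dict[str, Any] | None = None


@dataclass
class ExternalFunctionSketch:
    # Hypothetical stand-in for Function(external_execution=True).
    name: str
    description: str
    parameters: dict[str, Any]
    external_execution: bool = True


def to_external_function(tool: dict[str, Any]) -> ExternalFunctionSketch:
    # The AG-UI JSON Schema is passed through unchanged as `parameters`.
    return ExternalFunctionSketch(
        name=tool["name"],
        description=tool.get("description", ""),
        parameters=tool.get("parameters", {}),
    )


def make_tools_factory_sketch(
    base_tools: list[Any], dependency_key: str = "agui_tools"
) -> Callable[[RunContextSketch], list[Any]]:
    def factory(run_context: RunContextSketch) -> list[Any]:
        # Read this run's frontend tools; a missing key means "no frontend tools".
        agui_tools = (run_context.dependencies or {}).get(dependency_key) or []
        return [*base_tools, *(to_external_function(t) for t in agui_tools)]

    return factory


weather = {
    "name": "get_weather",
    "description": "Ask the browser for the current weather",
    "parameters": {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]},
}
factory = make_tools_factory_sketch(base_tools=["web_search"])
tools = factory(RunContextSketch(dependencies={"agui_tools": [weather]}))
```

Because the factory is called with each run's context, a run without frontend tools simply gets the base tools back.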

3. New module community/agno/hitl.py

The full HITL runtime. Everything is re-exported from digitalkin.community.agno.

| Symbol | Type | Role |
|---|---|---|
| PausedRunRecord | BaseModel | Pydantic schema for the paused_runs storage collection. Holds thread_id, run_id, pending_tool_call_ids, payload (= RunOutput.to_dict()). |
| HITL_STORAGE_CONFIG | dict | Ready-to-merge config fragment for services_config_params["storage"]["config"]; registers {"paused_runs": PausedRunRecord}. |
| PauseInfo | dataclass | Returned by the runner when a run pauses. Holds thread_id, run_id, pending_tool_call_ids. |
| PausedRunStore | class | Typed wrapper around StorageStrategy: save(run_output, thread_id), load(thread_id), delete(thread_id). Handles serialization via RunOutput.to_dict() / RunOutput.from_dict(). |
| AgnoHitlRunner | class | Main runner. Three-level API: run() / continue_paused_run() (low-level), try_resume() (mid-level), handle_agui_input() (all-in-one for triggers). |
| emit_awaiting_tool_result() | async func | Helper that emits the AG-UI RunFinished with result={"status": "awaiting_tool_result", "pending_tool_call_ids": [...]} via context.callbacks.send_message. |
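The storage contract of PausedRunStore can be illustrated with an in-memory stand-in. Everything here is hypothetical: the put/get/pop storage methods are not StorageStrategy's API, the methods are synchronous for brevity (the SDK's storage calls are awaited), and "pending means the tool has no result yet" is an assumed rule for deriving pending_tool_call_ids.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class PausedRunRecordSketch:
    # Mirrors the fields listed for PausedRunRecord; a dataclass instead of Pydantic here.
    thread_id: str
    run_id: str
    pending_tool_call_ids: list[str]
    payload: dict[str, Any]  # stands in for RunOutput.to_dict()


class InMemoryStorage:
    """Hypothetical stand-in for StorageStrategy: collection -> key -> value."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    def put(self, collection: str, key: str, value: Any) -> None:
        self._data.setdefault(collection, {})[key] = value

    def get(self, collection: str, key: str) -> Any:
        return self._data.get(collection, {}).get(key)

    def pop(self, collection: str, key: str) -> None:
        self._data.get(collection, {}).pop(key, None)


class PausedRunStoreSketch:
    """save / load / delete keyed by thread_id (one pause per thread)."""

    COLLECTION = "paused_runs"

    def __init__(self, storage: InMemoryStorage) -> None:
        self.storage = storage

    def save(self, run_output: dict[str, Any], thread_id: str) -> PausedRunRecordSketch:
        # Assumption: a tool is pending while it has no result yet.
        pending = [t["tool_call_id"] for t in run_output["tools"] if t.get("result") is None]
        record = PausedRunRecordSketch(thread_id, run_output["run_id"], pending, run_output)
        self.storage.put(self.COLLECTION, thread_id, record)  # overwrites any older pause
        return record

    def load(self, thread_id: str) -> PausedRunRecordSketch | None:
        return self.storage.get(self.COLLECTION, thread_id)

    def delete(self, thread_id: str) -> None:
        self.storage.pop(self.COLLECTION, thread_id)


store = PausedRunStoreSketch(InMemoryStorage())
run_output = {"run_id": "run-abc", "tools": [{"tool_call_id": "call_1", "tool_name": "get_weather", "result": None}]}
record = store.save(run_output, thread_id="thread-123")
loaded = store.load("thread-123")
store.delete("thread-123")  # what a successful resume does
```

Keying by thread_id is what lets any replica find the pause again from the threadId of the next request.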

The runner supports all five HITL cases:

  • Fresh run: extracts the last UserMessage from input_data.messages, launches agent.arun() with dependencies={"agui_tools": input_data.tools}.
  • Resume: detects a ToolMessage whose tool_call_id matches a stored pending_tool_call_id, loads the paused RunOutput, injects the result directly onto run_output.tools[i].result, and resumes via agent.acontinue_run().
  • Partial resume (guard): if only k of N pending tool results are provided, the runner emits RUN_STARTED followed by RUN_ERROR with error_type="partial_tool_results" and a message listing the missing tool_call_ids. The paused record is preserved so the client can retry with all results. This guard prevents a confusing Agno retry loop.
  • Abandon: if the frontend sends a new UserMessage while a tool is pending, the paused record is dropped and a fresh run starts.
  • Cascade: if the agent calls another frontend tool after a resume, the runner re-persists and emits a new awaiting_tool_result.
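The dispatch between these cases can be sketched as a pure function over the incoming AG-UI messages and the stored pending IDs. This is an illustration, not the runner's code: decide and Decision are hypothetical names, messages use the AG-UI JSON keys shown later on this page (role, toolCallId, content), and the order of checks (tool results first, then a trailing user message) is an assumption. Cascade does not appear because it happens after a resume, when the continued run pauses again.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Decision:
    kind: str  # "fresh" | "resume" | "partial" | "abandon"
    tool_results: dict[str, str] = field(default_factory=dict)
    missing: list[str] = field(default_factory=list)


def decide(messages: list[dict[str, Any]], pending_ids: list[str] | None) -> Decision:
    """Classify an incoming request against the thread's paused record (if any)."""
    if not pending_ids:
        return Decision("fresh")  # nothing is paused on this thread
    results = {
        m["toolCallId"]: m["content"]
        for m in messages
        if m.get("role") == "tool" and m.get("toolCallId") in pending_ids
    }
    if results:
        missing = [i for i in pending_ids if i not in results]
        if missing:
            return Decision("partial", results, missing)  # RUN_ERROR, record kept
        return Decision("resume", results)  # inject results, then acontinue_run
    if messages and messages[-1].get("role") == "user":
        return Decision("abandon")  # drop the record, then start a fresh run
    return Decision("fresh")  # not covered by the text; treated as fresh in this sketch
```

Keeping the decision pure like this makes each case testable without Agno or storage in the loop.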

Agno quirks handled transparently

Two Agno behaviors required workarounds in the runner (invisible to consumers):

RunOutput.to_dict() / from_dict() breaks object identity. After a storage round-trip, run_output.tools[i] and run_output.requirements[i].tool_execution are two different ToolExecution instances with the same data. Agno's handle_tool_call_updates (_tools.py:762) iterates run_response.tools, not requirements. The runner writes tool.result directly onto run_output.tools[i] — not via RunRequirement.set_external_execution_result() — so the injected results are visible to Agno at resume time.
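The identity problem is easy to reproduce with plain dataclasses. ToolExec, Requirement, RunOutputSketch and inject_results are hypothetical stand-ins for Agno's ToolExecution, RunRequirement and RunOutput; the point is only that rebuilding two lists independently from a dict yields two copies, so a write through the requirement never reaches tools.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class ToolExec:
    tool_call_id: str
    result: str | None = None


@dataclass
class Requirement:
    tool_execution: ToolExec


@dataclass
class RunOutputSketch:
    tools: list[ToolExec]
    requirements: list[Requirement]

    def to_dict(self) -> dict[str, Any]:
        return {"tools": [asdict(t) for t in self.tools], "requirements": [asdict(r) for r in self.requirements]}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> RunOutputSketch:
        # Each list is rebuilt independently, so a shared object becomes two copies.
        return cls(
            tools=[ToolExec(**t) for t in data["tools"]],
            requirements=[Requirement(ToolExec(**r["tool_execution"])) for r in data["requirements"]],
        )


def inject_results(run_output: RunOutputSketch, results: dict[str, str]) -> None:
    # Write onto run_output.tools[i], the list the resume path iterates.
    for tool in run_output.tools:
        if tool.tool_call_id in results:
            tool.result = results[tool.tool_call_id]


shared = ToolExec("call_1")
before = RunOutputSketch(tools=[shared], requirements=[Requirement(shared)])
after = RunOutputSketch.from_dict(before.to_dict())

after.requirements[0].tool_execution.result = "set via requirement"
leaked = after.tools[0].result  # None: the write did not reach tools

inject_results(after, {"call_1": '{"temp_c": 14}'})
```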

acontinue_run emits RunContinued, not RunStarted. The AG-UI protocol requires every run to begin with a RUN_STARTED event. Agno's resume path emits RunContinued instead, and the AgnoStreamAdapter has no handler for it. The runner emits a synthetic RunStartedEvent (with the current AG-UI run_id and thread_id) before draining the Agno stream — continue_paused_run accepts an optional run_id parameter for this purpose.
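A minimal asyncio sketch of the workaround, assuming events are dicts with a type key: the wrapper yields a RUN_STARTED carrying the current AG-UI run_id and thread_id, then forwards the resumed stream. with_synthetic_run_started and fake_resume_stream are hypothetical, and dropping RunContinued only models the fact that the adapter has no handler for it.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any


async def with_synthetic_run_started(
    stream: AsyncIterator[dict[str, Any]], run_id: str, thread_id: str
) -> AsyncIterator[dict[str, Any]]:
    """Prefix a resumed stream with RUN_STARTED for the current AG-UI run."""
    yield {"type": "RUN_STARTED", "runId": run_id, "threadId": thread_id}
    async for event in stream:
        if event["type"] == "RunContinued":
            continue  # no downstream handler, so it is dropped here
        yield event


async def fake_resume_stream() -> AsyncIterator[dict[str, Any]]:
    # Stand-in for what the resume path would produce.
    for event in ({"type": "RunContinued"}, {"type": "TEXT_MESSAGE_CONTENT", "delta": "14 C, cloudy"}):
        yield event


async def collect() -> list[dict[str, Any]]:
    return [e async for e in with_synthetic_run_started(fake_resume_stream(), "run-def", "thread-123")]


events = asyncio.run(collect())
print([e["type"] for e in events])  # ['RUN_STARTED', 'TEXT_MESSAGE_CONTENT']
```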

Architecture

Front (AG-UI)         → RunAgentInput(tools=[...], messages=[...])
    ↓
Trigger.handle        → AgnoHitlRunner.handle_agui_input(input_data, send, context)
    ↓
  ┌─ Resume? (a ToolMessage matches a pending_tool_call_id)
  │     ↓ yes                          ↓ no
  │  continue_paused_run          run(message, agui_tools)
  │     ↓                             ↓
  │  Agno.acontinue_run           Agno.arun(dependencies={"agui_tools": [...]})
  │     ↓                             ↓
  │  factory resolves tools       factory resolves base_tools + external tools
  │     ↓                             ↓
  │  AgnoStreamAdapter drains the stream
  │  ├── normal events            → send() → AgUiMixin → front
  │  └── RunPausedEvent           → synthesize ToolCallStart/End + is_paused=True
  │     ↓
  │  if adapter.is_paused:
  │  PausedRunStore.save(RunOutput, thread_id) via StorageStrategy
  │     ↓
  └─ emit_awaiting_tool_result(context, ...)
                           → AG-UI RunFinished(result.status="awaiting_tool_result")

Frontend ↔ backend protocol (pause / resume)

Pause signal: the backend emits an AG-UI RunFinished with a non-empty result:

{
  "type": "RUN_FINISHED",
  "threadId": "thread-123",
  "runId": "run-abc",
  "result": {
    "status": "awaiting_tool_result",
    "pending_tool_call_ids": ["toolu_01AnKEg33agtt..."]
  }
}
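On the client side, detecting this signal reduces to checking type and result.status. The helper below is a hypothetical Python sketch (a real frontend would do the same in its own language); a RUN_FINISHED without that status is treated as a normal end of run.

```python
import json
from typing import Any


def pending_tool_call_ids(event: dict[str, Any]) -> list:
    """Return the pending tool_call_ids if the event is the HITL pause signal, else []."""
    if event.get("type") != "RUN_FINISHED":
        return []
    result = event.get("result")
    if not isinstance(result, dict) or result.get("status") != "awaiting_tool_result":
        return []  # a normal RUN_FINISHED: the run is complete
    return list(result.get("pending_tool_call_ids", []))


raw = (
    '{"type": "RUN_FINISHED", "threadId": "thread-123", "runId": "run-abc", '
    '"result": {"status": "awaiting_tool_result", "pending_tool_call_ids": ["call_1"]}}'
)
print(pending_tool_call_ids(json.loads(raw)))  # ['call_1']
```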

Resume: the frontend sends a new RunAgentInput with:

  • the same threadId (the resume key on the backend),
  • tools = the same list as the previous turn (the schemas must stay declared),
  • messages = history + one ToolMessage per pending tool_call_id:
{
  "id": "uuid-front",
  "role": "tool",
  "toolCallId": "toolu_01AnKEg33agtt...",
  "content": "{\"temp_c\": 14, \"conditions\": \"cloudy\"}"
}

The content must be a string (JSON-stringified if structured) — that's the standard AG-UI / OpenAI constraint on ToolMessage.
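Building the resume payload can be sketched as follows. build_resume_messages is a hypothetical helper: it stringifies structured results with json.dumps, uses uuid4 for message IDs (an assumption), and refuses to send a subset of the pending results, since the backend would answer a partial set with a partial_tool_results RUN_ERROR.

```python
from __future__ import annotations

import json
import uuid
from typing import Any


def build_resume_messages(
    history: list[dict[str, Any]],
    pending_ids: list[str],
    results: dict[str, Any],
) -> list[dict[str, Any]]:
    """Append one ToolMessage per pending tool_call_id; content is always a string."""
    missing = [i for i in pending_ids if i not in results]
    if missing:
        # Sending only some results would trigger the partial_tool_results error.
        raise ValueError(f"missing results for: {missing}")
    tool_messages = [
        {
            "id": str(uuid.uuid4()),
            "role": "tool",
            "toolCallId": call_id,
            "content": results[call_id] if isinstance(results[call_id], str) else json.dumps(results[call_id]),
        }
        for call_id in pending_ids
    ]
    return [*history, *tool_messages]


history = [{"id": "u1", "role": "user", "content": "show me the weather in Lyon"}]
messages = build_resume_messages(history, ["call_1"], {"call_1": {"temp_c": 14, "conditions": "cloudy"}})
```

The history passed in is whatever the frontend already holds for the thread; the helper only appends to it.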

Abandon: if the frontend sends a new UserMessage as the last entry (without a ToolMessage), the backend drops the paused record and starts a normal run. No error is raised.

Cascade: if the LLM calls another frontend tool after a resume, a new TOOL_CALL_* + RUN_FINISHED(awaiting_tool_result) sequence is emitted with new pending_tool_call_ids, and the frontend goes through the loop again.

Tutorial: integrating into an archetype

Five steps to wire HITL into an Agno module (example inspired by template-archetype).

1. Register the storage collection

In the Module that inherits from ArchetypeModule, add HITL_STORAGE_CONFIG to the storage collections mapping:

# src/my_module/module.py
from typing import ClassVar

from digitalkin.community.agno import HITL_STORAGE_CONFIG
from digitalkin.modules.archetype_module import ArchetypeModule

from my_module.models import AgnoSession, ChatHistory, MyInput, MyOutput, MySecret, MySetup


class MyModule(ArchetypeModule[MyInput, MyOutput, MySetup, MySecret]):
    name = "MyModule"
    # ...

    services_config_params: ClassVar[dict] = {
        "storage": {
            "config": {
                "agno_sessions": AgnoSession,
                "chat_history": ChatHistory,
                **HITL_STORAGE_CONFIG,  # registers "paused_runs" -> PausedRunRecord
            },
            "client_config": client_config,  # your existing storage client config
        },
        # ... other services
    }

HITL_STORAGE_CONFIG registers the paused_runs collection with the PausedRunRecord schema.

2. Inject the tools factory into the Agno agent

Wherever you build the Agent, replace tools=[...] with tools=make_tools_factory(...) and add cache_callables=False:

# src/my_module/agents/helpers.py
from agno.agent import Agent
from agno.models.litellm import LiteLLM
from digitalkin.community.agno import make_tools_factory


def create_my_agent(config, storage, cost, *, session_id=None) -> Agent:
    # AsyncDuckDuckGoTools and MyCustomToolkit stand for your own toolkits (imports omitted).
    base_tools = [AsyncDuckDuckGoTools(), MyCustomToolkit()]

    return Agent(
        model=LiteLLM(...),
        tools=make_tools_factory(base_tools),
        cache_callables=False,   # CRITICAL: otherwise the factory is cached
        instructions=config.instructions,
        # ... rest of the config
    )

base_tools stay available at all times. Frontend tools sent by the client are dynamically added on each run.

3. Instantiate the AgnoHitlRunner

In the module's agent wrapper, keep a reference to the runner and expose a handle_agui_input method that delegates to it:

# src/my_module/agents/my_agent.py
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from digitalkin.community.agno import AgnoHitlRunner, PauseInfo

from my_module.agents.helpers import create_my_agent

if TYPE_CHECKING:
    from collections.abc import Callable, Coroutine

    from agno.agent import Agent
    from digitalkin.models.events import BaseAgentRunEvent
    from digitalkin.models.module import ModuleContext
    from digitalkin.services.storage import StorageStrategy


class MyAgent:
    def __init__(self, agent: Agent, storage: StorageStrategy) -> None:
        self.agent = agent
        self.runner = AgnoHitlRunner(agent=agent, storage=storage)

    @classmethod
    async def create(cls, config, storage, cost) -> "MyAgent":
        agent = create_my_agent(config, storage, cost, session_id=config.session.mission_id)
        return cls(agent=agent, storage=storage)

    async def handle_agui_input(
        self,
        input_data: Any,
        *,
        send: Callable[[BaseAgentRunEvent], Coroutine[Any, Any, None]],
        context: ModuleContext,
    ) -> PauseInfo | None:
        return await self.runner.handle_agui_input(
            input_data=input_data,
            send=send,
            context=context,
        )

4. Wire the trigger

The TriggerHandler listening on the agui_stream protocol becomes minimal — all logic lives in the runner:

# src/my_module/triggers/message_trigger.py
import logging

from digitalkin.models.events import BaseAgentRunEvent
from digitalkin.models.module import ModuleContext
from digitalkin.models.module.ag_ui import AgUiOutput
from digitalkin.modules.trigger_handler import TriggerHandler

from my_module.models import AgUiStreamInput, MySetup
from my_module.module import MyModule

logger = logging.getLogger(__name__)


@MyModule.register
class MessageTrigger(TriggerHandler):
    protocol = "agui_stream"
    input_format = AgUiStreamInput
    output_format = AgUiOutput

    async def handle(
        self,
        input_data: AgUiStreamInput,
        setup_data: MySetup,  # noqa: ARG002
        context: ModuleContext,
    ) -> None:
        agent = context.state.my_agent

        async def send_event(event: BaseAgentRunEvent) -> None:
            await self.send_message(context, event)

        try:
            await agent.handle_agui_input(
                input_data=input_data,
                send=send_event,
                context=context,
            )
        except Exception:
            logger.exception("Error during agent invocation")
            raise

That's it. handle_agui_input takes care of:

  • detecting whether it's a resume (a ToolMessage matches a pending_tool_call_id) or a fresh run (last UserMessage),
  • dropping orphan records on HITL abandon,
  • streaming DigitalKin events through send_event (AgUiMixin then translates them to AG-UI),
  • persisting the paused RunOutput in the paused_runs collection,
  • emitting the RunFinished with awaiting_tool_result on pause (fresh or cascade).

5. End-to-end verification

With an AG-UI frontend that declares at least one test tool in tools (e.g. get_weather(city: string)), check the following:

  • Pause: user prompt "show me the weather in Lyon" → the stream contains TOOL_CALL_START, TOOL_CALL_ARGS, TOOL_CALL_END, then a RUN_FINISHED whose result.status === "awaiting_tool_result" and pending_tool_call_ids is non-empty.
  • Persistence: await context.storage.read("paused_runs", thread_id) returns a PausedRunRecord with the correct thread_id and pending_tool_call_ids.
  • Resume: send a new RunAgentInput with the same thread_id, identical tools, and messages enriched with a ToolMessage(tool_call_id, content="{\"temp\":14}") → the agent resumes, emits the final TEXT_MESSAGE_*, then a standard RUN_FINISHED (without result.status).
  • Cleanup: after a successful resume, storage.read("paused_runs", thread_id) returns None.
  • Abandon: during a pause, send a new UserMessage without a ToolMessage → the record is dropped and a fresh run starts.
  • Cascade: if the agent calls another frontend tool after a resume, confirm a new RUN_FINISHED(awaiting_tool_result) is emitted with the new IDs and the record is re-persisted.
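The pause checks can be automated against a captured event list. check_pause_stream is a hypothetical test helper, and the event dicts assume the AG-UI JSON field names (toolCallId, toolCallName, delta); it verifies ordering, the awaiting_tool_result status, and that every pending ID was announced by a TOOL_CALL_START.

```python
from __future__ import annotations

from typing import Any


def check_pause_stream(events: list[dict[str, Any]]) -> list[str]:
    """Assert the pause checklist on captured AG-UI events; return the pending IDs."""
    types = [e["type"] for e in events]
    for required in ("TOOL_CALL_START", "TOOL_CALL_ARGS", "TOOL_CALL_END"):
        assert required in types, f"missing {required}"
    assert types.index("TOOL_CALL_START") < types.index("TOOL_CALL_ARGS") < types.index("TOOL_CALL_END")
    final = events[-1]
    assert final["type"] == "RUN_FINISHED", "stream must end with RUN_FINISHED"
    result = final.get("result") or {}
    assert result.get("status") == "awaiting_tool_result"
    pending = list(result.get("pending_tool_call_ids") or [])
    assert pending, "pending_tool_call_ids must be non-empty"
    started = {e["toolCallId"] for e in events if e["type"] == "TOOL_CALL_START"}
    assert set(pending) <= started, "every pending ID should have a TOOL_CALL_START"
    return pending


captured = [
    {"type": "RUN_STARTED", "threadId": "thread-123", "runId": "run-abc"},
    {"type": "TOOL_CALL_START", "toolCallId": "call_1", "toolCallName": "get_weather"},
    {"type": "TOOL_CALL_ARGS", "toolCallId": "call_1", "delta": '{"city": "Lyon"}'},
    {"type": "TOOL_CALL_END", "toolCallId": "call_1"},
    {
        "type": "RUN_FINISHED",
        "threadId": "thread-123",
        "runId": "run-abc",
        "result": {"status": "awaiting_tool_result", "pending_tool_call_ids": ["call_1"]},
    },
]
print(check_pause_stream(captured))  # ['call_1']
```

The same helper can be reused for the cascade check on the stream that follows a resume.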

Public API exported

All these symbols are available from digitalkin.community.agno:

| Symbol | Type | Primary use |
|---|---|---|
| AgnoStreamAdapter | class | Agno → DigitalKin event conversion (adds run_paused handling, the is_paused / paused_* properties, and reasoning_step auto-wrapping) |
| agui_tool_to_external_function | func | Convert a single AgUiTool (advanced use) |
| make_tools_factory | func | Build the tools= factory for Agent |
| AgnoHitlRunner | class | High-level HITL runner, the one to use in modules |
| PausedRunStore | class | StorageStrategy wrapper (advanced use) |
| PausedRunRecord | BaseModel | Storage schema, rarely imported directly |
| HITL_STORAGE_CONFIG | dict | Ready-to-merge fragment for services_config_params |
| PauseInfo | dataclass | Return type of the runner on pause |
| emit_awaiting_tool_result | async func | Manual emission of the pause RunFinished (advanced use) |

Design notes

Why use dependencies as a transport channel for frontend tools? The tools= factory receives the RunContext at the moment Agno resolves tools for the run (aresolve_callable_tools). The RunContext carries the dependencies passed to arun(dependencies=...), and they are populated before tool resolution (agno/agent/_run.py:2563). It's therefore the only per-run injection point supported natively by Agno without mutating the Agent instance. The actual tool registration still goes through the factory (i.e. the tools= parameter), not through dependencies, which only conveys the list.

Why is the storage key thread_id and not tool_call_id? The standard HITL model allows one pause per thread at a time. If several frontend tools are called in the same run, they all arrive in the same RunPausedEvent and are stored under a single record. Several independent pauses on the same thread at once are not supported; this is a deliberate simplicity choice, not an Agno limitation.

Why cache_callables=False is non-negotiable. aresolve_callable_tools caches the factory result by default (agno/utils/callables.py:260-284) — without explicitly disabling it, the first call freezes the tools for the entire lifetime of the Agent instance and subsequent runs never see different frontend tools. The factory is designed to be cheap (just a concatenation), so there's no penalty in disabling the cache.
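The failure mode is easy to see with a toy resolver. ToolResolverSketch is hypothetical and only mimics the caching behavior described above, not aresolve_callable_tools itself: with caching on, the second run still gets the first run's frontend tools.

```python
from __future__ import annotations

from typing import Any, Callable


class ToolResolverSketch:
    """Toy per-Agent tool resolution with optional caching of the factory result."""

    def __init__(self, factory: Callable[[dict[str, Any]], list[str]], cache_callables: bool) -> None:
        self.factory = factory
        self.cache_callables = cache_callables
        self._cached: list[str] | None = None

    def resolve(self, dependencies: dict[str, Any]) -> list[str]:
        if self.cache_callables and self._cached is not None:
            return self._cached  # first result reused for every later run
        tools = self.factory(dependencies)
        if self.cache_callables:
            self._cached = tools
        return tools


def factory(dependencies: dict[str, Any]) -> list[str]:
    # Cheap: base tools plus this run's frontend tools.
    return ["web_search", *dependencies.get("agui_tools", [])]


cached = ToolResolverSketch(factory, cache_callables=True)
uncached = ToolResolverSketch(factory, cache_callables=False)
for resolver in (cached, uncached):
    resolver.resolve({"agui_tools": ["get_weather"]})  # first run

print(cached.resolve({"agui_tools": ["pick_date"]}))    # ['web_search', 'get_weather'] (stale)
print(uncached.resolve({"agui_tools": ["pick_date"]}))  # ['web_search', 'pick_date']
```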

Process-stateless by design. All the paused state lives in StorageStrategy (an external gRPC service). Any replica can handle the resume request for the same thread_id — no need for sticky routing or in-process caching.

No breaking change

The AgnoStreamAdapter patch is additive: modules that use neither external_execution=True nor Agno's ReasoningTools see no difference in the streamed events (ReasoningTools users now get a well-formed reasoning lifecycle instead of orphaned content events). The new agui_tools and hitl modules are opt-in: you have to import them explicitly and wire the AgnoHitlRunner to enable HITL. No deprecation.

Migration from a homegrown HITL module

If you had already implemented frontend-tools support with local code (custom PausedRun schema, custom converter, resume logic in the trigger), you can now remove:

  • The local Pydantic model for the paused_runs collection → use PausedRunRecord via HITL_STORAGE_CONFIG.
  • Any local AgUiTool-to-Function conversion file → use make_tools_factory.
  • The _try_resume / _maybe_emit_awaiting logic in your trigger → use handle_agui_input.
  • The manual RunPausedEvent detection in your agent wrapper → the SDK adapter handles it now.

Concrete example of the refactor on template-archetype (before/after):

| Before | After |
|---|---|
| models/storage.py::PausedRun (local model) | Removed, replaced by HITL_STORAGE_CONFIG |
| agents/frontend_tools.py (local converter) | Removed, replaced by make_tools_factory |
| agents/agno_agent.py: ~250 lines with homemade pause detection | ~60 lines, just a wrapper around AgnoHitlRunner |
| triggers/message_trigger.py: ~210 lines with _try_resume / _maybe_emit_awaiting | ~55 lines, just a call to agent.handle_agui_input(...) |