DigitalKin Python SDK

The DigitalKin Python SDK lets you build and manage modules within the DigitalKin agentic mesh. Create custom Tools and Archetypes that communicate over gRPC, register with a service mesh, and scale independently.

Features

  • Async-native gRPC module system — every module is a gRPC server built on grpcio with full async support
  • Typed module contracts — Pydantic models for Input, Output, Setup, and Secret schemas with protocol-based trigger dispatch
  • Module-to-module communication — tools and archetypes discover each other via the registry and exchange requests over gRPC
  • Tool resolution — archetypes dynamically resolve and invoke tool modules at runtime
  • Admission queue & backpressure — built-in request admission with configurable concurrency limits
  • Healthcheck protocols — automatic ping, services, and status healthcheck triggers registered on every module
  • Profiling — optional [profiling] extra with asyncio-inspector, pyinstrument, viztracer, and yappi
  • Batched history writes — efficient storage writes for conversation history
  • TaskIQ integration — optional distributed task execution backed by RabbitMQ and Redis ([taskiq] extra)
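
To make the admission queue and backpressure bullet concrete, here is a minimal sketch of the general technique using only asyncio. It is not the SDK's implementation: the class name AdmissionQueue and the parameters max_concurrent and max_waiting are hypothetical, chosen for illustration. At most max_concurrent handlers run at once, a bounded number may wait, and further requests are rejected immediately.

```python
import asyncio


class AdmissionQueue:
    """Hypothetical admission gate (illustration only, not the SDK's class)."""

    def __init__(self, max_concurrent: int, max_waiting: int) -> None:
        self._slots = asyncio.Semaphore(max_concurrent)
        self._max_waiting = max_waiting
        self._waiting = 0

    async def run(self, handler, *args):
        # Backpressure: reject right away when all slots are busy
        # and the waiting room is already full.
        if self._slots.locked() and self._waiting >= self._max_waiting:
            raise RuntimeError("admission queue full")
        self._waiting += 1
        try:
            await self._slots.acquire()
        finally:
            self._waiting -= 1
        try:
            return await handler(*args)
        finally:
            self._slots.release()


async def demo():
    queue = AdmissionQueue(max_concurrent=2, max_waiting=1)
    running = 0
    peak = 0

    async def handler(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulate work
        running -= 1
        return i

    # Five requests arrive together: two run, one waits, two are rejected.
    results = await asyncio.gather(
        *(queue.run(handler, i) for i in range(5)), return_exceptions=True
    )
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Rejecting early instead of queueing without bound keeps latency predictable under load; callers can retry or route the request elsewhere.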

Installation

# With uv (recommended)
uv add digitalkin

# With pip
pip install digitalkin

Optional extras:

# Distributed task execution (RabbitMQ + Redis)
uv add "digitalkin[taskiq]"

# Async profiling tools
uv add "digitalkin[profiling]"

Quick Start

1. Define your models

from pydantic import BaseModel
from digitalkin.models.module.base_types import DataModel, DataTrigger


class MessageInput(DataTrigger):
    protocol: str = "message"
    content: str


class InputModel(DataModel[MessageInput]):
    root: MessageInput


class MessageOutput(DataTrigger):
    protocol: str = "message"
    reply: str


class OutputModel(DataModel[MessageOutput]):
    root: MessageOutput

2. Create a module and trigger

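from pydantic import BaseModel

from digitalkin import ArchetypeModule, ModuleContext, TriggerHandler
from digitalkin.models.module.setup_types import SetupModel

# InputModel, OutputModel, and MessageOutput are the models defined in step 1.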


class MyArchetype(ArchetypeModule[InputModel, OutputModel, SetupModel, BaseModel]):
    async def initialize(self, context: ModuleContext, setup_data: SetupModel) -> None:
        pass

    async def cleanup(self, context: ModuleContext) -> None:
        pass


@MyArchetype.register
class MessageTrigger(TriggerHandler[InputModel, SetupModel, OutputModel]):
    protocol = "message"
    input_format = InputModel
    output_format = OutputModel

    def __init__(self, context: ModuleContext) -> None:
        super().__init__(context)

    async def handle(
        self,
        input_data: InputModel,
        setup_data: SetupModel,
        context: ModuleContext,
    ) -> None:
        output = OutputModel(root=MessageOutput(reply=f"Echo: {input_data.root.content}"))
        await self.send_message(context, output)
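
The register decorator and the protocol attribute suggest a registry keyed by protocol name. The sketch below illustrates that general pattern with the standard library only. It is an assumption about the idea, not the SDK's internals: MiniModule, Message, and EchoHandler are hypothetical names, and the real TriggerHandler takes a context and sends its output rather than returning it.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical input carrying the protocol name used for routing."""

    protocol: str
    content: str


class MiniModule:
    """Hypothetical stand-in for a module (not the SDK's ArchetypeModule)."""

    handlers: dict = {}

    @classmethod
    def register(cls, handler_cls):
        # Key each handler class by its `protocol` attribute.
        cls.handlers[handler_cls.protocol] = handler_cls
        return handler_cls

    @classmethod
    async def dispatch(cls, message: Message) -> str:
        # Route the message to the handler registered for its protocol.
        handler_cls = cls.handlers.get(message.protocol)
        if handler_cls is None:
            raise LookupError(f"no handler for protocol {message.protocol!r}")
        return await handler_cls().handle(message)


@MiniModule.register
class EchoHandler:
    protocol = "message"

    async def handle(self, message: Message) -> str:
        return f"Echo: {message.content}"


if __name__ == "__main__":
    print(asyncio.run(MiniModule.dispatch(Message("message", "hello"))))
```

Keying handlers by a string protocol keeps the module class unaware of individual triggers: adding a new trigger means registering one more class.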

3. Run the server

import asyncio

from digitalkin.grpc_servers.module_server import ModuleServer


async def main() -> None:
    # MyArchetype is the module class defined in step 2.
    server = ModuleServer(MyArchetype)
    await server.start_async()
    await server.await_termination()


if __name__ == "__main__":
    asyncio.run(main())

TaskIQ with RabbitMQ

TaskIQ integration lets a module scale CPU-heavy workloads by distributing requests to stateless worker instances.

  • Decoupled Scalability: RabbitMQ brokers messages, letting producers and consumers scale independently.
  • Reliability: Durable queues, acknowledgements, and dead-lettering ensure tasks aren't lost.
  • Concurrency Control: TaskIQ's worker pool manages parallel execution without custom schedulers.
  • Flexibility: Built-in retries, exponential backoff, and a Redis result backend for resilient workflows.
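
For readers unfamiliar with retries under exponential backoff, the sketch below shows the idea in plain asyncio. It does not use TaskIQ's API, and the function names retry and backoff_delays, along with their parameters, are hypothetical. Each failed attempt waits longer than the previous one before trying again.

```python
import asyncio


def backoff_delays(base: float, factor: float, attempts: int) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ..."""
    return [base * factor**i for i in range(attempts)]


async def retry(func, *, attempts: int = 4, base: float = 0.01, factor: float = 2.0):
    """Call `func` up to `attempts` times, sleeping longer after each failure."""
    delays = backoff_delays(base, factor, attempts - 1)
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            # Out of attempts: surface the last error to the caller.
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(delays[attempt])
```

A production setup would typically also cap the maximum delay and add random jitter so that many workers do not retry in lockstep.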

To enable the RabbitMQ stream plugin and start a TaskIQ worker:

sudo rabbitmq-plugins enable rabbitmq_stream
task start-taskiq

Development

Prerequisites

  • Python 3.10+
  • uv — modern Python package management
  • Task — task runner

Setup

git clone --recurse-submodules https://github.com/DigitalKin-ai/digitalkin.git
cd digitalkin

task setup-dev
source .venv/bin/activate

Common Tasks

task linter               # Format + lint (ruff) + type check (mypy)
task check                # Linter + mypy + tests
task run-tests            # Run pytest via Docker
task build-package        # Build distribution
task bump-version -- patch|minor|major

task docs-serve           # Serve docs locally (mkdocs)
task docs-build           # Build docs

task generate-certificates  # Generate mTLS certs for gRPC
task start-taskiq           # Start TaskIQ worker

task clean                # Remove build artifacts + __pycache__
task clean-all            # Above + remove .venv

Publishing Process

  1. Update the code and commit your changes, following the conventional branch and commit standards.
  2. Run task bump-version -- major|minor|patch to bump and commit the new version.
  3. Use the GitHub "Create Release" workflow to publish the new version.
  4. The workflow then publishes automatically to Test PyPI and PyPI.
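
As a reminder of what the major, minor, and patch arguments mean, here is a small sketch of a semantic-version bump. It assumes the project follows plain MAJOR.MINOR.PATCH versioning; the function bump_version is hypothetical and is not what the task itself runs.

```python
def bump_version(version: str, part: str) -> str:
    """Return `version` with the given part incremented and lower parts reset."""
    major, minor, patch = (int(p) for p in version.split("."))
    if part == "major":
        return f"{major + 1}.0.0"
    if part == "minor":
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")
```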

License

This project is licensed under the terms specified in the LICENSE file.


For more information, visit our Documentation or report issues on our Issues page.