SDK Flow: Server to Module Execution

Stateless Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                    ModuleServer (Long-lived)                         │
│  - Listens to gRPC continuously                                      │
│  - Stores NO state between requests                                  │
│  - Each request = new module instance                                │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    Per-Request (Ephemeral)                           │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐              │
│  │   job_id     │  │   Module     │  │ TaskSession  │              │
│  │   (uuid)     │  │   Instance   │  │   + Queue    │              │
│  └──────────────┘  └──────────────┘  └──────────────┘              │
│         │                 │                  │                       │
│         └────────────────┼──────────────────┘                       │
│                          │                                           │
│                     Destroyed after execution                       │
└─────────────────────────────────────────────────────────────────────┘
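The stateless pattern above can be sketched as follows. This is a minimal illustration, not the SDK's actual classes: `EchoModule` and `StatelessServer` are hypothetical stand-ins for a module and `ModuleServer`, showing that the server holds only the module class while the `job_id`, module instance, and outputs are created and destroyed per request.

```python
import asyncio
import uuid


class EchoModule:
    """Illustrative module: each request gets a brand-new instance."""

    def __init__(self) -> None:
        self.status = "CREATED"

    async def start(self, payload: str, callback) -> None:
        self.status = "RUNNING"
        await callback(f"echo:{payload}")
        self.status = "STOPPED"


class StatelessServer:
    """Holds only the module class; all per-request state is request-scoped."""

    def __init__(self, module_cls) -> None:
        self.module_cls = module_cls  # a class, never a shared instance

    async def handle_request(self, payload: str) -> list[str]:
        job_id = str(uuid.uuid4())   # fresh id for every request
        module = self.module_cls()   # fresh module instance for every request
        outputs: list[str] = []

        async def callback(msg: str) -> None:
            outputs.append(f"{job_id}:{msg}")

        await module.start(payload, callback)
        return outputs               # job_id, module, outputs all die here


server = StatelessServer(EchoModule)
out_a = asyncio.run(server.handle_request("a"))
out_b = asyncio.run(server.handle_request("b"))
```

Because nothing outlives `handle_request`, two calls never share state: the `job_id` prefixes of `out_a` and `out_b` differ even for identical payloads.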

Flow Diagram

flowchart TB
    subgraph Client["Client"]
        C1[ConfigSetupModuleRequest]
        C2[StartModuleRequest]
        C3[Receive Stream Response]
    end

    subgraph Server["ModuleServer - Stateless Long-lived"]
        S1[gRPC Listener]
        S2[ModuleServicer]
    end

    subgraph ConfigSetupFlow["ConfigSetupModule Flow"]
        CS1[Parse request.content → ConfigSetupModel]
        CS2[Parse setup_version → SetupModel]
        CS3[JobManager.create_config_setup_instance_job]
        CS4[ModuleFactory.create_module_instance]
        CS5[BaseModule.__init__]
        CS6[Create ModuleContext + Services]
        CS7[TaskSession with Queue]
        CS8[module.start_config_setup]
        CS9["asyncio.gather:<br/>_resolve_tools || run_config_setup"]
        CS10[callback → Queue → Response]
    end

    subgraph StartModuleFlow["StartModule Flow"]
        SM1[Parse input → InputModel]
        SM2[Load SetupModel from storage]
        SM3[JobManager.create_module_instance_job]
        SM4[ModuleFactory.create_module_instance]
        SM5[BaseModule.__init__]
        SM6[Create ModuleContext + Services]
        SM7[LocalTaskManager.create_task]
        SM8[TaskSession with Queue + Signal Service]
    end

    subgraph TaskExecution["TaskExecutor - 2 Concurrent Tasks"]
        TE1[main_task:<br/>module.start]
        TE3[signal_task:<br/>Listen stop/cancel via TaskManagerStrategy]
    end

    subgraph ModuleStart["module.start Lifecycle"]
        MS1[Build ToolCache from SetupModel]
        MS2[Set context.tool_cache]
        MS3[Send ModuleStartInfoOutput]
        MS4[await initialize]
        MS5[Init TriggerHandlers]
        MS6[await run]
    end

    subgraph ModuleRun["module.run - Protocol Dispatch"]
        MR1[Validate InputModel]
        MR2[Get protocol from input.root.protocol]
        MR3[Lookup TriggerHandler by protocol]
        MR4[handler.handle input, setup, context]
        MR5[callback → send_message]
    end

    subgraph ModuleStop["module.stop Cleanup"]
        MST1[status = STOPPING]
        MST2[await cleanup]
        MST3[Send EndOfStreamOutput]
        MST4[status = STOPPED]
    end

    subgraph Streaming["Queue-Based Streaming"]
        Q1[asyncio.Queue maxsize=1000]
        Q2[add_to_queue job_id, output]
        Q3[generate_stream_consumer]
        Q4[yield StartModuleResponse]
    end

    %% Client to Server
    C1 --> S1
    C2 --> S1
    S1 --> S2

    %% ConfigSetup Flow
    S2 -->|ConfigSetupModule RPC| CS1
    CS1 --> CS2
    CS2 --> CS3
    CS3 --> CS4
    CS4 --> CS5
    CS5 --> CS6
    CS6 --> CS7
    CS7 --> CS8
    CS8 --> CS9
    CS9 --> CS10
    CS10 -->|Response| C3

    %% StartModule Flow
    S2 -->|StartModule RPC| SM1
    SM1 --> SM2
    SM2 --> SM3
    SM3 --> SM4
    SM4 --> SM5
    SM5 --> SM6
    SM6 --> SM7
    SM7 --> SM8
    SM8 --> TaskExecution

    %% Task Execution
    TE1 --> ModuleStart
    TE3 -.->|Control| SM8

    %% Module Start
    MS1 --> MS2
    MS2 --> MS3
    MS3 --> MS4
    MS4 --> MS5
    MS5 --> MS6
    MS6 --> ModuleRun

    %% Module Run
    MR1 --> MR2
    MR2 --> MR3
    MR3 --> MR4
    MR4 --> MR5
    MR5 --> Q2

    %% Module Stop
    ModuleRun --> ModuleStop
    MST3 --> Q2

    %% Streaming
    Q2 --> Q1
    Q1 --> Q3
    Q3 --> Q4
    Q4 --> C3

Two Main Flows

| Step | ConfigSetupModule | StartModule |
|------|-------------------|-------------|
| 1 | Parse ConfigSetupModel | Parse InputModel |
| 2 | Parse SetupModel (config fields) | Load SetupModel from storage |
| 3 | Create Module Instance | Create Module Instance |
| 4 | start_config_setup() | create_task() with 2 concurrent tasks |
| 5 | _resolve_tools() + run_config_setup() in parallel | module.start() → run() → TriggerHandler |
| 6 | Return updated SetupModel | Stream outputs via Queue |

ConfigSetupModule Flow Detail

Client sends ConfigSetupModuleRequest
    │
    ▼
ModuleServicer.ConfigSetupModule()
    │
    ├─ Parse request.content → ConfigSetupModel
    ├─ Parse request.setup_version.content → SetupModel (config_fields=True)
    │
    ▼
JobManager.create_config_setup_instance_job()
    │
    ├─ job_id = uuid.uuid4()
    │
    ├─ module = ModuleFactory.create_module_instance()
    │       │
    │       └─ BaseModule.__init__()
    │           ├─ status = CREATED
    │           └─ context = ModuleContext(services, session, callbacks)
    │
    ├─ TaskSession(job_id, queue)
    │
    └─ module.start_config_setup(config_setup_data, callback)
            │
            ├─ status = RUNNING
            │
            └─ asyncio.gather(
                   _resolve_tools(config_setup_data),
                   run_config_setup(context, config_setup_data)
               )
               │
               └─ callback(setup_model) → Queue → Response
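The concurrent tail of this flow can be sketched with plain `asyncio.gather`. This is a hedged sketch, not the SDK's real implementation: the bodies of `_resolve_tools` and `run_config_setup` here are placeholders, and the `callback` stands in for the queue-backed callback that produces the response.

```python
import asyncio


async def _resolve_tools(config: dict) -> list[str]:
    # Placeholder: the real SDK resolves tool definitions for the setup.
    await asyncio.sleep(0)
    return [f"tool:{name}" for name in config.get("tools", [])]


async def run_config_setup(config: dict) -> dict:
    # Placeholder for the module-defined config-setup hook.
    await asyncio.sleep(0)
    return {"fields": sorted(config.get("fields", []))}


async def start_config_setup(config: dict, callback) -> None:
    # Both coroutines run concurrently, mirroring the asyncio.gather call above.
    tools, setup_model = await asyncio.gather(
        _resolve_tools(config),
        run_config_setup(config),
    )
    setup_model["tools"] = tools
    await callback(setup_model)  # result flows to the queue, then the response


results: list[dict] = []


async def _collect(model: dict) -> None:
    results.append(model)


asyncio.run(start_config_setup({"tools": ["search"], "fields": ["b", "a"]}, _collect))
```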

StartModule Flow Detail

Client sends StartModuleRequest
    │
    ▼
ModuleServicer.StartModule()
    │
    ├─ input_data = parse InputModel
    ├─ setup_data = load from SetupStrategy
    │
    ▼
JobManager.create_module_instance_job()
    │
    ├─ job_id = uuid.uuid4()
    │
    ├─ module = ModuleFactory.create_module_instance()
    │
    ▼
LocalTaskManager.create_task()
    │
    ├─ TaskSession(job_id, queue, signal_service)
    │
    └─ TaskExecutor.execute_task()
            │
            ├─ main_task: module.start(input_data, setup_data, callback)
            └─ signal_task: session.listen_signals()
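The two-task execution at the bottom of this flow can be sketched with `asyncio.wait`. This is an illustrative pattern, not `TaskExecutor`'s actual code: `main_task` stands in for `module.start()` and `signal_task` for `session.listen_signals()`; whichever completes first wins and the other is cancelled.

```python
import asyncio


async def main_task(queue: asyncio.Queue) -> str:
    # Stand-in for module.start(): emit a few outputs, then finish.
    for i in range(3):
        await queue.put(f"output-{i}")
    return "done"


async def signal_task(stop_event: asyncio.Event) -> str:
    # Stand-in for session.listen_signals(): wait for a stop/cancel signal.
    await stop_event.wait()
    return "stopped"


async def execute_task() -> str:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    stop_event = asyncio.Event()

    main = asyncio.create_task(main_task(queue))
    signals = asyncio.create_task(signal_task(stop_event))

    # First task to finish decides the outcome; the other is cancelled.
    done, pending = await asyncio.wait(
        {main, signals}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    return done.pop().result()


result = asyncio.run(execute_task())
```

Since no stop signal is ever set here, `main_task` completes first and the run returns `"done"`.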

module.start() Lifecycle

BaseModule.start(input_data, setup_data, callback)
    │
    ├─ context.callbacks.send_message = callback
    │
    ├─ tool_cache = setup_data.build_tool_cache()
    ├─ context.tool_cache = tool_cache
    │
    ├─ callback(ModuleStartInfoOutput)  ──→ Queue
    │
    ├─ await initialize(context, setup_data)
    │
    ├─ self.trigger_handlers = triggers_discoverer.init_handlers(context)
    │
    ├─ await run(input_data, setup_data)
    │       │
    │       ├─ handler = get_trigger(input.root.protocol)
    │       └─ handler.handle(input, setup, context)
    │               │
    │               └─ send_message(output) ──→ Queue
    │
    └─ await stop()
            │
            ├─ await cleanup()
            └─ callback(EndOfStreamOutput) ──→ Queue
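The lifecycle ordering above can be exercised with a toy module. This is a minimal sketch, not `BaseModule` itself: the `SketchModule` class, its `events` list, and the string outputs are assumptions for illustration; the point is the fixed order of start info, `initialize`, `run`, `cleanup`, and end-of-stream.

```python
import asyncio


class SketchModule:
    """Minimal lifecycle mirroring start → initialize → run → stop."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.status = "CREATED"

    async def initialize(self) -> None:
        self.events.append("initialize")

    async def run(self, input_data: dict) -> None:
        self.events.append(f"run:{input_data['protocol']}")

    async def cleanup(self) -> None:
        self.events.append("cleanup")

    async def start(self, input_data: dict, callback) -> None:
        self.status = "RUNNING"
        await callback("ModuleStartInfoOutput")  # start info goes out first
        await self.initialize()
        await self.run(input_data)
        await self.stop(callback)

    async def stop(self, callback) -> None:
        self.status = "STOPPING"
        await self.cleanup()
        await callback("EndOfStreamOutput")      # closes the client stream
        self.status = "STOPPED"


sent: list[str] = []


async def _send(msg: str) -> None:
    sent.append(msg)


module = SketchModule()
asyncio.run(module.start({"protocol": "message"}, _send))
```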

Protocol-Based Dispatch

InputModel
    │
    └─ root: DataTrigger
           │
           └─ protocol: str  ←── "message", "file", "healthcheck_ping", etc.
                   │
                   ▼
           TriggerHandler lookup by protocol
                   │
                   ▼
           handler.handle(input, setup, context)
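The lookup step amounts to a dictionary keyed by protocol string. A hypothetical registry is sketched below; in the SDK handlers are discovered automatically, whereas here they are registered by hand, and the handler classes and payload shapes are made up for illustration.

```python
# Hypothetical handlers; the SDK's TriggerHandlers take (input, setup, context).
class MessageHandler:
    def handle(self, payload: dict) -> str:
        return f"message:{payload['text']}"


class HealthcheckHandler:
    def handle(self, payload: dict) -> str:
        return "pong"


# Registry mapping protocol string → handler instance.
TRIGGER_HANDLERS = {
    "message": MessageHandler(),
    "healthcheck_ping": HealthcheckHandler(),
}


def dispatch(input_data: dict) -> str:
    protocol = input_data["protocol"]        # plays the role of input.root.protocol
    handler = TRIGGER_HANDLERS.get(protocol)
    if handler is None:
        raise ValueError(f"no handler for protocol {protocol!r}")
    return handler.handle(input_data)


reply = dispatch({"protocol": "message", "text": "hi"})
ping = dispatch({"protocol": "healthcheck_ping"})
```

Unknown protocols fail fast at the lookup instead of deep inside a handler, which keeps the dispatch step the single place where protocol support is decided.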

Queue-Based Streaming

TriggerHandler.handle()
    │
    └─ await send_message(output)
            │
            ▼
       callback(output)
            │
            ▼
       add_to_queue(job_id, output)
            │
            ▼
       session.queue.put(output.model_dump())
            │
            ▼
       ┌─────────────────────────────────┐
       │  asyncio.Queue (maxsize=1000)   │
       └─────────────────────────────────┘
            │
            ▼
       generate_stream_consumer(job_id)
            │
            ▼
       yield StartModuleResponse(output)
            │
            ▼
       Client receives streamed response
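The producer/consumer chain above can be sketched with an `asyncio.Queue` and an async generator. This is an assumption-laden sketch, not the SDK's code: the `_END` sentinel stands in for `EndOfStreamOutput`, and the dict payloads stand in for the real output and response models.

```python
import asyncio

_END = object()  # sentinel playing the role of EndOfStreamOutput


async def producer(queue: asyncio.Queue) -> None:
    # Stand-in for the handler's send_message → add_to_queue path.
    for i in range(3):
        await queue.put({"output": i})
    await queue.put(_END)  # signals that the stream is finished


async def generate_stream_consumer(queue: asyncio.Queue):
    # Async generator: drain the queue, yielding one response per item.
    while True:
        item = await queue.get()
        if item is _END:
            break
        yield {"StartModuleResponse": item}


async def stream() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    asyncio.create_task(producer(queue))
    return [resp async for resp in generate_stream_consumer(queue)]


responses = asyncio.run(stream())
```

The bounded queue (`maxsize=1000`) applies backpressure: a producer that outruns the consumer blocks on `put` instead of growing memory without bound.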

IDs Flow

Client provides:
    ├─ mission_id
    ├─ setup_id
    └─ setup_version_id

Server generates:
    └─ job_id = uuid.uuid4()  (unique per request)

All IDs stored in:
    └─ ModuleContext.session
            │
            ├─ Used in structured logging
            ├─ Used in signal service records
            └─ Returned in responses
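The id bundle can be sketched as a small dataclass. This is illustrative, not the SDK's actual session type: `SessionIds` and `log_context` are hypothetical names, showing only that client ids are passed in while `job_id` is generated fresh per request and attached to structured logs.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class SessionIds:
    """Client-provided ids plus the server-generated job_id."""

    mission_id: str
    setup_id: str
    setup_version_id: str
    # Generated per request, never supplied by the client.
    job_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def log_context(self) -> dict:
        # Attached to every structured log line for this request.
        return {
            "mission_id": self.mission_id,
            "setup_id": self.setup_id,
            "setup_version_id": self.setup_version_id,
            "job_id": self.job_id,
        }


a = SessionIds("m-1", "s-1", "sv-1")
b = SessionIds("m-1", "s-1", "sv-1")
```

Two requests with identical client ids still get distinct `job_id`s, which is what makes per-request log lines and signal records distinguishable.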

Key Files

| Component | File |
|-----------|------|
| Server | src/digitalkin/grpc_servers/module_server.py |
| Servicer | src/digitalkin/grpc_servers/module_servicer.py |
| Module Base | src/digitalkin/modules/_base_module.py |
| Job Manager | src/digitalkin/core/job_manager/single_job_manager.py |
| Task Manager | src/digitalkin/core/task_manager/local_task_manager.py |
| Task Session | src/digitalkin/core/task_manager/task_session.py |
| Task Executor | src/digitalkin/core/task_manager/task_executor.py |
| Factories | src/digitalkin/core/common/factories.py |
| Trigger Handler | src/digitalkin/modules/trigger_handler.py |