
feat: add concurrent task processing to run() for long_running mode #4

@kitsune-hash


Problem

Currently, run() processes tasks sequentially in a for line in sys.stdin loop. Even when the Go worker sends several tasks concurrently over stdin (StdioHandler multiplexes them by task_id), the Python side blocks on each handler call before it reads the next task.

As a result, concurrency > 1 in the worker config has no real effect for long_running workers: the extra tasks are only pre-fetched into the stdin buffer while the current one runs.

Proposed Solution

Add a concurrent mode to run() that dispatches incoming tasks to a thread pool:

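import os
import sys
from concurrent.futures import ThreadPoolExecutor

def run(workers=1):
    # ... load, ready signal ...
    # RUNQY_PYTHON_WORKERS overrides the argument without code changes
    workers = int(os.environ.get("RUNQY_PYTHON_WORKERS") or workers)
    if workers <= 1:
        # Current sequential behavior
        for line in sys.stdin:
            process_single(line, handler, ctx)
    else:
        # Keep reading stdin while earlier tasks run on the pool.
        # process_single must report its own errors: pool.submit()
        # stores exceptions in Futures that nothing here inspects.
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for line in sys.stdin:
                pool.submit(process_single, line, handler, ctx)
        # Leaving the with-block waits for all in-flight tasks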

Stdout writes must be thread-safe: wrap each print/flush pair in a shared lock so that responses from different threads cannot interleave within a line.
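A fuller, self-contained sketch of the proposal, assuming a JSON-lines wire format with task_id and payload fields (an assumption; the real protocol may differ). run_loop, write_line and the extra out parameter on process_single are hypothetical names, chosen so the loop can run against in-memory streams. One shared threading.Lock makes each write-plus-flush atomic, and errors are written back as responses because pool.submit() would otherwise leave them in Futures nobody reads.

```python
import json
import sys
import threading
from concurrent.futures import ThreadPoolExecutor

# Shared by every worker thread so that each response line is written
# and flushed as one unit; replies from different tasks cannot interleave.
_out_lock = threading.Lock()


def write_line(obj, out):
    data = json.dumps(obj) + "\n"  # serialize outside the lock
    with _out_lock:
        out.write(data)
        out.flush()


def process_single(line, handler, ctx, out):
    """Handle one task line. Assumed format: {"task_id": ..., "payload": ...}."""
    if not line.strip():
        return  # ignore blank lines
    task_id = None
    try:
        task = json.loads(line)
        task_id = task["task_id"]
        result = handler(task["payload"], ctx)
        write_line({"task_id": task_id, "result": result}, out)
    except Exception as exc:
        # pool.submit() would park this exception in a Future that nothing
        # inspects, so report it to the caller as an error response instead.
        write_line({"task_id": task_id, "error": str(exc)}, out)


def run_loop(handler, ctx, workers=1, inp=None, out=None):
    inp = sys.stdin if inp is None else inp
    out = sys.stdout if out is None else out
    if workers <= 1:
        for line in inp:  # current sequential behavior
            process_single(line, handler, ctx, out)
    else:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for line in inp:  # keep reading while earlier tasks run
                pool.submit(process_single, line, handler, ctx, out)
        # Leaving the with-block waits for every in-flight task.
```

Passing workers=1 keeps the sequential path, so the same function covers both modes and the default stays backward compatible.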

Why Threads (not Processes)

  • The @load context (e.g. ML models) must be shared across tasks, and threads share memory
  • ML inference libraries (torch, numpy, llama.cpp) typically release the GIL inside native C/C++ operations
  • Existing workers (prompt-classifier, image-classifier) already use threading.Lock()
  • GPU inference also releases the GIL during CUDA calls
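To illustrate the first and third points, here is a minimal sketch in which Context is a hypothetical stand-in for whatever @load returns. Every thread sees the same object (no per-process copy), read-only state is used without locking, and only mutable shared state sits behind a threading.Lock(). It does not demonstrate GIL release; that depends on the library doing the actual work.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Context:
    """Hypothetical stand-in for the object @load returns (e.g. a model)."""

    def __init__(self):
        self.weights = list(range(1000))  # large read-only state, built once
        self.requests_served = 0           # mutable state shared by all threads
        self.lock = threading.Lock()       # guards requests_served only


def handler(payload, ctx):
    # Reading shared, read-only state needs no lock.
    score = sum(ctx.weights[: len(payload)])
    # Mutating shared state does, similar to how the existing workers
    # use threading.Lock().
    with ctx.lock:
        ctx.requests_served += 1
    return id(ctx), score


ctx = Context()  # loaded once, then shared by every thread
payloads = ["x" * n for n in range(1, 51)]
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handler, payloads, [ctx] * len(payloads)))
```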

Configuration

  • Parameter: run(workers=N)
  • Env var: RUNQY_PYTHON_WORKERS (runtime override without code changes)
  • Default: workers=1, i.e. the current sequential behavior (backward compatible)
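One possible way to combine the two settings, assuming the environment variable takes precedence over the argument when it is set (the issue calls it a runtime override but does not spell out precedence). resolve_workers is a hypothetical helper name; values below 1 fall back to the sequential default.

```python
import os

ENV_VAR = "RUNQY_PYTHON_WORKERS"


def resolve_workers(workers=1, environ=None):
    """Effective worker count for run(workers=N).

    Assumed precedence: a non-empty RUNQY_PYTHON_WORKERS overrides the
    argument; any value below 1 falls back to sequential mode (1).
    """
    env = os.environ if environ is None else environ
    raw = env.get(ENV_VAR, "").strip()
    if raw:
        try:
            workers = int(raw)
        except ValueError:
            raise ValueError(f"{ENV_VAR} must be an integer, got {raw!r}") from None
    return max(1, workers)
```

Raising on a non-integer value, rather than silently ignoring it, makes a typo in the deployment config visible at startup.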

Related

  • Go StdioHandler already supports concurrent multiplexing via its pending[taskID] map
  • Workers that benefit: prompt-classifier, image-classifier (both thread-safe)
  • Workers to keep sequential: dolphin (bound by GPU VRAM)
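With a pool, replies can leave the Python side in a different order than tasks arrived; task_id multiplexing is what makes that safe, since each reply is matched to its task by ID. The sketch below is only a Python analogue of the pending-map idea (the real routing lives in Go's StdioHandler); the message fields, python_worker, dispatch and the simulated delays are all assumptions for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def python_worker(task):
    # Simulated handler: the sleep makes tasks finish in a different
    # order than they were sent.
    time.sleep(task["delay"])
    return {"task_id": task["task_id"], "result": task["payload"].upper()}


def dispatch(tasks, workers=3):
    """Route replies by task_id, in the spirit of a pending[taskID] map.

    Python analogue only; the real routing lives in the Go StdioHandler.
    """
    pending = {t["task_id"]: None for t in tasks}  # task_id -> reply slot
    arrival_order = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(python_worker, t) for t in tasks]
        for fut in as_completed(futures):  # yields in completion order
            reply = fut.result()
            arrival_order.append(reply["task_id"])
            pending[reply["task_id"]] = reply["result"]
    return pending, arrival_order


tasks = [
    {"task_id": "t1", "payload": "slow", "delay": 0.2},
    {"task_id": "t2", "payload": "medium", "delay": 0.1},
    {"task_id": "t3", "payload": "fast", "delay": 0.0},
]
pending, arrival_order = dispatch(tasks)
```

Here arrival_order will usually be t3, t2, t1, yet every result still lands in the slot for its own task_id.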
