Problem
Currently run() processes tasks sequentially in a for line in sys.stdin loop. Even when the Go worker sends multiple tasks concurrently via stdin (StdioHandler supports multiplexing by task_id), the Python side blocks on each handler call before reading the next task.
This means concurrency > 1 in worker config has no real effect for long_running workers — tasks are just pre-fetched into the stdin buffer.
Proposed Solution
Add a concurrent mode to run() that dispatches incoming tasks to a thread pool:
def run(workers=1):
    # ... load, ready signal ...
    if workers <= 1:
        # Current sequential behavior
        for line in sys.stdin:
            process_single(line, handler, ctx)
    else:
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for line in sys.stdin:
                pool.submit(process_single, line, handler, ctx)
Stdout writes need to be thread-safe (add a lock around print/flush).
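The stdout locking mentioned above could look roughly like the following. This is a minimal sketch, not the SDK's actual code: the helper name emit_result, the JSON-lines response format, and the stream parameter (used here only so the demo can write to an in-memory buffer) are assumptions made for illustration.

```python
import io
import json
import sys
import threading
from concurrent.futures import ThreadPoolExecutor

# One process-wide lock so each response line is written and flushed as a unit.
_stdout_lock = threading.Lock()


def emit_result(payload, stream=None):
    """Write one JSON line and flush while holding the lock (hypothetical helper)."""
    out = stream if stream is not None else sys.stdout
    line = json.dumps(payload)
    with _stdout_lock:
        out.write(line + "\n")
        out.flush()


def demo(n_tasks=50, workers=8):
    """Emit results from several threads into a buffer and return the lines written."""
    buf = io.StringIO()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for i in range(n_tasks):
            pool.submit(emit_result, {"task_id": i, "result": "ok"}, buf)
    # Leaving the with-block waits for all submitted tasks to finish.
    return buf.getvalue().splitlines()
```

Serializing the payload before taking the lock keeps the critical section short, so threads only contend for the write and flush.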
Why Threads (not Processes)
- The @load context (ML models) must be shared across tasks; threads share memory
- ML inference (torch, numpy, llama.cpp) typically releases the GIL during native C operations
- Existing workers (prompt-classifier, image-classifier) already use threading.Lock()
- GPU inference naturally releases the GIL during CUDA calls
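To illustrate the shared-context point, the sketch below loads one object once and lets every pool thread use it, guarding the non-thread-safe part with a threading.Lock(). FakeModel, handler, run_shared, and the dict-shaped ctx are stand-ins invented for this example; a real worker would hold an actual model in its @load context.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeModel:
    """Stand-in for a model loaded once; counts calls to expose shared state."""

    def __init__(self):
        self.calls = 0

    def predict(self, text):
        self.calls += 1  # Not atomic, so callers must hold a lock.
        return len(text)


def handler(payload, ctx):
    """Hypothetical handler: every thread sees the same ctx and the same model."""
    with ctx["lock"]:
        return ctx["model"].predict(payload["text"])


def run_shared(texts, workers=4):
    """Run handler over texts in a thread pool; return results and total model calls."""
    # Created once and shared by all threads; no copy per task.
    ctx = {"model": FakeModel(), "lock": threading.Lock()}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda t: handler({"text": t}, ctx), texts))
    return results, ctx["model"].calls
```

With processes instead of threads, each worker would need its own copy of the model, which is the cost this section argues against.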
Configuration
- Parameter: run(workers=N)
- Env var: RUNQY_PYTHON_WORKERS (runtime override without code changes)
- Default: workers=1, the current sequential behavior (backward compatible)
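One way the parameter and the env var could be combined is sketched below. The helper name resolve_workers, the environ parameter (present only to make the function testable), and the handling of malformed or non-positive values are assumptions; the issue only states that RUNQY_PYTHON_WORKERS overrides at runtime and that the default is 1.

```python
import os


def resolve_workers(workers=1, environ=None):
    """Return the effective worker count; a valid env var overrides the argument."""
    env = os.environ if environ is None else environ
    raw = env.get("RUNQY_PYTHON_WORKERS")
    if raw is not None:
        try:
            workers = int(raw)
        except ValueError:
            pass  # Assumption: ignore a malformed value and keep the argument.
    # Assumption: anything below 1 falls back to sequential mode.
    return max(1, workers)
```

For example, resolve_workers(1, {"RUNQY_PYTHON_WORKERS": "8"}) yields 8, while an unset variable leaves the run(workers=N) argument in effect.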
Related
- Go StdioHandler already supports concurrent multiplexing via the pending[taskID] map
- Workers that benefit: prompt-classifier, image-classifier (both thread-safe)
- Workers to keep sequential: dolphin (GPU VRAM bound)