feat(taskbroker): Better Child Management #732
I believe this may result in higher and less predictable memory usage. Isn't there a way to avoid all child processes restarting at the same time by adding some jitter to the restart interval?
You are right, that is a possible effect. And yes, it is possible to add jitter.
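The jitter suggested here could look something like the following minimal sketch. It assumes each child recycles after executing a configurable number of tasks, and it randomizes that number per child so that children started together do not all exit together. `jittered_task_limit` and its parameters are hypothetical names, not part of taskbroker.

```python
import random
from typing import Optional


def jittered_task_limit(
    base_limit: int,
    jitter_fraction: float = 0.1,
    rng: Optional[random.Random] = None,
) -> int:
    """Return a per-child task limit drawn uniformly around base_limit.

    Hypothetical helper: with jitter_fraction=0.1 and base_limit=1000,
    each child gets a limit somewhere between 900 and 1100 tasks.
    """
    if base_limit < 1:
        raise ValueError("base_limit must be at least 1")
    if not 0.0 <= jitter_fraction < 1.0:
        raise ValueError("jitter_fraction must be in [0, 1)")
    rng = rng if rng is not None else random.Random()
    # Largest deviation from the base limit, in tasks.
    spread = int(base_limit * jitter_fraction)
    return max(1, base_limit + rng.randint(-spread, spread))
```

Each child would compute its own limit once at startup. A larger `jitter_fraction` spreads restarts further apart, at the cost of some children living longer and holding more memory before they recycle.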
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 12b2d19.
    logger.debug("taskworker.worker.spawn_children_thread.started")

    # Queue of incoming message from children
    messages: multiprocessing.Queue[ChildMessage] = self._mp_context.Queue()
Bug: The messages queue is a local variable in spawn_children_thread and can be garbage-collected upon thread exit, causing child processes to fail when they try to use it.
Severity: MEDIUM
Suggested Fix
Move the messages queue to a class attribute to ensure its lifetime exceeds that of the child processes that use it. Alternatively, explicitly manage the queue's lifecycle by calling messages.close() and messages.join_thread() within a try/finally block in the shutdown flow to guarantee proper cleanup.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: clients/python/src/taskbroker_client/worker/worker.py#L969
Potential issue: The `messages` queue is created as a local variable within the
`spawn_children_thread` function. When this thread terminates during shutdown, the
`messages` queue object goes out of scope and can be garbage collected by Python.
However, child processes, which are terminated shortly after, may still hold references
to this queue. If a child attempts to send a message via `messages.put_nowait()` after
the queue has been garbage collected, it could lead to a `BrokenPipeError` or a
deadlock, creating a race condition during the shutdown sequence.
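A minimal sketch of the suggested fix, assuming a supervisor object that outlives the spawning thread. `ChildSupervisor`, `_child_main`, `spawn_child`, and `shutdown` are hypothetical names for illustration and are not taken from `worker.py`. The queue is held as an instance attribute and released in a `finally` block only after the children have been joined.

```python
import multiprocessing


def _child_main(messages) -> None:
    # Hypothetical child entry point: report back to the parent.
    messages.put("ready")


class ChildSupervisor:
    """Hypothetical owner of the child processes and their message queue."""

    def __init__(self) -> None:
        self._mp_context = multiprocessing.get_context()
        # Instance attribute: lives as long as the supervisor,
        # not just as long as the thread that spawns children.
        self.messages = self._mp_context.Queue()
        self._children = []

    def spawn_child(self) -> None:
        child = self._mp_context.Process(
            target=_child_main, args=(self.messages,)
        )
        child.start()
        self._children.append(child)

    def shutdown(self, timeout: float = 5.0) -> None:
        try:
            for child in self._children:
                child.join(timeout)
                if child.is_alive():
                    # Child did not exit in time; stop it forcibly.
                    child.terminate()
                    child.join()
        finally:
            # Release the queue only after the children are gone.
            self.messages.close()
            self.messages.join_thread()
```

Whether the reported race can actually occur should still be verified against the real shutdown flow, as the prompt above asks.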
Closed in favor of #735.

Linear
Refs STREAM-1269
Description
In #731, we deferred setting the status to `SERVING` for workers in push mode until all threads were warmed up. Before that change, Kubernetes treated workers as ready to receive activations even though they weren't, causing significant throughput declines during worker redeployments.
Something similar happens when child processes recycle, that is, exit voluntarily after executing a configurable number of tasks. Recycling is meant to prevent memory leaks.
In some cases, child processes recycle at the same time, decreasing throughput significantly until all child processes are ready again. We can solve this problem by improving the child management process as follows.
`SERVING`
This way, after startup, there are always exactly `--concurrency` child processes.
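As a rough illustration of holding the child count at `--concurrency`, the sketch below tops the pool back up whenever children have exited. It is an assumption about one possible approach, not necessarily the mechanism this PR implements, and it restores the count after an exit rather than preventing the dip. `top_up_children` and the `spawn` callback are hypothetical names.

```python
from typing import Callable, List, Protocol


class Child(Protocol):
    """Anything with is_alive(), e.g. multiprocessing.Process."""

    def is_alive(self) -> bool: ...


def top_up_children(
    children: List[Child],
    concurrency: int,
    spawn: Callable[[], Child],
) -> List[Child]:
    """Drop exited children and spawn replacements up to concurrency."""
    alive = [child for child in children if child.is_alive()]
    while len(alive) < concurrency:
        # Replace each exited child individually instead of
        # restarting the whole pool at once.
        alive.append(spawn())
    return alive
```

A supervisor loop would call this periodically, or whenever a child reports that it is exiting, passing a `spawn` callable that starts one new child process and returns it.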
After