From 64c2b64304f14f4be96886748e4182ce0c5415e7 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 12 Jun 2026 11:20:22 -0700 Subject: [PATCH 01/23] Allow Specifying Minimum Sizes for Testing Activations --- clients/python/src/taskbroker_client/task.py | 79 ++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 69cb1b8a..4f4c8ea1 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -4,6 +4,7 @@ import datetime import inspect import os +import random import time from collections.abc import Callable, Collection, Mapping, MutableMapping from functools import update_wrapper @@ -200,6 +201,45 @@ def apply_async( wait_for_delivery=self.wait_for_delivery, ) + def apply_async_testing( + self, + args: Any | None = None, + kwargs: Any | None = None, + headers: MutableMapping[str, Any] | None = None, + expires: int | datetime.timedelta | None = None, + countdown: int | datetime.timedelta | None = None, + sizes: Collection[int] | None = None, + **options: Any, + ) -> None: + """ + Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry. + + Argument `sizes` contains minimum activation sizes in KB, parsed from repeated `--activation-size` values. + Normal `delay` and `apply_async` calls do not use this path. + """ + if args is None: + args = [] + if kwargs is None: + kwargs = {} + + self._signal_send(task=self, args=args, kwargs=kwargs) + + activation = self.create_testing_activation( + args=args, + kwargs=kwargs, + headers=headers, + expires=expires, + countdown=countdown, + activation_sizes=sizes, + ) + if ALWAYS_EAGER: + self._call_func(*args, **kwargs) + else: + self._namespace.send_task( + activation, + wait_for_delivery=self.wait_for_delivery, + ) + def _call_func(self, *args: Any, **kwargs: Any) -> None: # Overridden in ExternalTask if self.pass_headers: @@ -338,6 +378,45 @@ def create_activation( delay=countdown, ) + def create_testing_activation( + self, + args: Collection[Any], + kwargs: Mapping[Any, Any], + headers: MutableMapping[str, Any] | None = None, + expires: int | datetime.timedelta | None = None, + countdown: int | datetime.timedelta | None = None, + activation_sizes: Collection[int] | None = None, + ) -> TaskActivation: + """ + Build a `TaskActivation` with optional size padding for testing purposes. + + With one activation size, every activation is padded to at least that size. + With multiple sizes, one target size is randomly selected per activation. + If the activation size is already ≥ the target, it will be unchanged. + """ + activation = self.create_activation( + args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown + ) + if not activation_sizes: + return activation + + activation_sizes = list(activation_sizes) + selected_size = ( + activation_sizes[0] if len(activation_sizes) == 1 else random.choice(activation_sizes) + ) + target_bytes = selected_size * 1024 + if activation.ByteSize() >= target_bytes: + return activation + + padding_key = "taskbroker-testing-activation-padding" + while activation.ByteSize() < target_bytes: + missing_bytes = target_bytes - activation.ByteSize() + activation.headers[padding_key] = ( + activation.headers.get(padding_key, "") + "x" * missing_bytes + ) + + return activation + def _create_retry_state(self) -> RetryState: retry = self.retry or self._namespace.default_retry or None if not retry or self.at_most_once: From 91d294a2ebf72b0d7db34019a76a3a168fcfaa9f Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Sun, 14 Jun 2026 15:17:01 -0700 Subject: [PATCH 02/23] Change Activation Size Arg. from KB to Bytes --- clients/python/src/taskbroker_client/task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 4f4c8ea1..798a0a6a 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -214,7 +214,7 @@ def apply_async_testing( """ Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry. - Argument `sizes` contains minimum activation sizes in KB, parsed from repeated `--activation-size` values. + Argument `sizes` contains minimum activation sizes in bytes, parsed from repeated `--activation-size` values. Normal `delay` and `apply_async` calls do not use this path. """ if args is None: @@ -404,7 +404,7 @@ def create_testing_activation( selected_size = ( activation_sizes[0] if len(activation_sizes) == 1 else random.choice(activation_sizes) ) - target_bytes = selected_size * 1024 + target_bytes = selected_size if activation.ByteSize() >= target_bytes: return activation From 61c86bfd7b258f755cc813c2d4312d0d354bec35 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 18 Jun 2026 17:41:38 -0700 Subject: [PATCH 03/23] Use Result Queue Size for Batches --- .../src/taskbroker_client/worker/worker.py | 3 ++- clients/python/tests/worker/test_worker.py | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 9c9b7fb6..e2c27371 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -703,6 +703,7 @@ def __init__( update_in_batches: bool = False, ) -> None: self._concurrency = concurrency + self._result_queue_maxsize = result_queue_maxsize self._processing_pool_name = processing_pool_name or "unknown" self._pod_name = pod_name or "unknown" self._update_in_batches = update_in_batches @@ -797,7 +798,7 @@ def result_thread() -> None: break else: results.append(result) - if len(results) >= self._concurrency: + if len(results) >= self._result_queue_maxsize: executor.submit(self.send_results, results, False) results = [] except queue.Empty: diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 65e565ed..bffc2154 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -328,6 +328,7 @@ def _make_result_thread_pool( capture: _SendResultCapture, *, concurrency: int = 3, + result_queue_maxsize: int = 3, update_in_batches: bool, ) -> TaskWorkerProcessingPool: return TaskWorkerProcessingPool( @@ -336,6 +337,7 @@ def _make_result_thread_pool( mp_context=get_context("fork"), max_child_task_count=100, concurrency=concurrency, + result_queue_maxsize=result_queue_maxsize, processing_pool_name="test", process_type="fork", update_in_batches=update_in_batches, @@ -522,18 +524,23 @@ def test_push_task_queue(self) -> None: def test_result_thread_sends_full_batch(self) -> None: capture = _SendResultCapture() - concurrency = 3 - pool = _make_result_thread_pool(capture, concurrency=concurrency, update_in_batches=True) + result_queue_maxsize = 5 + pool = _make_result_thread_pool( + capture, + concurrency=2, + result_queue_maxsize=result_queue_maxsize, + update_in_batches=True, + ) try: pool.start_result_thread() - for i in range(concurrency): + for i in range(result_queue_maxsize): pool.put_result(_make_processing_result(str(i))) capture.wait_for_calls(1) batch, is_draining = capture.send_calls[0] - self.assertEqual(len(batch), concurrency) - self.assertEqual({result.task_id for result in batch}, {"0", "1", "2"}) + self.assertEqual(len(batch), result_queue_maxsize) + self.assertEqual({result.task_id for result in batch}, {"0", "1", "2", "3", "4"}) self.assertFalse(is_draining) finally: pool.shutdown() From 211537037cef984cf43495a7cc42ff510d3f39e6 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 23 Jun 2026 10:02:16 -0700 Subject: [PATCH 04/23] Log Activations for Tracing --- .../src/taskbroker_client/worker/worker.py | 22 +++++++++++++++++++ .../taskbroker_client/worker/workerchild.py | 8 +++++++ 2 files changed, 30 insertions(+) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index e2c27371..5d8607c9 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -8,6 +8,7 @@ import threading import time from concurrent.futures import ThreadPoolExecutor +from enum import Enum from multiprocessing.context import ForkContext, ForkServerContext, SpawnContext from multiprocessing.process import BaseProcess from pathlib import Path @@ -54,6 +55,13 @@ WORKER_SERVICE_NAME = "sentry_protos.taskbroker.v1.WorkerService" +class ActivationEvent(Enum): + RECEIVED = "RECEIVED" + TIMED_OUT = "TIMED_OUT" + QUEUED = "QUEUED" + PICKED_UP = "PICKED_UP" + + class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): """ gRPC servicer that receives task activations pushed from the broker @@ -68,6 +76,10 @@ def PushTask( request: PushTaskRequest, context: grpc.ServicerContext, ) -> PushTaskResponse: + logger.debug( + "Activation received", extra={"id": request.task.id, "event": ActivationEvent.RECEIVED} + ) + """Handle incoming task activation.""" start_time = time.monotonic() self.worker_pool._metrics.incr( @@ -84,6 +96,11 @@ def PushTask( # Push the task to the worker queue (wait at most N seconds) if not self.worker_pool.push_task(inflight, timeout=self.push_task_timeout): + logger.debug( + "Activation enqueue timed out", + extra={"id": request.task.id, "event": ActivationEvent.TIMED_OUT}, + ) + self.worker_pool._metrics.incr( "taskworker.worker.push_rpc", tags={"result": "busy", "processing_pool": self.worker_pool._processing_pool_name}, @@ -97,6 +114,11 @@ def PushTask( context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") else: + logger.debug( + "Activation enqueued", + extra={"id": request.task.id, "event": ActivationEvent.QUEUED}, + ) + self.worker_pool._metrics.incr( "taskworker.worker.push_rpc", tags={ diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index a63634e4..98fe92fa 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -38,6 +38,7 @@ from taskbroker_client.task import Task from taskbroker_client.types import ContextHook, InflightTaskActivation, ProcessingResult from taskbroker_client.worker.producer import TaskProducer +from taskbroker_client.worker.worker import ActivationEvent logger = logging.getLogger(__name__) @@ -362,6 +363,13 @@ def check_task_future_completion() -> None: try: check_task_future_completion() inflight = child_tasks.get(timeout=1.0) + logger.debug( + "Activation popped from queue", + extra={ + "id": inflight.activation.id, + "event": ActivationEvent.PICKED_UP, + }, + ) except queue.Empty: metrics.incr( "taskworker.worker.child_task_queue_empty", From f13cc2d738df4018a2cf25fd78c171fef5cb58be Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 23 Jun 2026 10:12:59 -0700 Subject: [PATCH 05/23] Fix Import Issue --- clients/python/src/taskbroker_client/worker/events.py | 8 ++++++++ clients/python/src/taskbroker_client/worker/worker.py | 9 +-------- .../python/src/taskbroker_client/worker/workerchild.py | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) create mode 100644 clients/python/src/taskbroker_client/worker/events.py diff --git a/clients/python/src/taskbroker_client/worker/events.py b/clients/python/src/taskbroker_client/worker/events.py new file mode 100644 index 00000000..9e3c8464 --- /dev/null +++ b/clients/python/src/taskbroker_client/worker/events.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class ActivationEvent(Enum): + RECEIVED = "RECEIVED" + TIMED_OUT = "TIMED_OUT" + QUEUED = "QUEUED" + PICKED_UP = "PICKED_UP" diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 5d8607c9..9c16a962 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -8,7 +8,6 @@ import threading import time from concurrent.futures import ThreadPoolExecutor -from enum import Enum from multiprocessing.context import ForkContext, ForkServerContext, SpawnContext from multiprocessing.process import BaseProcess from pathlib import Path @@ -41,6 +40,7 @@ TaskbrokerClient, parse_rpc_secret_list, ) +from taskbroker_client.worker.events import ActivationEvent from taskbroker_client.worker.push_clients import BatchPushTaskbrokerClient, PushTaskbrokerClient from taskbroker_client.worker.workerchild import child_process @@ -55,13 +55,6 @@ WORKER_SERVICE_NAME = "sentry_protos.taskbroker.v1.WorkerService" -class ActivationEvent(Enum): - RECEIVED = "RECEIVED" - TIMED_OUT = "TIMED_OUT" - QUEUED = "QUEUED" - PICKED_UP = "PICKED_UP" - - class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): """ gRPC servicer that receives task activations pushed from the broker diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 98fe92fa..3835a63e 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -37,8 +37,8 @@ from taskbroker_client.state import clear_current_task, current_task, set_current_task from taskbroker_client.task import Task from taskbroker_client.types import ContextHook, InflightTaskActivation, ProcessingResult +from taskbroker_client.worker.events import ActivationEvent from taskbroker_client.worker.producer import TaskProducer -from taskbroker_client.worker.worker import ActivationEvent logger = logging.getLogger(__name__) From 4198a1c541ab428aba04420dc27852a633302d8d Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 23 Jun 2026 11:17:46 -0700 Subject: [PATCH 06/23] Improve Logging --- .../taskbroker_client/structured_logging.py | 69 +++++++++++++++++++ .../src/taskbroker_client/worker/events.py | 4 +- .../src/taskbroker_client/worker/worker.py | 42 +++++++++-- .../taskbroker_client/worker/workerchild.py | 10 ++- .../python/tests/test_structured_logging.py | 28 ++++++++ 5 files changed, 143 insertions(+), 10 deletions(-) create mode 100644 clients/python/src/taskbroker_client/structured_logging.py create mode 100644 clients/python/tests/test_structured_logging.py diff --git a/clients/python/src/taskbroker_client/structured_logging.py b/clients/python/src/taskbroker_client/structured_logging.py new file mode 100644 index 00000000..490978cb --- /dev/null +++ b/clients/python/src/taskbroker_client/structured_logging.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import json +import logging +import sys +from datetime import UTC, datetime +from enum import Enum +from typing import Any + +_STRUCTURED_LOG_HANDLER = "_taskbroker_structured_log_handler" +_RESERVED_LOG_RECORD_ATTRS = set(logging.makeLogRecord({}).__dict__) | { + "asctime", + "message", +} + + +def _json_safe(value: Any) -> Any: + if isinstance(value, Enum): + return value.value + if isinstance(value, dict): + return {str(key): _json_safe(item) for key, item in value.items()} + if isinstance(value, (list, tuple)): + return [_json_safe(item) for item in value] + if isinstance(value, set): + return sorted(_json_safe(item) for item in value) + if isinstance(value, BaseException): + return str(value) + return value + + +class StructuredLogFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload: dict[str, Any] = { + "message": record.getMessage(), + "severity": record.levelname, + "timestamp": datetime.fromtimestamp(record.created, UTC).isoformat(), + "logger": record.name, + } + + for key, value in record.__dict__.items(): + if key in _RESERVED_LOG_RECORD_ATTRS or key.startswith("_"): + continue + payload[key] = _json_safe(value) + + if record.exc_info: + payload["exception"] = self.formatException(record.exc_info) + if record.stack_info: + payload["stack"] = self.formatStack(record.stack_info) + + return json.dumps(payload, separators=(",", ":"), default=str) + + +def configure_taskbroker_structured_logging() -> None: + """ + Emit taskbroker_client logs as JSON on stdout so container log collectors + ingest them as structured logs instead of stderr text payloads. + """ + package_logger = logging.getLogger("taskbroker_client") + + for handler in package_logger.handlers: + if getattr(handler, _STRUCTURED_LOG_HANDLER, False): + return + + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(StructuredLogFormatter()) + setattr(handler, _STRUCTURED_LOG_HANDLER, True) + + package_logger.addHandler(handler) + package_logger.propagate = False diff --git a/clients/python/src/taskbroker_client/worker/events.py b/clients/python/src/taskbroker_client/worker/events.py index 9e3c8464..68bcb06f 100644 --- a/clients/python/src/taskbroker_client/worker/events.py +++ b/clients/python/src/taskbroker_client/worker/events.py @@ -1,8 +1,10 @@ from enum import Enum -class ActivationEvent(Enum): +class TraceEvent(Enum): + WORKER_STARTED = "WORKER_UP" RECEIVED = "RECEIVED" TIMED_OUT = "TIMED_OUT" QUEUED = "QUEUED" PICKED_UP = "PICKED_UP" + WORKER_STOPPED = "WORKER_STOPPED" diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 9c16a962..7a69e82f 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -32,6 +32,7 @@ WORKER_CHILD_JOIN_TIMEOUT_SEC, ) from taskbroker_client.metrics import MetricsBackend +from taskbroker_client.structured_logging import configure_taskbroker_structured_logging from taskbroker_client.types import InflightTaskActivation, ProcessingResult from taskbroker_client.worker.client import ( HealthCheckSettings, @@ -40,7 +41,7 @@ TaskbrokerClient, parse_rpc_secret_list, ) -from taskbroker_client.worker.events import ActivationEvent +from taskbroker_client.worker.events import TraceEvent from taskbroker_client.worker.push_clients import BatchPushTaskbrokerClient, PushTaskbrokerClient from taskbroker_client.worker.workerchild import child_process @@ -70,7 +71,12 @@ def PushTask( context: grpc.ServicerContext, ) -> PushTaskResponse: logger.debug( - "Activation received", extra={"id": request.task.id, "event": ActivationEvent.RECEIVED} + "taskworker.activation.received", + extra={ + "for": "trace-activations", + "id": request.task.id, + "event": TraceEvent.RECEIVED.value, + }, ) """Handle incoming task activation.""" @@ -90,8 +96,12 @@ def PushTask( # Push the task to the worker queue (wait at most N seconds) if not self.worker_pool.push_task(inflight, timeout=self.push_task_timeout): logger.debug( - "Activation enqueue timed out", - extra={"id": request.task.id, "event": ActivationEvent.TIMED_OUT}, + "taskworker.activation.enqueue_timed_out", + extra={ + "for": "trace-activations", + "id": request.task.id, + "event": TraceEvent.TIMED_OUT.value, + }, ) self.worker_pool._metrics.incr( @@ -108,8 +118,8 @@ def PushTask( context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") else: logger.debug( - "Activation enqueued", - extra={"id": request.task.id, "event": ActivationEvent.QUEUED}, + "taskworker.activation.enqueued", + extra={"id": request.task.id, "event": TraceEvent.QUEUED.value}, ) self.worker_pool._metrics.incr( @@ -158,6 +168,8 @@ def __init__( push_task_timeout: float = 5, update_in_batches: bool = False, ) -> None: + configure_taskbroker_structured_logging() + app = import_app(app_module) if process_type == "fork": @@ -377,6 +389,14 @@ def signal_handler(*args: Any) -> None: health_servicer.set("", health_pb2.HealthCheckResponse.SERVING) health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.SERVING) + logger.debug( + "taskworker.started", + extra={ + "for": "trace-activations", + "event": TraceEvent.WORKER_STARTED, + }, + ) + logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) self._start_health_check_thread() @@ -391,6 +411,14 @@ def signal_handler(*args: Any) -> None: health_servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING) health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.NOT_SERVING) + logger.debug( + "taskworker.stopped", + extra={ + "for": "trace-activations", + "event": TraceEvent.WORKER_STOPPED, + }, + ) + if server is not None: server.stop(grace=5) @@ -514,6 +542,8 @@ def __init__( health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, ) -> None: self._namespace = namespace + configure_taskbroker_structured_logging() + app = import_app(app_module) if process_type == "fork": diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 3835a63e..53584e1b 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -35,9 +35,10 @@ from taskbroker_client.constants import CompressionType from taskbroker_client.retry import NoRetriesRemainingError from taskbroker_client.state import clear_current_task, current_task, set_current_task +from taskbroker_client.structured_logging import configure_taskbroker_structured_logging from taskbroker_client.task import Task from taskbroker_client.types import ContextHook, InflightTaskActivation, ProcessingResult -from taskbroker_client.worker.events import ActivationEvent +from taskbroker_client.worker.events import TraceEvent from taskbroker_client.worker.producer import TaskProducer logger = logging.getLogger(__name__) @@ -195,6 +196,8 @@ def child_process( and not the module root. If modules that include django are imported at the module level the wrong django settings will be used. """ + configure_taskbroker_structured_logging() + app = import_app(app_module) app.load_modules() metrics = app.metrics @@ -364,10 +367,11 @@ def check_task_future_completion() -> None: check_task_future_completion() inflight = child_tasks.get(timeout=1.0) logger.debug( - "Activation popped from queue", + "taskworker.activation.picked_up", extra={ + "for": "trace-activations", "id": inflight.activation.id, - "event": ActivationEvent.PICKED_UP, + "event": TraceEvent.PICKED_UP.value, }, ) except queue.Empty: diff --git a/clients/python/tests/test_structured_logging.py b/clients/python/tests/test_structured_logging.py new file mode 100644 index 00000000..ddcac0bc --- /dev/null +++ b/clients/python/tests/test_structured_logging.py @@ -0,0 +1,28 @@ +import json +import logging + +from taskbroker_client.structured_logging import StructuredLogFormatter +from taskbroker_client.worker.events import TraceEvent + + +def test_structured_log_formatter_includes_cloud_logging_fields() -> None: + record = logging.LogRecord( + name="taskbroker_client.worker.worker", + level=logging.DEBUG, + pathname=__file__, + lineno=1, + msg="taskworker.activation.received", + args=(), + exc_info=None, + ) + record.id = "123" + record.event = TraceEvent.RECEIVED + + payload = json.loads(StructuredLogFormatter().format(record)) + + assert payload["message"] == "taskworker.activation.received" + assert payload["severity"] == "DEBUG" + assert payload["logger"] == "taskbroker_client.worker.worker" + assert payload["id"] == "123" + assert payload["event"] == "RECEIVED" + assert "timestamp" in payload From 2ec936949dffa10739f3e5a1066a7cd79c25f830 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 23 Jun 2026 14:05:54 -0700 Subject: [PATCH 07/23] Fix Enqueued Log --- clients/python/src/taskbroker_client/worker/worker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 7a69e82f..1650af0a 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -119,7 +119,11 @@ def PushTask( else: logger.debug( "taskworker.activation.enqueued", - extra={"id": request.task.id, "event": TraceEvent.QUEUED.value}, + extra={ + "for": "trace-activations", + "id": request.task.id, + "event": TraceEvent.QUEUED.value, + }, ) self.worker_pool._metrics.incr( From 4fc8da7e19a0c616bf26a94c42ad2c9b172ff5fc Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 25 Jun 2026 08:37:15 -0700 Subject: [PATCH 08/23] Add Processing Deadline Duration Override --- clients/python/src/taskbroker_client/task.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 798a0a6a..f45a5874 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -209,12 +209,14 @@ def apply_async_testing( expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, sizes: Collection[int] | None = None, + processing_deadline_duration: int | datetime.timedelta | None = None, **options: Any, ) -> None: """ Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry. Argument `sizes` contains minimum activation sizes in bytes, parsed from repeated `--activation-size` values. + Argument `processing_deadline_duration` overrides the task's configured execution deadline. Normal `delay` and `apply_async` calls do not use this path. """ if args is None: @@ -231,6 +233,7 @@ def apply_async_testing( expires=expires, countdown=countdown, activation_sizes=sizes, + processing_deadline_duration=processing_deadline_duration, ) if ALWAYS_EAGER: self._call_func(*args, **kwargs) @@ -386,6 +389,7 @@ def create_testing_activation( expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, activation_sizes: Collection[int] | None = None, + processing_deadline_duration: int | datetime.timedelta | None = None, ) -> TaskActivation: """ Build a `TaskActivation` with optional size padding for testing purposes. @@ -393,10 +397,17 @@ def create_testing_activation( With one activation size, every activation is padded to at least that size. With multiple sizes, one target size is randomly selected per activation. If the activation size is already ≥ the target, it will be unchanged. + + If provided, `processing_deadline_duration` overrides the task's configured execution deadline. """ activation = self.create_activation( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown ) + if processing_deadline_duration is not None: + if isinstance(processing_deadline_duration, datetime.timedelta): + processing_deadline_duration = int(processing_deadline_duration.total_seconds()) + activation.processing_deadline_duration = processing_deadline_duration + if not activation_sizes: return activation From aa2c4309a9bec64baa58f0c2bcb93b6bc821c8c3 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 25 Jun 2026 08:49:25 -0700 Subject: [PATCH 09/23] Make Durations a List --- clients/python/src/taskbroker_client/task.py | 39 ++++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index f45a5874..04cc8b98 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -209,14 +209,17 @@ def apply_async_testing( expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, sizes: Collection[int] | None = None, - processing_deadline_duration: int | datetime.timedelta | None = None, + durations: Collection[int | datetime.timedelta] | None = None, **options: Any, ) -> None: """ Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry. Argument `sizes` contains minimum activation sizes in bytes, parsed from repeated `--activation-size` values. - Argument `processing_deadline_duration` overrides the task's configured execution deadline. + + Argument `durations` contains execution deadlines in seconds. + If multiple durations are provided, one is randomly selected per activation. + Normal `delay` and `apply_async` calls do not use this path. """ if args is None: @@ -232,8 +235,8 @@ def apply_async_testing( headers=headers, expires=expires, countdown=countdown, - activation_sizes=sizes, - processing_deadline_duration=processing_deadline_duration, + sizes=sizes, + durations=durations, ) if ALWAYS_EAGER: self._call_func(*args, **kwargs) @@ -388,8 +391,8 @@ def create_testing_activation( headers: MutableMapping[str, Any] | None = None, expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, - activation_sizes: Collection[int] | None = None, - processing_deadline_duration: int | datetime.timedelta | None = None, + sizes: Collection[int] | None = None, + durations: Collection[int | datetime.timedelta] | None = None, ) -> TaskActivation: """ Build a `TaskActivation` with optional size padding for testing purposes. @@ -398,23 +401,27 @@ def create_testing_activation( With multiple sizes, one target size is randomly selected per activation. If the activation size is already ≥ the target, it will be unchanged. - If provided, `processing_deadline_duration` overrides the task's configured execution deadline. + With one duration, every activation uses that processing deadline. + With multiple durations, one processing deadline is randomly selected per activation. """ activation = self.create_activation( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown ) - if processing_deadline_duration is not None: - if isinstance(processing_deadline_duration, datetime.timedelta): - processing_deadline_duration = int(processing_deadline_duration.total_seconds()) - activation.processing_deadline_duration = processing_deadline_duration + if durations: + durations = list(durations) - if not activation_sizes: + selected_duration = durations[0] if len(durations) == 1 else random.choice(durations) + + if isinstance(selected_duration, datetime.timedelta): + selected_duration = int(selected_duration.total_seconds()) + + activation.processing_deadline_duration = selected_duration + + if not sizes: return activation - activation_sizes = list(activation_sizes) - selected_size = ( - activation_sizes[0] if len(activation_sizes) == 1 else random.choice(activation_sizes) - ) + sizes = list(sizes) + selected_size = sizes[0] if len(sizes) == 1 else random.choice(sizes) target_bytes = selected_size if activation.ByteSize() >= target_bytes: return activation From e3fd3e42a8dd1b6a4fe558e0166c5e26581f4a21 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 25 Jun 2026 09:07:58 -0700 Subject: [PATCH 10/23] Don't Randomly Select Bytes / Seconds --- clients/python/src/taskbroker_client/task.py | 43 +++++++------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 04cc8b98..11eb1b26 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -4,7 +4,6 @@ import datetime import inspect import os -import random import time from collections.abc import Callable, Collection, Mapping, MutableMapping from functools import update_wrapper @@ -208,8 +207,8 @@ def apply_async_testing( headers: MutableMapping[str, Any] | None = None, expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, - sizes: Collection[int] | None = None, - durations: Collection[int | datetime.timedelta] | None = None, + bytes: int | None = None, + seconds: int | None = None, **options: Any, ) -> None: """ @@ -235,8 +234,8 @@ def apply_async_testing( headers=headers, expires=expires, countdown=countdown, - sizes=sizes, - durations=durations, + bytes=bytes, + seconds=seconds, ) if ALWAYS_EAGER: self._call_func(*args, **kwargs) @@ -391,8 +390,8 @@ def create_testing_activation( headers: MutableMapping[str, Any] | None = None, expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, - sizes: Collection[int] | None = None, - durations: Collection[int | datetime.timedelta] | None = None, + bytes: int | None = None, + seconds: int | None = None, ) -> TaskActivation: """ Build a `TaskActivation` with optional size padding for testing purposes. @@ -407,31 +406,19 @@ def create_testing_activation( activation = self.create_activation( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown ) - if durations: - durations = list(durations) - selected_duration = durations[0] if len(durations) == 1 else random.choice(durations) + if seconds: + activation.processing_deadline_duration = seconds - if isinstance(selected_duration, datetime.timedelta): - selected_duration = int(selected_duration.total_seconds()) + if bytes: + padding_key = "taskbroker-testing-activation-padding" - activation.processing_deadline_duration = selected_duration + while activation.ByteSize() < bytes: + missing_bytes = bytes - activation.ByteSize() - if not sizes: - return activation - - sizes = list(sizes) - selected_size = sizes[0] if len(sizes) == 1 else random.choice(sizes) - target_bytes = selected_size - if activation.ByteSize() >= target_bytes: - return activation - - padding_key = "taskbroker-testing-activation-padding" - while activation.ByteSize() < target_bytes: - missing_bytes = target_bytes - activation.ByteSize() - activation.headers[padding_key] = ( - activation.headers.get(padding_key, "") + "x" * missing_bytes - ) + activation.headers[padding_key] = ( + activation.headers.get(padding_key, "") + "x" * missing_bytes + ) return activation From 8fbcf3d6cfd84c1c5a4833aebd2c2935c96eafc8 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 25 Jun 2026 09:47:19 -0700 Subject: [PATCH 11/23] Fix Subsecond Duration --- clients/python/src/taskbroker_client/task.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 11eb1b26..838fb9ce 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -7,6 +7,7 @@ import time from collections.abc import Callable, Collection, Mapping, MutableMapping from functools import update_wrapper +from math import ceil from typing import TYPE_CHECKING, Any, Generic, ParamSpec, TypeVar, get_origin from uuid import uuid4 @@ -208,7 +209,7 @@ def apply_async_testing( expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, bytes: int | None = None, - seconds: int | None = None, + seconds: float | None = None, **options: Any, ) -> None: """ @@ -391,7 +392,7 @@ def create_testing_activation( expires: int | datetime.timedelta | None = None, countdown: int | datetime.timedelta | None = None, bytes: int | None = None, - seconds: int | None = None, + seconds: float | None = None, ) -> TaskActivation: """ Build a `TaskActivation` with optional size padding for testing purposes. @@ -408,7 +409,7 @@ def create_testing_activation( ) if seconds: - activation.processing_deadline_duration = seconds + activation.processing_deadline_duration = DEFAULT_PROCESSING_DEADLINE + ceil(seconds) if bytes: padding_key = "taskbroker-testing-activation-padding" From e2d907ebc3ec4b0545a19e46f400fbc4b2ecb66a Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 29 Jun 2026 17:39:26 -0700 Subject: [PATCH 12/23] Remove Structured Logging Junk --- .../taskbroker_client/structured_logging.py | 69 ------------------- .../src/taskbroker_client/worker/events.py | 10 --- .../src/taskbroker_client/worker/worker.py | 49 ------------- .../taskbroker_client/worker/workerchild.py | 12 ---- .../python/tests/test_structured_logging.py | 28 -------- 5 files changed, 168 deletions(-) delete mode 100644 clients/python/src/taskbroker_client/structured_logging.py delete mode 100644 clients/python/src/taskbroker_client/worker/events.py delete mode 100644 clients/python/tests/test_structured_logging.py diff --git a/clients/python/src/taskbroker_client/structured_logging.py b/clients/python/src/taskbroker_client/structured_logging.py deleted file mode 100644 index 490978cb..00000000 --- a/clients/python/src/taskbroker_client/structured_logging.py +++ /dev/null @@ -1,69 +0,0 @@ -from __future__ import annotations - -import json -import logging -import sys -from datetime import UTC, datetime -from enum import Enum -from typing import Any - -_STRUCTURED_LOG_HANDLER = "_taskbroker_structured_log_handler" -_RESERVED_LOG_RECORD_ATTRS = set(logging.makeLogRecord({}).__dict__) | { - "asctime", - "message", -} - - -def _json_safe(value: Any) -> Any: - if isinstance(value, Enum): - return value.value - if isinstance(value, dict): - return {str(key): _json_safe(item) for key, item in value.items()} - if isinstance(value, (list, tuple)): - return [_json_safe(item) for item in value] - if isinstance(value, set): - return sorted(_json_safe(item) for item in value) - if isinstance(value, BaseException): - return str(value) - return value - - -class StructuredLogFormatter(logging.Formatter): - def format(self, record: logging.LogRecord) -> str: - payload: dict[str, Any] = { - "message": record.getMessage(), - "severity": record.levelname, - "timestamp": datetime.fromtimestamp(record.created, UTC).isoformat(), - "logger": record.name, - } - - for key, value in record.__dict__.items(): - if key in _RESERVED_LOG_RECORD_ATTRS or key.startswith("_"): - continue - payload[key] = _json_safe(value) - - if record.exc_info: - payload["exception"] = self.formatException(record.exc_info) - if record.stack_info: - payload["stack"] = self.formatStack(record.stack_info) - - return json.dumps(payload, separators=(",", ":"), default=str) - - -def configure_taskbroker_structured_logging() -> None: - """ - Emit taskbroker_client logs as JSON on stdout so container log collectors - ingest them as structured logs instead of stderr text payloads. - """ - package_logger = logging.getLogger("taskbroker_client") - - for handler in package_logger.handlers: - if getattr(handler, _STRUCTURED_LOG_HANDLER, False): - return - - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(StructuredLogFormatter()) - setattr(handler, _STRUCTURED_LOG_HANDLER, True) - - package_logger.addHandler(handler) - package_logger.propagate = False diff --git a/clients/python/src/taskbroker_client/worker/events.py b/clients/python/src/taskbroker_client/worker/events.py deleted file mode 100644 index 68bcb06f..00000000 --- a/clients/python/src/taskbroker_client/worker/events.py +++ /dev/null @@ -1,10 +0,0 @@ -from enum import Enum - - -class TraceEvent(Enum): - WORKER_STARTED = "WORKER_UP" - RECEIVED = "RECEIVED" - TIMED_OUT = "TIMED_OUT" - QUEUED = "QUEUED" - PICKED_UP = "PICKED_UP" - WORKER_STOPPED = "WORKER_STOPPED" diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 1650af0a..e2c27371 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -32,7 +32,6 @@ WORKER_CHILD_JOIN_TIMEOUT_SEC, ) from taskbroker_client.metrics import MetricsBackend -from taskbroker_client.structured_logging import configure_taskbroker_structured_logging from taskbroker_client.types import InflightTaskActivation, ProcessingResult from taskbroker_client.worker.client import ( HealthCheckSettings, @@ -41,7 +40,6 @@ TaskbrokerClient, parse_rpc_secret_list, ) -from taskbroker_client.worker.events import TraceEvent from taskbroker_client.worker.push_clients import BatchPushTaskbrokerClient, PushTaskbrokerClient from taskbroker_client.worker.workerchild import child_process @@ -70,15 +68,6 @@ def PushTask( request: PushTaskRequest, context: grpc.ServicerContext, ) -> PushTaskResponse: - logger.debug( - "taskworker.activation.received", - extra={ - "for": "trace-activations", - "id": request.task.id, - "event": TraceEvent.RECEIVED.value, - }, - ) - """Handle incoming task activation.""" start_time = time.monotonic() self.worker_pool._metrics.incr( @@ -95,15 +84,6 @@ def PushTask( # Push the task to the worker queue (wait at most N seconds) if not self.worker_pool.push_task(inflight, timeout=self.push_task_timeout): - logger.debug( - "taskworker.activation.enqueue_timed_out", - extra={ - "for": "trace-activations", - "id": request.task.id, - "event": TraceEvent.TIMED_OUT.value, - }, - ) - self.worker_pool._metrics.incr( "taskworker.worker.push_rpc", tags={"result": "busy", "processing_pool": self.worker_pool._processing_pool_name}, @@ -117,15 +97,6 @@ def PushTask( context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") else: - logger.debug( - "taskworker.activation.enqueued", - extra={ - "for": "trace-activations", - "id": request.task.id, - "event": TraceEvent.QUEUED.value, - }, - ) - self.worker_pool._metrics.incr( "taskworker.worker.push_rpc", tags={ @@ -172,8 +143,6 @@ def __init__( push_task_timeout: float = 5, update_in_batches: bool = False, ) -> None: - configure_taskbroker_structured_logging() - app = import_app(app_module) if process_type == "fork": @@ -393,14 +362,6 @@ def signal_handler(*args: Any) -> None: health_servicer.set("", health_pb2.HealthCheckResponse.SERVING) health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.SERVING) - logger.debug( - "taskworker.started", - extra={ - "for": "trace-activations", - "event": TraceEvent.WORKER_STARTED, - }, - ) - logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) self._start_health_check_thread() @@ -415,14 +376,6 @@ def signal_handler(*args: Any) -> None: health_servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING) health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.NOT_SERVING) - logger.debug( - "taskworker.stopped", - extra={ - "for": "trace-activations", - "event": TraceEvent.WORKER_STOPPED, - }, - ) - if server is not None: server.stop(grace=5) @@ -546,8 +499,6 @@ def __init__( health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, ) -> None: self._namespace = namespace - configure_taskbroker_structured_logging() - app = import_app(app_module) if process_type == "fork": diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 53584e1b..a63634e4 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -35,10 +35,8 @@ from taskbroker_client.constants import CompressionType from taskbroker_client.retry import NoRetriesRemainingError from taskbroker_client.state import clear_current_task, current_task, set_current_task -from taskbroker_client.structured_logging import configure_taskbroker_structured_logging from taskbroker_client.task import Task from taskbroker_client.types import ContextHook, InflightTaskActivation, ProcessingResult -from taskbroker_client.worker.events import TraceEvent from taskbroker_client.worker.producer import TaskProducer logger = logging.getLogger(__name__) @@ -196,8 +194,6 @@ def child_process( and not the module root. If modules that include django are imported at the module level the wrong django settings will be used. """ - configure_taskbroker_structured_logging() - app = import_app(app_module) app.load_modules() metrics = app.metrics @@ -366,14 +362,6 @@ def check_task_future_completion() -> None: try: check_task_future_completion() inflight = child_tasks.get(timeout=1.0) - logger.debug( - "taskworker.activation.picked_up", - extra={ - "for": "trace-activations", - "id": inflight.activation.id, - "event": TraceEvent.PICKED_UP.value, - }, - ) except queue.Empty: metrics.incr( "taskworker.worker.child_task_queue_empty", diff --git a/clients/python/tests/test_structured_logging.py b/clients/python/tests/test_structured_logging.py deleted file mode 100644 index ddcac0bc..00000000 --- a/clients/python/tests/test_structured_logging.py +++ /dev/null @@ -1,28 +0,0 @@ -import json -import logging - -from taskbroker_client.structured_logging import StructuredLogFormatter -from taskbroker_client.worker.events import TraceEvent - - -def test_structured_log_formatter_includes_cloud_logging_fields() -> None: - record = logging.LogRecord( - name="taskbroker_client.worker.worker", - level=logging.DEBUG, - pathname=__file__, - lineno=1, - msg="taskworker.activation.received", - args=(), - exc_info=None, - ) - record.id = "123" - record.event = TraceEvent.RECEIVED - - payload = json.loads(StructuredLogFormatter().format(record)) - - assert payload["message"] == "taskworker.activation.received" - assert payload["severity"] == "DEBUG" - assert payload["logger"] == "taskbroker_client.worker.worker" - assert payload["id"] == "123" - assert payload["event"] == "RECEIVED" - assert "timestamp" in payload From 867188204d17a3bf680fea47e606c5e7d9a5ec63 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 06:38:05 -0700 Subject: [PATCH 13/23] Register Canary Task w/All Namespaces --- .../python/src/taskbroker_client/canary.py | 15 ++++++ .../python/src/taskbroker_client/registry.py | 18 +++++++ clients/python/tests/test_registry.py | 47 ++++++++++++++++++- clients/python/tests/worker/test_worker.py | 41 ++++++++++++++++ 4 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 clients/python/src/taskbroker_client/canary.py diff --git a/clients/python/src/taskbroker_client/canary.py b/clients/python/src/taskbroker_client/canary.py new file mode 100644 index 00000000..ac6e1a4f --- /dev/null +++ b/clients/python/src/taskbroker_client/canary.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +import logging +import time + +CANARY_TASK_NAME = "canary_task" +CANARY_TASK_SLEEP_SECONDS = 0.1 + +logger = logging.getLogger(__name__) + + +def canary_task() -> None: + logger.info("Running canary task...") + time.sleep(CANARY_TASK_SLEEP_SECONDS) + print("Done running canary task!") diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index 93aee24d..f0b542a3 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -12,6 +12,7 @@ from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from sentry_sdk.consts import OP, SPANDATA +from taskbroker_client.canary import CANARY_TASK_NAME, canary_task from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType from taskbroker_client.metrics import MetricsBackend from taskbroker_client.retry import Retry @@ -56,6 +57,15 @@ def __init__( self._producers: dict[str, ProducerProtocol] = {} self._producer_factory = producer_factory self.metrics = metrics + self._register_builtin_tasks() + + def _register_builtin_tasks(self) -> None: + self._registered_tasks[CANARY_TASK_NAME] = Task( + name=CANARY_TASK_NAME, + func=canary_task, + namespace=self, + processing_deadline_duration=self.default_processing_deadline_duration, + ) def get(self, name: str) -> Task[Any, Any]: """ @@ -223,6 +233,14 @@ class ExternalNamespace(TaskNamespace): to Kafka, not called locally. """ + def _register_builtin_tasks(self) -> None: + self._registered_tasks[CANARY_TASK_NAME] = ExternalTask( + name=CANARY_TASK_NAME, + func=canary_task, + namespace=self, + processing_deadline_duration=self.default_processing_deadline_duration, + ) + @property def topic(self) -> str: return self.router.route_namespace(f"{self.application}:{self.name}") diff --git a/clients/python/tests/test_registry.py b/clients/python/tests/test_registry.py index c0a661ab..7da617bc 100644 --- a/clients/python/tests/test_registry.py +++ b/clients/python/tests/test_registry.py @@ -1,6 +1,6 @@ import base64 from concurrent.futures import Future -from unittest.mock import Mock +from unittest.mock import Mock, patch import msgpack import orjson @@ -11,12 +11,13 @@ ON_ATTEMPTS_EXCEEDED_DISCARD, ) +from taskbroker_client.canary import CANARY_TASK_NAME from taskbroker_client.constants import MAX_PARAMETER_BYTES_BEFORE_COMPRESSION, CompressionType from taskbroker_client.metrics import NoOpMetricsBackend from taskbroker_client.registry import TaskNamespace, TaskRegistry from taskbroker_client.retry import LastAction, Retry from taskbroker_client.router import DefaultRouter -from taskbroker_client.task import Task +from taskbroker_client.task import ExternalTask, Task from .conftest import producer_factory @@ -44,6 +45,33 @@ def simple_task() -> None: assert task.name == "tests.simple_task" +def test_namespace_registers_builtin_canary_task( + capsys: pytest.CaptureFixture[str], +) -> None: + namespace = TaskNamespace( + name="tests", + application="acme", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + retry=None, + ) + + task = namespace.get(CANARY_TASK_NAME) + + assert isinstance(task, Task) + assert task.name == CANARY_TASK_NAME + with ( + patch("taskbroker_client.canary.logger") as mock_logger, + patch("taskbroker_client.canary.time.sleep") as mock_sleep, + ): + task() + + mock_logger.info.assert_called_once_with("Running canary task...") + mock_sleep.assert_called_once_with(0.1) + assert capsys.readouterr().out == "Done running canary task!\n" + + def test_namespace_register_inherits_default_retry() -> None: namespace = TaskNamespace( name="tests", @@ -357,6 +385,21 @@ def simple_task() -> None: registry.get_task(ns.name, "nope") +def test_external_namespace_registers_builtin_canary_task() -> None: + registry = TaskRegistry( + application="acme", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + ) + namespace = registry.create_external_namespace(name="tests", application="other") + + task = namespace.get(CANARY_TASK_NAME) + + assert isinstance(task, ExternalTask) + assert task.name == CANARY_TASK_NAME + + def test_registry_create_namespace_simple() -> None: registry = TaskRegistry( application="acme", diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index bffc2154..3119c60d 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -35,6 +35,7 @@ ) from sentry_sdk.crons import MonitorStatus +from taskbroker_client.canary import CANARY_TASK_NAME from taskbroker_client.constants import CompressionType from taskbroker_client.retry import NoRetriesRemainingError from taskbroker_client.state import current_task @@ -61,6 +62,18 @@ ), ) +CANARY_TASK = InflightTaskActivation( + host="localhost:50051", + receive_timestamp=0, + activation=TaskActivation( + id="canary", + taskname=CANARY_TASK_NAME, + namespace="examples", + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), + processing_deadline_duration=2, + ), +) + RETRY_TASK = InflightTaskActivation( host="localhost:50051", receive_timestamp=0, @@ -824,6 +837,34 @@ def test_child_process_complete(mock_capture_checkin: mock.MagicMock) -> None: assert mock_capture_checkin.call_count == 0 +def test_child_process_canary_task(capsys: pytest.CaptureFixture[str]) -> None: + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + todo.put(CANARY_TASK) + with ( + mock.patch("taskbroker_client.canary.logger") as mock_logger, + mock.patch("taskbroker_client.canary.time.sleep") as mock_sleep, + ): + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + result = processed.get() + assert result.task_id == CANARY_TASK.activation.id + assert result.status == TASK_ACTIVATION_STATUS_COMPLETE + mock_logger.info.assert_called_once_with("Running canary task...") + mock_sleep.assert_called_once_with(0.1) + assert capsys.readouterr().out == "Done running canary task!\n" + + def test_child_process_remove_start_time_kwargs() -> None: activation = InflightTaskActivation( host="localhost:50051", From 3cfe3f246f608045b4ca599da9666fac5003fcda Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 06:46:49 -0700 Subject: [PATCH 14/23] Clarify that Seconds Increases, Doesn't Replace Default Processing Deadline --- clients/python/src/taskbroker_client/task.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index c5134f2c..6b09d249 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -201,8 +201,8 @@ def apply_async_testing( Argument `sizes` contains minimum activation sizes in bytes, parsed from repeated `--activation-size` values. - Argument `durations` contains execution deadlines in seconds. - If multiple durations are provided, one is randomly selected per activation. + Argument `durations` contains execution deadlines in seconds that are added to the default processing deadline. + If multiple durations are provided, one is randomly selected per activation and added to the default processing deadline. Normal `delay` and `apply_async` calls do not use this path. """ @@ -363,8 +363,8 @@ def create_testing_activation( With multiple sizes, one target size is randomly selected per activation. If the activation size is already ≥ the target, it will be unchanged. - With one duration, every activation uses that processing deadline. - With multiple durations, one processing deadline is randomly selected per activation. + With one duration, every activation adds it to the default processing deadline. + With multiple durations, one is randomly selected per activation. """ activation = self.create_activation( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown From 4787e4958ce9ff5769ae457adc86e4aaca80f956 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 06:48:18 -0700 Subject: [PATCH 15/23] Update Batch Size Equals Concurrency --- clients/python/src/taskbroker_client/worker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 7798dbde..44b12022 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -901,7 +901,7 @@ def result_thread() -> None: break else: results.append(result) - if len(results) >= self._result_queue_maxsize: + if len(results) >= self._concurrency: executor.submit(self.send_results, results, False) results = [] except queue.Empty: From 9b1cf122deeee07530529c4b44d64adfc042548c Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 06:52:42 -0700 Subject: [PATCH 16/23] Fix Worker Tests --- clients/python/tests/worker/test_worker.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index c1f312bb..47e69aaf 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -497,23 +497,18 @@ def test_push_task_queue(self) -> None: def test_result_thread_sends_full_batch(self) -> None: capture = _SendResultCapture() - result_queue_maxsize = 5 - pool = _make_result_thread_pool( - capture, - concurrency=2, - result_queue_maxsize=result_queue_maxsize, - update_in_batches=True, - ) + concurrency = 3 + pool = _make_result_thread_pool(capture, concurrency=concurrency, update_in_batches=True) try: pool.start_result_thread() - for i in range(result_queue_maxsize): + for i in range(concurrency): pool.put_result(_make_processing_result(str(i))) capture.wait_for_calls(1) batch, is_draining = capture.send_calls[0] - self.assertEqual(len(batch), result_queue_maxsize) - self.assertEqual({result.task_id for result in batch}, {"0", "1", "2", "3", "4"}) + self.assertEqual(len(batch), concurrency) + self.assertEqual({result.task_id for result in batch}, {"0", "1", "2"}) self.assertFalse(is_draining) finally: pool.shutdown() From 8136a48447e9405c4a50a3d1f93e362167baa342 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 06:58:08 -0700 Subject: [PATCH 17/23] Fix Canary Task Tests --- clients/python/src/taskbroker_client/canary.py | 4 ++-- clients/python/tests/test_registry.py | 6 +----- clients/python/tests/worker/test_worker.py | 7 ++----- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/clients/python/src/taskbroker_client/canary.py b/clients/python/src/taskbroker_client/canary.py index ac6e1a4f..86fb7683 100644 --- a/clients/python/src/taskbroker_client/canary.py +++ b/clients/python/src/taskbroker_client/canary.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -import time +from time import sleep CANARY_TASK_NAME = "canary_task" CANARY_TASK_SLEEP_SECONDS = 0.1 @@ -11,5 +11,5 @@ def canary_task() -> None: logger.info("Running canary task...") - time.sleep(CANARY_TASK_SLEEP_SECONDS) + sleep(CANARY_TASK_SLEEP_SECONDS) print("Done running canary task!") diff --git a/clients/python/tests/test_registry.py b/clients/python/tests/test_registry.py index c25400e7..0fc159c0 100644 --- a/clients/python/tests/test_registry.py +++ b/clients/python/tests/test_registry.py @@ -59,14 +59,10 @@ def test_namespace_registers_builtin_canary_task( assert isinstance(task, Task) assert task.name == CANARY_TASK_NAME - with ( - patch("taskbroker_client.canary.logger") as mock_logger, - patch("taskbroker_client.canary.time.sleep") as mock_sleep, - ): + with patch("taskbroker_client.canary.logger") as mock_logger: task() mock_logger.info.assert_called_once_with("Running canary task...") - mock_sleep.assert_called_once_with(0.1) assert capsys.readouterr().out == "Done running canary task!\n" diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 47e69aaf..ff04664c 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -915,10 +915,7 @@ def test_child_process_canary_task(capsys: pytest.CaptureFixture[str]) -> None: shutdown = Event() todo.put(CANARY_TASK) - with ( - mock.patch("taskbroker_client.canary.logger") as mock_logger, - mock.patch("taskbroker_client.canary.time.sleep") as mock_sleep, - ): + with mock.patch("taskbroker_client.canary.logger") as mock_logger: child_process( "examples.app:app", todo, @@ -935,7 +932,7 @@ def test_child_process_canary_task(capsys: pytest.CaptureFixture[str]) -> None: assert result.task_id == CANARY_TASK.activation.id assert result.status == TASK_ACTIVATION_STATUS_COMPLETE mock_logger.info.assert_called_once_with("Running canary task...") - mock_sleep.assert_called_once_with(0.1) + time.sleep(1) assert capsys.readouterr().out == "Done running canary task!\n" From 91963ffb8fc2421d3a41b8e99d90dff16a5f97e4 Mon Sep 17 00:00:00 2001 From: George Berdovskiy Date: Tue, 30 Jun 2026 09:47:01 -0700 Subject: [PATCH 18/23] Prefix Canary Task to Avoid Name Collisions Co-authored-by: Mark Story --- clients/python/src/taskbroker_client/canary.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/canary.py b/clients/python/src/taskbroker_client/canary.py index 86fb7683..0a3fb0d3 100644 --- a/clients/python/src/taskbroker_client/canary.py +++ b/clients/python/src/taskbroker_client/canary.py @@ -3,7 +3,7 @@ import logging from time import sleep -CANARY_TASK_NAME = "canary_task" +CANARY_TASK_NAME = "_taskbroker_client.canary_task" CANARY_TASK_SLEEP_SECONDS = 0.1 logger = logging.getLogger(__name__) From 469269dc3997dbbc820cac928ad8d1eceb5c4ebc Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 14:49:20 -0700 Subject: [PATCH 19/23] Remove Unused `_result_queue_maxsize` Field --- clients/python/src/taskbroker_client/worker/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index a6b13986..7f89d5d1 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -835,7 +835,6 @@ def __init__( else: raise ValueError("Minimum concurrency must be strictly below concurrency") - self._result_queue_maxsize = result_queue_maxsize self._processing_pool_name = processing_pool_name or "unknown" self._pod_name = pod_name or "unknown" self._update_in_batches = update_in_batches From 266a5f12fda02b0a117d100b82263d1333109804 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 15:08:26 -0700 Subject: [PATCH 20/23] Basic Seconds / Bytes Validation --- clients/python/src/taskbroker_client/task.py | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 6b09d249..35133483 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -199,10 +199,8 @@ def apply_async_testing( """ Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry. - Argument `sizes` contains minimum activation sizes in bytes, parsed from repeated `--activation-size` values. - - Argument `durations` contains execution deadlines in seconds that are added to the default processing deadline. - If multiple durations are provided, one is randomly selected per activation and added to the default processing deadline. + If `bytes` is provided, the activation is padded to be at least that size. + If `seconds` is provided, it is added to the default processing deadline. Normal `delay` and `apply_async` calls do not use this path. """ @@ -359,21 +357,21 @@ def create_testing_activation( """ Build a `TaskActivation` with optional size padding for testing purposes. - With one activation size, every activation is padded to at least that size. - With multiple sizes, one target size is randomly selected per activation. - If the activation size is already ≥ the target, it will be unchanged. - - With one duration, every activation adds it to the default processing deadline. - With multiple durations, one is randomly selected per activation. + If `bytes` is provided, the activation is padded to be at least that size. + If `seconds` is provided, it is added to the default processing deadline. """ activation = self.create_activation( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown ) - if seconds: + if seconds and seconds > 0: activation.processing_deadline_duration = DEFAULT_PROCESSING_DEADLINE + ceil(seconds) + else: + raise ValueError( + f"Test activation seconds is {seconds}, which is NOT greater than zero" + ) - if bytes: + if bytes and bytes > 0: padding_key = "taskbroker-testing-activation-padding" while activation.ByteSize() < bytes: @@ -382,6 +380,8 @@ def create_testing_activation( activation.headers[padding_key] = ( activation.headers.get(padding_key, "") + "x" * missing_bytes ) + else: + raise ValueError(f"Test activation bytes is {bytes}, which is NOT greater than zero") return activation From b9e910e6997e117b58d1e582a65b0ad85838bda6 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 16:27:14 -0700 Subject: [PATCH 21/23] Add `internal` Namespace w/`canary_task` to Every App --- clients/python/src/taskbroker_client/app.py | 8 +++- .../python/src/taskbroker_client/canary.py | 5 +-- .../python/src/taskbroker_client/constants.py | 5 +++ .../python/src/taskbroker_client/registry.py | 18 -------- clients/python/tests/test_app.py | 20 +++++++++ clients/python/tests/test_registry.py | 43 +------------------ clients/python/tests/worker/test_worker.py | 5 +-- 7 files changed, 38 insertions(+), 66 deletions(-) diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py index 39374c9f..58dffdb6 100644 --- a/clients/python/src/taskbroker_client/app.py +++ b/clients/python/src/taskbroker_client/app.py @@ -5,7 +5,8 @@ from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation -from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE +from taskbroker_client.canary import CANARY_TASK_NAME, canary_task +from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, INTERNAL_NAMESPACE from taskbroker_client.imports import import_string from taskbroker_client.metrics import MetricsBackend from taskbroker_client.registry import ExternalNamespace, TaskNamespace, TaskRegistry @@ -45,6 +46,7 @@ def __init__( metrics=self.metrics, context_hooks=self.context_hooks, ) + self._register_internal_tasks() self.at_most_once_store(at_most_once_store) def _build_router(self, router_name: str | TaskRouter) -> TaskRouter: @@ -63,6 +65,10 @@ def _build_metrics(self, backend_name: str | MetricsBackend) -> MetricsBackend: return metrics_class() return backend_name + def _register_internal_tasks(self) -> None: + namespace = self._taskregistry.create_namespace(name=INTERNAL_NAMESPACE) + namespace.register(name=CANARY_TASK_NAME)(canary_task) + @property def taskregistry(self) -> TaskRegistry: """Get the TaskRegistry instance from this app""" diff --git a/clients/python/src/taskbroker_client/canary.py b/clients/python/src/taskbroker_client/canary.py index 0a3fb0d3..c2d1c536 100644 --- a/clients/python/src/taskbroker_client/canary.py +++ b/clients/python/src/taskbroker_client/canary.py @@ -3,13 +3,12 @@ import logging from time import sleep -CANARY_TASK_NAME = "_taskbroker_client.canary_task" -CANARY_TASK_SLEEP_SECONDS = 0.1 +CANARY_TASK_NAME = "canary_task" logger = logging.getLogger(__name__) def canary_task() -> None: logger.info("Running canary task...") - sleep(CANARY_TASK_SLEEP_SECONDS) + sleep(0.1) print("Done running canary task!") diff --git a/clients/python/src/taskbroker_client/constants.py b/clients/python/src/taskbroker_client/constants.py index ad157364..dd4b4234 100644 --- a/clients/python/src/taskbroker_client/constants.py +++ b/clients/python/src/taskbroker_client/constants.py @@ -1,5 +1,10 @@ from enum import Enum +INTERNAL_NAMESPACE = "internal" +""" +Namespace reserved for taskbroker client internal tasks. +""" + DEFAULT_PROCESSING_DEADLINE = 10 """ The fallback/default processing_deadline that tasks diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index f0b542a3..93aee24d 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -12,7 +12,6 @@ from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from sentry_sdk.consts import OP, SPANDATA -from taskbroker_client.canary import CANARY_TASK_NAME, canary_task from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType from taskbroker_client.metrics import MetricsBackend from taskbroker_client.retry import Retry @@ -57,15 +56,6 @@ def __init__( self._producers: dict[str, ProducerProtocol] = {} self._producer_factory = producer_factory self.metrics = metrics - self._register_builtin_tasks() - - def _register_builtin_tasks(self) -> None: - self._registered_tasks[CANARY_TASK_NAME] = Task( - name=CANARY_TASK_NAME, - func=canary_task, - namespace=self, - processing_deadline_duration=self.default_processing_deadline_duration, - ) def get(self, name: str) -> Task[Any, Any]: """ @@ -233,14 +223,6 @@ class ExternalNamespace(TaskNamespace): to Kafka, not called locally. """ - def _register_builtin_tasks(self) -> None: - self._registered_tasks[CANARY_TASK_NAME] = ExternalTask( - name=CANARY_TASK_NAME, - func=canary_task, - namespace=self, - processing_deadline_duration=self.default_processing_deadline_duration, - ) - @property def topic(self) -> str: return self.router.route_namespace(f"{self.application}:{self.name}") diff --git a/clients/python/tests/test_app.py b/clients/python/tests/test_app.py index 91eafe6c..cc14d264 100644 --- a/clients/python/tests/test_app.py +++ b/clients/python/tests/test_app.py @@ -1,8 +1,12 @@ +from unittest.mock import patch + import msgpack import pytest from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from taskbroker_client.app import TaskbrokerApp +from taskbroker_client.canary import CANARY_TASK_NAME +from taskbroker_client.constants import INTERNAL_NAMESPACE from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter from taskbroker_client.task import Task @@ -38,6 +42,22 @@ def test_set_config() -> None: assert "ignored" not in app.config +def test_registers_internal_canary_task(capsys: pytest.CaptureFixture[str]) -> None: + app = TaskbrokerApp(name="acme", producer_factory=producer_factory) + + namespace = app.get_namespace(INTERNAL_NAMESPACE) + task = app.get_task(INTERNAL_NAMESPACE, CANARY_TASK_NAME) + + assert namespace.name == INTERNAL_NAMESPACE + assert isinstance(task, Task) + assert task.name == CANARY_TASK_NAME + with patch("taskbroker_client.canary.logger") as mock_logger: + task() + + mock_logger.info.assert_called_once_with("Running canary task...") + assert capsys.readouterr().out == "Done running canary task!\n" + + def test_should_attempt_at_most_once() -> None: activation = TaskActivation( id="111", diff --git a/clients/python/tests/test_registry.py b/clients/python/tests/test_registry.py index 0fc159c0..ab21e7ec 100644 --- a/clients/python/tests/test_registry.py +++ b/clients/python/tests/test_registry.py @@ -1,5 +1,5 @@ from concurrent.futures import Future -from unittest.mock import Mock, patch +from unittest.mock import Mock import msgpack import pytest @@ -9,13 +9,12 @@ ON_ATTEMPTS_EXCEEDED_DISCARD, ) -from taskbroker_client.canary import CANARY_TASK_NAME from taskbroker_client.constants import MAX_PARAMETER_BYTES_BEFORE_COMPRESSION, CompressionType from taskbroker_client.metrics import NoOpMetricsBackend from taskbroker_client.registry import TaskNamespace, TaskRegistry from taskbroker_client.retry import LastAction, Retry from taskbroker_client.router import DefaultRouter -from taskbroker_client.task import ExternalTask, Task +from taskbroker_client.task import Task from .conftest import producer_factory @@ -43,29 +42,6 @@ def simple_task() -> None: assert task.name == "tests.simple_task" -def test_namespace_registers_builtin_canary_task( - capsys: pytest.CaptureFixture[str], -) -> None: - namespace = TaskNamespace( - name="tests", - application="acme", - producer_factory=producer_factory, - router=DefaultRouter(), - metrics=NoOpMetricsBackend(), - retry=None, - ) - - task = namespace.get(CANARY_TASK_NAME) - - assert isinstance(task, Task) - assert task.name == CANARY_TASK_NAME - with patch("taskbroker_client.canary.logger") as mock_logger: - task() - - mock_logger.info.assert_called_once_with("Running canary task...") - assert capsys.readouterr().out == "Done running canary task!\n" - - def test_namespace_register_inherits_default_retry() -> None: namespace = TaskNamespace( name="tests", @@ -375,21 +351,6 @@ def simple_task() -> None: registry.get_task(ns.name, "nope") -def test_external_namespace_registers_builtin_canary_task() -> None: - registry = TaskRegistry( - application="acme", - producer_factory=producer_factory, - router=DefaultRouter(), - metrics=NoOpMetricsBackend(), - ) - namespace = registry.create_external_namespace(name="tests", application="other") - - task = namespace.get(CANARY_TASK_NAME) - - assert isinstance(task, ExternalTask) - assert task.name == CANARY_TASK_NAME - - def test_registry_create_namespace_simple() -> None: registry = TaskRegistry( application="acme", diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 4b16ae35..1b94fa92 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -36,7 +36,7 @@ from sentry_sdk.crons import MonitorStatus from taskbroker_client.canary import CANARY_TASK_NAME -from taskbroker_client.constants import CompressionType +from taskbroker_client.constants import INTERNAL_NAMESPACE, CompressionType from taskbroker_client.retry import NoRetriesRemainingError from taskbroker_client.state import current_task from taskbroker_client.types import InflightTaskActivation, ProcessingResult @@ -69,7 +69,7 @@ activation=TaskActivation( id="canary", taskname=CANARY_TASK_NAME, - namespace="examples", + namespace=INTERNAL_NAMESPACE, parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), processing_deadline_duration=2, ), @@ -1156,7 +1156,6 @@ def test_child_process_canary_task(capsys: pytest.CaptureFixture[str]) -> None: assert result.task_id == CANARY_TASK.activation.id assert result.status == TASK_ACTIVATION_STATUS_COMPLETE mock_logger.info.assert_called_once_with("Running canary task...") - time.sleep(1) assert capsys.readouterr().out == "Done running canary task!\n" From f3513aed506d33d041e1e8ea979ad39fe56c7d39 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 16:38:49 -0700 Subject: [PATCH 22/23] Fix Bad Validation --- clients/python/src/taskbroker_client/task.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 35133483..651e7d45 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -364,14 +364,20 @@ def create_testing_activation( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown ) - if seconds and seconds > 0: + if seconds: + if seconds < 0: + raise ValueError( + f"Test activation seconds is {seconds}, which is NOT greater than zero" + ) + activation.processing_deadline_duration = DEFAULT_PROCESSING_DEADLINE + ceil(seconds) - else: - raise ValueError( - f"Test activation seconds is {seconds}, which is NOT greater than zero" - ) - if bytes and bytes > 0: + if bytes: + if bytes < 0: + raise ValueError( + f"Test activation bytes is {bytes}, which is NOT greater than zero" + ) + padding_key = "taskbroker-testing-activation-padding" while activation.ByteSize() < bytes: @@ -380,8 +386,6 @@ def create_testing_activation( activation.headers[padding_key] = ( activation.headers.get(padding_key, "") + "x" * missing_bytes ) - else: - raise ValueError(f"Test activation bytes is {bytes}, which is NOT greater than zero") return activation From df12e16eb55fbd9ec852444e39125b9fd76f7fbd Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Jun 2026 17:20:22 -0700 Subject: [PATCH 23/23] Prevent Users from Creating `internal` Namespace --- clients/python/src/taskbroker_client/app.py | 6 +++++- clients/python/src/taskbroker_client/registry.py | 9 ++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py index 58dffdb6..6517a8f0 100644 --- a/clients/python/src/taskbroker_client/app.py +++ b/clients/python/src/taskbroker_client/app.py @@ -66,7 +66,11 @@ def _build_metrics(self, backend_name: str | MetricsBackend) -> MetricsBackend: return backend_name def _register_internal_tasks(self) -> None: - namespace = self._taskregistry.create_namespace(name=INTERNAL_NAMESPACE) + namespace = self._taskregistry.create_namespace( + name=INTERNAL_NAMESPACE, + internal=True, + ) + namespace.register(name=CANARY_TASK_NAME)(canary_task) @property diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index 93aee24d..8e8da7ce 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -12,7 +12,11 @@ from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from sentry_sdk.consts import OP, SPANDATA -from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType +from taskbroker_client.constants import ( + DEFAULT_PROCESSING_DEADLINE, + INTERNAL_NAMESPACE, + CompressionType, +) from taskbroker_client.metrics import MetricsBackend from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter @@ -345,6 +349,7 @@ def create_namespace( expires: int | datetime.timedelta | None = None, processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, app_feature: str | None = None, + internal: bool = False, ) -> TaskNamespace: """ Create a task namespace. @@ -354,6 +359,8 @@ def create_namespace( Namespaces can define default behavior for tasks defined within a namespace. """ + if name == INTERNAL_NAMESPACE and not internal: + raise ValueError(f"{INTERNAL_NAMESPACE!r} is reserved for internal taskbroker tasks.") if name in self._namespaces: raise ValueError(f"Task namespace with name {name} already exists.") namespace = TaskNamespace(