Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
64c2b64
Allow Specifying Minimum Sizes for Testing Activations
george-sentry Jun 12, 2026
91d294a
Change Activation Size Arg. from KB to Bytes
george-sentry Jun 14, 2026
5d5f3f9
Merge branch 'main' of https://github.com/getsentry/taskbroker into g…
george-sentry Jun 18, 2026
61c86bf
Use Result Queue Size for Batches
george-sentry Jun 19, 2026
2115370
Log Activations for Tracing
george-sentry Jun 23, 2026
f13cc2d
Fix Import Issue
george-sentry Jun 23, 2026
4198a1c
Improve Logging
george-sentry Jun 23, 2026
2ec9369
Fix Enqueued Log
george-sentry Jun 23, 2026
4fc8da7
Add Processing Deadline Duration Override
george-sentry Jun 25, 2026
aa2c430
Make Durations a List
george-sentry Jun 25, 2026
e3fd3e4
Don't Randomly Select Bytes / Seconds
george-sentry Jun 25, 2026
8fbcf3d
Fix Subsecond Duration
george-sentry Jun 25, 2026
e2d907e
Remove Structured Logging Junk
george-sentry Jun 30, 2026
8671882
Register Canary Task w/All Namespaces
george-sentry Jun 30, 2026
dea4cb1
Merge branch 'main' of https://github.com/getsentry/taskbroker into g…
george-sentry Jun 30, 2026
3cfe3f2
Clarify that Seconds Increases, Doesn't Replace Default Processing De…
george-sentry Jun 30, 2026
4787e49
Update Batch Size Equals Concurrency
george-sentry Jun 30, 2026
9b1cf12
Fix Worker Tests
george-sentry Jun 30, 2026
8136a48
Fix Canary Task Tests
george-sentry Jun 30, 2026
91963ff
Prefix Canary Task to Avoid Name Collisions
george-sentry Jun 30, 2026
15d3866
Merge branch 'main' of https://github.com/getsentry/taskbroker into g…
george-sentry Jun 30, 2026
b1a2ce0
Merge branch 'george/push-taskbroker/testing-activations-custom-sizes…
george-sentry Jun 30, 2026
469269d
Remove Unused `_result_queue_maxsize` Field
george-sentry Jun 30, 2026
266a5f1
Basic Seconds / Bytes Validation
george-sentry Jun 30, 2026
08b5b03
Merge branch 'main' of https://github.com/getsentry/taskbroker into g…
george-sentry Jun 30, 2026
b9e910e
Add `internal` Namespace w/`canary_task` to Every App
george-sentry Jun 30, 2026
f3513ae
Fix Bad Validation
george-sentry Jun 30, 2026
df12e16
Prevent Users from Creating `internal` Namespace
george-sentry Jul 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion clients/python/src/taskbroker_client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation

from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE
from taskbroker_client.canary import CANARY_TASK_NAME, canary_task
from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, INTERNAL_NAMESPACE
from taskbroker_client.imports import import_string
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.registry import ExternalNamespace, TaskNamespace, TaskRegistry
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(
metrics=self.metrics,
context_hooks=self.context_hooks,
)
self._register_internal_tasks()
self.at_most_once_store(at_most_once_store)

def _build_router(self, router_name: str | TaskRouter) -> TaskRouter:
Expand All @@ -63,6 +65,14 @@ def _build_metrics(self, backend_name: str | MetricsBackend) -> MetricsBackend:
return metrics_class()
return backend_name

def _register_internal_tasks(self) -> None:
namespace = self._taskregistry.create_namespace(
name=INTERNAL_NAMESPACE,
internal=True,
)

namespace.register(name=CANARY_TASK_NAME)(canary_task)
Comment thread
sentry[bot] marked this conversation as resolved.

@property
def taskregistry(self) -> TaskRegistry:
"""Get the TaskRegistry instance from this app"""
Expand Down
14 changes: 14 additions & 0 deletions clients/python/src/taskbroker_client/canary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

import logging
from time import sleep

CANARY_TASK_NAME = "canary_task"

logger = logging.getLogger(__name__)


def canary_task() -> None:
logger.info("Running canary task...")
sleep(0.1)
print("Done running canary task!")
5 changes: 5 additions & 0 deletions clients/python/src/taskbroker_client/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from enum import Enum

INTERNAL_NAMESPACE = "internal"
"""
Namespace reserved for taskbroker client internal tasks.
"""

DEFAULT_PROCESSING_DEADLINE = 10
"""
The fallback/default processing_deadline that tasks
Expand Down
9 changes: 8 additions & 1 deletion clients/python/src/taskbroker_client/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
from sentry_sdk.consts import OP, SPANDATA

from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType
from taskbroker_client.constants import (
DEFAULT_PROCESSING_DEADLINE,
INTERNAL_NAMESPACE,
CompressionType,
)
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
Expand Down Expand Up @@ -345,6 +349,7 @@ def create_namespace(
expires: int | datetime.timedelta | None = None,
processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
app_feature: str | None = None,
internal: bool = False,
) -> TaskNamespace:
"""
Create a task namespace.
Expand All @@ -354,6 +359,8 @@ def create_namespace(

Namespaces can define default behavior for tasks defined within a namespace.
"""
if name == INTERNAL_NAMESPACE and not internal:
raise ValueError(f"{INTERNAL_NAMESPACE!r} is reserved for internal taskbroker tasks.")
if name in self._namespaces:
raise ValueError(f"Task namespace with name {name} already exists.")
namespace = TaskNamespace(
Expand Down
89 changes: 89 additions & 0 deletions clients/python/src/taskbroker_client/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from collections.abc import Callable, Collection, Mapping, MutableMapping
from functools import update_wrapper
from math import ceil
from typing import TYPE_CHECKING, Any, Generic, ParamSpec, TypeVar, get_origin
from uuid import uuid4

Expand Down Expand Up @@ -184,6 +185,49 @@ def apply_async(
wait_for_delivery=self.wait_for_delivery,
)

def apply_async_testing(
self,
args: Any | None = None,
kwargs: Any | None = None,
headers: MutableMapping[str, Any] | None = None,
expires: int | datetime.timedelta | None = None,
countdown: int | datetime.timedelta | None = None,
bytes: int | None = None,
seconds: float | None = None,
**options: Any,
) -> None:
"""
Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry.

If `bytes` is provided, the activation is padded to be at least that size.
If `seconds` is provided, it is added to the default processing deadline.

Normal `delay` and `apply_async` calls do not use this path.
"""
if args is None:
args = []
if kwargs is None:
kwargs = {}

self._signal_send(task=self, args=args, kwargs=kwargs)

activation = self.create_testing_activation(
args=args,
kwargs=kwargs,
headers=headers,
expires=expires,
countdown=countdown,
bytes=bytes,
seconds=seconds,
)
if ALWAYS_EAGER:
self._call_func(*args, **kwargs)
else:
self._namespace.send_task(
activation,
wait_for_delivery=self.wait_for_delivery,
)

def _call_func(self, *args: Any, **kwargs: Any) -> None:
# Overridden in ExternalTask
if self.pass_headers:
Expand Down Expand Up @@ -300,6 +344,51 @@ def create_activation(
delay=countdown,
)

def create_testing_activation(
self,
args: Collection[Any],
kwargs: Mapping[Any, Any],
headers: MutableMapping[str, Any] | None = None,
expires: int | datetime.timedelta | None = None,
countdown: int | datetime.timedelta | None = None,
bytes: int | None = None,
seconds: float | None = None,
) -> TaskActivation:
"""
Build a `TaskActivation` with optional size padding for testing purposes.

If `bytes` is provided, the activation is padded to be at least that size.
If `seconds` is provided, it is added to the default processing deadline.
"""
activation = self.create_activation(
args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown
)

if seconds:
if seconds < 0:
raise ValueError(
f"Test activation seconds is {seconds}, which is NOT greater than zero"
)

activation.processing_deadline_duration = DEFAULT_PROCESSING_DEADLINE + ceil(seconds)
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
sentry[bot] marked this conversation as resolved.

if bytes:
if bytes < 0:
raise ValueError(
f"Test activation bytes is {bytes}, which is NOT greater than zero"
)

padding_key = "taskbroker-testing-activation-padding"

while activation.ByteSize() < bytes:
missing_bytes = bytes - activation.ByteSize()

activation.headers[padding_key] = (
activation.headers.get(padding_key, "") + "x" * missing_bytes
)

return activation

def _create_retry_state(self) -> RetryState:
retry = self.retry or self._namespace.default_retry or None
if not retry or self.at_most_once:
Expand Down
20 changes: 20 additions & 0 deletions clients/python/tests/test_app.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from unittest.mock import patch

import msgpack
import pytest
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation

from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.canary import CANARY_TASK_NAME
from taskbroker_client.constants import INTERNAL_NAMESPACE
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import Task
Expand Down Expand Up @@ -38,6 +42,22 @@ def test_set_config() -> None:
assert "ignored" not in app.config


def test_registers_internal_canary_task(capsys: pytest.CaptureFixture[str]) -> None:
app = TaskbrokerApp(name="acme", producer_factory=producer_factory)

namespace = app.get_namespace(INTERNAL_NAMESPACE)
task = app.get_task(INTERNAL_NAMESPACE, CANARY_TASK_NAME)

assert namespace.name == INTERNAL_NAMESPACE
assert isinstance(task, Task)
assert task.name == CANARY_TASK_NAME
with patch("taskbroker_client.canary.logger") as mock_logger:
task()

mock_logger.info.assert_called_once_with("Running canary task...")
assert capsys.readouterr().out == "Done running canary task!\n"


def test_should_attempt_at_most_once() -> None:
activation = TaskActivation(
id="111",
Expand Down
43 changes: 42 additions & 1 deletion clients/python/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
)
from sentry_sdk.crons import MonitorStatus

from taskbroker_client.constants import CompressionType
from taskbroker_client.canary import CANARY_TASK_NAME
from taskbroker_client.constants import INTERNAL_NAMESPACE, CompressionType
from taskbroker_client.retry import NoRetriesRemainingError
from taskbroker_client.state import current_task
from taskbroker_client.types import InflightTaskActivation, ProcessingResult
Expand All @@ -62,6 +63,18 @@
),
)

CANARY_TASK = InflightTaskActivation(
host="localhost:50051",
receive_timestamp=0,
activation=TaskActivation(
id="canary",
taskname=CANARY_TASK_NAME,
namespace=INTERNAL_NAMESPACE,
parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
processing_deadline_duration=2,
),
)

RETRY_TASK = InflightTaskActivation(
host="localhost:50051",
receive_timestamp=0,
Expand Down Expand Up @@ -323,6 +336,7 @@ def _make_result_thread_pool(
capture: _SendResultCapture,
*,
concurrency: int = 3,
result_queue_maxsize: int = 3,
update_in_batches: bool,
) -> TaskWorkerProcessingPool:
return TaskWorkerProcessingPool(
Expand All @@ -331,6 +345,7 @@ def _make_result_thread_pool(
mp_context=get_context("fork"),
max_child_task_count=100,
concurrency=concurrency,
result_queue_maxsize=result_queue_maxsize,
processing_pool_name="test",
process_type="fork",
update_in_batches=update_in_batches,
Expand Down Expand Up @@ -1118,6 +1133,32 @@ def test_child_process_complete(mock_capture_checkin: mock.MagicMock) -> None:
assert mock_capture_checkin.call_count == 0


def test_child_process_canary_task(capsys: pytest.CaptureFixture[str]) -> None:
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()

todo.put(CANARY_TASK)
with mock.patch("taskbroker_client.canary.logger") as mock_logger:
child_process(
"examples.app:app",
todo,
processed,
shutdown,
max_task_count=1,
processing_pool_name="test",
process_type="fork",
skip_awaiting_futures=False,
future_checking_frequency=0.1,
)

result = processed.get()
assert result.task_id == CANARY_TASK.activation.id
assert result.status == TASK_ACTIVATION_STATUS_COMPLETE
mock_logger.info.assert_called_once_with("Running canary task...")
assert capsys.readouterr().out == "Done running canary task!\n"


def test_child_process_emits_running_message() -> None:
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand Down
Loading