diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py
index 39374c9f..6517a8f0 100644
--- a/clients/python/src/taskbroker_client/app.py
+++ b/clients/python/src/taskbroker_client/app.py
@@ -5,7 +5,8 @@
 
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
 
-from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE
+from taskbroker_client.canary import CANARY_TASK_NAME, canary_task
+from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, INTERNAL_NAMESPACE
 from taskbroker_client.imports import import_string
 from taskbroker_client.metrics import MetricsBackend
 from taskbroker_client.registry import ExternalNamespace, TaskNamespace, TaskRegistry
@@ -45,6 +46,7 @@ def __init__(
             metrics=self.metrics,
             context_hooks=self.context_hooks,
         )
+        self._register_internal_tasks()
         self.at_most_once_store(at_most_once_store)
 
     def _build_router(self, router_name: str | TaskRouter) -> TaskRouter:
@@ -63,6 +65,15 @@ def _build_metrics(self, backend_name: str | MetricsBackend) -> MetricsBackend:
             return metrics_class()
         return backend_name
 
+    def _register_internal_tasks(self) -> None:
+        """Create the reserved internal namespace and register the canary task in it."""
+        namespace = self._taskregistry.create_namespace(
+            name=INTERNAL_NAMESPACE,
+            internal=True,
+        )
+
+        namespace.register(name=CANARY_TASK_NAME)(canary_task)
+
     @property
     def taskregistry(self) -> TaskRegistry:
         """Get the TaskRegistry instance from this app"""
diff --git a/clients/python/src/taskbroker_client/canary.py b/clients/python/src/taskbroker_client/canary.py
new file mode 100644
index 00000000..c2d1c536
--- /dev/null
+++ b/clients/python/src/taskbroker_client/canary.py
@@ -0,0 +1,16 @@
+from __future__ import annotations
+
+import logging
+from time import sleep
+
+CANARY_TASK_NAME = "canary_task"
+
+logger = logging.getLogger(__name__)
+
+
+def canary_task() -> None:
+    """Log a start message, sleep for 0.1 seconds, then print a completion message."""
+    logger.info("Running canary task...")
+    sleep(0.1)
+    # Goes to stdout rather than the logger; tests assert on this exact output.
+    print("Done running canary task!")
diff --git a/clients/python/src/taskbroker_client/constants.py b/clients/python/src/taskbroker_client/constants.py
index ad157364..dd4b4234 100644
--- a/clients/python/src/taskbroker_client/constants.py
+++ b/clients/python/src/taskbroker_client/constants.py
@@ -1,5 +1,10 @@
 from enum import Enum
 
+INTERNAL_NAMESPACE = "internal"
+"""
+Namespace reserved for taskbroker client internal tasks.
+"""
+
 DEFAULT_PROCESSING_DEADLINE = 10
 """
 The fallback/default processing_deadline that tasks
diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py
index 93aee24d..8e8da7ce 100644
--- a/clients/python/src/taskbroker_client/registry.py
+++ b/clients/python/src/taskbroker_client/registry.py
@@ -12,7 +12,11 @@
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
 from sentry_sdk.consts import OP, SPANDATA
 
-from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType
+from taskbroker_client.constants import (
+    DEFAULT_PROCESSING_DEADLINE,
+    INTERNAL_NAMESPACE,
+    CompressionType,
+)
 from taskbroker_client.metrics import MetricsBackend
 from taskbroker_client.retry import Retry
 from taskbroker_client.router import TaskRouter
@@ -345,6 +349,7 @@ def create_namespace(
         expires: int | datetime.timedelta | None = None,
         processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
         app_feature: str | None = None,
+        internal: bool = False,
     ) -> TaskNamespace:
         """
         Create a task namespace.
@@ -354,6 +359,8 @@ def create_namespace(
 
         Namespaces can define default behavior for tasks defined within a namespace.
         """
+        if name == INTERNAL_NAMESPACE and not internal:
+            raise ValueError(f"{INTERNAL_NAMESPACE!r} is reserved for internal taskbroker tasks.")
         if name in self._namespaces:
             raise ValueError(f"Task namespace with name {name} already exists.")
         namespace = TaskNamespace(
diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py
index da604436..651e7d45 100644
--- a/clients/python/src/taskbroker_client/task.py
+++ b/clients/python/src/taskbroker_client/task.py
@@ -5,6 +5,7 @@
 import time
 from collections.abc import Callable, Collection, Mapping, MutableMapping
 from functools import update_wrapper
+from math import ceil
 from typing import TYPE_CHECKING, Any, Generic, ParamSpec, TypeVar, get_origin
 from uuid import uuid4
 
@@ -184,6 +185,54 @@ def apply_async(
                 wait_for_delivery=self.wait_for_delivery,
             )
 
+    def apply_async_testing(
+        self,
+        args: Any | None = None,
+        kwargs: Any | None = None,
+        headers: MutableMapping[str, Any] | None = None,
+        expires: int | datetime.timedelta | None = None,
+        countdown: int | datetime.timedelta | None = None,
+        bytes: int | None = None,
+        seconds: float | None = None,
+        **options: Any,
+    ) -> None:
+        """
+        Task dispatch for testing tools like the 'taskbroker-send-tasks' command in Sentry.
+
+        If `bytes` is provided, the activation is padded to be at least that size.
+        If `seconds` is provided, the processing deadline is set to
+        `DEFAULT_PROCESSING_DEADLINE + ceil(seconds)`.
+
+        Raises `ValueError` if `bytes` or `seconds` is negative.
+        Extra keyword arguments in `options` are accepted but not used.
+
+        Normal `delay` and `apply_async` calls do not use this path.
+        """
+        if args is None:
+            args = []
+        if kwargs is None:
+            kwargs = {}
+
+        self._signal_send(task=self, args=args, kwargs=kwargs)
+
+        # Built before the eager check, so invalid `bytes`/`seconds` raise in either mode.
+        activation = self.create_testing_activation(
+            args=args,
+            kwargs=kwargs,
+            headers=headers,
+            expires=expires,
+            countdown=countdown,
+            bytes=bytes,
+            seconds=seconds,
+        )
+        if ALWAYS_EAGER:
+            self._call_func(*args, **kwargs)
+        else:
+            self._namespace.send_task(
+                activation,
+                wait_for_delivery=self.wait_for_delivery,
+            )
+
     def _call_func(self, *args: Any, **kwargs: Any) -> None:
         # Overridden in ExternalTask
         if self.pass_headers:
@@ -300,6 +349,57 @@ def create_activation(
             delay=countdown,
         )
 
+    def create_testing_activation(
+        self,
+        args: Collection[Any],
+        kwargs: Mapping[Any, Any],
+        headers: MutableMapping[str, Any] | None = None,
+        expires: int | datetime.timedelta | None = None,
+        countdown: int | datetime.timedelta | None = None,
+        bytes: int | None = None,
+        seconds: float | None = None,
+    ) -> TaskActivation:
+        """
+        Build a `TaskActivation` with optional size padding and an extended
+        processing deadline for testing purposes.
+
+        If `bytes` is provided, a padding header is grown until the serialized
+        activation is at least that size.
+        If `seconds` is provided, the processing deadline is set to
+        `DEFAULT_PROCESSING_DEADLINE + ceil(seconds)`.
+        A value of zero or `None` for either option leaves the activation unchanged.
+
+        Raises `ValueError` if `bytes` or `seconds` is negative.
+        """
+        activation = self.create_activation(
+            args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown
+        )
+
+        if seconds:
+            if seconds < 0:
+                raise ValueError(
+                    f"Test activation seconds must not be negative, got {seconds}"
+                )
+
+            activation.processing_deadline_duration = DEFAULT_PROCESSING_DEADLINE + ceil(seconds)
+
+        if bytes:
+            if bytes < 0:
+                raise ValueError(
+                    f"Test activation bytes must not be negative, got {bytes}"
+                )
+
+            padding_key = "taskbroker-testing-activation-padding"
+
+            while activation.ByteSize() < bytes:
+                missing_bytes = bytes - activation.ByteSize()
+
+                activation.headers[padding_key] = (
+                    activation.headers.get(padding_key, "") + "x" * missing_bytes
+                )
+
+        return activation
+
     def _create_retry_state(self) -> RetryState:
         retry = self.retry or self._namespace.default_retry or None
         if not retry or self.at_most_once:
diff --git a/clients/python/tests/test_app.py b/clients/python/tests/test_app.py
index 91eafe6c..cc14d264 100644
--- a/clients/python/tests/test_app.py
+++ b/clients/python/tests/test_app.py
@@ -1,8 +1,12 @@
+from unittest.mock import patch
+
 import msgpack
 import pytest
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
 
 from taskbroker_client.app import TaskbrokerApp
+from taskbroker_client.canary import CANARY_TASK_NAME
+from taskbroker_client.constants import INTERNAL_NAMESPACE
 from taskbroker_client.retry import Retry
 from taskbroker_client.router import TaskRouter
 from taskbroker_client.task import Task
@@ -38,6 +42,22 @@ def test_set_config() -> None:
     assert "ignored" not in app.config
 
 
+def test_registers_internal_canary_task(capsys: pytest.CaptureFixture[str]) -> None:
+    app = TaskbrokerApp(name="acme", producer_factory=producer_factory)
+
+    namespace = app.get_namespace(INTERNAL_NAMESPACE)
+    task = app.get_task(INTERNAL_NAMESPACE, CANARY_TASK_NAME)
+
+    assert namespace.name == INTERNAL_NAMESPACE
+    assert isinstance(task, Task)
+    assert task.name == CANARY_TASK_NAME
+    with patch("taskbroker_client.canary.logger") as mock_logger:
+        task()
+
+    mock_logger.info.assert_called_once_with("Running canary task...")
+    assert capsys.readouterr().out == "Done running canary task!\n"
+
+
 def test_should_attempt_at_most_once() -> None:
     activation = TaskActivation(
         id="111",
diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py
index e3653905..1b94fa92 100644
--- a/clients/python/tests/worker/test_worker.py
+++ b/clients/python/tests/worker/test_worker.py
@@ -35,7 +35,8 @@
 )
 from sentry_sdk.crons import MonitorStatus
 
-from taskbroker_client.constants import CompressionType
+from taskbroker_client.canary import CANARY_TASK_NAME
+from taskbroker_client.constants import INTERNAL_NAMESPACE, CompressionType
 from taskbroker_client.retry import NoRetriesRemainingError
 from taskbroker_client.state import current_task
 from taskbroker_client.types import InflightTaskActivation, ProcessingResult
@@ -62,6 +63,18 @@
     ),
 )
 
+CANARY_TASK = InflightTaskActivation(
+    host="localhost:50051",
+    receive_timestamp=0,
+    activation=TaskActivation(
+        id="canary",
+        taskname=CANARY_TASK_NAME,
+        namespace=INTERNAL_NAMESPACE,
+        parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
+        processing_deadline_duration=2,
+    ),
+)
+
 RETRY_TASK = InflightTaskActivation(
     host="localhost:50051",
     receive_timestamp=0,
@@ -323,6 +336,7 @@ def _make_result_thread_pool(
     capture: _SendResultCapture,
     *,
     concurrency: int = 3,
+    result_queue_maxsize: int = 3,
     update_in_batches: bool,
 ) -> TaskWorkerProcessingPool:
     return TaskWorkerProcessingPool(
@@ -331,6 +345,7 @@ def _make_result_thread_pool(
         mp_context=get_context("fork"),
         max_child_task_count=100,
         concurrency=concurrency,
+        result_queue_maxsize=result_queue_maxsize,
         processing_pool_name="test",
         process_type="fork",
         update_in_batches=update_in_batches,
@@ -1118,6 +1133,32 @@ def test_child_process_complete(mock_capture_checkin: mock.MagicMock) -> None:
     assert mock_capture_checkin.call_count == 0
 
 
+def test_child_process_canary_task(capsys: pytest.CaptureFixture[str]) -> None:
+    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
+    processed: queue.Queue[ProcessingResult] = queue.Queue()
+    shutdown = Event()
+
+    todo.put(CANARY_TASK)
+    with mock.patch("taskbroker_client.canary.logger") as mock_logger:
+        child_process(
+            "examples.app:app",
+            todo,
+            processed,
+            shutdown,
+            max_task_count=1,
+            processing_pool_name="test",
+            process_type="fork",
+            skip_awaiting_futures=False,
+            future_checking_frequency=0.1,
+        )
+
+    result = processed.get()
+    assert result.task_id == CANARY_TASK.activation.id
+    assert result.status == TASK_ACTIVATION_STATUS_COMPLETE
+    mock_logger.info.assert_called_once_with("Running canary task...")
+    assert capsys.readouterr().out == "Done running canary task!\n"
+
+
 def test_child_process_emits_running_message() -> None:
     todo: queue.Queue[InflightTaskActivation] = queue.Queue()
     processed: queue.Queue[ProcessingResult] = queue.Queue()