From f7a8532c7a7acbf5a6bd945ca56639104d3107da Mon Sep 17 00:00:00 2001 From: Guillaume De Saint Martin Date: Thu, 25 Jun 2026 19:21:25 +0200 Subject: [PATCH] [Node] fix process bot restart issues --- octobot/cli.py | 32 +- octobot/storage/process_bot_state_dumper.py | 6 +- .../entities/accounts/process_bot_state.py | 5 +- packages/flow/octobot_flow/environment.py | 13 + .../octobot_flow/logic/dsl/dsl_executor.py | 27 +- .../octobot_process_functional_shared.py | 109 +++ .../test_octobot_process_edit_config.py | 4 + .../test_octobot_process_start.py | 36 +- packages/node/octobot_node/constants.py | 3 + packages/node/octobot_node/protocol/debug.py | 77 +- .../octobot_node/protocol/util/__init__.py | 15 + .../protocol/util/privacy_filter.py | 151 ++++ .../scheduler/octobot_flow_client.py | 4 + .../node/octobot_node/scheduler/scheduler.py | 3 +- packages/node/tests/protocol/test_debug.py | 225 +----- .../protocol/util/test_privacy_filter.py | 136 ++++ .../octobot_process_ops.py | 362 ++++++++-- .../tests/test_octobot_process_ops.py | 675 +++++++++++++++++- 18 files changed, 1528 insertions(+), 355 deletions(-) create mode 100644 packages/node/octobot_node/protocol/util/__init__.py create mode 100644 packages/node/octobot_node/protocol/util/privacy_filter.py create mode 100644 packages/node/tests/protocol/util/test_privacy_filter.py diff --git a/octobot/cli.py b/octobot/cli.py index 88e829c55e..7023c4be76 100644 --- a/octobot/cli.py +++ b/octobot/cli.py @@ -25,7 +25,7 @@ try: import octobot_commons.os_util as os_util - import octobot_commons.logging as logging + import octobot_commons.logging import octobot_commons.configuration as configuration import octobot_commons.profiles as profiles import octobot_commons.authentication as authentication @@ -67,7 +67,7 @@ def update_config_with_args(starting_args, config: configuration.Configuration, try: import octobot_backtesting.constants as backtesting_constants except ImportError as e: - logging.get_logger().error( + octobot_commons.logging.get_logger().error( "Can't start backtesting without the octobot_backtesting package properly installed.") raise e @@ -325,25 +325,39 @@ def _load_or_create_tentacles(community_auth, config, logger): config.load_profiles_if_possible_and_necessary() +def _init_cli_overriden_folders(args): + overrides = {} + if args.user_folder: + overrides["user_folder"] = args.user_folder + _set_user_root_from_cli(args.user_folder) + if args.log_folder: + overrides["log_folder"] = args.log_folder + logs_folder = args.log_folder + else: + logs_folder = constants.LOGS_FOLDER + return overrides, logs_folder + + def start_octobot(args, default_config_file=None): logger = None try: if args.version: print(constants.LONG_VERSION) return - - user_folder = getattr(args, "user_folder", None) - if user_folder: - _set_user_root_from_cli(user_folder) - - # log folder: --log-folder overrides default (from LOGS_FOLDER env at import + default "logs") - logs_folder = getattr(args, "log_folder", None) or constants.LOGS_FOLDER + + overrides, logs_folder = _init_cli_overriden_folders(args) logger = octobot_logger.init_logger(logs_folder=logs_folder) startup_messages = [] # Version logger.info("Version : {0}".format(constants.LONG_VERSION)) + # Log folder overrides + if overrides: + logger.info(f"Overriding default folders: {overrides}") + else: + logger.info(f"Using default folders") + # Current running environment _log_environment(logger) diff --git a/octobot/storage/process_bot_state_dumper.py b/octobot/storage/process_bot_state_dumper.py index eba1502c0b..ff51f7a9f1 100644 --- a/octobot/storage/process_bot_state_dumper.py +++ b/octobot/storage/process_bot_state_dumper.py @@ -73,7 +73,11 @@ async def _write_state_file_async( ) -> None: now = time.time() state = process_bot_state_import.ProcessBotState( - metadata=process_bot_state_import.Metadata(updated_at=now, next_updated_at=now + interval), + metadata=process_bot_state_import.Metadata( + updated_at=now, + next_updated_at=now + interval, + pid=os.getpid(), + ), exchange_account_elements=_synced_exchange_account_elements_for_first_trading_exchange( bot, ), diff --git a/packages/flow/octobot_flow/entities/accounts/process_bot_state.py b/packages/flow/octobot_flow/entities/accounts/process_bot_state.py index 8128b1716a..0e72f669a0 100644 --- a/packages/flow/octobot_flow/entities/accounts/process_bot_state.py +++ b/packages/flow/octobot_flow/entities/accounts/process_bot_state.py @@ -10,11 +10,14 @@ @dataclasses.dataclass class Metadata(octobot_commons.dataclasses.MinimizableDataclass): """ - Timestamps written with process bot state dumps; used for file-based liveness checks. + Timestamps and child PID written with process bot state dumps. Liveness checks use + updated_at / next_updated_at only; pid is the authoritative child PID for parent binding + after restarts. """ updated_at: float = 0.0 next_updated_at: float = 0.0 + pid: int = 0 @dataclasses.dataclass diff --git a/packages/flow/octobot_flow/environment.py b/packages/flow/octobot_flow/environment.py index e32bacb11d..22a41028f9 100644 --- a/packages/flow/octobot_flow/environment.py +++ b/packages/flow/octobot_flow/environment.py @@ -1,8 +1,21 @@ +import typing + import octobot.constants # will load .env file and init constants import octobot_flow.repositories.community import octobot_trading.constants +_EXECUTOR_ID: typing.Optional[str] = None + + +def register_executor_id(executor_id: str) -> None: + global _EXECUTOR_ID + _EXECUTOR_ID = executor_id + + +def get_executor_id() -> typing.Optional[str]: + return _EXECUTOR_ID + def initialize_environment(allow_funds_transfer: bool = False) -> None: octobot_flow.repositories.community.initialize_community_authentication() diff --git a/packages/flow/octobot_flow/logic/dsl/dsl_executor.py b/packages/flow/octobot_flow/logic/dsl/dsl_executor.py index 3d9f20e135..f874f0bfb8 100644 --- a/packages/flow/octobot_flow/logic/dsl/dsl_executor.py +++ b/packages/flow/octobot_flow/logic/dsl/dsl_executor.py @@ -12,6 +12,7 @@ import octobot_evaluators.evaluators as evaluators import octobot_flow.entities +import octobot_flow.environment import octobot_flow.errors import octobot_flow.enums import octobot_flow.logic.dsl.action_error_util @@ -24,7 +25,6 @@ import tentacles.Meta.DSL_operators.octobot_process_operators.octobot_process_ops as octobot_process_ops - class DSLExecutor(AbstractActionExecutor): def __init__( self, @@ -32,13 +32,16 @@ def __init__( exchange_manager: typing.Optional[octobot_trading.exchanges.ExchangeManager], dsl_script: typing.Optional[str], dependencies: typing.Optional[octobot_commons.signals.SignalDependencies] = None, + executor_id: typing.Optional[str] = None, ): super().__init__() self._exchange_manager = exchange_manager self._dependencies = dependencies self._dependencies_config: dict = profile_data.to_profile("").config self._interpreter_signals: octobot_commons.dsl_interpreter.OperatorSignals = None # type: ignore (reset when interpreter is created) - self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(None) + self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter( + None, executor_id + ) if dsl_script: self._interpreter.prepare(dsl_script) @@ -49,7 +52,15 @@ def _get_matrix_id(self) -> typing.Optional[str]: def get_flow_operator_classes( self, + executor_id: typing.Optional[str] = None, ) -> list[typing.Type[octobot_commons.dsl_interpreter.Operator]]: + resolved_executor_id = ( + executor_id or octobot_flow.environment.get_executor_id() + ) + if not resolved_executor_id: + raise octobot_flow.errors.MissingDSLExecutorDependencyError( + "executor_id is required for run_octobot_process" + ) return ( octobot_commons.dsl_interpreter.get_all_operators() + dsl_operators.create_ohlcv_operators(self._exchange_manager, None, None) @@ -74,16 +85,19 @@ def get_flow_operator_classes( copier_trading_mode=None, ) + octobot_process_ops.create_octobot_process_operators( - self._interpreter_signals + self._interpreter_signals, + resolved_executor_id, ) ) # type: ignore (list[type[Operator]]) def _create_interpreter( - self, previous_execution_result: typing.Optional[dict] + self, + previous_execution_result: typing.Optional[dict], + executor_id: typing.Optional[str] = None, ) -> octobot_commons.dsl_interpreter.Interpreter: self._interpreter_signals = octobot_commons.dsl_interpreter.OperatorSignals() return octobot_commons.dsl_interpreter.Interpreter( - self.get_flow_operator_classes() + self.get_flow_operator_classes(executor_id) ) def get_dependencies(self) -> list[ @@ -110,7 +124,8 @@ async def execute_action( ] = None, ) -> octobot_commons.dsl_interpreter.DSLCallResult: self._interpreter = self._create_interpreter( - action.previous_execution_result + action.previous_execution_result, + None, ) expression = action.get_resolved_dsl_script() try: diff --git a/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py b/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py index 4b309ac27a..40e1d6085d 100644 --- a/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py +++ b/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py @@ -1,10 +1,18 @@ # Drakkar-Software OctoBot # Shared helpers/constants for octobot process functional tests (run_octobot_process, GridTradingMode). +import asyncio import copy import decimal +import json +import os +import pathlib +import time import typing +import octobot.constants as octobot_constants +import octobot_commons.configuration as configuration_module +import octobot_commons.constants as commons_constants import octobot_commons.dsl_interpreter as dsl_interpreter import octobot_trading.constants as trading_constants import octobot_trading.enums as trading_enums @@ -12,6 +20,7 @@ import octobot_flow.jobs import octobot_flow.entities +import octobot_flow.environment import octobot_flow.enums import tests.functionnal_tests as functionnal_tests import tests.functionnal_tests.tentacle_test_configs as tentacle_test_configs @@ -200,6 +209,100 @@ def _get_action_by_id( return None +_FERNET_ENCRYPTED_PREFIX = "gAAAAA" + + +# --- Child on-disk readiness (poll after init_state_ok; PID can be up before first dump / encrypt) --- + + +def _process_bot_state_path(inner: dict) -> str: + return os.path.normpath( + os.path.join( + inner["user_root"], + octobot_constants.PROCESS_BOT_STATE_FILE_NAME, + ) + ) + + +async def _wait_for_process_bot_state_file( + state_path: str, + *, + timeout_sec: float = GLOBAL_START_TIMEOUT_SEC, + poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC, +) -> None: + """Poll until the child has written at least one process_bot_state.json dump.""" + deadline = time.monotonic() + timeout_sec + while time.monotonic() < deadline: + if os.path.isfile(state_path): + return + await asyncio.sleep(poll_interval_sec) + pytest.fail( + f"Timed out waiting for process_bot_state.json at {state_path!r} within {timeout_sec}s" + ) + + +async def _assert_encrypted_exchange_credentials_in_user_config( + user_root: typing.Union[pathlib.Path, str], + exchange_internal_name: str, + expected_api_key: str, + expected_api_secret: str, + *, + timeout_sec: float = GLOBAL_START_TIMEOUT_SEC, + poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC, +) -> None: + """ + Poll child user-root config.json, then assert api key/secret are Fernet-encrypted and decrypt + to the expected plaintext. + + run_octobot_process seeds plaintext credentials before spawn; the child re-saves config.json + on startup. init_state_ok can become true from PID liveness before encryption completes. + """ + user_root_path = pathlib.Path(user_root) + config_path = user_root_path / commons_constants.CONFIG_FILE + deadline = time.monotonic() + timeout_sec + last_failure_reason = "config.json missing or exchange entry not ready" + while time.monotonic() < deadline: + if not config_path.is_file(): + await asyncio.sleep(poll_interval_sec) + continue + root_cfg = json.loads(config_path.read_text(encoding="utf-8")) + exchanges_cfg = root_cfg.get(commons_constants.CONFIG_EXCHANGES) or {} + exchange_cfg = exchanges_cfg.get(exchange_internal_name) + if not isinstance(exchange_cfg, dict): + last_failure_reason = f"exchange {exchange_internal_name!r} missing from config" + await asyncio.sleep(poll_interval_sec) + continue + stored_api_key = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_KEY) + stored_api_secret = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_SECRET) + if not ( + isinstance(stored_api_key, str) + and stored_api_key.startswith(_FERNET_ENCRYPTED_PREFIX) + and isinstance(stored_api_secret, str) + and stored_api_secret.startswith(_FERNET_ENCRYPTED_PREFIX) + ): + last_failure_reason = "api key/secret not yet Fernet-encrypted in config.json" + await asyncio.sleep(poll_interval_sec) + continue + try: + decrypted_api_key = configuration_module.decrypt(stored_api_key) + decrypted_api_secret = configuration_module.decrypt(stored_api_secret) + except Exception as decrypt_error: + last_failure_reason = f"decrypt failed: {decrypt_error}" + await asyncio.sleep(poll_interval_sec) + continue + assert decrypted_api_key == expected_api_key, ( + f"decrypted api key mismatch for {exchange_internal_name!r}" + ) + assert decrypted_api_secret == expected_api_secret, ( + f"decrypted api secret mismatch for {exchange_internal_name!r}" + ) + return + pytest.fail( + f"Timed out waiting for encrypted exchange credentials in {config_path!r} " + f"within {timeout_sec}s ({last_failure_reason})" + ) + + def _make_tracked_spawn_managed_with_forward_terminal_output( real_spawn_managed: typing.Callable[..., typing.Any], popen_calls: dict[str, int], @@ -213,6 +316,12 @@ def _tracked(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: return _tracked +@pytest.fixture(autouse=True) +def register_functional_executor_id(): + octobot_flow.environment.register_executor_id("func-test-executor") + yield + + @pytest.fixture def init_action(): # Automation apply_configuration: seed automation state to match expected exchange + portfolio. diff --git a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py index e69c73ff36..22b3147366 100644 --- a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py +++ b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py @@ -185,6 +185,10 @@ async def test_run_octobot_process_grid_refresh_four_to_six_orders( initial_spawn_count = popen_calls["count"] assert initial_spawn_count >= 1 + # First process_bot_state dump can lag init_state_ok (see shared wait helper). + state_path = octobot_process_functional_shared._process_bot_state_path(inner) + await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path) + # 3) Wait until at least four open ladder orders exist, then assert a 2×2 grid pattern. orders_deadline = time.monotonic() + octobot_process_functional_shared.GRID_ORDERS_TIMEOUT_SEC exchange_account_snapshot: typing.Optional[ diff --git a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py index 1f1a56be7a..87db26013c 100644 --- a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py +++ b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py @@ -12,7 +12,6 @@ import mock import octobot.constants as octobot_app_constants -import octobot_commons.configuration as configuration_module import octobot_commons.constants as common_constants import octobot_commons.dsl_interpreter as dsl_interpreter import octobot_commons.process_util as process_util @@ -154,13 +153,9 @@ async def test_run_octobot_process_lifecycle_grid_trading( assert popen_calls["count"] >= 1 # --- process_bot_state path: must exist before poll (child wrote at least one dump) --- - state_path = os.path.normpath( - os.path.join( - inner["user_root"], - octobot_app_constants.PROCESS_BOT_STATE_FILE_NAME, - ) - ) - assert os.path.isfile(state_path) + # First process_bot_state dump can lag init_state_ok (see shared wait helper). + state_path = octobot_process_functional_shared._process_bot_state_path(inner) + await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path) # 1) Poll AutomationJob + dump() until merge yields ≥4 open orders (EAE from automation snapshot, # not from parsing full process_bot_state on disk). @@ -438,15 +433,14 @@ async def test_run_octobot_process_lifecycle_default_config_no_profile_data( user_root = pathlib.Path(inner["user_root"]) assert inner.get("profile_id") == "non-trading" - root_cfg = json.loads((user_root / common_constants.CONFIG_FILE).read_text(encoding="utf-8")) - exchange_cfg = root_cfg[common_constants.CONFIG_EXCHANGES][ - octobot_process_functional_shared.EXCHANGE_BINANCEUS - ] exchange_auth_entry = exchange_auth[0] - stored_api_key = exchange_cfg[common_constants.CONFIG_EXCHANGE_KEY] - stored_api_secret = exchange_cfg[common_constants.CONFIG_EXCHANGE_SECRET] - assert configuration_module.decrypt(stored_api_key) == exchange_auth_entry["api_key"] - assert configuration_module.decrypt(stored_api_secret) == exchange_auth_entry["api_secret"] + # Child encrypts config.json after spawn; wait + assert in one helper (see shared module). + await octobot_process_functional_shared._assert_encrypted_exchange_credentials_in_user_config( + user_root, + octobot_process_functional_shared.EXCHANGE_BINANCEUS, + exchange_auth_entry["api_key"], + exchange_auth_entry["api_secret"], + ) profile_json_path = ( user_root / common_constants.PROFILES_FOLDER @@ -455,13 +449,9 @@ async def test_run_octobot_process_lifecycle_default_config_no_profile_data( ) assert profile_json_path.is_file() - state_path = os.path.normpath( - os.path.join( - inner["user_root"], - octobot_app_constants.PROCESS_BOT_STATE_FILE_NAME, - ) - ) - assert os.path.isfile(state_path) + # First process_bot_state dump can lag init_state_ok (see shared wait helper). + state_path = octobot_process_functional_shared._process_bot_state_path(inner) + await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path) with open(state_path, encoding="utf-8") as process_state_file: file_metadata_payload = json.load(process_state_file) process_metadata = process_bot_state_import.Metadata.from_dict( diff --git a/packages/node/octobot_node/constants.py b/packages/node/octobot_node/constants.py index f93305b4d8..793cfaa23c 100644 --- a/packages/node/octobot_node/constants.py +++ b/packages/node/octobot_node/constants.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public # License along with OctoBot. If not, see . import os +import uuid try: import octobot.constants as octobot_constants @@ -67,3 +68,5 @@ ) DEFAULT_PORTFOLIO_VALUATION_UNIT = "USDT" + +SCHEDULER_EXECUTOR_ID = str(uuid.uuid4()) # unique for each worker diff --git a/packages/node/octobot_node/protocol/debug.py b/packages/node/octobot_node/protocol/debug.py index ca05b7b317..7f8058df9b 100644 --- a/packages/node/octobot_node/protocol/debug.py +++ b/packages/node/octobot_node/protocol/debug.py @@ -14,73 +14,22 @@ # You should have received a copy of the GNU General Public License along # with OctoBot. If not, see . -import typing - -import octobot_commons.constants as commons_constants -import octobot_flow.enums as flow_enums import octobot_protocol.models as protocol_models import octobot_sync.constants as sync_constants import octobot_node.scheduler.api as scheduler_api import octobot_node.protocol.accounts as accounts_protocol import octobot_node.protocol.strategies as strategies_protocol import octobot_node.protocol.accounts_trading as accounts_trading_protocol +import octobot_node.protocol.util.privacy_filter as privacy_filter -_AUTH_DETAIL_FIELD_NAMES = ( - "api_key", - "api_secret", - "api_password", - "access_token", - "encrypted", -) - - -def _auth_details_dict_has_credentials(auth_details: dict) -> bool: - return bool( - auth_details.get("api_key") - or auth_details.get("api_secret") - or auth_details.get("api_password") - or auth_details.get("access_token") - or auth_details.get("encrypted") - ) - - -def _redact_auth_details_dict(auth_details: dict) -> dict: - redacted_auth_details = dict(auth_details) - for field_name in _AUTH_DETAIL_FIELD_NAMES: - if redacted_auth_details.get(field_name): - redacted_auth_details[field_name] = commons_constants.PRIVATE_MESSAGE_PLACEHOLDER - return redacted_auth_details - - -def _redact_action_for_debug(action: protocol_models.Action) -> protocol_models.Action: - if action.action_type != flow_enums.ActionType.APPLY_CONFIGURATION.value: - return action - configuration = action.configuration - if configuration is None: - return action - exchange_account_details = configuration.get("exchange_account_details") - if not isinstance(exchange_account_details, dict): - return action - auth_details = exchange_account_details.get("auth_details") - if not isinstance(auth_details, dict) or not _auth_details_dict_has_credentials(auth_details): - return action - redacted_configuration = { - **configuration, - "exchange_account_details": { - **exchange_account_details, - "auth_details": _redact_auth_details_dict(auth_details), - }, - } - return action.model_copy(update={"configuration": redacted_configuration}) - - -def _redact_actions_for_debug( - actions: typing.Optional[list[protocol_models.Action]], -) -> typing.Optional[list[protocol_models.Action]]: - if actions is None: - return None - return [_redact_action_for_debug(action) for action in actions] +def _redact_user_actions_for_debug( + user_actions: list[protocol_models.UserAction], +) -> list[protocol_models.UserAction]: + return [ + privacy_filter.privatize_user_action(user_action) + for user_action in user_actions + ] def _automation_state_for_debug( @@ -88,8 +37,10 @@ def _automation_state_for_debug( ) -> protocol_models.AutomationState: return automation.model_copy( update={ - "actions": _redact_actions_for_debug(automation.actions), - "priority_actions": _redact_actions_for_debug(automation.priority_actions), + "actions": privacy_filter.privatize_dag_actions(automation.actions), + "priority_actions": privacy_filter.privatize_dag_actions( + automation.priority_actions, + ), } ) @@ -117,7 +68,9 @@ async def get_debug_state(user_id: str) -> protocol_models.DebugState: _automation_state_for_debug(automation) for automation in await scheduler_api.get_automation_states(user_id) ] - user_actions = await scheduler_api.list_user_actions(user_id, active_only=False) + user_actions = _redact_user_actions_for_debug( + await scheduler_api.list_user_actions(user_id, active_only=False), + ) account_state = accounts_protocol.get_accounts_state(user_id) strategies_state = strategies_protocol.get_strategies_state(user_id) bound_account_ids = _account_ids_bound_to_running_automations(automations) diff --git a/packages/node/octobot_node/protocol/util/__init__.py b/packages/node/octobot_node/protocol/util/__init__.py new file mode 100644 index 0000000000..0c7e4d5082 --- /dev/null +++ b/packages/node/octobot_node/protocol/util/__init__.py @@ -0,0 +1,15 @@ +# Drakkar-Software OctoBot-Node +# Copyright (c) Drakkar-Software, All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3.0 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. diff --git a/packages/node/octobot_node/protocol/util/privacy_filter.py b/packages/node/octobot_node/protocol/util/privacy_filter.py new file mode 100644 index 0000000000..2cf70adbc4 --- /dev/null +++ b/packages/node/octobot_node/protocol/util/privacy_filter.py @@ -0,0 +1,151 @@ +# This file is part of OctoBot Node (https://github.com/Drakkar-Software/OctoBot-Node) +# Copyright (c) 2025 Drakkar-Software, All rights reserved. +# +# OctoBot Node is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3.0 of the License, or (at +# your option) any later version. +# +# OctoBot is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with OctoBot. If not, see . + +import typing + +import octobot_commons.constants as commons_constants +import octobot_flow.enums as flow_enums +import octobot_protocol.models as protocol_models + + +_AUTH_DETAIL_FIELD_NAMES = ( + "api_key", + "api_secret", + "api_password", + "access_token", + "encrypted", +) + +_ACCOUNT_AUTHENTICATION_SENSITIVE_FIELD_NAMES = ( + "api_key", + "api_secret", + "api_passphrase", + "public_key", + "private_key", + "seed_phrase", +) + + +def _auth_details_dict_has_credentials(auth_details: dict) -> bool: + return bool( + auth_details.get("api_key") + or auth_details.get("api_secret") + or auth_details.get("api_password") + or auth_details.get("access_token") + or auth_details.get("encrypted") + ) + + +def _redact_auth_details_dict(auth_details: dict) -> dict: + redacted_auth_details = dict(auth_details) + for field_name in _AUTH_DETAIL_FIELD_NAMES: + if redacted_auth_details.get(field_name): + redacted_auth_details[field_name] = commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + return redacted_auth_details + + +def _privatize_dag_action(action: protocol_models.Action) -> protocol_models.Action: + if action.action_type != flow_enums.ActionType.APPLY_CONFIGURATION.value: + return action + configuration = action.configuration + if configuration is None: + return action + exchange_account_details = configuration.get("exchange_account_details") + if not isinstance(exchange_account_details, dict): + return action + auth_details = exchange_account_details.get("auth_details") + if not isinstance(auth_details, dict) or not _auth_details_dict_has_credentials(auth_details): + return action + redacted_configuration = { + **configuration, + "exchange_account_details": { + **exchange_account_details, + "auth_details": _redact_auth_details_dict(auth_details), + }, + } + return action.model_copy(update={"configuration": redacted_configuration}) + + +def privatize_dag_actions( + actions: typing.Optional[list[protocol_models.Action]], +) -> typing.Optional[list[protocol_models.Action]]: + if actions is None: + return None + return [_privatize_dag_action(action) for action in actions] + + +def _account_authentication_has_credentials( + authentication: protocol_models.AccountAuthentication, +) -> bool: + return any( + getattr(authentication, field_name) + for field_name in _ACCOUNT_AUTHENTICATION_SENSITIVE_FIELD_NAMES + ) + + +def _privatize_account_authentication( + authentication: protocol_models.AccountAuthentication, +) -> protocol_models.AccountAuthentication: + if not _account_authentication_has_credentials(authentication): + return authentication + redacted_updates = { + field_name: commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + for field_name in _ACCOUNT_AUTHENTICATION_SENSITIVE_FIELD_NAMES + if getattr(authentication, field_name) + } + return authentication.model_copy(update=redacted_updates) + + +def privatize_user_action( + user_action: protocol_models.UserAction, +) -> protocol_models.UserAction: + configuration = user_action.configuration + if configuration is None or configuration.actual_instance is None: + return user_action + actual_configuration = configuration.actual_instance + if isinstance(actual_configuration, protocol_models.CreateAccountAuthConfiguration): + redacted_authentication = _privatize_account_authentication( + actual_configuration.configuration, + ) + if redacted_authentication is actual_configuration.configuration: + return user_action + redacted_configuration = actual_configuration.model_copy( + update={"configuration": redacted_authentication}, + ) + return user_action.model_copy( + update={ + "configuration": protocol_models.UserActionConfiguration( + redacted_configuration, + ), + }, + ) + if isinstance(actual_configuration, protocol_models.EditAccountAuthConfiguration): + redacted_authentication = _privatize_account_authentication( + actual_configuration.configuration, + ) + if redacted_authentication is actual_configuration.configuration: + return user_action + redacted_configuration = actual_configuration.model_copy( + update={"configuration": redacted_authentication}, + ) + return user_action.model_copy( + update={ + "configuration": protocol_models.UserActionConfiguration( + redacted_configuration, + ), + }, + ) + return user_action diff --git a/packages/node/octobot_node/scheduler/octobot_flow_client.py b/packages/node/octobot_node/scheduler/octobot_flow_client.py index 3cfd579438..bf9209de24 100644 --- a/packages/node/octobot_node/scheduler/octobot_flow_client.py +++ b/packages/node/octobot_node/scheduler/octobot_flow_client.py @@ -20,6 +20,7 @@ import octobot_commons.dataclasses import octobot_commons.logging +import octobot_node.constants import octobot_node.errors as errors import octobot_node.scheduler.workflows_util as workflows_util @@ -33,6 +34,9 @@ # ensure environment is initialized octobot_flow.environment.initialize_environment(True) + octobot_flow.environment.register_executor_id( + octobot_node.constants.SCHEDULER_EXECUTOR_ID + ) except ImportError: pass # OctoBot Flow is not available diff --git a/packages/node/octobot_node/scheduler/scheduler.py b/packages/node/octobot_node/scheduler/scheduler.py index e4dcfbc795..e5359c3291 100644 --- a/packages/node/octobot_node/scheduler/scheduler.py +++ b/packages/node/octobot_node/scheduler/scheduler.py @@ -48,7 +48,8 @@ _BASE_CONFIG = dbos.DBOSConfig( name=DEFAULT_NAME, max_executor_threads=octobot_node.config.settings.SCHEDULER_MAX_EXECUTOR_THREADS, - application_version=VERSION, + application_version=VERSION, # octobot version + # executor_id=..., # a constant executor_id is required for DBOS workflow recovery: leave its init to DBOS ) diff --git a/packages/node/tests/protocol/test_debug.py b/packages/node/tests/protocol/test_debug.py index 70a7eba4da..6dcb5b98d8 100644 --- a/packages/node/tests/protocol/test_debug.py +++ b/packages/node/tests/protocol/test_debug.py @@ -10,7 +10,6 @@ import mock import pytest -import octobot_commons.constants as commons_constants import octobot_protocol.models as protocol_models import octobot_sync.constants as sync_constants @@ -92,6 +91,16 @@ async def test_assembles_debug_state_from_dependencies(self): strategies=sample_strategies, ) with ( + mock.patch.object( + debug_module.privacy_filter, + "privatize_dag_actions", + wraps=debug_module.privacy_filter.privatize_dag_actions, + ) as privatize_dag_actions_mock, + mock.patch.object( + debug_module.privacy_filter, + "privatize_user_action", + wraps=debug_module.privacy_filter.privatize_user_action, + ) as privatize_user_action_mock, mock.patch.object( debug_module.scheduler_api, "get_automation_states", @@ -125,6 +134,10 @@ async def test_assembles_debug_state_from_dependencies(self): _TEST_WALLET_ADDRESS, ["acc-bound"], ) + assert privatize_dag_actions_mock.call_count == 2 + privatize_dag_actions_mock.assert_any_call(sample_automations[0].actions) + privatize_dag_actions_mock.assert_any_call(sample_automations[0].priority_actions) + privatize_user_action_mock.assert_called_once_with(sample_user_actions[0]) assert debug_state.version == sync_constants.DEBUG_STATE_VERSION assert debug_state.debug is not None assert debug_state.debug.automations == sample_automations @@ -135,150 +148,7 @@ async def test_assembles_debug_state_from_dependencies(self): assert debug_state.debug.account_tradings == sample_trading_summaries @pytest.mark.asyncio - async def test_redacts_auth_details_in_automations(self): - automation_with_auth = protocol_models.AutomationState( - id="auto-auth", - status=protocol_models.WorkflowStatus.COMPLETED, - metadata=protocol_models.AutomationMetadata( - name="auto", - description="", - ), - actions=[ - protocol_models.Action( - id="action-1", - action_type="apply_configuration", - status=protocol_models.WorkflowStatus.COMPLETED, - configuration={ - "exchange_account_details": { - "auth_details": { - "api_key": "secret-key", - "api_secret": "secret-secret", - "api_password": "secret-pass", - }, - }, - }, - ), - ], - ) - accounts_state = protocol_models.AccountsState( - version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION, - accounts=[], - exchange_configs=[], - ) - strategies_state = protocol_models.StrategiesState( - version=sync_constants.USER_STRATEGIES_STATE_VERSION, - strategies=[], - ) - with ( - mock.patch.object( - debug_module.scheduler_api, - "get_automation_states", - mock.AsyncMock(return_value=[automation_with_auth]), - ), - mock.patch.object( - debug_module.scheduler_api, - "list_user_actions", - mock.AsyncMock(return_value=[]), - ), - mock.patch.object( - debug_module.accounts_protocol, - "get_accounts_state", - return_value=accounts_state, - ), - mock.patch.object( - debug_module.strategies_protocol, - "get_strategies_state", - return_value=strategies_state, - ), - mock.patch.object( - debug_module.accounts_trading_protocol, - "get_account_trading_summaries", - return_value=[], - ), - ): - debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS) - assert debug_state.debug is not None - assert debug_state.debug.automations is not None - auth_details = debug_state.debug.automations[0].actions[0].configuration[ - "exchange_account_details" - ]["auth_details"] - assert auth_details["api_key"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER - assert auth_details["api_secret"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER - assert auth_details["api_password"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER - - @pytest.mark.asyncio - async def test_leaves_non_config_actions_unchanged(self): - dsl_action = protocol_models.Action( - id="dsl-1", - action_type="dsl_script", - status=protocol_models.WorkflowStatus.COMPLETED, - dsl='run_octobot_process("acc", {}, [{"api_key": "leak"}])', - ) - automation = protocol_models.AutomationState( - id="auto-dsl", - status=protocol_models.WorkflowStatus.COMPLETED, - metadata=protocol_models.AutomationMetadata(name="auto", description=""), - actions=[dsl_action], - ) - accounts_state = protocol_models.AccountsState( - version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION, - accounts=[], - exchange_configs=[], - ) - strategies_state = protocol_models.StrategiesState( - version=sync_constants.USER_STRATEGIES_STATE_VERSION, - strategies=[], - ) - with ( - mock.patch.object( - debug_module.scheduler_api, - "get_automation_states", - mock.AsyncMock(return_value=[automation]), - ), - mock.patch.object( - debug_module.scheduler_api, - "list_user_actions", - mock.AsyncMock(return_value=[]), - ), - mock.patch.object( - debug_module.accounts_protocol, - "get_accounts_state", - return_value=accounts_state, - ), - mock.patch.object( - debug_module.strategies_protocol, - "get_strategies_state", - return_value=strategies_state, - ), - mock.patch.object( - debug_module.accounts_trading_protocol, - "get_account_trading_summaries", - return_value=[], - ), - ): - debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS) - returned_action = debug_state.debug.automations[0].actions[0] - assert returned_action is dsl_action - assert returned_action.dsl == dsl_action.dsl - - @pytest.mark.asyncio - async def test_leaves_apply_configuration_without_auth_unchanged(self): - config_action = protocol_models.Action( - id="action-1", - action_type="apply_configuration", - status=protocol_models.WorkflowStatus.COMPLETED, - configuration={ - "exchange_account_details": { - "auth_details": {}, - }, - }, - ) - automation = protocol_models.AutomationState( - id="auto-sim", - status=protocol_models.WorkflowStatus.COMPLETED, - metadata=protocol_models.AutomationMetadata(name="auto", description=""), - actions=[config_action], - ) + async def test_empty_collections_when_dependencies_return_empty(self): accounts_state = protocol_models.AccountsState( version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION, accounts=[], @@ -290,48 +160,15 @@ async def test_leaves_apply_configuration_without_auth_unchanged(self): ) with ( mock.patch.object( - debug_module.scheduler_api, - "get_automation_states", - mock.AsyncMock(return_value=[automation]), - ), - mock.patch.object( - debug_module.scheduler_api, - "list_user_actions", - mock.AsyncMock(return_value=[]), - ), - mock.patch.object( - debug_module.accounts_protocol, - "get_accounts_state", - return_value=accounts_state, - ), + debug_module.privacy_filter, + "privatize_dag_actions", + wraps=debug_module.privacy_filter.privatize_dag_actions, + ) as privatize_dag_actions_mock, mock.patch.object( - debug_module.strategies_protocol, - "get_strategies_state", - return_value=strategies_state, - ), - mock.patch.object( - debug_module.accounts_trading_protocol, - "get_account_trading_summaries", - return_value=[], - ), - ): - debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS) - returned_action = debug_state.debug.automations[0].actions[0] - assert returned_action is config_action - assert returned_action.configuration == config_action.configuration - - @pytest.mark.asyncio - async def test_empty_collections_when_dependencies_return_empty(self): - accounts_state = protocol_models.AccountsState( - version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION, - accounts=[], - exchange_configs=[], - ) - strategies_state = protocol_models.StrategiesState( - version=sync_constants.USER_STRATEGIES_STATE_VERSION, - strategies=[], - ) - with ( + debug_module.privacy_filter, + "privatize_user_action", + wraps=debug_module.privacy_filter.privatize_user_action, + ) as privatize_user_action_mock, mock.patch.object( debug_module.scheduler_api, "get_automation_states", @@ -360,6 +197,8 @@ async def test_empty_collections_when_dependencies_return_empty(self): ): debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS) get_account_trading_summaries_mock.assert_called_once_with(_TEST_WALLET_ADDRESS, []) + privatize_dag_actions_mock.assert_not_called() + privatize_user_action_mock.assert_not_called() assert debug_state.debug is not None assert debug_state.debug.automations == [] assert debug_state.debug.user_actions == [] @@ -386,6 +225,16 @@ async def test_ignores_exchange_account_ids_when_automation_not_running(self): strategies=[], ) with ( + mock.patch.object( + debug_module.privacy_filter, + "privatize_dag_actions", + wraps=debug_module.privacy_filter.privatize_dag_actions, + ) as privatize_dag_actions_mock, + mock.patch.object( + debug_module.privacy_filter, + "privatize_user_action", + wraps=debug_module.privacy_filter.privatize_user_action, + ) as privatize_user_action_mock, mock.patch.object( debug_module.scheduler_api, "get_automation_states", @@ -414,3 +263,7 @@ async def test_ignores_exchange_account_ids_when_automation_not_running(self): ): await debug_module.get_debug_state(_TEST_WALLET_ADDRESS) get_account_trading_summaries_mock.assert_called_once_with(_TEST_WALLET_ADDRESS, []) + assert privatize_dag_actions_mock.call_count == 2 + privatize_dag_actions_mock.assert_any_call(completed_automation.actions) + privatize_dag_actions_mock.assert_any_call(completed_automation.priority_actions) + privatize_user_action_mock.assert_not_called() diff --git a/packages/node/tests/protocol/util/test_privacy_filter.py b/packages/node/tests/protocol/util/test_privacy_filter.py new file mode 100644 index 0000000000..b0ca10b134 --- /dev/null +++ b/packages/node/tests/protocol/util/test_privacy_filter.py @@ -0,0 +1,136 @@ +# Drakkar-Software OctoBot-Node +# Copyright (c) 2025 Drakkar-Software, All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 3.0 of the License, or (at your option) any later version. + +import octobot_commons.constants as commons_constants +import octobot_protocol.models as protocol_models + +import octobot_node.protocol.util.privacy_filter as privacy_filter + + +class TestPrivatizeDagActions: + """Checks :func:`octobot_node.protocol.util.privacy_filter.privatize_dag_actions`.""" + + def test_redacts_auth_details_in_apply_configuration_action(self): + action = protocol_models.Action( + id="action-1", + action_type="apply_configuration", + status=protocol_models.WorkflowStatus.COMPLETED, + configuration={ + "exchange_account_details": { + "auth_details": { + "api_key": "secret-key", + "api_secret": "secret-secret", + "api_password": "secret-pass", + }, + }, + }, + ) + privatized_actions = privacy_filter.privatize_dag_actions([action]) + auth_details = privatized_actions[0].configuration[ + "exchange_account_details" + ]["auth_details"] + assert auth_details["api_key"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert auth_details["api_secret"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert auth_details["api_password"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + + def test_leaves_non_config_actions_unchanged(self): + dsl_action = protocol_models.Action( + id="dsl-1", + action_type="dsl_script", + status=protocol_models.WorkflowStatus.COMPLETED, + dsl='run_octobot_process("acc", {}, [{"api_key": "leak"}])', + ) + privatized_actions = privacy_filter.privatize_dag_actions([dsl_action]) + returned_action = privatized_actions[0] + assert returned_action is dsl_action + assert returned_action.dsl == dsl_action.dsl + + def test_leaves_apply_configuration_without_auth_unchanged(self): + config_action = protocol_models.Action( + id="action-1", + action_type="apply_configuration", + status=protocol_models.WorkflowStatus.COMPLETED, + configuration={ + "exchange_account_details": { + "auth_details": {}, + }, + }, + ) + privatized_actions = privacy_filter.privatize_dag_actions([config_action]) + returned_action = privatized_actions[0] + assert returned_action is config_action + assert returned_action.configuration == config_action.configuration + + +class TestPrivatizeUserAction: + """Checks :func:`octobot_node.protocol.util.privacy_filter.privatize_user_action`.""" + + def test_redacts_create_account_auth_configuration(self): + user_action = protocol_models.UserAction( + id="ua-create-auth", + configuration=protocol_models.UserActionConfiguration( + protocol_models.CreateAccountAuthConfiguration( + action_type=protocol_models.UserActionType.ACCOUNT_AUTH_CREATE, + configuration=protocol_models.AccountAuthentication( + id="auth-1", + api_key="secret-key", + api_secret="secret-secret", + api_passphrase="secret-pass", + public_key="secret-public", + private_key="secret-private", + seed_phrase="secret-seed", + ), + ), + ), + ) + privatized_user_action = privacy_filter.privatize_user_action(user_action) + authentication = ( + privatized_user_action.configuration.actual_instance.configuration + ) + assert authentication.api_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.api_secret == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.api_passphrase == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.public_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.private_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.seed_phrase == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.id == "auth-1" + + def test_redacts_edit_account_auth_configuration(self): + user_action = protocol_models.UserAction( + id="ua-edit-auth", + configuration=protocol_models.UserActionConfiguration( + protocol_models.EditAccountAuthConfiguration( + action_type=protocol_models.UserActionType.ACCOUNT_AUTH_EDIT, + id="auth-1", + configuration=protocol_models.AccountAuthentication( + id="auth-1", + api_key="secret-key", + api_secret="secret-secret", + ), + ), + ), + ) + privatized_user_action = privacy_filter.privatize_user_action(user_action) + authentication = ( + privatized_user_action.configuration.actual_instance.configuration + ) + assert authentication.api_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.api_secret == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER + assert authentication.id == "auth-1" + + def test_leaves_non_account_auth_user_actions_unchanged(self): + stop_user_action = protocol_models.UserAction( + id="ua-stop", + configuration=protocol_models.UserActionConfiguration( + protocol_models.StopAutomationConfiguration( + action_type=protocol_models.UserActionType.AUTOMATION_STOP, + id="auto-1", + ), + ), + ) + privatized_user_action = privacy_filter.privatize_user_action(stop_user_action) + assert privatized_user_action is stop_user_action diff --git a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py index d24f384fa0..bf39a5dfe2 100644 --- a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py +++ b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py @@ -31,6 +31,7 @@ import octobot_commons.json_util as json_util import octobot_commons.logging as commons_logging import octobot_commons.os_util as os_util +import octobot_commons.process_util as process_util import octobot_commons.profiles.profile_data as profile_data_module import octobot_commons.profiles.profile_data_import as profile_data_import import octobot_commons.profiles.exchange_auth_data as exchange_auth_data_module @@ -53,6 +54,14 @@ DEFAULT_DSL_PROFILE_ID = "non-trading" +# run_octobot_process uses two state layers: +# - Recall state (`EnsureOctobotProcessState` in DSL `last_execution_result`): master-side +# snapshot (ports, paths, stored pid, init_state_ok, executor_id). Persisted across +# re-calls until STOP, UPDATE_CONFIG, or respawn. +# - Child dump (`process_bot_state.json` → `ProcessBotState`): written by the child; used for +# timestamp-fresh checks and metadata.pid when the stored recall pid is stale. +# executor_id ties recall to the current DBOS scheduler worker. On mismatch +# with all child PIDs dead, respawn is forced immediately (grace is bypassed). class EnsureOctobotProcessState(pydantic.BaseModel): model_config = pydantic.ConfigDict(validate_assignment=True, extra="ignore") http_base_url: str @@ -62,12 +71,15 @@ class EnsureOctobotProcessState(pydantic.BaseModel): user_folder: str log_folder: str profile_id: str | None + # Last known child PID on the master; may lag after a child self-restart until adoption. pid: int state_file_path: str = "" - # Omitted in ensure success `self.value` (stop command); 0.0 is unused there. + # Wall-clock when the first spawn began; used only while init_state_ok is False (ping_timeout). started_waiting_at: float = 0.0 - # Set after process_bot_state.json liveness passes; disables the init `ping_timeout` cap (re-calls only use `waiting_time`). + # True once the child reached confirmed-alive; switches from init ping_timeout to recall/grace rules. init_state_ok: bool = False + # Required scheduler executor id at emit time; compared on recall to detect worker restart. + executor_id: str # Keys on `last_result` that `create_re_callable_result_dict` takes as top-level args (not state). @@ -92,7 +104,11 @@ def _resolve_state_file_path(recall_state: EnsureOctobotProcessState) -> str: ) +# --- Liveness and routing (recall state + child dump) --- + + def _is_process_state_alive(state: process_bot_state_import.ProcessBotState) -> bool: + """True when the child dump is timestamp-fresh (~2 × dump interval since metadata.updated_at).""" interval = octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS epsilon = max(0.1 * interval, 1e-6) now = time.time() @@ -106,6 +122,7 @@ def _is_process_state_alive(state: process_bot_state_import.ProcessBotState) -> async def _load_process_bot_state( state_file_path: str, ) -> typing.Optional[process_bot_state_import.ProcessBotState]: + """Load child dump from disk; None if missing or unreadable.""" try: async with aiofiles.open(state_file_path, mode="r", encoding="utf-8") as state_file: raw = await state_file.read() @@ -115,7 +132,17 @@ async def _load_process_bot_state( return None +def _is_state_timestamp_fresh( + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], +) -> bool: + """Alias for timestamp-fresh child dump (see _is_process_state_alive).""" + if loaded_state is None: + return False + return _is_process_state_alive(loaded_state) + + def _parse_ensure_recall_state(raw: dict) -> typing.Optional[EnsureOctobotProcessState]: + """Parse recall payload; empty or invalid dict → None.""" if not raw: return None try: @@ -124,6 +151,129 @@ def _parse_ensure_recall_state(raw: dict) -> typing.Optional[EnsureOctobotProces return None +def _metadata_pid_is_running( + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], +) -> bool: + """True when child dump metadata.pid is valid and still running in the OS.""" + if loaded_state is None: + return False + state_pid = loaded_state.metadata.pid + if state_pid <= 0: + return False + return process_util.pid_is_running(state_pid) + + +def _is_child_confirmed_alive( + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], +) -> bool: + """Strongest child-alive signal: timestamp-fresh dump and metadata.pid running.""" + if loaded_state is None: + return False + if not _is_state_timestamp_fresh(loaded_state): + return False + return _metadata_pid_is_running(loaded_state) + + +def _any_child_pid_running( + recall_state: EnsureOctobotProcessState, + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], +) -> bool: + """True when either recall pid or child dump metadata.pid is running.""" + if _stored_pid_is_running(recall_state): + return True + return _metadata_pid_is_running(loaded_state) + + +def _executor_restarted_requires_respawn( + recall_state: EnsureOctobotProcessState, + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], + *, + current_executor_id: str, +) -> bool: + """Scheduler worker restarted: recall executor_id differs and no child PID is running.""" + if _any_child_pid_running(recall_state, loaded_state): + return False + return recall_state.executor_id != current_executor_id + + +def _stored_pid_is_running(recall_state: EnsureOctobotProcessState) -> bool: + """Fast path: recall pid still running.""" + if recall_state.pid <= 0: + return False + return process_util.pid_is_running(recall_state.pid) + + +def _in_restart_grace_period( + recall_state: EnsureOctobotProcessState, + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], + *, + now: float, + ping_timeout: float, + stored_pid_running: bool, +) -> bool: + """Child self-restart window: init done, all PIDs dead, recent dump within ping_timeout.""" + if not recall_state.init_state_ok or stored_pid_running: + return False + if loaded_state is None: + return False + last_updated_at = loaded_state.metadata.updated_at + if last_updated_at <= 0: + return False + return (now - last_updated_at) < ping_timeout + + +def _should_use_recall_path( + recall_state: EnsureOctobotProcessState, + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], + *, + stored_pid_running: bool, + now: float, + ping_timeout: float, +) -> bool: + """Recall vs respawn: stored PID → confirmed alive → init → grace → else first_spawn.""" + if stored_pid_running: + return True + if _is_child_confirmed_alive(loaded_state): + return True + if not recall_state.init_state_ok: + return True + return _in_restart_grace_period( + recall_state, + loaded_state, + now=now, + ping_timeout=ping_timeout, + stored_pid_running=stored_pid_running, + ) + + +def _resolve_bound_pid( + recall_state: EnsureOctobotProcessState, + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState], +) -> typing.Optional[int]: + """Bind operating PID from recall or fresh dump; None if metadata.pid dead (no raise).""" + if _stored_pid_is_running(recall_state): + return recall_state.pid + if loaded_state is None or not _is_state_timestamp_fresh(loaded_state): + return None + state_pid = loaded_state.metadata.pid + if state_pid <= 0: + raise commons_errors.DSLInterpreterError( + "process_bot_state.json is live but metadata.pid is missing or invalid." + ) + if process_util.pid_is_running(state_pid): + return state_pid + return None + + +def _apply_resolved_pid_to_state( + recall_state: EnsureOctobotProcessState, + resolved_pid: typing.Optional[int], +) -> EnsureOctobotProcessState: + if resolved_pid is None or resolved_pid == recall_state.pid: + return recall_state + return recall_state.model_copy(update={"pid": resolved_pid}) + + def _remove_path_for_fresh_start(path: str, *, logger: typing.Any) -> None: if not path or not str(path).strip(): logger.info("configuration update: skip remove (empty path)") @@ -484,7 +634,8 @@ def _listen_port_pair_with_shared_scan_offset( def create_octobot_process_operators( - signals: typing.Optional[dsl_interpreter.OperatorSignals] = None + signals: typing.Optional[dsl_interpreter.OperatorSignals] = None, + executor_id: str = "", ) -> list[type[dsl_interpreter.Operator]]: # Child process: user layout, ports, process_bot_state.json liveness (re-callable). class EnsureOctobotProcessOperator( @@ -510,6 +661,13 @@ def __init__(self, *args, **kwargs): dsl_interpreter.ProcessBoundOperatorMixin.__init__(self) dsl_interpreter.SignalableOperatorMixin.__init__(self, signals) + def _read_executor_id(self) -> str: + if not executor_id: + raise commons_errors.DSLInterpreterError( + "executor_id is required for run_octobot_process" + ) + return executor_id + @staticmethod def get_library() -> str: return commons_constants.CONTEXTUAL_OPERATORS_LIBRARY @@ -629,7 +787,10 @@ def _re_calling_result_dispatches_this_ensure( inner = rec.get("last_execution_result") if not isinstance(inner, dict): return False - return _parse_ensure_recall_state(inner) is not None + try: + return _parse_ensure_recall_state(inner) is not None + except commons_errors.DSLInterpreterError: + return False @classmethod def should_dispatch_operator_signal_for_result( @@ -683,26 +844,58 @@ async def _pre_compute_recall_path( start_time: float, recall_interval: float, ping_timeout: float, + loaded_state: typing.Optional[process_bot_state_import.ProcessBotState] = None, ) -> None: state_path = _resolve_state_file_path(recall_state) + now = time.time() + if loaded_state is None: + loaded_state = await _load_process_bot_state(state_path) + child_confirmed_alive = _is_child_confirmed_alive(loaded_state) + stored_pid_running = _stored_pid_is_running(recall_state) # Init window: fail and kill the child if the state file never became live in time. if ( not recall_state.init_state_ok - and time.time() - recall_state.started_waiting_at > ping_timeout + and now - recall_state.started_waiting_at > ping_timeout ): + resolved_pid = _resolve_bound_pid(recall_state, loaded_state) + if resolved_pid is not None: + self.pid = resolved_pid self.value = self.request_graceful_stop(logger=_get_logger()) raise commons_errors.DSLInterpreterError( "Timed out waiting for OctoBot process_bot_state.json during init (see ping_timeout).", ) + if _in_restart_grace_period( + recall_state, + loaded_state, + now=now, + ping_timeout=ping_timeout, + stored_pid_running=stored_pid_running, + ): + _get_logger().info( + "restart grace: waiting for child state dump (last_updated_at=%s, ping_timeout=%s)", + loaded_state.metadata.updated_at if loaded_state is not None else None, + ping_timeout, + ) + resolved_pid = _resolve_bound_pid(recall_state, loaded_state) + if resolved_pid is not None: + if resolved_pid != recall_state.pid: + _get_logger().info( + "adopted pid=%s (was %s) from process_bot_state", + resolved_pid, + recall_state.pid, + ) + self.pid = resolved_pid + recall_state = _apply_resolved_pid_to_state(recall_state, resolved_pid) _get_logger().info("process state path (re-call path): %s", state_path) - loaded = await _load_process_bot_state(state_path) - is_live = loaded is not None and _is_process_state_alive(loaded) - if is_live: + # Running: stored recall pid or child-confirmed-alive → init_state_ok, optional EAE. + is_running = stored_pid_running or child_confirmed_alive + if is_running: + logged_pid = resolved_pid if resolved_pid is not None else recall_state.pid _get_logger().info( "OctoBot is running (re-call path): user_folder=%r base_url=%r pid=%s", recall_state.user_folder, recall_state.http_base_url, - recall_state.pid, + logged_pid, ) updated = recall_state.model_copy( update={"init_state_ok": True, "state_file_path": state_path} @@ -712,15 +905,17 @@ async def _pre_compute_recall_path( last_result=last_result, start_time=start_time, recall_interval=recall_interval, - parsed_process_bot_state=loaded, + parsed_process_bot_state=loaded_state, ) return + # Still starting: not confirmed alive (grace or waiting for first dump); re-call only. + logged_pid = resolved_pid if resolved_pid is not None else recall_state.pid _get_logger().info( - "OctoBot is still starting (re-call path, process state not live): user_folder=%r " + "OctoBot is still starting (re-call path, child not confirmed alive): user_folder=%r " "base_url=%r pid=%s state_path=%s", recall_state.user_folder, recall_state.http_base_url, - recall_state.pid, + logged_pid, state_path, ) self._emit_ensure_recall( @@ -728,7 +923,7 @@ async def _pre_compute_recall_path( last_result=last_result, start_time=start_time, recall_interval=recall_interval, - parsed_process_bot_state=loaded, + parsed_process_bot_state=loaded_state, ) async def _pre_compute_first_spawn( @@ -809,28 +1004,39 @@ async def _pre_compute_first_spawn( pid=self.pid or 0, state_file_path=state_file_path, started_waiting_at=start_time, + executor_id=self._read_executor_id(), ) # First process state check after spawn (init cap still uses `state.started_waiting_at`). loaded = await _load_process_bot_state(state_file_path) is_live = loaded is not None and _is_process_state_alive(loaded) if is_live: - _get_logger().info( - "OctoBot is running (first-spawn path): user_folder=%r base_url=%r pid=%s", - user_folder, - http_base_url, - self.pid, - ) - ready = state.model_copy(update={"init_state_ok": True}) - self._emit_ensure_recall( - state=ready, - last_result=last_result, - start_time=start_time, - recall_interval=recall_interval, - parsed_process_bot_state=loaded, - ) - return + state_pid = loaded.metadata.pid + if state_pid <= 0: + raise commons_errors.DSLInterpreterError( + "process_bot_state.json is live but metadata.pid is missing or invalid." + ) + if process_util.pid_is_running(state_pid): + self.pid = state_pid + state = state.model_copy(update={"pid": state_pid}) + _get_logger().info( + "OctoBot is running (first-spawn path): user_folder=%r base_url=%r pid=%s", + user_folder, + http_base_url, + state_pid, + ) + ready = state.model_copy(update={"init_state_ok": True}) + self._emit_ensure_recall( + state=ready, + last_result=last_result, + start_time=start_time, + recall_interval=recall_interval, + parsed_process_bot_state=loaded, + ) + return + # Orphaned timestamp-fresh dump with dead metadata.pid (e.g. after master restart): + # treat as still starting, not an error. _get_logger().info( - "OctoBot is still starting (first-spawn path, process state not live): user_folder=%r base_url=%r " + "OctoBot is still starting (first-spawn path, child not confirmed alive): user_folder=%r base_url=%r " "pid=%s state_path=%s", user_folder, http_base_url, @@ -864,17 +1070,25 @@ async def _pre_compute_update_config_refresh( "run_octobot_process call.", ) process_logger = _get_logger() + state_path = _resolve_state_file_path(recall_state) + loaded_state = await _load_process_bot_state(state_path) + resolved_pid = _resolve_bound_pid(recall_state, loaded_state) + if resolved_pid is None: + raise commons_errors.DSLInterpreterError( + "run_octobot_process(UPDATE_CONFIG) cannot resolve a running child pid to stop." + ) + self.pid = resolved_pid process_logger.info( "configuration update: begin refresh user_folder=%r user_root=%r log_folder=%r pid=%s", user_folder, recall_state.user_root, recall_state.log_folder, - recall_state.pid, + resolved_pid, ) stop_outcome = self.request_graceful_stop(logger=process_logger) process_logger.info("configuration update: graceful stop outcome: %s", stop_outcome) await self.wait_until_pid_stopped( - recall_state.pid, + resolved_pid, logger=process_logger, timeout_seconds=ping_timeout, ) @@ -902,10 +1116,26 @@ async def pre_compute(self) -> None: raise commons_errors.DSLInterpreterError( "run_octobot_process(execution_stop) requires last_execution_result from a prior run_octobot_process call.", ) - if not self.is_process_running(): - self.value = {"status": "already_stopped", "reason": "not_running"} + state_path = _resolve_state_file_path(recall_state) + loaded_state = await _load_process_bot_state(state_path) + stored_pid_running = _stored_pid_is_running(recall_state) + resolved_pid = _resolve_bound_pid(recall_state, loaded_state) + if resolved_pid is not None: + self.pid = resolved_pid + self.value = self.request_graceful_stop(logger=_get_logger()) return - self.value = self.request_graceful_stop(logger=_get_logger()) + # Grace with dead metadata pid: child restarting; no SIGTERM, report already_stopped. + if _in_restart_grace_period( + recall_state, + loaded_state, + now=time.time(), + ping_timeout=float(params.get("ping_timeout") or DEFAULT_ENSURE_TIMEOUT), + stored_pid_running=stored_pid_running, + ): + _get_logger().info( + "run_octobot_process(STOP): child in restart grace; treating as already_stopped" + ) + self.value = {"status": "already_stopped", "reason": "not_running"} return working_directory = os.path.normpath(os.getcwd()) user_folder = params["user_folder"] @@ -928,15 +1158,67 @@ async def pre_compute(self) -> None: ) return recall_state = self._try_parse_ensure_recall_state(last_result) - if recall_state is not None and self.is_process_running(): - await self._pre_compute_recall_path( + if recall_state is not None: + # 1. Load child dump alongside strict recall state. + state_path = _resolve_state_file_path(recall_state) + loaded_state = await _load_process_bot_state(state_path) + stored_pid_running = _stored_pid_is_running(recall_state) + state_timestamp_fresh = _is_state_timestamp_fresh(loaded_state) + # 2. Master restart → first_spawn immediately (grace bypassed). + if _executor_restarted_requires_respawn( recall_state, - last_result, - start_time=start_time, - recall_interval=recall_interval, + loaded_state, + current_executor_id=self._read_executor_id(), + ): + _get_logger().info( + "scheduler worker restarted; forcing child respawn for user_folder=%r " + "(recall executor_id=%r, current=%r)", + recall_state.user_folder, + recall_state.executor_id, + self._read_executor_id(), + ) + await self._pre_compute_first_spawn( + user_folder, + working_directory, + params, + last_result, + start_time=start_time, + recall_interval=recall_interval, + ) + return + # 3. Recall vs respawn from liveness/grace rules. + if _should_use_recall_path( + recall_state, + loaded_state, + stored_pid_running=stored_pid_running, + now=start_time, ping_timeout=ping_timeout, + ): + await self._pre_compute_recall_path( + recall_state, + last_result, + start_time=start_time, + recall_interval=recall_interval, + ping_timeout=ping_timeout, + loaded_state=loaded_state, + ) + return + # 4. Recall declined (e.g. grace expired) → log and first_spawn below. + last_state_updated_at = ( + loaded_state.metadata.updated_at if loaded_state is not None else None + ) + _get_logger().info( + "run_octobot_process: respawning child (recall path declined): user_folder=%r " + "stored_pid=%s stored_pid_running=%s state_timestamp_fresh=%s init_state_ok=%s " + "last_state_updated_at=%s ping_timeout=%s", + recall_state.user_folder, + recall_state.pid, + stored_pid_running, + state_timestamp_fresh, + recall_state.init_state_ok, + last_state_updated_at, + ping_timeout, ) - return await self._pre_compute_first_spawn( user_folder, working_directory, diff --git a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py index 88c9aaa693..b07143d73d 100644 --- a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py +++ b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py @@ -46,7 +46,10 @@ import tentacles.Trading.Mode.simple_market_making_trading_mode.simple_market_making_trading as simple_market_making_trading # Nested class from factory (not exposed on ``octobot_process_ops``). -EnsureOctobotProcessOperator = octobot_process_ops.create_octobot_process_operators(None)[0] +TEST_EXECUTOR_ID = "test-executor" +EnsureOctobotProcessOperator = octobot_process_ops.create_octobot_process_operators( + None, TEST_EXECUTOR_ID +)[0] _TESTS_RUN_OCTOBOT_PROCESS_WAITING_TIME_SEC = 2 pytestmark = pytest.mark.asyncio @@ -56,16 +59,66 @@ async def _async_return_none_mock(*_unused): return None -async def _async_live_process_bot_state_mock(*_unused): +async def _async_live_process_bot_state_mock(*_unused, metadata_pid=20002): now = octobot_process_ops.time.time() interval = float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) return process_bot_state_import.ProcessBotState( metadata=process_bot_state_import.Metadata( updated_at=now - 0.1, next_updated_at=now + interval, + pid=metadata_pid, ), exchange_account_elements=octobot_flow_entities.ExchangeAccountElements(), ) + + +def _stale_process_bot_state_for_grace(*, age_seconds: float, metadata_pid: int = 10002): + now = octobot_process_ops.time.time() + interval = float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) + return process_bot_state_import.ProcessBotState( + metadata=process_bot_state_import.Metadata( + updated_at=now - age_seconds, + next_updated_at=now - age_seconds + interval, + pid=metadata_pid, + ), + exchange_account_elements=octobot_flow_entities.ExchangeAccountElements(), + ) + + +async def _async_live_process_bot_state_with_pid_10001(*_unused): + return await _async_live_process_bot_state_mock(metadata_pid=10001) + + +def _healthy_recall_inner( + *, + pid: int = 10002, + init_state_ok: bool = True, + user_root: str | None = None, + tmp_path=None, + executor_id: str = TEST_EXECUTOR_ID, +) -> dict: + if user_root is None and tmp_path is not None: + user_root = str( + tmp_path / commons_constants.USER_FOLDER / commons_constants.AUTOMATIONS_FOLDER / "ub" + ) + user_root = user_root or "/x/ub" + state_fn = os.path.join(user_root, octobot_constants.PROCESS_BOT_STATE_FILE_NAME) + return { + "waiting_time": octobot_process_ops.DEFAULT_PING_WAITING_TIME, + "last_execution_time": 0.0, + "http_base_url": "http://127.0.0.1:20050", + "web_port": 20050, + "node_port": 30050, + "user_root": user_root, + "user_folder": "ub", + "log_folder": "/x/logs/ub", + "profile_id": "p", + "pid": pid, + "state_file_path": state_fn, + "started_waiting_at": 0.0, + "init_state_ok": init_state_ok, + "executor_id": executor_id, + } def _stop_test_ensure_state_dict(http_base_url: str) -> dict: return octobot_process_ops.EnsureOctobotProcessState( http_base_url=http_base_url, @@ -79,6 +132,7 @@ def _stop_test_ensure_state_dict(http_base_url: str) -> dict: state_file_path=os.path.normpath( os.path.join("/x", octobot_constants.PROCESS_BOT_STATE_FILE_NAME) ), + executor_id=TEST_EXECUTOR_ID, ).model_dump() @@ -980,9 +1034,13 @@ async def test_returns_recallable_with_init_state_ok_after_first_spawn(self, tmp process_util, "spawn_managed_subprocess", ) as spawn_mock, mock.patch.object( + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 10001, + ), mock.patch.object( octobot_process_ops, "_load_process_bot_state", - new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock), + new=mock.AsyncMock(side_effect=_async_live_process_bot_state_with_pid_10001), ): spawn_mock.return_value.pid = 10001 await op.pre_compute() @@ -1055,9 +1113,9 @@ async def test_returns_recallable_with_init_state_ok_on_recall_path(self, tmp_pa "getcwd", return_value=str(tmp_path), ), mock.patch.object( - dsl_interpreter.ProcessBoundOperatorMixin, - "is_process_running", - return_value=True, + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 10002, ), mock.patch.object( octobot_process_ops, "_load_process_bot_state", @@ -1098,6 +1156,7 @@ async def test_init_timeout_kills_and_raises_dsl_error(self, tmp_path): "state_file_path": state_fn, "started_waiting_at": 0.0, "init_state_ok": False, + "executor_id": TEST_EXECUTOR_ID, } op = EnsureOctobotProcessOperator( user_folder="ub", @@ -1122,6 +1181,7 @@ async def test_init_timeout_kills_and_raises_dsl_error(self, tmp_path): ), mock.patch.object( octobot_process_ops, "_load_process_bot_state", + new=mock.AsyncMock(return_value=None), ) as load_mock, mock.patch.object( octobot_process_ops, "_listen_port_pair_with_shared_scan_offset", @@ -1129,7 +1189,7 @@ async def test_init_timeout_kills_and_raises_dsl_error(self, tmp_path): with pytest.raises(commons_errors.DSLInterpreterError, match="Timed out waiting"): await op.pre_compute() stop_mock.assert_called_once_with(logger=mock.ANY) - load_mock.assert_not_called() + load_mock.assert_called() ffp.assert_not_called() @@ -1154,6 +1214,7 @@ async def test_does_not_apply_init_timeout_after_init_state_ok(self, tmp_path): "state_file_path": state_fn2, "started_waiting_at": 0.0, "init_state_ok": True, + "executor_id": TEST_EXECUTOR_ID, } op = EnsureOctobotProcessOperator( user_folder="ub", @@ -1167,9 +1228,9 @@ async def test_does_not_apply_init_timeout_after_init_state_ok(self, tmp_path): "getcwd", return_value=str(tmp_path), ), mock.patch.object( - dsl_interpreter.ProcessBoundOperatorMixin, - "is_process_running", - return_value=True, + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id in (88002, 20002), ), mock.patch.object(octobot_process_ops, "time", st_time), mock.patch.object( octobot_process_ops, "_load_process_bot_state", @@ -1528,7 +1589,8 @@ async def _run_default_config_lifecycle( ) operator_signals_holder = dsl_interpreter.OperatorSignals() stop_operator_cls = octobot_process_ops.create_octobot_process_operators( - operator_signals_holder + operator_signals_holder, + TEST_EXECUTOR_ID, )[0] operator_signals_holder.sync({ stop_operator_cls.get_name(): dsl_interpreter.OperatorSignal.STOP.value, @@ -1595,7 +1657,8 @@ async def test_execution_stop_dead_child_is_already_stopped(self): inner = _stop_test_ensure_state_dict("http://127.0.0.1:7") operator_signals_holder = dsl_interpreter.OperatorSignals() operator_under_test = octobot_process_ops.create_octobot_process_operators( - operator_signals_holder + operator_signals_holder, + TEST_EXECUTOR_ID, )[0] operator_signals_holder.sync({ operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value, @@ -1605,23 +1668,22 @@ async def test_execution_stop_dead_child_is_already_stopped(self): profile_data=_MINIMAL_PROFILE_DATA, last_execution_result=_re_calling_ensure_value(inner), ) - with ( - mock.patch.object( - dsl_interpreter.ProcessBoundOperatorMixin, - "is_process_running", - return_value=False, - ), + with mock.patch.object( + process_util, + "pid_is_running", + return_value=False, ): await op.pre_compute() assert isinstance(op.value, dict) assert op.value["status"] == "already_stopped" async def test_execution_stop_short_circuits_without_sigterm_when_not_running(self): - """STOP branch returns already_stopped before ``request_graceful_stop`` when ``is_process_running`` is false.""" + """STOP branch returns already_stopped before ``request_graceful_stop`` when stored pid is not running.""" inner = _stop_test_ensure_state_dict("http://127.0.0.1:7") operator_signals_holder = dsl_interpreter.OperatorSignals() operator_under_test = octobot_process_ops.create_octobot_process_operators( - operator_signals_holder + operator_signals_holder, + TEST_EXECUTOR_ID, )[0] operator_signals_holder.sync({ operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value, @@ -1634,8 +1696,8 @@ async def test_execution_stop_short_circuits_without_sigterm_when_not_running(se graceful_stop_mock = mock.Mock() with ( mock.patch.object( - dsl_interpreter.ProcessBoundOperatorMixin, - "is_process_running", + process_util, + "pid_is_running", return_value=False, ), mock.patch.object( @@ -1652,7 +1714,8 @@ async def test_execution_stop_os_kill_failure_raises(self): inner = _stop_test_ensure_state_dict("http://127.0.0.1:7") operator_signals_holder = dsl_interpreter.OperatorSignals() operator_under_test = octobot_process_ops.create_octobot_process_operators( - operator_signals_holder + operator_signals_holder, + TEST_EXECUTOR_ID, )[0] operator_signals_holder.sync({ operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value, @@ -1744,10 +1807,12 @@ async def test_update_config_triggers_respawn_and_recallable_result(self, tmp_pa octobot_constants.PROCESS_BOT_STATE_FILE_NAME, ), init_state_ok=True, + executor_id=TEST_EXECUTOR_ID, ).model_dump() operator_signals_holder = dsl_interpreter.OperatorSignals() operator_under_test = octobot_process_ops.create_octobot_process_operators( - operator_signals_holder + operator_signals_holder, + TEST_EXECUTOR_ID, )[0] operator_signals_holder.sync({ operator_under_test.get_name(): dsl_interpreter.OperatorSignal.UPDATE_CONFIG.value, @@ -1770,6 +1835,11 @@ async def test_update_config_triggers_respawn_and_recallable_result(self, tmp_pa "request_graceful_stop", return_value={"status": "stopped", "signal": "sigterm"}, ) as stop_mock, + mock.patch.object( + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 4242, + ), mock.patch.object( octobot_process_ops, "_load_process_bot_state", @@ -1785,10 +1855,563 @@ async def test_update_config_triggers_respawn_and_recallable_result(self, tmp_pa stop_mock.assert_called_once() wait_mock.assert_awaited_once() spawn_mock.assert_called_once() - assert not (user_automation / "stale_marker.txt").exists() - assert isinstance(op.value, dict) assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value finally: shutil.rmtree(tmp_path / commons_constants.USER_FOLDER, ignore_errors=True) if (tmp_path / "logs").exists(): shutil.rmtree(tmp_path / "logs", ignore_errors=True) + + +class TestMetadataPidRoundTrip: + def test_metadata_pid_to_dict_from_dict(self): + metadata = process_bot_state_import.Metadata( + updated_at=1.0, + next_updated_at=2.0, + pid=424242, + ) + restored = process_bot_state_import.Metadata.from_dict( + metadata.to_dict(include_default_values=False) + ) + assert restored.pid == 424242 + + +class TestShouldUseRecallPathWhenStoredPidDeadButStateLive: + async def test_adopts_pid_from_live_state_without_spawn(self, tmp_path): + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 20002, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + le = op.value[dsl_interpreter.ReCallingOperatorResult.__name__]["last_execution_result"] + assert le.get("init_state_ok") is True + assert le.get("pid") == 20002 + + +class TestShouldUseRecallPathDuringInitWhenStoredPidDead: + async def test_recall_without_spawn_during_init(self, tmp_path): + inner = _healthy_recall_inner(pid=10002, init_state_ok=False, tmp_path=tmp_path) + inner["started_waiting_at"] = octobot_process_ops.time.time() + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_return_none_mock), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value + + +class TestRecallPathWhenStateFileMissingButPidRunning: + async def test_recall_when_pid_running_without_state_file(self, tmp_path): + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 10002, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_return_none_mock), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value + + +class TestRestartGracePeriodAvoidsRespawnWhileStateStale: + async def test_recall_during_restart_grace(self, tmp_path): + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + stale_state = _stale_process_bot_state_for_grace( + age_seconds=float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) * 2.5, + metadata_pid=10002, + ) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ping_timeout=120.0, + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(return_value=stale_state), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value + + +class TestFirstSpawnAfterRestartGraceExpires: + async def test_respawns_when_grace_expired(self, tmp_path): + (tmp_path / "start.py").write_text("#", encoding="utf-8") + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + stale_state = _stale_process_bot_state_for_grace(age_seconds=200.0, metadata_pid=10002) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ping_timeout=120.0, + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + octobot_process_ops, + "ensure_user_profile_and_layout", + new=mock.AsyncMock( + return_value={ + "user_root": inner["user_root"], + "profile_id": "x", + "already_prepared": True, + } + ), + ), mock.patch.object( + octobot_process_ops, + "_listen_port_pair_with_shared_scan_offset", + return_value=(20050, 30050), + ), mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(return_value=stale_state), + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock: + spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30003) + await op.pre_compute() + spawn_mock.assert_called_once() + + +class TestFirstSpawnWhenStateFileMissingAndPidDeadAfterInit: + async def test_respawns_when_no_state_file_and_pid_dead(self, tmp_path): + (tmp_path / "start.py").write_text("#", encoding="utf-8") + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + octobot_process_ops, + "ensure_user_profile_and_layout", + new=mock.AsyncMock( + return_value={ + "user_root": inner["user_root"], + "profile_id": "x", + "already_prepared": True, + } + ), + ), mock.patch.object( + octobot_process_ops, + "_listen_port_pair_with_shared_scan_offset", + return_value=(20050, 30050), + ), mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_return_none_mock), + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock: + spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30004) + await op.pre_compute() + spawn_mock.assert_called_once() + + +class TestStopAdoptsPidFromProcessBotState: + async def test_stop_signals_adopted_pid(self): + inner = _healthy_recall_inner(pid=10002) + operator_signals_holder = dsl_interpreter.OperatorSignals() + operator_under_test = octobot_process_ops.create_octobot_process_operators( + operator_signals_holder, + TEST_EXECUTOR_ID, + )[0] + operator_signals_holder.sync({ + operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value, + }) + op = operator_under_test( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + graceful_stop_mock = mock.Mock(return_value={"status": "stopped", "signal": "sigterm"}) + with ( + mock.patch.object( + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 20002, + ), + mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock), + ), + mock.patch.object( + operator_under_test, + "request_graceful_stop", + new=graceful_stop_mock, + ), + ): + await op.pre_compute() + graceful_stop_mock.assert_called_once() + assert op.pid == 20002 + assert op.value == {"status": "stopped", "signal": "sigterm"} + + +class TestExecutorRestartedRequiresRespawn: + async def test_marker_mismatch_forces_first_spawn(self, tmp_path): + (tmp_path / "start.py").write_text("#", encoding="utf-8") + inner = _healthy_recall_inner( + pid=10002, + tmp_path=tmp_path, + executor_id="old-executor-id", + ) + live_state = await _async_live_process_bot_state_mock(metadata_pid=20002) + op = octobot_process_ops.create_octobot_process_operators( + None, TEST_EXECUTOR_ID + )[0]( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + octobot_process_ops, + "ensure_user_profile_and_layout", + new=mock.AsyncMock( + return_value={ + "user_root": inner["user_root"], + "profile_id": "x", + "already_prepared": True, + } + ), + ), mock.patch.object( + octobot_process_ops, + "_listen_port_pair_with_shared_scan_offset", + return_value=(20050, 30050), + ), mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(return_value=live_state), + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock: + spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30005) + await op.pre_compute() + spawn_mock.assert_called_once() + + +class TestExecutorRestartSkippedWhenChildPidRunning: + async def test_marker_mismatch_but_metadata_pid_running_recalls(self, tmp_path): + inner = _healthy_recall_inner( + pid=10002, + tmp_path=tmp_path, + executor_id="old-executor-id", + ) + op = octobot_process_ops.create_octobot_process_operators( + None, TEST_EXECUTOR_ID + )[0]( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + side_effect=lambda process_id: process_id == 20002, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value + + +class TestGraceWhenTimestampFreshButMetadataPidDead: + async def test_recall_during_grace_without_spawn(self, tmp_path): + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + now = octobot_process_ops.time.time() + interval = float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) + fresh_dead_pid_state = process_bot_state_import.ProcessBotState( + metadata=process_bot_state_import.Metadata( + updated_at=now - 0.1, + next_updated_at=now + interval, + pid=20002, + ), + exchange_account_elements=octobot_flow_entities.ExchangeAccountElements(), + ) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ping_timeout=120.0, + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(return_value=fresh_dead_pid_state), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value + + +class TestExecutorRestartDoesNotBypassGraceWhenMarkerMatches: + async def test_recall_during_grace_when_marker_matches(self, tmp_path): + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + stale_state = _stale_process_bot_state_for_grace( + age_seconds=float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) * 2.5, + metadata_pid=10002, + ) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ping_timeout=120.0, + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock, mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(return_value=stale_state), + ): + await op.pre_compute() + spawn_mock.assert_not_called() + assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value + + +class TestRespawnsWhenGraceExpiredAndPidDead: + async def test_first_spawn_when_grace_expired(self, tmp_path): + (tmp_path / "start.py").write_text("#", encoding="utf-8") + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + stale_state = _stale_process_bot_state_for_grace(age_seconds=200.0, metadata_pid=10002) + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ping_timeout=120.0, + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + process_util, + "pid_is_running", + return_value=False, + ), mock.patch.object( + octobot_process_ops, + "ensure_user_profile_and_layout", + new=mock.AsyncMock( + return_value={ + "user_root": inner["user_root"], + "profile_id": "x", + "already_prepared": True, + } + ), + ), mock.patch.object( + octobot_process_ops, + "_listen_port_pair_with_shared_scan_offset", + return_value=(20050, 30050), + ), mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(return_value=stale_state), + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock: + spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30006) + await op.pre_compute() + spawn_mock.assert_called_once() + + +class TestEnsureOctobotProcessStateEmitsExecutorId: + async def test_first_spawn_emits_executor_id(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + (tmp_path / "start.py").write_text("#", encoding="utf-8") + op = EnsureOctobotProcessOperator( + user_folder="emit_master_bot", + profile_data=_MINIMAL_PROFILE_DATA, + ) + with mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_return_none_mock), + ), mock.patch.object( + octobot_process_ops, + "ensure_user_profile_and_layout", + new=mock.AsyncMock( + return_value={ + "user_root": str( + tmp_path + / commons_constants.USER_FOLDER + / commons_constants.AUTOMATIONS_FOLDER + / "emit_master_bot" + ), + "profile_id": "x", + "already_prepared": False, + } + ), + ), mock.patch.object( + octobot_process_ops, + "_listen_port_pair_with_shared_scan_offset", + return_value=(20050, 30050), + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock: + spawn_mock.return_value = mock.Mock(spec=["pid"], pid=40001) + await op.pre_compute() + le = op.value[dsl_interpreter.ReCallingOperatorResult.__name__]["last_execution_result"] + assert le["executor_id"] == TEST_EXECUTOR_ID + + +class TestRecallStateRequiresExecutorId: + async def test_missing_executor_id_falls_through_to_first_spawn(self, tmp_path): + (tmp_path / "start.py").write_text("#", encoding="utf-8") + inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path) + del inner["executor_id"] + op = EnsureOctobotProcessOperator( + user_folder="ub", + profile_data=_MINIMAL_PROFILE_DATA, + last_execution_result=_re_calling_ensure_value(inner), + ) + with mock.patch.object( + octobot_process_ops.os, + "getcwd", + return_value=str(tmp_path), + ), mock.patch.object( + octobot_process_ops, + "ensure_user_profile_and_layout", + new=mock.AsyncMock( + return_value={ + "user_root": inner["user_root"], + "profile_id": "x", + "already_prepared": True, + } + ), + ), mock.patch.object( + octobot_process_ops, + "_listen_port_pair_with_shared_scan_offset", + return_value=(20050, 30050), + ), mock.patch.object( + octobot_process_ops, + "_load_process_bot_state", + new=mock.AsyncMock(side_effect=_async_return_none_mock), + ), mock.patch.object( + process_util, + "spawn_managed_subprocess", + ) as spawn_mock: + spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30007) + await op.pre_compute() + spawn_mock.assert_called_once()