diff --git a/octobot/cli.py b/octobot/cli.py
index 88e829c55e..7023c4be76 100644
--- a/octobot/cli.py
+++ b/octobot/cli.py
@@ -25,7 +25,7 @@
try:
import octobot_commons.os_util as os_util
- import octobot_commons.logging as logging
+ import octobot_commons.logging
import octobot_commons.configuration as configuration
import octobot_commons.profiles as profiles
import octobot_commons.authentication as authentication
@@ -67,7 +67,7 @@ def update_config_with_args(starting_args, config: configuration.Configuration,
try:
import octobot_backtesting.constants as backtesting_constants
except ImportError as e:
- logging.get_logger().error(
+ octobot_commons.logging.get_logger().error(
"Can't start backtesting without the octobot_backtesting package properly installed.")
raise e
@@ -325,25 +325,39 @@ def _load_or_create_tentacles(community_auth, config, logger):
config.load_profiles_if_possible_and_necessary()
+def _init_cli_overriden_folders(args):
+ overrides = {}
+ if args.user_folder:
+ overrides["user_folder"] = args.user_folder
+ _set_user_root_from_cli(args.user_folder)
+ if args.log_folder:
+ overrides["log_folder"] = args.log_folder
+ logs_folder = args.log_folder
+ else:
+ logs_folder = constants.LOGS_FOLDER
+ return overrides, logs_folder
+
+
def start_octobot(args, default_config_file=None):
logger = None
try:
if args.version:
print(constants.LONG_VERSION)
return
-
- user_folder = getattr(args, "user_folder", None)
- if user_folder:
- _set_user_root_from_cli(user_folder)
-
- # log folder: --log-folder overrides default (from LOGS_FOLDER env at import + default "logs")
- logs_folder = getattr(args, "log_folder", None) or constants.LOGS_FOLDER
+
+ overrides, logs_folder = _init_cli_overriden_folders(args)
logger = octobot_logger.init_logger(logs_folder=logs_folder)
startup_messages = []
# Version
logger.info("Version : {0}".format(constants.LONG_VERSION))
+ # Log folder overrides
+ if overrides:
+ logger.info(f"Overriding default folders: {overrides}")
+ else:
+ logger.info(f"Using default folders")
+
# Current running environment
_log_environment(logger)
diff --git a/octobot/storage/process_bot_state_dumper.py b/octobot/storage/process_bot_state_dumper.py
index eba1502c0b..ff51f7a9f1 100644
--- a/octobot/storage/process_bot_state_dumper.py
+++ b/octobot/storage/process_bot_state_dumper.py
@@ -73,7 +73,11 @@ async def _write_state_file_async(
) -> None:
now = time.time()
state = process_bot_state_import.ProcessBotState(
- metadata=process_bot_state_import.Metadata(updated_at=now, next_updated_at=now + interval),
+ metadata=process_bot_state_import.Metadata(
+ updated_at=now,
+ next_updated_at=now + interval,
+ pid=os.getpid(),
+ ),
exchange_account_elements=_synced_exchange_account_elements_for_first_trading_exchange(
bot,
),
diff --git a/packages/flow/octobot_flow/entities/accounts/process_bot_state.py b/packages/flow/octobot_flow/entities/accounts/process_bot_state.py
index 8128b1716a..0e72f669a0 100644
--- a/packages/flow/octobot_flow/entities/accounts/process_bot_state.py
+++ b/packages/flow/octobot_flow/entities/accounts/process_bot_state.py
@@ -10,11 +10,14 @@
@dataclasses.dataclass
class Metadata(octobot_commons.dataclasses.MinimizableDataclass):
"""
- Timestamps written with process bot state dumps; used for file-based liveness checks.
+ Timestamps and child PID written with process bot state dumps. Liveness checks use
+ updated_at / next_updated_at only; pid is the authoritative child PID for parent binding
+ after restarts.
"""
updated_at: float = 0.0
next_updated_at: float = 0.0
+ pid: int = 0
@dataclasses.dataclass
diff --git a/packages/flow/octobot_flow/environment.py b/packages/flow/octobot_flow/environment.py
index e32bacb11d..22a41028f9 100644
--- a/packages/flow/octobot_flow/environment.py
+++ b/packages/flow/octobot_flow/environment.py
@@ -1,8 +1,21 @@
+import typing
+
import octobot.constants # will load .env file and init constants
import octobot_flow.repositories.community
import octobot_trading.constants
+_EXECUTOR_ID: typing.Optional[str] = None
+
+
+def register_executor_id(executor_id: str) -> None:
+ global _EXECUTOR_ID
+ _EXECUTOR_ID = executor_id
+
+
+def get_executor_id() -> typing.Optional[str]:
+ return _EXECUTOR_ID
+
def initialize_environment(allow_funds_transfer: bool = False) -> None:
octobot_flow.repositories.community.initialize_community_authentication()
diff --git a/packages/flow/octobot_flow/logic/dsl/dsl_executor.py b/packages/flow/octobot_flow/logic/dsl/dsl_executor.py
index 3d9f20e135..f874f0bfb8 100644
--- a/packages/flow/octobot_flow/logic/dsl/dsl_executor.py
+++ b/packages/flow/octobot_flow/logic/dsl/dsl_executor.py
@@ -12,6 +12,7 @@
import octobot_evaluators.evaluators as evaluators
import octobot_flow.entities
+import octobot_flow.environment
import octobot_flow.errors
import octobot_flow.enums
import octobot_flow.logic.dsl.action_error_util
@@ -24,7 +25,6 @@
import tentacles.Meta.DSL_operators.octobot_process_operators.octobot_process_ops as octobot_process_ops
-
class DSLExecutor(AbstractActionExecutor):
def __init__(
self,
@@ -32,13 +32,16 @@ def __init__(
exchange_manager: typing.Optional[octobot_trading.exchanges.ExchangeManager],
dsl_script: typing.Optional[str],
dependencies: typing.Optional[octobot_commons.signals.SignalDependencies] = None,
+ executor_id: typing.Optional[str] = None,
):
super().__init__()
self._exchange_manager = exchange_manager
self._dependencies = dependencies
self._dependencies_config: dict = profile_data.to_profile("").config
self._interpreter_signals: octobot_commons.dsl_interpreter.OperatorSignals = None # type: ignore (reset when interpreter is created)
- self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(None)
+ self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(
+ None, executor_id
+ )
if dsl_script:
self._interpreter.prepare(dsl_script)
@@ -49,7 +52,15 @@ def _get_matrix_id(self) -> typing.Optional[str]:
def get_flow_operator_classes(
self,
+ executor_id: typing.Optional[str] = None,
) -> list[typing.Type[octobot_commons.dsl_interpreter.Operator]]:
+ resolved_executor_id = (
+ executor_id or octobot_flow.environment.get_executor_id()
+ )
+ if not resolved_executor_id:
+ raise octobot_flow.errors.MissingDSLExecutorDependencyError(
+ "executor_id is required for run_octobot_process"
+ )
return (
octobot_commons.dsl_interpreter.get_all_operators()
+ dsl_operators.create_ohlcv_operators(self._exchange_manager, None, None)
@@ -74,16 +85,19 @@ def get_flow_operator_classes(
copier_trading_mode=None,
)
+ octobot_process_ops.create_octobot_process_operators(
- self._interpreter_signals
+ self._interpreter_signals,
+ resolved_executor_id,
)
) # type: ignore (list[type[Operator]])
def _create_interpreter(
- self, previous_execution_result: typing.Optional[dict]
+ self,
+ previous_execution_result: typing.Optional[dict],
+ executor_id: typing.Optional[str] = None,
) -> octobot_commons.dsl_interpreter.Interpreter:
self._interpreter_signals = octobot_commons.dsl_interpreter.OperatorSignals()
return octobot_commons.dsl_interpreter.Interpreter(
- self.get_flow_operator_classes()
+ self.get_flow_operator_classes(executor_id)
)
def get_dependencies(self) -> list[
@@ -110,7 +124,8 @@ async def execute_action(
] = None,
) -> octobot_commons.dsl_interpreter.DSLCallResult:
self._interpreter = self._create_interpreter(
- action.previous_execution_result
+ action.previous_execution_result,
+ None,
)
expression = action.get_resolved_dsl_script()
try:
diff --git a/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py b/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py
index 4b309ac27a..40e1d6085d 100644
--- a/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py
+++ b/packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py
@@ -1,10 +1,18 @@
# Drakkar-Software OctoBot
# Shared helpers/constants for octobot process functional tests (run_octobot_process, GridTradingMode).
+import asyncio
import copy
import decimal
+import json
+import os
+import pathlib
+import time
import typing
+import octobot.constants as octobot_constants
+import octobot_commons.configuration as configuration_module
+import octobot_commons.constants as commons_constants
import octobot_commons.dsl_interpreter as dsl_interpreter
import octobot_trading.constants as trading_constants
import octobot_trading.enums as trading_enums
@@ -12,6 +20,7 @@
import octobot_flow.jobs
import octobot_flow.entities
+import octobot_flow.environment
import octobot_flow.enums
import tests.functionnal_tests as functionnal_tests
import tests.functionnal_tests.tentacle_test_configs as tentacle_test_configs
@@ -200,6 +209,100 @@ def _get_action_by_id(
return None
+_FERNET_ENCRYPTED_PREFIX = "gAAAAA"
+
+
+# --- Child on-disk readiness (poll after init_state_ok; PID can be up before first dump / encrypt) ---
+
+
+def _process_bot_state_path(inner: dict) -> str:
+ return os.path.normpath(
+ os.path.join(
+ inner["user_root"],
+ octobot_constants.PROCESS_BOT_STATE_FILE_NAME,
+ )
+ )
+
+
+async def _wait_for_process_bot_state_file(
+ state_path: str,
+ *,
+ timeout_sec: float = GLOBAL_START_TIMEOUT_SEC,
+ poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC,
+) -> None:
+ """Poll until the child has written at least one process_bot_state.json dump."""
+ deadline = time.monotonic() + timeout_sec
+ while time.monotonic() < deadline:
+ if os.path.isfile(state_path):
+ return
+ await asyncio.sleep(poll_interval_sec)
+ pytest.fail(
+ f"Timed out waiting for process_bot_state.json at {state_path!r} within {timeout_sec}s"
+ )
+
+
+async def _assert_encrypted_exchange_credentials_in_user_config(
+ user_root: typing.Union[pathlib.Path, str],
+ exchange_internal_name: str,
+ expected_api_key: str,
+ expected_api_secret: str,
+ *,
+ timeout_sec: float = GLOBAL_START_TIMEOUT_SEC,
+ poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC,
+) -> None:
+ """
+ Poll child user-root config.json, then assert api key/secret are Fernet-encrypted and decrypt
+ to the expected plaintext.
+
+ run_octobot_process seeds plaintext credentials before spawn; the child re-saves config.json
+ on startup. init_state_ok can become true from PID liveness before encryption completes.
+ """
+ user_root_path = pathlib.Path(user_root)
+ config_path = user_root_path / commons_constants.CONFIG_FILE
+ deadline = time.monotonic() + timeout_sec
+ last_failure_reason = "config.json missing or exchange entry not ready"
+ while time.monotonic() < deadline:
+ if not config_path.is_file():
+ await asyncio.sleep(poll_interval_sec)
+ continue
+ root_cfg = json.loads(config_path.read_text(encoding="utf-8"))
+ exchanges_cfg = root_cfg.get(commons_constants.CONFIG_EXCHANGES) or {}
+ exchange_cfg = exchanges_cfg.get(exchange_internal_name)
+ if not isinstance(exchange_cfg, dict):
+ last_failure_reason = f"exchange {exchange_internal_name!r} missing from config"
+ await asyncio.sleep(poll_interval_sec)
+ continue
+ stored_api_key = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_KEY)
+ stored_api_secret = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_SECRET)
+ if not (
+ isinstance(stored_api_key, str)
+ and stored_api_key.startswith(_FERNET_ENCRYPTED_PREFIX)
+ and isinstance(stored_api_secret, str)
+ and stored_api_secret.startswith(_FERNET_ENCRYPTED_PREFIX)
+ ):
+ last_failure_reason = "api key/secret not yet Fernet-encrypted in config.json"
+ await asyncio.sleep(poll_interval_sec)
+ continue
+ try:
+ decrypted_api_key = configuration_module.decrypt(stored_api_key)
+ decrypted_api_secret = configuration_module.decrypt(stored_api_secret)
+ except Exception as decrypt_error:
+ last_failure_reason = f"decrypt failed: {decrypt_error}"
+ await asyncio.sleep(poll_interval_sec)
+ continue
+ assert decrypted_api_key == expected_api_key, (
+ f"decrypted api key mismatch for {exchange_internal_name!r}"
+ )
+ assert decrypted_api_secret == expected_api_secret, (
+ f"decrypted api secret mismatch for {exchange_internal_name!r}"
+ )
+ return
+ pytest.fail(
+ f"Timed out waiting for encrypted exchange credentials in {config_path!r} "
+ f"within {timeout_sec}s ({last_failure_reason})"
+ )
+
+
def _make_tracked_spawn_managed_with_forward_terminal_output(
real_spawn_managed: typing.Callable[..., typing.Any],
popen_calls: dict[str, int],
@@ -213,6 +316,12 @@ def _tracked(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
return _tracked
+@pytest.fixture(autouse=True)
+def register_functional_executor_id():
+ octobot_flow.environment.register_executor_id("func-test-executor")
+ yield
+
+
@pytest.fixture
def init_action():
# Automation apply_configuration: seed automation state to match expected exchange + portfolio.
diff --git a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py
index e69c73ff36..22b3147366 100644
--- a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py
+++ b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py
@@ -185,6 +185,10 @@ async def test_run_octobot_process_grid_refresh_four_to_six_orders(
initial_spawn_count = popen_calls["count"]
assert initial_spawn_count >= 1
+ # First process_bot_state dump can lag init_state_ok (see shared wait helper).
+ state_path = octobot_process_functional_shared._process_bot_state_path(inner)
+ await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path)
+
# 3) Wait until at least four open ladder orders exist, then assert a 2×2 grid pattern.
orders_deadline = time.monotonic() + octobot_process_functional_shared.GRID_ORDERS_TIMEOUT_SEC
exchange_account_snapshot: typing.Optional[
diff --git a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py
index 1f1a56be7a..87db26013c 100644
--- a/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py
+++ b/packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_start.py
@@ -12,7 +12,6 @@
import mock
import octobot.constants as octobot_app_constants
-import octobot_commons.configuration as configuration_module
import octobot_commons.constants as common_constants
import octobot_commons.dsl_interpreter as dsl_interpreter
import octobot_commons.process_util as process_util
@@ -154,13 +153,9 @@ async def test_run_octobot_process_lifecycle_grid_trading(
assert popen_calls["count"] >= 1
# --- process_bot_state path: must exist before poll (child wrote at least one dump) ---
- state_path = os.path.normpath(
- os.path.join(
- inner["user_root"],
- octobot_app_constants.PROCESS_BOT_STATE_FILE_NAME,
- )
- )
- assert os.path.isfile(state_path)
+ # First process_bot_state dump can lag init_state_ok (see shared wait helper).
+ state_path = octobot_process_functional_shared._process_bot_state_path(inner)
+ await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path)
# 1) Poll AutomationJob + dump() until merge yields ≥4 open orders (EAE from automation snapshot,
# not from parsing full process_bot_state on disk).
@@ -438,15 +433,14 @@ async def test_run_octobot_process_lifecycle_default_config_no_profile_data(
user_root = pathlib.Path(inner["user_root"])
assert inner.get("profile_id") == "non-trading"
- root_cfg = json.loads((user_root / common_constants.CONFIG_FILE).read_text(encoding="utf-8"))
- exchange_cfg = root_cfg[common_constants.CONFIG_EXCHANGES][
- octobot_process_functional_shared.EXCHANGE_BINANCEUS
- ]
exchange_auth_entry = exchange_auth[0]
- stored_api_key = exchange_cfg[common_constants.CONFIG_EXCHANGE_KEY]
- stored_api_secret = exchange_cfg[common_constants.CONFIG_EXCHANGE_SECRET]
- assert configuration_module.decrypt(stored_api_key) == exchange_auth_entry["api_key"]
- assert configuration_module.decrypt(stored_api_secret) == exchange_auth_entry["api_secret"]
+ # Child encrypts config.json after spawn; wait + assert in one helper (see shared module).
+ await octobot_process_functional_shared._assert_encrypted_exchange_credentials_in_user_config(
+ user_root,
+ octobot_process_functional_shared.EXCHANGE_BINANCEUS,
+ exchange_auth_entry["api_key"],
+ exchange_auth_entry["api_secret"],
+ )
profile_json_path = (
user_root
/ common_constants.PROFILES_FOLDER
@@ -455,13 +449,9 @@ async def test_run_octobot_process_lifecycle_default_config_no_profile_data(
)
assert profile_json_path.is_file()
- state_path = os.path.normpath(
- os.path.join(
- inner["user_root"],
- octobot_app_constants.PROCESS_BOT_STATE_FILE_NAME,
- )
- )
- assert os.path.isfile(state_path)
+ # First process_bot_state dump can lag init_state_ok (see shared wait helper).
+ state_path = octobot_process_functional_shared._process_bot_state_path(inner)
+ await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path)
with open(state_path, encoding="utf-8") as process_state_file:
file_metadata_payload = json.load(process_state_file)
process_metadata = process_bot_state_import.Metadata.from_dict(
diff --git a/packages/node/octobot_node/constants.py b/packages/node/octobot_node/constants.py
index f93305b4d8..793cfaa23c 100644
--- a/packages/node/octobot_node/constants.py
+++ b/packages/node/octobot_node/constants.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see .
import os
+import uuid
try:
import octobot.constants as octobot_constants
@@ -67,3 +68,5 @@
)
DEFAULT_PORTFOLIO_VALUATION_UNIT = "USDT"
+
+SCHEDULER_EXECUTOR_ID = str(uuid.uuid4()) # unique for each worker
diff --git a/packages/node/octobot_node/protocol/debug.py b/packages/node/octobot_node/protocol/debug.py
index ca05b7b317..7f8058df9b 100644
--- a/packages/node/octobot_node/protocol/debug.py
+++ b/packages/node/octobot_node/protocol/debug.py
@@ -14,73 +14,22 @@
# You should have received a copy of the GNU General Public License along
# with OctoBot. If not, see .
-import typing
-
-import octobot_commons.constants as commons_constants
-import octobot_flow.enums as flow_enums
import octobot_protocol.models as protocol_models
import octobot_sync.constants as sync_constants
import octobot_node.scheduler.api as scheduler_api
import octobot_node.protocol.accounts as accounts_protocol
import octobot_node.protocol.strategies as strategies_protocol
import octobot_node.protocol.accounts_trading as accounts_trading_protocol
+import octobot_node.protocol.util.privacy_filter as privacy_filter
-_AUTH_DETAIL_FIELD_NAMES = (
- "api_key",
- "api_secret",
- "api_password",
- "access_token",
- "encrypted",
-)
-
-
-def _auth_details_dict_has_credentials(auth_details: dict) -> bool:
- return bool(
- auth_details.get("api_key")
- or auth_details.get("api_secret")
- or auth_details.get("api_password")
- or auth_details.get("access_token")
- or auth_details.get("encrypted")
- )
-
-
-def _redact_auth_details_dict(auth_details: dict) -> dict:
- redacted_auth_details = dict(auth_details)
- for field_name in _AUTH_DETAIL_FIELD_NAMES:
- if redacted_auth_details.get(field_name):
- redacted_auth_details[field_name] = commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
- return redacted_auth_details
-
-
-def _redact_action_for_debug(action: protocol_models.Action) -> protocol_models.Action:
- if action.action_type != flow_enums.ActionType.APPLY_CONFIGURATION.value:
- return action
- configuration = action.configuration
- if configuration is None:
- return action
- exchange_account_details = configuration.get("exchange_account_details")
- if not isinstance(exchange_account_details, dict):
- return action
- auth_details = exchange_account_details.get("auth_details")
- if not isinstance(auth_details, dict) or not _auth_details_dict_has_credentials(auth_details):
- return action
- redacted_configuration = {
- **configuration,
- "exchange_account_details": {
- **exchange_account_details,
- "auth_details": _redact_auth_details_dict(auth_details),
- },
- }
- return action.model_copy(update={"configuration": redacted_configuration})
-
-
-def _redact_actions_for_debug(
- actions: typing.Optional[list[protocol_models.Action]],
-) -> typing.Optional[list[protocol_models.Action]]:
- if actions is None:
- return None
- return [_redact_action_for_debug(action) for action in actions]
+def _redact_user_actions_for_debug(
+ user_actions: list[protocol_models.UserAction],
+) -> list[protocol_models.UserAction]:
+ return [
+ privacy_filter.privatize_user_action(user_action)
+ for user_action in user_actions
+ ]
def _automation_state_for_debug(
@@ -88,8 +37,10 @@ def _automation_state_for_debug(
) -> protocol_models.AutomationState:
return automation.model_copy(
update={
- "actions": _redact_actions_for_debug(automation.actions),
- "priority_actions": _redact_actions_for_debug(automation.priority_actions),
+ "actions": privacy_filter.privatize_dag_actions(automation.actions),
+ "priority_actions": privacy_filter.privatize_dag_actions(
+ automation.priority_actions,
+ ),
}
)
@@ -117,7 +68,9 @@ async def get_debug_state(user_id: str) -> protocol_models.DebugState:
_automation_state_for_debug(automation)
for automation in await scheduler_api.get_automation_states(user_id)
]
- user_actions = await scheduler_api.list_user_actions(user_id, active_only=False)
+ user_actions = _redact_user_actions_for_debug(
+ await scheduler_api.list_user_actions(user_id, active_only=False),
+ )
account_state = accounts_protocol.get_accounts_state(user_id)
strategies_state = strategies_protocol.get_strategies_state(user_id)
bound_account_ids = _account_ids_bound_to_running_automations(automations)
diff --git a/packages/node/octobot_node/protocol/util/__init__.py b/packages/node/octobot_node/protocol/util/__init__.py
new file mode 100644
index 0000000000..0c7e4d5082
--- /dev/null
+++ b/packages/node/octobot_node/protocol/util/__init__.py
@@ -0,0 +1,15 @@
+# Drakkar-Software OctoBot-Node
+# Copyright (c) Drakkar-Software, All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 3.0 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library.
diff --git a/packages/node/octobot_node/protocol/util/privacy_filter.py b/packages/node/octobot_node/protocol/util/privacy_filter.py
new file mode 100644
index 0000000000..2cf70adbc4
--- /dev/null
+++ b/packages/node/octobot_node/protocol/util/privacy_filter.py
@@ -0,0 +1,151 @@
+# This file is part of OctoBot Node (https://github.com/Drakkar-Software/OctoBot-Node)
+# Copyright (c) 2025 Drakkar-Software, All rights reserved.
+#
+# OctoBot Node is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3.0 of the License, or (at
+# your option) any later version.
+#
+# OctoBot is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with OctoBot. If not, see .
+
+import typing
+
+import octobot_commons.constants as commons_constants
+import octobot_flow.enums as flow_enums
+import octobot_protocol.models as protocol_models
+
+
+_AUTH_DETAIL_FIELD_NAMES = (
+ "api_key",
+ "api_secret",
+ "api_password",
+ "access_token",
+ "encrypted",
+)
+
+_ACCOUNT_AUTHENTICATION_SENSITIVE_FIELD_NAMES = (
+ "api_key",
+ "api_secret",
+ "api_passphrase",
+ "public_key",
+ "private_key",
+ "seed_phrase",
+)
+
+
+def _auth_details_dict_has_credentials(auth_details: dict) -> bool:
+ return bool(
+ auth_details.get("api_key")
+ or auth_details.get("api_secret")
+ or auth_details.get("api_password")
+ or auth_details.get("access_token")
+ or auth_details.get("encrypted")
+ )
+
+
+def _redact_auth_details_dict(auth_details: dict) -> dict:
+ redacted_auth_details = dict(auth_details)
+ for field_name in _AUTH_DETAIL_FIELD_NAMES:
+ if redacted_auth_details.get(field_name):
+ redacted_auth_details[field_name] = commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ return redacted_auth_details
+
+
+def _privatize_dag_action(action: protocol_models.Action) -> protocol_models.Action:
+ if action.action_type != flow_enums.ActionType.APPLY_CONFIGURATION.value:
+ return action
+ configuration = action.configuration
+ if configuration is None:
+ return action
+ exchange_account_details = configuration.get("exchange_account_details")
+ if not isinstance(exchange_account_details, dict):
+ return action
+ auth_details = exchange_account_details.get("auth_details")
+ if not isinstance(auth_details, dict) or not _auth_details_dict_has_credentials(auth_details):
+ return action
+ redacted_configuration = {
+ **configuration,
+ "exchange_account_details": {
+ **exchange_account_details,
+ "auth_details": _redact_auth_details_dict(auth_details),
+ },
+ }
+ return action.model_copy(update={"configuration": redacted_configuration})
+
+
+def privatize_dag_actions(
+ actions: typing.Optional[list[protocol_models.Action]],
+) -> typing.Optional[list[protocol_models.Action]]:
+ if actions is None:
+ return None
+ return [_privatize_dag_action(action) for action in actions]
+
+
+def _account_authentication_has_credentials(
+ authentication: protocol_models.AccountAuthentication,
+) -> bool:
+ return any(
+ getattr(authentication, field_name)
+ for field_name in _ACCOUNT_AUTHENTICATION_SENSITIVE_FIELD_NAMES
+ )
+
+
+def _privatize_account_authentication(
+ authentication: protocol_models.AccountAuthentication,
+) -> protocol_models.AccountAuthentication:
+ if not _account_authentication_has_credentials(authentication):
+ return authentication
+ redacted_updates = {
+ field_name: commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ for field_name in _ACCOUNT_AUTHENTICATION_SENSITIVE_FIELD_NAMES
+ if getattr(authentication, field_name)
+ }
+ return authentication.model_copy(update=redacted_updates)
+
+
+def privatize_user_action(
+ user_action: protocol_models.UserAction,
+) -> protocol_models.UserAction:
+ configuration = user_action.configuration
+ if configuration is None or configuration.actual_instance is None:
+ return user_action
+ actual_configuration = configuration.actual_instance
+ if isinstance(actual_configuration, protocol_models.CreateAccountAuthConfiguration):
+ redacted_authentication = _privatize_account_authentication(
+ actual_configuration.configuration,
+ )
+ if redacted_authentication is actual_configuration.configuration:
+ return user_action
+ redacted_configuration = actual_configuration.model_copy(
+ update={"configuration": redacted_authentication},
+ )
+ return user_action.model_copy(
+ update={
+ "configuration": protocol_models.UserActionConfiguration(
+ redacted_configuration,
+ ),
+ },
+ )
+ if isinstance(actual_configuration, protocol_models.EditAccountAuthConfiguration):
+ redacted_authentication = _privatize_account_authentication(
+ actual_configuration.configuration,
+ )
+ if redacted_authentication is actual_configuration.configuration:
+ return user_action
+ redacted_configuration = actual_configuration.model_copy(
+ update={"configuration": redacted_authentication},
+ )
+ return user_action.model_copy(
+ update={
+ "configuration": protocol_models.UserActionConfiguration(
+ redacted_configuration,
+ ),
+ },
+ )
+ return user_action
diff --git a/packages/node/octobot_node/scheduler/octobot_flow_client.py b/packages/node/octobot_node/scheduler/octobot_flow_client.py
index 3cfd579438..bf9209de24 100644
--- a/packages/node/octobot_node/scheduler/octobot_flow_client.py
+++ b/packages/node/octobot_node/scheduler/octobot_flow_client.py
@@ -20,6 +20,7 @@
import octobot_commons.dataclasses
import octobot_commons.logging
+import octobot_node.constants
import octobot_node.errors as errors
import octobot_node.scheduler.workflows_util as workflows_util
@@ -33,6 +34,9 @@
# ensure environment is initialized
octobot_flow.environment.initialize_environment(True)
+ octobot_flow.environment.register_executor_id(
+ octobot_node.constants.SCHEDULER_EXECUTOR_ID
+ )
except ImportError:
pass # OctoBot Flow is not available
diff --git a/packages/node/octobot_node/scheduler/scheduler.py b/packages/node/octobot_node/scheduler/scheduler.py
index e4dcfbc795..e5359c3291 100644
--- a/packages/node/octobot_node/scheduler/scheduler.py
+++ b/packages/node/octobot_node/scheduler/scheduler.py
@@ -48,7 +48,8 @@
_BASE_CONFIG = dbos.DBOSConfig(
name=DEFAULT_NAME,
max_executor_threads=octobot_node.config.settings.SCHEDULER_MAX_EXECUTOR_THREADS,
- application_version=VERSION,
+ application_version=VERSION, # octobot version
+ # executor_id=..., # a constant executor_id is required for DBOS workflow recovery: leave its init to DBOS
)
diff --git a/packages/node/tests/protocol/test_debug.py b/packages/node/tests/protocol/test_debug.py
index 70a7eba4da..6dcb5b98d8 100644
--- a/packages/node/tests/protocol/test_debug.py
+++ b/packages/node/tests/protocol/test_debug.py
@@ -10,7 +10,6 @@
import mock
import pytest
-import octobot_commons.constants as commons_constants
import octobot_protocol.models as protocol_models
import octobot_sync.constants as sync_constants
@@ -92,6 +91,16 @@ async def test_assembles_debug_state_from_dependencies(self):
strategies=sample_strategies,
)
with (
+ mock.patch.object(
+ debug_module.privacy_filter,
+ "privatize_dag_actions",
+ wraps=debug_module.privacy_filter.privatize_dag_actions,
+ ) as privatize_dag_actions_mock,
+ mock.patch.object(
+ debug_module.privacy_filter,
+ "privatize_user_action",
+ wraps=debug_module.privacy_filter.privatize_user_action,
+ ) as privatize_user_action_mock,
mock.patch.object(
debug_module.scheduler_api,
"get_automation_states",
@@ -125,6 +134,10 @@ async def test_assembles_debug_state_from_dependencies(self):
_TEST_WALLET_ADDRESS,
["acc-bound"],
)
+ assert privatize_dag_actions_mock.call_count == 2
+ privatize_dag_actions_mock.assert_any_call(sample_automations[0].actions)
+ privatize_dag_actions_mock.assert_any_call(sample_automations[0].priority_actions)
+ privatize_user_action_mock.assert_called_once_with(sample_user_actions[0])
assert debug_state.version == sync_constants.DEBUG_STATE_VERSION
assert debug_state.debug is not None
assert debug_state.debug.automations == sample_automations
@@ -135,150 +148,7 @@ async def test_assembles_debug_state_from_dependencies(self):
assert debug_state.debug.account_tradings == sample_trading_summaries
@pytest.mark.asyncio
- async def test_redacts_auth_details_in_automations(self):
- automation_with_auth = protocol_models.AutomationState(
- id="auto-auth",
- status=protocol_models.WorkflowStatus.COMPLETED,
- metadata=protocol_models.AutomationMetadata(
- name="auto",
- description="",
- ),
- actions=[
- protocol_models.Action(
- id="action-1",
- action_type="apply_configuration",
- status=protocol_models.WorkflowStatus.COMPLETED,
- configuration={
- "exchange_account_details": {
- "auth_details": {
- "api_key": "secret-key",
- "api_secret": "secret-secret",
- "api_password": "secret-pass",
- },
- },
- },
- ),
- ],
- )
- accounts_state = protocol_models.AccountsState(
- version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION,
- accounts=[],
- exchange_configs=[],
- )
- strategies_state = protocol_models.StrategiesState(
- version=sync_constants.USER_STRATEGIES_STATE_VERSION,
- strategies=[],
- )
- with (
- mock.patch.object(
- debug_module.scheduler_api,
- "get_automation_states",
- mock.AsyncMock(return_value=[automation_with_auth]),
- ),
- mock.patch.object(
- debug_module.scheduler_api,
- "list_user_actions",
- mock.AsyncMock(return_value=[]),
- ),
- mock.patch.object(
- debug_module.accounts_protocol,
- "get_accounts_state",
- return_value=accounts_state,
- ),
- mock.patch.object(
- debug_module.strategies_protocol,
- "get_strategies_state",
- return_value=strategies_state,
- ),
- mock.patch.object(
- debug_module.accounts_trading_protocol,
- "get_account_trading_summaries",
- return_value=[],
- ),
- ):
- debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS)
- assert debug_state.debug is not None
- assert debug_state.debug.automations is not None
- auth_details = debug_state.debug.automations[0].actions[0].configuration[
- "exchange_account_details"
- ]["auth_details"]
- assert auth_details["api_key"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
- assert auth_details["api_secret"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
- assert auth_details["api_password"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
-
- @pytest.mark.asyncio
- async def test_leaves_non_config_actions_unchanged(self):
- dsl_action = protocol_models.Action(
- id="dsl-1",
- action_type="dsl_script",
- status=protocol_models.WorkflowStatus.COMPLETED,
- dsl='run_octobot_process("acc", {}, [{"api_key": "leak"}])',
- )
- automation = protocol_models.AutomationState(
- id="auto-dsl",
- status=protocol_models.WorkflowStatus.COMPLETED,
- metadata=protocol_models.AutomationMetadata(name="auto", description=""),
- actions=[dsl_action],
- )
- accounts_state = protocol_models.AccountsState(
- version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION,
- accounts=[],
- exchange_configs=[],
- )
- strategies_state = protocol_models.StrategiesState(
- version=sync_constants.USER_STRATEGIES_STATE_VERSION,
- strategies=[],
- )
- with (
- mock.patch.object(
- debug_module.scheduler_api,
- "get_automation_states",
- mock.AsyncMock(return_value=[automation]),
- ),
- mock.patch.object(
- debug_module.scheduler_api,
- "list_user_actions",
- mock.AsyncMock(return_value=[]),
- ),
- mock.patch.object(
- debug_module.accounts_protocol,
- "get_accounts_state",
- return_value=accounts_state,
- ),
- mock.patch.object(
- debug_module.strategies_protocol,
- "get_strategies_state",
- return_value=strategies_state,
- ),
- mock.patch.object(
- debug_module.accounts_trading_protocol,
- "get_account_trading_summaries",
- return_value=[],
- ),
- ):
- debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS)
- returned_action = debug_state.debug.automations[0].actions[0]
- assert returned_action is dsl_action
- assert returned_action.dsl == dsl_action.dsl
-
- @pytest.mark.asyncio
- async def test_leaves_apply_configuration_without_auth_unchanged(self):
- config_action = protocol_models.Action(
- id="action-1",
- action_type="apply_configuration",
- status=protocol_models.WorkflowStatus.COMPLETED,
- configuration={
- "exchange_account_details": {
- "auth_details": {},
- },
- },
- )
- automation = protocol_models.AutomationState(
- id="auto-sim",
- status=protocol_models.WorkflowStatus.COMPLETED,
- metadata=protocol_models.AutomationMetadata(name="auto", description=""),
- actions=[config_action],
- )
+ async def test_empty_collections_when_dependencies_return_empty(self):
accounts_state = protocol_models.AccountsState(
version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION,
accounts=[],
@@ -290,48 +160,15 @@ async def test_leaves_apply_configuration_without_auth_unchanged(self):
)
with (
mock.patch.object(
- debug_module.scheduler_api,
- "get_automation_states",
- mock.AsyncMock(return_value=[automation]),
- ),
- mock.patch.object(
- debug_module.scheduler_api,
- "list_user_actions",
- mock.AsyncMock(return_value=[]),
- ),
- mock.patch.object(
- debug_module.accounts_protocol,
- "get_accounts_state",
- return_value=accounts_state,
- ),
+ debug_module.privacy_filter,
+ "privatize_dag_actions",
+ wraps=debug_module.privacy_filter.privatize_dag_actions,
+ ) as privatize_dag_actions_mock,
mock.patch.object(
- debug_module.strategies_protocol,
- "get_strategies_state",
- return_value=strategies_state,
- ),
- mock.patch.object(
- debug_module.accounts_trading_protocol,
- "get_account_trading_summaries",
- return_value=[],
- ),
- ):
- debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS)
- returned_action = debug_state.debug.automations[0].actions[0]
- assert returned_action is config_action
- assert returned_action.configuration == config_action.configuration
-
- @pytest.mark.asyncio
- async def test_empty_collections_when_dependencies_return_empty(self):
- accounts_state = protocol_models.AccountsState(
- version=sync_constants.EXCHANGE_ACCOUNTS_STATE_VERSION,
- accounts=[],
- exchange_configs=[],
- )
- strategies_state = protocol_models.StrategiesState(
- version=sync_constants.USER_STRATEGIES_STATE_VERSION,
- strategies=[],
- )
- with (
+ debug_module.privacy_filter,
+ "privatize_user_action",
+ wraps=debug_module.privacy_filter.privatize_user_action,
+ ) as privatize_user_action_mock,
mock.patch.object(
debug_module.scheduler_api,
"get_automation_states",
@@ -360,6 +197,8 @@ async def test_empty_collections_when_dependencies_return_empty(self):
):
debug_state = await debug_module.get_debug_state(_TEST_WALLET_ADDRESS)
get_account_trading_summaries_mock.assert_called_once_with(_TEST_WALLET_ADDRESS, [])
+ privatize_dag_actions_mock.assert_not_called()
+ privatize_user_action_mock.assert_not_called()
assert debug_state.debug is not None
assert debug_state.debug.automations == []
assert debug_state.debug.user_actions == []
@@ -386,6 +225,16 @@ async def test_ignores_exchange_account_ids_when_automation_not_running(self):
strategies=[],
)
with (
+ mock.patch.object(
+ debug_module.privacy_filter,
+ "privatize_dag_actions",
+ wraps=debug_module.privacy_filter.privatize_dag_actions,
+ ) as privatize_dag_actions_mock,
+ mock.patch.object(
+ debug_module.privacy_filter,
+ "privatize_user_action",
+ wraps=debug_module.privacy_filter.privatize_user_action,
+ ) as privatize_user_action_mock,
mock.patch.object(
debug_module.scheduler_api,
"get_automation_states",
@@ -414,3 +263,7 @@ async def test_ignores_exchange_account_ids_when_automation_not_running(self):
):
await debug_module.get_debug_state(_TEST_WALLET_ADDRESS)
get_account_trading_summaries_mock.assert_called_once_with(_TEST_WALLET_ADDRESS, [])
+ assert privatize_dag_actions_mock.call_count == 2
+ privatize_dag_actions_mock.assert_any_call(completed_automation.actions)
+ privatize_dag_actions_mock.assert_any_call(completed_automation.priority_actions)
+ privatize_user_action_mock.assert_not_called()
diff --git a/packages/node/tests/protocol/util/test_privacy_filter.py b/packages/node/tests/protocol/util/test_privacy_filter.py
new file mode 100644
index 0000000000..b0ca10b134
--- /dev/null
+++ b/packages/node/tests/protocol/util/test_privacy_filter.py
@@ -0,0 +1,136 @@
+# Drakkar-Software OctoBot-Node
+# Copyright (c) 2025 Drakkar-Software, All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 3.0 of the License, or (at your option) any later version.
+
+import octobot_commons.constants as commons_constants
+import octobot_protocol.models as protocol_models
+
+import octobot_node.protocol.util.privacy_filter as privacy_filter
+
+
+class TestPrivatizeDagActions:
+ """Checks :func:`octobot_node.protocol.util.privacy_filter.privatize_dag_actions`."""
+
+ def test_redacts_auth_details_in_apply_configuration_action(self):
+ action = protocol_models.Action(
+ id="action-1",
+ action_type="apply_configuration",
+ status=protocol_models.WorkflowStatus.COMPLETED,
+ configuration={
+ "exchange_account_details": {
+ "auth_details": {
+ "api_key": "secret-key",
+ "api_secret": "secret-secret",
+ "api_password": "secret-pass",
+ },
+ },
+ },
+ )
+ privatized_actions = privacy_filter.privatize_dag_actions([action])
+ auth_details = privatized_actions[0].configuration[
+ "exchange_account_details"
+ ]["auth_details"]
+ assert auth_details["api_key"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert auth_details["api_secret"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert auth_details["api_password"] == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+
+ def test_leaves_non_config_actions_unchanged(self):
+ dsl_action = protocol_models.Action(
+ id="dsl-1",
+ action_type="dsl_script",
+ status=protocol_models.WorkflowStatus.COMPLETED,
+ dsl='run_octobot_process("acc", {}, [{"api_key": "leak"}])',
+ )
+ privatized_actions = privacy_filter.privatize_dag_actions([dsl_action])
+ returned_action = privatized_actions[0]
+ assert returned_action is dsl_action
+ assert returned_action.dsl == dsl_action.dsl
+
+ def test_leaves_apply_configuration_without_auth_unchanged(self):
+ config_action = protocol_models.Action(
+ id="action-1",
+ action_type="apply_configuration",
+ status=protocol_models.WorkflowStatus.COMPLETED,
+ configuration={
+ "exchange_account_details": {
+ "auth_details": {},
+ },
+ },
+ )
+ privatized_actions = privacy_filter.privatize_dag_actions([config_action])
+ returned_action = privatized_actions[0]
+ assert returned_action is config_action
+ assert returned_action.configuration == config_action.configuration
+
+
+class TestPrivatizeUserAction:
+ """Checks :func:`octobot_node.protocol.util.privacy_filter.privatize_user_action`."""
+
+ def test_redacts_create_account_auth_configuration(self):
+ user_action = protocol_models.UserAction(
+ id="ua-create-auth",
+ configuration=protocol_models.UserActionConfiguration(
+ protocol_models.CreateAccountAuthConfiguration(
+ action_type=protocol_models.UserActionType.ACCOUNT_AUTH_CREATE,
+ configuration=protocol_models.AccountAuthentication(
+ id="auth-1",
+ api_key="secret-key",
+ api_secret="secret-secret",
+ api_passphrase="secret-pass",
+ public_key="secret-public",
+ private_key="secret-private",
+ seed_phrase="secret-seed",
+ ),
+ ),
+ ),
+ )
+ privatized_user_action = privacy_filter.privatize_user_action(user_action)
+ authentication = (
+ privatized_user_action.configuration.actual_instance.configuration
+ )
+ assert authentication.api_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.api_secret == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.api_passphrase == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.public_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.private_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.seed_phrase == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.id == "auth-1"
+
+ def test_redacts_edit_account_auth_configuration(self):
+ user_action = protocol_models.UserAction(
+ id="ua-edit-auth",
+ configuration=protocol_models.UserActionConfiguration(
+ protocol_models.EditAccountAuthConfiguration(
+ action_type=protocol_models.UserActionType.ACCOUNT_AUTH_EDIT,
+ id="auth-1",
+ configuration=protocol_models.AccountAuthentication(
+ id="auth-1",
+ api_key="secret-key",
+ api_secret="secret-secret",
+ ),
+ ),
+ ),
+ )
+ privatized_user_action = privacy_filter.privatize_user_action(user_action)
+ authentication = (
+ privatized_user_action.configuration.actual_instance.configuration
+ )
+ assert authentication.api_key == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.api_secret == commons_constants.PRIVATE_MESSAGE_PLACEHOLDER
+ assert authentication.id == "auth-1"
+
+ def test_leaves_non_account_auth_user_actions_unchanged(self):
+ stop_user_action = protocol_models.UserAction(
+ id="ua-stop",
+ configuration=protocol_models.UserActionConfiguration(
+ protocol_models.StopAutomationConfiguration(
+ action_type=protocol_models.UserActionType.AUTOMATION_STOP,
+ id="auto-1",
+ ),
+ ),
+ )
+ privatized_user_action = privacy_filter.privatize_user_action(stop_user_action)
+ assert privatized_user_action is stop_user_action
diff --git a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py
index d24f384fa0..bf39a5dfe2 100644
--- a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py
+++ b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/octobot_process_ops.py
@@ -31,6 +31,7 @@
import octobot_commons.json_util as json_util
import octobot_commons.logging as commons_logging
import octobot_commons.os_util as os_util
+import octobot_commons.process_util as process_util
import octobot_commons.profiles.profile_data as profile_data_module
import octobot_commons.profiles.profile_data_import as profile_data_import
import octobot_commons.profiles.exchange_auth_data as exchange_auth_data_module
@@ -53,6 +54,14 @@
DEFAULT_DSL_PROFILE_ID = "non-trading"
+# run_octobot_process uses two state layers:
+# - Recall state (`EnsureOctobotProcessState` in DSL `last_execution_result`): master-side
+# snapshot (ports, paths, stored pid, init_state_ok, executor_id). Persisted across
+# re-calls until STOP, UPDATE_CONFIG, or respawn.
+# - Child dump (`process_bot_state.json` → `ProcessBotState`): written by the child; used for
+# timestamp-fresh checks and metadata.pid when the stored recall pid is stale.
+# executor_id ties recall to the current DBOS scheduler worker. On mismatch
+# with all child PIDs dead, respawn is forced immediately (grace is bypassed).
class EnsureOctobotProcessState(pydantic.BaseModel):
model_config = pydantic.ConfigDict(validate_assignment=True, extra="ignore")
http_base_url: str
@@ -62,12 +71,15 @@ class EnsureOctobotProcessState(pydantic.BaseModel):
user_folder: str
log_folder: str
profile_id: str | None
+ # Last known child PID on the master; may lag after a child self-restart until adoption.
pid: int
state_file_path: str = ""
- # Omitted in ensure success `self.value` (stop command); 0.0 is unused there.
+ # Wall-clock when the first spawn began; used only while init_state_ok is False (ping_timeout).
started_waiting_at: float = 0.0
- # Set after process_bot_state.json liveness passes; disables the init `ping_timeout` cap (re-calls only use `waiting_time`).
+ # True once the child reached confirmed-alive; switches from init ping_timeout to recall/grace rules.
init_state_ok: bool = False
+ # Required scheduler executor id at emit time; compared on recall to detect worker restart.
+ executor_id: str
# Keys on `last_result` that `create_re_callable_result_dict` takes as top-level args (not state).
@@ -92,7 +104,11 @@ def _resolve_state_file_path(recall_state: EnsureOctobotProcessState) -> str:
)
+# --- Liveness and routing (recall state + child dump) ---
+
+
def _is_process_state_alive(state: process_bot_state_import.ProcessBotState) -> bool:
+ """True when the child dump is timestamp-fresh (~2 × dump interval since metadata.updated_at)."""
interval = octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS
epsilon = max(0.1 * interval, 1e-6)
now = time.time()
@@ -106,6 +122,7 @@ def _is_process_state_alive(state: process_bot_state_import.ProcessBotState) ->
async def _load_process_bot_state(
state_file_path: str,
) -> typing.Optional[process_bot_state_import.ProcessBotState]:
+ """Load child dump from disk; None if missing or unreadable."""
try:
async with aiofiles.open(state_file_path, mode="r", encoding="utf-8") as state_file:
raw = await state_file.read()
@@ -115,7 +132,17 @@ async def _load_process_bot_state(
return None
+def _is_state_timestamp_fresh(
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+) -> bool:
+ """Alias for timestamp-fresh child dump (see _is_process_state_alive)."""
+ if loaded_state is None:
+ return False
+ return _is_process_state_alive(loaded_state)
+
+
def _parse_ensure_recall_state(raw: dict) -> typing.Optional[EnsureOctobotProcessState]:
+ """Parse recall payload; empty or invalid dict → None."""
if not raw:
return None
try:
@@ -124,6 +151,129 @@ def _parse_ensure_recall_state(raw: dict) -> typing.Optional[EnsureOctobotProces
return None
+def _metadata_pid_is_running(
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+) -> bool:
+ """True when child dump metadata.pid is valid and still running in the OS."""
+ if loaded_state is None:
+ return False
+ state_pid = loaded_state.metadata.pid
+ if state_pid <= 0:
+ return False
+ return process_util.pid_is_running(state_pid)
+
+
+def _is_child_confirmed_alive(
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+) -> bool:
+ """Strongest child-alive signal: timestamp-fresh dump and metadata.pid running."""
+ if loaded_state is None:
+ return False
+ if not _is_state_timestamp_fresh(loaded_state):
+ return False
+ return _metadata_pid_is_running(loaded_state)
+
+
+def _any_child_pid_running(
+ recall_state: EnsureOctobotProcessState,
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+) -> bool:
+ """True when either recall pid or child dump metadata.pid is running."""
+ if _stored_pid_is_running(recall_state):
+ return True
+ return _metadata_pid_is_running(loaded_state)
+
+
+def _executor_restarted_requires_respawn(
+ recall_state: EnsureOctobotProcessState,
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+ *,
+ current_executor_id: str,
+) -> bool:
+ """Scheduler worker restarted: recall executor_id differs and no child PID is running."""
+ if _any_child_pid_running(recall_state, loaded_state):
+ return False
+ return recall_state.executor_id != current_executor_id
+
+
+def _stored_pid_is_running(recall_state: EnsureOctobotProcessState) -> bool:
+ """Fast path: recall pid still running."""
+ if recall_state.pid <= 0:
+ return False
+ return process_util.pid_is_running(recall_state.pid)
+
+
+def _in_restart_grace_period(
+ recall_state: EnsureOctobotProcessState,
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+ *,
+ now: float,
+ ping_timeout: float,
+ stored_pid_running: bool,
+) -> bool:
+ """Child self-restart window: init done, all PIDs dead, recent dump within ping_timeout."""
+ if not recall_state.init_state_ok or stored_pid_running:
+ return False
+ if loaded_state is None:
+ return False
+ last_updated_at = loaded_state.metadata.updated_at
+ if last_updated_at <= 0:
+ return False
+ return (now - last_updated_at) < ping_timeout
+
+
+def _should_use_recall_path(
+ recall_state: EnsureOctobotProcessState,
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+ *,
+ stored_pid_running: bool,
+ now: float,
+ ping_timeout: float,
+) -> bool:
+ """Recall vs respawn: stored PID → confirmed alive → init → grace → else first_spawn."""
+ if stored_pid_running:
+ return True
+ if _is_child_confirmed_alive(loaded_state):
+ return True
+ if not recall_state.init_state_ok:
+ return True
+ return _in_restart_grace_period(
+ recall_state,
+ loaded_state,
+ now=now,
+ ping_timeout=ping_timeout,
+ stored_pid_running=stored_pid_running,
+ )
+
+
+def _resolve_bound_pid(
+ recall_state: EnsureOctobotProcessState,
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState],
+) -> typing.Optional[int]:
+ """Bind operating PID from recall or fresh dump; None if metadata.pid dead (no raise)."""
+ if _stored_pid_is_running(recall_state):
+ return recall_state.pid
+ if loaded_state is None or not _is_state_timestamp_fresh(loaded_state):
+ return None
+ state_pid = loaded_state.metadata.pid
+ if state_pid <= 0:
+ raise commons_errors.DSLInterpreterError(
+ "process_bot_state.json is live but metadata.pid is missing or invalid."
+ )
+ if process_util.pid_is_running(state_pid):
+ return state_pid
+ return None
+
+
+def _apply_resolved_pid_to_state(
+ recall_state: EnsureOctobotProcessState,
+ resolved_pid: typing.Optional[int],
+) -> EnsureOctobotProcessState:
+ if resolved_pid is None or resolved_pid == recall_state.pid:
+ return recall_state
+ return recall_state.model_copy(update={"pid": resolved_pid})
+
+
def _remove_path_for_fresh_start(path: str, *, logger: typing.Any) -> None:
if not path or not str(path).strip():
logger.info("configuration update: skip remove (empty path)")
@@ -484,7 +634,8 @@ def _listen_port_pair_with_shared_scan_offset(
def create_octobot_process_operators(
- signals: typing.Optional[dsl_interpreter.OperatorSignals] = None
+ signals: typing.Optional[dsl_interpreter.OperatorSignals] = None,
+ executor_id: str = "",
) -> list[type[dsl_interpreter.Operator]]:
# Child process: user layout, ports, process_bot_state.json liveness (re-callable).
class EnsureOctobotProcessOperator(
@@ -510,6 +661,13 @@ def __init__(self, *args, **kwargs):
dsl_interpreter.ProcessBoundOperatorMixin.__init__(self)
dsl_interpreter.SignalableOperatorMixin.__init__(self, signals)
+ def _read_executor_id(self) -> str:
+ if not executor_id:
+ raise commons_errors.DSLInterpreterError(
+ "executor_id is required for run_octobot_process"
+ )
+ return executor_id
+
@staticmethod
def get_library() -> str:
return commons_constants.CONTEXTUAL_OPERATORS_LIBRARY
@@ -629,7 +787,10 @@ def _re_calling_result_dispatches_this_ensure(
inner = rec.get("last_execution_result")
if not isinstance(inner, dict):
return False
- return _parse_ensure_recall_state(inner) is not None
+ try:
+ return _parse_ensure_recall_state(inner) is not None
+ except commons_errors.DSLInterpreterError:
+ return False
@classmethod
def should_dispatch_operator_signal_for_result(
@@ -683,26 +844,58 @@ async def _pre_compute_recall_path(
start_time: float,
recall_interval: float,
ping_timeout: float,
+ loaded_state: typing.Optional[process_bot_state_import.ProcessBotState] = None,
) -> None:
state_path = _resolve_state_file_path(recall_state)
+ now = time.time()
+ if loaded_state is None:
+ loaded_state = await _load_process_bot_state(state_path)
+ child_confirmed_alive = _is_child_confirmed_alive(loaded_state)
+ stored_pid_running = _stored_pid_is_running(recall_state)
# Init window: fail and kill the child if the state file never became live in time.
if (
not recall_state.init_state_ok
- and time.time() - recall_state.started_waiting_at > ping_timeout
+ and now - recall_state.started_waiting_at > ping_timeout
):
+ resolved_pid = _resolve_bound_pid(recall_state, loaded_state)
+ if resolved_pid is not None:
+ self.pid = resolved_pid
self.value = self.request_graceful_stop(logger=_get_logger())
raise commons_errors.DSLInterpreterError(
"Timed out waiting for OctoBot process_bot_state.json during init (see ping_timeout).",
)
+ if _in_restart_grace_period(
+ recall_state,
+ loaded_state,
+ now=now,
+ ping_timeout=ping_timeout,
+ stored_pid_running=stored_pid_running,
+ ):
+ _get_logger().info(
+ "restart grace: waiting for child state dump (last_updated_at=%s, ping_timeout=%s)",
+ loaded_state.metadata.updated_at if loaded_state is not None else None,
+ ping_timeout,
+ )
+ resolved_pid = _resolve_bound_pid(recall_state, loaded_state)
+ if resolved_pid is not None:
+ if resolved_pid != recall_state.pid:
+ _get_logger().info(
+ "adopted pid=%s (was %s) from process_bot_state",
+ resolved_pid,
+ recall_state.pid,
+ )
+ self.pid = resolved_pid
+ recall_state = _apply_resolved_pid_to_state(recall_state, resolved_pid)
_get_logger().info("process state path (re-call path): %s", state_path)
- loaded = await _load_process_bot_state(state_path)
- is_live = loaded is not None and _is_process_state_alive(loaded)
- if is_live:
+ # Running: stored recall pid or child-confirmed-alive → init_state_ok, optional EAE.
+ is_running = stored_pid_running or child_confirmed_alive
+ if is_running:
+ logged_pid = resolved_pid if resolved_pid is not None else recall_state.pid
_get_logger().info(
"OctoBot is running (re-call path): user_folder=%r base_url=%r pid=%s",
recall_state.user_folder,
recall_state.http_base_url,
- recall_state.pid,
+ logged_pid,
)
updated = recall_state.model_copy(
update={"init_state_ok": True, "state_file_path": state_path}
@@ -712,15 +905,17 @@ async def _pre_compute_recall_path(
last_result=last_result,
start_time=start_time,
recall_interval=recall_interval,
- parsed_process_bot_state=loaded,
+ parsed_process_bot_state=loaded_state,
)
return
+ # Still starting: not confirmed alive (grace or waiting for first dump); re-call only.
+ logged_pid = resolved_pid if resolved_pid is not None else recall_state.pid
_get_logger().info(
- "OctoBot is still starting (re-call path, process state not live): user_folder=%r "
+ "OctoBot is still starting (re-call path, child not confirmed alive): user_folder=%r "
"base_url=%r pid=%s state_path=%s",
recall_state.user_folder,
recall_state.http_base_url,
- recall_state.pid,
+ logged_pid,
state_path,
)
self._emit_ensure_recall(
@@ -728,7 +923,7 @@ async def _pre_compute_recall_path(
last_result=last_result,
start_time=start_time,
recall_interval=recall_interval,
- parsed_process_bot_state=loaded,
+ parsed_process_bot_state=loaded_state,
)
async def _pre_compute_first_spawn(
@@ -809,28 +1004,39 @@ async def _pre_compute_first_spawn(
pid=self.pid or 0,
state_file_path=state_file_path,
started_waiting_at=start_time,
+ executor_id=self._read_executor_id(),
)
# First process state check after spawn (init cap still uses `state.started_waiting_at`).
loaded = await _load_process_bot_state(state_file_path)
is_live = loaded is not None and _is_process_state_alive(loaded)
if is_live:
- _get_logger().info(
- "OctoBot is running (first-spawn path): user_folder=%r base_url=%r pid=%s",
- user_folder,
- http_base_url,
- self.pid,
- )
- ready = state.model_copy(update={"init_state_ok": True})
- self._emit_ensure_recall(
- state=ready,
- last_result=last_result,
- start_time=start_time,
- recall_interval=recall_interval,
- parsed_process_bot_state=loaded,
- )
- return
+ state_pid = loaded.metadata.pid
+ if state_pid <= 0:
+ raise commons_errors.DSLInterpreterError(
+ "process_bot_state.json is live but metadata.pid is missing or invalid."
+ )
+ if process_util.pid_is_running(state_pid):
+ self.pid = state_pid
+ state = state.model_copy(update={"pid": state_pid})
+ _get_logger().info(
+ "OctoBot is running (first-spawn path): user_folder=%r base_url=%r pid=%s",
+ user_folder,
+ http_base_url,
+ state_pid,
+ )
+ ready = state.model_copy(update={"init_state_ok": True})
+ self._emit_ensure_recall(
+ state=ready,
+ last_result=last_result,
+ start_time=start_time,
+ recall_interval=recall_interval,
+ parsed_process_bot_state=loaded,
+ )
+ return
+ # Orphaned timestamp-fresh dump with dead metadata.pid (e.g. after master restart):
+ # treat as still starting, not an error.
_get_logger().info(
- "OctoBot is still starting (first-spawn path, process state not live): user_folder=%r base_url=%r "
+ "OctoBot is still starting (first-spawn path, child not confirmed alive): user_folder=%r base_url=%r "
"pid=%s state_path=%s",
user_folder,
http_base_url,
@@ -864,17 +1070,25 @@ async def _pre_compute_update_config_refresh(
"run_octobot_process call.",
)
process_logger = _get_logger()
+ state_path = _resolve_state_file_path(recall_state)
+ loaded_state = await _load_process_bot_state(state_path)
+ resolved_pid = _resolve_bound_pid(recall_state, loaded_state)
+ if resolved_pid is None:
+ raise commons_errors.DSLInterpreterError(
+ "run_octobot_process(UPDATE_CONFIG) cannot resolve a running child pid to stop."
+ )
+ self.pid = resolved_pid
process_logger.info(
"configuration update: begin refresh user_folder=%r user_root=%r log_folder=%r pid=%s",
user_folder,
recall_state.user_root,
recall_state.log_folder,
- recall_state.pid,
+ resolved_pid,
)
stop_outcome = self.request_graceful_stop(logger=process_logger)
process_logger.info("configuration update: graceful stop outcome: %s", stop_outcome)
await self.wait_until_pid_stopped(
- recall_state.pid,
+ resolved_pid,
logger=process_logger,
timeout_seconds=ping_timeout,
)
@@ -902,10 +1116,26 @@ async def pre_compute(self) -> None:
raise commons_errors.DSLInterpreterError(
"run_octobot_process(execution_stop) requires last_execution_result from a prior run_octobot_process call.",
)
- if not self.is_process_running():
- self.value = {"status": "already_stopped", "reason": "not_running"}
+ state_path = _resolve_state_file_path(recall_state)
+ loaded_state = await _load_process_bot_state(state_path)
+ stored_pid_running = _stored_pid_is_running(recall_state)
+ resolved_pid = _resolve_bound_pid(recall_state, loaded_state)
+ if resolved_pid is not None:
+ self.pid = resolved_pid
+ self.value = self.request_graceful_stop(logger=_get_logger())
return
- self.value = self.request_graceful_stop(logger=_get_logger())
+ # Grace with dead metadata pid: child restarting; no SIGTERM, report already_stopped.
+ if _in_restart_grace_period(
+ recall_state,
+ loaded_state,
+ now=time.time(),
+ ping_timeout=float(params.get("ping_timeout") or DEFAULT_ENSURE_TIMEOUT),
+ stored_pid_running=stored_pid_running,
+ ):
+ _get_logger().info(
+ "run_octobot_process(STOP): child in restart grace; treating as already_stopped"
+ )
+ self.value = {"status": "already_stopped", "reason": "not_running"}
return
working_directory = os.path.normpath(os.getcwd())
user_folder = params["user_folder"]
@@ -928,15 +1158,67 @@ async def pre_compute(self) -> None:
)
return
recall_state = self._try_parse_ensure_recall_state(last_result)
- if recall_state is not None and self.is_process_running():
- await self._pre_compute_recall_path(
+ if recall_state is not None:
+ # 1. Load child dump alongside strict recall state.
+ state_path = _resolve_state_file_path(recall_state)
+ loaded_state = await _load_process_bot_state(state_path)
+ stored_pid_running = _stored_pid_is_running(recall_state)
+ state_timestamp_fresh = _is_state_timestamp_fresh(loaded_state)
+ # 2. Master restart → first_spawn immediately (grace bypassed).
+ if _executor_restarted_requires_respawn(
recall_state,
- last_result,
- start_time=start_time,
- recall_interval=recall_interval,
+ loaded_state,
+ current_executor_id=self._read_executor_id(),
+ ):
+ _get_logger().info(
+ "scheduler worker restarted; forcing child respawn for user_folder=%r "
+ "(recall executor_id=%r, current=%r)",
+ recall_state.user_folder,
+ recall_state.executor_id,
+ self._read_executor_id(),
+ )
+ await self._pre_compute_first_spawn(
+ user_folder,
+ working_directory,
+ params,
+ last_result,
+ start_time=start_time,
+ recall_interval=recall_interval,
+ )
+ return
+ # 3. Recall vs respawn from liveness/grace rules.
+ if _should_use_recall_path(
+ recall_state,
+ loaded_state,
+ stored_pid_running=stored_pid_running,
+ now=start_time,
ping_timeout=ping_timeout,
+ ):
+ await self._pre_compute_recall_path(
+ recall_state,
+ last_result,
+ start_time=start_time,
+ recall_interval=recall_interval,
+ ping_timeout=ping_timeout,
+ loaded_state=loaded_state,
+ )
+ return
+ # 4. Recall declined (e.g. grace expired) → log and first_spawn below.
+ last_state_updated_at = (
+ loaded_state.metadata.updated_at if loaded_state is not None else None
+ )
+ _get_logger().info(
+ "run_octobot_process: respawning child (recall path declined): user_folder=%r "
+ "stored_pid=%s stored_pid_running=%s state_timestamp_fresh=%s init_state_ok=%s "
+ "last_state_updated_at=%s ping_timeout=%s",
+ recall_state.user_folder,
+ recall_state.pid,
+ stored_pid_running,
+ state_timestamp_fresh,
+ recall_state.init_state_ok,
+ last_state_updated_at,
+ ping_timeout,
)
- return
await self._pre_compute_first_spawn(
user_folder,
working_directory,
diff --git a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py
index 88c9aaa693..b07143d73d 100644
--- a/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py
+++ b/packages/tentacles/Meta/DSL_operators/octobot_process_operators/tests/test_octobot_process_ops.py
@@ -46,7 +46,10 @@
import tentacles.Trading.Mode.simple_market_making_trading_mode.simple_market_making_trading as simple_market_making_trading
# Nested class from factory (not exposed on ``octobot_process_ops``).
-EnsureOctobotProcessOperator = octobot_process_ops.create_octobot_process_operators(None)[0]
+TEST_EXECUTOR_ID = "test-executor"
+EnsureOctobotProcessOperator = octobot_process_ops.create_octobot_process_operators(
+ None, TEST_EXECUTOR_ID
+)[0]
_TESTS_RUN_OCTOBOT_PROCESS_WAITING_TIME_SEC = 2
pytestmark = pytest.mark.asyncio
@@ -56,16 +59,66 @@ async def _async_return_none_mock(*_unused):
return None
-async def _async_live_process_bot_state_mock(*_unused):
+async def _async_live_process_bot_state_mock(*_unused, metadata_pid=20002):
now = octobot_process_ops.time.time()
interval = float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS)
return process_bot_state_import.ProcessBotState(
metadata=process_bot_state_import.Metadata(
updated_at=now - 0.1,
next_updated_at=now + interval,
+ pid=metadata_pid,
),
exchange_account_elements=octobot_flow_entities.ExchangeAccountElements(),
)
+
+
+def _stale_process_bot_state_for_grace(*, age_seconds: float, metadata_pid: int = 10002):
+ now = octobot_process_ops.time.time()
+ interval = float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS)
+ return process_bot_state_import.ProcessBotState(
+ metadata=process_bot_state_import.Metadata(
+ updated_at=now - age_seconds,
+ next_updated_at=now - age_seconds + interval,
+ pid=metadata_pid,
+ ),
+ exchange_account_elements=octobot_flow_entities.ExchangeAccountElements(),
+ )
+
+
+async def _async_live_process_bot_state_with_pid_10001(*_unused):
+ return await _async_live_process_bot_state_mock(metadata_pid=10001)
+
+
+def _healthy_recall_inner(
+ *,
+ pid: int = 10002,
+ init_state_ok: bool = True,
+ user_root: str | None = None,
+ tmp_path=None,
+ executor_id: str = TEST_EXECUTOR_ID,
+) -> dict:
+ if user_root is None and tmp_path is not None:
+ user_root = str(
+ tmp_path / commons_constants.USER_FOLDER / commons_constants.AUTOMATIONS_FOLDER / "ub"
+ )
+ user_root = user_root or "/x/ub"
+ state_fn = os.path.join(user_root, octobot_constants.PROCESS_BOT_STATE_FILE_NAME)
+ return {
+ "waiting_time": octobot_process_ops.DEFAULT_PING_WAITING_TIME,
+ "last_execution_time": 0.0,
+ "http_base_url": "http://127.0.0.1:20050",
+ "web_port": 20050,
+ "node_port": 30050,
+ "user_root": user_root,
+ "user_folder": "ub",
+ "log_folder": "/x/logs/ub",
+ "profile_id": "p",
+ "pid": pid,
+ "state_file_path": state_fn,
+ "started_waiting_at": 0.0,
+ "init_state_ok": init_state_ok,
+ "executor_id": executor_id,
+ }
def _stop_test_ensure_state_dict(http_base_url: str) -> dict:
return octobot_process_ops.EnsureOctobotProcessState(
http_base_url=http_base_url,
@@ -79,6 +132,7 @@ def _stop_test_ensure_state_dict(http_base_url: str) -> dict:
state_file_path=os.path.normpath(
os.path.join("/x", octobot_constants.PROCESS_BOT_STATE_FILE_NAME)
),
+ executor_id=TEST_EXECUTOR_ID,
).model_dump()
@@ -980,9 +1034,13 @@ async def test_returns_recallable_with_init_state_ok_after_first_spawn(self, tmp
process_util,
"spawn_managed_subprocess",
) as spawn_mock, mock.patch.object(
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 10001,
+ ), mock.patch.object(
octobot_process_ops,
"_load_process_bot_state",
- new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock),
+ new=mock.AsyncMock(side_effect=_async_live_process_bot_state_with_pid_10001),
):
spawn_mock.return_value.pid = 10001
await op.pre_compute()
@@ -1055,9 +1113,9 @@ async def test_returns_recallable_with_init_state_ok_on_recall_path(self, tmp_pa
"getcwd",
return_value=str(tmp_path),
), mock.patch.object(
- dsl_interpreter.ProcessBoundOperatorMixin,
- "is_process_running",
- return_value=True,
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 10002,
), mock.patch.object(
octobot_process_ops,
"_load_process_bot_state",
@@ -1098,6 +1156,7 @@ async def test_init_timeout_kills_and_raises_dsl_error(self, tmp_path):
"state_file_path": state_fn,
"started_waiting_at": 0.0,
"init_state_ok": False,
+ "executor_id": TEST_EXECUTOR_ID,
}
op = EnsureOctobotProcessOperator(
user_folder="ub",
@@ -1122,6 +1181,7 @@ async def test_init_timeout_kills_and_raises_dsl_error(self, tmp_path):
), mock.patch.object(
octobot_process_ops,
"_load_process_bot_state",
+ new=mock.AsyncMock(return_value=None),
) as load_mock, mock.patch.object(
octobot_process_ops,
"_listen_port_pair_with_shared_scan_offset",
@@ -1129,7 +1189,7 @@ async def test_init_timeout_kills_and_raises_dsl_error(self, tmp_path):
with pytest.raises(commons_errors.DSLInterpreterError, match="Timed out waiting"):
await op.pre_compute()
stop_mock.assert_called_once_with(logger=mock.ANY)
- load_mock.assert_not_called()
+ load_mock.assert_called()
ffp.assert_not_called()
@@ -1154,6 +1214,7 @@ async def test_does_not_apply_init_timeout_after_init_state_ok(self, tmp_path):
"state_file_path": state_fn2,
"started_waiting_at": 0.0,
"init_state_ok": True,
+ "executor_id": TEST_EXECUTOR_ID,
}
op = EnsureOctobotProcessOperator(
user_folder="ub",
@@ -1167,9 +1228,9 @@ async def test_does_not_apply_init_timeout_after_init_state_ok(self, tmp_path):
"getcwd",
return_value=str(tmp_path),
), mock.patch.object(
- dsl_interpreter.ProcessBoundOperatorMixin,
- "is_process_running",
- return_value=True,
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id in (88002, 20002),
), mock.patch.object(octobot_process_ops, "time", st_time), mock.patch.object(
octobot_process_ops,
"_load_process_bot_state",
@@ -1528,7 +1589,8 @@ async def _run_default_config_lifecycle(
)
operator_signals_holder = dsl_interpreter.OperatorSignals()
stop_operator_cls = octobot_process_ops.create_octobot_process_operators(
- operator_signals_holder
+ operator_signals_holder,
+ TEST_EXECUTOR_ID,
)[0]
operator_signals_holder.sync({
stop_operator_cls.get_name(): dsl_interpreter.OperatorSignal.STOP.value,
@@ -1595,7 +1657,8 @@ async def test_execution_stop_dead_child_is_already_stopped(self):
inner = _stop_test_ensure_state_dict("http://127.0.0.1:7")
operator_signals_holder = dsl_interpreter.OperatorSignals()
operator_under_test = octobot_process_ops.create_octobot_process_operators(
- operator_signals_holder
+ operator_signals_holder,
+ TEST_EXECUTOR_ID,
)[0]
operator_signals_holder.sync({
operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value,
@@ -1605,23 +1668,22 @@ async def test_execution_stop_dead_child_is_already_stopped(self):
profile_data=_MINIMAL_PROFILE_DATA,
last_execution_result=_re_calling_ensure_value(inner),
)
- with (
- mock.patch.object(
- dsl_interpreter.ProcessBoundOperatorMixin,
- "is_process_running",
- return_value=False,
- ),
+ with mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
):
await op.pre_compute()
assert isinstance(op.value, dict)
assert op.value["status"] == "already_stopped"
async def test_execution_stop_short_circuits_without_sigterm_when_not_running(self):
- """STOP branch returns already_stopped before ``request_graceful_stop`` when ``is_process_running`` is false."""
+ """STOP branch returns already_stopped before ``request_graceful_stop`` when stored pid is not running."""
inner = _stop_test_ensure_state_dict("http://127.0.0.1:7")
operator_signals_holder = dsl_interpreter.OperatorSignals()
operator_under_test = octobot_process_ops.create_octobot_process_operators(
- operator_signals_holder
+ operator_signals_holder,
+ TEST_EXECUTOR_ID,
)[0]
operator_signals_holder.sync({
operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value,
@@ -1634,8 +1696,8 @@ async def test_execution_stop_short_circuits_without_sigterm_when_not_running(se
graceful_stop_mock = mock.Mock()
with (
mock.patch.object(
- dsl_interpreter.ProcessBoundOperatorMixin,
- "is_process_running",
+ process_util,
+ "pid_is_running",
return_value=False,
),
mock.patch.object(
@@ -1652,7 +1714,8 @@ async def test_execution_stop_os_kill_failure_raises(self):
inner = _stop_test_ensure_state_dict("http://127.0.0.1:7")
operator_signals_holder = dsl_interpreter.OperatorSignals()
operator_under_test = octobot_process_ops.create_octobot_process_operators(
- operator_signals_holder
+ operator_signals_holder,
+ TEST_EXECUTOR_ID,
)[0]
operator_signals_holder.sync({
operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value,
@@ -1744,10 +1807,12 @@ async def test_update_config_triggers_respawn_and_recallable_result(self, tmp_pa
octobot_constants.PROCESS_BOT_STATE_FILE_NAME,
),
init_state_ok=True,
+ executor_id=TEST_EXECUTOR_ID,
).model_dump()
operator_signals_holder = dsl_interpreter.OperatorSignals()
operator_under_test = octobot_process_ops.create_octobot_process_operators(
- operator_signals_holder
+ operator_signals_holder,
+ TEST_EXECUTOR_ID,
)[0]
operator_signals_holder.sync({
operator_under_test.get_name(): dsl_interpreter.OperatorSignal.UPDATE_CONFIG.value,
@@ -1770,6 +1835,11 @@ async def test_update_config_triggers_respawn_and_recallable_result(self, tmp_pa
"request_graceful_stop",
return_value={"status": "stopped", "signal": "sigterm"},
) as stop_mock,
+ mock.patch.object(
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 4242,
+ ),
mock.patch.object(
octobot_process_ops,
"_load_process_bot_state",
@@ -1785,10 +1855,563 @@ async def test_update_config_triggers_respawn_and_recallable_result(self, tmp_pa
stop_mock.assert_called_once()
wait_mock.assert_awaited_once()
spawn_mock.assert_called_once()
- assert not (user_automation / "stale_marker.txt").exists()
- assert isinstance(op.value, dict)
assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
finally:
shutil.rmtree(tmp_path / commons_constants.USER_FOLDER, ignore_errors=True)
if (tmp_path / "logs").exists():
shutil.rmtree(tmp_path / "logs", ignore_errors=True)
+
+
+class TestMetadataPidRoundTrip:
+ def test_metadata_pid_to_dict_from_dict(self):
+ metadata = process_bot_state_import.Metadata(
+ updated_at=1.0,
+ next_updated_at=2.0,
+ pid=424242,
+ )
+ restored = process_bot_state_import.Metadata.from_dict(
+ metadata.to_dict(include_default_values=False)
+ )
+ assert restored.pid == 424242
+
+
+class TestShouldUseRecallPathWhenStoredPidDeadButStateLive:
+ async def test_adopts_pid_from_live_state_without_spawn(self, tmp_path):
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 20002,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ le = op.value[dsl_interpreter.ReCallingOperatorResult.__name__]["last_execution_result"]
+ assert le.get("init_state_ok") is True
+ assert le.get("pid") == 20002
+
+
+class TestShouldUseRecallPathDuringInitWhenStoredPidDead:
+ async def test_recall_without_spawn_during_init(self, tmp_path):
+ inner = _healthy_recall_inner(pid=10002, init_state_ok=False, tmp_path=tmp_path)
+ inner["started_waiting_at"] = octobot_process_ops.time.time()
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_return_none_mock),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
+
+
+class TestRecallPathWhenStateFileMissingButPidRunning:
+ async def test_recall_when_pid_running_without_state_file(self, tmp_path):
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 10002,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_return_none_mock),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
+
+
+class TestRestartGracePeriodAvoidsRespawnWhileStateStale:
+ async def test_recall_during_restart_grace(self, tmp_path):
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ stale_state = _stale_process_bot_state_for_grace(
+ age_seconds=float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) * 2.5,
+ metadata_pid=10002,
+ )
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ ping_timeout=120.0,
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(return_value=stale_state),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
+
+
+class TestFirstSpawnAfterRestartGraceExpires:
+ async def test_respawns_when_grace_expired(self, tmp_path):
+ (tmp_path / "start.py").write_text("#", encoding="utf-8")
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ stale_state = _stale_process_bot_state_for_grace(age_seconds=200.0, metadata_pid=10002)
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ ping_timeout=120.0,
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ octobot_process_ops,
+ "ensure_user_profile_and_layout",
+ new=mock.AsyncMock(
+ return_value={
+ "user_root": inner["user_root"],
+ "profile_id": "x",
+ "already_prepared": True,
+ }
+ ),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_listen_port_pair_with_shared_scan_offset",
+ return_value=(20050, 30050),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(return_value=stale_state),
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock:
+ spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30003)
+ await op.pre_compute()
+ spawn_mock.assert_called_once()
+
+
+class TestFirstSpawnWhenStateFileMissingAndPidDeadAfterInit:
+ async def test_respawns_when_no_state_file_and_pid_dead(self, tmp_path):
+ (tmp_path / "start.py").write_text("#", encoding="utf-8")
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ octobot_process_ops,
+ "ensure_user_profile_and_layout",
+ new=mock.AsyncMock(
+ return_value={
+ "user_root": inner["user_root"],
+ "profile_id": "x",
+ "already_prepared": True,
+ }
+ ),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_listen_port_pair_with_shared_scan_offset",
+ return_value=(20050, 30050),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_return_none_mock),
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock:
+ spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30004)
+ await op.pre_compute()
+ spawn_mock.assert_called_once()
+
+
+class TestStopAdoptsPidFromProcessBotState:
+ async def test_stop_signals_adopted_pid(self):
+ inner = _healthy_recall_inner(pid=10002)
+ operator_signals_holder = dsl_interpreter.OperatorSignals()
+ operator_under_test = octobot_process_ops.create_octobot_process_operators(
+ operator_signals_holder,
+ TEST_EXECUTOR_ID,
+ )[0]
+ operator_signals_holder.sync({
+ operator_under_test.get_name(): dsl_interpreter.OperatorSignal.STOP.value,
+ })
+ op = operator_under_test(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ graceful_stop_mock = mock.Mock(return_value={"status": "stopped", "signal": "sigterm"})
+ with (
+ mock.patch.object(
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 20002,
+ ),
+ mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock),
+ ),
+ mock.patch.object(
+ operator_under_test,
+ "request_graceful_stop",
+ new=graceful_stop_mock,
+ ),
+ ):
+ await op.pre_compute()
+ graceful_stop_mock.assert_called_once()
+ assert op.pid == 20002
+ assert op.value == {"status": "stopped", "signal": "sigterm"}
+
+
+class TestExecutorRestartedRequiresRespawn:
+ async def test_marker_mismatch_forces_first_spawn(self, tmp_path):
+ (tmp_path / "start.py").write_text("#", encoding="utf-8")
+ inner = _healthy_recall_inner(
+ pid=10002,
+ tmp_path=tmp_path,
+ executor_id="old-executor-id",
+ )
+ live_state = await _async_live_process_bot_state_mock(metadata_pid=20002)
+ op = octobot_process_ops.create_octobot_process_operators(
+ None, TEST_EXECUTOR_ID
+ )[0](
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ octobot_process_ops,
+ "ensure_user_profile_and_layout",
+ new=mock.AsyncMock(
+ return_value={
+ "user_root": inner["user_root"],
+ "profile_id": "x",
+ "already_prepared": True,
+ }
+ ),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_listen_port_pair_with_shared_scan_offset",
+ return_value=(20050, 30050),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(return_value=live_state),
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock:
+ spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30005)
+ await op.pre_compute()
+ spawn_mock.assert_called_once()
+
+
+class TestExecutorRestartSkippedWhenChildPidRunning:
+ async def test_marker_mismatch_but_metadata_pid_running_recalls(self, tmp_path):
+ inner = _healthy_recall_inner(
+ pid=10002,
+ tmp_path=tmp_path,
+ executor_id="old-executor-id",
+ )
+ op = octobot_process_ops.create_octobot_process_operators(
+ None, TEST_EXECUTOR_ID
+ )[0](
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ side_effect=lambda process_id: process_id == 20002,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_live_process_bot_state_mock),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
+
+
+class TestGraceWhenTimestampFreshButMetadataPidDead:
+ async def test_recall_during_grace_without_spawn(self, tmp_path):
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ now = octobot_process_ops.time.time()
+ interval = float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS)
+ fresh_dead_pid_state = process_bot_state_import.ProcessBotState(
+ metadata=process_bot_state_import.Metadata(
+ updated_at=now - 0.1,
+ next_updated_at=now + interval,
+ pid=20002,
+ ),
+ exchange_account_elements=octobot_flow_entities.ExchangeAccountElements(),
+ )
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ ping_timeout=120.0,
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(return_value=fresh_dead_pid_state),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
+
+
+class TestExecutorRestartDoesNotBypassGraceWhenMarkerMatches:
+ async def test_recall_during_grace_when_marker_matches(self, tmp_path):
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ stale_state = _stale_process_bot_state_for_grace(
+ age_seconds=float(octobot_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS) * 2.5,
+ metadata_pid=10002,
+ )
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ ping_timeout=120.0,
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock, mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(return_value=stale_state),
+ ):
+ await op.pre_compute()
+ spawn_mock.assert_not_called()
+ assert dsl_interpreter.ReCallingOperatorResult.__name__ in op.value
+
+
+class TestRespawnsWhenGraceExpiredAndPidDead:
+ async def test_first_spawn_when_grace_expired(self, tmp_path):
+ (tmp_path / "start.py").write_text("#", encoding="utf-8")
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ stale_state = _stale_process_bot_state_for_grace(age_seconds=200.0, metadata_pid=10002)
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ ping_timeout=120.0,
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ process_util,
+ "pid_is_running",
+ return_value=False,
+ ), mock.patch.object(
+ octobot_process_ops,
+ "ensure_user_profile_and_layout",
+ new=mock.AsyncMock(
+ return_value={
+ "user_root": inner["user_root"],
+ "profile_id": "x",
+ "already_prepared": True,
+ }
+ ),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_listen_port_pair_with_shared_scan_offset",
+ return_value=(20050, 30050),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(return_value=stale_state),
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock:
+ spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30006)
+ await op.pre_compute()
+ spawn_mock.assert_called_once()
+
+
+class TestEnsureOctobotProcessStateEmitsExecutorId:
+ async def test_first_spawn_emits_executor_id(self, tmp_path, monkeypatch):
+ monkeypatch.chdir(tmp_path)
+ (tmp_path / "start.py").write_text("#", encoding="utf-8")
+ op = EnsureOctobotProcessOperator(
+ user_folder="emit_master_bot",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ )
+ with mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_return_none_mock),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "ensure_user_profile_and_layout",
+ new=mock.AsyncMock(
+ return_value={
+ "user_root": str(
+ tmp_path
+ / commons_constants.USER_FOLDER
+ / commons_constants.AUTOMATIONS_FOLDER
+ / "emit_master_bot"
+ ),
+ "profile_id": "x",
+ "already_prepared": False,
+ }
+ ),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_listen_port_pair_with_shared_scan_offset",
+ return_value=(20050, 30050),
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock:
+ spawn_mock.return_value = mock.Mock(spec=["pid"], pid=40001)
+ await op.pre_compute()
+ le = op.value[dsl_interpreter.ReCallingOperatorResult.__name__]["last_execution_result"]
+ assert le["executor_id"] == TEST_EXECUTOR_ID
+
+
+class TestRecallStateRequiresExecutorId:
+ async def test_missing_executor_id_falls_through_to_first_spawn(self, tmp_path):
+ (tmp_path / "start.py").write_text("#", encoding="utf-8")
+ inner = _healthy_recall_inner(pid=10002, tmp_path=tmp_path)
+ del inner["executor_id"]
+ op = EnsureOctobotProcessOperator(
+ user_folder="ub",
+ profile_data=_MINIMAL_PROFILE_DATA,
+ last_execution_result=_re_calling_ensure_value(inner),
+ )
+ with mock.patch.object(
+ octobot_process_ops.os,
+ "getcwd",
+ return_value=str(tmp_path),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "ensure_user_profile_and_layout",
+ new=mock.AsyncMock(
+ return_value={
+ "user_root": inner["user_root"],
+ "profile_id": "x",
+ "already_prepared": True,
+ }
+ ),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_listen_port_pair_with_shared_scan_offset",
+ return_value=(20050, 30050),
+ ), mock.patch.object(
+ octobot_process_ops,
+ "_load_process_bot_state",
+ new=mock.AsyncMock(side_effect=_async_return_none_mock),
+ ), mock.patch.object(
+ process_util,
+ "spawn_managed_subprocess",
+ ) as spawn_mock:
+ spawn_mock.return_value = mock.Mock(spec=["pid"], pid=30007)
+ await op.pre_compute()
+ spawn_mock.assert_called_once()