Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions octobot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

try:
import octobot_commons.os_util as os_util
import octobot_commons.logging as logging
import octobot_commons.logging
import octobot_commons.configuration as configuration
import octobot_commons.profiles as profiles
import octobot_commons.authentication as authentication
Expand Down Expand Up @@ -67,7 +67,7 @@ def update_config_with_args(starting_args, config: configuration.Configuration,
try:
import octobot_backtesting.constants as backtesting_constants
except ImportError as e:
logging.get_logger().error(
octobot_commons.logging.get_logger().error(
"Can't start backtesting without the octobot_backtesting package properly installed.")
raise e

Expand Down Expand Up @@ -325,25 +325,39 @@ def _load_or_create_tentacles(community_auth, config, logger):
config.load_profiles_if_possible_and_necessary()


def _init_cli_overriden_folders(args):
overrides = {}
if args.user_folder:
overrides["user_folder"] = args.user_folder
_set_user_root_from_cli(args.user_folder)
if args.log_folder:
overrides["log_folder"] = args.log_folder
logs_folder = args.log_folder
else:
logs_folder = constants.LOGS_FOLDER
return overrides, logs_folder


def start_octobot(args, default_config_file=None):
logger = None
try:
if args.version:
print(constants.LONG_VERSION)
return

user_folder = getattr(args, "user_folder", None)
if user_folder:
_set_user_root_from_cli(user_folder)

# log folder: --log-folder overrides default (from LOGS_FOLDER env at import + default "logs")
logs_folder = getattr(args, "log_folder", None) or constants.LOGS_FOLDER

overrides, logs_folder = _init_cli_overriden_folders(args)
logger = octobot_logger.init_logger(logs_folder=logs_folder)
startup_messages = []

# Version
logger.info("Version : {0}".format(constants.LONG_VERSION))

# Log folder overrides
if overrides:
logger.info(f"Overriding default folders: {overrides}")
else:
logger.info(f"Using default folders")

# Current running environment
_log_environment(logger)

Expand Down
6 changes: 5 additions & 1 deletion octobot/storage/process_bot_state_dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ async def _write_state_file_async(
) -> None:
now = time.time()
state = process_bot_state_import.ProcessBotState(
metadata=process_bot_state_import.Metadata(updated_at=now, next_updated_at=now + interval),
metadata=process_bot_state_import.Metadata(
updated_at=now,
next_updated_at=now + interval,
pid=os.getpid(),
),
exchange_account_elements=_synced_exchange_account_elements_for_first_trading_exchange(
bot,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
@dataclasses.dataclass
class Metadata(octobot_commons.dataclasses.MinimizableDataclass):
"""
Timestamps written with process bot state dumps; used for file-based liveness checks.
Timestamps and child PID written with process bot state dumps. Liveness checks use
updated_at / next_updated_at only; pid is the authoritative child PID for parent binding
after restarts.
"""

updated_at: float = 0.0
next_updated_at: float = 0.0
pid: int = 0


@dataclasses.dataclass
Expand Down
13 changes: 13 additions & 0 deletions packages/flow/octobot_flow/environment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import typing

import octobot.constants # will load .env file and init constants

import octobot_flow.repositories.community
import octobot_trading.constants

_EXECUTOR_ID: typing.Optional[str] = None


def register_executor_id(executor_id: str) -> None:
global _EXECUTOR_ID
_EXECUTOR_ID = executor_id


def get_executor_id() -> typing.Optional[str]:
return _EXECUTOR_ID


def initialize_environment(allow_funds_transfer: bool = False) -> None:
octobot_flow.repositories.community.initialize_community_authentication()
Expand Down
27 changes: 21 additions & 6 deletions packages/flow/octobot_flow/logic/dsl/dsl_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import octobot_evaluators.evaluators as evaluators

import octobot_flow.entities
import octobot_flow.environment
import octobot_flow.errors
import octobot_flow.enums
import octobot_flow.logic.dsl.action_error_util
Expand All @@ -24,21 +25,23 @@
import tentacles.Meta.DSL_operators.octobot_process_operators.octobot_process_ops as octobot_process_ops



class DSLExecutor(AbstractActionExecutor):
def __init__(
self,
profile_data: octobot_commons.profiles.ProfileData,
exchange_manager: typing.Optional[octobot_trading.exchanges.ExchangeManager],
dsl_script: typing.Optional[str],
dependencies: typing.Optional[octobot_commons.signals.SignalDependencies] = None,
executor_id: typing.Optional[str] = None,
):
super().__init__()
self._exchange_manager = exchange_manager
self._dependencies = dependencies
self._dependencies_config: dict = profile_data.to_profile("").config
self._interpreter_signals: octobot_commons.dsl_interpreter.OperatorSignals = None # type: ignore (reset when interpreter is created)
self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(None)
self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(
None, executor_id
)
if dsl_script:
self._interpreter.prepare(dsl_script)

Expand All @@ -49,7 +52,15 @@ def _get_matrix_id(self) -> typing.Optional[str]:

def get_flow_operator_classes(
self,
executor_id: typing.Optional[str] = None,
) -> list[typing.Type[octobot_commons.dsl_interpreter.Operator]]:
resolved_executor_id = (
executor_id or octobot_flow.environment.get_executor_id()
)
if not resolved_executor_id:
raise octobot_flow.errors.MissingDSLExecutorDependencyError(
"executor_id is required for run_octobot_process"
)
return (
octobot_commons.dsl_interpreter.get_all_operators()
+ dsl_operators.create_ohlcv_operators(self._exchange_manager, None, None)
Expand All @@ -74,16 +85,19 @@ def get_flow_operator_classes(
copier_trading_mode=None,
)
+ octobot_process_ops.create_octobot_process_operators(
self._interpreter_signals
self._interpreter_signals,
resolved_executor_id,
)
) # type: ignore (list[type[Operator]])

def _create_interpreter(
self, previous_execution_result: typing.Optional[dict]
self,
previous_execution_result: typing.Optional[dict],
executor_id: typing.Optional[str] = None,
) -> octobot_commons.dsl_interpreter.Interpreter:
self._interpreter_signals = octobot_commons.dsl_interpreter.OperatorSignals()
return octobot_commons.dsl_interpreter.Interpreter(
self.get_flow_operator_classes()
self.get_flow_operator_classes(executor_id)
)

def get_dependencies(self) -> list[
Expand All @@ -110,7 +124,8 @@ async def execute_action(
] = None,
) -> octobot_commons.dsl_interpreter.DSLCallResult:
self._interpreter = self._create_interpreter(
action.previous_execution_result
action.previous_execution_result,
None,
)
expression = action.get_resolved_dsl_script()
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
# Drakkar-Software OctoBot
# Shared helpers/constants for octobot process functional tests (run_octobot_process, GridTradingMode).

import asyncio
import copy
import decimal
import json
import os
import pathlib
import time
import typing

import octobot.constants as octobot_constants
import octobot_commons.configuration as configuration_module
import octobot_commons.constants as commons_constants
import octobot_commons.dsl_interpreter as dsl_interpreter
import octobot_trading.constants as trading_constants
import octobot_trading.enums as trading_enums
import pytest

import octobot_flow.jobs
import octobot_flow.entities
import octobot_flow.environment
import octobot_flow.enums
import tests.functionnal_tests as functionnal_tests
import tests.functionnal_tests.tentacle_test_configs as tentacle_test_configs
Expand Down Expand Up @@ -200,6 +209,100 @@ def _get_action_by_id(
return None


_FERNET_ENCRYPTED_PREFIX = "gAAAAA"


# --- Child on-disk readiness (poll after init_state_ok; PID can be up before first dump / encrypt) ---


def _process_bot_state_path(inner: dict) -> str:
return os.path.normpath(
os.path.join(
inner["user_root"],
octobot_constants.PROCESS_BOT_STATE_FILE_NAME,
)
)


async def _wait_for_process_bot_state_file(
state_path: str,
*,
timeout_sec: float = GLOBAL_START_TIMEOUT_SEC,
poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC,
) -> None:
"""Poll until the child has written at least one process_bot_state.json dump."""
deadline = time.monotonic() + timeout_sec
while time.monotonic() < deadline:
if os.path.isfile(state_path):
return
await asyncio.sleep(poll_interval_sec)
pytest.fail(
f"Timed out waiting for process_bot_state.json at {state_path!r} within {timeout_sec}s"
)


async def _assert_encrypted_exchange_credentials_in_user_config(
user_root: typing.Union[pathlib.Path, str],
exchange_internal_name: str,
expected_api_key: str,
expected_api_secret: str,
*,
timeout_sec: float = GLOBAL_START_TIMEOUT_SEC,
poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC,
) -> None:
"""
Poll child user-root config.json, then assert api key/secret are Fernet-encrypted and decrypt
to the expected plaintext.

run_octobot_process seeds plaintext credentials before spawn; the child re-saves config.json
on startup. init_state_ok can become true from PID liveness before encryption completes.
"""
user_root_path = pathlib.Path(user_root)
config_path = user_root_path / commons_constants.CONFIG_FILE
deadline = time.monotonic() + timeout_sec
last_failure_reason = "config.json missing or exchange entry not ready"
while time.monotonic() < deadline:
if not config_path.is_file():
await asyncio.sleep(poll_interval_sec)
continue
root_cfg = json.loads(config_path.read_text(encoding="utf-8"))
exchanges_cfg = root_cfg.get(commons_constants.CONFIG_EXCHANGES) or {}
exchange_cfg = exchanges_cfg.get(exchange_internal_name)
if not isinstance(exchange_cfg, dict):
last_failure_reason = f"exchange {exchange_internal_name!r} missing from config"
await asyncio.sleep(poll_interval_sec)
continue
stored_api_key = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_KEY)
stored_api_secret = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_SECRET)
if not (
isinstance(stored_api_key, str)
and stored_api_key.startswith(_FERNET_ENCRYPTED_PREFIX)
and isinstance(stored_api_secret, str)
and stored_api_secret.startswith(_FERNET_ENCRYPTED_PREFIX)
):
last_failure_reason = "api key/secret not yet Fernet-encrypted in config.json"
await asyncio.sleep(poll_interval_sec)
continue
try:
decrypted_api_key = configuration_module.decrypt(stored_api_key)
decrypted_api_secret = configuration_module.decrypt(stored_api_secret)
except Exception as decrypt_error:
last_failure_reason = f"decrypt failed: {decrypt_error}"
await asyncio.sleep(poll_interval_sec)
continue
assert decrypted_api_key == expected_api_key, (
f"decrypted api key mismatch for {exchange_internal_name!r}"
)
assert decrypted_api_secret == expected_api_secret, (
f"decrypted api secret mismatch for {exchange_internal_name!r}"
)
return
pytest.fail(
f"Timed out waiting for encrypted exchange credentials in {config_path!r} "
f"within {timeout_sec}s ({last_failure_reason})"
)


def _make_tracked_spawn_managed_with_forward_terminal_output(
real_spawn_managed: typing.Callable[..., typing.Any],
popen_calls: dict[str, int],
Expand All @@ -213,6 +316,12 @@ def _tracked(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
return _tracked


@pytest.fixture(autouse=True)
def register_functional_executor_id():
octobot_flow.environment.register_executor_id("func-test-executor")
yield


@pytest.fixture
def init_action():
# Automation apply_configuration: seed automation state to match expected exchange + portfolio.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ async def test_run_octobot_process_grid_refresh_four_to_six_orders(
initial_spawn_count = popen_calls["count"]
assert initial_spawn_count >= 1

# First process_bot_state dump can lag init_state_ok (see shared wait helper).
state_path = octobot_process_functional_shared._process_bot_state_path(inner)
await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path)

# 3) Wait until at least four open ladder orders exist, then assert a 2×2 grid pattern.
orders_deadline = time.monotonic() + octobot_process_functional_shared.GRID_ORDERS_TIMEOUT_SEC
exchange_account_snapshot: typing.Optional[
Expand Down
Loading
Loading