Skip to content

Commit 76bb311

Browse files
committed
[Node] fix process bot restart issues
1 parent 5c8a121 commit 76bb311

18 files changed

Lines changed: 1528 additions & 355 deletions

File tree

octobot/cli.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
try:
2727
import octobot_commons.os_util as os_util
28-
import octobot_commons.logging as logging
28+
import octobot_commons.logging
2929
import octobot_commons.configuration as configuration
3030
import octobot_commons.profiles as profiles
3131
import octobot_commons.authentication as authentication
@@ -67,7 +67,7 @@ def update_config_with_args(starting_args, config: configuration.Configuration,
6767
try:
6868
import octobot_backtesting.constants as backtesting_constants
6969
except ImportError as e:
70-
logging.get_logger().error(
70+
octobot_commons.logging.get_logger().error(
7171
"Can't start backtesting without the octobot_backtesting package properly installed.")
7272
raise e
7373

@@ -325,25 +325,39 @@ def _load_or_create_tentacles(community_auth, config, logger):
325325
config.load_profiles_if_possible_and_necessary()
326326

327327

328+
def _init_cli_overriden_folders(args):
    """
    Apply the folder overrides given on the command line.

    :param args: parsed CLI arguments. ``user_folder`` and ``log_folder`` are optional:
        a missing attribute is handled like an unset (falsy) value.
    :return: ``(overrides, logs_folder)`` where ``overrides`` maps each overridden folder
        name to the value given on the CLI and ``logs_folder`` is the folder to log into
        (the CLI value when set, ``constants.LOGS_FOLDER`` otherwise).
    """
    # getattr: argument objects built by other entry points might not define
    # every CLI attribute, this must not crash the bot startup.
    user_folder = getattr(args, "user_folder", None)
    log_folder = getattr(args, "log_folder", None)
    overrides = {}
    if user_folder:
        overrides["user_folder"] = user_folder
        _set_user_root_from_cli(user_folder)
    if log_folder:
        overrides["log_folder"] = log_folder
    logs_folder = log_folder or constants.LOGS_FOLDER
    return overrides, logs_folder
339+
340+
328341
def start_octobot(args, default_config_file=None):
329342
logger = None
330343
try:
331344
if args.version:
332345
print(constants.LONG_VERSION)
333346
return
334-
335-
user_folder = getattr(args, "user_folder", None)
336-
if user_folder:
337-
_set_user_root_from_cli(user_folder)
338-
339-
# log folder: --log-folder overrides default (from LOGS_FOLDER env at import + default "logs")
340-
logs_folder = getattr(args, "log_folder", None) or constants.LOGS_FOLDER
347+
348+
overrides, logs_folder = _init_cli_overriden_folders(args)
341349
logger = octobot_logger.init_logger(logs_folder=logs_folder)
342350
startup_messages = []
343351

344352
# Version
345353
logger.info("Version : {0}".format(constants.LONG_VERSION))
346354

355+
# Log folder overrides
356+
if overrides:
357+
logger.info(f"Overriding default folders: {overrides}")
358+
else:
359+
logger.info(f"Using default folders")
360+
347361
# Current running environment
348362
_log_environment(logger)
349363

octobot/storage/process_bot_state_dumper.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ async def _write_state_file_async(
7373
) -> None:
7474
now = time.time()
7575
state = process_bot_state_import.ProcessBotState(
76-
metadata=process_bot_state_import.Metadata(updated_at=now, next_updated_at=now + interval),
76+
metadata=process_bot_state_import.Metadata(
77+
updated_at=now,
78+
next_updated_at=now + interval,
79+
pid=os.getpid(),
80+
),
7781
exchange_account_elements=_synced_exchange_account_elements_for_first_trading_exchange(
7882
bot,
7983
),

packages/flow/octobot_flow/entities/accounts/process_bot_state.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010
@dataclasses.dataclass
class Metadata(octobot_commons.dataclasses.MinimizableDataclass):
    """
    Timestamps and child PID written with process bot state dumps. Liveness checks use
    updated_at / next_updated_at only; pid is the authoritative child PID for parent binding
    after restarts.
    """

    # time of the dump this metadata belongs to (0.0 when never set)
    updated_at: float = 0.0
    # time at which the next dump is expected (0.0 when never set)
    next_updated_at: float = 0.0
    # PID of the process that wrote the dump (0 when never set)
    pid: int = 0
1821

1922

2023
@dataclasses.dataclass

packages/flow/octobot_flow/environment.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1+
import typing
2+
13
import octobot.constants # will load .env file and init constants
24

35
import octobot_flow.repositories.community
46
import octobot_trading.constants
57

8+
_EXECUTOR_ID: typing.Optional[str] = None
9+
10+
11+
def register_executor_id(executor_id: str) -> None:
    """
    Store executor_id as the module-level executor id returned by get_executor_id().
    Any previously registered value is replaced.
    """
    global _EXECUTOR_ID
    _EXECUTOR_ID = executor_id
14+
15+
16+
def get_executor_id() -> typing.Optional[str]:
    """
    :return: the executor id last given to register_executor_id(), or None when no id
        has been registered.
    """
    return _EXECUTOR_ID
18+
619

720
def initialize_environment(allow_funds_transfer: bool = False) -> None:
821
octobot_flow.repositories.community.initialize_community_authentication()

packages/flow/octobot_flow/logic/dsl/dsl_executor.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import octobot_evaluators.evaluators as evaluators
1313

1414
import octobot_flow.entities
15+
import octobot_flow.environment
1516
import octobot_flow.errors
1617
import octobot_flow.enums
1718
import octobot_flow.logic.dsl.action_error_util
@@ -24,21 +25,23 @@
2425
import tentacles.Meta.DSL_operators.octobot_process_operators.octobot_process_ops as octobot_process_ops
2526

2627

27-
2828
class DSLExecutor(AbstractActionExecutor):
2929
def __init__(
3030
self,
3131
profile_data: octobot_commons.profiles.ProfileData,
3232
exchange_manager: typing.Optional[octobot_trading.exchanges.ExchangeManager],
3333
dsl_script: typing.Optional[str],
3434
dependencies: typing.Optional[octobot_commons.signals.SignalDependencies] = None,
35+
executor_id: typing.Optional[str] = None,
3536
):
3637
super().__init__()
3738
self._exchange_manager = exchange_manager
3839
self._dependencies = dependencies
3940
self._dependencies_config: dict = profile_data.to_profile("").config
4041
self._interpreter_signals: octobot_commons.dsl_interpreter.OperatorSignals = None # type: ignore (reset when interpreter is created)
41-
self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(None)
42+
self._interpreter: octobot_commons.dsl_interpreter.Interpreter = self._create_interpreter(
43+
None, executor_id
44+
)
4245
if dsl_script:
4346
self._interpreter.prepare(dsl_script)
4447

@@ -49,7 +52,15 @@ def _get_matrix_id(self) -> typing.Optional[str]:
4952

5053
def get_flow_operator_classes(
5154
self,
55+
executor_id: typing.Optional[str] = None,
5256
) -> list[typing.Type[octobot_commons.dsl_interpreter.Operator]]:
57+
resolved_executor_id = (
58+
executor_id or octobot_flow.environment.get_executor_id()
59+
)
60+
if not resolved_executor_id:
61+
raise octobot_flow.errors.MissingDSLExecutorDependencyError(
62+
"executor_id is required for run_octobot_process"
63+
)
5364
return (
5465
octobot_commons.dsl_interpreter.get_all_operators()
5566
+ dsl_operators.create_ohlcv_operators(self._exchange_manager, None, None)
@@ -74,16 +85,19 @@ def get_flow_operator_classes(
7485
copier_trading_mode=None,
7586
)
7687
+ octobot_process_ops.create_octobot_process_operators(
77-
self._interpreter_signals
88+
self._interpreter_signals,
89+
resolved_executor_id,
7890
)
7991
) # type: ignore (list[type[Operator]])
8092

8193
def _create_interpreter(
    self,
    previous_execution_result: typing.Optional[dict],
    executor_id: typing.Optional[str] = None,
) -> octobot_commons.dsl_interpreter.Interpreter:
    """
    Create a new interpreter bound to fresh operator signals.

    :param previous_execution_result: result of the previous execution of the action, if any
        (not read by this method).
    :param executor_id: id forwarded to get_flow_operator_classes(). When omitted, the last
        id explicitly given to this method on this instance is reused.
    :return: an interpreter built from get_flow_operator_classes().
    :raises octobot_flow.errors.MissingDSLExecutorDependencyError: from
        get_flow_operator_classes() when no executor id can be resolved.
    """
    if executor_id:
        # remember the explicit id: later re-creations (each action execution) call this
        # method without an id and must not lose the one given at construction time
        self._executor_id = executor_id
    else:
        # getattr: nothing is stored until an explicit id has been given once
        executor_id = getattr(self, "_executor_id", None)
    self._interpreter_signals = octobot_commons.dsl_interpreter.OperatorSignals()
    return octobot_commons.dsl_interpreter.Interpreter(
        self.get_flow_operator_classes(executor_id)
    )
88102

89103
def get_dependencies(self) -> list[
@@ -110,7 +124,8 @@ async def execute_action(
110124
] = None,
111125
) -> octobot_commons.dsl_interpreter.DSLCallResult:
112126
self._interpreter = self._create_interpreter(
113-
action.previous_execution_result
127+
action.previous_execution_result,
128+
None,
114129
)
115130
expression = action.get_resolved_dsl_script()
116131
try:

packages/flow/tests/functionnal_tests/octobot_process_actions/octobot_process_functional_shared.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
# Drakkar-Software OctoBot
22
# Shared helpers/constants for octobot process functional tests (run_octobot_process, GridTradingMode).
33

4+
import asyncio
45
import copy
56
import decimal
7+
import json
8+
import os
9+
import pathlib
10+
import time
611
import typing
712

13+
import octobot.constants as octobot_constants
14+
import octobot_commons.configuration as configuration_module
15+
import octobot_commons.constants as commons_constants
816
import octobot_commons.dsl_interpreter as dsl_interpreter
917
import octobot_trading.constants as trading_constants
1018
import octobot_trading.enums as trading_enums
1119
import pytest
1220

1321
import octobot_flow.jobs
1422
import octobot_flow.entities
23+
import octobot_flow.environment
1524
import octobot_flow.enums
1625
import tests.functionnal_tests as functionnal_tests
1726
import tests.functionnal_tests.tentacle_test_configs as tentacle_test_configs
@@ -200,6 +209,100 @@ def _get_action_by_id(
200209
return None
201210

202211

212+
_FERNET_ENCRYPTED_PREFIX = "gAAAAA"
213+
214+
215+
# --- Child on-disk readiness (poll after init_state_ok; PID can be up before first dump / encrypt) ---
216+
217+
218+
def _process_bot_state_path(inner: dict) -> str:
    """Return the normalized path of the process bot state file inside inner["user_root"]."""
    state_file_path = os.path.join(
        inner["user_root"], octobot_constants.PROCESS_BOT_STATE_FILE_NAME
    )
    return os.path.normpath(state_file_path)
225+
226+
227+
async def _wait_for_process_bot_state_file(
    state_path: str,
    *,
    timeout_sec: float = GLOBAL_START_TIMEOUT_SEC,
    poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC,
) -> None:
    """
    Poll until the child has written at least one process_bot_state.json dump.

    :param state_path: path of the state file to wait for.
    :param timeout_sec: maximum time to wait before failing the test.
    :param poll_interval_sec: pause between two checks.

    Fails the current test (pytest.fail) when the file is still missing at the deadline.
    """
    deadline = time.monotonic() + timeout_sec
    while True:
        # check first: the file is looked up at least once, even with a zero timeout,
        # and once more after the last sleep
        if os.path.isfile(state_path):
            return
        remaining_sec = deadline - time.monotonic()
        if remaining_sec <= 0:
            break
        # never sleep past the deadline
        await asyncio.sleep(min(poll_interval_sec, remaining_sec))
    pytest.fail(
        f"Timed out waiting for process_bot_state.json at {state_path!r} within {timeout_sec}s"
    )
242+
243+
244+
async def _assert_encrypted_exchange_credentials_in_user_config(
    user_root: typing.Union[pathlib.Path, str],
    exchange_internal_name: str,
    expected_api_key: str,
    expected_api_secret: str,
    *,
    timeout_sec: float = GLOBAL_START_TIMEOUT_SEC,
    poll_interval_sec: float = SLEEP_BETWEEN_JOB_POLLS_SEC,
) -> None:
    """
    Poll child user-root config.json, then assert api key/secret are Fernet-encrypted and decrypt
    to the expected plaintext.

    run_octobot_process seeds plaintext credentials before spawn; the child re-saves config.json
    on startup. init_state_ok can become true from PID liveness before encryption completes.

    :param user_root: user root folder of the child process (contains config.json).
    :param exchange_internal_name: key of the exchange in the exchanges section of the config.
    :param expected_api_key: plaintext api key the stored value must decrypt to.
    :param expected_api_secret: plaintext api secret the stored value must decrypt to.
    :param timeout_sec: maximum time to wait for encrypted credentials before failing the test.
    :param poll_interval_sec: pause between two attempts.

    Fails the current test (pytest.fail) on timeout, with the reason of the last failed attempt.
    """
    config_path = pathlib.Path(user_root) / commons_constants.CONFIG_FILE
    deadline = time.monotonic() + timeout_sec
    last_failure_reason = "config.json missing or exchange entry not ready"
    while time.monotonic() < deadline:
        if not config_path.is_file():
            await asyncio.sleep(poll_interval_sec)
            continue
        try:
            root_cfg = json.loads(config_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as read_error:
            # The child can be rewriting config.json right now: the file may vanish between
            # is_file() and the read, or hold a partial document (JSONDecodeError and
            # UnicodeDecodeError are ValueError subclasses). Retry instead of aborting.
            last_failure_reason = f"config.json not readable yet: {read_error}"
            await asyncio.sleep(poll_interval_sec)
            continue
        if not isinstance(root_cfg, dict):
            last_failure_reason = "config.json root is not a JSON object"
            await asyncio.sleep(poll_interval_sec)
            continue
        exchanges_cfg = root_cfg.get(commons_constants.CONFIG_EXCHANGES) or {}
        exchange_cfg = (
            exchanges_cfg.get(exchange_internal_name)
            if isinstance(exchanges_cfg, dict) else None
        )
        if not isinstance(exchange_cfg, dict):
            last_failure_reason = f"exchange {exchange_internal_name!r} missing from config"
            await asyncio.sleep(poll_interval_sec)
            continue
        stored_api_key = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_KEY)
        stored_api_secret = exchange_cfg.get(commons_constants.CONFIG_EXCHANGE_SECRET)
        if not (
            isinstance(stored_api_key, str)
            and stored_api_key.startswith(_FERNET_ENCRYPTED_PREFIX)
            and isinstance(stored_api_secret, str)
            and stored_api_secret.startswith(_FERNET_ENCRYPTED_PREFIX)
        ):
            last_failure_reason = "api key/secret not yet Fernet-encrypted in config.json"
            await asyncio.sleep(poll_interval_sec)
            continue
        try:
            decrypted_api_key = configuration_module.decrypt(stored_api_key)
            decrypted_api_secret = configuration_module.decrypt(stored_api_secret)
        except Exception as decrypt_error:
            # deliberately broad: any decrypt failure is handled as "not ready yet" and
            # reported in the timeout message if it never succeeds
            last_failure_reason = f"decrypt failed: {decrypt_error}"
            await asyncio.sleep(poll_interval_sec)
            continue
        assert decrypted_api_key == expected_api_key, (
            f"decrypted api key mismatch for {exchange_internal_name!r}"
        )
        assert decrypted_api_secret == expected_api_secret, (
            f"decrypted api secret mismatch for {exchange_internal_name!r}"
        )
        return
    pytest.fail(
        f"Timed out waiting for encrypted exchange credentials in {config_path!r} "
        f"within {timeout_sec}s ({last_failure_reason})"
    )
304+
305+
203306
def _make_tracked_spawn_managed_with_forward_terminal_output(
204307
real_spawn_managed: typing.Callable[..., typing.Any],
205308
popen_calls: dict[str, int],
@@ -213,6 +316,12 @@ def _tracked(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
213316
return _tracked
214317

215318

319+
@pytest.fixture(autouse=True)
def register_functional_executor_id():
    """
    Register a fixed executor id for the duration of each test, then put back the id that
    was registered before the test so that the module-level value does not leak between tests.
    """
    previous_executor_id = octobot_flow.environment.get_executor_id()
    octobot_flow.environment.register_executor_id("func-test-executor")
    yield
    # teardown: restore the previous value (None when nothing was registered)
    octobot_flow.environment.register_executor_id(previous_executor_id)
323+
324+
216325
@pytest.fixture
217326
def init_action():
218327
# Automation apply_configuration: seed automation state to match expected exchange + portfolio.

packages/flow/tests/functionnal_tests/octobot_process_actions/test_octobot_process_edit_config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ async def test_run_octobot_process_grid_refresh_four_to_six_orders(
185185
initial_spawn_count = popen_calls["count"]
186186
assert initial_spawn_count >= 1
187187

188+
# First process_bot_state dump can lag init_state_ok (see shared wait helper).
189+
state_path = octobot_process_functional_shared._process_bot_state_path(inner)
190+
await octobot_process_functional_shared._wait_for_process_bot_state_file(state_path)
191+
188192
# 3) Wait until at least four open ladder orders exist, then assert a 2×2 grid pattern.
189193
orders_deadline = time.monotonic() + octobot_process_functional_shared.GRID_ORDERS_TIMEOUT_SEC
190194
exchange_account_snapshot: typing.Optional[

0 commit comments

Comments
 (0)