Skip to content

Commit 681310e

Browse files
committed
[Flow] add update_automation_configuration keywords
1 parent f9cc5bc commit 681310e

34 files changed

Lines changed: 2811 additions & 956 deletions

octobot/commands.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import octobot.community.tentacles_packages as community_tentacles_packages
4040
import octobot.configuration_manager as configuration_manager
4141

42-
import octobot.storage.process_bot_state_dumper as process_bot_state_dumper
43-
4442
COMMANDS_LOGGER_NAME = "Commands"
4543
IGNORED_COMMAND_WHEN_RESTART = ["-u", "--update"]
4644

@@ -326,12 +324,6 @@ async def start_bot(bot, logger, catch=False):
326324
await bot.initialize()
327325
except asyncio.CancelledError:
328326
logger.info("Core engine tasks cancelled.")
329-
else:
330-
if bot.dump_state_path:
331-
332-
bot._process_bot_state_dump_task = asyncio.create_task(
333-
process_bot_state_dumper.run_periodic_dump_loop(bot.dump_state_path, logger, bot)
334-
)
335327

336328
except Exception as e:
337329
logger.exception(e)

octobot/constants.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,6 @@
215215
DEFAULT_LOGS_FOLDER = "logs"
216216
LOGS_FOLDER = os.getenv("LOGS_FOLDER", DEFAULT_LOGS_FOLDER)
217217

218-
# Web automation: child process sets OCTOBOT_WEB_API_KEY
219-
ENV_WEB_API_KEY = "OCTOBOT_WEB_API_KEY"
220-
WEB_API_KEY_HEADER = "X-Octobot-Api-Key"
221218
# Process bot state JSON next to user config (--dump-state); liveness for run_octobot_process
222219
PROCESS_BOT_STATE_FILE_NAME = "process_bot_state.json"
223220
ENV_PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS = "OCTOBOT_PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS"

octobot/octobot.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ def __init__(self, config: configuration.Configuration, community_authenticator=
107107
self._init_metadata_run_task = None
108108
# optional path for periodic ProcessBotState JSON (see cli --dump-state)
109109
self.dump_state_path = None
110-
self._process_bot_state_dump_task = None
111110

112111
# Producers
113112
self.exchange_producer = None
@@ -214,9 +213,6 @@ async def stop(self):
214213
self.logger.debug("Stopping ...")
215214
if self._init_metadata_run_task is not None and not self._init_metadata_run_task.done():
216215
self._init_metadata_run_task.cancel()
217-
if self._process_bot_state_dump_task is not None and not self._process_bot_state_dump_task.done():
218-
self._process_bot_state_dump_task.cancel()
219-
self._process_bot_state_dump_task = None
220216
signals.SignalPublisher.instance().stop()
221217
if self.evaluator_producer is not None:
222218
await self.evaluator_producer.stop()

octobot/storage/process_bot_state_dumper.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@
2121

2222
import aiofiles
2323
import octobot_commons.json_util as json_util
24+
import octobot_commons.logging as logging
2425
import octobot_trading.api as trading_api
2526

2627
import octobot.constants as octobot_app_constants
2728
import octobot_flow.entities.accounts.exchange_account_elements as exchange_account_elements_import
2829
import octobot_flow.entities.accounts.process_bot_state as process_bot_state_import
2930

3031

32+
if typing.TYPE_CHECKING:
33+
import octobot.octobot
34+
35+
3136
def _synced_exchange_account_elements_for_first_trading_exchange(
32-
octobot: typing.Any,
33-
logger: typing.Any,
37+
octobot: "octobot.octobot.OctoBot",
3438
) -> exchange_account_elements_import.ExchangeAccountElements:
3539
"""
3640
Build one snapshot for the first trading exchange only. If several are trading, log an error
@@ -51,7 +55,7 @@ def _synced_exchange_account_elements_for_first_trading_exchange(
5155
elements.name = trading_api.get_exchange_name(first_exchange_manager)
5256
elements.sync_from_exchange_manager(first_exchange_manager, [])
5357
for skipped_exchange_manager in trading_managers[1:]:
54-
logger.error(
58+
_get_logger().error(
5559
"process bot state dump includes only the first trading exchange; dumping %s (%s). "
5660
"Skipping additional trading exchange %s (%s).",
5761
trading_api.get_exchange_name(first_exchange_manager),
@@ -65,14 +69,13 @@ def _synced_exchange_account_elements_for_first_trading_exchange(
6569
async def _write_state_file_async(
6670
state_file_path: str,
6771
interval: float,
68-
bot: typing.Any,
69-
logger: typing.Any,
72+
bot: "octobot.octobot.OctoBot",
7073
) -> None:
7174
now = time.time()
7275
state = process_bot_state_import.ProcessBotState(
7376
metadata=process_bot_state_import.Metadata(updated_at=now, next_updated_at=now + interval),
7477
exchange_account_elements=_synced_exchange_account_elements_for_first_trading_exchange(
75-
bot, logger
78+
bot,
7679
),
7780
)
7881
content = state.to_dict(include_default_values=False)
@@ -95,19 +98,23 @@ async def _write_state_file_async(
9598
raise
9699

97100

98-
async def run_periodic_dump_loop(state_file_path: str, logger, bot: typing.Any) -> None:
101+
async def run_periodic_dump_loop(state_file_path: str, bot: "octobot.octobot.OctoBot") -> None:
99102
"""
100103
Periodically write ProcessBotState next to the user config. Cancel the task to stop.
101104
"""
102105
interval = octobot_app_constants.PROCESS_BOT_STATE_DUMP_INTERVAL_SECONDS
103106
while True:
104107
try:
105-
await _write_state_file_async(state_file_path, interval, bot, logger)
108+
await _write_state_file_async(state_file_path, interval, bot)
106109
except asyncio.CancelledError:
107110
raise
108111
except Exception as err: # pylint: disable=broad-except
109-
logger.exception("process bot state dump failed: %s", err)
112+
_get_logger().exception(err, True, "process bot state dump failed: %s", err)
110113
try:
111114
await asyncio.sleep(interval)
112115
except asyncio.CancelledError:
113116
break
117+
118+
119+
def _get_logger() -> logging.BotLogger:
120+
return logging.get_logger("ProcessBotStateDumper")

octobot/task_manager.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import octobot_commons.constants as commons_constants
2525

2626
import octobot.constants as constants
27+
import octobot.storage.process_bot_state_dumper as process_bot_state_dumper
2728

2829

2930
ASYNC_IGNORED_ERROR_MESSAGES = ["'message': 'Unclosed client session'"]
@@ -48,6 +49,8 @@ def __init__(self, octobot):
4849
self.executors = None
4950
self.bot_main_task = None
5051
self.loop_forever_thread = None
52+
53+
self._process_bot_state_dump_task = None
5154

5255
def init_async_loop(self):
5356
self.async_loop = asyncio.new_event_loop()
@@ -63,6 +66,12 @@ async def start_tools_tasks(self):
6366
self.ready = True
6467
self.tools_task_group = asyncio.gather(*task_list)
6568
self.create_pool_executor()
69+
if self.octobot.dump_state_path:
70+
self._process_bot_state_dump_task = asyncio.create_task(
71+
process_bot_state_dumper.run_periodic_dump_loop(
72+
self.octobot.dump_state_path, self.octobot
73+
)
74+
)
6675

6776
def run_bot_in_thread(self, coroutine):
6877
self.init_async_loop()
@@ -99,6 +108,10 @@ async def stop_timeout(timeout):
99108
if self.tools_task_group:
100109
self.tools_task_group.cancel()
101110

111+
if self._process_bot_state_dump_task is not None and not self._process_bot_state_dump_task.done():
112+
self._process_bot_state_dump_task.cancel()
113+
self._process_bot_state_dump_task = None
114+
102115
# close community session
103116
if self.octobot.community_handler:
104117
stop_coroutines.append(self.octobot.community_handler.stop_task())

packages/commons/octobot_commons/dsl_interpreter/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
ExpressionOperator,
4040
PreComputingCallOperator,
4141
ReCallableOperatorMixin,
42+
SignalableOperatorMixin,
43+
OperatorSignal,
44+
OperatorSignals,
4245
ReCallingOperatorResult,
4346
ReCallingOperatorResultKeys,
4447
ProcessBoundOperatorMixin,
@@ -75,6 +78,9 @@
7578
"ExpressionOperator",
7679
"PreComputingCallOperator",
7780
"ReCallableOperatorMixin",
81+
"SignalableOperatorMixin",
82+
"OperatorSignal",
83+
"OperatorSignals",
7884
"ProcessBoundOperatorMixin",
7985
"is_process_bound",
8086
"InterpreterDependency",

packages/commons/octobot_commons/dsl_interpreter/operators/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@
5050
ReCallingOperatorResult,
5151
ReCallingOperatorResultKeys,
5252
)
53+
from octobot_commons.dsl_interpreter.operators.signalable_operator_mixin import (
54+
OperatorSignal,
55+
OperatorSignals,
56+
SignalableOperatorMixin,
57+
)
5358
from octobot_commons.dsl_interpreter.operators.process_bound_operator_mixin import (
5459
ProcessBoundOperatorMixin,
5560
is_process_bound,
@@ -69,6 +74,9 @@
6974
"ReCallableOperatorMixin",
7075
"ReCallingOperatorResult",
7176
"ReCallingOperatorResultKeys",
77+
"SignalableOperatorMixin",
78+
"OperatorSignal",
79+
"OperatorSignals",
7280
"ProcessBoundOperatorMixin",
7381
"is_process_bound",
7482
]

packages/commons/octobot_commons/dsl_interpreter/operators/process_bound_operator_mixin.py

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
# You should have received a copy of the GNU Lesser General Public
1515
# License along with this library.
1616

17+
import asyncio
1718
import pathlib
18-
import socket
1919
import subprocess
20+
import time
2021
import typing
2122

2223
import octobot_commons.dsl_interpreter.operator as dsl_interpreter_operator
2324
import octobot_commons.errors as commons_errors
25+
import octobot_commons.logging as commons_logging
2426
import octobot_commons.process_util as process_util
2527

2628

@@ -52,6 +54,37 @@ def request_graceful_stop(
5254
)
5355
return process_util.request_graceful_stop_via_sigterm(self.pid, logger=logger)
5456

57+
async def wait_until_pid_stopped(
58+
self,
59+
pid: int,
60+
*,
61+
logger: typing.Optional[typing.Any] = None,
62+
timeout_seconds: float,
63+
poll_interval: float = 0.2,
64+
) -> None:
65+
"""Poll until ``pid`` is gone or ``timeout_seconds`` elapses (after e.g. SIGTERM)."""
66+
resolved_logger = logger or commons_logging.get_logger(self.__class__.__name__)
67+
if pid <= 0:
68+
resolved_logger.info(
69+
"wait_until_pid_stopped: pid=%s treated as already stopped (non-positive)",
70+
pid,
71+
)
72+
return
73+
resolved_logger.info(
74+
"wait_until_pid_stopped: waiting for pid=%s to exit (timeout=%ss)",
75+
pid,
76+
timeout_seconds,
77+
)
78+
deadline = time.monotonic() + timeout_seconds
79+
while time.monotonic() < deadline:
80+
if not process_util.pid_is_running(pid):
81+
resolved_logger.info("wait_until_pid_stopped: pid=%s exited", pid)
82+
return
83+
await asyncio.sleep(poll_interval)
84+
raise commons_errors.DSLInterpreterError(
85+
f"Timed out after {timeout_seconds}s waiting for pid={pid} to exit."
86+
)
87+
5588
def spawn_subprocess(
5689
self,
5790
argv: list[str],
@@ -78,42 +111,6 @@ def reject_user_path_segment(path_value: str) -> None:
78111
"Invalid path: parent directory segments are not allowed."
79112
)
80113

81-
@staticmethod
82-
def _tcp_port_is_free(bind_host: str, port: int) -> bool:
83-
"""True if nothing is currently bound to (host, port) for TCP."""
84-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
85-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
86-
try:
87-
sock.bind((bind_host, port))
88-
except OSError:
89-
return False
90-
return True
91-
92-
@staticmethod
93-
def find_first_free_listen_port_after_base(
94-
bind_host_for_probe: str,
95-
listen_port_base: int,
96-
max_offset: int = 256,
97-
blocklist: list[int] = None,
98-
) -> int:
99-
"""
100-
First offset where ``listen_port_base + offset`` is TCP-free on ``bind_host_for_probe``
101-
(optional: require ``paired_listen_port_base + offset`` free as well, same scan step).
102-
Returns ``listen_port``.
103-
"""
104-
for offset_from_base in range(max_offset):
105-
listen_port = listen_port_base + offset_from_base
106-
if blocklist and listen_port in blocklist:
107-
continue
108-
if not ProcessBoundOperatorMixin._tcp_port_is_free(
109-
bind_host_for_probe, listen_port
110-
):
111-
continue
112-
return listen_port
113-
raise commons_errors.DSLInterpreterError(
114-
"No free listen port found in the scanned range."
115-
)
116-
117114
@staticmethod
118115
def bind_address_for_env_and_probe_hosts(
119116
params: dict,

packages/commons/octobot_commons/dsl_interpreter/operators/re_callable_operator_mixin.py

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
#
1414
# You should have received a copy of the GNU Lesser General Public
1515
# License along with this library.
16-
import contextlib
17-
import contextvars
1816
import dataclasses
1917
import enum
2018
import time
@@ -73,66 +71,19 @@ def get_script_override(result: typing.Any) -> typing.Optional[str]:
7371
)
7472

7573
@staticmethod
76-
def get_keyword(result: typing.Any) -> typing.Optional[str]:
74+
def get_keyword(result: dict[str, typing.Any]) -> typing.Optional[str]:
7775
"""
7876
Returns the keyword from the re-calling operator result.
7977
"""
8078
return result[ReCallingOperatorResult.__name__]["keyword"]
8179

8280

83-
# Per logical context (e.g. asyncio Task), not process-global: concurrent or nested `interprete`
84-
# calls do not share this value. Values are operator `get_name()` strings for which
85-
# `set_execution_stop()` is active; `get_execution_stop()` checks membership for one class only.
86-
_execution_stop_operator_names: contextvars.ContextVar[frozenset[str]] = contextvars.ContextVar(
87-
"re_callable_execution_stop_operator_names",
88-
default=frozenset(),
89-
)
90-
91-
9281
class ReCallableOperatorMixin:
9382
"""
9483
Mixin for re-callable operators.
9584
"""
9685
LAST_EXECUTION_RESULT_KEY = "last_execution_result"
9786

98-
@classmethod
99-
def get_execution_stop(cls) -> bool:
100-
"""
101-
True when this operator class is the current execution_stop target
102-
(see set_execution_stop). Scopes per operator get_name() so one class
103-
does not see another's stop mode.
104-
"""
105-
return cls.get_name() in _execution_stop_operator_names.get()
106-
107-
@classmethod
108-
@contextlib.contextmanager
109-
def set_execution_stop(cls) -> typing.Iterator[None]:
110-
"""
111-
Context manager: for the duration of the block, get_execution_stop() is
112-
True for this class only (other re-callable classes remain False unless
113-
also entered in an outer or parallel context).
114-
"""
115-
previous = _execution_stop_operator_names.get()
116-
token = _execution_stop_operator_names.set(
117-
frozenset(previous | {cls.get_name()})
118-
)
119-
try:
120-
yield
121-
finally:
122-
_execution_stop_operator_names.reset(token)
123-
124-
@classmethod
125-
def should_run_execution_stop_for_result( # pylint: disable=unused-argument
126-
cls, re_calling_result: typing.Optional[dict]
127-
) -> bool:
128-
"""
129-
When draining execution_stop for automation shutdown, return whether this
130-
operator should run a stop branch for the given previous re-calling
131-
result dict (the same shape as last_execution_result on the action).
132-
Default: do nothing (subclasses e.g. run_octobot may override).
133-
"""
134-
return False
135-
13687
@classmethod
13788
def get_re_callable_parameters(cls) -> list[operator_parameter.OperatorParameter]:
13889
"""

0 commit comments

Comments
 (0)