Skip to content

Commit fe90318

Browse files
committed
[Flow] add signals pull & push
1 parent f8002e4 commit fe90318

12 files changed

Lines changed: 181 additions & 57 deletions

File tree

octobot/community/authentication.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,12 @@ def decrypt_node_wallet(self, passphrase: str):
656656

657657
def verify_node_passphrase(self, passphrase: str) -> bool:
658658
return self._wallet_backend.verify_node_passphrase(passphrase)
659+
660+
@property
661+
def sync_client(self) -> sync_client.StarfishClient:
662+
if self._sync_client is None:
663+
raise errors.SyncClientNotInitializedError("Sync client is not initialized")
664+
return self._sync_client
659665

660666
def init_sync_client(self):
661667
if self._sync_client is not None:

octobot/community/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class JWTExpiredError(commons_authentication.AuthenticationError):
3232
pass
3333

3434

35+
class SyncClientNotInitializedError(commons_authentication.AuthenticationError):
36+
pass
37+
38+
3539
class BotError(commons_authentication.UnavailableError):
3640
pass
3741

octobot/community/local_authenticator.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,45 @@ def get_stateless_configuration() -> octobot_commons.configuration.Configuration
1616

1717
@contextlib.asynccontextmanager
1818
async def local_user_authenticator(
19-
email: str,
20-
hidden: bool,
19+
email: typing.Optional[str] = None,
20+
hidden: typing.Optional[bool] = None,
2121
backend_url: typing.Optional[str] = None,
2222
password: typing.Optional[str] = None,
2323
auth_key: typing.Optional[str] = None,
2424
) -> typing.AsyncGenerator["community.CommunityAuthentication", None]:
25-
if not email:
26-
raise ValueError("email is required")
2725
community.IdentifiersProvider.use_production()
2826
local_instance = None
2927
configuration = get_stateless_configuration()
28+
authenticate = password or auth_key
29+
if authenticate and not email:
30+
raise ValueError("email is required when authenticating with password or auth_key")
3031
try:
3132
local_instance = community.CommunityAuthentication(
3233
config=configuration, backend_url=backend_url, use_as_singleton=False
3334
)
3435
local_instance.supabase_client.is_admin = False
35-
local_instance.silent_auth = hidden
36+
local_instance.silent_auth = False if hidden is None else hidden
3637
if auth_key:
3738
password_value = None
3839
auth_key_value = auth_key
3940
else:
4041
password_value = password
4142
auth_key_value = None
42-
await local_instance.login(
43-
email, password_value, password_token=None, auth_key=auth_key_value, minimal=True
44-
)
45-
common_logging.get_logger("local_community_user_authenticator").info(
46-
f"Authenticated as {email[:3]}[...]{email[-4:]}"
47-
)
43+
if authenticate:
44+
email = typing.cast(str, email) # email is always str here
45+
await local_instance.login(
46+
email, password_value, password_token=None, auth_key=auth_key_value, minimal=True
47+
)
48+
auth_logger = common_logging.get_logger("local_community_user_authenticator")
49+
if len(email) > 7:
50+
auth_logger.info(f"Authenticated as {email[:3]}[...]{email[-4:]}")
51+
else:
52+
auth_logger.info("Authenticated as local community user")
4853
yield local_instance
4954
finally:
5055
if local_instance is not None:
51-
await local_instance.logout()
56+
if authenticate:
57+
await local_instance.logout()
5258
await local_instance.stop()
5359

5460

packages/flow/octobot_flow/entities/community/user_authentication.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class UserAuthentication(octobot_commons.dataclasses.FlexibleDataclass):
1111
user_id: typing.Optional[str] = None
1212
auth_key: typing.Optional[str] = None
1313
encrypted_keys_by_exchange: dict[str, str] = dataclasses.field(default_factory=dict)
14+
passphrase: typing.Optional[str] = None
1415

1516
def has_auth_details(self) -> bool:
16-
return bool(self.password or self.auth_key)
17+
return bool(self.password or self.auth_key or self.passphrase)

packages/flow/octobot_flow/jobs/automation_job.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,7 @@ async def _init_all_required_copy_trading_data(
311311
raise octobot_flow.errors.CommunityTradingSignalError(
312312
"Community authentication is required to fetch copy trading signals"
313313
)
314-
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository(
315-
maybe_community_repository.authenticator
316-
)
314+
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository.from_community_repository(maybe_community_repository)
317315
self._logger.info(f"Fetching copy trading signals for {to_fetch_signals} strategies")
318316
trading_signals = await trading_signals_repository.fetch_trading_signals(
319317
to_fetch_signals,
@@ -387,8 +385,8 @@ async def _emit_trading_signals(
387385
fetched_dependencies.fetched_exchange_data,
388386
reference_market
389387
)
390-
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository(
391-
maybe_community_repository.authenticator
388+
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository.from_community_repository(
389+
maybe_community_repository
392390
)
393391
await trading_signals_repository.insert_trading_signal(
394392
octobot_flow.entities.TradingSignal(

packages/flow/octobot_flow/repositories/community/authenticator_factory.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ def enable_community_authentication(self) -> bool:
2222

2323
@contextlib.asynccontextmanager
2424
async def local_authenticator(self) -> typing.AsyncGenerator[community.CommunityAuthentication, None]:
25-
if not self.auth_details.email:
26-
raise ValueError("auth_details.email is required")
2725
async with local_community_auth.local_user_authenticator(
2826
email=self.auth_details.email,
2927
hidden=self.auth_details.hidden,

packages/flow/octobot_flow/repositories/community/community_repository.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
import contextlib
22
import asyncio
3+
import typing
4+
import starfish_sdk
5+
6+
import octobot_commons.logging
37

48
import octobot.community
9+
import octobot.community.errors as community_errors
510

611
import octobot_flow.entities
12+
import octobot_flow.errors
713

814

915
class CommunityRepository:
10-
def __init__(self, authenticator: octobot.community.CommunityAuthentication):
16+
def __init__(self, authenticator: octobot.community.CommunityAuthentication, passphrase: typing.Optional[str] = None):
1117
self.authenticator: octobot.community.CommunityAuthentication = authenticator
18+
self.passphrase: str | None = passphrase
19+
20+
@classmethod
21+
def from_community_repository(cls, other_repository: "CommunityRepository") -> typing.Self:
22+
return cls(other_repository.authenticator, other_repository.passphrase)
1223

1324
async def insert_bot_logs(self, log_data: list[octobot.community.BotLogData]):
1425
await asyncio.gather(
@@ -30,3 +41,15 @@ def automation_context(self, automation: octobot_flow.entities.AutomationDetails
3041
yield
3142
finally:
3243
self.authenticator.user_account.bot_id = previous_bot_id # type: ignore
44+
45+
def _get_sync_client(self) -> starfish_sdk.StarfishClient:
46+
try:
47+
return self.authenticator.sync_client
48+
except community_errors.SyncClientNotInitializedError as e:
49+
raise octobot_flow.errors.CommunityTradingSignalError(
50+
"Starfish sync client is unavailable (no sync server configured or wallet passphrase required)"
51+
) from e
52+
53+
@classmethod
54+
def _logger(cls) -> octobot_commons.logging.BotLogger:
55+
return octobot_commons.logging.get_logger(cls.__name__)
Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,86 @@
1-
import octobot.community
1+
import starfish_sdk
2+
23

34
import octobot_flow.entities
45
import octobot_flow.repositories.community.trading_signals_channel as trading_signals_channel
6+
import octobot_flow.repositories.community.community_repository as community_repository
7+
8+
9+
FETCH_SIGNALS_SEGMENT = "latest"
10+
11+
12+
def _sync_signals_path(sync_kind: str, strategy_id: str, segment: str) -> str:
13+
return f"/v1/{sync_kind}/strategies/{strategy_id}/signals/{segment}"
14+
15+
16+
def _signals_pull_and_push_paths(strategy_id: str, segment: str) -> tuple[str, str]:
17+
return (
18+
_sync_signals_path("pull", strategy_id, segment),
19+
_sync_signals_path("push", strategy_id, segment),
20+
)
521

622

7-
class TradingSignalsRepository:
8-
def __init__(self, authenticator: octobot.community.CommunityAuthentication):
9-
self.authenticator: octobot.community.CommunityAuthentication = authenticator
23+
def _signals_paths_for_upload(strategy_id: str, account_updated_at: float) -> tuple[str, str]:
24+
return _signals_pull_and_push_paths(strategy_id, str(account_updated_at))
1025

26+
27+
def _pull_path_for_fetch(strategy_id: str) -> str:
28+
return _sync_signals_path("pull", strategy_id, FETCH_SIGNALS_SEGMENT)
29+
30+
31+
def _trim_historical_snapshots_if_needed(
32+
trading_signal: octobot_flow.entities.TradingSignal,
33+
history_size: int,
34+
) -> None:
35+
account = trading_signal.account
36+
if not account.historical_snapshots or len(account.historical_snapshots) <= history_size:
37+
return
38+
account.historical_snapshots = account.historical_snapshots[:history_size]
39+
40+
41+
class TradingSignalsRepository(community_repository.CommunityRepository):
1142
async def insert_trading_signal(self, trading_signal: octobot_flow.entities.TradingSignal):
1243
await trading_signals_channel.send_internal_trading_signal(trading_signal)
44+
await self._upload_trading_signal(trading_signal)
45+
46+
async def fetch_trading_signals(
47+
self,
48+
strategy_ids: list[str],
49+
history_size: int,
50+
) -> list[octobot_flow.entities.TradingSignal]:
51+
client = self._get_sync_client()
52+
trading_signals: list[octobot_flow.entities.TradingSignal] = []
53+
for strategy_identifier in strategy_ids:
54+
try:
55+
pull_path = _pull_path_for_fetch(strategy_identifier)
56+
pull_result = await client.pull(pull_path, checkpoint=None)
57+
trading_signal = octobot_flow.entities.TradingSignal.from_dict(pull_result.data)
58+
_trim_historical_snapshots_if_needed(trading_signal, history_size)
59+
trading_signals.append(trading_signal)
60+
except Exception as strategy_error:
61+
self._logger().exception(
62+
strategy_error,
63+
True,
64+
f"Failed to fetch trading signals for strategy {strategy_identifier!r}: {strategy_error}",
65+
)
66+
return trading_signals
1367

14-
async def fetch_trading_signals(self, strategy_ids: list[str], history_size: int) -> list[octobot_flow.entities.TradingSignal]:
15-
raise NotImplementedError("TODO: fetch_trading_signals")
68+
async def _upload_trading_signal(
69+
self,
70+
trading_signal: octobot_flow.entities.TradingSignal,
71+
):
72+
pull_path, push_path = _signals_paths_for_upload(
73+
trading_signal.strategy_id,
74+
trading_signal.account.updated_at,
75+
)
76+
manager = starfish_sdk.SyncManager(
77+
client=self._get_sync_client(),
78+
pull_path=pull_path,
79+
push_path=push_path,
80+
sign_data=self.authenticator._sync_data_signer,
81+
)
82+
payload = trading_signal.to_dict()
83+
try:
84+
await manager.push(payload)
85+
except Exception as upload_error:
86+
self._logger().exception(upload_error, True, f"Failed to upload trading signal: {upload_error}")

packages/flow/tests/functionnal_tests/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def set_emit_signals_metadata(automation_state: dict, emit_signals: bool) -> Non
4343

4444

4545
@contextlib.contextmanager
46-
def trading_signal_emission_patches(emit_signals: bool):
46+
def trading_signal_emission_patches(emit_signals: bool, *, mock_authenticator: bool = True):
4747
with contextlib.ExitStack() as stack:
4848
insert_mock = stack.enter_context(
4949
mock.patch.object(
@@ -52,7 +52,7 @@ def trading_signal_emission_patches(emit_signals: bool):
5252
mock.AsyncMock(),
5353
)
5454
)
55-
if emit_signals:
55+
if emit_signals and mock_authenticator:
5656

5757
@contextlib.asynccontextmanager
5858
async def _fake_maybe_authenticator(self):

packages/flow/tests/functionnal_tests/trading_modes_actions/simulator/test_grid_trading_mode_action.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
_FIXED_BTC_USDC_CLOSE = 100000.0
4747
GRID_REFERENCE_LOWEST_BUY = _FIXED_BTC_USDC_CLOSE - (spread / 2) - increment * 2 + 12.12
4848

49+
# Placeholder passphrase for community `UserAuthentication` (not a production secret; tests mock Starfish paths).
50+
SIMULATOR_COPY_GRID_FUNCTIONAL_TEST_COMMUNITY_PASSPHRASE = "simulator-copy-grid-functional-test"
51+
4952
grid_pair_settings = [
5053
grid_trading.GridTradingMode.get_default_pair_config(
5154
"BTC/USDC",
@@ -726,6 +729,9 @@ async def test_simulator_copy_grid(
726729
reference_account until a fetched trading signal fills the DSL; otherwise the reference account
727730
is embedded in the action from the start.
728731
"""
732+
community_auth_details = octobot_flow.entities.UserAuthentication(
733+
passphrase=SIMULATOR_COPY_GRID_FUNCTIONAL_TEST_COMMUNITY_PASSPHRASE,
734+
)
729735
patched_fetch_tickers = tickers_repository_fetch_tickers_btc_usdc_close_override(
730736
lambda: _FIXED_BTC_USDC_CLOSE
731737
)
@@ -762,20 +768,9 @@ async def test_simulator_copy_grid(
762768
fetch_trading_signals_mock,
763769
)
764770
)
765-
if start_as_uninitialized_copy_keyword and not emit_signals:
766-
767-
@contextlib.asynccontextmanager
768-
async def _fake_maybe_authenticator(self):
769-
yield mock.MagicMock()
770-
771-
patch_stack.enter_context(
772-
mock.patch.object(
773-
octobot_flow.AutomationJob,
774-
"_maybe_authenticator",
775-
_fake_maybe_authenticator,
776-
)
777-
)
778-
insert_trading_signal_mock = patch_stack.enter_context(trading_signal_emission_patches(emit_signals))
771+
insert_trading_signal_mock = patch_stack.enter_context(
772+
trading_signal_emission_patches(emit_signals, mock_authenticator=False)
773+
)
779774

780775
reference_market = init_action["config"]["exchange_account_details"]["portfolio"]["unit"]
781776
if start_as_uninitialized_copy_keyword:
@@ -790,7 +785,9 @@ async def _fake_maybe_authenticator(self):
790785
set_emit_signals_metadata(automation_state, emit_signals)
791786

792787
# 1. run init action
793-
async with octobot_flow.AutomationJob(automation_state, [], [], {}) as automation_job:
788+
async with octobot_flow.AutomationJob(
789+
automation_state, [], [], community_auth_details
790+
) as automation_job:
794791
await automation_job.run()
795792
after_init_execution_dump = automation_job.dump()
796793

@@ -808,7 +805,9 @@ async def _fake_maybe_authenticator(self):
808805
assert action.previous_execution_result is None
809806

810807
# 2. run copy exchange account action (rebalance + mirror reference grid orders)
811-
async with octobot_flow.AutomationJob(after_init_execution_dump, [], [], {}) as automation_job:
808+
async with octobot_flow.AutomationJob(
809+
after_init_execution_dump, [], [], community_auth_details
810+
) as automation_job:
812811
await automation_job.run()
813812
after_initial_copy_execution_dump = automation_job.dump()
814813
assert len(automation_job.automation_state.automation.actions_dag.actions) == len(all_actions)
@@ -902,7 +901,9 @@ async def _fake_maybe_authenticator(self):
902901
assert d_order_price(sell_orders[1][price_col]) == lowest_buy_price + D_INCREMENT + D_SPREAD + D_INCREMENT
903902

904903
# 3. trigger again: portfolio and mirrored grid should be unchanged
905-
async with octobot_flow.AutomationJob(after_initial_copy_execution_dump, [], [], {}) as automation_job:
904+
async with octobot_flow.AutomationJob(
905+
after_initial_copy_execution_dump, [], [], community_auth_details
906+
) as automation_job:
906907
await automation_job.run()
907908
after_second_call_execution_dump = automation_job.dump()
908909
assert len(automation_job.automation_state.automation.actions_dag.actions) == len(all_actions)

0 commit comments

Comments
 (0)