Skip to content

Commit 4cbf440

Browse files
committed
[Flow] add signals pull & push
1 parent 202cde0 commit 4cbf440

14 files changed

Lines changed: 209 additions & 59 deletions

File tree

octobot/community/authentication.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,12 @@ def decrypt_node_wallet(self, passphrase: str):
656656

657657
def verify_node_passphrase(self, passphrase: str) -> bool:
658658
return self._wallet_backend.verify_node_passphrase(passphrase)
659+
660+
@property
661+
def sync_client(self) -> sync_client.StarfishClient:
662+
if self._sync_client is None:
663+
raise errors.SyncClientNotInitializedError("Sync client is not initialized")
664+
return self._sync_client
659665

660666
def init_sync_client(self):
661667
if self._sync_client is not None:

octobot/community/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class JWTExpiredError(commons_authentication.AuthenticationError):
3232
pass
3333

3434

35+
class SyncClientNotInitializedError(commons_authentication.AuthenticationError):
36+
pass
37+
38+
3539
class BotError(commons_authentication.UnavailableError):
3640
pass
3741

octobot/community/local_authenticator.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,45 @@ def get_stateless_configuration() -> octobot_commons.configuration.Configuration
1616

1717
@contextlib.asynccontextmanager
1818
async def local_user_authenticator(
19-
email: str,
20-
hidden: bool,
19+
email: typing.Optional[str] = None,
20+
hidden: typing.Optional[bool] = None,
2121
backend_url: typing.Optional[str] = None,
2222
password: typing.Optional[str] = None,
2323
auth_key: typing.Optional[str] = None,
2424
) -> typing.AsyncGenerator["community.CommunityAuthentication", None]:
25-
if not email:
26-
raise ValueError("email is required")
2725
community.IdentifiersProvider.use_production()
2826
local_instance = None
2927
configuration = get_stateless_configuration()
28+
authenticate = password or auth_key
29+
if authenticate and not email:
30+
raise ValueError("email is required when authenticating with password or auth_key")
3031
try:
3132
local_instance = community.CommunityAuthentication(
3233
config=configuration, backend_url=backend_url, use_as_singleton=False
3334
)
3435
local_instance.supabase_client.is_admin = False
35-
local_instance.silent_auth = hidden
36+
local_instance.silent_auth = False if hidden is None else hidden
3637
if auth_key:
3738
password_value = None
3839
auth_key_value = auth_key
3940
else:
4041
password_value = password
4142
auth_key_value = None
42-
await local_instance.login(
43-
email, password_value, password_token=None, auth_key=auth_key_value, minimal=True
44-
)
45-
common_logging.get_logger("local_community_user_authenticator").info(
46-
f"Authenticated as {email[:3]}[...]{email[-4:]}"
47-
)
43+
if authenticate:
44+
email = typing.cast(str, email) # email is always str here
45+
await local_instance.login(
46+
email, password_value, password_token=None, auth_key=auth_key_value, minimal=True
47+
)
48+
auth_logger = common_logging.get_logger("local_community_user_authenticator")
49+
if len(email) > 7:
50+
auth_logger.info(f"Authenticated as {email[:3]}[...]{email[-4:]}")
51+
else:
52+
auth_logger.info("Authenticated as local community user")
4853
yield local_instance
4954
finally:
5055
if local_instance is not None:
51-
await local_instance.logout()
56+
if authenticate:
57+
await local_instance.logout()
5258
await local_instance.stop()
5359

5460

packages/flow/octobot_flow/entities/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from octobot_flow.entities.community import (
3434
UserAuthentication,
3535
TradingSignal,
36+
register_passphrase,
3637
)
3738
__all__ = [
3839
"AccountElements",
@@ -63,4 +64,5 @@
6364
"AdditionalActions",
6465
"UserAuthentication",
6566
"TradingSignal",
67+
"register_passphrase",
6668
]
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
from octobot_flow.entities.community.user_authentication import UserAuthentication
1+
from octobot_flow.entities.community.user_authentication import (
2+
UserAuthentication,
3+
register_passphrase,
4+
)
25
from octobot_flow.entities.community.trading_signal import TradingSignal
36

47
__all__ = [
58
"UserAuthentication",
69
"TradingSignal",
10+
"register_passphrase",
711
]

packages/flow/octobot_flow/entities/community/user_authentication.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
import octobot_commons.dataclasses
44

55

6+
_PASSPHRASES_BY_PUBLIC_KEY: dict[str, str] = {}
7+
8+
9+
def register_passphrase(public_key: str, passphrase: str) -> None:
10+
_PASSPHRASES_BY_PUBLIC_KEY[public_key] = passphrase
11+
12+
613
@dataclasses.dataclass
714
class UserAuthentication(octobot_commons.dataclasses.FlexibleDataclass):
815
email: typing.Optional[str] = None
@@ -11,6 +18,13 @@ class UserAuthentication(octobot_commons.dataclasses.FlexibleDataclass):
1118
user_id: typing.Optional[str] = None
1219
auth_key: typing.Optional[str] = None
1320
encrypted_keys_by_exchange: dict[str, str] = dataclasses.field(default_factory=dict)
21+
public_key: typing.Optional[str] = None
1422

1523
def has_auth_details(self) -> bool:
16-
return bool(self.password or self.auth_key)
24+
return bool(self.password or self.auth_key or self.public_key)
25+
26+
@property
27+
def passphrase(self) -> typing.Optional[str]:
28+
if self.public_key is None:
29+
return None
30+
return _PASSPHRASES_BY_PUBLIC_KEY.get(self.public_key)

packages/flow/octobot_flow/jobs/automation_job.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ async def run(self) -> list[octobot_flow.entities.AbstractActionDetails]:
6464
executed_actions = []
6565
async with self._maybe_authenticator() as maybe_authenticator:
6666
maybe_community_repository = (
67-
octobot_flow.repositories.community.CommunityRepository(maybe_authenticator)
67+
octobot_flow.repositories.community.CommunityRepository(
68+
maybe_authenticator,
69+
passphrase=self.auth_details.passphrase,
70+
)
6871
if maybe_authenticator else None
6972
)
7073
with octobot_flow.encryption.decrypted_bots_configurations(self.automation_state):
@@ -311,9 +314,7 @@ async def _init_all_required_copy_trading_data(
311314
raise octobot_flow.errors.CommunityTradingSignalError(
312315
"Community authentication is required to fetch copy trading signals"
313316
)
314-
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository(
315-
maybe_community_repository.authenticator
316-
)
317+
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository.from_community_repository(maybe_community_repository)
317318
self._logger.info(f"Fetching copy trading signals for {to_fetch_signals} strategies")
318319
trading_signals = await trading_signals_repository.fetch_trading_signals(
319320
to_fetch_signals,
@@ -387,8 +388,8 @@ async def _emit_trading_signals(
387388
fetched_dependencies.fetched_exchange_data,
388389
reference_market
389390
)
390-
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository(
391-
maybe_community_repository.authenticator
391+
trading_signals_repository = octobot_flow.repositories.community.TradingSignalsRepository.from_community_repository(
392+
maybe_community_repository
392393
)
393394
await trading_signals_repository.insert_trading_signal(
394395
octobot_flow.entities.TradingSignal(

packages/flow/octobot_flow/repositories/community/authenticator_factory.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ def enable_community_authentication(self) -> bool:
2222

2323
@contextlib.asynccontextmanager
2424
async def local_authenticator(self) -> typing.AsyncGenerator[community.CommunityAuthentication, None]:
25-
if not self.auth_details.email:
26-
raise ValueError("auth_details.email is required")
2725
async with local_community_auth.local_user_authenticator(
2826
email=self.auth_details.email,
2927
hidden=self.auth_details.hidden,

packages/flow/octobot_flow/repositories/community/community_repository.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
import contextlib
22
import asyncio
3+
import typing
4+
import starfish_sdk
5+
6+
import octobot_commons.logging
37

48
import octobot.community
9+
import octobot.community.errors as community_errors
510

611
import octobot_flow.entities
12+
import octobot_flow.errors
713

814

915
class CommunityRepository:
10-
def __init__(self, authenticator: octobot.community.CommunityAuthentication):
16+
def __init__(self, authenticator: octobot.community.CommunityAuthentication, passphrase: typing.Optional[str] = None):
1117
self.authenticator: octobot.community.CommunityAuthentication = authenticator
18+
self.passphrase: typing.Optional[str] = passphrase
19+
20+
@classmethod
21+
def from_community_repository(cls, other_repository: "CommunityRepository") -> typing.Self:
22+
return cls(other_repository.authenticator, other_repository.passphrase)
1223

1324
async def insert_bot_logs(self, log_data: list[octobot.community.BotLogData]):
1425
await asyncio.gather(
@@ -30,3 +41,15 @@ def automation_context(self, automation: octobot_flow.entities.AutomationDetails
3041
yield
3142
finally:
3243
self.authenticator.user_account.bot_id = previous_bot_id # type: ignore
44+
45+
def _get_sync_client(self) -> starfish_sdk.StarfishClient:
46+
try:
47+
return self.authenticator.sync_client
48+
except community_errors.SyncClientNotInitializedError as e:
49+
raise octobot_flow.errors.CommunityTradingSignalError(
50+
"Starfish sync client is unavailable (no sync server configured or wallet passphrase required)"
51+
) from e
52+
53+
@classmethod
54+
def _logger(cls) -> octobot_commons.logging.BotLogger:
55+
return octobot_commons.logging.get_logger(cls.__name__)
Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,86 @@
1-
import octobot.community
1+
import starfish_sdk
2+
23

34
import octobot_flow.entities
45
import octobot_flow.repositories.community.trading_signals_channel as trading_signals_channel
6+
import octobot_flow.repositories.community.community_repository as community_repository
7+
8+
9+
FETCH_SIGNALS_SEGMENT = "latest"
10+
11+
12+
def _sync_signals_path(sync_kind: str, strategy_id: str, segment: str) -> str:
13+
return f"/v1/{sync_kind}/strategies/{strategy_id}/signals/{segment}"
14+
15+
16+
def _signals_pull_and_push_paths(strategy_id: str, segment: str) -> tuple[str, str]:
17+
return (
18+
_sync_signals_path("pull", strategy_id, segment),
19+
_sync_signals_path("push", strategy_id, segment),
20+
)
521

622

7-
class TradingSignalsRepository:
8-
def __init__(self, authenticator: octobot.community.CommunityAuthentication):
9-
self.authenticator: octobot.community.CommunityAuthentication = authenticator
23+
def _signals_paths_for_upload(strategy_id: str, account_updated_at: float) -> tuple[str, str]:
24+
return _signals_pull_and_push_paths(strategy_id, str(account_updated_at))
1025

26+
27+
def _pull_path_for_fetch(strategy_id: str) -> str:
28+
return _sync_signals_path("pull", strategy_id, FETCH_SIGNALS_SEGMENT)
29+
30+
31+
def _trim_historical_snapshots_if_needed(
32+
trading_signal: octobot_flow.entities.TradingSignal,
33+
history_size: int,
34+
) -> None:
35+
account = trading_signal.account
36+
if not account.historical_snapshots or len(account.historical_snapshots) <= history_size:
37+
return
38+
account.historical_snapshots = account.historical_snapshots[:history_size]
39+
40+
41+
class TradingSignalsRepository(community_repository.CommunityRepository):
1142
async def insert_trading_signal(self, trading_signal: octobot_flow.entities.TradingSignal):
1243
await trading_signals_channel.send_internal_trading_signal(trading_signal)
44+
await self._upload_trading_signal(trading_signal)
45+
46+
async def fetch_trading_signals(
47+
self,
48+
strategy_ids: list[str],
49+
history_size: int,
50+
) -> list[octobot_flow.entities.TradingSignal]:
51+
client = self._get_sync_client()
52+
trading_signals: list[octobot_flow.entities.TradingSignal] = []
53+
for strategy_identifier in strategy_ids:
54+
try:
55+
pull_path = _pull_path_for_fetch(strategy_identifier)
56+
pull_result = await client.pull(pull_path, checkpoint=None)
57+
trading_signal = octobot_flow.entities.TradingSignal.from_dict(pull_result.data)
58+
_trim_historical_snapshots_if_needed(trading_signal, history_size)
59+
trading_signals.append(trading_signal)
60+
except Exception as strategy_error:
61+
self._logger().exception(
62+
strategy_error,
63+
True,
64+
f"Failed to fetch trading signals for strategy {strategy_identifier!r}: {strategy_error}",
65+
)
66+
return trading_signals
1367

14-
async def fetch_trading_signals(self, strategy_ids: list[str], history_size: int) -> list[octobot_flow.entities.TradingSignal]:
15-
raise NotImplementedError("TODO: fetch_trading_signals")
68+
async def _upload_trading_signal(
69+
self,
70+
trading_signal: octobot_flow.entities.TradingSignal,
71+
):
72+
pull_path, push_path = _signals_paths_for_upload(
73+
trading_signal.strategy_id,
74+
trading_signal.account.updated_at,
75+
)
76+
manager = starfish_sdk.SyncManager(
77+
client=self._get_sync_client(),
78+
pull_path=pull_path,
79+
push_path=push_path,
80+
sign_data=self.authenticator._sync_data_signer,
81+
)
82+
payload = trading_signal.to_dict()
83+
try:
84+
await manager.push(payload)
85+
except Exception as upload_error:
86+
self._logger().exception(upload_error, True, f"Failed to upload trading signal: {upload_error}")

0 commit comments

Comments
 (0)