|
1 | | -import octobot.community |
| 1 | +import starfish_sdk |
| 2 | + |
2 | 3 |
|
3 | 4 | import octobot_flow.entities |
4 | 5 | import octobot_flow.repositories.community.trading_signals_channel as trading_signals_channel |
| 6 | +import octobot_flow.repositories.community.community_repository as community_repository |
| 7 | + |
| 8 | + |
| 9 | +FETCH_SIGNALS_SEGMENT = "latest" |
| 10 | + |
| 11 | + |
| 12 | +def _sync_signals_path(sync_kind: str, strategy_id: str, segment: str) -> str: |
| 13 | + return f"/v1/{sync_kind}/strategies/{strategy_id}/signals/{segment}" |
| 14 | + |
| 15 | + |
| 16 | +def _signals_pull_and_push_paths(strategy_id: str, segment: str) -> tuple[str, str]: |
| 17 | + return ( |
| 18 | + _sync_signals_path("pull", strategy_id, segment), |
| 19 | + _sync_signals_path("push", strategy_id, segment), |
| 20 | + ) |
5 | 21 |
|
6 | 22 |
|
7 | | -class TradingSignalsRepository: |
8 | | - def __init__(self, authenticator: octobot.community.CommunityAuthentication): |
9 | | - self.authenticator: octobot.community.CommunityAuthentication = authenticator |
| 23 | +def _signals_paths_for_upload(strategy_id: str, account_updated_at: float) -> tuple[str, str]: |
| 24 | + return _signals_pull_and_push_paths(strategy_id, str(account_updated_at)) |
10 | 25 |
|
| 26 | + |
| 27 | +def _pull_path_for_fetch(strategy_id: str) -> str: |
| 28 | + return _sync_signals_path("pull", strategy_id, FETCH_SIGNALS_SEGMENT) |
| 29 | + |
| 30 | + |
| 31 | +def _trim_historical_snapshots_if_needed( |
| 32 | + trading_signal: octobot_flow.entities.TradingSignal, |
| 33 | + history_size: int, |
| 34 | +) -> None: |
| 35 | + account = trading_signal.account |
| 36 | + if not account.historical_snapshots or len(account.historical_snapshots) <= history_size: |
| 37 | + return |
| 38 | + account.historical_snapshots = account.historical_snapshots[:history_size] |
| 39 | + |
| 40 | + |
| 41 | +class TradingSignalsRepository(community_repository.CommunityRepository): |
11 | 42 | async def insert_trading_signal(self, trading_signal: octobot_flow.entities.TradingSignal): |
12 | 43 | await trading_signals_channel.send_internal_trading_signal(trading_signal) |
| 44 | + await self._upload_trading_signal(trading_signal) |
| 45 | + |
| 46 | + async def fetch_trading_signals( |
| 47 | + self, |
| 48 | + strategy_ids: list[str], |
| 49 | + history_size: int, |
| 50 | + ) -> list[octobot_flow.entities.TradingSignal]: |
| 51 | + client = self._get_sync_client() |
| 52 | + trading_signals: list[octobot_flow.entities.TradingSignal] = [] |
| 53 | + for strategy_identifier in strategy_ids: |
| 54 | + try: |
| 55 | + pull_path = _pull_path_for_fetch(strategy_identifier) |
| 56 | + pull_result = await client.pull(pull_path, checkpoint=None) |
| 57 | + trading_signal = octobot_flow.entities.TradingSignal.from_dict(pull_result.data) |
| 58 | + _trim_historical_snapshots_if_needed(trading_signal, history_size) |
| 59 | + trading_signals.append(trading_signal) |
| 60 | + except Exception as strategy_error: |
| 61 | + self._logger().exception( |
| 62 | + strategy_error, |
| 63 | + True, |
| 64 | + f"Failed to fetch trading signals for strategy {strategy_identifier!r}: {strategy_error}", |
| 65 | + ) |
| 66 | + return trading_signals |
13 | 67 |
|
14 | | - async def fetch_trading_signals(self, strategy_ids: list[str], history_size: int) -> list[octobot_flow.entities.TradingSignal]: |
15 | | - raise NotImplementedError("TODO: fetch_trading_signals") |
| 68 | + async def _upload_trading_signal( |
| 69 | + self, |
| 70 | + trading_signal: octobot_flow.entities.TradingSignal, |
| 71 | + ): |
| 72 | + pull_path, push_path = _signals_paths_for_upload( |
| 73 | + trading_signal.strategy_id, |
| 74 | + trading_signal.account.updated_at, |
| 75 | + ) |
| 76 | + manager = starfish_sdk.SyncManager( |
| 77 | + client=self._get_sync_client(), |
| 78 | + pull_path=pull_path, |
| 79 | + push_path=push_path, |
| 80 | + sign_data=self.authenticator._sync_data_signer, |
| 81 | + ) |
| 82 | + payload = trading_signal.to_dict() |
| 83 | + try: |
| 84 | + await manager.push(payload) |
| 85 | + except Exception as upload_error: |
| 86 | + self._logger().exception(upload_error, True, f"Failed to upload trading signal: {upload_error}") |
0 commit comments