diff --git a/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py b/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py index 6c400629..ec0089a9 100644 --- a/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py +++ b/bots/controllers/generic/lp_rebalancer/lp_rebalancer.py @@ -2,54 +2,75 @@ from decimal import Decimal from typing import List, Optional -from hummingbot.core.data_type.common import MarketDict +from pydantic import Field, field_validator, model_validator + +from hummingbot.core.data_type.common import MarketDict, TradeType from hummingbot.core.utils.async_utils import safe_ensure_future from hummingbot.data_feed.candles_feed.data_types import CandlesConfig from hummingbot.logger import HummingbotLogger from hummingbot.strategy_v2.controllers import ControllerBase, ControllerConfigBase from hummingbot.strategy_v2.executors.data_types import ConnectorPair -from hummingbot.strategy_v2.executors.lp_executor.data_types import LPExecutorConfig, LPExecutorStates -from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction +from hummingbot.strategy_v2.executors.gateway_utils import parse_provider +from hummingbot.strategy_v2.executors.lp_executor.data_types import LPExecutorConfig +from hummingbot.strategy_v2.executors.order_executor.data_types import ExecutionStrategy, OrderExecutorConfig +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction +from hummingbot.strategy_v2.models.executors import CloseType from hummingbot.strategy_v2.models.executors_info import ExecutorInfo -from pydantic import Field, field_validator, model_validator class LPRebalancerConfig(ControllerConfigBase): """ Configuration for LP Rebalancer Controller. - Uses total_amount_quote and side for position sizing. - Implements KEEP vs REBALANCE logic based on price limits. + This controller uses LP executor's upper_limit_price/lower_limit_price feature + to automatically close positions when price exceeds thresholds, eliminating + manual rebalancing monitoring. + + Key features: + - No rebalance_seconds timer - uses rebalance_threshold_pct to set executor limit prices + - LP executor auto-closes when price exceeds limits + - Controller just monitors for completion and re-opens if within price bounds + - Uses keep_position=True - controller handles position tracking via position_hold + + Provider Architecture: + - connector_name: The network identifier (e.g., "solana-mainnet-beta") + - lp_provider: LP provider in format "dex/trading_type" (e.g., "meteora/clmm") + - autoswap uses network's configured swapProvider (via Gateway) """ controller_type: str = "generic" controller_name: str = "lp_rebalancer" candles_config: List[CandlesConfig] = [] + # Network connector - e.g., "solana-mainnet-beta" + connector_name: str = "solana-mainnet-beta" + + # LP provider (required) - format: "dex/trading_type" + # Examples: "meteora/clmm", "orca/clmm", "raydium/clmm" + lp_provider: str = "orca/clmm" + # Pool configuration (required) - connector_name: str = "meteora/clmm" - network: str = "solana-mainnet-beta" - trading_pair: str = "" - pool_address: str = "" + trading_pair: str + pool_address: str # Position parameters total_amount_quote: Decimal = Field(default=Decimal("50"), json_schema_extra={"is_updatable": True}) - side: int = Field(default=1, json_schema_extra={"is_updatable": True}) # 0=BOTH, 1=BUY, 2=SELL + side: TradeType = Field(default=TradeType.BUY, json_schema_extra={"is_updatable": True}) # BUY, SELL, or RANGE position_width_pct: Decimal = Field(default=Decimal("0.5"), json_schema_extra={"is_updatable": True}) position_offset_pct: Decimal = Field( default=Decimal("0.01"), json_schema_extra={"is_updatable": True}, - description="Offset from current price to ensure single-sided positions start out-of-range (e.g., 0.1 = 0.1%)" + description="Offset from current price. Positive = out-of-range (single-sided). Negative = in-range (needs both tokens, autoswap will convert |offset|%)" ) - # Rebalancing - rebalance_seconds: int = Field(default=60, json_schema_extra={"is_updatable": True}) + # Rebalance threshold - used to set LP executor's limit prices + # When price moves this % beyond position bounds, executor auto-closes rebalance_threshold_pct: Decimal = Field( - default=Decimal("0.1"), + default=Decimal("1"), json_schema_extra={"is_updatable": True}, - description="Price must be this % out of range before rebalance timer starts (e.g., 0.1 = 0.1%, 2 = 2%)" + description="Price threshold % beyond position bounds that triggers auto-close (e.g., 1 = 1%)" ) - # Price limits - overlapping grids for sell and buy ranges + # Price limits - controller-level limits for deciding whether to re-open # Sell range: [sell_price_min, sell_price_max] # Buy range: [buy_price_min, buy_price_max] sell_price_max: Optional[Decimal] = Field(default=None, json_schema_extra={"is_updatable": True}) @@ -60,6 +81,18 @@ class LPRebalancerConfig(ControllerConfigBase): # Connector-specific params (optional) strategy_type: Optional[int] = Field(default=None, json_schema_extra={"is_updatable": True}) + # Auto-swap feature: swap tokens if balance insufficient for position + autoswap: bool = Field( + default=False, + json_schema_extra={"is_updatable": True}, + description="Automatically swap tokens if balance is insufficient for position. Uses network's swapProvider." + ) + swap_buffer_pct: Decimal = Field( + default=Decimal("0.01"), + json_schema_extra={"is_updatable": True}, + description="Extra % to swap beyond deficit to account for slippage (e.g., 0.01 = 0.01%)" + ) + @field_validator("sell_price_min", "sell_price_max", "buy_price_min", "buy_price_max", mode="before") @classmethod def validate_price_limits(cls, v): @@ -71,11 +104,27 @@ def validate_price_limits(cls, v): @field_validator("side", mode="before") @classmethod def validate_side(cls, v): - """Validate side is 0, 1, or 2.""" - v = int(v) - if v not in (0, 1, 2): - raise ValueError("side must be 0 (BOTH), 1 (BUY), or 2 (SELL)") - return v + """Validate and convert side to TradeType enum.""" + if isinstance(v, TradeType): + return v + if isinstance(v, str): + v = v.upper() + if v in ("BUY", "1"): + return TradeType.BUY + elif v in ("SELL", "2"): + return TradeType.SELL + elif v in ("RANGE", "3"): + return TradeType.RANGE + raise ValueError(f"Invalid side '{v}'. Must be BUY, SELL, or RANGE") + if isinstance(v, int): + if v == 1: + return TradeType.BUY + elif v == 2: + return TradeType.SELL + elif v == 3: + return TradeType.RANGE + raise ValueError(f"Invalid side {v}. Must be 1 (BUY), 2 (SELL), or 3 (RANGE)") + raise ValueError(f"Invalid side type {type(v)}. Must be TradeType, str, or int") @model_validator(mode="after") def validate_price_limit_ranges(self): @@ -86,22 +135,30 @@ def validate_price_limit_ranges(self): if self.sell_price_max is not None and self.sell_price_min is not None: if self.sell_price_max < self.sell_price_min: raise ValueError("sell_price_max must be >= sell_price_min") + # For negative offset (in-range), offset magnitude must not exceed width + if self.position_offset_pct < 0: + if abs(self.position_offset_pct) > self.position_width_pct: + raise ValueError( + f"For in-range positions, |position_offset_pct| ({abs(self.position_offset_pct)}) " + f"must not exceed position_width_pct ({self.position_width_pct})" + ) return self def update_markets(self, markets: MarketDict) -> MarketDict: """Register the LP connector with trading pair""" - return markets.add_or_update(self.connector_name, self.trading_pair) + markets = markets.add_or_update(self.connector_name, self.trading_pair) + return markets class LPRebalancer(ControllerBase): """ - Controller for LP position management with smart rebalancing. + Controller for LP position management using executor-level auto-close. Key features: - - Uses total_amount_quote for all positions (initial and rebalance) - - Derives rebalance side from price vs last executor's range - - KEEP position when already at limit, REBALANCE when not - - Validates bounds before creating positions + - Uses LP executor's upper_limit_price/lower_limit_price for auto-closing + - No manual rebalancing timer - executor handles position close + - Controller monitors for completion and re-opens within price limits + - Uses keep_position=True for position tracking via position_hold """ _logger: Optional[HummingbotLogger] = None @@ -116,34 +173,47 @@ def __init__(self, config: LPRebalancerConfig, *args, **kwargs): super().__init__(config, *args, **kwargs) self.config: LPRebalancerConfig = config + # Parse lp_provider into dex_name and trading_type for gateway calls + self.lp_dex_name, self.lp_trading_type = parse_provider( + config.lp_provider, default_trading_type="clmm" + ) + # Parse token symbols from trading pair parts = config.trading_pair.split("-") self._base_token: str = parts[0] if len(parts) >= 2 else "" self._quote_token: str = parts[1] if len(parts) >= 2 else "" - # Rebalance tracking - self._pending_rebalance: bool = False - self._pending_rebalance_side: Optional[int] = None # Side for pending rebalance - # Track the executor we created self._current_executor_id: Optional[str] = None - # Track amounts from last closed position (for rebalance sizing) + # Track amounts from last closed position (for autoswap sizing) self._last_closed_base_amount: Optional[Decimal] = None self._last_closed_quote_amount: Optional[Decimal] = None self._last_closed_base_fee: Optional[Decimal] = None self._last_closed_quote_fee: Optional[Decimal] = None - # Track initial balances for comparison + # Track initial balances for comparison (wallet balance at controller start) self._initial_base_balance: Optional[Decimal] = None self._initial_quote_balance: Optional[Decimal] = None + # Position hold: cumulative net position from closed LP executors + # Tracks net change = (returned + fees) - initial_deposited + self._position_hold_base: Decimal = Decimal("0") + self._position_hold_quote: Decimal = Decimal("0") + # Flag to trigger balance update after position creation self._pending_balance_update: bool = False # Cached pool price (updated in update_processed_data) self._pool_price: Optional[Decimal] = None + # Order executor tracking (for autoswap feature) + self._swap_executor_id: Optional[str] = None + self._pending_swap_side: Optional[int] = None # LP side to create after swap completes + + # Track if initial position has been created (after that, always use side 1 or 2) + self._initial_position_created: bool = False + # Initialize rate sources self.market_data_provider.initialize_rate_sources([ ConnectorPair( @@ -153,8 +223,9 @@ def __init__(self, config: LPRebalancerConfig, *args, **kwargs): ]) def active_executor(self) -> Optional[ExecutorInfo]: - """Get current active executor (should be 0 or 1)""" - active = [e for e in self.executors_info if e.is_active] + """Get current active LP executor (should be 0 or 1)""" + active = [e for e in self.executors_info + if e.is_active and getattr(e.config, "type", None) == "lp_executor"] return active[0] if active else None def get_tracked_executor(self) -> Optional[ExecutorInfo]: @@ -176,6 +247,140 @@ def is_tracked_executor_terminated(self) -> bool: return True return executor.status == RunnableStatus.TERMINATED + def get_swap_executor(self) -> Optional[ExecutorInfo]: + """Get the order executor we're tracking for autoswap""" + if not self._swap_executor_id: + return None + for e in self.executors_info: + if e.id == self._swap_executor_id: + return e + return None + + def is_swap_executor_done(self) -> bool: + """Check if order executor has completed (success or failure)""" + if not self._swap_executor_id: + return True + swap_executor = self.get_swap_executor() + if swap_executor is None: + return True + return swap_executor.is_done + + def _check_autoswap_needed(self, side: int, current_price: Decimal) -> Optional[OrderExecutorConfig]: + """ + Check if autoswap is needed and return order config if so. + + Returns OrderExecutorConfig if swap is needed, None otherwise. + Uses network's configured swapProvider via Gateway connector. + """ + if not self.config.autoswap: + return None + + # Capture closed position amounts BEFORE creating LP position + closed_base = self._last_closed_base_amount or Decimal("0") + closed_quote = self._last_closed_quote_amount or Decimal("0") + closed_base_fee = self._last_closed_base_fee or Decimal("0") + closed_quote_fee = self._last_closed_quote_fee or Decimal("0") + + # Calculate required amounts + base_amt, quote_amt = self._calculate_amounts(side, current_price) + + # Get current wallet balances + try: + base_balance = self.market_data_provider.get_balance( + self.config.connector_name, self._base_token + ) + quote_balance = self.market_data_provider.get_balance( + self.config.connector_name, self._quote_token + ) + except Exception as e: + self.logger().warning(f"Could not fetch balances for autoswap check: {e}") + return None + + # For rebalances, add closed position amounts to available balance + if closed_base > 0 or closed_quote > 0: + base_balance += closed_base + closed_base_fee + quote_balance += closed_quote + closed_quote_fee + self.logger().info( + f"Autoswap: including closed position amounts in balance: " + f"+{closed_base + closed_base_fee:.6f} {self._base_token}, " + f"+{closed_quote + closed_quote_fee:.6f} {self._quote_token}" + ) + + # Calculate deficit from raw amounts + base_deficit = base_amt - base_balance + quote_deficit = quote_amt - quote_balance + + # Add 0.1 SOL buffer for rent and transaction fees when SOL is involved + sol_buffer = Decimal("0.1") + if self._base_token.upper() == "SOL": + base_deficit += sol_buffer + if self._quote_token.upper() == "SOL": + quote_deficit += sol_buffer + + self.logger().info( + f"Autoswap check: need base={base_amt:.6f}, have={base_balance:.6f}, deficit={base_deficit:.6f} | " + f"need quote={quote_amt:.6f}, have={quote_balance:.6f}, deficit={quote_deficit:.6f}" + ) + + # Buffer multiplier only applied to swap amount + buffer_multiplier = Decimal("1") + (self.config.swap_buffer_pct / Decimal("100")) + + # If any deficit, swap + if base_deficit > 0 and quote_deficit <= 0: + # Need more base, have enough quote - BUY base with quote + swap_amount = base_deficit * buffer_multiplier + required_quote = swap_amount * current_price * Decimal("1.02") + if quote_balance >= required_quote: + self.logger().info( + f"Autoswap: BUY {swap_amount:.6f} {self._base_token} " + f"(deficit={base_deficit:.6f} + {self.config.swap_buffer_pct}% buffer)" + ) + return OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + side=TradeType.BUY, + amount=swap_amount, + execution_strategy=ExecutionStrategy.MARKET, + ) + else: + self.logger().warning( + f"Autoswap: insufficient quote ({quote_balance:.6f}) to buy {swap_amount:.6f} base" + ) + return None + + elif quote_deficit > 0 and base_deficit <= 0: + # Need more quote, have enough base - SELL base for quote + swap_amount = (quote_deficit / current_price) * buffer_multiplier + if base_balance >= swap_amount * Decimal("1.02"): + self.logger().info( + f"Autoswap: SELL {swap_amount:.6f} {self._base_token} for ~{quote_deficit:.6f} {self._quote_token}" + ) + return OrderExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + side=TradeType.SELL, + amount=swap_amount, + execution_strategy=ExecutionStrategy.MARKET, + ) + else: + self.logger().warning( + f"Autoswap: insufficient base ({base_balance:.6f}) to sell for {quote_deficit:.6f} quote" + ) + return None + + elif base_deficit > 0 and quote_deficit > 0: + total_deficit_quote = base_deficit * current_price + quote_deficit + self.logger().warning( + f"Autoswap: cannot swap - both tokens in deficit (side=RANGE). " + f"Total deficit: {total_deficit_quote:.2f} {self._quote_token}" + ) + return None + + # No swap needed + return None + def _trigger_balance_update(self): """Trigger a balance update on the connector after position changes.""" try: @@ -187,7 +392,14 @@ def _trigger_balance_update(self): self.logger().debug(f"Could not trigger balance update: {e}") def determine_executor_actions(self) -> List[ExecutorAction]: - """Decide whether to create/stop executors""" + """ + Decide whether to create executors. + + Simplified logic: + - No active executor: check if we should create one (price within limits) + - Active executor: just wait for it to auto-close via limit prices + - No manual OUT_OF_RANGE monitoring or timer logic needed + """ # Capture initial balances on first run if self._initial_base_balance is None: try: @@ -201,6 +413,80 @@ def determine_executor_actions(self) -> List[ExecutorAction]: self.logger().debug(f"Could not capture initial balances: {e}") actions = [] + + # Handle order executor tracking and completion (for autoswap) + if self._pending_swap_side is not None: + if not self._swap_executor_id: + for e in self.executors_info: + if e.config.type == "order_executor" and e.is_active: + self._swap_executor_id = e.id + self.logger().info(f"Tracking order executor for swap: {e.id}") + break + + if not self._swap_executor_id: + self.logger().debug("Waiting for order executor to appear in executors_info") + return actions + + if self._swap_executor_id: + if not self.is_swap_executor_done(): + swap_executor = self.get_swap_executor() + self.logger().debug("Waiting for order executor to complete swap") + return actions + + # Order executor completed + swap_executor = self.get_swap_executor() + pending_side = self._pending_swap_side + + # Clear swap tracking + self._swap_executor_id = None + self._pending_swap_side = None + + # Check if swap succeeded (not FAILED close type) + swap_succeeded = swap_executor and swap_executor.close_type != CloseType.FAILED + if swap_succeeded: + self.logger().info("Autoswap completed successfully, proceeding to LP position") + self._trigger_balance_update() + + # Update position_hold with swap's inventory change + if swap_executor: + custom = swap_executor.custom_info + swap_side = custom.get("side") # TradeType enum or string + swap_side_str = swap_side.name if hasattr(swap_side, 'name') else str(swap_side) + executed_amount = Decimal(str(custom.get("executed_amount_base", 0))) + executed_price = Decimal(str(custom.get("average_executed_price", 0))) + quote_amount = executed_amount * executed_price + + if swap_side_str == "BUY": + # BUY swap: gained base, spent quote + self._position_hold_base += executed_amount + self._position_hold_quote -= quote_amount + else: + # SELL swap: spent base, gained quote + self._position_hold_base -= executed_amount + self._position_hold_quote += quote_amount + + self.logger().info( + f"Swap {swap_side_str} {executed_amount:.6f} {self._base_token} @ {executed_price:.4f}. " + f"Position hold: base={self._position_hold_base:+.6f}, quote={self._position_hold_quote:+.6f}" + ) + + if pending_side is not None: + executor_config = self._create_executor_config(pending_side) + if executor_config: + actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=executor_config + )) + self._initial_position_created = True + self._pending_balance_update = True + else: + close_type = swap_executor.close_type if swap_executor else "unknown" + self.logger().error( + f"Autoswap FAILED (close_type: {close_type}). Will retry on next cycle." + ) + + return actions + executor = self.active_executor() # Track the active executor's ID if we don't have one yet @@ -218,31 +504,106 @@ def determine_executor_actions(self) -> List[ExecutorAction]: ) return actions - # Previous executor terminated - capture final amounts for rebalance sizing + # Previous executor terminated - capture final amounts and update position_hold terminated_executor = self.get_tracked_executor() - if terminated_executor and self._pending_rebalance: - self._last_closed_base_amount = Decimal(str(terminated_executor.custom_info.get("base_amount", 0))) - self._last_closed_quote_amount = Decimal(str(terminated_executor.custom_info.get("quote_amount", 0))) - self._last_closed_base_fee = Decimal(str(terminated_executor.custom_info.get("base_fee", 0))) - self._last_closed_quote_fee = Decimal(str(terminated_executor.custom_info.get("quote_fee", 0))) - self.logger().info( - f"Captured closed position amounts: base={self._last_closed_base_amount}, " - f"quote={self._last_closed_quote_amount}, base_fee={self._last_closed_base_fee}, " - f"quote_fee={self._last_closed_quote_fee}" - ) + if terminated_executor: + # Skip position_hold update if executor failed (no tokens were actually deposited/returned) + if terminated_executor.close_type == CloseType.FAILED: + self.logger().warning( + f"Executor {terminated_executor.id} FAILED - skipping position_hold update" + ) + else: + self._last_closed_base_amount = Decimal(str(terminated_executor.custom_info.get("base_amount", 0))) + self._last_closed_quote_amount = Decimal(str(terminated_executor.custom_info.get("quote_amount", 0))) + self._last_closed_base_fee = Decimal(str(terminated_executor.custom_info.get("base_fee", 0))) + self._last_closed_quote_fee = Decimal(str(terminated_executor.custom_info.get("quote_fee", 0))) + + # Get initial amounts deposited + initial_base = Decimal(str(terminated_executor.custom_info.get("initial_base_amount", 0))) + initial_quote = Decimal(str(terminated_executor.custom_info.get("initial_quote_amount", 0))) + + # Update position_hold with NET change from this executor + # Net = (returned + fees) - initial_deposited + base_net = (self._last_closed_base_amount + self._last_closed_base_fee) - initial_base + quote_net = (self._last_closed_quote_amount + self._last_closed_quote_fee) - initial_quote + self._position_hold_base += base_net + self._position_hold_quote += quote_net + + self.logger().info( + f"Executor completed. Initial: base={initial_base}, quote={initial_quote}. " + f"Returned: base={self._last_closed_base_amount}+{self._last_closed_base_fee}, " + f"quote={self._last_closed_quote_amount}+{self._last_closed_quote_fee}. " + f"Net change: base={base_net:+}, quote={quote_net:+}. " + f"Position hold total: base={self._position_hold_base}, quote={self._position_hold_quote}" + ) + + # Check if executor FAILED - retry with same side from executor's config + executor_failed = terminated_executor and terminated_executor.close_type == CloseType.FAILED + failed_executor_side = None + if executor_failed: + failed_executor_side = terminated_executor.custom_info.get("side") + + # Capture closed position bounds for side determination (only for successful closes) + closed_lower_price = None + closed_upper_price = None + if terminated_executor and not executor_failed: + closed_lower_price = Decimal(str(terminated_executor.custom_info.get("lower_price", 0))) + closed_upper_price = Decimal(str(terminated_executor.custom_info.get("upper_price", 0))) # Clear tracking self._current_executor_id = None # Determine side for new position - if self._pending_rebalance and self._pending_rebalance_side is not None: - side = self._pending_rebalance_side - self._pending_rebalance = False - self._pending_rebalance_side = None - else: + if executor_failed and failed_executor_side is not None: + # Retry with same side on failure + side = failed_executor_side + self.logger().info(f"Retrying with same side={side} after executor failure") + elif not self._initial_position_created: + # Initial position: use configured side side = self.config.side + elif closed_lower_price and closed_upper_price and self._pool_price: + # After position close: determine side from price direction relative to closed bounds + # If price >= upper_price: price went UP → BUY (use USDC we got) + # If price < lower_price: price went DOWN → SELL (use SOL we got) + if self._pool_price >= closed_upper_price: + side = TradeType.BUY # price above range + self.logger().info(f"Price {self._pool_price} >= upper {closed_upper_price} → side=BUY") + elif self._pool_price < closed_lower_price: + side = TradeType.SELL # price below range + self.logger().info(f"Price {self._pool_price} < lower {closed_lower_price} → side=SELL") + else: + # Price is within old bounds (shouldn't happen with limit-price auto-close) + side = self._determine_side_from_price(self._pool_price) + self.logger().info(f"Price {self._pool_price} in range [{closed_lower_price}, {closed_upper_price}] → side={side} from limits") + else: + # Fallback to price limits + if not self._pool_price: + self.logger().info("Waiting for pool price to determine side") + return actions + side = self._determine_side_from_price(self._pool_price) + + # Check if price is within limits before creating position + if self._pool_price and not self._is_price_within_limits(self._pool_price, side): + self.logger().debug(f"Price {self._pool_price} outside limits for side={side}, waiting") + return actions - # Create executor config with calculated bounds + # Check if autoswap is needed before creating LP position + if self.config.autoswap: + if not self._pool_price: + self.logger().info("Autoswap: waiting for pool price") + return actions + swap_config = self._check_autoswap_needed(side, self._pool_price) + if swap_config: + self._pending_swap_side = side + actions.append(CreateExecutorAction( + controller_id=self.config.id, + executor_config=swap_config + )) + return actions + else: + self.logger().info("Autoswap: no swap needed, balances sufficient") + + # Create executor config with limit prices executor_config = self._create_executor_config(side) if executor_config is None: self.logger().warning("Skipping position creation - invalid bounds") @@ -252,158 +613,77 @@ def determine_executor_actions(self) -> List[ExecutorAction]: controller_id=self.config.id, executor_config=executor_config )) + self._initial_position_created = True self._pending_balance_update = True + + # Clear closed position amounts after LP position is created + self._last_closed_base_amount = None + self._last_closed_quote_amount = None + self._last_closed_base_fee = None + self._last_closed_quote_fee = None + return actions - # Trigger balance update after position is created + # Active executor exists - trigger balance update when position becomes active if self._pending_balance_update: state = executor.custom_info.get("state") if state in ("IN_RANGE", "OUT_OF_RANGE"): self._pending_balance_update = False self._trigger_balance_update() - # Check executor state - state = executor.custom_info.get("state") - - # Don't take action while executor is in transition states - if state in [LPExecutorStates.OPENING.value, LPExecutorStates.CLOSING.value]: - return actions - - # Check for rebalancing when out of range - if state == LPExecutorStates.OUT_OF_RANGE.value: - # Check if price is beyond threshold before considering timer - if self._is_beyond_rebalance_threshold(executor): - out_of_range_seconds = executor.custom_info.get("out_of_range_seconds") - if out_of_range_seconds is not None and out_of_range_seconds >= self.config.rebalance_seconds: - rebalance_action = self._handle_rebalance(executor) - if rebalance_action: - actions.append(rebalance_action) - + # No action needed - executor will auto-close via limit prices return actions - def _handle_rebalance(self, executor: ExecutorInfo) -> Optional[StopExecutorAction]: - """ - Handle rebalancing logic. - - Returns StopExecutorAction if rebalance needed, None if KEEP. - """ - current_price = executor.custom_info.get("current_price") - lower_price = executor.custom_info.get("lower_price") - upper_price = executor.custom_info.get("upper_price") - - if current_price is None or lower_price is None or upper_price is None: - return None - - current_price = Decimal(str(current_price)) - lower_price = Decimal(str(lower_price)) - upper_price = Decimal(str(upper_price)) - - # Step 1: Determine side from price direction (using [lower, upper) convention) - if current_price >= upper_price: - new_side = 1 # BUY - price at or above range - elif current_price < lower_price: - new_side = 2 # SELL - price below range - else: - # Price is in range, shouldn't happen in OUT_OF_RANGE state - self.logger().warning(f"Price {current_price} appears in range [{lower_price}, {upper_price})") - return None - - # Step 2: Check if new position would be valid (price within limits) - if not self._is_price_within_limits(current_price, new_side): - # Don't log repeatedly - this is checked every tick - return None - - # Step 4: Initiate rebalance - self._pending_rebalance = True - self._pending_rebalance_side = new_side - self.logger().info( - f"REBALANCE initiated (side={new_side}, price={current_price}, " - f"old_bounds=[{lower_price}, {upper_price}])" - ) - - return StopExecutorAction( - controller_id=self.config.id, - executor_id=executor.id, - keep_position=False, - ) - - def _is_beyond_rebalance_threshold(self, executor: ExecutorInfo) -> bool: + def _create_executor_config(self, side: TradeType) -> Optional[LPExecutorConfig]: """ - Check if price is beyond the rebalance threshold. + Create executor config with limit prices for auto-close. - Price must be this % out of range before rebalance timer is considered. + Sets upper_limit_price and lower_limit_price based on rebalance_threshold_pct. """ - current_price = executor.custom_info.get("current_price") - lower_price = executor.custom_info.get("lower_price") - upper_price = executor.custom_info.get("upper_price") - - if current_price is None or lower_price is None or upper_price is None: - return False - - threshold = self.config.rebalance_threshold_pct / Decimal("100") - - # Check if price is beyond threshold above upper or below lower - if current_price > upper_price: - deviation_pct = (current_price - upper_price) / upper_price - return deviation_pct >= threshold - elif current_price < lower_price: - deviation_pct = (lower_price - current_price) / lower_price - return deviation_pct >= threshold - - return False # Price is in range - - def _create_executor_config(self, side: int) -> Optional[LPExecutorConfig]: - """ - Create executor config for the given side. - - Returns None if bounds are invalid. - """ - # Use pool price (fetched in update_processed_data every tick) current_price = self._pool_price if current_price is None or current_price == 0: self.logger().warning("No pool price available - waiting for update_processed_data") return None - # Calculate amounts based on side - base_amt, quote_amt = self._calculate_amounts(side, current_price) - - # Calculate bounds + # Calculate position bounds for requested side lower_price, upper_price = self._calculate_price_bounds(side, current_price) - # Validate bounds + # Check bounds against price limits - clamp if one exceeds, try opposite if both exceed + lower_price, upper_price, side = self._validate_and_clamp_bounds( + lower_price, upper_price, side, current_price + ) + if lower_price is None: + return None + + # Validate bounds after clamping if lower_price >= upper_price: - self.logger().warning(f"Invalid bounds [{lower_price}, {upper_price}] - skipping position") + self.logger().warning(f"Invalid bounds [{lower_price}, {upper_price}] - skipping") return None + # Calculate amounts based on final side + base_amt, quote_amt = self._calculate_amounts(side, current_price) + + # Calculate limit prices for auto-close + threshold = self.config.rebalance_threshold_pct / Decimal("100") + upper_limit_price = upper_price * (Decimal("1") + threshold) + lower_limit_price = lower_price * (Decimal("1") - threshold) + # Build extra params (connector-specific) extra_params = {} if self.config.strategy_type is not None: extra_params["strategyType"] = self.config.strategy_type - # Check if bounds were clamped by price limits - clamped = [] - if side == 1: # BUY - if self.config.buy_price_max and upper_price == self.config.buy_price_max: - clamped.append(f"upper=buy_price_max({self.config.buy_price_max})") - if self.config.buy_price_min and lower_price == self.config.buy_price_min: - clamped.append(f"lower=buy_price_min({self.config.buy_price_min})") - elif side == 2: # SELL - if self.config.sell_price_min and lower_price == self.config.sell_price_min: - clamped.append(f"lower=sell_price_min({self.config.sell_price_min})") - if self.config.sell_price_max and upper_price == self.config.sell_price_max: - clamped.append(f"upper=sell_price_max({self.config.sell_price_max})") - - clamped_info = f", clamped: {', '.join(clamped)}" if clamped else "" - offset_pct = self.config.position_offset_pct self.logger().info( - f"Creating position: side={side}, pool_price={current_price:.2f}, " - f"bounds=[{lower_price:.4f}, {upper_price:.4f}], offset_pct={offset_pct}, " - f"base={base_amt:.4f}, quote={quote_amt:.4f}{clamped_info}" + f"Creating position: side={side.name}, pool_price={current_price:.6f}, " + f"bounds=[{lower_price:.6f}, {upper_price:.6f}], " + f"limits=[{lower_limit_price:.6f}, {upper_limit_price:.6f}], " + f"base={base_amt:.6f}, quote={quote_amt:.6f}" ) return LPExecutorConfig( timestamp=self.market_data_provider.time(), connector_name=self.config.connector_name, + lp_provider=self.config.lp_provider, trading_pair=self.config.trading_pair, pool_address=self.config.pool_address, lower_price=lower_price, @@ -411,167 +691,225 @@ def _create_executor_config(self, side: int) -> Optional[LPExecutorConfig]: base_amount=base_amt, quote_amount=quote_amt, side=side, - position_offset_pct=self.config.position_offset_pct, extra_params=extra_params if extra_params else None, - keep_position=False, + # Key difference: set limit prices for auto-close + upper_limit_price=upper_limit_price, + lower_limit_price=lower_limit_price, + # Use keep_position=True - controller handles position tracking + keep_position=True, ) - def _calculate_amounts(self, side: int, current_price: Decimal) -> tuple: + def _calculate_amounts(self, side: TradeType, current_price: Decimal) -> tuple: """ - Calculate base and quote amounts based on side and total_amount_quote. - - For rebalances, clamps to the actual amounts returned from the closed position - to avoid order failures when balance is less than configured total (due to - impermanent loss, fees, or price movement). - - Side 0 (BOTH): split 50/50 - Side 1 (BUY): all quote - clamp to closed position's quote + quote_fee - Side 2 (SELL): all base - clamp to closed position's base + base_fee + Calculate base and quote amounts based on side, offset, and total_amount_quote. """ total = self.config.total_amount_quote + offset = self.config.position_offset_pct - # For rebalances, clamp to actual amounts from closed position - # Check if we have captured amounts (indicates this is a rebalance) - has_closed_amounts = (self._last_closed_base_amount is not None or - self._last_closed_quote_amount is not None) - if has_closed_amounts: - if side == 1: # BUY - needs quote token - if self._last_closed_quote_amount is not None: - # Total available = position amount + fees earned - available_quote = self._last_closed_quote_amount - if self._last_closed_quote_fee: - available_quote += self._last_closed_quote_fee - if available_quote < total: - self.logger().info( - f"Clamping quote amount from {total} to {available_quote} {self._quote_token} " - f"(closed position returned {self._last_closed_quote_amount} + {self._last_closed_quote_fee} fees)" - ) - total = available_quote - elif side == 2: # SELL - needs base token - if self._last_closed_base_amount is not None: - # Total available = position amount + fees earned - available_base = self._last_closed_base_amount - if self._last_closed_base_fee: - available_base += self._last_closed_base_fee - available_as_quote = available_base * current_price - if available_as_quote < total: - self.logger().info( - f"Clamping total from {total} to {available_as_quote:.4f} " - f"{self._quote_token} (closed: {self._last_closed_base_amount} + " - f"{self._last_closed_base_fee} fees {self._base_token})" - ) - total = available_as_quote - - # Clear the cached amounts after use - self._last_closed_base_amount = None - self._last_closed_quote_amount = None - self._last_closed_base_fee = None - self._last_closed_quote_fee = None - - if side == 0: # BOTH + if side == TradeType.RANGE: quote_amt = total / Decimal("2") base_amt = quote_amt / current_price - elif side == 1: # BUY - base_amt = Decimal("0") - quote_amt = total - else: # SELL - base_amt = total / current_price - quote_amt = Decimal("0") + elif offset >= 0: + # Out-of-range: single-sided allocation + if side == TradeType.BUY: # BUY - all quote + base_amt = Decimal("0") + quote_amt = total + else: # SELL - all base + base_amt = total / current_price + quote_amt = Decimal("0") + else: + # In-range (offset < 0): proportional split + lower_price, upper_price = self._calculate_price_bounds(side, current_price) + price_range = upper_price - lower_price + + if price_range <= 0 or current_price <= lower_price: + if side == TradeType.BUY: + base_amt = Decimal("0") + quote_amt = total + else: + base_amt = total / current_price + quote_amt = Decimal("0") + elif current_price >= upper_price: + if side == TradeType.SELL: + base_amt = total / current_price + quote_amt = Decimal("0") + else: + base_amt = Decimal("0") + quote_amt = total + else: + price_ratio = (current_price - lower_price) / price_range + quote_pct = price_ratio + base_pct = Decimal("1") - price_ratio + quote_amt = total * quote_pct + base_amt = (total * base_pct) / current_price return base_amt, quote_amt - def _calculate_price_bounds(self, side: int, current_price: Decimal) -> tuple: + def _calculate_price_bounds(self, side: TradeType, current_price: Decimal) -> tuple: """ Calculate position bounds based on side and price limits. - - Side 0 (BOTH): centered on current price, clamped to [buy_min, sell_max] - Side 1 (BUY): upper = min(current, buy_price_max) * (1 - offset), lower extends width below - Side 2 (SELL): lower = max(current, sell_price_min) * (1 + offset), upper extends width above - - The offset ensures single-sided positions start out-of-range so they only - require one token (SOL for SELL, USDC for BUY). """ width = self.config.position_width_pct / Decimal("100") offset = self.config.position_offset_pct / Decimal("100") - if side == 0: # BOTH + if side == TradeType.RANGE: + # Centered on current price half_width = width / Decimal("2") lower_price = current_price * (Decimal("1") - half_width) upper_price = current_price * (Decimal("1") + half_width) - # Clamp to limits - if self.config.buy_price_min: - lower_price = max(lower_price, self.config.buy_price_min) - if self.config.sell_price_max: - upper_price = min(upper_price, self.config.sell_price_max) - - elif side == 1: # BUY - # Position BELOW current price so we only need quote token (USDC) + + elif side == TradeType.BUY: + # Anchor at buy_price_max if set, otherwise at current price if self.config.buy_price_max: upper_price = min(current_price, self.config.buy_price_max) else: upper_price = current_price - # Apply offset to decrease upper bound (ensures out-of-range) upper_price = upper_price * (Decimal("1") - offset) lower_price = upper_price * (Decimal("1") - width) - # Clamp lower to floor - if self.config.buy_price_min: - lower_price = max(lower_price, self.config.buy_price_min) else: # SELL - # Position ABOVE current price so we only need base token (SOL) + # Anchor at sell_price_min if set, otherwise at current price if self.config.sell_price_min: lower_price = max(current_price, self.config.sell_price_min) else: lower_price = current_price - # Apply offset to increase lower bound (ensures out-of-range) lower_price = lower_price * (Decimal("1") + offset) upper_price = lower_price * (Decimal("1") + width) - # Clamp upper to ceiling - if self.config.sell_price_max: - upper_price = min(upper_price, self.config.sell_price_max) return lower_price, upper_price - def _is_price_within_limits(self, price: Decimal, side: int) -> bool: + def _is_price_within_limits(self, price: Decimal, side: TradeType) -> bool: """ Check if price is within configured limits for the position type. - - Price must be within the range to create a position that's IN_RANGE: - - BUY: price must be within [buy_price_min, buy_price_max] - - SELL: price must be within [sell_price_min, sell_price_max] - - BOTH: price must be within the intersection of both ranges - - If price is outside the range, the position would be immediately OUT_OF_RANGE. """ - if side == 2: # SELL + if side == TradeType.SELL: if self.config.sell_price_min and price < self.config.sell_price_min: return False if self.config.sell_price_max and price > self.config.sell_price_max: return False - elif side == 1: # BUY + elif side == TradeType.BUY: if self.config.buy_price_min and price < self.config.buy_price_min: return False if self.config.buy_price_max and price > self.config.buy_price_max: return False - else: # BOTH - must be within intersection of ranges - # Check buy range + else: # RANGE if self.config.buy_price_min and price < self.config.buy_price_min: return False if self.config.buy_price_max and price > self.config.buy_price_max: return False - # Check sell range if self.config.sell_price_min and price < self.config.sell_price_min: return False if self.config.sell_price_max and price > self.config.sell_price_max: return False return True + def _validate_and_clamp_bounds( + self, lower_price: Decimal, upper_price: Decimal, side: TradeType, current_price: Decimal + ) -> tuple: + """ + Validate bounds against price limits. Clamp if one bound exceeds, try opposite side if both exceed. + + Returns: (lower_price, upper_price, side) or (None, None, None) if no valid position possible. + + Note: RANGE positions skip price limit checks entirely. + """ + # RANGE positions skip price limit checks + if side == TradeType.RANGE: + return lower_price, upper_price, side + + # Get limits for this side + if side == TradeType.BUY: + min_limit = self.config.buy_price_min + max_limit = self.config.buy_price_max + else: # SELL + min_limit = self.config.sell_price_min + max_limit = self.config.sell_price_max + + # Check how many bounds exceed limits + lower_exceeds = min_limit and lower_price < min_limit + upper_exceeds = max_limit and upper_price > max_limit + + if not lower_exceeds and not upper_exceeds: + # Both bounds within limits + return lower_price, upper_price, side + + if lower_exceeds and upper_exceeds: + # Both bounds exceed - try opposite side + opposite_side = TradeType.SELL if side == TradeType.BUY else TradeType.BUY + opp_lower, opp_upper = self._calculate_price_bounds(opposite_side, current_price) + + # Check opposite side limits + if opposite_side == TradeType.BUY: + opp_min = self.config.buy_price_min + opp_max = self.config.buy_price_max + else: + opp_min = self.config.sell_price_min + opp_max = self.config.sell_price_max + + opp_lower_exceeds = opp_min and opp_lower < opp_min + opp_upper_exceeds = opp_max and opp_upper > opp_max + + if not opp_lower_exceeds and not opp_upper_exceeds: + self.logger().info(f"Side {side.name} out of limits, using {opposite_side.name}") + return opp_lower, opp_upper, opposite_side + elif opp_lower_exceeds and not opp_upper_exceeds: + # Clamp lower on opposite side + self.logger().info(f"Side {side.name} out of limits, using {opposite_side.name} (clamped lower)") + return opp_min, opp_upper, opposite_side + elif not opp_lower_exceeds and opp_upper_exceeds: + # Clamp upper on opposite side + self.logger().info(f"Side {side.name} out of limits, using {opposite_side.name} (clamped upper)") + return opp_lower, opp_max, opposite_side + else: + # Both sides completely out of limits + self.logger().info("Both sides out of price limits - waiting") + return None, None, None + + # Only one bound exceeds - clamp it + if lower_exceeds: + self.logger().debug(f"Clamping lower from {lower_price} to {min_limit}") + return min_limit, upper_price, side + else: # upper_exceeds + self.logger().debug(f"Clamping upper from {upper_price} to {max_limit}") + return lower_price, max_limit, side + + def _determine_side_from_price(self, current_price: Decimal) -> TradeType: + """ + Determine side (BUY or SELL) based on current price vs price limits. + """ + buy_mid = None + sell_mid = None + + if self.config.buy_price_min and self.config.buy_price_max: + buy_mid = (self.config.buy_price_min + self.config.buy_price_max) / 2 + if self.config.sell_price_min and self.config.sell_price_max: + sell_mid = (self.config.sell_price_min + self.config.sell_price_max) / 2 + + if buy_mid and sell_mid: + if current_price <= buy_mid: + return TradeType.BUY + elif current_price >= sell_mid: + return TradeType.SELL + else: + return TradeType.BUY if (current_price - buy_mid) < (sell_mid - current_price) else TradeType.SELL + + if buy_mid: + return TradeType.BUY + if sell_mid: + return TradeType.SELL + + return TradeType.BUY # Default to BUY + async def update_processed_data(self): - """Called every tick - always fetch fresh pool price for accurate position creation.""" + """Called every tick - fetch pool price.""" try: connector = self.market_data_provider.get_connector(self.config.connector_name) if hasattr(connector, 'get_pool_info_by_address'): - pool_info = await connector.get_pool_info_by_address(self.config.pool_address) + pool_info = await connector.get_pool_info_by_address( + self.config.pool_address, + dex_name=self.lp_dex_name, + trading_type=self.lp_trading_type, + ) if pool_info and pool_info.price: self._pool_price = Decimal(str(pool_info.price)) except Exception as e: @@ -581,7 +919,7 @@ def to_format_status(self) -> List[str]: """Format status for display.""" status = [] box_width = 100 - price_decimals = 8 # For small-value tokens like memecoins + price_decimals = 6 # Header status.append("+" + "-" * box_width + "+") @@ -589,113 +927,94 @@ def to_format_status(self) -> List[str]: status.append(header + " " * (box_width - len(header) + 1) + "|") status.append("+" + "-" * box_width + "+") - # Network, connector, pool - line = f"| Network: {self.config.network}" + # Config summary + line = f"| Network: {self.config.connector_name} | LP: {self.config.lp_provider}" status.append(line + " " * (box_width - len(line) + 1) + "|") line = f"| Pool: {self.config.pool_address}" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Position info from current executor (active or transitioning) - executor = self.active_executor() or self.get_tracked_executor() - if executor and not executor.is_done: - position_address = executor.custom_info.get("position_address", "N/A") - line = f"| Position: {position_address}" - status.append(line + " " * (box_width - len(line) + 1) + "|") - - # Config summary - side_names = {0: "BOTH", 1: "BUY", 2: "SELL"} - side_str = side_names.get(self.config.side, '?') + side_str = self.config.side.name amt = self.config.total_amount_quote width = self.config.position_width_pct - rebal = self.config.rebalance_seconds - line = f"| Config: side={side_str}, amount={amt} {self._quote_token}, width={width}%, rebal={rebal}s" + offset = self.config.position_offset_pct + threshold = self.config.rebalance_threshold_pct + line = f"| Config: side={side_str}, amount={amt} {self._quote_token}, width={width}%, offset={offset}%, threshold={threshold}%" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Position fees and assets + status.append("|" + " " * box_width + "|") + + # Position section + executor = self.active_executor() or self.get_tracked_executor() + pos_base_amount = Decimal("0") + pos_quote_amount = Decimal("0") + if executor and not executor.is_done: custom = executor.custom_info - # Fees row: base_fee + quote_fee = total - base_fee = Decimal(str(custom.get("base_fee", 0))) - quote_fee = Decimal(str(custom.get("quote_fee", 0))) - fees_earned_quote = Decimal(str(custom.get("fees_earned_quote", 0))) - line = ( - f"| Fees: {float(base_fee):.6f} {self._base_token} + " - f"{float(quote_fee):.6f} {self._quote_token} = {float(fees_earned_quote):.6f}" - ) + position_address = custom.get("position_address", "N/A") + line = f"| Position: {position_address}" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Value row: base_amount + quote_amount = total value - base_amount = Decimal(str(custom.get("base_amount", 0))) - quote_amount = Decimal(str(custom.get("quote_amount", 0))) + pos_base_amount = Decimal(str(custom.get("base_amount", 0))) + pos_quote_amount = Decimal(str(custom.get("quote_amount", 0))) total_value_quote = Decimal(str(custom.get("total_value_quote", 0))) line = ( - f"| Value: {float(base_amount):.6f} {self._base_token} + " - f"{float(quote_amount):.6f} {self._quote_token} = {float(total_value_quote):.4f}" + f"| Assets: {float(pos_base_amount):.6f} {self._base_token} + " + f"{float(pos_quote_amount):.6f} {self._quote_token} = {float(total_value_quote):.4f} {self._quote_token}" ) status.append(line + " " * (box_width - len(line) + 1) + "|") - # Position range visualization - lower_price = executor.custom_info.get("lower_price") - upper_price = executor.custom_info.get("upper_price") - - if lower_price and upper_price and self._pool_price: - # Show rebalance thresholds (convert % to decimal) - # Takes into account price limits - rebalance only happens within limits - threshold = self.config.rebalance_threshold_pct / Decimal("100") - lower_threshold = Decimal(str(lower_price)) * (Decimal("1") - threshold) - upper_threshold = Decimal(str(upper_price)) * (Decimal("1") + threshold) + base_fee = Decimal(str(custom.get("base_fee", 0))) + quote_fee = Decimal(str(custom.get("quote_fee", 0))) + fees_earned_quote = Decimal(str(custom.get("fees_earned_quote", 0))) + line = ( + f"| Fees: {float(base_fee):.6f} {self._base_token} + " + f"{float(quote_fee):.6f} {self._quote_token} = {float(fees_earned_quote):.6f} {self._quote_token}" + ) + status.append(line + " " * (box_width - len(line) + 1) + "|") - # Lower threshold triggers SELL - check sell_price_min - if self.config.sell_price_min and lower_threshold < self.config.sell_price_min: - lower_str = "N/A" # Below sell limit, no rebalance possible - else: - lower_str = f"{float(lower_threshold):.{price_decimals}f}" + lower_price = custom.get("lower_price") + upper_price = custom.get("upper_price") - # Upper threshold triggers BUY - check buy_price_max - if self.config.buy_price_max and upper_threshold > self.config.buy_price_max: - upper_str = "N/A" # Above buy limit, no rebalance possible - else: - upper_str = f"{float(upper_threshold):.{price_decimals}f}" + if lower_price is not None and upper_price is not None and self._pool_price: + # Show limit prices (auto-close thresholds) + threshold_pct = self.config.rebalance_threshold_pct / Decimal("100") + lower_limit = Decimal(str(lower_price)) * (Decimal("1") - threshold_pct) + upper_limit = Decimal(str(upper_price)) * (Decimal("1") + threshold_pct) - line = f"| Price: {float(self._pool_price):.{price_decimals}f} | Rebalance if: <{lower_str} or >{upper_str}" + line = f"| Price: {float(self._pool_price):.{price_decimals}f} | Auto-close if: <{float(lower_limit):.{price_decimals}f} or >{float(upper_limit):.{price_decimals}f}" status.append(line + " " * (box_width - len(line) + 1) + "|") - state = executor.custom_info.get("state", "UNKNOWN") + state = custom.get("state", "UNKNOWN") state_icons = { - "IN_RANGE": "●", - "OUT_OF_RANGE": "○", - "OPENING": "◐", - "CLOSING": "◑", - "COMPLETE": "◌", - "NOT_ACTIVE": "○", + "IN_RANGE": "[in]", + "OUT_OF_RANGE": "[out]", + "OPENING": "[...]", + "CLOSING": "[x]", + "COMPLETE": "[done]", + "NOT_ACTIVE": "[-]", } - state_icon = state_icons.get(state, "?") + state_icon = state_icons.get(state, "[?]") status.append("|" + " " * box_width + "|") - line = f"| Position Status: [{state_icon} {state}]" + line = f"| Status: {state_icon} {state}" status.append(line + " " * (box_width - len(line) + 1) + "|") + # Range visualization range_viz = self._create_price_range_visualization( Decimal(str(lower_price)), self._pool_price, - Decimal(str(upper_price)) + Decimal(str(upper_price)), + lower_limit, + upper_limit ) for viz_line in range_viz.split('\n'): line = f"| {viz_line}" status.append(line + " " * (box_width - len(line) + 1) + "|") - - # Show rebalance timer if out of range - out_of_range_seconds = executor.custom_info.get("out_of_range_seconds") - if out_of_range_seconds is not None: - # Check if beyond threshold - beyond_threshold = self._is_beyond_rebalance_threshold(executor) - if beyond_threshold: - line = f"| Rebalance: {out_of_range_seconds}s / {self.config.rebalance_seconds}s" - else: - line = f"| Rebalance: waiting (below {float(self.config.rebalance_threshold_pct):.2f}% threshold)" - status.append(line + " " * (box_width - len(line) + 1) + "|") + else: + line = "| Position: None" + status.append(line + " " * (box_width - len(line) + 1) + "|") # Price limits visualization has_limits = any([ @@ -703,10 +1022,9 @@ def to_format_status(self) -> List[str]: self.config.buy_price_min, self.config.buy_price_max ]) if has_limits and self._pool_price: - # Get position bounds if available pos_lower = None pos_upper = None - if executor: + if executor and not executor.is_done: pos_lower = executor.custom_info.get("lower_price") pos_upper = executor.custom_info.get("upper_price") if pos_lower: @@ -723,110 +1041,101 @@ def to_format_status(self) -> List[str]: line = f"| {viz_line}" status.append(line + " " * (box_width - len(line) + 1) + "|") - # Balance comparison table (formatted like main balance table) - status.append("|" + " " * box_width + "|") - try: - current_base = self.market_data_provider.get_balance( - self.config.connector_name, self._base_token - ) - current_quote = self.market_data_provider.get_balance( - self.config.connector_name, self._quote_token - ) - - line = "| Balances:" - status.append(line + " " * (box_width - len(line) + 1) + "|") - - # Table header - header = f"| {'Asset':<12} {'Initial':>14} {'Current':>14} {'Change':>16}" - status.append(header + " " * (box_width - len(header) + 1) + "|") - - # Base token row - if self._initial_base_balance is not None: - base_change = current_base - self._initial_base_balance - init_b = float(self._initial_base_balance) - curr_b = float(current_base) - chg_b = float(base_change) - line = f"| {self._base_token:<12} {init_b:>14.6f} {curr_b:>14.6f} {chg_b:>+16.6f}" - else: - curr_b = float(current_base) - line = f"| {self._base_token:<12} {'N/A':>14} {curr_b:>14.6f} {'N/A':>16}" - status.append(line + " " * (box_width - len(line) + 1) + "|") - - # Quote token row - if self._initial_quote_balance is not None: - quote_change = current_quote - self._initial_quote_balance - init_q = float(self._initial_quote_balance) - curr_q = float(current_quote) - chg_q = float(quote_change) - line = f"| {self._quote_token:<12} {init_q:>14.6f} {curr_q:>14.6f} {chg_q:>+16.6f}" - else: - curr_q = float(current_quote) - line = f"| {self._quote_token:<12} {'N/A':>14} {curr_q:>14.6f} {'N/A':>16}" - status.append(line + " " * (box_width - len(line) + 1) + "|") - except Exception as e: - line = f"| Balances: Error fetching ({e})" - status.append(line + " " * (box_width - len(line) + 1) + "|") - # Closed positions summary status.append("|" + " " * box_width + "|") + closed_lp = [e for e in self.executors_info + if e.is_done and getattr(e.config, "type", None) == "lp_executor"] + closed_swaps = [e for e in self.executors_info + if e.is_done and getattr(e.config, "type", None) == "order_executor"] - closed = [e for e in self.executors_info if e.is_done] - - # Count closed by side (config.side: 0=both, 1=buy, 2=sell) - both_count = len([e for e in closed if getattr(e.config, "side", None) == 0]) - buy_count = len([e for e in closed if getattr(e.config, "side", None) == 1]) - sell_count = len([e for e in closed if getattr(e.config, "side", None) == 2]) + buy_count = len([e for e in closed_lp if getattr(e.config, "side", None) == TradeType.BUY]) + sell_count = len([e for e in closed_lp if getattr(e.config, "side", None) == TradeType.SELL]) + range_count = len([e for e in closed_lp if getattr(e.config, "side", None) == TradeType.RANGE]) - # Calculate fees from closed positions total_fees_base = Decimal("0") total_fees_quote = Decimal("0") - - for e in closed: + for e in closed_lp: total_fees_base += Decimal(str(e.custom_info.get("base_fee", 0))) total_fees_quote += Decimal(str(e.custom_info.get("quote_fee", 0))) pool_price = self._pool_price or Decimal("0") total_fees_value = total_fees_base * pool_price + total_fees_quote - line = f"| Closed: {len(closed)} (both:{both_count} buy:{buy_count} sell:{sell_count})" + line = f"| Closed Positions: {len(closed_lp)} (buy:{buy_count} sell:{sell_count} range:{range_count})" status.append(line + " " * (box_width - len(line) + 1) + "|") - fb = float(total_fees_base) - fq = float(total_fees_quote) - fv = float(total_fees_value) - line = f"| Fees Collected: {fb:.6f} {self._base_token} + {fq:.6f} {self._quote_token} = {fv:.6f}" + + if closed_swaps: + line = f"| Swaps Executed: {len(closed_swaps)}" + status.append(line + " " * (box_width - len(line) + 1) + "|") + + line = f"| Fees Collected: {float(total_fees_base):.6f} {self._base_token} + {float(total_fees_quote):.6f} {self._quote_token} = {float(total_fees_value):.6f} {self._quote_token}" status.append(line + " " * (box_width - len(line) + 1) + "|") status.append("+" + "-" * box_width + "+") return status def _create_price_range_visualization(self, lower_price: Decimal, current_price: Decimal, - upper_price: Decimal) -> str: - """Create visual representation of price range with current price marker""" - price_range = upper_price - lower_price - if price_range == 0: + upper_price: Decimal, lower_limit: Decimal, + upper_limit: Decimal) -> str: + """ + Create visual representation of price range with current price marker. + + Shows: R for rebalance limits, | for position limits, * for current price + Example: R----|---*--------------------------------|----R + """ + total_range = upper_limit - lower_limit + if total_range == 0: return f"[{float(lower_price):.6f}] (zero width)" - current_position = (current_price - lower_price) / price_range bar_width = 50 - current_pos = int(current_position * bar_width) - range_bar = ['─'] * bar_width - range_bar[0] = '├' - range_bar[-1] = '┤' + def price_to_pos(price: Decimal) -> int: + return int(((price - lower_limit) / total_range) * bar_width) + + # Calculate positions + lower_pos = price_to_pos(lower_price) + upper_pos = price_to_pos(upper_price) + current_pos = price_to_pos(current_price) + + # Build bar (R at edges for rebalance limits) + range_bar = ['-'] * bar_width + range_bar[0] = 'R' + range_bar[-1] = 'R' + + # Place position limits (|) + if 0 < lower_pos < bar_width: + range_bar[lower_pos] = '|' + if 0 < upper_pos < bar_width: + range_bar[upper_pos] = '|' + # Place current price marker (*) if current_pos < 0: - marker_line = '● ' + ''.join(range_bar) + marker_line = '* ' + ''.join(range_bar) elif current_pos >= bar_width: - marker_line = ''.join(range_bar) + ' ●' + marker_line = ''.join(range_bar) + ' *' else: - range_bar[current_pos] = '●' + range_bar[current_pos] = '*' marker_line = ''.join(range_bar) viz_lines = [] viz_lines.append(marker_line) + + # Price labels: show all four prices + lower_limit_str = f'{float(lower_limit):.6f}' lower_str = f'{float(lower_price):.6f}' upper_str = f'{float(upper_price):.6f}' - viz_lines.append(lower_str + ' ' * (bar_width - len(lower_str) - len(upper_str)) + upper_str) + upper_limit_str = f'{float(upper_limit):.6f}' + + # Build price label line with proper spacing + label_line = lower_limit_str + spacing1 = max(1, lower_pos - len(lower_limit_str)) + label_line += ' ' * spacing1 + lower_str + spacing2 = max(1, upper_pos - lower_pos - len(lower_str)) + label_line += ' ' * spacing2 + upper_str + spacing3 = max(1, bar_width - upper_pos - len(upper_str)) + label_line += ' ' * spacing3 + upper_limit_str + + viz_lines.append(label_line) return '\n'.join(viz_lines) @@ -835,7 +1144,7 @@ def _create_price_limits_visualization( current_price: Decimal, pos_lower: Optional[Decimal] = None, pos_upper: Optional[Decimal] = None, - price_decimals: int = 8 + price_decimals: int = 6 ) -> Optional[str]: """Create visualization of sell/buy price limits on unified scale.""" viz_lines = [] diff --git a/models/executors.py b/models/executors.py index 1c1f1941..26f417f0 100644 --- a/models/executors.py +++ b/models/executors.py @@ -213,7 +213,7 @@ class PositionsSummaryResponse(BaseModel): "twap_executor", "xemm_executor", "order_executor", - "lp_executor" + "lp_executor", ] @@ -248,21 +248,20 @@ class CreateExecutorRequest(BaseModel): }, { "summary": "LP Executor", - "description": "Create an LP position on a CLMM DEX (Meteora, Raydium)", + "description": "Create an LP position on a CLMM DEX", "value": { "account_name": "master_account", "executor_config": { "type": "lp_executor", - "connector_name": "meteora/clmm", + "connector_name": "solana-mainnet-beta", + "lp_provider": "meteora/clmm", "trading_pair": "SOL-USDC", "pool_address": "HTvjzsfX3yU6BUodCjZ5vZkUrAxMDTrBs3CJaq43ashR", "lower_price": "80", "upper_price": "100", "base_amount": "0", "quote_amount": "10.0", - "side": 1, - "auto_close_above_range_seconds": None, - "auto_close_below_range_seconds": 300, + "side": "BUY", "extra_params": {"strategyType": 0}, "keep_position": False } diff --git a/routers/connectors.py b/routers/connectors.py index 85d7af86..fa897c9a 100644 --- a/routers/connectors.py +++ b/routers/connectors.py @@ -16,9 +16,11 @@ async def available_connectors(): Get a list of all available connectors. Returns: - List of connector names supported by the system + List of connector names supported by the system (excludes DEX providers which use Gateway networks) """ - return list(AllConnectorSettings.get_connector_settings().keys()) + all_connectors = AllConnectorSettings.get_connector_settings().keys() + # Filter out DEX providers (contain '/') - these are accessed via Gateway networks + return [c for c in all_connectors if '/' not in c] @router.get("/{connector_name}/config-map", response_model=Dict[str, dict]) diff --git a/routers/gateway.py b/routers/gateway.py index 7bd7b428..6c599b87 100644 --- a/routers/gateway.py +++ b/routers/gateway.py @@ -606,9 +606,7 @@ async def add_network_token( return { "success": True, - "message": f"Token {token_request.symbol} added to {network_id}. Restart Gateway for changes to take effect.", - "restart_required": True, - "restart_endpoint": "POST /gateway/restart", + "message": f"Token {token_request.symbol} added to {network_id}.", "token": { "symbol": token_request.symbol, "address": token_request.address, @@ -661,9 +659,7 @@ async def delete_network_token( return { "success": True, - "message": f"Token {token_address} deleted from {network_id}. Restart Gateway for changes to take effect.", - "restart_required": True, - "restart_endpoint": "POST /gateway/restart", + "message": f"Token {token_address} deleted from {network_id}.", "token_address": token_address, "network_id": network_id } diff --git a/services/accounts_service.py b/services/accounts_service.py index 4726dc19..70f5a1fd 100644 --- a/services/accounts_service.py +++ b/services/accounts_service.py @@ -438,16 +438,12 @@ class AccountsService: update the balances of each account. """ default_quotes = { - "hyperliquid": "USD", - "hyperliquid_perpetual": "USDC", + "hyperliquid": "USDC", + "hyperliquid_perpetual": "USD", "xrpl": "RLUSD", "kraken": "USD", } - gateway_default_pricing_connector = { - "ethereum": "uniswap/router", - "solana": "jupiter/router", - } - potential_wrapped_tokens = ["ETH", "SOL", "BNB", "POL", "AVAX", "FTM", "ONE", "GLMR", "MOVR"] + potential_wrapped_tokens = ["ETH", "SOL", "BNB", "POL", "AVAX"] # Cache for storing last successful prices by trading pair _last_known_prices = {} @@ -2164,10 +2160,6 @@ async def _fetch_gateway_prices_immediate(self, chain: str, network: str, Fetch prices immediately from Gateway for the given tokens. This is used to get prices right away instead of waiting for the background update task. - Uses the same pricing connector resolution as MarketDataProvider.update_rates_task(): - - solana -> jupiter/router - - ethereum -> uniswap/router - Args: chain: Blockchain chain (e.g., 'solana', 'ethereum') network: Network name (e.g., 'mainnet-beta', 'mainnet') @@ -2184,11 +2176,8 @@ async def _fetch_gateway_prices_immediate(self, chain: str, network: str, rate_oracle = RateOracle.get_instance() prices = {} - # Resolve pricing connector based on chain (same logic as MarketDataProvider) - pricing_connector = self.gateway_default_pricing_connector.get(chain) - if not pricing_connector: - logger.warning(f"No pricing connector configured for chain '{chain}', skipping immediate price fetch") - return prices + # Construct full network name (e.g., "solana-mainnet-beta") + full_network = f"{chain}-{network}" # Create tasks for all tokens in parallel tasks = [] @@ -2222,10 +2211,9 @@ async def _fetch_gateway_prices_immediate(self, chain: str, network: str, continue try: + # get_price will auto-fetch dex/trading_type from network's swap provider task = gateway_client.get_price( - chain=chain, - network=network, - connector=pricing_connector, + network=full_network, base_asset=token, quote_asset=quote_asset, amount=Decimal("1"), diff --git a/services/executor_service.py b/services/executor_service.py index 87f70dd4..bd18f091 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -352,13 +352,13 @@ async def create_executor( # Extract connector and trading pair from config connector_name = executor_config.get("connector_name") trading_pair = executor_config.get("trading_pair") - if not connector_name: - raise HTTPException(status_code=400, detail="connector_name is required in executor_config") - if not trading_pair: - raise HTTPException(status_code=400, detail="trading_pair is required in executor_config") # Ensure connector and market are ready - await trading_interface.add_market(connector_name, trading_pair) + if connector_name: + if trading_pair: + await trading_interface.add_market(connector_name, trading_pair) + else: + await trading_interface.ensure_connector(connector_name) # Set timestamp if not provided (required for time-based features like time_limit) if "timestamp" not in executor_config or executor_config["timestamp"] is None: @@ -650,6 +650,12 @@ def _format_executor_info( result["close_type"] = executor.close_type.name if executor.close_type else None result["is_active"] = not executor.is_closed + # Add side from executor_info (it's a property, not serialized by model_dump) + side = executor_info.side + if side is not None: + # Convert TradeType enum or int to string + result["side"] = side.name if hasattr(side, 'name') else str(side) + # For grid executors, filter out heavy fields from custom_info if executor_type == "grid_executor" and result.get("custom_info"): heavy_fields = {"levels_by_state", "filled_orders", "failed_orders", "canceled_orders"} diff --git a/services/gateway_client.py b/services/gateway_client.py index b62aec07..9a92a19b 100644 --- a/services/gateway_client.py +++ b/services/gateway_client.py @@ -1,5 +1,4 @@ import logging -from decimal import Decimal from typing import Dict, List, Optional import aiohttp @@ -582,7 +581,6 @@ async def poll_transaction( self, network_id: str, tx_hash: str, - wallet_address: Optional[str] = None ) -> Optional[Dict]: """ Poll transaction status on blockchain. @@ -590,12 +588,12 @@ async def poll_transaction( Args: network_id: Network ID in format 'chain-network' (e.g., 'solana-mainnet-beta', 'ethereum-mainnet') tx_hash: Transaction hash/signature - wallet_address: Optional wallet address for verification Returns: Transaction status dict with fields: - - txStatus: 1 for confirmed, 0 for failed/pending + - txStatus: 1 for confirmed, 0 for pending, -1 for failed - fee: Transaction fee amount + - error: Parsed error message if transaction failed (e.g., "SLIPPAGE_EXCEEDED (0x1771): ...") - txData: Full transaction data including meta.err Returns None if Gateway is unavailable or request fails. """ @@ -612,11 +610,8 @@ async def poll_transaction( "network": network, "signature": tx_hash } - if wallet_address: - payload["walletAddress"] = wallet_address return await self._request("POST", f"chains/{chain}/poll", json=payload) except Exception as e: logger.error(f"Error polling transaction {tx_hash}: {e}") return None - diff --git a/services/gateway_transaction_poller.py b/services/gateway_transaction_poller.py index 991fa44d..33d810f4 100644 --- a/services/gateway_transaction_poller.py +++ b/services/gateway_transaction_poller.py @@ -8,16 +8,16 @@ """ import asyncio import logging -from typing import Optional, Dict, List from datetime import datetime, timedelta, timezone from decimal import Decimal +from typing import Dict, List, Optional from sqlalchemy import select from sqlalchemy.orm import selectinload from database import AsyncDatabaseManager -from database.repositories import GatewaySwapRepository, GatewayCLMMRepository from database.models import GatewayCLMMEvent, GatewayCLMMPosition +from database.repositories import GatewayCLMMRepository, GatewaySwapRepository from services.gateway_client import GatewayClient logger = logging.getLogger(__name__) @@ -312,9 +312,6 @@ async def _check_transaction_status( # Parse the response with defensive checks tx_status = result.get("txStatus") - tx_data = result.get("txData") or {} - meta = tx_data.get("meta") if isinstance(tx_data, dict) else {} - error = meta.get("err") if isinstance(meta, dict) else None # Determine gas token based on chain gas_token = { @@ -326,8 +323,8 @@ async def _check_transaction_status( "avalanche": "AVAX" }.get(chain, "UNKNOWN") - # Transaction is confirmed if txStatus == 1 and no error - if tx_status == 1 and error is None: + # Transaction is confirmed if txStatus == 1 + if tx_status == 1: return { "status": "CONFIRMED", "gas_fee": result.get("fee", 0), @@ -335,9 +332,16 @@ async def _check_transaction_status( "error_message": None } - # Transaction failed if there's an error - if error is not None: - error_msg = str(error) if error else "Transaction failed on-chain" + # Transaction failed if txStatus == -1 or there's an error field + # Gateway now returns parsed error messages like "SLIPPAGE_EXCEEDED (0x1771): ..." + error_msg = result.get("error") + if tx_status == -1 or error_msg: + if not error_msg: + # Fallback to meta.err if no parsed error + tx_data = result.get("txData") or {} + meta = tx_data.get("meta") if isinstance(tx_data, dict) else {} + raw_error = meta.get("err") if isinstance(meta, dict) else None + error_msg = str(raw_error) if raw_error else "Transaction failed on-chain" return { "status": "FAILED", "gas_fee": result.get("fee", 0), @@ -352,14 +356,13 @@ async def _check_transaction_status( logger.error(f"Error checking transaction status for {tx_hash}: {e}") return None - async def poll_transaction_once(self, tx_hash: str, network_id: str, wallet_address: Optional[str] = None) -> Optional[Dict]: + async def poll_transaction_once(self, tx_hash: str, network_id: str) -> Optional[Dict]: """ Poll a specific transaction once (useful for immediate status checks). Args: tx_hash: Transaction hash network_id: Network ID in format 'chain-network' (e.g., 'solana-mainnet-beta') - wallet_address: Optional wallet address for verification Returns: Transaction status dict or None if pending diff --git a/services/trading_service.py b/services/trading_service.py index b3dac601..4e648725 100644 --- a/services/trading_service.py +++ b/services/trading_service.py @@ -7,14 +7,14 @@ import logging import time from decimal import Decimal -from typing import Dict, List, Optional, Set, TYPE_CHECKING +from typing import TYPE_CHECKING, Dict, List, Optional, Set from hummingbot.connector.connector_base import ConnectorBase -from hummingbot.core.data_type.common import OrderType, TradeType, PositionAction +from hummingbot.core.data_type.common import OrderType, PositionAction, TradeType if TYPE_CHECKING: - from services.unified_connector_service import UnifiedConnectorService from services.market_data_service import MarketDataService + from services.unified_connector_service import UnifiedConnectorService logger = logging.getLogger(__name__) @@ -173,6 +173,14 @@ async def add_market( # Register trading pair with connector self._register_trading_pair_with_connector(connector, trading_pair) + # Update balances to include tokens from new trading pair + if hasattr(connector, '_update_balances'): + try: + await connector._update_balances() + logger.debug(f"Updated balances for {connector_name} after adding {trading_pair}") + except Exception as e: + logger.warning(f"Failed to update balances for {connector_name}: {e}") + logger.info(f"Market {connector_name}/{trading_pair} added to trading interface") async def remove_market( diff --git a/services/unified_connector_service.py b/services/unified_connector_service.py index 9d697f0a..830a29ed 100644 --- a/services/unified_connector_service.py +++ b/services/unified_connector_service.py @@ -22,7 +22,7 @@ from hummingbot.connector.connector_base import ConnectorBase from hummingbot.connector.connector_metrics_collector import TradeVolumeMetricCollector from hummingbot.connector.exchange_py_base import ExchangePyBase -from hummingbot.connector.gateway.gateway_lp import GatewayLp +from hummingbot.connector.gateway.gateway import Gateway from hummingbot.connector.perpetual_derivative_py_base import PerpetualDerivativePyBase from hummingbot.core.data_type.common import OrderType, PositionAction, PositionMode, TradeType from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderState @@ -635,21 +635,21 @@ def _create_trading_connector( ) -> ConnectorBase: """Create a trading connector with API keys. - For gateway connectors (containing '/'), creates a GatewayLp connector - which auto-detects chain/network and uses the default wallet. + For Gateway network connectors (e.g., 'solana-mainnet-beta'), creates a unified + Gateway connector which auto-detects chain/network and uses the default wallet. + The dex_name and trading_type are passed to methods, not to the connector. """ BackendAPISecurity.login_account( account_name=account_name, secrets_manager=self.secrets_manager ) - # Gateway connectors (e.g., 'meteora/clmm', 'raydium/clmm') are not in AllConnectorSettings - # They use GatewayLp which auto-detects chain/network from gateway config - if '/' in connector_name: - logger.info(f"Creating gateway connector: {connector_name}") - # GatewayLp handles chain/network auto-detection and default wallet lookup - # via start_network() call - return GatewayLp( + # Check if this is a Gateway network connector + # Gateway connectors are NOT in AllConnectorSettings (those are exchange connectors) + # Network format: "chain-network" (e.g., "solana-mainnet-beta", "ethereum-mainnet") + if connector_name not in self._conn_settings: + logger.info(f"Creating Gateway connector for network: {connector_name}") + return Gateway( connector_name=connector_name, trading_pairs=[], trading_required=True,