diff --git a/src/meta_vault/service.py b/src/meta_vault/service.py index 8ed868a1..ddec572e 100644 --- a/src/meta_vault/service.py +++ b/src/meta_vault/service.py @@ -1,5 +1,5 @@ from collections import defaultdict -from typing import cast +from queue import Queue from eth_typing import BlockNumber, ChecksumAddress from sw_utils import is_meta_vault_upgraded_to_release, memoize @@ -13,10 +13,11 @@ VaultContract, ) from src.config.settings import settings +from src.meta_vault.typings import SubVaultRedemption async def distribute_meta_vault_redemption_assets( - vault_to_redemption_assets: defaultdict[ChecksumAddress, Wei], + vault_to_redemption_assets: dict[ChecksumAddress, Wei], block_number: BlockNumber | None = None, ) -> defaultdict[ChecksumAddress, Wei]: """ @@ -24,62 +25,55 @@ async def distribute_meta_vault_redemption_assets( vault_to_redemption_assets: A mapping of vault addresses to their respective redemption assets, which may include meta vaults. - Distribute redemption assets from meta vaults to their underlying sub-vaults. + Distribute redemption assets from meta vaults across their entire sub-vault tree. Returns a mapping of vault addresses to their respective redemption assets, - ensuring all assets are assigned to non-meta vaults. + including leaf vaults, root meta vaults, and all intermediary meta sub-vaults. """ - final_vault_to_redemption_assets: defaultdict[ChecksumAddress, int] = defaultdict(lambda: 0) + final_vault_to_redemption_assets: defaultdict[ChecksumAddress, Wei] = defaultdict( + lambda: Wei(0) + ) + queue: Queue[tuple[ChecksumAddress, Wei]] = Queue() for vault, assets in vault_to_redemption_assets.items(): - if await is_meta_vault(vault): - sub_vaults_redemptions = await get_meta_vault_redemption_assets( - meta_vault_address=vault, - assets_to_redeem=assets, - block_number=block_number, - ) - for sub_vault, sub_assets in sub_vaults_redemptions.items(): - final_vault_to_redemption_assets[sub_vault] += sub_assets - else: - final_vault_to_redemption_assets[vault] += assets - - return cast(defaultdict[ChecksumAddress, Wei], final_vault_to_redemption_assets) - - -async def get_meta_vault_redemption_assets( + queue.put((vault, assets)) + + while not queue.empty(): + vault, assets = queue.get() + final_vault_to_redemption_assets[vault] = Wei( + final_vault_to_redemption_assets[vault] + assets + ) + if not await is_meta_vault(vault): + continue + sub_vaults_redemptions = await get_sub_vaults_redemptions( + meta_vault_address=vault, + assets_to_redeem=assets, + block_number=block_number, + ) + for sub_vault_redemption in sub_vaults_redemptions: + queue.put((sub_vault_redemption.vault, sub_vault_redemption.assets)) + + return final_vault_to_redemption_assets + + +async def get_sub_vaults_redemptions( meta_vault_address: ChecksumAddress, assets_to_redeem: Wei, block_number: BlockNumber | None = None, -) -> defaultdict[ChecksumAddress, Wei]: +) -> list[SubVaultRedemption]: """ - This function distributes the specified assets to redeem from the meta vault - among its underlying sub-vaults. It handles both regular and nested meta vaults. - Finally every asset should be assigned to a non-meta vault. + Distribute the specified assets to redeem from the meta vault across its + direct sub-vaults only. Does not recurse into nested meta vaults. - Returns a mapping of vault addresses to their respective redemption assets. + Returns a list of SubVaultRedemption entries, one per direct sub-vault. """ - vault_to_redemption_assets: defaultdict[ChecksumAddress, int] = defaultdict(lambda: 0) meta_vault_contract = MetaVaultContract(meta_vault_address) - sub_vaults_registry_address = await meta_vault_contract.sub_vaults_registry() sub_vaults_registry_contract = SubVaultsRegistryContract(sub_vaults_registry_address) sub_vaults_redemptions = await sub_vaults_registry_contract.calculate_sub_vaults_redemptions( assets_to_redeem, block_number=block_number ) - - for sub_vault_redemption in sub_vaults_redemptions: - if sub_vault_redemption.assets > 0 and await is_meta_vault(sub_vault_redemption.vault): - sub_vault_assets = await get_meta_vault_redemption_assets( - meta_vault_address=sub_vault_redemption.vault, - assets_to_redeem=sub_vault_redemption.assets, - block_number=block_number, - ) - for vault, assets in sub_vault_assets.items(): - vault_to_redemption_assets[vault] += assets - else: - vault_to_redemption_assets[sub_vault_redemption.vault] += sub_vault_redemption.assets - - return cast(defaultdict[ChecksumAddress, Wei], vault_to_redemption_assets) + return sub_vaults_redemptions @memoize diff --git a/src/meta_vault/tasks.py b/src/meta_vault/tasks.py index 609889ef..f6b897d7 100644 --- a/src/meta_vault/tasks.py +++ b/src/meta_vault/tasks.py @@ -1,12 +1,14 @@ import logging +from collections import defaultdict from eth_typing import ChecksumAddress, HexStr from hexbytes import HexBytes from sw_utils import GNO_NETWORKS, InterruptHandler, convert_to_mgno from web3 import Web3 -from web3.types import BlockNumber +from web3.types import BlockNumber, Wei from src.common.clients import execution_client +from src.common.consensus import get_chain_latest_head from src.common.contracts import ( MetaVaultContract, MetaVaultEncoder, @@ -28,6 +30,7 @@ graph_get_vaults, ) from src.meta_vault.typings import ContractCall, SubVaultExitRequest, Vault +from src.redemptions.tasks import get_vault_to_redemption_assets_distributed logger = logging.getLogger(__name__) @@ -50,6 +53,9 @@ async def process_block(self, interrupt_handler: InterruptHandler) -> None: if not await check_gas_price(): return + chain_head = await get_chain_latest_head() + vault_to_redemption_assets = await get_vault_to_redemption_assets_distributed(chain_head) + for vault in self.vaults: # Refresh the meta vaults map to include updates # made by previous meta vault transactions @@ -58,7 +64,11 @@ async def process_block(self, interrupt_handler: InterruptHandler) -> None: is_meta_vault=True, ) try: - await process_meta_vault_tree(vault=vault, meta_vaults_map=meta_vaults_map) + await process_meta_vault_tree( + vault=vault, + meta_vaults_map=meta_vaults_map, + vault_to_redemption_assets=vault_to_redemption_assets, + ) except Exception: logger.exception('Failed to process meta vault tree for vault %s', vault) @@ -66,6 +76,7 @@ async def process_block(self, interrupt_handler: InterruptHandler) -> None: async def process_meta_vault_tree( vault: ChecksumAddress, meta_vaults_map: dict[ChecksumAddress, Vault], + vault_to_redemption_assets: defaultdict[ChecksumAddress, Wei], ) -> None: """ Process a single meta vault tree: update state and deposit to sub vaults. @@ -103,7 +114,10 @@ async def process_meta_vault_tree( ) # Reverse to get top-down order: root deposits first, then nested meta vaults for meta_vault_address in reversed(meta_vault_addresses): - await process_deposit_to_sub_vaults(meta_vault_address=meta_vault_address) + await process_deposit_to_sub_vaults( + meta_vault_address=meta_vault_address, + redemption_assets=vault_to_redemption_assets[meta_vault_address], + ) async def meta_vault_tree_update_state( @@ -388,12 +402,18 @@ async def is_meta_vault_rewards_nonce_outdated( return meta_vault_event is None -async def process_deposit_to_sub_vaults(meta_vault_address: ChecksumAddress) -> None: +async def process_deposit_to_sub_vaults( + meta_vault_address: ChecksumAddress, + redemption_assets: Wei = Wei(0), +) -> None: meta_vault_contract = MetaVaultContract( address=meta_vault_address, ) withdrawable_assets = await meta_vault_contract.withdrawable_assets() + # Reserve redemption assets so they are not pushed down to sub vaults. + withdrawable_assets = Wei(max(0, withdrawable_assets - redemption_assets)) + if settings.network in GNO_NETWORKS: withdrawable_assets = convert_to_mgno(withdrawable_assets) diff --git a/src/meta_vault/tests/test_service.py b/src/meta_vault/tests/test_service.py index 289b7cce..6eef3d3d 100644 --- a/src/meta_vault/tests/test_service.py +++ b/src/meta_vault/tests/test_service.py @@ -70,6 +70,7 @@ def is_meta_vault_side_effect(addr): result = await distribute_meta_vault_redemption_assets(assets) assert dict(result) == { vaults['vault1']: 100, + vaults['meta_vault']: 300, vaults['sub_vault1']: 120, vaults['sub_vault2']: 180, } @@ -121,6 +122,8 @@ async def sub_vaults_registry_side_effect(self): ): result = await distribute_meta_vault_redemption_assets(assets) assert dict(result) == { + vaults['meta_vault']: 500, + vaults['nested_meta_vault']: 300, vaults['vault2']: 200, vaults['sub_vault1']: 100, vaults['sub_vault2']: 200, @@ -159,6 +162,7 @@ def is_meta_vault_side_effect(addr): result = await distribute_meta_vault_redemption_assets(assets) assert dict(result) == { vaults['vault1']: 50, + vaults['meta_vault']: 150, vaults['vault2']: 75, vaults['sub_vault1']: 60, vaults['sub_vault2']: 90, diff --git a/src/redemptions/tasks.py b/src/redemptions/tasks.py index 83b21df6..b4bc5938 100644 --- a/src/redemptions/tasks.py +++ b/src/redemptions/tasks.py @@ -28,34 +28,47 @@ async def get_redemption_assets(chain_head: ChainHead) -> Wei: Get redemption assets for operator's vault. For Gno networks return value in GNO-Wei. """ - # The contract increments nonce during setRedeemablePositions, - # but uses nonce - 1 for leaf hash computation during redemption. + vault_to_redemption_assets = await get_vault_to_redemption_assets_distributed(chain_head) + return vault_to_redemption_assets[settings.vault] + + +async def get_vault_to_redemption_assets_distributed( + chain_head: ChainHead, +) -> defaultdict[ChecksumAddress, Wei]: + """ + Get per-vault redemption assets after distributing meta vault assets + across their entire sub-vault tree. Contains leaf vaults, root meta vaults, + and all intermediary meta sub-vaults. + + For Gno networks values are in GNO-Wei. + """ nonce = await os_token_redeemer_contract.nonce(chain_head.block_number) if nonce == 0: - logger.info('Zero nonce for redemption. Skipping redemption assets.') - return Wei(0) + logger.debug('Zero nonce for redemption. Skipping redemption assets.') + return defaultdict(lambda: Wei(0)) protocol_config = await get_protocol_config() - # Aggregate redemption assets per vault - vault_to_redemption_assets = await get_vault_to_redemption_assets( + # The contract increments the nonce at the end of setRedeemablePositions, + # so use the previous nonce for leaf hash computation. + vault_to_redemption_assets = await get_vault_to_redemption_assets_direct( chain_head=chain_head, tree_nonce=nonce - 1, protocol_config=protocol_config ) - # Distribute redemption assets from meta vaults to their underlying vaults - vault_to_redemption_assets = await distribute_meta_vault_redemption_assets( + return await distribute_meta_vault_redemption_assets( vault_to_redemption_assets=vault_to_redemption_assets, block_number=chain_head.block_number, ) - # Filter by operator's vault - return vault_to_redemption_assets[settings.vault] -async def get_vault_to_redemption_assets( +async def get_vault_to_redemption_assets_direct( chain_head: ChainHead, tree_nonce: int, protocol_config: ProtocolConfig ) -> defaultdict[ChecksumAddress, Wei]: """ - Get redemption assets per vault. - For Gno networks return value in GNO-Wei. + Get redemption assets per vault, based only on assets directly assigned + to each vault in the IPFS redeemable positions file. Meta vault assets are + not yet distributed across their sub-vault tree. + + For Gno networks return value is in GNO-Wei. """ ticket = await os_token_redeemer_contract.get_exit_queue_cumulative_tickets( block_number=chain_head.block_number diff --git a/src/validators/tasks.py b/src/validators/tasks.py index 966c3e13..1d48ab2c 100644 --- a/src/validators/tasks.py +++ b/src/validators/tasks.py @@ -3,12 +3,13 @@ from typing import Sequence, cast from eth_typing import HexStr -from sw_utils import IpfsFetchClient, convert_to_mgno +from sw_utils import ChainHead, IpfsFetchClient, convert_to_mgno from sw_utils.networks import GNO_NETWORKS from web3 import Web3 -from web3.types import BlockNumber, Gwei +from web3.types import BlockNumber, Gwei, Wei from src.common.clients import execution_client +from src.common.consensus import get_chain_latest_head from src.common.contracts import VaultContract, validators_registry_contract from src.common.execution import check_gas_price from src.common.harvest import get_harvest_params @@ -20,6 +21,7 @@ VALIDATORS_FUNDING_BATCH_SIZE, settings, ) +from src.redemptions.tasks import get_redemption_assets from src.validators.consensus import fetch_compounding_validators_balances from src.validators.database import NetworkValidatorCrud from src.validators.exceptions import EmptyRelayerResponseException, FundingException @@ -52,7 +54,8 @@ async def process(self) -> None: ) harvest_params = await get_harvest_params(settings.vault) - vault_assets = await get_vault_assets(harvest_params=harvest_params) + chain_head = await get_chain_latest_head() + vault_assets = await get_vault_assets(harvest_params=harvest_params, chain_head=chain_head) vault_assets = Gwei(max(0, vault_assets - settings.vault_min_balance_gwei)) if vault_assets < settings.min_deposit_amount_gwei: @@ -261,8 +264,14 @@ async def register_new_validators( return tx_hash -async def get_vault_assets(harvest_params: HarvestParams | None) -> Gwei: +async def get_vault_assets(harvest_params: HarvestParams | None, chain_head: ChainHead) -> Gwei: vault_assets = await get_withdrawable_assets(settings.vault, harvest_params=harvest_params) + + # Reserve operator's vault redemption assets so that withdrawable assets are spent + # on redemptions first, then on validator registrations / fundings. + redemption_assets = await get_redemption_assets(chain_head) + vault_assets = Wei(max(0, vault_assets - redemption_assets)) + if settings.network in GNO_NETWORKS: # apply GNO -> mGNO exchange rate vault_assets = convert_to_mgno(vault_assets)