diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index dc8fbb237..b4b5b98d1 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -727,6 +727,8 @@ export class Agent { await operator.dipsManager.matchAgreementAllocations( activeAllocations, ) + + await operator.dipsManager.collectAgreementPayments() } }, ) diff --git a/packages/indexer-agent/src/commands/start.ts b/packages/indexer-agent/src/commands/start.ts index 5ee288ea1..98d8ce0a8 100644 --- a/packages/indexer-agent/src/commands/start.ts +++ b/packages/indexer-agent/src/commands/start.ts @@ -395,6 +395,15 @@ export const start = { required: false, group: 'Indexing Fees ("DIPs")', }) + .option('dips-collection-target', { + description: + 'Target collection point within the agreement window as a percentage (1-90). ' + + 'Lower values collect sooner (safer), higher values collect later (fewer txs).', + type: 'number', + default: 50, + required: false, + group: 'Indexing Fees ("DIPs")', + }) .check(argv => { if ( !argv['network-subgraph-endpoint'] && @@ -471,6 +480,7 @@ export async function createNetworkSpecification( ravCollectionInterval: argv.ravCollectionInterval, ravCheckInterval: argv.ravCheckInterval, dipsEpochsMargin: argv.dipsEpochsMargin, + dipsCollectionTarget: argv.dipsCollectionTarget, } const transactionMonitoring = { diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 6fc02c116..79716f515 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -174,6 +174,7 @@ export class AllocationManager { this.logger, this.models, this.network, + this.graphNode, this, this.pendingRcaModel, ) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts index e09a7c6b3..6e38e93e2 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts @@ -168,7 +168,8 @@ function createDipsManager( models: IndexerManagementModels, consumer: PendingRcaConsumer, ): DipsManager { - const dm = new DipsManager(logger, models, network, null) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const dm = new DipsManager(logger, models, network, {} as any, null) // eslint-disable-next-line @typescript-eslint/no-explicit-any ;(dm as any).pendingRcaConsumer = consumer return dm @@ -246,7 +247,8 @@ describe('DipsManager.acceptPendingProposals', () => { test('returns early when pendingRcaConsumer is null', async () => { const models = createMockModels() const network = createMockNetwork() - const dm = new DipsManager(logger, models, network, null) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const dm = new DipsManager(logger, models, network, {} as any, null) // Should not throw await dm.acceptPendingProposals([]) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/agreement-monitor.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/agreement-monitor.test.ts new file mode 100644 index 000000000..f1788d15c --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/agreement-monitor.test.ts @@ -0,0 +1,103 @@ +/* eslint-disable @typescript-eslint/no-explicit-any,@typescript-eslint/no-unused-vars */ +import { + fetchCollectableAgreements, + SubgraphIndexingAgreement, +} from '../agreement-monitor' + +const mockQuery = jest.fn() +const mockNetworkSubgraph = { query: mockQuery } as any + +const INDEXER_ADDRESS = '0x1234567890abcdef1234567890abcdef12345678' + +describe('fetchCollectableAgreements', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + test('returns agreements in Accepted and CanceledByPayer states', async () => { + mockQuery.mockResolvedValueOnce({ + data: { + indexingAgreements: [ + { + id: '0x00000000000000000000000000000001', + allocationId: '0xaaaa', + subgraphDeploymentId: '0xbbbb', + state: 1, + lastCollectionAt: '1000', + endsAt: '9999999999', + maxInitialTokens: '1000000', + maxOngoingTokensPerSecond: '100', + tokensPerSecond: '50', + tokensPerEntityPerSecond: '10', + minSecondsPerCollection: 3600, + maxSecondsPerCollection: 86400, + canceledAt: '0', + }, + ], + }, + }) + + const result = await fetchCollectableAgreements(mockNetworkSubgraph, INDEXER_ADDRESS) + + expect(result).toHaveLength(1) + expect(result[0].id).toBe('0x00000000000000000000000000000001') + expect(result[0].state).toBe(1) + expect(mockQuery).toHaveBeenCalledTimes(1) + }) + + test('returns empty array when no agreements exist', async () => { + mockQuery.mockResolvedValueOnce({ + data: { indexingAgreements: [] }, + }) + + const result = await fetchCollectableAgreements(mockNetworkSubgraph, INDEXER_ADDRESS) + + expect(result).toHaveLength(0) + }) + + test('paginates through large result sets', async () => { + // First page: 1000 results + const page1 = Array.from({ length: 1000 }, (_, i) => ({ + id: `0x${i.toString(16).padStart(32, '0')}`, + allocationId: '0xaaaa', + subgraphDeploymentId: '0xbbbb', + state: 1, + lastCollectionAt: '1000', + endsAt: '9999999999', + maxInitialTokens: '1000000', + maxOngoingTokensPerSecond: '100', + tokensPerSecond: '50', + tokensPerEntityPerSecond: '10', + minSecondsPerCollection: 3600, + maxSecondsPerCollection: 86400, + canceledAt: '0', + })) + // Second page: 1 result + const page2 = [ + { + id: '0x' + 'f'.repeat(32), + allocationId: '0xaaaa', + subgraphDeploymentId: '0xbbbb', + state: 1, + lastCollectionAt: '1000', + endsAt: '9999999999', + maxInitialTokens: '1000000', + maxOngoingTokensPerSecond: '100', + tokensPerSecond: '50', + tokensPerEntityPerSecond: '10', + minSecondsPerCollection: 3600, + maxSecondsPerCollection: 86400, + canceledAt: '0', + }, + ] + + mockQuery + .mockResolvedValueOnce({ data: { indexingAgreements: page1 } }) + .mockResolvedValueOnce({ data: { indexingAgreements: page2 } }) + + const result = await fetchCollectableAgreements(mockNetworkSubgraph, INDEXER_ADDRESS) + + expect(result).toHaveLength(1001) + expect(mockQuery).toHaveBeenCalledTimes(2) + }) +}) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/collect-agreements.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/collect-agreements.test.ts new file mode 100644 index 000000000..94c8d3caf --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/collect-agreements.test.ts @@ -0,0 +1,188 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Logger } from '@graphprotocol/common-ts' +import { DipsManager } from '../dips' + +const logger = { + child: jest.fn().mockReturnThis(), + info: jest.fn(), + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + trace: jest.fn(), +} as unknown as Logger + +const mockQuery = jest.fn() +const mockNetworkSubgraph = { query: mockQuery } as any + +const mockCollectEstimateGas = jest.fn() +const mockCollect = jest.fn() + +const mockContracts = { + SubgraphService: { + collect: Object.assign(mockCollect, { + estimateGas: mockCollectEstimateGas, + }), + }, +} as any + +const mockExecuteTransaction = jest.fn() +const mockTransactionManager = { + executeTransaction: mockExecuteTransaction, +} as any + +const mockGraphNode = { + entityCount: jest.fn(), + proofOfIndexing: jest.fn(), + blockHashFromNumber: jest.fn(), + subgraphFeatures: jest.fn().mockResolvedValue({ network: 'mainnet' }), +} as any + +const mockNetwork = { + contracts: mockContracts, + networkSubgraph: mockNetworkSubgraph, + transactionManager: mockTransactionManager, + specification: { + indexerOptions: { + address: '0x1234567890abcdef1234567890abcdef12345678', + dipperEndpoint: undefined, + dipsCollectionTarget: 50, + }, + networkIdentifier: 'eip155:421614', + }, + networkProvider: { + getBlockNumber: jest.fn().mockResolvedValue(1000), + getBlock: jest.fn().mockResolvedValue({ timestamp: Math.floor(Date.now() / 1000) }), + }, +} as any + +const mockModels = {} as any + +function createDipsManager(): DipsManager { + return new DipsManager(logger, mockModels, mockNetwork, mockGraphNode, null) +} + +// Helper: agreement that was last collected long ago (ready to collect) +function makeReadyAgreement(id = '0x00000000000000000000000000000001') { + return { + id, + allocationId: '0xaaaa', + subgraphDeploymentId: + '0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', + state: 1, + lastCollectionAt: '0', // never collected → always ready + endsAt: '9999999999', + maxInitialTokens: '1000000', + maxOngoingTokensPerSecond: '100', + tokensPerSecond: '50', + tokensPerEntityPerSecond: '10', + minSecondsPerCollection: 3600, + maxSecondsPerCollection: 86400, + canceledAt: '0', + } +} + +describe('DipsManager.collectAgreementPayments', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + test('skips when no collectable agreements found', async () => { + mockQuery.mockResolvedValueOnce({ + data: { indexingAgreements: [] }, + }) + + const dm = createDipsManager() + await dm.collectAgreementPayments() + + expect(mockExecuteTransaction).not.toHaveBeenCalled() + }) + + test('skips agreement when tracker says not ready yet', async () => { + const recentlyCollected = makeReadyAgreement() + // Collected very recently — min is 3600, target at 50% is ~45000s + recentlyCollected.lastCollectionAt = String(Math.floor(Date.now() / 1000) - 100) + + mockQuery.mockResolvedValueOnce({ + data: { indexingAgreements: [recentlyCollected] }, + }) + + const dm = createDipsManager() + await dm.collectAgreementPayments() + + expect(mockExecuteTransaction).not.toHaveBeenCalled() + }) + + test('collects payment when agreement is ready', async () => { + mockQuery.mockResolvedValueOnce({ + data: { indexingAgreements: [makeReadyAgreement()] }, + }) + + mockGraphNode.entityCount.mockResolvedValueOnce([500]) + mockGraphNode.blockHashFromNumber.mockResolvedValueOnce('0x' + 'ab'.repeat(32)) + mockGraphNode.proofOfIndexing.mockResolvedValueOnce('0x' + 'cd'.repeat(32)) + mockExecuteTransaction.mockResolvedValueOnce({ hash: '0xtxhash', status: 1 }) + + const dm = createDipsManager() + await dm.collectAgreementPayments() + + expect(mockExecuteTransaction).toHaveBeenCalledTimes(1) + }) + + test('updates tracker after successful collection', async () => { + mockQuery + .mockResolvedValueOnce({ data: { indexingAgreements: [makeReadyAgreement()] } }) + .mockResolvedValueOnce({ data: { indexingAgreements: [makeReadyAgreement()] } }) + + mockGraphNode.entityCount.mockResolvedValue([500]) + mockGraphNode.blockHashFromNumber.mockResolvedValue('0x' + 'ab'.repeat(32)) + mockGraphNode.proofOfIndexing.mockResolvedValue('0x' + 'cd'.repeat(32)) + mockExecuteTransaction.mockResolvedValue({ hash: '0xtxhash', status: 1 }) + + const dm = createDipsManager() + + // First call: collects + await dm.collectAgreementPayments() + expect(mockExecuteTransaction).toHaveBeenCalledTimes(1) + + // Second call: tracker should skip (just collected) + await dm.collectAgreementPayments() + // Still only 1 call — second was skipped by tracker + expect(mockExecuteTransaction).toHaveBeenCalledTimes(1) + }) + + test('still attempts collection when POI is unavailable (best effort)', async () => { + mockQuery.mockResolvedValueOnce({ + data: { indexingAgreements: [makeReadyAgreement()] }, + }) + + mockGraphNode.entityCount.mockResolvedValueOnce([500]) + mockGraphNode.blockHashFromNumber.mockResolvedValueOnce('0x' + 'ab'.repeat(32)) + mockGraphNode.proofOfIndexing.mockResolvedValueOnce(null) // POI unavailable + mockExecuteTransaction.mockResolvedValueOnce({ hash: '0xtxhash', status: 1 }) + + const dm = createDipsManager() + await dm.collectAgreementPayments() + + // Should log warning but still attempt collection with zero POI + expect(logger.warn).toHaveBeenCalled() + expect(mockExecuteTransaction).toHaveBeenCalledTimes(1) + }) + + test('handles deterministic error gracefully', async () => { + mockQuery.mockResolvedValueOnce({ + data: { indexingAgreements: [makeReadyAgreement()] }, + }) + + mockGraphNode.entityCount.mockResolvedValueOnce([500]) + mockGraphNode.blockHashFromNumber.mockResolvedValueOnce('0x' + 'ab'.repeat(32)) + mockGraphNode.proofOfIndexing.mockResolvedValueOnce('0x' + 'cd'.repeat(32)) + mockExecuteTransaction.mockRejectedValueOnce( + Object.assign(new Error('revert'), { code: 'CALL_EXCEPTION' }), + ) + + const dm = createDipsManager() + await dm.collectAgreementPayments() + + expect(logger.warn).toHaveBeenCalled() + }) +}) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/collection-tracker.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/collection-tracker.test.ts new file mode 100644 index 000000000..91155bd90 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/collection-tracker.test.ts @@ -0,0 +1,110 @@ +import { CollectionTracker, AgreementTimingState } from '../collection-tracker' + +describe('CollectionTracker', () => { + const DEFAULT_TARGET_PCT = 50 + const NOW = 1000000 + + function makeState( + overrides: Partial = {}, + ): AgreementTimingState { + return { + lastCollectedAt: NOW - 5000, + minSecondsPerCollection: 3600, + maxSecondsPerCollection: 86400, + ...overrides, + } + } + + describe('isReadyForCollection', () => { + test('returns false when elapsed time is below target', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState()) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(false) + }) + + test('returns true when elapsed time exceeds target', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 50000 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(true) + }) + + test('returns true for untracked agreement (forces subgraph refresh)', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + expect(tracker.isReadyForCollection('0xunknown', NOW)).toBe(true) + }) + + test('respects different target percentages', () => { + const tracker = new CollectionTracker(10) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 12000 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(true) + }) + + test('handles first collection (lastCollectedAt = 0, uses acceptedAt)', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState({ lastCollectedAt: 0 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(true) + }) + }) + + describe('track and updateAfterCollection', () => { + test('updateAfterCollection updates lastCollectedAt', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 50000 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(true) + + tracker.updateAfterCollection('0x01', NOW) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(false) + }) + + test('track does not overwrite more recent local lastCollectedAt (subgraph lag)', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 50000 })) + tracker.updateAfterCollection('0x01', NOW) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(false) + + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 50000 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(false) + }) + + test('track updates when subgraph has newer data than local', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 50000 })) + + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 100 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(false) + }) + + test('remove stops tracking an agreement', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState()) + tracker.remove('0x01') + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(true) + }) + }) + + describe('getReadyAgreements', () => { + test('returns only agreements past their target time', () => { + const tracker = new CollectionTracker(DEFAULT_TARGET_PCT) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 50000 })) + tracker.track('0x02', makeState({ lastCollectedAt: NOW - 5000 })) + tracker.track('0x03', makeState({ lastCollectedAt: NOW - 46000 })) + + const ready = tracker.getReadyAgreements(NOW) + expect(ready.sort()).toEqual(['0x01', '0x03'].sort()) + }) + }) + + describe('target percentage clamping', () => { + test('clamps target above 90 to 90', () => { + const tracker = new CollectionTracker(100) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 80000 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(true) + }) + + test('clamps target below 1 to 1', () => { + const tracker = new CollectionTracker(0) + tracker.track('0x01', makeState({ lastCollectedAt: NOW - 4000 })) + expect(tracker.isReadyForCollection('0x01', NOW)).toBe(false) + }) + }) +}) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/dips.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/dips.test.ts index fcb8c807d..636c515bf 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/dips.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/dips.test.ts @@ -175,7 +175,13 @@ describe('DipsManager', () => { describe('initialization', () => { test('creates DipsManager when dipperEndpoint is configured', () => { - const dipsManager = new DipsManager(logger, managementModels, network, null) + const dipsManager = new DipsManager( + logger, + managementModels, + network, + graphNode, + null, + ) expect(dipsManager).toBeDefined() }) @@ -201,6 +207,7 @@ describe('DipsManager', () => { logger, managementModels, networkWithoutDipper, + graphNode, null, ) expect(dipsManager).toBeDefined() @@ -225,7 +232,13 @@ describe('DipsManager', () => { network, ) - dipsManager = new DipsManager(logger, managementModels, network, allocationManager) + dipsManager = new DipsManager( + logger, + managementModels, + network, + graphNode, + allocationManager, + ) // Create a test agreement await managementModels.IndexingAgreement.create({ diff --git a/packages/indexer-common/src/indexing-fees/agreement-monitor.ts b/packages/indexer-common/src/indexing-fees/agreement-monitor.ts new file mode 100644 index 000000000..413256d5a --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/agreement-monitor.ts @@ -0,0 +1,68 @@ +import gql from 'graphql-tag' +import { SubgraphClient } from '../subgraph-client' + +export interface SubgraphIndexingAgreement { + id: string + allocationId: string + subgraphDeploymentId: string + state: number + lastCollectionAt: string + endsAt: string + maxInitialTokens: string + maxOngoingTokensPerSecond: string + tokensPerSecond: string + tokensPerEntityPerSecond: string + minSecondsPerCollection: number + maxSecondsPerCollection: number + canceledAt: string +} + +const INDEXING_AGREEMENTS_QUERY = gql` + query indexingAgreements($indexer: String!, $lastId: String!) { + indexingAgreements( + where: { serviceProvider: $indexer, state_in: [1, 3], id_gt: $lastId } + orderBy: id + orderDirection: asc + first: 1000 + ) { + id + allocationId + subgraphDeploymentId + state + lastCollectionAt + endsAt + maxInitialTokens + maxOngoingTokensPerSecond + tokensPerSecond + tokensPerEntityPerSecond + minSecondsPerCollection + maxSecondsPerCollection + canceledAt + } + } +` + +export async function fetchCollectableAgreements( + networkSubgraph: SubgraphClient, + indexerAddress: string, +): Promise { + const all: SubgraphIndexingAgreement[] = [] + let lastId = '' + + for (;;) { + const result = await networkSubgraph.query(INDEXING_AGREEMENTS_QUERY, { + indexer: indexerAddress.toLowerCase(), + lastId, + }) + + if (!result.data?.indexingAgreements?.length) break + + const agreements: SubgraphIndexingAgreement[] = result.data.indexingAgreements + all.push(...agreements) + + if (agreements.length < 1000) break + lastId = agreements[agreements.length - 1].id + } + + return all +} diff --git a/packages/indexer-common/src/indexing-fees/collection-tracker.ts b/packages/indexer-common/src/indexing-fees/collection-tracker.ts new file mode 100644 index 000000000..64a4c6921 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/collection-tracker.ts @@ -0,0 +1,61 @@ +export interface AgreementTimingState { + lastCollectedAt: number // unix timestamp (0 = never collected, use acceptedAt externally) + minSecondsPerCollection: number + maxSecondsPerCollection: number +} + +export class CollectionTracker { + private state: Map = new Map() + private targetPct: number + + constructor(targetPercentage: number) { + this.targetPct = Math.min(90, Math.max(1, targetPercentage)) / 100 + } + + track(agreementId: string, timing: AgreementTimingState): void { + const existing = this.state.get(agreementId) + if (existing && existing.lastCollectedAt > timing.lastCollectedAt) { + // Keep local lastCollectedAt if more recent (subgraph may lag behind) + existing.minSecondsPerCollection = timing.minSecondsPerCollection + existing.maxSecondsPerCollection = timing.maxSecondsPerCollection + return + } + this.state.set(agreementId, { ...timing }) + } + + remove(agreementId: string): void { + this.state.delete(agreementId) + } + + updateAfterCollection(agreementId: string, collectedAt: number): void { + const existing = this.state.get(agreementId) + if (existing) { + existing.lastCollectedAt = collectedAt + } + } + + isReadyForCollection(agreementId: string, now: number): boolean { + const timing = this.state.get(agreementId) + if (!timing) return true // untracked → force check + + const elapsed = now - timing.lastCollectedAt + const targetSeconds = this.computeTargetSeconds(timing) + return elapsed >= targetSeconds + } + + getReadyAgreements(now: number): string[] { + const ready: string[] = [] + for (const [id, timing] of this.state) { + const elapsed = now - timing.lastCollectedAt + if (elapsed >= this.computeTargetSeconds(timing)) { + ready.push(id) + } + } + return ready + } + + private computeTargetSeconds(timing: AgreementTimingState): number { + const windowSize = timing.maxSecondsPerCollection - timing.minSecondsPerCollection + return timing.minSecondsPerCollection + windowSize * this.targetPct + } +} diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index f7ec8bce6..f585b1466 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -35,9 +35,14 @@ import { PendingRcaConsumer } from './pending-rca-consumer' import { DecodedRcaProposal } from './types' import { tryParseCustomError } from '../utils' import { uniqueAllocationID, horizonAllocationIdProof } from '../allocations/keys' -import { encodeStartServiceData } from '@graphprotocol/toolshed' +import { encodeStartServiceData, PaymentTypes } from '@graphprotocol/toolshed' import { NetworkSpecification } from '../network-specification' -import { BaseWallet, Signer } from 'ethers' +import { AbiCoder, BaseWallet, Signer } from 'ethers' +import { + fetchCollectableAgreements, + SubgraphIndexingAgreement, +} from './agreement-monitor' +import { CollectionTracker } from './collection-tracker' const DIPS_COLLECTION_INTERVAL = 60_000 @@ -53,10 +58,12 @@ export class DipsManager { declare gatewayDipsServiceClient: GatewayDipsServiceClientImpl declare gatewayDipsServiceMessagesCodec: GatewayDipsServiceMessagesCodec declare pendingRcaConsumer: PendingRcaConsumer | null + declare collectionTracker: CollectionTracker constructor( private logger: Logger, private models: IndexerManagementModels, private network: Network, + private graphNode: GraphNode, private parent: AllocationManager | null, pendingRcaModel?: typeof PendingRcaProposal, ) { @@ -74,6 +81,10 @@ export class DipsManager { } else { this.pendingRcaConsumer = null } + + this.collectionTracker = new CollectionTracker( + this.network.specification.indexerOptions.dipsCollectionTarget, + ) } // Cancel an agreement associated to an allocation if it exists async tryCancelAgreement(allocationId: string) { @@ -470,6 +481,146 @@ export class DipsManager { } } + async collectAgreementPayments(): Promise { + const logger = this.logger.child({ function: 'collectAgreementPayments' }) + const indexerAddress = this.network.specification.indexerOptions.address + + const agreements = await fetchCollectableAgreements( + this.network.networkSubgraph, + indexerAddress, + ) + + if (agreements.length === 0) { + logger.debug('No collectable agreements found') + return + } + + // Use chain timestamp for consistency with contract timing and subgraph data + const blockNumber = await this.network.networkProvider.getBlockNumber() + const block = await this.network.networkProvider.getBlock(blockNumber) + const nowSeconds = block ? Number(block.timestamp) : Math.floor(Date.now() / 1000) + + // Sync tracker state from subgraph data + for (const agreement of agreements) { + this.collectionTracker.track(agreement.id, { + lastCollectedAt: Number(agreement.lastCollectionAt), + minSecondsPerCollection: agreement.minSecondsPerCollection, + maxSecondsPerCollection: agreement.maxSecondsPerCollection, + }) + } + + const readyIds = this.collectionTracker.getReadyAgreements(nowSeconds) + if (readyIds.length === 0) { + logger.debug('No agreements ready for collection', { + total: agreements.length, + }) + return + } + + logger.info( + `${readyIds.length} of ${agreements.length} agreement(s) ready for collection`, + ) + + const readyAgreements = agreements.filter((a) => readyIds.includes(a.id)) + + for (const agreement of readyAgreements) { + try { + await this.tryCollectAgreement(agreement, blockNumber, logger) + this.collectionTracker.updateAfterCollection(agreement.id, nowSeconds) + } catch (err) { + if (this.isDeterministicError(err)) { + const parsedError = tryParseCustomError(err) + logger.warn('Deterministic error collecting agreement, skipping', { + agreementId: agreement.id, + error: parsedError, + }) + } else { + const errorMsg = err instanceof Error ? err.message : String(err) + const errorStack = err instanceof Error ? err.stack : undefined + logger.warn('Transient error collecting agreement, will retry', { + agreementId: agreement.id, + error: errorMsg, + stack: errorStack, + }) + } + } + } + } + + private async tryCollectAgreement( + agreement: SubgraphIndexingAgreement, + blockNumber: number, + logger: Logger, + ): Promise { + const deploymentId = new SubgraphDeploymentID(agreement.subgraphDeploymentId) + const entityCounts = await this.graphNode.entityCount([deploymentId]) + const entities = entityCounts[0] + + const recentBlock = blockNumber - 10 + const { network: networkAlias } = await this.graphNode.subgraphFeatures(deploymentId) + const blockHash = await this.graphNode.blockHashFromNumber(networkAlias!, recentBlock) + const poi = await this.graphNode.proofOfIndexing( + deploymentId, + { number: recentBlock, hash: blockHash }, + this.network.specification.indexerOptions.address, + ) + + if (!poi) { + logger.warn('Could not get POI for agreement, using zero POI', { + agreementId: agreement.id, + deployment: deploymentId.ipfsHash, + }) + } + + const effectivePoi = + poi || '0x0000000000000000000000000000000000000000000000000000000000000000' + + const abiCoder = AbiCoder.defaultAbiCoder() + + const collectData = abiCoder.encode( + ['uint256', 'bytes32', 'uint256', 'bytes', 'uint256'], + [entities, effectivePoi, recentBlock, '0x', 0], + ) + + const data = abiCoder.encode(['bytes16', 'bytes'], [agreement.id, collectData]) + + const indexerAddress = this.network.specification.indexerOptions.address + const receipt = await this.network.transactionManager.executeTransaction( + async () => + this.network.contracts.SubgraphService.collect.estimateGas( + indexerAddress, + PaymentTypes.IndexingFee, + data, + ), + async (gasLimit) => + this.network.contracts.SubgraphService.collect( + indexerAddress, + PaymentTypes.IndexingFee, + data, + { gasLimit }, + ), + logger.child({ + function: 'SubgraphService.collect', + agreementId: agreement.id, + }), + ) + + if (receipt === 'paused' || receipt === 'unauthorized') { + logger.warn('Cannot collect: network paused or unauthorized', { + agreementId: agreement.id, + result: receipt, + }) + return + } + + logger.info('Successfully collected indexing fees', { + agreementId: agreement.id, + txHash: receipt.hash, + deployment: deploymentId.ipfsHash, + entities, + }) + } + private async handleAcceptError( consumer: PendingRcaConsumer, proposal: DecodedRcaProposal, diff --git a/packages/indexer-common/src/network-specification.ts b/packages/indexer-common/src/network-specification.ts index 4b49ac115..813d92314 100644 --- a/packages/indexer-common/src/network-specification.ts +++ b/packages/indexer-common/src/network-specification.ts @@ -78,6 +78,7 @@ export const IndexerOptions = z dipsAllocationAmount: GRT().default(0), ravCollectionInterval: positiveNumber().default(14400), dipsEpochsMargin: positiveNumber().default(1), + dipsCollectionTarget: positiveNumber().min(1).max(90).default(50), }) .strict() export type IndexerOptions = z.infer