diff --git a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts index 6e38e93e2..7982630e8 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts @@ -4,9 +4,12 @@ import { PendingRcaConsumer } from '../pending-rca-consumer' import { DecodedRcaProposal } from '../types' import { Allocation, + AllocationManager, AllocationStatus, IndexerManagementModels, + IndexingDecisionBasis, Network, + SubgraphIdentifierType, } from '@graphprotocol/indexer-common' let logger: Logger @@ -107,10 +110,17 @@ function createMockModels() { findOne: jest.fn().mockResolvedValue(null), findAll: jest.fn().mockResolvedValue([]), destroy: jest.fn().mockResolvedValue(1), + upsert: jest.fn().mockResolvedValue([{ id: 1 }, true]), }, } as unknown as IndexerManagementModels } +function createMockParent() { + return { + matchingRuleExists: jest.fn().mockResolvedValue(false), + } as unknown as AllocationManager +} + function createMockNetwork() { return { contracts: { @@ -167,9 +177,10 @@ function createDipsManager( network: Network, models: IndexerManagementModels, consumer: PendingRcaConsumer, + parent: AllocationManager = createMockParent(), ): DipsManager { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const dm = new DipsManager(logger, models, network, {} as any, null) + const dm = new DipsManager(logger, models, network, {} as any, parent) // eslint-disable-next-line @typescript-eslint/no-explicit-any ;(dm as any).pendingRcaConsumer = consumer return dm @@ -592,4 +603,82 @@ describe('DipsManager.acceptPendingProposals', () => { expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) }) }) + + describe('rule creation ordering (race condition fix)', () => { + test('upserts the DIPS indexing rule before broadcasting acceptIndexingAgreement', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtx', + status: 1, + }) + + const dm = createDipsManager(network, models, consumer) + + await dm.acceptPendingProposals([allocation]) + + const upsertOrder = (models.IndexingRule.upsert as jest.Mock).mock + .invocationCallOrder[0] + const executeOrder = (network.transactionManager.executeTransaction as jest.Mock) + .mock.invocationCallOrder[0] + + expect(upsertOrder).toBeDefined() + expect(executeOrder).toBeDefined() + expect(upsertOrder).toBeLessThan(executeOrder) + }) + + test('skips rule upsert and rejects proposal when deployment is blocklisted', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + ;(consumer.getPendingProposalsForDeployment as jest.Mock).mockResolvedValue([]) + const models = createMockModels() + ;(models.IndexingRule.findAll as jest.Mock).mockResolvedValue([ + { + identifier: proposal.subgraphDeploymentId.ipfsHash, + identifierType: SubgraphIdentifierType.DEPLOYMENT, + decisionBasis: IndexingDecisionBasis.NEVER, + }, + ]) + const network = createMockNetwork() + + const dm = createDipsManager(network, models, consumer) + + await dm.acceptPendingProposals([allocation]) + + expect(consumer.markRejected).toHaveBeenCalledWith( + proposal.id, + 'deployment blocklisted', + ) + expect(models.IndexingRule.upsert).not.toHaveBeenCalled() + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + }) + + test('skips rule upsert when parent reports a matching rule already exists', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtx', + status: 1, + }) + + const parent = { + matchingRuleExists: jest.fn().mockResolvedValue(true), + } as unknown as AllocationManager + + const dm = createDipsManager(network, models, consumer, parent) + + await dm.acceptPendingProposals([allocation]) + + expect(models.IndexingRule.upsert).not.toHaveBeenCalled() + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + }) }) diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index ab4304fd5..df49d6d2a 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -197,59 +197,81 @@ export class DipsManager { ) for (const proposal of proposals) { - const subgraphDeploymentID = proposal.subgraphDeploymentId - this.logger.info( - `Checking if indexing rule exists for proposal ${ - proposal.id - }, deployment ${subgraphDeploymentID.toString()}`, - ) + await this.ensureDipsRuleForProposal(proposal) + } + } - const ruleExists = await this.parent!.matchingRuleExists( - this.logger, - subgraphDeploymentID, - ) + // Upsert the dips indexing rule for a proposal's deployment, or reject the + // proposal if the deployment is blocklisted. Returns true when the caller + // should continue processing the proposal, false when it was rejected. + // + // Shared between the reconcile loop (ensureAgreementRulesFromRca) and the + // fast accept loop (processProposal). The accept path calls this eagerly + // before acceptIndexingAgreement broadcasts, closing the race where the + // accept receipt clears the pending_rca_proposals row before the next + // 15s reconcile tick — without this, graph-node never gets told to deploy + // the subgraph and the agent spins on unallocate attempts for the newly + // created DIPs allocation. + private async ensureDipsRuleForProposal( + proposal: DecodedRcaProposal, + ): Promise { + const subgraphDeploymentID = proposal.subgraphDeploymentId + this.logger.info( + `Checking if indexing rule exists for proposal ${ + proposal.id + }, deployment ${subgraphDeploymentID.toString()}`, + ) - const allDeploymentRules = await this.models.IndexingRule.findAll({ - where: { - identifierType: SubgraphIdentifierType.DEPLOYMENT, - }, - }) - const blocklistedRule = allDeploymentRules.find( - (rule) => - new SubgraphDeploymentID(rule.identifier).bytes32 === - subgraphDeploymentID.bytes32 && - rule.decisionBasis === IndexingDecisionBasis.NEVER, + const ruleExists = await this.parent!.matchingRuleExists( + this.logger, + subgraphDeploymentID, + ) + + const allDeploymentRules = await this.models.IndexingRule.findAll({ + where: { + identifierType: SubgraphIdentifierType.DEPLOYMENT, + }, + }) + const blocklistedRule = allDeploymentRules.find( + (rule) => + new SubgraphDeploymentID(rule.identifier).bytes32 === + subgraphDeploymentID.bytes32 && + rule.decisionBasis === IndexingDecisionBasis.NEVER, + ) + + if (blocklistedRule) { + this.logger.info( + `Blocklisted deployment ${subgraphDeploymentID.toString()}, rejecting proposal`, ) + await this.pendingRcaConsumer!.markRejected(proposal.id, 'deployment blocklisted') + return false + } - if (blocklistedRule) { - this.logger.info( - `Blocklisted deployment ${subgraphDeploymentID.toString()}, rejecting proposal`, - ) - await this.pendingRcaConsumer!.markRejected(proposal.id, 'deployment blocklisted') - } else if (!ruleExists) { - this.logger.info( - `Creating indexing rule for proposal ${ - proposal.id - }, deployment ${subgraphDeploymentID.toString()}`, - ) - const { amount } = await this.getDipsAllocationAmount(subgraphDeploymentID) - const indexingRule = { - identifier: subgraphDeploymentID.ipfsHash, - allocationAmount: formatGRT(amount), - identifierType: SubgraphIdentifierType.DEPLOYMENT, - decisionBasis: IndexingDecisionBasis.DIPS, - protocolNetwork: this.network.specification.networkIdentifier, - autoRenewal: true, - allocationLifetime: Math.max( - Number(proposal.minSecondsPerCollection), - Number(proposal.maxSecondsPerCollection), - ), - requireSupported: false, - } as Partial + if (!ruleExists) { + this.logger.info( + `Creating indexing rule for proposal ${ + proposal.id + }, deployment ${subgraphDeploymentID.toString()}`, + ) + const { amount } = await this.getDipsAllocationAmount(subgraphDeploymentID) + const indexingRule = { + identifier: subgraphDeploymentID.ipfsHash, + allocationAmount: formatGRT(amount), + identifierType: SubgraphIdentifierType.DEPLOYMENT, + decisionBasis: IndexingDecisionBasis.DIPS, + protocolNetwork: this.network.specification.networkIdentifier, + autoRenewal: true, + allocationLifetime: Math.max( + Number(proposal.minSecondsPerCollection), + Number(proposal.maxSecondsPerCollection), + ), + requireSupported: false, + } as Partial - await upsertIndexingRule(this.logger, this.models, indexingRule) - } + await upsertIndexingRule(this.logger, this.models, indexingRule) } + + return true } async acceptPendingProposals(activeAllocations: Allocation[]): Promise { @@ -297,6 +319,11 @@ export class DipsManager { return } + const shouldProceed = await this.ensureDipsRuleForProposal(proposal) + if (!shouldProceed) { + return + } + const allocation = activeAllocations.find( (a) => a.subgraphDeployment.id.bytes32 === proposal.subgraphDeploymentId.bytes32, )