Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import { PendingRcaConsumer } from '../pending-rca-consumer'
import { DecodedRcaProposal } from '../types'
import {
Allocation,
AllocationManager,
AllocationStatus,
IndexerManagementModels,
IndexingDecisionBasis,
Network,
SubgraphIdentifierType,
} from '@graphprotocol/indexer-common'

let logger: Logger
Expand Down Expand Up @@ -107,10 +110,17 @@ function createMockModels() {
findOne: jest.fn().mockResolvedValue(null),
findAll: jest.fn().mockResolvedValue([]),
destroy: jest.fn().mockResolvedValue(1),
upsert: jest.fn().mockResolvedValue([{ id: 1 }, true]),
},
} as unknown as IndexerManagementModels
}

function createMockParent() {
return {
matchingRuleExists: jest.fn().mockResolvedValue(false),
} as unknown as AllocationManager
}

function createMockNetwork() {
return {
contracts: {
Expand Down Expand Up @@ -167,9 +177,10 @@ function createDipsManager(
network: Network,
models: IndexerManagementModels,
consumer: PendingRcaConsumer,
parent: AllocationManager = createMockParent(),
): DipsManager {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const dm = new DipsManager(logger, models, network, {} as any, null)
const dm = new DipsManager(logger, models, network, {} as any, parent)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
;(dm as any).pendingRcaConsumer = consumer
return dm
Expand Down Expand Up @@ -592,4 +603,82 @@ describe('DipsManager.acceptPendingProposals', () => {
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})
})

describe('rule creation ordering (race condition fix)', () => {
test('upserts the DIPS indexing rule before broadcasting acceptIndexingAgreement', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtx',
status: 1,
})

const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

const upsertOrder = (models.IndexingRule.upsert as jest.Mock).mock
.invocationCallOrder[0]
const executeOrder = (network.transactionManager.executeTransaction as jest.Mock)
.mock.invocationCallOrder[0]

expect(upsertOrder).toBeDefined()
expect(executeOrder).toBeDefined()
expect(upsertOrder).toBeLessThan(executeOrder)
})

test('skips rule upsert and rejects proposal when deployment is blocklisted', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
;(consumer.getPendingProposalsForDeployment as jest.Mock).mockResolvedValue([])
const models = createMockModels()
;(models.IndexingRule.findAll as jest.Mock).mockResolvedValue([
{
identifier: proposal.subgraphDeploymentId.ipfsHash,
identifierType: SubgraphIdentifierType.DEPLOYMENT,
decisionBasis: IndexingDecisionBasis.NEVER,
},
])
const network = createMockNetwork()

const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

expect(consumer.markRejected).toHaveBeenCalledWith(
proposal.id,
'deployment blocklisted',
)
expect(models.IndexingRule.upsert).not.toHaveBeenCalled()
expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
})

test('skips rule upsert when parent reports a matching rule already exists', async () => {
const proposal = createMockProposal()
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtx',
status: 1,
})

const parent = {
matchingRuleExists: jest.fn().mockResolvedValue(true),
} as unknown as AllocationManager

const dm = createDipsManager(network, models, consumer, parent)

await dm.acceptPendingProposals([allocation])

expect(models.IndexingRule.upsert).not.toHaveBeenCalled()
expect(network.transactionManager.executeTransaction).toHaveBeenCalled()
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})
})
})
121 changes: 74 additions & 47 deletions packages/indexer-common/src/indexing-fees/dips.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,59 +197,81 @@ export class DipsManager {
)

for (const proposal of proposals) {
const subgraphDeploymentID = proposal.subgraphDeploymentId
this.logger.info(
`Checking if indexing rule exists for proposal ${
proposal.id
}, deployment ${subgraphDeploymentID.toString()}`,
)
await this.ensureDipsRuleForProposal(proposal)
}
}

const ruleExists = await this.parent!.matchingRuleExists(
this.logger,
subgraphDeploymentID,
)
// Upsert the dips indexing rule for a proposal's deployment, or reject the
// proposal if the deployment is blocklisted. Returns true when the caller
// should continue processing the proposal, false when it was rejected.
//
// Shared between the reconcile loop (ensureAgreementRulesFromRca) and the
// fast accept loop (processProposal). The accept path calls this eagerly
// before acceptIndexingAgreement broadcasts, closing the race where the
// accept receipt clears the pending_rca_proposals row before the next
// 15s reconcile tick — without this, graph-node never gets told to deploy
// the subgraph and the agent spins on unallocate attempts for the newly
// created DIPs allocation.
private async ensureDipsRuleForProposal(
proposal: DecodedRcaProposal,
): Promise<boolean> {
const subgraphDeploymentID = proposal.subgraphDeploymentId
this.logger.info(
`Checking if indexing rule exists for proposal ${
proposal.id
}, deployment ${subgraphDeploymentID.toString()}`,
)

const allDeploymentRules = await this.models.IndexingRule.findAll({
where: {
identifierType: SubgraphIdentifierType.DEPLOYMENT,
},
})
const blocklistedRule = allDeploymentRules.find(
(rule) =>
new SubgraphDeploymentID(rule.identifier).bytes32 ===
subgraphDeploymentID.bytes32 &&
rule.decisionBasis === IndexingDecisionBasis.NEVER,
const ruleExists = await this.parent!.matchingRuleExists(
this.logger,
subgraphDeploymentID,
)

const allDeploymentRules = await this.models.IndexingRule.findAll({
where: {
identifierType: SubgraphIdentifierType.DEPLOYMENT,
},
})
const blocklistedRule = allDeploymentRules.find(
(rule) =>
new SubgraphDeploymentID(rule.identifier).bytes32 ===
subgraphDeploymentID.bytes32 &&
rule.decisionBasis === IndexingDecisionBasis.NEVER,
)

if (blocklistedRule) {
this.logger.info(
`Blocklisted deployment ${subgraphDeploymentID.toString()}, rejecting proposal`,
)
await this.pendingRcaConsumer!.markRejected(proposal.id, 'deployment blocklisted')
return false
}

if (blocklistedRule) {
this.logger.info(
`Blocklisted deployment ${subgraphDeploymentID.toString()}, rejecting proposal`,
)
await this.pendingRcaConsumer!.markRejected(proposal.id, 'deployment blocklisted')
} else if (!ruleExists) {
this.logger.info(
`Creating indexing rule for proposal ${
proposal.id
}, deployment ${subgraphDeploymentID.toString()}`,
)
const { amount } = await this.getDipsAllocationAmount(subgraphDeploymentID)
const indexingRule = {
identifier: subgraphDeploymentID.ipfsHash,
allocationAmount: formatGRT(amount),
identifierType: SubgraphIdentifierType.DEPLOYMENT,
decisionBasis: IndexingDecisionBasis.DIPS,
protocolNetwork: this.network.specification.networkIdentifier,
autoRenewal: true,
allocationLifetime: Math.max(
Number(proposal.minSecondsPerCollection),
Number(proposal.maxSecondsPerCollection),
),
requireSupported: false,
} as Partial<IndexingRuleAttributes>
if (!ruleExists) {
this.logger.info(
`Creating indexing rule for proposal ${
proposal.id
}, deployment ${subgraphDeploymentID.toString()}`,
)
const { amount } = await this.getDipsAllocationAmount(subgraphDeploymentID)
const indexingRule = {
identifier: subgraphDeploymentID.ipfsHash,
allocationAmount: formatGRT(amount),
identifierType: SubgraphIdentifierType.DEPLOYMENT,
decisionBasis: IndexingDecisionBasis.DIPS,
protocolNetwork: this.network.specification.networkIdentifier,
autoRenewal: true,
allocationLifetime: Math.max(
Number(proposal.minSecondsPerCollection),
Number(proposal.maxSecondsPerCollection),
),
requireSupported: false,
} as Partial<IndexingRuleAttributes>

await upsertIndexingRule(this.logger, this.models, indexingRule)
}
await upsertIndexingRule(this.logger, this.models, indexingRule)
}

return true
}

async acceptPendingProposals(activeAllocations: Allocation[]): Promise<void> {
Expand Down Expand Up @@ -297,6 +319,11 @@ export class DipsManager {
return
}

const shouldProceed = await this.ensureDipsRuleForProposal(proposal)
if (!shouldProceed) {
return
}

const allocation = activeAllocations.find(
(a) => a.subgraphDeployment.id.bytes32 === proposal.subgraphDeploymentId.bytes32,
)
Expand Down
Loading