Skip to content
105 changes: 105 additions & 0 deletions packages/indexer-agent/src/__tests__/agent.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Agent,
addIndexingPaymentsSubgraphToTarget,
convertSubgraphBasedRulesToDeploymentBased,
consolidateAllocationDecisions,
resolveTargetDeployments,
Expand Down Expand Up @@ -472,3 +473,107 @@ describe('reconcileDeploymentAllocationAction', () => {
expect(operator.presentPOIForAllocations).not.toHaveBeenCalled()
})
})

describe('addIndexingPaymentsSubgraphToTarget function', () => {
const paymentsDeployment = new SubgraphDeploymentID(
'QmddSbatCN1XmufoBm1bBwPx4L3FtuMAMHUNNBSPzrgL2a',
)

it('pushes the deployment when enableDips is true and deployment is defined', () => {
const target: SubgraphDeploymentID[] = []
addIndexingPaymentsSubgraphToTarget(true, paymentsDeployment, target)
expect(target.map(d => d.bytes32)).toContain(paymentsDeployment.bytes32)
})

it('does nothing when enableDips is false', () => {
const target: SubgraphDeploymentID[] = []
addIndexingPaymentsSubgraphToTarget(false, paymentsDeployment, target)
expect(target).toHaveLength(0)
})

it('does nothing when deployment is undefined', () => {
const target: SubgraphDeploymentID[] = []
addIndexingPaymentsSubgraphToTarget(true, undefined, target)
expect(target).toHaveLength(0)
})

it('does not duplicate an already-present deployment', () => {
const target: SubgraphDeploymentID[] = [paymentsDeployment]
addIndexingPaymentsSubgraphToTarget(true, paymentsDeployment, target)
expect(target).toHaveLength(1)
})
})

describe('reconcileDeployments indexing-payments carve-out wiring', () => {
const paymentsDeployment = new SubgraphDeploymentID(
'QmddSbatCN1XmufoBm1bBwPx4L3FtuMAMHUNNBSPzrgL2a',
)

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const mockLogger: any = {
child: jest.fn().mockReturnThis(),
info: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
trace: jest.fn(),
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function createAgentUnderTest(network: any) {
const agent = Object.create(Agent.prototype)
agent.logger = mockLogger
agent.offchainSubgraphs = []
agent.multiNetworks = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
map: async (fn: any) => Promise.all([fn({ network })]),
}
agent.graphNode = {
subgraphDeploymentsAssignments: jest.fn().mockResolvedValue([]),
ensure: jest.fn().mockResolvedValue(undefined),
pause: jest.fn().mockResolvedValue(undefined),
}
return agent
}

it('schedules the indexing-payments deployment for indexing when DIPS is enabled', async () => {
const agent = createAgentUnderTest({
networkSubgraph: { deployment: undefined },
specification: { indexerOptions: { enableDips: true } },
indexingPaymentsSubgraph: { deployment: { id: paymentsDeployment } },
})

await agent.reconcileDeployments([], [], [])

const ensureCalls = agent.graphNode.ensure.mock.calls
const ensuredDeployments = ensureCalls.map(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(call: any[]) => (call[1] as SubgraphDeploymentID).bytes32,
)
expect(ensuredDeployments).toContain(paymentsDeployment.bytes32)
})

it('does not schedule the indexing-payments deployment when DIPS is disabled', async () => {
const agent = createAgentUnderTest({
networkSubgraph: { deployment: undefined },
specification: { indexerOptions: { enableDips: false } },
indexingPaymentsSubgraph: { deployment: { id: paymentsDeployment } },
})

await agent.reconcileDeployments([], [], [])

expect(agent.graphNode.ensure).not.toHaveBeenCalled()
})

it('does not schedule the indexing-payments deployment when deployment is undefined', async () => {
const agent = createAgentUnderTest({
networkSubgraph: { deployment: undefined },
specification: { indexerOptions: { enableDips: true } },
indexingPaymentsSubgraph: { deployment: undefined },
})

await agent.reconcileDeployments([], [], [])

expect(agent.graphNode.ensure).not.toHaveBeenCalled()
})
})
28 changes: 25 additions & 3 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ const deploymentInList = (
): boolean =>
list.find(item => item.bytes32 === deployment.bytes32) !== undefined

export function addIndexingPaymentsSubgraphToTarget(
enableDips: boolean,
deployment: SubgraphDeploymentID | undefined,
targetDeployments: SubgraphDeploymentID[],
): void {
if (!enableDips || !deployment) return
if (deploymentInList(targetDeployments, deployment)) return
targetDeployments.push(deployment)
}

const deploymentRuleInList = (
list: IndexingRuleAttributes[],
deployment: SubgraphDeploymentID,
Expand Down Expand Up @@ -717,16 +727,17 @@ export class Agent {
throw new Error('DipsManager is not available')
}

await operator.dipsManager.acceptPendingProposals(
activeAllocations,
)
// Proposal acceptance is handled by the dedicated fast loop
// (startProposalAcceptanceLoop), not the reconciliation cycle.

this.logger.debug(
`Matching agreement allocations for network ${network.specification.networkIdentifier}`,
)
await operator.dipsManager.matchAgreementAllocations(
activeAllocations,
)

await operator.dipsManager.collectAgreementPayments()
}
},
)
Expand Down Expand Up @@ -904,6 +915,17 @@ export class Agent {
}
})

// ----------------------------------------------------------------------------------------
// Ensure the indexing-payments subgraph is always indexed when DIPS is enabled
// ----------------------------------------------------------------------------------------
await this.multiNetworks.map(async ({ network }) => {
addIndexingPaymentsSubgraphToTarget(
network.specification.indexerOptions.enableDips,
network.indexingPaymentsSubgraph?.deployment?.id,
targetDeployments,
)
})

// ----------------------------------------------------------------------------------------
// Inspect Deployments and Networks
// ----------------------------------------------------------------------------------------
Expand Down
34 changes: 34 additions & 0 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ export const start = {
type: 'string',
group: 'TAP Subgraph',
})
.option('indexing-payments-subgraph-deployment', {
description:
'Indexing payments subgraph deployment (for local hosting)',
array: false,
type: 'string',
group: 'Indexing Fees ("DIPs")',
})
.option('indexing-payments-subgraph-endpoint', {
description: 'Endpoint to query the indexing payments subgraph from',
array: false,
type: 'string',
group: 'Indexing Fees ("DIPs")',
})
.option('allocate-on-network-subgraph', {
description: 'Whether to allocate to the network subgraph',
type: 'boolean',
Expand Down Expand Up @@ -395,6 +408,15 @@ export const start = {
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-collection-target', {
description:
'Target collection point within the agreement window as a percentage (1-90). ' +
'Lower values collect sooner (safer), higher values collect later (fewer txs).',
type: 'number',
default: 50,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -425,6 +447,13 @@ export const start = {
if (argv['enable-dips'] && !argv['dipper-endpoint']) {
return 'Invalid --dipper-endpoint provided. Must be provided when --enable-dips is true.'
}
if (
argv['enable-dips'] &&
!argv['indexing-payments-subgraph-endpoint'] &&
!argv['indexing-payments-subgraph-deployment']
) {
return 'At least one of --indexing-payments-subgraph-endpoint and --indexing-payments-subgraph-deployment must be provided when --enable-dips is true.'
}
return true
})
},
Expand Down Expand Up @@ -471,6 +500,7 @@ export async function createNetworkSpecification(
ravCollectionInterval: argv.ravCollectionInterval,
ravCheckInterval: argv.ravCheckInterval,
dipsEpochsMargin: argv.dipsEpochsMargin,
dipsCollectionTarget: argv.dipsCollectionTarget,
}

const transactionMonitoring = {
Expand All @@ -497,6 +527,10 @@ export async function createNetworkSpecification(
deployment: argv.tapSubgraphDeployment,
url: argv.tapSubgraphEndpoint,
},
indexingPaymentsSubgraph: {
deployment: argv.indexingPaymentsSubgraphDeployment,
url: argv.indexingPaymentsSubgraphEndpoint,
},
}

const networkProvider = {
Expand Down
11 changes: 2 additions & 9 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ export class AllocationManager {
this.logger,
this.models,
this.network,
this.graphNode,
this,
this.pendingRcaModel,
)
this.dipsManager.startProposalAcceptanceLoop()
}
}

Expand Down Expand Up @@ -1213,15 +1215,6 @@ export class AllocationManager {

await upsertIndexingRule(logger, this.models, neverIndexingRule)

if (this.dipsManager) {
await this.dipsManager.tryCancelAgreement(allocationID)
await this.dipsManager.tryUpdateAgreementAllocation(
allocation.subgraphDeployment.id.toString(),
toAddress(allocationID),
null,
)
}

return {
actionID,
type: 'unallocate',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,7 @@ export default {
force: boolean
protocolNetwork: string
},
{ logger, models, multiNetworks, actionManager }: IndexerManagementResolverContext,
{ logger, models, multiNetworks }: IndexerManagementResolverContext,
): Promise<CloseAllocationResult> => {
logger.debug('Execute closeAllocation() mutation', {
allocationID: allocation,
Expand Down Expand Up @@ -1762,17 +1762,6 @@ export default {

await models.IndexingRule.upsert(offchainIndexingRule)

const allocationManager =
actionManager?.allocationManagers[network.specification.networkIdentifier]
if (allocationManager?.dipsManager) {
await allocationManager.dipsManager.tryCancelAgreement(allocation)
await allocationManager.dipsManager.tryUpdateAgreementAllocation(
allocationData.subgraphDeployment.id.toString(),
toAddress(allocation),
null,
)
}

// Since upsert succeeded, we _must_ have a rule
const updatedRule = await models.IndexingRule.findOne({
where: { identifier: offchainIndexingRule.identifier },
Expand Down
Loading
Loading