Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ type IndexingAgreement @entity(immutable: false) {
lastUpdatedAt: BigInt!
"Timestamp when agreement was canceled (0 if not canceled)"
canceledAt: BigInt!
"Address that initiated the cancel (zero address if not canceled). Taken from SubgraphService.IndexingAgreementCanceled.canceledOnBehalfOf so operator-initiated cancels are captured correctly."
canceledBy: Bytes!
"Total tokens collected over lifetime"
tokensCollected: BigInt!
"Block number of the latest state change on this agreement (Accepted / Updated / Canceled / RCACollected). Consumers that reconcile state diffs poll with `lastStateChangeBlock_gt` since last seen block."
lastStateChangeBlock: BigInt!
"Fee collection history"
collections: [IndexingFeeCollection!]! @derivedFrom(field: "agreement")
}
Expand Down
8 changes: 7 additions & 1 deletion src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BigInt, Bytes } from '@graphprotocol/graph-ts'
import { Address, BigInt, Bytes } from '@graphprotocol/graph-ts'
import { IndexingAgreement } from '../generated/schema'

export const BIGINT_ZERO = BigInt.fromI32(0)
Expand All @@ -23,7 +23,13 @@ export function createOrLoadIndexingAgreement(agreementId: Bytes): IndexingAgree
agreement.maxSecondsPerCollection = 0
agreement.lastUpdatedAt = BIGINT_ZERO
agreement.canceledAt = BIGINT_ZERO
// Default to 20-byte zero address rather than Bytes.empty(). Graph-node
// serializes empty Bytes on non-nullable fields with unpredictable
// padding (observed as "0x00000000" in practice), which breaks strict
// 20-byte-address parsers on the consumer side.
agreement.canceledBy = Address.zero() as Bytes
agreement.tokensCollected = BIGINT_ZERO
agreement.lastStateChangeBlock = BIGINT_ZERO
}
return agreement
}
9 changes: 8 additions & 1 deletion src/recurringCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export function handleAgreementAccepted(event: AgreementAccepted): void {
agreement.maxSecondsPerCollection = event.params.maxSecondsPerCollection.toI32()
agreement.canceledAt = BIGINT_ZERO
agreement.tokensCollected = BIGINT_ZERO
agreement.lastStateChangeBlock = event.block.number

agreement.save()
}
Expand All @@ -31,13 +32,17 @@ export function handleAgreementCanceled(event: AgreementCanceled): void {
let agreement = IndexingAgreement.load(event.params.agreementId)
if (agreement == null) return

// canceledBy enum: 0=ServiceProvider, 1=Payer
// canceledBy enum: 0=ServiceProvider, 1=Payer. The actual canceler address
// is written by subgraphService.handleIndexingAgreementCanceled, which
// fires in the same transaction and reads the SubgraphService event's
// canceledOnBehalfOf parameter.
if (event.params.canceledBy == 0) {
agreement.state = 'CanceledByServiceProvider'
} else {
agreement.state = 'CanceledByPayer'
}
agreement.canceledAt = event.params.canceledAt
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand All @@ -51,6 +56,7 @@ export function handleAgreementUpdated(event: AgreementUpdated): void {
agreement.maxOngoingTokensPerSecond = event.params.maxOngoingTokensPerSecond
agreement.minSecondsPerCollection = event.params.minSecondsPerCollection.toI32()
agreement.maxSecondsPerCollection = event.params.maxSecondsPerCollection.toI32()
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand All @@ -60,6 +66,7 @@ export function handleRCACollected(event: RCACollected): void {

agreement.lastCollectionAt = event.block.timestamp
agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens)
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand Down
14 changes: 14 additions & 0 deletions src/subgraphService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ethereum } from '@graphprotocol/graph-ts'
import {
IndexingAgreementAccepted as AcceptedEvent,
IndexingAgreementCanceled as CanceledEvent,
IndexingAgreementUpdated as UpdatedEvent,
IndexingFeesCollectedV1 as FeesCollectedEvent,
} from '../generated/SubgraphService/SubgraphService'
Expand All @@ -19,6 +20,18 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void {
agreement.tokensPerEntityPerSecond = terms[1].toBigInt()
}

agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

export function handleIndexingAgreementCanceled(event: CanceledEvent): void {
let agreement = createOrLoadIndexingAgreement(event.params.agreementId)
// canceledOnBehalfOf is the actual signer that initiated the cancel. For
// operator-initiated cancels this is the operator, not the payer/indexer
// directly. Dipper's chain_listener compares this to its own signer
// address to decide CanceledByRequester vs CanceledByIndexer.
agreement.canceledBy = event.params.canceledOnBehalfOf
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand All @@ -33,6 +46,7 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void {
agreement.tokensPerEntityPerSecond = terms[1].toBigInt()
}

agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand Down
2 changes: 2 additions & 0 deletions subgraph.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dataSources:
eventHandlers:
- event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes)
handler: handleIndexingAgreementAccepted
- event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address)
handler: handleIndexingAgreementCanceled
- event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes)
handler: handleIndexingAgreementUpdated
- event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes)
Expand Down
69 changes: 69 additions & 0 deletions tests/subgraphService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { assert, describe, test, clearStore, afterEach } from 'matchstick-as'
import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts'
import {
handleIndexingAgreementAccepted,
handleIndexingAgreementCanceled,
handleIndexingAgreementUpdated,
handleIndexingFeesCollectedV1,
} from '../src/subgraphService'
import {
IndexingAgreementAccepted as AcceptedEvent,
IndexingAgreementCanceled as CanceledEvent,
IndexingAgreementUpdated as UpdatedEvent,
IndexingFeesCollectedV1 as FeesCollectedEvent,
} from '../generated/SubgraphService/SubgraphService'
Expand Down Expand Up @@ -46,6 +48,27 @@ function createAcceptedEvent(
return event
}

function createCanceledEvent(
indexer: Address,
payer: Address,
agreementId: Bytes,
canceledOnBehalfOf: Address,
): CanceledEvent {
let event = changetype<CanceledEvent>(newMockEvent())

event.parameters = new Array()
event.parameters.push(new ethereum.EventParam('indexer', ethereum.Value.fromAddress(indexer)))
event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(payer)))
event.parameters.push(
new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)),
)
event.parameters.push(
new ethereum.EventParam('canceledOnBehalfOf', ethereum.Value.fromAddress(canceledOnBehalfOf)),
)

return event
}

function createUpdatedEvent(
indexer: Address,
payer: Address,
Expand Down Expand Up @@ -153,6 +176,7 @@ describe('handleIndexingAgreementAccepted', () => {
1,
versionTerms,
)
event.block.number = BigInt.fromI32(100)
handleIndexingAgreementAccepted(event)

assert.entityCount('IndexingAgreement', 1)
Expand All @@ -175,11 +199,49 @@ describe('handleIndexingAgreementAccepted', () => {
'tokensPerEntityPerSecond',
'50',
)
assert.fieldEquals(
'IndexingAgreement',
agreementId.toHexString(),
'lastStateChangeBlock',
'100',
)
// State remains NotAccepted until RC handler fires
assert.fieldEquals('IndexingAgreement', agreementId.toHexString(), 'state', 'NotAccepted')
})
})

describe('handleIndexingAgreementCanceled', () => {
afterEach(() => {
clearStore()
})

test('sets canceledBy to canceledOnBehalfOf and stamps lastStateChangeBlock', () => {
let indexer = Address.fromString('0x0000000000000000000000000000000000000001')
let payer = Address.fromString('0x0000000000000000000000000000000000000002')
let agreementId = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10')
// Operator address, distinct from payer/indexer, to prove the handler
// captures whoever actually initiated the cancel rather than inferring it.
let operator = Address.fromString('0x000000000000000000000000000000000000000a')

let event = createCanceledEvent(indexer, payer, agreementId, operator)
event.block.number = BigInt.fromI32(200)
handleIndexingAgreementCanceled(event)

assert.fieldEquals(
'IndexingAgreement',
agreementId.toHexString(),
'canceledBy',
operator.toHexString(),
)
assert.fieldEquals(
'IndexingAgreement',
agreementId.toHexString(),
'lastStateChangeBlock',
'200',
)
})
})

describe('handleIndexingAgreementUpdated', () => {
afterEach(() => {
clearStore()
Expand Down Expand Up @@ -216,6 +278,7 @@ describe('handleIndexingAgreementUpdated', () => {
1,
newVersionTerms,
)
updateEvent.block.number = BigInt.fromI32(300)
handleIndexingAgreementUpdated(updateEvent)

assert.entityCount('IndexingAgreement', 1)
Expand All @@ -232,6 +295,12 @@ describe('handleIndexingAgreementUpdated', () => {
'tokensPerEntityPerSecond',
'100',
)
assert.fieldEquals(
'IndexingAgreement',
agreementId.toHexString(),
'lastStateChangeBlock',
'300',
)
})
})

Expand Down
Loading