Skip to content

Commit 36670c2

Browse files
MoonBoi9001claude
andcommitted
feat(subgraph): restore immutable transition log entities
PR #5 replaces the immutable IndexingAgreementAccepted/Canceled/Updated event-log entities with a mutable aggregated IndexingAgreement. Current-state queries improve, but event-sourcing consumers break: dipper's chain_listener polls indexingAgreementAccepteds/indexingAgreementCanceleds since-block to replay state transitions into its local DB. Restore the logs alongside the aggregated entity so snapshot and streaming consumers share one subgraph. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dafb4e1 commit 36670c2

4 files changed

Lines changed: 146 additions & 1 deletion

File tree

schema.graphql

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,46 @@ type IndexerDeploymentLatest @entity(immutable: false) {
7272
blockTimestamp: BigInt!
7373
}
7474

75+
# Immutable transition logs. The aggregated IndexingAgreement entity above
76+
# serves current-state queries; these append-only logs serve event-sourcing
77+
# consumers (e.g. dipper's chain_listener polls them since-block to replay
78+
# state transitions into its local DB). Preserving both views lets snapshot
79+
# and streaming consumers share one subgraph.
80+
type IndexingAgreementAccepted @entity(immutable: true) {
81+
id: ID!
82+
indexer: Bytes!
83+
payer: Bytes!
84+
agreementId: Bytes!
85+
allocationId: Bytes!
86+
blockNumber: BigInt!
87+
blockTimestamp: BigInt!
88+
transactionHash: Bytes!
89+
}
90+
91+
type IndexingAgreementCanceled @entity(immutable: true) {
92+
id: ID!
93+
indexer: Bytes!
94+
payer: Bytes!
95+
agreementId: Bytes!
96+
"Address that initiated the cancel (from SubgraphService canceledOnBehalfOf param)"
97+
canceledBy: Bytes!
98+
blockNumber: BigInt!
99+
blockTimestamp: BigInt!
100+
transactionHash: Bytes!
101+
}
102+
103+
type IndexingAgreementUpdated @entity(immutable: true) {
104+
id: ID!
105+
indexer: Bytes!
106+
payer: Bytes!
107+
agreementId: Bytes!
108+
allocationId: Bytes!
109+
version: Int!
110+
blockNumber: BigInt!
111+
blockTimestamp: BigInt!
112+
transactionHash: Bytes!
113+
}
114+
75115
# Immutable log of every OfferStored event emitted by RecurringCollector.
76116
# OFFER_TYPE_NEW = 0, OFFER_TYPE_UPDATE = 1.
77117
type OfferStored @entity(immutable: true) {

src/subgraphService.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
import { ethereum } from '@graphprotocol/graph-ts'
22
import {
33
IndexingAgreementAccepted as AcceptedEvent,
4+
IndexingAgreementCanceled as CanceledEvent,
45
IndexingAgreementUpdated as UpdatedEvent,
56
IndexingFeesCollectedV1 as FeesCollectedEvent,
67
} from '../generated/SubgraphService/SubgraphService'
7-
import { IndexerDeploymentLatest, IndexingFeeCollection } from '../generated/schema'
8+
import {
9+
IndexerDeploymentLatest,
10+
IndexingFeeCollection,
11+
IndexingAgreementAccepted,
12+
IndexingAgreementCanceled,
13+
IndexingAgreementUpdated,
14+
} from '../generated/schema'
815
import { createOrLoadIndexingAgreement } from './helpers'
916

1017
export function handleIndexingAgreementAccepted(event: AcceptedEvent): void {
@@ -20,6 +27,34 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void {
2027
}
2128

2229
agreement.save()
30+
31+
let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
32+
let log = new IndexingAgreementAccepted(logId)
33+
log.indexer = event.params.indexer
34+
log.payer = event.params.payer
35+
log.agreementId = event.params.agreementId
36+
log.allocationId = event.params.allocationId
37+
log.blockNumber = event.block.number
38+
log.blockTimestamp = event.block.timestamp
39+
log.transactionHash = event.transaction.hash
40+
log.save()
41+
}
42+
43+
export function handleIndexingAgreementCanceled(event: CanceledEvent): void {
44+
// State and canceledAt are set by RecurringCollector.handleAgreementCanceled,
45+
// which emits the canonical cancel event with the canceledBy enum. This
46+
// handler just writes the immutable transition log for event-sourcing
47+
// consumers (dipper's chain_listener).
48+
let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
49+
let log = new IndexingAgreementCanceled(logId)
50+
log.indexer = event.params.indexer
51+
log.payer = event.params.payer
52+
log.agreementId = event.params.agreementId
53+
log.canceledBy = event.params.canceledOnBehalfOf
54+
log.blockNumber = event.block.number
55+
log.blockTimestamp = event.block.timestamp
56+
log.transactionHash = event.transaction.hash
57+
log.save()
2358
}
2459

2560
export function handleIndexingAgreementUpdated(event: UpdatedEvent): void {
@@ -34,6 +69,18 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void {
3469
}
3570

3671
agreement.save()
72+
73+
let logId = event.transaction.hash.concatI32(event.logIndex.toI32()).toHexString()
74+
let log = new IndexingAgreementUpdated(logId)
75+
log.indexer = event.params.indexer
76+
log.payer = event.params.payer
77+
log.agreementId = event.params.agreementId
78+
log.allocationId = event.params.allocationId
79+
log.version = event.params.version
80+
log.blockNumber = event.block.number
81+
log.blockTimestamp = event.block.timestamp
82+
log.transactionHash = event.transaction.hash
83+
log.save()
3784
}
3885

3986
export function handleIndexingFeesCollectedV1(event: FeesCollectedEvent): void {

subgraph.template.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@ dataSources:
1919
- IndexingAgreement
2020
- IndexingFeeCollection
2121
- IndexerDeploymentLatest
22+
- IndexingAgreementAccepted
23+
- IndexingAgreementCanceled
24+
- IndexingAgreementUpdated
2225
abis:
2326
- name: SubgraphService
2427
file: ./abis/SubgraphService.json
2528
eventHandlers:
2629
- event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes)
2730
handler: handleIndexingAgreementAccepted
31+
- event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address)
32+
handler: handleIndexingAgreementCanceled
2833
- event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes)
2934
handler: handleIndexingAgreementUpdated
3035
- event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes)

tests/subgraphService.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ import { assert, describe, test, clearStore, afterEach } from 'matchstick-as'
22
import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts'
33
import {
44
handleIndexingAgreementAccepted,
5+
handleIndexingAgreementCanceled,
56
handleIndexingAgreementUpdated,
67
handleIndexingFeesCollectedV1,
78
} from '../src/subgraphService'
89
import {
910
IndexingAgreementAccepted as AcceptedEvent,
11+
IndexingAgreementCanceled as CanceledEvent,
1012
IndexingAgreementUpdated as UpdatedEvent,
1113
IndexingFeesCollectedV1 as FeesCollectedEvent,
1214
} from '../generated/SubgraphService/SubgraphService'
@@ -46,6 +48,30 @@ function createAcceptedEvent(
4648
return event
4749
}
4850

51+
function createCanceledEvent(
52+
indexer: Address,
53+
payer: Address,
54+
agreementId: Bytes,
55+
canceledOnBehalfOf: Address,
56+
): CanceledEvent {
57+
let event = changetype<CanceledEvent>(newMockEvent())
58+
59+
event.parameters = new Array()
60+
event.parameters.push(new ethereum.EventParam('indexer', ethereum.Value.fromAddress(indexer)))
61+
event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(payer)))
62+
event.parameters.push(
63+
new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)),
64+
)
65+
event.parameters.push(
66+
new ethereum.EventParam(
67+
'canceledOnBehalfOf',
68+
ethereum.Value.fromAddress(canceledOnBehalfOf),
69+
),
70+
)
71+
72+
return event
73+
}
74+
4975
function createUpdatedEvent(
5076
indexer: Address,
5177
payer: Address,
@@ -177,6 +203,29 @@ describe('handleIndexingAgreementAccepted', () => {
177203
)
178204
// State remains NotAccepted until RC handler fires
179205
assert.fieldEquals('IndexingAgreement', agreementId.toHexString(), 'state', 'NotAccepted')
206+
207+
// Immutable transition log emitted for event-sourcing consumers
208+
assert.entityCount('IndexingAgreementAccepted', 1)
209+
})
210+
})
211+
212+
describe('handleIndexingAgreementCanceled', () => {
213+
afterEach(() => {
214+
clearStore()
215+
})
216+
217+
test('emits immutable IndexingAgreementCanceled log with canceledBy address', () => {
218+
let indexer = Address.fromString('0x0000000000000000000000000000000000000001')
219+
let payer = Address.fromString('0x0000000000000000000000000000000000000002')
220+
let agreementId = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10')
221+
let canceledOnBehalfOf = Address.fromString('0x0000000000000000000000000000000000000004')
222+
223+
let event = createCanceledEvent(indexer, payer, agreementId, canceledOnBehalfOf)
224+
handleIndexingAgreementCanceled(event)
225+
226+
assert.entityCount('IndexingAgreementCanceled', 1)
227+
// Aggregated entity state is owned by the RC handler — SS handler must not create one
228+
assert.entityCount('IndexingAgreement', 0)
180229
})
181230
})
182231

@@ -232,6 +281,10 @@ describe('handleIndexingAgreementUpdated', () => {
232281
'tokensPerEntityPerSecond',
233282
'100',
234283
)
284+
285+
// One log per event: the accept and the update each emit their own immutable log
286+
assert.entityCount('IndexingAgreementAccepted', 1)
287+
assert.entityCount('IndexingAgreementUpdated', 1)
235288
})
236289
})
237290

0 commit comments

Comments
 (0)