Skip to content

Commit 515f477

Browse files
authored
index escrow events (#1378)
* index escrow events * fix pr comments * test fix * offset & size * add docs * check type, strip backticks * address pr comments
1 parent 5f476a5 commit 515f477

20 files changed

Lines changed: 961 additions & 6 deletions

docs/API.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,61 @@ updates node configuration and reloads it gracefully (admin only)
13291329

13301330
---
13311331

1332+
## Get Escrow Events
1333+
1334+
### `HTTP` GET /api/services/escrow/events?
1335+
1336+
### `HTTP` POST /directCommand
1337+
1338+
### `P2P` command: getEscrowEvents
1339+
1340+
#### Description
1341+
1342+
Returns indexed Escrow contract events. The indexer matches Escrow logs by topic hash, verifies they came from the chain's `Escrow` contract (`Deposit`/`Withdraw`/`Lock` are generic signatures), and stores one row per event in the append-only `escrow` collection keyed by `${txHash}-${logIndex}`. All filters are optional.
1343+
1344+
#### Parameters
1345+
1346+
| name | type | required | description |
1347+
| --------- | ------ | --------- | --------------------------------------------------------- |
1348+
| command | string | POST only | command name (`getEscrowEvents`) |
1349+
| chainId | number | | chain id |
1350+
| eventType | string | | one of `Auth, Lock, Claimed, Canceled, Deposit, Withdraw` |
1351+
| payer | string | | payer address (case-insensitive) |
1352+
| payee | string | | payee address (case-insensitive) |
1353+
| token | string | | token address (case-insensitive) |
1354+
| jobId | string | | compute job id |
1355+
| txId | string | | transaction hash |
1356+
| offset | number | | rows to skip (default 0) |
1357+
| size | number | | page size (default 100, max 250) |
1358+
1359+
#### Request (POST /directCommand)
1360+
1361+
```json
1362+
{ "command": "getEscrowEvents", "chainId": 8996, "eventType": "Deposit", "offset": 0, "size": 50 }
1363+
```
1364+
1365+
#### Response
1366+
1367+
Every row has `id, eventType, chainId, contract, block, txHash` plus event-specific fields (`payer, payee, token, jobId, amount, expiry, proof, maxLockedAmount, maxLockSeconds, maxLockCounts`).
1368+
1369+
```json
1370+
[
1371+
{
1372+
"id": "0x39f3...6575-3",
1373+
"eventType": "Deposit",
1374+
"chainId": 8996,
1375+
"contract": "0x282d...a1a1",
1376+
"block": 55,
1377+
"txHash": "0x39f3...6575",
1378+
"payer": "0xbe54...ab5e",
1379+
"token": "0x282d...a1a1",
1380+
"amount": "100000000000000000000"
1381+
}
1382+
]
1383+
```
1384+
1385+
---
1386+
13321387
# Compute
13331388

13341389
For starters, you can find a list of algorithms in the [Ocean Algorithms repository](https://github.com/oceanprotocol/algo_dockers) and the docker images in the [Algo Dockerhub](https://hub.docker.com/r/oceanprotocol/algo_dockers/tags).

docs/Arhitecture.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ An off-chain, multi-chain metadata & chain events cache. It continually monitors
8080
- validates DDO, according to multiple SHACL schemas
8181
- provides proof for valid DDOs
8282
- monitors datatokens contracts & stores orders
83+
- monitors the Escrow contract events (Auth, Lock, Claimed, Canceled, Deposit, Withdraw) and stores them for querying
8384
- allows querys for all the above
8485
- supports graceful shutdown and chain-specific reindexing
8586

src/@types/Escrow.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,22 @@ export interface EscrowLock {
1414
expiry: BigInt
1515
token: string
1616
}
17+
18+
export interface EscrowEvent {
19+
id: string
20+
eventType: string
21+
chainId: number
22+
contract: string
23+
block: number
24+
txHash: string
25+
payer?: string
26+
payee?: string
27+
token?: string
28+
jobId?: string
29+
amount?: string
30+
expiry?: string
31+
proof?: string
32+
maxLockedAmount?: string
33+
maxLockSeconds?: string
34+
maxLockCounts?: string
35+
}

src/@types/commands.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,18 @@ export interface QueryCommand extends Command {
106106
maxResultsPerPage?: number
107107
pageNumber?: number
108108
}
109+
110+
export interface GetEscrowEventsCommand extends Command {
111+
chainId?: number
112+
eventType?: string
113+
payer?: string
114+
payee?: string
115+
token?: string
116+
jobId?: string
117+
txId?: string
118+
offset?: number
119+
size?: number
120+
}
109121
export interface ReindexCommand extends Command {
110122
txId: string
111123
chainId: number

src/components/Indexer/processor.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
NewAccessListEventProcessor,
2121
AddressAddedEventProcessor,
2222
AddressRemovedEventProcessor,
23+
EscrowEventProcessor,
2324
ProcessorConstructor
2425
} from './processors/index.js'
2526
import { findEventByKey } from './utils.js'
@@ -42,7 +43,13 @@ const EVENT_PROCESSOR_MAP: Record<string, ProcessorConstructor> = {
4243
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor,
4344
[EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor,
4445
[EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor,
45-
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor
46+
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor,
47+
[EVENTS.ESCROW_AUTH]: EscrowEventProcessor,
48+
[EVENTS.ESCROW_LOCK]: EscrowEventProcessor,
49+
[EVENTS.ESCROW_CLAIMED]: EscrowEventProcessor,
50+
[EVENTS.ESCROW_CANCELED]: EscrowEventProcessor,
51+
[EVENTS.ESCROW_DEPOSIT]: EscrowEventProcessor,
52+
[EVENTS.ESCROW_WITHDRAW]: EscrowEventProcessor
4653
}
4754

4855
const processorInstances = new Map<string, BaseEventProcessor>()
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
2+
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
3+
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
4+
import { BaseEventProcessor } from './BaseProcessor.js'
5+
import { getContractAddress } from '../utils.js'
6+
import { EVENTS } from '../../../utils/constants.js'
7+
import { EscrowEvent } from '../../../@types/Escrow.js'
8+
import { OceanNodeConfig } from '../../../@types/OceanNode.js'
9+
import EscrowJson from '@oceanprotocol/contracts/artifacts/contracts/escrow/Escrow.sol/Escrow.json' with { type: 'json' }
10+
11+
const escrowInterface = new Interface(EscrowJson.abi)
12+
13+
const addr = (v: any): string => v?.toString().toLowerCase()
14+
const num = (v: any): string => v?.toString()
15+
16+
export class EscrowEventProcessor extends BaseEventProcessor {
17+
private readonly escrowAddress: string
18+
19+
constructor(chainId: number, config: OceanNodeConfig) {
20+
super(chainId, config)
21+
this.escrowAddress = getContractAddress(chainId, 'Escrow')
22+
}
23+
24+
async processEvent(
25+
event: ethers.Log,
26+
chainId: number,
27+
signer: Signer,
28+
provider: FallbackProvider,
29+
eventName?: string
30+
): Promise<any> {
31+
try {
32+
if (
33+
!this.escrowAddress ||
34+
event.address.toLowerCase() !== this.escrowAddress.toLowerCase()
35+
) {
36+
return null
37+
}
38+
39+
const decoded = escrowInterface.parseLog({
40+
topics: Array.from(event.topics),
41+
data: event.data
42+
})
43+
if (!decoded) return null
44+
45+
const { args } = decoded
46+
if (!eventName) return null
47+
const record: EscrowEvent = {
48+
id: `${event.transactionHash}-${event.index}`,
49+
eventType: eventName,
50+
chainId,
51+
contract: event.address.toLowerCase(),
52+
block: event.blockNumber,
53+
txHash: event.transactionHash
54+
}
55+
56+
switch (eventName) {
57+
case EVENTS.ESCROW_AUTH:
58+
record.payer = addr(args.payer)
59+
record.payee = addr(args.payee)
60+
record.maxLockedAmount = num(args.maxLockedAmount)
61+
record.maxLockSeconds = num(args.maxLockSeconds)
62+
record.maxLockCounts = num(args.maxLockCounts)
63+
break
64+
case EVENTS.ESCROW_LOCK:
65+
record.payer = addr(args.payer)
66+
record.payee = addr(args.payee)
67+
record.jobId = num(args.jobId)
68+
record.amount = num(args.amount)
69+
record.expiry = num(args.expiry)
70+
record.token = addr(args.token)
71+
break
72+
case EVENTS.ESCROW_CLAIMED:
73+
record.payee = addr(args.payee)
74+
record.jobId = num(args.jobId)
75+
record.token = addr(args.token)
76+
record.payer = addr(args.payer)
77+
record.amount = num(args.amount)
78+
record.proof = args.proof?.toString()
79+
break
80+
case EVENTS.ESCROW_CANCELED:
81+
record.payee = addr(args.payee)
82+
record.jobId = num(args.jobId)
83+
record.token = addr(args.token)
84+
record.payer = addr(args.payer)
85+
record.amount = num(args.amount)
86+
break
87+
case EVENTS.ESCROW_DEPOSIT:
88+
case EVENTS.ESCROW_WITHDRAW:
89+
record.payer = addr(args.payer)
90+
record.token = addr(args.token)
91+
record.amount = num(args.amount)
92+
break
93+
default:
94+
return null
95+
}
96+
97+
const { escrow } = await this.getDatabase()
98+
if (!escrow) return null
99+
const result = await escrow.create(record)
100+
INDEXER_LOGGER.logMessage(
101+
`[Escrow] ${eventName} indexed for tx ${event.transactionHash} on chain ${chainId}`
102+
)
103+
return result
104+
} catch (err) {
105+
INDEXER_LOGGER.log(
106+
LOG_LEVELS_STR.LEVEL_ERROR,
107+
`Error processing Escrow ${eventName} event: ${err.message}`,
108+
true
109+
)
110+
return null
111+
}
112+
}
113+
}

src/components/Indexer/processors/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export * from './OrderStartedEventProcessor.js'
1515
export * from './NewAccessListEventProcessor.js'
1616
export * from './AddressAddedEventProcessor.js'
1717
export * from './AddressRemovedEventProcessor.js'
18+
export * from './EscrowEventProcessor.js'
1819
export * from './BaseProcessor.js'
1920

2021
export type ProcessorConstructor = new (

src/components/core/handler/coreHandlersRegistry.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import {
5656
PersistentStorageUploadFileHandler
5757
} from './persistentStorage.js'
5858
import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js'
59+
import { EscrowEventsHandler } from './escrowHandler.js'
5960

6061
export type HandlerRegistry = {
6162
handlerName: string // name of the handler
@@ -208,6 +209,10 @@ export class CoreHandlersRegistry {
208209
PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST,
209210
new SearchAccessListHandler(node)
210211
)
212+
this.registerCoreHandler(
213+
PROTOCOL_COMMANDS.GET_ESCROW_EVENTS,
214+
new EscrowEventsHandler(node)
215+
)
211216
}
212217

213218
public static getInstance(
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { CommandHandler } from './handler.js'
2+
import { GetEscrowEventsCommand } from '../../../@types/commands.js'
3+
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
4+
import { Readable } from 'stream'
5+
import {
6+
ValidateParams,
7+
validateCommandParameters
8+
} from '../../httpRoutes/validateCommands.js'
9+
import { CORE_LOGGER } from '../../../utils/logging/common.js'
10+
import { ESCROW_EVENTS } from '../../../utils/constants.js'
11+
12+
export class EscrowEventsHandler extends CommandHandler {
13+
validate(command: GetEscrowEventsCommand): ValidateParams {
14+
if (command.eventType && !ESCROW_EVENTS.includes(command.eventType)) {
15+
return {
16+
valid: false,
17+
status: 400,
18+
reason: `eventType must be one of: ${ESCROW_EVENTS.join(', ')}`
19+
}
20+
}
21+
return validateCommandParameters(command, [])
22+
}
23+
24+
async handle(task: GetEscrowEventsCommand): Promise<P2PCommandResponse> {
25+
const validationResponse = await this.verifyParamsAndRateLimits(task)
26+
if (this.shouldDenyTaskHandling(validationResponse)) {
27+
return validationResponse
28+
}
29+
try {
30+
const database = await this.getOceanNode().getDatabase()
31+
if (!database || !database.escrow) {
32+
CORE_LOGGER.error('Escrow database is not available')
33+
return {
34+
stream: null,
35+
status: { httpStatus: 503, error: 'Escrow database is not available' }
36+
}
37+
}
38+
39+
const filters: Record<string, any> = {
40+
chainId: task.chainId,
41+
eventType: task.eventType,
42+
payer: typeof task.payer === 'string' ? task.payer.toLowerCase() : undefined,
43+
payee: typeof task.payee === 'string' ? task.payee.toLowerCase() : undefined,
44+
token: typeof task.token === 'string' ? task.token.toLowerCase() : undefined,
45+
jobId: task.jobId,
46+
txHash: task.txId
47+
}
48+
49+
let result = await database.escrow.search(filters, task.offset, task.size)
50+
if (!result) {
51+
result = []
52+
}
53+
return {
54+
stream: Readable.from(JSON.stringify(result)),
55+
status: { httpStatus: 200 }
56+
}
57+
} catch (error) {
58+
CORE_LOGGER.error(`Error in EscrowEventsHandler: ${error.message}`)
59+
return {
60+
stream: null,
61+
status: { httpStatus: 500, error: 'Unknown error: ' + error.message }
62+
}
63+
}
64+
}
65+
}

src/components/database/BaseDatabase.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Schema } from '.'
22
import { OceanNodeDBConfig } from '../../@types'
33
import { AccessListUser } from '../../@types/AccessList.js'
4+
import { EscrowEvent } from '../../@types/Escrow.js'
45
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
56
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
67
import { ElasticsearchSchema } from './ElasticSchemas.js'
@@ -152,6 +153,28 @@ export abstract class AbstractOrderDatabase {
152153
abstract delete(orderId: string): Promise<any>
153154
}
154155

156+
export abstract class AbstractEscrowDatabase {
157+
protected config: OceanNodeDBConfig
158+
protected schema: Schema
159+
160+
constructor(config: OceanNodeDBConfig, schema: Schema) {
161+
this.config = config
162+
this.schema = schema
163+
}
164+
165+
abstract create(event: EscrowEvent): Promise<any>
166+
167+
abstract retrieve(id: string): Promise<Record<string, any> | null>
168+
169+
abstract search(
170+
filters: Record<string, any>,
171+
offset?: number,
172+
size?: number
173+
): Promise<Record<string, any>[] | null>
174+
175+
abstract delete(id: string): Promise<any>
176+
}
177+
155178
export abstract class AbstractDdoDatabase {
156179
protected config: OceanNodeDBConfig
157180
protected schemas: Schema[]

0 commit comments

Comments
 (0)