Add tx filters to indexer.GetSubscription#1074
Conversation
…onnection # Conflicts: # api-spec/protobuf/gen/ark/v1/indexer.pb.go
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🔍 Aggressive Code Review — #1074
Reviewer: Arkana (automated, protocol-aware)
Summary
This PR adds CEL-based transaction filters to GetSubscription, introduces a new single-connection subscription flow (empty subscription_id → server creates inline), adds UpdateSubscription RPC, and adds a SubscriptionStartedEvent to the stream response. The proto changes, handler logic, CEL environment, and test coverage are generally well-structured. However, there are security and correctness issues that must be addressed before merge.
🔴 CRITICAL — CEL DoS: No limits on expression count or evaluation cost
Files: broker.go:102-177, indexer.go:532-706
The addTxFilters / overwriteTxFilters broker methods accept unbounded []string expression lists. A malicious client can:
- Memory exhaustion: Call
UpdateSubscriptionrepeatedly withModify.AddExpressions, growinglistener.txFilterswithout bound. Each compiledcel.Programis ~1-10KB. - CPU exhaustion:
matchesTx()(broker.go:148-166) evaluates ALL programs for EVERY transaction event for EVERY listener. With N listeners × M expressions × T transactions/sec, this isO(N·M·T)CEL evaluations. - No evaluation timeout:
celenv.Eval(tx_env.go:48-58) has no context/timeout. A pathological expression (e.g., deeply nested string operations on large packet hex data) blocks the singlelistenToTxEventsgoroutine, stalling ALL subscriptions.
Required fixes:
- Add a per-subscription expression limit (e.g.,
maxTxFiltersPerListener = 64). - Add a per-call limit on expressions in a single
UpdateSubscriptionrequest. - Consider adding
cel.EvalOption(cel.OptTrackCost)+cel.CostLimit()to cap evaluation cost.
🔴 HIGH — activation() called once per program, not once per tx
File: broker.go:148-166, tx_env.go:47-58
matchesTx calls celenv.Eval(prg, tx) in a loop, and Eval calls activation(tx) every time. activation() parses the OP_RETURN extension and serializes every packet to hex — this is non-trivial work. For a listener with M expressions, this runs M times per event instead of once.
// broker.go:162 — this loop calls Eval → activation(tx) for each program
for _, prg := range programs {
ok, err := celenv.Eval(prg, tx)Fix: Compute the activation map once outside the loop and pass it to a new EvalWithActivation(prg, act) function. This changes O(M·E) to O(M + E) where E = extension parse cost.
🟡 MEDIUM — Silent error swallowing in matchesTx
File: broker.go:162-165
ok, err := celenv.Eval(prg, tx)
if err == nil && ok {
return true
}CEL evaluation errors are silently ignored. If an expression panics, returns a non-bool, or hits an unexpected type, this silently drops the match. At minimum, log at debug level so operators can detect misbehaving expressions.
🟡 MEDIUM — OpenAPI spec has duplicate parameter names
File: indexer.openapi.json:25-46, indexer.openapi.json:136-157
filter.scripts.modify.removeScripts appears twice in both endpoint definitions. Same for filter.txs.modify.removeExpressions. This is likely a protobuf→OpenAPI codegen quirk (the add_* and remove_* fields map to the same query param name due to the parent modify wrapper), but it will confuse REST clients and may cause undefined behavior in OpenAPI code generators.
Fix: Verify the codegen output. If this is a codegen bug, file upstream. If hand-maintained, deduplicate.
🟡 MEDIUM — ScriptsFilterResult.Added/Removed returns raw input, not canonical form
File: indexer.go:541-549
return &arkv1.UpdateSubscriptionResponse{
Result: &arkv1.UpdateSubscriptionResponse_Scripts{
Scripts: &arkv1.ScriptsFilterResult{
Added: modify.GetAddScripts(), // raw input
Removed: modify.GetRemoveScripts(), // raw input
All: h.scriptSubsHandler.getTopics(subscriptionID), // formatted
},
},
}Added and Removed return the raw user input, but All returns formatTopic()-processed scripts (from getTopics). A client comparing Added entries against All entries may find mismatches if the input format differs from the stored format. Return the parsed/canonical form for all three fields.
🟡 MEDIUM — Fragile error string matching
File: indexer.go:464-467, indexer.go:570-573
if strings.Contains(err.Error(), "listener not found") {
return status.Error(codes.NotFound, err.Error())
}And:
if strings.Contains(err.Error(), "not found") {
return nil, status.Error(codes.NotFound, err.Error())
}Error classification via substring matching is brittle. If broker error messages change, these will silently produce wrong gRPC status codes. Define sentinel errors in the broker package and use errors.Is().
🟢 LOW — parseTxOnce is safe but could use sync.OnceValue
File: indexer.go:800-817
The parseTxOnce closure with parsedTxAttempted bool works correctly because listenToTxEvents processes events sequentially. However, sync.OnceValue from Go 1.21+ would make the intent clearer and be future-proof if this ever becomes concurrent.
🟢 LOW — UpdateSubscription permission is write, SubscribeForScripts is write
File: permissions.go:258-262
Both are write on EntityIndexer, which is consistent. ✅
🟢 Cross-repo Impact
Two test repos (layerzero-usdt0-arkade-demo, introspector-review) use the old two-step subscription flow (SubscribeForScripts → GetSubscription(id)). The old flow is preserved — the behavior change only applies when subscription_id is empty. The new SubscriptionStartedEvent oneof variant in GetSubscriptionResponse is additive and won't break existing gRPC clients (they'll ignore unrecognized oneof values). REST/SSE clients that don't handle the new variant should be checked.
No SDK repos found with direct indexer subscription consumers, but the proto changes will propagate on next SDK codegen. No breaking change for existing consumers.
🟢 Protocol Safety
This PR does NOT touch VTXO handling, transaction signing, forfeit paths, round lifecycle, connector trees, or exit paths. The CEL filters are read-only event filtering in the indexer subscription layer — they cannot modify transaction processing. No protocol-critical flag needed.
✅ Test Coverage
Excellent test coverage: 1428 lines of new tests covering:
- New flow (empty subscription_id): started event, script events, cleanup, heartbeat, invalid input
- Old flow (existing subscription_id): preserved behavior, timeout, cleanup
UpdateSubscription: overwrite, modify, validation, NotFound, idempotencyTxFilter: CEL compilation, evaluation, OR semantics, error resilience, atomicitycelenvpackage: compile rejection, eval correctness, issue examples
Missing test: no test for max-expression-count limits (because the limit doesn't exist yet — see critical finding above).
Verdict: Request Changes
The CEL DoS vector (unbounded expressions, no eval cost limit, no timeout) is the blocking issue. The activation() per-program overhead is a significant perf bug. The other items are medium/low severity.
🤖 Reviewed by Arkana
There was a problem hiding this comment.
🔍 Incremental Review — new commit 47c2f5c8 ("go mod tidy")
Only change since my last review: go.mod removes // indirect from github.com/google/cel-go v0.26.1, correctly marking it as a direct dependency now that pkg/ark-lib/indexer/celenv/ imports it. ✅ Trivial and correct.
All findings from my previous review still apply. The critical items remain unaddressed:
- 🔴 CEL DoS — no per-subscription expression limit, no eval cost cap
- 🔴 Redundant
activation()call — parsed once per program instead of once per tx - 🟡 Silent error swallowing in
matchesTx - 🟡 Fragile error string matching — use sentinel errors
- 🟡 OpenAPI duplicate parameters
- 🟡 Raw vs canonical form mismatch in filter results
Keeping my request-changes status until the critical items are resolved.
🤖 Reviewed by Arkana
closes #1072
forked from
bob/stream-single-connectionwhich is up for review: #951Adds CEL-based filtering of indexed transactions to
IndexerService.GetSubscription/UpdateSubscription, complementing the existing script topic filter. A tx event is now dispatched to a listener when any of the listener's script topics match or when any of its CEL tx expressions evaluates to true.Supported expression shapes (per the issue):
has(tx.extension)— true when the tx carries an ARK OP_RETURN extensionhasPacket(tx.extension, <packetType>)— true when a specific packet type is presenttx.extension[<packetType>].contains("<hex>")— substring match against the packet's hex-encoded bodyPrograms are compiled once at subscription-update time (invalid CEL →
InvalidArgument, listener state untouched) and re-evaluated against each indexed tx envelope on the event hot path.Changes
Area:
api-spec/protobuf/ark/v1/indexer.protoWhat changed: New messages
TxFilter,ModifyTxFilters,OverwriteTxFilters,TxFilterResult. ExtendedSubscriptionFilter.filteroneof withtxs = 2andUpdateSubscriptionResponse.resultoneof withtxs = 2. Backward compatible: additive only.────────────────────────────────────────
Area:
pkg/ark-lib/indexer/celenv/(new package)What changed:
txCEL env declaringtx.extensionas a dynamic map (runtime:map[int64]stringof packet type → hex-encoded packet bytes),hasPacketcustom function,Compile()(rejects non-bool expressions),Eval()building the activation fromextension.NewExtensionFromTx.tx.extensionis only populated when the tx actually carries an extension, sohas(tx.extension)is a meaningful guard.────────────────────────────────────────
Area:
internal/interface/grpc/handlers/broker.goWhat changed: Listener now holds
txFilters map[string]cel.Programkeyed by the original expression string. Broker methodsaddTxFilters/removeTxFilters/overwriteTxFilters/getTxFiltersmirror the scripts API.compileTxFiltersvalidates all expressions before mutating listener state (atomic).matchesTxtakes a lazygetTxthunk — listeners with no filters skip the decode entirely.────────────────────────────────────────
Area:
internal/interface/grpc/handlers/indexer.goWhat changed:
applyTxFiltermirrorsapplyScriptsFilter. BothGetSubscription's initial-filter dispatch andUpdateSubscriptionroute the newSubscriptionFilter_Txscase to it.listenToTxEventslazily decodesevent.Txonce per event (only if at least one listener has tx filters) and dispatches onscriptMatch || txMatch.────────────────────────────────────────
Area:
internal/interface/grpc/handlers/indexer_test.goWhat changed: New
TestTxFilter(17 passing subtests): initial filter viaGetSubscription, Modify-Add / Modify-Remove / Overwrite viaUpdateSubscription, invalid-CEL rejection at both entry points, atomic rollback on Modify-Add with invalid expression, idempotent add+remove, OR semantics fully exercised (script-only, tx-only, both → exactly once, neither → dropped), script path still works whenevent.Txis unparseable, and the lazy-parse contract (matchesTxdoesn't invokegetTxon a filterless listener).────────────────────────────────────────
Area:
pkg/ark-lib/indexer/celenv/tx_env_test.go(new)What changed: 15 passing assertions covering
has(tx.extension),hasPacket,.contains(), parser-rejects-invalid-CEL, parser-rejects-non-bool, and the issue's exacthasPacket(tx.extension, <type>) / tx.extension[<type>].contains("<hex>")patterns against fixture txs.Acceptance criteria (#1072 (comment))
6/8 met as written, 2 with deviations worth flagging:
SubscriptionFilteroneof (SubscriptionFilter.txs.{modify|overwrite}.expressions) rather than a flatrepeated string tx_filtersfield on the request. This matches the unified-filter shape altafan asked for in single-connection GetSubscription flow #951's review (the criterion text predates that). Same client contract end-to-end (CEL strings in / matching events out).tx.extensionkeys are CELint(runtimeint64), notuint32. Done so users can writetx.extension[0x00]directly (matching the issue's example literals); with uint keys they'd need0x00u.All other criteria are met and covered by tests.
Notes
hex.DecodeString/MsgTx.Deserializecost.extension.NewExtensionFromTxparsing per tx. Worth a benchmark before merge if event volume is high — flagging here.