Event Stream RPC#403
Conversation
📝 WalkthroughWalkthroughThis PR adds VHTLC event streaming: new API protobuf/OpenAPI schemas, an internal event translation layer from indexer script events, persistence of VHTLC tracking state, service subscription wiring, a gRPC EventStream endpoint, and end-to-end tests exercising lifecycle events. Changes
Sequence DiagramsequenceDiagram
participant Indexer as Indexer/Blockchain
participant DB as Database
participant Service as VHTLC Service
participant EventStream as Event Broadcaster
participant Handler as gRPC Handler
participant Client as gRPC Client
Indexer->>Service: ScriptEvent (funding or spend)
activate Service
Service->>DB: GetByScripts / GetScripts
DB-->>Service: VHTLC record(s)
Service->>Service: Classify event (CREATED/FUNDED/CLAIMED/REFUNDED/SPENT)
Service->>Service: Extract preimage (PSBT/witness/forfeit txs)
Service->>DB: UntrackByScripts (on terminal events)
Service->>EventStream: Emit VhtlcEvent
deactivate Service
EventStream->>Handler: Deliver VhtlcEvent
Handler->>Handler: toVhtlcEventProto conversion
Handler->>Client: Stream EventStreamResponse
activate Client
Client->>Client: Consume event
deactivate Client
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (6)
pkg/vhtlc/vhtlc.go (1)
297-309: Add context when returning errors in the new helper.The current returns propagate raw errors, which makes failures harder to trace during event-stream/script lookup flows.
Suggested patch
func LockingScriptHexFromOpts(opts Opts) (string, error) { vhtlcScript, err := NewVHTLCScriptFromOpts(opts) if err != nil { - return "", err + return "", fmt.Errorf("create vhtlc script from opts: %w", err) } tapKey, _, err := vhtlcScript.TapTree() if err != nil { - return "", err + return "", fmt.Errorf("derive taproot key from vhtlc script: %w", err) } scriptPubKey, err := txscript.PayToTaprootScript(tapKey) if err != nil { - return "", err + return "", fmt.Errorf("build taproot scriptPubKey: %w", err) } return hex.EncodeToString(scriptPubKey), nil }As per coding guidelines, "Always check errors and use descriptive error messages in error handling".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/vhtlc/vhtlc.go` around lines 297 - 309, Wrap and augment the raw errors returned from NewVHTLCScriptFromOpts, vhtlcScript.TapTree, and txscript.PayToTaprootScript with descriptive context so callers can trace which step failed; e.g., when NewVHTLCScriptFromOpts fails return an error like "failed to build VHTLC script: %w" referencing NewVHTLCScriptFromOpts/vhtlcScript, when TapTree fails return "failed to derive taproot key from VHTLC script: %w" referencing vhtlcScript.TapTree/tapKey, and when PayToTaprootScript fails return "failed to create scriptPubKey from tap key: %w" referencing txscript.PayToTaprootScript/scriptPubKey. Ensure you use fmt.Errorf or the project's error-wrapping utility to preserve the original error.internal/infrastructure/db/sqlite/vhtlc_repo.go (2)
82-113: Consider adding database-level script filtering for scalability.
GetByScriptsloads all VHTLCs into memory before filtering by computed locking script. While this works for small datasets, it could become inefficient if the VHTLC count grows significantly.For now this is acceptable given expected usage patterns, but consider adding a computed/indexed script column if performance becomes a concern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 82 - 113, GetByScripts currently pulls all rows via r.querier.ListVHTLC and filters in memory using vhtlc.LockingScriptHexFromOpts, which won't scale; update the implementation (vhtlcRepository.GetByScripts) to perform filtering at the DB layer instead: add a new querier method that accepts the computed locking script(s) or a precomputed/ indexed script column and returns matching rows (e.g., ListVHTLCByLockingScripts or ListVHTLCWithScriptColumn), update the repository to call that new querier method rather than ListVHTLC, and if needed extend the schema to store/ index the locking script so LockingScriptHexFromOpts is not required during query time. Ensure the new querier and repository method names are used consistently and errors are propagated as before.
136-149: Consider batching UntrackByScripts updates.Each VHTLC is untracked via an individual
UntrackVHTLCcall. For multiple scripts, this results in N database round-trips. A single batch update would be more efficient.Given the expected low cardinality of terminal VHTLCs per event, this is acceptable but worth noting for future optimization.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 136 - 149, The current UntrackByScripts in vhtlcRepository calls querier.UntrackVHTLC for each vhtlc causing N DB round-trips; change it to collect all v.Id values from GetByScripts and call a single batch method (e.g., querier.UntrackVHTLCs or UntrackVHTLCByIds) that performs one UPDATE/DELETE with WHERE id IN (...) inside a transaction; add the new batch method to the querier interface/implementation and replace the per-item loop in UntrackByScripts to call that batch method with the slice of ids.internal/core/application/event_stream_test.go (1)
232-251: Remove unused helper function.
buildEventStreamRawSpendTxis defined but never called in the test file. Consider removing it to keep the test code clean, or add a test case that exercises raw transaction parsing.♻️ Remove unused code
-func buildEventStreamRawSpendTx(t *testing.T) string { - t.Helper() - - prevHash, err := chainhash.NewHashFromStr(randomEventStreamTxID(t)) - require.NoError(t, err) - - tx := wire.NewMsgTx(2) - tx.AddTxIn(&wire.TxIn{ - PreviousOutPoint: wire.OutPoint{ - Hash: *prevHash, - Index: 1, - }, - }) - tx.AddTxOut(&wire.TxOut{Value: 1}) - - var buf bytes.Buffer - err = tx.Serialize(&buf) - require.NoError(t, err) - return hex.EncodeToString(buf.Bytes()) -}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/application/event_stream_test.go` around lines 232 - 251, The function buildEventStreamRawSpendTx is unused; either delete it to remove dead code or add a test that exercises raw-transaction parsing and calls buildEventStreamRawSpendTx (e.g., a new TestParseRawSpendTx that decodes the returned hex, deserializes into a wire.MsgTx and asserts expected fields). If you remove it, also clean up any imports that become unused (bytes, hex, chainhash, wire, require) to keep the test file compiling.internal/core/application/service.go (1)
956-973: Consider handling subscription failure more robustly.When
vhtlcSubscription.subscribefails, the VHTLC is already persisted withTracked=truebut won't receive indexer events until the next restart. This could leave the VHTLC in an inconsistent state where it's marked as tracked but isn't actually subscribed.Consider either:
- Marking the VHTLC as untracked on subscription failure, or
- Retrying the subscription with backoff
The current behavior is acceptable for best-effort event delivery, but worth documenting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/application/service.go` around lines 956 - 973, The code currently persists a VHTLC with Tracked=true then calls s.vhtlcSubscription.subscribe(scripts) and only logs on error, leaving the DB inconsistent; fix by either (A) on subscribe error update the persisted VHTLC to Tracked=false (find the VHTLC record created earlier and persist the change) or (B) implement a retry-with-backoff that re-attempts s.vhtlcSubscription.subscribe(context.Background(), scripts) in a separate goroutine until success (use exponential backoff and a limit or cancellation), and only leave Tracked=true if a subscribe ultimately succeeds; locate the subscription call s.vhtlcSubscription.subscribe, the scripts derivation offchainAddressesPkScripts, and the VHTLC persistence where Tracked is set to change the behavior.internal/test/e2e/vhtlc_test.go (1)
1354-1387: Potential goroutine leak in waitForVhtlcEvent.The goroutine spawned on line 1366 to call
stream.Recv()will block indefinitely if the test times out, as there's no mechanism to cancel the blocking receive. This can lead to goroutine accumulation in long test runs.Consider using a context with timeout passed to the stream, or accepting this as acceptable test code behavior.
♻️ Alternative: Document the leak as acceptable for tests
+// waitForVhtlcEvent polls the stream until an event matches the predicate or +// times out. Note: On timeout, the goroutine calling stream.Recv() will leak +// until the stream is closed. This is acceptable for e2e tests. func waitForVhtlcEvent( t *testing.T, stream pb.NotificationService_EventStreamClient,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/test/e2e/vhtlc_test.go` around lines 1354 - 1387, The goroutine started inside waitForVhtlcEvent that calls stream.Recv() can block forever on test timeout and leak; modify waitForVhtlcEvent to make the Recv cancellable by using a context with timeout/cancel or a cancellable Recv API: create a ctx, pass it to the stream Recv (or to a wrapper that calls stream.Recv in a child goroutine) and cancel the ctx when the select hits the deadline so the goroutine unblocks; ensure the goroutine observes ctx.Done() (or returns the Recv error) and stops, and remove the unbounded eventCh/errCh leak by closing or returning on cancel. Target symbols: waitForVhtlcEvent, stream.Recv, eventCh, errCh.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.up.sql`:
- Line 1: The migration currently adds tracked with DEFAULT FALSE which forces
all existing vhtlc rows to become untracked; change the migration to add tracked
as nullable (DEFAULT NULL), backfill existing rows to tracked = TRUE for all
pre-existing vhtlcs, then alter the schema to make tracked NOT NULL with DEFAULT
FALSE for future rows — reference the vhtlc table and the tracked column in your
changes and ensure the backfill happens before enforcing NOT NULL/DEFAULT.
In `@internal/interface/grpc/handlers/notification_handler.go`:
- Around line 161-177: listenToEvents currently iterates
h.eventsListenerHandler.listeners without synchronization, causing a data race
with pushListener/removeListener; fix by, inside listenToEvents (and mirror the
same change in listenToNotifications), acquire the eventsListenerHandler mutex,
make a shallow copy of h.eventsListenerHandler.listeners, then release the mutex
and iterate over the copied slice to spawn the goroutines that send to each
listener channel—this ensures pushListener/removeListener can safely mutate the
original slice while you iterate the copy.
---
Nitpick comments:
In `@internal/core/application/event_stream_test.go`:
- Around line 232-251: The function buildEventStreamRawSpendTx is unused; either
delete it to remove dead code or add a test that exercises raw-transaction
parsing and calls buildEventStreamRawSpendTx (e.g., a new TestParseRawSpendTx
that decodes the returned hex, deserializes into a wire.MsgTx and asserts
expected fields). If you remove it, also clean up any imports that become unused
(bytes, hex, chainhash, wire, require) to keep the test file compiling.
In `@internal/core/application/service.go`:
- Around line 956-973: The code currently persists a VHTLC with Tracked=true
then calls s.vhtlcSubscription.subscribe(scripts) and only logs on error,
leaving the DB inconsistent; fix by either (A) on subscribe error update the
persisted VHTLC to Tracked=false (find the VHTLC record created earlier and
persist the change) or (B) implement a retry-with-backoff that re-attempts
s.vhtlcSubscription.subscribe(context.Background(), scripts) in a separate
goroutine until success (use exponential backoff and a limit or cancellation),
and only leave Tracked=true if a subscribe ultimately succeeds; locate the
subscription call s.vhtlcSubscription.subscribe, the scripts derivation
offchainAddressesPkScripts, and the VHTLC persistence where Tracked is set to
change the behavior.
In `@internal/infrastructure/db/sqlite/vhtlc_repo.go`:
- Around line 82-113: GetByScripts currently pulls all rows via
r.querier.ListVHTLC and filters in memory using vhtlc.LockingScriptHexFromOpts,
which won't scale; update the implementation (vhtlcRepository.GetByScripts) to
perform filtering at the DB layer instead: add a new querier method that accepts
the computed locking script(s) or a precomputed/ indexed script column and
returns matching rows (e.g., ListVHTLCByLockingScripts or
ListVHTLCWithScriptColumn), update the repository to call that new querier
method rather than ListVHTLC, and if needed extend the schema to store/ index
the locking script so LockingScriptHexFromOpts is not required during query
time. Ensure the new querier and repository method names are used consistently
and errors are propagated as before.
- Around line 136-149: The current UntrackByScripts in vhtlcRepository calls
querier.UntrackVHTLC for each vhtlc causing N DB round-trips; change it to
collect all v.Id values from GetByScripts and call a single batch method (e.g.,
querier.UntrackVHTLCs or UntrackVHTLCByIds) that performs one UPDATE/DELETE with
WHERE id IN (...) inside a transaction; add the new batch method to the querier
interface/implementation and replace the per-item loop in UntrackByScripts to
call that batch method with the slice of ids.
In `@internal/test/e2e/vhtlc_test.go`:
- Around line 1354-1387: The goroutine started inside waitForVhtlcEvent that
calls stream.Recv() can block forever on test timeout and leak; modify
waitForVhtlcEvent to make the Recv cancellable by using a context with
timeout/cancel or a cancellable Recv API: create a ctx, pass it to the stream
Recv (or to a wrapper that calls stream.Recv in a child goroutine) and cancel
the ctx when the select hits the deadline so the goroutine unblocks; ensure the
goroutine observes ctx.Done() (or returns the Recv error) and stops, and remove
the unbounded eventCh/errCh leak by closing or returning on cancel. Target
symbols: waitForVhtlcEvent, stream.Recv, eventCh, errCh.
In `@pkg/vhtlc/vhtlc.go`:
- Around line 297-309: Wrap and augment the raw errors returned from
NewVHTLCScriptFromOpts, vhtlcScript.TapTree, and txscript.PayToTaprootScript
with descriptive context so callers can trace which step failed; e.g., when
NewVHTLCScriptFromOpts fails return an error like "failed to build VHTLC script:
%w" referencing NewVHTLCScriptFromOpts/vhtlcScript, when TapTree fails return
"failed to derive taproot key from VHTLC script: %w" referencing
vhtlcScript.TapTree/tapKey, and when PayToTaprootScript fails return "failed to
create scriptPubKey from tap key: %w" referencing
txscript.PayToTaprootScript/scriptPubKey. Ensure you use fmt.Errorf or the
project's error-wrapping utility to preserve the original error.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 78bdb21c-bcac-4009-a88a-761ba1595568
⛔ Files ignored due to path filters (3)
api-spec/protobuf/gen/go/fulmine/v1/notification.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/go/fulmine/v1/notification_grpc.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/go/fulmine/v1/types.pb.gois excluded by!**/*.pb.go,!**/gen/**
📒 Files selected for processing (24)
api-spec/openapi/swagger/fulmine/v1/notification.swagger.jsonapi-spec/protobuf/fulmine/v1/notification.protoapi-spec/protobuf/fulmine/v1/types.protointernal/config/permissions.gointernal/core/application/event_stream.gointernal/core/application/event_stream_test.gointernal/core/application/service.gointernal/core/application/subscription.gointernal/core/domain/vhtlc.gointernal/infrastructure/db/badger/swap_repo.gointernal/infrastructure/db/badger/vhtlc_repo.gointernal/infrastructure/db/db_test.gointernal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.down.sqlinternal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.up.sqlinternal/infrastructure/db/sqlite/sqlc/queries/models.gointernal/infrastructure/db/sqlite/sqlc/queries/query.sql.gointernal/infrastructure/db/sqlite/sqlc/query.sqlinternal/infrastructure/db/sqlite/sqlc/schema.sqlinternal/infrastructure/db/sqlite/vhtlc_repo.gointernal/interface/grpc/handlers/notification_handler.gointernal/interface/grpc/handlers/utils.gointernal/test/e2e/utils_test.gointernal/test/e2e/vhtlc_test.gopkg/vhtlc/vhtlc.go
| @@ -0,0 +1 @@ | |||
| ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT FALSE; | |||
There was a problem hiding this comment.
Defaulting migrated rows to untracked breaks upgrade continuity.
Line 1 sets tracked to FALSE by default, so all existing VHTLC rows become untracked immediately after migration. That can skip resubscription/event handling for already persisted VHTLCs after restart.
Suggested migration adjustment
-ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT FALSE;
+ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT TRUE;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT FALSE; | |
| ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT TRUE; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.up.sql`
at line 1, The migration currently adds tracked with DEFAULT FALSE which forces
all existing vhtlc rows to become untracked; change the migration to add tracked
as nullable (DEFAULT NULL), backfill existing rows to tracked = TRUE for all
pre-existing vhtlcs, then alter the schema to make tracked NOT NULL with DEFAULT
FALSE for future rows — reference the vhtlc table and the tracked column in your
changes and ensure the backfill happens before enforcing NOT NULL/DEFAULT.
| func (h *notificationHandler) listenToEvents() { | ||
| for { | ||
| select { | ||
| case event := <-h.svc.GetEvents(context.Background()): | ||
| for _, l := range h.eventsListenerHandler.listeners { | ||
| go func(l *listener[*pb.EventStreamResponse]) { | ||
| l.ch <- &pb.EventStreamResponse{ | ||
| Event: toVhtlcEventProto(event), | ||
| } | ||
| }(l) | ||
| } | ||
| case <-h.stopCh: | ||
| h.eventsListenerHandler.stop() | ||
| return | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Data race when iterating listeners.
The listenToEvents loop reads h.eventsListenerHandler.listeners without holding the mutex, while pushListener and removeListener modify the slice under the lock. This creates a data race.
The same issue exists in listenToNotifications (line 147), but since this is new code, it's worth addressing here.
🔒 Proposed fix to protect listener iteration
func (h *notificationHandler) listenToEvents() {
for {
select {
case event := <-h.svc.GetEvents(context.Background()):
+ h.eventsListenerHandler.lock.Lock()
+ listeners := make([]*listener[*pb.EventStreamResponse], len(h.eventsListenerHandler.listeners))
+ copy(listeners, h.eventsListenerHandler.listeners)
+ h.eventsListenerHandler.lock.Unlock()
- for _, l := range h.eventsListenerHandler.listeners {
+ for _, l := range listeners {
go func(l *listener[*pb.EventStreamResponse]) {
l.ch <- &pb.EventStreamResponse{
Event: toVhtlcEventProto(event),
}
}(l)
}
case <-h.stopCh:
h.eventsListenerHandler.stop()
return
}
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (h *notificationHandler) listenToEvents() { | |
| for { | |
| select { | |
| case event := <-h.svc.GetEvents(context.Background()): | |
| for _, l := range h.eventsListenerHandler.listeners { | |
| go func(l *listener[*pb.EventStreamResponse]) { | |
| l.ch <- &pb.EventStreamResponse{ | |
| Event: toVhtlcEventProto(event), | |
| } | |
| }(l) | |
| } | |
| case <-h.stopCh: | |
| h.eventsListenerHandler.stop() | |
| return | |
| } | |
| } | |
| } | |
| func (h *notificationHandler) listenToEvents() { | |
| for { | |
| select { | |
| case event := <-h.svc.GetEvents(context.Background()): | |
| h.eventsListenerHandler.lock.Lock() | |
| listeners := make([]*listener[*pb.EventStreamResponse], len(h.eventsListenerHandler.listeners)) | |
| copy(listeners, h.eventsListenerHandler.listeners) | |
| h.eventsListenerHandler.lock.Unlock() | |
| for _, l := range listeners { | |
| go func(l *listener[*pb.EventStreamResponse]) { | |
| l.ch <- &pb.EventStreamResponse{ | |
| Event: toVhtlcEventProto(event), | |
| } | |
| }(l) | |
| } | |
| case <-h.stopCh: | |
| h.eventsListenerHandler.stop() | |
| return | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/interface/grpc/handlers/notification_handler.go` around lines 161 -
177, listenToEvents currently iterates h.eventsListenerHandler.listeners without
synchronization, causing a data race with pushListener/removeListener; fix by,
inside listenToEvents (and mirror the same change in listenToNotifications),
acquire the eventsListenerHandler mutex, make a shallow copy of
h.eventsListenerHandler.listeners, then release the mutex and iterate over the
copied slice to spawn the goroutines that send to each listener channel—this
ensures pushListener/removeListener can safely mutate the original slice while
you iterate the copy.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/test/e2e/vhtlc_test.go (1)
109-183: Add aVHTLC_SPENTcase to this matrix.The terminal action table only exercises CLAIMED and REFUNDED outcomes, but the feature also adds the generic spent fallback. Without one scenario that triggers an unclassifiable external spend and asserts
EVENT_TYPE_VHTLC_SPENT, the fallback path stays untested end to end. Based on learnings "When adding new features: define API in Protocol Buffers if needed, implement core logic ininternal/core/, add infrastructure layer ininternal/infrastructure/, expose via handlers ininternal/interface/, write tests, and update documentation".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/test/e2e/vhtlc_test.go` around lines 109 - 183, Add a new terminalAction entry to the actions slice named "spent" with expectedEvent set to pb.EventType_EVENT_TYPE_VHTLC_SPENT; implement its run func to create/derive a VHTLC (reuse buildTestVHTLC and setupArkSDKwithPublicKey patterns), then simulate an external/unclassifiable spend of that VHTLC via the Ark client (similar to submitManualRefundVHTLCWithoutReceiver) so the service observes an external spend and returns a non-empty txid (or redeem txid) and the test asserts no error and returns that txid; place it alongside the other cases ("claim","refund","settle_claim", etc.) so the end-to-end path for EVENT_TYPE_VHTLC_SPENT is exercised.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/test/e2e/vhtlc_test.go`:
- Around line 1342-1375: The helper waitForVhtlcEvent currently consumes and
discards any non-matching EventStreamResponse from stream
(pb.NotificationService_EventStreamClient), causing racey/flaky sequential
assertions; change it to preserve unmatched events: introduce a per-stream
in-memory buffer (e.g.,
map[*pb.NotificationService_EventStreamClient][]*pb.EventStreamResponse keyed by
the stream) and on each call first scan the buffer for a matching event (using
match func), returning if found; when reading from stream (via stream.Recv())
push non-matching responses into that buffer instead of dropping them so future
waitForVhtlcEvent invocations can observe them; keep the function signature
(waitForVhtlcEvent) and error/timeout behavior unchanged.
---
Nitpick comments:
In `@internal/test/e2e/vhtlc_test.go`:
- Around line 109-183: Add a new terminalAction entry to the actions slice named
"spent" with expectedEvent set to pb.EventType_EVENT_TYPE_VHTLC_SPENT; implement
its run func to create/derive a VHTLC (reuse buildTestVHTLC and
setupArkSDKwithPublicKey patterns), then simulate an external/unclassifiable
spend of that VHTLC via the Ark client (similar to
submitManualRefundVHTLCWithoutReceiver) so the service observes an external
spend and returns a non-empty txid (or redeem txid) and the test asserts no
error and returns that txid; place it alongside the other cases
("claim","refund","settle_claim", etc.) so the end-to-end path for
EVENT_TYPE_VHTLC_SPENT is exercised.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 91f74b9b-e4c4-4509-86b4-036b1e915bf2
📒 Files selected for processing (3)
internal/test/e2e/main_test.gointernal/test/e2e/utils_test.gointernal/test/e2e/vhtlc_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/test/e2e/utils_test.go
| func waitForVhtlcEvent( | ||
| t *testing.T, | ||
| stream pb.NotificationService_EventStreamClient, | ||
| timeout time.Duration, | ||
| match func(event *pb.Event) bool, | ||
| ) *pb.Event { | ||
| t.Helper() | ||
|
|
||
| deadline := time.After(timeout) | ||
| for { | ||
| eventCh := make(chan *pb.EventStreamResponse, 1) | ||
| errCh := make(chan error, 1) | ||
| go func() { | ||
| resp, err := stream.Recv() | ||
| if err != nil { | ||
| errCh <- err | ||
| return | ||
| } | ||
| eventCh <- resp | ||
| }() | ||
|
|
||
| select { | ||
| case <-deadline: | ||
| t.Fatalf("timed out waiting for vhtlc event") | ||
| case err := <-errCh: | ||
| require.NoError(t, err) | ||
| case resp := <-eventCh: | ||
| event := resp.GetEvent() | ||
| if event != nil && match(event) { | ||
| return event | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Avoid lossy matching on a single event stream.
This helper drops every non-matching event, which makes the sequential CREATED → FUNDED assertions in this test flaky. notification_handler.go fans out each event via its own goroutine into the same listener channel, so delivery order to one client is not guaranteed; if FUNDED is received before CREATED here, it is consumed and lost, and the next wait can time out.
Possible fix
-func waitForVhtlcEvent(
- t *testing.T,
- stream pb.NotificationService_EventStreamClient,
- timeout time.Duration,
- match func(event *pb.Event) bool,
-) *pb.Event {
- t.Helper()
-
- deadline := time.After(timeout)
- for {
- eventCh := make(chan *pb.EventStreamResponse, 1)
- errCh := make(chan error, 1)
- go func() {
- resp, err := stream.Recv()
- if err != nil {
- errCh <- err
- return
- }
- eventCh <- resp
- }()
-
- select {
- case <-deadline:
- t.Fatalf("timed out waiting for vhtlc event")
- case err := <-errCh:
- require.NoError(t, err)
- case resp := <-eventCh:
- event := resp.GetEvent()
- if event != nil && match(event) {
- return event
- }
- }
- }
-}
+func waitForVhtlcEvents(
+ t *testing.T,
+ stream pb.NotificationService_EventStreamClient,
+ timeout time.Duration,
+ id string,
+ want ...pb.EventType,
+) map[pb.EventType]*pb.Event {
+ t.Helper()
+
+ deadline := time.Now().Add(timeout)
+ wanted := make(map[pb.EventType]struct{}, len(want))
+ seen := make(map[pb.EventType]*pb.Event, len(want))
+ for _, typ := range want {
+ wanted[typ] = struct{}{}
+ }
+
+ for len(seen) < len(wanted) {
+ if time.Now().After(deadline) {
+ t.Fatalf("timed out waiting for VHTLC events %v", want)
+ }
+
+ resp, err := stream.Recv()
+ require.NoError(t, err)
+
+ event := resp.GetEvent()
+ if event == nil || event.GetId() != id {
+ continue
+ }
+ if _, ok := wanted[event.GetType()]; ok {
+ seen[event.GetType()] = event
+ }
+ }
+
+ return seen
+}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/test/e2e/vhtlc_test.go` around lines 1342 - 1375, The helper
waitForVhtlcEvent currently consumes and discards any non-matching
EventStreamResponse from stream (pb.NotificationService_EventStreamClient),
causing racey/flaky sequential assertions; change it to preserve unmatched
events: introduce a per-stream in-memory buffer (e.g.,
map[*pb.NotificationService_EventStreamClient][]*pb.EventStreamResponse keyed by
the stream) and on each call first scan the buffer for a matching event (using
match func), returning if found; when reading from stream (via stream.Recv())
push non-matching responses into that buffer instead of dropping them so future
waitForVhtlcEvent invocations can observe them; keep the function signature
(waitForVhtlcEvent) and error/timeout behavior unchanged.
Summary
Implements VHTLC
EventStreambacked by indexer script events.What Changed
trackedflag.Why
Event emission should reflect what actually happened on-chain / in Ark, not what an RPC method attempted to do. Parsing indexer events makes the stream work for RPC flows, batch settlement, manual transactions, external spenders, and restart recovery.
This also covers cases where another wallet client or external participant claims or refunds the VHTLC outside Fulmine.
If the spend path cannot be classified, Fulmine emits a generic
SPENTevent so consumers still know the VHTLC is terminal.@altafan @Kukks please review
Summary by CodeRabbit