Skip to content

Event Stream RPC#403

Open
sekulicd wants to merge 2 commits into
masterfrom
EventStreamRPC
Open

Event Stream RPC#403
sekulicd wants to merge 2 commits into
masterfrom
EventStreamRPC

Conversation

@sekulicd
Copy link
Copy Markdown
Contributor

@sekulicd sekulicd commented Apr 28, 2026

Summary

Implements VHTLC EventStream backed by indexer script events.

What Changed

  • Added persistent VHTLC tracking with a tracked flag.
  • New VHTLCs are tracked by default.
  • Tracked VHTLC scripts are subscribed on startup.
  • VHTLCs are untracked once their script is spent.
  • Added EventStream publishing for:
    • VHTLC funded
    • VHTLC claimed
    • VHTLC refunded
    • generic VHTLC spent fallback
  • Added classification for direct spends and Ark settlement batch spends.
  • Added tests for repository tracking, event handling, and e2e VHTLC event paths.

Why

Event emission should reflect what actually happened on-chain / in Ark, not what an RPC method attempted to do. Parsing indexer events makes the stream work for RPC flows, batch settlement, manual transactions, external spenders, and restart recovery.

This also covers cases where another wallet client or external participant claims or refunds the VHTLC outside Fulmine.

If the spend path cannot be classified, Fulmine emits a generic SPENT event so consumers still know the VHTLC is terminal.

@altafan @Kukks please review

Summary by CodeRabbit

  • New Features
    • Streaming EventStream API for subscribing to VHTLC lifecycle events.
    • Emits VHTLC_CREATED, VHTLC_FUNDED and terminal events (VHTLC_CLAIMED, VHTLC_REFUNDED, VHTLC_SPENT) with id, txid, preimage (when applicable), type and timestamp.
    • Persistent tracking of VHTLCs to improve accurate event delivery; tests and end-to-end coverage added for event semantics.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 28, 2026

📝 Walkthrough

Walkthrough

This PR adds VHTLC event streaming: new API protobuf/OpenAPI schemas, an internal event translation layer from indexer script events, persistence of VHTLC tracking state, service subscription wiring, a gRPC EventStream endpoint, and end-to-end tests exercising lifecycle events.

Changes

Cohort / File(s) Summary
API Specs & Protobufs
api-spec/openapi/swagger/fulmine/v1/notification.swagger.json, api-spec/protobuf/fulmine/v1/notification.proto
Added event schemas (v1Event, v1EventType, v1EventStreamResponse) and a streaming RPC EventStream(EventStreamRequest) returns (stream EventStreamResponse).
Protobuf Types
api-spec/protobuf/fulmine/v1/types.proto
Added EventType enum and Event message to model VHTLC lifecycle events.
Application event stream
internal/core/application/event_stream.go, internal/core/application/event_stream_test.go
New translation layer classifying funding and terminal script events, extracting preimages, emitting VhtlcEvents, and tests validating funding→claim flows and classification logic.
Service & Subscription wiring
internal/core/application/service.go, internal/core/application/subscription.go, internal/config/permissions.go
Added events chan VhtlcEvent, GetEvents() method, vhtlcSubscription store, subscription lifecycle integration with Unlock/Lock, and permission mapping for notification RPC.
gRPC handler & utilities
internal/interface/grpc/handlers/notification_handler.go, internal/interface/grpc/handlers/utils.go
Added EventStream RPC handler, listener management/broadcast loop, and conversion helper toVhtlcEventProto.
Domain model & helper
internal/core/domain/vhtlc.go, pkg/vhtlc/vhtlc.go
Added Tracked field to Vhtlc, repository methods GetByScripts/GetScripts/UntrackByScripts, and LockingScriptHexFromOpts helper.
Badger persistence
internal/infrastructure/db/badger/vhtlc_repo.go, internal/infrastructure/db/badger/swap_repo.go
Persisted Tracked flag in vhtlc data and added script-based repository methods/behavior.
SQLite schema & SQLC layer
internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.*, internal/infrastructure/db/sqlite/sqlc/schema.sql, internal/infrastructure/db/sqlite/sqlc/query.sql, internal/infrastructure/db/sqlite/sqlc/queries/*, internal/infrastructure/db/sqlite/vhtlc_repo.go
Added tracked boolean column (default FALSE), updated SQLC models/queries to read/write tracked, added UntrackVHTLC, and implemented script-based repo methods.
DB tests
internal/infrastructure/db/db_test.go
Added test verifying script tracking lifecycle (GetScripts → UntrackByScripts → tracked=false).
End-to-end tests & helpers
internal/test/e2e/vhtlc_test.go, internal/test/e2e/utils_test.go, internal/test/e2e/main_test.go
Added TestVHTLCEventStream and helpers for create/fund/wait/settle flows, gRPC notification client factory, and settle retry/refactor logic.

Sequence Diagram

sequenceDiagram
    participant Indexer as Indexer/Blockchain
    participant DB as Database
    participant Service as VHTLC Service
    participant EventStream as Event Broadcaster
    participant Handler as gRPC Handler
    participant Client as gRPC Client

    Indexer->>Service: ScriptEvent (funding or spend)
    activate Service
    Service->>DB: GetByScripts / GetScripts
    DB-->>Service: VHTLC record(s)
    Service->>Service: Classify event (CREATED/FUNDED/CLAIMED/REFUNDED/SPENT)
    Service->>Service: Extract preimage (PSBT/witness/forfeit txs)
    Service->>DB: UntrackByScripts (on terminal events)
    Service->>EventStream: Emit VhtlcEvent
    deactivate Service

    EventStream->>Handler: Deliver VhtlcEvent
    Handler->>Handler: toVhtlcEventProto conversion
    Handler->>Client: Stream EventStreamResponse
    activate Client
    Client->>Client: Consume event
    deactivate Client
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • Fix Boltz Swaps #263 — touches internal/core/application/service.go; likely overlaps with service/events changes.
  • Fixed Receive Invoice #253 — modifies API notification protobuf/OpenAPI definitions; may conflict with schema changes.
  • Swap E2E #334 — modifies UnlockNode/subscription initialization in the service; related to subscription wiring here.

Suggested labels

enhancement

Suggested reviewers

  • altafan
  • Kukks

Poem

🐰 I hopped through logs and scripts tonight,
emitting events in rosy light,
from create to fund to claim so sweet,
I stream each hop with nimble feet — hooray, a stream complete! 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 35.09% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Event Stream RPC' directly summarizes the main feature being added: a new EventStream RPC endpoint for VHTLC lifecycle events, which is the primary objective of the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch EventStreamRPC

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
Review rate limit: 0/1 reviews remaining, refill in 60 minutes.

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (6)
pkg/vhtlc/vhtlc.go (1)

297-309: Add context when returning errors in the new helper.

The current returns propagate raw errors, which makes failures harder to trace during event-stream/script lookup flows.

Suggested patch
 func LockingScriptHexFromOpts(opts Opts) (string, error) {
 	vhtlcScript, err := NewVHTLCScriptFromOpts(opts)
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("create vhtlc script from opts: %w", err)
 	}

 	tapKey, _, err := vhtlcScript.TapTree()
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("derive taproot key from vhtlc script: %w", err)
 	}

 	scriptPubKey, err := txscript.PayToTaprootScript(tapKey)
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("build taproot scriptPubKey: %w", err)
 	}

 	return hex.EncodeToString(scriptPubKey), nil
 }

As per coding guidelines, "Always check errors and use descriptive error messages in error handling".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/vhtlc/vhtlc.go` around lines 297 - 309, Wrap and augment the raw errors
returned from NewVHTLCScriptFromOpts, vhtlcScript.TapTree, and
txscript.PayToTaprootScript with descriptive context so callers can trace which
step failed; e.g., when NewVHTLCScriptFromOpts fails return an error like
"failed to build VHTLC script: %w" referencing
NewVHTLCScriptFromOpts/vhtlcScript, when TapTree fails return "failed to derive
taproot key from VHTLC script: %w" referencing vhtlcScript.TapTree/tapKey, and
when PayToTaprootScript fails return "failed to create scriptPubKey from tap
key: %w" referencing txscript.PayToTaprootScript/scriptPubKey. Ensure you use
fmt.Errorf or the project's error-wrapping utility to preserve the original
error.
internal/infrastructure/db/sqlite/vhtlc_repo.go (2)

82-113: Consider adding database-level script filtering for scalability.

GetByScripts loads all VHTLCs into memory before filtering by computed locking script. While this works for small datasets, it could become inefficient if the VHTLC count grows significantly.

For now this is acceptable given expected usage patterns, but consider adding a computed/indexed script column if performance becomes a concern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 82 - 113,
GetByScripts currently pulls all rows via r.querier.ListVHTLC and filters in
memory using vhtlc.LockingScriptHexFromOpts, which won't scale; update the
implementation (vhtlcRepository.GetByScripts) to perform filtering at the DB
layer instead: add a new querier method that accepts the computed locking
script(s) or a precomputed/ indexed script column and returns matching rows
(e.g., ListVHTLCByLockingScripts or ListVHTLCWithScriptColumn), update the
repository to call that new querier method rather than ListVHTLC, and if needed
extend the schema to store/ index the locking script so LockingScriptHexFromOpts
is not required during query time. Ensure the new querier and repository method
names are used consistently and errors are propagated as before.

136-149: Consider batching UntrackByScripts updates.

Each VHTLC is untracked via an individual UntrackVHTLC call. For multiple scripts, this results in N database round-trips. A single batch update would be more efficient.

Given the expected low cardinality of terminal VHTLCs per event, this is acceptable but worth noting for future optimization.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 136 - 149, The
current UntrackByScripts in vhtlcRepository calls querier.UntrackVHTLC for each
vhtlc causing N DB round-trips; change it to collect all v.Id values from
GetByScripts and call a single batch method (e.g., querier.UntrackVHTLCs or
UntrackVHTLCByIds) that performs one UPDATE/DELETE with WHERE id IN (...) inside
a transaction; add the new batch method to the querier interface/implementation
and replace the per-item loop in UntrackByScripts to call that batch method with
the slice of ids.
internal/core/application/event_stream_test.go (1)

232-251: Remove unused helper function.

buildEventStreamRawSpendTx is defined but never called in the test file. Consider removing it to keep the test code clean, or add a test case that exercises raw transaction parsing.

♻️ Remove unused code
-func buildEventStreamRawSpendTx(t *testing.T) string {
-	t.Helper()
-
-	prevHash, err := chainhash.NewHashFromStr(randomEventStreamTxID(t))
-	require.NoError(t, err)
-
-	tx := wire.NewMsgTx(2)
-	tx.AddTxIn(&wire.TxIn{
-		PreviousOutPoint: wire.OutPoint{
-			Hash:  *prevHash,
-			Index: 1,
-		},
-	})
-	tx.AddTxOut(&wire.TxOut{Value: 1})
-
-	var buf bytes.Buffer
-	err = tx.Serialize(&buf)
-	require.NoError(t, err)
-	return hex.EncodeToString(buf.Bytes())
-}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/event_stream_test.go` around lines 232 - 251, The
function buildEventStreamRawSpendTx is unused; either delete it to remove dead
code or add a test that exercises raw-transaction parsing and calls
buildEventStreamRawSpendTx (e.g., a new TestParseRawSpendTx that decodes the
returned hex, deserializes into a wire.MsgTx and asserts expected fields). If
you remove it, also clean up any imports that become unused (bytes, hex,
chainhash, wire, require) to keep the test file compiling.
internal/core/application/service.go (1)

956-973: Consider handling subscription failure more robustly.

When vhtlcSubscription.subscribe fails, the VHTLC is already persisted with Tracked=true but won't receive indexer events until the next restart. This could leave the VHTLC in an inconsistent state where it's marked as tracked but isn't actually subscribed.

Consider either:

  1. Marking the VHTLC as untracked on subscription failure, or
  2. Retrying the subscription with backoff

The current behavior is acceptable for best-effort event delivery, but worth documenting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 956 - 973, The code
currently persists a VHTLC with Tracked=true then calls
s.vhtlcSubscription.subscribe(scripts) and only logs on error, leaving the DB
inconsistent; fix by either (A) on subscribe error update the persisted VHTLC to
Tracked=false (find the VHTLC record created earlier and persist the change) or
(B) implement a retry-with-backoff that re-attempts
s.vhtlcSubscription.subscribe(context.Background(), scripts) in a separate
goroutine until success (use exponential backoff and a limit or cancellation),
and only leave Tracked=true if a subscribe ultimately succeeds; locate the
subscription call s.vhtlcSubscription.subscribe, the scripts derivation
offchainAddressesPkScripts, and the VHTLC persistence where Tracked is set to
change the behavior.
internal/test/e2e/vhtlc_test.go (1)

1354-1387: Potential goroutine leak in waitForVhtlcEvent.

The goroutine spawned on line 1366 to call stream.Recv() will block indefinitely if the test times out, as there's no mechanism to cancel the blocking receive. This can lead to goroutine accumulation in long test runs.

Consider using a context with timeout passed to the stream, or accepting this as acceptable test code behavior.

♻️ Alternative: Document the leak as acceptable for tests
+// waitForVhtlcEvent polls the stream until an event matches the predicate or
+// times out. Note: On timeout, the goroutine calling stream.Recv() will leak
+// until the stream is closed. This is acceptable for e2e tests.
 func waitForVhtlcEvent(
 	t *testing.T,
 	stream pb.NotificationService_EventStreamClient,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/vhtlc_test.go` around lines 1354 - 1387, The goroutine
started inside waitForVhtlcEvent that calls stream.Recv() can block forever on
test timeout and leak; modify waitForVhtlcEvent to make the Recv cancellable by
using a context with timeout/cancel or a cancellable Recv API: create a ctx,
pass it to the stream Recv (or to a wrapper that calls stream.Recv in a child
goroutine) and cancel the ctx when the select hits the deadline so the goroutine
unblocks; ensure the goroutine observes ctx.Done() (or returns the Recv error)
and stops, and remove the unbounded eventCh/errCh leak by closing or returning
on cancel. Target symbols: waitForVhtlcEvent, stream.Recv, eventCh, errCh.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.up.sql`:
- Line 1: The migration currently adds tracked with DEFAULT FALSE which forces
all existing vhtlc rows to become untracked; change the migration to add tracked
as nullable (DEFAULT NULL), backfill existing rows to tracked = TRUE for all
pre-existing vhtlcs, then alter the schema to make tracked NOT NULL with DEFAULT
FALSE for future rows — reference the vhtlc table and the tracked column in your
changes and ensure the backfill happens before enforcing NOT NULL/DEFAULT.

In `@internal/interface/grpc/handlers/notification_handler.go`:
- Around line 161-177: listenToEvents currently iterates
h.eventsListenerHandler.listeners without synchronization, causing a data race
with pushListener/removeListener; fix by, inside listenToEvents (and mirror the
same change in listenToNotifications), acquire the eventsListenerHandler mutex,
make a shallow copy of h.eventsListenerHandler.listeners, then release the mutex
and iterate over the copied slice to spawn the goroutines that send to each
listener channel—this ensures pushListener/removeListener can safely mutate the
original slice while you iterate the copy.

---

Nitpick comments:
In `@internal/core/application/event_stream_test.go`:
- Around line 232-251: The function buildEventStreamRawSpendTx is unused; either
delete it to remove dead code or add a test that exercises raw-transaction
parsing and calls buildEventStreamRawSpendTx (e.g., a new TestParseRawSpendTx
that decodes the returned hex, deserializes into a wire.MsgTx and asserts
expected fields). If you remove it, also clean up any imports that become unused
(bytes, hex, chainhash, wire, require) to keep the test file compiling.

In `@internal/core/application/service.go`:
- Around line 956-973: The code currently persists a VHTLC with Tracked=true
then calls s.vhtlcSubscription.subscribe(scripts) and only logs on error,
leaving the DB inconsistent; fix by either (A) on subscribe error update the
persisted VHTLC to Tracked=false (find the VHTLC record created earlier and
persist the change) or (B) implement a retry-with-backoff that re-attempts
s.vhtlcSubscription.subscribe(context.Background(), scripts) in a separate
goroutine until success (use exponential backoff and a limit or cancellation),
and only leave Tracked=true if a subscribe ultimately succeeds; locate the
subscription call s.vhtlcSubscription.subscribe, the scripts derivation
offchainAddressesPkScripts, and the VHTLC persistence where Tracked is set to
change the behavior.

In `@internal/infrastructure/db/sqlite/vhtlc_repo.go`:
- Around line 82-113: GetByScripts currently pulls all rows via
r.querier.ListVHTLC and filters in memory using vhtlc.LockingScriptHexFromOpts,
which won't scale; update the implementation (vhtlcRepository.GetByScripts) to
perform filtering at the DB layer instead: add a new querier method that accepts
the computed locking script(s) or a precomputed/ indexed script column and
returns matching rows (e.g., ListVHTLCByLockingScripts or
ListVHTLCWithScriptColumn), update the repository to call that new querier
method rather than ListVHTLC, and if needed extend the schema to store/ index
the locking script so LockingScriptHexFromOpts is not required during query
time. Ensure the new querier and repository method names are used consistently
and errors are propagated as before.
- Around line 136-149: The current UntrackByScripts in vhtlcRepository calls
querier.UntrackVHTLC for each vhtlc causing N DB round-trips; change it to
collect all v.Id values from GetByScripts and call a single batch method (e.g.,
querier.UntrackVHTLCs or UntrackVHTLCByIds) that performs one UPDATE/DELETE with
WHERE id IN (...) inside a transaction; add the new batch method to the querier
interface/implementation and replace the per-item loop in UntrackByScripts to
call that batch method with the slice of ids.

In `@internal/test/e2e/vhtlc_test.go`:
- Around line 1354-1387: The goroutine started inside waitForVhtlcEvent that
calls stream.Recv() can block forever on test timeout and leak; modify
waitForVhtlcEvent to make the Recv cancellable by using a context with
timeout/cancel or a cancellable Recv API: create a ctx, pass it to the stream
Recv (or to a wrapper that calls stream.Recv in a child goroutine) and cancel
the ctx when the select hits the deadline so the goroutine unblocks; ensure the
goroutine observes ctx.Done() (or returns the Recv error) and stops, and remove
the unbounded eventCh/errCh leak by closing or returning on cancel. Target
symbols: waitForVhtlcEvent, stream.Recv, eventCh, errCh.

In `@pkg/vhtlc/vhtlc.go`:
- Around line 297-309: Wrap and augment the raw errors returned from
NewVHTLCScriptFromOpts, vhtlcScript.TapTree, and txscript.PayToTaprootScript
with descriptive context so callers can trace which step failed; e.g., when
NewVHTLCScriptFromOpts fails return an error like "failed to build VHTLC script:
%w" referencing NewVHTLCScriptFromOpts/vhtlcScript, when TapTree fails return
"failed to derive taproot key from VHTLC script: %w" referencing
vhtlcScript.TapTree/tapKey, and when PayToTaprootScript fails return "failed to
create scriptPubKey from tap key: %w" referencing
txscript.PayToTaprootScript/scriptPubKey. Ensure you use fmt.Errorf or the
project's error-wrapping utility to preserve the original error.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 78bdb21c-bcac-4009-a88a-761ba1595568

📥 Commits

Reviewing files that changed from the base of the PR and between 3ea61e2 and 12d44db.

⛔ Files ignored due to path filters (3)
  • api-spec/protobuf/gen/go/fulmine/v1/notification.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/go/fulmine/v1/notification_grpc.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/go/fulmine/v1/types.pb.go is excluded by !**/*.pb.go, !**/gen/**
📒 Files selected for processing (24)
  • api-spec/openapi/swagger/fulmine/v1/notification.swagger.json
  • api-spec/protobuf/fulmine/v1/notification.proto
  • api-spec/protobuf/fulmine/v1/types.proto
  • internal/config/permissions.go
  • internal/core/application/event_stream.go
  • internal/core/application/event_stream_test.go
  • internal/core/application/service.go
  • internal/core/application/subscription.go
  • internal/core/domain/vhtlc.go
  • internal/infrastructure/db/badger/swap_repo.go
  • internal/infrastructure/db/badger/vhtlc_repo.go
  • internal/infrastructure/db/db_test.go
  • internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.down.sql
  • internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.up.sql
  • internal/infrastructure/db/sqlite/sqlc/queries/models.go
  • internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go
  • internal/infrastructure/db/sqlite/sqlc/query.sql
  • internal/infrastructure/db/sqlite/sqlc/schema.sql
  • internal/infrastructure/db/sqlite/vhtlc_repo.go
  • internal/interface/grpc/handlers/notification_handler.go
  • internal/interface/grpc/handlers/utils.go
  • internal/test/e2e/utils_test.go
  • internal/test/e2e/vhtlc_test.go
  • pkg/vhtlc/vhtlc.go

@@ -0,0 +1 @@
ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT FALSE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Defaulting migrated rows to untracked breaks upgrade continuity.

Line 1 sets tracked to FALSE by default, so all existing VHTLC rows become untracked immediately after migration. That can skip resubscription/event handling for already persisted VHTLCs after restart.

Suggested migration adjustment
-ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT FALSE;
+ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT TRUE;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE vhtlc ADD COLUMN tracked BOOLEAN NOT NULL DEFAULT TRUE;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@internal/infrastructure/db/sqlite/migration/20260427000000_add_vhtlc_tracked.up.sql`
at line 1, The migration currently adds tracked with DEFAULT FALSE which forces
all existing vhtlc rows to become untracked; change the migration to add tracked
as nullable (DEFAULT NULL), backfill existing rows to tracked = TRUE for all
pre-existing vhtlcs, then alter the schema to make tracked NOT NULL with DEFAULT
FALSE for future rows — reference the vhtlc table and the tracked column in your
changes and ensure the backfill happens before enforcing NOT NULL/DEFAULT.

Comment on lines +161 to +177
func (h *notificationHandler) listenToEvents() {
for {
select {
case event := <-h.svc.GetEvents(context.Background()):
for _, l := range h.eventsListenerHandler.listeners {
go func(l *listener[*pb.EventStreamResponse]) {
l.ch <- &pb.EventStreamResponse{
Event: toVhtlcEventProto(event),
}
}(l)
}
case <-h.stopCh:
h.eventsListenerHandler.stop()
return
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Data race when iterating listeners.

The listenToEvents loop reads h.eventsListenerHandler.listeners without holding the mutex, while pushListener and removeListener modify the slice under the lock. This creates a data race.

The same issue exists in listenToNotifications (line 147), but since this is new code, it's worth addressing here.

🔒 Proposed fix to protect listener iteration
 func (h *notificationHandler) listenToEvents() {
 	for {
 		select {
 		case event := <-h.svc.GetEvents(context.Background()):
+			h.eventsListenerHandler.lock.Lock()
+			listeners := make([]*listener[*pb.EventStreamResponse], len(h.eventsListenerHandler.listeners))
+			copy(listeners, h.eventsListenerHandler.listeners)
+			h.eventsListenerHandler.lock.Unlock()
-			for _, l := range h.eventsListenerHandler.listeners {
+			for _, l := range listeners {
 				go func(l *listener[*pb.EventStreamResponse]) {
 					l.ch <- &pb.EventStreamResponse{
 						Event: toVhtlcEventProto(event),
 					}
 				}(l)
 			}
 		case <-h.stopCh:
 			h.eventsListenerHandler.stop()
 			return
 		}
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (h *notificationHandler) listenToEvents() {
for {
select {
case event := <-h.svc.GetEvents(context.Background()):
for _, l := range h.eventsListenerHandler.listeners {
go func(l *listener[*pb.EventStreamResponse]) {
l.ch <- &pb.EventStreamResponse{
Event: toVhtlcEventProto(event),
}
}(l)
}
case <-h.stopCh:
h.eventsListenerHandler.stop()
return
}
}
}
func (h *notificationHandler) listenToEvents() {
for {
select {
case event := <-h.svc.GetEvents(context.Background()):
h.eventsListenerHandler.lock.Lock()
listeners := make([]*listener[*pb.EventStreamResponse], len(h.eventsListenerHandler.listeners))
copy(listeners, h.eventsListenerHandler.listeners)
h.eventsListenerHandler.lock.Unlock()
for _, l := range listeners {
go func(l *listener[*pb.EventStreamResponse]) {
l.ch <- &pb.EventStreamResponse{
Event: toVhtlcEventProto(event),
}
}(l)
}
case <-h.stopCh:
h.eventsListenerHandler.stop()
return
}
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/notification_handler.go` around lines 161 -
177, listenToEvents currently iterates h.eventsListenerHandler.listeners without
synchronization, causing a data race with pushListener/removeListener; fix by,
inside listenToEvents (and mirror the same change in listenToNotifications),
acquire the eventsListenerHandler mutex, make a shallow copy of
h.eventsListenerHandler.listeners, then release the mutex and iterate over the
copied slice to spawn the goroutines that send to each listener channel—this
ensures pushListener/removeListener can safely mutate the original slice while
you iterate the copy.

@altafan altafan changed the title Evant Stream RPC Event Stream RPC Apr 28, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/test/e2e/vhtlc_test.go (1)

109-183: Add a VHTLC_SPENT case to this matrix.

The terminal action table only exercises CLAIMED and REFUNDED outcomes, but the feature also adds the generic spent fallback. Without one scenario that triggers an unclassifiable external spend and asserts EVENT_TYPE_VHTLC_SPENT, the fallback path stays untested end to end. Based on learnings "When adding new features: define API in Protocol Buffers if needed, implement core logic in internal/core/, add infrastructure layer in internal/infrastructure/, expose via handlers in internal/interface/, write tests, and update documentation".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/vhtlc_test.go` around lines 109 - 183, Add a new
terminalAction entry to the actions slice named "spent" with expectedEvent set
to pb.EventType_EVENT_TYPE_VHTLC_SPENT; implement its run func to create/derive
a VHTLC (reuse buildTestVHTLC and setupArkSDKwithPublicKey patterns), then
simulate an external/unclassifiable spend of that VHTLC via the Ark client
(similar to submitManualRefundVHTLCWithoutReceiver) so the service observes an
external spend and returns a non-empty txid (or redeem txid) and the test
asserts no error and returns that txid; place it alongside the other cases
("claim","refund","settle_claim", etc.) so the end-to-end path for
EVENT_TYPE_VHTLC_SPENT is exercised.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/test/e2e/vhtlc_test.go`:
- Around line 1342-1375: The helper waitForVhtlcEvent currently consumes and
discards any non-matching EventStreamResponse from stream
(pb.NotificationService_EventStreamClient), causing racey/flaky sequential
assertions; change it to preserve unmatched events: introduce a per-stream
in-memory buffer (e.g.,
map[*pb.NotificationService_EventStreamClient][]*pb.EventStreamResponse keyed by
the stream) and on each call first scan the buffer for a matching event (using
match func), returning if found; when reading from stream (via stream.Recv())
push non-matching responses into that buffer instead of dropping them so future
waitForVhtlcEvent invocations can observe them; keep the function signature
(waitForVhtlcEvent) and error/timeout behavior unchanged.

---

Nitpick comments:
In `@internal/test/e2e/vhtlc_test.go`:
- Around line 109-183: Add a new terminalAction entry to the actions slice named
"spent" with expectedEvent set to pb.EventType_EVENT_TYPE_VHTLC_SPENT; implement
its run func to create/derive a VHTLC (reuse buildTestVHTLC and
setupArkSDKwithPublicKey patterns), then simulate an external/unclassifiable
spend of that VHTLC via the Ark client (similar to
submitManualRefundVHTLCWithoutReceiver) so the service observes an external
spend and returns a non-empty txid (or redeem txid) and the test asserts no
error and returns that txid; place it alongside the other cases
("claim","refund","settle_claim", etc.) so the end-to-end path for
EVENT_TYPE_VHTLC_SPENT is exercised.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 91f74b9b-e4c4-4509-86b4-036b1e915bf2

📥 Commits

Reviewing files that changed from the base of the PR and between 12d44db and e148e0c.

📒 Files selected for processing (3)
  • internal/test/e2e/main_test.go
  • internal/test/e2e/utils_test.go
  • internal/test/e2e/vhtlc_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/test/e2e/utils_test.go

Comment on lines +1342 to +1375
func waitForVhtlcEvent(
t *testing.T,
stream pb.NotificationService_EventStreamClient,
timeout time.Duration,
match func(event *pb.Event) bool,
) *pb.Event {
t.Helper()

deadline := time.After(timeout)
for {
eventCh := make(chan *pb.EventStreamResponse, 1)
errCh := make(chan error, 1)
go func() {
resp, err := stream.Recv()
if err != nil {
errCh <- err
return
}
eventCh <- resp
}()

select {
case <-deadline:
t.Fatalf("timed out waiting for vhtlc event")
case err := <-errCh:
require.NoError(t, err)
case resp := <-eventCh:
event := resp.GetEvent()
if event != nil && match(event) {
return event
}
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid lossy matching on a single event stream.

This helper drops every non-matching event, which makes the sequential CREATED → FUNDED assertions in this test flaky. notification_handler.go fans out each event via its own goroutine into the same listener channel, so delivery order to one client is not guaranteed; if FUNDED is received before CREATED here, it is consumed and lost, and the next wait can time out.

Possible fix
-func waitForVhtlcEvent(
-	t *testing.T,
-	stream pb.NotificationService_EventStreamClient,
-	timeout time.Duration,
-	match func(event *pb.Event) bool,
-) *pb.Event {
-	t.Helper()
-
-	deadline := time.After(timeout)
-	for {
-		eventCh := make(chan *pb.EventStreamResponse, 1)
-		errCh := make(chan error, 1)
-		go func() {
-			resp, err := stream.Recv()
-			if err != nil {
-				errCh <- err
-				return
-			}
-			eventCh <- resp
-		}()
-
-		select {
-		case <-deadline:
-			t.Fatalf("timed out waiting for vhtlc event")
-		case err := <-errCh:
-			require.NoError(t, err)
-		case resp := <-eventCh:
-			event := resp.GetEvent()
-			if event != nil && match(event) {
-				return event
-			}
-		}
-	}
-}
+func waitForVhtlcEvents(
+	t *testing.T,
+	stream pb.NotificationService_EventStreamClient,
+	timeout time.Duration,
+	id string,
+	want ...pb.EventType,
+) map[pb.EventType]*pb.Event {
+	t.Helper()
+
+	deadline := time.Now().Add(timeout)
+	wanted := make(map[pb.EventType]struct{}, len(want))
+	seen := make(map[pb.EventType]*pb.Event, len(want))
+	for _, typ := range want {
+		wanted[typ] = struct{}{}
+	}
+
+	for len(seen) < len(wanted) {
+		if time.Now().After(deadline) {
+			t.Fatalf("timed out waiting for VHTLC events %v", want)
+		}
+
+		resp, err := stream.Recv()
+		require.NoError(t, err)
+
+		event := resp.GetEvent()
+		if event == nil || event.GetId() != id {
+			continue
+		}
+		if _, ok := wanted[event.GetType()]; ok {
+			seen[event.GetType()] = event
+		}
+	}
+
+	return seen
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/vhtlc_test.go` around lines 1342 - 1375, The helper
waitForVhtlcEvent currently consumes and discards any non-matching
EventStreamResponse from stream (pb.NotificationService_EventStreamClient),
causing racey/flaky sequential assertions; change it to preserve unmatched
events: introduce a per-stream in-memory buffer (e.g.,
map[*pb.NotificationService_EventStreamClient][]*pb.EventStreamResponse keyed by
the stream) and on each call first scan the buffer for a matching event (using
match func), returning if found; when reading from stream (via stream.Recv())
push non-matching responses into that buffer instead of dropping them so future
waitForVhtlcEvent invocations can observe them; keep the function signature
(waitForVhtlcEvent) and error/timeout behavior unchanged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant