
Extract streamArrowIPC helper and add Flush-error regression test (follow-up to #424) #425

@xe-nvdk


Context

PR #424 fixed the streaming-on-client-disconnect bug in three handlers and added a regression test for one of them (`streamArrowJSON`). The JSON path has a testable extracted helper that takes `(ctx, w *bufio.Writer, reader array.RecordReader, ...)` and returns `(rowCount, error)` — the test exercises it directly without spinning up Fiber.

The Arrow IPC path (`internal/api/query_arrow.go`) has the same code shape but no equivalent helper: the streaming loop lives inline inside the `SetBodyStreamWriter` closure (lines 136-221). That makes it untestable without HTTP scaffolding, so the IPC path has zero coverage for the break-on-Flush behaviour, the decimal-cast Release ordering, and the streamCtx cancellation interaction.

The current code is correct as-is: the logic is structurally identical to the JSON helper and has been read by four reviewer agents. But untested inline logic like this tends to regress quietly when someone else edits the closure later.

Proposed refactor

Extract the streaming loop body from `executeQueryArrow`'s closure into a free function:

```go
// streamArrowIPC writes Arrow record batches as IPC frames to w. It returns the
// total row count and any error that aborted the stream (timeout, ctx cancel,
// client disconnect via Flush failure, IPC writer error, batch cast error, or
// a reader error reported by reader.Err() after iteration).
func streamArrowIPC(
	ctx context.Context,
	w *bufio.Writer,
	reader array.RecordReader,
	schema *arrow.Schema,
	castInfo *decimalCastInfo,
	logger zerolog.Logger,
) (int64, error)
```

The closure inside `executeQueryArrow` becomes:

```go
c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
	totalRows, streamErr := streamArrowIPC(streamCtx, w, reader, schema, castInfo, h.logger)
	// ... existing cleanup (reader.Release, conn.Close, cancel, logging) ...
})
```

Test plan

Add `TestStreamArrowIPC_FlushErrorBreaksLoop` mirroring the JSON test in `query_arrow_json_test.go`:

  • Reuse the existing `errAfterNBytes` helper, optionally moving it to a shared `*_test.go` file in the same package. It does not need to be exported: test files in the same package already share unexported identifiers.
  • Build N records via `buildArrowBatch`, wrap in `newSimpleRecordReader`.
  • Wrap a failing `io.Writer` in `bufio.NewWriter` and pass to `streamArrowIPC`.
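  • Assert: the returned error wraps the sentinel (checked via `errors.Is`), and the row count is less than the total fed in, i.e. the loop stopped at the failed Flush instead of draining the reader.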

Add adjacent tests for:

  • Decimal cast path (`castInfo != nil`): verify casted batches are Released even on the break path.
  • Timeout-via-ctx: verify `streamCtx.Done()` short-circuits before the next batch.
  • Reader.Err() surfacing: feed a reader whose Err() returns non-nil after iteration; verify the returned error wraps it.

Out of scope for this issue

  • Adding metrics like `arc_query_client_disconnects_total` — separate observability work.
  • Fixing the pre-existing `buildArrowBatch` test-helper bug where builders aren't Released (would let us switch to `memory.NewCheckedAllocator` for stronger guarantees, but it's a six-test refactor).
  • Sentinel error `ErrClientDisconnected` for callers to branch on — defer until a caller actually needs it.

Origin

Filed as a follow-up to gemini-code-assist review on PR #424. The IPC path is correct as written; this issue is about lifting test coverage and reducing the risk of silent regression on future edits.
