
arc: add output plugin for Arc columnar database#4236

Closed
xe-nvdk wants to merge 9 commits into redpanda-data:main from Basekick-Labs:arc-output-plugin

Conversation

Contributor

@xe-nvdk xe-nvdk commented Apr 9, 2026

Summary

  • Add a new arc batch output plugin that sends data to Arc's MessagePack ingestion endpoint
  • Arc is a columnar analytical database optimized for time-series and observability workloads, capable of ingesting millions of records per second
  • This plugin enables Redpanda Connect users to stream data from any supported source directly into Arc without custom integration code

Features

  • Columnar and row payload formats (columnar default, 25-35% faster ingestion)
  • zstd/gzip compression with pooled encoders for high throughput
  • Interpolated measurement names for per-message routing
  • Automatic timestamp unit detection (s, ms, us, ns)
  • Bloblang-based tag extraction for row format
  • Bearer token authentication and TLS support
  • No new dependencies (uses existing msgpack/v5 and klauspost/compress)

Example config

output:
  arc:
    url: http://localhost:8000
    database: mydb
    measurement: server_metrics
    format: columnar
    compression: zstd
    batching:
      count: 1000
      period: 1s
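
For the row format, tag extraction is driven by a Bloblang mapping. A hypothetical configuration might look like this; the tags_mapping field name and mapping body are assumptions inferred from the feature list and review comments, not confirmed plugin syntax:

```yaml
output:
  arc:
    url: http://localhost:8000
    database: mydb
    measurement: server_metrics
    format: row
    # Assumed field name: a Bloblang mapping whose resulting object
    # becomes the tag set attached to each row.
    tags_mapping: |
      root.host = this.hostname
      root.region = meta("region")
```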

Test plan

  • Unit tests covering columnar, row, multi-measurement, compression, auth, errors, empty batch, and timestamp conversion (9 tests, all passing)
  • End-to-end test against a running Arc instance with generated data
  • Build verification with go build and go vet

Add a new batch output plugin that sends data to Arc's MessagePack
ingestion endpoint. Arc is a columnar analytical database optimized
for time-series and observability workloads, capable of ingesting
millions of records per second.

This plugin enables Redpanda Connect users to stream data from any
supported source directly into Arc, making it straightforward to
build ingestion pipelines for analytical workloads without custom
integration code.

Features:
- Columnar and row payload formats (columnar is default, 25-35% faster)
- zstd/gzip compression with pooled encoders for high throughput
- Interpolated measurement names for per-message routing
- Automatic timestamp detection (seconds, milliseconds, microseconds, nanoseconds)
- Bloblang-based tag extraction for row format
- Bearer token authentication and TLS support

CLAassistant commented Apr 9, 2026

CLA assistant check
All committers have signed the CLA.

Comment threads: internal/impl/arc/output.go (2, plus 1 outdated), internal/impl/arc/output_test.go (3)
@mmatczuk
Contributor

Commits
LGTM

Review
New arc batch output plugin. Well-structured implementation with good compression pooling and columnar/row format support. A few real bugs and test pattern gaps need attention before merge.

  1. [Data corruption] "time" column collision in columnar format (output.go#L313-L319): when timestamp_field is "" (default), the guard k == o.timestampField never matches any real key. A message with a "time" field gets it appended both from the explicit timestamp line and from the loop, producing a column with twice as many entries as the others and corrupting the payload.

  2. [Correctness] Poisoned encoder returned to pool (output.go#L477-L479): when w.Close() fails, the zstd encoder (and gzip writer, ~L492) is put back into the pool in an unknown state. A subsequent caller may silently produce malformed output. Only Put on the success path.

  3. [Panic] http.DefaultTransport type assertion (output.go#L218): panics if any code in the process replaces http.DefaultTransport with a non-*http.Transport. Construct a fresh http.Transport{} instead.

  4. [Test] require inside HTTP handler goroutines (output_test.go#L24-L31): affects 5 tests. require.NoError calls t.FailNow() on the handler goroutine, not the test goroutine, causing the handler to exit without sending a response and leaving the test goroutine hung. Use assert.NoError inside handlers.

  5. [Test] Data race on shared variables (output_test.go#L76-L84): 5 tests write received/contentEncoding/authHeader from the handler goroutine and read them from the test goroutine without a mutex. go test -race will flag these. Follow the mutex pattern from TestArcOutput_ColumnarSingleMeasurement.

  6. [Test] Missing config linting test, invalid-config tests, and integration tests: no test covers config spec self-consistency, error paths in newArcOutput (e.g., token/database with \r\n), or an end-to-end integration test for the new component.

Contributor Author

xe-nvdk commented Apr 10, 2026

Awesome feedback, thank you. We are going to address these today.

Bug fixes:
- Fix "time" column collision when timestamp_field is empty and message
  contains a "time" field, which caused duplicate column entries
- Fix poisoned compression writers returned to pool on error, only
  return to pool on the success path
- Construct fresh http.Transport instead of type-asserting on
  http.DefaultTransport which panics if replaced

Test fixes:
- Remove testify calls from HTTP handler goroutines to avoid
  cross-goroutine FailNow and race conditions
- Add sync.Mutex to all tests with shared variables across goroutines
- Use comma-ok type assertions for safer test failure messages
- Extract shared newTestServer helper to reduce duplication

New tests:
- TestArcOutput_TimeColumnCollision verifies time field deduplication
- TestArcOutput_InvalidConfig covers token/database CRLF rejection
- Integration test with Arc Docker container (testcontainers-go)
  covering columnar write+query and row format write+query
Contributor Author

xe-nvdk commented Apr 10, 2026

Thanks for the thorough review @mmatczuk. All six issues have been addressed in the latest push:

Bug fixes:

  1. Time column collision - Fixed. When timestamp_field is empty (default), the "time" key from message data is now excluded from the column loop to prevent duplicate entries. Same fix applied to both columnar and row paths.
  2. Poisoned encoder pool - Fixed. Compression writers are only returned to the pool on the success path. On error, they are left for GC.
  3. DefaultTransport type assertion - Fixed. Now constructs a fresh http.Transport with explicit settings instead of asserting on http.DefaultTransport.

Test fixes:
4. require inside handler goroutines - Fixed. Removed all testify calls from HTTP handler goroutines. Handlers now use plain if err != nil checks and return 500.
5. Data races on shared variables - Fixed. All tests now use sync.Mutex for variables shared between handler and test goroutines. Extracted a shared newTestServer helper to enforce the pattern consistently.
6. Missing tests - Added:

  • TestArcOutput_TimeColumnCollision - verifies the time field deduplication fix
  • TestArcOutput_InvalidConfig - table-driven test covering CRLF rejection for token and database
  • integration_test.go - end-to-end test using ghcr.io/basekick-labs/arc:latest via testcontainers-go, covering columnar write+query and row format write+query

All unit tests pass with -race, integration tests pass against a real Arc container.

Ready for another look.

Contributor

@mmatczuk mmatczuk left a comment


Review of arc output plugin

Comment threads: internal/impl/arc/output_test.go (1), internal/impl/arc/output.go (2)
@mmatczuk
Contributor

Commits

  1. b7e491ce015e "arc: address PR review feedback" — vague message. Does not describe what actually changed. Should be split or reworded to reflect the specific fixes (e.g., arc: fix time column collision and pool leak in compress, arc: add integration tests).

Review
New Arc batch output plugin with columnar/row formats, compression, auth, and TLS. Overall solid structure — correct registration pattern, proper use of service.MustRegisterBatchOutput, good test coverage. Three issues found:

  1. Missing license header (output_test.go:1): the Apache 2.0 license header present in the other two files is missing here.
  2. Columnar column length mismatch (output.go:320-326): messages with heterogeneous fields produce columns of different lengths, resulting in invalid columnar data.
  3. sync.Pool writer leak on error (output.go:488-511): compression writers are not returned to the pool on error paths in both the zstd and gzip branches.

Contributor Author

xe-nvdk commented Apr 13, 2026

Vague commit message? Did you even read it? The title may be vague, but the message body says exactly what was fixed.

So, I strongly recommend not depending 100% on AI; it doesn't look good when the responses are inaccurate.

I hope you take this comment in a good way.

I will address the remaining issues that you found.

Thank you

@mmatczuk
Contributor

Vague commit message? Did you even read it? The title may be vague, but the message body says exactly what was fixed.

LOL don't worry about it, it's a policy thing, we can squash it later

…r field consistency

- Add missing Apache 2.0 license header to output_test.go
- Use defer to return compression writers to sync.Pool on all code
  paths, preventing pool depletion under sustained errors
- Document that columnar batches require consistent field sets
  within a single batch (Arc validates server-side and rejects
  mismatches; schema evolution across batches is fully supported)
Contributor Author

xe-nvdk commented Apr 13, 2026

Thanks for the second pass @mmatczuk. Addressed in the latest push:

1. License header - Added Apache 2.0 header to output_test.go.

2. Columnar column length mismatch - This is an inherent constraint of columnar format, not a plugin bug. Arc validates this server-side and returns a clear error:

// arc/internal/ingest/msgpack.go:202-216
for colName, colData := range payload.Columns {
    if firstCol {
        numRecords = len(colData)
        firstCol = false
    } else if len(colData) != numRecords {
        return nil, fmt.Errorf("columnar format: array length mismatch (expected %d, got %d for '%s')",
            numRecords, len(colData), colName)
    }
}

Data is never silently misaligned. Arc rejects the batch with a descriptive error that the plugin surfaces to the caller.

Schema evolution across separate batches is fully supported. Batch 1 with columns {a, b} and batch 2 with columns {a, c} works fine. Arc appends new columns and fills nulls for previous rows.

Adding client-side null-padding would hide schema inconsistencies from the user. The row format already exists for heterogeneous schemas. Added a note in the config description documenting this constraint.

3. Pool writer leak - Fixed. Now uses defer pool.Put(w) immediately after Get() so writers are returned on all code paths. Both zstd.Encoder and gzip.Writer support Reset() after Close(), so reuse is safe.

Comment thread internal/impl/arc/output_test.go Outdated
@@ -0,0 +1,482 @@
// Copyright 2024 Redpanda Data, Inc.
Contributor
2026

@mmatczuk
Contributor

xe-nvdk added 2 commits April 14, 2026 07:27
- Replace manual http.Transport construction with the shared
  internal/httpclient package, which provides TLS, retry, rate
  limiting, metrics, logging, and transport tuning out of the box
- Field renamed from url to base_url to match httpclient conventions
- Use BearerTokenSigner for auth instead of manual header building
- Update copyright year from 2024 to 2026 on all arc package files
Contributor Author

xe-nvdk commented Apr 14, 2026

@mmatczuk all addressed in the latest push:

  1. License header year - Updated copyright year from 2024 to 2026 across all arc package files.

  2. HTTP client - Refactored to use `internal/httpclient`. Now uses `httpclient.NewConfigFromParsed`, `httpclient.BearerTokenSigner`, and `httpclient.NewClient`. This removes our manual `http.Transport` construction, retry logic, and timeout handling. Field renamed from `url` to `base_url` to match the shared convention.

Also, a recommendation for a future PR: the current `github.com/vmihailenco/msgpack/v5` dependency is effectively unmaintained (last commit is over 3 years old). We maintain a fork at `github.com/Basekick-Labs/msgpack/v6` that we use in Arc with the following improvements:

  • +8.6% throughput and -9.6% p50 latency on real workloads (measured in Arc)
  • Zero-alloc `MarshalAppend` API for encoding into pooled buffers
  • `*interface{}` fast path that bypasses reflection for common decode patterns
  • Pooled encoder/decoder buffers with `GetEncoder/PutEncoder` helpers
  • Type-switch fast paths for `map[string]interface{}`, `map[string]string`, `[]interface{}`
  • Bug fixes for OOM protection (capped slice/map allocations), float-to-int validation, non-addressable pointer receivers, and non-string map key support in interface decode

The API is fully compatible with vmihailenco/msgpack/v5 (same `Marshal`/`Unmarshal` signatures and interfaces), so it would be a drop-in replacement. Happy to open a separate PR for that if you're open to it.

@mmatczuk
Contributor

Thanks for the heads up for github.com/Basekick-Labs/msgpack/v6

Contributor

@mmatczuk mmatczuk left a comment


Bug: "time" field collision when timestamp_field is set to a non-"time" value. See inline comments.

Comment on lines +295 to +301
rec.columns["time"] = append(rec.columns["time"], ts)
for k, v := range dataMap {
    if k == o.timestampField || (o.timestampField == "" && k == "time") {
        continue
    }
    rec.columns[k] = append(rec.columns[k], v)
}
Contributor


Bug: "time" column collision when timestamp_field is set to a value other than "time".

L295 unconditionally appends the extracted timestamp to rec.columns["time"]. The skip condition at L297 only excludes k == "time" when timestampField is empty. When timestampField is e.g. "ts" and a message contains a field literally named "time", that field passes the filter and gets appended to rec.columns["time"] via L300.

Result: the "time" column ends up with 2N entries while all other columns have N entries. Arc rejects the batch due to column length mismatch.

Fix: always skip k == "time" since the output reserves that column for the extracted timestamp:

if k == o.timestampField || k == "time" {

Comment on lines +349 to +354
for k, v := range dataMap {
    if k == o.timestampField || (o.timestampField == "" && k == "time") {
        continue
    }
    fields[k] = v
}
Contributor


Same "time" field collision issue as the columnar path. When timestamp_field is set to something other than "time" and a message has a "time" field, it leaks into fields as a regular data field alongside the extracted timestamp in "t". Lower severity here (data duplication, not structural corruption), but same root cause and same fix:

if k == o.timestampField || k == "time" {

@mmatczuk
Contributor

mmatczuk commented Apr 15, 2026

Several fmt.Errorf calls use noun form instead of gerund form. Project pattern requires gerund-form prefixes. The file is inconsistent — "creating request: %w" and "sending request: %w" are correct, but others are not:

  • L217: "create HTTP client: %w""creating HTTP client: %w"
  • L251: "msgpack encoding: %w""encoding msgpack: %w"
  • L255: "compression: %w""compressing: %w"
  • L275, L335: "measurement interpolation: %w""interpolating measurement: %w"
  • L368: "tags mapping: %w""mapping tags: %w"
  • L373: "tags mapping result: %w""reading tags mapping result: %w"

@mmatczuk
Contributor

Please fix lint and test

Contributor Author

xe-nvdk commented Apr 15, 2026

@mmatczuk all addressed in the latest push:

Error message forms - Updated all 6 locations to gerund form:

  • `creating HTTP client`
  • `encoding msgpack`
  • `compressing`
  • `interpolating measurement`
  • `mapping tags`
  • `reading tags mapping result`

"time" column collision (when timestamp_field is set to something other than "time") - Good catch. Fixed in both columnar and row paths. The "time" column is now always reserved for the extracted timestamp; any literal "time" field in message data is skipped regardless of the `timestamp_field` configuration.

Lint and tests - Both clean:

  • `golangci-lint run internal/impl/arc/...` - 0 issues
  • `go test ./internal/impl/arc/... -race` - all 13 unit tests pass
  • `go test ./internal/impl/arc/... -tags integration` - both integration tests pass against `ghcr.io/basekick-labs/arc:latest` (26.04.1), now including end-to-end query verification for row format as well

Ready for final look.

@xe-nvdk xe-nvdk changed the title arc: add output plugin for Arc columnar database arc: add batch output plugin with MessagePack columnar/row formats, zstd/gzip compression, Bloblang tags, interpolated routing, TLS and bearer auth Apr 15, 2026
- Update fmt.Errorf messages to project gerund-form convention
  (creating HTTP client, encoding msgpack, compressing, interpolating
  measurement, mapping tags, reading tags mapping result)
- Always skip "time" key in message fields regardless of
  timestamp_field value (the "time" column is reserved for the
  extracted timestamp; a literal "time" field in data would otherwise
  collide in both columnar and row paths)
- Fix golangci-lint issues: unused receiver on Connect, unused
  request params in test handlers, testify GreaterOrEqual for
  length check
@xe-nvdk xe-nvdk force-pushed the arc-output-plugin branch from ced9cd7 to 3c645f6 on April 15, 2026 14:33
@xe-nvdk xe-nvdk changed the title arc: add batch output plugin with MessagePack columnar/row formats, zstd/gzip compression, Bloblang tags, interpolated routing, TLS and bearer auth arc: add output plugin for Arc columnar database Apr 15, 2026
@mmatczuk
Contributor

Could you run task docs and include the docs file for arc.

Contributor Author

xe-nvdk commented Apr 15, 2026

@mmatczuk done, auto-generated docs/modules/components/pages/outputs/arc.adoc is included in the latest push.

@mmatczuk
Contributor

Work moved to #4265

@mmatczuk mmatczuk closed this Apr 16, 2026