arc: add output plugin for Arc columnar database#4236
xe-nvdk wants to merge 9 commits into `redpanda-data:main`
Conversation
Add a new batch output plugin that sends data to Arc's MessagePack ingestion endpoint. Arc is a columnar analytical database optimized for time-series and observability workloads, capable of ingesting millions of records per second. This plugin lets Redpanda Connect users stream data from any supported source directly into Arc, making it straightforward to build ingestion pipelines for analytical workloads without custom integration code.

Features:
- Columnar and row payload formats (columnar is the default and 25-35% faster)
- zstd/gzip compression with pooled encoders for high throughput
- Interpolated measurement names for per-message routing
- Automatic timestamp detection (seconds, milliseconds, microseconds, nanoseconds)
- Bloblang-based tag extraction for row format
- Bearer token authentication and TLS support
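One of the listed features, automatic timestamp detection, is commonly implemented as a magnitude heuristic on the epoch value. A minimal sketch of that idea — the function name and exact thresholds are illustrative assumptions, not the plugin's actual code:

```go
package main

import "fmt"

// detectTimestampUnit guesses the unit of an integer epoch timestamp from its
// magnitude. The cutoffs below are a common heuristic (hypothetical here, not
// the plugin's exact thresholds): present-day dates land cleanly in each band.
func detectTimestampUnit(ts int64) string {
	switch {
	case ts < 1e11:
		return "s" // seconds: ~1970 to ~5138
	case ts < 1e14:
		return "ms" // milliseconds
	case ts < 1e17:
		return "us" // microseconds
	default:
		return "ns" // nanoseconds
	}
}

func main() {
	fmt.Println(detectTimestampUnit(1700000000))          // s
	fmt.Println(detectTimestampUnit(1700000000000))       // ms
	fmt.Println(detectTimestampUnit(1700000000000000))    // us
	fmt.Println(detectTimestampUnit(1700000000000000000)) // ns
}
```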
Awesome feedback, thank you. We are going to address these today.
Bug fixes:
- Fix "time" column collision when timestamp_field is empty and the message contains a "time" field, which caused duplicate column entries
- Fix poisoned compression writers being returned to the pool on error; writers are now returned only on the success path
- Construct a fresh http.Transport instead of type-asserting on http.DefaultTransport, which panics if the default transport has been replaced

Test fixes:
- Remove testify calls from HTTP handler goroutines to avoid cross-goroutine FailNow and race conditions
- Add sync.Mutex to all tests with variables shared across goroutines
- Use comma-ok type assertions for safer test failure messages
- Extract a shared newTestServer helper to reduce duplication

New tests:
- TestArcOutput_TimeColumnCollision verifies time field deduplication
- TestArcOutput_InvalidConfig covers token/database CRLF rejection
- Integration test with an Arc Docker container (testcontainers-go) covering columnar write+query and row-format write+query
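The poisoned-writer fix above can be sketched with a stdlib gzip writer and a sync.Pool: a writer that errored mid-stream may hold corrupt internal state, so it is only put back into the pool on the success path. Names here are illustrative, not the plugin's actual identifiers:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"sync"
)

// Pool of reusable gzip writers, reset against a fresh buffer per use.
var gzPool = sync.Pool{
	New: func() any { return gzip.NewWriter(nil) },
}

// compress gzips a payload. On any error the writer is dropped rather than
// pooled, so a "poisoned" writer can never be handed to the next caller.
func compress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzPool.Get().(*gzip.Writer)
	zw.Reset(&buf)
	if _, err := zw.Write(payload); err != nil {
		return nil, fmt.Errorf("compressing payload: %w", err) // writer not pooled
	}
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("compressing payload: %w", err) // writer not pooled
	}
	gzPool.Put(zw) // success path only
	return buf.Bytes(), nil
}

func main() {
	out, err := compress([]byte("hello arc"))
	fmt.Println(err == nil, len(out) > 0) // true true
}
```

A later commit in this PR switches to `defer` with an error check so the pool is also replenished on success paths that return early; the invariant is the same: never pool a writer after a failed Write/Close.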
Thanks for the thorough review @mmatczuk. All six issues have been addressed in the latest push; the bug fixes and test fixes are detailed in the commit message above. All unit tests pass. Ready for another look.
mmatczuk
left a comment
Review of arc output plugin
Vague commit message? Did you even read the message? Yeah, the title maybe is vague, but the message says what was fixed. So I strongly recommend not depending 100% on AI; it doesn't look good when the responses are not accurate. I hope you take this comment in a good way. I will address the remaining issues that you found. Thank you.
LOL don't worry about it, it's a policy thing, we can squash it later
…r field consistency
- Add missing Apache 2.0 license header to output_test.go
- Use defer to return compression writers to sync.Pool on all code paths, preventing pool depletion under sustained errors
- Document that columnar batches require consistent field sets within a single batch (Arc validates server-side and rejects mismatches; schema evolution across batches is fully supported)
Thanks for the second pass @mmatczuk. Addressed in the latest push:

1. License header - Added the Apache 2.0 header to output_test.go.

2. Columnar column length mismatch - This is an inherent constraint of the columnar format, not a plugin bug. Arc validates this server-side and returns a clear error:

```go
// arc/internal/ingest/msgpack.go:202-216
for colName, colData := range payload.Columns {
	if firstCol {
		numRecords = len(colData)
		firstCol = false
	} else if len(colData) != numRecords {
		return nil, fmt.Errorf("columnar format: array length mismatch (expected %d, got %d for '%s')",
			numRecords, len(colData), colName)
	}
}
```

Data is never silently misaligned: Arc rejects the batch with a descriptive error that the plugin surfaces to the caller. Schema evolution across separate batches is fully supported, so batches may carry different column sets from one another. Adding client-side null-padding would hide schema inconsistencies from the user, and the row format already exists for heterogeneous schemas. Added a note in the config description documenting this constraint.

3. Pool writer leak - Fixed. Now uses defer to return writers to the pool on all code paths.
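For reference, the same invariant Arc enforces server-side can be checked on the client before sending. A minimal sketch with a hypothetical helper (not plugin code), using the same error wording as the Arc snippet above:

```go
package main

import "fmt"

// checkColumns verifies that every column in a columnar batch has the same
// number of rows, mirroring Arc's server-side validation. Illustrative only.
func checkColumns(cols map[string][]any) error {
	numRecords, first := 0, true
	for name, data := range cols {
		if first {
			numRecords, first = len(data), false
		} else if len(data) != numRecords {
			return fmt.Errorf("columnar format: array length mismatch (expected %d, got %d for '%s')",
				numRecords, len(data), name)
		}
	}
	return nil
}

func main() {
	ok := map[string][]any{"time": {1, 2}, "value": {3.0, 4.0}}
	bad := map[string][]any{"time": {1, 2, 3}, "value": {3.0}}
	fmt.Println(checkColumns(ok), checkColumns(bad) != nil) // <nil> true
}
```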
```
@@ -0,0 +1,482 @@
// Copyright 2024 Redpanda Data, Inc.
```
Please use https://github.com/redpanda-data/connect/tree/main/internal/httpclient for the http client
- Replace manual http.Transport construction with the shared internal/httpclient package, which provides TLS, retry, rate limiting, metrics, logging, and transport tuning out of the box
- Rename the url field to base_url to match httpclient conventions
- Use BearerTokenSigner for auth instead of manual header building
- Update copyright year from 2024 to 2026 on all arc package files
@mmatczuk all addressed in the latest push:
Also, a recommendation for a future PR: the current `github.com/vmihailenco/msgpack/v5` dependency is effectively unmaintained (last commit is over 3 years old). We maintain a fork at `github.com/Basekick-Labs/msgpack/v6` that we use in Arc with the following improvements:
The API is fully compatible with vmihailenco/msgpack/v5 (same `Marshal`/`Unmarshal` signatures and interfaces), so it would be a drop-in replacement. Happy to open a separate PR for that if you're open to it.
Thanks for the heads-up about the msgpack fork.
mmatczuk
left a comment
Bug: "time" field collision when timestamp_field is set to a non-"time" value. See inline comments.
```go
rec.columns["time"] = append(rec.columns["time"], ts)
for k, v := range dataMap {
	if k == o.timestampField || (o.timestampField == "" && k == "time") {
		continue
	}
	rec.columns[k] = append(rec.columns[k], v)
}
```
Bug: "time" column collision when timestamp_field is set to a value other than "time".
L295 unconditionally appends the extracted timestamp to rec.columns["time"]. The skip condition at L297 only excludes k == "time" when timestampField is empty. When timestampField is e.g. "ts" and a message contains a field literally named "time", that field passes the filter and gets appended to rec.columns["time"] via L300.
Result: the "time" column ends up with 2N entries while all other columns have N entries. Arc rejects the batch due to column length mismatch.
Fix: always skip k == "time" since the output reserves that column for the extracted timestamp:
```go
if k == o.timestampField || k == "time" {
```

```go
for k, v := range dataMap {
	if k == o.timestampField || (o.timestampField == "" && k == "time") {
		continue
	}
	fields[k] = v
}
```
Same "time" field collision issue as the columnar path. When timestamp_field is set to something other than "time" and a message has a "time" field, it leaks into fields as a regular data field alongside the extracted timestamp in "t". Lower severity here (data duplication, not structural corruption), but same root cause and same fix:
```go
if k == o.timestampField || k == "time" {
```
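The agreed fix for both paths can be demonstrated end to end: with the "time" key always skipped, the reserved "time" column keeps exactly one entry per record, so the 2N-vs-N mismatch cannot occur. A runnable sketch (hypothetical helper, not plugin code):

```go
package main

import "fmt"

// buildColumns applies the fix from the review: the "time" column is reserved
// for the extracted timestamp, so a literal "time" field in message data is
// always skipped, regardless of timestamp_field. Illustrative only.
func buildColumns(dataMap map[string]any, timestampField string, ts int64) map[string][]any {
	cols := map[string][]any{"time": {ts}}
	for k, v := range dataMap {
		if k == timestampField || k == "time" { // always reserve "time"
			continue
		}
		cols[k] = append(cols[k], v)
	}
	return cols
}

func main() {
	// Message with timestamp_field="ts" AND a literal "time" data field —
	// the exact shape that triggered the collision before the fix.
	msg := map[string]any{"ts": int64(1700000000), "time": "noon", "value": 42}
	cols := buildColumns(msg, "ts", 1700000000)
	fmt.Println(len(cols["time"]), len(cols["value"])) // 1 1
}
```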
Several `fmt.Errorf` messages don't follow the project's gerund-form error convention.
Please fix lint and test |
@mmatczuk all addressed in the latest push:

1. Error message forms - Updated all 6 locations to the gerund form.
2. "time" column collision (when timestamp_field is set to something other than "time") - Good catch. Fixed in both the columnar and row paths. The "time" column is now always reserved for the extracted timestamp; any literal "time" field in message data is skipped regardless of the `timestamp_field` configuration.
3. Lint and tests - Both clean.

Ready for a final look.
- Update fmt.Errorf messages to the project's gerund-form convention (creating HTTP client, encoding msgpack, compressing, interpolating measurement, mapping tags, reading tags mapping result)
- Always skip the "time" key in message fields regardless of timestamp_field value (the "time" column is reserved for the extracted timestamp; a literal "time" field in data would otherwise collide in both the columnar and row paths)
- Fix golangci-lint issues: unused receiver on Connect, unused request params in test handlers, testify GreaterOrEqual for a length check
Force-pushed ced9cd7 to 3c645f6
Could you run
@mmatczuk done, auto-generated
Work moved to #4265 |
Summary
`arc` batch output plugin that sends data to Arc's MessagePack ingestion endpoint

Features
Example config
Test plan
`go build` and `go vet`