feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink crates #3140

Open
ryerraguntla wants to merge 61 commits into apache:master from ryerraguntla:feat/influxdb_v2_v3_connector

@ryerraguntla
Contributor

Which issue does this PR close?

Closes #3062

Rationale

This PR implements a unified InfluxDB connector supporting both InfluxDB V2 (Flux) and V3 (SQL) in a single crate per component (sink/source), eliminating code duplication while preserving full backward compatibility with existing V2 deployments.

Key Features:

  • Zero breaking changes for V2 users (backward-compatible config deserialization)
  • V3 stuck-timestamp detection with automatic batch inflation + circuit breaker
  • Performance improvements: SIMD JSON parsing (+40% in source), inlined hot paths (+3% in sink)
  • Enhanced safety: #[must_use] on critical functions, version-strict cursor validation
  • 95%+ test coverage maintained with 55+ new tests

What changed?

Architecture
Before (V2-only):
influxdb_sink/src/lib.rs (single flat config, 1,625 LOC)
influxdb_source/src/lib.rs (single flat config, 1,400 LOC)

After (V2 + V3):
influxdb_sink/src/
├── lib.rs (enum dispatch, 1,330 LOC)
└── protocol.rs (shared line-protocol escaping, 115 LOC)

influxdb_source/src/
├── lib.rs (enum dispatch, 817 LOC)
├── common.rs (shared config/validation, 815 LOC)
├── row.rs (CSV/JSONL parsing, 193 LOC)
├── v2.rs (Flux query logic, 374 LOC)
└── v3.rs (SQL query + stuck detection, 506 LOC)

Benefits:

  • Single .so per component (no InfluxClient trait overhead)
  • Zero code duplication (shared validation, escaping, retry logic)
  • Asymmetric structure (sink: 30-line diff; source: separate modules for V2/V3 query semantics)
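
The "enum dispatch" layout above can be pictured with a minimal sketch. The type `SinkConfig` and its fields are hypothetical stand-ins for the much larger `InfluxDbSinkConfig`; the `base_url` and `health_url` helper names come from the commit messages, but their bodies here are assumptions, not the PR's code.

```rust
/// Hypothetical, trimmed-down config enum. One variant per InfluxDB
/// major version, so no trait object is needed for dispatch.
#[derive(Debug, Clone)]
pub enum SinkConfig {
    V2 { url: String, org: String, bucket: String },
    V3 { url: String, db: String },
}

impl SinkConfig {
    /// Base URL with any trailing slashes stripped, so appended paths
    /// never produce a double slash.
    pub fn base_url(&self) -> &str {
        let url = match self {
            SinkConfig::V2 { url, .. } | SinkConfig::V3 { url, .. } => url,
        };
        url.trim_end_matches('/')
    }

    /// Health endpoint built from the normalized base URL.
    pub fn health_url(&self) -> String {
        format!("{}/health", self.base_url())
    }

    /// Version label, resolved by a plain `match` on the variant.
    pub fn version(&self) -> &'static str {
        match self {
            SinkConfig::V2 { .. } => "v2",
            SinkConfig::V3 { .. } => "v3",
        }
    }
}
```

Calling `health_url()` on a `V2` config whose `url` is `http://localhost:8086/` yields `http://localhost:8086/health`; the `match` is resolved statically, which is presumably what "no InfluxClient trait overhead" refers to.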

For more details, please refer to the comments section of #3062.
#3062 (comment)

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

AI Tools Used - Claude and Copilot

Scope of usage - Code review for quality and performance issues; generation of test cases, documentation, and summary notes.

Generated code is tested with actual test execution.

Can you explain every line of the code - yes

GaneshPatil7517 and others added 30 commits January 13, 2026 08:43
Implements Issue apache#2540 - Redshift Sink Connector with S3 staging support.

Features:
- S3 staging with automatic CSV file upload
- Redshift COPY command execution via PostgreSQL wire protocol
- IAM role authentication (recommended) or access key credentials
- Configurable batch size and compression (gzip, lzop, bzip2, zstd)
- Automatic table creation with customizable schema
- Retry logic with exponential backoff for transient failures
- Automatic cleanup of staged S3 files

Configuration options:
- connection_string: Redshift cluster connection URL
- target_table: Destination table name
- iam_role: IAM role ARN for S3 access (recommended)
- s3_bucket/s3_region/s3_prefix: S3 staging location
- batch_size: Messages per batch (default: 10000)
- compression: COPY compression format
- delete_staged_files: Auto-cleanup toggle (default: true)
- auto_create_table: Create table if missing (default: true)

Closes apache#2540
- Fix markdown lint issues in README.md (table formatting, blank lines, code fence language)
- Fix trailing newline in Cargo.toml
- Apply TOML formatting via taplo
- Add missing dependencies to DEPENDENCIES.md (rust-s3, rxml, rxml_validation, static_assertions)
- Add Redshift sink integration test using PostgreSQL (Redshift-compatible) and LocalStack for S3
- Add s3_endpoint config option to support custom endpoints (LocalStack, MinIO)
- Add path-style S3 access for custom endpoints
- Add localstack feature to testcontainers-modules
- Create test configuration files for Redshift connector
- Add s3_endpoint: None to test_config() in lib.rs (fixes E0063)
- Add endpoint parameter to S3Uploader tests in s3.rs
- Fix formatting for long line in init_s3_uploader()
- Add iggy_connector_redshift_sink to DEPENDENCIES.md
- Add maybe-async, md5, minidom to DEPENDENCIES.md
Critical fixes:
- Change Rust edition from 2024 to 2021 in Cargo.toml
- Fix S3 cleanup to happen regardless of COPY result (prevents orphaned files)

Moderate fixes:
- Remove zstd from valid compression options (not supported by Redshift)
- Update README to remove zstd from compression list
- Handle bucket creation error in integration tests with expect()
- Log JSON serialization errors instead of silent unwrap_or_default()

Performance:
- Cache escaped quote string to avoid repeated format! allocations

Windows compatibility (for local testing):
- Add #[cfg(unix)] conditionals for Unix-specific code in sender/mod.rs
Fixes clippy warning about unused 'runtime' field in test setup struct.
The runtime field is kept for future test expansion.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Changed CONFIG_ to PLUGIN_CONFIG_ for plugin configuration fields
- Changed TOPICS_0 to TOPICS with proper JSON array format
- Added CONSUMER_GROUP environment variable
…ort with latest S3 crate

Migrate S3 usage from rust-s3 to s3-tokio and update related dependencies. Top-level Cargo.toml updated (http, lz4_flex, toml) and DEPENDENCIES.md adjusted. redshift_sink/Cargo.toml switched to s3-tokio, made sqlx a workspace dependency and added rustls as a dev-dependency. Code changes: S3Uploader now owns Bucket (removed Box) and tests install the rustls crypto provider. Integration tests were refactored to remove the manual testcontainers setup in favor of the iggy_harness-based test harness.
Introduce a new core/connectors/influxdb_common crate that provides a version-abstraction layer for InfluxDB (InfluxDB V2 and V3). Adds the InfluxDbAdapter trait, ApiVersion factory, line-protocol escaping helpers, CSV/JSONL response parsers, and concrete V2/V3 adapters plus unit tests and architecture notes. Wire the new crate into the workspace (Cargo.toml/Cargo.lock) and update existing influxdb sink/source connector manifests/sources to depend on it. Also add integration test fixtures and v3-specific integration tests and configs to exercise V3 behavior.
Remove the external influxdb common adapter and refactor the sink to natively support both V2 and V3 configurations.

Key changes:
- Removed iggy_connector_influxdb_common dependency (Cargo.toml & Cargo.lock) and inlined adapter logic.
- Introduced InfluxDbSinkConfig enum with V2/V3 variants and helper methods (url, auth header, build_write_url/health_url, precision mapping, feature flags, etc.).
- Reworked InfluxDbSink struct: store unified config, auth_header, measurement/precision, metadata flags, batch size limit, and other derived fields.
- Added line-protocol escaping helpers (write_measurement, write_tag_value, write_field_string) and simplified PayloadFormat handling.
- Adjusted client initialization, connectivity checks, retry middleware setup, and improved error messages and transient vs permanent error handling.
- Updated Sink impl: open(), consume(), process/batching, circuit breaker interactions, and close() behavior.
- Expanded and updated unit tests to cover v2/v3 config behavior, URL/precision mapping, escaping, and append_line error/success cases.
- Added new source modules and test script files related to InfluxDB connectors.

This refactor centralises version-specific behaviour, improves configurability, and prepares the connector for V3 line-protocol and auth differences.
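
As a rough illustration of the escaping helpers named in this commit, the sketch below follows the escaping rules documented for InfluxDB line protocol. The function names come from the commit message; the signatures and bodies are assumptions, and the tab handling added in a later commit is not covered.

```rust
/// Measurement names: escape commas and spaces with a backslash.
pub fn write_measurement(out: &mut String, name: &str) {
    for c in name.chars() {
        if matches!(c, ',' | ' ') {
            out.push('\\');
        }
        out.push(c);
    }
}

/// Tag values: escape commas, equals signs, and spaces.
pub fn write_tag_value(out: &mut String, value: &str) {
    for c in value.chars() {
        if matches!(c, ',' | '=' | ' ') {
            out.push('\\');
        }
        out.push(c);
    }
}

/// String field values: wrap in double quotes and escape embedded
/// double quotes and backslashes.
pub fn write_field_string(out: &mut String, value: &str) {
    out.push('"');
    for c in value.chars() {
        if matches!(c, '"' | '\\') {
            out.push('\\');
        }
        out.push(c);
    }
    out.push('"');
}
```

Writing into a caller-supplied `String` lets one buffer be reused for a whole batch, which fits the allocation concerns raised elsewhere in this PR.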
Delete influx_dB_test_proc_docs/scripts/test-connectors.sh — an interactive Bash end-to-end test harness for InfluxDB v2/v3 connector scenarios (Iggy messaging, polling, and five connector tests). Removes helper functions, polling logic and all test cases bundled in the script.
Extract shared parsing and protocol logic into the influxdb_common crate and update sinks/sources to consume it. Introduces delegate! macros to remove repetitive variant matching, unifies URL/auth handling via InfluxDbAdapter (including V3 precision mapping), and centralises line-protocol escaping/row parsing. Optimises body construction (build_body) and Bytes usage, adds extensive unit & HTTP integration tests (axum dev-dep), and updates Cargo.toml entries accordingly to reflect the new shared dependency.
Ensure health_url trims a trailing '/' from the base URL in both V2 and V3 adapters to avoid double slashes when appending /health, and add tests verifying the behavior. Add tests that verify write_url percent-encodes bucket/org/db query parameters and that decoding recovers the original values. Improve CSV row parsing by preallocating Row with capacity based on active headers. Clean up influxdb_source Cargo.toml by removing unused csv and futures deps, add a comment explaining dashmap/once_cell are required due to macro expansion, and update the ignored list.
Refactor and harden InfluxDB connector common code: move Row type into row.rs and re-export it; make ApiVersion::from_config return Result and error on unknown values (avoid silent defaulting); make V3 precision mapping return Result and reject invalid precisions; validate sink precision early in open() to prevent silent timestamp mistakes. Add tab escaping to line-protocol writers and expand unit tests (empty inputs, tab escapes, unicode). Make CSV parser flexible for multi-table results and handle header updates. Strengthen RFC3339 cursor regex to reject out-of-range date parts. Improve test fixture container port handling to support IPv6 mappings and better error messages. Misc: minor visibility changes, JSONL format constant, Cargo description tweak, and additional tests to cover URL/health/build_query error cases.
Add validation and runtime fixes across InfluxDB connectors:

- Require timezone suffix for cursor/initial_offset timestamps to avoid UTC-vs-local ambiguity and update regex/tests accordingly.
- Validate V2 sink config to reject empty or whitespace-only orgs at open() to prevent runtime 400s.
- Validate initial_offset early in source open() and add tests for invalid/timezone-free offsets.
- Warn when a V2 Flux query lacks an explicit sort() because Skip-N dedup relies on stable ordering.
- In V3 source row processing, emit a warning when no row contains the cursor column and ensure messages are still emitted while max_cursor remains None; add tests.
- Simplify auth header and health URL construction (removed dynamic adapter usage for these paths).
- Ensure circuit breaker records successes for successful batches and move record_success into the per-batch success path; add a test to prevent tripping on intermittent failures.
- Change several atomic counter loads to SeqCst for correctness in tests and tighten an unreachable branch where precision is validated.
- Minor protocol.rs doc clarifications about tab escaping in line protocol.

Includes multiple unit/integration tests covering the new validations and circuit-breaker behavior.
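
The timezone-suffix requirement for cursor and initial_offset values can be sketched without a regex. The commit describes a regex-based check; the hypothetical `has_timezone_suffix` below inspects only the tail of the string and does not validate the rest of the timestamp.

```rust
/// Returns true when `ts` ends with `Z`/`z` or a numeric UTC offset of
/// the form `+HH:MM` / `-HH:MM`. Only the suffix is inspected.
pub fn has_timezone_suffix(ts: &str) -> bool {
    if ts.ends_with('Z') || ts.ends_with('z') {
        return true;
    }
    let bytes = ts.as_bytes();
    if bytes.len() < 6 {
        return false;
    }
    let tail = &bytes[bytes.len() - 6..];
    matches!(tail[0], b'+' | b'-')
        && tail[1].is_ascii_digit()
        && tail[2].is_ascii_digit()
        && tail[3] == b':'
        && tail[4].is_ascii_digit()
        && tail[5].is_ascii_digit()
}
```

A value such as `2026-01-13T08:43:00` is rejected because nothing says whether it is UTC or local time, which is the ambiguity the commit aims to remove.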
core/connectors/influxdb_common: broaden CSV header detection to recognize any of `_time`, `_start`, or `_stop` so Flux window-aggregate results are parsed correctly; add tests covering _start/_stop-only headers and aggregation queries.

core/connectors/sinks/influxdb_sink: strengthen atomic orderings (use AcqRel for fetch_add and Acquire for loads) to ensure correct cross-thread visibility of counters; update tests to use Acquire loads.
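
The ordering change for the sink counters can be illustrated with a standalone sketch: `AcqRel` on `fetch_add` and `Acquire` on the final load. The function `count_across_threads` is hypothetical and exists only to exercise the pattern; it is not taken from the connector.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `threads` workers that each increment a shared counter
/// `per_thread` times, then returns the final value.
pub fn count_across_threads(threads: usize, per_thread: u64) -> u64 {
    let counter = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Read-modify-write with acquire + release semantics.
                    counter.fetch_add(1, Ordering::AcqRel);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    // Acquire load pairs with the release half of the increments.
    counter.load(Ordering::Acquire)
}
```

In this sketch `join` alone already makes the increments visible; the explicit orderings matter where a reader polls the counter while writers are still running, as a test observing the sink would.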

core/connectors/sources/influxdb_source: derive Debug for RowProcessingResult and change process_rows to return an Err(Error::InvalidRecordValue) when no row contains the configured cursor field (instead of silently leaving max_cursor None). Update tests to expect the error — this prevents silent infinite re-delivery and surfaces misconfigured queries to the operator.
Delete the shared iggy_connector_influxdb_common crate and fold its functionality into the sink and source connectors. protocol.rs was moved/renamed into core/connectors/sinks/influxdb_sink/src/protocol.rs (helper functions made crate-private); row parsing was moved into core/connectors/sources/influxdb_source/src/row.rs and made crate-private. Adapter/config/v2/v3 logic was inlined into the respective sink/source code (URL builders, auth header generation, precision mapping, query builders, health URL checks), and relevant visibility and call sites were updated. Workspace Cargo.toml and Cargo.lock were updated to remove the member/dependency and to add CSV where needed; tests were adapted/added for the inlined helpers and validation behavior.
Implement backward-compatible deserialization for InfluxDB configs by adding custom Deserialize impls for InfluxDbSinkConfig and InfluxDbSourceConfig that default missing version to "v2" and reject unknown versions with a clear error. Add V3-specific options and safety checks: introduce include_metadata to omit the cursor field from emitted payloads, add QUERY_FORMAT_JSONL, and enforce MAX_STUCK_CAP_FACTOR (100) with validation on open to avoid extremely large queries. Make timestamp comparison conservative (return false on parse failure) to avoid skipping data. Switch message ID generation to per-message UUIDs (remove uuid_base usage), adjust payload building to filter cursor when include_metadata=false, and small sink fix to append lines without producing trailing newlines. Update and add tests covering config deserialization, timestamp behavior, stuck-cap validation, and other affected behaviors.
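
The version-defaulting rule in this commit reduces to a small decision function. The PR implements it inside custom serde `Deserialize` impls; the std-only sketch below, with the hypothetical names `ApiVersion` and `parse_version`, shows only the rule itself and assumes an exact, case-sensitive match.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApiVersion {
    V2,
    V3,
}

/// Maps the optional `version` config value to an API version.
/// A missing value falls back to V2 so existing configs keep working;
/// anything other than "v2" or "v3" is rejected with an error.
pub fn parse_version(raw: Option<&str>) -> Result<ApiVersion, String> {
    match raw {
        None | Some("v2") => Ok(ApiVersion::V2),
        Some("v3") => Ok(ApiVersion::V3),
        Some(other) => Err(format!(
            "unknown InfluxDB version '{other}', expected \"v2\" or \"v3\""
        )),
    }
}
```

Rejecting unknown values instead of defaulting them keeps a typo such as `version = "v4"` from silently running as V2.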
Various refactors and improvements to InfluxDB source/sink connectors:

- Make many config fields pub(crate) to improve encapsulation.
- Add toml as a dev-dependency for connectors and add default "version = \"v2\"" to example config.toml files.
- Introduce base_url() helpers to normalize URLs (strip trailing slashes) and use them when building endpoints; validate V2 org is non-empty in sink config.
- Introduce RowContext to consolidate per-poll parameters passed to row-processing routines; simplify signatures for process_rows and poll functions and propagate include_metadata consistently.
- Optimize per-message UUID generation by deriving IDs from a single per-poll base UUID to reduce PRNG calls.
- Add query_has_sort_call heuristic to detect Flux sort() calls (avoids false positives on identifier prefixes) and use it when checking V2 queries.
- Improve error messages for cursor_field validation to be version-specific and add related tests.
- Add comments clarifying escaping rules and rationale for using simd_json in the sink hot path.
- Update integration test TOML keys from api_version to version and add unit tests verifying TOML deserialization defaults and behavior.

These changes are focused on robustness, performance, and clearer configuration/validation behavior.
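
One way to picture the `query_has_sort_call` heuristic is a scan for `sort` that is not preceded by an identifier character and is followed by an opening parenthesis. The function name comes from the commit message; the body is an assumption and ignores Flux comments and string literals.

```rust
fn is_ident_byte(b: u8) -> bool {
    b.is_ascii_alphanumeric() || b == b'_'
}

/// Heuristic: true when the query contains a call to `sort(...)`,
/// ignoring identifiers that merely contain "sort" (e.g. `my_sort`,
/// `sortBy`, or a bucket named "sorted").
pub fn query_has_sort_call(query: &str) -> bool {
    let bytes = query.as_bytes();
    let mut start = 0;
    while let Some(pos) = query[start..].find("sort") {
        let idx = start + pos;
        let end = idx + "sort".len();
        // The match must not continue a longer identifier on the left.
        let boundary_before = idx == 0 || !is_ident_byte(bytes[idx - 1]);
        // Allow optional whitespace between `sort` and `(`.
        let followed_by_paren = query[end..].trim_start().starts_with('(');
        if boundary_before && followed_by_paren {
            return true;
        }
        start = end;
    }
    false
}
```

Checking both sides of the match is what avoids the "identifier prefix" false positives the commit mentions; a plain `contains("sort")` would accept `sorted` or `my_sort(`.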
@ryerraguntla
Contributor Author

Let me look at the failing tests. Could be Cargo dependencies. Let me update the branch.

ryerraguntla and others added 3 commits May 4, 2026 08:00
Update Cargo.lock via cargo update: add a new package entry for lz4_flex v0.13.0 (with checksum and twox-hash dependency) and normalize dependency entries to explicitly reference lz4_flex 0.12.1 where applicable. This records the updated dependency resolution in the lockfile.
Add a new entry for lz4_flex v0.13.0 (MIT) to DEPENDENCIES.md to record the updated dependency version. The previous 0.12.1 entry remains in the file.
@ryerraguntla
Contributor Author

@hubcio - Updated with the latest master. Please review at your convenience.

Comment thread core/connectors/sources/influxdb_source/src/v3.rs Outdated
Comment thread core/connectors/sources/influxdb_source/src/v2.rs Outdated
Comment thread core/connectors/sources/influxdb_source/src/v3.rs
Comment thread core/connectors/sources/influxdb_source/src/lib.rs
Comment thread core/connectors/sources/influxdb_source/src/lib.rs Outdated
Comment thread core/connectors/sinks/influxdb_sink/src/lib.rs
Comment thread core/connectors/sinks/quickwit_sink/src/lib.rs
Comment thread core/integration/tests/connectors/fixtures/wiremock.rs Outdated
Comment thread Cargo.toml Outdated
Comment thread core/connectors/sources/influxdb_source/src/common.rs
ryerraguntla and others added 12 commits May 5, 2026 12:23
Replace the previous ports()/map_to_host_port logic with a direct call to container.get_host_port_ipv4(WIREMOCK_PORT).await in the WireMock test fixture, and simplify the associated error message. This makes host port retrieval more straightforward and reduces mapping complexity.
Remove the workspace tokio entry from core/connectors/sinks/quickwit_sink/Cargo.toml and update Cargo.lock to drop tokio from the dependency list. This removes an unnecessary direct dependency from the quickwit sink; run a build/tests to verify there are no missing runtime or compilation requirements.
Multiple fixes and improvements across InfluxDB connectors:

- Cargo: pin assert_cmd to 2.2.0.
- Sink: switch JSON serialization path to serde_json for Json payloads; parse+compact non-Json payloads; treat timestamp==0 as unset and substitute current wall-clock time with a warning.
- Source (lib): add validation to reject V3 queries that lack an $offset when stuck-batch inflation is enabled (default); require V2 queries that use '>=' to include a sort call (error on open) to avoid skip-N dedup corruption; improve error handling when persisted V2 cursors fail RFC3339 parsing; update tests to include OFFSET and add V3 offset-related tests.
- V2: add a 256 MiB response body cap and stream response chunks to avoid OOMs, with explicit errors if cap exceeded or body is invalid UTF-8.
- V3: optimize payload serialization by serializing a borrowed RowView (avoids per-field clones); replace all_at_cursor boolean with rows_at_max_cursor counter and update stuck-batch detection to use this count; adjust stuck-batch circuit-breaker behavior to reset effective_batch_size to base and preserve offsets; update related tests.

These changes improve safety (prevent OOMs and silent data-loss cases), clarify semantics for stuck-batches, and reduce allocation/serialization overhead in hot paths.
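
The stuck-batch handling described for V3 can be sketched as a small decision function. `MAX_STUCK_CAP_FACTOR` and `rows_at_max_cursor` are names from the commit messages; `StuckAction`, `next_action`, the doubling step, and treating the factor as a hard ceiling on the effective batch size are all assumptions made for illustration.

```rust
pub const MAX_STUCK_CAP_FACTOR: u32 = 100;

#[derive(Debug, PartialEq, Eq)]
pub enum StuckAction {
    /// The cursor can move forward; keep using the base batch size.
    Advance,
    /// Every row shares the max cursor value; retry with this larger size.
    Inflate(u32),
    /// Ceiling reached; reset to the base size and trip the breaker.
    Trip,
}

/// Decides what to do after a poll, given how many rows came back and
/// how many of them sit at the maximum cursor value.
pub fn next_action(
    rows_returned: u32,
    rows_at_max_cursor: u32,
    base_batch_size: u32,
    effective_batch_size: u32,
) -> StuckAction {
    // Stuck: a full batch in which every row has the same cursor value,
    // so advancing the cursor could skip rows that were not fetched.
    let stuck = rows_returned > 0
        && rows_returned >= effective_batch_size
        && rows_at_max_cursor == rows_returned;
    if !stuck {
        return StuckAction::Advance;
    }
    let cap = base_batch_size.saturating_mul(MAX_STUCK_CAP_FACTOR);
    if effective_batch_size >= cap {
        return StuckAction::Trip;
    }
    StuckAction::Inflate(effective_batch_size.saturating_mul(2).min(cap))
}
```

Using a count instead of a boolean lets the caller tell a batch that is entirely at one timestamp from one that only ends in a run of equal timestamps.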
Remove simd-json from the InfluxDB sink crate's dependencies and clean up related source code. Drop the now-unused timestamps_equal helper and its test from the InfluxDB source common module and remove its import from v3. Also apply small refactors/formatting: streamline serde_json serialization error handling in the sink and v3 modules, fix minor whitespace/formatting in tests and query string literal, and other small tidy-ups. These changes remove an unused dependency and eliminate dead code / improve formatting for clarity.
Include an OFFSET $offset placeholder in the SQL used by InfluxDb3SourceFixture to support paginated queries. Also remove a couple of stray blank lines in core/connectors/sources/influxdb_source/src/common.rs.
Change normalize_v3_timestamp to return Cow<'_, str> to avoid allocating when the input is already an RFC3339 timestamp, and adapt the call site to use the borrowed/owned value. Add extensive documentation to both V2 and V3 row-processing functions explaining cursor semantics, deduplication, timestamp normalization, cursor tracking, stuck-batch detection, message identity, and error conditions. Also clarify why V2 uses an `already_seen` parameter and how per-batch UUIDs are derived.
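
The `Cow` return type lets the already-normalized case borrow the input instead of allocating. The sketch below uses a hypothetical `normalize_ts` and assumes "normalizing" means appending `Z` to a timestamp that has no timezone suffix; the actual rules in `normalize_v3_timestamp` may differ.

```rust
use std::borrow::Cow;

/// True when `ts` ends with a numeric offset such as `+02:00`.
fn ends_with_offset(ts: &str) -> bool {
    let b = ts.as_bytes();
    if b.len() < 6 {
        return false;
    }
    let t = &b[b.len() - 6..];
    matches!(t[0], b'+' | b'-')
        && t[3] == b':'
        && [t[1], t[2], t[4], t[5]].iter().all(u8::is_ascii_digit)
}

/// Returns the input unchanged (borrowed) when it already carries a
/// timezone suffix, otherwise an owned copy with `Z` appended.
pub fn normalize_ts(ts: &str) -> Cow<'_, str> {
    if ts.ends_with('Z') || ends_with_offset(ts) {
        // Fast path: no allocation.
        Cow::Borrowed(ts)
    } else {
        let mut owned = String::with_capacity(ts.len() + 1);
        owned.push_str(ts);
        owned.push('Z');
        Cow::Owned(owned)
    }
}
```

Callers can treat the result as a `&str` through deref in both cases, so only inputs that need rewriting pay for an allocation.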
Sync Cargo.lock to updated dependency versions. Bump Arrow family crates (arrow, arrow-*, arrow-json, arrow-ipc, arrow-schema, etc.) to 58.3.0 and companion 57.3.1 releases where applicable, and update Parquet to 58.3.0 / 57.3.1. Also update various transitive deps and tooling crates (hashbrown, filetime, hybrid-array, kqueue-sys, libbz2-rs-sys, lz4_flex, retry-policies (rand -> 0.10.1), zerofrom, etc.). Update package checksums and adjust a few dependency entries (minor ordering/added axum in one sink). This commit only updates the lockfile to reflect those version changes.
Remove an extra space before the inline comment in normalize_v3_timestamp (core/connectors/sources/influxdb_source/src/v3.rs). Pure formatting change, no behavioral impact.
@ryerraguntla
Contributor Author

@hubcio - I was traveling for family events and got delayed. I have implemented the review comments, please review and do the needful.

@hubcio
Contributor

hubcio commented May 13, 2026

@ryerraguntla no worries. take your time, family always should come first. i'll do this review in the upcoming days.

@hubcio
Contributor

hubcio commented May 14, 2026

please resolve conflicts

/author

@github-actions github-actions Bot added the S-waiting-on-author PR is waiting on author response label May 14, 2026
@hubcio
Contributor

hubcio commented May 14, 2026

for future: see CONTRIBUTING.md. you can mark PRs as /ready once it's ready for review :) i'll do it now.

/review

@hubcio
Contributor

hubcio commented May 14, 2026

/ready

@github-actions github-actions Bot added S-waiting-on-review PR is waiting on a reviewer and removed S-waiting-on-author PR is waiting on author response labels May 14, 2026
Labels

S-waiting-on-review PR is waiting on a reviewer


3 participants