Skip to content

Commit e66c62b

Browse files
committed
Merge apache/iggy master into feat/doris-sink-connector
Resolve Cargo.toml workspace-deps conflict (keep both windows-native-keyring-store from master and wiremock from this branch, in sorted order) and migrate the Doris integration fixtures to sqlx 0.9's SqlSafeStr API (wrap test-controlled dynamic SQL in AssertSqlSafe).
2 parents 4ad34ed + 6b9eda3 commit e66c62b

262 files changed

Lines changed: 15982 additions & 1792 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.claude/skills/connector-runtime/SKILL.md

Lines changed: 283 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
---
2+
name: connector-sdk
3+
description: Extend or modify the Apache Iggy Connectors SDK (`core/connectors/sdk/`). Use when adding a new `Schema` variant, a new `StreamDecoder`/`StreamEncoder`, a new `Error` variant, modifying the `Sink`/`Source`/`Transform` trait surface, the FFI macros, or the `retry`/`api`/`convert` modules. NOT for writing plugins - use `connector-sink` or `connector-source` for those.
4+
---
5+
6+
# Extending the Apache Iggy Connectors SDK
7+
8+
The SDK (`core/connectors/sdk/`) is the **stable contract** between the
9+
runtime and every plugin. Changes ripple to every sink/source in-tree
10+
and every third-party plugin. Treat the public surface as a versioned API.
11+
12+
> **Universal connector rules** (benchmark, SecretString, drop accounting, exemplar patterns) live in
13+
> [connectors-overview](../connectors-overview/SKILL.md). This skill covers SDK-level changes only.
14+
15+
## Contents
16+
17+
- [STOP and ask the user before](#stop-and-ask-the-user-before)
18+
- [File map](#file-map)
19+
- [Cardinal rules](#cardinal-rules)
20+
- [Adding a new `Schema` variant](#adding-a-new-schema-variant)
21+
- [Adding a `StreamDecoder` / `StreamEncoder`](#adding-a-streamdecoder--streamencoder)
22+
- [Adding a new `Error` variant](#adding-a-new-error-variant)
23+
- [Modifying `Sink` / `Source` / `Transform` traits](#modifying-sink--source--transform-traits)
24+
- [FFI macros (`sink_connector!`, `source_connector!`)](#ffi-macros-sink_connector-source_connector)
25+
- [`Payload::try_to_bytes(&self)` is non-negotiable for JSON](#payloadtry_to_bytesself-is-non-negotiable-for-json)
26+
- [Retry helpers (`retry.rs`)](#retry-helpers-retryrs)
27+
- [`ConnectorState`](#connectorstate)
28+
- [Tests for SDK changes](#tests-for-sdk-changes)
29+
- [Before declaring done](#before-declaring-done)
30+
31+
## STOP and ask the user before
32+
33+
- Bumping the SDK MAJOR version or changing any `#[repr(C)]` layout - misaligns every pre-built plugin `.so`.
34+
- Changing `Sink` / `Source` / `Transform` / `StreamDecoder` / `StreamEncoder` trait signatures - cascades through every plugin.
35+
- Removing or renaming a `Schema` variant or `Error` variant - decoders/encoders pinned to it. downstream pattern matches break.
36+
- Changing wire serialization (postcard / rmp_serde / serde_json) for any FFI / state / config payload.
37+
- Changing FFI macro return codes or function names (`iggy_*_open` / `consume` / `handle` / `close`) - runtime + plugins both need lockstep update.
38+
39+
## File map
40+
41+
```text
42+
sdk/src/
43+
├── lib.rs Traits (Sink, Source, StreamDecoder, StreamEncoder),
44+
│ Payload, Schema, ConnectorState, Error enum, FFI message structs.
45+
├── sink.rs SinkContainer + sink_connector! macro (FFI plumbing).
46+
├── source.rs SourceContainer + source_connector! macro (FFI plumbing).
47+
├── api.rs ConnectorStatus, ConnectorStats (feature = "api").
48+
├── convert.rs owned_value_to_serde_json (simd_json ⇄ serde_json bridge).
49+
├── log.rs CallbackLayer for tracing across FFI.
50+
├── retry.rs CircuitBreaker, HttpRetryMiddleware, exponential_backoff, jitter.
51+
├── decoders/ One per schema: json, raw, text, proto, flatbuffer, avro.
52+
├── encoders/ Mirror of decoders.
53+
└── transforms/ add_fields, delete_fields, update_fields, filter_fields,
54+
unwrap_envelope, proto_convert, flatbuffer_convert, avro_convert.
55+
```
56+
57+
## Cardinal rules
58+
59+
1. **Apache 2.0 license header** on every new file.
60+
2. **`Send + Sync`** on every public trait (FFI runs across thread pools).
61+
3. **`#[repr(C)]`** on every type that crosses FFI (`Schema`, `TopicMetadata`, `MessagesMetadata`, `RawMessage`, `ProducedMessages`, `ConsumedMessage`, `DecodedMessage`, etc. in `lib.rs`). Adding fields requires bumping the SDK version - existing plugins built against the old layout will misalign on `postcard::from_bytes`.
62+
4. **postcard** for FFI message serialization (handled by `SinkContainer::consume`, `SourceContainer`'s `handle_messages`). **MessagePack (`rmp_serde`)** for `ConnectorState`. **JSON** (`serde_json`) only for human-editable config that crosses FFI. Don't mix.
63+
5. **`simd_json::OwnedValue`** for JSON payloads, not `serde_json::Value`. Use `convert::owned_value_to_serde_json` as a bridge when interop is required.
64+
6. **`BTreeMap`** for headers - deterministic ordering. Never `HashMap` on the wire.
65+
7. **No breaking changes** to `Sink`/`Source`/`StreamDecoder`/`StreamEncoder`/`Transform` trait signatures without coordinating with all in-tree plugins in the same PR.
66+
67+
## Adding a new `Schema` variant
68+
69+
The `Schema` enum (in `lib.rs`) is `#[repr(C)]` - it crosses FFI. Touch points to update **in one PR**:
70+
71+
1. `Schema` enum: add variant with `#[strum(to_string = "...")]` matching the `serde(rename_all="snake_case")` form.
72+
2. `Payload` enum: add a matching variant carrying the deserialized form.
73+
3. `Payload::try_into_vec` - consuming bytes-out path.
74+
4. `Payload::try_to_bytes` - **borrowing** bytes-out path. For non-trivial payloads (parsed trees), implement a no-clone serialization, not a `clone() + serialize`. See the `Payload::Json` arm for the canonical optimization.
75+
5. `Payload::Display`.
76+
6. `Schema::try_into_payload` - bytes → `Payload`.
77+
7. `Schema::decoder()` - factory returning `Arc<dyn StreamDecoder>`.
78+
8. `Schema::encoder()` - factory returning `Arc<dyn StreamEncoder>`.
79+
9. New `decoders/<name>.rs` and `encoders/<name>.rs`.
80+
10. Update `sdk/README.md` and `core/connectors/README.md` schema list.
81+
11. Tests: round-trip encode/decode, error paths.
82+
83+
Miss any of these → silent failures at runtime (typically `Error::InvalidPayloadType` or surprising decoder behavior in plugin code).
84+
85+
## Adding a `StreamDecoder` / `StreamEncoder`
86+
87+
Pattern from `decoders/json.rs::JsonStreamDecoder`:
88+
89+
```rust
90+
pub struct MyStreamDecoder; // unit struct if stateless
91+
92+
impl StreamDecoder for MyStreamDecoder {
93+
fn schema(&self) -> Schema { Schema::MyFormat }
94+
fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
95+
// Decode payload bytes into Payload::MyFormat(...).
96+
// Errors: Error::CannotDecode(Schema::MyFormat), Error::InvalidJsonPayload, etc.
97+
}
98+
}
99+
```
100+
101+
For **stateful** decoders (proto/avro/flatbuffer) holding schema descriptors:
102+
103+
- Store the schema state directly in the struct (`config`, `message_descriptor`, `schema`). The instance is built once and wrapped in `Arc<dyn>` for sharing - it must be `Send + Sync` but does not need additional locking if construction-time loading is final.
104+
- Provide **two constructors** when schema loading can fail: lenient `new(config) -> Self` (logs + degrades) AND strict `try_new(config) -> Result<Self, Error>` (fail-fast). Plus a `Default` impl for the no-schema case. Canonical example: `sdk/src/decoders/avro.rs` exports all three (`AvroStreamDecoder::new`, `::try_new`, `Default`).
105+
- If you genuinely need to mutate the schema after construction (e.g., a `update_config` method), use `&mut self` plus `std::mem::replace(&mut self.config, new_config)` to swap without cloning the old value. Live pattern: `sdk/src/decoders/avro.rs::AvroStreamDecoder::update_config` and `sdk/src/encoders/avro.rs::AvroStreamEncoder::update_config`.
106+
- A fresh decoder instance is created via `Schema::decoder()` on each call (it returns `Arc<dyn StreamDecoder>`), so per-decoder caching of shared mutable state is the wrong abstraction.
107+
108+
## Adding a new `Error` variant
109+
110+
The `Error` enum (in `lib.rs`) is `Clone + PartialEq + Eq + Hash`. New variants must preserve these. Guidelines:
111+
112+
- **Use existing variants first.** Only add a new one if the failure mode is distinct in handling, not just description. `InvalidConfigValue(String)` covers most "bad config" cases.
113+
- **Carry context as `String`** - don't add a struct unless multiple discrete fields are needed by callers programmatically.
114+
- **Document retry semantics in the docstring.** Existing variants (`PermanentHttpError`, `CatalogCommitError`, `TransactionApplyError`) document retry behavior - mirror that style.
115+
- **Place near related variants** to keep the enum readable top-to-bottom.
116+
117+
## Modifying `Sink` / `Source` / `Transform` traits
118+
119+
**Breaking changes.** Required process:
120+
121+
1. Document migration in the SDK changelog (and `connectors/sdk/README.md`).
122+
2. Update every in-tree plugin in the same PR. CI builds them all.
123+
3. Bump SDK minor version.
124+
4. Note in PR description: dynamic plugins compiled against the old SDK version will fail to load with mismatched FFI symbols.
125+
126+
Non-breaking additions (new default method, new struct field with `#[serde(default)]`, new variant on a non-exhaustive enum) are preferable.
127+
128+
## FFI macros (`sink_connector!`, `source_connector!`)
129+
130+
The macros (in `sink.rs` and `source.rs`) generate:
131+
132+
- `iggy_<kind>_open(id, config_ptr, config_len, [state_ptr, state_len,] log_callback) -> i32`
133+
- `iggy_sink_consume(id, ...)` / `iggy_source_handle(id, callback) -> i32`
134+
- `iggy_<kind>_close(id) -> i32`
135+
- `iggy_<kind>_version() -> *const c_char` (static lifetime, from `env!("CARGO_PKG_VERSION")`)
136+
137+
Invariants:
138+
139+
- **Duplicate-ID guard** returns `-1` if the caller reopens an ID without closing first. Don't remove this - it prevents silent buffered-data loss.
140+
- `INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>>` (and `SourceContainer<$type>` mirror) is the only mutable global the macro introduces - keep it that way.
141+
- Return codes: `0` success, `-1` invalid call, `1` open failure. Don't repurpose.
142+
143+
Changes to the FFI signature must update `runtime/src/main.rs::{SourceApi, SinkApi}` in the same PR.
144+
145+
## `Payload::try_to_bytes(&self)` is non-negotiable for JSON
146+
147+
Plugin authors call this on every consumed message. The implementation in `lib.rs::Payload::try_to_bytes` documents *why* it skips the deep `OwnedValue` clone - replacing `O(n) clone + O(n) serialize` with `O(n) serialize`. If you add a new variant requiring a parsed tree, do the same: serialize in place, don't clone.
148+
149+
## Retry helpers (`retry.rs`)
150+
151+
- `CircuitBreaker`: threshold + cooldown, `try_lock()` on the success path to avoid hot-path contention.
152+
- `HttpRetryMiddleware`: integrates with `reqwest-middleware`. Retries 429 + 5xx + network errors. Honors `Retry-After`.
153+
- `max_retries` = **total attempts** including the first try, not extra retries. Document if you change this convention.
154+
- New helpers must take `Duration` (not `u64 millis`) on the public API. Internal computation uses `humantime` parsing of `String`.
155+
156+
## `ConnectorState`
157+
158+
- Bytes are opaque - the wrapping type does not impose schema.
159+
- Serialization is MessagePack via `rmp_serde`. Compact, deterministic, well-supported in serde.
160+
- `serialize`/`deserialize` helpers return `Option<T>` and log on failure. **Failures are non-fatal** - design downstream code to tolerate fresh state.
161+
- Don't change the underlying format without coordinating with all sources.
162+
163+
## Tests for SDK changes
164+
165+
- Unit tests for any new pure function in the changed module.
166+
- Round-trip tests for new schemas/transforms (encode → decode == identity for JSON-equivalent payloads).
167+
- `sdk/tests/` integration tests when the change crosses module boundaries.
168+
169+
## Before declaring done
170+
171+
```text
172+
cargo fmt --all
173+
cargo sort --no-format --workspace
174+
cargo clippy -p iggy_connector_sdk --all-targets --all-features -- -D warnings
175+
cargo test -p iggy_connector_sdk --all-features
176+
177+
# Rebuild all plugins to catch breaking-change leaks:
178+
cargo build -p iggy_connector_stdout_sink -p iggy_connector_random_source
179+
# If FFI signatures changed, also build the runtime:
180+
cargo build -p iggy-connectors
181+
```
182+
183+
---
184+
185+
Discussion / help: see [AGENTS.md](../../../AGENTS.md#discussion-and-support).

0 commit comments

Comments
 (0)