
Improve Kafka adapter to provide features closer to Kafka Connect #306

Open
amotl wants to merge 1 commit into bruin-data:main from crate-workbench:kafka-decoder

Conversation

@amotl amotl commented Jul 13, 2025

Pitch

Expand the Kafka adapter's default_msg_processor into a miniature decoding unit.

Loading data from Kafka into a database destination works well, but there are currently no options to decode the Kafka event value and drill down into it, so that only the payload is relayed into the target database, without any metadata.

Comparison

For example, Kafka Connect provides configuration options like the following for similar use cases. These lines are excerpts from a connector configuration.

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

Solution

This patch builds upon the existing default_msg_processor implementation and extends it with a few more options to meet our needs, bringing the adapter closer in maturity to what Kafka Connect provides. The update is fully backwards compatible: it does not introduce a breaking change for existing users and applications.

Details

  • Accept a bunch of decoding options per KafkaDecodingOptions
  • Provide a bunch of output formatting options per KafkaEvent
  • Tie both elements together using KafkaEventProcessor

The machinery is effectively the same as before, but it provides a few more options: type decoding for the Kafka event's key/value slots (key_type and value_type), a selection mechanism to limit the output to specific fields only (include), a small projection mechanism to optionally drill down into a specific field (select), and an option to choose the output format (format).

In combination, those decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings. Currently, the output formatter provides three different variants out of the box (standard_v1, standard_v2, flexible); see footnote 1. More variants can be added in the future, as other users or use cases may have different requirements in the same area.

Most importantly, the decoding unit is very compact, so relevant tasks do NOT require a corresponding transformation unit down the pipeline. This keeps the whole ensemble lean, in the very spirit of ingestr.
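
To illustrate how these options interact, here is a minimal sketch. It is not the code from this patch: `DecodingOptions`, `decode_slot`, and `process` are hypothetical names, the event is modelled as a plain dict, and both the type name `"string"` and the rule that `select` takes precedence over `include` are assumptions made for illustration.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class DecodingOptions:
    """Hypothetical stand-in for the decoding options described above."""

    key_type: Optional[str] = None    # assumed values: "json" or "string"
    value_type: Optional[str] = None  # assumed values: "json" or "string"
    include: List[str] = field(default_factory=list)
    select: Optional[str] = None


def decode_slot(raw: Optional[bytes], type_: Optional[str]) -> Any:
    """Decode a raw key/value slot; pass it through when no type is given."""
    if raw is None or type_ is None:
        return raw
    text = raw.decode("utf-8")
    if type_ == "json":
        return json.loads(text)
    if type_ == "string":
        return text
    raise ValueError(f"Unsupported slot type: {type_}")


def process(event: Dict[str, Any], options: DecodingOptions) -> Any:
    """Decode key/value, then apply `select` or `include` (assumed precedence)."""
    record = dict(event)
    record["key"] = decode_slot(event.get("key"), options.key_type)
    record["value"] = decode_slot(event.get("value"), options.value_type)
    if options.select:
        # Projection: drill down into a single attribute, dropping the rest.
        return record[options.select]
    if options.include:
        # Selection: keep only the listed attributes.
        return {name: record[name] for name in options.include}
    return record


event = {
    "key": b"sensor-1",
    "value": b'{"sensor_id": 1, "reading": 42.42}',
    "topic": "demo",
    "partition": 0,
    "offset": 7,
}
payload = process(event, DecodingOptions(value_type="json", select="value"))
subset = process(event, DecodingOptions(key_type="string", include=["key", "offset"]))
```

With `value_type="json"` and `select="value"`, `payload` is the decoded message body only, which is the shape that ends up as columns in the destination table.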

Preview

uv pip install --upgrade 'ingestr @ git+https://github.com/crate-workbench/ingestr.git@kafka-decoder'

Example

Use URL parameters value_type=json&select=value to exclusively emit the Kafka event's message payload into the data sink.

# Start a Kafka broker. It runs in the foreground, so use a second terminal for the steps below.
docker run --rm --name=kafka \
  --publish=9092:9092 docker.io/apache/kafka:latest
# Publish two JSON-encoded events to the topic "demo".
echo '{"sensor_id":1,"ts":"2025-06-01 10:00","reading":42.42}' | \
  kcat -P -b localhost -t demo
echo '{"sensor_id":2,"ts":"2025-06-01 11:00","reading":451.00}' | \
  kcat -P -b localhost -t demo
# Load only the decoded event payloads into DuckDB.
ingestr ingest --yes \
  --source-uri "kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value" \
  --source-table "demo" \
  --dest-uri "duckdb:///kafka.duckdb" \
  --dest-table "demo.kafka"
# Query the payload fields directly as table columns.
duckdb kafka.duckdb 'SELECT * FROM demo.kafka WHERE sensor_id>1;'
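
When the source URI is assembled programmatically, a small helper along these lines may be convenient. This is only a sketch: `kafka_source_uri` is a hypothetical helper, not part of ingestr, and it merely reproduces the query string used in the command above.

```python
from urllib.parse import urlencode


def kafka_source_uri(bootstrap_servers: str, group_id: str, **decoding: str) -> str:
    """Build a kafka:// source URI; extra keyword arguments become URL parameters."""
    params = {"bootstrap_servers": bootstrap_servers, "group_id": group_id, **decoding}
    # Keep ":" and "," readable instead of percent-encoding them.
    return "kafka://?" + urlencode(params, safe=":,")


uri = kafka_source_uri("localhost:9092", "test", value_type="json", select="value")
print(uri)
```

The resulting string can be passed to `--source-uri` unchanged.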

Backlog

  • Add software tests for non-standard decoding and output formatting options
  • Add docs and improve inline comments

Footnotes

  1. The standard_v2 output format is intended to resolve issue #289, "Naming things: Rename _kafka_msg_id to _kafka__msg_id".

@amotl amotl force-pushed the kafka-decoder branch 4 times, most recently from bee01aa to 7e91ef5 Compare July 14, 2025 22:34
@amotl amotl marked this pull request as ready for review July 14, 2025 22:51
```sh
ingestr ingest \
  --source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value' \
```

would the same work without the select but the format set to flexible?

@amotl amotl Aug 1, 2025

Hi. When using select=value, the decoder machinery automatically selects the output format flexible, so you don't need to specify it explicitly.

- `include`: A list of event attributes to include in the output, comma-separated.
- `select`: A single event attribute to select and drill down into.
Use `select=value` to relay the Kafka event **payload data** only.
- `format`: The output format/layout. Possible values: `standard_v1` (default),
`standard_v2`, `flexible`. When using the `include` or `select` option, the
decoder will automatically select the `flexible` output format.

Longer explanation: format={standard_v1,standard_v2,flexible} is both a parameter and an internal identifier to let the adapter provide three different types of output layouts. This leaves the door open to add other flavours in the future. When the user chooses to use either the select or include option, the only viable output layout is flexible, so the decoder selects it automatically.
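
The selection rule can be sketched as follows. This is an illustration rather than the code from the patch: `resolve_format` and `KNOWN_FORMATS` are hypothetical names, and letting `select`/`include` silently override an explicitly given `format` is an assumption.

```python
from typing import Optional, Sequence

KNOWN_FORMATS = ("standard_v1", "standard_v2", "flexible")


def resolve_format(
    format: Optional[str] = None,
    include: Optional[Sequence[str]] = None,
    select: Optional[str] = None,
) -> str:
    """Return the effective output layout for the given options."""
    if select or include:
        # Assumption: selection and projection only make sense with "flexible".
        return "flexible"
    if format is None:
        # "standard_v1" is the documented default.
        return "standard_v1"
    if format not in KNOWN_FORMATS:
        raise ValueError(f"Unknown output format: {format}")
    return format


default_layout = resolve_format()
selected_layout = resolve_format(select="value")
```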

Comment thread docs/supported-sources/kafka.md Outdated
Comment thread ingestr/src/kafka/model.py
@amotl amotl requested a review from karakanb August 3, 2025 21:05
Comment thread docs/supported-sources/kafka.md Outdated

amotl commented Jan 5, 2026

Hi again. Happy New Year! Do you think the patch could be a candidate for merging? Would you like to see any changes or improvements? I've just rebased it on top of main.

@amotl amotl force-pushed the kafka-decoder branch 3 times, most recently from a60c719 to 1fc2fc1 Compare February 24, 2026 21:02
@amotl amotl changed the title from "Kafka: Expand default_msg_processor into a miniature decoding unit" to "Improve Kafka adapter to provide features closer to Kafka Connect" Feb 24, 2026

amotl commented Feb 25, 2026

Hi again. I've just refreshed the patch and further improved the documentation.

- Accept a bunch of decoding options per `KafkaDecodingOptions`
- Provide a bunch of output formatting options per `KafkaEvent`
- Tie both elements together using `KafkaEventProcessor`

The machinery is effectively the same as before, but provides a few
more options to allow type decoding for Kafka event's key/value slots,
a selection mechanism to limit the output to specific fields only, and
a small projection mechanism to optionally drill down into a specific
field.

In combination, those decoding options allow users to relay
JSON-encoded Kafka event values directly into a destination table,
without any metadata wrappings.

The output formatter provides three different variants out of the box.
More variants can be added in the future, as other users or use cases
may have different requirements in the same area.

Most importantly, the decoding unit is very compact, so relevant tasks
don't need a corresponding transformation unit down the pipeline, to
keep the whole ensemble lean, in the very spirit of ingestr.