Improve Kafka adapter to provide features closer to Kafka Connect#306
Improve Kafka adapter to provide features closer to Kafka Connect#306amotl wants to merge 1 commit into
Conversation
bee01aa to
7e91ef5
Compare
| ``` | ||
| ```sh | ||
| ingestr ingest \ | ||
| --source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value' \ |
There was a problem hiding this comment.
would the same work without the select but the format set to flexible?
There was a problem hiding this comment.
Hi. When using select=value, the decoder machinery automatically selects the output format flexible, you don't need to specify it explicitly.
ingestr/docs/supported-sources/kafka.md
Lines 30 to 35 in 1fc2fc1
Longer explanation: format={standard_v1,standard_v2,flexible} is both a parameter and an internal identifier to let the adapter provide three different types of output layouts. This leaves the door open to add other flavours in the future. When the user chooses to use either the select or include option, the only viable output layout is flexible, so the decoder selects it automatically.
|
Hi again. Happy New Year! Do you think the patch could be a candidate for merging? Would you like to see any changes or improvements? I've just rebased it on top of |
a60c719 to
1fc2fc1
Compare
default_msg_processor into a miniature decoding unit|
Hi again. I've just refreshed the patch and further improved the documentation. |
- Accept a bunch of decoding options per `KafkaDecodingOptions` - Provide a bunch of output formatting options per `KafkaEvent` - Tie both elements together using `KafkaEventProcessor` The machinery is effectively the same like before, but provides a few more options to allow type decoding for Kafka event's key/value slots, a selection mechanism to limit the output to specific fields only, and a small projection mechanism to optionally drill down into a specific field. In combination, those decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings. The output formatter provides three different variants out of the box. More variants can be added in the future, as other users or use cases may have different requirements in the same area. Most importantly, the decoding unit is very compact, so relevant tasks don't need a corresponding transformation unit down the pipeline, to keep the whole ensemble lean, in the very spirit of ingestr.
Pitch
Expand Kafka adapter's
default_msg_processorinto a miniature decoding unit.Loading data from Kafka into a database destination works well, but we found there are no options to specifically decode and drill down into the Kafka event value properly, in order to only relay that into the target database, without any metadata information.
Comparison
For example, Kafka Connect provides such configuration options for similar use cases which are fragments hereof.
Solution
This patch slightly builds upon and expands the existing
default_msg_processorimplementation to accept a few more options to resolve our needs, and to improve matureness closer to what Kafka Connect provides. The update is fully backwards compatible, which means the patch does not introduce a breaking change for existing users and applications.Details
KafkaDecodingOptionsKafkaEventKafkaEventProcessorThe machinery is effectively the same like before, but provides a few more options to allow type decoding for Kafka event's key/value slots (
key_typeandvalue_type), a selection mechanism to limit the output to specific fields only (include), a small projection mechanism to optionally drill down into a specific field (select), and an option to select the output format (format).In combination, those decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings. Currently, the output formatter provides three different variants out of the box (
standard_v1,standard_v2,flexible) 1. More variants can be added in the future, as other users or use cases may have different requirements in the same area.Most importantly, the decoding unit is very compact, so relevant tasks do NOT require a corresponding transformation unit down the pipeline. This keeps the whole ensemble lean, in the very spirit of ingestr.
Preview
uv pip install --upgrade 'ingestr @ git+https://github.com/crate-workbench/ingestr.git@kafka-decoder'Example
Use URL parameters
value_type=json&select=valueto exclusively emit the Kafka event's message payload into the data sink.duckdb kafka.duckdb 'SELECT * FROM demo.kafka WHERE sensor_id>1;'Backlog
Footnotes
The
standard_v2output format is intended to resolve Naming things: Rename_kafka_msg_idto_kafka__msg_id#289. ↩