Skip to content

Commit 2ce5393

Browse files
committed
Add Kafka LZ4 producer compression support
Adds the lz4b NIF dependency and wires up configuration tos set brod's default_producer_config with lz4 compression.
1 parent 329808a commit 2ce5393

File tree

12 files changed

+82
-16
lines changed

12 files changed

+82
-16
lines changed

assets/svelte/consumers/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ export type KafkaConsumer = BaseConsumer & {
176176
tls: boolean;
177177
topic: string;
178178
sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512";
179+
producer_compression: "no_compression" | "lz4";
179180
};
180181
};
181182

assets/svelte/sinks/kafka/KafkaSinkCard.svelte

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,23 @@
5353
>
5454
</div>
5555
</div>
56+
57+
<div>
58+
<span class="text-sm text-muted-foreground">Producer Compression</span>
59+
<div class="mt-2">
60+
<span
61+
class="font-mono bg-slate-50 py-1 px-2 border border-slate-100 rounded-md whitespace-nowrap"
62+
>
63+
{#if consumer.sink.producer_compression === "lz4"}
64+
LZ4
65+
{:else if consumer.sink.producer_compression === "no_compression"}
66+
None
67+
{:else}
68+
{consumer.sink.producer_compression}
69+
{/if}
70+
</span>
71+
</div>
72+
</div>
5673
</div>
5774
</CardContent>
5875
</Card>

assets/svelte/sinks/kafka/KafkaSinkForm.svelte

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,27 @@
236236
</p>
237237
</div>
238238

239+
<!-- Producer Compression -->
240+
<div class="space-y-2">
241+
<Label for="producer_compression">Producer Compression</Label>
242+
<select
243+
id="producer_compression"
244+
bind:value={form.sink.producer_compression}
245+
class="block w-full border border-gray-300 rounded-md p-2"
246+
>
247+
<option value="no_compression">None</option>
248+
<option value="lz4">LZ4</option>
249+
</select>
250+
<p class="text-sm text-muted-foreground">
251+
Compression algorithm for Kafka producer messages.
252+
</p>
253+
{#if errors.sink?.producer_compression}
254+
<p class="text-destructive text-sm">
255+
{errors.sink.producer_compression}
256+
</p>
257+
{/if}
258+
</div>
259+
239260
<!-- TLS Switch -->
240261
<div class="space-y-2">
241262
<div class="flex items-center gap-2">

config/runtime.exs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,17 @@ config :sequin, Sequin.Runtime.HttpPushSqsPipeline,
8484
config :sequin, Sequin.Runtime.SlotProcessorServer,
8585
max_accumulated_bytes: ConfigParser.replication_flush_max_accumulated_bytes(env_vars),
8686
max_accumulated_messages: ConfigParser.replication_flush_max_accumulated_messages(env_vars),
87+
# ## Using releases
88+
#
89+
# If you use `mix release`, you need to explicitly enable the server
90+
# by passing the PHX_SERVER=true when you start it:
91+
#
92+
# PHX_SERVER=true bin/sequin start
93+
#
94+
# Alternatively, you can use `mix phx.gen.release` to generate a `bin/server`
95+
# script that automatically sets the env var above.
8796
max_accumulated_messages_time_ms: ConfigParser.replication_flush_max_accumulated_time_ms(env_vars)
8897

89-
# ## Using releases
90-
#
91-
# If you use `mix release`, you need to explicitly enable the server
92-
# by passing the PHX_SERVER=true when you start it:
93-
#
94-
# PHX_SERVER=true bin/sequin start
95-
#
96-
# Alternatively, you can use `mix phx.gen.release` to generate a `bin/server`
97-
# script that automatically sets the env var above.
9898
if System.get_env("PHX_SERVER") do
9999
config :sequin, SequinWeb.Endpoint, server: true
100100
config :sequin, SequinWeb.MetricsEndpoint, server: true

docs/reference/sequin-yaml.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ destination:
363363
username: "kafka-user" # Optional, for SASL authentication
364364
password: "kafka-pass" # Optional, for SASL authentication
365365
sasl_mechanism: "plain" # Optional: plain, scram_sha_256, scram_sha_512
366+
producer_compression: "lz4" # Optional: no_compression (default), lz4
366367
batch_size: 50 # Optional, messages per batch
367368
```
368369

lib/sequin/consumers/kafka_sink.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do
2525
field :aws_secret_access_key, EncryptedField
2626
field :connection_id, :string
2727
field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
28+
field :producer_compression, Ecto.Enum, values: [:no_compression, :lz4], default: :no_compression
2829
end
2930

3031
def changeset(struct, params) do
@@ -39,7 +40,8 @@ defmodule Sequin.Consumers.KafkaSink do
3940
:aws_region,
4041
:aws_access_key_id,
4142
:aws_secret_access_key,
42-
:routing_mode
43+
:routing_mode,
44+
:producer_compression
4345
])
4446
|> validate_required([:hosts, :tls])
4547
|> validate_routing()
@@ -177,6 +179,7 @@ defmodule Sequin.Consumers.KafkaSink do
177179
|> maybe_add_ssl(sink)
178180
|> Keyword.put(:query_api_versions, true)
179181
|> Keyword.put(:auto_start_producers, true)
182+
|> Keyword.put(:default_producer_config, compression: sink.producer_compression)
180183
end
181184

182185
# Add SASL authentication if username/password are configured

lib/sequin/transforms/transforms.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ defmodule Sequin.Transforms do
226226
sasl_mechanism: sink.sasl_mechanism,
227227
aws_access_key_id: sink.aws_access_key_id,
228228
aws_secret_access_key: SensitiveValue.new(sink.aws_secret_access_key, show_sensitive),
229-
aws_region: sink.aws_region
229+
aws_region: sink.aws_region,
230+
producer_compression: sink.producer_compression
230231
})
231232
end
232233

@@ -1142,7 +1143,8 @@ defmodule Sequin.Transforms do
11421143
sasl_mechanism: sasl_mechanism,
11431144
aws_access_key_id: attrs["aws_access_key_id"],
11441145
aws_secret_access_key: attrs["aws_secret_access_key"],
1145-
aws_region: attrs["aws_region"]
1146+
aws_region: attrs["aws_region"],
1147+
producer_compression: attrs["producer_compression"]
11461148
}}
11471149
end
11481150
end

lib/sequin_web/live/components/consumer_form.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,8 @@ defmodule SequinWeb.Components.ConsumerForm do
820820
"aws_access_key_id" => sink["aws_access_key_id"],
821821
"aws_secret_access_key" => sink["aws_secret_access_key"],
822822
"aws_region" => sink["aws_region"],
823-
"batch_size" => sink["batch_size"]
823+
"batch_size" => sink["batch_size"],
824+
"producer_compression" => sink["producer_compression"]
824825
}
825826
end
826827

@@ -1090,7 +1091,8 @@ defmodule SequinWeb.Components.ConsumerForm do
10901091
"sasl_mechanism" => sink.sasl_mechanism,
10911092
"aws_access_key_id" => sink.aws_access_key_id,
10921093
"aws_secret_access_key" => sink.aws_secret_access_key,
1093-
"aws_region" => sink.aws_region
1094+
"aws_region" => sink.aws_region,
1095+
"producer_compression" => sink.producer_compression
10941096
}
10951097
end
10961098

@@ -1480,7 +1482,7 @@ defmodule SequinWeb.Components.ConsumerForm do
14801482
:sns -> {%SnsSink{}, %{batch_size: 10}}
14811483
:kinesis -> {%KinesisSink{}, %{batch_size: 100}}
14821484
:s2 -> {%S2Sink{}, %{batch_size: 10}}
1483-
:kafka -> {%KafkaSink{tls: false}, %{batch_size: 200}}
1485+
:kafka -> {%KafkaSink{tls: false, producer_compression: :no_compression}, %{batch_size: 200}}
14841486
:redis_stream -> {%RedisStreamSink{}, %{batch_size: 50}}
14851487
:sequin_stream -> {%SequinStreamSink{}, %{}}
14861488
:gcp_pubsub -> {%GcpPubsubSink{}, %{message_grouping: false, batch_size: 100}}

lib/sequin_web/live/sink_consumers/show.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,8 @@ defmodule SequinWeb.SinkConsumersLive.Show do
904904
password: sink.password,
905905
topic: sink.topic,
906906
tls: sink.tls,
907-
sasl_mechanism: sink.sasl_mechanism
907+
sasl_mechanism: sink.sasl_mechanism,
908+
producer_compression: sink.producer_compression
908909
}
909910
end
910911

mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ defmodule Sequin.MixProject do
102102
{:amqp, "~> 4.1"},
103103
{:amqp_client, "~> 4.2"},
104104
{:brod, "~> 4.3"},
105+
{:lz4b, "~> 0.0.13"},
105106

106107
# Caching and State Management
107108
{:con_cache, "~> 1.1"},

0 commit comments

Comments
 (0)