From a3a2f2aa31a8806ab776e530ac06f74376951376 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Thu, 31 Jul 2025 16:24:09 +0530 Subject: [PATCH 01/10] ENGTAI-65929: Add custom metrics in AbstractThrottledPunctuator to track total events in eventStore and if the yield condition is hit --- .../AbstractThrottledPunctuator.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index b90576a..4f48492 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -1,5 +1,7 @@ package org.hypertrace.core.kafkastreams.framework.punctuators; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -8,6 +10,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.Punctuator; @@ -20,12 +23,26 @@ public abstract class AbstractThrottledPunctuator implements Punctuator { private final Clock clock; private final KeyValueStore> eventStore; private final ThrottledPunctuatorConfig config; + private final CustomMetrics customMetrics; public AbstractThrottledPunctuator( Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore) { this.clock = clock; this.config = config; this.eventStore = eventStore; + this.customMetrics = null; + } + + public AbstractThrottledPunctuator( + Clock clock, + ThrottledPunctuatorConfig config, + KeyValueStore> eventStore, + MeterRegistry meterRegistry, + String name) { + this.clock = clock; + this.config = config; + this.eventStore = eventStore; + this.customMetrics = new CustomMetrics(meterRegistry, name); } public void scheduleTask(long scheduleMs, T event) { @@ -85,6 +102,7 @@ public final void punctuate(long timestamp) { Map> rescheduledTasks = new HashMap<>(); // loop through all events for this key until yield timeout is reached int i = 0; + int tasksBefore = totalProcessedTasks; for (; i < events.size() && !shouldYieldNow(startTime); i++) { T event = events.get(i); totalProcessedTasks++; @@ -97,6 +115,10 @@ public final void punctuate(long timestamp) { .computeIfAbsent(normalize(rescheduleTimestamp), (t) -> new ArrayList<>()) .add(event)); } + if (customMetrics != null) { + customMetrics.setTaskProcessingStatus( + totalProcessedTasks > tasksBefore ? 0 : 1); // 0 if at least one task processed + } // process all reschedules rescheduledTasks.forEach( (newWindowMs, rescheduledEvents) -> { @@ -124,6 +146,9 @@ public final void punctuate(long timestamp) { } } } + if (customMetrics != null) { + customMetrics.setEventStoreSize(currentTotalEventCount()); + } log.debug( "processed windows: {}, processed tasks: {}, time taken: {}", totalProcessedWindows, @@ -148,4 +173,46 @@ private boolean shouldYieldNow(long startTimestamp) { private long normalize(long timestamp) { return timestamp - (timestamp % config.getWindowMs()); } + + private int currentTotalEventCount() { + int count = 0; + try (KeyValueIterator> it = eventStore.all()) { + while (it.hasNext()) { + count += Optional.ofNullable(it.next().value).map(List::size).orElse(0); + } + } + return count; + } + + protected final class CustomMetrics { + private final AtomicInteger eventStoreSize; + private final AtomicInteger noTasksProcessed; + + public CustomMetrics(MeterRegistry meterRegistry, String name) { + this.noTasksProcessed = new AtomicInteger(1); // default to 1 = no tasks processed + this.eventStoreSize = new AtomicInteger(0); + if (meterRegistry != null) { + Gauge.builder("abstract.throttled.punctuator.task.size", eventStoreSize, AtomicInteger::get) + .description("Total number of scheduled tasks across all windows") + .tag("name", name) + .register(meterRegistry); + Gauge.builder( + "abstract.throttled.punctuator.no.tasks.processed", + noTasksProcessed, + AtomicInteger::get) + .description( + "1 if no tasks processed in last punctuate call, 0 if at least one processed") + .tag("name", name) + .register(meterRegistry); + } + } + + public void setTaskProcessingStatus(int value) { + this.noTasksProcessed.set(value); + } + + public void setEventStoreSize(int value) { + this.eventStoreSize.set(value); + } + } } From 66f77c81bce6250fc76dcc83ed9ae328f1b7100a Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Fri, 1 Aug 2025 11:10:34 +0530 Subject: [PATCH 02/10] update metric --- .../AbstractThrottledPunctuator.java | 90 +++++++------------ .../AbstractThrottledPunctuatorTest.java | 2 +- 2 files changed, 32 insertions(+), 60 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index 4f48492..395dc88 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -1,7 +1,7 @@ package org.hypertrace.core.kafkastreams.framework.punctuators; -import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -10,6 +10,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; @@ -20,29 +21,24 @@ @Slf4j public abstract class AbstractThrottledPunctuator implements Punctuator { + private static final ConcurrentHashMap totalEventCountGauge = + new ConcurrentHashMap<>(); + private static final String TOTAL_EVENT_COUNT_GAUGE_NAME = + "abstract.throttled.punctuator.total.events.count"; private final Clock clock; private final KeyValueStore> eventStore; private final ThrottledPunctuatorConfig config; - private final CustomMetrics customMetrics; - - public AbstractThrottledPunctuator( - Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore) { - this.clock = clock; - this.config = config; - this.eventStore = eventStore; - this.customMetrics = null; - } + private final MeterRegistry meterRegistry; public AbstractThrottledPunctuator( Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore, - MeterRegistry meterRegistry, - String name) { + MeterRegistry meterRegistry) { this.clock = clock; this.config = config; this.eventStore = eventStore; - this.customMetrics = new CustomMetrics(meterRegistry, name); + this.meterRegistry = meterRegistry; } public void scheduleTask(long scheduleMs, T event) { @@ -85,6 +81,7 @@ public final void punctuate(long timestamp) { long startTime = clock.millis(); int totalProcessedWindows = 0; int totalProcessedTasks = 0; + int totalEventCount = 0; log.debug( "Processing tasks with throttling yield of {} until timestamp {}", @@ -98,11 +95,11 @@ public final void punctuate(long timestamp) { totalProcessedWindows++; List events = kv.value; long windowMs = kv.key; + totalEventCount += events.size(); // collect all tasks to be rescheduled by key to perform bulk reschedules Map> rescheduledTasks = new HashMap<>(); // loop through all events for this key until yield timeout is reached int i = 0; - int tasksBefore = totalProcessedTasks; for (; i < events.size() && !shouldYieldNow(startTime); i++) { T event = events.get(i); totalProcessedTasks++; @@ -115,10 +112,6 @@ public final void punctuate(long timestamp) { .computeIfAbsent(normalize(rescheduleTimestamp), (t) -> new ArrayList<>()) .add(event)); } - if (customMetrics != null) { - customMetrics.setTaskProcessingStatus( - totalProcessedTasks > tasksBefore ? 0 : 1); // 0 if at least one task processed - } // process all reschedules rescheduledTasks.forEach( (newWindowMs, rescheduledEvents) -> { @@ -146,14 +139,16 @@ public final void punctuate(long timestamp) { } } } - if (customMetrics != null) { - customMetrics.setEventStoreSize(currentTotalEventCount()); - } + long timeTakenMs = clock.millis() - startTime; + boolean yielded = shouldYieldNow(startTime); + updateEventCountGauge(totalEventCount, yielded); + log.debug( - "processed windows: {}, processed tasks: {}, time taken: {}", + "processed windows: {}, processed tasks: {}, total events: {}, time taken: {}ms", totalProcessedWindows, totalProcessedTasks, - clock.millis() - startTime); + totalEventCount, + timeTakenMs); } protected abstract TaskResult executeTask(long punctuateTimestamp, T object); @@ -174,45 +169,22 @@ private long normalize(long timestamp) { return timestamp - (timestamp % config.getWindowMs()); } - private int currentTotalEventCount() { - int count = 0; - try (KeyValueIterator> it = eventStore.all()) { - while (it.hasNext()) { - count += Optional.ofNullable(it.next().value).map(List::size).orElse(0); - } + private void updateEventCountGauge(int totalEventCount, boolean yielded) { + if (meterRegistry == null) { + return; } - return count; - } - protected final class CustomMetrics { - private final AtomicInteger eventStoreSize; - private final AtomicInteger noTasksProcessed; - - public CustomMetrics(MeterRegistry meterRegistry, String name) { - this.noTasksProcessed = new AtomicInteger(1); // default to 1 = no tasks processed - this.eventStoreSize = new AtomicInteger(0); - if (meterRegistry != null) { - Gauge.builder("abstract.throttled.punctuator.task.size", eventStoreSize, AtomicInteger::get) - .description("Total number of scheduled tasks across all windows") - .tag("name", name) - .register(meterRegistry); - Gauge.builder( - "abstract.throttled.punctuator.no.tasks.processed", - noTasksProcessed, - AtomicInteger::get) - .description( - "1 if no tasks processed in last punctuate call, 0 if at least one processed") - .tag("name", name) - .register(meterRegistry); - } - } + String tagValue = String.valueOf(yielded); - public void setTaskProcessingStatus(int value) { - this.noTasksProcessed.set(value); - } + AtomicInteger gauge = + totalEventCountGauge.computeIfAbsent( + tagValue, + key -> { + AtomicInteger newGauge = new AtomicInteger(0); + meterRegistry.gauge(TOTAL_EVENT_COUNT_GAUGE_NAME, Tags.of("yielded", key), newGauge); + return newGauge; + }); - public void setEventStoreSize(int value) { - this.eventStoreSize.set(value); - } + gauge.set(totalEventCount); } } diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java index 95cfcb6..f232470 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java @@ -194,7 +194,7 @@ public TestPunctuator( Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> objectStore) { - super(clock, config, objectStore); + super(clock, config, objectStore, null); } void setReturnResult(String object, TaskResult result) { From c575d0f3ac57d409e2013205a1934a51925dc014 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Fri, 1 Aug 2025 12:14:35 +0530 Subject: [PATCH 03/10] update method name --- .../framework/punctuators/AbstractThrottledPunctuator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index 395dc88..c98c398 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -141,7 +141,7 @@ public final void punctuate(long timestamp) { } long timeTakenMs = clock.millis() - startTime; boolean yielded = shouldYieldNow(startTime); - updateEventCountGauge(totalEventCount, yielded); + updateTotalEventCountGauge(totalEventCount, yielded); log.debug( "processed windows: {}, processed tasks: {}, total events: {}, time taken: {}ms", @@ -169,7 +169,7 @@ private long normalize(long timestamp) { return timestamp - (timestamp % config.getWindowMs()); } - private void updateEventCountGauge(int totalEventCount, boolean yielded) { + private void updateTotalEventCountGauge(int totalEventCount, boolean yielded) { if (meterRegistry == null) { return; } From 8c32928c4244c991b0e6f6e4ec44b26b36e9e94a Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Fri, 1 Aug 2025 12:30:26 +0530 Subject: [PATCH 04/10] add default constructor --- .../framework/punctuators/AbstractThrottledPunctuator.java | 5 +++++ .../punctuators/AbstractThrottledPunctuatorTest.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index c98c398..9517a2f 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -41,6 +41,11 @@ public AbstractThrottledPunctuator( this.meterRegistry = meterRegistry; } + public AbstractThrottledPunctuator( + Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore) { + this(clock, config, eventStore, null); + } + public void scheduleTask(long scheduleMs, T event) { long windowMs = normalize(scheduleMs); List events = Optional.ofNullable(eventStore.get(windowMs)).orElse(new ArrayList<>()); diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java index f232470..95cfcb6 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuatorTest.java @@ -194,7 +194,7 @@ public TestPunctuator( Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> objectStore) { - super(clock, config, objectStore, null); + super(clock, config, objectStore); } void setReturnResult(String object, TaskResult result) { From 7c0f04f2cb50e3a75cab85c7d19a48d66c26bb67 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Mon, 4 Aug 2025 13:12:00 +0530 Subject: [PATCH 05/10] separate yielded metric from tags to gauge --- .../AbstractThrottledPunctuator.java | 41 +++++-------------- 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index 9517a2f..4bd3af4 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -10,8 +10,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.Punctuator; @@ -21,10 +19,6 @@ @Slf4j public abstract class AbstractThrottledPunctuator implements Punctuator { - private static final ConcurrentHashMap totalEventCountGauge = - new ConcurrentHashMap<>(); - private static final String TOTAL_EVENT_COUNT_GAUGE_NAME = - "abstract.throttled.punctuator.total.events.count"; private final Clock clock; private final KeyValueStore> eventStore; private final ThrottledPunctuatorConfig config; @@ -86,7 +80,6 @@ public final void punctuate(long timestamp) { long startTime = clock.millis(); int totalProcessedWindows = 0; int totalProcessedTasks = 0; - int totalEventCount = 0; log.debug( "Processing tasks with throttling yield of {} until timestamp {}", @@ -100,7 +93,6 @@ public final void punctuate(long timestamp) { totalProcessedWindows++; List events = kv.value; long windowMs = kv.key; - totalEventCount += events.size(); // collect all tasks to be rescheduled by key to perform bulk reschedules Map> rescheduledTasks = new HashMap<>(); // loop through all events for this key until yield timeout is reached @@ -144,16 +136,13 @@ public final void punctuate(long timestamp) { } } } - long timeTakenMs = clock.millis() - startTime; boolean yielded = shouldYieldNow(startTime); - updateTotalEventCountGauge(totalEventCount, yielded); - + publishMetrics(totalProcessedTasks, yielded); log.debug( - "processed windows: {}, processed tasks: {}, total events: {}, time taken: {}ms", + "processed windows: {}, processed tasks: {}, time taken: {}", totalProcessedWindows, totalProcessedTasks, - totalEventCount, - timeTakenMs); + clock.millis() - startTime); } protected abstract TaskResult executeTask(long punctuateTimestamp, T object); @@ -174,22 +163,14 @@ private long normalize(long timestamp) { return timestamp - (timestamp % config.getWindowMs()); } - private void updateTotalEventCountGauge(int totalEventCount, boolean yielded) { - if (meterRegistry == null) { - return; + private void publishMetrics(int totalProcessedTasks, boolean yielded) { + if (meterRegistry != null) { + String className = this.getClass().getSimpleName(); + meterRegistry + .counter("throttled.punctuator.processed.task.count", Tags.of("class", className)) + .increment(totalProcessedTasks); + meterRegistry.gauge( + "throttled.punctuator.yielded", Tags.of("class", className), yielded ? 1 : 0); } - - String tagValue = String.valueOf(yielded); - - AtomicInteger gauge = - totalEventCountGauge.computeIfAbsent( - tagValue, - key -> { - AtomicInteger newGauge = new AtomicInteger(0); - meterRegistry.gauge(TOTAL_EVENT_COUNT_GAUGE_NAME, Tags.of("yielded", key), newGauge); - return newGauge; - }); - - gauge.set(totalEventCount); } } From 424d68083b052bf1335cf2a7e82532e22b0c0a24 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Mon, 4 Aug 2025 13:22:02 +0530 Subject: [PATCH 06/10] pass punctuator name as constructor argument --- .../AbstractThrottledPunctuator.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index 4bd3af4..640ab79 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -23,21 +23,28 @@ public abstract class AbstractThrottledPunctuator implements Punctuator { private final KeyValueStore> eventStore; private final ThrottledPunctuatorConfig config; private final MeterRegistry meterRegistry; + private final String punctuatorName; public AbstractThrottledPunctuator( Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore, - MeterRegistry meterRegistry) { + MeterRegistry meterRegistry, + String punctuatorName) { this.clock = clock; this.config = config; this.eventStore = eventStore; this.meterRegistry = meterRegistry; + this.punctuatorName = resolvePunctuatorName(punctuatorName); } public AbstractThrottledPunctuator( Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore) { - this(clock, config, eventStore, null); + this(clock, config, eventStore, null, null); + } + + private String resolvePunctuatorName(String name) { + return (name != null && !name.isBlank()) ? name : this.getClass().getSimpleName(); } public void scheduleTask(long scheduleMs, T event) { @@ -165,12 +172,12 @@ private long normalize(long timestamp) { private void publishMetrics(int totalProcessedTasks, boolean yielded) { if (meterRegistry != null) { - String className = this.getClass().getSimpleName(); meterRegistry - .counter("throttled.punctuator.processed.task.count", Tags.of("class", className)) + .counter( + "throttled.punctuator.processed.task.count", Tags.of("punctuator", punctuatorName)) .increment(totalProcessedTasks); meterRegistry.gauge( - "throttled.punctuator.yielded", Tags.of("class", className), yielded ? 1 : 0); + "throttled.punctuator.yielded", Tags.of("punctuator", punctuatorName), yielded ? 1 : 0); } } } From fc5e3dd2ab93eaff79414f9533e869a2a486b225 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Tue, 5 Aug 2025 10:09:30 +0530 Subject: [PATCH 07/10] upgrade commons-lang version --- kafka-streams-framework/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-streams-framework/build.gradle.kts b/kafka-streams-framework/build.gradle.kts index 692e2a4..31a31ba 100644 --- a/kafka-streams-framework/build.gradle.kts +++ b/kafka-streams-framework/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation("org.apache.kafka:kafka-clients") implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.89") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.89") - implementation("org.apache.commons:commons-lang3:3.12.0") + implementation("org.apache.commons:commons-lang3:3.18.0") testCompileOnly("org.projectlombok:lombok:1.18.26") testAnnotationProcessor("org.projectlombok:lombok:1.18.26") From 2ab8332da48fdcdfed92583c4825350dd4ca10de Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Tue, 5 Aug 2025 14:47:26 +0530 Subject: [PATCH 08/10] upgrade grpc-utils and apache avro versions --- kafka-bom/build.gradle.kts | 2 +- kafka-streams-framework/build.gradle.kts | 2 +- .../weighted-group-partitioner/build.gradle.kts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-bom/build.gradle.kts b/kafka-bom/build.gradle.kts index 115344a..025e518 100644 --- a/kafka-bom/build.gradle.kts +++ b/kafka-bom/build.gradle.kts @@ -30,6 +30,6 @@ dependencies { api("org.apache.kafka:kafka-clients:$confluentCcsVersion") api("org.apache.kafka:kafka-streams:$confluentCcsVersion") api("org.apache.kafka:kafka-streams-test-utils:$confluentCcsVersion") - api("org.apache.avro:avro:1.11.4") + api("org.apache.avro:avro:1.12.0") } } diff --git a/kafka-streams-framework/build.gradle.kts b/kafka-streams-framework/build.gradle.kts index 31a31ba..69db434 100644 --- a/kafka-streams-framework/build.gradle.kts +++ b/kafka-streams-framework/build.gradle.kts @@ -18,7 +18,7 @@ dependencies { api(platform(project(":kafka-bom"))) api("org.apache.kafka:kafka-streams") api("io.confluent:kafka-streams-avro-serde") - api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14") + api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16") implementation("org.apache.avro:avro") implementation("org.apache.kafka:kafka-clients") diff --git a/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts b/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts index 6c2f360..0e75c5f 100644 --- a/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts +++ b/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { api(platform(project(":kafka-bom"))) api("org.apache.kafka:kafka-streams") - api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14") + api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16") api("com.typesafe:config:1.4.2") implementation("com.google.guava:guava:32.0.1-jre") implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.13.14") From e9db7437e728fa7d3863139ddb4312f1675020a6 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Wed, 6 Aug 2025 11:56:21 +0530 Subject: [PATCH 09/10] upgrade dependency check version --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index e7cc1d5..5df088b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { id("org.hypertrace.publish-plugin") version "1.1.1" apply false id("org.hypertrace.jacoco-report-plugin") version "0.3.0" apply false id("org.hypertrace.code-style-plugin") version "2.1.2" apply false - id("org.owasp.dependencycheck") version "12.1.0" + id("org.owasp.dependencycheck") version "12.1.3" } subprojects { From 37e2ccaf7993165e37c3f08627ee2b7c0750bd11 Mon Sep 17 00:00:00 2001 From: Bhuvan506 Date: Wed, 6 Aug 2025 12:44:52 +0530 Subject: [PATCH 10/10] add dependency constraint --- kafka-bom/build.gradle.kts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka-bom/build.gradle.kts b/kafka-bom/build.gradle.kts index 025e518..77e1eee 100644 --- a/kafka-bom/build.gradle.kts +++ b/kafka-bom/build.gradle.kts @@ -22,6 +22,10 @@ dependencies { api("org.apache.commons:commons-compress:1.26.0") { because("https://www.tenable.com/cve/CVE-2024-25710") } + api("org.apache.commons:commons-lang3:3.18.0") { + because("CVE-2025-48924 is fixed in 3.18.0") + } + api("io.confluent:kafka-streams-avro-serde:$confluentVersion") api("io.confluent:kafka-protobuf-serializer:$confluentVersion")