Skip to content

Commit 415fd97

Browse files
committed
Add parameter to acknowledge group events on conclude immediately in aggregate processor
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 20c50d7 commit 415fd97

4 files changed

Lines changed: 49 additions & 3 deletions

File tree

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.opensearch.dataprepper.plugins.processor.aggregate;
1111

1212
import com.google.common.collect.Maps;
13+
import org.opensearch.dataprepper.model.event.EventHandle;
1314
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
1415

1516
import java.time.Duration;
@@ -21,9 +22,11 @@ class AggregateGroupManager {
2122

2223
private final Map<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> allGroups = Maps.newConcurrentMap();
2324
private final Duration groupDuration;
25+
private final boolean acknowledgeOnConclude;
2426

25-
AggregateGroupManager(final Duration groupDuration) {
27+
AggregateGroupManager(final Duration groupDuration, final boolean acknowledgeOnConclude) {
2628
this.groupDuration = groupDuration;
29+
this.acknowledgeOnConclude = acknowledgeOnConclude;
2730
}
2831

2932
AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
@@ -43,6 +46,13 @@ List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>>
4346

4447
void closeGroup(final IdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
4548
allGroups.remove(hashKeyMap, group);
49+
50+
if (acknowledgeOnConclude) {
51+
EventHandle handle = group.getEventHandle();
52+
if (handle != null) {
53+
handle.release(true);
54+
}
55+
}
4656
group.resetGroup();
4757
}
4858

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
6060

6161
@DataPrepperPluginConstructor
6262
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final ExpressionEvaluator expressionEvaluator) {
63-
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration()),
63+
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration(), aggregateProcessorConfig.getAcknowledgeOnConclude()),
6464
new IdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator);
6565
}
6666
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager,

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public class AggregateProcessorConfig {
8181
})
8282
private String whenCondition;
8383

84+
@JsonPropertyDescription("When set to true, releases the group's event handle when the group concludes. " +
85+
"This sacrifices true end-to-end acknowledgments for aggregated events but prevents reprocessing.")
86+
@JsonProperty("acknowledge_on_conclude")
87+
private Boolean acknowledgeOnConclude = false;
88+
8489
public List<String> getIdentificationKeys() {
8590
return identificationKeys;
8691
}
@@ -112,4 +117,8 @@ boolean isValidConfig() {
112117

113118
public PluginModel getAggregateAction() { return aggregateAction; }
114119

120+
public Boolean getAcknowledgeOnConclude() {
121+
return acknowledgeOnConclude;
122+
}
123+
115124
}

data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.opensearch.dataprepper.plugins.processor.aggregate;
1111

12+
import org.opensearch.dataprepper.model.event.EventHandle;
1213
import org.junit.jupiter.api.BeforeEach;
1314
import org.junit.jupiter.api.Test;
1415
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
@@ -27,6 +28,7 @@
2728
import static org.hamcrest.MatcherAssert.assertThat;
2829
import static org.hamcrest.Matchers.is;
2930
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.verify;
3032
import static org.mockito.Mockito.when;
3133

3234
public class AggregateGroupManagerTest {
@@ -46,7 +48,7 @@ void setup() {
4648
}
4749

4850
private AggregateGroupManager createObjectUnderTest() {
49-
return new AggregateGroupManager(TEST_GROUP_DURATION);
51+
return new AggregateGroupManager(TEST_GROUP_DURATION, false);
5052
}
5153

5254
@Test
@@ -145,4 +147,29 @@ void getGroupsToConclude_with_force_conclude_return_all() {
145147
assertThat(groupsToConclude.get(1).getValue(), equalTo(groupToConclude1));
146148
}
147149
}
150+
151+
@Test
152+
void closeGroup_with_acknowledge_on_conclude_releases_event_handle() {
153+
aggregateGroupManager = new AggregateGroupManager(TEST_GROUP_DURATION, true);
154+
155+
final AggregateGroup group = mock(AggregateGroup.class);
156+
final EventHandle eventHandle = mock(EventHandle.class);
157+
when(group.getEventHandle()).thenReturn(eventHandle);
158+
159+
aggregateGroupManager.closeGroup(identificationKeysMap, group);
160+
161+
verify(eventHandle).release(true);
162+
verify(group).resetGroup();
163+
}
164+
165+
@Test
166+
void closeGroup_without_acknowledge_on_conclude_does_not_release_event_handle() {
167+
aggregateGroupManager = new AggregateGroupManager(TEST_GROUP_DURATION, false);
168+
169+
final AggregateGroup group = mock(AggregateGroup.class);
170+
171+
aggregateGroupManager.closeGroup(identificationKeysMap, group);
172+
173+
verify(group).resetGroup();
174+
}
148175
}

0 commit comments

Comments
 (0)