Skip to content

Commit 681d118

Browse files
authored
Fix flaky PipelinesWithAcksIT by awaiting ack callback (#6718)
The acknowledgement callback is invoked asynchronously after sink processing completes. Tests were asserting the ack result immediately after confirming sink output, causing intermittent NullPointerException when the callback had not yet fired. Wrap ack assertions in await().untilAsserted() to poll until the callback completes, matching the pattern already used in three_pipelines_with_all_unrouted_records(). Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 1ac64aa commit 681d118

1 file changed

Lines changed: 41 additions & 14 deletions

File tree

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
import static org.hamcrest.MatcherAssert.assertThat;
3131
import static org.hamcrest.Matchers.empty;
3232
import static org.hamcrest.Matchers.lessThanOrEqualTo;
33-
import static org.junit.Assert.assertFalse;
3433
import static org.junit.jupiter.api.Assertions.assertNotNull;
35-
import static org.junit.jupiter.api.Assertions.assertTrue;
3634

3735
@FixMethodOrder()
3836
class PipelinesWithAcksIT {
@@ -81,7 +79,10 @@ void simple_pipeline_with_single_record() {
8179
assertThat(outputRecords, not(empty()));
8280
assertThat(outputRecords.size(), equalTo(numRecords));
8381
});
84-
assertTrue(inMemorySourceAccessor.getAckReceived());
82+
await().atMost(40000, TimeUnit.MILLISECONDS)
83+
.untilAsserted(() -> {
84+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
85+
});
8586

8687
}
8788

@@ -97,7 +98,10 @@ void simple_pipeline_with_multiple_records() {
9798
assertThat(outputRecords, not(empty()));
9899
assertThat(outputRecords.size(), equalTo(numRecords));
99100
});
100-
assertTrue(inMemorySourceAccessor.getAckReceived());
101+
await().atMost(40000, TimeUnit.MILLISECONDS)
102+
.untilAsserted(() -> {
103+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
104+
});
101105
}
102106

103107
@Test
@@ -112,7 +116,10 @@ void two_pipelines_with_multiple_records() {
112116
assertThat(outputRecords, not(empty()));
113117
assertThat(outputRecords.size(), equalTo(numRecords));
114118
});
115-
assertTrue(inMemorySourceAccessor.getAckReceived());
119+
await().atMost(40000, TimeUnit.MILLISECONDS)
120+
.untilAsserted(() -> {
121+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
122+
});
116123
}
117124

118125
@Test
@@ -127,7 +134,10 @@ void three_pipelines_with_multiple_records() {
127134
assertThat(outputRecords, not(empty()));
128135
assertThat(outputRecords.size(), equalTo(numRecords));
129136
});
130-
assertTrue(inMemorySourceAccessor.getAckReceived());
137+
await().atMost(40000, TimeUnit.MILLISECONDS)
138+
.untilAsserted(() -> {
139+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
140+
});
131141
}
132142

133143
@Test
@@ -139,11 +149,10 @@ void three_pipelines_with_all_unrouted_records() {
139149
await().atMost(40000, TimeUnit.MILLISECONDS)
140150
.untilAsserted(() -> {
141151
assertNotNull(inMemorySourceAccessor);
142-
assertNotNull(inMemorySourceAccessor.getAckReceived());
152+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
143153
});
144154
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
145155
assertThat(outputRecords.size(), equalTo(0));
146-
assertTrue(inMemorySourceAccessor.getAckReceived());
147156
}
148157

149158
@Test
@@ -158,7 +167,10 @@ void three_pipelines_with_route_and_multiple_records() {
158167
assertThat(outputRecords, not(empty()));
159168
assertThat(outputRecords.size(), lessThanOrEqualTo(numRecords));
160169
});
161-
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
170+
await().atMost(40000, TimeUnit.MILLISECONDS)
171+
.untilAsserted(() -> {
172+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
173+
});
162174
}
163175

164176
@Test
@@ -174,7 +186,10 @@ void three_pipelines_with_default_route_and_multiple_records() {
174186
assertThat(outputRecords, not(empty()));
175187
assertThat(outputRecords.size(), equalTo(2*numRecords));
176188
});
177-
assertTrue(inMemorySourceAccessor.getAckReceived());
189+
await().atMost(40000, TimeUnit.MILLISECONDS)
190+
.untilAsserted(() -> {
191+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
192+
});
178193
}
179194

180195
@Test
@@ -189,7 +204,10 @@ void two_parallel_pipelines_multiple_records() {
189204
assertThat(outputRecords, not(empty()));
190205
assertThat(outputRecords.size(), equalTo(2*numRecords));
191206
});
192-
assertTrue(inMemorySourceAccessor.getAckReceived());
207+
await().atMost(40000, TimeUnit.MILLISECONDS)
208+
.untilAsserted(() -> {
209+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
210+
});
193211
}
194212

195213
@Test
@@ -204,7 +222,10 @@ void three_pipelines_multi_sink_multiple_records() {
204222
assertThat(outputRecords, not(empty()));
205223
assertThat(outputRecords.size(), equalTo(3*numRecords));
206224
});
207-
assertTrue(inMemorySourceAccessor.getAckReceived());
225+
await().atMost(40000, TimeUnit.MILLISECONDS)
226+
.untilAsserted(() -> {
227+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
228+
});
208229
}
209230

210231
@Test
@@ -219,7 +240,10 @@ void one_pipeline_three_sinks_multiple_records() {
219240
assertThat(outputRecords, not(empty()));
220241
assertThat(outputRecords.size(), equalTo(3*numRecords));
221242
});
222-
assertTrue(inMemorySourceAccessor.getAckReceived());
243+
await().atMost(40000, TimeUnit.MILLISECONDS)
244+
.untilAsserted(() -> {
245+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true));
246+
});
223247
}
224248

225249
@Test
@@ -250,6 +274,9 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() {
250274
assertThat(outputRecords, not(empty()));
251275
assertThat(outputRecords.size(), equalTo(3*numRecords));
252276
});
253-
assertFalse(inMemorySourceAccessor.getAckReceived());
277+
await().atMost(40000, TimeUnit.MILLISECONDS)
278+
.untilAsserted(() -> {
279+
assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(false));
280+
});
254281
}
255282
}

0 commit comments

Comments
 (0)