Skip to content

Commit 40196aa

Browse files
Consul transaction is not chunked — silent data loss above 64 operations
1 parent 8dfa469 commit 40196aa

2 files changed

Lines changed: 169 additions & 19 deletions

File tree

consul-populate-core/src/main/java/com/frogdevelopment/consul/populate/PopulateServiceImpl.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import lombok.RequiredArgsConstructor;
66
import lombok.extern.slf4j.Slf4j;
77

8+
import java.util.ArrayList;
89
import java.util.Map;
910
import java.util.function.Function;
1011
import java.util.function.Predicate;
1112
import java.util.stream.Collectors;
13+
1214
import jakarta.inject.Singleton;
1315

1416
import com.frogdevelopment.consul.populate.config.GlobalProperties;
@@ -17,6 +19,7 @@
1719
import io.vertx.ext.consul.TxnError;
1820
import io.vertx.ext.consul.TxnKVOperation;
1921
import io.vertx.ext.consul.TxnKVVerb;
22+
import io.vertx.ext.consul.TxnOperation;
2023
import io.vertx.ext.consul.TxnRequest;
2124

2225
/**
@@ -32,6 +35,9 @@ class PopulateServiceImpl implements PopulateService {
3235
private final GlobalProperties globalProperties;
3336
private final DataImporter dataImporter;
3437

38+
// Consul transaction API limit
39+
static final int MAX_OPERATIONS_PER_TXN = 64;
40+
3541
@Override
3642
public void populate() {
3743
try {
@@ -49,33 +55,43 @@ public void populate() {
4955
.stream()
5056
.collect(Collectors.toMap(entry -> kvPath + entry.getKey(), Map.Entry::getValue));
5157

52-
// Create/Update/Delete config
53-
final var txnRequest = new TxnRequest();
58+
// collect all SET operations
59+
final var operations = new ArrayList<TxnOperation>();
5460
configsToImport.entrySet()
5561
.stream()
5662
.map(toSetOperation())
57-
.forEach(txnRequest::addOperation);
63+
.forEach(operations::add);
5864

59-
// retrieve current configs in Consul KV
65+
// retrieve current configs in Consul KV and append DELETE operations for removed keys
6066
final var existingKeysInConsul = toBlocking(consulClient.getKeys(kvPath));
61-
62-
// keep only those that are to be deleted (no present anymore in the data pushed)
6367
existingKeysInConsul.stream()
6468
.filter(Predicate.not(configsToImport::containsKey))
6569
.map(toDeleteOperation())
66-
.forEach(txnRequest::addOperation);
67-
68-
log.info("Exporting data to consul");
69-
final var result = toBlocking(consulClient.transaction(txnRequest));
70-
log.info("succeeded results size: {}", result.getResultsSize());
71-
if (result.getErrorsSize() > 0) {
72-
log.error("Some operations ({}) lead to error:{}",
73-
result.getErrorsSize(),
74-
result.getErrors()
75-
.stream()
76-
.map(TxnError::getWhat)
77-
.collect(Collectors.joining("\n\t- "))
78-
);
70+
.forEach(operations::add);
71+
72+
log.info("Exporting data to consul ({} operations in {} transaction(s))",
73+
operations.size(), (operations.size() + MAX_OPERATIONS_PER_TXN - 1) / MAX_OPERATIONS_PER_TXN);
74+
75+
// Consul transactions are limited to MAX_OPERATIONS_PER_TXN operations each
76+
for (var i = 0; i < operations.size(); i += MAX_OPERATIONS_PER_TXN) {
77+
final var chunk = operations.subList(i, Math.min(i + MAX_OPERATIONS_PER_TXN, operations.size()));
78+
final var txnRequest = new TxnRequest();
79+
chunk.forEach(txnRequest::addOperation);
80+
81+
final var result = toBlocking(consulClient.transaction(txnRequest));
82+
log.info("Transaction {}/{} succeeded: {} result(s)",
83+
(i / MAX_OPERATIONS_PER_TXN) + 1,
84+
(operations.size() + MAX_OPERATIONS_PER_TXN - 1) / MAX_OPERATIONS_PER_TXN,
85+
result.getResultsSize());
86+
if (result.getErrorsSize() > 0) {
87+
log.error("Some operations ({}) lead to error:{}",
88+
result.getErrorsSize(),
89+
result.getErrors()
90+
.stream()
91+
.map(TxnError::getWhat)
92+
.collect(Collectors.joining("\n\t- "))
93+
);
94+
}
7995
}
8096
}
8197

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package com.frogdevelopment.consul.populate;
2+
3+
import static com.frogdevelopment.consul.populate.PopulateServiceImpl.MAX_OPERATIONS_PER_TXN;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
import static org.mockito.ArgumentMatchers.any;
6+
import static org.mockito.BDDMockito.given;
7+
import static org.mockito.Mockito.lenient;
8+
import static org.mockito.Mockito.times;
9+
import static org.mockito.Mockito.verify;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.IntStream;
15+
16+
import org.junit.jupiter.api.BeforeEach;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.ExtendWith;
19+
import org.mockito.ArgumentCaptor;
20+
import org.mockito.Captor;
21+
import org.mockito.InjectMocks;
22+
import org.mockito.Mock;
23+
import org.mockito.junit.jupiter.MockitoExtension;
24+
25+
import com.frogdevelopment.consul.populate.config.GlobalProperties;
26+
27+
import io.vertx.core.Future;
28+
import io.vertx.ext.consul.ConsulClient;
29+
import io.vertx.ext.consul.TxnRequest;
30+
import io.vertx.ext.consul.TxnResponse;
31+
32+
@ExtendWith(MockitoExtension.class)
33+
class PopulateServiceImplTest {
34+
35+
@InjectMocks
36+
private PopulateServiceImpl populateService;
37+
38+
@Mock
39+
private ConsulClient consulClient;
40+
@Mock
41+
private GlobalProperties globalProperties;
42+
@Mock
43+
private DataImporter dataImporter;
44+
@Mock
45+
private TxnResponse txnResponse;
46+
47+
@Captor
48+
private ArgumentCaptor<TxnRequest> txnRequestCaptor;
49+
50+
@BeforeEach
51+
void setUp() {
52+
var kv = new GlobalProperties.KV();
53+
given(globalProperties.getKv()).willReturn(kv);
54+
given(consulClient.leaderStatus()).willReturn(Future.succeededFuture("leader"));
55+
given(consulClient.getKeys(any())).willReturn(Future.succeededFuture(List.of()));
56+
lenient().when(consulClient.transaction(any())).thenReturn(Future.succeededFuture(txnResponse));
57+
lenient().when(txnResponse.getResultsSize()).thenReturn(0);
58+
lenient().when(txnResponse.getErrorsSize()).thenReturn(0);
59+
}
60+
61+
@Test
62+
void should_send_single_transaction_when_operations_fit_in_one_batch() {
63+
// given
64+
given(dataImporter.execute()).willReturn(entriesOfSize(10));
65+
66+
// when
67+
populateService.populate();
68+
69+
// then
70+
verify(consulClient, times(1)).transaction(txnRequestCaptor.capture());
71+
assertThat(txnRequestCaptor.getValue().getOperationsSize()).isEqualTo(10);
72+
}
73+
74+
@Test
75+
void should_split_into_multiple_transactions_when_operations_exceed_limit() {
76+
// given — 65 keys → 2 transactions (64 + 1)
77+
given(dataImporter.execute()).willReturn(entriesOfSize(65));
78+
79+
// when
80+
populateService.populate();
81+
82+
// then
83+
verify(consulClient, times(2)).transaction(txnRequestCaptor.capture());
84+
var batches = txnRequestCaptor.getAllValues();
85+
assertThat(batches.get(0).getOperationsSize()).isEqualTo(MAX_OPERATIONS_PER_TXN);
86+
assertThat(batches.get(1).getOperationsSize()).isEqualTo(1);
87+
}
88+
89+
@Test
90+
void should_split_into_exactly_two_full_batches_when_operations_equal_twice_limit() {
91+
// given — 128 keys → 2 transactions of exactly 64
92+
given(dataImporter.execute()).willReturn(entriesOfSize(128));
93+
94+
// when
95+
populateService.populate();
96+
97+
// then
98+
verify(consulClient, times(2)).transaction(txnRequestCaptor.capture());
99+
txnRequestCaptor.getAllValues()
100+
.forEach(req -> assertThat(req.getOperationsSize()).isEqualTo(MAX_OPERATIONS_PER_TXN));
101+
}
102+
103+
@Test
104+
void should_include_delete_operations_for_keys_removed_from_source() {
105+
// given — 1 new key, 1 existing key no longer in source → 1 SET + 1 DELETE = 2 ops
106+
given(dataImporter.execute()).willReturn(Map.of("new-key", "value"));
107+
given(consulClient.getKeys(any())).willReturn(Future.succeededFuture(List.of("config/new-key", "config/old-key")));
108+
109+
// when
110+
populateService.populate();
111+
112+
// then
113+
verify(consulClient, times(1)).transaction(txnRequestCaptor.capture());
114+
assertThat(txnRequestCaptor.getValue().getOperationsSize()).isEqualTo(2);
115+
}
116+
117+
@Test
118+
void should_send_no_transaction_when_there_is_nothing_to_import_and_nothing_to_delete() {
119+
// given
120+
given(dataImporter.execute()).willReturn(Map.of());
121+
122+
// when
123+
populateService.populate();
124+
125+
// then
126+
verify(consulClient, times(0)).transaction(any());
127+
}
128+
129+
private static Map<String, String> entriesOfSize(int count) {
130+
return IntStream.range(0, count)
131+
.boxed()
132+
.collect(Collectors.toMap(i -> "key-" + i, i -> "value-" + i));
133+
}
134+
}

0 commit comments

Comments
 (0)