Skip to content

Commit 9ce2496

Browse files
committed
Fix batch read retry on digest mismatch
Batch reads marked the request as complete before verifying the digest of each returned entry. If one entry in the response failed digest verification, the operation attempted to retry on another replica, but the request had already been completed, so the retry response could be ignored and the batch read could hang or fail.

Verify all entries in the batch before completing the request. On digest mismatch, leave the request incomplete, discard the whole response, and retry the same batch on the next replica. Only create LedgerEntryImpl instances after the full batch has passed digest verification, so no partially verified entries are retained.

Add tests for retrying after a corrupt batch response and for the case where an earlier entry verifies successfully but a later entry fails digest verification.
1 parent 8b6671f commit 9ce2496

2 files changed

Lines changed: 164 additions & 14 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBu
112112
heardFromHosts.add(rctx.to);
113113
heardFromHostsBitSet.set(rctx.bookieIndex, true);
114114

115+
/*
116+
* Retain the response while this read op handles it. complete() returns true only when it
117+
* transfers the buffers into request.entries. For digest failures, duplicate responses, or
118+
* other incomplete paths, complete() returns false and this retained reference is released here.
119+
*/
115120
bufList.retain();
116121
// if entry has completed don't handle twice
117122
if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) {
@@ -160,32 +165,41 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) {
160165
if (isComplete()) {
161166
return false;
162167
}
163-
if (!complete.getAndSet(true)) {
168+
169+
/*
170+
* Verify the whole batch before creating LedgerEntryImpl instances. If any entry fails
171+
* digest verification, no partial entries are retained and readEntriesComplete() releases
172+
* the retained ByteBufList after this method returns false.
173+
*/
174+
for (int i = 0; i < bufList.size(); i++) {
175+
ByteBuf buffer = bufList.getBuffer(i);
176+
try {
177+
lh.macManager.verifyDigestAndReturnData(eId + i, buffer);
178+
} catch (BKException.BKDigestMatchException e) {
179+
clientCtx.getClientStats().getReadOpDmCounter().inc();
180+
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
181+
BKException.Code.DigestMatchException);
182+
return false;
183+
}
184+
}
185+
186+
if (complete.compareAndSet(false, true)) {
187+
rc = BKException.Code.OK;
164188
for (int i = 0; i < bufList.size(); i++) {
165189
ByteBuf buffer = bufList.getBuffer(i);
166-
ByteBuf content;
167-
try {
168-
content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer);
169-
} catch (BKException.BKDigestMatchException e) {
170-
clientCtx.getClientStats().getReadOpDmCounter().inc();
171-
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
172-
BKException.Code.DigestMatchException);
173-
return false;
174-
}
175-
rc = BKException.Code.OK;
176190
/*
177191
* The length is a long and it is the last field of the metadata of an entry.
178192
* Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
179193
*/
180-
LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
194+
LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
181195
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
182-
entryImpl.setEntryBuf(content);
196+
entryImpl.setEntryBuf(buffer);
183197
entries.add(entryImpl);
184198
}
185199
writeSet.recycle();
186200
return true;
187201
} else {
188-
writeSet.recycle();
202+
// Another response completed the request first; readEntriesComplete() releases bufList.
189203
return false;
190204
}
191205
}

bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,32 @@
2121
package org.apache.bookkeeper.client;
2222

2323
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
24+
import static org.junit.Assert.assertArrayEquals;
2425
import static org.junit.Assert.assertEquals;
2526
import static org.junit.Assert.assertFalse;
2627
import static org.junit.Assert.assertNotNull;
2728
import static org.junit.Assert.assertTrue;
2829
import static org.junit.Assert.fail;
2930

31+
import io.netty.buffer.ByteBuf;
32+
import java.io.IOException;
33+
import java.nio.charset.StandardCharsets;
3034
import java.util.Iterator;
3135
import java.util.List;
3236
import java.util.concurrent.CompletableFuture;
3337
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.TimeUnit;
39+
import org.apache.bookkeeper.bookie.Bookie;
40+
import org.apache.bookkeeper.bookie.BookieException;
41+
import org.apache.bookkeeper.bookie.TestBookieImpl;
3442
import org.apache.bookkeeper.client.BKException.Code;
3543
import org.apache.bookkeeper.client.BookKeeper.DigestType;
3644
import org.apache.bookkeeper.client.api.LedgerEntry;
3745
import org.apache.bookkeeper.conf.ClientConfiguration;
46+
import org.apache.bookkeeper.conf.ServerConfiguration;
3847
import org.apache.bookkeeper.net.BookieId;
3948
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
49+
import org.awaitility.Awaitility;
4050
import org.junit.Test;
4151
import org.slf4j.Logger;
4252
import org.slf4j.LoggerFactory;
@@ -289,4 +299,130 @@ public void testReadFailureWithFailedBookies() throws Exception {
289299
lh.close();
290300
newBk.close();
291301
}
302+
303+
@Test
304+
public void testDigestMismatchRetriesNextReplicaAndCompletes() throws Exception {
305+
ClientConfiguration conf = new ClientConfiguration(baseClientConf)
306+
.setUseV2WireProtocol(true)
307+
.setReorderReadSequenceEnabled(false)
308+
.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
309+
310+
try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) {
311+
byte[] data = "batch-digest-data".getBytes(StandardCharsets.UTF_8);
312+
LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd);
313+
writer.addEntry(data);
314+
long ledgerId = writer.getId();
315+
BookieId corruptReplica = writer.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
316+
writer.close();
317+
318+
ServerConfiguration corruptConf = killBookie(corruptReplica);
319+
startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf));
320+
321+
LedgerHandle reader = bk.openLedger(ledgerId, digestType, passwd);
322+
BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 1, 1024, false);
323+
readOp.submit();
324+
325+
Iterator<LedgerEntry> entries = readOp.future().get().iterator();
326+
assertTrue(entries.hasNext());
327+
LedgerEntry entry = entries.next();
328+
assertArrayEquals(data, entry.getEntryBytes());
329+
entry.close();
330+
assertFalse(entries.hasNext());
331+
reader.close();
332+
}
333+
}
334+
335+
@Test
336+
public void testDigestMismatchAfterPartialVerificationDoesNotRetainEntries() throws Exception {
337+
ClientConfiguration conf = new ClientConfiguration(baseClientConf)
338+
.setUseV2WireProtocol(true)
339+
.setReorderReadSequenceEnabled(false)
340+
.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
341+
342+
try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) {
343+
byte[] entry0 = "batch-digest-entry-0".getBytes(StandardCharsets.UTF_8);
344+
byte[] entry1 = "batch-digest-entry-1".getBytes(StandardCharsets.UTF_8);
345+
LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd);
346+
writer.addEntry(entry0);
347+
writer.addEntry(entry1);
348+
long ledgerId = writer.getId();
349+
List<BookieId> ensemble = writer.getLedgerMetadata().getAllEnsembles().get(0L);
350+
BookieId corruptReplica = ensemble.get(0);
351+
BookieId retryReplica = ensemble.get(1);
352+
writer.close();
353+
354+
CountDownLatch corruptReadLatch = new CountDownLatch(1);
355+
ServerConfiguration corruptConf = killBookie(corruptReplica);
356+
startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf, 1L, corruptReadLatch));
357+
358+
CountDownLatch retryLatch = new CountDownLatch(1);
359+
sleepBookie(retryReplica, retryLatch);
360+
361+
LedgerHandle reader = null;
362+
try {
363+
reader = bk.openLedger(ledgerId, digestType, passwd);
364+
BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 2, 2048, false);
365+
readOp.submit();
366+
367+
assertTrue("corrupt replica did not read the corrupted entry",
368+
corruptReadLatch.await(10, TimeUnit.SECONDS));
369+
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
370+
assertNotNull(readOp.request);
371+
BatchedReadOp.SequenceReadRequest request =
372+
(BatchedReadOp.SequenceReadRequest) readOp.request;
373+
assertTrue(request.nextReplicaIndexToReadFrom >= 2);
374+
});
375+
assertTrue("digest mismatch must not retain partially verified entries",
376+
readOp.request.entries.isEmpty());
377+
378+
retryLatch.countDown();
379+
Iterator<LedgerEntry> entries = readOp.future().get(10, TimeUnit.SECONDS).iterator();
380+
assertTrue(entries.hasNext());
381+
LedgerEntry first = entries.next();
382+
assertArrayEquals(entry0, first.getEntryBytes());
383+
first.close();
384+
assertTrue(entries.hasNext());
385+
LedgerEntry second = entries.next();
386+
assertArrayEquals(entry1, second.getEntryBytes());
387+
second.close();
388+
assertFalse(entries.hasNext());
389+
} finally {
390+
retryLatch.countDown();
391+
if (reader != null) {
392+
reader.close();
393+
}
394+
}
395+
}
396+
}
397+
398+
static class CorruptReadBookie extends TestBookieImpl {
399+
private final long corruptEntryId;
400+
private final CountDownLatch corruptReadLatch;
401+
402+
CorruptReadBookie(ServerConfiguration conf) throws Exception {
403+
this(conf, -1L, null);
404+
}
405+
406+
CorruptReadBookie(ServerConfiguration conf, long corruptEntryId, CountDownLatch corruptReadLatch)
407+
throws Exception {
408+
super(conf);
409+
this.corruptEntryId = corruptEntryId;
410+
this.corruptReadLatch = corruptReadLatch;
411+
}
412+
413+
@Override
414+
public ByteBuf readEntry(long ledgerId, long entryId)
415+
throws IOException, Bookie.NoLedgerException, BookieException {
416+
ByteBuf localBuf = super.readEntry(ledgerId, entryId);
417+
if (corruptEntryId < 0 || corruptEntryId == entryId) {
418+
for (int i = 0; i < localBuf.capacity(); i++) {
419+
localBuf.setByte(i, 0);
420+
}
421+
if (corruptReadLatch != null) {
422+
corruptReadLatch.countDown();
423+
}
424+
}
425+
return localBuf;
426+
}
427+
}
292428
}

0 commit comments

Comments
 (0)