
Commit 97b4fc5

fix(s3): pass appendPartBlobKeys' pending by pointer so flush observes appends
Gemini high caught it: the previous round split the cleanup paths onto s3MetaBatchOps but missed that `appendPartBlobKeys` still took `pending []*kv.Elem[kv.OP]` by value. The flush closure inside `cleanupManifestBlobs` captures the outer `pending` slice header; appends performed inside the helper update the local header only, so flush sees length 0 every iteration and silently no-ops. After the helper returns, the caller reassigns `pending` from the helper's return value, but by then the entire part's chunks have accumulated into one slice that bypasses the s3MetaBatchOps cap entirely.

For a manifest with thousands of chunks (e.g. a 5 GiB object cleanup with ~5120 chunks), the broken helper builds one giant batch and hands it to coordinator.Dispatch in a single call, re-introducing exactly the OOM / oversized-MsgApp risk the s3MetaBatchOps cap was meant to prevent. This is the same shape as the pre-PR-#636 behaviour s3ChunkBatchOps was tightening, just on the cleanup side.

Fix: take `pending` by pointer (`*[]*kv.Elem[kv.OP]`) and write through it. Appends inside the helper now update the variable the flush closure reads, so threshold-triggered flushes fire correctly and the slice length never exceeds s3MetaBatchOps.

Test:

- TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps builds a part with 2 × s3MetaBatchOps + 7 chunks, calls the helper directly with a recording flush closure, and asserts that flush fired exactly twice with batch sizes [s3MetaBatchOps, s3MetaBatchOps], leaving 7 trailing entries for the caller's tail flush. The test catches the bug: under the value-passing version the closure always saw length 0, so the recorded batch sizes would be wrong and the assertions would fail loudly.

Build / vet / lint clean. All S3 + cleanup tests pass.
1 parent 5338c05 commit 97b4fc5
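The slice-header mechanics generalize beyond this codebase, so here is a minimal standalone sketch of the failure and of the fix. All names are invented (`batchCap` stands in for s3MetaBatchOps, plain ints for `*kv.Elem[kv.OP]`); it is an illustration, not repository code:

```go
package main

import "fmt"

const batchCap = 3 // stands in for s3MetaBatchOps

// brokenAppend mirrors the pre-fix helper: it receives a copy of the
// slice header. The local copy grows and trips the cap check, so flush
// IS called, but flush reads the caller's variable, whose header still
// says len 0, so it silently no-ops. The appended items only surface
// as one giant batch via the return value.
func brokenAppend(pending []int, n int, flush func()) []int {
	for i := 0; i < n; i++ {
		pending = append(pending, i)
		if len(pending) >= batchCap {
			flush()
		}
	}
	return pending
}

// fixedAppend mirrors the post-fix helper: it writes through the
// pointer, so the caller's variable (and therefore the closure)
// observes every append, and flush's truncation is observed back here.
func fixedAppend(pending *[]int, n int, flush func()) {
	for i := 0; i < n; i++ {
		*pending = append(*pending, i)
		if len(*pending) >= batchCap {
			flush()
		}
	}
}

func run(byPointer bool) {
	pending := []int{}
	var dispatched []int // size of each batch actually dispatched
	flush := func() {    // captures the outer pending, like cleanupManifestBlobs
		if len(pending) == 0 {
			return // the silent no-op in the broken case
		}
		dispatched = append(dispatched, len(pending))
		pending = pending[:0]
	}

	if byPointer {
		fixedAppend(&pending, 7, flush)
	} else {
		pending = brokenAppend(pending, 7, flush)
	}
	flush() // caller's tail flush
	fmt.Printf("byPointer=%v dispatched batch sizes: %v\n", byPointer, dispatched)
}

func main() {
	run(false) // byPointer=false dispatched batch sizes: [7]     (one giant batch)
	run(true)  // byPointer=true  dispatched batch sizes: [3 3 1] (capped correctly)
}
```

Note that in the broken variant flush is still called; it just observes a stale header, which is why the regression was silent rather than an error.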

2 files changed

Lines changed: 72 additions & 7 deletions


adapter/s3.go

Lines changed: 17 additions & 7 deletions
```diff
@@ -2237,29 +2237,39 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
 		pending = pending[:0]
 	}
 	for _, part := range manifest.Parts {
-		var ok bool
-		if pending, ok = s.appendPartBlobKeys(pending, bucket, generation, objectKey, manifest.UploadID, part, flush); !ok {
+		if !s.appendPartBlobKeys(&pending, bucket, generation, objectKey, manifest.UploadID, part, flush) {
 			return
 		}
 	}
 	flush()
 }
 
-func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) ([]*kv.Elem[kv.OP], bool) {
+// appendPartBlobKeys queues every blob-chunk Del for one manifest part
+// onto *pending and triggers flush whenever the batch reaches
+// s3MetaBatchOps. The slice is taken by pointer so that the caller's
+// `flush` closure (which captures pending from the enclosing
+// cleanupManifestBlobs scope) observes appends performed here. A
+// previous value-passing version silently no-op'd flush — flush saw
+// the outer `pending` whose header still pointed at length 0, and the
+// helper accumulated every chunk into one batch on return, defeating
+// the s3MetaBatchOps cap and re-opening the OOM / oversized-MsgApp
+// risk the cap was meant to bound. See the regression guard
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps.
+func (s *S3Server) appendPartBlobKeys(pending *[]*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) bool {
 	for chunkNo := range part.ChunkSizes {
 		chunkIndex, err := uint64FromInt(chunkNo)
 		if err != nil {
-			return pending, false
+			return false
 		}
-		pending = append(pending, &kv.Elem[kv.OP]{
+		*pending = append(*pending, &kv.Elem[kv.OP]{
 			Op:  kv.Del,
 			Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
 		})
-		if len(pending) >= s3MetaBatchOps {
+		if len(*pending) >= s3MetaBatchOps {
 			flush()
 		}
 	}
-	return pending, true
+	return true
 }
 
 //nolint:cyclop // Proxying depends on root, bucket, and object-level leadership decisions.
```
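For contrast, a different design would sidestep the shared-header subtlety entirely by keeping the batch and its cap in one type, so no closure has to capture a slice variable. This is a hypothetical sketch (`opBatcher`, `limit`, and `dispatch` are invented names), not what the commit does:

```go
package batch

// opBatcher is a hypothetical alternative shape: the pending batch and
// its flush threshold live in one struct. All methods use a pointer
// receiver, so appends and the post-dispatch truncation act on the
// same slice header by construction, and the pointer-vs-value pitfall
// cannot arise. T would be *kv.Elem[kv.OP] at the s3.go call site.
type opBatcher[T any] struct {
	pending  []T
	limit    int             // flush threshold, e.g. s3MetaBatchOps
	dispatch func([]T) error // e.g. a wrapper around coordinator.Dispatch
}

// add queues one element and flushes when the threshold is reached.
func (b *opBatcher[T]) add(e T) error {
	b.pending = append(b.pending, e)
	if len(b.pending) >= b.limit {
		return b.flush()
	}
	return nil
}

// flush dispatches the pending batch, if any, and truncates it.
func (b *opBatcher[T]) flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	if err := b.dispatch(b.pending); err != nil {
		return err
	}
	b.pending = b.pending[:0]
	return nil
}
```

The pointer-parameter fix in the commit is the smaller change; a struct like this would only pay off if more call sites adopt the flush-on-threshold pattern.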

adapter/s3_chunk_batch_test.go

Lines changed: 55 additions & 0 deletions
```diff
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/bootjp/elastickv/internal/s3keys"
+	"github.com/bootjp/elastickv/kv"
 	pb "github.com/bootjp/elastickv/proto"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/proto"
@@ -137,3 +138,57 @@ func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
 		raftMaxSizePerMsgPostPR593, totalEntrySize, s3MetaBatchOps, len(objectKey),
 	)
 }
+
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps is the
+// regression guard for the slice-by-value bug Gemini caught: a
+// previous version of appendPartBlobKeys took `pending` by value, so
+// the flush closure (captured from cleanupManifestBlobs's enclosing
+// scope) saw the outer slice header at length 0 and never fired,
+// silently accumulating every chunk into one giant batch. This test
+// pins the contract that the helper drains via flush exactly every
+// s3MetaBatchOps appends, never building a slice longer than the cap.
+func TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps(t *testing.T) {
+	t.Parallel()
+
+	// Build a manifest part with chunkCount > 2× s3MetaBatchOps so the
+	// flush closure must fire at least twice, plus a tail flush from
+	// the caller's final flush() in cleanupManifestBlobs.
+	const chunkCount = 2*s3MetaBatchOps + 7
+	chunkSizes := make([]uint64, chunkCount)
+	for i := range chunkSizes {
+		chunkSizes[i] = 1
+	}
+	part := s3ObjectPart{
+		PartNo:      1,
+		PartVersion: 1,
+		ChunkSizes:  chunkSizes,
+	}
+
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
+	flushCalls := 0
+	flushBatchSizes := make([]int, 0, 4)
+	flush := func() {
+		// Mirror cleanupManifestBlobs's flush: record the batch size
+		// and then truncate. If the helper's pointer plumbing is
+		// broken, len(pending) here would always be 0 and the
+		// recorded batch sizes would never match s3MetaBatchOps.
+		flushCalls++
+		flushBatchSizes = append(flushBatchSizes, len(pending))
+		pending = pending[:0]
+	}
+
+	srv := (*S3Server)(nil) // method body does not touch s
+	ok := srv.appendPartBlobKeys(&pending, "bucket", 1, "key", "upload", part, flush)
+	require.True(t, ok)
+
+	// Exactly two threshold-triggered flushes inside the helper:
+	// at append #s3MetaBatchOps and #2×s3MetaBatchOps. The 7-entry
+	// remainder is left in pending for the caller's tail flush().
+	require.Equal(t, 2, flushCalls,
+		"expected flush to fire twice (at append %d and %d); slice-by-value bug regressed?",
+		s3MetaBatchOps, 2*s3MetaBatchOps)
+	require.Equal(t, []int{s3MetaBatchOps, s3MetaBatchOps}, flushBatchSizes,
+		"each flush must drain exactly s3MetaBatchOps entries; pending must not silently overflow the cap")
+	require.Len(t, pending, 7,
+		"trailing 7 entries should remain for the caller's final flush()")
+}
```
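One test detail worth flagging: `srv := (*S3Server)(nil)` relies on the Go rule that calling a method through a typed nil pointer is legal as long as the method body never dereferences the receiver. A tiny self-contained illustration with a hypothetical `server` type, unrelated to the repo:

```go
package main

import "fmt"

type server struct{ name string }

// ok never reads through s, so a nil receiver is safe.
func (s *server) ok() bool { return true }

// label dereferences s and would panic on a nil receiver.
func (s *server) label() string { return s.name }

func main() {
	var s *server       // typed nil, like (*S3Server)(nil) in the test
	fmt.Println(s.ok()) // prints: true
	// s.label() here would panic with a nil pointer dereference
}
```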
