
Commit 466e554

perf(s3): align Raft entry size with MaxSizePerMsg via s3ChunkBatchOps=4 (#636)
## Summary

Align the Raft entry size on the S3 PutObject path with `MaxSizePerMsg` (4 MiB since PR #593).

**Change:** `s3ChunkBatchOps = 16 → 4`, so that one Raft entry = `s3ChunkBatchOps × s3ChunkSize ≈ 4 MiB`. A one-line diff.

## Why

`etcd/raft`'s `util.go:limitSize` has a documented exception: a single entry that exceeds `MaxSizePerMsg` is not rejected; it is shipped alone in its own MsgApp ("if the size of the first entry exceeds maxSize, a non-empty slice with just this entry is returned"). A 16 MiB entry passes straight through this path (see the sketch below), so:

| Metric | s3ChunkBatchOps=16 | s3ChunkBatchOps=4 (this PR) |
|---|---|---|
| Single Raft entry size | ~16 MiB | ~4 MiB |
| Leader worst case / peer | 1024 × 16 MiB = **16 GiB** | 1024 × 4 MiB = **4 GiB** |
| 3-node cluster (leader) | 32 GiB | 8 GiB |
| WAL fsync per entry | 16 MiB | 4 MiB |
| Raft commits per 5 GiB PUT | 320 | 1280 (4×) |

The `1024 × 4 MiB = 4 GiB / peer` memory bound advertised in PR #593's description assumes small entries and did not hold on the S3 path. With this PR, S3 writes ride the normal batched-MsgApp path, so the bound is accurate with S3 included.

Raft commit count grows 4×, but each fsync is 4× smaller, so the WAL group commit from PR #600 should absorb the difference and end-to-end throughput should be roughly unchanged.

## Test plan

- [x] `go build ./...` clean
- [x] `golangci-lint run ./...` 0 issues
- [x] `go test ./adapter/ -short -run 'TestS3|S3Server'` pass
- [ ] CI

## Follow-ups (design docs in separate PRs)

1. **S3 admission control**: put a hard cap on concurrent in-flight S3 PUT body bytes so that leader-side memory stays under `4 GiB × peers` even as client parallelism grows.
2. **Raft snapshot strategy / blob bypass**: a 5 GiB blob riding a snapshot transfer when a follower falls behind is undesirable, so move blobs out of the Raft path and replicate only manifests through Raft.

/gemini review @codex review
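For reference, the exception in question has roughly the following shape. This is a simplified paraphrase of `etcd/raft`'s `util.go:limitSize` for illustration, not the verbatim upstream source; the point is that the first entry is always returned, so an oversized entry is never split or rejected.

```go
type entry struct{ data []byte }

// limitSize paraphrases the shape of etcd/raft's util.go:limitSize:
// accumulate entries until the running size exceeds maxSize, but
// always keep at least the first entry.
func limitSize(ents []entry, maxSize uint64) []entry {
	if len(ents) == 0 {
		return ents
	}
	size := uint64(len(ents[0].data))
	limit := 1
	for ; limit < len(ents); limit++ {
		size += uint64(len(ents[limit].data))
		if size > maxSize {
			break
		}
	}
	// limit never drops below 1: a 16 MiB first entry is returned alone
	// even against a 4 MiB maxSize. This is the documented exception
	// that lets an oversized S3 batch bypass the per-peer
	// MaxInflight × MaxSizePerMsg accounting.
	return ents[:limit]
}
```

With `maxSize = 4 MiB`, a leading 16 MiB entry yields a one-element slice rather than an error, which is exactly the path the old `s3ChunkBatchOps = 16` batches took.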
2 parents dbe4725 + 97b4fc5 commit 466e554

2 files changed

Lines changed: 253 additions & 18 deletions


adapter/s3.go

Lines changed: 59 additions & 18 deletions
@@ -35,13 +35,44 @@ const (
 	s3LeaderHealthPath = "/healthz/leader"
 	s3HealthMaxRequestBodyBytes = 1024
 	s3ChunkSize = 1 << 20
-	s3ChunkBatchOps = 16
-	s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
-	s3DefaultRegion = "us-east-1"
-	s3MaxKeys = 1000
-	s3ListPageSize = 256
-	s3ManifestCleanupTimeout = 2 * time.Minute
-	s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
+	// s3ChunkBatchOps caps how many s3ChunkSize chunks fit in a single
+	// coordinator.Dispatch call on the data-write path
+	// (PutObject / UploadPart). Sized so the resulting Raft entry stays
+	// strictly under the post-PR-#593 default `MaxSizePerMsg = 4 MiB`
+	// even after protobuf framing overhead — each pb.Mutation carries
+	// the Op tag, Key tag + bytes, and Value length prefix; the
+	// pb.Request envelope wraps them; marshalRaftCommand prepends one
+	// byte. Empirically the per-Mutation overhead is ~60 B for normal
+	// keys and grows linearly with the bucket / objectKey length, so
+	// `4 × 1 MiB = 4 MiB` exactly is *over* MaxSizePerMsg in practice
+	// and falls into etcd/raft's util.go:limitSize oversized-first-
+	// entry path, bypassing the documented
+	// `MaxInflight × MaxSizePerMsg` per-peer memory bound. Capping at
+	// `3 × 1 MiB ≈ 3 MiB + a few hundred bytes` leaves ~1 MiB of
+	// headroom even with kilobyte-scale object keys, so the entry rides
+	// the normal batched-MsgApp path and the bound holds. Per-PUT Raft
+	// commit count grows ~5× from the pre-PR-#636 baseline (a 5 GiB
+	// PUT goes from 320 → ~1707 entries) but each fsync is ~5×
+	// smaller; the WAL group commit that landed in PR #600 absorbs the
+	// higher commit rate. See TestS3ChunkBatchFitsInRaftMaxSize
+	// for the encoded-size invariant.
+	s3ChunkBatchOps = 3
+	// s3MetaBatchOps batches key-only Del / scan ops on cleanup paths
+	// (cleanupPartBlobsAsync, deleteByPrefix, cleanupManifestBlobs).
+	// These ops carry no chunk payload, so the MaxSizePerMsg cap
+	// translates to a pure key-count budget: 64 BlobKey-shaped keys
+	// × ~100 B each ≈ 6 KiB per batch, three orders of magnitude
+	// under the 4 MiB limit. Keeping this batch large means a
+	// 5 GiB-object cleanup commits ~80 batches instead of ~1707, so
+	// orphaned-blob garbage collection finishes proportionally faster
+	// and does not amplify Raft load relative to the data-write path.
+	s3MetaBatchOps = 64
+	s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
+	s3DefaultRegion = "us-east-1"
+	s3MaxKeys = 1000
+	s3ListPageSize = 256
+	s3ManifestCleanupTimeout = 2 * time.Minute
+	s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
 
 	s3TxnRetryInitialBackoff = 2 * time.Millisecond
 	s3TxnRetryMaxBackoff = 32 * time.Millisecond
@@ -1876,7 +1907,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
 	defer func() { <-s.cleanupSem }()
 	ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
 	defer cancel()
-	pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
 	flush := func() {
 		if len(pending) == 0 {
 			return
@@ -1897,7 +1928,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
 			Op: kv.Del,
 			Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion),
 		})
-		if len(pending) >= s3ChunkBatchOps {
+		if len(pending) >= s3MetaBatchOps {
 			flush()
 		}
 	}
@@ -1930,7 +1961,7 @@ func (s *S3Server) deleteByPrefix(ctx context.Context, prefix []byte, bucket str
 	for {
 		readTS := s.readTS()
 		readPin := s.pinReadTS(readTS)
-		kvs, err := s.store.ScanAt(ctx, cursor, end, s3ChunkBatchOps, readTS)
+		kvs, err := s.store.ScanAt(ctx, cursor, end, s3MetaBatchOps, readTS)
 		readPin.Release()
 		if err != nil {
 			slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
@@ -2189,7 +2220,7 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
 	if s == nil || manifest == nil || manifest.UploadID == "" || s.coordinator == nil {
 		return
 	}
-	pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
 	flush := func() {
 		if len(pending) == 0 {
 			return
@@ -2206,29 +2237,39 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
 		pending = pending[:0]
 	}
 	for _, part := range manifest.Parts {
-		var ok bool
-		if pending, ok = s.appendPartBlobKeys(pending, bucket, generation, objectKey, manifest.UploadID, part, flush); !ok {
+		if !s.appendPartBlobKeys(&pending, bucket, generation, objectKey, manifest.UploadID, part, flush) {
 			return
 		}
 	}
 	flush()
 }
 
-func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) ([]*kv.Elem[kv.OP], bool) {
+// appendPartBlobKeys queues every blob-chunk Del for one manifest part
+// onto *pending and triggers flush whenever the batch reaches
+// s3MetaBatchOps. The slice is taken by pointer so that the caller's
+// `flush` closure (which captures pending from the enclosing
+// cleanupManifestBlobs scope) observes appends performed here. A
+// previous value-passing version silently no-op'd flush — flush saw
+// the outer `pending` whose header still pointed at length 0, and the
+// helper accumulated every chunk into one batch on return, defeating
+// the s3MetaBatchOps cap and re-opening the OOM / oversized-MsgApp
+// risk the cap was meant to bound. See
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps for the
+// regression guard.
+func (s *S3Server) appendPartBlobKeys(pending *[]*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) bool {
 	for chunkNo := range part.ChunkSizes {
 		chunkIndex, err := uint64FromInt(chunkNo)
 		if err != nil {
-			return pending, false
+			return false
 		}
-		pending = append(pending, &kv.Elem[kv.OP]{
+		*pending = append(*pending, &kv.Elem[kv.OP]{
 			Op: kv.Del,
 			Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
 		})
-		if len(pending) >= s3ChunkBatchOps {
+		if len(*pending) >= s3MetaBatchOps {
 			flush()
 		}
 	}
-	return pending, true
+	return true
 }
 
 //nolint:cyclop // Proxying depends on root, bucket, and object-level leadership decisions.
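The pointer plumbing in `appendPartBlobKeys` is easier to see in isolation. Below is a minimal standalone sketch (illustrative names and a batch cap of 4, not the repo's API) of why the value-passing version silently no-op'd `flush`:

```go
package main

import "fmt"

// appendByValue mirrors the buggy signature: pending is a copy of the
// caller's slice header, so appends grow only the local copy. The
// flush closure still reads the caller's variable, which stays at
// length 0 until the helper returns and the caller reassigns.
func appendByValue(pending []int, n int, flush func()) []int {
	for i := 0; i < n; i++ {
		pending = append(pending, i)
		if len(pending) >= 4 {
			flush() // sees the caller's pending: len 0, silent no-op
		}
	}
	return pending
}

// appendByPointer mirrors the fixed signature: appends go through the
// caller's slice header, so flush observes them and the cap holds.
func appendByPointer(pending *[]int, n int, flush func()) {
	for i := 0; i < n; i++ {
		*pending = append(*pending, i)
		if len(*pending) >= 4 {
			flush()
		}
	}
}

func main() {
	run := func(byPointer bool) []int {
		var pending []int
		var batches []int
		flush := func() {
			if len(pending) == 0 {
				return // the buggy path always exits here
			}
			batches = append(batches, len(pending))
			pending = pending[:0]
		}
		if byPointer {
			appendByPointer(&pending, 10, flush)
		} else {
			pending = appendByValue(pending, 10, flush)
		}
		flush() // caller's tail flush
		return batches
	}
	fmt.Println(run(false)) // [10]: one giant batch, the cap never held
	fmt.Println(run(true))  // [4 4 2]: cap enforced every 4 appends
}
```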

adapter/s3_chunk_batch_test.go

Lines changed: 194 additions & 0 deletions
package adapter

import (
	"strings"
	"testing"

	"github.com/bootjp/elastickv/internal/s3keys"
	"github.com/bootjp/elastickv/kv"
	pb "github.com/bootjp/elastickv/proto"
	"github.com/stretchr/testify/require"
	"google.golang.org/protobuf/proto"
)

// raftMaxSizePerMsgPostPR593 is the post-PR-#593 default for
// etcd/raft's MaxSizePerMsg setting. Hardcoded here (rather than
// imported from internal/raftengine/etcd) so the test does not pull
// in the engine package; the value is intentionally duplicated and
// pinned because the entire point of this test is to detect when an
// S3 batch silently grows past it.
const raftMaxSizePerMsgPostPR593 = 4 << 20

// TestS3ChunkBatchFitsInRaftMaxSize is the byte-budget invariant the
// s3ChunkBatchOps comment advertises: a worst-case S3 PutObject /
// UploadPart batch must encode strictly under
// raftMaxSizePerMsgPostPR593, plus the 1-byte raft framing prefix
// added by marshalRaftCommand.
//
// Without this guard, raising s3ChunkBatchOps or growing s3ChunkSize
// would silently route Raft entries through etcd/raft's
// oversized-first-entry path (util.go:limitSize, the "as an
// exception, if the size of the first entry exceeds maxSize, a
// non-empty slice with just this entry is returned" branch), which
// inflates the documented `MaxInflight × MaxSizePerMsg` per-peer
// memory bound.
func TestS3ChunkBatchFitsInRaftMaxSize(t *testing.T) {
	t.Parallel()

	// Worst-case key: a kilobyte-scale objectKey amplifies the
	// per-Mutation envelope. Choose 1 KiB to model a deeply nested
	// S3 path; longer keys are unusual but the headroom is generous.
	bucket := "test-bucket"
	objectKey := strings.Repeat("a", 1024)
	uploadID := "upload-12345678901234567890"
	const generation uint64 = 1
	const partNo uint64 = 1

	// Fill the chunk value with non-zero bytes so protobuf does not
	// elide trailing zeros and underestimate the encoded size.
	value := make([]byte, s3ChunkSize)
	for i := range value {
		value[i] = 0xAB
	}

	muts := make([]*pb.Mutation, 0, s3ChunkBatchOps)
	for i := uint64(0); i < uint64(s3ChunkBatchOps); i++ {
		key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
		muts = append(muts, &pb.Mutation{
			Op:    pb.Op_PUT,
			Key:   key,
			Value: value,
		})
	}

	req := &pb.Request{
		IsTxn:     false,
		Phase:     pb.Phase_NONE,
		Ts:        1234567890,
		Mutations: muts,
	}

	encoded, err := proto.Marshal(req)
	require.NoError(t, err)

	// marshalRaftCommand prepends one framing byte (raftEncodeSingle
	// or raftEncodeBatch). Account for it explicitly.
	const raftFramingPrefix = 1
	totalEntrySize := len(encoded) + raftFramingPrefix

	require.Lessf(t,
		totalEntrySize, raftMaxSizePerMsgPostPR593,
		"S3 chunk batch entry must fit strictly under MaxSizePerMsg=%d to avoid the etcd/raft oversized-first-entry path; got %d (s3ChunkBatchOps=%d, s3ChunkSize=%d, objectKey=%dB)",
		raftMaxSizePerMsgPostPR593, totalEntrySize, s3ChunkBatchOps, s3ChunkSize, len(objectKey),
	)

	// Sanity: the headroom should be meaningful (at least 64 KiB) so
	// future small bumps in key length or Request envelope fields do
	// not silently push past the limit. The s3ChunkBatchOps comment
	// advertises ~1 MiB of headroom; 64 KiB is the floor below which
	// this test fails loudly.
	const minHeadroom = 64 << 10
	require.Greaterf(t,
		raftMaxSizePerMsgPostPR593-totalEntrySize, minHeadroom,
		"S3 chunk batch headroom under MaxSizePerMsg has fallen below %d B (got %d B); reduce s3ChunkBatchOps or s3ChunkSize",
		minHeadroom, raftMaxSizePerMsgPostPR593-totalEntrySize,
	)
}

// TestS3MetaBatchFitsInRaftMaxSize is the same byte-budget invariant
// for the cleanup paths (cleanupPartBlobsAsync, deleteByPrefix,
// cleanupManifestBlobs). These ops carry no chunk payload so the
// batch is dominated by key bytes; even at the worst-case key length
// the total stays well under the cap. The test pins the invariant so
// a future bump in s3MetaBatchOps that pushes too far is caught at
// PR time.
func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
	t.Parallel()

	bucket := "test-bucket"
	objectKey := strings.Repeat("a", 1024)
	uploadID := "upload-12345678901234567890"
	const generation uint64 = 1
	const partNo uint64 = 1

	muts := make([]*pb.Mutation, 0, s3MetaBatchOps)
	for i := uint64(0); i < uint64(s3MetaBatchOps); i++ {
		key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
		muts = append(muts, &pb.Mutation{
			Op:  pb.Op_DEL,
			Key: key,
		})
	}

	req := &pb.Request{
		IsTxn:     false,
		Phase:     pb.Phase_NONE,
		Ts:        1234567890,
		Mutations: muts,
	}

	encoded, err := proto.Marshal(req)
	require.NoError(t, err)

	const raftFramingPrefix = 1
	totalEntrySize := len(encoded) + raftFramingPrefix

	require.Lessf(t,
		totalEntrySize, raftMaxSizePerMsgPostPR593,
		"S3 meta batch entry must fit under MaxSizePerMsg=%d; got %d (s3MetaBatchOps=%d, objectKey=%dB)",
		raftMaxSizePerMsgPostPR593, totalEntrySize, s3MetaBatchOps, len(objectKey),
	)
}

// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps is the
// regression guard for the slice-by-value bug Gemini caught: a
// previous version of appendPartBlobKeys took `pending` by value, so
// the flush closure (captured from cleanupManifestBlobs's enclosing
// scope) saw the outer slice header at length 0 and never fired,
// silently accumulating every chunk into one giant batch. This test
// pins the contract that the helper drains via flush exactly every
// s3MetaBatchOps appends, never building a slice longer than the cap.
func TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps(t *testing.T) {
	t.Parallel()

	// Build a manifest part with chunkCount > 2× s3MetaBatchOps so the
	// flush closure must fire at least twice, plus a tail flush from
	// the caller's final flush() in cleanupManifestBlobs.
	const chunkCount = 2*s3MetaBatchOps + 7
	chunkSizes := make([]uint64, chunkCount)
	for i := range chunkSizes {
		chunkSizes[i] = 1
	}
	part := s3ObjectPart{
		PartNo:      1,
		PartVersion: 1,
		ChunkSizes:  chunkSizes,
	}

	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
	flushCalls := 0
	flushBatchSizes := make([]int, 0, 4)
	flush := func() {
		// Mirror cleanupManifestBlobs's flush: record the batch size
		// and then truncate. If the helper's pointer plumbing is
		// broken, len(pending) here would always be 0 and the
		// recorded batch sizes would never match s3MetaBatchOps.
		flushCalls++
		flushBatchSizes = append(flushBatchSizes, len(pending))
		pending = pending[:0]
	}

	srv := (*S3Server)(nil) // method body does not touch s
	ok := srv.appendPartBlobKeys(&pending, "bucket", 1, "key", "upload", part, flush)
	require.True(t, ok)

	// Exactly two threshold-triggered flushes inside the helper:
	// at append #s3MetaBatchOps and #2×s3MetaBatchOps. The 7-entry
	// remainder is left in pending for the caller's tail flush().
	require.Equal(t, 2, flushCalls,
		"expected flush to fire twice (at append %d and %d); slice-by-value bug regressed?",
		s3MetaBatchOps, 2*s3MetaBatchOps)
	require.Equal(t, []int{s3MetaBatchOps, s3MetaBatchOps}, flushBatchSizes,
		"each flush must drain exactly s3MetaBatchOps entries; pending must not silently overflow the cap")
	require.Len(t, pending, 7,
		"trailing 7 entries should remain for the caller's final flush()")
}
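One way to sanity-check the "~60 B per-Mutation overhead" figure cited in the `s3ChunkBatchOps` comment is to measure it directly with `proto.Size`. The probe below is hypothetical, not part of this PR; it assumes the same package and imports as the test file above, and it only logs, since the figure drifts with key shape and varint widths:

```go
// TestMutationOverheadEstimate is a hypothetical probe (not in this
// PR): it reports the protobuf framing overhead of one Mutation by
// subtracting the raw key and value bytes from the encoded size.
// TestS3ChunkBatchFitsInRaftMaxSize pins the real invariant.
func TestMutationOverheadEstimate(t *testing.T) {
	t.Parallel()

	key := s3keys.BlobKey("test-bucket", 1, strings.Repeat("a", 1024),
		"upload-12345678901234567890", 1, 0)
	m := &pb.Mutation{
		Op:    pb.Op_PUT,
		Key:   key,
		Value: make([]byte, s3ChunkSize),
	}
	overhead := proto.Size(m) - s3ChunkSize - len(key)
	t.Logf("per-Mutation framing overhead: %d B (key=%d B)", overhead, len(key))
}
```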
