From a5ea66fe3e1679d996c3d87c5004fd70b17b62bf Mon Sep 17 00:00:00 2001 From: Annie Pompa Date: Mon, 9 Feb 2026 11:21:24 -0500 Subject: [PATCH] internal/manifest: add tiering info to blob references Previously, blob references did not store which tier (hot or cold) the blob file was written to. This patch adds a Tier field to both BlobReference and PhysicalBlobFile. The tier is now persisted in version edits using new tags (tagNewBlobFile2 and customTagBlobReferences3) that include tier information in the encoding. Decoding of old formats defaults to HotTier for backward compatibility. WriterOptions.BlobReferenceTierGetter is replaced with BlobReferenceTiers, a slice indexed by reference ID. --- internal/base/internal.go | 5 + internal/manifest/blob_metadata.go | 21 ++++ .../manifest/testdata/version_edit_decode | 8 +- internal/manifest/version_edit.go | 59 ++++++---- internal/sstableinternal/options.go | 7 ++ replay/testdata/corpus/simple_val_sep | 2 +- replay/testdata/replay_val_sep | 8 +- sstable/colblk/data_block.go | 21 +++- sstable/colblk/data_block_meta_test.go | 9 +- sstable/colblk_writer.go | 7 +- sstable/options.go | 12 -- sstable/reader_blob_test.go | 103 ++++++++++++++++++ sstable/test_utils.go | 2 +- sstable/values.go | 21 ++++ sstable/writer_test.go | 29 +++-- testdata/metrics | 4 +- valsep/value_separator.go | 27 ++++- 17 files changed, 276 insertions(+), 69 deletions(-) create mode 100644 sstable/reader_blob_test.go diff --git a/internal/base/internal.go b/internal/base/internal.go index 5ecabbfb4cc..4b602f51f85 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -678,6 +678,11 @@ type InternalKV struct { type KVMeta struct { TieringSpanID TieringSpanID TieringAttribute TieringAttribute + // SecondaryBlobHandle holds the raw encoded secondary (cold tier) blob + // handle for dual-tier blob values. It is nil when the value does not have + // a secondary blob handle. The raw bytes can be decoded using + // blob.DecodeInlineHandlePreface and blob.DecodeHandleSuffix. + SecondaryBlobHandle []byte } func (m KVMeta) String() string { diff --git a/internal/manifest/blob_metadata.go b/internal/manifest/blob_metadata.go index fd0792e8bc4..dd331cfd6d1 100644 --- a/internal/manifest/blob_metadata.go +++ b/internal/manifest/blob_metadata.go @@ -46,6 +46,9 @@ type BlobReference struct { // size according to the ValueSize of the blob reference relative to the // total ValueSize of the blob file. EstimatedPhysicalSize uint64 + // Tier indicates the storage tier (hot or cold) where the referenced blob + // file was written. + Tier base.StorageTier } // MakeBlobReference creates a BlobReference from the given file ID, value size, @@ -74,6 +77,7 @@ func MakeBlobReference( // // We perform the multiplication first to avoid floating point arithmetic. EstimatedPhysicalSize: (valueSize * phys.Size) / phys.ValueSize, + Tier: phys.Tier, } } @@ -130,6 +134,9 @@ type PhysicalBlobFile struct { // File creation time in seconds since the epoch (1970-01-01 00:00:00 // UTC). CreationTime uint64 + // Tier indicates the storage tier (hot or cold) where this blob file was + // written. + Tier base.StorageTier // Mutable state @@ -151,6 +158,10 @@ func (m *PhysicalBlobFile) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("%s size:[%d (%s)] vals:[%d (%s)]", m.FileNum, redact.Safe(m.Size), humanize.Bytes.Uint64(m.Size), redact.Safe(m.ValueSize), humanize.Bytes.Uint64(m.ValueSize)) + // Only show tier if it's not the default (HotTier) for backward compatibility. + if m.Tier != base.HotTier { + w.Printf(" tier:cold") + } } // String implements fmt.Stringer. @@ -294,6 +305,16 @@ func parsePhysicalBlobFileDebug(p *strparse.Parser) (*PhysicalBlobFile, error) { p.Expect("]") case "creationTime": m.CreationTime = p.Uint64() + case "tier": + tierStr := p.Next() + switch tierStr { + case "hot": + m.Tier = base.HotTier + case "cold": + m.Tier = base.ColdTier + default: + p.Errf("unknown tier %q", tierStr) + } default: p.Errf("unknown field %q", field) } diff --git a/internal/manifest/testdata/version_edit_decode b/internal/manifest/testdata/version_edit_decode index 50d74928034..efcf7ea3186 100644 --- a/internal/manifest/testdata/version_edit_decode +++ b/internal/manifest/testdata/version_edit_decode @@ -83,8 +83,8 @@ encode excise-op: [b, c] #3 ---- 67061d000b626172000e0000000000000b666f6f010d0000000000000000 -45010129d8a301016baf0729d8a301f9a006006c21210a0162016301030b -0537 +47010129d8a3010000016daf0729d8a301f9a00600006c21210a01620163 +01030b0537 add-table: L6 000029:[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952); depth:1] add-blob-file: B000943 physical:{000041 size:[20952 (20KB)] vals:[102521 (100KB)]} del-blob-file: B000033 000033 @@ -110,7 +110,7 @@ encode add-backing: 000009 ---- 69096467031d000b626172000e0000000000000b666f6f010d0000000000 -000000420946010129d8a301e8f10101 +000000420947010129d8a301e8f1010001 add-table: L3 000029(000009):[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952/30952); depth:1] add-backing: 000009 @@ -128,7 +128,7 @@ encode add-table: L3 000029:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952 / 30952); depth:1] ---- 67031d000b626172000e0000000000000b666f6f010d0000000000000000 -45010129d8a30101 +47010129d8a301e8f1010001 add-table: L3 000029:[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952/30952); depth:1] diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index 49b20d4ea29..6d3491f1192 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -62,6 +62,7 @@ const ( tagRemovedBackingTable = 106 tagNewBlobFile = 107 tagDeletedBlobFile = 108 + tagNewBlobFile2 = 109 // The custom tags sub-format used by tagNewFile4 and above. All tags less // than customTagNonSafeIgnoreMask are safe to ignore and their format must be @@ -77,6 +78,8 @@ const ( customTagBlobReferences = 69 // customTagBlobReferences2 contains BackingValueSize for each BlobReference. customTagBlobReferences2 = 70 + // customTagBlobReferences3 contains BackingValueSize and Tier for each BlobReference. + customTagBlobReferences3 = 71 ) // DeletedTableEntry holds the state for a sstable deletion from a level. The @@ -467,7 +470,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { return err } - case customTagBlobReferences, customTagBlobReferences2: + case customTagBlobReferences, customTagBlobReferences2, customTagBlobReferences3: // The first varint encodes the 'blob reference depth' // of the table. v, err := d.readUvarint() @@ -490,16 +493,31 @@ func (v *VersionEdit) Decode(r io.Reader) error { return err } var backingValueSize uint64 - if customTag == customTagBlobReferences2 { + if customTag == customTagBlobReferences2 || customTag == customTagBlobReferences3 { backingValueSize, err = d.readUvarint() if err != nil { return err } } + var tier base.StorageTier + if customTag == customTagBlobReferences3 { + tierValue, err := d.readUvarint() + if err != nil { + return err + } + if tierValue >= uint64(base.NumStorageTiers) { + return base.CorruptionErrorf("new-file4: invalid tier value: %d", tierValue) + } + tier = base.StorageTier(tierValue) + } else { + // For backward compatibility, default to HotTier. + tier = base.HotTier + } blobReferences[i] = BlobReference{ FileID: base.BlobFileID(fileID), ValueSize: valueSize, BackingValueSize: backingValueSize, + Tier: tier, } } continue @@ -566,7 +584,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { } v.NewTables = append(v.NewTables, nfe) - case tagNewBlobFile: + case tagNewBlobFile, tagNewBlobFile2: fileID, err := d.readUvarint() if err != nil { return err @@ -587,6 +605,17 @@ func (v *VersionEdit) Decode(r io.Reader) error { if err != nil { return err } + var tier base.StorageTier + if tag == tagNewBlobFile2 { + tierValue, err := d.readUvarint() + if err != nil { + return err + } + tier = base.StorageTier(tierValue) + } else { + // For backward compatibility with tagNewBlobFile, default to HotTier. + tier = base.HotTier + } v.NewBlobFiles = append(v.NewBlobFiles, BlobFileMetadata{ FileID: base.BlobFileID(fileID), Physical: &PhysicalBlobFile{ @@ -594,6 +623,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { Size: size, ValueSize: valueSize, CreationTime: creationTime, + Tier: tier, }, }) @@ -948,40 +978,27 @@ func (v *VersionEdit) Encode(w io.Writer) error { e.writeBytes(x.Meta.SyntheticPrefixAndSuffix.Suffix()) } if len(x.Meta.BlobReferences) > 0 { - writeBackingValueSize := false - if x.Meta.Virtual { - for _, ref := range x.Meta.BlobReferences { - if ref.BackingValueSize > 0 && ref.BackingValueSize != ref.ValueSize { - writeBackingValueSize = true - break - } - } - } - if writeBackingValueSize { - e.writeUvarint(customTagBlobReferences2) - } else { - e.writeUvarint(customTagBlobReferences) - } + e.writeUvarint(customTagBlobReferences3) e.writeUvarint(uint64(x.Meta.BlobReferenceDepth)) e.writeUvarint(uint64(len(x.Meta.BlobReferences))) for _, ref := range x.Meta.BlobReferences { e.writeUvarint(uint64(ref.FileID)) e.writeUvarint(ref.ValueSize) - if writeBackingValueSize { - e.writeUvarint(ref.BackingValueSize) - } + e.writeUvarint(ref.BackingValueSize) + e.writeUvarint(uint64(ref.Tier)) } } e.writeUvarint(customTagTerminate) } } for _, x := range v.NewBlobFiles { - e.writeUvarint(tagNewBlobFile) + e.writeUvarint(tagNewBlobFile2) e.writeUvarint(uint64(x.FileID)) e.writeUvarint(uint64(x.Physical.FileNum)) e.writeUvarint(x.Physical.Size) e.writeUvarint(x.Physical.ValueSize) e.writeUvarint(x.Physical.CreationTime) + e.writeUvarint(uint64(x.Physical.Tier)) } for x := range v.DeletedBlobFiles { e.writeUvarint(tagDeletedBlobFile) diff --git a/internal/sstableinternal/options.go b/internal/sstableinternal/options.go index 475b72fc334..d593a6e8616 100644 --- a/internal/sstableinternal/options.go +++ b/internal/sstableinternal/options.go @@ -33,4 +33,11 @@ type WriterOptions struct { // in order. It is intended for use only in the construction of invalid // sstables for testing. See tool/make_test_sstables.go. DisableKeyOrderChecks bool + + // BlobReferenceTiers provides the storage tier (hot or cold) for each blob + // reference ID. Used when WriteTieringHistograms is true to categorize blob + // references by tier. The tier for reference ID i is BlobReferenceTiers[i]. + // When a value exists in both tiers (hot-and-cold), there are two separate + // blob references with different IDs and tiers. + BlobReferenceTiers []base.StorageTier } diff --git a/replay/testdata/corpus/simple_val_sep b/replay/testdata/corpus/simple_val_sep index 8262afbbb21..e046328862c 100644 --- a/replay/testdata/corpus/simple_val_sep +++ b/replay/testdata/corpus/simple_val_sep @@ -101,7 +101,7 @@ simple_val_sep: stat simple_val_sep/MANIFEST-000013 simple_val_sep/000015.sst simple_val_sep/000016.blob ---- simple_val_sep/MANIFEST-000013: - size: 250 + size: 259 simple_val_sep/000015.sst: size: 792 simple_val_sep/000016.blob: diff --git a/replay/testdata/replay_val_sep b/replay/testdata/replay_val_sep index e71cbd70051..f95843132ca 100644 --- a/replay/testdata/replay_val_sep +++ b/replay/testdata/replay_val_sep @@ -15,15 +15,15 @@ tree 792 000015.sst 97 000016.blob 0 LOCK - 152 MANIFEST-000010 - 250 MANIFEST-000013 + 158 MANIFEST-000010 + 259 MANIFEST-000013 2942 OPTIONS-000002 0 marker.format-version.000011.024 0 marker.manifest.000003.MANIFEST-000013 simple_val_sep/ 792 000015.sst 97 000016.blob - 250 MANIFEST-000013 + 259 MANIFEST-000013 checkpoint/ 819 000005.sst 101 000006.blob @@ -31,7 +31,7 @@ tree 97 000009.blob 11 000011.log 687 000012.sst - 187 MANIFEST-000013 + 193 MANIFEST-000013 2942 OPTIONS-000002 0 marker.format-version.000001.024 0 marker.manifest.000001.MANIFEST-000013 diff --git a/sstable/colblk/data_block.go b/sstable/colblk/data_block.go index 8375ab7d268..820d2dc1688 100644 --- a/sstable/colblk/data_block.go +++ b/sstable/colblk/data_block.go @@ -1228,6 +1228,9 @@ type DataBlockIter struct { // for the current block. tieringSpanIDs UnsafeUints tieringAttributes UnsafeUints + // secondaryBlobHandles stores the decoded secondary blob handle column for + // dual-tier blob values. Only populated when the block has tiering columns. + secondaryBlobHandles RawBytes // blockData stores a reference to the block data for lazy tiering column // decoding. This is only set when Init() is called (not InitHandle()). blockData []byte @@ -1300,6 +1303,7 @@ func (i *DataBlockIter) Init( i.tieringConfig = tieringConfig i.tieringSpanIDs = UnsafeUints{} i.tieringAttributes = UnsafeUints{} + i.secondaryBlobHandles = RawBytes{} // Store block data for lazy tiering column decoding. i.blockData = bd.Data() return nil @@ -1340,6 +1344,7 @@ func (i *DataBlockIter) InitHandle( // from h.BlockData() when needed. i.tieringSpanIDs = UnsafeUints{} i.tieringAttributes = UnsafeUints{} + i.secondaryBlobHandles = RawBytes{} i.blockData = nil return nil } @@ -1553,8 +1558,12 @@ func (i *DataBlockIter) initTieringMetadata() bool { // Decode the tiering columns. bd := DecodeBlock(data, DataBlockCustomHeaderSize+i.keySchema.HeaderSize) - i.tieringSpanIDs = bd.Uints(len(i.keySchema.ColumnTypes) + dataBlockColumnTieringSpanID) - i.tieringAttributes = bd.Uints(len(i.keySchema.ColumnTypes) + dataBlockColumnTieringAttribute) + baseCol := len(i.keySchema.ColumnTypes) + i.tieringSpanIDs = bd.Uints(baseCol + dataBlockColumnTieringSpanID) + i.tieringAttributes = bd.Uints(baseCol + dataBlockColumnTieringAttribute) + if uint16(baseCol+dataBlockColumnMaxV2) <= bd.header.Columns { + i.secondaryBlobHandles = bd.RawBytes(baseCol + dataBlockColumnSecondaryBlobHandle) + } return true } @@ -1563,10 +1572,16 @@ func (i *DataBlockIter) decodeMeta() base.KVMeta { if !i.initTieringMetadata() { return base.KVMeta{} } - return base.KVMeta{ + m := base.KVMeta{ TieringSpanID: base.TieringSpanID(i.tieringSpanIDs.At(i.row)), TieringAttribute: base.TieringAttribute(i.tieringAttributes.At(i.row)), } + if i.secondaryBlobHandles.Slices() > 0 { + if b := i.secondaryBlobHandles.At(i.row); len(b) > 0 { + m.SecondaryBlobHandle = b + } + } + return m } // Last implements the base.InternalIterator interface. diff --git a/sstable/colblk/data_block_meta_test.go b/sstable/colblk/data_block_meta_test.go index 116b0b8c7b3..6d87c7c214f 100644 --- a/sstable/colblk/data_block_meta_test.go +++ b/sstable/colblk/data_block_meta_test.go @@ -28,13 +28,18 @@ func TestDataBlockIterWithMeta(t *testing.T) { meta base.KVMeta }{ {"a#1,SET", base.KVMeta{TieringSpanID: 42, TieringAttribute: 100}}, - {"b#2,SET", base.KVMeta{TieringSpanID: 43, TieringAttribute: 200}}, + {"b#2,SET", base.KVMeta{TieringSpanID: 43, TieringAttribute: 200, SecondaryBlobHandle: []byte("cold-handle-b")}}, {"c#3,SET", base.KVMeta{TieringSpanID: 0, TieringAttribute: 0}}, } for _, kv := range testKVs { ikey := base.ParseInternalKey(kv.key) kcmp := w.KeyWriter.ComparePrev(ikey.UserKey) - w.Add(ikey, []byte("value"), block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false, kv.meta) + vp := block.InPlaceValuePrefix(kcmp.PrefixEqual()) + if kv.meta.SecondaryBlobHandle != nil { + w.AddWithSecondaryBlobHandle(ikey, []byte("value"), vp, kcmp, false, kv.meta, kv.meta.SecondaryBlobHandle) + } else { + w.Add(ikey, []byte("value"), vp, kcmp, false, kv.meta) + } } block, _ := w.Finish(w.Rows(), w.Size()) diff --git a/sstable/colblk_writer.go b/sstable/colblk_writer.go index 12ed6e083eb..5f6a01a1de0 100644 --- a/sstable/colblk_writer.go +++ b/sstable/colblk_writer.go @@ -549,16 +549,17 @@ func (w *RawColumnWriter) addWithBlobHandleInternal( if secondaryHandle != nil { // Dual-tier case: track hot-and-cold blob reference bytes. w.tieringHistogramBlock.AddHotAndColdBlobRefBytes(uint64(hotHandle.ValueLen)) - } else if w.opts.BlobReferenceTierGetter != nil { + } else if w.opts.internal.BlobReferenceTiers != nil { // Single-tier case: track by tier. var kindAndTier tieredmeta.KindAndTier - switch w.opts.BlobReferenceTierGetter(hotHandle.ReferenceID) { + tier := w.opts.internal.BlobReferenceTiers[hotHandle.ReferenceID] + switch tier { case base.HotTier: kindAndTier = tieredmeta.SSTableBlobReferenceHotBytes case base.ColdTier: kindAndTier = tieredmeta.SSTableBlobReferenceColdBytes default: - panic(errors.AssertionFailedf("unexpected tier %s", w.opts.BlobReferenceTierGetter(hotHandle.ReferenceID))) + panic(errors.AssertionFailedf("unexpected tier %s", tier)) } w.tieringHistogramBlock.Add(kindAndTier, meta.TieringSpanID, meta.TieringAttribute, uint64(hotHandle.ValueLen)) } diff --git a/sstable/options.go b/sstable/options.go index e9e7366d1ca..5f4b8f1f476 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -245,18 +245,6 @@ type WriterOptions struct { // sstable. Requires TieringSpanIDGetter and TieringAttributeExtractor to be set. WriteTieringHistograms bool - // BlobReferenceTierGetter returns the storage tier (hot or cold) for a blob - // reference ID. Used when WriteTieringHistograms is true to categorize blob - // references by tier. When a value exists in both tiers (hot-and-cold), there - // are two separate blob references with different IDs, each returning its own - // tier. - // - // TODO(annie): This is temporary. Once each BlobReference in sstable metadata - // stores the tier (hot or cold) it was written to, we won't need this getter. - // The tier should be set based on which writer (hot or cold) created the blob - // file, and stored in both the BlobReference and PhysicalBlobFile metadata. - BlobReferenceTierGetter func(base.BlobReferenceID) base.StorageTier - // TieringThreshold is the tiering attribute threshold used to categorize keys // as below or above threshold in the histogram summary. TieringThreshold base.TieringAttribute diff --git a/sstable/reader_blob_test.go b/sstable/reader_blob_test.go new file mode 100644 index 00000000000..99dac9f174d --- /dev/null +++ b/sstable/reader_blob_test.go @@ -0,0 +1,103 @@ +// Copyright 2026 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "context" + "testing" + + "github.com/cockroachdb/crlib/testutils/leaktest" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/blobtest" + "github.com/cockroachdb/pebble/internal/sstableinternal" + "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/sstable/colblk" + "github.com/stretchr/testify/require" +) + +// testBlobReferences implements BlobReferences. +type testBlobReferences struct { + fileIDs []base.BlobFileID +} + +func (t *testBlobReferences) BlobFileIDByID(i base.BlobReferenceID) base.BlobFileID { + return t.fileIDs[i] +} +func (t *testBlobReferences) IDByBlobFileID(fileID base.BlobFileID) (base.BlobReferenceID, bool) { + for i, id := range t.fileIDs { + if id == fileID { + return base.BlobReferenceID(i), true + } + } + return 0, false +} + +// TestDualTierBlobHandlePrimaryAndSecondaryFetchMatch builds an sstable with +// dual-tier blob values, then retrieves each value using both the primary +// (hot) and secondary (cold) blob handles and validates that both fetches +// return the same value. +func TestDualTierBlobHandlePrimaryAndSecondaryFetchMatch(t *testing.T) { + defer leaktest.AfterTest(t)() + + input := `e@100#1,SET:hot-blob{fileNum=1 valueID=0 valueLen=3 value=foo}cold-blob{fileNum=2 valueID=0 valueLen=3 value=foo}attr=10` + + bv := &blobtest.Values{} + _, err := ParseTestKVsAndSpans(input, bv) + require.NoError(t, err) + + keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16) + tiers := []base.StorageTier{base.HotTier, base.ColdTier} + opts := WriterOptions{ + Comparer: testkeys.Comparer, + KeySchema: &keySchema, + TableFormat: TableFormatMax, + TieringSpanIDGetter: func(key []byte) base.TieringSpanID { return 0 }, + TieringAttributeExtractor: func([]byte, int, []byte) (base.TieringAttribute, error) { return 0, nil }, + WriteTieringHistograms: true, + DisableValueBlocks: true, + } + opts.SetInternal(sstableinternal.WriterOptions{BlobReferenceTiers: tiers}) + + obj := &objstorage.MemObj{} + w := NewRawWriter(obj, opts) + require.NoError(t, ParseTestSST(w, input, bv)) + require.NoError(t, w.Close()) + + blobRefs := &testBlobReferences{fileIDs: []base.BlobFileID{1, 2}} + + readerOpts := ReaderOptions{ + Comparer: opts.Comparer, + KeySchemas: KeySchemas{keySchema.Name: &keySchema}, + } + r, err := NewMemReader(obj.Data(), readerOpts) + require.NoError(t, err) + defer r.Close() + + blobCtx := TableBlobContext{ValueFetcher: bv, References: blobRefs} + iter, err := r.NewPointIter(context.Background(), IterOptions{ + Transforms: NoTransforms, + BlobContext: blobCtx, + }) + require.NoError(t, err) + defer iter.Close() + + metaIter, ok := iter.(base.InternalIteratorWithKVMeta) + require.True(t, ok, "sstable iterator must implement InternalIteratorWithKVMeta") + + ctx := context.Background() + for kv, meta := metaIter.FirstWithMeta(); kv != nil; kv, meta = metaIter.NextWithMeta() { + primaryVal, _, err := kv.Value(nil /* buf */) + require.NoError(t, err) + + if len(meta.SecondaryBlobHandle) == 0 { + continue + } + + secondaryVal, _, err := blobCtx.FetchValueFromSecondaryHandle(ctx, meta.SecondaryBlobHandle, nil) + require.NoError(t, err) + require.Equal(t, primaryVal, secondaryVal, "key %s: value from primary handle != value from secondary handle", kv.K.UserKey) + } +} diff --git a/sstable/test_utils.go b/sstable/test_utils.go index f7e9ad421e0..32cdd2cfef1 100644 --- a/sstable/test_utils.go +++ b/sstable/test_utils.go @@ -109,7 +109,7 @@ type ParsedKVOrSpan struct { // // For blob values: // - Single-tier: Only BlobHandle is set. The tier (hot/cold) is determined - // by BlobReferenceTierGetter based on the blob file's reference ID. + // by sstableinternal.WriterOptions.BlobReferenceTiers[BlobHandle.ReferenceID]. // - Dual-tier: Both BlobHandle and SecondaryBlobHandle are set, representing // a value that exists in both tiers simultaneously (typically hot + cold). Value []byte diff --git a/sstable/values.go b/sstable/values.go index fda0ef477c6..7cfc7615ef7 100644 --- a/sstable/values.go +++ b/sstable/values.go @@ -5,6 +5,8 @@ package sstable import ( + "context" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/sstable/blob" @@ -88,6 +90,25 @@ type TableBlobContext struct { BlobHandleFn func(preface blob.InlineHandlePreface, remainder []byte) base.InternalValue } +// FetchValueFromSecondaryHandle fetches the value for an encoded secondary +// (e.g. cold-tier) blob handle. secondaryBlobHandle is the raw bytes from +// KVMeta.SecondaryBlobHandle. It returns the value, whether the caller owns the +// returned slice, and any error. ValueFetcher and References must be set on the +// TableBlobContext. +func (c TableBlobContext) FetchValueFromSecondaryHandle( + ctx context.Context, secondaryBlobHandle []byte, buf []byte, +) (val []byte, callerOwned bool, err error) { + if c.ValueFetcher == nil || c.References == nil { + return nil, false, errors.New("ValueFetcher and References must be set to fetch secondary handle") + } + preface, remainder := blob.DecodeInlineHandlePreface(secondaryBlobHandle) + handleSuffix := blob.DecodeHandleSuffix(remainder) + blobFileID := c.References.BlobFileIDByID(preface.ReferenceID) + var suffixBuf [blob.MaxInlineHandleLength]byte + suffixLen := handleSuffix.Encode(suffixBuf[:]) + return c.ValueFetcher.FetchHandle(ctx, suffixBuf[:suffixLen], blobFileID, preface.ValueLen, buf) +} + // defaultInternalValueConstructor is the default implementation of the // block.GetInternalValueForPrefixAndValueHandler interface. type defaultInternalValueConstructor struct { diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 69478e00152..2179e05c9ad 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -636,7 +636,7 @@ func TestWriterWithTieringHistogram(t *testing.T) { tieringMetadata := make(map[string]ParsedKVOrSpan) hasBlobHandles := false for _, kv := range kvs { - if kv.Meta != (base.KVMeta{}) && !kv.IsKeySpan() { + if kv.Meta.IsSet() && !kv.IsKeySpan() { tieringMetadata[string(kv.Key.UserKey)] = kv } if kv.HasBlobValue() { @@ -657,15 +657,6 @@ func TestWriterWithTieringHistogram(t *testing.T) { return 0, nil } - // BlobReferenceTierGetter determines which tier a blob reference is in. - // For testing, odd reference IDs are hot, even are cold. - blobReferenceTierGetter := func(refID base.BlobReferenceID) base.StorageTier { - if refID%2 == 1 { - return base.HotTier - } - return base.ColdTier - } - opts := &WriterOptions{ BlockSize: blockSize, Comparer: testkeys.Comparer, @@ -676,7 +667,19 @@ func TestWriterWithTieringHistogram(t *testing.T) { DisableValueBlocks: true, } if hasBlobHandles { - opts.BlobReferenceTierGetter = blobReferenceTierGetter + // Build BlobReferenceTiers for testing. For test simplicity, we + // use the pattern: odd reference IDs are hot, even are cold. + tiers := make([]base.StorageTier, 10) + for i := range tiers { + if i%2 == 1 { + tiers[i] = base.HotTier + } else { + tiers[i] = base.ColdTier + } + } + opts.SetInternal(sstableinternal.WriterOptions{ + BlobReferenceTiers: tiers, + }) } meta, r, err = runBuildCmd(td, opts, nil /* cacheHandle */) @@ -703,10 +706,6 @@ func TestWriterWithTieringHistogram(t *testing.T) { }) } -// TODO(annie): Once the read path is ready, add a unit test that retrieves -// values using the secondary blob handle (for dual-tier blob values) and -// validates that we get the same result as using the primary handle. - func asciiOrHex(b []byte) string { if bytes.ContainsFunc(b, func(r rune) bool { return r < ' ' || r > '~' }) { return fmt.Sprintf("hex:%x", b) diff --git a/testdata/metrics b/testdata/metrics index d287959d679..a9a6b60ff81 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -2600,7 +2600,7 @@ Blob files: disk-usage ---- -7,605B +7,617B init reopen ---- @@ -2608,4 +2608,4 @@ init reopen # The disk usage is expected to go down a bit because we remove the WALs. disk-usage ---- -7,088B +7,100B diff --git a/valsep/value_separator.go b/valsep/value_separator.go index c614f7db4fd..8998f6676f3 100644 --- a/valsep/value_separator.go +++ b/valsep/value_separator.go @@ -84,6 +84,9 @@ type pendingReference struct { // from an existing blob reference, or is a new reference being // written to a new blob file. preserved bool + // tier indicates the storage tier (hot or cold) where the blob file is + // stored. + tier base.StorageTier } type blobWriterAndMeta struct { @@ -338,16 +341,25 @@ func (vs *ValueSeparator) preserveBlobReference( fileID := lv.Fetcher.BlobFileID var refID base.BlobReferenceID + var tier base.StorageTier if refIdx := slices.IndexFunc(vs.currPendingReferences, func(ref pendingReference) bool { return ref.blobFileID == fileID }); refIdx != -1 { refID = base.BlobReferenceID(refIdx) + tier = vs.currPendingReferences[refIdx].tier } else { refID = base.BlobReferenceID(len(vs.currPendingReferences)) + // Look up the tier from the input blob file metadata. + phys, ok := vs.inputBlobPhysicalFiles[fileID] + if !ok { + panic(errors.AssertionFailedf("pebble: blob file %s not found in input blob files", fileID)) + } + tier = phys.Tier vs.currPendingReferences = append(vs.currPendingReferences, pendingReference{ blobFileID: fileID, valueSize: 0, preserved: true, + tier: tier, }) } if invariants.Enabled && vs.currPendingReferences[refID].blobFileID != fileID { @@ -368,7 +380,7 @@ func (vs *ValueSeparator) preserveBlobReference( return err } vs.currPendingReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen) - vs.blobTiers[base.HotTier].totalPreservedValueSize += uint64(lv.Fetcher.Attribute.ValueLen) + vs.blobTiers[tier].totalPreservedValueSize += uint64(lv.Fetcher.Attribute.ValueLen) return nil } @@ -425,6 +437,7 @@ func (vs *ValueSeparator) getWriter(blobTier base.StorageTier) (*blobWriterAndMe wnm.refID = base.BlobReferenceID(len(vs.currPendingReferences)) vs.currPendingReferences = append(vs.currPendingReferences, pendingReference{ blobFileID: base.BlobFileID(objMeta.DiskFileNum), + tier: blobTier, }) return wnm, nil } @@ -450,6 +463,18 @@ func (vs *ValueSeparator) maybeCheckInvariants() { } } +// BlobReferenceTiers returns a slice of storage tiers for the current pending +// blob references. The tier for reference ID i is BlobReferenceTiers()[i]. +// This should be called to populate sstableinternal.WriterOptions.BlobReferenceTiers +// for the sstable writer. +func (vs *ValueSeparator) BlobReferenceTiers() []base.StorageTier { + tiers := make([]base.StorageTier, len(vs.currPendingReferences)) + for i := range vs.currPendingReferences { + tiers[i] = vs.currPendingReferences[i].tier + } + return tiers +} + // FinishOutput closes the current blob file (if any). It returns the stats // and metadata of the now completed blob file. The state is reset to be // ready for the next output sstable.