diff --git a/internal/base/internal.go b/internal/base/internal.go index 5ecabbfb4cc..4b602f51f85 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -678,6 +678,11 @@ type InternalKV struct { type KVMeta struct { TieringSpanID TieringSpanID TieringAttribute TieringAttribute + // SecondaryBlobHandle holds the raw encoded secondary (cold tier) blob + // handle for dual-tier blob values. It is nil when the value does not have + // a secondary blob handle. The raw bytes can be decoded using + // blob.DecodeInlineHandlePreface and blob.DecodeHandleSuffix. + SecondaryBlobHandle []byte } func (m KVMeta) String() string { diff --git a/internal/manifest/blob_metadata.go b/internal/manifest/blob_metadata.go index fd0792e8bc4..dd331cfd6d1 100644 --- a/internal/manifest/blob_metadata.go +++ b/internal/manifest/blob_metadata.go @@ -46,6 +46,9 @@ type BlobReference struct { // size according to the ValueSize of the blob reference relative to the // total ValueSize of the blob file. EstimatedPhysicalSize uint64 + // Tier indicates the storage tier (hot or cold) where the referenced blob + // file was written. + Tier base.StorageTier } // MakeBlobReference creates a BlobReference from the given file ID, value size, @@ -74,6 +77,7 @@ func MakeBlobReference( // // We perform the multiplication first to avoid floating point arithmetic. EstimatedPhysicalSize: (valueSize * phys.Size) / phys.ValueSize, + Tier: phys.Tier, } } @@ -130,6 +134,9 @@ type PhysicalBlobFile struct { // File creation time in seconds since the epoch (1970-01-01 00:00:00 // UTC). CreationTime uint64 + // Tier indicates the storage tier (hot or cold) where this blob file was + // written. 
+ Tier base.StorageTier // Mutable state @@ -151,6 +158,10 @@ func (m *PhysicalBlobFile) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("%s size:[%d (%s)] vals:[%d (%s)]", m.FileNum, redact.Safe(m.Size), humanize.Bytes.Uint64(m.Size), redact.Safe(m.ValueSize), humanize.Bytes.Uint64(m.ValueSize)) + // Only show tier if it's not the default (HotTier) for backward compatibility. + if m.Tier != base.HotTier { + w.Printf(" tier:cold") + } } // String implements fmt.Stringer. @@ -294,6 +305,16 @@ func parsePhysicalBlobFileDebug(p *strparse.Parser) (*PhysicalBlobFile, error) { p.Expect("]") case "creationTime": m.CreationTime = p.Uint64() + case "tier": + tierStr := p.Next() + switch tierStr { + case "hot": + m.Tier = base.HotTier + case "cold": + m.Tier = base.ColdTier + default: + p.Errf("unknown tier %q", tierStr) + } default: p.Errf("unknown field %q", field) } diff --git a/internal/manifest/testdata/version_edit_decode b/internal/manifest/testdata/version_edit_decode index 50d74928034..efcf7ea3186 100644 --- a/internal/manifest/testdata/version_edit_decode +++ b/internal/manifest/testdata/version_edit_decode @@ -83,8 +83,8 @@ encode excise-op: [b, c] #3 ---- 67061d000b626172000e0000000000000b666f6f010d0000000000000000 -45010129d8a301016baf0729d8a301f9a006006c21210a0162016301030b -0537 +47010129d8a3010000016daf0729d8a301f9a00600006c21210a01620163 +01030b0537 add-table: L6 000029:[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952); depth:1] add-blob-file: B000943 physical:{000041 size:[20952 (20KB)] vals:[102521 (100KB)]} del-blob-file: B000033 000033 @@ -110,7 +110,7 @@ encode add-backing: 000009 ---- 69096467031d000b626172000e0000000000000b666f6f010d0000000000 -000000420946010129d8a301e8f10101 +000000420947010129d8a301e8f1010001 add-table: L3 000029(000009):[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952/30952); depth:1] add-backing: 000009 @@ -128,7 +128,7 @@ encode 
add-table: L3 000029:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952 / 30952); depth:1] ---- 67031d000b626172000e0000000000000b666f6f010d0000000000000000 -45010129d8a30101 +47010129d8a301e8f1010001 add-table: L3 000029:[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952/30952); depth:1] diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index 49b20d4ea29..6d3491f1192 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -62,6 +62,7 @@ const ( tagRemovedBackingTable = 106 tagNewBlobFile = 107 tagDeletedBlobFile = 108 + tagNewBlobFile2 = 109 // The custom tags sub-format used by tagNewFile4 and above. All tags less // than customTagNonSafeIgnoreMask are safe to ignore and their format must be @@ -77,6 +78,8 @@ const ( customTagBlobReferences = 69 // customTagBlobReferences2 contains BackingValueSize for each BlobReference. customTagBlobReferences2 = 70 + // customTagBlobReferences3 contains BackingValueSize and Tier for each BlobReference. + customTagBlobReferences3 = 71 ) // DeletedTableEntry holds the state for a sstable deletion from a level. The @@ -467,7 +470,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { return err } - case customTagBlobReferences, customTagBlobReferences2: + case customTagBlobReferences, customTagBlobReferences2, customTagBlobReferences3: // The first varint encodes the 'blob reference depth' // of the table. 
v, err := d.readUvarint()
@@ -490,16 +493,31 @@ func (v *VersionEdit) Decode(r io.Reader) error {
 								return err
 							}
 							var backingValueSize uint64
-							if customTag == customTagBlobReferences2 {
+							if customTag == customTagBlobReferences2 || customTag == customTagBlobReferences3 {
 								backingValueSize, err = d.readUvarint()
 								if err != nil {
 									return err
 								}
 							}
+							var tier base.StorageTier
+							if customTag == customTagBlobReferences3 {
+								tierValue, err := d.readUvarint()
+								if err != nil {
+									return err
+								}
+								if tierValue >= uint64(base.NumStorageTiers) {
+									return base.CorruptionErrorf("new-file4: invalid tier value: %d", tierValue)
+								}
+								tier = base.StorageTier(tierValue)
+							} else {
+								// For backward compatibility, default to HotTier.
+								tier = base.HotTier
+							}
 							blobReferences[i] = BlobReference{
 								FileID:           base.BlobFileID(fileID),
 								ValueSize:        valueSize,
 								BackingValueSize: backingValueSize,
+								Tier:             tier,
 							}
 						}
 						continue
@@ -566,7 +584,7 @@ func (v *VersionEdit) Decode(r io.Reader) error {
 			}
 			v.NewTables = append(v.NewTables, nfe)
 
-		case tagNewBlobFile:
+		case tagNewBlobFile, tagNewBlobFile2:
 			fileID, err := d.readUvarint()
 			if err != nil {
 				return err
@@ -587,6 +605,22 @@ func (v *VersionEdit) Decode(r io.Reader) error {
 			if err != nil {
 				return err
 			}
+			var tier base.StorageTier
+			if tag == tagNewBlobFile2 {
+				tierValue, err := d.readUvarint()
+				if err != nil {
+					return err
+				}
+				// Reject out-of-range tiers, mirroring the check applied to
+				// customTagBlobReferences3 above.
+				if tierValue >= uint64(base.NumStorageTiers) {
+					return base.CorruptionErrorf("new-blob-file: invalid tier value: %d", tierValue)
+				}
+				tier = base.StorageTier(tierValue)
+			} else {
+				// For backward compatibility with tagNewBlobFile, default to HotTier.
+				tier = base.HotTier
+			}
 			v.NewBlobFiles = append(v.NewBlobFiles, BlobFileMetadata{
 				FileID: base.BlobFileID(fileID),
 				Physical: &PhysicalBlobFile{
@@ -594,6 +628,7 @@ func (v *VersionEdit) Decode(r io.Reader) error {
 					Size:         size,
 					ValueSize:    valueSize,
 					CreationTime: creationTime,
+					Tier:         tier,
 				},
 			})
 
@@ -948,40 +983,30 @@ func (v *VersionEdit) Encode(w io.Writer) error {
 					e.writeBytes(x.Meta.SyntheticPrefixAndSuffix.Suffix())
 				}
 				if len(x.Meta.BlobReferences) > 0 {
-					writeBackingValueSize := false
-					if x.Meta.Virtual {
-						for _, ref := range x.Meta.BlobReferences {
-							if ref.BackingValueSize > 0 && ref.BackingValueSize != ref.ValueSize {
-								writeBackingValueSize = true
-								break
-							}
-						}
-					}
-					if writeBackingValueSize {
-						e.writeUvarint(customTagBlobReferences2)
-					} else {
-						e.writeUvarint(customTagBlobReferences)
-					}
+					// NOTE(review): the v3 tag is now written unconditionally, so
+					// binaries that predate customTagBlobReferences3 presumably cannot
+					// decode this manifest; confirm a format-version gate is not needed.
+					e.writeUvarint(customTagBlobReferences3)
 					e.writeUvarint(uint64(x.Meta.BlobReferenceDepth))
 					e.writeUvarint(uint64(len(x.Meta.BlobReferences)))
 					for _, ref := range x.Meta.BlobReferences {
 						e.writeUvarint(uint64(ref.FileID))
 						e.writeUvarint(ref.ValueSize)
-						if writeBackingValueSize {
-							e.writeUvarint(ref.BackingValueSize)
-						}
+						e.writeUvarint(ref.BackingValueSize)
+						e.writeUvarint(uint64(ref.Tier))
 					}
 				}
 				e.writeUvarint(customTagTerminate)
 			}
 		}
 	for _, x := range v.NewBlobFiles {
-		e.writeUvarint(tagNewBlobFile)
+		e.writeUvarint(tagNewBlobFile2)
 		e.writeUvarint(uint64(x.FileID))
 		e.writeUvarint(uint64(x.Physical.FileNum))
 		e.writeUvarint(x.Physical.Size)
 		e.writeUvarint(x.Physical.ValueSize)
 		e.writeUvarint(x.Physical.CreationTime)
+		e.writeUvarint(uint64(x.Physical.Tier))
 	}
 	for x := range v.DeletedBlobFiles {
 		e.writeUvarint(tagDeletedBlobFile)
diff --git a/internal/sstableinternal/options.go b/internal/sstableinternal/options.go
index 475b72fc334..d593a6e8616 100644
--- a/internal/sstableinternal/options.go
+++ b/internal/sstableinternal/options.go
@@ -33,4 +33,11 @@ type WriterOptions struct {
 	// in order.
It is intended for use only in the construction of invalid // sstables for testing. See tool/make_test_sstables.go. DisableKeyOrderChecks bool + + // BlobReferenceTiers provides the storage tier (hot or cold) for each blob + // reference ID. Used when WriteTieringHistograms is true to categorize blob + // references by tier. The tier for reference ID i is BlobReferenceTiers[i]. + // When a value exists in both tiers (hot-and-cold), there are two separate + // blob references with different IDs and tiers. + BlobReferenceTiers []base.StorageTier } diff --git a/replay/testdata/corpus/simple_val_sep b/replay/testdata/corpus/simple_val_sep index 8262afbbb21..e046328862c 100644 --- a/replay/testdata/corpus/simple_val_sep +++ b/replay/testdata/corpus/simple_val_sep @@ -101,7 +101,7 @@ simple_val_sep: stat simple_val_sep/MANIFEST-000013 simple_val_sep/000015.sst simple_val_sep/000016.blob ---- simple_val_sep/MANIFEST-000013: - size: 250 + size: 259 simple_val_sep/000015.sst: size: 792 simple_val_sep/000016.blob: diff --git a/replay/testdata/replay_val_sep b/replay/testdata/replay_val_sep index e71cbd70051..f95843132ca 100644 --- a/replay/testdata/replay_val_sep +++ b/replay/testdata/replay_val_sep @@ -15,15 +15,15 @@ tree 792 000015.sst 97 000016.blob 0 LOCK - 152 MANIFEST-000010 - 250 MANIFEST-000013 + 158 MANIFEST-000010 + 259 MANIFEST-000013 2942 OPTIONS-000002 0 marker.format-version.000011.024 0 marker.manifest.000003.MANIFEST-000013 simple_val_sep/ 792 000015.sst 97 000016.blob - 250 MANIFEST-000013 + 259 MANIFEST-000013 checkpoint/ 819 000005.sst 101 000006.blob @@ -31,7 +31,7 @@ tree 97 000009.blob 11 000011.log 687 000012.sst - 187 MANIFEST-000013 + 193 MANIFEST-000013 2942 OPTIONS-000002 0 marker.format-version.000001.024 0 marker.manifest.000001.MANIFEST-000013 diff --git a/sstable/colblk/data_block.go b/sstable/colblk/data_block.go index 8375ab7d268..820d2dc1688 100644 --- a/sstable/colblk/data_block.go +++ b/sstable/colblk/data_block.go @@ -1228,6 +1228,9 @@ 
type DataBlockIter struct { // for the current block. tieringSpanIDs UnsafeUints tieringAttributes UnsafeUints + // secondaryBlobHandles stores the decoded secondary blob handle column for + // dual-tier blob values. Only populated when the block has tiering columns. + secondaryBlobHandles RawBytes // blockData stores a reference to the block data for lazy tiering column // decoding. This is only set when Init() is called (not InitHandle()). blockData []byte @@ -1300,6 +1303,7 @@ func (i *DataBlockIter) Init( i.tieringConfig = tieringConfig i.tieringSpanIDs = UnsafeUints{} i.tieringAttributes = UnsafeUints{} + i.secondaryBlobHandles = RawBytes{} // Store block data for lazy tiering column decoding. i.blockData = bd.Data() return nil @@ -1340,6 +1344,7 @@ func (i *DataBlockIter) InitHandle( // from h.BlockData() when needed. i.tieringSpanIDs = UnsafeUints{} i.tieringAttributes = UnsafeUints{} + i.secondaryBlobHandles = RawBytes{} i.blockData = nil return nil } @@ -1553,8 +1558,12 @@ func (i *DataBlockIter) initTieringMetadata() bool { // Decode the tiering columns. 
bd := DecodeBlock(data, DataBlockCustomHeaderSize+i.keySchema.HeaderSize) - i.tieringSpanIDs = bd.Uints(len(i.keySchema.ColumnTypes) + dataBlockColumnTieringSpanID) - i.tieringAttributes = bd.Uints(len(i.keySchema.ColumnTypes) + dataBlockColumnTieringAttribute) + baseCol := len(i.keySchema.ColumnTypes) + i.tieringSpanIDs = bd.Uints(baseCol + dataBlockColumnTieringSpanID) + i.tieringAttributes = bd.Uints(baseCol + dataBlockColumnTieringAttribute) + if uint16(baseCol+dataBlockColumnMaxV2) <= bd.header.Columns { + i.secondaryBlobHandles = bd.RawBytes(baseCol + dataBlockColumnSecondaryBlobHandle) + } return true } @@ -1563,10 +1572,16 @@ func (i *DataBlockIter) decodeMeta() base.KVMeta { if !i.initTieringMetadata() { return base.KVMeta{} } - return base.KVMeta{ + m := base.KVMeta{ TieringSpanID: base.TieringSpanID(i.tieringSpanIDs.At(i.row)), TieringAttribute: base.TieringAttribute(i.tieringAttributes.At(i.row)), } + if i.secondaryBlobHandles.Slices() > 0 { + if b := i.secondaryBlobHandles.At(i.row); len(b) > 0 { + m.SecondaryBlobHandle = b + } + } + return m } // Last implements the base.InternalIterator interface. 
diff --git a/sstable/colblk/data_block_meta_test.go b/sstable/colblk/data_block_meta_test.go
index 116b0b8c7b3..6d87c7c214f 100644
--- a/sstable/colblk/data_block_meta_test.go
+++ b/sstable/colblk/data_block_meta_test.go
@@ -28,13 +28,18 @@ func TestDataBlockIterWithMeta(t *testing.T) {
 		meta base.KVMeta
 	}{
 		{"a#1,SET", base.KVMeta{TieringSpanID: 42, TieringAttribute: 100}},
-		{"b#2,SET", base.KVMeta{TieringSpanID: 43, TieringAttribute: 200}},
+		{"b#2,SET", base.KVMeta{TieringSpanID: 43, TieringAttribute: 200, SecondaryBlobHandle: []byte("cold-handle-b")}},
 		{"c#3,SET", base.KVMeta{TieringSpanID: 0, TieringAttribute: 0}},
 	}
 	for _, kv := range testKVs {
 		ikey := base.ParseInternalKey(kv.key)
 		kcmp := w.KeyWriter.ComparePrev(ikey.UserKey)
-		w.Add(ikey, []byte("value"), block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false, kv.meta)
+		vp := block.InPlaceValuePrefix(kcmp.PrefixEqual())
+		if kv.meta.SecondaryBlobHandle != nil {
+			w.AddWithSecondaryBlobHandle(ikey, []byte("value"), vp, kcmp, false, kv.meta, kv.meta.SecondaryBlobHandle)
+		} else {
+			w.Add(ikey, []byte("value"), vp, kcmp, false, kv.meta)
+		}
 	}
 
 	block, _ := w.Finish(w.Rows(), w.Size())
diff --git a/sstable/colblk_writer.go b/sstable/colblk_writer.go
index 12ed6e083eb..5f6a01a1de0 100644
--- a/sstable/colblk_writer.go
+++ b/sstable/colblk_writer.go
@@ -549,16 +549,23 @@ func (w *RawColumnWriter) addWithBlobHandleInternal(
 		if secondaryHandle != nil {
 			// Dual-tier case: track hot-and-cold blob reference bytes.
 			w.tieringHistogramBlock.AddHotAndColdBlobRefBytes(uint64(hotHandle.ValueLen))
-		} else if w.opts.BlobReferenceTierGetter != nil {
+		} else if w.opts.internal.BlobReferenceTiers != nil {
 			// Single-tier case: track by tier.
 			var kindAndTier tieredmeta.KindAndTier
-			switch w.opts.BlobReferenceTierGetter(hotHandle.ReferenceID) {
+			// The tiers slice is indexed by reference ID; fail with a descriptive
+			// assertion instead of a bare index-out-of-range if it is too short.
+			tiers := w.opts.internal.BlobReferenceTiers
+			if int(hotHandle.ReferenceID) >= len(tiers) {
+				panic(errors.AssertionFailedf("blob reference ID %d out of range; only %d blob reference tiers provided", hotHandle.ReferenceID, len(tiers)))
+			}
+			tier := tiers[hotHandle.ReferenceID]
+			switch tier {
 			case base.HotTier:
 				kindAndTier = tieredmeta.SSTableBlobReferenceHotBytes
 			case base.ColdTier:
 				kindAndTier = tieredmeta.SSTableBlobReferenceColdBytes
 			default:
-				panic(errors.AssertionFailedf("unexpected tier %s", w.opts.BlobReferenceTierGetter(hotHandle.ReferenceID)))
+				panic(errors.AssertionFailedf("unexpected tier %s", tier))
 			}
 			w.tieringHistogramBlock.Add(kindAndTier, meta.TieringSpanID, meta.TieringAttribute, uint64(hotHandle.ValueLen))
 		}
diff --git a/sstable/options.go b/sstable/options.go
index e9e7366d1ca..5f4b8f1f476 100644
--- a/sstable/options.go
+++ b/sstable/options.go
@@ -245,18 +245,6 @@ type WriterOptions struct {
 	// sstable. Requires TieringSpanIDGetter and TieringAttributeExtractor to be set.
 	WriteTieringHistograms bool
 
-	// BlobReferenceTierGetter returns the storage tier (hot or cold) for a blob
-	// reference ID. Used when WriteTieringHistograms is true to categorize blob
-	// references by tier. When a value exists in both tiers (hot-and-cold), there
-	// are two separate blob references with different IDs, each returning its own
-	// tier.
-	//
-	// TODO(annie): This is temporary. Once each BlobReference in sstable metadata
-	// stores the tier (hot or cold) it was written to, we won't need this getter.
-	// The tier should be set based on which writer (hot or cold) created the blob
-	// file, and stored in both the BlobReference and PhysicalBlobFile metadata.
-	BlobReferenceTierGetter func(base.BlobReferenceID) base.StorageTier
-
 	// TieringThreshold is the tiering attribute threshold used to categorize keys
 	// as below or above threshold in the histogram summary.
TieringThreshold base.TieringAttribute diff --git a/sstable/reader_blob_test.go b/sstable/reader_blob_test.go new file mode 100644 index 00000000000..99dac9f174d --- /dev/null +++ b/sstable/reader_blob_test.go @@ -0,0 +1,103 @@ +// Copyright 2026 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "context" + "testing" + + "github.com/cockroachdb/crlib/testutils/leaktest" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/blobtest" + "github.com/cockroachdb/pebble/internal/sstableinternal" + "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/sstable/colblk" + "github.com/stretchr/testify/require" +) + +// testBlobReferences implements BlobReferences. +type testBlobReferences struct { + fileIDs []base.BlobFileID +} + +func (t *testBlobReferences) BlobFileIDByID(i base.BlobReferenceID) base.BlobFileID { + return t.fileIDs[i] +} +func (t *testBlobReferences) IDByBlobFileID(fileID base.BlobFileID) (base.BlobReferenceID, bool) { + for i, id := range t.fileIDs { + if id == fileID { + return base.BlobReferenceID(i), true + } + } + return 0, false +} + +// TestDualTierBlobHandlePrimaryAndSecondaryFetchMatch builds an sstable with +// dual-tier blob values, then retrieves each value using both the primary +// (hot) and secondary (cold) blob handles and validates that both fetches +// return the same value. 
+func TestDualTierBlobHandlePrimaryAndSecondaryFetchMatch(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	input := `e@100#1,SET:hot-blob{fileNum=1 valueID=0 valueLen=3 value=foo}cold-blob{fileNum=2 valueID=0 valueLen=3 value=foo}attr=10`
+
+	bv := &blobtest.Values{}
+	_, err := ParseTestKVsAndSpans(input, bv)
+	require.NoError(t, err)
+
+	keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
+	tiers := []base.StorageTier{base.HotTier, base.ColdTier}
+	opts := WriterOptions{
+		Comparer:                  testkeys.Comparer,
+		KeySchema:                 &keySchema,
+		TableFormat:               TableFormatMax,
+		TieringSpanIDGetter:       func(key []byte) base.TieringSpanID { return 0 },
+		TieringAttributeExtractor: func([]byte, int, []byte) (base.TieringAttribute, error) { return 0, nil },
+		WriteTieringHistograms:    true,
+		DisableValueBlocks:        true,
+	}
+	opts.SetInternal(sstableinternal.WriterOptions{BlobReferenceTiers: tiers})
+
+	obj := &objstorage.MemObj{}
+	w := NewRawWriter(obj, opts)
+	require.NoError(t, ParseTestSST(w, input, bv))
+	require.NoError(t, w.Close())
+
+	blobRefs := &testBlobReferences{fileIDs: []base.BlobFileID{1, 2}}
+
+	readerOpts := ReaderOptions{
+		Comparer:   opts.Comparer,
+		KeySchemas: KeySchemas{keySchema.Name: &keySchema},
+	}
+	r, err := NewMemReader(obj.Data(), readerOpts)
+	require.NoError(t, err)
+	defer r.Close()
+
+	blobCtx := TableBlobContext{ValueFetcher: bv, References: blobRefs}
+	iter, err := r.NewPointIter(context.Background(), IterOptions{
+		Transforms:  NoTransforms,
+		BlobContext: blobCtx,
+	})
+	require.NoError(t, err)
+	defer iter.Close()
+
+	metaIter, ok := iter.(base.InternalIteratorWithKVMeta)
+	require.True(t, ok, "sstable iterator must implement InternalIteratorWithKVMeta")
+
+	ctx := context.Background()
+	for kv, meta := metaIter.FirstWithMeta(); kv != nil; kv, meta = metaIter.NextWithMeta() {
+		primaryVal, _, err := kv.Value(nil /* buf */)
+		require.NoError(t, err)
+
+		if len(meta.SecondaryBlobHandle) == 0 {
+			continue
+		}
+
+		secondaryVal, _, err := blobCtx.FetchValueFromSecondaryHandle(ctx, meta.SecondaryBlobHandle, nil)
+		require.NoError(t, err)
+		require.Equal(t, primaryVal, secondaryVal, "key %s: value from primary handle != value from secondary handle", kv.K.UserKey)
+	}
+}
diff --git a/sstable/test_utils.go b/sstable/test_utils.go
index f7e9ad421e0..32cdd2cfef1 100644
--- a/sstable/test_utils.go
+++ b/sstable/test_utils.go
@@ -109,7 +109,7 @@ type ParsedKVOrSpan struct {
 	//
 	// For blob values:
 	//   - Single-tier: Only BlobHandle is set. The tier (hot/cold) is determined
-	//     by BlobReferenceTierGetter based on the blob file's reference ID.
+	//     by sstableinternal.WriterOptions.BlobReferenceTiers[BlobHandle.ReferenceID].
 	//   - Dual-tier: Both BlobHandle and SecondaryBlobHandle are set, representing
 	//     a value that exists in both tiers simultaneously (typically hot + cold).
 	Value []byte
diff --git a/sstable/values.go b/sstable/values.go
index fda0ef477c6..7cfc7615ef7 100644
--- a/sstable/values.go
+++ b/sstable/values.go
@@ -5,6 +5,8 @@
 package sstable
 
 import (
+	"context"
+
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/sstable/blob"
@@ -88,6 +90,33 @@ type TableBlobContext struct {
 	BlobHandleFn func(preface blob.InlineHandlePreface, remainder []byte) base.InternalValue
 }
 
+// FetchValueFromSecondaryHandle fetches the value for an encoded secondary
+// (e.g. cold-tier) blob handle. secondaryBlobHandle is the raw bytes from
+// KVMeta.SecondaryBlobHandle. It returns the value, whether the caller owns the
+// returned slice, and any error. ValueFetcher and References must be set on the
+// TableBlobContext, and secondaryBlobHandle must be non-empty (KVMeta leaves it
+// nil for values without a secondary handle); otherwise an error is returned.
+func (c TableBlobContext) FetchValueFromSecondaryHandle(
+	ctx context.Context, secondaryBlobHandle []byte, buf []byte,
+) (val []byte, callerOwned bool, err error) {
+	if c.ValueFetcher == nil || c.References == nil {
+		return nil, false, errors.New("ValueFetcher and References must be set to fetch secondary handle")
+	}
+	// Reject a missing handle up front rather than handing empty bytes to the
+	// handle decoders.
+	if len(secondaryBlobHandle) == 0 {
+		return nil, false, errors.New("secondary blob handle is empty")
+	}
+	preface, remainder := blob.DecodeInlineHandlePreface(secondaryBlobHandle)
+	handleSuffix := blob.DecodeHandleSuffix(remainder)
+	blobFileID := c.References.BlobFileIDByID(preface.ReferenceID)
+	// Re-encode the decoded suffix into a scratch buffer, which is the form
+	// passed to FetchHandle here.
+	var suffixBuf [blob.MaxInlineHandleLength]byte
+	suffixLen := handleSuffix.Encode(suffixBuf[:])
+	return c.ValueFetcher.FetchHandle(ctx, suffixBuf[:suffixLen], blobFileID, preface.ValueLen, buf)
+}
+
 // defaultInternalValueConstructor is the default implementation of the
 // block.GetInternalValueForPrefixAndValueHandler interface.
 type defaultInternalValueConstructor struct {
diff --git a/sstable/writer_test.go b/sstable/writer_test.go
index 69478e00152..2179e05c9ad 100644
--- a/sstable/writer_test.go
+++ b/sstable/writer_test.go
@@ -636,7 +636,7 @@ func TestWriterWithTieringHistogram(t *testing.T) {
 			tieringMetadata := make(map[string]ParsedKVOrSpan)
 			hasBlobHandles := false
 			for _, kv := range kvs {
-				if kv.Meta != (base.KVMeta{}) && !kv.IsKeySpan() {
+				if kv.Meta.IsSet() && !kv.IsKeySpan() {
 					tieringMetadata[string(kv.Key.UserKey)] = kv
 				}
 				if kv.HasBlobValue() {
@@ -657,15 +657,6 @@ func TestWriterWithTieringHistogram(t *testing.T) {
 				return 0, nil
 			}
 
-			// BlobReferenceTierGetter determines which tier a blob reference is in.
-			// For testing, odd reference IDs are hot, even are cold.
- blobReferenceTierGetter := func(refID base.BlobReferenceID) base.StorageTier { - if refID%2 == 1 { - return base.HotTier - } - return base.ColdTier - } - opts := &WriterOptions{ BlockSize: blockSize, Comparer: testkeys.Comparer, @@ -676,7 +667,19 @@ func TestWriterWithTieringHistogram(t *testing.T) { DisableValueBlocks: true, } if hasBlobHandles { - opts.BlobReferenceTierGetter = blobReferenceTierGetter + // Build BlobReferenceTiers for testing. For test simplicity, we + // use the pattern: odd reference IDs are hot, even are cold. + tiers := make([]base.StorageTier, 10) + for i := range tiers { + if i%2 == 1 { + tiers[i] = base.HotTier + } else { + tiers[i] = base.ColdTier + } + } + opts.SetInternal(sstableinternal.WriterOptions{ + BlobReferenceTiers: tiers, + }) } meta, r, err = runBuildCmd(td, opts, nil /* cacheHandle */) @@ -703,10 +706,6 @@ func TestWriterWithTieringHistogram(t *testing.T) { }) } -// TODO(annie): Once the read path is ready, add a unit test that retrieves -// values using the secondary blob handle (for dual-tier blob values) and -// validates that we get the same result as using the primary handle. - func asciiOrHex(b []byte) string { if bytes.ContainsFunc(b, func(r rune) bool { return r < ' ' || r > '~' }) { return fmt.Sprintf("hex:%x", b) diff --git a/testdata/metrics b/testdata/metrics index d287959d679..a9a6b60ff81 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -2600,7 +2600,7 @@ Blob files: disk-usage ---- -7,605B +7,617B init reopen ---- @@ -2608,4 +2608,4 @@ init reopen # The disk usage is expected to go down a bit because we remove the WALs. disk-usage ---- -7,088B +7,100B diff --git a/valsep/value_separator.go b/valsep/value_separator.go index c614f7db4fd..8998f6676f3 100644 --- a/valsep/value_separator.go +++ b/valsep/value_separator.go @@ -84,6 +84,9 @@ type pendingReference struct { // from an existing blob reference, or is a new reference being // written to a new blob file. 
preserved bool + // tier indicates the storage tier (hot or cold) where the blob file is + // stored. + tier base.StorageTier } type blobWriterAndMeta struct { @@ -338,16 +341,25 @@ func (vs *ValueSeparator) preserveBlobReference( fileID := lv.Fetcher.BlobFileID var refID base.BlobReferenceID + var tier base.StorageTier if refIdx := slices.IndexFunc(vs.currPendingReferences, func(ref pendingReference) bool { return ref.blobFileID == fileID }); refIdx != -1 { refID = base.BlobReferenceID(refIdx) + tier = vs.currPendingReferences[refIdx].tier } else { refID = base.BlobReferenceID(len(vs.currPendingReferences)) + // Look up the tier from the input blob file metadata. + phys, ok := vs.inputBlobPhysicalFiles[fileID] + if !ok { + panic(errors.AssertionFailedf("pebble: blob file %s not found in input blob files", fileID)) + } + tier = phys.Tier vs.currPendingReferences = append(vs.currPendingReferences, pendingReference{ blobFileID: fileID, valueSize: 0, preserved: true, + tier: tier, }) } if invariants.Enabled && vs.currPendingReferences[refID].blobFileID != fileID { @@ -368,7 +380,7 @@ func (vs *ValueSeparator) preserveBlobReference( return err } vs.currPendingReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen) - vs.blobTiers[base.HotTier].totalPreservedValueSize += uint64(lv.Fetcher.Attribute.ValueLen) + vs.blobTiers[tier].totalPreservedValueSize += uint64(lv.Fetcher.Attribute.ValueLen) return nil } @@ -425,6 +437,7 @@ func (vs *ValueSeparator) getWriter(blobTier base.StorageTier) (*blobWriterAndMe wnm.refID = base.BlobReferenceID(len(vs.currPendingReferences)) vs.currPendingReferences = append(vs.currPendingReferences, pendingReference{ blobFileID: base.BlobFileID(objMeta.DiskFileNum), + tier: blobTier, }) return wnm, nil } @@ -450,6 +463,18 @@ func (vs *ValueSeparator) maybeCheckInvariants() { } } +// BlobReferenceTiers returns a slice of storage tiers for the current pending +// blob references. 
The tier for reference ID i is BlobReferenceTiers()[i]. +// This should be called to populate sstableinternal.WriterOptions.BlobReferenceTiers +// for the sstable writer. +func (vs *ValueSeparator) BlobReferenceTiers() []base.StorageTier { + tiers := make([]base.StorageTier, len(vs.currPendingReferences)) + for i := range vs.currPendingReferences { + tiers[i] = vs.currPendingReferences[i].tier + } + return tiers +} + // FinishOutput closes the current blob file (if any). It returns the stats // and metadata of the now completed blob file. The state is reset to be // ready for the next output sstable.