Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,11 @@ type InternalKV struct {
type KVMeta struct {
// TieringSpanID is the tiering span identifier recorded for this KV.
// NOTE(review): the zero value appears to be used for KVs without tiering
// information — confirm against the TieringSpanID definition.
TieringSpanID TieringSpanID
// TieringAttribute is the tiering attribute value recorded for this KV.
TieringAttribute TieringAttribute
// SecondaryBlobHandle holds the raw encoded secondary (cold tier) blob
// handle for dual-tier blob values. It is nil when the value does not have
// a secondary blob handle. The raw bytes can be decoded using
// blob.DecodeInlineHandlePreface and blob.DecodeHandleSuffix.
//
// NOTE(review): when populated by a block iterator the slice is assigned
// directly from the block's column data without copying, so it presumably
// aliases block memory; callers that retain it beyond the iterator's
// current position should copy it — TODO confirm.
SecondaryBlobHandle []byte
}

func (m KVMeta) String() string {
Expand Down
21 changes: 21 additions & 0 deletions internal/manifest/blob_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type BlobReference struct {
// size according to the ValueSize of the blob reference relative to the
// total ValueSize of the blob file.
EstimatedPhysicalSize uint64
// Tier indicates the storage tier (hot or cold) where the referenced blob
// file was written.
Tier base.StorageTier
}

// MakeBlobReference creates a BlobReference from the given file ID, value size,
Expand Down Expand Up @@ -74,6 +77,7 @@ func MakeBlobReference(
//
// We perform the multiplication first to avoid floating point arithmetic.
EstimatedPhysicalSize: (valueSize * phys.Size) / phys.ValueSize,
Tier: phys.Tier,
}
}

Expand Down Expand Up @@ -130,6 +134,9 @@ type PhysicalBlobFile struct {
// File creation time in seconds since the epoch (1970-01-01 00:00:00
// UTC).
CreationTime uint64
// Tier indicates the storage tier (hot or cold) where this blob file was
// written.
Tier base.StorageTier

// Mutable state

Expand All @@ -151,6 +158,10 @@ func (m *PhysicalBlobFile) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("%s size:[%d (%s)] vals:[%d (%s)]",
m.FileNum, redact.Safe(m.Size), humanize.Bytes.Uint64(m.Size),
redact.Safe(m.ValueSize), humanize.Bytes.Uint64(m.ValueSize))
// Only show tier if it's not the default (HotTier) for backward compatibility.
if m.Tier != base.HotTier {
w.Printf(" tier:cold")
}
}

// String implements fmt.Stringer.
Expand Down Expand Up @@ -294,6 +305,16 @@ func parsePhysicalBlobFileDebug(p *strparse.Parser) (*PhysicalBlobFile, error) {
p.Expect("]")
case "creationTime":
m.CreationTime = p.Uint64()
case "tier":
tierStr := p.Next()
switch tierStr {
case "hot":
m.Tier = base.HotTier
case "cold":
m.Tier = base.ColdTier
default:
p.Errf("unknown tier %q", tierStr)
}
default:
p.Errf("unknown field %q", field)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/manifest/testdata/version_edit_decode
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ encode
excise-op: [b, c] #3
----
67061d000b626172000e0000000000000b666f6f010d0000000000000000
45010129d8a301016baf0729d8a301f9a006006c21210a0162016301030b
0537
47010129d8a3010000016daf0729d8a301f9a00600006c21210a01620163
01030b0537
add-table: L6 000029:[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952); depth:1]
add-blob-file: B000943 physical:{000041 size:[20952 (20KB)] vals:[102521 (100KB)]}
del-blob-file: B000033 000033
Expand All @@ -110,7 +110,7 @@ encode
add-backing: 000009
----
69096467031d000b626172000e0000000000000b666f6f010d0000000000
000000420946010129d8a301e8f10101
000000420947010129d8a301e8f1010001
add-table: L3 000029(000009):[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952/30952); depth:1]
add-backing: 000009

Expand All @@ -128,7 +128,7 @@ encode
add-table: L3 000029:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952 / 30952); depth:1]
----
67031d000b626172000e0000000000000b666f6f010d0000000000000000
45010129d8a30101
47010129d8a301e8f1010001
add-table: L3 000029:[bar#14,DEL-foo#13,SET] seqnums:[#0-#0] points:[bar#14,DEL-foo#13,SET] blobrefs:[(B000041: 20952/30952); depth:1]


Expand Down
59 changes: 38 additions & 21 deletions internal/manifest/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
tagRemovedBackingTable = 106
tagNewBlobFile = 107
tagDeletedBlobFile = 108
tagNewBlobFile2 = 109

// The custom tags sub-format used by tagNewFile4 and above. All tags less
// than customTagNonSafeIgnoreMask are safe to ignore and their format must be
Expand All @@ -77,6 +78,8 @@ const (
customTagBlobReferences = 69
// customTagBlobReferences2 contains BackingValueSize for each BlobReference.
customTagBlobReferences2 = 70
// customTagBlobReferences3 contains BackingValueSize and Tier for each BlobReference.
customTagBlobReferences3 = 71
)

// DeletedTableEntry holds the state for a sstable deletion from a level. The
Expand Down Expand Up @@ -467,7 +470,7 @@ func (v *VersionEdit) Decode(r io.Reader) error {
return err
}

case customTagBlobReferences, customTagBlobReferences2:
case customTagBlobReferences, customTagBlobReferences2, customTagBlobReferences3:
// The first varint encodes the 'blob reference depth'
// of the table.
v, err := d.readUvarint()
Expand All @@ -490,16 +493,31 @@ func (v *VersionEdit) Decode(r io.Reader) error {
return err
}
var backingValueSize uint64
if customTag == customTagBlobReferences2 {
if customTag == customTagBlobReferences2 || customTag == customTagBlobReferences3 {
backingValueSize, err = d.readUvarint()
if err != nil {
return err
}
}
var tier base.StorageTier
if customTag == customTagBlobReferences3 {
tierValue, err := d.readUvarint()
if err != nil {
return err
}
if tierValue >= uint64(base.NumStorageTiers) {
return base.CorruptionErrorf("new-file4: invalid tier value: %d", tierValue)
}
tier = base.StorageTier(tierValue)
} else {
// For backward compatibility, default to HotTier.
tier = base.HotTier
}
blobReferences[i] = BlobReference{
FileID: base.BlobFileID(fileID),
ValueSize: valueSize,
BackingValueSize: backingValueSize,
Tier: tier,
}
}
continue
Expand Down Expand Up @@ -566,7 +584,7 @@ func (v *VersionEdit) Decode(r io.Reader) error {
}
v.NewTables = append(v.NewTables, nfe)

case tagNewBlobFile:
case tagNewBlobFile, tagNewBlobFile2:
fileID, err := d.readUvarint()
if err != nil {
return err
Expand All @@ -587,13 +605,25 @@ func (v *VersionEdit) Decode(r io.Reader) error {
if err != nil {
return err
}
// Decode the storage tier of the new blob file. Only tagNewBlobFile2
// records carry an explicit tier; records written with the older
// tagNewBlobFile tag default to HotTier for backward compatibility.
var tier base.StorageTier = base.HotTier
if tag == tagNewBlobFile2 {
	tierValue, err := d.readUvarint()
	if err != nil {
		return err
	}
	// Reject out-of-range tiers instead of blindly converting them, so a
	// corrupt manifest is reported here rather than surfacing later as an
	// unexpected-tier failure. This mirrors the validation applied to
	// customTagBlobReferences3.
	if tierValue >= uint64(base.NumStorageTiers) {
		return base.CorruptionErrorf("new-blob-file: invalid tier value: %d", tierValue)
	}
	tier = base.StorageTier(tierValue)
}
v.NewBlobFiles = append(v.NewBlobFiles, BlobFileMetadata{
FileID: base.BlobFileID(fileID),
Physical: &PhysicalBlobFile{
FileNum: base.DiskFileNum(diskFileNum),
Size: size,
ValueSize: valueSize,
CreationTime: creationTime,
Tier: tier,
},
})

Expand Down Expand Up @@ -948,40 +978,27 @@ func (v *VersionEdit) Encode(w io.Writer) error {
e.writeBytes(x.Meta.SyntheticPrefixAndSuffix.Suffix())
}
if len(x.Meta.BlobReferences) > 0 {
writeBackingValueSize := false
if x.Meta.Virtual {
for _, ref := range x.Meta.BlobReferences {
if ref.BackingValueSize > 0 && ref.BackingValueSize != ref.ValueSize {
writeBackingValueSize = true
break
}
}
}
if writeBackingValueSize {
e.writeUvarint(customTagBlobReferences2)
} else {
e.writeUvarint(customTagBlobReferences)
}
e.writeUvarint(customTagBlobReferences3)
e.writeUvarint(uint64(x.Meta.BlobReferenceDepth))
e.writeUvarint(uint64(len(x.Meta.BlobReferences)))
for _, ref := range x.Meta.BlobReferences {
e.writeUvarint(uint64(ref.FileID))
e.writeUvarint(ref.ValueSize)
if writeBackingValueSize {
e.writeUvarint(ref.BackingValueSize)
}
e.writeUvarint(ref.BackingValueSize)
e.writeUvarint(uint64(ref.Tier))
}
}
e.writeUvarint(customTagTerminate)
}
}
for _, x := range v.NewBlobFiles {
e.writeUvarint(tagNewBlobFile)
e.writeUvarint(tagNewBlobFile2)
e.writeUvarint(uint64(x.FileID))
e.writeUvarint(uint64(x.Physical.FileNum))
e.writeUvarint(x.Physical.Size)
e.writeUvarint(x.Physical.ValueSize)
e.writeUvarint(x.Physical.CreationTime)
e.writeUvarint(uint64(x.Physical.Tier))
}
for x := range v.DeletedBlobFiles {
e.writeUvarint(tagDeletedBlobFile)
Expand Down
7 changes: 7 additions & 0 deletions internal/sstableinternal/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@ type WriterOptions struct {
// in order. It is intended for use only in the construction of invalid
// sstables for testing. See tool/make_test_sstables.go.
DisableKeyOrderChecks bool

// BlobReferenceTiers provides the storage tier (hot or cold) for each blob
// reference ID. Used when WriteTieringHistograms is true to categorize blob
// references by tier. The tier for reference ID i is BlobReferenceTiers[i].
// When a value exists in both tiers (hot-and-cold), there are two separate
// blob references with different IDs and tiers.
BlobReferenceTiers []base.StorageTier
}
2 changes: 1 addition & 1 deletion replay/testdata/corpus/simple_val_sep
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ simple_val_sep:
stat simple_val_sep/MANIFEST-000013 simple_val_sep/000015.sst simple_val_sep/000016.blob
----
simple_val_sep/MANIFEST-000013:
size: 250
size: 259
simple_val_sep/000015.sst:
size: 792
simple_val_sep/000016.blob:
Expand Down
8 changes: 4 additions & 4 deletions replay/testdata/replay_val_sep
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ tree
792 000015.sst
97 000016.blob
0 LOCK
152 MANIFEST-000010
250 MANIFEST-000013
158 MANIFEST-000010
259 MANIFEST-000013
2942 OPTIONS-000002
0 marker.format-version.000011.024
0 marker.manifest.000003.MANIFEST-000013
simple_val_sep/
792 000015.sst
97 000016.blob
250 MANIFEST-000013
259 MANIFEST-000013
checkpoint/
819 000005.sst
101 000006.blob
798 000008.sst
97 000009.blob
11 000011.log
687 000012.sst
187 MANIFEST-000013
193 MANIFEST-000013
2942 OPTIONS-000002
0 marker.format-version.000001.024
0 marker.manifest.000001.MANIFEST-000013
Expand Down
21 changes: 18 additions & 3 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,9 @@ type DataBlockIter struct {
// for the current block.
tieringSpanIDs UnsafeUints
tieringAttributes UnsafeUints
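// secondaryBlobHandles stores the decoded secondary blob handle column for
// dual-tier blob values. It is populated lazily by initTieringMetadata, and
// only when the block actually contains the secondary blob handle column;
// otherwise it remains the zero RawBytes.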
secondaryBlobHandles RawBytes
// blockData stores a reference to the block data for lazy tiering column
// decoding. This is only set when Init() is called (not InitHandle()).
blockData []byte
Expand Down Expand Up @@ -1300,6 +1303,7 @@ func (i *DataBlockIter) Init(
i.tieringConfig = tieringConfig
i.tieringSpanIDs = UnsafeUints{}
i.tieringAttributes = UnsafeUints{}
i.secondaryBlobHandles = RawBytes{}
// Store block data for lazy tiering column decoding.
i.blockData = bd.Data()
return nil
Expand Down Expand Up @@ -1340,6 +1344,7 @@ func (i *DataBlockIter) InitHandle(
// from h.BlockData() when needed.
i.tieringSpanIDs = UnsafeUints{}
i.tieringAttributes = UnsafeUints{}
i.secondaryBlobHandles = RawBytes{}
i.blockData = nil
return nil
}
Expand Down Expand Up @@ -1553,8 +1558,12 @@ func (i *DataBlockIter) initTieringMetadata() bool {

// Decode the tiering columns.
bd := DecodeBlock(data, DataBlockCustomHeaderSize+i.keySchema.HeaderSize)
i.tieringSpanIDs = bd.Uints(len(i.keySchema.ColumnTypes) + dataBlockColumnTieringSpanID)
i.tieringAttributes = bd.Uints(len(i.keySchema.ColumnTypes) + dataBlockColumnTieringAttribute)
baseCol := len(i.keySchema.ColumnTypes)
i.tieringSpanIDs = bd.Uints(baseCol + dataBlockColumnTieringSpanID)
i.tieringAttributes = bd.Uints(baseCol + dataBlockColumnTieringAttribute)
if uint16(baseCol+dataBlockColumnMaxV2) <= bd.header.Columns {
i.secondaryBlobHandles = bd.RawBytes(baseCol + dataBlockColumnSecondaryBlobHandle)
}
return true
}

Expand All @@ -1563,10 +1572,16 @@ func (i *DataBlockIter) decodeMeta() base.KVMeta {
if !i.initTieringMetadata() {
return base.KVMeta{}
}
return base.KVMeta{
m := base.KVMeta{
TieringSpanID: base.TieringSpanID(i.tieringSpanIDs.At(i.row)),
TieringAttribute: base.TieringAttribute(i.tieringAttributes.At(i.row)),
}
if i.secondaryBlobHandles.Slices() > 0 {
if b := i.secondaryBlobHandles.At(i.row); len(b) > 0 {
m.SecondaryBlobHandle = b
}
}
return m
}

// Last implements the base.InternalIterator interface.
Expand Down
9 changes: 7 additions & 2 deletions sstable/colblk/data_block_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ func TestDataBlockIterWithMeta(t *testing.T) {
meta base.KVMeta
}{
{"a#1,SET", base.KVMeta{TieringSpanID: 42, TieringAttribute: 100}},
{"b#2,SET", base.KVMeta{TieringSpanID: 43, TieringAttribute: 200}},
{"b#2,SET", base.KVMeta{TieringSpanID: 43, TieringAttribute: 200, SecondaryBlobHandle: []byte("cold-handle-b")}},
{"c#3,SET", base.KVMeta{TieringSpanID: 0, TieringAttribute: 0}},
}
for _, kv := range testKVs {
ikey := base.ParseInternalKey(kv.key)
kcmp := w.KeyWriter.ComparePrev(ikey.UserKey)
w.Add(ikey, []byte("value"), block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false, kv.meta)
vp := block.InPlaceValuePrefix(kcmp.PrefixEqual())
if kv.meta.SecondaryBlobHandle != nil {
w.AddWithSecondaryBlobHandle(ikey, []byte("value"), vp, kcmp, false, kv.meta, kv.meta.SecondaryBlobHandle)
} else {
w.Add(ikey, []byte("value"), vp, kcmp, false, kv.meta)
}
}
block, _ := w.Finish(w.Rows(), w.Size())

Expand Down
7 changes: 4 additions & 3 deletions sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,16 +549,17 @@ func (w *RawColumnWriter) addWithBlobHandleInternal(
if secondaryHandle != nil {
// Dual-tier case: track hot-and-cold blob reference bytes.
w.tieringHistogramBlock.AddHotAndColdBlobRefBytes(uint64(hotHandle.ValueLen))
} else if w.opts.BlobReferenceTierGetter != nil {
} else if w.opts.internal.BlobReferenceTiers != nil {
// Single-tier case: track by tier.
var kindAndTier tieredmeta.KindAndTier
switch w.opts.BlobReferenceTierGetter(hotHandle.ReferenceID) {
tier := w.opts.internal.BlobReferenceTiers[hotHandle.ReferenceID]
switch tier {
case base.HotTier:
kindAndTier = tieredmeta.SSTableBlobReferenceHotBytes
case base.ColdTier:
kindAndTier = tieredmeta.SSTableBlobReferenceColdBytes
default:
panic(errors.AssertionFailedf("unexpected tier %s", w.opts.BlobReferenceTierGetter(hotHandle.ReferenceID)))
panic(errors.AssertionFailedf("unexpected tier %s", tier))
}
w.tieringHistogramBlock.Add(kindAndTier, meta.TieringSpanID, meta.TieringAttribute, uint64(hotHandle.ValueLen))
}
Expand Down
Loading