Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9e7454c
Replace SubscribeEvictions with IRecordTriggers.OnEvict for heap-size…
badrishc Apr 17, 2026
1717f9b
fix(object-store): eliminate redundant Clone() in ObjectStore PostCop…
badrishc Apr 17, 2026
eeb6a3b
refactor(object-store): split PCU heap tracking into creation + dispo…
badrishc Apr 17, 2026
58dd0ac
Remove OnDisposeValueObject and disposer lambdas
badrishc Apr 17, 2026
3f446e4
Add OnDisposeDiskRecord trigger for symmetric DiskLogRecord disposal
badrishc Apr 17, 2026
d6d1376
Add RecordLifecycleTests covering IRecordTriggers call counts and dis…
badrishc Apr 17, 2026
52786fe
fix(unified-store): remove legacy ClearSourceValueObject branch from PCU
badrishc Apr 17, 2026
a6ce3d1
style: fix trailing newline in RecordLifecycleTests.cs
badrishc Apr 17, 2026
2e89f39
fix: handle all heap types in OnDispose(Deleted) and harden DiskLogRe…
badrishc Apr 17, 2026
0358eea
fix: visit sealed source records during eviction to prevent overflow …
badrishc Apr 18, 2026
61089a1
refactor: move tombstone after OnDispose so triggers see pre-tombston…
badrishc Apr 18, 2026
89cbce5
refactor: internalize CopyUpdated value-object accounting into Tsavorite
badrishc Apr 18, 2026
f96ee7e
chore: clean up stale comments across PR
badrishc Apr 18, 2026
fe94d95
fix: use value-only heap in OnDispose(Deleted) to prevent key-overflo…
badrishc Apr 18, 2026
3b96c8c
refactor: move all destruction-side heap accounting into Tsavorite
badrishc Apr 18, 2026
3d952c1
fix: track key overflow at CAS success sites for balanced accounting
badrishc Apr 18, 2026
2e61bea
refactor: move creation-site value heap tracking from ISF into Tsavorite
badrishc Apr 18, 2026
21b98bc
fix: use HeapMemorySize instead of TotalSize in GetValueHeapMemorySize
badrishc Apr 18, 2026
5134791
chore: remove cacheSizeTracker from GarnetRecordTriggers
badrishc Apr 18, 2026
2104e39
chore: simplify CallOnEvict to parameterless property, polish comments
badrishc Apr 18, 2026
290cfb5
chore: make CallOnFlush/CallOnEvict/CallOnDiskRead default to false v…
badrishc Apr 18, 2026
4e8770c
perf: skip GetValueHeapMemorySize for inline values in MainStore IPU
badrishc Apr 18, 2026
0c324e6
fix: add +value for in-chain revivification and +key for tombstoned r…
badrishc Apr 18, 2026
4301c90
fix: account for elided/freelist heap and prevent SubscribeEvictions …
badrishc Apr 18, 2026
285bf22
fix: apply sizeChange delta before HasRemoveKey early return in Objec…
badrishc Apr 18, 2026
39300fe
Move IPU/IPW heap-size tracking from ISF into Tsavorite; remove Opera…
badrishc Apr 19, 2026
d5c784f
fix: avoid double heap-decrement in ReinitializeExpiredRecord IPU path
badrishc Apr 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion libs/cluster/Session/RespClusterMigrateCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ void Process(BasicGarnetApi basicGarnetApi, byte[] input, bool replaceOption, bo
if (replaceOption || !Exists(keySlice))
_ = basicGarnetApi.SET(in diskLogRecord);

storeWrapper.storeFunctions.OnDisposeDiskRecord(ref diskLogRecord, DisposeReason.DeserializedFromDisk);
diskLogRecord.Dispose();
diskLogRecord = default; // prevent double-trigger in finally
}
else
{
Expand All @@ -198,7 +200,11 @@ void Process(BasicGarnetApi basicGarnetApi, byte[] input, bool replaceOption, bo
}
finally
{
diskLogRecord.Dispose();
if (diskLogRecord.IsSet)
{
storeWrapper.storeFunctions.OnDisposeDiskRecord(ref diskLogRecord, DisposeReason.DeserializedFromDisk);
diskLogRecord.Dispose();
}
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion libs/cluster/Session/RespClusterReplicationCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,9 @@ private bool NetworkClusterSync(out bool invalidParameters)

diskLogRecord = DiskLogRecord.Deserialize(recordSpan, storeWrapper.GarnetObjectSerializer, transientObjectIdMap, storeWrapper.storeFunctions);
_ = basicGarnetApi.SET(in diskLogRecord);
storeWrapper.storeFunctions.OnDisposeDiskRecord(ref diskLogRecord, DisposeReason.DeserializedFromDisk);
diskLogRecord.Dispose();
diskLogRecord = default; // prevent double-trigger in catch
}
else
{
Expand All @@ -549,7 +551,11 @@ private bool NetworkClusterSync(out bool invalidParameters)
catch
{
// Dispose the diskLogRecord if there was an exception in SET
diskLogRecord.Dispose();
if (diskLogRecord.IsSet)
{
storeWrapper.storeFunctions.OnDisposeDiskRecord(ref diskLogRecord, DisposeReason.DeserializedFromDisk);
diskLogRecord.Dispose();
}
throw;
}

Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ private TsavoriteKV<StoreFunctions, StoreAllocator> CreateStore(int dbId, IClust
var store = new TsavoriteKV<StoreFunctions, StoreAllocator>(kvSettings
, Tsavorite.core.StoreFunctions.Create(new GarnetKeyComparer(),
() => new GarnetObjectSerializer(customCommandManager),
new GarnetRecordTriggers(cacheSizeTracker))
new GarnetRecordTriggers())
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));

if (kvSettings.LogMemorySize > 0 || kvSettings.ReadCacheMemorySize > 0)
Expand Down
4 changes: 1 addition & 3 deletions libs/server/Custom/CustomObjectBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ public sealed override void DoSerialize(BinaryWriter writer)

/// <inheritdoc />
public sealed override bool Operate(ref ObjectInput input, ref ObjectOutput output,
byte respProtocolVersion, out long sizeChange)
byte respProtocolVersion)
{
sizeChange = 0;

switch (input.header.cmd)
{
// Scan Command
Expand Down
7 changes: 1 addition & 6 deletions libs/server/Objects/Hash/HashObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,8 @@ public override void Dispose() { }
public override GarnetObjectBase Clone() => new HashObject(hash, expirationTimes, expirationQueue, HeapMemorySize);

/// <inheritdoc />
public override bool Operate(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion, out long memorySizeChange)
public override bool Operate(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion)
{
memorySizeChange = 0;

if (input.header.type != GarnetObjectType.Hash)
{
//Indicates when there is an incorrect type
Expand All @@ -194,7 +192,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output, byt
return true;
}

var previousMemorySize = HeapMemorySize;
switch (input.header.HashOp)
{
case HashOperation.HSET:
Expand Down Expand Up @@ -261,8 +258,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output, byt
throw new GarnetException($"Unsupported operation {input.header.HashOp} in HashObject.Operate");
}

memorySizeChange = HeapMemorySize - previousMemorySize;

if (hash.Count == 0)
output.OutputFlags |= ObjectOutputFlags.RemoveKey;

Expand Down
7 changes: 1 addition & 6 deletions libs/server/Objects/List/ListObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,8 @@ public override void Dispose() { }

/// <inheritdoc />
public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
byte respProtocolVersion, out long memorySizeChange)
byte respProtocolVersion)
{
memorySizeChange = 0;

if (input.header.type != GarnetObjectType.List)
{
// Indicates an incorrect type of key
Expand All @@ -140,7 +138,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
return true;
}

var previousMemorySize = HeapMemorySize;
switch (input.header.ListOp)
{
case ListOperation.LPUSH:
Expand Down Expand Up @@ -186,8 +183,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
throw new GarnetException($"Unsupported operation {input.header.ListOp} in ListObject.Operate");
}

memorySizeChange = HeapMemorySize - previousMemorySize;

if (list.Count == 0)
output.OutputFlags |= ObjectOutputFlags.RemoveKey;

Expand Down
7 changes: 1 addition & 6 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,8 @@ public override void Dispose() { }

/// <inheritdoc />
public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
byte respProtocolVersion, out long memorySizeChange)
byte respProtocolVersion)
{
memorySizeChange = 0;

if (input.header.type != GarnetObjectType.Set)
{
// Indicates an incorrect type of key
Expand All @@ -134,7 +132,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
return true;
}

var prevMemorySize = HeapMemorySize;
switch (input.header.SetOp)
{
case SetOperation.SADD:
Expand Down Expand Up @@ -168,8 +165,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
throw new GarnetException($"Unsupported operation {input.header.SetOp} in SetObject.Operate");
}

memorySizeChange = HeapMemorySize - prevMemorySize;

if (Set.Count == 0)
output.OutputFlags |= ObjectOutputFlags.RemoveKey;

Expand Down
7 changes: 1 addition & 6 deletions libs/server/Objects/SortedSet/SortedSetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,8 @@ public override void Dispose() { }

/// <inheritdoc />
public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
byte respProtocolVersion, out long memorySizeChange)
byte respProtocolVersion)
{
memorySizeChange = 0;

var header = input.header;
if (header.type != GarnetObjectType.SortedSet)
{
Expand All @@ -329,7 +327,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
return true;
}

var prevMemorySize = HeapMemorySize;
var op = header.SortedSetOp;
switch (op)
{
Expand Down Expand Up @@ -415,8 +412,6 @@ public override bool Operate(ref ObjectInput input, ref ObjectOutput output,
throw new GarnetException($"Unsupported operation {op} in SortedSetObject.Operate");
}

memorySizeChange = HeapMemorySize - prevMemorySize;

if (sortedSetDict.Count == 0)
output.OutputFlags |= ObjectOutputFlags.RemoveKey;

Expand Down
2 changes: 1 addition & 1 deletion libs/server/Objects/Types/GarnetObjectBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected GarnetObjectBase(BinaryReader reader, long heapMemorySize)
}

/// <inheritdoc />
public abstract bool Operate(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion, out long sizeChange);
public abstract bool Operate(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion);

/// <summary>
/// Serialize to given writer
Expand Down
3 changes: 1 addition & 2 deletions libs/server/Objects/Types/IGarnetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ public interface IGarnetObject : IHeapObject
/// </summary>
/// <param name="input"></param>
/// <param name="output"></param>
/// <param name="sizeChange"></param>
/// <returns></returns>
bool Operate(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion, out long sizeChange);
bool Operate(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion);

/// <summary>
/// Scan the items of the collection
Expand Down
44 changes: 4 additions & 40 deletions libs/server/Storage/Functions/GarnetRecordTriggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,12 @@
namespace Garnet.server
{
/// <summary>
/// Record lifecycle triggers for Garnet's unified store. Handles per-record cleanup
/// on delete via <see cref="IRecordTriggers.OnDispose"/>.
/// Record lifecycle triggers for Garnet's unified store. Available for app-level
/// resource cleanup (e.g. <see cref="System.IDisposable.Dispose"/> on value objects
/// holding external resources). Currently a no-op — Garnet's IHeapObject implementations
/// (Hash/List/Set/SortedSet) hold no external resources.
/// </summary>
public readonly struct GarnetRecordTriggers : IRecordTriggers
{
/// <summary>
/// Cache size tracker for heap size accounting on delete.
/// Created before the store and initialized after via <see cref="CacheSizeTracker.Initialize"/>.
/// </summary>
internal readonly CacheSizeTracker cacheSizeTracker;

/// <summary>
/// Creates a GarnetRecordTriggers with a cache size tracker.
/// </summary>
public GarnetRecordTriggers(CacheSizeTracker cacheSizeTracker)
{
this.cacheSizeTracker = cacheSizeTracker;
}

/// <inheritdoc/>
public bool CallOnFlush => false;

/// <inheritdoc/>
public bool CallOnEvict => false;

/// <inheritdoc/>
public bool CallOnDiskRead => false;

/// <inheritdoc/>
public void OnDisposeValueObject(IHeapObject valueObject, DisposeReason reason)
{
// Heap object disposal is handled by ClearHeapFields in ObjectAllocatorImpl
}

/// <inheritdoc/>
public void OnDispose(ref LogRecord logRecord, DisposeReason reason)
{
// Handle heap objects: update cache size tracker on delete
if (logRecord.Info.ValueIsObject && reason == DisposeReason.Deleted)
{
cacheSizeTracker?.AddHeapSize(-logRecord.ValueObject.HeapMemorySize);
}
}
}
}
1 change: 1 addition & 0 deletions libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,7 @@ public readonly bool PostCopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLo
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
if (functionsState.appendOnlyFile != null)
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF

return true;
}

Expand Down
33 changes: 10 additions & 23 deletions libs/server/Storage/Functions/ObjectStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public bool InitialUpdater(ref LogRecord logRecord, in RecordSizeInfo sizeInfo,
if ((byte)type < CustomCommandManager.CustomTypeIdStartOffset)
{
value = GarnetObject.Create(type);
_ = value.Operate(ref input, ref output, functionsState.respProtocolVersion, out _);
_ = value.Operate(ref input, ref output, functionsState.respProtocolVersion);
_ = logRecord.TrySetValueObjectAndPrepareOptionals(value, in sizeInfo);
return true;
}
Expand Down Expand Up @@ -82,8 +82,6 @@ public void PostInitialUpdater(ref LogRecord dstLogRecord, in RecordSizeInfo siz
input.header.SetExpiredFlag();
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
}

functionsState.cacheSizeTracker?.AddHeapSize(dstLogRecord.ValueObject.HeapMemorySize);
}

/// <inheritdoc />
Expand All @@ -96,34 +94,29 @@ public bool InPlaceUpdater(ref LogRecord logRecord, ref ObjectInput input, ref O
return true;
}

if (InPlaceUpdaterWorker(ref logRecord, ref input, ref output, ref rmwInfo, out long sizeChange))
if (InPlaceUpdaterWorker(ref logRecord, ref input, ref output, ref rmwInfo))
{
if (!logRecord.Info.Modified)
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
if (functionsState.appendOnlyFile != null)
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
functionsState.cacheSizeTracker?.AddHeapSize(sizeChange);
return true;
}
return false;
}

bool InPlaceUpdaterWorker(ref LogRecord logRecord, ref ObjectInput input, ref ObjectOutput output, ref RMWInfo rmwInfo, out long sizeChange)
bool InPlaceUpdaterWorker(ref LogRecord logRecord, ref ObjectInput input, ref ObjectOutput output, ref RMWInfo rmwInfo)
{
sizeChange = 0;

// Expired data
if (logRecord.Info.HasExpiration && input.header.CheckExpiry(logRecord.Expiration))
{
// Heap disposal and cache size tracking are handled by
// OnDispose(Deleted) in InternalRMW when processing ExpireAndResume.
rmwInfo.Action = RMWAction.ExpireAndResume;
return false;
}

if ((byte)input.header.type < CustomCommandManager.CustomTypeIdStartOffset)
{
var operateSuccessful = ((IGarnetObject)logRecord.ValueObject).Operate(ref input, ref output, functionsState.respProtocolVersion, out sizeChange);
var operateSuccessful = ((IGarnetObject)logRecord.ValueObject).Operate(ref input, ref output, functionsState.respProtocolVersion);
if (output.HasWrongType)
return true;
if (output.HasRemoveKey)
Expand All @@ -132,8 +125,6 @@ bool InPlaceUpdaterWorker(ref LogRecord logRecord, ref ObjectInput input, ref Ob
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
if (functionsState.appendOnlyFile != null)
rmwInfo.UserData |= NeedAofLog;
// Heap disposal and cache size tracking are handled by
// OnDispose(Deleted) in the ExpireAndStop path of InternalRMW.
rmwInfo.Action = RMWAction.ExpireAndStop;
return false;
}
Expand Down Expand Up @@ -190,11 +181,11 @@ public bool CopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLogRecord, ref
public bool PostCopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLogRecord, ref LogRecord dstLogRecord, in RecordSizeInfo sizeInfo, ref ObjectInput input, ref ObjectOutput output, ref RMWInfo rmwInfo)
where TSourceLogRecord : ISourceLogRecord
{
// We're performing the object update here (and not in CopyUpdater) so that we are guaranteed that
// the record was CASed into the hash chain before it gets modified
var value = Unsafe.As<IGarnetObject>(srcLogRecord.ValueObject.Clone());
var oldValueSize = srcLogRecord.ValueObject.HeapMemorySize;
_ = dstLogRecord.TrySetValueObject(value);
// We perform the object update here (and not in CopyUpdater) so that we are guaranteed that the record
// was CASed into the hash chain before it gets modified. Tsavorite's CacheSerializedObjectData (called
// by InternalRMW right before PCU, for both memory and disk sources) has already cloned src.ValueObject
// into dstLogRecord. Reuse it directly — do not clone again.
var value = Unsafe.As<IGarnetObject>(dstLogRecord.ValueObject);

// Do not set actually set dstLogRecord.Expiration until we know it is a command for which we allocated length in the LogRecord for it.
// TODO: Object store ETags
Expand All @@ -203,7 +194,7 @@ public bool PostCopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLogRecord,

if ((byte)input.header.type < CustomCommandManager.CustomTypeIdStartOffset)
{
value.Operate(ref input, ref output, functionsState.respProtocolVersion, out _);
value.Operate(ref input, ref output, functionsState.respProtocolVersion);
if (output.HasWrongType)
return true;
if (output.HasRemoveKey)
Expand Down Expand Up @@ -241,10 +232,6 @@ public bool PostCopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLogRecord,

sizeInfo.AssertOptionalsIfSet(dstLogRecord.Info);

// If oldValue has been set to null, subtract its size from the tracked heap size
var sizeAdjustment = rmwInfo.ClearSourceValueObject ? value.HeapMemorySize - oldValueSize : value.HeapMemorySize;
functionsState.cacheSizeTracker?.AddHeapSize(sizeAdjustment);

if (functionsState.appendOnlyFile != null)
rmwInfo.UserData |= NeedAofLog; // Mark that we need to write to AOF
return true;
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Functions/ObjectStore/ReadMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public bool Reader<TSourceLogRecord>(in TSourceLogRecord srcLogRecord, ref Objec
var garnetObject = (IGarnetObject)srcLogRecord.ValueObject;
if ((byte)input.header.type < CustomCommandManager.CustomTypeIdStartOffset)
{
var opResult = garnetObject.Operate(ref input, ref output, functionsState.respProtocolVersion, out _);
var opResult = garnetObject.Operate(ref input, ref output, functionsState.respProtocolVersion);
if (output.HasWrongType)
return true;

Expand Down
Loading
Loading