Skip to content

Commit c21330b

Browse files
authored
Merge branch 'main' into feat/redis-reduce-latency
2 parents c13ce6f + 613d308 commit c21330b

8 files changed

Lines changed: 350 additions & 41 deletions

File tree

adapter/redis.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ type RedisServer struct {
275275
// It eliminates TTL write conflicts from concurrent Lua script executions.
276276
ttlBuffer *TTLBuffer
277277
ttlFlushInterval time.Duration
278+
// compactor is the background DeltaCompactor for this node. When set,
279+
// urgent compaction is triggered on ErrDeltaScanTruncated to unblock
280+
// reads on hot keys faster than the regular compaction interval.
281+
compactor *DeltaCompactor
278282

279283
route map[string]func(conn redcon.Conn, cmd redcon.Command)
280284
}
@@ -287,6 +291,14 @@ func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisSe
287291
}
288292
}
289293

294+
// WithRedisCompactor wires a DeltaCompactor to the RedisServer so that urgent
295+
// single-key compaction can be triggered when ErrDeltaScanTruncated is hit.
296+
func WithRedisCompactor(c *DeltaCompactor) RedisServerOption {
297+
return func(r *RedisServer) {
298+
r.compactor = c
299+
}
300+
}
301+
290302
// WithRedisRequestObserver enables Prometheus-compatible request metrics.
291303
func WithRedisRequestObserver(observer monitoring.RedisRequestObserver) RedisServerOption {
292304
return func(r *RedisServer) {
@@ -485,6 +497,14 @@ func (r *RedisServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
485497
return r.readTracker.Pin(ts)
486498
}
487499

500+
// triggerUrgentCompaction signals the DeltaCompactor to immediately compact
501+
// the given key, bypassing the regular interval. No-op when no compactor is wired.
502+
func (r *RedisServer) triggerUrgentCompaction(typeName string, key []byte) {
503+
if r.compactor != nil {
504+
r.compactor.TriggerUrgentCompaction(typeName, key)
505+
}
506+
}
507+
488508
func (r *RedisServer) dispatchCommand(conn redcon.Conn, name string, handler func(redcon.Conn, redcon.Command), cmd redcon.Command, start time.Time) {
489509
switch {
490510
case r.requestObserver != nil:
@@ -2785,7 +2805,8 @@ func (r *RedisServer) buildListPopElems(ctx context.Context, key []byte, meta st
27852805
claimEnd = meta.Tail
27862806
}
27872807
// Capacity: n claim keys + n Del(item) for found items + 1 for the delta key appended by caller.
2788-
elems := make([]*kv.Elem[kv.OP], 0, n+int64(len(kvps))+listPopDeltaOverhead)
2808+
// n is bounded by maxWideColumnItems (100_000) so the int conversion is safe.
2809+
elems := make([]*kv.Elem[kv.OP], 0, int(n)+len(kvps)+listPopDeltaOverhead)
27892810
for seq := claimStart; seq < claimEnd; seq++ {
27902811
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: store.ListClaimKey(key, seq), Value: []byte{}})
27912812
}

adapter/redis_compat_helpers.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,9 @@ func (r *RedisServer) resolveListMeta(ctx context.Context, key []byte, readTS ui
743743
return d.LenDelta, errors.WithStack(unmarshalErr)
744744
})
745745
if err != nil {
746+
if errors.Is(err, ErrDeltaScanTruncated) {
747+
r.triggerUrgentCompaction("list", key)
748+
}
746749
return store.ListMeta{}, false, err
747750
}
748751
baseMeta.Len += lenSum
@@ -798,7 +801,7 @@ func (r *RedisServer) resolveCollectionLen(
798801

799802
// resolveHashMeta aggregates the base hash metadata with all uncompacted Delta keys.
800803
func (r *RedisServer) resolveHashMeta(ctx context.Context, key []byte, readTS uint64) (int64, bool, error) {
801-
return r.resolveCollectionLen(
804+
n, exists, err := r.resolveCollectionLen(
802805
ctx, key, readTS,
803806
store.HashMetaKey(key),
804807
store.HashMetaDeltaScanPrefix(key),
@@ -812,11 +815,15 @@ func (r *RedisServer) resolveHashMeta(ctx context.Context, key []byte, readTS ui
812815
},
813816
"resolveHashMeta: clamping negative Len to 0",
814817
)
818+
if errors.Is(err, ErrDeltaScanTruncated) {
819+
r.triggerUrgentCompaction("hash", key)
820+
}
821+
return n, exists, err
815822
}
816823

817824
// resolveSetMeta aggregates the base set metadata with all uncompacted Delta keys.
818825
func (r *RedisServer) resolveSetMeta(ctx context.Context, key []byte, readTS uint64) (int64, bool, error) {
819-
return r.resolveCollectionLen(
826+
n, exists, err := r.resolveCollectionLen(
820827
ctx, key, readTS,
821828
store.SetMetaKey(key),
822829
store.SetMetaDeltaScanPrefix(key),
@@ -830,11 +837,15 @@ func (r *RedisServer) resolveSetMeta(ctx context.Context, key []byte, readTS uin
830837
},
831838
"resolveSetMeta: clamping negative Len to 0",
832839
)
840+
if errors.Is(err, ErrDeltaScanTruncated) {
841+
r.triggerUrgentCompaction("set", key)
842+
}
843+
return n, exists, err
833844
}
834845

835846
// resolveZSetMeta aggregates the base sorted set metadata with all uncompacted Delta keys.
836847
func (r *RedisServer) resolveZSetMeta(ctx context.Context, key []byte, readTS uint64) (int64, bool, error) {
837-
return r.resolveCollectionLen(
848+
n, exists, err := r.resolveCollectionLen(
838849
ctx, key, readTS,
839850
store.ZSetMetaKey(key),
840851
store.ZSetMetaDeltaScanPrefix(key),
@@ -848,4 +859,8 @@ func (r *RedisServer) resolveZSetMeta(ctx context.Context, key []byte, readTS ui
848859
},
849860
"resolveZSetMeta: clamping negative Len to 0",
850861
)
862+
if errors.Is(err, ErrDeltaScanTruncated) {
863+
r.triggerUrgentCompaction("zset", key)
864+
}
865+
return n, exists, err
851866
}

0 commit comments

Comments
 (0)