Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2778,12 +2778,21 @@ func (t *txnContext) prepareDispatch() (preparedTxnDispatch, error) {
// non-string keys get a !redis|ttl| element written in the same transaction.
ttlElems := t.buildTTLElems()

// Derive a single redisDispatchTimeout-bounded context covering both the
// stream-deletion scans (paginated ScanAt/ExistsAt over StreamEntryScanPrefix)
// and the final Dispatch. Without this bound, buildStreamDeletionElems would
// run on the server-lifetime handlerContext, leaving its scans uncancellable
// from the request side on a slow disk or hot-key pathological commit.
ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout)
// Derive a single redisDispatchTimeout-bounded context covering both
// the stream-deletion scans (paginated ScanAt/ExistsAt over
// StreamEntryScanPrefix) and the final Dispatch. The parent is the
// txnContext's own ctx (the caller's dispatchCtx), not the server-
// lifetime handlerContext, so an outer cancellation (client
// disconnect, retryRedisWrite timeout) interrupts the prepare+dispatch
// promptly instead of waiting the full redisDispatchTimeout — same
// rationale as runTransactionWithDedup's reuseCtx fix on PR #887. The
// nil-guard falls back to handlerContext for callers that construct a
// txnContext without setting ctx (test fixtures).
parentCtx := t.ctx
if parentCtx == nil {
parentCtx = t.server.handlerContext()
}
ctx, cancel := context.WithTimeout(parentCtx, redisDispatchTimeout)

streamElems, err := t.buildStreamDeletionElems(ctx)
if err != nil {
Expand Down Expand Up @@ -3607,7 +3616,16 @@ type reusableListPush struct {
// fresh meta. Extracted from listPushCoreWithDedup to keep that closure
// under the cyclop / gocognit / nestif limits.
func (r *RedisServer) dispatchListPushReuse(ctx context.Context, key []byte, pending *reusableListPush) (newLen int64, drop bool, err error) {
commitTS := r.coordinator.Clock().Next()
// HLC-4 parity: persistence-grade commit_ts allocation must honor
// the physical-ceiling fence so a stale-leader window cannot mint a
// timestamp that collides with the successor's. dispatchExecReuse
// already uses NextFenced (PR #887 round 1); the two listPush dedup
// sites shipped before the HLC-4 migration and were missed in that
// pass — see PR #887 verdict for the noted gap.
commitTS, allocErr := r.coordinator.Clock().NextFenced()
if allocErr != nil {
return 0, false, errors.Wrap(allocErr, "redis list-push reuse: allocate commitTS")
}
_, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: pending.startTS,
Expand Down Expand Up @@ -3778,7 +3796,12 @@ func (r *RedisServer) listPushCoreWithDedup(ctx context.Context, key []byte, val
return err
}

commitTS := r.coordinator.Clock().Next()
// HLC-4 parity with prepareDispatch / dispatchExecReuse —
// see dispatchListPushReuse above for the rationale.
commitTS, allocErr := r.coordinator.Clock().NextFenced()
if allocErr != nil {
return errors.Wrap(allocErr, "redis list-push first-attempt: allocate commitTS")
}
ops, updatedMeta, err := buildFn(meta, key, values, commitTS, 0)
if err != nil {
return err
Expand Down
Loading