diff --git a/adapter/redis.go b/adapter/redis.go index 170f0fa7..5f132733 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -2778,12 +2778,21 @@ func (t *txnContext) prepareDispatch() (preparedTxnDispatch, error) { // non-string keys get a !redis|ttl| element written in the same transaction. ttlElems := t.buildTTLElems() - // Derive a single redisDispatchTimeout-bounded context covering both the - // stream-deletion scans (paginated ScanAt/ExistsAt over StreamEntryScanPrefix) - // and the final Dispatch. Without this bound, buildStreamDeletionElems would - // run on the server-lifetime handlerContext, leaving its scans uncancellable - // from the request side on a slow disk or hot-key pathological commit. - ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout) + // Derive a single redisDispatchTimeout-bounded context covering both + // the stream-deletion scans (paginated ScanAt/ExistsAt over + // StreamEntryScanPrefix) and the final Dispatch. The parent is the + // txnContext's own ctx (the caller's dispatchCtx), not the server- + // lifetime handlerContext, so an outer cancellation (client + // disconnect, retryRedisWrite timeout) interrupts the prepare+dispatch + // promptly instead of waiting the full redisDispatchTimeout. Symmetric + // with the reuseCtx threading in runTransactionWithDedup. The nil-guard + // falls back to handlerContext for callers that construct a txnContext + // without setting ctx (test fixtures). + parentCtx := t.ctx + if parentCtx == nil { + parentCtx = t.server.handlerContext() + } + ctx, cancel := context.WithTimeout(parentCtx, redisDispatchTimeout) streamElems, err := t.buildStreamDeletionElems(ctx) if err != nil { @@ -3273,7 +3282,7 @@ func (r *RedisServer) runTransactionWithDedup(queue []redcon.Command) ([]redisRe // fresh 10 s budget elapses. The earlier "fresh ctx from // handlerContext" pattern (noted in design doc §M3) was strictly // more conservative but wasted resources on a disconnected - // client — see PR #887 review. + // client. reuseCtx, reuseCancel := context.WithTimeout(dispatchCtx, redisDispatchTimeout) defer reuseCancel() res, drop, dispErr := r.dispatchExecReuse(reuseCtx, pending) @@ -3607,7 +3616,17 @@ type reusableListPush struct { // fresh meta. Extracted from listPushCoreWithDedup to keep that closure // under the cyclop / gocognit / nestif limits. func (r *RedisServer) dispatchListPushReuse(ctx context.Context, key []byte, pending *reusableListPush) (newLen int64, drop bool, err error) { - commitTS := r.coordinator.Clock().Next() + // HLC-4 parity: persistence-grade commit_ts allocation must honor + // the physical-ceiling fence so a stale-leader window cannot mint a + // timestamp that collides with the successor's. The error path + // returns ErrCeilingExpired which isRetryableRedisTxnErr classifies + // as non-retryable, so it exits retryRedisWrite directly to the + // client — same shape as the other persistence-grade Next call + // sites in this file. + commitTS, allocErr := r.coordinator.Clock().NextFenced() + if allocErr != nil { + return 0, false, errors.Wrap(allocErr, "redis list-push reuse: allocate commitTS") + } _, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ IsTxn: true, StartTS: pending.startTS, @@ -3778,7 +3797,12 @@ func (r *RedisServer) listPushCoreWithDedup(ctx context.Context, key []byte, val return err } - commitTS := r.coordinator.Clock().Next() + // HLC-4 parity with prepareDispatch / dispatchExecReuse — + // see dispatchListPushReuse above for the rationale. + commitTS, allocErr := r.coordinator.Clock().NextFenced() + if allocErr != nil { + return errors.Wrap(allocErr, "redis list-push first-attempt: allocate commitTS") + } ops, updatedMeta, err := buildFn(meta, key, values, commitTS, 0) if err != nil { return err