Skip to content

Commit a8befaa

Browse files
committed
adapter(dynamodb): keyless lease shares pre-pass ctx + concurrent cancel-on-first-error (#952)
CodeRabbit Major round-4 on PR #952 — two implementation issues.

Line 2291 — leaseReadKeyless re-armed the 5s budget. leaseReadKeyless built its own context.WithTimeout(r.Context(), dynamoLeaseReadTimeout) on every call. When called from leaseCheckScan/leaseCheckQuery after a slow schema read had consumed most of the pre-pass leaseCtx, the keyless fallback got a fresh 5s, so end-to-end the pre-pass could spend close to 10s. Fix: the signature becomes leaseReadKeyless(w, leaseCtx), and the function uses the caller's ctx directly. Dropped the unused *http.Request parameter (the function never read it; the ctx-from-request pattern was the only reason it was there). All 5 production callers were updated to pass their leaseCtx; the test call (TestDynamoDB_ScanLeaseFencesAllGroups) was updated to pass req.Context(). Caller audit per loop directive: 6 sites updated.

Line 4796 — TransactGetItems fan-out kept waiting after the first failure. The concurrent leaseReadGroupKeys waited for every goroutine before returning. On a degraded path, one fast failure would not unwind the remaining LeaseReadForKeyThrough calls until their own ctx hit dynamoLeaseReadTimeout. Fix: derive a cancellable child ctx, pass it to all goroutines, and cancel() on the first error. The select-with-default on errCh keeps the first-error semantics (the rest are dropped to preserve the single-response contract at the HTTP layer).
1 parent a9987d9 commit a8befaa

2 files changed

Lines changed: 28 additions & 11 deletions

File tree

adapter/dynamodb.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2227,9 +2227,16 @@ func (d *DynamoDBServer) scan(w http.ResponseWriter, r *http.Request) {
22272227
// client never cancels, and writes the same InternalServerError that getItem
22282228
// produces on lease failure. Returns false after writing an error response;
22292229
// the caller should simply return.
2230-
func (d *DynamoDBServer) leaseReadKeyless(w http.ResponseWriter, r *http.Request) bool {
2231-
leaseCtx, leaseCancel := context.WithTimeout(r.Context(), dynamoLeaseReadTimeout)
2232-
defer leaseCancel()
2230+
// leaseReadKeyless fences every group via the keyless all-groups lease check.
2231+
// `leaseCtx` MUST be the SAME context the pre-pass armed (it bounds the entire
2232+
// pre-pass — schema read + the lease that lands here — by dynamoLeaseReadTimeout
2233+
// total; coderabbit Major on PR #952 round-4). Creating a fresh
2234+
// context.WithTimeout(r.Context(), dynamoLeaseReadTimeout) here would re-arm
2235+
// the 5s budget per call, so a slow schema read followed by the keyless
2236+
// fallback could consume close to 10s end-to-end. Callers that do NOT have a
2237+
// pre-pass context must pass their own bounded ctx; r.Context() with the
2238+
// handler's own timeout applied around it is the conservative choice.
2239+
func (d *DynamoDBServer) leaseReadKeyless(w http.ResponseWriter, leaseCtx context.Context) bool {
22332240
if err := kv.LeaseReadAllGroupsThrough(d.coordinator, leaseCtx); err != nil {
22342241
writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error())
22352242
return false
@@ -2260,7 +2267,7 @@ func (d *DynamoDBServer) leaseCheckScan(w http.ResponseWriter, r *http.Request,
22602267
// Transient/internal schema-read failure: fail closed by fencing every
22612268
// group. leaseReadKeyless writes the same InternalServerError on its
22622269
// own failure.
2263-
return d.leaseReadKeyless(w, r)
2270+
return d.leaseReadKeyless(w, leaseCtx)
22642271
}
22652272
if plan == queryLeaseSkip {
22662273
// Client-side validation problem (table not found, unknown index,
@@ -2288,7 +2295,7 @@ func (d *DynamoDBServer) leaseCheckScan(w http.ResponseWriter, r *http.Request,
22882295
return true
22892296
}
22902297
// Valid whole-table read: fence every group (fail closed).
2291-
return d.leaseReadKeyless(w, r)
2298+
return d.leaseReadKeyless(w, leaseCtx)
22922299
}
22932300

22942301
// projectionInvalid returns true when the ProjectionExpression cannot be
@@ -2404,7 +2411,7 @@ func (d *DynamoDBServer) leaseCheckQuery(w http.ResponseWriter, r *http.Request,
24042411
// keyless check (a strict superset of the single group this query
24052412
// would have routed to). leaseReadKeyless writes the same
24062413
// InternalServerError on its own failure.
2407-
return d.leaseReadKeyless(w, r)
2414+
return d.leaseReadKeyless(w, leaseCtx)
24082415
}
24092416
switch plan {
24102417
case queryLeaseSkip:
@@ -2420,7 +2427,7 @@ func (d *DynamoDBServer) leaseCheckQuery(w http.ResponseWriter, r *http.Request,
24202427
case queryLeaseAllGroups:
24212428
// GSI / whole-table query: a VALID read that spans multiple shards,
24222429
// so the keyless all-groups check is the correct fence (fail closed).
2423-
return d.leaseReadKeyless(w, r)
2430+
return d.leaseReadKeyless(w, leaseCtx)
24242431
case queryLeaseSingleGroup:
24252432
if _, err := kv.LeaseReadForKeyThrough(d.coordinator, leaseCtx, leaseKey); err != nil {
24262433
writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error())
@@ -2430,7 +2437,7 @@ func (d *DynamoDBServer) leaseCheckQuery(w http.ResponseWriter, r *http.Request,
24302437
default:
24312438
// Unreachable: queryLeaseKey only returns the three plans above. Fail
24322439
// closed via the all-groups fence rather than silently proceeding.
2433-
return d.leaseReadKeyless(w, r)
2440+
return d.leaseReadKeyless(w, leaseCtx)
24342441
}
24352442
}
24362443

@@ -4775,14 +4782,24 @@ func (d *DynamoDBServer) leaseReadGroupKeys(ctx context.Context, groupKeys [][]b
47754782
_, err := kv.LeaseReadForKeyThrough(d.coordinator, ctx, groupKeys[0])
47764783
return errors.WithStack(err)
47774784
}
4785+
// Derive a cancellable child so the first error cancels the sibling lease
4786+
// reads instead of letting them run out the full dynamoLeaseReadTimeout
4787+
// budget (coderabbit Major on PR #952 round-4). The siblings observe the
4788+
// cancellation via LeaseReadForKeyThrough's own context check.
4789+
cancelCtx, cancel := context.WithCancel(ctx)
4790+
defer cancel()
47784791
errCh := make(chan error, len(groupKeys))
47794792
var wg sync.WaitGroup
47804793
for _, itemKey := range groupKeys {
47814794
wg.Add(1)
47824795
go func(k []byte) {
47834796
defer wg.Done()
4784-
if _, err := kv.LeaseReadForKeyThrough(d.coordinator, ctx, k); err != nil {
4785-
errCh <- err
4797+
if _, err := kv.LeaseReadForKeyThrough(d.coordinator, cancelCtx, k); err != nil {
4798+
select {
4799+
case errCh <- err:
4800+
cancel() // unwind the remaining goroutines on the first error.
4801+
default:
4802+
}
47864803
}
47874804
}(itemKey)
47884805
}

adapter/dynamodb_lease_fence_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestDynamoDB_ScanLeaseFencesAllGroups(t *testing.T) {
100100

101101
rec := httptest.NewRecorder()
102102
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(nil))
103-
require.True(t, server.leaseReadKeyless(rec, req))
103+
require.True(t, server.leaseReadKeyless(rec, req.Context()))
104104

105105
fenced := coord.fencedGroupSet()
106106
require.Contains(t, fenced, multiGroupDefaultID,

0 commit comments

Comments
 (0)