Commit 688ba8a

adapter: wrap DynamoDB query/scan/transactGetItems in LeaseRead (#952)
## Summary

Part of #557. Wraps the remaining DynamoDB read handlers in a quorum-freshness `LeaseRead` check, following the pattern PR #549 established for `getItem`. Previously these handlers read local `LastCommitTS`/snapshot state with **no quorum check at all**, so a partitioned-but-not-yet-stepped-down stale leader could serve stale reads. Each handler now performs one lease check (cheap atomic-load fast path within the lease window; one `LinearizableRead` on a lease miss, after which the lease is refreshed for the rest of the window) before resolving its read timestamp.

The Redis slice of #557 is deliberately left for a follow-up; this PR touches only `adapter/dynamodb.go`.

### Per-handler placement

- **`query`**: keyless `LeaseRead(ctx)` (`leaseReadKeyless` helper) before `queryItems` samples `readTS`. A Query scans a key range that can span shards in a multi-group deployment, and the owning shard can't be resolved in the handler without duplicating `prepareReadSchema`/`resolveQueryCondition`, so the keyless check (which anchors the cluster freshness bound) is the correct, cheaper call.
- **`scan`**: keyless `LeaseRead(ctx)` (same helper) before `scanItems` samples `readTS`. A Scan reads the whole table and spans every shard holding its items, so keyless is the right choice (matches the issue's guidance for keyless/multi-shard ops).
- **`transactGetItems`**: per-key `LeaseReadForKey`, deduplicated by item key, run in `leaseCheckTransactGetItems` **before** `nextTxnReadTS()` resolves the single snapshot timestamp. Item keys are resolved at a tentative schema timestamp purely to route the lease check; the single-snapshot-ts semantics (one `readTS` shared by all items) are unchanged. Malformed items are skipped in the pre-pass so the existing validation/error mapping still surfaces downstream identically. Within the lease window, same-shard keys hit the cheap fast path, so per-key checks are effectively free.
### Timeout decision

Every wrapped handler runs the lease check under a bounded `context.WithTimeout(r.Context(), dynamoLeaseReadTimeout)` with `defer cancel()` (the exact constant, `5s`, and structure `getItem` uses after PR #549), so a stalled Raft cannot hang a handler indefinitely when the client never cancels.

### Note on `batchGetItem`

The issue lists `batchGetItem` in scope, but **there is no `BatchGetItem` handler in this codebase** (no `batchGetItemTarget`, no registration, no implementation). There was nothing to wrap; the multi-key precedent it asked about is implemented in `transactGetItems` (per-key `LeaseReadForKey`, deduped). If `BatchGetItem` is added later, it should reuse the same `leaseCheckTransactGetItems`-style per-key dedup pattern.

## Behavior change

Adds a quorum-freshness check to `query`/`scan`/`transactGetItems`. Read results and error mapping are otherwise identical; lease-read failure surfaces as the same `500 InternalServerError` (`dynamoErrInternal`) that `getItem` produces.

## Risk

Low. The only new failure mode is a `500` when the lease check fails (quorum loss), which is strictly more correct than serving a stale read. No write paths, FSM, or snapshot semantics touched. The added per-request `LeaseRead` is amortized by the lease window.

## Test evidence

- `go test -race -run TestDynamoDB ./adapter/` → ok (15s)
- Targeted: `TestDynamoDB_Query_LeaseRead`, `TestDynamoDB_Scan_LeaseRead`, `TestDynamoDB_TransactGetItems_LeaseRead` (each asserts (a) success with a healthy lease and (b) lease failure → 500 `InternalServerError`, mirroring `getItem`'s error class) plus existing `TestDynamoDB_TransactGetItems*` → ok with `-race`.
- `golangci-lint --config=.golangci.yaml run ./adapter/...` → 0 issues.
- `go vet ./adapter/`, `gofmt -l` → clean.
- Full `go test -race ./adapter/` exceeds the default 10-minute package timeout (the suite spans Redis/S3/SQS/gRPC/etc., unrelated to this change); it passes under an extended `-timeout`.

New tests inject lease-read failures via a `failLeaseReads` toggle added to the existing `testCoordinatorWrapper`, and drive the failure path over raw HTTP so the AWS SDK does not retry the `500`.

## Self-review

1. **Data loss**: None. Read-only handlers; no propose/apply, FSM, snapshot, TTL, or compaction paths touched. No `return nil`-after-error introduced; lease failure returns an explicit `500`.
2. **Concurrency / distributed failures**: Lease check is bounded by `dynamoLeaseReadTimeout` with `defer cancel()`, so a leader change / partition can't hang a handler. `transactGetItems` keeps its single shared `readTS`; the lease pre-pass runs before the timestamp is sampled, so a leadership flip mid-pass only causes a lease miss (→ `LinearizableRead`) or a `500`, never a torn snapshot. Ran `go test -race` on the dynamo suite.
3. **Performance**: One `LeaseRead` per request, amortized by the lease window (atomic-load fast path; one `LinearizableRead` per lease miss, then refreshed). `transactGetItems` dedups by item key and same-shard keys hit the fast path, so per-key checks add negligible cost; no extra Raft round-trips per HLC tick. The tentative schema load in the transact pre-pass is a cheap, cached Pebble read.
4. **Data consistency**: `readTS` is sampled **after** the lease confirmation in every handler, so any commit that landed before confirmation is visible; this tightens the freshness bound without bypassing the leader-issued read pipeline. `transactGetItems` single-snapshot-ts semantics are preserved.
5. **Test coverage**: Added a success + lease-failure test for each wrapped handler, reusing the existing `testCoordinatorWrapper`/`createNode` harness and mirroring `getItem`'s error class on failure.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Read operations perform pre-read freshness checks to ensure consistent, up-to-date responses; queries and scans fall back to broader fencing when needed.
  * Transactional reads now confirm freshness across all affected groups before resolving a transaction snapshot.
  * Coordinators support multi-group lease fencing and per-group read collapsing for more robust multi-shard reads.
* **Tests**
  * Extensive tests added covering lease/fencing, validation, skip-paths, and multi-group behaviors for query, scan, and transactional reads.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents f598868 + a8befaa commit 688ba8a

10 files changed

Lines changed: 2522 additions & 0 deletions

adapter/dynamodb.go

Lines changed: 612 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 201 additions & 0 deletions
@@ -0,0 +1,201 @@
package adapter

import (
	"bytes"
	"context"
	"net/http"
	"net/http/httptest"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/bootjp/elastickv/store"
	"github.com/stretchr/testify/require"
)

// schemaDeadlineStore records, for the first GetAt on a DynamoDB table
// meta key, whether the context carried a deadline. The lease-read
// pre-pass resolves keys by reading the schema; if that read runs under
// a context with no deadline, a stalled schema read can block the handler
// indefinitely before the bounded lease phase begins (claude #952 issue #2).
type schemaDeadlineStore struct {
	store.MVCCStore

	metaKey []byte

	// mu guards sawMetaGet/metaHadDeadline so the check-then-set in GetAt is
	// race-free if the handler ever issues concurrent schema reads.
	mu              sync.Mutex
	sawMetaGet      bool
	metaHadDeadline bool
}

func (s *schemaDeadlineStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error) {
	if bytes.Equal(key, s.metaKey) {
		s.mu.Lock()
		if !s.sawMetaGet {
			s.sawMetaGet = true
			_, s.metaHadDeadline = ctx.Deadline()
		}
		s.mu.Unlock()
	}
	return s.MVCCStore.GetAt(ctx, key, ts)
}

func (s *schemaDeadlineStore) observed() (sawMetaGet bool, metaHadDeadline bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.sawMetaGet, s.metaHadDeadline
}

// newSchemaDeadlineServer builds a DynamoDB server over a real MVCC store
// fronted by schemaDeadlineStore, seeds one table schema + item, and
// returns the server and the recording store.
func newSchemaDeadlineServer(t *testing.T) (*DynamoDBServer, *schemaDeadlineStore) {
	t.Helper()

	schema := &dynamoTableSchema{
		TableName:          "t",
		Generation:         1,
		KeyEncodingVersion: dynamoOrderedKeyEncodingV2,
		AttributeDefinitions: map[string]string{
			"pk": "S",
			"sk": "S",
		},
		PrimaryKey: dynamoKeySchema{HashKey: "pk", RangeKey: "sk"},
	}

	recording := &schemaDeadlineStore{
		MVCCStore: store.NewMVCCStore(),
		metaKey:   dynamoTableMetaKey(schema.TableName),
	}
	writer := newDynamoFixtureWriter(t, recording.MVCCStore)
	writer.writeSchema(schema)
	writer.writeItem(schema, map[string]attributeValue{
		"pk": newStringAttributeValue("tenant"),
		"sk": newStringAttributeValue("0001"),
	})

	server := NewDynamoDBServer(nil, recording, &stubAdapterCoordinator{})
	return server, recording
}

// noDeadlineRequest returns a request whose context has NO deadline, so the
// only way the schema pre-fetch can observe a deadline is via the bounded
// leaseCtx the handler must establish before resolving keys.
func noDeadlineRequest(t *testing.T) *http.Request {
	t.Helper()
	req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(nil))
	_, hasDeadline := req.Context().Deadline()
	require.False(t, hasDeadline, "test precondition: request context must have no deadline")
	return req
}

// TestDynamoDB_LeaseCheckQuery_SchemaReadBounded asserts the base-table
// Query lease pre-pass resolves the routing key under the bounded leaseCtx,
// so a stalled schema read cannot block past dynamoLeaseReadTimeout before
// the lease phase begins (claude #952 issue #2).
func TestDynamoDB_LeaseCheckQuery_SchemaReadBounded(t *testing.T) {
	t.Parallel()

	server, recording := newSchemaDeadlineServer(t)

	in := queryInput{
		TableName:              "t",
		KeyConditionExpression: "pk = :pk",
		ExpressionAttributeValues: map[string]attributeValue{
			":pk": newStringAttributeValue("tenant"),
		},
	}

	rec := httptest.NewRecorder()
	require.True(t, server.leaseCheckQuery(rec, noDeadlineRequest(t), in))

	sawMetaGet, metaHadDeadline := recording.observed()
	require.True(t, sawMetaGet, "lease pre-pass must read the table schema")
	require.True(t, metaHadDeadline,
		"queryLeaseKey schema read must run under the bounded leaseCtx, not the deadline-free request context")
}

// TestDynamoDB_LeaseCheckTransactGetItems_SchemaReadBounded asserts the
// TransactGetItems lease pre-pass resolves item keys under the bounded
// leaseCtx, so a stalled schema read cannot block past dynamoLeaseReadTimeout
// before the lease phase begins (claude #952 issue #2).
func TestDynamoDB_LeaseCheckTransactGetItems_SchemaReadBounded(t *testing.T) {
	t.Parallel()

	server, recording := newSchemaDeadlineServer(t)

	in := transactGetItemsInput{
		TransactItems: []transactGetItem{
			{Get: &transactGetItemGet{
				TableName: "t",
				Key: map[string]attributeValue{
					"pk": newStringAttributeValue("tenant"),
					"sk": newStringAttributeValue("0001"),
				},
			}},
		},
	}

	rec := httptest.NewRecorder()
	require.True(t, server.leaseCheckTransactGetItems(rec, noDeadlineRequest(t), in))

	sawMetaGet, metaHadDeadline := recording.observed()
	require.True(t, sawMetaGet, "lease pre-pass must read the table schema")
	require.True(t, metaHadDeadline,
		"transactGetItemKey schema read must run under the bounded leaseCtx, not the deadline-free request context")
}

// TestDynamoDB_LeaseCheckTransactGetItems_AllItemsSkippedNoLeaseRead asserts
// that when every TransactItems entry fails key resolution (malformed Get),
// the pre-pass touches no shard and returns true without issuing a lease
// read — making the implicit empty-uniqueKeys fallback explicit
// (claude #952 issue #1).
func TestDynamoDB_LeaseCheckTransactGetItems_AllItemsSkippedNoLeaseRead(t *testing.T) {
	t.Parallel()

	st := store.NewMVCCStore()
	wrapped := &leaseReadCountingCoordinator{stubAdapterCoordinator: &stubAdapterCoordinator{}}
	server := NewDynamoDBServer(nil, st, wrapped)

	// No schema is written, so transactGetItemKey cannot resolve any key:
	// every item is skipped and uniqueKeys stays empty.
	in := transactGetItemsInput{
		TransactItems: []transactGetItem{
			{Get: &transactGetItemGet{
				TableName: "missing",
				Key:       map[string]attributeValue{"pk": newStringAttributeValue("x")},
			}},
			{Get: nil},
		},
	}

	rec := httptest.NewRecorder()
	require.True(t, server.leaseCheckTransactGetItems(rec, noDeadlineRequest(t), in))
	require.Equal(t, int64(0), wrapped.leaseReadForKeyCalls.Load(),
		"all-items-skipped path must not issue a lease read when no shard is touched")
	require.Equal(t, int64(0), wrapped.leaseReadCalls.Load(),
		"all-items-skipped path must not fall back to the keyless lease check")
	require.Equal(t, http.StatusOK, rec.Code, "no error response should be written")
}

// leaseReadCountingCoordinator wraps stubAdapterCoordinator and counts
// lease-read calls so a test can assert none were issued. The counters are
// atomic so the assertions stay race-free if a handler ever fans out lease
// reads concurrently.
type leaseReadCountingCoordinator struct {
	*stubAdapterCoordinator
	leaseReadCalls       atomic.Int64
	leaseReadForKeyCalls atomic.Int64
}

func (c *leaseReadCountingCoordinator) LeaseRead(ctx context.Context) (uint64, error) {
	c.leaseReadCalls.Add(1)
	return c.stubAdapterCoordinator.LeaseRead(ctx)
}

func (c *leaseReadCountingCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) {
	c.leaseReadForKeyCalls.Add(1)
	return c.stubAdapterCoordinator.LeaseReadForKey(ctx, key)
}
