Skip to content

Commit eb237ea

Browse files
committed
Enhanced Queries (Joins & Aggregations) in AWS SDK Java v2
1 parent 6984a2b commit eb237ea

7 files changed

Lines changed: 823 additions & 596 deletions

File tree

services-custom/dynamodb-enhanced/ENHANCED_QUERIES_DESIGN.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ This initiative was driven by **sustained public engagement** (Stack Overflow, G
2828

2929
DynamoDB is optimized for predictable, key-based access. It does **not** provide SQL-style joins or server-side GROUP BY. That forces teams either to leave DynamoDB, add external query systems, or reinvent the same application-side patterns.
3030

31-
We close this gap with a **fluent, type-safe** API that runs in the application JVM, uses DynamoDB `Query` by default (and `Scan` only when explicitly allowed), and returns structured results plus optional timing breakdowns.
31+
We close this gap with a **fluent, type-safe** API that runs in the application JVM, uses DynamoDB `Query` by default (and `Scan` only when explicitly allowed), and returns structured results.
3232

3333
### Usage and Adoption
3434

@@ -133,8 +133,8 @@ All conditions evaluate **in memory** on the merged row, except **`keyCondition`
133133
<td><pre lang="java">QueryExpressionSpec spec = QueryExpressionBuilder.from(customersTable)
134134
.join(ordersTable, JoinType.INNER, "customerId", "customerId")
135135
.keyCondition(QueryConditional.keyEqualTo(k -> k.partitionValue("c1")))
136+
.filterBase(Condition.eq("region", "EU"))
136137
.filterJoined(Condition.gte("amount", 50))
137-
.where(Condition.eq("region", "EU"))
138138
.project("customerId", "region", "orderId", "amount")
139139
.build();</pre></td>
140140
</tr>

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/query/internal/DefaultQueryExpressionAsyncEngine.java

Lines changed: 127 additions & 300 deletions
Large diffs are not rendered by default.

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/query/internal/DefaultQueryExpressionEngine.java

Lines changed: 88 additions & 294 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.enhanced.dynamodb.query.internal;
17+
18+
import java.util.Collections;
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
23+
/**
24+
* Builds the {@code itemsByAlias} map for {@link software.amazon.awssdk.enhanced.dynamodb.query.EnhancedQueryRow} in join
25+
* queries. Centralizes base/joined alias keys so engines stay readable.
26+
*/
27+
@SdkInternalApi
28+
public final class JoinRowAliases {
29+
30+
private JoinRowAliases() {
31+
}
32+
33+
/**
34+
* Row for LEFT/FULL when the join key is null on the base side, or when no joined rows exist for the key: base attributes
35+
* plus an empty joined map.
36+
*/
37+
public static Map<String, Map<String, Object>> leftOuterJoinRowWithEmptyJoined(Map<String, Object> base) {
38+
Map<String, Map<String, Object>> itemsByAlias = new HashMap<>(2);
39+
itemsByAlias.put(QueryEngineSupport.BASE_ALIAS, base);
40+
itemsByAlias.put(QueryEngineSupport.JOINED_ALIAS, Collections.emptyMap());
41+
return itemsByAlias;
42+
}
43+
44+
/**
45+
* Row when both base and joined attribute maps are present (inner match, or outer join with at least one joined row).
46+
*/
47+
public static Map<String, Map<String, Object>> innerJoinRow(Map<String, Object> base, Map<String, Object> joined) {
48+
Map<String, Map<String, Object>> itemsByAlias = new HashMap<>(2);
49+
itemsByAlias.put(QueryEngineSupport.BASE_ALIAS, base);
50+
itemsByAlias.put(QueryEngineSupport.JOINED_ALIAS, joined);
51+
return itemsByAlias;
52+
}
53+
54+
/**
55+
* Row for RIGHT/FULL when scanning the joined table for keys with no base match: empty base map and joined attributes.
56+
*/
57+
public static Map<String, Map<String, Object>> rightOuterJoinRowWithEmptyBase(Map<String, Object> joined) {
58+
Map<String, Map<String, Object>> itemsByAlias = new HashMap<>(2);
59+
itemsByAlias.put(QueryEngineSupport.BASE_ALIAS, Collections.emptyMap());
60+
itemsByAlias.put(QueryEngineSupport.JOINED_ALIAS, joined);
61+
return itemsByAlias;
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.enhanced.dynamodb.query.internal;
17+
18+
import java.util.AbstractMap;
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Set;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Future;
29+
import software.amazon.awssdk.annotations.SdkInternalApi;
30+
import software.amazon.awssdk.enhanced.dynamodb.MappedTableResource;
31+
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
32+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
33+
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
34+
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
35+
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
36+
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
37+
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
38+
39+
/**
40+
* Async counterpart to {@link JoinedTableObjectMapSyncFetcher}: loads joined-table rows for join-key values using
41+
* DynamoDbAsyncClient. Used by the async query engine.
42+
*/
43+
@SdkInternalApi
44+
public final class JoinedTableObjectMapAsyncFetcher {
45+
46+
private final DynamoDbAsyncClient asyncLowLevel;
47+
private final ExecutorService joinExecutor;
48+
49+
public JoinedTableObjectMapAsyncFetcher(DynamoDbAsyncClient asyncLowLevel, ExecutorService joinExecutor) {
50+
this.asyncLowLevel = asyncLowLevel;
51+
this.joinExecutor = joinExecutor;
52+
}
53+
54+
/**
55+
* Three-tier routing mirroring the sync engine: PK match -> parallel per-key query, GSI match -> parallel per-key index
56+
* query, no index -> low-level parallel scan via DynamoDbAsyncClient.
57+
*/
58+
@SuppressWarnings("unchecked")
59+
public Map<Object, List<Map<String, Object>>> resolveAndFetchJoinedObjectMaps(
60+
MappedTableResource<?> joinedTable, Set<Object> joinKeys, String joinedJoinAttr) {
61+
if (joinKeys.isEmpty()) {
62+
return Collections.emptyMap();
63+
}
64+
65+
TableSchema<Object> joinedSchema = (TableSchema<Object>) joinedTable.tableSchema();
66+
String primaryPk = joinedSchema.tableMetadata().primaryPartitionKey();
67+
68+
if (primaryPk.equals(joinedJoinAttr)) {
69+
return lowLevelQueryByPk(joinedTable.tableName(), primaryPk, joinKeys);
70+
}
71+
72+
String matchedIndex = QueryEngineSupport.findIndexForAttribute(joinedSchema, joinedJoinAttr);
73+
if (matchedIndex != null) {
74+
return lowLevelQueryByGsi(joinedTable.tableName(), matchedIndex, joinedJoinAttr, joinKeys);
75+
}
76+
77+
return parallelScanFallback(joinedTable.tableName(), joinKeys, joinedJoinAttr);
78+
}
79+
80+
/**
81+
* Low-level per-key Query path when the join attribute matches the table's primary key. Executes inline for a single key to
82+
* avoid thread pool overhead.
83+
*/
84+
private Map<Object, List<Map<String, Object>>> lowLevelQueryByPk(
85+
String tableName, String pkAttr, Set<Object> joinKeys) {
86+
List<Callable<Map.Entry<Object, List<Map<String, Object>>>>> tasks = new ArrayList<>();
87+
for (Object key : joinKeys) {
88+
Object keyFinal = key;
89+
tasks.add(() -> queryByPkForKey(asyncLowLevel, tableName, pkAttr, keyFinal));
90+
}
91+
if (tasks.size() == 1) {
92+
return executeInline(tasks.get(0));
93+
}
94+
return executeAndCollect(tasks);
95+
}
96+
97+
private static Map.Entry<Object, List<Map<String, Object>>> queryByPkForKey(
98+
DynamoDbAsyncClient client, String tableName, String pkAttr, Object keyFinal) {
99+
List<Map<String, Object>> items = new ArrayList<>();
100+
Map<String, AttributeValue> exclusiveStartKey = null;
101+
do {
102+
QueryRequest.Builder reqBuilder =
103+
QueryRequest.builder()
104+
.tableName(tableName)
105+
.keyConditionExpression("#k = :v")
106+
.expressionAttributeNames(Collections.singletonMap("#k", pkAttr))
107+
.expressionAttributeValues(Collections.singletonMap(
108+
":v", AttributeValueConversion.toKeyAttributeValue(keyFinal)));
109+
if (exclusiveStartKey != null) {
110+
reqBuilder.exclusiveStartKey(exclusiveStartKey);
111+
}
112+
QueryResponse response;
113+
try {
114+
response = client.query(reqBuilder.build()).get();
115+
} catch (InterruptedException e) {
116+
Thread.currentThread().interrupt();
117+
throw new RuntimeException(e);
118+
} catch (ExecutionException e) {
119+
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
120+
}
121+
for (Map<String, AttributeValue> item : response.items()) {
122+
items.add(AttributeValueConversion.toObjectMap(item));
123+
}
124+
exclusiveStartKey = response.lastEvaluatedKey().isEmpty()
125+
? null : response.lastEvaluatedKey();
126+
} while (exclusiveStartKey != null);
127+
return new AbstractMap.SimpleEntry<>(keyFinal, items);
128+
}
129+
130+
/**
131+
* Low-level per-key Query path when the join attribute matches a GSI partition key. Executes inline for a single key to avoid
132+
* thread pool overhead.
133+
*/
134+
private Map<Object, List<Map<String, Object>>> lowLevelQueryByGsi(
135+
String tableName, String indexName, String attrName, Set<Object> joinKeys) {
136+
List<Callable<Map.Entry<Object, List<Map<String, Object>>>>> tasks = new ArrayList<>();
137+
for (Object key : joinKeys) {
138+
Object keyFinal = key;
139+
tasks.add(() -> queryByGsiForKey(asyncLowLevel, tableName, indexName, attrName, keyFinal));
140+
}
141+
if (tasks.size() == 1) {
142+
return executeInline(tasks.get(0));
143+
}
144+
return executeAndCollect(tasks);
145+
}
146+
147+
private static Map.Entry<Object, List<Map<String, Object>>> queryByGsiForKey(
148+
DynamoDbAsyncClient client, String tableName, String indexName, String attrName, Object keyFinal) {
149+
List<Map<String, Object>> items = new ArrayList<>();
150+
Map<String, AttributeValue> exclusiveStartKey = null;
151+
do {
152+
QueryRequest.Builder reqBuilder =
153+
QueryRequest.builder()
154+
.tableName(tableName)
155+
.indexName(indexName)
156+
.keyConditionExpression("#k = :v")
157+
.expressionAttributeNames(Collections.singletonMap("#k", attrName))
158+
.expressionAttributeValues(Collections.singletonMap(
159+
":v", AttributeValueConversion.toKeyAttributeValue(keyFinal)));
160+
if (exclusiveStartKey != null) {
161+
reqBuilder.exclusiveStartKey(exclusiveStartKey);
162+
}
163+
QueryResponse response;
164+
try {
165+
response = client.query(reqBuilder.build()).get();
166+
} catch (InterruptedException e) {
167+
Thread.currentThread().interrupt();
168+
throw new RuntimeException(e);
169+
} catch (ExecutionException e) {
170+
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
171+
}
172+
for (Map<String, AttributeValue> item : response.items()) {
173+
items.add(AttributeValueConversion.toObjectMap(item));
174+
}
175+
exclusiveStartKey = response.lastEvaluatedKey().isEmpty()
176+
? null : response.lastEvaluatedKey();
177+
} while (exclusiveStartKey != null);
178+
return new AbstractMap.SimpleEntry<>(keyFinal, items);
179+
}
180+
181+
/**
182+
* Low-level parallel scan using DynamoDbAsyncClient directly. Bypasses the enhanced-client bean round-trip and goes straight
183+
* from AV map -> Object map (one conversion instead of three).
184+
*/
185+
private Map<Object, List<Map<String, Object>>> parallelScanFallback(
186+
String tableName, Set<Object> neededKeys, String joinedJoinAttr) {
187+
int totalSegments = QueryEngineSupport.PARALLEL_SCAN_SEGMENTS;
188+
189+
List<Callable<Map<Object, List<Map<String, Object>>>>> tasks = new ArrayList<>();
190+
for (int seg = 0; seg < totalSegments; seg++) {
191+
int segment = seg;
192+
tasks.add(() -> {
193+
Map<Object, List<Map<String, Object>>> partial = new HashMap<>();
194+
Map<String, AttributeValue> exclusiveStartKey = null;
195+
do {
196+
ScanRequest.Builder reqBuilder =
197+
ScanRequest.builder()
198+
.tableName(tableName)
199+
.segment(segment)
200+
.totalSegments(totalSegments);
201+
if (exclusiveStartKey != null) {
202+
reqBuilder.exclusiveStartKey(exclusiveStartKey);
203+
}
204+
ScanResponse response;
205+
try {
206+
response = asyncLowLevel.scan(reqBuilder.build()).get();
207+
} catch (InterruptedException e) {
208+
Thread.currentThread().interrupt();
209+
throw new RuntimeException(e);
210+
} catch (ExecutionException e) {
211+
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
212+
}
213+
for (Map<String, AttributeValue> item : response.items()) {
214+
AttributeValue keyAv = item.get(joinedJoinAttr);
215+
if (keyAv == null) {
216+
continue;
217+
}
218+
Object keyObj = AttributeValueConversion.toObject(keyAv);
219+
if (keyObj != null && neededKeys.contains(keyObj)) {
220+
partial.computeIfAbsent(keyObj, k -> new ArrayList<>())
221+
.add(AttributeValueConversion.toObjectMap(item));
222+
}
223+
}
224+
exclusiveStartKey = response.lastEvaluatedKey().isEmpty()
225+
? null : response.lastEvaluatedKey();
226+
} while (exclusiveStartKey != null);
227+
return partial;
228+
});
229+
}
230+
231+
try {
232+
List<Future<Map<Object, List<Map<String, Object>>>>> futures = joinExecutor.invokeAll(tasks);
233+
Map<Object, List<Map<String, Object>>> merged = new HashMap<>();
234+
for (Future<Map<Object, List<Map<String, Object>>>> f : futures) {
235+
for (Map.Entry<Object, List<Map<String, Object>>> e : f.get().entrySet()) {
236+
List<Map<String, Object>> list = merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
237+
list.addAll(e.getValue());
238+
}
239+
}
240+
return merged;
241+
} catch (InterruptedException e) {
242+
Thread.currentThread().interrupt();
243+
throw new RuntimeException(e);
244+
} catch (ExecutionException e) {
245+
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
246+
}
247+
}
248+
249+
private Map<Object, List<Map<String, Object>>> executeAndCollect(
250+
List<Callable<Map.Entry<Object, List<Map<String, Object>>>>> tasks) {
251+
try {
252+
List<Future<Map.Entry<Object, List<Map<String, Object>>>>> futures = joinExecutor.invokeAll(tasks);
253+
Map<Object, List<Map<String, Object>>> result = new HashMap<>();
254+
for (Future<Map.Entry<Object, List<Map<String, Object>>>> f : futures) {
255+
Map.Entry<Object, List<Map<String, Object>>> e = f.get();
256+
result.put(e.getKey(), e.getValue());
257+
}
258+
return result;
259+
} catch (InterruptedException e) {
260+
Thread.currentThread().interrupt();
261+
throw new RuntimeException(e);
262+
} catch (ExecutionException e) {
263+
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
264+
}
265+
}
266+
267+
private static Map<Object, List<Map<String, Object>>> executeInline(
268+
Callable<Map.Entry<Object, List<Map<String, Object>>>> task) {
269+
try {
270+
Map.Entry<Object, List<Map<String, Object>>> entry = task.call();
271+
Map<Object, List<Map<String, Object>>> result = new HashMap<>();
272+
result.put(entry.getKey(), entry.getValue());
273+
return result;
274+
} catch (RuntimeException e) {
275+
throw e;
276+
} catch (Exception e) {
277+
throw new RuntimeException(e);
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)