Skip to content

Commit 4caf59f

Browse files
committed
Add support for streaming and cursor query execution in SQL templates
Motivation: - Introduce a feature to handle large result sets efficiently with streaming and cursor-based query execution. Changes: - Added `SqlTemplateStream` and `forCursor` methods to `SqlTemplate`. - Added `SqlTemplateStreamImpl`, `CursorSqlTemplateImpl`, and `MappingRowStream` - Added documentation to `index.adoc` with examples
1 parent 947cf72 commit 4caf59f

File tree

13 files changed

+807
-23
lines changed

13 files changed

+807
-23
lines changed

vertx-sql-client-templates/src/main/asciidoc/index.adoc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,35 @@ When your template must be executed inside a transaction, you might create a tem
6565
----
6666
====
6767

68+
== Streaming
69+
70+
When dealing with large result sets, you can use {@link io.vertx.sqlclient.templates.SqlTemplateStream} to read rows progressively using a cursor with a configurable fetch size, instead of loading all rows in memory at once.
71+
72+
NOTE: Streaming requires a {@link io.vertx.sqlclient.SqlConnection}. Some databases (e.g. PostgreSQL) also require an active transaction for cursors.
73+
74+
[source,$lang]
75+
----
76+
{@link examples.TemplateExamples#streamExample}
77+
----
78+
79+
You can use `mapTo` to map each row emitted by the stream to a custom type:
80+
81+
[source,$lang]
82+
----
83+
{@link examples.TemplateExamples#streamWithMapToExample}
84+
----
85+
86+
== Cursor
87+
88+
If you need finer control over row fetching, you can use a cursor-based template with {@link io.vertx.sqlclient.templates.SqlTemplate#forCursor}. This gives you a {@link io.vertx.sqlclient.Cursor} that allows you to read rows in batches.
89+
90+
NOTE: Cursors require a {@link io.vertx.sqlclient.SqlConnection}. Some databases (e.g. PostgreSQL) also require an active transaction for cursors.
91+
92+
[source,$lang]
93+
----
94+
{@link examples.TemplateExamples#cursorExample}
95+
----
96+
6897
== Template syntax
6998

7099
The template syntax uses `#{XXX}` syntax where `XXX` is a valid https://docs.oracle.com/javase/specs/jls/se8/html/jls-3.html#jls-3.8[java identifier] string

vertx-sql-client-templates/src/main/java/examples/TemplateExamples.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.vertx.sqlclient.*;
1010
import io.vertx.sqlclient.templates.RowMapper;
1111
import io.vertx.sqlclient.templates.SqlTemplate;
12+
import io.vertx.sqlclient.templates.SqlTemplateStream;
1213
import io.vertx.sqlclient.templates.TupleMapper;
1314
import io.vertx.sqlclient.templates.annotations.Column;
1415
import io.vertx.sqlclient.templates.annotations.ParametersMapped;
@@ -424,6 +425,56 @@ public Tuple map(Function<Integer, String> mapping, int size, UserDataObject par
424425
}
425426
}
426427

428+
public void streamExample(SqlConnection connection) {
429+
SqlTemplateStream
430+
.forStream(connection, "SELECT * FROM users WHERE age > #{age}", 50)
431+
.execute(Collections.singletonMap("age", 18))
432+
.onSuccess(stream -> {
433+
stream.handler(row -> {
434+
System.out.println(row.getString("first_name") + " " + row.getString("last_name"));
435+
});
436+
stream.endHandler(v -> {
437+
System.out.println("End of stream");
438+
});
439+
stream.exceptionHandler(err -> {
440+
System.out.println("Error: " + err.getMessage());
441+
});
442+
});
443+
}
444+
445+
public void streamWithMapToExample(SqlConnection connection) {
446+
SqlTemplateStream
447+
.forStream(connection, "SELECT * FROM users WHERE age > #{age}", 50)
448+
.mapTo(ROW_USER_MAPPER)
449+
.execute(Collections.singletonMap("age", 18))
450+
.onSuccess(stream -> {
451+
stream.handler(user -> {
452+
System.out.println(user.firstName + " " + user.lastName);
453+
});
454+
stream.endHandler(v -> {
455+
System.out.println("End of stream");
456+
});
457+
});
458+
}
459+
460+
public void cursorExample(SqlConnection connection) {
461+
SqlTemplate
462+
.forCursor(connection, "SELECT * FROM users WHERE age > #{age}")
463+
.execute(Collections.singletonMap("age", 18))
464+
.onSuccess(cursor -> {
465+
cursor.read(100).onSuccess(rows -> {
466+
rows.forEach(row -> {
467+
System.out.println(row.getString("first_name") + " " + row.getString("last_name"));
468+
});
469+
if (cursor.hasMore()) {
470+
// Read more
471+
} else {
472+
cursor.close();
473+
}
474+
});
475+
});
476+
}
477+
427478
public void templateInTransaction(Pool pool) {
428479
SqlTemplate<Map<String, Object>, RowSet<UserDataObject>> template = SqlTemplate
429480
.forQuery(pool, "SELECT * FROM users WHERE id=#{id}")

vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/RowMapper.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package io.vertx.sqlclient.templates;
1212

1313
import io.vertx.codegen.annotations.VertxGen;
14+
import io.vertx.core.json.JsonObject;
1415
import io.vertx.sqlclient.Row;
1516

1617
/**
@@ -20,6 +21,25 @@
2021
@FunctionalInterface
2122
public interface RowMapper<T> {
2223

24+
/**
25+
* Create a mapper that converts a {@link Row} to an instance of the given {@code type}.
26+
*
27+
* <p>This feature relies on {@link io.vertx.core.json.JsonObject#mapTo} feature. This likely requires
28+
* to use Jackson databind in the project.
29+
*
30+
* @param type the target class
31+
* @return the mapper
32+
*/
33+
static <T> RowMapper<T> mapper(Class<T> type) {
34+
return row -> {
35+
JsonObject json = new JsonObject();
36+
for (int i = 0; i < row.size(); i++) {
37+
json.getMap().put(row.getColumnName(i), row.getValue(i));
38+
}
39+
return json.mapTo(type);
40+
};
41+
}
42+
2343
/**
2444
* Build a {@code T} representation of the given {@code row}
2545
*

vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/SqlTemplate.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,11 @@
1414
import io.vertx.codegen.annotations.GenIgnore;
1515
import io.vertx.codegen.annotations.VertxGen;
1616
import io.vertx.core.Future;
17-
import io.vertx.core.json.JsonObject;
18-
import io.vertx.sqlclient.Row;
19-
import io.vertx.sqlclient.RowSet;
20-
import io.vertx.sqlclient.SqlClient;
21-
import io.vertx.sqlclient.SqlResult;
17+
import io.vertx.sqlclient.*;
2218
import io.vertx.sqlclient.impl.SqlClientInternal;
19+
import io.vertx.sqlclient.templates.impl.CursorSqlTemplateImpl;
2320
import io.vertx.sqlclient.templates.impl.SqlTemplateImpl;
2421

25-
import java.util.LinkedHashMap;
2622
import java.util.List;
2723
import java.util.Map;
2824
import java.util.function.Function;
@@ -69,6 +65,32 @@ static SqlTemplate<Map<String, Object>, SqlResult<Void>> forUpdate(SqlClient cli
6965
return new SqlTemplateImpl<>(clientInternal, sqlTemplate, query -> query.collecting(SqlTemplateImpl.NULL_COLLECTOR), sqlTemplate::mapTuple);
7066
}
7167

68+
/**
69+
* Create an SQL template for streaming query results consuming map parameters and returning {@link Row}.
70+
*
71+
* <p>Delegates to {@link SqlTemplateStream#forStream(SqlConnection, String, int)}.
72+
*
73+
* @param client the wrapped SQL connection
74+
* @param template the template query string
75+
* @param fetchSize the cursor fetch size
76+
* @return the template
77+
*/
78+
static SqlTemplateStream<Map<String, Object>, Row> forStream(SqlConnection client, String template, int fetchSize) {
79+
return SqlTemplateStream.forStream(client, template, fetchSize);
80+
}
81+
82+
/**
83+
* Create an SQL template for cursor-based query execution consuming map parameters and returning a {@link Cursor}.
84+
*
85+
* @param client the wrapped SQL connection
86+
* @param template the template query string
87+
* @return the template
88+
*/
89+
static SqlTemplate<Map<String, Object>, Cursor> forCursor(SqlConnection client, String template) {
90+
SqlClientInternal clientInternal = (SqlClientInternal) client;
91+
io.vertx.sqlclient.templates.impl.SqlTemplate sqlTemplate = io.vertx.sqlclient.templates.impl.SqlTemplate.create(clientInternal, template);
92+
return new CursorSqlTemplateImpl<>(client, sqlTemplate, sqlTemplate::mapTuple);
93+
}
7294

7395
/**
7496
* @return the computed SQL for this template
@@ -99,14 +121,7 @@ static SqlTemplate<Map<String, Object>, SqlResult<Void>> forUpdate(SqlClient cli
99121
* @return a new template
100122
*/
101123
default <T> SqlTemplate<T, R> mapFrom(Class<T> type) {
102-
return mapFrom(TupleMapper.mapper(params -> {
103-
JsonObject jsonObject = JsonObject.mapFrom(params);
104-
Map<String, Object> map = new LinkedHashMap<>(jsonObject.size());
105-
for (String fieldName : jsonObject.fieldNames()) {
106-
map.put(fieldName, jsonObject.getValue(fieldName));
107-
}
108-
return map;
109-
}));
124+
return mapFrom(TupleMapper.mapper(type));
110125
}
111126

112127
/**
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package io.vertx.sqlclient.templates;
2+
3+
import io.vertx.codegen.annotations.VertxGen;
4+
import io.vertx.core.Future;
5+
import io.vertx.sqlclient.Row;
6+
import io.vertx.sqlclient.RowStream;
7+
import io.vertx.sqlclient.SqlConnection;
8+
import io.vertx.sqlclient.impl.SqlClientInternal;
9+
import io.vertx.sqlclient.templates.impl.SqlTemplateStreamImpl;
10+
11+
import java.util.Map;
12+
13+
/**
14+
* An SQL template for streaming query results.
15+
*
16+
* <p>Stream templates execute queries using named instead of positional parameters and return results
17+
* as a {@link RowStream} that reads rows progressively using a cursor with a configurable fetch size.
18+
*
19+
* @param <I> the input parameters type
20+
* @param <T> the row output type
21+
*/
22+
@VertxGen
23+
public interface SqlTemplateStream<I, T> {
24+
25+
/**
26+
* Create an SQL template for streaming query results consuming map parameters and returning {@link Row}.
27+
*
28+
* <p>The returned stream template uses a cursor with the given {@code fetchSize} to read rows progressively.
29+
*
30+
* @param client the wrapped SQL connection
31+
* @param template the template query string
32+
* @param fetchSize the cursor fetch size
33+
* @return the template
34+
*/
35+
static SqlTemplateStream<Map<String, Object>, Row> forStream(SqlConnection client, String template, int fetchSize) {
36+
SqlClientInternal clientInternal = (SqlClientInternal) client;
37+
io.vertx.sqlclient.templates.impl.SqlTemplate sqlTemplate = io.vertx.sqlclient.templates.impl.SqlTemplate.create(clientInternal, template);
38+
return new SqlTemplateStreamImpl<>(client, sqlTemplate, sqlTemplate::mapTuple, null, fetchSize);
39+
}
40+
41+
/**
42+
* @return the computed SQL for this template
43+
*/
44+
String sql();
45+
46+
/**
47+
* Set a parameters user defined mapping function.
48+
*
49+
* <p> At query execution, the {@code mapper} is called to map the parameters object
50+
* to a {@link io.vertx.sqlclient.Tuple} that configures the prepared query.
51+
*
52+
* @param mapper the mapping function
53+
* @return a new template
54+
*/
55+
<T2> SqlTemplateStream<T2, T> mapFrom(TupleMapper<T2> mapper);
56+
57+
/**
58+
* Set a parameters user defined class mapping.
59+
*
60+
* <p> At query execution, the parameters object is mapped to a {@code Map<String, Object>} that
61+
* configures the prepared query.
62+
*
63+
* <p> This feature relies on {@link io.vertx.core.json.JsonObject#mapFrom} feature. This likely requires
64+
* to use Jackson databind in the project.
65+
*
66+
* @param type the mapping type
67+
* @return a new template
68+
*/
69+
default <T2> SqlTemplateStream<T2, T> mapFrom(Class<T2> type) {
70+
return mapFrom(TupleMapper.mapper(type));
71+
}
72+
73+
/**
74+
* Set a row user defined mapping function.
75+
*
76+
* <p>When rows are emitted by the stream, the {@code mapper} function is called to map each {@link Row}
77+
* to the target type.
78+
*
79+
* @param mapper the mapping function
80+
* @return a new template
81+
*/
82+
<U> SqlTemplateStream<I, U> mapTo(RowMapper<U> mapper);
83+
84+
/**
85+
* Set a row user defined mapping function.
86+
*
87+
* <p>When rows are emitted by the stream, resulting rows are mapped to {@code type} instances.
88+
*
89+
* <p> This feature relies on {@link io.vertx.core.json.JsonObject#mapFrom} feature. This likely requires
90+
* to use Jackson databind in the project.
91+
*
92+
* @param type the mapping type
93+
* @return a new template
94+
*/
95+
default <U> SqlTemplateStream<I, U> mapTo(Class<U> type) {
96+
return mapTo(RowMapper.mapper(type));
97+
}
98+
99+
/**
100+
* Returns a new template, using the specified {@code connection}.
101+
*
102+
* @param connection the connection that will execute requests
103+
* @return a new template
104+
*/
105+
SqlTemplateStream<I, T> withConnection(SqlConnection connection);
106+
107+
/**
108+
* Execute the query with the {@code parameters}
109+
*
110+
* @param params the query parameters
111+
* @return a future notified with the result
112+
*/
113+
Future<RowStream<T>> execute(I params);
114+
115+
}

vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/TupleMapper.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.vertx.sqlclient.Tuple;
1616
import io.vertx.sqlclient.templates.impl.JsonTuple;
1717

18+
import java.util.LinkedHashMap;
1819
import java.util.Map;
1920
import java.util.function.Function;
2021

@@ -44,6 +45,26 @@ static <T> TupleMapper<T> mapper(Function<T, Map<String, Object>> fn) {
4445
};
4546
}
4647

48+
/**
49+
* Create a mapper that converts a parameters object of the given {@code type} to a map of named parameters.
50+
*
51+
* <p>This feature relies on {@link io.vertx.core.json.JsonObject#mapFrom} feature. This likely requires
52+
* to use Jackson databind in the project.
53+
*
54+
* @param type the parameters class
55+
* @return the mapper
56+
*/
57+
static <T> TupleMapper<T> mapper(Class<T> type) {
58+
return mapper(params -> {
59+
JsonObject jsonObject = JsonObject.mapFrom(params);
60+
Map<String, Object> map = new LinkedHashMap<>(jsonObject.size());
61+
for (String fieldName : jsonObject.fieldNames()) {
62+
map.put(fieldName, jsonObject.getValue(fieldName));
63+
}
64+
return map;
65+
});
66+
}
67+
4768
/**
4869
* Map a {@link JsonObject} to a {@link Tuple}.
4970
*/

0 commit comments

Comments
 (0)