Skip to content

Commit 81165da

Browse files
feat[backed](elasticSearchService): added batch processing of request… (#2090)
* feat[backed](elasticSearchService): added batch processing of requests and auto rebuild on IO errors * chore[backend](): updated go dependencies * fix[backend](elastic-service): sanitized csv before exportation and changed error messages
1 parent 9682d0d commit 81165da

4 files changed

Lines changed: 277 additions & 43 deletions

File tree

backend/src/main/java/com/park/utmstack/service/elasticsearch/ElasticsearchService.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ public <T> SearchResponse<T> search(List<FilterType> filters, Integer top, Strin
341341
try {
342342
Assert.hasText(indexPattern, "Parameter indexPattern must not be null or empty");
343343
SearchRequest query = buildQuery(indexPattern, filters, top, pageable);
344-
return client.getClient().search(query, type);
344+
return client.execute(c -> c.search(query, type));
345345
} catch (Exception e) {
346346
throw new RuntimeException(ctx + ": " + e.getMessage());
347347
}
@@ -400,12 +400,86 @@ public Map<String, Object> getLatestDocument(List<FilterType> filters, String in
400400
public <T> SearchResponse<T> search(SearchRequest request, Class<T> type) {
401401
final String ctx = CLASSNAME + ".search";
402402
try {
403-
return client.getClient().search(request, type);
403+
return client.execute(c -> c.search(request, type));
404404
} catch (Exception e) {
405405
throw new RuntimeException(ctx + ": " + e.getMessage());
406406
}
407407
}
408408

409+
@FunctionalInterface
410+
public interface SearchBatchConsumer<T> {
411+
/** Returns false to stop iteration early. */
412+
boolean accept(List<T> batch) throws Exception;
413+
}
414+
415+
/**
416+
* Streams a result set using search_after pagination, never holding more than {@code pageSize}
417+
* documents in memory at a time. Designed for very large exports where loading every hit at
418+
* once would OOM the JVM (and take the OpenSearch client's I/O reactor down with it).
419+
*
420+
* Sort is forced to {@code @timestamp desc} with {@code _id desc} as tiebreaker so that
421+
* search_after is stable and deterministic.
422+
*
423+
* @param filters filters to apply
424+
* @param max hard upper bound on total documents to emit; null or <=0 means unbounded
425+
* @param indexPattern target index pattern
426+
* @param pageSize batch size (capped at 10000 by OpenSearch per request)
427+
* @param type deserialization type
428+
* @param consumer receives each batch; return false to stop early
429+
* @return total number of documents emitted
430+
*/
431+
public <T> long searchStream(List<FilterType> filters, Integer max, String indexPattern,
432+
int pageSize, Class<T> type, SearchBatchConsumer<T> consumer) {
433+
final String ctx = CLASSNAME + ".searchStream";
434+
try {
435+
Assert.hasText(indexPattern, "Parameter indexPattern must not be null or empty");
436+
Assert.notNull(consumer, "consumer must not be null");
437+
if (pageSize <= 0) pageSize = 500;
438+
439+
long emitted = 0;
440+
List<String> after = null;
441+
while (true) {
442+
int remaining = (max != null && max > 0) ? (int) (max - emitted) : pageSize;
443+
if (remaining <= 0) break;
444+
int size = Math.min(pageSize, remaining);
445+
446+
final List<String> afterFinal = after;
447+
final int sizeFinal = size;
448+
SearchResponse<T> response = client.execute(c -> {
449+
SearchRequest.Builder srb = new SearchRequest.Builder()
450+
.index(indexPattern)
451+
.query(SearchUtil.toQuery(filters))
452+
.size(sizeFinal)
453+
.sort(s -> s.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
454+
.sort(s -> s.field(f -> f.field("_id").order(SortOrder.Desc)));
455+
if (afterFinal != null && !afterFinal.isEmpty())
456+
srb.searchAfter(afterFinal);
457+
return c.search(srb.build(), type);
458+
});
459+
460+
if (response == null || response.hits() == null) break;
461+
List<org.opensearch.client.opensearch.core.search.Hit<T>> hits = response.hits().hits();
462+
if (hits == null || hits.isEmpty()) break;
463+
464+
List<T> batch = new ArrayList<>(hits.size());
465+
for (org.opensearch.client.opensearch.core.search.Hit<T> h : hits)
466+
batch.add(h.source());
467+
468+
boolean keepGoing = consumer.accept(batch);
469+
emitted += hits.size();
470+
471+
if (!keepGoing) break;
472+
if (hits.size() < size) break;
473+
474+
after = hits.get(hits.size() - 1).sort();
475+
if (after == null || after.isEmpty()) break;
476+
}
477+
return emitted;
478+
} catch (Exception e) {
479+
throw new RuntimeException(ctx + ": " + e.getMessage(), e);
480+
}
481+
}
482+
409483
public void updateByQuery(Query query, String index, String script) {
410484
final String ctx = CLASSNAME + ".updateByQuery";
411485
try {

backend/src/main/java/com/park/utmstack/service/elasticsearch/OpensearchClientBuilder.java

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,71 @@
1616
public class OpensearchClientBuilder {
1717
private static final String CLASSNAME = "OpensearchClientBuilder";
1818
private final Logger log = LoggerFactory.getLogger(OpensearchClientBuilder.class);
19-
private OpenSearch client;
19+
private volatile OpenSearch client;
20+
21+
@FunctionalInterface
22+
public interface OsAction<T> {
23+
T apply(OpenSearch client) throws Exception;
24+
}
2025

2126
@Order(Ordered.HIGHEST_PRECEDENCE)
2227
@EventListener(ApplicationReadyEvent.class)
2328
public void init() throws Exception {
24-
final String ctx = CLASSNAME + ".init";
29+
buildClient();
30+
}
31+
32+
public OpenSearch getClient() {
33+
return client;
34+
}
35+
36+
/**
37+
* Runs an action against the OpenSearch client with one-shot recovery: if the underlying
38+
* Apache HttpAsyncClient I/O reactor has transitioned to STOPPED (typically after an OOM
39+
* or a fatal callback exception while streaming a very large response), the singleton
40+
* client is rebuilt and the action is retried once. All other failures propagate unchanged.
41+
* Callers that don't need recovery should keep using {@link #getClient()} directly.
42+
*/
43+
public <T> T execute(OsAction<T> action) throws Exception {
44+
try {
45+
return action.apply(client);
46+
} catch (Exception e) {
47+
if (!isReactorStopped(e))
48+
throw e;
49+
log.warn("OpenSearch I/O reactor is STOPPED; rebuilding client and retrying once", e);
50+
rebuild();
51+
return action.apply(client);
52+
}
53+
}
54+
55+
public synchronized void rebuild() {
56+
final String ctx = CLASSNAME + ".rebuild";
57+
try {
58+
OpenSearch old = this.client;
59+
buildClient();
60+
tryClose(old);
61+
} catch (Exception e) {
62+
String msg = ctx + ": " + e.getMessage();
63+
log.error(msg);
64+
throw new RuntimeException(msg);
65+
}
66+
}
67+
68+
private synchronized void buildClient() {
69+
final String ctx = CLASSNAME + ".buildClient";
2570
try {
2671
String host = System.getenv(Constants.ENV_ELASTICSEARCH_HOST);
27-
Assert.hasText(host, "Environment variable ELASTICSEARCH_HOST is missing or his value is null or empty");
72+
Assert.hasText(host, "Environment variable ELASTICSEARCH_HOST is missing or its value is null or empty");
2873

2974
String port = System.getenv(Constants.ENV_ELASTICSEARCH_PORT);
30-
Assert.hasText(port, "Environment variable ELASTICSEARCH_PORT is missing or his value is null or empty");
75+
Assert.hasText(port, "Environment variable ELASTICSEARCH_PORT is missing or its value is null or empty");
3176

3277
String user = System.getenv(Constants.ENV_ELASTICSEARCH_USER);
33-
Assert.hasText(user, "Environment variable ELASTICSEARCH_USER is missing or his value is null or empty");
78+
Assert.hasText(user, "Environment variable ELASTICSEARCH_USER is missing or its value is null or empty");
3479

3580
String password = System.getenv(Constants.ENV_ELASTICSEARCH_PASSWORD);
36-
Assert.hasText(password, "Environment variable ELASTICSEARCH_PASSWORD is missing or his value is null or empty");
81+
Assert.hasText(password, "Environment variable ELASTICSEARCH_PASSWORD is missing or its value is null or empty");
3782

38-
client = OpenSearch.builder()
83+
this.client = OpenSearch.builder()
3984
.withHost(host, Integer.parseInt(port), HttpScheme.https)
4085
.withCredentials(user, password)
4186
.build();
@@ -46,7 +91,28 @@ public void init() throws Exception {
4691
}
4792
}
4893

49-
public OpenSearch getClient() {
50-
return client;
94+
private void tryClose(OpenSearch old) {
95+
if (old == null) return;
96+
try {
97+
if (old instanceof AutoCloseable) {
98+
((AutoCloseable) old).close();
99+
}
100+
} catch (Exception ignored) {
101+
// best-effort: the old client is unusable anyway
102+
}
103+
}
104+
105+
/**
106+
* Detects the Apache HttpAsyncClient "Request cannot be executed; I/O reactor status: STOPPED"
107+
* condition anywhere in the cause chain.
108+
*/
109+
public static boolean isReactorStopped(Throwable t) {
110+
while (t != null) {
111+
String msg = t.getMessage();
112+
if (msg != null && msg.contains("I/O reactor") && msg.contains("STOPPED"))
113+
return true;
114+
t = t.getCause();
115+
}
116+
return false;
51117
}
52118
}

backend/src/main/java/com/park/utmstack/util/UtilCsv.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.park.utmstack.util;
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.jayway.jsonpath.DocumentContext;
45
import com.jayway.jsonpath.JsonPath;
56
import com.jayway.jsonpath.PathNotFoundException;
67
import com.park.utmstack.domain.shared_types.DataColumn;
@@ -13,6 +14,7 @@
1314
import org.springframework.util.StringUtils;
1415

1516
import javax.servlet.http.HttpServletResponse;
17+
import java.io.IOException;
1618
import java.time.Instant;
1719
import java.time.format.DateTimeFormatter;
1820
import java.util.ArrayList;
@@ -51,6 +53,7 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
5153
List<String[]> rows = new ArrayList<>();
5254

5355
data.forEach(d -> {
56+
DocumentContext docctx = JsonPath.parse(d);
5457
String[] cells = new String[columns.length];
5558
for (int i = 0; i < columns.length; i++) {
5659
String fieldName = columns[i].getField();
@@ -59,7 +62,7 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
5962

6063
Object value;
6164
try {
62-
value = JsonPath.parse(d).read("$." + fieldName);
65+
value = docctx.read("$." + fieldName);
6366
} catch (PathNotFoundException e) {
6467
continue;
6568
}
@@ -81,6 +84,7 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
8184
cells[i] = value.toString();
8285
}
8386
}
87+
cells[i] = sanitizeCsvCell(cells[i]);
8488
}
8589
rows.add(cells);
8690
});
@@ -104,4 +108,91 @@ public static void prepareToDownload(HttpServletResponse response, DataColumn[]
104108
throw new UtmCsvException(msg);
105109
}
106110
}
111+
112+
/**
113+
* Opens a CSV response stream: sets content-type/disposition headers and writes the header row.
114+
* Caller is responsible for closing the returned printer (try-with-resources is fine).
115+
*
116+
* Column names are normalized in-place by stripping a trailing {@code .keyword}.
117+
*/
118+
public static CSVPrinter openCsvStream(HttpServletResponse response, DataColumn[] columns) throws IOException {
119+
Assert.notEmpty(columns);
120+
121+
Arrays.stream(columns).forEach(column ->
122+
column.setField(column.getField().replace(".keyword", "")));
123+
124+
String[] headers = Stream.of(columns).map(column -> {
125+
if (StringUtils.hasText(column.getLabel()))
126+
return column.getLabel();
127+
return column.getField().replace(".keyword", "");
128+
}).toArray(String[]::new);
129+
130+
response.setContentType("text/csv");
131+
response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=data.csv");
132+
133+
return new CSVPrinter(response.getWriter(),
134+
CSVFormat.DEFAULT.withHeader(headers).withQuoteMode(QuoteMode.ALL));
135+
}
136+
137+
/**
138+
* Writes a batch of source maps as CSV rows using the same field-extraction logic as
139+
* {@link #prepareToDownload}. Intended to be called repeatedly while paginating through
140+
* a large result set; pair with {@link #openCsvStream}.
141+
*/
142+
public static void writeCsvBatch(CSVPrinter printer, DataColumn[] columns, List<?> data) throws IOException {
143+
if (data == null || data.isEmpty()) return;
144+
145+
final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z")
146+
.withLocale(Locale.getDefault()).withZone(TimezoneUtil.getAppTimezone());
147+
148+
for (Object d : data) {
149+
DocumentContext ctx = JsonPath.parse(d);
150+
String[] cells = new String[columns.length];
151+
for (int i = 0; i < columns.length; i++) {
152+
String fieldName = columns[i].getField();
153+
String fieldType = columns[i].getType();
154+
cells[i] = null;
155+
156+
Object value;
157+
try {
158+
value = ctx.read("$." + fieldName);
159+
} catch (PathNotFoundException e) {
160+
continue;
161+
}
162+
163+
if (value == null)
164+
continue;
165+
166+
if (value instanceof String) {
167+
cells[i] = "date".equals(fieldType) ? DATE_FORMATTER.format(Instant.parse(String.valueOf(value))) :
168+
String.valueOf(value).replace("\n", " ").replace("\t", " ");
169+
} else if (value instanceof List) {
170+
cells[i] = ((List<?>) value).stream().map(String::valueOf).collect(Collectors.joining(","));
171+
} else if (value instanceof Number) {
172+
cells[i] = String.valueOf(value);
173+
} else if (value instanceof Map) {
174+
try {
175+
cells[i] = OBJECT_MAPPER.writeValueAsString(value);
176+
} catch (Exception ex) {
177+
cells[i] = value.toString();
178+
}
179+
}
180+
cells[i] = sanitizeCsvCell(cells[i]);
181+
}
182+
printer.printRecord((Object[]) cells);
183+
}
184+
printer.flush();
185+
}
186+
187+
/**
188+
* Neutralizes CSV-injection payloads by prefixing a single quote to any cell whose first
189+
* character is interpreted as a formula trigger by Excel/LibreOffice/Sheets.
190+
*/
191+
private static String sanitizeCsvCell(String value) {
192+
if (value == null || value.isEmpty()) return value;
193+
char first = value.charAt(0);
194+
if (first == '=' || first == '+' || first == '-' || first == '@' || first == '\t' || first == '\r')
195+
return "'" + value;
196+
return value;
197+
}
107198
}

0 commit comments

Comments
 (0)