Skip to content

Commit 5ec71ea

Browse files
committed
fix(UtmDataInputStatusService): optimize data synchronization and improve error handling
1 parent 5034287 commit 5ec71ea

File tree

1 file changed

+65
-45
lines changed

1 file changed

+65
-45
lines changed

backend/src/main/java/com/park/utmstack/service/UtmDataInputStatusService.java

Lines changed: 65 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.time.ZoneOffset;
4848
import java.util.*;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.function.Function;
5051
import java.util.stream.Collectors;
5152

5253
/**
@@ -173,42 +174,58 @@ private void checkDataInputStatus(List<UtmDataInputStatus> inputs, String server
173174
}
174175
}
175176

176-
@Scheduled(fixedDelay = 5000, initialDelay = 30000)
177+
@Scheduled(fixedDelay = 15000, initialDelay = 30000)
177178
public void syncDataInputStatus() {
178179
final String ctx = CLASSNAME + ".syncDataInputStatus";
180+
179181
try {
180-
Map<String, StatisticDocument> result = getLatestStatisticsByDataSource();
182+
Map<String, StatisticDocument> latestStats = getLatestStatisticsByDataSource();
183+
184+
Map<String, UtmDataInputStatus> existing = dataInputStatusRepository.findAll()
185+
.stream()
186+
.collect(Collectors.toMap(
187+
e -> e.getDataType() + "-" + e.getSource(),
188+
Function.identity()
189+
));
190+
191+
List<UtmDataInputStatus> toSave = new ArrayList<>();
181192

182-
result.forEach((key, statisticDoc) -> {
193+
latestStats.forEach((key, stat) -> {
183194
try {
184-
String dataType = statisticDoc.getDataType();
185-
String dataSource = statisticDoc.getDataSource();
186-
long timestamp = Instant.parse(statisticDoc.getTimestamp()).getEpochSecond();
187-
188-
Optional<UtmDataInputStatus> existingOpt = dataInputStatusRepository.findBySourceAndDataType(dataSource, dataType);
189-
190-
UtmDataInputStatus dataInputStatus = existingOpt
191-
.map(existing -> {
192-
if(timestamp != existing.getTimestamp()) {
193-
existing.setTimestamp(timestamp);
194-
}
195-
return existing;
196-
})
197-
.orElseGet(() -> UtmDataInputStatus.builder()
198-
.dataType(dataType)
199-
.source(dataSource)
200-
.timestamp(timestamp)
201-
.median(86400L)
202-
.id(String.join("-", dataType, dataSource))
203-
.build());
204-
205-
dataInputStatusRepository.save(dataInputStatus);
195+
String dataType = stat.getDataType();
196+
String dataSource = stat.getDataSource();
197+
long timestamp = Instant.parse(stat.getTimestamp()).getEpochSecond();
198+
199+
String compositeKey = dataType + "-" + dataSource;
200+
201+
UtmDataInputStatus status = existing.get(compositeKey);
202+
boolean changed = false;
203+
204+
if (status == null) {
205+
status = UtmDataInputStatus.builder()
206+
.id(compositeKey)
207+
.dataType(dataType)
208+
.source(dataSource)
209+
.timestamp(timestamp)
210+
.median(86400L)
211+
.build();
212+
changed = true;
213+
} else if (status.getTimestamp() != timestamp) {
214+
status.setTimestamp(timestamp);
215+
changed = true;
216+
}
217+
218+
if (changed) {
219+
toSave.add(status);
220+
}
206221

207222
} catch (Exception e) {
208-
log.error("{}: Error processing dataType {} - {}", ctx, statisticDoc.getDataType(), e.getMessage(), e);
223+
log.error("{}: Error processing dataType {} - {}", ctx, stat.getDataType(), e.getMessage(), e);
209224
}
210225
});
211226

227+
dataInputStatusRepository.saveAll(toSave);
228+
212229
} catch (Exception e) {
213230
String msg = ctx + ": " + e.getMessage();
214231
log.error(msg, e);
@@ -423,40 +440,43 @@ private Map<String, Object> createAlertForDatasourceDown(UtmDataInputStatus inpu
423440
}
424441

425442
private Map<String, StatisticDocument> getLatestStatisticsByDataSource() {
443+
426444
ArrayList<FilterType> filters = new ArrayList<>();
427445
filters.add(new FilterType("type", OperatorType.IS, "enqueue_success"));
428446
filters.add(new FilterType("@timestamp", OperatorType.IS_BETWEEN, List.of("now-24h", "now")));
429447

430448
SearchRequest sr = SearchRequest.of(s -> s
431449
.query(SearchUtil.toQuery(filters))
432450
.index(Constants.STATISTICS_INDEX_PATTERN)
433-
.aggregations("by_dataSource", agg -> agg
434-
.terms(t -> t.field("dataSource.keyword")
435-
.size(10000))
436-
.aggregations("latest", latest -> latest
437-
.topHits(th -> th.sort(sort -> sort.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
438-
.size(1))
451+
.collapse(c -> c
452+
.field("dataSource.keyword")
453+
.innerHits(ih -> ih
454+
.name("latest")
455+
.size(1)
456+
.sort(sort -> sort.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
439457
)
440458
)
441-
.size(0)
459+
.sort(sort -> sort.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
460+
.size(10000) // máximo de dataSources esperados
442461
);
443462

444-
SearchResponse<StatisticDocument> response = elasticsearchService.search(sr, StatisticDocument.class);
445-
Map<String, StatisticDocument> result = new HashMap<>();
446-
447-
List<BucketAggregation> dataTypeBuckets = TermAggregateParser.parse(response.aggregations().get("by_dataSource"));
463+
SearchResponse<StatisticDocument> response =
464+
elasticsearchService.search(sr, StatisticDocument.class);
448465

449-
for (BucketAggregation bucket : dataTypeBuckets) {
450-
TopHitsAggregate topHitsAgg = bucket.getSubAggregations().get("latest").topHits();
466+
Map<String, StatisticDocument> result = new HashMap<>();
451467

452-
if (topHitsAgg != null && !topHitsAgg.hits().hits().isEmpty()) {
453-
JsonData jsonData = topHitsAgg.hits().hits().get(0).source();
454-
if (!Objects.isNull(jsonData)) {
455-
StatisticDocument doc = jsonData.to(StatisticDocument.class);
456-
result.put(bucket.getKey(), doc);
468+
response.hits().hits().forEach(hit -> {
469+
if (hit.innerHits() != null && hit.innerHits().containsKey("latest")) {
470+
var inner = hit.innerHits().get("latest").hits().hits();
471+
if (!inner.isEmpty()) {
472+
JsonData json = inner.get(0).source();
473+
if (json != null) {
474+
StatisticDocument doc = json.to(StatisticDocument.class);
475+
result.put(doc.getDataSource(), doc);
476+
}
457477
}
458478
}
459-
}
479+
});
460480

461481
return result;
462482
}

0 commit comments

Comments
 (0)