Skip to content

Commit 1f48e3c

Browse files
committed
feat: refactor asset synchronization logic and enhance data source aggregation
1 parent 33d6d53 commit 1f48e3c

File tree

2 files changed

+109
-84
lines changed

2 files changed

+109
-84
lines changed

backend/src/main/java/com/park/utmstack/service/network_scan/AssetSynchronizationService.java

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -41,113 +41,123 @@ public class AssetSynchronizationService {
4141
@Transactional
4242
@Scheduled(fixedDelay = 60000, initialDelay = 120000)
4343
public void syncDataInputsAndAssets() {
44-
4544
String correlationId = UUID.randomUUID().toString().substring(0, 8);
4645
log.info("[{}] Starting unified asset synchronization cycle", correlationId);
4746

4847
try {
49-
Map<String, AgentDTO> agentsMap = loadAgents();
5048
Map<String, StatisticDocument> statsMap = sourceActivityProvider.fetchLatestSourceActivity();
51-
5249
if (statsMap.isEmpty()) {
5350
log.debug("[{}] No new activity detected in data sources", correlationId);
5451
return;
5552
}
5653

57-
List<String> sourcesKeys = new ArrayList<>(statsMap.keySet());
58-
59-
List<UtmDataInputStatus> dataInputStatus = dataInputStatusService.findDataInputStatus();
60-
61-
Map<String, UtmDataInputStatus> currentDataInputStatusMap = dataInputStatus
62-
.stream()
63-
.collect(Collectors.toMap(UtmDataInputStatus::getId, f -> f));
64-
65-
Map<String, UtmNetworkScan> currentAssetsMap =
66-
networkScanRepository.findByAssetIpInOrAssetNameIn(sourcesKeys, sourcesKeys)
67-
.stream()
68-
.collect(Collectors.toMap(UtmNetworkScan::getAssetName, f -> f));
54+
Map<String, AgentDTO> agentsMap = loadAgents();
55+
Map<String, UtmDataInputStatus> statusMap = buildDataInputStatusMap();
56+
Map<String, UtmNetworkScan> assetsMap = buildNetworkAssetsMap(new ArrayList<>(statsMap.keySet()));
6957

7058
List<UtmDataInputStatus> statusToSave = new ArrayList<>();
7159
List<UtmNetworkScan> assetsToSave = new ArrayList<>();
7260

73-
Map<String, List<StatisticDocument>> statsBySource = statsMap.values()
74-
.stream()
75-
.collect(Collectors.groupingBy(StatisticDocument::getDataSource));
61+
for (String sourceName : statsMap.keySet()) {
7662

77-
statsBySource.forEach((sourceName, stats) -> {
63+
StatisticDocument stat = statsMap.get(sourceName);
64+
UtmDataInputStatus status = processDataInputStatus(stat, statusMap);
65+
statusToSave.add(status);
7866

79-
for (StatisticDocument stat : stats) {
80-
processDataInputStatus(stat, currentDataInputStatusMap, statusToSave);
81-
}
67+
// Update network asset
68+
UtmNetworkScan asset = processNetworkAsset(sourceName, agentsMap, assetsMap, statusMap);
69+
assetsToSave.add(asset);
70+
}
8271

83-
processNetworkAsset(sourceName, agentsMap, currentAssetsMap, currentDataInputStatusMap, assetsToSave);
84-
});
72+
if (!statusToSave.isEmpty()) {
73+
dataInputStatusRepository.saveAll(statusToSave);
74+
}
8575

86-
if (!statusToSave.isEmpty()) dataInputStatusRepository.saveAll(statusToSave);
87-
if (!assetsToSave.isEmpty()) networkScanRepository.saveAll(assetsToSave);
76+
if (!assetsToSave.isEmpty()) {
77+
networkScanRepository.saveAll(assetsToSave);
78+
}
8879

89-
log.info("[{}] Asset synchronization cycle completed successfully - {} data input status updated, {} assets synced",
80+
log.info("[{}] Asset synchronization cycle completed - {} status updated, {} assets synced",
9081
correlationId, statusToSave.size(), assetsToSave.size());
9182

9283
} catch (Exception e) {
9384
log.error("[{}] Critical error during asset synchronization: {}", correlationId, e.getMessage(), e);
9485
}
9586
}
9687

97-
private void processDataInputStatus(StatisticDocument stat,
98-
Map<String, UtmDataInputStatus> currentStatusMap,
99-
List<UtmDataInputStatus> statusToSave) {
88+
private Map<String, UtmDataInputStatus> buildDataInputStatusMap() {
89+
return dataInputStatusService.findDataInputStatus()
90+
.stream()
91+
.collect(Collectors.toMap(UtmDataInputStatus::getId, Function.identity()));
92+
}
93+
94+
private Map<String, UtmNetworkScan> buildNetworkAssetsMap(List<String> sourcesKeys) {
95+
return networkScanRepository.findByAssetIpInOrAssetNameIn(sourcesKeys, sourcesKeys)
96+
.stream()
97+
.collect(Collectors.toMap(UtmNetworkScan::getAssetName, Function.identity(), (a1, a2) -> a1));
98+
}
10099

100+
private UtmDataInputStatus processDataInputStatus(StatisticDocument stat,
101+
Map<String, UtmDataInputStatus> statusMap) {
101102
String statusId = stat.getDataType() + "-" + stat.getDataSource();
102103
long statTimestamp = Instant.parse(stat.getTimestamp()).getEpochSecond();
103104

104-
UtmDataInputStatus status = currentStatusMap.getOrDefault(statusId,
105-
UtmDataInputStatus.builder()
106-
.id(statusId)
107-
.dataType(stat.getDataType())
108-
.timestamp(statTimestamp)
109-
.source(stat.getDataSource())
110-
.median(86400L)
111-
.build());
105+
UtmDataInputStatus status = statusMap.getOrDefault(statusId, createNewDataInputStatus(statusId, stat, statTimestamp));
112106

113-
boolean isExisting = status.getId() != null;
114-
115-
if (isExisting && status.getTimestamp() != statTimestamp) {
107+
if (status.getTimestamp() != statTimestamp) {
116108
status.setTimestamp(statTimestamp);
117109
}
118110

119-
statusToSave.add(status);
111+
return status;
120112
}
121113

122-
private void processNetworkAsset(String sourceName,
123-
Map<String, AgentDTO> agentsMap,
124-
Map<String, UtmNetworkScan> currentAssetsMap,
125-
Map<String, UtmDataInputStatus> currentDataInputStatusMap,
126-
List<UtmNetworkScan> assetsToSave) {
114+
private UtmDataInputStatus createNewDataInputStatus(String id, StatisticDocument stat, long timestamp) {
115+
return UtmDataInputStatus.builder()
116+
.id(id)
117+
.dataType(stat.getDataType())
118+
.timestamp(timestamp)
119+
.source(stat.getDataSource())
120+
.median(86400L)
121+
.build();
122+
}
127123

128-
boolean hasAlias = false;
129-
boolean isAlive = currentDataInputStatusMap.values().stream()
124+
private UtmNetworkScan processNetworkAsset(String sourceName,
125+
Map<String, AgentDTO> agentsMap,
126+
Map<String, UtmNetworkScan> assetsMap,
127+
Map<String, UtmDataInputStatus> statusMap) {
128+
boolean isAlive = isDataSourceAlive(sourceName, statusMap);
129+
UtmNetworkScan asset = resolveAsset(sourceName, assetsMap);
130+
boolean isExisting = asset != null && asset.getId() != null;
131+
132+
if (asset == null) {
133+
asset = new UtmNetworkScan(sourceName, isAlive);
134+
}
135+
136+
enrichAssetWithData(asset, sourceName, agentsMap, isAlive, isExisting);
137+
return asset;
138+
}
139+
140+
private boolean isDataSourceAlive(String sourceName, Map<String, UtmDataInputStatus> statusMap) {
141+
return statusMap.values().stream()
130142
.filter(status -> status.getSource().equalsIgnoreCase(sourceName))
131143
.anyMatch(s -> !s.isDown());
144+
}
132145

133-
UtmNetworkScan asset = currentAssetsMap.get(sourceName);
134-
String resolvedAssetName = null;
146+
private UtmNetworkScan resolveAsset(String sourceName, Map<String, UtmNetworkScan> assetsMap) {
147+
UtmNetworkScan asset = assetsMap.get(sourceName);
135148

136149
if (asset == null) {
137150
asset = resolveAssetNameFromTenantConfig(sourceName);
138-
hasAlias = asset != null;
139151
}
140152

141-
boolean isExisting = asset != null && asset.getId() != null;
142-
143-
if (asset == null) {
144-
asset = new UtmNetworkScan(sourceName, isAlive);
145-
} else {
146-
if (hasAlias) {
147-
asset.assetName(sourceName);
148-
}
149-
}
153+
return asset;
154+
}
150155

156+
private void enrichAssetWithData(UtmNetworkScan asset,
157+
String sourceName,
158+
Map<String, AgentDTO> agentsMap,
159+
boolean isAlive,
160+
boolean isExisting) {
151161
asset.assetAlive(isAlive)
152162
.updateLevel(UpdateLevel.DATASOURCE)
153163
.modifiedAt(LocalDateTime.now().toInstant(ZoneOffset.UTC));
@@ -166,8 +176,6 @@ private void processNetworkAsset(String sourceName,
166176
} else {
167177
asset.setIsAgent(false);
168178
}
169-
170-
assetsToSave.add(asset);
171179
}
172180

173181
private UtmNetworkScan resolveAssetNameFromTenantConfig(String sourceName) {

backend/src/main/java/com/park/utmstack/service/network_scan/SourceActivityProvider.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,21 @@ private SearchRequest buildActivityQuery(String fromTimestamp) {
7676
.field("dataSource.keyword")
7777
.size(10000)
7878
)
79-
.aggregations("latest_doc", latestAgg -> latestAgg
80-
.topHits(th -> th
81-
.size(1)
82-
.sort(sort -> sort.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
79+
.aggregations("by_type", typeAgg -> typeAgg
80+
.terms(t -> t
81+
.field("dataType.keyword")
82+
.size(10000)
83+
)
84+
// Get the latest document for each dataSource + dataType combination
85+
.aggregations("latest_doc", latestAgg -> latestAgg
86+
.topHits(th -> th
87+
.size(1)
88+
.sort(sort -> sort.field(f -> f.field("@timestamp").order(SortOrder.Desc)))
89+
)
8390
)
8491
)
8592
)
86-
.size(0) // We don't need the main hits, only aggregations
93+
.size(0)
8794
);
8895
}
8996

@@ -103,23 +110,33 @@ private Map<String, StatisticDocument> extractLatestHits(SearchResponse<Statisti
103110
return results;
104111
}
105112

106-
bySourceAgg.buckets().array().forEach(bucket -> {
107-
String dataSource = bucket.key();
108-
109-
var latestDocsAgg = bucket.aggregations().get("latest_doc");
110-
if (latestDocsAgg != null) {
111-
var topHits = latestDocsAgg.topHits();
112-
if (topHits != null && !topHits.hits().hits().isEmpty()) {
113-
var hit = topHits.hits().hits().get(0);
114-
if (hit.source() != null) {
115-
StatisticDocument doc = hit.source().to(StatisticDocument.class);
116-
if (doc != null) {
117-
results.put(dataSource, doc);
118-
log.debug("Extracted latest activity for source: {} at {}", dataSource, doc.getTimestamp());
113+
bySourceAgg.buckets().array().forEach(sourceBucket -> {
114+
String dataSource = sourceBucket.key();
115+
116+
var byTypeAgg = sourceBucket.aggregations().get("by_type").sterms();
117+
if (byTypeAgg == null || byTypeAgg.buckets().array().isEmpty()) {
118+
log.debug("No data type buckets found for source: {}", dataSource);
119+
return;
120+
}
121+
122+
byTypeAgg.buckets().array().forEach(typeBucket -> {
123+
String dataType = typeBucket.key();
124+
125+
var latestDocsAgg = typeBucket.aggregations().get("latest_doc");
126+
if (latestDocsAgg != null) {
127+
var topHits = latestDocsAgg.topHits();
128+
if (topHits != null && !topHits.hits().hits().isEmpty()) {
129+
var hit = topHits.hits().hits().get(0);
130+
if (hit.source() != null) {
131+
StatisticDocument doc = hit.source().to(StatisticDocument.class);
132+
if (doc != null) {
133+
String compositeKey = dataSource + "|" + dataType;
134+
results.put(compositeKey, doc);
135+
}
119136
}
120137
}
121138
}
122-
}
139+
});
123140
});
124141

125142
return results;
@@ -135,7 +152,7 @@ private void updateCheckpoint(UtmDataInputStatusCheckpoint checkpoint, Map<Strin
135152
try {
136153
return Instant.parse(doc.getTimestamp());
137154
} catch (Exception e) {
138-
log.warn("Failed to parse timestamp '{}' for document: {}", doc.getTimestamp(), e.getMessage());
155+
log.error("Failed to parse timestamp '{}': {}", doc.getTimestamp(), e.getMessage());
139156
return null;
140157
}
141158
})
@@ -145,7 +162,7 @@ private void updateCheckpoint(UtmDataInputStatusCheckpoint checkpoint, Map<Strin
145162
latest -> {
146163
checkpoint.setLastProcessedTimestamp(latest);
147164
checkpointRepository.save(checkpoint);
148-
log.info("Checkpoint updated to: {}", latest);
165+
log.info("Checkpoint updated to: {} ({} active sources)", latest, activityMap.size());
149166
},
150167
() -> log.debug("No valid timestamps found to update checkpoint")
151168
);

0 commit comments

Comments
 (0)