1010import com .park .utmstack .service .logstash_pipeline .response .statistic .StatisticDocument ;
1111import lombok .RequiredArgsConstructor ;
1212import lombok .extern .slf4j .Slf4j ;
13- import org .opensearch .client .json .JsonData ;
1413import org .opensearch .client .opensearch ._types .SortOrder ;
1514import org .opensearch .client .opensearch .core .SearchRequest ;
1615import org .opensearch .client .opensearch .core .SearchResponse ;
@@ -37,7 +36,6 @@ public class SourceActivityProvider {
3736 public Map <String , StatisticDocument > fetchLatestSourceActivity () {
3837 UtmDataInputStatusCheckpoint checkpoint = getOrCreateCheckpoint ();
3938
40-
4139 String fromTimestamp = checkpoint .getLastProcessedTimestamp ()
4240 .minus (OVERLAP_SECONDS , ChronoUnit .SECONDS )
4341 .toString ();
@@ -51,12 +49,15 @@ public Map<String, StatisticDocument> fetchLatestSourceActivity() {
5149 Map <String , StatisticDocument > activityMap = extractLatestHits (response );
5250
5351 if (!activityMap .isEmpty ()) {
52+ log .debug ("Fetched {} active sources from statistics index" , activityMap .size ());
5453 updateCheckpoint (checkpoint , activityMap );
54+ } else {
55+ log .debug ("No new source activity found since checkpoint: {}" , checkpoint .getLastProcessedTimestamp ());
5556 }
5657
5758 return activityMap ;
5859 } catch (Exception e ) {
59- log .error ("Error consultando telemetría en Elasticsearch: {}" , e .getMessage ());
60+ log .error ("Error fetching telemetry from Elasticsearch: {}" , e .getMessage (), e );
6061 return Collections .emptyMap ();
6162 }
6263 }
@@ -70,45 +71,84 @@ private SearchRequest buildActivityQuery(String fromTimestamp) {
7071 return SearchRequest .of (s -> s
7172 .index (Constants .STATISTICS_INDEX_PATTERN )
7273 .query (SearchUtil .toQuery (filters ))
73- .collapse (c -> c
74- .field ("dataSource.keyword" )
75- .innerHits (ih -> ih
76- .name ("latest" )
77- .size (1 )
78- .sort (sort -> sort .field (f -> f .field ("@timestamp" ).order (SortOrder .Desc )))
74+ .aggregations ("by_source" , agg -> agg
75+ .terms (t -> t
76+ .field ("dataSource.keyword" )
77+ .size (10000 )
78+ )
79+ .aggregations ("latest_doc" , latestAgg -> latestAgg
80+ .topHits (th -> th
81+ .size (1 )
82+ .sort (sort -> sort .field (f -> f .field ("@timestamp" ).order (SortOrder .Desc )))
83+ )
7984 )
8085 )
81- .size (10000 )
86+ .size (0 ) // We don't need the main hits, only aggregations
8287 );
8388 }
8489
8590 private Map <String , StatisticDocument > extractLatestHits (SearchResponse <StatisticDocument > response ) {
8691 Map <String , StatisticDocument > results = new HashMap <>();
8792
88- response .hits ().hits ().forEach (hit -> {
89- if (hit .innerHits () != null && hit .innerHits ().containsKey ("latest" )) {
90- var innerHits = hit .innerHits ().get ("latest" ).hits ().hits ();
91- if (!innerHits .isEmpty ()) {
92- JsonData json = innerHits .get (0 ).source ();
93- if (json != null ) {
94- StatisticDocument doc = json .to (StatisticDocument .class );
95- results .put (doc .getDataSource (), doc );
93+ try {
94+ var aggregations = response .aggregations ();
95+ if (aggregations == null || aggregations .get ("by_source" ) == null ) {
96+ log .warn ("No aggregation results found in response" );
97+ return results ;
98+ }
99+
100+ var bySourceAgg = aggregations .get ("by_source" ).sterms ();
101+ if (bySourceAgg == null || bySourceAgg .buckets ().array ().isEmpty ()) {
102+ log .debug ("No data source buckets found in aggregation" );
103+ return results ;
104+ }
105+
106+ bySourceAgg .buckets ().array ().forEach (bucket -> {
107+ String dataSource = bucket .key ();
108+
109+ var latestDocsAgg = bucket .aggregations ().get ("latest_doc" );
110+ if (latestDocsAgg != null ) {
111+ var topHits = latestDocsAgg .topHits ();
112+ if (topHits != null && !topHits .hits ().hits ().isEmpty ()) {
113+ var hit = topHits .hits ().hits ().get (0 );
114+ if (hit .source () != null ) {
115+ StatisticDocument doc = hit .source ().to (StatisticDocument .class );
116+ if (doc != null ) {
117+ results .put (dataSource , doc );
118+ log .debug ("Extracted latest activity for source: {} at {}" , dataSource , doc .getTimestamp ());
119+ }
120+ }
96121 }
97122 }
98- }
99- });
100- return results ;
123+ });
124+
125+ return results ;
126+ } catch (Exception e ) {
127+ log .error ("Error extracting latest hits from aggregation response: {}" , e .getMessage (), e );
128+ return results ;
129+ }
101130 }
102131
103132 private void updateCheckpoint (UtmDataInputStatusCheckpoint checkpoint , Map <String , StatisticDocument > activityMap ) {
104133 activityMap .values ().stream ()
105- .map (doc -> Instant .parse (doc .getTimestamp ()))
134+ .map (doc -> {
135+ try {
136+ return Instant .parse (doc .getTimestamp ());
137+ } catch (Exception e ) {
138+ log .warn ("Failed to parse timestamp '{}' for document: {}" , doc .getTimestamp (), e .getMessage ());
139+ return null ;
140+ }
141+ })
142+ .filter (java .util .Objects ::nonNull )
106143 .max (Instant ::compareTo )
107- .ifPresent (latest -> {
108- checkpoint .setLastProcessedTimestamp (latest );
109- checkpointRepository .save (checkpoint );
110- log .debug ("Checkpoint actualizado a: {}" , latest );
111- });
144+ .ifPresentOrElse (
145+ latest -> {
146+ checkpoint .setLastProcessedTimestamp (latest );
147+ checkpointRepository .save (checkpoint );
148+ log .info ("Checkpoint updated to: {}" , latest );
149+ },
150+ () -> log .debug ("No valid timestamps found to update checkpoint" )
151+ );
112152 }
113153
114154 private UtmDataInputStatusCheckpoint getOrCreateCheckpoint () {
0 commit comments