Skip to content

Commit 0393888

Browse files
committed
feat(logstash): enhance logstash stats retrieval with improved error handling and pipeline status management
1 parent 9b75b97 commit 0393888

File tree

1 file changed

+45
-44
lines changed

1 file changed

+45
-44
lines changed

backend/src/main/java/com/park/utmstack/service/logstash_pipeline/UtmLogstashPipelineService.java

Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.park.utmstack.service.logstash_pipeline.response.pipeline.PipelineStats;
2121
import com.park.utmstack.service.logstash_pipeline.response.statistic.StatisticDocument;
2222
import com.park.utmstack.service.web_clients.rest_template.RestTemplateService;
23+
import com.park.utmstack.util.exceptions.ApiException;
2324
import com.park.utmstack.web.rest.vm.UtmLogstashPipelineVM;
2425
import com.utmstack.opensearch_connector.parsers.TermAggregateParser;
2526
import com.utmstack.opensearch_connector.types.BucketAggregation;
@@ -38,6 +39,7 @@
3839
import org.springframework.data.domain.Page;
3940
import org.springframework.data.domain.Pageable;
4041
import org.springframework.data.support.PageableExecutionUtils;
42+
import org.springframework.http.HttpStatus;
4143
import org.springframework.scheduling.annotation.Scheduled;
4244
import org.springframework.stereotype.Service;
4345
import org.springframework.transaction.annotation.Transactional;
@@ -267,57 +269,56 @@ public ApiEngineResponse logstashJvmApiResponse() {
267269
* Getting active pipelines stats from DB, general jvm stats from logstash
268270
*/
269271
public ApiStatsResponse getLogstashStats() throws Exception {
270-
final String ctx = CLASSNAME + ".getLogstashStats";
271-
ApiStatsResponse statsResponse = new ApiStatsResponse();
272+
final String ctx = CLASSNAME + ".getLogstashStats";
272273

273-
// Variables used to set the general pipeline's status
274-
AtomicInteger activePipelinesCount = new AtomicInteger();
275-
AtomicInteger upPipelinesCount = new AtomicInteger();
274+
try {
275+
ApiStatsResponse statsResponse = new ApiStatsResponse();
276276
boolean isCorrelationUp = isEngineUp();
277277

278-
try {
279-
// Getting Jvm information (not used)
280-
ApiEngineResponse jvmData = logstashJvmApiResponse();
281-
if (jvmData != null) {
282-
statsResponse.setGeneral(jvmData);
283-
}
284-
// List to store stats mapped from DB
285-
List<PipelineStats> infoStats;
286-
287-
// Getting the active pipelines statistics
288-
infoStats = activePipelinesList().stream().map(activePip -> {
289-
290-
// Calculating stats for pipelines
291-
// Setting stats for non-logstash pipelines (correlation engine)
292-
if (isCorrelationUp) {
293-
activePipelinesCount.getAndIncrement(); // Total pipelines that have to be active
294-
if (activePip.getPipelineStatus().equals(PipelineStatus.PIPELINE_STATUS_UP.get())) {
295-
upPipelinesCount.getAndIncrement();
296-
}
297-
} else {
298-
activePip.setPipelineStatus(PipelineStatus.PIPELINE_STATUS_DOWN.get());
299-
}
300-
// }
301-
// Mapping stats from DB pipeline
302-
return PipelineStats.getPipelineStats(activePip);
303-
}).collect(Collectors.toList());
304-
305-
// Setting the final global status of pipelines
306-
if (isCorrelationUp) {
307-
if (upPipelinesCount.get() == 0) {
308-
jvmData.setStatus(PipelineStatus.ENGINE_STATUS_RED.get());
309-
} else if (upPipelinesCount.get() == activePipelinesCount.get()) {
310-
jvmData.setStatus(PipelineStatus.ENGINE_STATUS_GREEN.get());
311-
} else {
312-
jvmData.setStatus(PipelineStatus.ENGINE_STATUS_YELLOW.get());
313-
}
278+
ApiEngineResponse jvmData = logstashJvmApiResponse();
279+
if (jvmData != null) {
280+
statsResponse.setGeneral(jvmData);
281+
}
282+
283+
List<UtmLogstashPipeline> activePipelines = activePipelinesList();
284+
285+
if (!isCorrelationUp) {
286+
activePipelines.forEach(p ->
287+
p.setPipelineStatus(PipelineStatus.PIPELINE_STATUS_DOWN.get())
288+
);
289+
}
290+
291+
List<PipelineStats> pipelineStatsList = activePipelines.stream()
292+
.map(PipelineStats::getPipelineStats)
293+
.sorted( Comparator.comparing(PipelineStats::getPipelineStatus).reversed())
294+
.collect(Collectors.toList());
295+
296+
statsResponse.setPipelines(pipelineStatsList);
297+
298+
if (isCorrelationUp && jvmData != null) {
299+
long upCount = activePipelines.stream()
300+
.filter(p -> PipelineStatus.PIPELINE_STATUS_UP.get()
301+
.equals(p.getPipelineStatus()))
302+
.count();
303+
304+
int totalCount = activePipelines.size();
305+
306+
if (upCount == 0) {
307+
jvmData.setStatus(PipelineStatus.ENGINE_STATUS_RED.get());
308+
} else if (upCount == totalCount) {
309+
jvmData.setStatus(PipelineStatus.ENGINE_STATUS_GREEN.get());
310+
} else {
311+
jvmData.setStatus(PipelineStatus.ENGINE_STATUS_YELLOW.get());
314312
}
315-
statsResponse.setPipelines(infoStats);
316-
} catch (Exception ex) {
317-
throw new Exception(ctx + ": " + ex.getMessage());
318313
}
314+
319315
return statsResponse;
316+
317+
} catch (Exception ex) {
318+
log.error("{}: An error occurred while fetching logstash stats: {}", ctx, ex.getMessage(), ex);
319+
throw new ApiException(String.format("%s: An error occurred while fetching logstash stats", ctx), HttpStatus.INTERNAL_SERVER_ERROR);
320320
}
321+
}
321322

322323
/**
323324
* Method to set the DB pipelines status

0 commit comments

Comments
 (0)