Skip to content

Commit 6239553

Browse files
authored
Merge pull request #153 from rostilos/1.5.3-rc
feat: Add health check for RAG pipeline and enhance PR coverage checks to include merged PRs
2 parents 00dcf24 + dc7de78 commit 6239553

8 files changed

Lines changed: 87 additions & 28 deletions

File tree

java-ecosystem/libs/analysis-api/src/main/java/org/rostilos/codecrow/analysisapi/rag/RagOperationsService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ public interface RagOperationsService {
3434
*/
3535
boolean isRagIndexReady(Project project);
3636

37+
/**
38+
* Check if the RAG pipeline service is reachable (health check).
39+
* Used to verify connectivity before attempting RAG operations.
40+
*
41+
* @return true if the RAG pipeline is healthy and reachable, false otherwise
42+
*/
43+
default boolean isRagPipelineHealthy() {
44+
return true; // Default assumes healthy; implementations should do actual check
45+
}
46+
3747
/**
3848
* Trigger an incremental RAG update for the given project after a branch merge or commit.
3949
*

java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/processor/analysis/BranchAnalysisProcessor.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,13 @@ public Map<String, Object> process(
205205
augmentChangedFilesFromPr(changedFiles, project, prNumber);
206206

207207
// ── Hybrid analysis: AI analysis for uncovered direct pushes ─────
208-
// Check if unanalyzed commits are covered by open PRs.
208+
// Check if unanalyzed commits are covered by PRs (open or recently merged).
209209
// If NOT covered (direct push without PR), run full AI analysis
210210
// on the commit range diff, producing CodeAnalysisIssues marked
211211
// with DetectionSource.DIRECT_PUSH_ANALYSIS.
212212
performDirectPushAnalysisIfNeeded(
213213
project, request, unanalyzedCommits, rawDiff,
214-
changedFiles, provider, consumer);
214+
changedFiles, provider, consumer, prNumber);
215215

216216
EventNotificationEmitter.emitStatus(consumer, "analyzing_files",
217217
"Analyzing " + changedFiles.size() + " changed files");
@@ -516,7 +516,8 @@ private void performDirectPushAnalysisIfNeeded(
516516
String rawDiff,
517517
Set<String> changedFiles,
518518
EVcsProvider provider,
519-
Consumer<Map<String, Object>> consumer) {
519+
Consumer<Map<String, Object>> consumer,
520+
Long mergedPrNumber) {
520521

521522
if (unanalyzedCommits.isEmpty()) {
522523
log.debug("No unanalyzed commits — skipping direct push analysis check");
@@ -528,6 +529,16 @@ private void performDirectPushAnalysisIfNeeded(
528529
return;
529530
}
530531

532+
// ── Fast path: PR merge ──────────────────────────────────────────
533+
// If this branch analysis was triggered by a PR merge, the code was
534+
// already reviewed during the PR. Always skip — a merge is never a
535+
// direct push, and any missing analysis will be handled by reconciliation.
536+
if (mergedPrNumber != null) {
537+
log.info("Skipping direct push analysis — branch event originates from PR #{} merge (not a direct push)",
538+
mergedPrNumber);
539+
return;
540+
}
541+
531542
// Check if a PR analysis lock is active for this branch.
532543
// If so, wait — the PR analysis will handle these commits.
533544
boolean prAnalysisInProgress = analysisLockService.isLocked(
@@ -538,7 +549,7 @@ private void performDirectPushAnalysisIfNeeded(
538549
return;
539550
}
540551

541-
// Check commit coverage by open PRs
552+
// Check commit coverage by open/merged PRs
542553
CommitCoverageService.CoverageResult coverage = commitCoverageService.checkCoverage(
543554
project.getId(), request.getTargetBranchName(), unanalyzedCommits);
544555

@@ -640,6 +651,15 @@ private void performIncrementalRagUpdate(BranchProcessRequest request, Project p
640651
String targetBranch = request.getTargetBranchName();
641652
String baseBranch = ragOperationsService.getBaseBranch(project);
642653

654+
// Health check: verify RAG pipeline is reachable before starting
655+
if (!ragOperationsService.isRagPipelineHealthy()) {
656+
log.warn("RAG pipeline is not reachable — skipping incremental update for project={}",
657+
project.getId());
658+
EventNotificationEmitter.emitStatus(consumer, "rag_skipped",
659+
"RAG pipeline not reachable — skipping incremental update");
660+
return;
661+
}
662+
643663
if (targetBranch.equals(baseBranch)) {
644664
log.info("Main branch push - updating RAG index for project={}, branch={}, commit={}",
645665
project.getId(), targetBranch, request.getCommitHash());

java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/service/gitgraph/CommitCoverageService.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
/**
1818
* Determines whether unanalyzed commits on a branch are already covered
19-
* by open pull requests targeting that branch.
19+
* by pull requests (open or recently merged) targeting that branch.
2020
* <p>
2121
* Used by {@code BranchAnalysisProcessor} to decide whether to run
2222
* a full AI analysis (hybrid path) or only reconciliation.
@@ -59,15 +59,15 @@ public enum CoverageStatus {
5959
}
6060

6161
/**
62-
* Check if the given unanalyzed commits are covered by open PRs targeting the branch.
62+
* Check if the given unanalyzed commits are covered by PRs targeting the branch.
63+
* <p>
64+
* Checks both OPEN and MERGED PRs. MERGED PRs must be included because
65+
* the branch analysis flow marks a PR as MERGED <em>before</em> calling this
66+
* method, so a freshly-merged PR would otherwise be invisible.
6367
* <p>
6468
* A commit is considered "covered" if it was already analyzed as part of a PR analysis
6569
* (i.e., there exists a {@code CodeAnalysis} with {@code analysisType = PR_REVIEW}
66-
* whose commit hash matches, or the commit is in the DAG path of an analyzed PR).
67-
* <p>
68-
* We use a simpler heuristic: check if any open PR targeting this branch has
69-
* an analysis whose commit hash is the HEAD or one of the unanalyzed commits.
70-
* This avoids expensive DAG traversal through PR source branches.
70+
* whose commit hash matches).
7171
*
7272
* @param projectId the project ID
7373
* @param targetBranchName the branch being pushed to
@@ -80,20 +80,25 @@ public CoverageResult checkCoverage(Long projectId, String targetBranchName,
8080
return new CoverageResult(CoverageStatus.FULLY_COVERED, Collections.emptyList());
8181
}
8282

83-
// Find open PRs targeting this branch
84-
List<PullRequest> openPRs = pullRequestRepository.findByProjectIdAndTargetBranchNameAndState(
85-
projectId, targetBranchName, PullRequestState.OPEN);
86-
87-
if (openPRs.isEmpty()) {
88-
log.debug("No open PRs targeting branch {} — all {} commits are uncovered",
83+
// Find PRs targeting this branch — both OPEN and MERGED.
84+
// MERGED PRs must be included because:
85+
// 1. The branch analysis marks the PR as MERGED *before* calling this check
86+
// 2. Merge commits have different hashes from the PR's source commits,
87+
// so we need the PR's analyzed commit hash for coverage matching
88+
List<PullRequest> relevantPRs = pullRequestRepository.findByProjectIdAndTargetBranchNameAndStateIn(
89+
projectId, targetBranchName,
90+
List.of(PullRequestState.OPEN, PullRequestState.MERGED));
91+
92+
if (relevantPRs.isEmpty()) {
93+
log.debug("No open/merged PRs targeting branch {} — all {} commits are uncovered",
8994
targetBranchName, unanalyzedCommits.size());
9095
return new CoverageResult(CoverageStatus.NOT_COVERED, new ArrayList<>(unanalyzedCommits));
9196
}
9297

9398
// Collect all commit hashes that have been analyzed via PR_REVIEW for this project
9499
Set<String> prAnalyzedCommits = new HashSet<>();
95-
for (PullRequest pr : openPRs) {
96-
// Each open PR's latest commit hash was analyzed when the PR was analyzed
100+
for (PullRequest pr : relevantPRs) {
101+
// Each PR's latest commit hash was analyzed when the PR was reviewed
97102
if (pr.getCommitHash() != null) {
98103
// Check if there's an actual analysis for this PR's commit
99104
codeAnalysisRepository.findByProjectIdAndCommitHashAndPrNumber(
@@ -102,14 +107,15 @@ public CoverageResult checkCoverage(Long projectId, String targetBranchName,
102107
// The PR analysis covers this commit and potentially all commits
103108
// in the PR's source branch up to this point
104109
prAnalyzedCommits.add(pr.getCommitHash());
105-
log.debug("Open PR #{} has analysis for commit {} on target branch {}",
106-
pr.getPrNumber(), shortHash(pr.getCommitHash()), targetBranchName);
110+
log.debug("PR #{} (state={}) has analysis for commit {} on target branch {}",
111+
pr.getPrNumber(), pr.getState(),
112+
shortHash(pr.getCommitHash()), targetBranchName);
107113
});
108114
}
109115
}
110116

111117
if (prAnalyzedCommits.isEmpty()) {
112-
log.debug("Open PRs found but none have completed analyses — all {} commits uncovered",
118+
log.debug("PRs found but none have completed analyses — all {} commits uncovered",
113119
unanalyzedCommits.size());
114120
return new CoverageResult(CoverageStatus.NOT_COVERED, new ArrayList<>(unanalyzedCommits));
115121
}
@@ -120,16 +126,16 @@ public CoverageResult checkCoverage(Long projectId, String targetBranchName,
120126
.collect(Collectors.toList());
121127

122128
if (uncovered.isEmpty()) {
123-
log.info("All {} unanalyzed commits are covered by open PR analyses on branch {}",
129+
log.info("All {} unanalyzed commits are covered by PR analyses on branch {}",
124130
unanalyzedCommits.size(), targetBranchName);
125131
return new CoverageResult(CoverageStatus.FULLY_COVERED, Collections.emptyList());
126132
} else if (uncovered.size() < unanalyzedCommits.size()) {
127-
log.info("{}/{} unanalyzed commits covered by open PR analyses on branch {} — {} uncovered",
133+
log.info("{}/{} unanalyzed commits covered by PR analyses on branch {} — {} uncovered",
128134
unanalyzedCommits.size() - uncovered.size(), unanalyzedCommits.size(),
129135
targetBranchName, uncovered.size());
130136
return new CoverageResult(CoverageStatus.PARTIALLY_COVERED, uncovered);
131137
} else {
132-
log.info("No unanalyzed commits covered by open PR analyses on branch {}",
138+
log.info("No unanalyzed commits covered by PR analyses on branch {}",
133139
targetBranchName);
134140
return new CoverageResult(CoverageStatus.NOT_COVERED, new ArrayList<>(unanalyzedCommits));
135141
}

java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/service/pr/PrFileEnrichmentService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class PrFileEnrichmentService {
4242
@Value("${pr.enrichment.max-total-size-bytes:10485760}") // 10MB default
4343
private long maxTotalSizeBytes;
4444

45-
@Value("${pr.enrichment.rag-pipeline-url:http://localhost:8006}")
45+
@Value("${pr.enrichment.rag-pipeline-url:${codecrow.rag.api.url:http://rag-pipeline:8001}}")
4646
private String ragPipelineUrl;
4747

4848
@Value("${pr.enrichment.request-timeout-seconds:60}")

java-ecosystem/libs/analysis-engine/src/test/java/org/rostilos/codecrow/analysisengine/processor/analysis/BranchAnalysisProcessorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,10 +577,11 @@ void shouldPerformRagUpdateOnMainBranch() throws Exception {
577577
when(branchRepository.findByIdWithIssues(10L)).thenReturn(Optional.of(savedBranch));
578578
when(branchRepository.save(any(Branch.class))).thenReturn(savedBranch);
579579

580-
// RAG enabled, index ready, main branch
580+
// RAG enabled, index ready, main branch, pipeline healthy
581581
when(ragOperationsService.isRagEnabled(project)).thenReturn(true);
582582
when(ragOperationsService.isRagIndexReady(project)).thenReturn(true);
583583
when(ragOperationsService.getBaseBranch(project)).thenReturn("main");
584+
when(ragOperationsService.isRagPipelineHealthy()).thenReturn(true);
584585

585586
// Final markHealthy
586587
when(branchRepository.findByProjectIdAndBranchName(1L, "main"))
@@ -642,6 +643,7 @@ void shouldCallUpdateBranchIndexForNonMainBranch() throws Exception {
642643
when(ragOperationsService.isRagEnabled(project)).thenReturn(true);
643644
when(ragOperationsService.isRagIndexReady(project)).thenReturn(true);
644645
when(ragOperationsService.getBaseBranch(project)).thenReturn("main");
646+
when(ragOperationsService.isRagPipelineHealthy()).thenReturn(true);
645647

646648
when(branchRepository.findByProjectIdAndBranchName(1L, "feature-x"))
647649
.thenReturn(Optional.empty())

java-ecosystem/libs/core/src/main/java/org/rostilos/codecrow/core/persistence/repository/pullrequest/PullRequestRepository.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,16 @@ List<PullRequest> findByProjectIdAndTargetBranchNameAndState(
3131
@Param("projectId") Long projectId,
3232
@Param("targetBranch") String targetBranch,
3333
@Param("state") PullRequestState state);
34+
35+
/**
36+
* Find PRs targeting a specific branch in any of the given states.
37+
* Used by commit coverage checks to include both OPEN and MERGED PRs
38+
* when determining if commits are already covered by PR analyses.
39+
*/
40+
@Query("SELECT pr FROM PullRequest pr WHERE pr.project.id = :projectId " +
41+
"AND pr.targetBranchName = :targetBranch AND pr.state IN :states")
42+
List<PullRequest> findByProjectIdAndTargetBranchNameAndStateIn(
43+
@Param("projectId") Long projectId,
44+
@Param("targetBranch") String targetBranch,
45+
@Param("states") List<PullRequestState> states);
3446
}

java-ecosystem/libs/rag-engine/src/main/java/org/rostilos/codecrow/ragengine/client/RagPipelineClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,11 @@ private Map<String, Object> doRequest(String url, Map<String, Object> payload, O
435435

436436
if (!response.isSuccessful()) {
437437
log.error("RAG API request failed: {} - {}", response.code(), responseBody);
438-
throw new IOException("RAG API error: " + response.code());
438+
// Include truncated response body in exception so callers can see the actual error
439+
String detail = responseBody.length() > 500
440+
? responseBody.substring(0, 500) + "..."
441+
: responseBody;
442+
throw new IOException("RAG API error: " + response.code() + " — " + detail);
439443
}
440444

441445
return objectMapper.readValue(responseBody, Map.class);

java-ecosystem/libs/rag-engine/src/main/java/org/rostilos/codecrow/ragengine/service/RagOperationsServiceImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public RagOperationsServiceImpl(
7171
this.ragPipelineClient = ragPipelineClient;
7272
}
7373

74+
@Override
75+
public boolean isRagPipelineHealthy() {
76+
return ragPipelineClient.isHealthy();
77+
}
78+
7479
@Override
7580
public boolean isRagEnabled(Project project) {
7681
if (!ragApiEnabled) {

0 commit comments

Comments
 (0)