Skip to content

Add stopped state support for pipeline status and stop/cancel AI agents#27098

Open
Vishnuujain wants to merge 14 commits intomainfrom
feature/support-stop-cancel-ai-agents
Open

Add stopped state support for pipeline status and stop/cancel AI agents#27098
Vishnuujain wants to merge 14 commits intomainfrom
feature/support-stop-cancel-ai-agents

Conversation

@Vishnuujain
Copy link
Copy Markdown
Contributor

@Vishnuujain Vishnuujain commented Apr 6, 2026

Describe your changes:

Fixes: https://github.com/open-metadata/openmetadata-collate/issues/3456

image

Type of change:

  • Bug fix
  • Improvement
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.

Summary by Gitar

  • TypeScript types updated:
    • Added stopped state support to pipeline status enum in ingestionPipeline.ts
  • AI agent support:
    • Enabled stop/cancel functionality for AI agents managing ingestion pipelines

This will update automatically on new commits.

@Vishnuujain Vishnuujain requested a review from a team as a code owner April 6, 2026 14:12
@Vishnuujain Vishnuujain added safe to test Add this label to run secure Github workflows on PRs To release Will cherry-pick this PR into the release branch labels Apr 6, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 6, 2026

The Java checkstyle failed.

Please run mvn spotless:apply in the root of your repository and commit the changes to this PR.
You can also use pre-commit to automate the Java code formatting.

You can install the pre-commit hooks with make install_test precommit_install.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 6, 2026

✅ TypeScript Types Auto-Updated

The generated TypeScript types have been automatically updated based on JSON schema changes in this PR.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 6, 2026

Jest test Coverage

UI tests summary

Lines Statements Branches Functions
Coverage: 64%
64.27% (59561/92667) 43.76% (31044/70935) 46.94% (9362/19943)

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 6, 2026

🔴 Playwright Results — 2 failure(s), 29 flaky

✅ 3590 passed · ❌ 2 failed · 🟡 29 flaky · ⏭️ 207 skipped

Shard Passed Failed Flaky Skipped
🟡 Shard 1 454 0 3 2
🟡 Shard 2 639 0 3 32
🔴 Shard 3 645 2 7 26
🟡 Shard 4 617 0 5 47
🟡 Shard 5 605 0 2 67
🟡 Shard 6 630 0 9 33

Genuine Failures (failed on all attempts)

Pages/AppStopRunModal.spec.ts › should open stop modal when stop button is clicked and call stop API with runId (shard 3)
Error: �[2mexpect(�[22m�[31mlocator�[39m�[2m).�[22mtoBeVisible�[2m(�[22m�[2m)�[22m failed

Locator:  getByTestId('stop-modal')
Expected: visible
Received: hidden
Timeout:  15000ms

Call log:
�[2m  - Expect "toBeVisible" with timeout 15000ms�[22m
�[2m  - waiting for getByTestId('stop-modal')�[22m
�[2m    19 × locator resolved to <div class="ant-modal-root" data-testid="stop-modal">…</div>�[22m
�[2m       - unexpected value "hidden"�[22m

Pages/AppStopRunModal.spec.ts › should close stop modal when cancel is clicked (shard 3)
Error: �[2mexpect(�[22m�[31mlocator�[39m�[2m).�[22mtoBeVisible�[2m(�[22m�[2m)�[22m failed

Locator:  getByTestId('stop-modal')
Expected: visible
Received: hidden
Timeout:  15000ms

Call log:
�[2m  - Expect "toBeVisible" with timeout 15000ms�[22m
�[2m  - waiting for getByTestId('stop-modal')�[22m
�[2m    19 × locator resolved to <div class="ant-modal-root" data-testid="stop-modal">…</div>�[22m
�[2m       - unexpected value "hidden"�[22m

🟡 29 flaky test(s) (passed on retry)
  • Features/CustomizeDetailPage.spec.ts › Database Schema - customization should work (shard 1, 1 retry)
  • Flow/Tour.spec.ts › Tour should work from URL directly (shard 1, 1 retry)
  • Pages/UserCreationWithPersona.spec.ts › Create user with persona and verify on profile (shard 1, 1 retry)
  • Features/BulkEditEntity.spec.ts › Glossary (shard 2, 1 retry)
  • Features/EntitySummaryPanel.spec.ts › should display summary panel for tableColumn (shard 2, 1 retry)
  • Features/EntitySummaryPanel.spec.ts › should display summary panel for database (shard 2, 1 retry)
  • Features/Permissions/GlossaryPermissions.spec.ts › Team-based permissions work correctly (shard 3, 1 retry)
  • Features/RTL.spec.ts › Verify Following widget functionality (shard 3, 1 retry)
  • Features/TestSuiteMultiPipeline.spec.ts › TestSuite multi pipeline support (shard 3, 1 retry)
  • Features/UserProfileOnlineStatus.spec.ts › Should show "Active recently" for users active within last hour (shard 3, 1 retry)
  • Features/UserProfileOnlineStatus.spec.ts › Should not show online status for inactive users (shard 3, 1 retry)
  • Flow/ExploreDiscovery.spec.ts › Should not display soft deleted assets in search suggestions (shard 3, 1 retry)
  • Flow/ServiceForm.spec.ts › Verify form selects are working properly (shard 3, 1 retry)
  • Pages/Customproperties-part2.spec.ts › entityReferenceList shows item count, scrollable list, no expand toggle (shard 4, 1 retry)
  • Pages/DataContractsSemanticRules.spec.ts › Validate Owner Rule Is_Set (shard 4, 1 retry)
  • Pages/Domains.spec.ts › Multiple consecutive domain renames preserve all associations (shard 4, 1 retry)
  • Pages/Entity.spec.ts › User as Owner Add, Update and Remove (shard 4, 1 retry)
  • Pages/Entity.spec.ts › Tag Add, Update and Remove (shard 4, 1 retry)
  • Pages/EntityDataConsumer.spec.ts › Glossary Term Add, Update and Remove (shard 5, 1 retry)
  • Pages/ExploreTree.spec.ts › Verify Database and Database Schema available in explore tree (shard 5, 1 retry)
  • Pages/GlossaryImportExport.spec.ts › Export large glossary with many terms (shard 6, 1 retry)
  • Pages/HyperlinkCustomProperty.spec.ts › should display URL when no display text is provided (shard 6, 1 retry)
  • Pages/InputOutputPorts.spec.ts › Lineage section collapse/expand (shard 6, 1 retry)
  • Pages/Lineage/DataAssetLineage.spec.ts › verify create lineage for entity - Table (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)
  • Pages/ProfilerConfigurationPage.spec.ts › Non admin user (shard 6, 1 retry)
  • Pages/Users.spec.ts › Permissions for table details page for Data Consumer (shard 6, 1 retry)
  • Pages/Users.spec.ts › Check permissions for Data Steward (shard 6, 1 retry)
  • VersionPages/EntityVersionPages.spec.ts › Topic (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

mohityadav766
mohityadav766 previously approved these changes Apr 6, 2026
@Vishnuujain Vishnuujain force-pushed the feature/support-stop-cancel-ai-agents branch from 3bda980 to d562cc4 Compare April 8, 2026 20:32
Comment on lines +209 to +213
{record.status !== Status.Success &&
record.status !== Status.Failed &&
record.status !== Status.Stopped &&
record.status !== Status.Completed &&
record.status !== Status.StopInProgress &&
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for changing the assertions to negative conditions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually initially I had positive one only but I missed some of them to include like when in queue so stop button was not appearing, therefore I used negative assertion to ensure the button appears in all cases except these.

Comment on lines +143 to +146
default PipelineServiceClientResponse killIngestionRun(
IngestionPipeline ingestionPipeline, String runId) {
return new PipelineServiceClientResponse().withCode(200).withPlatform(getPlatform());
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: killIngestionRun default no-op marks DB STOPPED but never kills

The killIngestionRun default method in PipelineServiceClientInterface returns a 200 success response without actually stopping anything. None of the concrete pipeline service clients (AirflowRESTClient, K8sPipelineClient, etc.) override this method. Combined with the fact that stopSpecificRun marks the DB status as STOPPED before calling killIngestionRun, this means:

  1. The pipeline status will show STOPPED in the database
  2. The actual workflow continues running in the pipeline service
  3. The API returns 200 success to the user

This creates a silent inconsistency where the UI shows the run as stopped but it's still executing. The same issue applies to getIngestionLogs which silently ignores the runId parameter and returns latest logs instead of per-run logs.

At minimum, killIngestionRun should throw UnsupportedOperationException or return a non-200 response so callers know the feature isn't supported, rather than silently pretending it succeeded. Alternatively, the concrete clients should implement these methods.

Suggested fix:

// Option A: Make it clear the operation is unsupported
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  throw new UnsupportedOperationException(
      "Per-run stop is not supported by " + getPlatform());
}

// Option B: Return a distinguishable response
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  return new PipelineServiceClientResponse()
      .withCode(501).withPlatform(getPlatform())
      .withReason("Per-run stop not supported");
}

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
return Response.ok(
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after),
pipelineServiceClient.getIngestionLogs(ingestionPipeline, after, runId),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this backward compatible? would it pick the last logs if runId is null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's backward comatible

}
}

private void markLatestPipelineStatusAsStopped(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt this implementation be based on markPipelineStatusAsStopped but passing the last runId? Otherwise seems we're duplicating logic

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored it, thanks

@gitar-bot
Copy link
Copy Markdown

gitar-bot bot commented Apr 10, 2026

Code Review ⚠️ Changes requested 11 resolved / 12 findings

Adds stopped state support for pipeline status and stop/cancel AI agents, but the killIngestionRun default no-op marks DB STOPPED without actually stopping anything, and stopAllRuns causes redundant side effects by calling addPipelineStatus in a loop for each running pipeline status. Despite resolving 10 prior issues including race conditions and error handling gaps, these two critical blockers must be addressed before merge.

⚠️ Bug: killIngestionRun default no-op marks DB STOPPED but never kills

📄 openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java:143-146 📄 openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java:132-135 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1282-1296 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1380-1394 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1347-1351

The killIngestionRun default method in PipelineServiceClientInterface returns a 200 success response without actually stopping anything. None of the concrete pipeline service clients (AirflowRESTClient, K8sPipelineClient, etc.) override this method. Combined with the fact that stopSpecificRun marks the DB status as STOPPED before calling killIngestionRun, this means:

  1. The pipeline status will show STOPPED in the database
  2. The actual workflow continues running in the pipeline service
  3. The API returns 200 success to the user

This creates a silent inconsistency where the UI shows the run as stopped but it's still executing. The same issue applies to getIngestionLogs which silently ignores the runId parameter and returns latest logs instead of per-run logs.

At minimum, killIngestionRun should throw UnsupportedOperationException or return a non-200 response so callers know the feature isn't supported, rather than silently pretending it succeeded. Alternatively, the concrete clients should implement these methods.

Suggested fix
// Option A: Make it clear the operation is unsupported
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  throw new UnsupportedOperationException(
      "Per-run stop is not supported by " + getPlatform());
}

// Option B: Return a distinguishable response
default PipelineServiceClientResponse killIngestionRun(
    IngestionPipeline ingestionPipeline, String runId) {
  return new PipelineServiceClientResponse()
      .withCode(501).withPlatform(getPlatform())
      .withReason("Per-run stop not supported");
}
✅ 11 resolved
Quality: endDate set in millis—verify consistency with startDate units

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1271
latestStatus.setEndDate(System.currentTimeMillis()) uses milliseconds. The PipelineStatus schema defines startDate and endDate as "format": "utc-millisec" which maps to epoch millis, so this is likely correct. However, if other parts of the codebase use epoch seconds for these fields, this would produce dates far in the future. Worth a quick sanity check that the convention is consistent.

Bug: UUID.fromString(runId) will throw on Argo workflow UIDs

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1238-1241 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1281
The runId query parameter is documented as "Pipeline run ID (Argo workflow UID)" but the code unconditionally calls UUID.fromString(runId) at line 1281. Argo workflow UIDs are not necessarily valid Java UUIDs, so this will throw an IllegalArgumentException for non-UUID format strings. While there is a catch-all Exception handler around it (line 1292), that silently swallows the error and proceeds to call killIngestionRun without having marked the status as stopped — which is the whole purpose of the method. This means the DB state won't reflect the stop for non-UUID runIds.

Bug: DataInsightSystemChartRepository missing properties with runId

📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightSystemChartRepository.java:549-554 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:383-387
In AppResource.java, the PipelineStatusAppRunRecord mapping now includes .withProperties(Map.of("pipelineRunId", ...)) so the UI can pass the runId when stopping a specific run. The same mapping in DataInsightSystemChartRepository.java (line 552) adds the STOPPED case but does not include the withProperties mapping. This means data insight app runs won't carry the pipelineRunId, so the stop button will always fall back to stopping all runs instead of a specific one.

Bug: stopAllRuns lacks exception handling unlike stopSpecificRun

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1298-1307 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1298-1308
stopSpecificRun catches PipelineServiceClientException from killIngestionRun and returns a graceful BAD_GATEWAY response. However, stopAllRuns calls killIngestion without any try-catch. If the kill request fails, the exception propagates as a 500 Internal Server Error to the client, even though the DB statuses have already been marked as STOPPED. This creates the same DB-vs-reality inconsistency that stopSpecificRun handles gracefully, but without informing the user properly.

Bug: DB marked STOPPED before kill; catch misses non-client exceptions

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1279-1293 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1306-1320
In both stopSpecificRun and stopAllRuns, the pipeline status is marked as STOPPED in the database before the kill request is sent to the pipeline service. The catch block only handles PipelineServiceClientException. If the kill call throws an unexpected runtime exception (e.g., NullPointerException, IllegalStateException, network timeout wrapping as a different exception type), the exception propagates up to the JAX-RS layer and returns a 500, but the DB already says STOPPED while the workflow is actually still running.

This is a deliberate best-effort design (as documented in the logs), but the narrow catch type weakens the guarantee. Consider catching Exception (or at least RuntimeException) around the kill call to ensure the BAD_GATEWAY response is always returned with a clear message, rather than leaking a 500 with a stack trace.

...and 6 more resolved from earlier reviews

🤖 Prompt for agents
Code Review: Adds stopped state support for pipeline status and stop/cancel AI agents, but the `killIngestionRun` default no-op marks DB STOPPED without actually stopping anything, and `stopAllRuns` causes redundant side effects by calling `addPipelineStatus` in a loop for each running pipeline status. Despite resolving 10 prior issues including race conditions and error handling gaps, these two critical blockers must be addressed before merge.

1. ⚠️ Bug: killIngestionRun default no-op marks DB STOPPED but never kills
   Files: openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java:143-146, openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClientInterface.java:132-135, openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1282-1296, openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1380-1394, openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java:1347-1351

   The `killIngestionRun` default method in `PipelineServiceClientInterface` returns a 200 success response without actually stopping anything. None of the concrete pipeline service clients (`AirflowRESTClient`, `K8sPipelineClient`, etc.) override this method. Combined with the fact that `stopSpecificRun` marks the DB status as STOPPED *before* calling `killIngestionRun`, this means:
   
   1. The pipeline status will show STOPPED in the database
   2. The actual workflow continues running in the pipeline service
   3. The API returns 200 success to the user
   
   This creates a silent inconsistency where the UI shows the run as stopped but it's still executing. The same issue applies to `getIngestionLogs` which silently ignores the `runId` parameter and returns latest logs instead of per-run logs.
   
   At minimum, `killIngestionRun` should throw `UnsupportedOperationException` or return a non-200 response so callers know the feature isn't supported, rather than silently pretending it succeeded. Alternatively, the concrete clients should implement these methods.

   Suggested fix:
   // Option A: Make it clear the operation is unsupported
   default PipelineServiceClientResponse killIngestionRun(
       IngestionPipeline ingestionPipeline, String runId) {
     throw new UnsupportedOperationException(
         "Per-run stop is not supported by " + getPlatform());
   }
   
   // Option B: Return a distinguishable response
   default PipelineServiceClientResponse killIngestionRun(
       IngestionPipeline ingestionPipeline, String runId) {
     return new PipelineServiceClientResponse()
         .withCode(501).withPlatform(getPlatform())
         .withReason("Per-run stop not supported");
   }

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

@sonarqubecloud
Copy link
Copy Markdown

@sonarqubecloud
Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

safe to test Add this label to run secure Github workflows on PRs To release Will cherry-pick this PR into the release branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants