4747import java .util .Map ;
4848import java .util .UUID ;
4949import java .util .concurrent .ExecutorService ;
50+ import java .util .concurrent .TimeUnit ;
5051import lombok .extern .slf4j .Slf4j ;
5152import org .openmetadata .schema .ServiceEntityInterface ;
5253import org .openmetadata .schema .api .data .RestoreEntity ;
5960import org .openmetadata .schema .entity .services .ingestionPipelines .IngestionPipeline ;
6061import org .openmetadata .schema .entity .services .ingestionPipelines .PipelineServiceClientResponse ;
6162import org .openmetadata .schema .entity .services .ingestionPipelines .PipelineStatus ;
63+ import org .openmetadata .schema .entity .services .ingestionPipelines .PipelineStatusType ;
6264import org .openmetadata .schema .services .connections .metadata .OpenMetadataConnection ;
6365import org .openmetadata .schema .type .EntityHistory ;
6466import org .openmetadata .schema .type .EntityReference ;
9496import org .openmetadata .service .util .DeleteEntityResponse ;
9597import org .openmetadata .service .util .EntityUtil ;
9698import org .openmetadata .service .util .OpenMetadataConnectionBuilder ;
99+ import org .openmetadata .service .util .PipelineStatusUtils ;
97100import org .openmetadata .service .util .RestUtil ;
98101import org .openmetadata .service .util .WebsocketNotificationHandler ;
99102import org .quartz .SchedulerException ;
@@ -374,8 +377,13 @@ protected static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipe
374377 case SUCCESS -> AppRunRecord .Status .SUCCESS ;
375378 case FAILED , PARTIAL_SUCCESS -> AppRunRecord .Status .FAILED ;
376379 case RUNNING -> AppRunRecord .Status .RUNNING ;
380+ case STOPPED -> AppRunRecord .Status .STOPPED ;
377381 })
378- .withConfig (pipelineStatus .getConfig ());
382+ .withConfig (pipelineStatus .getConfig ())
383+ .withProperties (
384+ pipelineStatus .getRunId () != null
385+ ? Map .of ("pipelineRunId" , pipelineStatus .getRunId ())
386+ : null );
379387 }
380388
381389 private ResultList <AppRunRecord > sortRunsByStartTime (ResultList <AppRunRecord > runs ) {
@@ -527,7 +535,14 @@ public Response getLastLogs(
527535 schema = @ Schema (type = "string" ))
528536 @ QueryParam ("after" )
529537 @ DefaultValue ("" )
530- String after ) {
538+ String after ,
539+ @ Parameter (
540+ description =
541+ "Pipeline run ID to fetch logs for a specific run. "
542+ + "If not provided, returns logs for the latest run." ,
543+ schema = @ Schema (type = "string" ))
544+ @ QueryParam ("runId" )
545+ String runId ) {
531546 App installation = repository .getByName (uriInfo , name , repository .getFields ("id,pipelines" ));
532547 if (installation .getAppType ().equals (AppType .Internal )) {
533548 AppRunRecord latestRun = repository .getLatestAppRunsOptional (installation ).orElse (null );
@@ -544,7 +559,7 @@ public Response getLastLogs(
544559 ingestionPipelineRepository .get (
545560 uriInfo , pipelineRef .getId (), ingestionPipelineRepository .getFields (FIELD_OWNERS ));
546561 return Response .ok (
547- pipelineServiceClient .getLastIngestionLogs (ingestionPipeline , after ),
562+ pipelineServiceClient .getIngestionLogs (ingestionPipeline , after , runId ),
548563 MediaType .APPLICATION_JSON_TYPE )
549564 .build ();
550565 }
@@ -1227,7 +1242,12 @@ public Response stopApplicationRun(
12271242 @ Context SecurityContext securityContext ,
12281243 @ Parameter (description = "Name of the App" , schema = @ Schema (type = "string" ))
12291244 @ PathParam ("name" )
1230- String name ) {
1245+ String name ,
1246+ @ Parameter (
1247+ description = "Pipeline run ID to stop a specific run" ,
1248+ schema = @ Schema (type = "string" ))
1249+ @ QueryParam ("runId" )
1250+ String runId ) {
12311251 EntityUtil .Fields fields = getFields (String .format ("%s,bot,pipelines" , FIELD_OWNERS ));
12321252 App app = repository .getByName (uriInfo , name , fields );
12331253 OperationContext operationContext = new OperationContext (entityType , MetadataOperation .TRIGGER );
@@ -1241,17 +1261,148 @@ public Response stopApplicationRun(
12411261 .entity ("Application stop in progress. Please check status via." )
12421262 .build ();
12431263 } else {
1244- if (!app .getPipelines ().isEmpty ()) {
1245- IngestionPipeline ingestionPipeline = getIngestionPipeline (uriInfo , securityContext , app );
1246- PipelineServiceClientResponse response =
1247- pipelineServiceClient .killIngestion (ingestionPipeline );
1248- return Response .status (response .getCode ()).entity (response ).build ();
1264+ if (nullOrEmpty (app .getPipelines ())) {
1265+ throw new BadRequestException (
1266+ String .format (
1267+ "Application [%s] supports interrupts but has no associated pipeline configured." ,
1268+ name ));
1269+ }
1270+ IngestionPipeline ingestionPipeline = getIngestionPipeline (uriInfo , securityContext , app );
1271+ if (runId != null && !runId .isBlank ()) {
1272+ return stopSpecificRun (uriInfo , ingestionPipeline , runId );
1273+ } else {
1274+ return stopAllRuns (app , ingestionPipeline );
12491275 }
12501276 }
12511277 }
12521278 throw new BadRequestException ("Application does not support Interrupts." );
12531279 }
12541280
1281+ private Response stopSpecificRun (
1282+ UriInfo uriInfo , IngestionPipeline ingestionPipeline , String runId ) {
1283+ markPipelineStatusAsStopped (ingestionPipeline , runId );
1284+ PipelineServiceClientResponse killResponse ;
1285+ try {
1286+ killResponse = pipelineServiceClient .killIngestionRun (ingestionPipeline , runId );
1287+ } catch (Exception e ) {
1288+ LOG .error (
1289+ "Kill request for run [{}] on pipeline [{}] failed after DB update. Workflow may still be running." ,
1290+ runId ,
1291+ ingestionPipeline .getFullyQualifiedName (),
1292+ e );
1293+ return Response .status (Response .Status .BAD_GATEWAY )
1294+ .entity (
1295+ new PipelineServiceClientResponse ()
1296+ .withCode (Response .Status .BAD_GATEWAY .getStatusCode ())
1297+ .withReason (e .getMessage ())
1298+ .withPlatform (pipelineServiceClient .getPlatform ()))
1299+ .build ();
1300+ }
1301+ return toStopResponse (killResponse );
1302+ }
1303+
1304+ private Response stopAllRuns (App app , IngestionPipeline ingestionPipeline ) {
1305+ Long runStartTime =
1306+ repository
1307+ .getLatestAppRunsOptional (app , ingestionPipeline .getService ().getId ())
1308+ .map (AppRunRecord ::getStartTime )
1309+ .orElse (null );
1310+ markLatestPipelineStatusAsStopped (ingestionPipeline , runStartTime );
1311+ PipelineServiceClientResponse killResponse ;
1312+ try {
1313+ killResponse = pipelineServiceClient .killIngestion (ingestionPipeline );
1314+ } catch (Exception e ) {
1315+ LOG .error (
1316+ "Kill request for pipeline [{}] failed after DB update. Workflows may still be running." ,
1317+ ingestionPipeline .getFullyQualifiedName (),
1318+ e );
1319+ return Response .status (Response .Status .BAD_GATEWAY )
1320+ .entity (
1321+ new PipelineServiceClientResponse ()
1322+ .withCode (Response .Status .BAD_GATEWAY .getStatusCode ())
1323+ .withReason (e .getMessage ())
1324+ .withPlatform (pipelineServiceClient .getPlatform ()))
1325+ .build ();
1326+ }
1327+ return toStopResponse (killResponse );
1328+ }
1329+
1330+ private void markPipelineStatusAsStopped (IngestionPipeline ingestionPipeline , String runId ) {
1331+ IngestionPipelineRepository ingestionPipelineRepository =
1332+ (IngestionPipelineRepository ) Entity .getEntityRepository (Entity .INGESTION_PIPELINE );
1333+ try {
1334+ PipelineStatus status =
1335+ ingestionPipelineRepository .getPipelineStatus (
1336+ ingestionPipeline .getFullyQualifiedName (), runId );
1337+ if (status == null ) {
1338+ LOG .warn (
1339+ "Pipeline status not found in DB for run [{}] on pipeline [{}]. Proceeding with kill but DB state will remain inconsistent." ,
1340+ runId ,
1341+ ingestionPipeline .getFullyQualifiedName ());
1342+ return ;
1343+ }
1344+ if (!PipelineStatusUtils .isTerminalState (status .getPipelineState ())) {
1345+ status .setPipelineState (PipelineStatusType .STOPPED );
1346+ status .setEndDate (System .currentTimeMillis ());
1347+ // Use updatePipelineStatusByRunId instead of addPipelineStatus to avoid overwriting
1348+ // the pipeline-level current status. When stopping a specific run, other runs may still
1349+ // be active and their status should not be affected.
1350+ ingestionPipelineRepository .updatePipelineStatusByRunId (
1351+ ingestionPipeline .getFullyQualifiedName (), status );
1352+ }
1353+ } catch (Exception e ) {
1354+ LOG .error (
1355+ "Failed to mark run [{}] as STOPPED in DB for pipeline [{}]. Kill will proceed but DB status may remain inconsistent." ,
1356+ runId ,
1357+ ingestionPipeline .getFullyQualifiedName (),
1358+ e );
1359+ }
1360+ }
1361+
1362+ private void markLatestPipelineStatusAsStopped (
1363+ IngestionPipeline ingestionPipeline , Long runStartTime ) {
1364+ IngestionPipelineRepository ingestionPipelineRepository =
1365+ (IngestionPipelineRepository ) Entity .getEntityRepository (Entity .INGESTION_PIPELINE );
1366+ long now = System .currentTimeMillis ();
1367+ long startTs = runStartTime != null ? runStartTime : now - TimeUnit .HOURS .toMillis (1 );
1368+ ResultList <PipelineStatus > statuses ;
1369+ try {
1370+ statuses =
1371+ ingestionPipelineRepository .listPipelineStatus (
1372+ ingestionPipeline .getFullyQualifiedName (), startTs , now );
1373+ } catch (Exception e ) {
1374+ LOG .error (
1375+ "Failed to list pipeline statuses for [{}]. Kill will proceed but DB statuses may remain inconsistent." ,
1376+ ingestionPipeline .getFullyQualifiedName (),
1377+ e );
1378+ return ;
1379+ }
1380+ for (PipelineStatus status : statuses .getData ()) {
1381+ if (status .getRunId () == null || status .getRunId ().isBlank ()) {
1382+ continue ;
1383+ }
1384+ if (!PipelineStatusUtils .isTerminalState (status .getPipelineState ())) {
1385+ markPipelineStatusAsStopped (ingestionPipeline , status .getRunId ());
1386+ }
1387+ }
1388+ }
1389+
1390+ private Response toStopResponse (PipelineServiceClientResponse killResponse ) {
1391+ int code = killResponse .getCode ();
1392+ if (code >= 200 && code < 300 ) {
1393+ return Response .status (code ).entity (killResponse ).build ();
1394+ }
1395+ if (code == 404 ) {
1396+ LOG .warn (
1397+ "Kill request returned 404 — workflow already completed. DB status already marked STOPPED." );
1398+ return Response .ok (killResponse ).build ();
1399+ }
1400+ LOG .error (
1401+ "Kill request returned unexpected code [{}]. DB status already marked STOPPED but workflow may still be running." ,
1402+ code );
1403+ return Response .status (Response .Status .BAD_GATEWAY ).entity (killResponse ).build ();
1404+ }
1405+
12551406 @ POST
12561407 @ Path ("/deploy/{name}" )
12571408 @ Operation (
0 commit comments