diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/IndexingFailureRecorder.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/IndexingFailureRecorder.java index a98ba4172dde..e2803cdf7f91 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/IndexingFailureRecorder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/IndexingFailureRecorder.java @@ -19,6 +19,7 @@ public enum FailureStage { } private static final int DEFAULT_BATCH_SIZE = 100; + private static final int ENTITY_ID_MAX_LENGTH = 36; private final CollectionDAO.SearchIndexFailureDAO failureDAO; private final String jobId; @@ -115,6 +116,16 @@ private void recordFailure( return; } + if (entityId != null && entityId.length() > ENTITY_ID_MAX_LENGTH) { + LOG.warn( + "Skipping failure record for entityType={}: entityId length {} exceeds column limit {} (value starts with '{}')", + entityType, + entityId.length(), + ENTITY_ID_MAX_LENGTH, + entityId.substring(0, Math.min(50, entityId.length()))); + return; + } + LOG.info( "Recording {} failure for entityType={}, entityId={}, error={}", stage, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java index f032619dd66b..e209337788b2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; @@ -506,8 +507,23 @@ private BatchResult processBatch( if (failureRecorder != null && readErrorCount > 0) { for (EntityError entityError : listOrEmpty(resultList.getErrors())) { - String entityId = - entityError.getEntity() != null ? entityError.getEntity().toString() : null; + Object rawEntity = entityError.getEntity(); + String entityId = null; + if (rawEntity instanceof EntityInterface) { + UUID id = ((EntityInterface) rawEntity).getId(); + if (id != null) { + entityId = id.toString(); + } + } else if (rawEntity != null) { + entityId = rawEntity.toString(); + } + if (entityId == null) { + LOG.warn( + "Skipping reader failure record for entityType={}: entityId is null, message={}", + entityType, + entityError.getMessage()); + continue; + } failureRecorder.recordReaderEntityFailure( entityType, entityId, null, entityError.getMessage()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/socket/Jetty12WebSocketHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/socket/Jetty12WebSocketHandler.java index cd01961a6836..a0db8a89dcd4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/socket/Jetty12WebSocketHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/socket/Jetty12WebSocketHandler.java @@ -16,6 +16,7 @@ import io.socket.engineio.server.EngineIoServer; import io.socket.engineio.server.EngineIoWebSocket; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -127,7 +128,16 @@ public void onClose(int statusCode, String reason) { @OnWebSocketError public void onError(Throwable error) { - LOG.error("WebSocket error: {}", error.getMessage(), error); - emit("error", "websocket error", error.getMessage()); + if (error instanceof ClosedChannelException) { + LOG.debug("WebSocket channel closed by peer (likely abnormal disconnect)"); + return; + } + try { + LOG.error( + "WebSocket error: {} - {}", error.getClass().getSimpleName(), error.getMessage(), error); + emit("error", "websocket error", error.getMessage()); + } catch (Exception e) { + LOG.error("Failed to handle WebSocket error gracefully: {}", e.getMessage(), e); + } } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorkerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorkerTest.java index 32d5563b3337..77c45d0968bb 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorkerTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorkerTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.lang.reflect.InvocationTargetException; @@ -385,6 +386,66 @@ void processBatchWritesEntitiesAndRecordsReaderFailures() throws Exception { assertEquals(statsTracker, contextCaptor.getValue().get(BulkSink.STATS_TRACKER_CONTEXT_KEY)); } + @Test + void processBatchExtractsIdFromEntityInterfaceForReaderFailure() throws Exception { + IndexingFailureRecorder failureRecorder = mock(IndexingFailureRecorder.class); + StageStatsTracker statsTracker = mock(StageStatsTracker.class); + PartitionWorker batchWorker = + new PartitionWorker(coordinator, bulkSink, BATCH_SIZE, null, false, failureRecorder); + + UUID errorEntityId = UUID.randomUUID(); + EntityInterface failingEntity = mock(EntityInterface.class); + when(failingEntity.getId()).thenReturn(errorEntityId); + EntityInterface successEntity = mock(EntityInterface.class); + + ResultList resultList = new ResultList<>(); + resultList.setData(List.of(successEntity)); + resultList.setErrors( + List.of(new EntityError().withEntity(failingEntity).withMessage("reader failure"))); + resultList.setWarningsCount(0); + resultList.setPaging(new Paging().withAfter("next-cursor")); + + try (MockedConstruction ignored = + mockConstruction( + PaginatedEntitiesSource.class, + (mock, context) -> doReturn(resultList).when(mock).readNextKeyset("cursor-1"))) { + + invokeProcessBatch(batchWorker, "table", "cursor-1", 2, statsTracker); + } + + verify(failureRecorder) + .recordReaderEntityFailure("table", errorEntityId.toString(), null, "reader failure"); + } + + @Test + void processBatchSkipsReaderFailureWhenEntityInterfaceHasNullId() throws Exception { + IndexingFailureRecorder failureRecorder = mock(IndexingFailureRecorder.class); + StageStatsTracker statsTracker = mock(StageStatsTracker.class); + PartitionWorker batchWorker = + new PartitionWorker(coordinator, bulkSink, BATCH_SIZE, null, false, failureRecorder); + + EntityInterface failingEntity = mock(EntityInterface.class); + when(failingEntity.getId()).thenReturn(null); + EntityInterface successEntity = mock(EntityInterface.class); + + ResultList resultList = new ResultList<>(); + resultList.setData(List.of(successEntity)); + resultList.setErrors( + List.of(new EntityError().withEntity(failingEntity).withMessage("reader failure"))); + resultList.setWarningsCount(0); + resultList.setPaging(new Paging().withAfter("next-cursor")); + + try (MockedConstruction ignored = + mockConstruction( + PaginatedEntitiesSource.class, + (mock, context) -> doReturn(resultList).when(mock).readNextKeyset("cursor-1"))) { + + invokeProcessBatch(batchWorker, "table", "cursor-1", 2, statsTracker); + } + + verifyNoInteractions(failureRecorder); + } + @Test void processBatchWrapsSinkFailuresAsSearchIndexException() throws Exception { PartitionWorker batchWorker =