Skip to content

Commit e7b50ed

Browse files
committed
Log Gracefully and ignore entity id null error (#27459)
* Log Gracefully and ignore entity id null error * Ignore in recorder * Review comments addressed (cherry picked from commit 35c3c92)
1 parent 3fc48ac commit e7b50ed

4 files changed

Lines changed: 102 additions & 4 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/IndexingFailureRecorder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public enum FailureStage {
1919
}
2020

2121
private static final int DEFAULT_BATCH_SIZE = 100;
22+
private static final int ENTITY_ID_MAX_LENGTH = 36;
2223

2324
private final CollectionDAO.SearchIndexFailureDAO failureDAO;
2425
private final String jobId;
@@ -115,6 +116,16 @@ private void recordFailure(
115116
return;
116117
}
117118

119+
if (entityId != null && entityId.length() > ENTITY_ID_MAX_LENGTH) {
120+
LOG.warn(
121+
"Skipping failure record for entityType={}: entityId length {} exceeds column limit {} (value starts with '{}')",
122+
entityType,
123+
entityId.length(),
124+
ENTITY_ID_MAX_LENGTH,
125+
entityId.substring(0, Math.min(50, entityId.length())));
126+
return;
127+
}
128+
118129
LOG.info(
119130
"Recording {} failure for entityType={}, entityId={}, error={}",
120131
stage,

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.Set;
24+
import java.util.UUID;
2425
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.concurrent.atomic.AtomicLong;
2627
import lombok.extern.slf4j.Slf4j;
@@ -506,8 +507,23 @@ private BatchResult processBatch(
506507

507508
if (failureRecorder != null && readErrorCount > 0) {
508509
for (EntityError entityError : listOrEmpty(resultList.getErrors())) {
509-
String entityId =
510-
entityError.getEntity() != null ? entityError.getEntity().toString() : null;
510+
Object rawEntity = entityError.getEntity();
511+
String entityId = null;
512+
if (rawEntity instanceof EntityInterface) {
513+
UUID id = ((EntityInterface) rawEntity).getId();
514+
if (id != null) {
515+
entityId = id.toString();
516+
}
517+
} else if (rawEntity != null) {
518+
entityId = rawEntity.toString();
519+
}
520+
if (entityId == null) {
521+
LOG.warn(
522+
"Skipping reader failure record for entityType={}: entityId is null, message={}",
523+
entityType,
524+
entityError.getMessage());
525+
continue;
526+
}
511527
failureRecorder.recordReaderEntityFailure(
512528
entityType, entityId, null, entityError.getMessage());
513529
}

openmetadata-service/src/main/java/org/openmetadata/service/socket/Jetty12WebSocketHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.socket.engineio.server.EngineIoServer;
1717
import io.socket.engineio.server.EngineIoWebSocket;
1818
import java.nio.ByteBuffer;
19+
import java.nio.channels.ClosedChannelException;
1920
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
@@ -127,7 +128,16 @@ public void onClose(int statusCode, String reason) {
127128

128129
@OnWebSocketError
129130
public void onError(Throwable error) {
130-
LOG.error("WebSocket error: {}", error.getMessage(), error);
131-
emit("error", "websocket error", error.getMessage());
131+
if (error instanceof ClosedChannelException) {
132+
LOG.debug("WebSocket channel closed by peer (likely abnormal disconnect)");
133+
return;
134+
}
135+
try {
136+
LOG.error(
137+
"WebSocket error: {} - {}", error.getClass().getSimpleName(), error.getMessage(), error);
138+
emit("error", "websocket error", error.getMessage());
139+
} catch (Exception e) {
140+
LOG.error("Failed to handle WebSocket error gracefully: {}", e.getMessage(), e);
141+
}
132142
}
133143
}

openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorkerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static org.mockito.Mockito.never;
3535
import static org.mockito.Mockito.times;
3636
import static org.mockito.Mockito.verify;
37+
import static org.mockito.Mockito.verifyNoInteractions;
3738
import static org.mockito.Mockito.when;
3839

3940
import java.lang.reflect.InvocationTargetException;
@@ -385,6 +386,66 @@ void processBatchWritesEntitiesAndRecordsReaderFailures() throws Exception {
385386
assertEquals(statsTracker, contextCaptor.getValue().get(BulkSink.STATS_TRACKER_CONTEXT_KEY));
386387
}
387388

389+
@Test
390+
void processBatchExtractsIdFromEntityInterfaceForReaderFailure() throws Exception {
391+
IndexingFailureRecorder failureRecorder = mock(IndexingFailureRecorder.class);
392+
StageStatsTracker statsTracker = mock(StageStatsTracker.class);
393+
PartitionWorker batchWorker =
394+
new PartitionWorker(coordinator, bulkSink, BATCH_SIZE, null, false, failureRecorder);
395+
396+
UUID errorEntityId = UUID.randomUUID();
397+
EntityInterface failingEntity = mock(EntityInterface.class);
398+
when(failingEntity.getId()).thenReturn(errorEntityId);
399+
EntityInterface successEntity = mock(EntityInterface.class);
400+
401+
ResultList<EntityInterface> resultList = new ResultList<>();
402+
resultList.setData(List.of(successEntity));
403+
resultList.setErrors(
404+
List.of(new EntityError().withEntity(failingEntity).withMessage("reader failure")));
405+
resultList.setWarningsCount(0);
406+
resultList.setPaging(new Paging().withAfter("next-cursor"));
407+
408+
try (MockedConstruction<PaginatedEntitiesSource> ignored =
409+
mockConstruction(
410+
PaginatedEntitiesSource.class,
411+
(mock, context) -> doReturn(resultList).when(mock).readNextKeyset("cursor-1"))) {
412+
413+
invokeProcessBatch(batchWorker, "table", "cursor-1", 2, statsTracker);
414+
}
415+
416+
verify(failureRecorder)
417+
.recordReaderEntityFailure("table", errorEntityId.toString(), null, "reader failure");
418+
}
419+
420+
@Test
421+
void processBatchSkipsReaderFailureWhenEntityInterfaceHasNullId() throws Exception {
422+
IndexingFailureRecorder failureRecorder = mock(IndexingFailureRecorder.class);
423+
StageStatsTracker statsTracker = mock(StageStatsTracker.class);
424+
PartitionWorker batchWorker =
425+
new PartitionWorker(coordinator, bulkSink, BATCH_SIZE, null, false, failureRecorder);
426+
427+
EntityInterface failingEntity = mock(EntityInterface.class);
428+
when(failingEntity.getId()).thenReturn(null);
429+
EntityInterface successEntity = mock(EntityInterface.class);
430+
431+
ResultList<EntityInterface> resultList = new ResultList<>();
432+
resultList.setData(List.of(successEntity));
433+
resultList.setErrors(
434+
List.of(new EntityError().withEntity(failingEntity).withMessage("reader failure")));
435+
resultList.setWarningsCount(0);
436+
resultList.setPaging(new Paging().withAfter("next-cursor"));
437+
438+
try (MockedConstruction<PaginatedEntitiesSource> ignored =
439+
mockConstruction(
440+
PaginatedEntitiesSource.class,
441+
(mock, context) -> doReturn(resultList).when(mock).readNextKeyset("cursor-1"))) {
442+
443+
invokeProcessBatch(batchWorker, "table", "cursor-1", 2, statsTracker);
444+
}
445+
446+
verifyNoInteractions(failureRecorder);
447+
}
448+
388449
@Test
389450
void processBatchWrapsSinkFailuresAsSearchIndexException() throws Exception {
390451
PartitionWorker batchWorker =

0 commit comments

Comments
 (0)