Skip to content

Commit 9f255ab

Browse files
committed
feat: add pagination and filtering for column grid response in ColumnRepository
1 parent d76cb14 commit 9f255ab

2 files changed

Lines changed: 314 additions & 17 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ColumnRepository.java

Lines changed: 134 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import jakarta.ws.rs.core.SecurityContext;
2626
import jakarta.ws.rs.core.UriInfo;
2727
import java.io.IOException;
28+
import java.nio.charset.StandardCharsets;
2829
import java.util.ArrayList;
2930
import java.util.Arrays;
31+
import java.util.Base64;
3032
import java.util.HashMap;
3133
import java.util.List;
3234
import java.util.Map;
@@ -47,6 +49,7 @@
4749
import org.openmetadata.schema.api.data.ColumnUpdate;
4850
import org.openmetadata.schema.api.data.ColumnUpdatePreview;
4951
import org.openmetadata.schema.api.data.GroupedColumnsResponse;
52+
import org.openmetadata.schema.api.data.MetadataStatus;
5053
import org.openmetadata.schema.api.data.UpdateColumn;
5154
import org.openmetadata.schema.entity.data.DashboardDataModel;
5255
import org.openmetadata.schema.entity.data.Table;
@@ -78,6 +81,9 @@
7881

7982
@Slf4j
8083
public class ColumnRepository {
84+
private static final String FILTERED_CURSOR_PREFIX = "filteredOffset:";
85+
private static final int FILTERED_SCAN_BATCH_SIZE = 1000;
86+
8187
private final Authorizer authorizer;
8288
private final ColumnAggregator columnAggregator;
8389

@@ -93,34 +99,145 @@ public ColumnRepository(Authorizer authorizer, SearchClient searchClient) {
9399
}
94100
}
95101

102+
ColumnRepository(Authorizer authorizer, ColumnAggregator columnAggregator) {
103+
this.authorizer = authorizer;
104+
this.columnAggregator = columnAggregator;
105+
}
106+
96107
public ColumnGridResponse getColumnGridPaginated(
97108
SecurityContext securityContext, ColumnAggregator.ColumnAggregationRequest request)
98109
throws IOException {
110+
if (requiresPostAggregationFiltering(request)) {
111+
return getFilteredColumnGridPage(request);
112+
}
113+
99114
ColumnGridResponse response = columnAggregator.aggregateColumns(request);
100115

101-
if (Boolean.TRUE.equals(request.getHasConflicts())) {
102-
response.setColumns(
103-
response.getColumns().stream()
104-
.filter(ColumnGridItem::getHasVariations)
105-
.collect(Collectors.toList()));
116+
return response;
117+
}
118+
119+
private boolean requiresPostAggregationFiltering(
120+
ColumnAggregator.ColumnAggregationRequest request) {
121+
return Boolean.TRUE.equals(request.getHasConflicts())
122+
|| Boolean.TRUE.equals(request.getHasMissingMetadata())
123+
|| !isBlank(request.getMetadataStatus());
124+
}
125+
126+
private ColumnGridResponse getFilteredColumnGridPage(
127+
ColumnAggregator.ColumnAggregationRequest request) throws IOException {
128+
int pageSize = Math.max(request.getSize(), 1);
129+
int offset = decodeFilteredCursorOffset(request.getCursor());
130+
List<ColumnGridItem> filteredItems = new ArrayList<>();
131+
int totalOccurrences = 0;
132+
String scanCursor = null;
133+
ColumnAggregator.ColumnAggregationRequest scanRequest = createScanRequest(request, pageSize);
134+
135+
do {
136+
scanRequest.setCursor(scanCursor);
137+
ColumnGridResponse scanResponse = columnAggregator.aggregateColumns(scanRequest);
138+
List<ColumnGridItem> matchingItems =
139+
applyPostAggregationFilters(scanResponse.getColumns(), request);
140+
filteredItems.addAll(matchingItems);
141+
totalOccurrences +=
142+
matchingItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum();
143+
scanCursor = scanResponse.getCursor();
144+
} while (scanCursor != null);
145+
146+
int totalUniqueColumns = filteredItems.size();
147+
int safeOffset = Math.min(Math.max(offset, 0), totalUniqueColumns);
148+
int end = Math.min(safeOffset + pageSize, totalUniqueColumns);
149+
150+
ColumnGridResponse response = new ColumnGridResponse();
151+
response.setColumns(new ArrayList<>(filteredItems.subList(safeOffset, end)));
152+
response.setTotalUniqueColumns(totalUniqueColumns);
153+
response.setTotalOccurrences(totalOccurrences);
154+
if (end < totalUniqueColumns) {
155+
response.setCursor(encodeFilteredCursorOffset(end));
106156
}
107157

108-
if (Boolean.TRUE.equals(request.getHasMissingMetadata())) {
109-
response.setColumns(
110-
response.getColumns().stream()
111-
.filter(this::hasMissingMetadata)
112-
.collect(Collectors.toList()));
158+
return response;
159+
}
160+
161+
private ColumnAggregator.ColumnAggregationRequest createScanRequest(
162+
ColumnAggregator.ColumnAggregationRequest request, int pageSize) {
163+
ColumnAggregator.ColumnAggregationRequest scanRequest =
164+
new ColumnAggregator.ColumnAggregationRequest();
165+
scanRequest.setSize(Math.min(Math.max(pageSize, FILTERED_SCAN_BATCH_SIZE), 10000));
166+
scanRequest.setCursor(null);
167+
scanRequest.setColumnNamePattern(request.getColumnNamePattern());
168+
scanRequest.setEntityTypes(request.getEntityTypes());
169+
scanRequest.setServiceName(request.getServiceName());
170+
scanRequest.setServiceTypes(request.getServiceTypes());
171+
scanRequest.setDatabaseName(request.getDatabaseName());
172+
scanRequest.setSchemaName(request.getSchemaName());
173+
scanRequest.setDomainId(request.getDomainId());
174+
scanRequest.setHasConflicts(false);
175+
scanRequest.setHasMissingMetadata(false);
176+
scanRequest.setMetadataStatus(null);
177+
scanRequest.setTags(request.getTags());
178+
scanRequest.setGlossaryTerms(request.getGlossaryTerms());
179+
180+
return scanRequest;
181+
}
182+
183+
private List<ColumnGridItem> applyPostAggregationFilters(
184+
List<ColumnGridItem> items, ColumnAggregator.ColumnAggregationRequest request) {
185+
return items.stream()
186+
.filter(item -> matchesAllPostAggregationFilters(item, request))
187+
.collect(Collectors.toList());
188+
}
189+
190+
private boolean matchesAllPostAggregationFilters(
191+
ColumnGridItem item, ColumnAggregator.ColumnAggregationRequest request) {
192+
if (Boolean.TRUE.equals(request.getHasConflicts())
193+
&& !Boolean.TRUE.equals(item.getHasVariations())) {
194+
return false;
113195
}
114196

115-
// Filter by INCONSISTENT status (requires post-aggregation filtering)
116-
if ("INCONSISTENT".equalsIgnoreCase(request.getMetadataStatus())) {
117-
response.setColumns(
118-
response.getColumns().stream()
119-
.filter(ColumnGridItem::getHasVariations)
120-
.collect(Collectors.toList()));
197+
if (Boolean.TRUE.equals(request.getHasMissingMetadata()) && !hasMissingMetadata(item)) {
198+
return false;
121199
}
122200

123-
return response;
201+
return matchesMetadataStatus(item, request.getMetadataStatus());
202+
}
203+
204+
private boolean matchesMetadataStatus(ColumnGridItem item, String requestedStatus) {
205+
if (isBlank(requestedStatus)) {
206+
return true;
207+
}
208+
209+
if (MetadataStatus.INCONSISTENT.value().equalsIgnoreCase(requestedStatus)) {
210+
return Boolean.TRUE.equals(item.getHasVariations());
211+
}
212+
213+
return item.getMetadataStatus() != null
214+
&& item.getMetadataStatus().value().equalsIgnoreCase(requestedStatus);
215+
}
216+
217+
private int decodeFilteredCursorOffset(String cursor) {
218+
if (isBlank(cursor)) {
219+
return 0;
220+
}
221+
222+
try {
223+
String decoded = new String(Base64.getUrlDecoder().decode(cursor), StandardCharsets.UTF_8);
224+
if (!decoded.startsWith(FILTERED_CURSOR_PREFIX)) {
225+
return 0;
226+
}
227+
228+
return Math.max(Integer.parseInt(decoded.substring(FILTERED_CURSOR_PREFIX.length())), 0);
229+
} catch (Exception e) {
230+
return 0;
231+
}
232+
}
233+
234+
private String encodeFilteredCursorOffset(int offset) {
235+
String payload = FILTERED_CURSOR_PREFIX + offset;
236+
return Base64.getUrlEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
237+
}
238+
239+
private boolean isBlank(String value) {
240+
return value == null || value.isBlank();
124241
}
125242

126243
private boolean hasMissingMetadata(ColumnGridItem item) {
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2026 Collate
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
14+
package org.openmetadata.service.jdbi3;
15+
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertFalse;
18+
import static org.junit.jupiter.api.Assertions.assertNotNull;
19+
import static org.junit.jupiter.api.Assertions.assertNull;
20+
import static org.junit.jupiter.api.Assertions.assertTrue;
21+
import static org.junit.jupiter.api.Assertions.fail;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.when;
25+
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import org.junit.jupiter.api.Test;
29+
import org.openmetadata.schema.api.data.ColumnGridItem;
30+
import org.openmetadata.schema.api.data.ColumnGridResponse;
31+
import org.openmetadata.schema.api.data.ColumnMetadataGroup;
32+
import org.openmetadata.schema.api.data.MetadataStatus;
33+
import org.openmetadata.schema.type.TagLabel;
34+
import org.openmetadata.service.search.ColumnAggregator;
35+
import org.openmetadata.service.security.Authorizer;
36+
37+
class ColumnRepositoryTest {
38+
39+
@Test
40+
void testGetColumnGridPaginated_filtersCompleteStatusAndUsesStableFilteredCursor()
41+
throws Exception {
42+
ColumnAggregator aggregator = mock(ColumnAggregator.class);
43+
ColumnRepository repository = new ColumnRepository(mock(Authorizer.class), aggregator);
44+
45+
List<ColumnGridItem> firstScanBatch =
46+
List.of(
47+
createGridItem("customer_id", MetadataStatus.COMPLETE, false, 1, "id", true),
48+
createGridItem("region", MetadataStatus.INCOMPLETE, false, 1, "region", false),
49+
createGridItem("email", MetadataStatus.COMPLETE, false, 2, "email", true));
50+
51+
List<ColumnGridItem> secondScanBatch =
52+
List.of(
53+
createGridItem("status", MetadataStatus.INCONSISTENT, true, 4, "status", true),
54+
createGridItem("order_id", MetadataStatus.COMPLETE, false, 3, "order", true));
55+
56+
when(aggregator.aggregateColumns(any()))
57+
.thenAnswer(
58+
invocation -> {
59+
ColumnAggregator.ColumnAggregationRequest request = invocation.getArgument(0);
60+
assertNull(request.getMetadataStatus());
61+
62+
if (request.getCursor() == null) {
63+
return createResponse(firstScanBatch, "scan-2");
64+
}
65+
if ("scan-2".equals(request.getCursor())) {
66+
return createResponse(secondScanBatch, null);
67+
}
68+
69+
fail("Unexpected scan cursor: " + request.getCursor());
70+
return null;
71+
});
72+
73+
ColumnAggregator.ColumnAggregationRequest firstPageRequest =
74+
new ColumnAggregator.ColumnAggregationRequest();
75+
firstPageRequest.setSize(2);
76+
firstPageRequest.setMetadataStatus("COMPLETE");
77+
78+
ColumnGridResponse firstPage = repository.getColumnGridPaginated(null, firstPageRequest);
79+
80+
assertEquals(2, firstPage.getColumns().size());
81+
assertTrue(
82+
firstPage.getColumns().stream()
83+
.allMatch(item -> MetadataStatus.COMPLETE.equals(item.getMetadataStatus())));
84+
assertEquals(3, firstPage.getTotalUniqueColumns());
85+
assertEquals(6, firstPage.getTotalOccurrences());
86+
assertNotNull(firstPage.getCursor());
87+
88+
ColumnAggregator.ColumnAggregationRequest secondPageRequest =
89+
new ColumnAggregator.ColumnAggregationRequest();
90+
secondPageRequest.setSize(2);
91+
secondPageRequest.setMetadataStatus("COMPLETE");
92+
secondPageRequest.setCursor(firstPage.getCursor());
93+
94+
ColumnGridResponse secondPage = repository.getColumnGridPaginated(null, secondPageRequest);
95+
96+
assertEquals(1, secondPage.getColumns().size());
97+
assertEquals("order_id", secondPage.getColumns().getFirst().getColumnName());
98+
assertEquals(3, secondPage.getTotalUniqueColumns());
99+
assertEquals(6, secondPage.getTotalOccurrences());
100+
assertNull(secondPage.getCursor());
101+
}
102+
103+
@Test
104+
void testGetColumnGridPaginated_filtersMissingMetadataAndRecomputesTotals() throws Exception {
105+
ColumnAggregator aggregator = mock(ColumnAggregator.class);
106+
ColumnRepository repository = new ColumnRepository(mock(Authorizer.class), aggregator);
107+
108+
List<ColumnGridItem> scanBatch =
109+
List.of(
110+
createGridItem("customer_id", MetadataStatus.COMPLETE, false, 1, "id", true),
111+
createGridItem("region", MetadataStatus.INCOMPLETE, false, 2, "", false),
112+
createGridItem("status", MetadataStatus.INCONSISTENT, true, 3, "status", false));
113+
114+
when(aggregator.aggregateColumns(any()))
115+
.thenAnswer(
116+
invocation -> {
117+
ColumnAggregator.ColumnAggregationRequest request = invocation.getArgument(0);
118+
assertNull(request.getMetadataStatus());
119+
assertFalse(Boolean.TRUE.equals(request.getHasMissingMetadata()));
120+
121+
return createResponse(scanBatch, null);
122+
});
123+
124+
ColumnAggregator.ColumnAggregationRequest request =
125+
new ColumnAggregator.ColumnAggregationRequest();
126+
request.setSize(10);
127+
request.setHasMissingMetadata(true);
128+
129+
ColumnGridResponse response = repository.getColumnGridPaginated(null, request);
130+
131+
assertEquals(2, response.getColumns().size());
132+
assertEquals(2, response.getTotalUniqueColumns());
133+
assertEquals(5, response.getTotalOccurrences());
134+
assertNull(response.getCursor());
135+
assertTrue(
136+
response.getColumns().stream()
137+
.allMatch(item -> !"customer_id".equals(item.getColumnName())));
138+
}
139+
140+
private ColumnGridResponse createResponse(List<ColumnGridItem> columns, String cursor) {
141+
ColumnGridResponse response = new ColumnGridResponse();
142+
response.setColumns(columns);
143+
response.setCursor(cursor);
144+
response.setTotalUniqueColumns(columns.size());
145+
response.setTotalOccurrences(
146+
columns.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum());
147+
148+
return response;
149+
}
150+
151+
private ColumnGridItem createGridItem(
152+
String columnName,
153+
MetadataStatus status,
154+
boolean hasVariations,
155+
int totalOccurrences,
156+
String description,
157+
boolean hasTags) {
158+
ColumnMetadataGroup group = new ColumnMetadataGroup();
159+
group.setGroupId(columnName + "-group");
160+
group.setDescription(description);
161+
group.setOccurrenceCount(1);
162+
group.setOccurrences(new ArrayList<>());
163+
if (hasTags) {
164+
TagLabel tagLabel = new TagLabel();
165+
tagLabel.setTagFQN("Tier.Tier1");
166+
group.setTags(List.of(tagLabel));
167+
} else {
168+
group.setTags(new ArrayList<>());
169+
}
170+
171+
ColumnGridItem item = new ColumnGridItem();
172+
item.setColumnName(columnName);
173+
item.setMetadataStatus(status);
174+
item.setHasVariations(hasVariations);
175+
item.setTotalOccurrences(totalOccurrences);
176+
item.setGroups(List.of(group));
177+
178+
return item;
179+
}
180+
}

0 commit comments

Comments
 (0)