2525import jakarta .ws .rs .core .SecurityContext ;
2626import jakarta .ws .rs .core .UriInfo ;
2727import java .io .IOException ;
28+ import java .nio .charset .StandardCharsets ;
2829import java .util .ArrayList ;
2930import java .util .Arrays ;
31+ import java .util .Base64 ;
3032import java .util .HashMap ;
3133import java .util .List ;
3234import java .util .Map ;
4749import org .openmetadata .schema .api .data .ColumnUpdate ;
4850import org .openmetadata .schema .api .data .ColumnUpdatePreview ;
4951import org .openmetadata .schema .api .data .GroupedColumnsResponse ;
52+ import org .openmetadata .schema .api .data .MetadataStatus ;
5053import org .openmetadata .schema .api .data .UpdateColumn ;
5154import org .openmetadata .schema .entity .data .DashboardDataModel ;
5255import org .openmetadata .schema .entity .data .Table ;
7881
7982@ Slf4j
8083public class ColumnRepository {
84+ private static final String FILTERED_CURSOR_PREFIX = "filteredOffset:" ;
85+ private static final int FILTERED_SCAN_BATCH_SIZE = 1000 ;
86+
8187 private final Authorizer authorizer ;
8288 private final ColumnAggregator columnAggregator ;
8389
@@ -93,36 +99,197 @@ public ColumnRepository(Authorizer authorizer, SearchClient searchClient) {
9399 }
94100 }
95101
/**
 * Creates a repository around an already-constructed {@link ColumnAggregator}.
 *
 * <p>Package-private. NOTE(review): presumably intended for tests that inject a stub
 * aggregator; confirm against callers.
 *
 * @param authorizer authorizer stored on this repository
 * @param columnAggregator aggregator used to serve column grid requests
 */
ColumnRepository(Authorizer authorizer, ColumnAggregator columnAggregator) {
  this.columnAggregator = columnAggregator;
  this.authorizer = authorizer;
}
106+
107+ /**
108+ * Returns paginated column grid with optional post-aggregation filtering.
109+ *
110+ * <p><b>IMPORTANT: Cursor Handling</b>
111+ * - When filters are applied (metadataStatus, hasConflicts, hasMissingMetadata), a custom
112+ * cursor format is used: `filteredOffset:<offset>`
113+ * - When no filters are applied, the aggregator's native cursor format is used
114+ * - <b>CURSOR MISMATCH WARNING</b>: If the client switches between filtered and non-filtered
115+ * states while holding a cursor from the other state, the cursor will be invalid and
116+ * pagination will reset to page 1. Clients should discard cursors when filter parameters
117+ * change.
118+ *
119+ * <p><b>Totals Consistency</b>
120+ * - `totalUniqueColumns` and `totalOccurrences` are computed from the complete filtered
121+ * result set and remain stable across pages within a single filtered request.
122+ * - These values may differ if filters are applied between requests.
123+ *
124+ * @param securityContext Security context of the request
125+ * @param request Column aggregation request with optional filter parameters
126+ * @return Paginated response with filtered columns and cursor for next page
127+ */
96128 public ColumnGridResponse getColumnGridPaginated (
97129 SecurityContext securityContext , ColumnAggregator .ColumnAggregationRequest request )
98130 throws IOException {
131+ if (requiresPostAggregationFiltering (request )) {
132+ return getFilteredColumnGridPage (request );
133+ }
134+
99135 ColumnGridResponse response = columnAggregator .aggregateColumns (request );
100136
101- if (Boolean .TRUE .equals (request .getHasConflicts ())) {
102- response .setColumns (
103- response .getColumns ().stream ()
104- .filter (ColumnGridItem ::getHasVariations )
105- .collect (Collectors .toList ()));
106- }
137+ return response ;
138+ }
139+
140+ private boolean requiresPostAggregationFiltering (
141+ ColumnAggregator .ColumnAggregationRequest request ) {
142+ return Boolean .TRUE .equals (request .getHasConflicts ())
143+ || Boolean .TRUE .equals (request .getHasMissingMetadata ())
144+ || !isBlank (request .getMetadataStatus ());
145+ }
107146
108- if (Boolean .TRUE .equals (request .getHasMissingMetadata ())) {
109- response .setColumns (
110- response .getColumns ().stream ()
111- .filter (this ::hasMissingMetadata )
112- .collect (Collectors .toList ()));
147+ private ColumnGridResponse getFilteredColumnGridPage (
148+ ColumnAggregator .ColumnAggregationRequest request ) throws IOException {
149+ int pageSize = Math .max (request .getSize (), 1 );
150+ String cursor = request .getCursor ();
151+ int offset = decodeFilteredCursorOffset (cursor );
152+
153+ // For first page (no cursor), we need to scan and build the complete filtered list
154+ // For subsequent pages, we rebuild to ensure accuracy of totals
155+ // (totals must be stable across pages)
156+ List <ColumnGridItem > allFilteredItems = new ArrayList <>();
157+ int totalOccurrences = 0 ;
158+ String scanCursor = null ;
159+ ColumnAggregator .ColumnAggregationRequest scanRequest = createScanRequest (request , pageSize );
160+
161+ do {
162+ scanRequest .setCursor (scanCursor );
163+ ColumnGridResponse scanResponse = columnAggregator .aggregateColumns (scanRequest );
164+ List <ColumnGridItem > matchingItems =
165+ applyPostAggregationFilters (scanResponse .getColumns (), request );
166+ allFilteredItems .addAll (matchingItems );
167+ totalOccurrences +=
168+ matchingItems .stream ().mapToInt (ColumnGridItem ::getTotalOccurrences ).sum ();
169+ scanCursor = scanResponse .getCursor ();
170+ } while (scanCursor != null );
171+
172+ // Calculate pagination
173+ int totalUniqueColumns = allFilteredItems .size ();
174+ int safeOffset = Math .min (Math .max (offset , 0 ), totalUniqueColumns );
175+ int end = Math .min (safeOffset + pageSize , totalUniqueColumns );
176+
177+ if (safeOffset >= totalUniqueColumns && safeOffset > 0 ) {
178+ LOG .warn (
179+ "Page offset {} exceeds total filtered items {}. Returning empty page." ,
180+ safeOffset ,
181+ totalUniqueColumns );
113182 }
114183
115- // Filter by INCONSISTENT status (requires post-aggregation filtering)
116- if ("INCONSISTENT" .equalsIgnoreCase (request .getMetadataStatus ())) {
117- response .setColumns (
118- response .getColumns ().stream ()
119- .filter (ColumnGridItem ::getHasVariations )
120- .collect (Collectors .toList ()));
184+ // Build response
185+ ColumnGridResponse response = new ColumnGridResponse ();
186+ response .setColumns (new ArrayList <>(allFilteredItems .subList (safeOffset , end )));
187+ response .setTotalUniqueColumns (totalUniqueColumns );
188+ response .setTotalOccurrences (totalOccurrences );
189+
190+ // Set cursor for next page if more data exists
191+ if (end < totalUniqueColumns ) {
192+ response .setCursor (encodeFilteredCursorOffset (end ));
121193 }
122194
123195 return response ;
124196 }
125197
198+ private ColumnAggregator .ColumnAggregationRequest createScanRequest (
199+ ColumnAggregator .ColumnAggregationRequest request , int pageSize ) {
200+ ColumnAggregator .ColumnAggregationRequest scanRequest =
201+ new ColumnAggregator .ColumnAggregationRequest ();
202+ scanRequest .setSize (Math .min (Math .max (pageSize , FILTERED_SCAN_BATCH_SIZE ), 10000 ));
203+ scanRequest .setCursor (null );
204+ scanRequest .setColumnNamePattern (request .getColumnNamePattern ());
205+ scanRequest .setEntityTypes (request .getEntityTypes ());
206+ scanRequest .setServiceName (request .getServiceName ());
207+ scanRequest .setServiceTypes (request .getServiceTypes ());
208+ scanRequest .setDatabaseName (request .getDatabaseName ());
209+ scanRequest .setSchemaName (request .getSchemaName ());
210+ scanRequest .setDomainId (request .getDomainId ());
211+ scanRequest .setHasConflicts (false );
212+ scanRequest .setHasMissingMetadata (false );
213+ scanRequest .setMetadataStatus (null );
214+ scanRequest .setTags (request .getTags ());
215+ scanRequest .setGlossaryTerms (request .getGlossaryTerms ());
216+
217+ return scanRequest ;
218+ }
219+
220+ private List <ColumnGridItem > applyPostAggregationFilters (
221+ List <ColumnGridItem > items , ColumnAggregator .ColumnAggregationRequest request ) {
222+ return items .stream ()
223+ .filter (item -> matchesAllPostAggregationFilters (item , request ))
224+ .collect (Collectors .toList ());
225+ }
226+
227+ private boolean matchesAllPostAggregationFilters (
228+ ColumnGridItem item , ColumnAggregator .ColumnAggregationRequest request ) {
229+ if (Boolean .TRUE .equals (request .getHasConflicts ())
230+ && !Boolean .TRUE .equals (item .getHasVariations ())) {
231+ return false ;
232+ }
233+
234+ if (Boolean .TRUE .equals (request .getHasMissingMetadata ()) && !hasMissingMetadata (item )) {
235+ return false ;
236+ }
237+
238+ return matchesMetadataStatus (item , request .getMetadataStatus ());
239+ }
240+
241+ private boolean matchesMetadataStatus (ColumnGridItem item , String requestedStatus ) {
242+ if (isBlank (requestedStatus )) {
243+ return true ;
244+ }
245+
246+ if (MetadataStatus .INCONSISTENT .value ().equalsIgnoreCase (requestedStatus )) {
247+ return Boolean .TRUE .equals (item .getHasVariations ());
248+ }
249+
250+ return item .getMetadataStatus () != null
251+ && item .getMetadataStatus ().value ().equalsIgnoreCase (requestedStatus );
252+ }
253+
254+ private int decodeFilteredCursorOffset (String cursor ) {
255+ if (isBlank (cursor )) {
256+ return 0 ;
257+ }
258+
259+ try {
260+ String decoded = new String (Base64 .getUrlDecoder ().decode (cursor ), StandardCharsets .UTF_8 );
261+ if (!decoded .startsWith (FILTERED_CURSOR_PREFIX )) {
262+ LOG .warn (
263+ "Invalid cursor format for filtered query. Expected format '{}...', got '{}'. "
264+ + "Cursor mismatch may occur when switching between filtered/non-filtered requests." ,
265+ FILTERED_CURSOR_PREFIX ,
266+ decoded );
267+ return 0 ;
268+ }
269+
270+ return Math .max (Integer .parseInt (decoded .substring (FILTERED_CURSOR_PREFIX .length ())), 0 );
271+ } catch (NumberFormatException e ) {
272+ LOG .error (
273+ "Failed to parse filtered cursor offset. Cursor: {}. Resetting to page 1. "
274+ + "This indicates a cursor format mismatch." ,
275+ cursor ,
276+ e );
277+ return 0 ;
278+ } catch (IllegalArgumentException e ) {
279+ LOG .error ("Failed to decode cursor from Base64. Cursor: {}. Resetting to page 1." , cursor , e );
280+ return 0 ;
281+ }
282+ }
283+
284+ private String encodeFilteredCursorOffset (int offset ) {
285+ String payload = FILTERED_CURSOR_PREFIX + offset ;
286+ return Base64 .getUrlEncoder ().encodeToString (payload .getBytes (StandardCharsets .UTF_8 ));
287+ }
288+
289+ private boolean isBlank (String value ) {
290+ return value == null || value .isBlank ();
291+ }
292+
126293 private boolean hasMissingMetadata (ColumnGridItem item ) {
127294 return item .getGroups ().stream ()
128295 .anyMatch (
0 commit comments