@@ -194,69 +194,88 @@ pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, Parqu
194194}
195195
196196/// Extracts distinct metric names from a RecordBatch.
197- pub ( crate ) fn extract_metric_names ( batch : & RecordBatch ) -> Result < HashSet < String > , ParquetWriteError > {
197+ pub ( crate ) fn extract_metric_names (
198+ batch : & RecordBatch ,
199+ ) -> Result < HashSet < String > , ParquetWriteError > {
198200 let metric_idx = batch
199201 . schema ( )
200202 . index_of ( "metric_name" )
201203 . map_err ( |_| ParquetWriteError :: SchemaValidation ( "missing metric_name column" . into ( ) ) ) ?;
202- let metric_col = batch. column ( metric_idx) ;
203- let mut names = HashSet :: new ( ) ;
204-
205- // The column is Dictionary(Int32, Utf8)
206- if let Some ( dict_array) = metric_col
207- . as_any ( )
208- . downcast_ref :: < arrow:: array:: DictionaryArray < arrow:: datatypes:: Int32Type > > ( )
209- {
210- let values = dict_array. values ( ) ;
211- if let Some ( string_values) = values. as_any ( ) . downcast_ref :: < arrow:: array:: StringArray > ( ) {
212- // Get all dictionary values that are actually used
213- for i in 0 ..dict_array. len ( ) {
214- if !dict_array. is_null ( i)
215- && let Ok ( key) = dict_array. keys ( ) . value ( i) . try_into ( )
216- {
217- let key: usize = key;
218- if key < string_values. len ( ) && !string_values. is_null ( key) {
219- names. insert ( string_values. value ( key) . to_string ( ) ) ;
220- }
221- }
222- }
223- }
224- }
225-
226- Ok ( names)
204+ extract_distinct_strings ( batch. column ( metric_idx) )
227205}
228206
229207/// Extracts distinct service names from a RecordBatch.
230- pub ( crate ) fn extract_service_names ( batch : & RecordBatch ) -> Result < HashSet < String > , ParquetWriteError > {
208+ pub ( crate ) fn extract_service_names (
209+ batch : & RecordBatch ,
210+ ) -> Result < HashSet < String > , ParquetWriteError > {
231211 let service_idx = match batch. schema ( ) . index_of ( "service" ) . ok ( ) {
232212 Some ( idx) => idx,
233213 None => return Ok ( HashSet :: new ( ) ) ,
234214 } ;
235- let service_col = batch. column ( service_idx) ;
236- let mut names = HashSet :: new ( ) ;
215+ extract_distinct_strings ( batch. column ( service_idx) )
216+ }
237217
238- // The column is Dictionary(Int32, Utf8)
239- if let Some ( dict_array) = service_col
218+ /// Extracts distinct non-null string values from a column.
219+ ///
220+ /// Handles both Dictionary(Int32, Utf8) encoding (common at ingest) and
221+ /// plain Utf8/LargeUtf8 (possible after optimize_output_batch in the merge
222+ /// path when cardinality is too high for dictionary encoding).
223+ fn extract_distinct_strings (
224+ col : & dyn arrow:: array:: Array ,
225+ ) -> Result < HashSet < String > , ParquetWriteError > {
226+ let mut values = HashSet :: new ( ) ;
227+
228+ // Try Dictionary(Int32, Utf8) first — the common case at ingest.
229+ if let Some ( dict_array) = col
240230 . as_any ( )
241231 . downcast_ref :: < arrow:: array:: DictionaryArray < arrow:: datatypes:: Int32Type > > ( )
242232 {
243- let values = dict_array. values ( ) ;
244- if let Some ( string_values) = values. as_any ( ) . downcast_ref :: < arrow:: array:: StringArray > ( ) {
245- // Get all dictionary values that are actually used
233+ if let Some ( string_values) = dict_array
234+ . values ( )
235+ . as_any ( )
236+ . downcast_ref :: < arrow:: array:: StringArray > ( )
237+ {
246238 for i in 0 ..dict_array. len ( ) {
247239 if !dict_array. is_null ( i)
248240 && let Ok ( key) = dict_array. keys ( ) . value ( i) . try_into ( )
249241 {
250242 let key: usize = key;
251243 if key < string_values. len ( ) && !string_values. is_null ( key) {
252- names . insert ( string_values. value ( key) . to_string ( ) ) ;
244+ values . insert ( string_values. value ( key) . to_string ( ) ) ;
253245 }
254246 }
255247 }
248+ return Ok ( values) ;
249+ }
250+ }
251+
252+ // Fall back to plain Utf8 (after optimize_output_batch strips dictionary
253+ // encoding for high-cardinality columns).
254+ if let Some ( string_array) = col. as_any ( ) . downcast_ref :: < arrow:: array:: StringArray > ( ) {
255+ for i in 0 ..string_array. len ( ) {
256+ if !string_array. is_null ( i) {
257+ values. insert ( string_array. value ( i) . to_string ( ) ) ;
258+ }
259+ }
260+ return Ok ( values) ;
261+ }
262+
263+ // LargeUtf8 variant.
264+ if let Some ( string_array) = col
265+ . as_any ( )
266+ . downcast_ref :: < arrow:: array:: LargeStringArray > ( )
267+ {
268+ for i in 0 ..string_array. len ( ) {
269+ if !string_array. is_null ( i) {
270+ values. insert ( string_array. value ( i) . to_string ( ) ) ;
271+ }
256272 }
273+ return Ok ( values) ;
257274 }
258275
259- Ok ( names)
276+ // Unrecognized column type — return empty rather than error, since the
277+ // column may legitimately be a type we don't extract strings from.
278+ Ok ( values)
260279}
261280
262281#[ cfg( test) ]
0 commit comments