|
29 | 29 | .join() |
30 | 30 | .unwrap_or_else(|_| error::ThreadPanickedWhileExecutingFutureSnafu.fail()?) |
31 | 31 | } |
32 | | - |
33 | | -pub mod test_utils { |
34 | | - use datafusion::arrow::array::{ArrayRef, RecordBatch}; |
35 | | - use datafusion::arrow::compute::{ |
36 | | - SortColumn, SortOptions, lexsort_to_indices, take_record_batch, |
37 | | - }; |
38 | | - use datafusion::arrow::datatypes::{DataType, Field, Schema}; |
39 | | - use std::collections::HashSet; |
40 | | - use std::sync::Arc; |
41 | | - |
42 | | - #[allow(clippy::unwrap_used, clippy::must_use_candidate)] |
43 | | - pub fn sort_record_batch_by_sortable_columns(batch: &RecordBatch) -> RecordBatch { |
44 | | - let sort_columns: Vec<SortColumn> = (0..batch.num_columns()) |
45 | | - .filter_map(|i| { |
46 | | - let col = batch.column(i).clone(); |
47 | | - let field = batch.schema().field(i).clone(); |
48 | | - if matches!(field.data_type(), DataType::Null) { |
49 | | - None |
50 | | - } else { |
51 | | - Some(SortColumn { |
52 | | - values: col, |
53 | | - options: Some(SortOptions::default()), |
54 | | - }) |
55 | | - } |
56 | | - }) |
57 | | - .collect(); |
58 | | - |
59 | | - if sort_columns.is_empty() { |
60 | | - return batch.clone(); |
61 | | - } |
62 | | - |
63 | | - let indices = lexsort_to_indices(&sort_columns, Some(batch.num_rows())).unwrap(); |
64 | | - take_record_batch(batch, &indices).unwrap() |
65 | | - } |
66 | | - |
67 | | - #[allow(clippy::unwrap_used, clippy::must_use_candidate)] |
68 | | - pub fn remove_columns_from_batches<S: ::std::hash::BuildHasher>( |
69 | | - batches: Vec<RecordBatch>, |
70 | | - excluded_columns: &HashSet<&str, S>, |
71 | | - ) -> Vec<RecordBatch> { |
72 | | - batches |
73 | | - .into_iter() |
74 | | - .map(|batch| { |
75 | | - let schema = batch.schema(); |
76 | | - let indices: Vec<usize> = schema |
77 | | - .fields() |
78 | | - .iter() |
79 | | - .enumerate() |
80 | | - .filter_map(|(i, f)| { |
81 | | - if excluded_columns.contains(f.name().as_str()) { |
82 | | - None |
83 | | - } else { |
84 | | - Some(i) |
85 | | - } |
86 | | - }) |
87 | | - .collect(); |
88 | | - |
89 | | - let columns: Vec<ArrayRef> = |
90 | | - indices.iter().map(|&i| batch.column(i).clone()).collect(); |
91 | | - let fields: Vec<Field> = indices.iter().map(|&i| schema.field(i).clone()).collect(); |
92 | | - let new_schema = Arc::new(Schema::new(fields)); |
93 | | - |
94 | | - RecordBatch::try_new(new_schema, columns).unwrap() |
95 | | - }) |
96 | | - .collect() |
97 | | - } |
98 | | -} |
0 commit comments