Skip to content

Commit 75676b4

Browse files
remove unused public methods on dynamic filter expr
1 parent 952d16d commit 75676b4

4 files changed

Lines changed: 87 additions & 204 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 80 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,18 @@ pub struct DynamicFilterPhysicalExpr {
8282
/// `expression_id` lives here because it identifies the actual filter expression `expr`.
8383
/// Derived `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) are
8484
/// the same logical filter and must report the same `expression_id`.
85-
///
86-
/// **Warning:** exposed publicly solely so that proto (de)serialization in
87-
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
88-
/// or its layout as a stable API.
8985
#[derive(Clone, Debug)]
90-
pub struct Inner {
86+
struct Inner {
9187
/// A unique identifier for the expression.
92-
pub expression_id: u64,
88+
expression_id: u64,
9389
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
9490
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
95-
pub generation: u64,
96-
pub expr: Arc<dyn PhysicalExpr>,
91+
generation: u64,
92+
expr: Arc<dyn PhysicalExpr>,
9793
/// Flag for quick synchronous check if filter is complete.
9894
/// This is redundant with the watch channel state, but allows us to return immediately
9995
/// from `wait_complete()` without subscribing if already complete.
100-
pub is_complete: bool,
96+
is_complete: bool,
10197
}
10298

10399
impl Inner {
@@ -193,6 +189,33 @@ impl DynamicFilterPhysicalExpr {
193189
}
194190
}
195191

192+
#[cfg(any(feature = "proto", test))]
193+
fn from_parts(
194+
children: Vec<Arc<dyn PhysicalExpr>>,
195+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
196+
inner: Inner,
197+
) -> Self {
198+
let state = if inner.is_complete {
199+
FilterState::Complete {
200+
generation: inner.generation,
201+
}
202+
} else {
203+
FilterState::InProgress {
204+
generation: inner.generation,
205+
}
206+
};
207+
let (state_watch, _) = watch::channel(state);
208+
209+
Self {
210+
children,
211+
remapped_children,
212+
inner: Arc::new(RwLock::new(inner)),
213+
state_watch,
214+
data_type: Arc::new(RwLock::new(None)),
215+
nullable: Arc::new(RwLock::new(None)),
216+
}
217+
}
218+
196219
fn remap_children(
197220
children: &[Arc<dyn PhysicalExpr>],
198221
remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
@@ -361,108 +384,10 @@ impl DynamicFilterPhysicalExpr {
361384

362385
write!(f, " ]")
363386
}
364-
365-
/// Return the filter's original children (before any remapping).
366-
///
367-
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
368-
/// Not a stable API.
369-
pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
370-
&self.children
371-
}
372-
373-
/// Return the filter's remapped children, if any have been set via
374-
/// [`PhysicalExpr::with_new_children`].
375-
///
376-
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
377-
/// Not a stable API.
378-
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
379-
self.remapped_children.as_deref()
380-
}
381-
382-
/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
383-
/// proto deserialization.
384-
///
385-
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
386-
/// Not a stable API.
387-
pub fn from_parts(
388-
children: Vec<Arc<dyn PhysicalExpr>>,
389-
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
390-
inner: Inner,
391-
) -> Self {
392-
let state = if inner.is_complete {
393-
FilterState::Complete {
394-
generation: inner.generation,
395-
}
396-
} else {
397-
FilterState::InProgress {
398-
generation: inner.generation,
399-
}
400-
};
401-
let (state_watch, _) = watch::channel(state);
402-
403-
Self {
404-
children,
405-
remapped_children,
406-
inner: Arc::new(RwLock::new(inner)),
407-
state_watch,
408-
data_type: Arc::new(RwLock::new(None)),
409-
nullable: Arc::new(RwLock::new(None)),
410-
}
411-
}
412-
413-
/// Return a clone of the atomically-captured `Inner` state.
414-
///
415-
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
416-
/// Not a stable API.
417-
pub fn inner(&self) -> Inner {
418-
self.inner.read().clone()
419-
}
420387
}
421388

422389
#[cfg(feature = "proto")]
423390
impl DynamicFilterPhysicalExpr {
424-
/// Serialize this expression to protobuf.
425-
///
426-
/// Encodes `children`, `remapped_children`, and the atomically-captured
427-
/// `Inner` state (expression id, generation, current expr, is_complete).
428-
pub fn try_to_proto(
429-
&self,
430-
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
431-
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
432-
use datafusion_proto_models::protobuf;
433-
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
434-
435-
let children = self
436-
.children
437-
.iter()
438-
.map(|c| ctx.encode_child(c))
439-
.collect::<Result<Vec<_>>>()?;
440-
441-
let remapped_children = match &self.remapped_children {
442-
Some(remapped) => remapped
443-
.iter()
444-
.map(|c| ctx.encode_child(c))
445-
.collect::<Result<Vec<_>>>()?,
446-
None => vec![],
447-
};
448-
449-
let inner = self.inner.read().clone();
450-
let inner_expr = Box::new(ctx.encode_child(&inner.expr)?);
451-
452-
Ok(Some(protobuf::PhysicalExprNode {
453-
expr_id: Some(inner.expression_id),
454-
expr_type: Some(ExprType::DynamicFilter(Box::new(
455-
protobuf::PhysicalDynamicFilterNode {
456-
children,
457-
remapped_children,
458-
generation: inner.generation,
459-
inner_expr: Some(inner_expr),
460-
is_complete: inner.is_complete,
461-
},
462-
))),
463-
}))
464-
}
465-
466391
/// Reconstruct a [`DynamicFilterPhysicalExpr`] from its protobuf representation.
467392
pub fn try_from_proto(
468393
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
@@ -609,6 +534,45 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
609534
current.evaluate(batch)
610535
}
611536

537+
#[cfg(feature = "proto")]
538+
fn try_to_proto(
539+
&self,
540+
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
541+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
542+
use datafusion_proto_models::protobuf;
543+
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
544+
545+
let children = self
546+
.children
547+
.iter()
548+
.map(|c| ctx.encode_child(c))
549+
.collect::<Result<Vec<_>>>()?;
550+
551+
let remapped_children = match &self.remapped_children {
552+
Some(remapped) => remapped
553+
.iter()
554+
.map(|c| ctx.encode_child(c))
555+
.collect::<Result<Vec<_>>>()?,
556+
None => vec![],
557+
};
558+
559+
let inner = self.inner.read().clone();
560+
let inner_expr = Box::new(ctx.encode_child(&inner.expr)?);
561+
562+
Ok(Some(protobuf::PhysicalExprNode {
563+
expr_id: Some(inner.expression_id),
564+
expr_type: Some(ExprType::DynamicFilter(Box::new(
565+
protobuf::PhysicalDynamicFilterNode {
566+
children,
567+
remapped_children,
568+
generation: inner.generation,
569+
inner_expr: Some(inner_expr),
570+
is_complete: inner.is_complete,
571+
},
572+
))),
573+
}))
574+
}
575+
612576
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
613577
self.render(f, |expr, f| expr.fmt_sql(f))
614578
}
@@ -1108,23 +1072,21 @@ mod test {
11081072
reassigned.mark_complete();
11091073

11101074
// Capture the parts and reconstruct. `expression_id` rides in `inner`.
1075+
let inner = reassigned.inner.read().clone();
11111076
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
1112-
reassigned.original_children().to_vec(),
1113-
reassigned.remapped_children().map(|r| r.to_vec()),
1114-
reassigned.inner(),
1077+
reassigned.children.clone(),
1078+
reassigned.remapped_children.clone(),
1079+
inner,
11151080
);
11161081

1082+
assert_eq!(reassigned.children, reconstructed.children);
11171083
assert_eq!(
1118-
reassigned.original_children(),
1119-
reconstructed.original_children(),
1120-
);
1121-
assert_eq!(
1122-
reassigned.remapped_children(),
1123-
reconstructed.remapped_children(),
1084+
reassigned.remapped_children,
1085+
reconstructed.remapped_children
11241086
);
11251087
assert_eq!(reassigned.expression_id(), reconstructed.expression_id());
1126-
let r = reassigned.inner();
1127-
let c = reconstructed.inner();
1088+
let r = reassigned.inner.read();
1089+
let c = reconstructed.inner.read();
11281090
assert_eq!(r.generation, c.generation);
11291091
assert_eq!(r.is_complete, c.is_complete);
11301092
assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr));

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use case::{CaseExpr, case};
4545
pub use cast::{CastExpr, cast};
4646
pub use column::{Column, col, with_new_schema};
4747
pub use datafusion_expr::utils::format_state_name;
48-
pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
48+
pub use dynamic_filters::DynamicFilterPhysicalExpr;
4949
pub use in_list::{InListExpr, in_list};
5050
pub use is_not_null::{IsNotNullExpr, is_not_null};
5151
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use datafusion_execution::{FunctionRegistry, TaskContext};
3939
use datafusion_expr::WindowFunctionDefinition;
4040
use datafusion_expr::dml::InsertOp;
4141
use datafusion_expr::execution_props::SubqueryIndex;
42+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
4243
use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
4344
use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
4445
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
@@ -60,9 +61,6 @@ use super::{
6061
use crate::convert::TryFromProto;
6162
use crate::protobuf::physical_expr_node::ExprType;
6263
use crate::{convert_required, convert_required_proto, protobuf};
63-
use datafusion_physical_expr::expressions::{
64-
DynamicFilterInner, DynamicFilterPhysicalExpr,
65-
};
6664

6765
/// Parses a physical sort expression from a protobuf.
6866
///
@@ -281,6 +279,9 @@ pub fn parse_physical_expr_with_converter(
281279
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
282280
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
283281
ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?,
282+
ExprType::DynamicFilter(_) => {
283+
DynamicFilterPhysicalExpr::try_from_proto(proto, &decode_ctx)?
284+
}
284285
ExprType::AggregateExpr(_) => {
285286
return not_impl_err!(
286287
"Cannot convert aggregate expr node to physical expression"
@@ -478,53 +479,6 @@ pub fn parse_physical_expr_with_converter(
478479
results.clone(),
479480
))
480481
}
481-
ExprType::DynamicFilter(dynamic_filter) => {
482-
let children = parse_physical_exprs(
483-
&dynamic_filter.children,
484-
ctx,
485-
input_schema,
486-
proto_converter,
487-
)?;
488-
489-
let remapped_children = if !dynamic_filter.remapped_children.is_empty() {
490-
Some(parse_physical_exprs(
491-
&dynamic_filter.remapped_children,
492-
ctx,
493-
input_schema,
494-
proto_converter,
495-
)?)
496-
} else {
497-
None
498-
};
499-
500-
let inner_expr = parse_required_physical_expr(
501-
dynamic_filter.inner_expr.as_deref(),
502-
ctx,
503-
"inner_expr",
504-
input_schema,
505-
proto_converter,
506-
)?;
507-
508-
let expression_id = proto.expr_id.ok_or_else(|| {
509-
proto_error(
510-
"DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \
511-
to be set by the serializer",
512-
)
513-
})?;
514-
515-
let base_filter: Arc<dyn PhysicalExpr> =
516-
Arc::new(DynamicFilterPhysicalExpr::from_parts(
517-
children,
518-
remapped_children,
519-
DynamicFilterInner {
520-
expression_id,
521-
generation: dynamic_filter.generation,
522-
expr: inner_expr,
523-
is_complete: dynamic_filter.is_complete,
524-
},
525-
));
526-
base_filter
527-
}
528482
ExprType::Extension(extension) => {
529483
let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
530484
.inputs

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
3636
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr,
40-
LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
39+
CaseExpr, CastExpr, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal,
40+
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4141
};
4242
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -528,39 +528,6 @@ pub fn serialize_physical_expr_with_converter(
528528
},
529529
)),
530530
})
531-
} else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
532-
let children = df
533-
.original_children()
534-
.iter()
535-
.map(|child| proto_converter.physical_expr_to_proto(child, codec))
536-
.collect::<Result<Vec<_>>>()?;
537-
538-
let remapped_children = if let Some(remapped) = df.remapped_children() {
539-
remapped
540-
.iter()
541-
.map(|child| proto_converter.physical_expr_to_proto(child, codec))
542-
.collect::<Result<Vec<_>>>()?
543-
} else {
544-
vec![]
545-
};
546-
547-
// Atomic snapshot of inner state.
548-
let inner = df.inner();
549-
let inner_expr =
550-
Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?);
551-
552-
Ok(protobuf::PhysicalExprNode {
553-
expr_id,
554-
expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter(
555-
Box::new(protobuf::PhysicalDynamicFilterNode {
556-
children,
557-
remapped_children,
558-
generation: inner.generation,
559-
inner_expr: Some(inner_expr),
560-
is_complete: inner.is_complete,
561-
}),
562-
)),
563-
})
564531
} else {
565532
let mut buf: Vec<u8> = vec![];
566533
match codec.try_encode_expr(value, &mut buf) {

0 commit comments

Comments
 (0)