Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 102 additions & 106 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,18 @@ pub struct DynamicFilterPhysicalExpr {
/// `expression_id` lives here because it identifies the actual filter expression `expr`.
/// Derived `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) are
/// the same logical filter and must report the same `expression_id`.
///
/// **Warning:** exposed publicly solely so that proto (de)serialization in
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
/// or its layout as a stable API.
#[derive(Clone, Debug)]
pub struct Inner {
struct Inner {
/// A unique identifier for the expression.
pub expression_id: u64,
expression_id: u64,
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
pub generation: u64,
pub expr: Arc<dyn PhysicalExpr>,
generation: u64,
expr: Arc<dyn PhysicalExpr>,
/// Flag for quick synchronous check if filter is complete.
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
pub is_complete: bool,
is_complete: bool,
}

impl Inner {
Expand Down Expand Up @@ -361,34 +357,65 @@ impl DynamicFilterPhysicalExpr {

write!(f, " ]")
}
}

/// Return the filter's original children (before any remapping).
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.children
}
#[cfg(feature = "proto")]
impl DynamicFilterPhysicalExpr {
/// Reconstruct a [`DynamicFilterPhysicalExpr`] from its protobuf representation.
pub fn try_from_proto(
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
) -> Result<Arc<dyn PhysicalExpr>> {
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

let dynamic_filter = match &node.expr_type {
Some(ExprType::DynamicFilter(df)) => df.as_ref(),
_ => {
return datafusion_common::internal_err!(
"PhysicalExprNode is not a DynamicFilter"
);
}
};

/// Return the filter's remapped children, if any have been set via
/// [`PhysicalExpr::with_new_children`].
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
self.remapped_children.as_deref()
}
let expression_id = node.expr_id.ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \
to be set by the serializer"
.to_string(),
)
})?;

/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
/// proto deserialization.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn from_parts(
children: Vec<Arc<dyn PhysicalExpr>>,
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
inner: Inner,
) -> Self {
let children = dynamic_filter
.children
.iter()
.map(|c| ctx.decode(c))
.collect::<Result<Vec<_>>>()?;

let remapped_children = if !dynamic_filter.remapped_children.is_empty() {
Some(
dynamic_filter
.remapped_children
.iter()
.map(|c| ctx.decode(c))
.collect::<Result<Vec<_>>>()?,
)
} else {
None
};

let inner_expr =
ctx.decode(dynamic_filter.inner_expr.as_deref().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"DynamicFilterPhysicalExpr missing inner_expr".to_string(),
)
})?)?;

let inner = Inner {
expression_id,
generation: dynamic_filter.generation,
expr: inner_expr,
is_complete: dynamic_filter.is_complete,
};
let state = if inner.is_complete {
FilterState::Complete {
generation: inner.generation,
Expand All @@ -400,22 +427,14 @@ impl DynamicFilterPhysicalExpr {
};
let (state_watch, _) = watch::channel(state);

Self {
Ok(Arc::new(DynamicFilterPhysicalExpr {
children,
remapped_children,
inner: Arc::new(RwLock::new(inner)),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}

/// Return a clone of the atomically-captured `Inner` state.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn inner(&self) -> Inner {
self.inner.read().clone()
}))
}
}

Expand Down Expand Up @@ -503,6 +522,45 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
current.evaluate(batch)
}

#[cfg(feature = "proto")]
fn try_to_proto(
&self,
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
use datafusion_proto_models::protobuf;
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;

let children = self
.children
.iter()
.map(|c| ctx.encode_child(c))
.collect::<Result<Vec<_>>>()?;

let remapped_children = match &self.remapped_children {
Some(remapped) => remapped
.iter()
.map(|c| ctx.encode_child(c))
.collect::<Result<Vec<_>>>()?,
None => vec![],
};

let inner = self.inner.read().clone();
let inner_expr = Box::new(ctx.encode_child(&inner.expr)?);

Ok(Some(protobuf::PhysicalExprNode {
expr_id: Some(inner.expression_id),
expr_type: Some(ExprType::DynamicFilter(Box::new(
protobuf::PhysicalDynamicFilterNode {
children,
remapped_children,
generation: inner.generation,
inner_expr: Some(inner_expr),
is_complete: inner.is_complete,
},
))),
}))
}

fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.render(f, |expr, f| expr.fmt_sql(f))
}
Expand Down Expand Up @@ -962,68 +1020,6 @@ mod test {
);
}

/// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr`
/// whose observable state (original children, remapped children,
/// expression id, inner generation/expr/is_complete) matches the source
/// filter.
#[test]
fn test_from_parts_preserves_state() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let col_a = col("a", &schema).unwrap();

// Create a dynamic filter with children
let expr = Arc::new(BinaryExpr::new(
Arc::clone(&col_a),
datafusion_expr::Operator::Gt,
lit(10) as Arc<dyn PhysicalExpr>,
));
let filter = DynamicFilterPhysicalExpr::new(
vec![Arc::clone(&col_a)],
expr as Arc<dyn PhysicalExpr>,
);

// Add remapped children.
let reassigned_schema = Arc::new(Schema::new(vec![
Field::new("b", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));
let reassigned = reassign_expr_columns(
Arc::new(filter) as Arc<dyn PhysicalExpr>,
&reassigned_schema,
)
.expect("reassign_expr_columns should succeed");
let reassigned = reassigned
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("Expected dynamic filter after reassignment");

reassigned
.update(lit(42) as Arc<dyn PhysicalExpr>)
.expect("Update should succeed");
reassigned.mark_complete();

// Capture the parts and reconstruct. `expression_id` rides in `inner`.
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
reassigned.original_children().to_vec(),
reassigned.remapped_children().map(|r| r.to_vec()),
reassigned.inner(),
);

assert_eq!(
reassigned.original_children(),
reconstructed.original_children(),
);
assert_eq!(
reassigned.remapped_children(),
reconstructed.remapped_children(),
);
assert_eq!(reassigned.expression_id(), reconstructed.expression_id());
let r = reassigned.inner();
let c = reconstructed.inner();
assert_eq!(r.generation, c.generation);
assert_eq!(r.is_complete, c.is_complete);
assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr));
}

#[tokio::test]
async fn test_expression_id() {
let source_schema =
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use case::{CaseExpr, case};
pub use cast::{CastExpr, cast};
pub use column::{Column, col, with_new_schema};
pub use datafusion_expr::utils::format_state_name;
pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
pub use dynamic_filters::DynamicFilterPhysicalExpr;
pub use in_list::{InListExpr, in_list};
pub use is_not_null::{IsNotNullExpr, is_not_null};
pub use is_null::{IsNullExpr, is_null};
Expand Down
54 changes: 4 additions & 50 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::WindowFunctionDefinition;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::SubqueryIndex;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
Expand All @@ -60,9 +61,6 @@ use super::{
use crate::convert::TryFromProto;
use crate::protobuf::physical_expr_node::ExprType;
use crate::{convert_required, convert_required_proto, protobuf};
use datafusion_physical_expr::expressions::{
DynamicFilterInner, DynamicFilterPhysicalExpr,
};

/// Parses a physical sort expression from a protobuf.
///
Expand Down Expand Up @@ -281,6 +279,9 @@ pub fn parse_physical_expr_with_converter(
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?,
ExprType::DynamicFilter(_) => {
DynamicFilterPhysicalExpr::try_from_proto(proto, &decode_ctx)?
}
ExprType::AggregateExpr(_) => {
return not_impl_err!(
"Cannot convert aggregate expr node to physical expression"
Expand Down Expand Up @@ -478,53 +479,6 @@ pub fn parse_physical_expr_with_converter(
results.clone(),
))
}
ExprType::DynamicFilter(dynamic_filter) => {
let children = parse_physical_exprs(
&dynamic_filter.children,
ctx,
input_schema,
proto_converter,
)?;

let remapped_children = if !dynamic_filter.remapped_children.is_empty() {
Some(parse_physical_exprs(
&dynamic_filter.remapped_children,
ctx,
input_schema,
proto_converter,
)?)
} else {
None
};

let inner_expr = parse_required_physical_expr(
dynamic_filter.inner_expr.as_deref(),
ctx,
"inner_expr",
input_schema,
proto_converter,
)?;

let expression_id = proto.expr_id.ok_or_else(|| {
proto_error(
"DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \
to be set by the serializer",
)
})?;

let base_filter: Arc<dyn PhysicalExpr> =
Arc::new(DynamicFilterPhysicalExpr::from_parts(
children,
remapped_children,
DynamicFilterInner {
expression_id,
generation: dynamic_filter.generation,
expr: inner_expr,
is_complete: dynamic_filter.is_complete,
},
));
base_filter
}
ExprType::Extension(extension) => {
let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
.inputs
Expand Down
Loading
Loading