diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 77bf4bb71d38..64f1318098be 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -65,6 +65,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow_array::builder::StringBuilder; use arrow_array::RecordBatch; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, ScalarValue, @@ -82,6 +83,7 @@ use datafusion_expr::{ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::LexOrdering; +use datafusion_physical_plan::execution_plan::InvariantLevel; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; use datafusion_sql::utils::window_expr_common_partition_keys; @@ -1874,7 +1876,11 @@ impl DefaultPhysicalPlanner { displayable(plan.as_ref()).indent(true) ); - let mut new_plan = plan; + // This runs once before any optimization, + // to verify that the plan fulfills the base requirements. 
+ InvariantChecker(InvariantLevel::Always).check(&plan)?; + + let mut new_plan = Arc::clone(&plan); for optimizer in optimizers { let before_schema = new_plan.schema(); new_plan = optimizer @@ -1882,18 +1888,11 @@ impl DefaultPhysicalPlanner { .map_err(|e| { DataFusionError::Context(optimizer.name().to_string(), Box::new(e)) })?; - if optimizer.schema_check() && new_plan.schema() != before_schema { - let e = DataFusionError::Internal(format!( - "PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", - optimizer.name(), - before_schema, - new_plan.schema() - )); - return Err(DataFusionError::Context( - optimizer.name().to_string(), - Box::new(e), - )); - } + + // This only checks the schema in release build, and performs additional checks in debug mode. + OptimizationInvariantChecker::new(optimizer) + .check(&new_plan, before_schema)?; + trace!( "Optimized physical plan by {}:\n{}\n", optimizer.name(), @@ -1901,6 +1900,11 @@ impl DefaultPhysicalPlanner { ); observer(new_plan.as_ref(), optimizer.as_ref()) } + + // This runs once after all optimizer runs are complete, + // to verify that the plan is executable. + InvariantChecker(InvariantLevel::Executable).check(&new_plan)?; + debug!( "Optimized physical plan:\n{}\n", displayable(new_plan.as_ref()).indent(false) @@ -2008,6 +2012,83 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { } } +struct OptimizationInvariantChecker<'a> { + rule: &'a Arc, +} + +impl<'a> OptimizationInvariantChecker<'a> { + /// Create an [`OptimizationInvariantChecker`] that performs checking per rule. + pub fn new(rule: &'a Arc) -> Self { + Self { rule } + } + + /// Checks that the plan change is permitted, returning an Error if not. + /// + /// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check]. + /// In debug mode, this recursively walks the entire physical plan + /// and performs [`ExecutionPlan::check_invariants`]. 
+ pub fn check( + &mut self, + plan: &Arc, + previous_schema: Arc, + ) -> Result<()> { + // if the rule is not permitted to change the schema, confirm that it did not change. + if self.rule.schema_check() && plan.schema() != previous_schema { + internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}", + self.rule.name(), + previous_schema, + plan.schema() + )? + } + + // check invariants per each ExecutionPlan node + #[cfg(debug_assertions)] + plan.visit(self)?; + + Ok(()) + } +} + +impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> { + type Node = Arc; + + fn f_down(&mut self, node: &'n Self::Node) -> Result { + // Checks for the more permissive `InvariantLevel::Always`. + // Plans are not guaranteed to be executable after each physical optimizer run. + node.check_invariants(InvariantLevel::Always).map_err(|e| + e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())) + )?; + Ok(TreeNodeRecursion::Continue) + } +} + +/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`]. +struct InvariantChecker(InvariantLevel); + +impl InvariantChecker { + /// Checks that every node of the plan satisfies the configured [`InvariantLevel`], returning an Error if not. 
+ pub fn check(&mut self, plan: &Arc) -> Result<()> { + // check invariants per each ExecutionPlan node + plan.visit(self)?; + + Ok(()) + } +} + +impl<'n> TreeNodeVisitor<'n> for InvariantChecker { + type Node = Arc; + + fn f_down(&mut self, node: &'n Self::Node) -> Result { + node.check_invariants(self.0).map_err(|e| { + e.context(format!( + "Invariant for ExecutionPlan node '{}' failed", + node.name() + )) + })?; + Ok(TreeNodeRecursion::Continue) + } +} + #[cfg(test)] mod tests { use std::any::Any; @@ -2028,6 +2109,7 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type}; + use datafusion_common::config::ConfigOptions; use datafusion_common::{assert_contains, DFSchemaRef, TableReference}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; @@ -2782,4 +2864,239 @@ digraph { assert_contains!(generated_graph, expected_tooltip); } + + /// Extension Node which passes invariant checks + #[derive(Debug)] + struct OkExtensionNode(Vec>); + impl ExecutionPlan for OkExtensionNode { + fn name(&self) -> &str { + "always ok" + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self(children))) + } + fn schema(&self) -> SchemaRef { + Arc::new(Schema::empty()) + } + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + fn children(&self) -> Vec<&Arc> { + self.0.iter().collect::>() + } + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for OkExtensionNode { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } + } + + /// Extension Node which fails the [`OptimizationInvariantChecker`]. 
+ #[derive(Debug)] + struct InvariantFailsExtensionNode; + impl ExecutionPlan for InvariantFailsExtensionNode { + fn name(&self) -> &str { + "InvariantFailsExtensionNode" + } + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + match check { + InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"), + InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"), + } + } + fn schema(&self) -> SchemaRef { + Arc::new(Schema::empty()) + } + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + fn children(&self) -> Vec<&Arc> { + unimplemented!() + } + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for InvariantFailsExtensionNode { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } + } + + /// Extension Optimizer rule that requires the schema check + #[derive(Debug)] + struct OptimizerRuleWithSchemaCheck; + impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + Ok(plan) + } + fn name(&self) -> &str { + "OptimizerRuleWithSchemaCheck" + } + fn schema_check(&self) -> bool { + true + } + } + + #[test] + fn test_optimization_invariant_checker() -> Result<()> { + let rule: Arc = + Arc::new(OptimizerRuleWithSchemaCheck); + + // ok plan + let ok_node: Arc = Arc::new(OkExtensionNode(vec![])); + let child = Arc::clone(&ok_node); + let ok_plan = Arc::clone(&ok_node).with_new_children(vec![ + Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?, + Arc::clone(&child), + ])?; + + // Test: check should pass with same schema + let equal_schema = ok_plan.schema(); + 
OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?; + + // Test: should fail with schema changed + let different_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); + let expected_err = OptimizationInvariantChecker::new(&rule) + .check(&ok_plan, different_schema) + .unwrap_err(); + assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema")); + + // Test: should fail when extension node fails it's own invariant check + let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); + let expected_err = OptimizationInvariantChecker::new(&rule) + .check(&failing_node, ok_plan.schema()) + .unwrap_err(); + assert!(expected_err + .to_string() + .contains("extension node failed it's user-defined always-invariant check")); + + // Test: should fail when descendent extension node fails + let failing_node: Arc = Arc::new(InvariantFailsExtensionNode); + let invalid_plan = ok_node.with_new_children(vec![ + Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, + Arc::clone(&child), + ])?; + let expected_err = OptimizationInvariantChecker::new(&rule) + .check(&invalid_plan, ok_plan.schema()) + .unwrap_err(); + assert!(expected_err + .to_string() + .contains("extension node failed it's user-defined always-invariant check")); + + Ok(()) + } + + /// Extension Node which fails the [`InvariantChecker`] + /// if, and only if, [`InvariantLevel::Executable`] + #[derive(Debug)] + struct ExecutableInvariantFails; + impl ExecutionPlan for ExecutableInvariantFails { + fn name(&self) -> &str { + "ExecutableInvariantFails" + } + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + match check { + InvariantLevel::Always => Ok(()), + InvariantLevel::Executable => plan_err!( + "extension node failed it's user-defined executable-invariant check" + ), + } + } + fn schema(&self) -> SchemaRef { + Arc::new(Schema::empty()) + } + 
fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { + unimplemented!() + } + fn children(&self) -> Vec<&Arc> { + vec![] + } + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + } + impl DisplayAs for ExecutableInvariantFails { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } + } + + #[test] + fn test_invariant_checker_levels() -> Result<()> { + // plan that passes the always-invariant, but fails the executable check + let plan: Arc = Arc::new(ExecutableInvariantFails); + + // Test: check should pass with less stringent Always check + InvariantChecker(InvariantLevel::Always).check(&plan)?; + + // Test: should fail the executable check + let expected_err = InvariantChecker(InvariantLevel::Executable) + .check(&plan) + .unwrap_err(); + assert!(expected_err.to_string().contains( + "extension node failed it's user-defined executable-invariant check" + )); + + // Test: should fail when descendent extension node fails + let failing_node: Arc = Arc::new(ExecutableInvariantFails); + let ok_node: Arc = Arc::new(OkExtensionNode(vec![])); + let child = Arc::clone(&ok_node); + let plan = ok_node.with_new_children(vec![ + Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?, + Arc::clone(&child), + ])?; + let expected_err = InvariantChecker(InvariantLevel::Executable) + .check(&plan) + .unwrap_err(); + assert!(expected_err.to_string().contains( + "extension node failed it's user-defined executable-invariant check" + )); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6d7935553116..753234c09994 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -110,6 +110,15 
@@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// trait, which is implemented for all `ExecutionPlan`s. fn properties(&self) -> &PlanProperties; + /// Returns an error if this individual node does not conform to its invariants. + /// These invariants are typically only checked in debug mode. + /// + /// The default implementation performs no checks and returns `Ok(())`. + /// Extension nodes can provide their own invariants. + fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + Ok(()) + } + /// Specifies the data distribution requirements for all the /// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child, fn required_input_distribution(&self) -> Vec { @@ -424,6 +433,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } } +/// [`ExecutionPlan`] Invariant Level +/// +/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan` +/// +/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science +#[derive(Clone, Copy)] +pub enum InvariantLevel { + /// Invariants that are always true for the [`ExecutionPlan`] node + /// such as the number of expected children. + Always, + /// Invariants that must hold true for the [`ExecutionPlan`] node + /// to be "executable", such as ordering and/or distribution requirements + /// being fulfilled. + Executable, +} + /// Extension trait provides an easy API to fetch various properties of /// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`]. 
pub trait ExecutionPlanProperties { diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index cfa919425c54..bcd9572f45c7 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -32,14 +32,16 @@ use super::{ ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; +use crate::execution_plan::{ + boundedness_from_children, emission_type_from_children, InvariantLevel, +}; use crate::metrics::BaselineMetrics; use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_common::{exec_err, internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{calculate_union, EquivalenceProperties}; @@ -172,6 +174,14 @@ impl ExecutionPlan for UnionExec { &self.cache } + fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + (self.inputs().len() >= 2) + .then_some(()) + .ok_or(DataFusionError::Internal( + "UnionExec should have at least 2 children".into(), + )) + } + fn children(&self) -> Vec<&Arc> { self.inputs.iter().collect() }