Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interface for physical plan invariant checking. #13986

Merged
merged 12 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 197 additions & 12 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
Expand Down Expand Up @@ -1880,18 +1881,12 @@ impl DefaultPhysicalPlanner {
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
if optimizer.schema_check() && new_plan.schema() != before_schema {
let e = DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
));
return Err(DataFusionError::Context(
optimizer.name().to_string(),
Box::new(e),
));
}

// confirm optimizer change did not violate invariants
InvariantChecker
.check(&new_plan, optimizer, before_schema)
.map_err(|e| e.context(optimizer.name().to_string()))?;

trace!(
"Optimized physical plan by {}:\n{}\n",
optimizer.name(),
Expand Down Expand Up @@ -2006,6 +2001,45 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

#[derive(Default)]
struct InvariantChecker;

impl InvariantChecker {
/// Checks that the plan change is permitted, returning an Error if not.
///
/// In debug mode, this recursively walks the entire physical plan and
/// performs additional checks using [`ExecutionPlan::check_node_invariants`].
pub fn check(
&mut self,
plan: &Arc<dyn ExecutionPlan>,
rule: &Arc<dyn PhysicalOptimizerRule + Send + Sync>,
wiedld marked this conversation as resolved.
Show resolved Hide resolved
previous_schema: Arc<Schema>,
) -> Result<()> {
// Invariant: in most cases, the schema cannot be changed
// since the plan's output cannot change after the optimizer pass.
if rule.schema_check() && plan.schema() != previous_schema {
internal_err!("PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
wiedld marked this conversation as resolved.
Show resolved Hide resolved
rule.name(),
previous_schema,
plan.schema()
)?
}

#[cfg(debug_assertions)]
plan.visit(self)?;
Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
node.check_node_invariants()?;
Ok(TreeNodeRecursion::Continue)
}
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand All @@ -2026,6 +2060,7 @@ mod tests {
use crate::execution::session_state::SessionStateBuilder;
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, DFSchemaRef, TableReference};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -2780,4 +2815,154 @@ digraph {

assert_contains!(generated_graph, expected_tooltip);
}

/// Extension Node which passes invariant checks
#[derive(Debug)]
struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
impl ExecutionPlan for OkExtensionNode {
fn name(&self) -> &str {
"always ok"
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self(children)))
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for OkExtensionNode {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

/// Extension Node which fails invariant checks
#[derive(Debug)]
struct InvariantFailsExtensionNode;
impl ExecutionPlan for InvariantFailsExtensionNode {
fn name(&self) -> &str {
"InvariantFailsExtensionNode"
}
fn check_node_invariants(&self) -> Result<()> {
plan_err!("extension node failed it's user-defined invariant check")
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for InvariantFailsExtensionNode {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

/// Extension Optimizer rule that requires the schema check
#[derive(Debug)]
struct OptimizerRuleWithSchemaCheck;
impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(plan)
}
fn name(&self) -> &str {
"OptimizerRuleWithSchemaCheck"
}
fn schema_check(&self) -> bool {
true
}
}

#[test]
fn test_invariant_checker() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(OptimizerRuleWithSchemaCheck);

// ok plan
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let child = Arc::clone(&ok_node);
let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
Arc::clone(&child),
])?;

// Test: check should pass with same schema
let equal_schema = ok_plan.schema();
InvariantChecker.check(&ok_plan, &rule, equal_schema)?;

// Test: should fail with schema changed
let different_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let expected_err = InvariantChecker
.check(&ok_plan, &rule, different_schema)
.unwrap_err();
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema"));

// Test: should fail when extension node fails it's own invariant check
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let expected_err = InvariantChecker
.check(&failing_node, &rule, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined invariant check"));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let invalid_plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = InvariantChecker
.check(&invalid_plan, &rule, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined invariant check"));

Ok(())
}
}
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// trait, which is implemented for all `ExecutionPlan`s.
fn properties(&self) -> &PlanProperties;

/// Returns an error if this individual node does not conform to its invariants.
alamb marked this conversation as resolved.
Show resolved Hide resolved
/// These invariants are typically only checked in debug mode.
///
/// A default set of invariants is provided in the default implementation.
/// Extension nodes can provide their own invariants.
fn check_node_invariants(&self) -> Result<()> {
// TODO
Copy link
Contributor Author

@wiedld wiedld Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure what should be the default set. The SanityCheckPlan does exactly what I had been thinking:

/// The SanityCheckPlan rule rejects the following query plans:
/// 1. Invalid plans containing nodes whose order and/or distribution requirements
/// are not satisfied by their children.
/// 2. Plans that use pipeline-breaking operators on infinite input(s),
/// it is impossible to execute such queries (they will never generate output nor finish)
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

Also, I think this optimizer pass does not mutate anything and instead validates?

Copy link
Contributor Author

@wiedld wiedld Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the SanityPlanChecker be an invariant checker instead, and then (a) run after the other optimizer rules are applied (current behavior) as well as (b) after each optimizer rule in debug mode -- would this be useful?

The added debug mode check could help isolate when a user-defined optimizer rule extension, or a user defined ExecutionPlan node, does not work well with the DF upgrade (e.g. changes in DF plan nodes or optimizer rules).

Copy link
Contributor

@ozankabak ozankabak Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, sanity checking is a "more general" process -- it verifies that any two operators that exchange data (i.e. one's output feeds the other's input) are compatible. So I don't think we can "change" it to be an invariant checker, but we can extend it to also check "invariants" of each individual operator (however they are defined by an ExecutionPlan) as it traverses the plan tree.

However, we can not blindly run sanity checking after every rule. Why? Because rules have the following types regarding their input/output plan validity:

  • Some rules only take in valid plans and output valid plans (e.g. ProjectionPushdown). These are typically applied at later stages in the optimization/plan construction process.
  • Some take in invalid or valid plans, and always create valid plans (e.g. EnforceSorting and EnforceDistribution). These can be applied any time, but are typically applied in the middle of the optimization/plan construction process.
  • Some take invalid plans and yield still invalid plans (IIRC JoinSelection is this way). These are typically applied early in the optimization/plan construction process.

As of this writing, we don't have a formal cut-off point in our list of rules whereafter plans remain valid, but I suspect they do after EnforceSorting. In debug/upgrade mode, we can apply SanityCheckPlan after every rule after that point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the logical planner we have a split between

  • AnalyzerRules that make plans Executable (e.g. by coercing types, etc)
  • OptimizerRules that don't change the plan semantics (e.g. output types are the same, etc)

It seems like maybe we could make the same separation for physical optimizer rules as well ("not yet executable") and ("read to execute"),

Some take invalid plans and yield still invalid plans (IIRC JoinSelection is this way). These are typically applied early in the optimization/plan construction process.

This was surprising to me (I am not doubting it). It looked at the other passes, and it seems there are a few others

Arc::new(OutputRequirements::new_add_mode()),
Arc::new(AggregateStatistics::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
// as that rule may inject other operations in between the different AggregateExecs.
// Applying the rule early means only directly-connected AggregateExecs must be examined.
Arc::new(LimitedDistinctAggregation::new()),
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
// requirements. Please make sure that the whole plan tree is determined before this rule.
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
// least one of the operators in the plan benefits from increased parallelism.
Arc::new(EnforceDistribution::new()),

🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, sanity checking is a "more general" process -- it verifies that any two operators that exchange data (i.e. one's output feeds the other's input) are compatible. So I don't think we can "change" it to be an invariant checker, but we can extend it to also check "invariants" of each individual operator (however they are defined by an ExecutionPlan) as it traverses the plan tree.

I agree with this sentiment. It seems to me that the "SanityChecker" is verifying invariants that should be true for all nodes (regardless of what they do -- for example that the declared required input sort is the same as the produced output sort)

Thus, focusing on ExecutionPlan specific invariants might be a good first step.

Some simple invariants to start with I could imagine are:

  1. Number of inputs (e.g. that unions have more than zero inputs, for example)

Copy link
Contributor Author

@wiedld wiedld Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you both for the reviews. Apologies on the delayed response.

To summarize this nice explanation from @ozankabak , the Executable-ness of the output plan (post optimizer run) is dependent upon what each optimizer run does and if the input plan was valid. Although it is surprising, we currently permit optimizer rules to output invalid plans.

As such, I added a PhysicalOptimizerRule::executable_check which defines the expected behavior per optimizer rule (see commit here). This also helps us surface which rules may produce unexecutable plans, as well as when we can define an output plan as "executable".

Next, the InvariantChecker was expanded to conditionally check the executableness based upon the declared expectations. If the plan is expected to be executable, then the invariants are checked for both (a) Datafusion internal definition of "executable" from the sanity plan check, as well as (b) any defined invariants on ExecutionPlan nodes (including user extension nodes).

Finally, there is an example test case added which demonstrates how this could be useful for catching incompatibilities with users' PhysicalOptimizerRule extensions.

Copy link
Contributor Author

@wiedld wiedld Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some rules only take in valid plans and output valid plans (e.g. ProjectionPushdown). These are typically applied at later stages in the optimization/plan construction process

When running our current sqllogic test suite, I found that most rules were passing through the same validity of plan. For example, the ProjectionPushdown usually had an "unexecutable" plan (due to an early OutputRequirements rule output) and the pushdown rule itself did not change the validity.

That is why the default impl behavior of the PhysicalOptimizerRule::executable_check is to pass through the current plan validity expectation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has now changed per #13986 (review).

Ok(())
}

/// Specifies the data distribution requirements for all the
/// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
fn required_input_distribution(&self) -> Vec<Distribution> {
Expand Down
Loading