Skip to content

Commit

Permalink
feat: Improve subquery support
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed May 26, 2023
1 parent 1a26be2 commit 5e65b8d
Show file tree
Hide file tree
Showing 24 changed files with 1,261 additions and 344 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ cranelift-module = { version = "0.82.0", optional = true }
ordered-float = "2.10"
parquet = { git = 'https://github.com/cube-js/arrow-rs.git', rev = "096ef28dde6b1ae43ce89ba2c3a9d98295f2972e", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "10782e5d11fc0e2900c9359dddee0fbefbffd359" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "0bea7fa17907b96f32abf4bd6bb1cde43fe8d244" }
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pin-project-lite= "^0.2.7"
pyo3 = { version = "0.16", optional = true }
rand = "0.8"
smallvec = { version = "1.6", features = ["union"] }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "10782e5d11fc0e2900c9359dddee0fbefbffd359" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "0bea7fa17907b96f32abf4bd6bb1cde43fe8d244" }
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::ILike { .. }
| Expr::SimilarTo { .. }
| Expr::InList { .. }
| Expr::InSubquery { .. }
| Expr::GetIndexedField { .. }
| Expr::Case { .. } => Recursion::Continue(self),

Expand Down
17 changes: 16 additions & 1 deletion datafusion/core/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,16 @@ impl ExprRewritable for Expr {
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::AnyExpr { left, op, right } => Expr::AnyExpr {
Expr::AnyExpr {
left,
op,
right,
all,
} => Expr::AnyExpr {
left: rewrite_boxed(left, rewriter)?,
op,
right: rewrite_boxed(right, rewriter)?,
all,
},
Expr::Like(Like {
negated,
Expand Down Expand Up @@ -263,6 +269,15 @@ impl ExprRewritable for Expr {
list: rewrite_vec(list, rewriter)?,
negated,
},
Expr::InSubquery {
expr,
subquery,
negated,
} => Expr::InSubquery {
expr: rewrite_boxed(expr, rewriter)?,
subquery: rewrite_boxed(subquery, rewriter)?,
negated,
},
Expr::Wildcard => Expr::Wildcard,
Expr::QualifiedWildcard { qualifier } => {
Expr::QualifiedWildcard { qualifier }
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/logical_plan/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl ExprSchemable for Expr {
| Expr::IsNull(_)
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::InSubquery { .. }
| Expr::AnyExpr { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expr::BinaryExpr {
Expand Down Expand Up @@ -158,7 +159,7 @@ impl ExprSchemable for Expr {
| Expr::Between { expr, .. }
| Expr::InList { expr, .. } => expr.nullable(input_schema),
Expr::Column(c) => input_schema.nullable(c),
Expr::OuterColumn(_, _) => Ok(true),
Expr::OuterColumn(_, _) | Expr::InSubquery { .. } => Ok(true),
Expr::Literal(value) => Ok(value.is_null()),
Expr::Case {
when_then_expr,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/logical_plan/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl ExprVisitable for Expr {
list.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))
}
Expr::InSubquery { expr, subquery, .. } => {
let visitor = expr.accept(visitor)?;
subquery.accept(visitor)
}
}?;

visitor.post_visit(self)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ pub use plan::{
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, Distinct,
DropTable, EmptyRelation, Filter, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan, Subquery,
TableScan, ToStringifiedPlan, Union, Values,
SubqueryNode, SubqueryType, TableScan, ToStringifiedPlan, Union, Values,
};
pub use registry::FunctionRegistry;
107 changes: 104 additions & 3 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::error::DataFusionError;
use crate::logical_plan::dfschema::DFSchemaRef;
use crate::sql::parser::FileType;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::DFSchema;
use datafusion_common::{DFField, DFSchema};
use std::fmt::Formatter;
use std::{
collections::HashSet,
Expand Down Expand Up @@ -267,14 +267,37 @@ pub struct Limit {
/// Evaluates correlated sub queries
///
/// Holds the main input plan together with the plans of the sub queries
/// that are evaluated against it.
#[derive(Clone)]
pub struct Subquery {
    /// Plans of the sub queries; per the original note these are expected
    /// to be `SubqueryNode` plans
    pub subqueries: Vec<LogicalPlan>,
    /// The incoming logical plan the sub queries are attached to
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}

/// A single subquery together with the way it is going to be used.
///
/// The `typ` decides how the output `schema` is derived from the schema of
/// `input` (see `SubqueryNode::new`).
#[derive(Clone)]
pub struct SubqueryNode {
    /// Logical plan producing the rows of the subquery
    pub input: Arc<LogicalPlan>,
    /// Kind of the subquery (scalar, `EXISTS`, `ANY`/`ALL`)
    pub typ: SubqueryType,
    /// Schema describing the output of this node
    pub schema: DFSchemaRef,
}

/// Kind of a subquery, i.e. the way its result is consumed by the outer query.
///
/// There is intentionally no variant for `[NOT] IN (...)`: it is implicitly
/// evaluated as `= ANY (...)` / `<> ALL (...)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum SubqueryType {
    /// Scalar subquery (in `SELECT`, `WHERE`) evaluating to one value
    Scalar,
    /// `EXISTS(...)`: true if the subquery produced at least one row
    Exists,
    /// `ANY(...)` / `ALL(...)`
    AnyAll,
}

impl Subquery {
/// Merge schema of main input and correlated subquery columns
pub fn merged_schema(input: &LogicalPlan, subqueries: &[LogicalPlan]) -> DFSchema {
Expand All @@ -284,6 +307,72 @@ impl Subquery {
res
})
}

/// Derive the output schema of a subquery from the schema of its plan.
///
/// * `SubqueryType::Scalar` - the schema is returned as a plain clone.
/// * `SubqueryType::Exists` / `SubqueryType::AnyAll` - every field is
///   converted with `Subquery::transform_field`; the qualifier of each field
///   (if present) and the schema metadata are carried over unchanged.
///
/// # Panics
///
/// Panics if `DFSchema::new_with_metadata` rejects the rebuilt field list.
pub fn transform_dfschema(schema: &DFSchema, typ: SubqueryType) -> DFSchema {
    match typ {
        SubqueryType::Scalar => schema.clone(),
        SubqueryType::Exists | SubqueryType::AnyAll => {
            let source_fields = schema.fields();
            let mut converted = Vec::with_capacity(source_fields.len());
            for df_field in source_fields.iter() {
                let arrow_field = Subquery::transform_field(df_field.field(), typ);
                // Keep the field qualified exactly when the source field was.
                let rebuilt = match df_field.qualifier() {
                    Some(qualifier) => DFField::from_qualified(qualifier, arrow_field),
                    None => DFField::from(arrow_field),
                };
                converted.push(rebuilt);
            }
            let metadata = schema.metadata().clone();
            DFSchema::new_with_metadata(converted, metadata).unwrap()
        }
    }
}

/// Derive the Arrow field a subquery column is exposed as.
///
/// * `SubqueryType::Scalar` - the field is cloned as is.
/// * `SubqueryType::Exists` - a non-nullable `Boolean` field with the same name.
/// * `SubqueryType::AnyAll` - a non-nullable `List` field with the same name.
///   Its nullable `"item"` element keeps the original data type, and the
///   dictionary id / ordering flag default to `0` / `false` when absent.
pub fn transform_field(field: &Field, typ: SubqueryType) -> Field {
    match typ {
        SubqueryType::Scalar => field.clone(),
        SubqueryType::Exists => Field::new(field.name(), DataType::Boolean, false),
        SubqueryType::AnyAll => {
            // The rows of an ANY/ALL subquery are collected into a list so the
            // existing "ANY over a list" evaluation code can be reused.
            let item_type = field.data_type().clone();
            let dict_id = field.dict_id().unwrap_or(0);
            let dict_is_ordered = field.dict_is_ordered().unwrap_or(false);
            let item = Field::new_dict("item", item_type, true, dict_id, dict_is_ordered);
            let list_type = DataType::List(Box::new(item));
            Field::new(field.name(), list_type, false)
        }
    }
}
}

impl SubqueryNode {
    /// Build a node for `input` used as a subquery of kind `typ`.
    ///
    /// The node schema is computed from the schema of `input` via
    /// `Subquery::transform_dfschema`.
    pub fn new(input: LogicalPlan, typ: SubqueryType) -> Self {
        // Derive the schema while `input` is still only borrowed; it is moved
        // into the `Arc` afterwards.
        let schema = Arc::new(Subquery::transform_dfschema(input.schema(), typ));
        let input = Arc::new(input);
        Self { input, typ, schema }
    }
}

impl Display for SubqueryType {
    /// Writes the bare variant name (`Scalar`, `Exists` or `AnyAll`).
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Scalar => "Scalar",
            Self::Exists => "Exists",
            Self::AnyAll => "AnyAll",
        })
    }
}

/// Values expression. See
Expand Down Expand Up @@ -402,6 +491,8 @@ pub enum LogicalPlan {
Limit(Limit),
/// Evaluates correlated sub queries
Subquery(Subquery),
/// Single subquery node with subquery type
SubqueryNode(SubqueryNode),
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
Expand Down Expand Up @@ -439,6 +530,7 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Subquery(Subquery { schema, .. }) => schema,
LogicalPlan::SubqueryNode(SubqueryNode { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
Expand Down Expand Up @@ -498,7 +590,8 @@ impl LogicalPlan {
schemas.insert(0, schema);
schemas
}
LogicalPlan::Union(Union { schema, .. }) => {
LogicalPlan::Union(Union { schema, .. })
| LogicalPlan::SubqueryNode(SubqueryNode { schema, .. }) => {
vec![schema]
}
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
Expand Down Expand Up @@ -569,6 +662,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryNode(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_) => {
vec![]
Expand All @@ -587,6 +681,7 @@ impl LogicalPlan {
.into_iter()
.chain(subqueries.iter())
.collect(),
LogicalPlan::SubqueryNode(SubqueryNode { input, .. }) => vec![input],
LogicalPlan::Filter(Filter { input, .. }) => vec![input],
LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
LogicalPlan::Window(Window { input, .. }) => vec![input],
Expand Down Expand Up @@ -735,6 +830,9 @@ impl LogicalPlan {
}
true
}
LogicalPlan::SubqueryNode(SubqueryNode { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
LogicalPlan::Repartition(Repartition { input, .. }) => {
input.accept(visitor)?
Expand Down Expand Up @@ -1064,6 +1162,9 @@ impl LogicalPlan {
Ok(())
}
LogicalPlan::Subquery(Subquery { .. }) => write!(f, "Subquery"),
LogicalPlan::SubqueryNode(SubqueryNode { typ, .. }) => {
write!(f, "SubqueryNode: type={:?}", typ)
}
LogicalPlan::Filter(Filter {
predicate: ref expr,
..
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ fn optimize(
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryNode(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
Expand Down Expand Up @@ -508,6 +509,10 @@ impl ExprIdentifierVisitor<'_> {
desc.push_str("InList-");
desc.push_str(&negated.to_string());
}
Expr::InSubquery { negated, .. } => {
desc.push_str("InSubquery-");
desc.push_str(&negated.to_string());
}
Expr::Wildcard => {
desc.push_str("Wildcard-");
}
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/optimizer/projection_drop_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{Aggregate, Projection, Sort, Subquery};
use crate::logical_plan::{
normalize_col, replace_col_to_expr, unnormalize_col, Column, DFField, DFSchema,
Filter, LogicalPlan,
Filter, LogicalPlan, SubqueryNode,
};
use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
Expand Down Expand Up @@ -274,6 +274,17 @@ fn optimize_plan(
None,
))
}
LogicalPlan::SubqueryNode(SubqueryNode { input, typ, .. }) => {
// TODO: subqueries are not optimized
Ok((
LogicalPlan::SubqueryNode(SubqueryNode::new(
optimize_plan(input, _optimizer_config, false, aliased_projection)
.map(|(p, _)| p)?,
*typ,
)),
None,
))
}
LogicalPlan::Join(_)
| LogicalPlan::Window(_)
| LogicalPlan::Analyze(_)
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ fn optimize_plan(
| LogicalPlan::CrossJoin(_)
| LogicalPlan::TableUDFs(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::SubqueryNode(_)
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ impl<'a> ConstEvaluator<'a> {
| Expr::OuterColumn(_, _)
| Expr::WindowFunction { .. }
| Expr::Sort { .. }
| Expr::InSubquery { .. }
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => false,
Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()),
Expand Down
Loading

0 comments on commit 5e65b8d

Please sign in to comment.