[substrait] Add support for ExtensionTable #13772

Open
wants to merge 2 commits into
base: main

ccciudatu
Contributor

@ccciudatu ccciudatu commented Dec 13, 2024

Which issue does this PR close?

Closes #13771.
Addresses the first bullet in #13318.

Rationale for this change

Adds support for encoding/decoding custom table providers as ExtensionTables in Substrait.

What changes are included in this PR?

Two more methods for SerializerRegistry to handle TableSources, plus the necessary changes in from_substrait_plan and to_substrait_plan to transparently map custom tables to ExtensionTable nodes.

Are these changes tested?

A round trip test is included.

Are there any user-facing changes?

No breaking changes, only a couple of convenience changes, such as default implementations for trait methods.

@github-actions github-actions bot added logical-expr Logical plan and expressions substrait labels Dec 13, 2024
@ccciudatu ccciudatu force-pushed the substrait-extension-tables branch from a65e926 to bac38cc Compare December 13, 2024 22:05
@github-actions github-actions bot added the core Core DataFusion crate label Dec 14, 2024
"Deserializing user defined logical plan node `{name}` is not supported"
)
}
}
Contributor Author

The SerializerRegistry trait now has two more methods for handling tables (with default implementations for backwards compatibility), so it makes sense for the existing methods to have default implementations as well.
This will allow implementors to conveniently implement the trait for user-defined logical nodes only or for tables only.
Since the implementations here are perfect as trait defaults, this PR just moves them into the trait itself.
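The default-method pattern described in this comment can be sketched as follows. This is a minimal illustration with hypothetical stand-in types (`RegistrySketch`, `NodeSketch`, `TableSketch`, `TablesOnly`), not the actual DataFusion `SerializerRegistry` signatures, which work with DataFusion's own `Result`, node, and table source types.

```rust
// Hypothetical, simplified stand-ins for a user-defined node and a custom table.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeSketch {
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TableSketch {
    pub path: String,
}

/// Every method has a default that rejects the request, so an implementor
/// only overrides the half (nodes or tables) it actually supports.
pub trait RegistrySketch {
    fn serialize_node(&self, node: &NodeSketch) -> Result<Vec<u8>, String> {
        Err(format!(
            "Serializing user defined logical plan node `{}` is not supported",
            node.name
        ))
    }

    fn deserialize_node(&self, name: &str, _bytes: &[u8]) -> Result<NodeSketch, String> {
        Err(format!(
            "Deserializing user defined logical plan node `{}` is not supported",
            name
        ))
    }

    fn serialize_table(&self, _table: &TableSketch) -> Result<Vec<u8>, String> {
        Err("Serializing custom tables is not supported".to_string())
    }

    fn deserialize_table(&self, name: &str, _bytes: &[u8]) -> Result<TableSketch, String> {
        Err(format!("Deserializing custom table `{}` is not supported", name))
    }
}

/// Handles tables only; the node methods fall back to the trait defaults.
pub struct TablesOnly;

impl RegistrySketch for TablesOnly {
    fn serialize_table(&self, table: &TableSketch) -> Result<Vec<u8>, String> {
        Ok(table.path.as_bytes().to_vec())
    }

    fn deserialize_table(&self, _name: &str, bytes: &[u8]) -> Result<TableSketch, String> {
        String::from_utf8(bytes.to_vec())
            .map(|path| TableSketch { path })
            .map_err(|e| e.to_string())
    }
}
```

With this shape, `TablesOnly` round-trips a table while any attempt to serialize a node still returns the default "not supported" error.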

@ccciudatu ccciudatu force-pushed the substrait-extension-tables branch from 8db92b8 to a12007b Compare December 16, 2024 14:16
@@ -994,8 +994,34 @@ pub async fn from_substrait_rel(
)
.await
}
_ => {
not_impl_err!("Unsupported ReadType: {:?}", &read.as_ref().read_type)
Some(ReadType::ExtensionTable(ext)) => {
Contributor

I didn't actually get to handling ExtensionTables in #13803, but I think I've got most of what we would need in place for it.

The way I would approach it, based on the work in that PR, is to add a method to the SubstraitConsumer trait like:

async fn consume_extension_table(&self, extension_table: &ExtensionTable) -> Result<LogicalPlan>

and wire it in here. Then, as a user, you would be able to provide your own implementation of the decoder. This might use the SerializerRegistry, but it doesn't necessarily need to.

Contributor Author

That's great! I like where that is going.
My goal here was to add the missing support for reading & writing extension tables leveraging only what's available (to keep the patch as small as possible).
But I do agree that uniform handling of Substrait extensions would make more sense and would overcome some of the current limitations.

I think the code here is really easy to migrate to the new SubstraitConsumer (and SubstraitProducer?) interfaces once they're available, by just replacing the SerializerRegistry calls with the new dedicated read/write methods. But the rest would be mostly unchanged by this migration.
FWIW, once your refactoring is complete, I think there would be no place for SerializerRegistry anymore, and it should be removed (or at least deprecated).

Contributor

(and SubstraitProducer?)

I haven't gotten around to the producer yet, but if the SubstraitConsumer pattern makes sense to folks it should be easy enough to hammer it out.

I think the code here is really easy to migrate to the new SubstraitConsumer

Would you be open to having #13803 merged first, and then porting your code over?

Contributor Author

Sure, I can do that.

@alamb
Contributor

alamb commented Dec 21, 2024

@alamb alamb marked this pull request as draft December 21, 2024 02:57
@ccciudatu ccciudatu force-pushed the substrait-extension-tables branch from a12007b to bfda3d2 Compare December 24, 2024 10:26
@ccciudatu
Contributor Author

@vbarua I rebased on top of #13803 and added a consume_extension_table method to the SubstraitConsumer trait.
I chose to pass the schema and projection to this method (instead of keeping the TableScan postprocessing in from_read_rel) to allow custom implementations to use that information for fully restoring their custom tables if needed.

However, there are a few things that I'd like to revisit once we have a (symmetrical) customisable SubstraitProducer alongside the consumer. In particular, I want to move away from the SerializerRegistry trait and provide full control to the user over the serialisation of the whole ReadRel in case of extension tables by working with proto objects directly instead of trying to model this as a generic serde concern. This would allow the user to:

  1. store the table name in (e.g.) ReadRel::common::hint::alias instead of abusing the proto::Any::type_url field of ExtensionTable::detail
  2. use the ExtensionTable::detail::type_url field to distinguish between multiple table implementations (as per the protobuf spec)
  3. leverage the available ReadRel information that is currently unused (e.g. filtering, advanced extensions etc.)
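The second point in the list above, using the type URL as a discriminator between table implementations, could look roughly like the sketch below. All names here (`AnySketch`, `TableSketch`, the `type.example/...` URLs, and the decoder functions) are made up for illustration; a real implementation would decode actual protobuf messages rather than UTF-8 strings.

```rust
use std::collections::HashMap;

/// Stand-in for `proto::Any`: a type URL plus an opaque payload.
pub struct AnySketch {
    pub type_url: String,
    pub value: Vec<u8>,
}

/// Two hypothetical custom table implementations.
#[derive(Debug, PartialEq)]
pub enum TableSketch {
    Csv { path: String },
    Olap { query: String },
}

type Decoder = fn(&[u8]) -> Result<TableSketch, String>;

fn utf8(bytes: &[u8]) -> Result<String, String> {
    String::from_utf8(bytes.to_vec()).map_err(|e| e.to_string())
}

fn decode_csv(bytes: &[u8]) -> Result<TableSketch, String> {
    Ok(TableSketch::Csv { path: utf8(bytes)? })
}

fn decode_olap(bytes: &[u8]) -> Result<TableSketch, String> {
    Ok(TableSketch::Olap { query: utf8(bytes)? })
}

/// One decoder per type URL (the URLs are illustrative placeholders).
pub fn decoders() -> HashMap<&'static str, Decoder> {
    let mut map: HashMap<&'static str, Decoder> = HashMap::new();
    map.insert("type.example/CsvTable", decode_csv);
    map.insert("type.example/OlapTable", decode_olap);
    map
}

/// Picks the decoder by type URL and fails for unknown URLs.
pub fn decode_detail(
    detail: &AnySketch,
    decoders: &HashMap<&'static str, Decoder>,
) -> Result<TableSketch, String> {
    let decode = decoders
        .get(detail.type_url.as_str())
        .ok_or_else(|| format!("no decoder registered for type_url `{}`", detail.type_url))?;
    decode(&detail.value)
}
```

Because the type URL only selects the decoder, the table name has to travel somewhere else, which is what the first point in the list is about.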

@ccciudatu ccciudatu marked this pull request as ready for review December 24, 2024 11:03
@ccciudatu ccciudatu requested a review from vbarua December 24, 2024 11:03
extension_table: &ExtensionTable,
_schema: &DFSchema,
_projection: &Option<MaskExpression>,
) -> Result<LogicalPlan> {
Contributor

@vbarua vbarua Dec 24, 2024

I chose to pass the schema and projection to this method (instead of keeping the TableScan postprocessing in from_read_rel) to allow custom implementations to use that information for fully restoring their custom tables if needed.

I like where your head is at with this, but I almost want to go further. You already called out:

leverage the available ReadRel information that is currently unused (e.g. filtering, advanced extensions etc.)

Maybe the interface for this should just be:

    fn consume_extension_table(
        &self,
        read_rel: &ReadRel,
        extension_table: &ExtensionTable) -> Result<LogicalPlan>

which will be future proofed for if fields are ever added to the ReadRel, and also provides access to common fields on the ReadRel.

We could even go further and add

    fn consume_named_table(
        &self,
        read_rel: &ReadRel,
        named_table: &NamedTable) -> Result<LogicalPlan>

    fn consume_virtual_table(
        &self,
        read_rel: &ReadRel,
        virtual_table: &VirtualTable) -> Result<LogicalPlan>

to make it easier to customize behaviour for specific read_types. This last idea might be better as its own PR, as we would need to factor out some of the code in from_read_rel into functions to be re-used across the new helpers.

Contributor Author

@ccciudatu ccciudatu Dec 26, 2024

I can see your point, especially as I've been toying with the same idea myself.
However, I found the intrinsic redundancy a bit error prone: at least in theory, this interface allows the ReadRel to contain a table other than the one passed in as the second argument.
Eliminating this redundancy would end up with the exact same signature as consume_read, which renders the new helper(s) superfluous.

Contributor

@Blizzara Blizzara Dec 29, 2024

I guess this is maybe slightly against the stated goal of "to allow custom implementations to use that information for fully restoring their custom tables if needed", but I'm not sure why someone would need custom impl for that behavior based on the read type?

How about something like:

fn from_read_rel(consumer: &impl SubstraitConsumer, read: &ReadRel, ..) -> .. {
    let plan = match &read.read_type {
        Some(NamedTable(nt)) => consumer.consume_named_table(nt),
        Some(VirtualTable(vt)) => consumer.consume_virtual_table(vt),
        Some(ExtensionTable(et)) => consumer.consume_extension_table(et),
        ...
    };
    ensure_schema_compatibility(plan.schema(), schema.clone())?;
    let schema = apply_masking(schema, projection)?;
    apply_projection(plan, schema)
}

That way the ReadRel handling doesn't need to happen in multiple places, its projection and schema are by default handled the same way for all relations (which I'd think they should?), but if a user wants they can easily override the whole from_read_rel (or more specifically, SubstraitConsumer::consume_read) and compose their desired result.

(Actually, dunno if there's reason to have consumer.consume_named_table() and .consume_virtual_table(), given probably nothing else than consume_read calls those, their default impl should be good enough and if not they can always be overridden by implementing consume_read. But having consumer.consume_extension_table makes sense as an easy way to specify that behavior.)
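The structure proposed in this comment (dispatch on the read type first, then apply the same schema check and projection to every result) can be made concrete with a small self-contained sketch. The types below (`ReadType`, `ReadRel`, `Plan`, `ConsumerSketch`, `DemoConsumer`) are simplified stand-ins invented for illustration; they are not the Substrait or DataFusion definitions, and the schema is reduced to a list of column names.

```rust
/// Hypothetical stand-ins for the Substrait read types.
pub enum ReadType {
    NamedTable(String),
    ExtensionTable { type_url: String, value: Vec<u8> },
}

pub struct ReadRel {
    pub read_type: Option<ReadType>,
    pub base_schema: Vec<String>,       // expected column names
    pub projection: Option<Vec<usize>>, // selected column indices
}

#[derive(Debug, PartialEq)]
pub struct Plan {
    pub source: String,
    pub columns: Vec<String>,
}

pub trait ConsumerSketch {
    fn consume_named_table(&self, name: &str) -> Result<Plan, String>;

    /// Rejected by default; overriding this is the extension point.
    fn consume_extension_table(&self, type_url: &str, _value: &[u8]) -> Result<Plan, String> {
        Err(format!("Unsupported extension table: {}", type_url))
    }
}

pub fn from_read_rel(consumer: &dyn ConsumerSketch, read: &ReadRel) -> Result<Plan, String> {
    // Step 1: only this dispatch depends on the read type.
    let plan = match &read.read_type {
        Some(ReadType::NamedTable(name)) => consumer.consume_named_table(name)?,
        Some(ReadType::ExtensionTable { type_url, value }) => {
            consumer.consume_extension_table(type_url, value)?
        }
        None => return Err("ReadRel has no read_type".to_string()),
    };
    // Step 2: schema check and projection are shared by all read types.
    if plan.columns != read.base_schema {
        return Err(format!(
            "schema mismatch: {:?} vs {:?}",
            plan.columns, read.base_schema
        ));
    }
    let columns = match &read.projection {
        Some(indices) => indices
            .iter()
            .map(|&i| {
                plan.columns
                    .get(i)
                    .cloned()
                    .ok_or_else(|| format!("projection index {} out of range", i))
            })
            .collect::<Result<Vec<_>, _>>()?,
        None => plan.columns.clone(),
    };
    Ok(Plan { source: plan.source, columns })
}

/// Example consumer that also supports extension tables.
pub struct DemoConsumer;

impl ConsumerSketch for DemoConsumer {
    fn consume_named_table(&self, name: &str) -> Result<Plan, String> {
        Ok(Plan { source: name.to_string(), columns: vec!["a".to_string(), "b".to_string()] })
    }

    fn consume_extension_table(&self, type_url: &str, _value: &[u8]) -> Result<Plan, String> {
        Ok(Plan { source: format!("ext:{}", type_url), columns: vec!["a".to_string(), "b".to_string()] })
    }
}
```

A consumer that needs different projection or schema handling for one read type would, under this design, override the whole read entry point instead of a per-type helper.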

Contributor Author

@Blizzara You are most likely right. My main concern was to make sure the table name gets propagated for extension tables as well.
As @vbarua suggested, the name can be encoded in the extension payload, but I found that redundant, possibly because I misread the Substrait spec and assumed the name already had its reserved place outside the ExtensionTable::detail, but I'm a lot less convinced about that now.

I'll have a look at the new Substrait producer and give it another try, unless this feature can be fully covered by implementing the Producer/Consumer traits, in which case I'll probably just close this PR without merging.

@vbarua
Contributor

vbarua commented Dec 24, 2024

Left some feedback about the consume_extension_table API.

However, there are a few things that I'd like to revisit once we have a (symmetrical) customisable SubstraitProducer alongside the consumer.

I'm planning on working on that over the holidays (#13901), so that might come round faster than you think.

I want to move away from the SerializerRegistry trait and provide full control to the user over the serialisation of the whole ReadRel in case of extension tables by working with proto objects directly instead of trying to model this as a generic serde concern.

100%

store the table name in (e.g.) ReadRel::common::hint::alias instead of abusing the proto::Any::type_url field of ExtensionTable::detail

Honestly, if a user was doing this I would recommend that they store the name in the extension message directly.

@ccciudatu
Contributor Author

I'm planning on working on that over the holidays (#13901), so that might come round faster than you think.

@vbarua That's great, especially since this PR feels a bit half-baked after the last rebase (as an attempt to provide a coherent roundtrip between the current inflexible producer and the new SubstraitConsumer).

Maybe I should just hold this until the SubstraitProducer is added and reassess whether this extension point is still needed or this capability can be achieved by simply providing custom trait implementations with fallbacks for produce_read and consume_read respectively, thus requiring no explicit support in the library code.

@ccciudatu ccciudatu marked this pull request as draft January 8, 2025 08:34
@ccciudatu ccciudatu force-pushed the substrait-extension-tables branch 3 times, most recently from faae1f9 to a11ea03 Compare January 8, 2025 22:05
@ccciudatu
Contributor Author

ccciudatu commented Jan 8, 2025

@vbarua @Blizzara I finally got back to this, trying to figure out whether it still makes sense with the new APIs.
With the latest changes, supporting extension tables no longer requires a fork of datafusion, but merely a custom implementation for the new traits. However, there's a bit of boilerplate/delegation/copy-paste required for leveraging the default functionality.

So I decided to give this another try with a few changes in a fixup commit:

  • I changed the SerializerRegistry trait to return a string qualifier alongside the bytes on serialize. This way, the implementor can make proper use of the protobuf type_url or can use any string qualifier as a discriminator for the actual TableProvider implementation. The same pattern is used for UserDefinedLogicalPlanNode serialization as well, for consistency and to allow implementors to use the canonical protobuf type urls.
  • Since the ExtensionTable::detail::type_url is now supposed to be the proper protobuf one, I chose to store the original table name as rel.common.hint.alias, hoping to convince @vbarua that a custom TableProvider implementation is hardly aware of the names under which users choose to register the corresponding tables (especially for UDTFs), so it's not trivial to make the name part of the extension payload and it imposes unnecessary restrictions for the supported table providers. The alias property is loosely specified; the canonical example seems to be the SubqueryAlias, but it's not limited to that and it feels "close enough". A default name will be used if the hint is missing.
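The two changes listed above can be pictured with the following sketch: the serializer returns a qualifier together with the bytes, the qualifier is stored as the type URL, and the registered table name goes into an alias hint with a fallback when the hint is missing. Everything here is an assumption made for illustration: `ReadRelSketch` flattens the relevant Substrait fields into one struct, and the type URL string and `DEFAULT_TABLE_NAME` value are placeholders, not the ones used in the PR.

```rust
/// Hypothetical, flattened stand-in for the relevant parts of a `ReadRel`.
#[derive(Debug, Clone, PartialEq)]
pub struct ReadRelSketch {
    pub alias_hint: Option<String>, // stands in for rel.common.hint.alias
    pub type_url: String,           // stands in for ExtensionTable::detail::type_url
    pub value: Vec<u8>,             // stands in for ExtensionTable::detail::value
}

/// Assumed placeholder; the PR only says that "a default name" is used.
pub const DEFAULT_TABLE_NAME: &str = "extension_table";

/// Returns a string qualifier alongside the serialized bytes.
pub fn serialize_table(state: &str) -> (String, Vec<u8>) {
    ("type.example/MyTable".to_string(), state.as_bytes().to_vec())
}

/// Producer side: the qualifier becomes the type URL, the name becomes a hint.
pub fn to_read_rel(registered_name: Option<&str>, state: &str) -> ReadRelSketch {
    let (type_url, value) = serialize_table(state);
    ReadRelSketch {
        alias_hint: registered_name.map(str::to_string),
        type_url,
        value,
    }
}

/// Consumer side: fall back to the default name if no hint was provided.
pub fn table_name(rel: &ReadRelSketch) -> &str {
    rel.alias_hint.as_deref().unwrap_or(DEFAULT_TABLE_NAME)
}
```

Keeping the name out of the payload means the table implementation does not need to know the name under which it was registered, which is the argument made in the second bullet.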

@ccciudatu ccciudatu marked this pull request as ready for review January 8, 2025 23:00
@ccciudatu ccciudatu force-pushed the substrait-extension-tables branch from a11ea03 to 64fb0e5 Compare January 8, 2025 23:01
@ccciudatu ccciudatu requested a review from vbarua January 8, 2025 23:01
@vbarua
Contributor

vbarua commented Jan 9, 2025

With the latest changes, supporting extension tables no longer requires a fork of datafusion, but merely a custom implementation for the new traits.

Nice!

However, there's a bit of boilerplate/delegation/copy-paste required for leveraging the default functionality.

@ccciudatu do you have an example of any standard and/or built-in DataFusion functionality that can't be serialised to Substrait without this boilerplate? Is it primarily or only UserDefineTypeRels?

I'm realising that I don't have a good idea of what this would actually be needed for in standard DataFusion. If there is existing functionality that needs this kind of handling to work, I think it makes sense to include it in the default consumer and producer.

Most of my experience with ExtensionTables in Substrait (and hence biases) comes from generating plans outside of DataFusion where we have table-like sources, like embedding a SQL query for another system or fetching data from an internal cache store, which aren't part of standard Substrait (and are also highly-specific to our use case and deployments). For stuff like this we define our own handling because we don't expect DataFusion to be able to handle our custom messages.

a custom TableProvider implementation is hardly aware of the names under which users choose to register the corresponding tables (especially for UDTFs)

That's somewhat my argument for not supporting custom stuff generally. The only people who can know what names they used are the ones generating the plans and wiring in the UDTFs in the first place. As long as they have the hooks in place to wire things up how they want, we can service their needs.

@ccciudatu
Contributor Author

Thanks, @vbarua. And sorry about the delay.

@ccciudatu do you have an example of any standard and/or built-in DataFusion functionality that can't be serialised to Substrait without this boilerplate?

No, I don't think any standard/built-in component is ever supposed to end up as an ExtensionTable or ExtensionRel of any kind.
However, since we do have explicit support for UserDefinedNodeTypes, it kind of makes sense to add support for user-defined tables as well. Trying to represent custom tables as ExtensionLeafRels in Substrait and UserDefinedNodeTypes has several limitations that I hinted at in the associated issue, e.g. the table context information gets lost in an ExtensionLeafRel as opposed to a ReadRel, ExtensionPlanner::plan_extension is not a suitable wrapper around TableProvider::scan, both producers and consumers need to pre/post-process the logical plans before/after conversion etc.

Is it primarily or only UserDefineTypeRels?

Yes, this is only meant to address user-defined / custom TableProvider implementations, and only the ones that cannot be restored by name alone -- otherwise a NamedTable read_type would suffice.

I'm realising that I don't have a good idea of what this would actually be needed for in standard DataFusion. If there is existing functionality that needs this kind of handling to work, I think it makes sense to include it in the default consumer and producer.

Again, I'm not sure if you consider extension points like UserDefinedLogicalNode and TableProvider as part of "standard DataFusion". However, I see them as equally legit, i.e. if there's room for the first in the default producer/consumer, the latter should also be achievable with just a custom serializer.

Most of my experience with ExtensionTables in Substrait (and hence biases) comes from generating plans outside of DataFusion where we have table-like sources, like embedding a SQL query for another system or fetching data from an internal cache store, which aren't part of standard Substrait (and are also highly-specific to our use case and deployments). For stuff like this we define our own handling because we don't expect DataFusion to be able to handle our custom messages.

One use-case that I'm trying to address with this is a UDTF that integrates a legacy OLAP system that we're using as a data source for both DataFusion and Spark, using Substrait to federate the query plan as we see fit.

An SQL query like select * from legacy_olap_system(arg1, arg2, ...) results in a TableScan node that references a custom TableProvider implementation and has the name "tmp_table" assigned by DataFusion.
Encoding/decoding such a plan to/from Substrait with the current producer/consumer results in the equivalent of select * from tmp_table and errors out with a "table not found" message.

With the code in this patch and a custom SerializerRegistry, such a custom table will encode its internal state as proto and will get fully restored in a different DataFusion context (or a different system altogether).
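The difference between the two paths described above can be illustrated with a toy model. In this sketch, `OlapTable`, `Catalog`, and the newline-joined encoding are all hypothetical simplifications; a real implementation would encode the table state as a protobuf message and would not be limited to arguments without newlines.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a custom table created by a UDTF call.
#[derive(Debug, Clone, PartialEq)]
pub struct OlapTable {
    pub args: Vec<String>,
}

/// Stand-in for the tables registered in a session.
pub type Catalog = HashMap<String, OlapTable>;

/// Name-only decoding: works only if the name is registered on the receiving side.
pub fn resolve_by_name(catalog: &Catalog, name: &str) -> Result<OlapTable, String> {
    catalog
        .get(name)
        .cloned()
        .ok_or_else(|| format!("table not found: {}", name))
}

/// Payload encoding: the table's own state travels with the plan.
/// Toy format: one argument per line (arguments must not contain newlines).
pub fn encode(table: &OlapTable) -> Vec<u8> {
    table.args.join("\n").into_bytes()
}

pub fn decode(bytes: &[u8]) -> Result<OlapTable, String> {
    let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
    Ok(OlapTable {
        args: text.lines().map(str::to_string).collect(),
    })
}
```

Looking up "tmp_table" in a catalog that never registered it fails, while decoding the payload rebuilds the table from its arguments without any catalog entry.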

a custom TableProvider implementation is hardly aware of the names under which users choose to register the corresponding tables (especially for UDTFs)

That's somewhat my argument for not supporting custom stuff generally. The only people who can know what names they used are the ones generating the plans and wiring in the UDTFs in the first place. As long as they have the hooks in place to wire things up how they want, we can service their needs.

I agree this doesn't feel exactly right. The Substrait spec for extension tables is quite poorly defined, so to recreate the exact same plan (including names) on roundtrips we do require either small hacks or a more complex interface.
However, this is not a hard requirement and the code accepts any compliant input. I can even remove this hack for now and wait/request for enhancements in the Substrait spec, if this is too ugly.
