Update Polars on Arrow2 and implement Polars on Arrow-rs #726

Open · wants to merge 15 commits into base: main

Changes from 12 commits
931 changes: 708 additions & 223 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -10,3 +10,5 @@ lto = true
[workspace.dependencies]
arrow = {version = "46", features = ["prettyprint", "ffi"]}
arrow2 = {version = "0.17", default-features = false}
polars = {version = "0.45", features=["dtype-u8", "dtype-u16", "lazy"]}
polars-arrow = {version = "0.45"}
943 changes: 558 additions & 385 deletions connectorx-python/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions connectorx-python/pyproject.toml
@@ -18,13 +18,13 @@ license = "MIT"
maintainers = ["Weiyuan Wu <[email protected]>"]
name = "connectorx"
readme = "README.md" # Markdown files are supported
version = "0.4.1-alpha1"

[project]
name = "connectorx" # Target file name of maturin build
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.10"
version = "0.4.1-alpha1"

[tool.poetry.dependencies]
dask = {version = "^2021", optional = true, extras = ["dataframe"]}
@@ -43,7 +43,7 @@ pandas = ["pandas"]
polars = ["pyarrow", "polars"]
pyarrow = ["pyarrow"]

[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
black = "^21.4b0"
contexttimer = "^0.3.3"
dask = {extras = ["dataframe"], version = "^2021.7.0"}
8 changes: 5 additions & 3 deletions connectorx/Cargo.toml
@@ -24,6 +24,8 @@ chrono = "0.4"

arrow = {workspace = true, optional = true}
arrow2 = {workspace = true, default-features = false, optional = true}
polars = {workspace = true, optional = true, features=["dtype-u8", "dtype-u16", "lazy"]}
polars-arrow = {workspace = true, optional = true}
bb8 = {version = "0.7", optional = true}
bb8-tiberius = {version = "0.8", optional = true}
csv = {version = "1", optional = true}
@@ -36,7 +38,6 @@ ndarray = {version = "0.15", optional = true}
num-traits = {version = "0.2", optional = true}
openssl = {version = "0.10", optional = true, features = ["vendored"]}
oracle = {version = "0.5", optional = true}
polars = {version = "0.32", optional = true, features=["dtype-u8", "dtype-u16"]}
postgres = {version = "0.19", features = ["with-chrono-0_4", "with-uuid-0_8", "with-serde_json-1"], optional = true}
postgres-native-tls = {version = "0.5", optional = true}
postgres-openssl = {version = "0.5", optional = true}
@@ -71,11 +72,12 @@ iai = "0.1"
pprof = {version = "0.5", features = ["flamegraph"]}

[features]
all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "src_bigquery", "src_csv", "src_dummy", "src_trino", "dst_arrow", "dst_arrow2", "federation", "fed_exec"]
all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "src_bigquery", "src_csv", "src_dummy", "src_trino", "dst_arrow", "dst_arrow2", "dst_arrow", "federation", "fed_exec", "dst_polars"]
Contributor: a duplicate dst_arrow?

Author: Yes. Fixed!

branch = []
default = ["fptr"]
dst_arrow = ["arrow"]
dst_arrow2 = ["polars", "arrow2"]
dst_arrow2 = ["polars", "polars-arrow", "arrow2"]
dst_polars = ["dst_arrow", "polars", "polars-arrow"]
fptr = []
src_bigquery = ["gcp-bigquery-client", "tokio"]
src_csv = ["csv", "regex"]
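Note: unlike dst_arrow2, the new dst_polars feature lists dst_arrow as a dependency, since the Polars path first materializes arrow-rs record batches. A minimal sketch of how code can rely on that implication (the functions here are hypothetical, not part of this PR):

// Built with `cargo build --features dst_polars`, both gates below are
// active, because dst_polars = ["dst_arrow", "polars", "polars-arrow"].
#[cfg(feature = "dst_arrow")]
fn arrow_destination_enabled() -> bool {
    true
}

#[cfg(feature = "dst_polars")]
fn polars_destination_enabled() -> bool {
    // dst_polars implies dst_arrow, so this call always compiles here.
    arrow_destination_enabled()
}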
4 changes: 4 additions & 0 deletions connectorx/src/destinations/arrow/errors.rs
@@ -10,6 +10,10 @@ pub enum ArrowDestinationError {
#[error(transparent)]
ConnectorXError(#[from] crate::errors::ConnectorXError),

#[cfg(feature = "dst_polars")]
#[error(transparent)]
PolarsError(#[from] polars::error::PolarsError),

/// Any other errors that are too trivial to be put here explicitly.
#[error(transparent)]
Other(#[from] anyhow::Error),
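Note: the new PolarsError variant relies on thiserror's #[from], which generates the From impl that lets `?` lift Polars failures into ArrowDestinationError. A minimal, self-contained sketch of the same pattern (DestError and column_len are hypothetical names, not from this PR):

use thiserror::Error;

#[derive(Error, Debug)]
enum DestError {
    // #[from] derives `impl From<PolarsError> for DestError`,
    // so fallible Polars calls propagate with `?`.
    #[error(transparent)]
    Polars(#[from] polars::error::PolarsError),
}

fn column_len(df: &polars::prelude::DataFrame, name: &str) -> Result<usize, DestError> {
    // DataFrame::column returns PolarsResult; `?` converts via the derived From.
    Ok(df.column(name)?.len())
}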
66 changes: 66 additions & 0 deletions connectorx/src/destinations/arrow/mod.rs
@@ -22,6 +22,15 @@ use std::{
sync::{Arc, Mutex},
};

#[cfg(feature = "dst_polars")]
use {
arrow::ffi::to_ffi,
polars::prelude::{concat, DataFrame, IntoLazy, PlSmallStr, Series, UnionArgs},
polars_arrow::ffi::{import_array_from_c, import_field_from_c},
std::iter::FromIterator,
std::mem::transmute,
};

type Builder = Box<dyn Any + Send>;
type Builders = Vec<Builder>;

@@ -125,6 +134,63 @@ impl ArrowDestination {
.map_err(|e| anyhow!("mutex poisoned {}", e))?
}

#[cfg(feature = "dst_polars")]
#[throws(ArrowDestinationError)]
pub fn polars(self) -> DataFrame {
// Convert to arrow first
let rbs = self.arrow()?;

// Ready LazyFrame vector for the chunks
let mut lf_vec = vec![];

for chunk in rbs.into_iter() {
// Column vector
let mut columns = Vec::with_capacity(chunk.num_columns());

// Arrow stores data column-wise, so the zero-copy hand-off happens column by column
for (i, col) in chunk.columns().iter().enumerate() {
// Convert to ArrayData (arrow-rs)
let array = col.to_data();

// Convert to ffi with arrow-rs
let (out_array, out_schema) = to_ffi(&array).unwrap();

// Import field from ffi with polars
let field = unsafe {
import_field_from_c(transmute::<
&arrow::ffi::FFI_ArrowSchema,
&polars_arrow::ffi::ArrowSchema,
>(&out_schema))
}
.unwrap();

// Import data from ffi with polars
let data = unsafe {
import_array_from_c(
transmute::<arrow::ffi::FFI_ArrowArray, polars_arrow::ffi::ArrowArray>(
out_array,
),
field.dtype().clone(),
)
}
.unwrap();

// Create Polars series from arrow column
columns.push(Series::from_arrow(
PlSmallStr::from(chunk.schema().field(i).name()),
data,
)?);
}

// Create DataFrame from the columns
lf_vec.push(DataFrame::from_iter(columns).lazy());
}

// Concat the chunks
let union_args = UnionArgs::default();
concat(lf_vec, union_args)?.collect()?
}

#[throws(ArrowDestinationError)]
pub fn record_batch(&mut self) -> Option<RecordBatch> {
let mut guard = self
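Note: the final step of polars() wraps each chunk's DataFrame in a LazyFrame and unions them once. The same step in isolation, as a minimal sketch (concat_chunks is a hypothetical helper):

use polars::prelude::*;

// One DataFrame per record batch in, one vertically concatenated DataFrame out.
// `concat` builds a lazy union plan; `collect` executes it in a single pass.
fn concat_chunks(chunks: Vec<DataFrame>) -> PolarsResult<DataFrame> {
    let lazy_frames: Vec<LazyFrame> = chunks.into_iter().map(|df| df.lazy()).collect();
    concat(lazy_frames, UnionArgs::default())?.collect()
}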
91 changes: 48 additions & 43 deletions connectorx/src/destinations/arrow2/mod.rs
@@ -12,14 +12,17 @@ use crate::typesystem::{Realize, TypeAssoc, TypeSystem};
use anyhow::anyhow;
use arrow2::array::{Array, MutableArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::datatypes::Schema;
use arrow2::ffi::{export_array_to_c, export_field_to_c};
use arrow_assoc::ArrowAssoc;
pub use errors::{Arrow2DestinationError, Result};
use fehler::throw;
use fehler::throws;
use funcs::{FFinishBuilder, FNewBuilder, FNewField};
use polars::prelude::{DataFrame, PolarsError, Series};
use std::convert::TryFrom;
use polars::prelude::{concat, DataFrame, IntoLazy, PlSmallStr, Series, UnionArgs};
use polars_arrow::ffi::{import_array_from_c, import_field_from_c};
use std::iter::FromIterator;
use std::mem::transmute;
use std::sync::{Arc, Mutex};
pub use typesystem::Arrow2TypeSystem;

@@ -118,51 +121,53 @@ impl Arrow2Destination {

#[throws(Arrow2DestinationError)]
pub fn polars(self) -> DataFrame {
// Convert to arrow first
let (rbs, schema): (Vec<Chunk<Box<dyn Array>>>, Arc<Schema>) = self.arrow()?;
//let fields = schema.fields.as_slice();
let fields: &[Field] = schema.fields.as_slice();

// This should be in polars but their version needs updating.
// Whave placed this here contained in an inner function until the fix is merged upstream
fn try_from(
chunks: (Vec<Chunk<Box<dyn Array>>>, &[Field]),
) -> std::result::Result<DataFrame, PolarsError> {
use polars::prelude::NamedFrom;

let mut series: Vec<Series> = vec![];

for chunk in chunks.0.into_iter() {
let columns_results: std::result::Result<Vec<Series>, PolarsError> = chunk
.into_arrays()
.into_iter()
.zip(chunks.1)
.map(|(arr, field)| {
let a = Series::try_from((field.name.as_str(), arr)).map_err(|_| {
PolarsError::ComputeError("Couldn't build Series from box".into())
});
a
})
.collect();

let columns = columns_results?;

if series.is_empty() {
for col in columns.iter() {
let name = col.name().to_string();
series.push(Series::new(&name, col));
}
continue;
}

for (i, col) in columns.into_iter().enumerate() {
series[i].append(&col)?;
}
let fields = schema.fields.as_slice();

// Ready LazyFrame vector for the chunks
let mut lf_vec = vec![];

for chunk in rbs.into_iter() {
// Column vector
let mut columns = Vec::with_capacity(chunk.len());

// Arrow stores data column-wise, so the zero-copy hand-off happens column by column
for (i, col) in chunk.into_arrays().into_iter().enumerate() {
// From arrow2 to FFI
let ffi_schema = export_field_to_c(&fields[i]);
let ffi_array = export_array_to_c(col);

// From FFI to polars_arrow;
let field = unsafe {
import_field_from_c(transmute::<
&arrow2::ffi::ArrowSchema,
&polars_arrow::ffi::ArrowSchema,
>(&ffi_schema))
}?;
let data = unsafe {
import_array_from_c(
transmute::<arrow2::ffi::ArrowArray, polars_arrow::ffi::ArrowArray>(
ffi_array,
),
field.dtype().clone(),
)
}?;

// Create Polars series from arrow column
columns.push(Series::from_arrow(
PlSmallStr::from(fields[i].name.to_string()),
data,
)?);
}

DataFrame::new(series)
// Create DataFrame from the columns
lf_vec.push(DataFrame::from_iter(columns).lazy());
}

try_from((rbs, fields)).unwrap()
// Concat the chunks
let union_args = UnionArgs::default();
concat(lf_vec, union_args)?.collect()?
}
}

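Note: the transmutes in both polars() implementations are sound only because arrow2, arrow-rs, and polars-arrow all implement the Arrow C Data Interface: the ArrowSchema/ArrowArray structs on either side share one #[repr(C)] layout, so the casts merely rename the type. A self-contained sketch of the arrow2 -> polars hop on a single column (the field name and values are illustrative, not from this PR):

use arrow2::array::Int32Array;
use arrow2::datatypes::{DataType, Field};
use arrow2::ffi::{export_array_to_c, export_field_to_c};
use polars::prelude::{PlSmallStr, PolarsResult, Series};
use polars_arrow::ffi::{import_array_from_c, import_field_from_c};
use std::mem::transmute;

fn arrow2_column_to_series() -> PolarsResult<Series> {
    let field = Field::new("x", DataType::Int32, false);
    let col: Box<dyn arrow2::array::Array> = Box::new(Int32Array::from_slice([1, 2, 3]));

    // arrow2 -> Arrow C Data Interface structs.
    let ffi_schema = export_field_to_c(&field);
    let ffi_array = export_array_to_c(col);

    // Reinterpret the C structs as polars-arrow's identical #[repr(C)] types.
    let pl_field = unsafe {
        import_field_from_c(transmute::<
            &arrow2::ffi::ArrowSchema,
            &polars_arrow::ffi::ArrowSchema,
        >(&ffi_schema))
    }?;
    let data = unsafe {
        import_array_from_c(
            transmute::<arrow2::ffi::ArrowArray, polars_arrow::ffi::ArrowArray>(ffi_array),
            pl_field.dtype().clone(),
        )
    }?;

    Series::from_arrow(PlSmallStr::from("x"), data)
}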