feat: Add support for cloud object storage (e.g. s3) based shuffle #48
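This draft adds an S3-compatible object store as the backing store for shuffle data. A new `create_object_store()` helper in `src/shuffle/mod.rs` builds an `object_store` client with `AmazonS3Builder`, taking credentials from the `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` environment variables; the bucket (`dfray`), region (`us-east-1`), and endpoint (`http://127.0.0.1:9000`, e.g. a local MinIO) are hardcoded for now (see the `TODO configs` note). `ShuffleReaderExec` now downloads each input partition's Arrow IPC shuffle file from object storage into the local shuffle directory before streaming batches from it, and `LocalShuffleStream` opens the downloaded file lazily and reports the query stage's schema directly. The shuffle temp directory layout also changes from `/tmp/ray-sql-{uuid}-stage-{stage_id}` to `/tmp/ray-sql-{uuid}/{stage_id}`.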

Draft · wants to merge 1 commit into main
624 changes: 599 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -29,11 +29,13 @@ rust-version = "1.62"
build = "build.rs"

[dependencies]
bytes = "1.8.0"
datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "42.0.0"
futures = "0.3"
glob = "0.3.1"
log = "0.4"
object_store = { version = "0.11.1", features = ["aws"] }
prost = "0.13"
pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] }
3 changes: 1 addition & 2 deletions src/context.rs
@@ -129,8 +129,7 @@ pub fn execute_partition(
py: Python,
) -> PyResult<PyResultSet> {
let plan = deserialize_execution_plan(plan_bytes)?;
_execute_partition(plan, part)
.unwrap()
_execute_partition(plan, part)?
.into_iter()
.map(|batch| batch.to_pyarrow(py))
.collect()
11 changes: 8 additions & 3 deletions src/planner.rs
@@ -18,7 +18,7 @@
use crate::query_stage::PyQueryStage;
use crate::query_stage::QueryStage;
use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec};
use datafusion::error::Result;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -229,9 +229,14 @@ fn create_shuffle_exchange(

fn create_temp_dir(stage_id: usize) -> Result<String> {
let uuid = Uuid::new_v4();
let temp_dir = format!("/tmp/ray-sql-{uuid}-stage-{stage_id}");
let temp_dir = format!("/tmp/ray-sql-{uuid}/{stage_id}");
debug!("Creating temp shuffle dir: {temp_dir}");
std::fs::create_dir(&temp_dir)?;
std::fs::create_dir_all(&temp_dir).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create shuffle directory {}: {:?}",
temp_dir, e
))
})?;
Ok(temp_dir)
}

1 change: 0 additions & 1 deletion src/shuffle/codec.rs
@@ -30,7 +30,6 @@ use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec
use datafusion_proto::protobuf::{self, PhysicalHashRepartition, PhysicalPlanNode};
use prost::Message;
use std::sync::Arc;

#[derive(Debug)]
pub struct ShuffleCodec {}

27 changes: 27 additions & 0 deletions src/shuffle/mod.rs
@@ -21,7 +21,10 @@ use datafusion::arrow::datatypes::SchemaRef
use datafusion::common::Result;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::Stream;
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::macros::support::thread_rng_n;

@@ -96,3 +99,27 @@ impl Stream for CombinedRecordBatchStream {
}
}
}

pub(crate) fn create_object_store() -> Result<Arc<dyn ObjectStore>> {
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_default();
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default();
if access_key_id.is_empty() || secret_access_key.is_empty() {
println!("Warning! AWS_ACCESS_KEY_ID and/or AWS_SECRET_ACCESS_KEY are not defined");
}

// TODO configs
let bucket_name = "dfray";
let region = "us-east-1";
let endpoint = "http://127.0.0.1:9000";

Ok(Arc::new(
AmazonS3Builder::new()
.with_endpoint(endpoint)
.with_allow_http(true)
.with_region(region)
.with_bucket_name(bucket_name)
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key)
.build()?,
))
}
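
The bucket, region, and endpoint above are hardcoded (the `// TODO configs` note). As one possible direction, the sketch below reads them from environment variables with the current values as fallbacks and lets `AmazonS3Builder::from_env()` pick up the AWS credential variables itself. The `DFRAY_S3_*` variable names and the `create_object_store_from_env` helper are illustrative assumptions, not part of this change.

```rust
// Sketch only, not part of this PR: an environment-driven variant of
// create_object_store(). DFRAY_S3_BUCKET, DFRAY_S3_REGION, and
// DFRAY_S3_ENDPOINT are hypothetical variable names chosen for illustration.
use datafusion::common::Result;
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
use std::sync::Arc;

pub(crate) fn create_object_store_from_env() -> Result<Arc<dyn ObjectStore>> {
    // Fall back to the defaults currently hardcoded in create_object_store().
    let bucket = std::env::var("DFRAY_S3_BUCKET").unwrap_or_else(|_| "dfray".to_string());
    let region = std::env::var("DFRAY_S3_REGION").unwrap_or_else(|_| "us-east-1".to_string());
    let endpoint = std::env::var("DFRAY_S3_ENDPOINT")
        .unwrap_or_else(|_| "http://127.0.0.1:9000".to_string());

    // from_env() picks up AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and related
    // variables, so credentials no longer need to be read and passed in manually.
    let store = AmazonS3Builder::from_env()
        .with_bucket_name(bucket)
        .with_region(region)
        .with_endpoint(endpoint)
        .with_allow_http(true)
        .build()?;

    Ok(Arc::new(store))
}
```

With this shape, a local MinIO endpoint and a real S3 bucket can be swapped without recompiling.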
147 changes: 118 additions & 29 deletions src/shuffle/reader.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::shuffle::CombinedRecordBatchStream;
use crate::shuffle::{create_object_store, CombinedRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::ipc::reader::FileReader;
use datafusion::arrow::record_batch::RecordBatch;
@@ -28,15 +28,20 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use futures::Stream;
use glob::glob;
use futures::{Stream, StreamExt};
use log::debug;
use object_store::{path::Path as ObjectStorePath, ObjectStore};
use std::any::Any;
use std::fmt::Formatter;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::runtime::Handle;
use tokio::task::block_in_place;

#[derive(Debug)]
pub struct ShuffleReaderExec {
@@ -110,28 +115,71 @@ impl ExecutionPlan for ShuffleReaderExec {
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> datafusion::common::Result<SendableRecordBatchStream> {
let pattern = format!(
"/{}/shuffle_{}_*_{partition}.arrow",
self.shuffle_dir, self.stage_id
);
) -> Result<SendableRecordBatchStream> {
let mut streams: Vec<SendableRecordBatchStream> = vec![];
for entry in glob(&pattern).expect("Failed to read glob pattern") {
let file = entry.unwrap();

for input_part in 0..self.properties.partitioning.partition_count() {
let file = format!(
"{}/shuffle_{}_{}_{partition}.arrow",
self.shuffle_dir, self.stage_id, input_part
);
debug!(
"ShuffleReaderExec partition {} reading from stage {} file {}",
partition,
self.stage_id,
file.display()
partition, self.stage_id, file
);
let reader = FileReader::try_new(File::open(&file)?, None)?;
let stream = LocalShuffleStream::new(reader);
if self.schema != stream.schema() {
return Err(DataFusionError::Internal(
"Not all shuffle files have the same schema".to_string(),
));

let object_path = ObjectStorePath::from(file.as_str());
let object_store = create_object_store()?;

let result: Result<Option<LocalShuffleStream>> = block_in_place(move || {
Handle::current().block_on(async move {
match object_store.get(&object_path).await {
Ok(get_result) => {
println!("Downloading {file} from object storage");
let start = Instant::now();
let mut local_file = File::create(&file).map_err(|e| {
DataFusionError::Execution(format!(
"ShuffleReaderExec failed to create file: {}",
e
))
})?;
let mut stream = get_result.into_stream();
let mut total_bytes = 0;
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
total_bytes += bytes.len();
local_file.write_all(&bytes)?;
}
let end = Instant::now();
println!(
"Downloaded {file} with {total_bytes} bytes in {:?}",
end.duration_since(start)
);
println!("Deleting {} from object storage", object_path);
//object_store.delete(&object_path).await?;
Ok(Some(LocalShuffleStream::new(
PathBuf::from(&file),
self.schema.clone(),
)))
}
Err(e) => {
let error_message = e.to_string();
if error_message.contains("NotFound")
|| error_message.contains("NoSuchKey")
{
// this is fine
} else {
println!("Download failed: {}", e);
}
Ok(None)
}
}
})
});

if let Some(stream) = result? {
streams.push(Box::pin(stream));
}
streams.push(Box::pin(stream));
}
Ok(Box::pin(CombinedRecordBatchStream::new(
self.schema.clone(),
@@ -147,7 +195,7 @@ impl ExecutionPlan for ShuffleReaderExec {
"shuffle reader"
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
fn properties(&self) -> &PlanProperties {
&self.properties
}
}
@@ -164,28 +212,69 @@ impl DisplayAs for ShuffleReaderExec {
}

struct LocalShuffleStream {
reader: FileReader<File>,
file: PathBuf,
reader: Option<FileReader<File>>,
/// The output schema of the query stage being read from
schema: SchemaRef,
}

impl LocalShuffleStream {
pub fn new(reader: FileReader<File>) -> Self {
LocalShuffleStream { reader }
pub fn new(file: PathBuf, schema: SchemaRef) -> Self {
LocalShuffleStream {
file,
schema,
reader: None,
}
}
}

impl Stream for LocalShuffleStream {
type Item = datafusion::error::Result<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.reader.next() {
return Poll::Ready(Some(batch.map_err(|e| e.into())));
if self.reader.is_none() {
// lazily open the local copy of the shuffle file (downloaded from object storage in execute())

let file = File::open(&self.file).map_err(|e| {
DataFusionError::Execution(format!(
"ShuffleReaderExec failed to open file {}: {}",
self.file.display(),
e
))
})?;
self.reader = Some(FileReader::try_new(file, None).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to open IPC file {}: {:?}",
self.file.display(),
e
))
})?);

// TODO reinstate
// if self.schema != stream.schema() {
// return Err(DataFusionError::Internal(
// "Not all shuffle files have the same schema".to_string(),
// ));
// }
}
if let Some(reader) = self.reader.as_mut() {
if let Some(batch) = reader.next() {
return Poll::Ready(Some(batch.map_err(|e| {
DataFusionError::Execution(format!(
"Error reading batch from Arrow IPC file: {:?}",
e
))
})));
}
Poll::Ready(None)
} else {
unreachable!()
}
Poll::Ready(None)
}
}

impl RecordBatchStream for LocalShuffleStream {
fn schema(&self) -> SchemaRef {
self.reader.schema()
self.schema.clone()
}
}