From 96495b7dbf2b38ecaf5720d15c8b2955e77f4bad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 19 Nov 2024 08:13:05 -0700 Subject: [PATCH] Use object store to transfer shuffle files between writers and readers --- Cargo.lock | 624 ++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 2 + src/context.rs | 3 +- src/planner.rs | 11 +- src/shuffle/codec.rs | 1 - src/shuffle/mod.rs | 27 ++ src/shuffle/reader.rs | 147 ++++++++-- src/shuffle/writer.rs | 223 +++++++++++---- 8 files changed, 925 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a976126..44bbb95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ dependencies = [ "snap", "strum 0.25.0", "strum_macros 0.25.3", - "thiserror", + "thiserror 1.0.64", "typed-builder", "uuid", "xz2", @@ -375,7 +375,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -387,6 +387,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.4.0" @@ -492,9 +498,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "bzip2" @@ -534,6 +540,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.38" @@ -543,6 +555,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", + "serde", "windows-targets", ] @@ -604,6 +617,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1127,11 +1150,13 @@ name = "datafusion_ray" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "datafusion", "datafusion-proto", "futures", "glob", "log", + "object_store", "pretty_assertions", "prost 0.13.3", "prost-types 0.13.3", @@ -1214,6 +1239,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1279,7 +1310,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1329,8 +1360,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1345,6 +1378,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -1405,12 +1457,109 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -1478,6 +1627,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "itertools" version = "0.10.5" @@ -1693,6 +1848,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -1702,6 +1863,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "multimap" version = "0.8.3" @@ -1803,18 +1976,27 @@ dependencies = [ [[package]] name = "object_store" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" +checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3" dependencies = [ "async-trait", + "base64", "bytes", "chrono", "futures", "humantime", + "hyper", "itertools 0.13.0", + "md-5", "parking_lot", "percent-encoding", + "quick-xml", + "rand", + "reqwest", + "ring", + "serde", + "serde_json", "snafu", "tokio", "tracing", @@ -1828,6 +2010,12 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "ordered-float" version = "2.10.1" @@ -2092,7 +2280,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2160,7 +2348,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2173,7 +2361,7 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2182,6 +2370,68 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" +[[package]] +name = "quick-xml" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quinn" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.3", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" +dependencies = [ + "bytes", + "getrandom", + "rand", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.3", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d5a626c6807713b15cac82a6acaccd6043c9a5408c24baae07611fec3f243da" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "quote" version = "1.0.37" @@ -2265,6 +2515,66 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "reqwest" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "windows-registry", +] + +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -2277,6 +2587,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2299,6 +2615,62 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.23.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e" +dependencies = [ + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +dependencies = [ + "web-time", +] + +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.18" @@ -2320,12 +2692,44 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.23" @@ -2355,7 +2759,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2370,6 +2774,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2426,7 +2842,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2435,6 +2851,22 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "sqlparser" version = "0.50.0" @@ -2453,7 +2885,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2487,7 +2919,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2500,7 +2932,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2522,15 +2954,24 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] + [[package]] name = "target-lexicon" version = "0.12.16" @@ -2556,7 +2997,16 @@ version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.64", +] + +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl 2.0.3", ] [[package]] @@ -2567,7 +3017,18 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", ] [[package]] @@ -2613,8 +3074,12 @@ checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", + "libc", + "mio", "pin-project-lite", + "socket2", "tokio-macros", + "windows-sys 0.52.0", ] [[package]] @@ -2625,7 +3090,18 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", ] [[package]] @@ -2654,6 +3130,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.40" @@ -2673,7 +3155,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2685,6 +3167,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "twox-hash" version = "1.6.3" @@ -2712,7 +3200,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2760,6 +3248,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.2" @@ -2797,6 +3291,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2825,10 +3328,22 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.95" @@ -2847,7 +3362,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2858,6 +3373,19 @@ version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.72" @@ -2868,6 +3396,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "which" version = "4.4.2" @@ -2898,6 +3436,36 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3013,9 +3581,15 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + [[package]] name = "zstd" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 91b0d11..faa6721 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,11 +29,13 @@ rust-version = "1.62" build = "build.rs" [dependencies] +bytes = "1.8.0" datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] } datafusion-proto = "42.0.0" futures = "0.3" glob = "0.3.1" log = "0.4" +object_store = { version = "0.11.1", features = ["aws"] } prost = "0.13" pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] } tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] } diff --git a/src/context.rs b/src/context.rs index 86c1b3a..1e063ee 100644 --- a/src/context.rs +++ b/src/context.rs @@ -129,8 +129,7 @@ pub fn execute_partition( py: Python, ) -> PyResult { let plan = deserialize_execution_plan(plan_bytes)?; - _execute_partition(plan, part) - .unwrap() + _execute_partition(plan, part)? .into_iter() .map(|batch| batch.to_pyarrow(py)) .collect() diff --git a/src/planner.rs b/src/planner.rs index 7d9fdf0..d93b5fa 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -18,7 +18,7 @@ use crate::query_stage::PyQueryStage; use crate::query_stage::QueryStage; use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec}; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -229,9 +229,14 @@ fn create_shuffle_exchange( fn create_temp_dir(stage_id: usize) -> Result { let uuid = Uuid::new_v4(); - let temp_dir = format!("/tmp/ray-sql-{uuid}-stage-{stage_id}"); + let temp_dir = format!("/tmp/ray-sql-{uuid}/{stage_id}"); debug!("Creating temp shuffle dir: {temp_dir}"); - std::fs::create_dir(&temp_dir)?; + std::fs::create_dir_all(&temp_dir).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create shuffle directory {}: {:?}", + temp_dir, e + )) + })?; Ok(temp_dir) } diff --git a/src/shuffle/codec.rs b/src/shuffle/codec.rs index 79af0b8..3928d4c 100644 --- a/src/shuffle/codec.rs +++ b/src/shuffle/codec.rs @@ -30,7 +30,6 @@ use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionC use datafusion_proto::protobuf::{self, PhysicalHashRepartition, PhysicalPlanNode}; use prost::Message; use std::sync::Arc; - #[derive(Debug)] pub struct ShuffleCodec {} diff --git a/src/shuffle/mod.rs b/src/shuffle/mod.rs index 2aeb7c0..4b3c1b1 100644 --- a/src/shuffle/mod.rs +++ b/src/shuffle/mod.rs @@ -21,7 +21,10 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::common::Result; use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use futures::Stream; +use object_store::aws::AmazonS3Builder; +use object_store::ObjectStore; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use tokio::macros::support::thread_rng_n; @@ -96,3 +99,27 @@ impl Stream for CombinedRecordBatchStream { } } } + +pub(crate) fn create_object_store() -> Result> { + let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_default(); + let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default(); + if access_key_id.is_empty() || secret_access_key.is_empty() { + println!("Warning! AWS_ACCESS_KEY_ID and/or AWS_SECRET_ACCESS_KEY are not defined"); + } + + // TODO configs + let bucket_name = "dfray"; + let region = "us-east-1"; + let endpoint = "http://127.0.0.1:9000"; + + Ok(Arc::new( + AmazonS3Builder::new() + .with_endpoint(endpoint) + .with_allow_http(true) + .with_region(region) + .with_bucket_name(bucket_name) + .with_access_key_id(access_key_id) + .with_secret_access_key(secret_access_key) + .build()?, + )) +} diff --git a/src/shuffle/reader.rs b/src/shuffle/reader.rs index c8cb4da..0fd37e8 100644 --- a/src/shuffle/reader.rs +++ b/src/shuffle/reader.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::shuffle::CombinedRecordBatchStream; +use crate::shuffle::{create_object_store, CombinedRecordBatchStream}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::ipc::reader::FileReader; use datafusion::arrow::record_batch::RecordBatch; @@ -28,15 +28,20 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; -use futures::Stream; -use glob::glob; +use futures::{Stream, StreamExt}; use log::debug; +use object_store::{path::Path as ObjectStorePath, ObjectStore}; use std::any::Any; use std::fmt::Formatter; use std::fs::File; +use std::io::Write; +use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Instant; +use tokio::runtime::Handle; +use tokio::task::block_in_place; #[derive(Debug)] pub struct ShuffleReaderExec { @@ -110,28 +115,71 @@ impl ExecutionPlan for ShuffleReaderExec { &self, partition: usize, _context: Arc, - ) -> datafusion::common::Result { - let pattern = format!( - "/{}/shuffle_{}_*_{partition}.arrow", - self.shuffle_dir, self.stage_id - ); + ) -> Result { let mut streams: Vec = vec![]; - for entry in glob(&pattern).expect("Failed to read glob pattern") { - let file = entry.unwrap(); + + for input_part in 0..self.properties.partitioning.partition_count() { + let file = format!( + "{}/shuffle_{}_{}_{partition}.arrow", + self.shuffle_dir, self.stage_id, input_part + ); debug!( "ShuffleReaderExec partition {} reading from stage {} file {}", - partition, - self.stage_id, - file.display() + partition, self.stage_id, file ); - let reader = FileReader::try_new(File::open(&file)?, None)?; - let stream = LocalShuffleStream::new(reader); - if self.schema != stream.schema() { - return Err(DataFusionError::Internal( - "Not all shuffle files have the same schema".to_string(), - )); + + let object_path = ObjectStorePath::from(file.as_str()); + let object_store = create_object_store()?; + + let result: Result> = block_in_place(move || { + Handle::current().block_on(async move { + match object_store.get(&object_path).await { + Ok(get_result) => { + println!("Downloading {file} from object storage"); + let start = Instant::now(); + let mut local_file = File::create(&file).map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleReaderExec failed to create file: {}", + e + )) + })?; + let mut stream = get_result.into_stream(); + let mut total_bytes = 0; + while let Some(chunk) = stream.next().await { + let bytes = chunk?; + total_bytes += bytes.len(); + local_file.write_all(&bytes)?; + } + let end = Instant::now(); + println!( + "Downloaded {file} with {total_bytes} bytes in {:?}", + end.duration_since(start) + ); + println!("Deleting {} from object storage", object_path); + //object_store.delete(&object_path).await?; + Ok(Some(LocalShuffleStream::new( + PathBuf::from(&file), + self.schema.clone(), + ))) + } + Err(e) => { + let error_message = e.to_string(); + if error_message.contains("NotFound") + || error_message.contains("NoSuchKey") + { + // this is fine + } else { + println!("Download failed: {}", e); + } + Ok(None) + } + } + }) + }); + + if let Some(stream) = result? { + streams.push(Box::pin(stream)); } - streams.push(Box::pin(stream)); } Ok(Box::pin(CombinedRecordBatchStream::new( self.schema.clone(), @@ -147,7 +195,7 @@ impl ExecutionPlan for ShuffleReaderExec { "shuffle reader" } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &PlanProperties { &self.properties } } @@ -164,28 +212,69 @@ impl DisplayAs for ShuffleReaderExec { } struct LocalShuffleStream { - reader: FileReader, + file: PathBuf, + reader: Option>, + /// The output schema of the query stage being read from + schema: SchemaRef, } impl LocalShuffleStream { - pub fn new(reader: FileReader) -> Self { - LocalShuffleStream { reader } + pub fn new(file: PathBuf, schema: SchemaRef) -> Self { + LocalShuffleStream { + file, + schema, + reader: None, + } } } impl Stream for LocalShuffleStream { - type Item = datafusion::error::Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - if let Some(batch) = self.reader.next() { - return Poll::Ready(Some(batch.map_err(|e| e.into()))); + if self.reader.is_none() { + // download the file from object storage + + let file = File::open(&self.file).map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleReaderExec failed to open file {}: {}", + self.file.display(), + e + )) + })?; + self.reader = Some(FileReader::try_new(file, None).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to open IPC file {}: {:?}", + self.file.display(), + e + )) + })?); + + // TODO reinstate + // if self.schema != stream.schema() { + // return Err(DataFusionError::Internal( + // "Not all shuffle files have the same schema".to_string(), + // )); + // } + } + if let Some(reader) = self.reader.as_mut() { + if let Some(batch) = reader.next() { + return Poll::Ready(Some(batch.map_err(|e| { + DataFusionError::Execution(format!( + "Error reading batch from Arrow IPC file: {:?}", + e + )) + }))); + } + Poll::Ready(None) + } else { + unreachable!() } - Poll::Ready(None) } } impl RecordBatchStream for LocalShuffleStream { fn schema(&self) -> SchemaRef { - self.reader.schema() + self.schema.clone() } } diff --git a/src/shuffle/writer.rs b/src/shuffle/writer.rs index 069f99d..80b2988 100644 --- a/src/shuffle/writer.rs +++ b/src/shuffle/writer.rs @@ -15,34 +15,39 @@ // specific language governing permissions and limitations // under the License. +use crate::shuffle::create_object_store; +use bytes::Bytes; use datafusion::arrow::array::Int32Array; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::ipc::writer::FileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty::pretty_format_batches; -use datafusion::common::{Result, Statistics}; +use datafusion::common::{DataFusionError, Result, Statistics}; use datafusion::execution::context::TaskContext; +use datafusion::execution::RecordBatchStream; use datafusion::physical_expr::expressions::UnKnownColumn; use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_plan::common::IPCWriter; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion::physical_plan::repartition::BatchPartitioner; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ metrics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, + SendableRecordBatchStream, }; use datafusion_proto::protobuf::PartitionStats; use futures::StreamExt; use futures::TryStreamExt; use log::debug; +use object_store::{path::Path as ObjectStorePath, MultipartUpload, ObjectStore, PutPayload}; use std::any::Any; use std::fmt::Formatter; use std::fs::File; -use std::path::Path; +use std::io::Read; +use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; +use std::time::Instant; #[derive(Debug)] pub struct ShuffleWriterExec { @@ -122,7 +127,10 @@ impl ExecutionPlan for ShuffleWriterExec { self.stage_id ); + let object_store = create_object_store()?; + let mut stream = self.plan.execute(input_partition, context)?; + let write_time = MetricBuilder::new(&self.metrics).subset_time("write_time", input_partition); let repart_time = @@ -140,17 +148,31 @@ impl ExecutionPlan for ShuffleWriterExec { } Partitioning::UnknownPartitioning(_) => { // stream the results from the query, preserving the input partitioning - let file = - format!("/{shuffle_dir}/shuffle_{stage_id}_{input_partition}_0.arrow"); - debug!("Executing query and writing results to {file}"); - let stats = write_stream_to_disk(&mut stream, &file, &write_time).await?; + let path = + format!("{shuffle_dir}/shuffle_{stage_id}_{input_partition}_0.arrow"); + let path = Path::new(&path); + debug!( + "ShuffleWriterExec[stage={}] Writing results to {:?}", + stage_id, path + ); + + let mut rows = 0; + let mut writer = + IPCWriter::new(object_store.clone(), path, stream.schema().as_ref())?; + while let Some(result) = stream.next().await { + let input_batch = result?; + rows += input_batch.num_rows(); + writer.write(&input_batch)?; + } + writer.finish().await?; + debug!( "Query completed. Shuffle write time: {}. Rows: {}.", - write_time, stats.num_rows + write_time, rows ); } Partitioning::Hash(_, _) => { - // we won't necessary produce output for every possible partition, so we + // we won't necessarily produce output for every possible partition, so we // create writers on demand let mut writers: Vec> = vec![]; for _ in 0..partition_count { @@ -172,8 +194,6 @@ impl ExecutionPlan for ShuffleWriterExec { pretty_format_batches(&[input_batch.clone()])? ); - //write_metrics.input_rows.add(input_batch.num_rows()); - partitioner.partition(input_batch, |output_partition, output_batch| { match &mut writers[output_partition] { Some(w) => { @@ -181,12 +201,12 @@ impl ExecutionPlan for ShuffleWriterExec { } None => { let path = format!( - "/{shuffle_dir}/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow", + "{shuffle_dir}/shuffle_{stage_id}_{input_partition}_{output_partition}.arrow", ); let path = Path::new(&path); debug!("ShuffleWriterExec[stage={}] Writing results to {:?}", stage_id, path); - let mut writer = IPCWriter::new(path, stream.schema().as_ref())?; + let mut writer = IPCWriter::new(object_store.clone(), path, stream.schema().as_ref())?; writer.write(&output_batch)?; writers[output_partition] = Some(writer); @@ -199,7 +219,7 @@ impl ExecutionPlan for ShuffleWriterExec { for (i, w) in writers.iter_mut().enumerate() { match w { Some(w) => { - w.finish()?; + w.finish().await?; debug!( "ShuffleWriterExec[stage={}] Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.", stage_id, @@ -268,43 +288,140 @@ impl DisplayAs for ShuffleWriterExec { } } -/// Stream data to disk in Arrow IPC format -pub async fn write_stream_to_disk( - stream: &mut Pin>, - path: &str, - disk_write_metric: &metrics::Time, -) -> Result { - let file = File::create(path).unwrap(); - - /*.map_err(|e| { - error!("Failed to create partition file at {}: {:?}", path, e); - BallistaError::IoError(e) - })?;*/ - - let mut num_rows = 0; - let mut num_batches = 0; - let mut num_bytes = 0; - let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?; - - while let Some(result) = stream.next().await { - let batch = result?; - - let batch_size_bytes: usize = batch.get_array_memory_size(); - num_batches += 1; - num_rows += batch.num_rows(); - num_bytes += batch_size_bytes; - - let timer = disk_write_metric.timer(); - writer.write(&batch)?; - timer.done(); +struct IPCWriter { + /// object store + object_store: Arc, + /// path + pub path: PathBuf, + /// inner writer + pub writer: FileWriter, + /// batches written + pub num_batches: usize, + /// rows written + pub num_rows: usize, + /// bytes written + pub num_bytes: usize, +} + +impl IPCWriter { + /// Create new writer + pub fn new(object_store: Arc, path: &Path, schema: &Schema) -> Result { + let file = File::create(path).map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleWriterExec failed to create partition file at {path:?}: {e:?}" + )) + })?; + Ok(Self { + object_store, + num_batches: 0, + num_rows: 0, + num_bytes: 0, + path: path.into(), + writer: FileWriter::try_new(file, schema).map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleWriterExec failed to create IPC file: {e:?}" + )) + })?, + }) + } + + /// Write one single batch + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + self.writer.write(batch).map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleWriterExec failed to write IPC batch: {e:?}" + )) + })?; + self.num_batches += 1; + self.num_rows += batch.num_rows(); + let num_bytes: usize = batch.get_array_memory_size(); + self.num_bytes += num_bytes; + Ok(()) + } + + /// Finish the writer + pub async fn finish(&mut self) -> Result<()> { + self.writer.finish().map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleWriterExec failed to finish writing IPC file: {e:?}" + )) + })?; + + if self.num_batches > 0 { + // upload to object storage + let mut file = File::open(&self.path).map_err(|e| { + DataFusionError::Execution(format!( + "ShuffleWriterExec failed to open file {}: {}", + self.path.display(), + e + )) + })?; + + let object_store_path = ObjectStorePath::from_filesystem_path(&self.path)?; + + const MULTIPART_CHUNK_SIZE: usize = 5 * 1024 * 1024; + + let start = Instant::now(); + if self.num_bytes > MULTIPART_CHUNK_SIZE { + // use multipart put for larger files + println!( + "Uploading shuffle file {} containing {} bytes (put_multipart)", + &self.path.display(), + self.num_bytes + ); + let mut buffer = vec![0; MULTIPART_CHUNK_SIZE]; + let mut writer = self.object_store.put_multipart(&object_store_path).await?; + let mut total_bytes = 0; + loop { + let bytes_read = read_full_buffer(&mut file, &mut buffer)?; + total_bytes += bytes_read; + if bytes_read == 0 { + break; + } + let bytes = Bytes::copy_from_slice(&buffer[..bytes_read]); + writer.put_part(PutPayload::from(bytes)).await?; + } + assert!(total_bytes > 0); + let _put_result = writer.complete().await?; + } else { + println!( + "Uploading shuffle file {} containing {} bytes (put)", + &self.path.display(), + self.num_bytes + ); + + let mut buffer = Vec::with_capacity(self.num_bytes); + let bytes_read = file.read_to_end(&mut buffer)?; + assert!(bytes_read > 0); + let bytes = Bytes::copy_from_slice(&buffer[..bytes_read]); + self.object_store + .put(&object_store_path, PutPayload::from(bytes)) + .await?; + } + let end = Instant::now(); + println!("File upload took {:?}", end.duration_since(start)); + } + + std::fs::remove_file(&self.path).map_err(|e| { + println!("ShuffleWriterExec failed to delete file: {}", e); + e + })?; + Ok(()) + } + + /// Path write to + pub fn path(&self) -> &Path { + &self.path + } +} + +fn read_full_buffer(file: &mut File, buffer: &mut [u8]) -> Result { + let mut total_read = 0; + while total_read < buffer.len() { + match file.read(&mut buffer[total_read..])? { + 0 => break, + bytes_read => total_read += bytes_read, + } } - let timer = disk_write_metric.timer(); - writer.finish()?; - timer.done(); - Ok(PartitionStats { - num_rows: num_rows as i64, - num_batches: num_batches as i64, - num_bytes: num_bytes as i64, - column_stats: vec![], - }) + Ok(total_read) }