diff --git a/src/azure_extension.cpp b/src/azure_extension.cpp index 4f5fc34..83c2412 100644 --- a/src/azure_extension.cpp +++ b/src/azure_extension.cpp @@ -71,6 +71,27 @@ static AzureAuthentication ParseAzureAuthSettings(FileOpener *opener) { return auth; } +static AzureReadOptions ParseAzureReadOptions(FileOpener *opener) { + AzureReadOptions options; + + Value concurrency_val; + if (FileOpener::TryGetCurrentSetting(opener, "azure_read_transfer_concurrency", concurrency_val)) { + options.transfer_concurrency = concurrency_val.GetValue(); + } + + Value chunk_size_val; + if (FileOpener::TryGetCurrentSetting(opener, "azure_read_transfer_chunk_size", chunk_size_val)) { + options.transfer_chunk_size = chunk_size_val.GetValue(); + } + + Value buffer_size_val; + if (FileOpener::TryGetCurrentSetting(opener, "azure_read_buffer_size", buffer_size_val)) { + options.buffer_size = buffer_size_val.GetValue(); + } + + return options; +} + static Azure::Storage::Blobs::BlobContainerClient GetContainerClient(AzureAuthentication &auth, AzureParsedUrl &url) { if (!auth.connection_string.empty()) { return Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(auth.connection_string, @@ -108,9 +129,10 @@ Azure::Storage::Blobs::BlobClient *BlobClientWrapper::GetClient() { }; AzureStorageFileHandle::AzureStorageFileHandle(FileSystem &fs, string path_p, uint8_t flags, AzureAuthentication &auth, - AzureParsedUrl parsed_url) + const AzureReadOptions &read_options, AzureParsedUrl parsed_url) : FileHandle(fs, std::move(path_p)), flags(flags), length(0), last_modified(time_t()), buffer_available(0), - buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0), blob_client(auth, parsed_url) { + buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0), blob_client(auth, parsed_url), + read_options(read_options) { try { auto client = *blob_client.GetClient(); auto res = client.GetProperties(); @@ -121,7 +143,7 @@ AzureStorageFileHandle::AzureStorageFileHandle(FileSystem &fs, string path_p, ui } if (flags & FileFlags::FILE_FLAGS_READ) { - read_buffer = duckdb::unique_ptr(new data_t[READ_BUFFER_LEN]); + read_buffer = duckdb::unique_ptr(new data_t[read_options.buffer_size]); } } @@ -133,8 +155,9 @@ unique_ptr AzureStorageFileSystem::CreateHandle(const st auto parsed_url = ParseUrl(path); auto azure_auth = ParseAzureAuthSettings(opener); + auto read_options = ParseAzureReadOptions(opener); - return make_uniq(*this, path, flags, azure_auth, parsed_url); + return make_uniq(*this, path, flags, azure_auth, read_options, parsed_url); } unique_ptr AzureStorageFileSystem::OpenFile(const string &path, uint8_t flags, FileLockType lock, @@ -193,6 +216,23 @@ static void LoadInternal(DatabaseInstance &instance) { config.AddExtensionOption("azure_endpoint", "Override the azure endpoint for when the Azure credential providers are used.", LogicalType::VARCHAR, "blob.core.windows.net"); + + AzureReadOptions default_read_options; + config.AddExtensionOption("azure_read_transfer_concurrency", + "Maximum number of threads the Azure client can use for a single parallel read. " + "If azure_read_transfer_chunk_size is less than azure_read_buffer_size then setting " + "this > 1 will allow the Azure client to do concurrent requests to fill the buffer.", + LogicalType::INTEGER, Value::INTEGER(default_read_options.transfer_concurrency)); + + config.AddExtensionOption("azure_read_transfer_chunk_size", + "Maximum size in bytes that the Azure client will read in a single request. " + "It is recommended that this is a factor of azure_read_buffer_size.", + LogicalType::BIGINT, Value::BIGINT(default_read_options.transfer_chunk_size)); + + config.AddExtensionOption("azure_read_buffer_size", + "Size of the read buffer. It is recommended that this is evenly divisible by " + "azure_read_transfer_chunk_size.", + LogicalType::UBIGINT, Value::UBIGINT(default_read_options.buffer_size)); } int64_t AzureStorageFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) { @@ -321,7 +361,7 @@ void AzureStorageFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_b } if (to_read > 0 && hfh.buffer_available == 0) { - auto new_buffer_available = MinValue(hfh.READ_BUFFER_LEN, hfh.length - hfh.file_offset); + auto new_buffer_available = MinValue(hfh.read_options.buffer_size, hfh.length - hfh.file_offset); // Bypass buffer if we read more than buffer size if (to_read > new_buffer_available) { @@ -366,6 +406,9 @@ void AzureStorageFileSystem::ReadRange(FileHandle &handle, idx_t file_offset, ch range.Length = buffer_out_len; Azure::Storage::Blobs::DownloadBlobToOptions options; options.Range = range; + options.TransferOptions.Concurrency = afh.read_options.transfer_concurrency; + options.TransferOptions.InitialChunkSize = afh.read_options.transfer_chunk_size; + options.TransferOptions.ChunkSize = afh.read_options.transfer_chunk_size; auto res = blob_client.DownloadTo((uint8_t *)buffer_out, buffer_out_len, options); } catch (Azure::Storage::StorageException &e) { diff --git a/src/include/azure_extension.hpp b/src/include/azure_extension.hpp index 20e44b4..1c46b1a 100644 --- a/src/include/azure_extension.hpp +++ b/src/include/azure_extension.hpp @@ -28,6 +28,12 @@ struct AzureAuthentication { string endpoint; }; +struct AzureReadOptions { + int32_t transfer_concurrency = 5; + int64_t transfer_chunk_size = 1 * 1024 * 1024; + idx_t buffer_size = 1 * 1024 * 1024; +}; + struct AzureParsedUrl { string container; string prefix; @@ -47,7 +53,7 @@ class BlobClientWrapper { class AzureStorageFileHandle : public FileHandle { public: AzureStorageFileHandle(FileSystem &fs, string path, uint8_t flags, AzureAuthentication &auth, - AzureParsedUrl parsed_url); + const AzureReadOptions &read_options, AzureParsedUrl parsed_url); ~AzureStorageFileHandle() override = default; public: @@ -67,10 +73,12 @@ class AzureStorageFileHandle : public FileHandle { // Read buffer duckdb::unique_ptr read_buffer; - constexpr static idx_t READ_BUFFER_LEN = 1000000; // Azure Blob Client BlobClientWrapper blob_client; + + AzureReadOptions read_options; + }; class AzureStorageFileSystem : public FileSystem {