From 31750d74f3e905415c08a5205004b33f85785393 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Tue, 30 Apr 2024 23:53:37 +0000 Subject: [PATCH 1/4] tmp --- storage-node/Cargo.toml | 2 + storage-node/src/bin/storage_node.rs | 15 ++-- .../src/cache/data_store_cache/memdisk/mod.rs | 9 +- storage-node/src/cache/replacer/mod.rs | 82 +++++++++++++++++++ storage-node/src/common/config.rs | 43 ++++++++++ storage-node/src/common/mod.rs | 1 + storage-node/src/server.rs | 82 ++++++++++++++++--- storage-node/src/storage_manager.rs | 10 +-- 8 files changed, 215 insertions(+), 29 deletions(-) create mode 100644 storage-node/src/common/config.rs diff --git a/storage-node/Cargo.toml b/storage-node/Cargo.toml index 21ed460..a71c67a 100644 --- a/storage-node/Cargo.toml +++ b/storage-node/Cargo.toml @@ -26,6 +26,8 @@ rusqlite = { version = "0.31", features = ["blob"] } log = "0.4" env_logger = "0.11" crc32fast = "1.4.0" +clap = { version = "4.5", features = ["derive"] } +serde = { version = "1", features = ["derive"] } [dev-dependencies] serial_test = "3.1" diff --git a/storage-node/src/bin/storage_node.rs b/storage-node/src/bin/storage_node.rs index 8bae40a..e9df926 100644 --- a/storage-node/src/bin/storage_node.rs +++ b/storage-node/src/bin/storage_node.rs @@ -1,13 +1,16 @@ use log::info; -use storage_node::server::storage_node_serve; - -// TODO: Add config here. +use storage_node::{common::config::ParpulseConfig, server::storage_node_serve}; #[tokio::main] async fn main() { - let _ = env_logger::builder() + // Init log. + if let Err(e) = env_logger::builder() .filter_level(log::LevelFilter::Info) - .try_init(); + .try_init() + { + println!("Failed to init logger: {:?}", e); + } info!("starting storage node server..."); - storage_node_serve("0.0.0.0", 3030).await.unwrap(); + let config = ParpulseConfig::parse(); + storage_node_serve("0.0.0.0", 3030, config).await.unwrap(); } diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 7d8b5b1..7eb9099 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -22,11 +22,6 @@ use tokio::sync::mpsc::Receiver; use self::data_store::{disk::DiskStore, memory::MemStore}; -/// The default maximum single file size for the memory cache. -/// If the file size exceeds this value, the file will be stored in the disk cache. -/// TODO(lanlou): make this value configurable. -pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512; - /// [`MemDiskStoreReplacerKey`] is a path to the remote object store. pub type MemDiskStoreReplacerKey = String; @@ -113,8 +108,8 @@ impl> let disk_store = DiskStore::new(disk_manager, disk_base_path); if mem_replacer.is_some() { - let mut mem_max_file_size = - mem_max_file_size.unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE); + debug_assert!(mem_max_file_size.is_some()); + let mut mem_max_file_size = mem_max_file_size.unwrap(); let replacer_max_capacity = mem_replacer.as_ref().unwrap().max_capacity(); if mem_max_file_size > replacer_max_capacity { warn!("The maximum file size > replacer's max capacity, so we set maximum file size = 1/5 of the maximum capacity."); diff --git a/storage-node/src/cache/replacer/mod.rs b/storage-node/src/cache/replacer/mod.rs index bdf9176..c0b8e80 100644 --- a/storage-node/src/cache/replacer/mod.rs +++ b/storage-node/src/cache/replacer/mod.rs @@ -3,6 +3,13 @@ pub mod lru_k; use std::fmt::Debug; use std::hash::Hash; +use self::{lru::LruReplacer, lru_k::LruKReplacer}; + +use super::data_store_cache::{ + memdisk::{MemDiskStoreReplacerKey, MemDiskStoreReplacerValue}, + sqlite::{SqliteStoreReplacerKey, SqliteStoreReplacerValue}, +}; + /// [`ReplacerKey`] is the key type for data store replacers using different /// policies in the system. pub trait ReplacerKey: Hash + Eq + Clone + Debug + Send + Sync {} @@ -60,6 +67,81 @@ pub trait DataStoreReplacer: Send + Sync { fn clear(&mut self); } +pub enum ParpulseDataStoreReplacerKey { + MemDiskStoreReplacerKey(MemDiskStoreReplacerKey), + SqliteStoreReplacerKey(SqliteStoreReplacerKey), +} + +pub enum ParpulseDataStoreReplacerValue { + MemDiskStoreReplacerValue(MemDiskStoreReplacerValue), + SqliteStoreReplacerValue(SqliteStoreReplacerValue), +} + +pub enum ParpulseDataStoreReplacer { + MemDiskLruReplacer(LruReplacer), + MemDiskLruKReplacer(LruKReplacer), + SqliteLruReplacer(LruReplacer), + SqliteLruKReplacer(LruKReplacer), +} + +impl DataStoreReplacer + for ParpulseDataStoreReplacer +{ + fn get(&mut self, key: &K) -> Option<&V> { + match self { + ParpulseDataStoreReplacer::MemDiskLruReplacer(replacer) => replacer.get(key), + ParpulseDataStoreReplacer::MemDiskLruKReplacer(replacer) => replacer.get(key), + ParpulseDataStoreReplacer::SqliteLruReplacer(replacer) => replacer.get(key), + ParpulseDataStoreReplacer::SqliteLruKReplacer(replacer) => replacer.get(key), + } + } + + fn put(&mut self, key: K, value: V) -> Option> { + match self { + ParpulseDataStoreReplacer::MemDiskLruReplacer(replacer) => replacer.put(key, value), + ParpulseDataStoreReplacer::MemDiskLruKReplacer(replacer) => replacer.put(key, value), + ParpulseDataStoreReplacer::SqliteLruReplacer(replacer) => replacer.put(key, value), + ParpulseDataStoreReplacer::SqliteLruKReplacer(replacer) => replacer.put(key, value), + } + } + + fn pin(&mut self, key: &K, count: usize) -> bool { + todo!() + } + + fn unpin(&mut self, key: &K) -> bool { + todo!() + } + + fn peek(&self, key: &K) -> Option<&V> { + todo!() + } + + fn len(&self) -> usize { + todo!() + } + + fn size(&self) -> usize { + todo!() + } + + fn is_empty(&self) -> bool { + todo!() + } + + fn max_capacity(&self) -> usize { + todo!() + } + + fn set_max_capacity(&mut self, capacity: usize) { + todo!() + } + + fn clear(&mut self) { + todo!() + } +} + #[cfg(test)] mod tests { use super::ReplacerValue; diff --git a/storage-node/src/common/config.rs b/storage-node/src/common/config.rs new file mode 100644 index 0000000..838ee77 --- /dev/null +++ b/storage-node/src/common/config.rs @@ -0,0 +1,43 @@ +use clap::Parser; +use serde::Serialize; + +#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum ParpulseConfigDataStore { + #[default] + Memdisk, + Disk, + Sqlite, +} + +#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum ParpulseConfigCachePolicy { + #[default] + Lru, + Lruk, +} + +#[derive(Parser)] +pub struct ParpulseConfig { + #[clap(short, long, default_value_t, value_enum)] + pub cache_policy: ParpulseConfigCachePolicy, + + #[clap(short, long, default_value = None)] + pub cache_lru_k: Option, + + #[clap(short, long, default_value_t, value_enum)] + pub data_store: ParpulseConfigDataStore, + + #[clap(short, long, default_value = "3")] + pub data_store_cache_num: usize, + + #[clap(short, long, default_value = None)] + pub mem_cache_size: Option, + + #[clap(short, long, default_value = None)] + pub disk_cache_size: Option, + + #[clap(short, long, default_value = None)] + pub cache_path: Option, +} diff --git a/storage-node/src/common/mod.rs b/storage-node/src/common/mod.rs index ec5d33c..0f2c4c4 100644 --- a/storage-node/src/common/mod.rs +++ b/storage-node/src/common/mod.rs @@ -1 +1,2 @@ +pub mod config; pub mod hash; diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index 7aa79b2..beceb06 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -6,22 +6,73 @@ use tokio_stream::wrappers::ReceiverStream; use warp::{Filter, Rejection}; use crate::{ - cache::{data_store_cache::memdisk::MemDiskStoreCache, replacer::lru::LruReplacer}, - error::ParpulseResult, + cache::{data_store_cache::memdisk::{MemDiskStoreCache, MemDiskStoreReplacerValue}, replacer::{lru::LruReplacer, lru_k::LruKReplacer, DataStoreReplacer}}, + common::config::{ParpulseConfig, ParpulseConfigCachePolicy, ParpulseConfigDataStore}, + error::{ParpulseError, ParpulseResult}, storage_manager::StorageManager, }; -const CACHE_BASE_PATH: &str = "cache/"; -const DATA_STORE_CACHE_NUMBER: usize = 6; - -pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> { +const CACHE_BASE_PATH: &str = "parpulse-cache"; +const DATA_STORE_CACHE_NUMBER: usize = 3; +const DEFAULT_MEM_CACHE_SIZE: usize = 100 * 1024; +const DEFAULT_DISK_CACHE_SIZE: usize = 100 * 1024 * 1024; +const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 10 * 1024; +const DEFAULT_LRU_K_VALUE: usize = 2; + +pub async fn storage_node_serve( + ip_addr: &str, + port: u16, + config: ParpulseConfig, +) -> ParpulseResult<()> { // Should at least be able to store one 100MB file in the cache. // TODO: Read the type of the cache from config. - let dummy_size_per_disk_cache = 100 * 1024 * 1024; - let dummy_size_per_mem_cache = 100 * 1024; - let dummy_mem_max_file_cache = 10 * 1024; - let mut data_store_caches = Vec::new(); + + let storage_manager = match config.data_store { + ParpulseConfigDataStore::Memdisk => { + let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE); + let mem_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_MEM_CACHE_SIZE); + let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); + for i in 0..config.data_store_cache_num { + match config.cache_policy { + ParpulseConfigCachePolicy::Lru => { + let mut data_store_caches = Vec::new(); + let disk_replacer = LruReplacer::new(disk_cache_size); + let mem_replacer = LruReplacer::new(mem_cache_size); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + &cache_base_path, + Some(mem_replacer), + Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), + ); + data_store_caches.push(data_store_cache); + Arc::new(StorageManager::new(data_store_caches)) + }, + ParpulseConfigCachePolicy::Lruk => { + let mut data_store_caches = Vec::new(); + let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); + let disk_replacer = LruKReplacer::new(disk_cache_size, k); + let mem_replacer = LruKReplacer::new(mem_cache_size, k); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + &cache_base_path, + Some(mem_replacer), + Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), + ); + data_store_caches.push(data_store_cache); + Arc::new(StorageManager::new(data_store_caches)) + } + } ; + + } + } + ParpulseConfigDataStore::Disk => { + let disk_cache_size = config.disk_cache_size.ok_or(ParpulseError::MissingConfig("disk-cache-size".to_string()))?; + }, + ParpulseConfigDataStore::Sqlite => { + todo!(); + } + } for i in 0..DATA_STORE_CACHE_NUMBER { let disk_replacer = LruReplacer::new(dummy_size_per_disk_cache); @@ -37,7 +88,7 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> let is_mem_disk_cache = true; // TODO: try to use more fine-grained lock instead of locking the whole storage_manager - let storage_manager = Arc::new(StorageManager::new(data_store_caches)); + let storage_manager =; let route = warp::path!("file") .and(warp::path::end()) @@ -111,6 +162,8 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> #[cfg(test)] mod tests { + use crate::common::config::ParpulseConfigDataStore; + use super::*; use reqwest::Client; use std::fs; @@ -121,6 +174,13 @@ mod tests { #[tokio::test] async fn test_server() { let original_file_path = "tests/parquet/userdata1.parquet"; + let config = ParpulseConfig { + data_store: ParpulseConfigDataStore::Memdisk, + data_store_cache_num: todo!(), + mem_cache_size: todo!(), + disk_cache_size: todo!(), + cache_path: todo!(), + } // Start the server let server_handle = tokio::spawn(async move { diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index c5a867e..c803510 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -1,5 +1,5 @@ use crate::{ - cache::data_store_cache::DataStoreCache, + cache::{data_store_cache::DataStoreCache, replacer::ParpulseDataStoreReplacer}, common::hash::calculate_hash_crc32fast, error::ParpulseResult, storage_reader::{s3::S3Reader, s3_diskmock::MockS3Reader, AsyncStorageReader}, @@ -16,13 +16,13 @@ use tokio::sync::mpsc::Receiver; /// which should be responsible for handling multiple requests at the /// same time. -pub struct StorageManager { +pub struct StorageManager { /// We don't use lock here because `data_store_cache` itself should handle the concurrency. - data_store_caches: Vec, + data_store_caches: Vec, } -impl StorageManager { - pub fn new(data_store_caches: Vec) -> Self { +impl StorageManager { + pub fn new(data_store_caches: Vec) -> Self { Self { data_store_caches } } From 35329539fa31dba404b3e1621d003b4a800c0324 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Wed, 1 May 2024 14:04:58 +0000 Subject: [PATCH 2/4] introduce config --- storage-node/src/bin/storage_node.rs | 1 + storage-node/src/cache/replacer/mod.rs | 82 --------- storage-node/src/common/config.rs | 23 +-- storage-node/src/server.rs | 235 ++++++++++++++++--------- storage-node/src/storage_manager.rs | 50 ++++-- tests/src/client_server_test.rs | 10 +- 6 files changed, 210 insertions(+), 191 deletions(-) diff --git a/storage-node/src/bin/storage_node.rs b/storage-node/src/bin/storage_node.rs index e9df926..2acac95 100644 --- a/storage-node/src/bin/storage_node.rs +++ b/storage-node/src/bin/storage_node.rs @@ -1,3 +1,4 @@ +use clap::Parser; use log::info; use storage_node::{common::config::ParpulseConfig, server::storage_node_serve}; diff --git a/storage-node/src/cache/replacer/mod.rs b/storage-node/src/cache/replacer/mod.rs index c0b8e80..bdf9176 100644 --- a/storage-node/src/cache/replacer/mod.rs +++ b/storage-node/src/cache/replacer/mod.rs @@ -3,13 +3,6 @@ pub mod lru_k; use std::fmt::Debug; use std::hash::Hash; -use self::{lru::LruReplacer, lru_k::LruKReplacer}; - -use super::data_store_cache::{ - memdisk::{MemDiskStoreReplacerKey, MemDiskStoreReplacerValue}, - sqlite::{SqliteStoreReplacerKey, SqliteStoreReplacerValue}, -}; - /// [`ReplacerKey`] is the key type for data store replacers using different /// policies in the system. pub trait ReplacerKey: Hash + Eq + Clone + Debug + Send + Sync {} @@ -67,81 +60,6 @@ pub trait DataStoreReplacer: Send + Sync { fn clear(&mut self); } -pub enum ParpulseDataStoreReplacerKey { - MemDiskStoreReplacerKey(MemDiskStoreReplacerKey), - SqliteStoreReplacerKey(SqliteStoreReplacerKey), -} - -pub enum ParpulseDataStoreReplacerValue { - MemDiskStoreReplacerValue(MemDiskStoreReplacerValue), - SqliteStoreReplacerValue(SqliteStoreReplacerValue), -} - -pub enum ParpulseDataStoreReplacer { - MemDiskLruReplacer(LruReplacer), - MemDiskLruKReplacer(LruKReplacer), - SqliteLruReplacer(LruReplacer), - SqliteLruKReplacer(LruKReplacer), -} - -impl DataStoreReplacer - for ParpulseDataStoreReplacer -{ - fn get(&mut self, key: &K) -> Option<&V> { - match self { - ParpulseDataStoreReplacer::MemDiskLruReplacer(replacer) => replacer.get(key), - ParpulseDataStoreReplacer::MemDiskLruKReplacer(replacer) => replacer.get(key), - ParpulseDataStoreReplacer::SqliteLruReplacer(replacer) => replacer.get(key), - ParpulseDataStoreReplacer::SqliteLruKReplacer(replacer) => replacer.get(key), - } - } - - fn put(&mut self, key: K, value: V) -> Option> { - match self { - ParpulseDataStoreReplacer::MemDiskLruReplacer(replacer) => replacer.put(key, value), - ParpulseDataStoreReplacer::MemDiskLruKReplacer(replacer) => replacer.put(key, value), - ParpulseDataStoreReplacer::SqliteLruReplacer(replacer) => replacer.put(key, value), - ParpulseDataStoreReplacer::SqliteLruKReplacer(replacer) => replacer.put(key, value), - } - } - - fn pin(&mut self, key: &K, count: usize) -> bool { - todo!() - } - - fn unpin(&mut self, key: &K) -> bool { - todo!() - } - - fn peek(&self, key: &K) -> Option<&V> { - todo!() - } - - fn len(&self) -> usize { - todo!() - } - - fn size(&self) -> usize { - todo!() - } - - fn is_empty(&self) -> bool { - todo!() - } - - fn max_capacity(&self) -> usize { - todo!() - } - - fn set_max_capacity(&mut self, capacity: usize) { - todo!() - } - - fn clear(&mut self) { - todo!() - } -} - #[cfg(test)] mod tests { use super::ReplacerValue; diff --git a/storage-node/src/common/config.rs b/storage-node/src/common/config.rs index 838ee77..9f5ee2a 100644 --- a/storage-node/src/common/config.rs +++ b/storage-node/src/common/config.rs @@ -2,7 +2,6 @@ use clap::Parser; use serde::Serialize; #[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)] -#[serde(rename_all = "kebab-case")] pub enum ParpulseConfigDataStore { #[default] Memdisk, @@ -11,33 +10,35 @@ pub enum ParpulseConfigDataStore { } #[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)] -#[serde(rename_all = "kebab-case")] pub enum ParpulseConfigCachePolicy { #[default] Lru, Lruk, } -#[derive(Parser)] +#[derive(Parser, Default)] pub struct ParpulseConfig { - #[clap(short, long, default_value_t, value_enum)] + #[clap(long, default_value_t, value_enum)] pub cache_policy: ParpulseConfigCachePolicy, - #[clap(short, long, default_value = None)] + #[clap(long, default_value = None)] pub cache_lru_k: Option, - #[clap(short, long, default_value_t, value_enum)] + #[clap(long, default_value_t, value_enum)] pub data_store: ParpulseConfigDataStore, - #[clap(short, long, default_value = "3")] - pub data_store_cache_num: usize, + #[clap( long, default_value = None)] + pub data_store_cache_num: Option, - #[clap(short, long, default_value = None)] + #[clap(long, default_value = None)] pub mem_cache_size: Option, - #[clap(short, long, default_value = None)] + #[clap(long, default_value = None)] pub disk_cache_size: Option, - #[clap(short, long, default_value = None)] + #[clap(long, default_value = None)] + pub sqlite_cache_size: Option, + + #[clap( long, default_value = None)] pub cache_path: Option, } diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index beceb06..8a24643 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -6,90 +6,29 @@ use tokio_stream::wrappers::ReceiverStream; use warp::{Filter, Rejection}; use crate::{ - cache::{data_store_cache::memdisk::{MemDiskStoreCache, MemDiskStoreReplacerValue}, replacer::{lru::LruReplacer, lru_k::LruKReplacer, DataStoreReplacer}}, + cache::{ + data_store_cache::{memdisk::MemDiskStoreCache, sqlite::SqliteStoreCache}, + replacer::{lru::LruReplacer, lru_k::LruKReplacer}, + }, common::config::{ParpulseConfig, ParpulseConfigCachePolicy, ParpulseConfigDataStore}, - error::{ParpulseError, ParpulseResult}, - storage_manager::StorageManager, + error::ParpulseResult, + storage_manager::{StorageManager, StorageManagerImpl}, }; const CACHE_BASE_PATH: &str = "parpulse-cache"; -const DATA_STORE_CACHE_NUMBER: usize = 3; +const DEFAULT_DATA_STORE_CACHE_NUM: usize = 3; const DEFAULT_MEM_CACHE_SIZE: usize = 100 * 1024; const DEFAULT_DISK_CACHE_SIZE: usize = 100 * 1024 * 1024; +const DEFAULT_SQLITE_CACHE_SIZE: usize = 100 * 1024 * 1024; const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 10 * 1024; const DEFAULT_LRU_K_VALUE: usize = 2; -pub async fn storage_node_serve( +async fn route( + storage_manager: Arc, + get_size_before_read: bool, ip_addr: &str, port: u16, - config: ParpulseConfig, -) -> ParpulseResult<()> { - // Should at least be able to store one 100MB file in the cache. - // TODO: Read the type of the cache from config. - - - let storage_manager = match config.data_store { - ParpulseConfigDataStore::Memdisk => { - let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE); - let mem_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_MEM_CACHE_SIZE); - let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); - for i in 0..config.data_store_cache_num { - match config.cache_policy { - ParpulseConfigCachePolicy::Lru => { - let mut data_store_caches = Vec::new(); - let disk_replacer = LruReplacer::new(disk_cache_size); - let mem_replacer = LruReplacer::new(mem_cache_size); - let data_store_cache = MemDiskStoreCache::new( - disk_replacer, - i.to_string() + &cache_base_path, - Some(mem_replacer), - Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), - ); - data_store_caches.push(data_store_cache); - Arc::new(StorageManager::new(data_store_caches)) - }, - ParpulseConfigCachePolicy::Lruk => { - let mut data_store_caches = Vec::new(); - let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); - let disk_replacer = LruKReplacer::new(disk_cache_size, k); - let mem_replacer = LruKReplacer::new(mem_cache_size, k); - let data_store_cache = MemDiskStoreCache::new( - disk_replacer, - i.to_string() + &cache_base_path, - Some(mem_replacer), - Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), - ); - data_store_caches.push(data_store_cache); - Arc::new(StorageManager::new(data_store_caches)) - } - } ; - - } - } - ParpulseConfigDataStore::Disk => { - let disk_cache_size = config.disk_cache_size.ok_or(ParpulseError::MissingConfig("disk-cache-size".to_string()))?; - }, - ParpulseConfigDataStore::Sqlite => { - todo!(); - } - } - - for i in 0..DATA_STORE_CACHE_NUMBER { - let disk_replacer = LruReplacer::new(dummy_size_per_disk_cache); - let mem_replace = LruReplacer::new(dummy_size_per_mem_cache); - let data_store_cache = MemDiskStoreCache::new( - disk_replacer, - i.to_string() + CACHE_BASE_PATH, - Some(mem_replace), - Some(dummy_mem_max_file_cache), - ); - data_store_caches.push(data_store_cache); - } - - let is_mem_disk_cache = true; - // TODO: try to use more fine-grained lock instead of locking the whole storage_manager - let storage_manager =; - +) { let route = warp::path!("file") .and(warp::path::end()) .and(warp::query::()) @@ -114,7 +53,9 @@ pub async fn storage_node_serve( } else { RequestParams::S3((bucket, vec![keys])) }; - let result = storage_manager.get_data(request, is_mem_disk_cache).await; + let result = storage_manager + .get_data(request, !get_size_before_read) + .await; match result { Ok(data_rx) => { let stream = ReceiverStream::new(data_rx); @@ -156,6 +97,137 @@ pub async fn storage_node_serve( let routes = route.or(heartbeat).or(catch_all); let ip_addr: IpAddr = ip_addr.parse().unwrap(); warp::serve(routes).run((ip_addr, port)).await; +} + +pub async fn storage_node_serve( + ip_addr: &str, + port: u16, + config: ParpulseConfig, +) -> ParpulseResult<()> { + let data_store_cache_num = config + .data_store_cache_num + .unwrap_or(DEFAULT_DATA_STORE_CACHE_NUM); + match config.data_store { + ParpulseConfigDataStore::Memdisk => { + let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE); + let mem_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_MEM_CACHE_SIZE); + let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); + match config.cache_policy { + ParpulseConfigCachePolicy::Lru => { + info!("starting storage node with {} mem-disk cache(s) and LRU cache policy, disk cache size: {}, mem cache size: {}", data_store_cache_num, disk_cache_size, mem_cache_size); + let mut data_store_caches = Vec::new(); + for i in 0..data_store_cache_num { + let disk_replacer = LruReplacer::new(disk_cache_size); + let mem_replacer = LruReplacer::new(mem_cache_size); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + &cache_base_path, + Some(mem_replacer), + Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), + ); + data_store_caches.push(data_store_cache); + } + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); + route(storage_manager, false, ip_addr, port).await; + } + ParpulseConfigCachePolicy::Lruk => { + info!("starting storage node with {} mem-disk cache(s) and LRU-K cache policy, disk cache size: {}, mem cache size: {}", data_store_cache_num, disk_cache_size, mem_cache_size); + let mut data_store_caches = Vec::new(); + let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); + for i in 0..data_store_cache_num { + let disk_replacer = LruKReplacer::new(disk_cache_size, k); + let mem_replacer = LruKReplacer::new(mem_cache_size, k); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + &cache_base_path, + Some(mem_replacer), + Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), + ); + data_store_caches.push(data_store_cache); + } + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); + route(storage_manager, false, ip_addr, port).await; + } + }; + } + ParpulseConfigDataStore::Disk => { + let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE); + let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); + match config.cache_policy { + ParpulseConfigCachePolicy::Lru => { + info!("starting storage node with {} disk-only cache(s) and LRU cache policy, disk cache size: {}", data_store_cache_num, disk_cache_size); + let mut data_store_caches = Vec::new(); + for i in 0..data_store_cache_num { + let disk_replacer = LruReplacer::new(disk_cache_size); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + &cache_base_path, + None, + None, + ); + data_store_caches.push(data_store_cache); + } + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); + route(storage_manager, false, ip_addr, port).await; + } + ParpulseConfigCachePolicy::Lruk => { + info!("starting storage node with {} disk-only cache(s) and LRU-K cache policy, disk cache size: {}", data_store_cache_num, disk_cache_size); + let mut data_store_caches = Vec::new(); + let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); + for i in 0..data_store_cache_num { + let disk_replacer = LruKReplacer::new(disk_cache_size, k); + let data_store_cache = MemDiskStoreCache::new( + disk_replacer, + i.to_string() + &cache_base_path, + None, + None, + ); + data_store_caches.push(data_store_cache); + } + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); + route(storage_manager, false, ip_addr, port).await; + } + } + } + ParpulseConfigDataStore::Sqlite => { + let sqlite_base_path = + config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()) + "sqlite.db3"; + let sqlite_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_SQLITE_CACHE_SIZE); + match config.cache_policy { + ParpulseConfigCachePolicy::Lru => { + info!("starting storage node with {} sqlite cache(s) and LRU cache policy, cache size: {}", data_store_cache_num, sqlite_cache_size); + let mut data_store_caches = Vec::new(); + for i in 0..data_store_cache_num { + let replacer = LruReplacer::new(sqlite_cache_size); + let sqlite_data_cache = SqliteStoreCache::new( + replacer, + i.to_string() + &sqlite_base_path, + None, + )?; + data_store_caches.push(sqlite_data_cache); + } + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); + route(storage_manager, true, ip_addr, port).await; + } + ParpulseConfigCachePolicy::Lruk => { + info!("starting storage node with {} sqlite cache(s) and LRU-K cache policy, cache size: {}", data_store_cache_num, sqlite_cache_size); + let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); + let mut data_store_caches = Vec::new(); + for i in 0..data_store_cache_num { + let replacer = LruKReplacer::new(sqlite_cache_size, k); + let sqlite_data_cache = SqliteStoreCache::new( + replacer, + i.to_string() + &sqlite_base_path, + None, + )?; + data_store_caches.push(sqlite_data_cache); + } + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); + route(storage_manager, true, ip_addr, port).await; + } + } + } + }; Ok(()) } @@ -176,15 +248,18 @@ mod tests { let original_file_path = "tests/parquet/userdata1.parquet"; let config = ParpulseConfig { data_store: ParpulseConfigDataStore::Memdisk, - data_store_cache_num: todo!(), - mem_cache_size: todo!(), - disk_cache_size: todo!(), - cache_path: todo!(), - } + data_store_cache_num: Some(6), + mem_cache_size: None, + disk_cache_size: None, + cache_path: None, + cache_policy: ParpulseConfigCachePolicy::Lru, + cache_lru_k: None, + sqlite_cache_size: None, + }; // Start the server let server_handle = tokio::spawn(async move { - storage_node_serve("127.0.0.1", 3030).await.unwrap(); + storage_node_serve("127.0.0.1", 3030, config).await.unwrap(); }); // Give the server some time to start diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index c803510..4708e11 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -1,28 +1,37 @@ use crate::{ - cache::{data_store_cache::DataStoreCache, replacer::ParpulseDataStoreReplacer}, + cache::data_store_cache::DataStoreCache, common::hash::calculate_hash_crc32fast, error::ParpulseResult, storage_reader::{s3::S3Reader, s3_diskmock::MockS3Reader, AsyncStorageReader}, }; +use async_trait::async_trait; use bytes::Bytes; use log::debug; use parpulse_client::RequestParams; use tokio::sync::mpsc::Receiver; +#[async_trait] +pub trait StorageManager: Send + Sync { + async fn get_data( + &self, + request: RequestParams, + is_mem_disk_cache: bool, + ) -> ParpulseResult>>; +} + /// [`StorageManager`] handles the request from the storage client. /// /// We should allow concurrent requests fed into the storage manager, /// which should be responsible for handling multiple requests at the /// same time. - -pub struct StorageManager { +pub struct StorageManagerImpl { /// We don't use lock here because `data_store_cache` itself should handle the concurrency. - data_store_caches: Vec, + data_store_caches: Vec, } -impl StorageManager { - pub fn new(data_store_caches: Vec) -> Self { +impl StorageManagerImpl { + pub fn new(data_store_caches: Vec) -> Self { Self { data_store_caches } } @@ -92,6 +101,17 @@ impl StorageManager { } } +#[async_trait] +impl StorageManager for StorageManagerImpl { + async fn get_data( + &self, + request: RequestParams, + is_mem_disk_cache: bool, + ) -> ParpulseResult>> { + self.get_data(request, is_mem_disk_cache).await + } +} + /// fn buffer(&self) -> &[u8]; ensures Iterator has a buffer /// This buffer function returns the starting point of the result. /// **NOTE**: The result buffer must be **CONTINUOUS** in bytes with the size in Item as its length. @@ -132,7 +152,7 @@ mod tests { let data_store_cache = MemDiskStoreCache::new(cache, cache_base_path.display().to_string(), None, None); - let storage_manager = StorageManager::new(vec![data_store_cache]); + let storage_manager = StorageManagerImpl::new(vec![data_store_cache]); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; @@ -188,7 +208,7 @@ mod tests { Some(mem_cache), Some(950), ); - let storage_manager = StorageManager::new(vec![data_store_cache]); + let storage_manager = StorageManagerImpl::new(vec![data_store_cache]); let request_path_small_bucket = "tests-text".to_string(); let request_path_small_keys = vec!["what-can-i-hold-you-with".to_string()]; @@ -247,7 +267,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let storage_manager = StorageManager::new(vec![data_store_cache]); + let storage_manager = StorageManagerImpl::new(vec![data_store_cache]); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -298,7 +318,7 @@ mod tests { None, None, ); - let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -358,7 +378,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata2.parquet".to_string()]; @@ -488,7 +508,7 @@ mod tests { ); data_store_caches.push(data_store_cache); } - let storage_manager = Arc::new(StorageManager::new(data_store_caches)); + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata1.parquet".to_string()]; @@ -538,7 +558,7 @@ mod tests { ); data_store_caches.push(data_store_cache); } - let storage_manager = Arc::new(StorageManager::new(data_store_caches)); + let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches)); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata2.parquet".to_string()]; @@ -625,7 +645,7 @@ mod tests { None, None, ); - let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata2.parquet".to_string()]; @@ -660,7 +680,7 @@ mod tests { Some(mem_cache), Some(120000), ); - let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); let request_path_bucket1 = "tests-parquet".to_string(); let request_path_keys1 = vec!["userdata2.parquet".to_string()]; diff --git a/tests/src/client_server_test.rs b/tests/src/client_server_test.rs index 7a505d7..f3db437 100644 --- a/tests/src/client_server_test.rs +++ b/tests/src/client_server_test.rs @@ -10,7 +10,7 @@ mod tests { use parpulse_client::client::StorageClientImpl; use serial_test::serial; use std::time::Instant; - use storage_node::server::storage_node_serve; + use storage_node::{common::config::ParpulseConfig, server::storage_node_serve}; #[test] fn setup() { @@ -26,7 +26,9 @@ mod tests { // The file dir should start from storage-node. // Start the server let server_handle = tokio::spawn(async move { - storage_node_serve("127.0.0.1", 3030).await.unwrap(); + storage_node_serve("127.0.0.1", 3030, ParpulseConfig::default()) + .await + .unwrap(); }); // Give the server some time to start @@ -80,7 +82,9 @@ mod tests { async fn test_client_server_s3() { // Start the server let server_handle = tokio::spawn(async move { - storage_node_serve("127.0.0.1", 3030).await.unwrap(); + storage_node_serve("127.0.0.1", 3030, ParpulseConfig::default()) + .await + .unwrap(); }); // Give the server some time to start From dbc79c924da262f9ed8d77fdce27b26c25dc249a Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Wed, 1 May 2024 14:14:40 +0000 Subject: [PATCH 3/4] add mem cache file size to config --- .../src/cache/data_store_cache/memdisk/mod.rs | 2 +- storage-node/src/common/config.rs | 3 +++ storage-node/src/server.rs | 25 +++++++------------ 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 7eb9099..30b5456 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -602,7 +602,7 @@ mod tests { LruReplacer::new(1024 * 512), disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(950)), - None, + Some(10 * 1024), ); let bucket = "tests-text".to_string(); let keys = vec!["what-can-i-hold-you-with".to_string()]; diff --git a/storage-node/src/common/config.rs b/storage-node/src/common/config.rs index 9f5ee2a..d23366f 100644 --- a/storage-node/src/common/config.rs +++ b/storage-node/src/common/config.rs @@ -33,6 +33,9 @@ pub struct ParpulseConfig { #[clap(long, default_value = None)] pub mem_cache_size: Option, + #[clap(long, default_value = None)] + pub mem_cache_file_size: Option, + #[clap(long, default_value = None)] pub disk_cache_size: Option, diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index 8a24643..38fa716 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -111,10 +111,13 @@ pub async fn storage_node_serve( ParpulseConfigDataStore::Memdisk => { let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE); let mem_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_MEM_CACHE_SIZE); + let mem_cache_file_size = config + .mem_cache_file_size + .unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE); let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); match config.cache_policy { ParpulseConfigCachePolicy::Lru => { - info!("starting storage node with {} mem-disk cache(s) and LRU cache policy, disk cache size: {}, mem cache size: {}", data_store_cache_num, disk_cache_size, mem_cache_size); + info!("starting storage node with {} mem-disk cache(s) and LRU cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size); let mut data_store_caches = Vec::new(); for i in 0..data_store_cache_num { let disk_replacer = LruReplacer::new(disk_cache_size); @@ -123,7 +126,7 @@ pub async fn storage_node_serve( disk_replacer, i.to_string() + &cache_base_path, Some(mem_replacer), - Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), + Some(mem_cache_file_size), ); data_store_caches.push(data_store_cache); } @@ -131,7 +134,7 @@ pub async fn storage_node_serve( route(storage_manager, false, ip_addr, port).await; } ParpulseConfigCachePolicy::Lruk => { - info!("starting storage node with {} mem-disk cache(s) and LRU-K cache policy, disk cache size: {}, mem cache size: {}", data_store_cache_num, disk_cache_size, mem_cache_size); + info!("starting storage node with {} mem-disk cache(s) and LRU-K cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size); let mut data_store_caches = Vec::new(); let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); for i in 0..data_store_cache_num { @@ -141,7 +144,7 @@ pub async fn storage_node_serve( disk_replacer, i.to_string() + &cache_base_path, Some(mem_replacer), - Some(DEFAULT_MEM_CACHE_MAX_FILE_SIZE), + Some(mem_cache_file_size), ); data_store_caches.push(data_store_cache); } @@ -234,7 +237,6 @@ pub async fn storage_node_serve( #[cfg(test)] mod tests { - use crate::common::config::ParpulseConfigDataStore; use super::*; use reqwest::Client; @@ -246,17 +248,8 @@ mod tests { #[tokio::test] async fn test_server() { let original_file_path = "tests/parquet/userdata1.parquet"; - let config = ParpulseConfig { - data_store: ParpulseConfigDataStore::Memdisk, - data_store_cache_num: Some(6), - mem_cache_size: None, - disk_cache_size: None, - cache_path: None, - cache_policy: ParpulseConfigCachePolicy::Lru, - cache_lru_k: None, - sqlite_cache_size: None, - }; - + let mut config = ParpulseConfig::default(); + config.data_store_cache_num = Some(6); // Start the server let server_handle = tokio::spawn(async move { storage_node_serve("127.0.0.1", 3030, config).await.unwrap(); From b1595dece1acc38bb1a2e718ef7b214aa8ea2657 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Wed, 1 May 2024 15:46:07 +0000 Subject: [PATCH 4/4] make reader buffer configurable --- .../memdisk/data_store/disk.rs | 15 +++++---- .../src/cache/data_store_cache/memdisk/mod.rs | 8 ++++- .../src/cache/data_store_cache/sqlite/mod.rs | 15 ++++----- storage-node/src/common/config.rs | 8 ++++- storage-node/src/server.rs | 31 ++++++++++++++----- storage-node/src/storage_manager.rs | 17 ++++++++-- 6 files changed, 67 insertions(+), 27 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs index e373af3..4772230 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs @@ -15,9 +15,6 @@ use crate::{ storage_reader::StorageReaderStream, }; -/// TODO(lanlou): make them configurable. -/// MAX_DISK_READER_BUFFER_SIZE is 10MB. -const MAX_DISK_READER_BUFFER_SIZE: usize = 100 * 1024 * 1024; const DEFAULT_DISK_CHANNEL_BUFFER_SIZE: usize = 512; /// [`DiskStore`] stores the contents of remote objects on the local disk. @@ -25,6 +22,7 @@ pub struct DiskStore { disk_manager: DiskManager, /// The path to the directory where the data is stored on the disk. base_path: String, + max_disk_reader_buffer_size: usize, } impl Drop for DiskStore { @@ -37,7 +35,11 @@ impl Drop for DiskStore { } impl DiskStore { - pub fn new(disk_manager: DiskManager, base_path: String) -> Self { + pub fn new( + disk_manager: DiskManager, + base_path: String, + max_disk_reader_buffer_size: usize, + ) -> Self { let mut final_base_path = base_path; if !final_base_path.ends_with('/') { final_base_path += "/"; @@ -45,6 +47,7 @@ impl DiskStore { Self { disk_manager, base_path: final_base_path, + max_disk_reader_buffer_size, } } } @@ -63,8 +66,8 @@ impl DiskStore { { // TODO(lanlou): we later may consider the remaining space to decide the buffer size let mut buffer_size = self.disk_manager.file_size(key).await? as usize; - if buffer_size > MAX_DISK_READER_BUFFER_SIZE { - buffer_size = MAX_DISK_READER_BUFFER_SIZE; + if buffer_size > self.max_disk_reader_buffer_size { + buffer_size = self.max_disk_reader_buffer_size; } // FIXME: Shall we consider the situation where the data is not found? let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?; diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 06ec61d..a588603 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -106,9 +106,10 @@ impl> disk_base_path: String, mem_replacer: Option, mem_max_file_size: Option, + max_disk_reader_buffer_size: usize, ) -> Self { let disk_manager = DiskManager::default(); - let disk_store = DiskStore::new(disk_manager, disk_base_path); + let disk_store = DiskStore::new(disk_manager, disk_base_path, max_disk_reader_buffer_size); if mem_replacer.is_some() { debug_assert!(mem_max_file_size.is_some()); @@ -614,6 +615,7 @@ mod tests { disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(120000)), Some(10 * 1024), + 512, ); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; @@ -652,6 +654,7 @@ mod tests { disk_base_path.to_str().unwrap().to_string(), None, None, + 512, ); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; @@ -686,6 +689,7 @@ mod tests { disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(950)), Some(950), + 512, ); let bucket = "tests-text".to_string(); let keys = vec!["what-can-i-hold-you-with".to_string()]; @@ -725,6 +729,7 @@ mod tests { disk_base_path.to_str().unwrap().to_string(), None, None, + 512, )); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata2.parquet".to_string()]; @@ -760,6 +765,7 @@ mod tests { disk_base_path.to_str().unwrap().to_string(), Some(LruReplacer::new(120000)), Some(120000), + 512, )); let bucket = "tests-parquet".to_string(); let keys = vec!["userdata1.parquet".to_string()]; diff --git a/storage-node/src/cache/data_store_cache/sqlite/mod.rs b/storage-node/src/cache/data_store_cache/sqlite/mod.rs index cd94fa6..0a4562f 100644 --- a/storage-node/src/cache/data_store_cache/sqlite/mod.rs +++ b/storage-node/src/cache/data_store_cache/sqlite/mod.rs @@ -25,10 +25,8 @@ use super::{cache_key_from_request, DataStoreCache}; const SQLITE_CACHE_TABLE_NAME: &str = "parpulse_cache"; const SQLITE_CACHE_COLUMN_NAME: &str = "content"; -const SQLITE_BLOB_CHANNEL_CAPACITY: usize = 5; -const SQLITE_BLOB_READER_DEFAULT_BUFFER_SIZE: usize = 1024; - const SQLITE_MAX_BLOB_SIZE: usize = 512 * 1024 * 1024; // 512 MB +const SQLITE_BLOB_CHANNEL_CAPACITY: usize = 5; pub type SqliteStoreReplacerKey = String; pub struct SqliteStoreReplacerValue { @@ -62,27 +60,26 @@ pub struct SqliteStoreCache, sqlite_base_path: String, - buffer_size: usize, + reader_buffer_size: usize, } impl> SqliteStoreCache { pub fn new( replacer: R, sqlite_base_path: String, - buffer_size: Option, + reader_buffer_size: usize, ) -> ParpulseResult { let db = Connection::open(&sqlite_base_path)?; let create_table_stmt = format!( "CREATE TABLE IF NOT EXISTS {} ({} BLOB);", SQLITE_CACHE_TABLE_NAME, SQLITE_CACHE_COLUMN_NAME ); - let buffer_size = buffer_size.unwrap_or(SQLITE_BLOB_READER_DEFAULT_BUFFER_SIZE); db.execute_batch(&create_table_stmt)?; Ok(Self { replacer: Mutex::new(replacer), sqlite_base_path, - buffer_size, + reader_buffer_size, }) } } @@ -113,7 +110,7 @@ impl> Dat let (tx, rx) = channel(SQLITE_BLOB_CHANNEL_CAPACITY); let row_id = *replacer_value.as_value(); let sqlite_base_path = self.sqlite_base_path.clone(); - let buffer_size = self.buffer_size; + let buffer_size = self.reader_buffer_size; tokio::spawn(async move { let db = @@ -202,7 +199,7 @@ mod tests { let cache = SqliteStoreCache::new( replacer, sqlite_base_path.to_str().unwrap().to_string(), - Some(buffer_size), + buffer_size, ) .expect("create sqlite store cache failed"); diff --git a/storage-node/src/common/config.rs b/storage-node/src/common/config.rs index d23366f..fdbd3e2 100644 --- a/storage-node/src/common/config.rs +++ b/storage-node/src/common/config.rs @@ -42,6 +42,12 @@ pub struct ParpulseConfig { #[clap(long, default_value = None)] pub sqlite_cache_size: Option, - #[clap( long, default_value = None)] + #[clap(long, default_value = None)] pub cache_path: Option, + + #[clap(long, default_value = None)] + pub max_disk_reader_buffer_size: Option, + + #[clap(long, default_value = None)] + pub sqlite_blob_reader_buffer_size: Option, } diff --git a/storage-node/src/server.rs b/storage-node/src/server.rs index 008c9e8..99ca73b 100644 --- a/storage-node/src/server.rs +++ b/storage-node/src/server.rs @@ -22,6 +22,8 @@ const DEFAULT_DISK_CACHE_SIZE: usize = 1024 * 1024 * 1024; const DEFAULT_SQLITE_CACHE_SIZE: usize = 200 * 1024 * 1024; const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 10 * 1024 * 1024 + 1; const DEFAULT_LRU_K_VALUE: usize = 2; +const DEFAULT_MAX_DISK_READER_BUFFER_SIZE: usize = 100 * 1024 * 1024; +const DEFAULT_SQLITE_BLOB_READER_BUFFER_SIZE: usize = 1024; async fn route(storage_manager: Arc, ip_addr: &str, port: u16) { let route = warp::path!("file") @@ -108,10 +110,13 @@ pub async fn storage_node_serve( let mem_cache_file_size = config .mem_cache_file_size .unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE); + let max_disk_reader_buffer_size = config + .max_disk_reader_buffer_size + .unwrap_or(DEFAULT_MAX_DISK_READER_BUFFER_SIZE); let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); match config.cache_policy { ParpulseConfigCachePolicy::Lru => { - info!("starting storage node with {} mem-disk cache(s) and LRU cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size); + info!("starting storage node with {} mem-disk cache(s) and LRU cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}, max disk reader buffer size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size, max_disk_reader_buffer_size); let mut data_store_caches = Vec::new(); for i in 0..data_store_cache_num { let disk_replacer = LruReplacer::new(disk_cache_size); @@ -121,6 +126,7 @@ pub async fn storage_node_serve( i.to_string() + &cache_base_path, Some(mem_replacer), Some(mem_cache_file_size), + max_disk_reader_buffer_size, ); data_store_caches.push(data_store_cache); } @@ -128,7 +134,7 @@ pub async fn storage_node_serve( route(storage_manager, ip_addr, port).await; } ParpulseConfigCachePolicy::Lruk => { - info!("starting storage node with {} mem-disk cache(s) and LRU-K cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size); + info!("starting storage node with {} mem-disk cache(s) and LRU-K cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}, max disk reader buffer size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size, max_disk_reader_buffer_size); let mut data_store_caches = Vec::new(); let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); for i in 0..data_store_cache_num { @@ -139,6 +145,7 @@ pub async fn storage_node_serve( i.to_string() + &cache_base_path, Some(mem_replacer), Some(mem_cache_file_size), + max_disk_reader_buffer_size, ); data_store_caches.push(data_store_cache); } @@ -150,9 +157,12 @@ pub async fn storage_node_serve( ParpulseConfigDataStore::Disk => { let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE); let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()); + let max_disk_reader_buffer_size = config + .max_disk_reader_buffer_size + .unwrap_or(DEFAULT_MAX_DISK_READER_BUFFER_SIZE); match config.cache_policy { ParpulseConfigCachePolicy::Lru => { - info!("starting storage node with {} disk-only cache(s) and LRU cache policy, disk cache size: {}", data_store_cache_num, disk_cache_size); + info!("starting storage node with {} disk-only cache(s) and LRU cache policy, disk cache size: {}, max disk reader buffer size: {}", data_store_cache_num, disk_cache_size, max_disk_reader_buffer_size); let mut data_store_caches = Vec::new(); for i in 0..data_store_cache_num { let disk_replacer = LruReplacer::new(disk_cache_size); @@ -161,6 +171,7 @@ pub async fn storage_node_serve( i.to_string() + &cache_base_path, None, None, + max_disk_reader_buffer_size, ); data_store_caches.push(data_store_cache); } @@ -168,7 +179,7 @@ pub async fn storage_node_serve( route(storage_manager, ip_addr, port).await; } ParpulseConfigCachePolicy::Lruk => { - info!("starting storage node with {} disk-only cache(s) and LRU-K cache policy, disk cache size: {}", data_store_cache_num, disk_cache_size); + info!("starting storage node with {} disk-only cache(s) and LRU-K cache policy, disk cache size: {}, max disk reader buffer size: {}", data_store_cache_num, disk_cache_size, max_disk_reader_buffer_size); let mut data_store_caches = Vec::new(); let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); for i in 0..data_store_cache_num { @@ -178,6 +189,7 @@ pub async fn storage_node_serve( i.to_string() + &cache_base_path, None, None, + max_disk_reader_buffer_size, ); data_store_caches.push(data_store_cache); } @@ -190,16 +202,19 @@ pub async fn storage_node_serve( let sqlite_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()) + "sqlite.db3"; let sqlite_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_SQLITE_CACHE_SIZE); + let sqlite_blob_reader_buffer_size = config + .sqlite_blob_reader_buffer_size + .unwrap_or(DEFAULT_SQLITE_BLOB_READER_BUFFER_SIZE); match config.cache_policy { ParpulseConfigCachePolicy::Lru => { - info!("starting storage node with {} sqlite cache(s) and LRU cache policy, cache size: {}", data_store_cache_num, sqlite_cache_size); + info!("starting storage node with {} sqlite cache(s) and LRU cache policy, cache size: {}, blob reader buffer size: {}", data_store_cache_num, sqlite_cache_size, sqlite_blob_reader_buffer_size); let mut data_store_caches = Vec::new(); for i in 0..data_store_cache_num { let replacer = LruReplacer::new(sqlite_cache_size); let sqlite_data_cache = SqliteStoreCache::new( replacer, i.to_string() + &sqlite_base_path, - None, + sqlite_blob_reader_buffer_size, )?; data_store_caches.push(sqlite_data_cache); } @@ -207,7 +222,7 @@ pub async fn storage_node_serve( route(storage_manager, ip_addr, port).await; } ParpulseConfigCachePolicy::Lruk => { - info!("starting storage node with {} sqlite cache(s) and LRU-K cache policy, cache size: {}", data_store_cache_num, sqlite_cache_size); + info!("starting storage node with {} sqlite cache(s) and LRU-K cache policy, cache size: {}, blob reader buffer size: {}", data_store_cache_num, sqlite_cache_size, sqlite_blob_reader_buffer_size); let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE); let mut data_store_caches = Vec::new(); for i in 0..data_store_cache_num { @@ -215,7 +230,7 @@ pub async fn storage_node_serve( let sqlite_data_cache = SqliteStoreCache::new( replacer, i.to_string() + &sqlite_base_path, - None, + sqlite_blob_reader_buffer_size, )?; data_store_caches.push(sqlite_data_cache); } diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 49ec51e..3699e10 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -119,8 +119,13 @@ mod tests { let dir = tmp.path().to_owned(); let cache_base_path = dir.join("test-storage-manager"); - let data_store_cache = - MemDiskStoreCache::new(cache, cache_base_path.display().to_string(), None, None); + let data_store_cache = MemDiskStoreCache::new( + cache, + cache_base_path.display().to_string(), + None, + None, + 100 * 1024 * 1024, + ); let storage_manager = StorageManagerImpl::new(vec![data_store_cache]); let bucket = "tests-parquet".to_string(); @@ -176,6 +181,7 @@ mod tests { disk_cache_base_path.display().to_string(), Some(mem_cache), Some(950), + 100 * 1024 * 1024, ); let storage_manager = StorageManagerImpl::new(vec![data_store_cache]); @@ -235,6 +241,7 @@ mod tests { disk_cache_base_path.display().to_string(), Some(mem_cache), Some(120000), + 100 * 1024 * 1024, ); let storage_manager = StorageManagerImpl::new(vec![data_store_cache]); @@ -286,6 +293,7 @@ mod tests { disk_cache_base_path.display().to_string(), None, None, + 100 * 1024 * 1024, ); let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); @@ -346,6 +354,7 @@ mod tests { disk_cache_base_path.display().to_string(), Some(mem_cache), Some(120000), + 100 * 1024 * 1024, ); let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); @@ -474,6 +483,7 @@ mod tests { disk_cache_base_path.display().to_string(), Some(mem_cache), Some(120000), + 100 * 1024 * 1024, ); data_store_caches.push(data_store_cache); } @@ -524,6 +534,7 @@ mod tests { disk_cache_base_path.display().to_string(), Some(mem_cache), Some(120000), + 100 * 1024 * 1024, ); data_store_caches.push(data_store_cache); } @@ -613,6 +624,7 @@ mod tests { disk_cache_base_path.display().to_string(), None, None, + 100 * 1024 * 1024, ); let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache])); @@ -648,6 +660,7 @@ mod tests { disk_cache_base_path.display().to_string(), Some(mem_cache), Some(120000), + 100 * 1024 * 1024, ); let storage_manager = Arc::new(StorageManagerImpl::new(vec![data_store_cache]));