Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

feat: introduce storage node configuration #69

Merged
merged 5 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions storage-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ rusqlite = { version = "0.31", features = ["blob"] }
log = "0.4"
env_logger = "0.11"
crc32fast = "1.4.0"
clap = { version = "4.5", features = ["derive"] }
serde = { version = "1", features = ["derive"] }

[dev-dependencies]
serial_test = "3.1"
16 changes: 10 additions & 6 deletions storage-node/src/bin/storage_node.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use clap::Parser;
use log::info;
use storage_node::server::storage_node_serve;

// TODO: Add config here.
use storage_node::{common::config::ParpulseConfig, server::storage_node_serve};

#[tokio::main]
async fn main() {
let _ = env_logger::builder()
// Init log.
if let Err(e) = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.try_init();
.try_init()
{
println!("Failed to init logger: {:?}", e);
unw9527 marked this conversation as resolved.
Show resolved Hide resolved
}
info!("starting storage node server...");
storage_node_serve("0.0.0.0", 3030).await.unwrap();
let config = ParpulseConfig::parse();
storage_node_serve("0.0.0.0", 3030, config).await.unwrap();
}
11 changes: 3 additions & 8 deletions storage-node/src/cache/data_store_cache/memdisk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ use self::data_store::{disk::DiskStore, memory::MemStore};

use super::cache_key_from_request;

/// The default maximum single file size for the memory cache.
/// If the file size exceeds this value, the file will be stored in the disk cache.
/// TODO(lanlou): make this value configurable.
pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512;

/// [`MemDiskStoreReplacerKey`] is a path to the remote object store.
pub type MemDiskStoreReplacerKey = String;

Expand Down Expand Up @@ -116,8 +111,8 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>>
let disk_store = DiskStore::new(disk_manager, disk_base_path);

if mem_replacer.is_some() {
let mut mem_max_file_size =
mem_max_file_size.unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE);
debug_assert!(mem_max_file_size.is_some());
let mut mem_max_file_size = mem_max_file_size.unwrap();
let replacer_max_capacity = mem_replacer.as_ref().unwrap().max_capacity();
if mem_max_file_size > replacer_max_capacity {
warn!("The maximum file size > replacer's max capacity, so we set maximum file size = 1/5 of the maximum capacity.");
Expand Down Expand Up @@ -618,7 +613,7 @@ mod tests {
LruReplacer::new(1024 * 512),
disk_base_path.to_str().unwrap().to_string(),
Some(LruReplacer::new(120000)),
None,
Some(10 * 1024),
);
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
Expand Down
47 changes: 47 additions & 0 deletions storage-node/src/common/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use clap::Parser;
use serde::Serialize;

#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)]
pub enum ParpulseConfigDataStore {
#[default]
Memdisk,
Disk,
Sqlite,
}

#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)]
pub enum ParpulseConfigCachePolicy {
#[default]
Lru,
Lruk,
}

#[derive(Parser, Default)]
pub struct ParpulseConfig {
#[clap(long, default_value_t, value_enum)]
pub cache_policy: ParpulseConfigCachePolicy,

#[clap(long, default_value = None)]
pub cache_lru_k: Option<usize>,

#[clap(long, default_value_t, value_enum)]
pub data_store: ParpulseConfigDataStore,

#[clap( long, default_value = None)]
pub data_store_cache_num: Option<usize>,

#[clap(long, default_value = None)]
pub mem_cache_size: Option<usize>,

#[clap(long, default_value = None)]
pub mem_cache_file_size: Option<usize>,

#[clap(long, default_value = None)]
pub disk_cache_size: Option<usize>,

#[clap(long, default_value = None)]
pub sqlite_cache_size: Option<usize>,

#[clap( long, default_value = None)]
pub cache_path: Option<String>,
}
1 change: 1 addition & 0 deletions storage-node/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod config;
pub mod hash;
184 changes: 154 additions & 30 deletions storage-node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,24 @@ use tokio_stream::wrappers::ReceiverStream;
use warp::{Filter, Rejection};

use crate::{
cache::{data_store_cache::memdisk::MemDiskStoreCache, replacer::lru::LruReplacer},
cache::{
data_store_cache::{memdisk::MemDiskStoreCache, sqlite::SqliteStoreCache},
replacer::{lru::LruReplacer, lru_k::LruKReplacer},
},
common::config::{ParpulseConfig, ParpulseConfigCachePolicy, ParpulseConfigDataStore},
error::ParpulseResult,
storage_manager::StorageManager,
storage_manager::{StorageManager, StorageManagerImpl},
};

const CACHE_BASE_PATH: &str = "cache/";
const DATA_STORE_CACHE_NUMBER: usize = 3;

pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()> {
// Should at least be able to store one 100MB file in the cache.
// TODO: Read the type of the cache from config.
let dummy_size_per_disk_cache = 1024 * 1024 * 1024;
let dummy_size_per_mem_cache = 200 * 1024 * 1024;
let dummy_mem_max_file_cache = 10 * 1024 * 1024 + 1;

let mut data_store_caches = Vec::new();

for i in 0..DATA_STORE_CACHE_NUMBER {
let disk_replacer = LruReplacer::new(dummy_size_per_disk_cache);
let mem_replace = LruReplacer::new(dummy_size_per_mem_cache);
let data_store_cache = MemDiskStoreCache::new(
disk_replacer,
i.to_string() + CACHE_BASE_PATH,
Some(mem_replace),
Some(dummy_mem_max_file_cache),
);
data_store_caches.push(data_store_cache);
}

// TODO: try to use more fine-grained lock instead of locking the whole storage_manager
let storage_manager = Arc::new(StorageManager::new(data_store_caches));
const CACHE_BASE_PATH: &str = "parpulse-cache";
const DEFAULT_DATA_STORE_CACHE_NUM: usize = 3;
const DEFAULT_MEM_CACHE_SIZE: usize = 100 * 1024;
const DEFAULT_DISK_CACHE_SIZE: usize = 1024 * 1024 * 1024;
const DEFAULT_SQLITE_CACHE_SIZE: usize = 200 * 1024 * 1024;
const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 10 * 1024 * 1024 + 1;
const DEFAULT_LRU_K_VALUE: usize = 2;

async fn route(storage_manager: Arc<impl StorageManager + 'static>, ip_addr: &str, port: u16) {
let route = warp::path!("file")
.and(warp::path::end())
.and(warp::query::<S3Request>())
Expand All @@ -62,6 +48,7 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()>
} else {
RequestParams::S3((bucket, vec![keys]))
};

let result = storage_manager.get_data(request).await;
match result {
Ok(data_rx) => {
Expand Down Expand Up @@ -104,12 +91,147 @@ pub async fn storage_node_serve(ip_addr: &str, port: u16) -> ParpulseResult<()>
let routes = route.or(heartbeat).or(catch_all);
let ip_addr: IpAddr = ip_addr.parse().unwrap();
warp::serve(routes).run((ip_addr, port)).await;
}

pub async fn storage_node_serve(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to explicitly match xxx to LRU or LRU-K for every type of data store...?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concrete types have to match when we initialize a variable. We can't assign a variable as LRU or LRUK at the same time. So we have to do this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I mean if it is possible to modularize repetitive code.

ip_addr: &str,
port: u16,
config: ParpulseConfig,
) -> ParpulseResult<()> {
let data_store_cache_num = config
.data_store_cache_num
.unwrap_or(DEFAULT_DATA_STORE_CACHE_NUM);
match config.data_store {
ParpulseConfigDataStore::Memdisk => {
let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE);
let mem_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_MEM_CACHE_SIZE);
let mem_cache_file_size = config
.mem_cache_file_size
.unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE);
let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string());
match config.cache_policy {
ParpulseConfigCachePolicy::Lru => {
info!("starting storage node with {} mem-disk cache(s) and LRU cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size);
let mut data_store_caches = Vec::new();
for i in 0..data_store_cache_num {
let disk_replacer = LruReplacer::new(disk_cache_size);
let mem_replacer = LruReplacer::new(mem_cache_size);
let data_store_cache = MemDiskStoreCache::new(
disk_replacer,
i.to_string() + &cache_base_path,
Some(mem_replacer),
Some(mem_cache_file_size),
);
data_store_caches.push(data_store_cache);
}
let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches));
route(storage_manager, ip_addr, port).await;
}
ParpulseConfigCachePolicy::Lruk => {
info!("starting storage node with {} mem-disk cache(s) and LRU-K cache policy, disk cache size: {}, mem cache size: {}, mem cache file size: {}", data_store_cache_num, disk_cache_size, mem_cache_size, mem_cache_file_size);
let mut data_store_caches = Vec::new();
let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE);
for i in 0..data_store_cache_num {
let disk_replacer = LruKReplacer::new(disk_cache_size, k);
let mem_replacer = LruKReplacer::new(mem_cache_size, k);
let data_store_cache = MemDiskStoreCache::new(
disk_replacer,
i.to_string() + &cache_base_path,
Some(mem_replacer),
Some(mem_cache_file_size),
);
data_store_caches.push(data_store_cache);
}
let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches));
route(storage_manager, ip_addr, port).await;
}
};
}
ParpulseConfigDataStore::Disk => {
let disk_cache_size = config.disk_cache_size.unwrap_or(DEFAULT_DISK_CACHE_SIZE);
let cache_base_path = config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string());
match config.cache_policy {
ParpulseConfigCachePolicy::Lru => {
info!("starting storage node with {} disk-only cache(s) and LRU cache policy, disk cache size: {}", data_store_cache_num, disk_cache_size);
let mut data_store_caches = Vec::new();
for i in 0..data_store_cache_num {
let disk_replacer = LruReplacer::new(disk_cache_size);
let data_store_cache = MemDiskStoreCache::new(
disk_replacer,
i.to_string() + &cache_base_path,
None,
None,
);
data_store_caches.push(data_store_cache);
}
let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches));
route(storage_manager, ip_addr, port).await;
}
ParpulseConfigCachePolicy::Lruk => {
info!("starting storage node with {} disk-only cache(s) and LRU-K cache policy, disk cache size: {}", data_store_cache_num, disk_cache_size);
let mut data_store_caches = Vec::new();
let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE);
for i in 0..data_store_cache_num {
let disk_replacer = LruKReplacer::new(disk_cache_size, k);
let data_store_cache = MemDiskStoreCache::new(
disk_replacer,
i.to_string() + &cache_base_path,
None,
None,
);
data_store_caches.push(data_store_cache);
}
let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches));
route(storage_manager, ip_addr, port).await;
}
}
}
ParpulseConfigDataStore::Sqlite => {
let sqlite_base_path =
config.cache_path.unwrap_or(CACHE_BASE_PATH.to_string()) + "sqlite.db3";
let sqlite_cache_size = config.mem_cache_size.unwrap_or(DEFAULT_SQLITE_CACHE_SIZE);
match config.cache_policy {
ParpulseConfigCachePolicy::Lru => {
info!("starting storage node with {} sqlite cache(s) and LRU cache policy, cache size: {}", data_store_cache_num, sqlite_cache_size);
let mut data_store_caches = Vec::new();
for i in 0..data_store_cache_num {
let replacer = LruReplacer::new(sqlite_cache_size);
let sqlite_data_cache = SqliteStoreCache::new(
replacer,
i.to_string() + &sqlite_base_path,
None,
)?;
data_store_caches.push(sqlite_data_cache);
}
let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches));
route(storage_manager, ip_addr, port).await;
}
ParpulseConfigCachePolicy::Lruk => {
info!("starting storage node with {} sqlite cache(s) and LRU-K cache policy, cache size: {}", data_store_cache_num, sqlite_cache_size);
let k = config.cache_lru_k.unwrap_or(DEFAULT_LRU_K_VALUE);
let mut data_store_caches = Vec::new();
for i in 0..data_store_cache_num {
let replacer = LruKReplacer::new(sqlite_cache_size, k);
let sqlite_data_cache = SqliteStoreCache::new(
replacer,
i.to_string() + &sqlite_base_path,
None,
)?;
data_store_caches.push(sqlite_data_cache);
}
let storage_manager = Arc::new(StorageManagerImpl::new(data_store_caches));
route(storage_manager, ip_addr, port).await;
}
}
}
};

Ok(())
}

#[cfg(test)]
mod tests {

use super::*;
use reqwest::Client;
use std::fs;
Expand All @@ -118,12 +240,14 @@ mod tests {

/// WARNING: Put userdata1.parquet in the storage-node/tests/parquet directory before running this test.
#[tokio::test]
#[allow(clippy::field_reassign_with_default)]
async fn test_server() {
let original_file_path = "tests/parquet/userdata1.parquet";

let mut config = ParpulseConfig::default();
config.data_store_cache_num = Some(6);
// Start the server
let server_handle = tokio::spawn(async move {
storage_node_serve("127.0.0.1", 3030).await.unwrap();
storage_node_serve("127.0.0.1", 3030, config).await.unwrap();
});

// Give the server some time to start
Expand Down
Loading
Loading