Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

feat: introduce storage node configuration #69

Merged
Merged 5 commits
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions storage-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ rusqlite = { version = "0.31", features = ["blob"] }
log = "0.4"
env_logger = "0.11"
crc32fast = "1.4.0"
clap = { version = "4.5", features = ["derive"] }
serde = { version = "1", features = ["derive"] }

[dev-dependencies]
serial_test = "3.1"
16 changes: 10 additions & 6 deletions storage-node/src/bin/storage_node.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use clap::Parser;
use log::info;
use storage_node::server::storage_node_serve;

// TODO: Add config here.
use storage_node::{common::config::ParpulseConfig, server::storage_node_serve};

#[tokio::main]
async fn main() {
let _ = env_logger::builder()
// Init log.
if let Err(e) = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.try_init();
.try_init()
{
println!("Failed to init logger: {:?}", e);
unw9527 marked this conversation as resolved.
Show resolved Hide resolved
}
info!("starting storage node server...");
storage_node_serve("0.0.0.0", 3030).await.unwrap();
let config = ParpulseConfig::parse();
storage_node_serve("0.0.0.0", 3030, config).await.unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ use crate::{
storage_reader::StorageReaderStream,
};

/// TODO(lanlou): make them configurable.
/// MAX_DISK_READER_BUFFER_SIZE is 10MB.
const MAX_DISK_READER_BUFFER_SIZE: usize = 100 * 1024 * 1024;
const DEFAULT_DISK_CHANNEL_BUFFER_SIZE: usize = 512;

/// [`DiskStore`] stores the contents of remote objects on the local disk.
pub struct DiskStore {
disk_manager: DiskManager,
/// The path to the directory where the data is stored on the disk.
base_path: String,
max_disk_reader_buffer_size: usize,
}

impl Drop for DiskStore {
Expand All @@ -37,14 +35,19 @@ impl Drop for DiskStore {
}

impl DiskStore {
pub fn new(disk_manager: DiskManager, base_path: String) -> Self {
pub fn new(
disk_manager: DiskManager,
base_path: String,
max_disk_reader_buffer_size: usize,
) -> Self {
let mut final_base_path = base_path;
if !final_base_path.ends_with('/') {
final_base_path += "/";
}
Self {
disk_manager,
base_path: final_base_path,
max_disk_reader_buffer_size,
}
}
}
Expand All @@ -63,8 +66,8 @@ impl DiskStore {
{
// TODO(lanlou): we later may consider the remaining space to decide the buffer size
let mut buffer_size = self.disk_manager.file_size(key).await? as usize;
if buffer_size > MAX_DISK_READER_BUFFER_SIZE {
buffer_size = MAX_DISK_READER_BUFFER_SIZE;
if buffer_size > self.max_disk_reader_buffer_size {
buffer_size = self.max_disk_reader_buffer_size;
}
// FIXME: Shall we consider the situation where the data is not found?
let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?;
Expand Down
19 changes: 10 additions & 9 deletions storage-node/src/cache/data_store_cache/memdisk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ use self::data_store::{disk::DiskStore, memory::MemStore};

use super::cache_key_from_request;

/// The default maximum single file size for the memory cache.
/// If the file size exceeds this value, the file will be stored in the disk cache.
/// TODO(lanlou): make this value configurable.
pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512;

/// [`MemDiskStoreReplacerKey`] is a path to the remote object store.
pub type MemDiskStoreReplacerKey = String;

Expand Down Expand Up @@ -111,13 +106,14 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>>
disk_base_path: String,
mem_replacer: Option<R>,
mem_max_file_size: Option<usize>,
max_disk_reader_buffer_size: usize,
) -> Self {
let disk_manager = DiskManager::default();
let disk_store = DiskStore::new(disk_manager, disk_base_path);
let disk_store = DiskStore::new(disk_manager, disk_base_path, max_disk_reader_buffer_size);

if mem_replacer.is_some() {
let mut mem_max_file_size =
mem_max_file_size.unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE);
debug_assert!(mem_max_file_size.is_some());
let mut mem_max_file_size = mem_max_file_size.unwrap();
let replacer_max_capacity = mem_replacer.as_ref().unwrap().max_capacity();
if mem_max_file_size > replacer_max_capacity {
warn!("The maximum file size > replacer's max capacity, so we set maximum file size = 1/5 of the maximum capacity.");
Expand Down Expand Up @@ -618,7 +614,8 @@ mod tests {
LruReplacer::new(1024 * 512),
disk_base_path.to_str().unwrap().to_string(),
Some(LruReplacer::new(120000)),
None,
Some(10 * 1024),
512,
);
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
Expand Down Expand Up @@ -657,6 +654,7 @@ mod tests {
disk_base_path.to_str().unwrap().to_string(),
None,
None,
512,
);
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
Expand Down Expand Up @@ -691,6 +689,7 @@ mod tests {
disk_base_path.to_str().unwrap().to_string(),
Some(LruReplacer::new(950)),
Some(950),
512,
);
let bucket = "tests-text".to_string();
let keys = vec!["what-can-i-hold-you-with".to_string()];
Expand Down Expand Up @@ -730,6 +729,7 @@ mod tests {
disk_base_path.to_str().unwrap().to_string(),
None,
None,
512,
));
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata2.parquet".to_string()];
Expand Down Expand Up @@ -765,6 +765,7 @@ mod tests {
disk_base_path.to_str().unwrap().to_string(),
Some(LruReplacer::new(120000)),
Some(120000),
512,
));
let bucket = "tests-parquet".to_string();
let keys = vec!["userdata1.parquet".to_string()];
Expand Down
15 changes: 6 additions & 9 deletions storage-node/src/cache/data_store_cache/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ use super::{cache_key_from_request, DataStoreCache};

const SQLITE_CACHE_TABLE_NAME: &str = "parpulse_cache";
const SQLITE_CACHE_COLUMN_NAME: &str = "content";
const SQLITE_BLOB_CHANNEL_CAPACITY: usize = 5;
const SQLITE_BLOB_READER_DEFAULT_BUFFER_SIZE: usize = 1024;

const SQLITE_MAX_BLOB_SIZE: usize = 512 * 1024 * 1024; // 512 MB
const SQLITE_BLOB_CHANNEL_CAPACITY: usize = 5;

pub type SqliteStoreReplacerKey = String;
pub struct SqliteStoreReplacerValue {
Expand Down Expand Up @@ -62,27 +60,26 @@ pub struct SqliteStoreCache<R: DataStoreReplacer<SqliteStoreReplacerKey, SqliteS
{
replacer: Mutex<R>,
sqlite_base_path: String,
buffer_size: usize,
reader_buffer_size: usize,
}

impl<R: DataStoreReplacer<SqliteStoreReplacerKey, SqliteStoreReplacerValue>> SqliteStoreCache<R> {
    /// Opens (or creates) the SQLite database at `sqlite_base_path` and
    /// ensures the cache table exists.
    ///
    /// `reader_buffer_size` is the buffer size used when streaming blob
    /// contents back to readers.
    ///
    /// # Errors
    /// Returns an error if the database cannot be opened or the
    /// table-creation statement fails.
    pub fn new(
        replacer: R,
        sqlite_base_path: String,
        reader_buffer_size: usize,
    ) -> ParpulseResult<Self> {
        let db = Connection::open(&sqlite_base_path)?;
        let create_table_stmt = format!(
            "CREATE TABLE IF NOT EXISTS {} ({} BLOB);",
            SQLITE_CACHE_TABLE_NAME, SQLITE_CACHE_COLUMN_NAME
        );
        db.execute_batch(&create_table_stmt)?;

        Ok(Self {
            replacer: Mutex::new(replacer),
            sqlite_base_path,
            reader_buffer_size,
        })
    }
}
Expand Down Expand Up @@ -113,7 +110,7 @@ impl<R: DataStoreReplacer<SqliteStoreReplacerKey, SqliteStoreReplacerValue>> Dat
let (tx, rx) = channel(SQLITE_BLOB_CHANNEL_CAPACITY);
let row_id = *replacer_value.as_value();
let sqlite_base_path = self.sqlite_base_path.clone();
let buffer_size = self.buffer_size;
let buffer_size = self.reader_buffer_size;

tokio::spawn(async move {
let db =
Expand Down Expand Up @@ -202,7 +199,7 @@ mod tests {
let cache = SqliteStoreCache::new(
replacer,
sqlite_base_path.to_str().unwrap().to_string(),
Some(buffer_size),
buffer_size,
)
.expect("create sqlite store cache failed");

Expand Down
53 changes: 53 additions & 0 deletions storage-node/src/common/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use clap::Parser;
use serde::Serialize;

// Backend used to persist cached data on the storage node.
// (Plain `//` comments are used so clap's derived `--help` output is
// unchanged; `///` doc comments would become help text.)
#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)]
pub enum ParpulseConfigDataStore {
    // Hybrid memory + disk cache (the default).
    #[default]
    Memdisk,
    // Disk-only cache.
    Disk,
    // SQLite-backed blob cache.
    Sqlite,
}

// Cache replacement policy for the storage node's data-store cache.
// (Plain `//` comments keep clap's derived `--help` output unchanged.)
#[derive(clap::ValueEnum, Clone, Default, Debug, Serialize)]
pub enum ParpulseConfigCachePolicy {
    // Least-recently-used eviction (the default).
    #[default]
    Lru,
    // LRU-K eviction; the `k` value comes from `--cache-lru-k`.
    Lruk,
}

// Command-line configuration for the Parpulse storage node.
//
// Every `Option` field defaults to `None`, which means "use the server's
// built-in default". Units for the size fields are not stated here —
// presumably bytes; confirm against the server's use of each value.
// (Plain `//` comments keep clap's derived `--help` output unchanged.)
#[derive(Parser, Default)]
pub struct ParpulseConfig {
    // Cache replacement policy (see `ParpulseConfigCachePolicy`).
    #[clap(long, default_value_t, value_enum)]
    pub cache_policy: ParpulseConfigCachePolicy,

    // The `k` parameter for the LRU-K policy; ignored for plain LRU.
    #[clap(long, default_value = None)]
    pub cache_lru_k: Option<usize>,

    // Data store backend (see `ParpulseConfigDataStore`).
    #[clap(long, default_value_t, value_enum)]
    pub data_store: ParpulseConfigDataStore,

    // Number of data-store caches to run.
    #[clap(long, default_value = None)]
    pub data_store_cache_num: Option<usize>,

    // Capacity of the in-memory cache.
    #[clap(long, default_value = None)]
    pub mem_cache_size: Option<usize>,

    // Maximum size of a single file admitted to the memory cache; larger
    // files go to the disk cache.
    #[clap(long, default_value = None)]
    pub mem_cache_file_size: Option<usize>,

    // Capacity of the disk cache.
    #[clap(long, default_value = None)]
    pub disk_cache_size: Option<usize>,

    // Capacity of the SQLite cache.
    #[clap(long, default_value = None)]
    pub sqlite_cache_size: Option<usize>,

    // Directory where cached data is stored.
    #[clap(long, default_value = None)]
    pub cache_path: Option<String>,

    // Upper bound on the disk reader's buffer size.
    #[clap(long, default_value = None)]
    pub max_disk_reader_buffer_size: Option<usize>,

    // Buffer size used when reading blobs out of the SQLite cache.
    #[clap(long, default_value = None)]
    pub sqlite_blob_reader_buffer_size: Option<usize>,
}
1 change: 1 addition & 0 deletions storage-node/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/// Storage-node configuration (CLI flags parsed via clap).
pub mod config;
/// Hashing utilities shared across the storage node.
pub mod hash;
Loading
Loading