diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs
index c9a7189..adae41b 100644
--- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs
+++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs
@@ -56,6 +56,7 @@ impl DiskStore {
         &self,
         key: &str,
         disk_replacer: Arc<Mutex<R>>,
+        key_replacer: String,
     ) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>>
     where
        R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue> + 'static,
@@ -68,7 +69,6 @@ impl DiskStore {
         // FIXME: Shall we consider the situation where the data is not found?
         let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?;
         let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_DISK_CHANNEL_BUFFER_SIZE);
-        let key_str = key.to_string().clone();
         tokio::spawn(async move {
             loop {
                 match disk_stream.next().await {
@@ -78,11 +78,13 @@ impl DiskStore {
                         .unwrap();
                     }
                     Some(Err(e)) => tx.send(Err(e)).await.unwrap(),
-                    None => break,
+                    None => {
+                        // TODO(lanlou): when second read, so there is no need to unpin, how to improve?
+                        disk_replacer.lock().await.unpin(&key_replacer);
+                        break;
+                    }
                 }
             }
-            // TODO(lanlou): when second read, so there is no need to unpin, how to improve?
-            disk_replacer.lock().await.unpin(&key_str);
         });
         Ok(Some(rx))
     }
diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs
index 0d2eb96..7d8b5b1 100644
--- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs
+++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs
@@ -3,7 +3,7 @@ pub mod data_store;
 use std::{collections::HashMap, sync::Arc};
 
 use futures::stream::StreamExt;
-use log::warn;
+use log::{debug, warn};
 use tokio::sync::{Mutex, Notify, RwLock};
 
 use crate::{
@@ -31,9 +31,7 @@ pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512;
 pub type MemDiskStoreReplacerKey = String;
 
 /// Status -> completed/incompleted; usize -> file_size
-/// Notify -> notify; usize -> notify_waiter_count
-/// FIXME: make (Notify, Mutex<usize>) a struct to make it more readable.
-type StatusKeyHashMap = HashMap<String, ((Status, usize), Arc<(Notify, Mutex<usize>)>)>;
+type StatusKeyHashMap = HashMap<String, ((Status, usize), Arc<MemDiskStoreNotify>)>;
 
 pub struct MemDiskStoreReplacerValue {
     /// The path to the data store. For mem store, it should be data's s3 path. For disk
@@ -65,10 +63,25 @@ impl ReplacerValue for MemDiskStoreReplacerValue {
     }
 }
 
-#[derive(Clone)]
+struct MemDiskStoreNotify {
+    inner: Notify,
+    waiter_count: Mutex<usize>,
+}
+
+impl MemDiskStoreNotify {
+    fn new() -> Self {
+        MemDiskStoreNotify {
+            inner: Notify::new(),
+            waiter_count: Mutex::new(0),
+        }
+    }
+}
+
+#[derive(Clone, PartialEq, Debug)]
 enum Status {
     Incompleted,
-    Completed,
+    MemCompleted,
+    DiskCompleted,
 }
 
 pub struct MemDiskStoreCache<
@@ -79,9 +92,6 @@ pub struct MemDiskStoreCache<
     /// Cache_key = S3_PATH; Cache_value = (CACHE_BASE_PATH + S3_PATH, size)
     disk_replacer: Arc<Mutex<D>>,
     mem_replacer: Option<Arc<Mutex<M>>>,
-    // MemDiskStoreReplacerKey -> remote_location
-    // status: 0 -> incompleted; 1 -> completed; 2 -> failed
-    // TODO(lanlou): we should clean this hashmap.
     status_of_keys: RwLock<StatusKeyHashMap>,
 }
 
@@ -130,6 +140,74 @@ impl>
             }
         }
     }
+
+    async fn notify_waiters_error(&self, key: &str) {
+        let notify;
+        {
+            let mut status_of_keys = self.status_of_keys.write().await;
+            notify = status_of_keys.remove(key).unwrap().1;
+        }
+        notify.inner.notify_waiters();
+        debug!("Notify waiters error for key {}", key);
+    }
+
+    async fn notify_waiters_mem(&self, key: &String, bytes_mem_written: usize) {
+        let notify;
+        {
+            let mut status_of_keys = self.status_of_keys.write().await;
+            let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap();
+            *status = Status::MemCompleted;
+            debug!(
+                "Notify waiters mem for key {}: set status to MemCompleted",
+                key,
+            );
+            *size = bytes_mem_written;
+            let notify_waiter = notify_ref.waiter_count.lock().await;
+            if *notify_waiter > 0 {
+                self.mem_replacer
+                    .as_ref()
+                    .unwrap()
+                    .lock()
+                    .await
+                    .pin(key, *notify_waiter);
+                debug!(
+                    "Notify waiters mem for key {}: pin with waiter count {}",
+                    key, *notify_waiter
+                );
+            } else {
+                debug!("Notify waiters mem for key {}: no waiter", key);
+            }
+            notify = notify_ref.clone();
+        }
+        // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock
+        notify.inner.notify_waiters();
+    }
+
+    async fn notify_waiters_disk(&self, key: &String, bytes_disk_written: usize) {
+        let notify;
+        {
+            let mut status_of_keys = self.status_of_keys.write().await;
+            let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap();
+            *status = Status::DiskCompleted;
+            debug!(
+                "Notify waiters disk for key {}: set status to DiskCompleted",
+                key,
+            );
+            *size = bytes_disk_written;
+            let notify_waiter = notify_ref.waiter_count.lock().await;
+            if *notify_waiter > 0 {
+                self.disk_replacer.lock().await.pin(key, *notify_waiter);
+                debug!(
+                    "Notify waiters disk for key {}: pin with waiter count {}",
+                    key, *notify_waiter
+                );
+            } else {
+                debug!("Notify waiters disk for key {}: no waiter", key);
+            }
+            notify = notify_ref.clone();
+        }
+        notify.inner.notify_waiters();
+    }
 }
 
 #[async_trait]
@@ -156,6 +234,10 @@ impl> D
                 .read_data(&data_store_key, self.mem_replacer.as_ref().unwrap().clone())
             {
                 Ok(Some(rx)) => {
+                    debug!(
+                        "MemDiskStore get_data_from_cache: directly read data for {} in memory",
+                        remote_location
+                    );
                     return Ok(Some(rx));
                 }
                 Ok(None) => {
@@ -180,9 +262,17 @@
         }
         if let Some(data) = self
             .disk_store
-            .read_data(&data_store_key, self.disk_replacer.clone())
+            .read_data(
+                &data_store_key,
+                self.disk_replacer.clone(),
+                remote_location.clone(),
+            )
             .await?
         {
+            debug!(
+                "MemDiskStore get_data_from_cache: directly read data for {} from disk",
+                remote_location
+            );
             Ok(Some(data))
         } else {
             return Err(ParpulseError::Internal(
@@ -197,55 +287,85 @@ impl> D
         _data_size: Option<usize>,
         mut data_stream: StorageReaderStream,
     ) -> ParpulseResult<usize> {
+        // If in_progress of remote_location thread fails, it will clean the data from this hash map.
         {
-            // If in_progress of remote_location thread fails, it will clean the data from this hash map.
-            let mut existed = false;
-            loop {
-                let status_of_keys = self.status_of_keys.read().await;
-                if let Some(((status, size), notify_ref)) = status_of_keys.get(&remote_location) {
-                    let notify;
-                    existed = true;
-                    match status {
-                        Status::Incompleted => {
-                            notify = notify_ref.clone();
-                            *notify_ref.1.lock().await += 1;
-                            // The Notified future is guaranteed to receive wakeups from notify_waiters()
-                            // as soon as it has been created, even if it has not yet been polled.
-                            let notified = notify.0.notified();
-                            drop(status_of_keys);
-                            notified.await;
+            let status_of_keys = self.status_of_keys.read().await;
+            if let Some(((status, size), notify_ref)) = status_of_keys.get(&remote_location) {
+                let notify;
+
+                match status {
+                    Status::Incompleted => {
+                        debug!(
+                            "MemDiskStore put_data_to_cache: find incompleted status for {}",
+                            remote_location
+                        );
+                        notify = notify_ref.clone();
+                        *notify_ref.waiter_count.lock().await += 1;
+                        // The Notified future is guaranteed to receive wakeups from notify_waiters()
+                        // as soon as it has been created, even if it has not yet been polled.
+                        let notified = notify.inner.notified();
+                        drop(status_of_keys);
+                        notified.await;
+                        if let Some(((status, size), _)) =
+                            self.status_of_keys.read().await.get(&remote_location)
+                        {
+                            assert!(
+                                *status == Status::MemCompleted || *status == Status::DiskCompleted
+                            );
+                            return Ok(*size);
+                        } else {
+                            return Err(ParpulseError::Internal(
+                                "Put_data_to_cache fails".to_string(),
+                            ));
                         }
-                        Status::Completed => {
+                    }
+                    Status::MemCompleted => {
+                        // This code only applies to: a thread tries to `put_data_to_cache` but the data is already in cache
+                        // and not be evicted. It is not wait and notified situation.
+                        let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await;
+                        if mem_replacer.peek(&remote_location).is_some() {
+                            debug!("MemDiskStore put_data_to_cache: find mem-completed status for {}, remote location already in mem replacer, pin + 1", remote_location);
+                            mem_replacer.pin(&remote_location, 1);
                             return Ok(*size);
+                        } else {
+                            debug!("MemDiskStore put_data_to_cache: find mem-completed status for {}, no remote location in mem replacer", remote_location);
                         }
+                        // If mem_replacer has no data, then update status_of_keys
+                    }
+                    Status::DiskCompleted => {
+                        // This code only applies to: a thread tries to `put_data_to_cache` but the data is already in cache
+                        // and not be evicted. It is not wait and notified situation.
+                        let mut disk_replacer = self.disk_replacer.lock().await;
+                        if disk_replacer.peek(&remote_location).is_some() {
+                            debug!("MemDiskStore put_data_to_cache: find disk-completed status for {}, remote location already in disk replacer, pin + 1", remote_location);
+                            disk_replacer.pin(&remote_location, 1);
+                            return Ok(*size);
+                        } else {
+                            debug!("MemDiskStore put_data_to_cache: find disk-completed status for {}, no remote location in disk replacer", remote_location);
+                        }
+                        // If disk_replacer has no data, then update status_of_keys
+                    }
-                } else {
-                    // FIXME: status_of_keys lock should be released after break
-                    break;
                 }
             }
-            if existed {
-                // Another in progress of remote_location thread fails
-                return Err(ParpulseError::Internal(
-                    "Put_data_to_cache fails".to_string(),
-                ));
-            }
-            self.status_of_keys.write().await.insert(
-                remote_location.clone(),
-                (
-                    (Status::Incompleted, 0),
-                    Arc::new((Notify::new(), Mutex::new(0))),
-                ),
-            );
         }
+        self.status_of_keys.write().await.insert(
+            remote_location.clone(),
+            (
+                (Status::Incompleted, 0),
+                Arc::new(MemDiskStoreNotify::new()),
+            ),
+        );
+        debug!(
+            "MemDiskStore put_data_to_cache: put incompleted status for {} into status_of_keys",
+            remote_location
+        );
+
         // TODO: Refine the lock.
         // TODO(lanlou): Also write the data to network.
         let mut bytes_to_disk = None;
         let mut bytes_mem_written = 0;
-        let mut evicted_bytes_to_disk: Option<Vec<(String, (Vec<Bytes>, usize))>> =
-            None;
-        // 1. If the mem_store is enabled, first try to write the data to memory.
+        // If the mem_store is enabled, first try to write the data to memory.
         // Note: Only file which size < mem_max_file_size can be written to memory.
         if let Some(mem_store) = &self.mem_store {
             loop {
@@ -261,6 +381,10 @@ impl> D
                         {
                             // If write_data returns something, it means the file size is too large
                            // to fit in the memory. We should put it to disk cache.
+                            debug!(
+                                "MemDiskStore put_data_to_cache: data size for {} not fit in memory, transfer data to disk",
+                                remote_location
+                            );
                             bytes_to_disk = Some(bytes_vec);
                             break;
                         }
@@ -268,7 +392,7 @@
                     Some(Err(e)) => {
                         // TODO(lanlou): Every time it returns an error, I need to manually add this clean code...
                         // How to improve?
-                        self.status_of_keys.write().await.remove(&remote_location);
+                        self.notify_waiters_error(&remote_location).await;
                         return Err(e);
                     }
                     None => break,
@@ -283,9 +407,8 @@
                     remote_location.clone(),
                     MemDiskStoreReplacerValue::new(remote_location.clone(), bytes_mem_written),
                 );
-                // Insertion fails.
                 if replacer_put_status.is_none() {
-                    // Put the data to disk cache.
+                    // If inserting to memory fails, put the data to disk cache.
                     bytes_to_disk = Some(
                         mem_store
                             .write()
@@ -297,75 +420,112 @@ impl> D
                } else {
                    // TODO(lanlou): currently the pin/unpin relies on after putting, it will **always** get!
                    mem_replacer.pin(&remote_location, 1);
+                    debug!(
+                        "MemDiskStore put_data_to_cache: mem pin for {} + 1",
+                        remote_location
+                    );
                    // If successfully putting it into mem_replacer, we should record the evicted data,
                    // delete them from mem_store, and put all of them to disk cache.
                    if let Some(evicted_keys) = replacer_put_status {
-                        let mut evicted_bytes_to_disk_inner = Vec::new();
                        let mut mem_store = mem_store.write().await;
+                        // TODO(lanlou): I have to grab this huge lock when evicting from mem to disk, since otherwise another
+                        // thread may have the same evict_key as the new coming request, and it will put `Incompleted` into status_of_keys,
+                        // and there is conflict. (maybe add incompleted status for evicted keys, but tricky)
+                        let mut status_of_keys = self.status_of_keys.write().await;
+                        drop(mem_replacer);
+                        // Put the evicted keys to disk cache.
                        for evicted_key in evicted_keys {
-                            evicted_bytes_to_disk_inner.push((
+                            if let Some(((status, _), _)) =
+                                status_of_keys.get_mut(&evicted_key.clone())
+                            {
+                                // If the key is still in the status_of_keys, it means the data is still in progress.
+                                // We should not put the data to disk cache.
+                                if *status == Status::Incompleted {
+                                    debug!(
+                                        "MemDiskStore put_data_to_cache: key {} still in progress, skip and not put to disk",
+                                        evicted_key
+                                    );
+                                    continue;
+                                }
+                                assert!(*status == Status::MemCompleted);
+                                *status = Status::DiskCompleted;
+                                debug!(
+                                    "MemDiskStore put_data_to_cache: set status for key {} from MemCompleted to DiskCompleted",
+                                    evicted_key);
+                            }
+
+                            let (bytes_vec, data_size) =
+                                mem_store.clean_data(&evicted_key).unwrap();
+                            assert_ne!(evicted_key, remote_location);
+                            let disk_store_key = self.disk_store.data_store_key(&evicted_key);
+                            if self
+                                .disk_store
+                                .write_data(disk_store_key.clone(), Some(bytes_vec), None)
+                                .await
+                                .is_err()
+                            {
+                                warn!("Failed to write evicted data to disk for {}", evicted_key);
+                                continue;
+                            }
+                            let mut disk_replacer = self.disk_replacer.lock().await;
+                            match disk_replacer.put(
                                evicted_key.clone(),
-                                mem_store.clean_data(&evicted_key).unwrap(),
-                            ));
+                                MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size),
+                            ) {
+                                Some(evicted_keys) => {
+                                    // evict data from disk
+                                    for evicted_key in evicted_keys {
+                                        // FIXME: I think all status of the evicted_key removed here should be
+                                        // `Completed`?
+                                        let removed_status = status_of_keys.remove(&evicted_key);
+                                        if let Some(((status, _), _)) = removed_status {
+                                            debug!(
+                                                "MemDiskStore put_data_to_cache: 1 remove status {:?} for key {}",
+                                                status, evicted_key
+                                            );
+                                        }
+                                        self.disk_store
+                                            .clean_data(
+                                                &self.disk_store.data_store_key(&evicted_key),
+                                            )
+                                            .await?;
+                                    }
+                                }
+                                None => {
+                                    if let Err(e) =
+                                        self.disk_store.clean_data(&disk_store_key).await
+                                    {
+                                        warn!(
+                                            "Failed to clean data ({}) from disk store: {}",
+                                            disk_store_key, e
+                                        );
+                                    }
+                                    // It is not main in-progress thread for evicted_key, and its status should be
+                                    // `Completed`, so there is totally no need to notify the waiters.
+                                    // FIXME: can we safely remove evicted_key here?
+                                    let removed_status = status_of_keys.remove(&evicted_key);
+                                    if let Some(((status, _), _)) = removed_status {
+                                        debug!(
+                                            "MemDiskStore put_data_to_cache: 2 remove status {:?} for key {}",
+                                            status, evicted_key
+                                        );
+                                    }
+                                }
+                            }
                        }
-                        evicted_bytes_to_disk = Some(evicted_bytes_to_disk_inner);
                    }
                }
            }
-        }
-        // 2. If applicable, put all the evicted data into disk cache.
-        // Don't need to write the data to network for evicted keys.
-        if let Some(evicted_bytes_to_disk) = evicted_bytes_to_disk {
-            for (remote_location_evicted, (bytes_vec, data_size)) in evicted_bytes_to_disk {
-                assert_ne!(remote_location_evicted, remote_location);
-                let disk_store_key = self.disk_store.data_store_key(&remote_location_evicted);
-                self.disk_store
-                    .write_data(disk_store_key.clone(), Some(bytes_vec), None)
-                    .await?;
-                let mut disk_replacer = self.disk_replacer.lock().await;
-                if disk_replacer
-                    .put(
-                        remote_location_evicted,
-                        MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size),
-                    )
-                    .is_none()
-                {
-                    if let Err(e) = self.disk_store.clean_data(&disk_store_key).await {
-                        warn!(
-                            "Failed to clean data ({}) from disk store: {}",
-                            disk_store_key, e
-                        );
-                    }
-                    warn!(
-                        "Failed to put evicted data ({}) to disk replacer.",
-                        disk_store_key
-                    );
-                }
-            }
-        }
-
-        // 3. If the data is successfully written to memory, directly return.
-        if self.mem_store.is_some() && bytes_to_disk.is_none() {
-            let notify;
-            {
-                let mut status_of_keys = self.status_of_keys.write().await;
-                let ((status, size), notify_ref) =
-                    status_of_keys.get_mut(&remote_location).unwrap();
-                *status = Status::Completed;
-                *size = bytes_mem_written;
-                {
-                    let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await;
-                    mem_replacer.pin(&remote_location, *notify_ref.1.lock().await);
-                }
-                notify = notify_ref.clone();
+            // If the data is successfully written to memory, directly return.
+            if bytes_to_disk.is_none() {
+                self.notify_waiters_mem(&remote_location, bytes_mem_written)
+                    .await;
+                return Ok(bytes_mem_written);
            }
-            // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock
-            notify.0.notify_waiters();
-            return Ok(bytes_mem_written);
        }
-        // 4. If the data is not written to memory_cache successfully, then cache it to disk.
+        // If the data is not written to memory_cache successfully, then cache it to disk.
        // Need to write the data to network for the current key.
        let disk_store_key = self.disk_store.data_store_key(&remote_location);
        let data_size_wrap = self
@@ -373,46 +533,52 @@
            .write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream))
            .await;
        if let Err(e) = data_size_wrap {
-            self.status_of_keys.write().await.remove(&remote_location);
+            self.notify_waiters_error(&remote_location).await;
            return Err(e);
        }
        let data_size = data_size_wrap.unwrap();
        {
            let mut disk_replacer = self.disk_replacer.lock().await;
-            if disk_replacer
-                .put(
-                    remote_location.clone(),
-                    MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size),
-                )
-                .is_none()
-            {
-                if let Err(e) = self.disk_store.clean_data(&disk_store_key).await {
-                    // TODO: do we need to notify the caller this failure?
-                    warn!(
-                        "Failed to clean data ({}) from disk store: {}",
-                        disk_store_key, e
-                    );
+            match disk_replacer.put(
+                remote_location.clone(),
+                MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size),
+            ) {
+                Some(evicted_keys) => {
+                    let mut status_of_keys = self.status_of_keys.write().await;
+                    for evicted_key in evicted_keys {
+                        let removed_status = status_of_keys.remove(&evicted_key);
+                        if let Some(((status, _), _)) = removed_status {
+                            debug!(
+                                "MemDiskStore put_data_to_cache: 3 remove status {:?} for key {}",
+                                status, evicted_key
+                            );
+                        }
+                        self.disk_store
+                            .clean_data(&self.disk_store.data_store_key(&evicted_key))
+                            .await?;
+                    }
+                }
+                None => {
+                    if let Err(e) = self.disk_store.clean_data(&disk_store_key).await {
+                        warn!(
+                            "Failed to clean data ({}) from disk store: {}",
+                            disk_store_key, e
+                        );
+                    }
+                    // We have to notify waiters, because it is the main thread for the current key.
+                    self.notify_waiters_error(&remote_location).await;
+                    return Err(ParpulseError::Internal(
+                        "Failed to put data to disk replacer.".to_string(),
+                    ));
                }
-                self.status_of_keys.write().await.remove(&remote_location);
-                return Err(ParpulseError::Internal(
-                    "Failed to put data to disk replacer.".to_string(),
-                ));
            }
            disk_replacer.pin(&remote_location, 1);
+            debug!(
+                "MemDiskStore put_data_to_cache: disk pin for {} + 1",
+                remote_location
+            );
        }
-        let notify;
-        {
-            let mut status_of_keys = self.status_of_keys.write().await;
-            let ((status, size), notify_ref) = status_of_keys.get_mut(&remote_location).unwrap();
-            *status = Status::Completed;
-            *size = data_size;
-            self.disk_replacer
-                .lock()
-                .await
-                .pin(&remote_location, *notify_ref.1.lock().await);
-            notify = notify_ref.clone();
-        }
-        notify.0.notify_waiters();
+        self.notify_waiters_disk(&remote_location, data_size).await;
        Ok(data_size)
    }
 }
diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs
index 96f5e5d..c5a867e 100644
--- a/storage-node/src/storage_manager.rs
+++ b/storage-node/src/storage_manager.rs
@@ -611,4 +611,73 @@ mod tests {
        assert!(result.6.is_ok());
        assert_eq!(consume_receiver(result.6.unwrap().unwrap()).await, 930);
    }
+
+    #[tokio::test]
+    async fn test_evict_disk() {
+        let disk_cache = LruReplacer::new(120000);
+
+        let tmp = tempfile::tempdir().unwrap();
+        let disk_cache_base_path = tmp.path().to_owned();
+
+        let data_store_cache = MemDiskStoreCache::new(
+            disk_cache,
+            disk_cache_base_path.display().to_string(),
+            None,
+            None,
+        );
+        let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache]));
+
+        let request_path_bucket1 = "tests-parquet".to_string();
+        let request_path_keys1 = vec!["userdata2.parquet".to_string()];
+        let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1));
+
+        let request_path_bucket2 =
"tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata1.parquet".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + + let res1 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res1.is_ok()); + assert_eq!(consume_receiver(res1.unwrap()).await, 112193); + let res2 = storage_manager.get_data(request_data2.clone(), true).await; + assert!(res2.is_ok()); + assert_eq!(consume_receiver(res2.unwrap()).await, 113629); + let res3 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res3.is_ok()); + assert_eq!(consume_receiver(res3.unwrap()).await, 112193); + } + + #[tokio::test] + async fn test_evict_mem() { + let disk_cache = LruReplacer::new(10); + let mem_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + + let request_path_bucket1 = "tests-parquet".to_string(); + let request_path_keys1 = vec!["userdata2.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata1.parquet".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + + let res1 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res1.is_ok()); + assert_eq!(consume_receiver(res1.unwrap()).await, 112193); + let res2 = storage_manager.get_data(request_data2.clone(), true).await; + assert!(res2.is_ok()); + assert_eq!(consume_receiver(res2.unwrap()).await, 113629); + let res3 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res3.is_ok()); + assert_eq!(consume_receiver(res3.unwrap()).await, 112193); + } }