Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): unified cache interface #16318

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions src/common/cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ pub trait Cache<K: Eq + Hash + MemSized, V: MemSized> {
K: Borrow<Q>,
Q: Hash + Eq + ?Sized;

/// Returns a reference to the value stored under key `k`, or `None` if the key is not present
/// in the cache.
///
/// Unlike `get`, `peek` does not update the cache state, so the key's position is left
/// unchanged.
///
/// The key may be supplied in any borrowed form `Q` of `K` (see the `K: Borrow<Q>` bound).
fn peek<Q>(&self, k: &Q) -> Option<&V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized;

/// Returns the key-value pair of the item selected by the cache's policy, or `None` if the
/// cache is empty.
///
/// Like `peek`, `peek_by_policy` does not update the cache state, so the item's position is
/// left unchanged.
// TODO: change to fn peek_by_policy<'a>(&self) -> Option<(&'a K, &'a V)>;
fn peek_by_policy(&self) -> Option<(&K, &V)>;

/// Inserts a key-value pair into the cache. If the key already existed, the old value is
/// returned.
fn insert(&mut self, k: K, v: V) -> Option<V>;
Expand Down
43 changes: 0 additions & 43 deletions src/common/cache/src/cache/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,49 +128,6 @@ impl<K: Eq + Hash + MemSized, V: MemSized> Cache<K, V> for LruCache<K, V> {
}
}

/// Returns a reference to the value stored under key `k`, or `None` if the key is not present
/// in the cache.
///
/// Unlike `get`, `peek` does not update the LRU list, so the key's position is left unchanged.
///
/// # Example
///
/// ```rust,ignore
/// use databend_common_cache::{Cache, LruCache};
/// let mut cache = LruCache::new(2);
///
/// cache.put(1, "a");
/// cache.put(2, "b");
///
/// assert_eq!(cache.peek(&1), Some(&"a"));
/// assert_eq!(cache.peek(&2), Some(&"b"));
/// ```
fn peek<Q>(&self, k: &Q) -> Option<&V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
// Plain lookup through a shared reference to the underlying map.
self.map.get(k)
}

/// Returns the key-value pair of the least recently used item, or `None` if the cache is
/// empty.
///
/// Like `peek`, `peek_by_policy` does not update the LRU list, so the item's position is left
/// unchanged.
///
/// # Example
///
/// ```rust,ignore
/// use databend_common_cache::{Cache, LruCache};
/// let mut cache = LruCache::new(2);
///
/// cache.put(1, "a");
/// cache.put(2, "b");
///
/// assert_eq!(cache.peek_by_policy(), Some((&1, &"a")));
/// ```
fn peek_by_policy(&self) -> Option<(&K, &V)> {
// The entry at the front of `map` is the one reported as least recently used.
self.map.front()
}

/// Inserts a key-value pair into the cache. If the key already existed, the old value is
/// returned.
///
Expand Down
13 changes: 7 additions & 6 deletions src/query/storages/common/cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ impl Display for Unit {
}

// The cache accessor: crate users usually work with this interface when manipulating caches.
pub trait CacheAccessor {
type V;
pub trait CacheAccessor: Send + Sync {
type V: Send + Sync;

fn get<Q: AsRef<str>>(&self, k: Q) -> Option<Arc<Self::V>>;
fn get_sized<Q: AsRef<str>>(&self, k: Q, len: u64) -> Option<Arc<Self::V>>;
fn get(&self, k: &String) -> Option<Arc<Self::V>>;
fn get_sized(&self, k: &String, len: u64) -> Option<Arc<Self::V>>;

fn insert(&self, key: String, value: Self::V) -> Arc<Self::V>;
fn evict(&self, k: &str) -> bool;
fn contains_key(&self, k: &str) -> bool;
fn evict(&self, k: &String) -> bool;

fn contains_key(&self, k: &String) -> bool;
fn bytes_size(&self) -> u64;
fn items_capacity(&self) -> u64;
fn bytes_capacity(&self) -> u64;
Expand Down
46 changes: 23 additions & 23 deletions src/query/storages/common/cache/src/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,82 +70,82 @@ pub type SizedColumnArray = (
// - and implement `CacheAccessor` properly
pub trait CachedObject<T> {
type Cache: CacheAccessor<V = T>;
fn cache() -> Option<Self::Cache>;
fn cache() -> Arc<dyn CacheAccessor<V = T>>;
}

impl CachedObject<CompactSegmentInfo> for CompactSegmentInfo {
type Cache = CompactSegmentInfoCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<CompactSegmentInfoCache>;
fn cache() -> Arc<dyn CacheAccessor<V = CompactSegmentInfo>> {
CacheManager::instance().get_table_segment_cache()
}
}

impl CachedObject<CompactSegmentInfo> for SegmentInfo {
type Cache = CompactSegmentInfoCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<CompactSegmentInfoCache>;
fn cache() -> Arc<dyn CacheAccessor<V = CompactSegmentInfo>> {
CacheManager::instance().get_table_segment_cache()
}
}

impl CachedObject<TableSnapshot> for TableSnapshot {
type Cache = TableSnapshotCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<TableSnapshotCache>;
fn cache() -> Arc<dyn CacheAccessor<V = TableSnapshot>> {
CacheManager::instance().get_table_snapshot_cache()
}
}

impl CachedObject<Vec<Arc<BlockMeta>>> for Vec<Arc<BlockMeta>> {
type Cache = BlockMetaCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<BlockMetaCache>;
fn cache() -> Arc<dyn CacheAccessor<V = Vec<Arc<BlockMeta>>>> {
CacheManager::instance().get_block_meta_cache()
}
}

impl CachedObject<TableSnapshotStatistics> for TableSnapshotStatistics {
type Cache = TableSnapshotStatisticCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<TableSnapshotStatisticCache>;
fn cache() -> Arc<dyn CacheAccessor<V = TableSnapshotStatistics>> {
CacheManager::instance().get_table_snapshot_statistics_cache()
}
}

impl CachedObject<BloomIndexMeta> for BloomIndexMeta {
type Cache = BloomIndexMetaCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<BloomIndexMetaCache>;
fn cache() -> Arc<dyn CacheAccessor<V = BloomIndexMeta>> {
CacheManager::instance().get_bloom_index_meta_cache()
}
}

impl CachedObject<(PartStatistics, Partitions)> for (PartStatistics, Partitions) {
type Cache = PrunePartitionsCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<PrunePartitionsCache>;
fn cache() -> Arc<dyn CacheAccessor<V = (PartStatistics, Partitions)>> {
CacheManager::instance().get_prune_partitions_cache()
}
}

impl CachedObject<Xor8Filter> for Xor8Filter {
type Cache = BloomIndexFilterCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<BloomIndexFilterCache>;
fn cache() -> Arc<dyn CacheAccessor<V = Xor8Filter>> {
CacheManager::instance().get_bloom_index_filter_cache()
}
}

impl CachedObject<FileMetaData> for FileMetaData {
type Cache = FileMetaDataCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<FileMetaDataCache>;
fn cache() -> Arc<dyn CacheAccessor<V = FileMetaData>> {
CacheManager::instance().get_file_meta_data_cache()
}
}

impl CachedObject<InvertedIndexFile> for InvertedIndexFile {
type Cache = InvertedIndexFileCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<InvertedIndexFileCache>;
fn cache() -> Arc<dyn CacheAccessor<V = InvertedIndexFile>> {
CacheManager::instance().get_inverted_index_file_cache()
}
}

impl CachedObject<InvertedIndexMeta> for InvertedIndexMeta {
type Cache = InvertedIndexMetaCache;
fn cache() -> Option<Self::Cache> {
type Cache = Option<InvertedIndexMetaCache>;
fn cache() -> Arc<dyn CacheAccessor<V = InvertedIndexMeta>> {
CacheManager::instance().get_inverted_index_meta_cache()
}
}
Expand Down
61 changes: 39 additions & 22 deletions src/query/storages/common/cache/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,22 @@
use std::path::PathBuf;
use std::sync::Arc;

use databend_common_arrow::parquet::metadata::FileMetaData;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::plan::Partitions;
use databend_common_config::CacheConfig;
use databend_common_config::CacheStorageTypeInnerConfig;
use databend_common_config::DiskCacheKeyReloadPolicy;
use databend_common_exception::Result;
use databend_storages_common_index::filters::Xor8Filter;
use databend_storages_common_index::BloomIndexMeta;
use databend_storages_common_index::InvertedIndexFile;
use databend_storages_common_index::InvertedIndexMeta;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
use log::info;

use crate::caches::BlockMetaCache;
Expand All @@ -34,7 +45,9 @@ use crate::caches::InvertedIndexMetaCache;
use crate::caches::PrunePartitionsCache;
use crate::caches::TableSnapshotCache;
use crate::caches::TableSnapshotStatisticCache;
use crate::CacheAccessor;
use crate::InMemoryLruCache;
use crate::SizedColumnArray;
use crate::TableDataCache;
use crate::TableDataCacheBuilder;

Expand Down Expand Up @@ -204,52 +217,56 @@ impl CacheManager {
GlobalInstance::get()
}

pub fn get_table_snapshot_cache(&self) -> Option<TableSnapshotCache> {
self.table_snapshot_cache.clone()
pub fn get_table_snapshot_cache(&self) -> Arc<dyn CacheAccessor<V = TableSnapshot>> {
Arc::new(self.table_snapshot_cache.clone())
}

pub fn get_block_meta_cache(&self) -> Option<BlockMetaCache> {
self.block_meta_cache.clone()
pub fn get_block_meta_cache(&self) -> Arc<dyn CacheAccessor<V = Vec<Arc<BlockMeta>>>> {
Arc::new(self.block_meta_cache.clone())
}

pub fn get_table_snapshot_statistics_cache(&self) -> Option<TableSnapshotStatisticCache> {
self.table_statistic_cache.clone()
pub fn get_table_snapshot_statistics_cache(
&self,
) -> Arc<dyn CacheAccessor<V = TableSnapshotStatistics>> {
Arc::new(self.table_statistic_cache.clone())
}

pub fn get_table_segment_cache(&self) -> Option<CompactSegmentInfoCache> {
self.compact_segment_info_cache.clone()
pub fn get_table_segment_cache(&self) -> Arc<dyn CacheAccessor<V = CompactSegmentInfo>> {
Arc::new(self.compact_segment_info_cache.clone())
}

pub fn get_bloom_index_filter_cache(&self) -> Option<BloomIndexFilterCache> {
self.bloom_index_filter_cache.clone()
pub fn get_bloom_index_filter_cache(&self) -> Arc<dyn CacheAccessor<V = Xor8Filter>> {
Arc::new(self.bloom_index_filter_cache.clone())
}

pub fn get_bloom_index_meta_cache(&self) -> Option<BloomIndexMetaCache> {
self.bloom_index_meta_cache.clone()
pub fn get_bloom_index_meta_cache(&self) -> Arc<dyn CacheAccessor<V = BloomIndexMeta>> {
Arc::new(self.bloom_index_meta_cache.clone())
}

pub fn get_inverted_index_meta_cache(&self) -> Option<InvertedIndexMetaCache> {
self.inverted_index_meta_cache.clone()
pub fn get_inverted_index_meta_cache(&self) -> Arc<dyn CacheAccessor<V = InvertedIndexMeta>> {
Arc::new(self.inverted_index_meta_cache.clone())
}

pub fn get_inverted_index_file_cache(&self) -> Option<InvertedIndexFileCache> {
self.inverted_index_file_cache.clone()
pub fn get_inverted_index_file_cache(&self) -> Arc<dyn CacheAccessor<V = InvertedIndexFile>> {
Arc::new(self.inverted_index_file_cache.clone())
}

pub fn get_prune_partitions_cache(&self) -> Option<PrunePartitionsCache> {
self.prune_partitions_cache.clone()
pub fn get_prune_partitions_cache(
&self,
) -> Arc<dyn CacheAccessor<V = (PartStatistics, Partitions)>> {
Arc::new(self.prune_partitions_cache.clone())
}

pub fn get_file_meta_data_cache(&self) -> Option<FileMetaDataCache> {
self.parquet_file_meta_data_cache.clone()
pub fn get_file_meta_data_cache(&self) -> Arc<dyn CacheAccessor<V = FileMetaData>> {
Arc::new(self.parquet_file_meta_data_cache.clone())
}

pub fn get_table_data_cache(&self) -> Option<TableDataCache> {
self.table_data_cache.clone()
}

pub fn get_table_data_array_cache(&self) -> Option<ColumnArrayCache> {
self.in_memory_table_data_cache.clone()
pub fn get_table_data_array_cache(&self) -> Arc<dyn CacheAccessor<V = SizedColumnArray>> {
Arc::new(self.in_memory_table_data_cache.clone())
}

pub fn new_named_items_cache<V: Into<CacheValue<V>>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl DiskCache {
self.cache.contains(&cache_key.0)
}

pub fn get_cache_path(&mut self, key: &str) -> Option<PathBuf> {
pub fn get_cache_path(&mut self, key: &String) -> Option<PathBuf> {
let cache_key = self.cache_key(key);
self.cache
.get(&cache_key.0)
Expand All @@ -330,7 +330,7 @@ impl DiskCache {
}

/// Remove the given key from the cache.
pub fn remove(&mut self, key: &str) -> Result<()> {
pub fn remove(&mut self, key: &String) -> Result<()> {
let cache_key = self.cache_key(key);
match self.cache.pop(&cache_key.0) {
Some(_) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ impl CacheAccessor for LruDiskCacheHolder {
"LruDiskCacheHolder"
}

fn get<Q: AsRef<str>>(&self, k: Q) -> Option<Arc<Bytes>> {
let k = k.as_ref();
fn get(&self, k: &String) -> Option<Arc<Bytes>> {
{
let mut cache = self.write();
cache.get_cache_path(k)
Expand Down Expand Up @@ -82,7 +81,7 @@ impl CacheAccessor for LruDiskCacheHolder {
})
}

fn get_sized<Q: AsRef<str>>(&self, k: Q, len: u64) -> Option<Arc<Self::V>> {
fn get_sized(&self, k: &String, len: u64) -> Option<Arc<Self::V>> {
let Some(cached_value) = self.get(k) else {
metrics_inc_cache_miss_bytes(len, self.name());
return None;
Expand All @@ -101,7 +100,7 @@ impl CacheAccessor for LruDiskCacheHolder {
Arc::new(value)
}

fn evict(&self, k: &str) -> bool {
fn evict(&self, k: &String) -> bool {
if let Err(e) = {
let mut cache = self.write();
cache.remove(k)
Expand All @@ -113,7 +112,7 @@ impl CacheAccessor for LruDiskCacheHolder {
}
}

fn contains_key(&self, k: &str) -> bool {
fn contains_key(&self, k: &String) -> bool {
let cache = self.read();
cache.contains_key(k)
}
Expand Down
Loading
Loading