From 99c6f585ddb4deeb08049a3a8fc818aecd75700a Mon Sep 17 00:00:00 2001 From: KarthikSubbarao Date: Thu, 5 Dec 2024 00:03:15 -0800 Subject: [PATCH] Support random seed per bloom object by default (configurable) (#27) Signed-off-by: Karthik Subbarao --- Cargo.toml | 1 + src/bloom/command_handler.rs | 6 + src/bloom/data_type.rs | 14 +- src/bloom/utils.rs | 297 +++++++++++++++----------- src/configs.rs | 8 +- src/lib.rs | 1 + src/wrapper/bloom_callback.rs | 9 +- tests/conftest.py | 5 + tests/test_aofrewrite.py | 13 +- tests/test_basic.py | 10 +- tests/test_bloom_command.py | 1 - tests/test_bloom_metrics.py | 27 ++- tests/test_correctness.py | 2 +- tests/test_replication.py | 34 +-- tests/test_save_and_restore.py | 2 +- tests/valkey_bloom_test_case.py | 24 +++ tests/valkeytests/valkey_test_case.py | 2 +- 17 files changed, 274 insertions(+), 182 deletions(-) create mode 100644 tests/conftest.py diff --git a/Cargo.toml b/Cargo.toml index d205dbc..e889e76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ bincode = "1.3" [dev-dependencies] rand = "0.8" +rstest = "0.23.0" [lib] crate-type = ["cdylib"] diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index cd61763..2fe7464 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -112,10 +112,12 @@ pub fn bloom_filter_add_value( let fp_rate = configs::BLOOM_FP_RATE_DEFAULT; let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; + let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); let mut bloom = match BloomFilterType::new_reserved( fp_rate, capacity, expansion, + use_random_seed, validate_size_limit, ) { Ok(bf) => bf, @@ -274,12 +276,14 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke match value { Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)), None => { + let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); // Skip bloom filter size validation on replicated cmds. 
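+                // Replicated commands are applied as-is: the primary has already performed this validation.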
                let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
                let bloom = match BloomFilterType::new_reserved(
                    fp_rate,
                    capacity,
                    expansion,
+                   use_random_seed,
                    validate_size_limit,
                ) {
                    Ok(bf) => bf,
@@ -403,10 +407,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
            if nocreate {
                return Err(ValkeyError::Str(utils::NOT_FOUND));
            }
+           let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
            let mut bloom = match BloomFilterType::new_reserved(
                fp_rate,
                capacity,
                expansion,
+               use_random_seed,
                validate_size_limit,
            ) {
                Ok(bf) => bf,
diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs
index c8af83a..58131b4 100644
--- a/src/bloom/data_type.rs
+++ b/src/bloom/data_type.rs
@@ -1,5 +1,6 @@
 use crate::bloom::utils::BloomFilter;
 use crate::bloom::utils::BloomFilterType;
+use crate::configs;
 use crate::metrics::BLOOM_NUM_OBJECTS;
 use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
 use crate::wrapper::bloom_callback;
@@ -27,7 +28,6 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
        digest: Some(bloom_callback::bloom_digest),
        mem_usage: Some(bloom_callback::bloom_mem_usage),
-       // TODO
        free: Some(bloom_callback::bloom_free),
        aux_load: Some(bloom_callback::bloom_aux_load),
@@ -72,7 +72,10 @@ impl ValkeyDataType for BloomFilterType {
            return None;
        };
        let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
-
+       let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
+           return None;
+       };
+       let is_seed_random = is_seed_random_u64 == 1;
        for i in 0..num_filters {
            let Ok(bitmap) = raw::load_string_buffer(rdb) else {
                return None;
            };
@@ -90,7 +93,7 @@
                }
            };
            if !BloomFilter::validate_size(capacity as u32, new_fp_rate) {
-               logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit.");
+               logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
                return None;
            }
            // Only load num_items when it's the last filter
@@ -104,6 +107,10 @@
            };
            let filter =
                BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
+           if !is_seed_random && filter.seed() != configs::FIXED_SEED {
+               logging::log_warning("Failed to restore bloom object: Object in fixed seed mode, but seed does not match FIXED_SEED.");
+               return None;
+           }
            filters.push(filter);
        }
        BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
@@ -114,6 +121,7 @@
        let item = BloomFilterType {
            expansion: expansion as u32,
            fp_rate,
+           is_seed_random,
            filters,
        };
        Some(item)
diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs
index f066d05..5894a24 100644
--- a/src/bloom/utils.rs
+++ b/src/bloom/utils.rs
@@ -1,14 +1,11 @@
+use super::data_type::BLOOM_TYPE_VERSION;
 use crate::{
-    configs::{
-        self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
-    },
+    configs::{self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN},
     metrics,
 };
 use bloomfilter;
 use bloomfilter::{deserialize, serialize};
 use serde::{Deserialize, Serialize};
-
-use super::data_type::BLOOM_TYPE_VERSION;
 use std::{mem, sync::atomic::Ordering};

/// KeySpace Notification Events
@@ -70,6 +67,7 @@ impl BloomError {
 pub struct BloomFilterType {
     pub expansion: u32,
     pub fp_rate: f64,
+    pub is_seed_random: bool,
     pub filters: Vec<BloomFilter>,
 }

@@ -79,6 +77,7 @@ impl BloomFilterType {
        fp_rate: f64,
        capacity: u32,
        expansion: u32,
+       use_random_seed: bool,
        validate_size_limit: bool,
    ) -> Result<BloomFilterType, BloomError> {
        // Reject the request, if the operation will result in creation of a bloom object containing a filter
@@ -91,14 +90,17 @@
            mem::size_of::<BloomFilterType>(),
            std::sync::atomic::Ordering::Relaxed,
        );
-       // Create the bloom filter and add to the main BloomFilter object.
-       let bloom = BloomFilter::new(fp_rate, capacity);
+       let bloom = match use_random_seed {
+           true => BloomFilter::with_random_seed(fp_rate, capacity),
+           false => BloomFilter::with_fixed_seed(fp_rate, capacity, &configs::FIXED_SEED),
+       };
        let filters = vec![bloom];
        let bloom = BloomFilterType {
            expansion,
            fp_rate,
            filters,
+           is_seed_random: use_random_seed,
        };
        Ok(bloom)
    }
@@ -118,6 +120,7 @@
        BloomFilterType {
            expansion: from_bf.expansion,
            fp_rate: from_bf.fp_rate,
+           is_seed_random: from_bf.is_seed_random,
            filters,
        }
    }
@@ -163,6 +166,15 @@
        capacity
    }

+   /// Return the seed used by the Bloom object. Every filter in the bloom object uses the same seed as the
+   /// first filter, regardless of whether the seed is fixed or randomly generated.
+   pub fn seed(&self) -> [u8; 32] {
+       self.filters
+           .first()
+           .expect("Every BloomObject is expected to have at least one filter")
+           .seed()
+   }
+
    /// Add an item to the BloomFilterType object.
    /// If scaling is enabled, this can result in a new sub filter creation.
    pub fn add_item(&mut self, item: &[u8], validate_size_limit: bool) -> Result<i64, BloomError> {
@@ -204,12 +216,12 @@
                if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) {
                    return Err(BloomError::ExceedsMaxBloomSize);
                }
-               let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity);
+               let seed = self.seed();
+               let mut new_filter = BloomFilter::with_fixed_seed(new_fp_rate, new_capacity, &seed);
                // Add item.
                new_filter.set(item);
                new_filter.num_items += 1;
                self.filters.push(new_filter);
-
                metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                return Ok(1);
@@ -252,39 +264,46 @@
            1 => {
                // always use new version to init bloomFilterType.
                // This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
-               let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
-                   match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
-                   {
-                       Ok(values) => {
-                           if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
-                               return Err(BloomError::BadExpansion);
-                           }
-                           if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
-                               return Err(BloomError::ErrorRateRange);
-                           }
-                           if values.2.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
-                               return Err(BloomError::MaxNumScalingFilters);
-                           }
-                           for _filter in values.2.iter() {
-                               // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
-                               if validate_size_limit
-                                   && _filter.number_of_bytes()
-                                       > configs::BLOOM_MEMORY_LIMIT_PER_FILTER
-                                           .load(Ordering::Relaxed)
-                                           as usize
-                               {
-                                   return Err(BloomError::ExceedsMaxBloomSize);
-                               }
-                           }
-                           values
+               let (expansion, fp_rate, is_seed_random, filters): (
+                   u32,
+                   f64,
+                   bool,
+                   Vec<BloomFilter>,
+               ) = match bincode::deserialize::<(u32, f64, bool, Vec<BloomFilter>)>(
+                   &decoded_bytes[1..],
+               ) {
+                   Ok(values) => {
+                       // Expansion ratio can range from 0 to BLOOM_EXPANSION_MAX as we internally set this to 0
+                       // in case of non scaling filters.
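+                       // (e.g. bloom objects reserved with the NONSCALING option are encoded with expansion == 0.)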
+ if !(0..=BLOOM_EXPANSION_MAX).contains(&values.0) { + return Err(BloomError::BadExpansion); + } + if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) { + return Err(BloomError::ErrorRateRange); } - Err(_) => { - return Err(BloomError::DecodeBloomFilterFailed); + if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize { + return Err(BloomError::MaxNumScalingFilters); } - }; + for _filter in values.3.iter() { + // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. + if validate_size_limit + && _filter.number_of_bytes() + > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) + as usize + { + return Err(BloomError::ExceedsMaxBloomSize); + } + } + values + } + Err(_) => { + return Err(BloomError::DecodeBloomFilterFailed); + } + }; let item = BloomFilterType { expansion, fp_rate, + is_seed_random, filters, }; // add bloom filter type metrics. @@ -308,10 +327,8 @@ impl BloomFilterType { metrics::BLOOM_CAPACITY_ACROSS_OBJECTS .fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed); } - Ok(item) } - _ => Err(BloomError::DecodeUnsupportedVersion), } } @@ -332,25 +349,30 @@ pub struct BloomFilter { } impl BloomFilter { - /// Instantiate empty BloomFilter object. - pub fn new(fp_rate: f64, capacity: u32) -> BloomFilter { - let bloom = bloomfilter::Bloom::new_for_fp_rate_with_seed( - capacity as usize, - fp_rate, - &configs::FIXED_SEED, - ) - .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); + /// Instantiate empty BloomFilter object with a fixed seed used to create sip keys. + pub fn with_fixed_seed(fp_rate: f64, capacity: u32, fixed_seed: &[u8; 32]) -> BloomFilter { + let bloom = + bloomfilter::Bloom::new_for_fp_rate_with_seed(capacity as usize, fp_rate, fixed_seed) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); let fltr = BloomFilter { bloom, num_items: 0, capacity, }; - metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES - .fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); - metrics::BLOOM_CAPACITY_ACROSS_OBJECTS - .fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed); + fltr.incr_metrics_on_new_create(); + fltr + } + + /// Instantiate empty BloomFilter object with a randomly generated seed used to create sip keys. + pub fn with_random_seed(fp_rate: f64, capacity: u32) -> BloomFilter { + let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); + let fltr = BloomFilter { + bloom, + num_items: 0, + capacity, + }; + fltr.incr_metrics_on_new_create(); fltr } @@ -364,15 +386,29 @@ impl BloomFilter { num_items, capacity, }; + fltr.incr_metrics_on_new_create(); + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed); + fltr + } + + /// Create a new BloomFilter from an existing BloomFilter object (COPY command). 
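+   /// The copy is built from the source filter's serialized bitmap, so it keeps the same seed, bit contents and item count.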
+   pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
+       BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
+   }
+
+   fn incr_metrics_on_new_create(&self) {
        metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
-           .fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
-       metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
-           .fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed);
+           .fetch_add(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
        metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
-           .fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed);
-       fltr
+           .fetch_add(self.capacity.into(), std::sync::atomic::Ordering::Relaxed);
+   }
+
+   /// Return the seed used by the sip hasher of the raw bloom.
+   pub fn seed(&self) -> [u8; 32] {
+       self.bloom.seed()
    }

    pub fn number_of_bytes(&self) -> usize {
@@ -398,11 +434,6 @@
    pub fn set(&mut self, item: &[u8]) {
        self.bloom.set(item)
    }
-
-   /// Create a new BloomFilter from an existing BloomFilter object (COPY command).
-   pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
-       BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
-   }
 }

 impl Drop for BloomFilterType {
@@ -430,8 +461,9 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use configs::FIXED_SEED;
+    use configs;
     use rand::{distributions::Alphanumeric, Rng};
+    use rstest::rstest;

    /// Returns random string with specified number of characters.
    fn random_prefix(len: usize) -> String {
@@ -520,6 +552,33 @@
        fp_margin: f64,
        rand_prefix: &String,
    ) {
+       let is_seed_random = original_bloom_filter_type.is_seed_random;
+       assert_eq!(
+           restored_bloom_filter_type.is_seed_random,
+           original_bloom_filter_type.is_seed_random
+       );
+       let original_filter_seed = original_bloom_filter_type.filters.first().unwrap().seed();
+       assert_eq!(original_filter_seed, original_bloom_filter_type.seed());
+       if is_seed_random {
+           assert_ne!(original_filter_seed, configs::FIXED_SEED);
+           assert!(restored_bloom_filter_type
+               .filters
+               .iter()
+               .all(|restore_filter| original_bloom_filter_type
+                   .filters
+                   .iter()
+                   .any(|filter| (filter.seed() == restore_filter.seed())
+                       && (restore_filter.seed() == original_filter_seed))));
+       } else {
+           assert!(restored_bloom_filter_type
+               .filters
+               .iter()
+               .all(|restore_filter| original_bloom_filter_type
+                   .filters
+                   .iter()
+                   .any(|filter| (filter.seed() == restore_filter.seed())
+                       && (restore_filter.seed() == configs::FIXED_SEED))));
+       }
        assert_eq!(
            restored_bloom_filter_type.capacity(),
            original_bloom_filter_type.capacity()
        );
@@ -536,16 +595,6 @@
            restored_bloom_filter_type.memory_usage(),
            original_bloom_filter_type.memory_usage()
        );
-       assert!(restored_bloom_filter_type
-           .filters
-           .iter()
-           .all(|restore_filter| original_bloom_filter_type
-               .filters
-               .iter()
-               .any(
-                   |filter| (filter.bloom.seed() == restore_filter.bloom.seed())
-                       && (restore_filter.bloom.seed() == FIXED_SEED)
-               )));
        assert!(restored_bloom_filter_type
            .filters
            .iter()
@@ -579,8 +628,8 @@
        fp_assert(error_count, num_operations, expected_fp_rate, fp_margin);
    }

-   #[test]
-   fn test_non_scaling_filter() {
+   #[rstest(is_seed_random, case::random_seed(true), case::fixed_seed(false))]
+   fn test_non_scaling_filter(is_seed_random: bool) {
        let rand_prefix = random_prefix(7);
        // 1 in every 1000 operations is expected to be a false positive.
        let expected_fp_rate: f64 = 0.001;
        let initial_capacity = 10000;
        // Expansion of 0 indicates non scaling.
        let expansion = 0;
        // Validate the non scaling behavior of the bloom filter.
-       let mut bf =
-           BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
-               .expect("Expect bloom creation to succeed");
+       let mut bf = BloomFilterType::new_reserved(
+           expected_fp_rate,
+           initial_capacity,
+           expansion,
+           is_seed_random,
+           true,
+       )
+       .expect("Expect bloom creation to succeed");
        let (error_count, add_operation_idx) =
            add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix);
        assert_eq!(
@@ -620,7 +674,6 @@
        );
        // Validate that the real fp_rate is not much more than the configured fp_rate.
        fp_assert(error_count, num_operations, expected_fp_rate, fp_margin);
-
        // Verify restore
        let mut restore_bf = BloomFilterType::create_copy_from(&bf);
        assert_eq!(
@@ -637,17 +690,22 @@
        );
    }

-   #[test]
-   fn test_scaling_filter() {
+   #[rstest(is_seed_random, case::random_seed(true), case::fixed_seed(false))]
+   fn test_scaling_filter(is_seed_random: bool) {
        let rand_prefix = random_prefix(7);
        // 1 in every 1000 operations is expected to be a false positive.
        let expected_fp_rate: f64 = 0.001;
        let initial_capacity = 10000;
        let expansion = 2;
        let num_filters_to_scale = 5;
-       let mut bf =
-           BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
-               .expect("Expect bloom creation to succeed");
+       let mut bf = BloomFilterType::new_reserved(
+           expected_fp_rate,
+           initial_capacity,
+           expansion,
+           is_seed_random,
+           true,
+       )
+       .expect("Expect bloom creation to succeed");
        assert_eq!(bf.capacity(), initial_capacity as i64);
        assert_eq!(bf.cardinality(), 0);
        let mut total_error_count = 0;
@@ -692,7 +750,6 @@
        );
        // Validate that the real fp_rate is not much more than the configured fp_rate.
        fp_assert(error_count, num_operations, expected_fp_rate, fp_margin);
-
        // Verify restore
        let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf);
        verify_restored_items(
@@ -707,57 +764,55 @@

    #[test]
    fn test_seed() {
-       // The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs
-       let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32);
-       let seed = test_bloom_filter.bloom.seed();
-       assert_eq!(seed, FIXED_SEED);
+       // When using the with_fixed_seed API, the sip keys generated should be equal to the constants from configs.rs.
+       let test_bloom_filter1 =
+           BloomFilter::with_fixed_seed(0.5_f64, 1000_u32, &configs::FIXED_SEED);
+       let test_seed1 = test_bloom_filter1.seed();
+       assert_eq!(test_seed1, configs::FIXED_SEED);
+       // When using the with_random_seed API, the sip keys generated should not be equal to the constants from configs.rs.
+       let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_u32);
+       let test_seed2 = test_bloom_filter2.seed();
+       assert_ne!(test_seed2, configs::FIXED_SEED);
    }

    #[test]
    fn test_exceeded_size_limit() {
        // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
        // the configured limit.
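+       // (The per-filter limit is BLOOM_MEMORY_LIMIT_PER_FILTER, 64MB by default; see configs.rs.)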
- let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true); + let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true, true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true); + let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true, true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } - #[test] - fn test_bf_encode_and_decode() { - // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); - let key = "key"; - let _ = bf.add_item(key.as_bytes(), true); - + #[rstest(expansion, case::nonscaling(0), case::scaling(2))] + fn test_bf_encode_and_decode(expansion: u32) { + let mut bf = + BloomFilterType::new_reserved(0.5_f64, 1000_u32, expansion, true, true).unwrap(); + let item = "item1"; + let _ = bf.add_item(item.as_bytes(), true); // action let encoder_result = bf.encode_bloom_filter(); - - // assert - // encoder sucess + // assert encode success assert!(encoder_result.is_ok()); let vec = encoder_result.unwrap(); - - // assert decode: + // assert decode success: let new_bf_result = BloomFilterType::decode_bloom_filter(&vec, true); - let new_bf = new_bf_result.unwrap(); - // verify new_bf and bf assert_eq!(bf.fp_rate, new_bf.fp_rate); assert_eq!(bf.expansion, new_bf.expansion); assert_eq!(bf.capacity(), new_bf.capacity()); - - // contains key - assert!(new_bf.item_exists(key.as_bytes())); + // verify item1 exists. + assert!(new_bf.item_exists(item.as_bytes())); } #[test] fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -779,7 +834,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -799,26 +854,12 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_expansion = bf.expansion; let origin_fp_rate = bf.fp_rate; - // unsupoort expansion - bf.expansion = 0; - - let encoder_result = bf.encode_bloom_filter(); - - // 1. unsupport expansion - let vec = encoder_result.unwrap(); - // assert decode: - // should return error - assert_eq!( - BloomFilterType::decode_bloom_filter(&vec, true).err(), - Some(BloomError::BadExpansion) - ); - - // 1.2 Exceeded the maximum expansion + // 1. Exceeded the maximum expansion bf.expansion = BLOOM_EXPANSION_MAX + 1; let vec = bf.encode_bloom_filter().unwrap(); @@ -841,7 +882,7 @@ mod tests { // 3. 
build a larger than 64MB filter
        let extra_large_filter =
-           BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap();
+           BloomFilterType::new_reserved(0.01_f64, 57000000, 2, true, false).unwrap();
        let vec = extra_large_filter.encode_bloom_filter().unwrap();
        // should return error
        assert_eq!(
diff --git a/src/configs.rs b/src/configs.rs
index db83832..7ede53e 100644
--- a/src/configs.rs
+++ b/src/configs.rs
@@ -1,4 +1,5 @@
 use lazy_static::lazy_static;
+use std::sync::atomic::AtomicBool;
 use std::sync::atomic::AtomicI64;

 /// Configurations
@@ -14,6 +15,8 @@ pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.001;
 pub const BLOOM_FP_RATE_MIN: f64 = 0.0;
 pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

+pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true;
+
 // Max Memory usage allowed per bloom filter within a bloom object (64MB).
 // Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations.
 // Also, write operations that result in bloom object allocation larger than this size will be rejected.
@@ -26,6 +29,7 @@ lazy_static! {
     pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT);
     pub static ref BLOOM_MEMORY_LIMIT_PER_FILTER: AtomicI64 =
         AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
+    pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default();
 }

 /// Constants
@@ -40,7 +44,3 @@ pub const FIXED_SEED: [u8; 32] = [
     89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62,
     152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131,
 ];
-pub const FIXED_SIP_KEY_ONE_A: u64 = 15713473521876537177;
-pub const FIXED_SIP_KEY_ONE_B: u64 = 15671187751654921383;
-pub const FIXED_SIP_KEY_TWO_A: u64 = 9766223185946773789;
-pub const FIXED_SIP_KEY_TWO_B: u64 = 9453907914610147120;
diff --git a/src/lib.rs b/src/lib.rs
index 0b1b9c3..8f5b971 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -105,6 +105,7 @@ valkey_module! {
     string: [
     ],
     bool: [
+        ["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None],
     ],
     enum: [
     ],
diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs
index da9e21d..a1e9a53 100644
--- a/src/wrapper/bloom_callback.rs
+++ b/src/wrapper/bloom_callback.rs
@@ -7,6 +7,7 @@ use std::ffi::CString;
 use std::os::raw::{c_char, c_int, c_void};
 use std::ptr::null_mut;
 use std::sync::atomic::Ordering;
+use valkey_module::logging;
 use valkey_module::logging::{log_io_error, ValkeyLogLevel};
 use valkey_module::raw;
 use valkey_module::{RedisModuleDefragCtx, RedisModuleString};
@@ -21,6 +22,11 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
    raw::save_unsigned(rdb, v.filters.len() as u64);
    raw::save_unsigned(rdb, v.expansion as u64);
    raw::save_double(rdb, v.fp_rate);
+   let mut is_seed_random = 0;
+   if v.is_seed_random {
+       is_seed_random = 1;
+   }
+   raw::save_unsigned(rdb, is_seed_random);
    let filter_list = &v.filters;
    let mut filter_list_iter = filter_list.iter().peekable();
    while let Some(filter) = filter_list_iter.next() {
@@ -47,6 +53,7 @@ pub unsafe extern "C" fn bloom_rdb_load(
        let bb = Box::new(item);
        Box::into_raw(bb).cast::<c_void>()
    } else {
+       logging::log_warning("Failed to restore bloom object.");
        null_mut()
    }
 }
@@ -120,7 +127,7 @@ pub unsafe extern "C" fn bloom_copy(
 /// # Safety
 /// Raw handler for the Bloom digest callback.
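+/// Used by the DEBUG DIGEST and DEBUG DIGEST-VALUE commands to compare bloom objects.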
 pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
-    let mut dig = Digest::new(md);
+    let dig = Digest::new(md);
     let val = &*(value.cast::<BloomFilterType>());
     val.debug_digest(dig);
 }
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..9b2c832
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,5 @@
+import pytest
+
+@pytest.fixture(params=['random-seed', 'fixed-seed'])
+def bloom_config_parameterization(request):
+    return request.param
\ No newline at end of file
diff --git a/tests/test_aofrewrite.py b/tests/test_aofrewrite.py
index fc94a1d..0f62fcf 100644
--- a/tests/test_aofrewrite.py
+++ b/tests/test_aofrewrite.py
@@ -8,7 +8,7 @@ class TestBloomAofRewrite(ValkeyBloomTestCaseBase):
     def get_custom_args(self):
         # test aof rewrite should avoid bloom filter override as rdb. use aof
         args = super().get_custom_args()
-        args.update({'aof-use-rdb-preamble': 'no', 'appendonly': 'yes', 'appenddirname': 'aof-{}'.format(self.port)})
+        args.update({'aof-use-rdb-preamble': 'no', 'appendonly': 'yes'})
         return args

     def test_basic_aofrewrite_and_restore(self):
@@ -47,22 +47,17 @@ def test_basic_aofrewrite_and_restore(self):
         client.execute_command('DEL testSave')

     def test_aofrewrite_bloomfilter_metrics(self):
+        # Create scaled bloom filter and add 7500 items to trigger a scale out.
         self.client.execute_command('BF.RESERVE key1 0.001 7000')
-        # We use a number greater than 7000 in order to have a buffer for any false positives
-        variables = [f"key{i+1}" for i in range(7500)]
-
-        # Get original size to compare against size after scaled
         info_obj = self.client.execute_command('BF.INFO key1')
-        # Add keys until bloomfilter will scale out
-        for var in variables:
-            self.client.execute_command(f'BF.ADD key1 {var}')
+        self.add_items_till_capacity(self.client, "key1", 7500, 1, "item_prefix")

         # cmd debug digest
         server_digest = self.client.debug_digest()
         assert server_digest != None or 0000000000000000000000000000000000000000
         object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1')

-        # save aof, restart sever
+        # save aof, restart server
         self.client.bgrewriteaof()
         self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
         # restart server
diff --git a/tests/test_basic.py b/tests/test_basic.py
index 35a3510..9298f42 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -280,14 +280,20 @@ def test_debug_cmd(self):
         assert scenario4_obj != default_obj
         assert scenario4_object_digest != default_object_digest

-        # scenario5 validates that digest is equal on bloom objects with same properties and same items.
+        # scenario5 validates that digest is equal on bloom objects with the same properties and the same items,
+        # but only when a fixed seed is used, not when the seed is random.
+        is_random_seed = client.execute_command('CONFIG GET bf.bloom-use-random-seed')
         scenario5_obj = client.execute_command('BF.INSERT scenario5 error 0.001 capacity 1000 items 1')
         scenario5_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
         assert scenario5_obj != default_obj
         assert scenario5_object_digest != default_object_digest

+        # Add the same items to both the original and the new bloom object.
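+        # (scenario5 already holds item 1 from the BF.INSERT above, so both objects end up containing items 1, 2 and 3.)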
         client.execute_command('BF.MADD default_obj 1 2 3')
         client.execute_command('BF.MADD scenario5 2 3')
         madd_default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')
         madd_scenario_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
-        assert madd_scenario_object_digest == madd_default_object_digest
+        if is_random_seed[1] == b'yes':
+            assert madd_scenario_object_digest != madd_default_object_digest
+        else:
+            assert madd_scenario_object_digest == madd_default_object_digest
diff --git a/tests/test_bloom_command.py b/tests/test_bloom_command.py
index efcc8f8..b404bd5 100644
--- a/tests/test_bloom_command.py
+++ b/tests/test_bloom_command.py
@@ -34,7 +34,6 @@ def test_bloom_command_error(self):
             # not found
             ('BF.INFO TEST404', 'not found'),
             # incorrect syntax and argument usage
-            ('BF.ADD bf_non 2', 'non scaling filter is full'),
             ('bf.info key item', 'invalid information value'),
             ('bf.insert key CAPACITY 10000 ERROR 0.01 EXPANSION 0.99 NOCREATE NONSCALING ITEMS test1 test2 test3', 'bad expansion'),
             ('BF.INSERT KEY HELLO WORLD', 'unknown argument received'),
diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py
index 3df634c..4f33ed1 100644
--- a/tests/test_bloom_metrics.py
+++ b/tests/test_bloom_metrics.py
@@ -13,13 +13,13 @@ def test_basic_command_metrics(self):
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
         self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0)

-        # Create a default bloom filter and check its metrics values are correct
-        assert(self.client.execute_command('BF.ADD key item') == 1)
+        # Create a default bloom filter, add an item and check its metrics values are correct
+        self.add_items_till_capacity(self.client, "key", 1, 1, "item")
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)
         self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)

         # Check that other commands don't influence metrics
-        assert(self.client.execute_command('BF.EXISTS key item') == 1)
+        assert(self.client.execute_command('BF.EXISTS key item1') == 1)
         assert(self.client.execute_command('BF.ADD key item2') == 1)
         assert(self.client.execute_command('BF.MADD key item3 item4')== [1, 1])
         assert(self.client.execute_command('BF.MEXISTS key item3 item5')== [1, 0])
@@ -85,8 +85,7 @@ def test_scaled_bloomfilter_metrics(self):
         # Get original size to compare against size after scaled
         info_obj = self.client.execute_command('BF.INFO key1')
         # Add keys until bloomfilter will scale out
-        for var in variables:
-            self.client.execute_command(f'BF.ADD key1 {var}')
+        self.add_items_till_capacity(self.client, "key1", 7500, 1, "item_prefix")

         # Check info for scaled bloomfilter matches metrics data for bloomfilter
         new_info_obj = self.client.execute_command(f'BF.INFO key1')
@@ -101,28 +100,26 @@

     def test_copy_metrics(self):
-        # Create a bloomfilter and copy it
-        assert(self.client.execute_command('BF.ADD key{123} item') == 1)
-        assert(self.client.execute_command('COPY key{123} copiedkey{123}') == 1)
+        # Create a bloomfilter, add one item and copy it
+        self.add_items_till_capacity(self.client, "originalKey", 1, 1, "item_prefix")
+        assert(self.client.execute_command('COPY originalKey copiedkey') == 1)

         # Verify that the metrics were updated correctly after copying
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"),
DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 2, DEFAULT_BLOOM_FILTER_CAPACITY * 2) # Perform a FLUSHALL which should set all metrics data to 0 self.client.execute_command('FLUSHALL') - wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key{123} item'), 0) + wait_for_equal(lambda: self.client.execute_command('DBSIZE'), 0) self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0) def test_save_and_restore_metrics(self): - # Create default bloom filter - assert(self.client.execute_command('BF.ADD testSave item') == 1) + # Create default bloom filter and add one item + self.add_items_till_capacity(self.client, "nonscaledfilter", 1, 1, "item_prefix") - # Create scaled bloom filter + # Create scaled bloom filter and add 7500 items to trigger a scale out. self.client.execute_command('BF.RESERVE key1 0.001 7000') - variables = [f"key{i+1}" for i in range(7500)] - for var in variables: - self.client.execute_command(f'BF.ADD key1 {var}') + self.add_items_till_capacity(self.client, "key1", 7500, 1, "item_prefix") # Get info and metrics stats of bloomfilter before rdb load original_info_obj = self.client.execute_command('BF.INFO key1') diff --git a/tests/test_correctness.py b/tests/test_correctness.py index 391e387..19354f6 100644 --- a/tests/test_correctness.py +++ b/tests/test_correctness.py @@ -80,7 +80,7 @@ def test_scaling_filter(self): total_error_count = 0 add_operation_idx = 0 for filter_idx in range(1, num_filters_to_scale + 1): - expected_total_capacity = initial_capacity * ((expansion ** filter_idx) - 1) + expected_total_capacity = self.calculate_expected_capacity(initial_capacity, expansion, filter_idx) error_count, new_add_operation_idx = self.add_items_till_capacity(client, filter_name, expected_total_capacity, add_operation_idx + 1, item_prefix) add_operation_idx = new_add_operation_idx total_error_count += error_count diff --git a/tests/test_replication.py b/tests/test_replication.py index 36012f2..c34f996 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -6,29 +6,26 @@ class TestBloomReplication(ReplicationTestCase): + # Global Parameterized Configs + use_random_seed = 'no' + def get_custom_args(self): self.set_server_version(os.environ['SERVER_VERSION']) return { 'loadmodule': os.getenv('MODULE_PATH'), + 'bf.bloom-use-random-seed': self.use_random_seed, } - def test_replication_success(self): - self.setup_replication(num_replicas=1) - assert self.client.execute_command('BF.ADD key item1') == 1 - bf_exists_result = self.client.execute_command('BF.EXISTS key item1') - bf_non_added_exists_result = self.client.execute_command('BF.EXISTS key item2') - bf_info_result = self.client.execute_command('BF.INFO key') - - self.waitForReplicaToSyncUp(self.replicas[0]) - bf_replica_exists_result = self.replicas[0].client.execute_command('BF.EXISTS key item1') - assert bf_exists_result == bf_replica_exists_result - bf_replica_non_added_exists_result = self.replicas[0].client.execute_command('BF.EXISTS key item2') - assert bf_non_added_exists_result == bf_replica_non_added_exists_result - bf_replica_info_result = self.replicas[0].client.execute_command('BF.INFO key') - assert bf_info_result == bf_replica_info_result + @pytest.fixture(autouse=True) + def use_random_seed_fixture(self, bloom_config_parameterization): + if bloom_config_parameterization == "random-seed": + self.use_random_seed = "yes" + elif bloom_config_parameterization == "fixed-seed": + self.use_random_seed = "no" def test_replication_behavior(self): 
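+        # Verify that write commands are replicated to the replica and that the primary and replica
+        # digests match only when fixed seed mode is used.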
         self.setup_replication(num_replicas=1)
+        is_random_seed = self.client.execute_command('CONFIG GET bf.bloom-use-random-seed')
         # Test replication for write commands.
         bloom_write_cmds = [
             ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2),
@@ -72,11 +69,16 @@
         # cmd debug digest
         server_digest_primary = self.client.debug_digest()
         assert server_digest_primary != None or 0000000000000000000000000000000000000000
-        object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
-        server_digest_replica = self.client.debug_digest()
-        assert server_digest_primary == server_digest_replica
+        server_digest_replica = self.replicas[0].client.debug_digest()
+        object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
         debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key')
-        assert object_digest_primary == debug_digest_replica
+        # TODO: Update the test here to validate that digests always match during replication. Once we implement
+        # deterministic replication (including replicating seeds), these asserts will be updated.
+        if is_random_seed[1] == b'yes':
+            assert server_digest_primary != server_digest_replica
+            assert object_digest_primary != debug_digest_replica
+        else:
+            assert server_digest_primary == server_digest_replica
+            assert object_digest_primary == debug_digest_replica

         self.client.execute_command('FLUSHALL')
         self.waitForReplicaToSyncUp(self.replicas[0])
diff --git a/tests/test_save_and_restore.py b/tests/test_save_and_restore.py
index 3ce539e..543ffc4 100644
--- a/tests/test_save_and_restore.py
+++ b/tests/test_save_and_restore.py
@@ -66,7 +66,7 @@ def test_restore_failed_large_bloom_filter(self):
         self.server.wait_for_save_done()
         self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=False)
         logfile = os.path.join(self.testdir, self.server.args["logfile"])
-        large_obj_restore_err = "Failed to restore bloom object because it contains a filter larger than the max allowed size limit"
+        large_obj_restore_err = "Failed to restore bloom object: Contains a filter larger than the max allowed size limit."
         internal_rdb_err = "Internal error in RDB"
         self.wait_for_logfile(logfile, large_obj_restore_err)
         self.wait_for_logfile(logfile, internal_rdb_err)
diff --git a/tests/valkey_bloom_test_case.py b/tests/valkey_bloom_test_case.py
index 4e42d3e..42cfbe9 100644
--- a/tests/valkey_bloom_test_case.py
+++ b/tests/valkey_bloom_test_case.py
@@ -7,12 +7,23 @@

 class ValkeyBloomTestCaseBase(ValkeyTestCase):

+    # Global Parameterized Configs
+    use_random_seed = 'no'
+
     def get_custom_args(self):
         self.set_server_version(os.environ['SERVER_VERSION'])
         return {
             'loadmodule': os.getenv('MODULE_PATH'),
+            'bf.bloom-use-random-seed': self.use_random_seed,
         }

+    @pytest.fixture(autouse=True)
+    def use_random_seed_fixture(self, bloom_config_parameterization):
+        if bloom_config_parameterization == "random-seed":
+            self.use_random_seed = "yes"
+        elif bloom_config_parameterization == "fixed-seed":
+            self.use_random_seed = "no"
+
     def verify_error_response(self, client, cmd, expected_err_reply):
         try:
             client.execute_command(cmd)
@@ -143,6 +154,19 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p
         )
         self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)

+    def calculate_expected_capacity(self, initial_capacity, expansion, num_filters):
+        """
+        This function accepts the starting capacity (of the first filter), the expansion and the number of filters
+        in the object, and calculates the expected total capacity (across all the filters) within the bloom object.
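+        For expansion > 1 this is the geometric series: initial_capacity * (expansion ** num_filters - 1) / (expansion - 1).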
+        """
+        curr_filt_capacity = initial_capacity
+        total_capacity = curr_filt_capacity
+        for _ in range(2, num_filters + 1):
+            curr_filt_capacity = curr_filt_capacity * expansion
+            total_capacity += curr_filt_capacity
+        return total_capacity
+
     def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters, expected_num_items, expected_sum_capacity):
         """
         Verify the metric values are recorded properly, the expected values are as below
diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py
index 656ae8c..13b3a47 100644
--- a/tests/valkeytests/valkey_test_case.py
+++ b/tests/valkeytests/valkey_test_case.py
@@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
         self.port_tracker = resource_port_tracker

     def _get_valkey_args(self):
-        self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
+        self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "appenddirname": f"aof-{self.port}", "enable-debug-command":"yes"})
         self.args.update(self.get_custom_args())
         return self.args