Skip to content

Commit

Permalink
Support random seed per bloom object by default (configurable) (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: Karthik Subbarao <[email protected]>
  • Loading branch information
KarthikSubbarao authored Dec 5, 2024
1 parent 34d6fb0 commit 99c6f58
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 182 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ bincode = "1.3"

[dev-dependencies]
rand = "0.8"
rstest = "0.23.0"

[lib]
crate-type = ["cdylib"]
Expand Down
6 changes: 6 additions & 0 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ pub fn bloom_filter_add_value(
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
use_random_seed,
validate_size_limit,
) {
Ok(bf) => bf,
Expand Down Expand Up @@ -274,12 +276,14 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
match value {
Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)),
None => {
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
use_random_seed,
validate_size_limit,
) {
Ok(bf) => bf,
Expand Down Expand Up @@ -403,10 +407,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if nocreate {
return Err(ValkeyError::Str(utils::NOT_FOUND));
}
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
capacity,
expansion,
use_random_seed,
validate_size_limit,
) {
Ok(bf) => bf,
Expand Down
14 changes: 11 additions & 3 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
Expand Down Expand Up @@ -27,7 +28,6 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
digest: Some(bloom_callback::bloom_digest),

mem_usage: Some(bloom_callback::bloom_mem_usage),
// TODO
free: Some(bloom_callback::bloom_free),

aux_load: Some(bloom_callback::bloom_aux_load),
Expand Down Expand Up @@ -72,7 +72,10 @@ impl ValkeyDataType for BloomFilterType {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);

let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
return None;
};
let is_seed_random = is_seed_random_u64 == 1;
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand All @@ -90,7 +93,7 @@ impl ValkeyDataType for BloomFilterType {
}
};
if !BloomFilter::validate_size(capacity as u32, new_fp_rate) {
logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit.");
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
return None;
}
// Only load num_items when it's the last filter
Expand All @@ -104,6 +107,10 @@ impl ValkeyDataType for BloomFilterType {
};
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
if !is_seed_random && filter.seed() != configs::FIXED_SEED {
logging::log_warning("Failed to restore bloom object: Object in fixed seed mode, but seed does not match FIXED_SEED.");
return None;
}
filters.push(filter);
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
Expand All @@ -114,6 +121,7 @@ impl ValkeyDataType for BloomFilterType {
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
is_seed_random,
filters,
};
Some(item)
Expand Down
Loading

0 comments on commit 99c6f58

Please sign in to comment.