Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding optional arg to BF.INSERT to allow users to check if their bloom filter can reach the desired size #41

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut wanted_capacity = -1;
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
Expand Down Expand Up @@ -553,6 +554,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"ATLEASTCAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
wanted_capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_CAPACITY));
}
};
}
"ITEMS" => {
idx += 1;
items_provided = true;
Expand All @@ -576,6 +592,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::WrongType);
}
};
// Check if we have a wanted capacity and calculate if we can reach that capacity
if wanted_capacity > 0 {
match utils::BloomObject::calculate_if_wanted_capacity_is_valid(
capacity,
fp_rate,
wanted_capacity,
tightening_ratio,
expansion,
) {
Ok(result) => result,
Err(e) => {
return Err(e);
}
}
}
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
Expand Down
41 changes: 41 additions & 0 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bloomfilter::Bloom;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Deserializer, Serialize};
use std::sync::atomic::Ordering;
use valkey_module::ValkeyError;

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand All @@ -32,6 +33,10 @@ pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit";
/// Returned by `BloomObject::calculate_if_wanted_capacity_is_valid` when the simulated
/// object size fails `BloomObject::validate_size` while scaling toward the wanted capacity.
pub const WANTED_CAPACITY_EXCEEDS_MAX_SIZE: &str =
"ERR Wanted capacity would go beyond bloom object memory limit";
/// Returned by `BloomObject::calculate_if_wanted_capacity_is_valid` when
/// `BloomObject::calculate_fp_rate` reports an error while scaling toward the wanted capacity.
pub const WANTED_CAPACITY_FALSE_POSITIVE_INVALID: &str =
"ERR False positive degrades too much to reach wanted capacity";
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed";
pub const DECODE_UNSUPPORTED_VERSION: &str =
Expand Down Expand Up @@ -455,6 +460,42 @@ impl BloomObject {
_ => Err(BloomError::DecodeUnsupportedVersion),
}
}

/// Checks whether a bloom object created with the given properties can scale out far
/// enough for the combined capacity of its filters to reach `wanted_capacity`.
///
/// The scale-out is simulated one filter at a time. For every simulated filter the
/// false positive rate is recomputed through `BloomObject::calculate_fp_rate`, and the
/// projected object size (object overhead + all previous filters + the new filter) is
/// checked with `BloomObject::validate_size`. Each filter's capacity is the previous
/// filter's capacity multiplied by `expansion`.
///
/// If `capacity` already covers `wanted_capacity`, nothing is simulated and `Ok(())`
/// is returned.
///
/// # Errors
///
/// * `WANTED_CAPACITY_FALSE_POSITIVE_INVALID` - `calculate_fp_rate` failed for one of the
///   simulated filters.
/// * `WANTED_CAPACITY_EXCEEDS_MAX_SIZE` - the projected object size failed
///   `validate_size`, or the next filter's capacity does not fit in an `i64`.
/// * A dedicated error when `expansion` is 0 and `capacity` is below `wanted_capacity`,
///   because the object can then never gain additional capacity.
pub fn calculate_if_wanted_capacity_is_valid(
    capacity: i64,
    fp_rate: f64,
    wanted_capacity: i64,
    tightening_ratio: f64,
    expansion: u32,
) -> Result<(), ValkeyError> {
    // Kept local so this function does not depend on a constant declared elsewhere.
    const WANTED_CAPACITY_NOT_REACHABLE_WITHOUT_EXPANSION: &str =
        "ERR Wanted capacity cannot be reached with an expansion of 0";

    // The first filter alone is already large enough: there is nothing to scale.
    if capacity >= wanted_capacity {
        return Ok(());
    }
    // Multiplying by an expansion of 0 gives every later filter a capacity of 0, so the
    // target could never be reached. Fail fast instead of looping until an unrelated
    // check happens to fail.
    if expansion == 0 {
        return Err(ValkeyError::Str(
            WANTED_CAPACITY_NOT_REACHABLE_WITHOUT_EXPANSION,
        ));
    }

    let growth = i64::from(expansion);
    let mut filter_capacity = capacity;
    // Combined capacity of every filter simulated (and validated) so far.
    let mut total_capacity: i64 = 0;
    let mut curr_num_filters = 1;
    let mut curr_fp_rate = fp_rate;
    let mut filters_memory_usage = 0;
    loop {
        // NOTE(review): the previously tightened rate is fed back in together with the
        // filter count. If `calculate_fp_rate` already applies the tightening ratio
        // `curr_num_filters` times, this compounds the tightening - confirm against its
        // definition.
        curr_fp_rate =
            BloomObject::calculate_fp_rate(curr_fp_rate, curr_num_filters, tightening_ratio)
                .map_err(|_| ValkeyError::Str(WANTED_CAPACITY_FALSE_POSITIVE_INVALID))?;

        let curr_filter_size = BloomFilter::compute_size(filter_capacity, curr_fp_rate);
        // NOTE(review): `curr_num_filters * 2` presumably over-approximates the capacity
        // of the vector that holds the filters - confirm against `BloomObject::compute_size`.
        let curr_object_size = BloomObject::compute_size((curr_num_filters * 2) as usize)
            + filters_memory_usage
            + curr_filter_size;
        if !BloomObject::validate_size(curr_object_size) {
            return Err(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE));
        }
        filters_memory_usage += curr_filter_size;

        // Only count a filter's capacity once it has passed both checks above, so the
        // filter that finally reaches the target is validated as well.
        total_capacity = total_capacity.saturating_add(filter_capacity);
        if total_capacity >= wanted_capacity {
            return Ok(());
        }

        // A capacity that no longer fits in an i64 cannot be represented, so the object
        // cannot grow any further.
        filter_capacity = filter_capacity
            .checked_mul(growth)
            .ok_or(ValkeyError::Str(WANTED_CAPACITY_EXCEEDS_MAX_SIZE))?;
        curr_num_filters += 1;
    }
}
}

/// Structure representing a single bloom filter. 200 Bytes.
Expand Down
2 changes: 2 additions & 0 deletions tests/test_bloom_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def test_bloom_command_error(self):
('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'),
('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'),
('BF.INSERT KEY HELLO', 'unknown argument received'),
('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 ATLEASTCAPACITY 10000000 EXPANSION 1', 'False positive degrades too much to reach wanted capacity'),
('BF.INSERT KEY ATLEASTCAPACITY 1000000000000', 'Wanted capacity would go beyond bloom object memory limit'),
('BF.RESERVE KEY String 100', 'bad error rate'),
('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'),
('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'),
Expand Down
Loading