diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 46d6851..55e327f 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -175,6 +175,8 @@ impl BloomFilterType { // Add item. filter.set(item); filter.num_items += 1; + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); return Ok(1); } // Non Scaling Filters that are filled to capacity cannot handle more inserts. @@ -208,6 +210,9 @@ impl BloomFilterType { new_filter.set(item); new_filter.num_items += 1; self.filters.push(new_filter); + + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); return Ok(1); } Ok(0) @@ -289,6 +294,12 @@ impl BloomFilterType { filter.number_of_bytes(), std::sync::atomic::Ordering::Relaxed, ); + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add( + filter.num_items.into(), + std::sync::atomic::Ordering::Relaxed, + ); + metrics::BLOOM_CAPACITY_ACROSS_OBJECTS + .fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed); } Ok(item) @@ -329,6 +340,8 @@ impl BloomFilter { .fetch_add(1, std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES .fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); + metrics::BLOOM_CAPACITY_ACROSS_OBJECTS + .fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed); fltr } @@ -356,6 +369,10 @@ impl BloomFilter { .fetch_add(1, std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES .fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed); + metrics::BLOOM_CAPACITY_ACROSS_OBJECTS + .fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed); fltr } @@ -422,6 +439,10 @@ impl Drop for BloomFilter { .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES .fetch_sub(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .fetch_sub(self.num_items.into(), std::sync::atomic::Ordering::Relaxed); + metrics::BLOOM_CAPACITY_ACROSS_OBJECTS + .fetch_sub(self.capacity.into(), std::sync::atomic::Ordering::Relaxed); } } diff --git a/src/metrics.rs b/src/metrics.rs index 65bf1f4..9938022 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -6,6 +6,8 @@ lazy_static! { pub static ref BLOOM_NUM_OBJECTS: AtomicU64 = AtomicU64::new(0); pub static ref BLOOM_OBJECT_TOTAL_MEMORY_BYTES: AtomicUsize = AtomicUsize::new(0); pub static ref BLOOM_NUM_FILTERS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0); + pub static ref BLOOM_NUM_ITEMS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0); + pub static ref BLOOM_CAPACITY_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0); } pub fn bloom_info_handler(ctx: &InfoContext) -> ValkeyResult<()> { @@ -27,6 +29,18 @@ pub fn bloom_info_handler(ctx: &InfoContext) -> ValkeyResult<()> { .load(Ordering::Relaxed) .to_string(), )? + .field( + "bloom_num_items_across_objects", + BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .load(Ordering::Relaxed) + .to_string(), + )? + .field( + "bloom_capacity_across_objects", + BLOOM_CAPACITY_ACROSS_OBJECTS + .load(Ordering::Relaxed) + .to_string(), + )? .build_section()? .build_info()?; diff --git a/tests/test_aofrewrite.py b/tests/test_aofrewrite.py index c9f2054..42db513 100644 --- a/tests/test_aofrewrite.py +++ b/tests/test_aofrewrite.py @@ -58,11 +58,11 @@ def test_aofrewrite_bloomfilter_metrics(self): # Check info for scaled bloomfilter matches metrics data for bloomfilter new_info_obj = self.client.execute_command(f'BF.INFO key1') - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2, 7500, 21000) # Check bloomfilter size has increased assert new_info_obj[3] > info_obj[3] # Delete the scaled bloomfilter to check both filters are deleted and metrics stats are set accordingly self.client.execute_command('DEL key1') - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0) \ No newline at end of file + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0) \ No newline at end of file diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py index 2722b21..29eb4e6 100644 --- a/tests/test_bloom_metrics.py +++ b/tests/test_bloom_metrics.py @@ -5,17 +5,18 @@ from util.waiters import * DEFAULT_BLOOM_FILTER_SIZE = 179960 +DEFAULT_BLOOM_FILTER_CAPACITY = 100000 class TestBloomMetrics(ValkeyBloomTestCaseBase): def test_basic_command_metrics(self): # Check that bloom metrics stats start at 0 - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0) # Create a default bloom filter and check its metrics values are correct assert(self.client.execute_command('BF.ADD key item') == 1) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY) # Check that other commands don't influence metrics assert(self.client.execute_command('BF.EXISTS key item') == 1) @@ -26,13 +27,13 @@ def test_basic_command_metrics(self): self.client.execute_command("BF.INFO key") assert(self.client.execute_command('BF.INSERT key ITEMS item5 item6')== [1, 1]) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 6, DEFAULT_BLOOM_FILTER_CAPACITY) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 6, DEFAULT_BLOOM_FILTER_CAPACITY) # Create a new default bloom filter and check metrics again assert(self.client.execute_command('BF.ADD key2 item') == 1) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2) # Create a non default filter with BF.RESERVE and check its metrics are correct assert(self.client.execute_command('BF.RESERVE key3 0.001 2917251') == b'OK') @@ -40,41 +41,41 @@ def test_basic_command_metrics(self): # We want to check the size of the newly created bloom filter but metrics contains the size of all bloomfilters so we must minus the # two default bloomfilters we already created - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2 + 2917251) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2 + 2917251) # Delete a non default key and make sure the metrics stats are still correct self.client.execute_command('DEL key3') - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2) # Create a default filter with BF.INSERT and check its metrics are correct assert(self.client.execute_command('BF.INSERT key4 ITEMS item1 item2') == [1, 1]) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 9, DEFAULT_BLOOM_FILTER_CAPACITY * 3) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 9, DEFAULT_BLOOM_FILTER_CAPACITY * 3) # Delete a default key and make sure the metrics are still correct self.client.execute_command('UNLINK key') - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2) # Create a key then cause it to expire and check if metrics are updated correctly assert self.client.execute_command('BF.ADD TEST_EXP ITEM') == 1 - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 4, DEFAULT_BLOOM_FILTER_CAPACITY * 3) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 4, DEFAULT_BLOOM_FILTER_CAPACITY * 3) assert self.client.execute_command('TTL TEST_EXP') == -1 self.verify_bloom_filter_item_existence(self.client, 'TEST_EXP', 'ITEM') curr_time = int(time.time()) assert self.client.execute_command(f'EXPIREAT TEST_EXP {curr_time + 5}') == 1 wait_for_equal(lambda: self.client.execute_command('BF.EXISTS TEST_EXP ITEM'), 0) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2) # Flush database so all keys should now be gone and metrics should all be at 0 self.client.execute_command('FLUSHDB') wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key2 item'), 0) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0) - self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0) + self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0) def test_scaled_bloomfilter_metrics(self): self.client.execute_command('BF.RESERVE key1 0.001 7000') @@ -89,14 +90,14 @@ def test_scaled_bloomfilter_metrics(self): # Check info for scaled bloomfilter matches metrics data for bloomfilter new_info_obj = self.client.execute_command(f'BF.INFO key1') - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2, 7500, 21000) # Check bloomfilter size has increased assert new_info_obj[3] > info_obj[3] # Delete the scaled bloomfilter to check both filters are deleted and metrics stats are set accordingly self.client.execute_command('DEL key1') - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0) def test_copy_metrics(self): @@ -105,12 +106,12 @@ def test_copy_metrics(self): assert(self.client.execute_command('COPY key{123} copiedkey{123}') == 1) # Verify that the metrics were updated correctly after copying - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 2, DEFAULT_BLOOM_FILTER_CAPACITY * 2) # Perform a FLUSHALL which should set all metrics data to 0 self.client.execute_command('FLUSHALL') wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key{123} item'), 0) - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0) + self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0) def test_save_and_restore_metrics(self): @@ -138,4 +139,4 @@ def test_save_and_restore_metrics(self): for i in range(1, len(original_info_obj), 2): assert original_info_obj[i] == restored_info_obj[i] - self.verify_bloom_metrics(self.client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3) + self.verify_bloom_metrics(new_client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3, 7501, 21000 + DEFAULT_BLOOM_FILTER_CAPACITY) diff --git a/tests/valkey_bloom_test_case.py b/tests/valkey_bloom_test_case.py index 9e07830..7e3129d 100644 --- a/tests/valkey_bloom_test_case.py +++ b/tests/valkey_bloom_test_case.py @@ -140,7 +140,7 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p ) self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin) - def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters): + def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters, expected_num_items, expected_sum_capacity): """ Verify the metric values are recorded properly, the expected values are as below expected_memory: the size of the memory used by the objects @@ -152,6 +152,8 @@ def verify_bloom_metrics(self, info_response, expected_memory, expected_num_obje total_memory_bites = -1 num_objects = -1 num_filters = -1 + num_items = -1 + sum_capacity = -1 for line in lines: if line.startswith('bf_bloom_total_memory_bytes:'): total_memory_bites = int(line.split(':')[1]) @@ -159,7 +161,13 @@ def verify_bloom_metrics(self, info_response, expected_memory, expected_num_obje num_objects = int(line.split(':')[1]) elif line.startswith('bf_bloom_num_filters_across_objects'): num_filters = int(line.split(':')[1]) + elif line.startswith('bf_bloom_num_items_across_objects'): + num_items = int(line.split(':')[1]) + elif line.startswith('bf_bloom_capacity_across_objects'): + sum_capacity = int(line.split(':')[1]) assert total_memory_bites == expected_memory assert num_objects == expected_num_objects - assert num_filters == expected_num_filters \ No newline at end of file + assert num_filters == expected_num_filters + assert num_items == expected_num_items + assert sum_capacity == expected_sum_capacity