diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 3eef1f10428..190e5b44295 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -226,6 +226,12 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, + + // TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation` + // TODO set serde default + pub warmup_memory_budget: ByteSize, + // TODO set serde default + pub warmup_single_split_initial_allocation: ByteSize, } /// Configuration controlling how fast a searcher should timeout a `get_slice` @@ -263,7 +269,7 @@ impl StorageTimeoutPolicy { impl Default for SearcherConfig { fn default() -> Self { - Self { + SearcherConfig { fast_field_cache_capacity: ByteSize::gb(1), split_footer_cache_capacity: ByteSize::mb(500), partial_request_cache_capacity: ByteSize::mb(64), @@ -274,6 +280,10 @@ impl Default for SearcherConfig { split_cache: None, request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, + // TODO change this to the method used for serde default. + warmup_memory_budget: ByteSize::gb(1), + // TODO change this to the method used for serde default. + warmup_single_split_initial_allocation: ByteSize::mb(50), } } } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 94d6bd0ccb1..78dfc706b7c 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -376,6 +376,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, + search_permit: &mut SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -434,9 +435,9 @@ async fn leaf_search_single_split( let warmup_duration: Duration = warmup_end.duration_since(warmup_start); let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes(); + search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes); let split_num_docs = split.num_docs; - let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { @@ -1378,7 +1379,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - search_permit: SearchPermit, + mut search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1393,6 +1394,7 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, + &mut search_permit, ) .await; diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index f6883efb34b..07aad825eb1 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,8 +20,10 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; +use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; use tokio::sync::oneshot; +use tracing::warn; /// `SearchPermitProvider` is a distributor of permits to perform single split /// search operation. @@ -33,11 +35,14 @@ pub struct SearchPermitProvider { } impl SearchPermitProvider { - pub fn new(num_permits: usize) -> SearchPermitProvider { + pub fn new(num_permits: usize, memory_budget: ByteSize, initial_allocation: ByteSize) -> SearchPermitProvider { SearchPermitProvider { inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { num_permits_available: num_permits, + memory_budget: memory_budget.as_u64(), permits_requests: VecDeque::new(), + memory_allocated: 0u64, + initial_allocation: initial_allocation.as_u64(), })), } } @@ -65,6 +70,14 @@ impl SearchPermitProvider { struct InnerSearchPermitProvider { num_permits_available: usize, + + // Note it is possible for memory_allocated to exceed memory_budget temporarily, + // if and only if a split leaf search task ended up using more than `initial_allocation`. + // + // When it happens, new permits will not be assigned until the memory is freed. + memory_budget: u64, + memory_allocated: u64, + initial_allocation: u64, permits_requests: VecDeque>, } @@ -94,30 +107,29 @@ impl InnerSearchPermitProvider { permits } - fn recycle_permit(&mut self, inner_arc: &Arc>) { - self.num_permits_available += 1; - self.assign_available_permits(inner_arc); - } - fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_permits_available > 0 { - let Some(sender) = self.permits_requests.pop_front() else { + while self.num_permits_available > 0 && self.memory_allocated + self.initial_allocation <= self.memory_budget { + let Some(permit_requester_tx) = self.permits_requests.pop_front() else { break; }; let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - let send_res = sender.send(SearchPermit { + let send_res = permit_requester_tx.send(SearchPermit { _ongoing_gauge_guard: ongoing_gauge_guard, inner_arc: inner_arc.clone(), - recycle_on_drop: true, + warmup_permit_held: true, + memory_allocation: self.initial_allocation, }); match send_res { Ok(()) => { self.num_permits_available -= 1; + self.memory_allocated += self.initial_allocation; } Err(search_permit) => { + // We cannot just decrease the num_permits_available in all case and rely on + // the drop logic here: it would cause a dead lock on the inner_arc Mutex. search_permit.drop_without_recycling_permit(); } } @@ -131,23 +143,49 @@ impl InnerSearchPermitProvider { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, inner_arc: Arc>, - recycle_on_drop: bool, + warmup_permit_held: bool, + memory_allocation: u64, } impl SearchPermit { + /// After warm up, we have a proper estimate of the memory usage of a single split leaf search. + /// + /// We can then set the actual memory usage. + pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) { + if new_memory_usage > self.memory_allocation { + warn!(memory_usage=new_memory_usage, memory_allocation=self.memory_allocation, "current leaf search is consuming more memory than the initial allocation"); + } + let mut inner_guard = self.inner_arc.lock().unwrap(); + let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64; + inner_guard.memory_allocated += delta as u64; + inner_guard.num_permits_available += 1; + if inner_guard.memory_allocated > inner_guard.memory_budget { + warn!(memory_allocated=inner_guard.memory_allocated, memory_budget=inner_guard.memory_budget, "memory allocated exceeds memory budget"); + } + self.memory_allocation = new_memory_usage; + inner_guard.assign_available_permits(&self.inner_arc); + } + fn drop_without_recycling_permit(mut self) { - self.recycle_on_drop = false; + self.warmup_permit_held = false; + self.memory_allocation = 0u64; drop(self); } } impl Drop for SearchPermit { fn drop(&mut self) { - if !self.recycle_on_drop { + // This is not just an optimization. This is necessary to avoid a dead lock when the + // permit requester dropped its receiver channel. + if !self.warmup_permit_held && self.memory_allocation == 0 { return; } let mut inner_guard = self.inner_arc.lock().unwrap(); - inner_guard.recycle_permit(&self.inner_arc.clone()); + if self.warmup_permit_held { + inner_guard.num_permits_available += 1; + } + inner_guard.memory_allocated -= self.memory_allocation; + inner_guard.assign_available_permits(&self.inner_arc); } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 0029f4dd3a7..734344f2c74 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -489,7 +489,7 @@ impl SearcherContext { &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); + SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, searcher_config.warmup_single_split_initial_allocation); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;