-
Notifications
You must be signed in to change notification settings - Fork 342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limit and monitor warmup memory usage #5568
base: main
Are you sure you want to change the base?
Changes from all commits
76edd6d
18489fc
96b39c2
21ea04f
ba3759f
23a7aef
08ddc9f
37c07bc
2f90a0c
0946bc5
244b35d
c3de281
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -226,6 +226,8 @@ pub struct SearcherConfig { | |
#[serde(default)] | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub storage_timeout_policy: Option<StorageTimeoutPolicy>, | ||
pub warmup_memory_budget: ByteSize, | ||
pub warmup_single_split_initial_allocation: ByteSize, | ||
} | ||
|
||
/// Configuration controlling how fast a searcher should timeout a `get_slice` | ||
|
@@ -263,7 +265,7 @@ impl StorageTimeoutPolicy { | |
|
||
impl Default for SearcherConfig { | ||
fn default() -> Self { | ||
Self { | ||
SearcherConfig { | ||
fast_field_cache_capacity: ByteSize::gb(1), | ||
split_footer_cache_capacity: ByteSize::mb(500), | ||
partial_request_cache_capacity: ByteSize::mb(64), | ||
|
@@ -274,6 +276,8 @@ impl Default for SearcherConfig { | |
split_cache: None, | ||
request_timeout_secs: Self::default_request_timeout_secs(), | ||
storage_timeout_policy: None, | ||
warmup_memory_budget: ByteSize::gb(1), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we want larger defaults here. |
||
warmup_single_split_initial_allocation: ByteSize::mb(50), | ||
} | ||
} | ||
} | ||
|
@@ -308,6 +312,14 @@ impl SearcherConfig { | |
split_cache_limits.max_file_descriptors | ||
); | ||
} | ||
if self.warmup_single_split_initial_allocation > self.warmup_memory_budget { | ||
anyhow::bail!( | ||
"warmup_single_split_initial_allocation ({}) must be lower or equal to \ | ||
warmup_memory_budget ({})", | ||
self.warmup_single_split_initial_allocation, | ||
self.warmup_memory_budget | ||
); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,40 +30,44 @@ use tantivy::{Directory, HasLen}; | |
|
||
/// The caching directory is a simple cache that wraps another directory. | ||
#[derive(Clone)] | ||
pub struct CachingDirectory { | ||
pub struct CachingDirectory<C> { | ||
underlying: Arc<dyn Directory>, | ||
// TODO fixme: that's a pretty ugly cache we have here. | ||
cache: Arc<ByteRangeCache>, | ||
cache: C, | ||
} | ||
|
||
impl CachingDirectory { | ||
/// Creates a new CachingDirectory. | ||
impl CachingDirectory<Arc<ByteRangeCache>> { | ||
/// Creates a new CachingDirectory with a default cache. | ||
/// | ||
/// Warming: The resulting CacheDirectory will cache all information without ever | ||
/// removing any item from the cache. | ||
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory { | ||
CachingDirectory { | ||
underlying, | ||
cache: Arc::new(ByteRangeCache::with_infinite_capacity( | ||
&quickwit_storage::STORAGE_METRICS.shortlived_cache, | ||
)), | ||
} | ||
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory<Arc<ByteRangeCache>> { | ||
let byte_range_cache = ByteRangeCache::with_infinite_capacity( | ||
&quickwit_storage::STORAGE_METRICS.shortlived_cache, | ||
); | ||
CachingDirectory::new(underlying, Arc::new(byte_range_cache)) | ||
} | ||
} | ||
|
||
impl<C: DirectoryCache> CachingDirectory<C> { | ||
/// Creates a new CachingDirectory with an existing cache. | ||
pub fn new(underlying: Arc<dyn Directory>, cache: C) -> CachingDirectory<C> { | ||
CachingDirectory { underlying, cache } | ||
} | ||
} | ||
|
||
impl fmt::Debug for CachingDirectory { | ||
impl<T> fmt::Debug for CachingDirectory<T> { | ||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
write!(f, "CachingDirectory({:?})", self.underlying) | ||
} | ||
} | ||
|
||
struct CachingFileHandle { | ||
struct CachingFileHandle<C> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the extra complexity for? |
||
path: PathBuf, | ||
cache: Arc<ByteRangeCache>, | ||
cache: C, | ||
underlying_filehandle: Arc<dyn FileHandle>, | ||
} | ||
|
||
impl fmt::Debug for CachingFileHandle { | ||
impl<T> fmt::Debug for CachingFileHandle<T> { | ||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
write!( | ||
f, | ||
|
@@ -75,7 +79,7 @@ impl fmt::Debug for CachingFileHandle { | |
} | ||
|
||
#[async_trait] | ||
impl FileHandle for CachingFileHandle { | ||
impl<C: DirectoryCache> FileHandle for CachingFileHandle<C> { | ||
fn read_bytes(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> { | ||
if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) { | ||
return Ok(bytes); | ||
|
@@ -100,13 +104,13 @@ impl FileHandle for CachingFileHandle { | |
} | ||
} | ||
|
||
impl HasLen for CachingFileHandle { | ||
impl<C> HasLen for CachingFileHandle<C> { | ||
fn len(&self) -> usize { | ||
self.underlying_filehandle.len() | ||
} | ||
} | ||
|
||
impl Directory for CachingDirectory { | ||
impl<C: DirectoryCache> Directory for CachingDirectory<C> { | ||
fn exists(&self, path: &Path) -> std::result::Result<bool, OpenReadError> { | ||
self.underlying.exists(path) | ||
} | ||
|
@@ -136,6 +140,25 @@ impl Directory for CachingDirectory { | |
crate::read_only_directory!(); | ||
} | ||
|
||
/// A byte range cache that to be used in front of the directory. | ||
pub trait DirectoryCache: Clone + Send + Sync + 'static { | ||
/// If available, returns the cached view of the slice. | ||
fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes>; | ||
|
||
/// Put the given amount of data in the cache. | ||
fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes); | ||
} | ||
|
||
impl DirectoryCache for Arc<ByteRangeCache> { | ||
fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> { | ||
ByteRangeCache::get_slice(self, path, byte_range) | ||
} | ||
|
||
fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) { | ||
ByteRangeCache::put_slice(self, path, byte_range, bytes) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
|
||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define a default value for this?
And reuse the default value function in SearcherConfig::default for consistency