From 23f4b7fa73852610ea13fbad9e2d45edb3e60453 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Fri, 8 Nov 2024 20:57:20 -0500 Subject: [PATCH] cache,db: de-dup concurrent attempts to read the same block Concurrent reads of the same block have been observed to cause very high memory usage, and are wasteful of disk bandwidth. We now coordinate across multiple concurrent attempts to read the same block via a readEntry, which makes the readers take turns until one succeeds. The readEntries are embedded in a map that is part of a readShard, where there is a readShard for each cache.Shard. See the long comment in the readShard declaration for motivation. The Options.LoadBlockSema is integrated into the readEntry, to simplify the waiting logic in the caller. Callers interact with this new behavior via Cache.GetWithReadHandle, which is only for callers that intend to do a read and then populate the cache. If this method returns a ReadHandle, the caller must first wait for permission to do a read. See the ReadHandle comment for details of the contract. Fixes #4138 --- internal/cache/clockpro.go | 67 ++++- internal/cache/read_shard.go | 447 +++++++++++++++++++++++++++++ internal/cache/refcnt_normal.go | 13 + internal/cache/refcnt_tracing.go | 4 + sstable/block/buffer_pool.go | 15 +- sstable/colblk/index_block_test.go | 2 +- sstable/colblk/keyspan_test.go | 2 +- sstable/reader.go | 62 +++- 8 files changed, 591 insertions(+), 21 deletions(-) create mode 100644 internal/cache/read_shard.go diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 2459ec7b69..c7cc870427 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -26,6 +26,7 @@ import ( "sync" "sync/atomic" + "github.com/cockroachdb/fifo" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" ) @@ -114,9 +115,22 @@ type shard struct { countHot int64 countCold int64 countTest int64 -} -func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { + // Some fields in readShard are protected by mu. See comments in declaration + // of readShard. + readShard readShard +} + +// GetWithMaybeReadHandle is the internal helper for implementing +// Cache.{Get,GetWithReadHandle}. loadBlockSema can be nil, and a non-nil +// value is only relevant when desireReadHandle is true. +func (c *shard) GetWithMaybeReadHandle( + id ID, + fileNum base.DiskFileNum, + offset uint64, + desireReadHandle bool, + loadBlockSema *fifo.Semaphore, +) (Handle, ReadHandle) { c.mu.RLock() var value *Value if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { @@ -126,12 +140,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { } } c.mu.RUnlock() + var rh ReadHandle + if value == nil && desireReadHandle { + c.mu.Lock() + // After the c.mu.RUnlock(), someone could have inserted the value in the + // cache. We could tolerate the race and do a file read, or do another map + // lookup. We choose to do the latter, since the cost of a map lookup is + // insignificant compared to the cost of reading a block from a file. + if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { + value = e.acquireValue() + if value != nil { + e.referenced.Store(true) + } + } + if value == nil { + rh = c.readShard.getReadHandleLocked(id, fileNum, offset, loadBlockSema) + } + c.mu.Unlock() + } if value == nil { c.misses.Add(1) - return Handle{} + } else { + c.hits.Add(1) } - c.hits.Add(1) - return Handle{value: value} + return Handle{value: value}, rh } func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle { @@ -170,6 +202,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value value.ref.trace("add-hot") c.sizeHot += delta } else { + // TODO(sumeer): unclear why we don't set e.ptype to etHot on this path. + // In the default case below, where the state is etTest we set it to + // etHot. But etTest is "colder" than etCold, since the only transition + // into etTest is etCold => etTest, so since etTest transitions to + // etHot, then etCold should also transition. value.ref.trace("add-cold") c.sizeCold += delta } @@ -746,6 +783,7 @@ func newShards(size int64, shards int) *Cache { } c.shards[i].blocks.Init(16) c.shards[i].files.Init(16) + c.shards[i].readShard.Init(&c.shards[i]) } // Note: this is a no-op if invariants are disabled or race is enabled. @@ -822,7 +860,24 @@ func (c *Cache) Unref() { // Get retrieves the cache value for the specified file and offset, returning // nil if no value is present. func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { - return c.getShard(id, fileNum, offset).Get(id, fileNum, offset) + h, rh := c.getShard(id, fileNum, offset).GetWithMaybeReadHandle( + id, fileNum, offset, false, nil) + if invariants.Enabled && rh.Valid() { + panic("ReadHandle should not be valid") + } + return h +} + +// GetWithReadHandle retrieves the cache value for the specified ID, fileNum +// and offset. If found, a valid Handle is returned, else a valid ReadHandle +// is returned. See the ReadHandle declaration for the contract must satisfy +// when getting a valid ReadHandle. The loadBlockSema if non-nil, is used by +// the ReadHandle -- see Options.LoadBlockSema for details. +func (c *Cache) GetWithReadHandle( + id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore, +) (Handle, ReadHandle) { + return c.getShard(id, fileNum, offset).GetWithMaybeReadHandle( + id, fileNum, offset, true, loadBlockSema) } // Set sets the cache value for the specified file and offset, overwriting an diff --git a/internal/cache/read_shard.go b/internal/cache/read_shard.go new file mode 100644 index 0000000000..fc9d7528d6 --- /dev/null +++ b/internal/cache/read_shard.go @@ -0,0 +1,447 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/fifo" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/swiss" +) + +// readShard coordinates the read of a block that will be put in the cache. It +// ensures only one goroutine is reading a block, and other callers block +// until that goroutine is done (with success or failure). In the case of +// success, the other goroutines will use the value that was read, even if it +// is too large to be placed in the cache, or got evicted from the cache +// before they got scheduled. In the case of a failure (read error or context +// cancellation), one of the waiters will be given a turn to do the read. +// +// This turn-taking ensures that a large number of concurrent attempts to read +// the same block that is not in the cache does not result in the same number +// of reads from the filesystem (or remote storage). We have seen large spikes +// in memory usage and read bandwidth without this turn-taking. +// +// It also introduces a small risk related to context cancellation -- if many +// readers assigned a turn exceed their deadline while doing the read and +// report an error, a reader with a longer deadline can unnecessarily wait. We +// accept this risk for now since the primary production use in CockroachDB is +// filesystem reads, where context cancellation is not respected. We do +// introduce an error duration metric emitted in traces that can be used to +// quantify such wasteful waiting. Note that this same risk extends to waiting +// on the Options.LoadBlockSema, so the error duration metric includes the +// case of an error when waiting on the semaphore. +// +// Design choices and motivation: +// +// - readShard is tightly integrated with a cache shard: At its core, +// readShard is a map with synchronization. For the same reason the cache is +// sharded (for higher concurrency by sharding the mutex), it is beneficial +// to shard synchronization on readShard. By making readShard a member of +// shard, this sharding is trivially accomplished. Additionally, the code +// feels cleaner when there isn't a race between a cache miss, followed by +// creating a readEntry that is no longer needed because someone else has +// done the read since the miss and inserted into the cache. By making the +// readShard use shard.mu, such a race is avoided. A side benefit is that +// the cache interaction can be hidden behind readEntry.SetReadValue. One +// disadvantage of this tightly integrated design is that it does not +// encompass readers that will put the read value into a block.BufferPool -- +// we don't worry about those since block.BufferPool is only used for +// compactions and there is at most one compaction reader of a block. There +// is the possibility that the compaction reader and a user-facing iterator +// reader will do duplicate reads, but we accept that deficiency. +// +// - readMap is separate from shard.blocks map: One could have a design which +// extends the cache entry and unifies the two maps. However, we never want +// to evict a readEntry while there are readers waiting for the block read +// (including the case where the corresponding file is being removed from +// shard.files). Also, the number of stable cache entries is huge and +// therefore is manually allocated, while the number of readEntries is small +// (so manual allocation isn't necessary). For these reasons we maintain a +// separate map. This separation also results in more modular code, instead +// of piling more stuff into shard. +type readShard struct { + shard *shard + // Protected by shard.mu. + // + // shard.mu must never be held when acquiring readEntry.mu since + // readEntry.mu can be held while waiting on readEntry.loadBlockSema. + // shard.mu is a shared resource and must be released quickly (also it can + // cause subtle deadlocks). + shardMu struct { + readMap swiss.Map[key, *readEntry] + } +} + +func (rs *readShard) Init(shard *shard) *readShard { + *rs = readShard{ + shard: shard, + } + // Choice of 16 is arbitrary. + rs.shardMu.readMap.Init(16) + return rs +} + +// getReadHandleLocked gets a ReadHandle for (id, fileNum, offset). shard.mu +// is already write locked. All callers with the same ID represent the same +// DB, and should pass the same loadBlockSema. NB: we cannot place the +// loadBlockSema in the readShard since the readShard can be shared across the +// DBs, just like the block cache. +func (rs *readShard) getReadHandleLocked( + id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore, +) ReadHandle { + k := key{fileKey{id, fileNum}, offset} + e, ok := rs.shardMu.readMap.Get(k) + if !ok { + e = newReadEntry(rs, id, fileNum, offset, loadBlockSema) + rs.shardMu.readMap.Put(k, e) + } else { + e.refCount.acquireAllowZero() + } + return ReadHandle{entry: e} +} + +// readEntry is used to coordinate between concurrent attempted readers of the +// same block. +type readEntry struct { + readShard *readShard + id ID + fileNum base.DiskFileNum + offset uint64 + loadBlockSema *fifo.Semaphore + mu struct { + // Mutex is held while waiting on loadBlockSema, and naturally protects + // everything in this struct. + sync.RWMutex + // If readShard.loadBlockSema != nil, this represents whether the + // semaphore is held of not. It will transition at most once from false to + // true and back to false. While there are waiting readers and the + // reader(s) assigned the turn are failing (SetReadError), this will + // continue to be held. + loadBlockSemaHeld bool + // v, when non-nil, has a ref from readEntry, which is unreffed when + // readEntry is deleted from the readMap. + v *Value + // isReading and ch together capture the state of whether someone has been + // granted a turn to read, and of readers waiting for that read to finish. + // ch is lazily allocated since most readEntries will not see concurrent + // readers. This lazy allocation results in one transition of ch from nil + // to non-nil, so waiters can read this non-nil ch and block on reading + // from it without holding mu. + // + // ch is written to, to signal one waiter to start doing the read. ch is + // closed when the value is successfully read and has been stored in v, so + // that all waiters wake up and read v. ch is a buffered channel with a + // capacity of 1. + // + // State transitions when trying to wait for turn: + // Case !isReading: + // set isReading=true; Drain the ch if non-nil and non-empty; proceed + // with turn to do the read. + // Case isReading: + // allocate ch if nil; wait on ch + // Finished reading successfully: + // set isReading=false; if ch is non-nil, close ch. + // Finished reading with failure: + // set isReading=false; if ch is non-nil, write to ch. + // + // INVARIANT: + // isReading => ch is nil or ch is empty. + isReading bool + ch chan struct{} + // Total duration of reads and semaphore waiting that resulted in error. + errorDuration time.Duration + readStart time.Time + } + // Count of ReadHandles that refer to this readEntry. Increments always hold + // shard.mu. So if this is found to be 0 while holding shard.mu, it is safe + // to delete readEntry from readShard.shardMu.readMap. + refCount refcnt +} + +var readEntryPool = sync.Pool{ + New: func() interface{} { + return &readEntry{} + }, +} + +func newReadEntry( + rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore, +) *readEntry { + e := readEntryPool.Get().(*readEntry) + *e = readEntry{ + readShard: rs, + id: id, + fileNum: fileNum, + offset: offset, + loadBlockSema: loadBlockSema, + } + e.refCount.init(1) + return e +} + +func (e *readEntry) waitForReadPermissionOrHandle( + ctx context.Context, +) (h Handle, errorDuration time.Duration, err error) { + constructHandleLocked := func() Handle { + if e.mu.v == nil { + panic("value is nil") + } + e.mu.v.acquire() + return Handle{value: e.mu.v} + } + becomeReaderLocked := func() { + if e.mu.v != nil { + panic("value is non-nil") + } + e.mu.isReading = true + if e.mu.ch != nil { + // Drain the channel, so that no one else mistakenly believes they + // should read. + select { + case <-e.mu.ch: + default: + } + } + e.mu.readStart = time.Now() + } + unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) { + removeState := e.makeTryRemoveStateLocked() + errorDuration = e.mu.errorDuration + if readLock { + e.mu.RUnlock() + } else { + e.mu.Unlock() + } + unrefAndTryRemoveFromMap(removeState) + return errorDuration + } + e.mu.Lock() + if e.mu.v != nil { + // Value has already been read. + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return h, errorDuration, nil + } + // Not already read. + if e.loadBlockSema != nil && !e.mu.loadBlockSemaHeld { + waitStart := time.Now() + err := e.loadBlockSema.Acquire(ctx, 1) + if err != nil { + e.mu.errorDuration += time.Since(waitStart) + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return Handle{}, errorDuration, err + } + e.mu.loadBlockSemaHeld = true + } + // Wait for turn to do the read or for someone else to do the read. + if !e.mu.isReading { + // Have permission to do the read. + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } + if e.mu.ch == nil { + // Rare case when multiple readers are concurrently trying to read. If + // this turns out to be common enough we could use a sync.Pool. + e.mu.ch = make(chan struct{}, 1) + } + ch := e.mu.ch + e.mu.Unlock() + select { + case <-ctx.Done(): + e.mu.Lock() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return Handle{}, errorDuration, ctx.Err() + case _, ok := <-ch: + if ok { + // Granted permission to do the read. + e.mu.Lock() + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } else { + // Channel closed, so value was read. + e.mu.RLock() + if e.mu.v == nil { + panic("value is nil") + } + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(true) + return h, errorDuration, nil + } + } +} + +// tryRemoveState captures the state needed by tryRemoveFromMap. The caller +// constructs it before calling unrefAndTryRemoveFromMap since it typically +// held readEntry.mu, which avoids acquiring it again in +// unrefAndTryRemoveFromMap. +type tryRemoveState struct { + rs *readShard + k key + e *readEntry +} + +// makeTryRemoveStateLocked initializes tryRemoveState. +func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState { + return tryRemoveState{ + rs: e.readShard, + k: key{fileKey{e.id, e.fileNum}, e.offset}, + e: e, + } +} + +// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs. +// It is possible that after unreffing that s.e has already been removed, and +// is now back in the sync.Pool, or being reused (for the same or different +// key). This is because after unreffing, which caused the s.e.refCount to +// become zero, but before acquiring shard.mu, it could have been incremented +// and decremented concurrently, and some other goroutine could have observed +// a different decrement to 0, and raced ahead and deleted s.e from the +// readMap. +func unrefAndTryRemoveFromMap(s tryRemoveState) { + if !s.e.refCount.release() { + return + } + s.rs.shard.mu.Lock() + e2, ok := s.rs.shardMu.readMap.Get(s.k) + if !ok || e2 != s.e { + // Already removed. + s.rs.shard.mu.Unlock() + return + } + if s.e.refCount.value() != 0 { + s.rs.shard.mu.Unlock() + return + } + // k => e and e.refCount == 0. And it cannot be incremented since + // shard.mu.Lock() is held. So remove from map. + s.rs.shardMu.readMap.Delete(s.k) + s.rs.shard.mu.Unlock() + + // Free s.e. + s.e.mu.Lock() + if s.e.mu.loadBlockSemaHeld { + s.e.loadBlockSema.Release(1) + s.e.mu.loadBlockSemaHeld = false + } + if s.e.mu.v != nil { + s.e.mu.v.release() + s.e.mu.v = nil + } + s.e.mu.Unlock() + *s.e = readEntry{} + readEntryPool.Put(s.e) +} + +func (e *readEntry) setReadValue(v *Value) Handle { + // Add to the cache before taking another ref for readEntry, since the cache + // expects ref=1 when it is called. + // + // TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure + // that it is added as etHot. The common case will be e.refCount = 1, and we + // don't want to acquire e.mu twice, so one way to do this would be relax + // the invariant in shard.Set that requires Value.refs() == 1. Then we can + // do the work under e.mu before calling shard.Set. + h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v) + e.mu.Lock() + // Acquire a ref for readEntry, since we are going to remember it in e.mu.v. + v.acquire() + e.mu.v = v + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + // Inform all waiters so they can use e.mu.v. Not all readers have called + // readEntry.waitForReadPermissionOrHandle, and those will also use + // e.mu.v. + close(e.mu.ch) + } + if e.mu.loadBlockSemaHeld { + // Release the loadBlockSema now since it is a shared resource, and we + // don't want to wait until refCount drops to 0. + e.loadBlockSema.Release(1) + e.mu.loadBlockSemaHeld = false + } + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) + return h +} + +func (e *readEntry) setReadError(err error) { + e.mu.Lock() + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + select { + case e.mu.ch <- struct{}{}: + default: + panic("channel is not empty") + } + } + e.mu.errorDuration += time.Since(e.mu.readStart) + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) +} + +// ReadHandle represents a contract with a caller that had a miss when doing a +// cache lookup, and wants to do a read and insert the read block into the +// cache. The contract applies when ReadHandle.Valid returns true. +// +// Contract: +// +// The caller must call WaitForPermissionOrHandle. If method returns with an +// error, or a valid Handle, the caller is done. In the latter case, someone +// else did the read on behalf of the caller. If neither, the caller has been +// granted a turn to do the read. It must immediately attempt to do a read +// (other readers are potentially waiting for it), and then call SetReadValue +// or SetReadError depending on whether the read succeeded or failed. +type ReadHandle struct { + entry *readEntry +} + +// Valid returns true for a valid ReadHandle. +func (rh ReadHandle) Valid() bool { + return rh.entry != nil +} + +// WaitForReadPermissionOrHandle is called on a valid ReadHandle and returns +// either an already read value (in Handle), an error (if the context was +// cancelled), or neither, which is a directive to the caller to do the read. +// In this last case the caller must call either SetReadValue or SetReadError. +// +// In all cases, errorDuration is populated with the total duration that +// readers that observed an error (SetReadError) spent in doing the read. This +// duration can be greater than the time spend in WaitForReadPermissionHandle, +// since some of these errors could have occurred prior to this call. But it +// serves as a rough indicator of whether turn taking could have caused higher +// latency due to context cancellation. +func (rh ReadHandle) WaitForReadPermissionOrHandle( + ctx context.Context, +) (h Handle, errorDuration time.Duration, err error) { + return rh.entry.waitForReadPermissionOrHandle(ctx) +} + +// SetReadValue provides the Value that the caller has read. The caller is +// responsible for releasing the returned Handle when it is no longer needed. +func (rh ReadHandle) SetReadValue(v *Value) Handle { + return rh.entry.setReadValue(v) +} + +// SetReadError specifies that the caller has encountered a read error. +func (rh ReadHandle) SetReadError(err error) { + rh.entry.setReadError(err) +} diff --git a/internal/cache/refcnt_normal.go b/internal/cache/refcnt_normal.go index 9ab3348613..94beb232f0 100644 --- a/internal/cache/refcnt_normal.go +++ b/internal/cache/refcnt_normal.go @@ -37,6 +37,15 @@ func (v *refcnt) acquire() { } } +// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be +// called with a zero refcnt. This is useful for cases where the entry which +// is being reference counted is inside a container and the container does not +// hold a reference. The container uses release() returning true to attempt to +// do a cleanup from the map. +func (v *refcnt) acquireAllowZero() { + v.val.Add(1) +} + func (v *refcnt) release() bool { switch v := v.val.Add(-1); { case v < 0: @@ -48,6 +57,10 @@ func (v *refcnt) release() bool { } } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { } diff --git a/internal/cache/refcnt_tracing.go b/internal/cache/refcnt_tracing.go index 1d5e6c0219..a0329c69a7 100644 --- a/internal/cache/refcnt_tracing.go +++ b/internal/cache/refcnt_tracing.go @@ -51,6 +51,10 @@ func (v *refcnt) release() bool { return n == 0 } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack()) v.Lock() diff --git a/sstable/block/buffer_pool.go b/sstable/block/buffer_pool.go index 38809b41ed..80bf550cd0 100644 --- a/sstable/block/buffer_pool.go +++ b/sstable/block/buffer_pool.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" + "github.com/cockroachdb/pebble/internal/invariants" ) // Alloc allocates a new Value for a block of length n (excluding the block @@ -52,12 +53,22 @@ func (b Value) BlockMetadata() *Metadata { // backed by a buffer pool, MakeHandle inserts the value into the block cache, // returning a handle to the now resident value. func (b Value) MakeHandle( - c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, + crh cache.ReadHandle, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, ) BufferHandle { if b.buf.Valid() { + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle was valid") + } return BufferHandle{b: b.buf} } - return BufferHandle{h: c.Set(cacheID, fileNum, offset, b.v)} + return BufferHandle{h: crh.SetReadValue(b.v)} +} + +func (b Value) SetInCacheAndReleaseForTesting( + c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, +) { + h := c.Set(cacheID, fileNum, offset, b.v) + h.Release() } // Release releases the handle. diff --git a/sstable/colblk/index_block_test.go b/sstable/colblk/index_block_test.go index c7e7d6a3dc..970535455e 100644 --- a/sstable/colblk/index_block_test.go +++ b/sstable/colblk/index_block_test.go @@ -125,7 +125,7 @@ func TestIndexIterInitHandle(t *testing.T) { d := (*IndexBlockDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(blockData) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) getBlockAndIterate := func(it *IndexIter) { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go index 9592dabcc1..a306e7f192 100644 --- a/sstable/colblk/keyspan_test.go +++ b/sstable/colblk/keyspan_test.go @@ -88,7 +88,7 @@ func TestKeyspanBlockPooling(t *testing.T) { copy(v.BlockData(), b) d := (*KeyspanDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(v.BlockData()) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) getBlockAndIterate := func() { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/reader.go b/sstable/reader.go index 093b31865c..851ec5d7c5 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -498,28 +498,67 @@ func (r *Reader) readBlockInternal( bh block.Handle, initBlockMetadataFn func(*block.Metadata, []byte) error, ) (handle block.BufferHandle, _ error) { - if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() { + var ch cache.Handle + var crh cache.ReadHandle + if env.BufferPool == nil { + ch, crh = r.cacheOpts.Cache.GetWithReadHandle( + r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset, r.loadBlockSema) + } else { + ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + } + if ch.Valid() { // Cache hit. if readHandle != nil { readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) } env.BlockServedFromCache(bh.Length) - return block.CacheBufferHandle(h), nil + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle must not be valid") + } + return block.CacheBufferHandle(ch), nil } // Cache miss. - - if sema := r.loadBlockSema; sema != nil { - if err := sema.Acquire(ctx, 1); err != nil { - // An error here can only come from the context. + if crh.Valid() { + var err error + var errorDuration time.Duration + ch, errorDuration, err = crh.WaitForReadPermissionOrHandle(ctx) + if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) { + r.logger.Eventf( + ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String()) + } + if err != nil { return block.BufferHandle{}, err } - defer sema.Release(1) + if ch.Valid() { + return block.CacheBufferHandle(ch), nil + } + // TODO(sumeer): consider tracing when waited longer than some duration + // for turn to do the read. + } else { + // The compaction path uses env.BufferPool, and does not coordinate read + // using a cache.ReadHandle. This is ok since only a single compaction is + // reading a block. + if sema := r.loadBlockSema; sema != nil { + if err := sema.Acquire(ctx, 1); err != nil { + // An error here can only come from the context. + return block.BufferHandle{}, err + } + defer sema.Release(1) + } } + // INVARIANT: !ch.Valid(). compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool) readStopwatch := makeStopwatch() var err error + defer func() { + if err != nil { + if crh.Valid() { + crh.SetReadError(err) + } + } + }() if readHandle != nil { err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset)) } else { @@ -542,7 +581,7 @@ func (r *Reader) readBlockInternal( return block.BufferHandle{}, err } env.BlockRead(bh.Length, readDuration) - if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { + if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { compressed.Release() return block.BufferHandle{}, err } @@ -555,7 +594,8 @@ func (r *Reader) readBlockInternal( decompressed = compressed } else { // Decode the length of the decompressed value. - decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData()) + var decodedLen, prefixLen int + decodedLen, prefixLen, err = block.DecompressedLen(typ, compressed.BlockData()) if err != nil { compressed.Release() return block.BufferHandle{}, err @@ -569,11 +609,11 @@ func (r *Reader) readBlockInternal( return block.BufferHandle{}, err } } - if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { + if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { decompressed.Release() return block.BufferHandle{}, err } - h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + h := decompressed.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) return h, nil }