Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

valblk: move reader, fetcher #4175

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sstable/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sstable
import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// These constants are part of the file format, and should not be changed.
Expand All @@ -28,3 +29,11 @@ type InternalKey = base.InternalKey

// Span exports the keyspan.Span type.
type Span = keyspan.Span

const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3

// Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen.
const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)

// Assert blockHandleLikelyMaxLen >= valblk.HandleMaxLen.
const _ = uint(blockHandleLikelyMaxLen - valblk.HandleMaxLen)
32 changes: 1 addition & 31 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
if err != nil {
return Layout{}, errors.Wrap(err, "decompressing value index")
}
layout.ValueBlock, err = decodeValueBlockIndex(vbiBlock, vbih)
layout.ValueBlock, err = valblk.DecodeIndex(vbiBlock, vbih)
if err != nil {
return Layout{}, err
}
Expand Down Expand Up @@ -617,36 +617,6 @@ func decodeMetaindex(
return meta, vbih, nil
}

func decodeValueBlockIndex(data []byte, vbih valblk.IndexHandle) ([]block.Handle, error) {
var valueBlocks []block.Handle
indexEntryLen := int(vbih.BlockNumByteLength + vbih.BlockOffsetByteLength +
vbih.BlockLengthByteLength)
i := 0
for len(data) != 0 {
if len(data) < indexEntryLen {
return nil, errors.Errorf(
"remaining value index block %d does not contain a full entry of length %d",
len(data), indexEntryLen)
}
n := int(vbih.BlockNumByteLength)
bn := int(littleEndianGet(data, n))
if bn != i {
return nil, errors.Errorf("unexpected block num %d, expected %d",
bn, i)
}
i++
data = data[n:]
n = int(vbih.BlockOffsetByteLength)
blockOffset := littleEndianGet(data, n)
data = data[n:]
n = int(vbih.BlockLengthByteLength)
blockLen := littleEndianGet(data, n)
data = data[n:]
valueBlocks = append(valueBlocks, block.Handle{Offset: blockOffset, Length: blockLen})
}
return valueBlocks, nil
}

// layoutWriter writes the structure of an sstable to durable storage. It
// accepts serialized blocks, writes them to storage and returns a block handle
// describing the offset and length of the block.
Expand Down
15 changes: 15 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,21 @@ type WriterOptions struct {
disableObsoleteCollector bool
}

// UserKeyPrefixBound represents a [Lower,Upper) bound of user key prefixes.
// If both are nil, there is no bound specified. Else, Compare(Lower,Upper)
// must be < 0.
type UserKeyPrefixBound struct {
// Lower is a lower bound user key prefix.
Lower []byte
// Upper is an upper bound user key prefix.
Upper []byte
}

// IsEmpty returns true iff the bound is empty.
func (ukb *UserKeyPrefixBound) IsEmpty() bool {
return len(ukb.Lower) == 0 && len(ukb.Upper) == 0
}

// JemallocSizeClasses are a subset of available size classes in jemalloc[1],
// suitable for the AllocatorSizeClasses option.
//
Expand Down
43 changes: 38 additions & 5 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *Reader) NewPointIter(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
) (Iterator, error) {
return r.newPointIter(
ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
Expand Down Expand Up @@ -154,7 +154,7 @@ func (r *Reader) newPointIter(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
vState *virtualState,
) (Iterator, error) {
// NB: pebble.tableCache wraps the returned iterator with one which performs
Expand Down Expand Up @@ -211,7 +211,7 @@ func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterat
func (r *Reader) NewCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (Iterator, error) {
return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool)
Expand All @@ -220,7 +220,7 @@ func (r *Reader) NewCompactionIter(
func (r *Reader) newCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
vState *virtualState,
bufferPool *block.BufferPool,
) (Iterator, error) {
Expand Down Expand Up @@ -448,6 +448,14 @@ func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte)
return nil
}

// ReadValueBlockExternal implements valblk.ExternalBlockReader, allowing a
// base.LazyValue to read a value block.
func (r *Reader) ReadValueBlockExternal(
ctx context.Context, bh block.Handle,
) (block.BufferHandle, error) {
return r.readValueBlock(ctx, noEnv, noReadHandle, bh)
}

func (r *Reader) readValueBlock(
ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
) (block.BufferHandle, error) {
Expand Down Expand Up @@ -749,7 +757,7 @@ func (r *Reader) Layout() (*Layout, error) {
return nil, err
}
defer vbiH.Release()
l.ValueBlock, err = decodeValueBlockIndex(vbiH.BlockData(), r.valueBIH)
l.ValueBlock, err = valblk.DecodeIndex(vbiH.BlockData(), r.valueBIH)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1160,3 +1168,28 @@ func (w deterministicStopwatchForTesting) stop() time.Duration {
}
return dur
}

// MakeTrivialReaderProvider creates a valblk.ReaderProvider which always
// returns the given reader. It should be used when the Reader will outlive the
// iterator tree.
func MakeTrivialReaderProvider(r *Reader) valblk.ReaderProvider {
return (*trivialReaderProvider)(r)
}

// trivialReaderProvider implements valblk.ReaderProvider for a Reader that will
// outlive the top-level iterator in the iterator tree.
//
// Defining the type in this manner (as opposed to a struct) avoids allocation.
type trivialReaderProvider Reader

var _ valblk.ReaderProvider = (*trivialReaderProvider)(nil)

// GetReader implements ReaderProvider.
func (trp *trivialReaderProvider) GetReader(
ctx context.Context,
) (valblk.ExternalBlockReader, error) {
return (*Reader)(trp), nil
}

// Close implements ReaderProvider.
func (trp *trivialReaderProvider) Close() {}
5 changes: 3 additions & 2 deletions sstable/reader_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// CommonReader abstracts functionality over a Reader or a VirtualReader. This
Expand All @@ -33,13 +34,13 @@ type CommonReader interface {
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
) (Iterator, error)

NewCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (Iterator, error)

Expand Down
29 changes: 10 additions & 19 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// singleLevelIterator iterates over an entire table of data. To seek for a given
Expand Down Expand Up @@ -64,7 +65,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
// loading. It may not actually have loaded the block, due to an error or
// because it was considered irrelevant.
dataBH block.Handle
vbReader valueBlockReader
vbReader valblk.Reader
// vbRH is the read handle for value blocks, which are in a different
// part of the sstable than data blocks.
vbRH objstorage.ReadHandle
Expand Down Expand Up @@ -211,7 +212,7 @@ func newColumnBlockSingleLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*singleLevelIteratorColumnBlocks, error) {
if r.err != nil {
Expand All @@ -228,12 +229,7 @@ func newColumnBlockSingleLevelIterator(
)
var getLazyValuer block.GetLazyValueForPrefixAndValueHandler
if r.Properties.NumValueBlocks > 0 {
i.vbReader = valueBlockReader{
bpOpen: i,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.vbReader = valblk.MakeReader(i, rp, r.valueBIH, stats)
getLazyValuer = &i.vbReader
i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
}
Expand Down Expand Up @@ -266,7 +262,7 @@ func newRowBlockSingleLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*singleLevelIteratorRowBlocks, error) {
if r.err != nil {
Expand All @@ -283,12 +279,7 @@ func newRowBlockSingleLevelIterator(
)
if r.tableFormat >= TableFormatPebblev3 {
if r.Properties.NumValueBlocks > 0 {
i.vbReader = valueBlockReader{
bpOpen: i,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.vbReader = valblk.MakeReader(i, rp, r.valueBIH, stats)
(&i.data).SetGetLazyValuer(&i.vbReader)
i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
}
Expand Down Expand Up @@ -561,9 +552,9 @@ func (i *singleLevelIterator[I, PI, P, PD]) loadDataBlock(dir int8) loadBlockRes
return loadBlockOK
}

// readBlockForVBR implements the blockProviderWhenOpen interface for use by
// the valueBlockReader.
func (i *singleLevelIterator[I, PI, D, PD]) readBlockForVBR(
// ReadValueBlock implements the valblk.BlockProviderWhenOpen interface for use
// by the valblk.Reader.
func (i *singleLevelIterator[I, PI, D, PD]) ReadValueBlock(
bh block.Handle, stats *base.InternalIteratorStats,
) (block.BufferHandle, error) {
env := i.readBlockEnv
Expand Down Expand Up @@ -1582,7 +1573,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error {
if i.bpfs != nil {
releaseBlockPropertiesFilterer(i.bpfs)
}
i.vbReader.close()
i.vbReader.Close()
if i.vbRH != nil {
err = firstError(err, i.vbRH.Close())
i.vbRH = nil
Expand Down
19 changes: 5 additions & 14 deletions sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

type twoLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIterator[D]] struct {
Expand Down Expand Up @@ -162,7 +163,7 @@ func newColumnBlockTwoLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*twoLevelIteratorColumnBlocks, error) {
if r.err != nil {
Expand All @@ -186,12 +187,7 @@ func newColumnBlockTwoLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.secondLevel.vbReader = valueBlockReader{
bpOpen: &i.secondLevel,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.secondLevel.vbReader = valblk.MakeReader(&i.secondLevel, rp, r.valueBIH, stats)
getLazyValuer = &i.secondLevel.vbReader
i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
}
Expand Down Expand Up @@ -225,7 +221,7 @@ func newRowBlockTwoLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*twoLevelIteratorRowBlocks, error) {
if r.err != nil {
Expand All @@ -249,12 +245,7 @@ func newRowBlockTwoLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.secondLevel.vbReader = valueBlockReader{
bpOpen: &i.secondLevel,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.secondLevel.vbReader = valblk.MakeReader(&i.secondLevel, rp, r.valueBIH, stats)
i.secondLevel.data.SetGetLazyValuer(&i.secondLevel.vbReader)
i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
}
Expand Down
3 changes: 2 additions & 1 deletion sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/errorfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -230,7 +231,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
return "virtualize must be called before creating compaction iters"
}

var rp ReaderProvider
var rp valblk.ReaderProvider
transforms := IterTransforms{
SyntheticPrefixAndSuffix: block.MakeSyntheticPrefixAndSuffix(nil, syntheticSuffix),
}
Expand Down
5 changes: 3 additions & 2 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// VirtualReader wraps Reader. Its purpose is to restrict functionality of the
Expand Down Expand Up @@ -95,7 +96,7 @@ func MakeVirtualReader(reader *Reader, p VirtualReaderParams) VirtualReader {
func (v *VirtualReader) NewCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (Iterator, error) {
return v.reader.newCompactionIter(
Expand All @@ -119,7 +120,7 @@ func (v *VirtualReader) NewPointIter(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
) (Iterator, error) {
return v.reader.newPointIter(
ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
Expand Down
Loading