sstable: make CopySpan() support columnar blocks
Previously, sstable.CopySpan() was heavily coupled to the
internals of a RawRowWriter. This change updates CopySpan and the
RawWriter interface to work in a more implementation-agnostic
way, so that the columnar writer can also be used with the downloader.

Fixes #4010.
itsbilal authored and RaduBerinde committed Oct 12, 2024
1 parent 453b46c commit 2b3795c
Showing 9 changed files with 516 additions and 78 deletions.
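For orientation before the per-file diffs: the copier now drives the writer through a small set of copy-oriented hooks instead of reaching into row-writer internals. The RawWriter interface change itself lives in one of the files not expanded on this page; the sketch below simply collects the methods that the copier.go and colblk_writer.go diffs call, under an assumed interface name and with simplified doc comments.

```go
package sstable

import (
	"context"

	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/sstable/block"
)

// spanCopyWriter is an illustrative name, not Pebble's; the method set is the
// one CopySpan invokes on the writer in this commit.
type spanCopyWriter interface {
	// copyDataBlocks copies possibly-compressed blocks byte-for-byte from rh,
	// batching reads and enqueueing each block on the write queue.
	copyDataBlocks(ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle) error
	// addDataBlock compresses and writes a single uncompressed block under the
	// given index separator.
	addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error
	// copyFilter carries the source table's filter block across verbatim.
	copyFilter(filter []byte, filterName string) error
	// copyProperties copies table properties, clearing those that are
	// re-derived (index stats) or intentionally dropped (user properties).
	copyProperties(props Properties)
}
```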
9 changes: 9 additions & 0 deletions sstable/block/compression.go
@@ -167,6 +167,15 @@ type PhysicalBlock struct {
trailer Trailer
}

// NewPhysicalBlock returns a new PhysicalBlock with the provided block
// data. The trailer is set from the last TrailerLen bytes of the
// block. The data could be compressed.
func NewPhysicalBlock(data []byte) PhysicalBlock {
trailer := Trailer(data[len(data)-TrailerLen:])
data = data[:len(data)-TrailerLen]
return PhysicalBlock{data: data, trailer: trailer}
}

// LengthWithTrailer returns the length of the data block, including the trailer.
func (b *PhysicalBlock) LengthWithTrailer() int {
return len(b.data) + TrailerLen
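NewPhysicalBlock expects to be handed a block's on-disk bytes including the trailing TrailerLen checksum/compression trailer, which is exactly what the new copyDataBlocks path below slices out of its read buffer. A small hypothetical helper (not part of this commit) showing that calling pattern:

```go
package block

import "io"

// readBlockWithTrailer is an illustrative helper, not Pebble API: it reads a
// block's payload plus its trailer off the file and hands the whole slice to
// NewPhysicalBlock, which splits the last TrailerLen bytes off as the trailer
// and keeps the (possibly compressed) payload as the block data.
func readBlockWithTrailer(r io.ReaderAt, offset int64, dataLen uint64) (PhysicalBlock, error) {
	buf := make([]byte, dataLen+TrailerLen)
	if _, err := r.ReadAt(buf, offset); err != nil {
		return PhysicalBlock{}, err
	}
	return NewPhysicalBlock(buf), nil
}
```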
125 changes: 123 additions & 2 deletions sstable/colblk_writer.go
@@ -6,6 +6,7 @@ package sstable

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
@@ -149,15 +150,20 @@ func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumn
}
}

numBlockPropertyCollectors := len(o.BlockPropertyCollectors) + 1 // +1 for the obsolete collector
numBlockPropertyCollectors := len(o.BlockPropertyCollectors)
if !o.disableObsoleteCollector {
numBlockPropertyCollectors++
}
if numBlockPropertyCollectors > maxPropertyCollectors {
panic(errors.New("pebble: too many block property collectors"))
}
w.blockPropCollectors = make([]BlockPropertyCollector, 0, numBlockPropertyCollectors)
for _, constructFn := range o.BlockPropertyCollectors {
w.blockPropCollectors = append(w.blockPropCollectors, constructFn())
}
w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
if !o.disableObsoleteCollector {
w.blockPropCollectors = append(w.blockPropCollectors, &w.obsoleteCollector)
}
var buf bytes.Buffer
buf.WriteString("[")
for i := range w.blockPropCollectors {
@@ -1090,3 +1096,118 @@ func shouldFlushWithoutLatestKV(
}
return true
}

// copyDataBlocks adds a range of blocks to the table as-is. These blocks could be
// compressed. It's specifically used by the sstable copier that can copy parts
// of an sstable to a new sstable, using CopySpan().
func (w *RawColumnWriter) copyDataBlocks(
ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
) error {
buf := make([]byte, 0, 256<<10)
readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
if firstBlockIdx > lastBlockIdx {
panic("pebble: readAndFlushBlocks called with invalid block range")
}
// We need to flush blocks[firstBlockIdx:lastBlockIdx+1] into the write queue.
// We do this by issuing one big read from the read handle into the buffer, and
// then enqueueing the writing of those blocks one-by-one.
//
// TODO(bilal): Consider refactoring the write queue to support writing multiple
// blocks in one request.
lastBH := blocks[lastBlockIdx].bh
blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
if blocksToReadLen > uint64(cap(buf)) {
buf = make([]byte, 0, blocksToReadLen)
}
if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
return err
}
for i := firstBlockIdx; i <= lastBlockIdx; i++ {
offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
blockBuf := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
cb := compressedBlockPool.Get().(*compressedBlock)
cb.physical = block.NewPhysicalBlock(blockBuf)
if err := w.enqueuePhysicalBlock(cb, blocks[i].sep); err != nil {
return err
}
}
return nil
}
// Iterate through blocks, accumulating consecutive blocks into batches of up
// to cap(buf) bytes. When the current batch is non-empty and adding the next
// block would exceed the buffer capacity, we read and flush the batch. This
// allows us to read as many blocks in one IO request as possible, while still
// utilizing the write queue in this writer.
lastBlockOffset := uint64(0)
for i := 0; i < len(blocks); {
if blocks[i].bh.Offset < lastBlockOffset {
panic("pebble: copyDataBlocks called with blocks out of order")
}
start := i
// Note the i++ in the initializing condition; this means we will always flush at least
// one block.
for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
}
// i points to one index past the last block we want to read.
if err := readAndFlushBlocks(start, i-1); err != nil {
return err
}
}
return nil
}

// addDataBlock adds a raw uncompressed data block to the table as-is. It's specifically used
// by the sstable copier that can copy parts of an sstable to a new sstable,
// using CopySpan().
func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
// Serialize the data block, compress it and send it to the write queue.
cb := compressedBlockPool.Get().(*compressedBlock)
cb.blockBuf.checksummer.Type = w.opts.Checksum
cb.physical = block.CompressAndChecksum(
&cb.blockBuf.compressedBuf,
b,
w.opts.Compression,
&cb.blockBuf.checksummer,
)
if !cb.physical.IsCompressed() {
// If the block isn't compressed, cb.physical's underlying data points
// directly into the caller-provided buffer b, which the writer does not
// own. Clone it before passing it to the write queue to be asynchronously
// written to disk.
// TODO(jackson): Should we try to avoid this clone by tracking the
// lifetime of the DataBlockWriters?
cb.physical = cb.physical.Clone()
}
if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
return err
}
return nil
}

// copyFilter copies the specified filter to the table. It's specifically used
// by the sstable copier that can copy parts of an sstable to a new sstable,
// using CopySpan().
func (w *RawColumnWriter) copyFilter(filter []byte, filterName string) error {
if w.filterBlock != nil && filterName != w.filterBlock.policyName() {
return errors.New("mismatched filters")
}
w.filterBlock = copyFilterWriter{
origPolicyName: w.filterBlock.policyName(), origMetaName: w.filterBlock.metaName(), data: filter,
}
return nil
}

// copyProperties copies properties from the specified props, and resets others
// to prepare for copying data blocks from another sstable, using the copy/addDataBlock(s)
// methods above. It's specifically used by the sstable copier that can copy parts of an
// sstable to a new sstable, using CopySpan().
func (w *RawColumnWriter) copyProperties(props Properties) {
w.props = props
// Remove all user properties to disable block properties, which we do not
// calculate for CopySpan.
w.props.UserProperties = nil
// Reset props that we'll re-derive as we build our own index.
w.props.IndexPartitions = 0
w.props.TopLevelIndexSize = 0
w.props.IndexSize = 0
w.props.IndexType = 0
}
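The batching rule in copyDataBlocks above is the part worth spelling out: consecutive blocks are grouped into the largest runs whose combined on-disk extent (including each block's trailer) fits in the staging buffer, and each run becomes a single ReadAt followed by per-block enqueues. A self-contained sketch of just that grouping, with assumed names (span, trailerLen, batch) standing in for the real types:

```go
package main

import "fmt"

// span stands in for the offset/length portion of an index entry's block
// handle; trailerLen stands in for block.TrailerLen. Illustrative only.
type span struct{ offset, length uint64 }

const trailerLen = 5

// batch mirrors copyDataBlocks' grouping: starting at block `start`, keep
// admitting consecutive blocks while the byte range from blocks[start].offset
// through the candidate's trailer still fits in bufCap; every batch contains
// at least one block (a single oversized block simply forces a larger read
// buffer in the real code).
func batch(blocks []span, bufCap uint64) (ranges [][2]int) {
	for i := 0; i < len(blocks); {
		start := i
		for i++; i < len(blocks) &&
			blocks[i].offset+blocks[i].length+trailerLen-blocks[start].offset <= bufCap; i++ {
		}
		// i is now one past the last block of this batch: one read covers
		// blocks[start:i], then each block is enqueued individually.
		ranges = append(ranges, [2]int{start, i - 1})
	}
	return ranges
}

func main() {
	blocks := []span{{0, 100}, {105, 100}, {210, 300}, {520, 100}}
	// With a 250-byte buffer the first two blocks share one read; the larger
	// third block and the fourth block each get their own.
	fmt.Println(batch(blocks, 250)) // [[0 1] [2 2] [3 3]]
}
```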
102 changes: 29 additions & 73 deletions sstable/copier.go
@@ -13,7 +13,6 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/rowblk"
)

// CopySpan produces a copy of an approximate subset of an input sstable.
@@ -61,22 +60,19 @@ func CopySpan(
o.FilterPolicy = nil
}
o.TableFormat = r.tableFormat
w := newRowWriter(output, o)

// We don't want the writer to attempt to write out block property data in
// index blocks. This data won't be valid since we're not passing the actual
// key data through the writer. We also remove the table-level properties
// below.
//
// TODO(dt,radu): Figure out how to populate the prop collector state with
// block props from the original sst.
w.blockPropCollectors = nil
o.BlockPropertyCollectors = nil
o.disableObsoleteCollector = true
w := NewRawWriter(output, o)

defer func() {
if w != nil {
// set w.err to any non-nil error just so it aborts instead of finishing.
w.err = base.ErrNotFound
// w.Close now owns calling output.Abort().
w.Close()
}
}()
@@ -103,33 +99,20 @@
// positives for keys in blocks of the original file that we don't copy, but
// filters can always have false positives, so this is fine.
if r.tableFilter != nil {
if w.filter != nil && r.Properties.FilterPolicyName != w.filter.policyName() {
return 0, errors.New("mismatched filters")
}
filterBlock, err := r.readFilterBlock(ctx, noEnv, rh)
if err != nil {
return 0, errors.Wrap(err, "reading filter")
}
filterBytes := append([]byte{}, filterBlock.Get()...)
filterBlock.Release()
w.filter = copyFilterWriter{
origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
}
if err := w.copyFilter(filterBytes, r.Properties.FilterPolicyName); err != nil {
return 0, err
}
}

// Copy all the props from the source file; we can't compute our own for many
// that depend on seeing every key, such as total count or size, so we copy the
// original props instead. This will result in over-counts, but that is safer
// than under-counts.
w.props = r.Properties
// Remove all user properties to disable block properties, which we do not
// calculate.
w.props.UserProperties = nil
// Reset props that we'll re-derive as we build our own index.
w.props.IndexPartitions = 0
w.props.TopLevelIndexSize = 0
w.props.IndexSize = 0
w.props.IndexType = 0
w.copyProperties(r.Properties)

// Find the blocks that intersect our span.
blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
@@ -147,30 +130,6 @@
// Copy all blocks byte-for-byte without doing any per-key processing.
var blocksNotInCache []indexEntry

copyBlocksToFile := func(blocks []indexEntry) error {
blockOffset := blocks[0].bh.Offset
// The block lengths don't include their trailers, which just sit after the
// block length, before the next offset; We get the ones between the blocks
// we copy implicitly but need to explicitly add the last trailer to length.
length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - blockOffset
if spanEnd := length + blockOffset; spanEnd < blockOffset {
return base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", blockOffset, spanEnd)
}
if err := objstorage.Copy(ctx, rh, w.layout.writable, blockOffset, length); err != nil {
return err
}
// Update w.meta.Size so subsequently flushed metadata has correct offsets.
w.meta.Size += length
for i := range blocks {
blocks[i].bh.Offset = w.layout.offset
// blocks[i].bh.Length remains unmodified.
if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
return err
}
w.layout.offset += uint64(blocks[i].bh.Length) + block.TrailerLen
}
return nil
}
for i := range blocks {
h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
if h.Get() == nil {
@@ -184,30 +143,24 @@
if len(blocksNotInCache) > 0 {
// We have some blocks that were not in cache preceding this block.
// Copy them over byte-for-byte with copyDataBlocks.
if err := copyBlocksToFile(blocksNotInCache); err != nil {
if err := w.copyDataBlocks(ctx, blocksNotInCache, rh); err != nil {
h.Release()
return 0, err
}
blocksNotInCache = nil
}

// layout.WriteDataBlock keeps layout.offset up-to-date for us.
bh, err := w.layout.WriteDataBlock(h.Get(), &w.dataBlockBuf.blockBuf)
err := w.addDataBlock(h.Get(), blocks[i].sep, blocks[i].bh)
h.Release()
if err != nil {
return 0, err
}
blocks[i].bh.Handle = bh
if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
return 0, err
}
w.meta.Size += uint64(bh.Length) + block.TrailerLen
}

if len(blocksNotInCache) > 0 {
// We have some remaining blocks that were not in cache. Copy them
// over byte-for-byte with copyDataBlocks.
if err := copyBlocksToFile(blocksNotInCache); err != nil {
if err := w.copyDataBlocks(ctx, blocksNotInCache, rh); err != nil {
return 0, err
}
blocksNotInCache = nil
@@ -220,7 +173,11 @@ func CopySpan(
w = nil
return 0, err
}
wrote := w.meta.Size
meta, err := w.Metadata()
if err != nil {
return 0, err
}
wrote := meta.Size
w = nil
return wrote, nil
}
@@ -237,7 +194,7 @@ var ErrEmptySpan = errors.New("cannot copy empty span")
// indexEntry captures the two components of an sst index entry: the key and the
// decoded block handle value.
type indexEntry struct {
sep InternalKey
sep []byte
bh block.HandleWithProperties
}

@@ -251,23 +208,24 @@ func intersectingIndexEntries(
indexH block.BufferHandle,
start, end InternalKey,
) ([]indexEntry, error) {
top, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
top := r.tableFormat.newIndexIter()
err := top.Init(r.Compare, r.Split, indexH.Get(), NoTransforms)
if err != nil {
return nil, err
}
defer top.Close()

var alloc bytealloc.A
res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
for valid := top.SeekGE(start.UserKey); valid; valid = top.Next() {
bh, err := top.BlockHandleWithProperties()
if err != nil {
return nil, err
}
if r.Properties.IndexType != twoLevelIndex {
entry := indexEntry{bh: bh, sep: kv.K}
entry := indexEntry{bh: bh, sep: top.Separator()}
alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
alloc, entry.sep = alloc.Copy(entry.sep)
res = append(res, entry)
} else {
subBlk, err := r.readIndexBlock(ctx, noEnv, rh, bh.Handle)
@@ -276,34 +234,32 @@
}
defer subBlk.Release() // in-loop, but it is a short loop.

sub, err := rowblk.NewIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
sub := r.tableFormat.newIndexIter()
err = sub.Init(r.Compare, r.Split, subBlk.Get(), NoTransforms)
if err != nil {
return nil, err
}
defer sub.Close() // in-loop, but it is a short loop.

for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
for valid := sub.SeekGE(start.UserKey); valid; valid = sub.Next() {
bh, err := sub.BlockHandleWithProperties()
if err != nil {
return nil, err
}
entry := indexEntry{bh: bh, sep: kv.K}
entry := indexEntry{bh: bh, sep: top.Separator()}
alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
alloc, entry.sep = alloc.Copy(entry.sep)
res = append(res, entry)
if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
if r.Compare(end.UserKey, entry.sep) <= 0 {
break
}
}
if err := sub.Error(); err != nil {
return nil, err
}
}
if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
if r.Compare(end.UserKey, top.Separator()) <= 0 {
break
}
}
return res, top.Error()
return res, nil
}

// copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
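Pulling the copier.go changes together: CopySpan's main loop interleaves two paths. Blocks already in the block cache are handed to addDataBlock one at a time (the writer re-compresses and re-checksums them), while runs of consecutive uncached blocks are accumulated and passed to copyDataBlocks so they can be read and copied byte-for-byte in large batches. A minimal, self-contained sketch of that control flow, with hypothetical lookup/addCached/copyBulk callbacks standing in for the Pebble block cache and the two writer hooks:

```go
package main

import "fmt"

// copyRun models CopySpan's cache-aware loop: cached items are flushed
// individually via addCached, while runs of consecutive cache misses are
// accumulated and flushed in bulk via copyBulk. All three callbacks are
// illustrative stand-ins, not Pebble API.
func copyRun(
	n int,
	lookup func(i int) (data string, ok bool),
	addCached func(i int, data string) error,
	copyBulk func(run []int) error,
) error {
	var pending []int // consecutive uncached block indices
	flush := func() error {
		if len(pending) == 0 {
			return nil
		}
		err := copyBulk(pending)
		pending = nil
		return err
	}
	for i := 0; i < n; i++ {
		data, ok := lookup(i)
		if !ok {
			pending = append(pending, i)
			continue
		}
		// A cached block ends the current run: copy the accumulated misses
		// first so file order is preserved, then re-add the cached block.
		if err := flush(); err != nil {
			return err
		}
		if err := addCached(i, data); err != nil {
			return err
		}
	}
	return flush() // trailing run of uncached blocks
}

func main() {
	cached := map[int]string{1: "b1", 4: "b4"}
	_ = copyRun(6,
		func(i int) (string, bool) { s, ok := cached[i]; return s, ok },
		func(i int, s string) error { fmt.Println("addDataBlock", i); return nil },
		func(run []int) error { fmt.Println("copyDataBlocks", run); return nil },
	)
	// Prints: copyDataBlocks [0], addDataBlock 1, copyDataBlocks [2 3],
	// addDataBlock 4, copyDataBlocks [5].
}
```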