crl-release-24.3: sstable: fix improper buffer reuse in copyDataBlocks #4182

Merged
14 changes: 7 additions & 7 deletions sstable/colblk_writer.go
@@ -1090,7 +1090,7 @@ func shouldFlushWithoutLatestKV(
func (w *RawColumnWriter) copyDataBlocks(
ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
) error {
- buf := make([]byte, 0, 256<<10)
+ const readSizeTarget = 256 << 10
readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
if firstBlockIdx > lastBlockIdx {
panic("pebble: readAndFlushBlocks called with invalid block range")
@@ -1103,9 +1103,9 @@ func (w *RawColumnWriter) copyDataBlocks(
// blocks in one request.
lastBH := blocks[lastBlockIdx].bh
blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
- if blocksToReadLen > uint64(cap(buf)) {
- buf = make([]byte, 0, blocksToReadLen)
- }
+ // We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes
+ // a pointer to the buffer to the write queue.
+ buf := make([]byte, 0, blocksToReadLen)
if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
return err
}
@@ -1120,8 +1120,8 @@ func (w *RawColumnWriter) copyDataBlocks(
}
return nil
}
- // Iterate through blocks until we have enough to fill cap(buf). When we have more than
- // one block in blocksToRead and adding the next block would exceed the buffer capacity,
+ // Iterate through blocks until we have enough to fill readSizeTarget. When we have more than
+ // one block in blocksToRead and adding the next block would exceed the target buffer capacity,
// we read and flush existing blocks in blocksToRead. This allows us to read as many
// blocks in one IO request as possible, while still utilizing the write queue in this
// writer.
@@ -1133,7 +1133,7 @@ func (w *RawColumnWriter) copyDataBlocks(
start := i
// Note the i++ in the initializing condition; this means we will always flush at least
// one block.
- for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
+ for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ {
}
// i points to one index past the last block we want to read.
if err := readAndFlushBlocks(start, i-1); err != nil {
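The core of the fix above: w.enqueuePhysicalBlock hands a pointer into the read buffer to the writer's asynchronous write queue, so reusing one buffer across reads let a later ReadAt overwrite bytes that were still queued for flushing. The former 256 KiB capacity survives only as readSizeTarget, a sizing hint for batching reads, while every read now gets a fresh allocation. Below is a small standalone sketch of that pattern (hypothetical types and constants, not Pebble code): consecutive blocks are grouped into reads of at most readSizeTarget bytes, and each read owns its buffer because the consumer may retain the slice.

```go
package main

import "fmt"

// blockHandle is a stand-in for Pebble's block handle: an offset and length
// within the source sstable.
type blockHandle struct {
	offset, length uint64
}

const (
	trailerLen     = 5  // stand-in for block.TrailerLen
	readSizeTarget = 64 // stand-in for 256 << 10
)

// copyBlocks groups consecutive blocks into reads of at most readSizeTarget
// bytes and hands each read's buffer to emit. emit may retain the slice
// (as the write queue does), so every read gets a fresh allocation.
func copyBlocks(blocks []blockHandle, emit func([]byte)) {
	for i := 0; i < len(blocks); {
		start := i
		// The i++ in the init clause guarantees at least one block per read,
		// even if that single block alone exceeds the target.
		for i++; i < len(blocks) &&
			blocks[i].offset+blocks[i].length+trailerLen-blocks[start].offset <= readSizeTarget; i++ {
		}
		last := blocks[i-1]
		readLen := last.offset + last.length + trailerLen - blocks[start].offset
		// Fresh buffer per read; reusing one here would let the next read
		// clobber data the consumer has not processed yet.
		buf := make([]byte, readLen)
		// A real implementation would ReadAt(buf, blocks[start].offset) here.
		emit(buf)
	}
}

func main() {
	blocks := []blockHandle{
		{offset: 0, length: 20}, {offset: 25, length: 20},
		{offset: 50, length: 20}, {offset: 75, length: 120}, // one oversized block
	}
	copyBlocks(blocks, func(b []byte) { fmt.Printf("one read of %d bytes\n", len(b)) })
	// one read of 50 bytes  (blocks 0-1 batched under the 64-byte target)
	// one read of 25 bytes  (block 2 alone; adding block 3 would exceed it)
	// one read of 125 bytes (block 3 must still be read whole)
}
```

Allocating per read trades a little garbage for correctness; pooling buffers would require knowing when the write queue is done with them.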
30 changes: 26 additions & 4 deletions sstable/copier_test.go
@@ -23,7 +23,10 @@ import (

func TestCopySpan(t *testing.T) {
fs := vfs.NewMem()
- blockCache := cache.New(1 << 20 /* 1 MB */)
+ blockCache := cache.New(2 << 20 /* 2 MB */)
+ cacheID := cache.ID(1)
+ fileNameToNum := make(map[string]base.FileNum)
+ nextFileNum := base.FileNum(1)
defer blockCache.Unref()

keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
@@ -35,6 +38,8 @@ func TestCopySpan(t *testing.T) {
if err != nil {
return err.Error()
}
+ fileNameToNum[d.CmdArgs[0].Key] = nextFileNum
+ nextFileNum++
tableFormat := TableFormatMax
blockSize := 1
var indexBlockSize int
@@ -92,15 +97,28 @@
if err != nil {
return err.Error()
}
- r, err := NewReader(context.TODO(), readable, ReaderOptions{
+ var start, end []byte
+ for _, arg := range d.CmdArgs[1:] {
+ switch arg.Key {
+ case "start":
+ start = []byte(arg.FirstVal(t))
+ case "end":
+ end = []byte(arg.FirstVal(t))
+ }
+ }
+ rOpts := ReaderOptions{
Comparer: testkeys.Comparer,
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
- })
+ }
+ rOpts.internal.CacheOpts.Cache = blockCache
+ rOpts.internal.CacheOpts.CacheID = cacheID
+ rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[d.CmdArgs[0].Key])
+ r, err := NewReader(context.TODO(), readable, rOpts)
defer r.Close()
if err != nil {
return err.Error()
}
- iter, err := r.NewIter(block.NoTransforms, nil, nil)
+ iter, err := r.NewIter(block.NoTransforms, start, end)
if err != nil {
return err.Error()
}
@@ -126,6 +144,8 @@ func TestCopySpan(t *testing.T) {
return err.Error()
}
writable := objstorageprovider.NewFileWritable(output)
+ fileNameToNum[outputFile] = nextFileNum
+ nextFileNum++

f, err := fs.Open(inputFile)
if err != nil {
@@ -140,6 +160,8 @@ func TestCopySpan(t *testing.T) {
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
}
rOpts.internal.CacheOpts.Cache = blockCache
+ rOpts.internal.CacheOpts.CacheID = cacheID
+ rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[inputFile])
r, err := NewReader(context.TODO(), readable, rOpts)
if err != nil {
return err.Error()
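Why the test plumbing above matters: Pebble's block cache is keyed by cache ID, file number, and block offset. Before this change the test's readers either skipped the shared cache or used it with the same default file number for every table, so a cached block from one file could be returned when reading another, and the copied output blocks were not reliably exercised through the cache. Assigning each created table its own base.FileNum via fileNameToNum, sharing one cache.ID, and passing start/end bounds through to NewIter makes the cached-read path meaningful. A toy illustration of the keying, with a plain map standing in for the real cache (hypothetical code, not the Pebble API):

```go
package main

import "fmt"

// blockKey mimics the shape of a block-cache key: which cache, which file,
// and which offset within that file.
type blockKey struct {
	cacheID uint64
	fileNum uint64
	offset  uint64
}

func main() {
	blockCache := map[blockKey][]byte{}

	// Reading table "test3" (fileNum 1) populates the cache.
	blockCache[blockKey{cacheID: 1, fileNum: 1, offset: 0}] = []byte("test3 block @0")

	// A different table with its own fileNum misses, as it should.
	if _, ok := blockCache[blockKey{cacheID: 1, fileNum: 2, offset: 0}]; !ok {
		fmt.Println("fileNum 2, offset 0: cache miss (correct)")
	}

	// If both tables shared fileNum 1, the second would silently read the
	// first table's block instead of its own data.
	if b, ok := blockCache[blockKey{cacheID: 1, fileNum: 1, offset: 0}]; ok {
		fmt.Printf("shared fileNum 1, offset 0: stale hit %q\n", b)
	}
}
```

With distinct file numbers, a post-copy iteration reads the newly written blocks rather than stale cache entries, which helps the test surface corruption in the copied output instead of masking it.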
14 changes: 2 additions & 12 deletions sstable/testdata/copy_span
@@ -92,10 +92,8 @@ c.SET.4:baz
d.SET.5:foobar
----

- iter test3
+ iter test3 start=c
----
- a#0,SET: foo
- b#0,SET: bar
c#0,SET: baz
d#0,SET: foobar

@@ -134,18 +132,10 @@ i.SET.5:foo
j.SET.5:foo
----

- iter test32
+ iter test32 start=c end=e
----
- a#0,SET: foo
- b#0,SET: bar
c#0,SET: baz
d#0,SET: foobar
- e#0,SET: foo
- f#0,SET: foo
- g#0,SET: foo
- h#0,SET: foo
- i#0,SET: foo
- j#0,SET: foo

copy-span test32 test33 b.SET.10 cc.SET.0
----