From b06c0067fd7c9992c5ea937c5fab21f9168a7c42 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Mon, 25 Nov 2024 15:43:07 -0500 Subject: [PATCH] sstable: fix improper buffer reuse in copyDataBlocks Previously, in the colblk implementation of copyDataBlocks, we were reusing a buffer that could under some cases be passed directly to the write queue and would get written to sstable while later blocks are being read into the same buffer. This change also improves tests around CopySpan() to better test cache hit/miss cases. Fixes https://github.com/cockroachdb/cockroach/issues/131332. --- sstable/colblk_writer.go | 14 +++++++------- sstable/copier_test.go | 30 ++++++++++++++++++++++++++---- sstable/testdata/copy_span | 14 ++------------ 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/sstable/colblk_writer.go b/sstable/colblk_writer.go index eae0e92c8c..afcbe9a68d 100644 --- a/sstable/colblk_writer.go +++ b/sstable/colblk_writer.go @@ -1090,7 +1090,7 @@ func shouldFlushWithoutLatestKV( func (w *RawColumnWriter) copyDataBlocks( ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle, ) error { - buf := make([]byte, 0, 256<<10) + const readSizeTarget = 256 << 10 readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error { if firstBlockIdx > lastBlockIdx { panic("pebble: readAndFlushBlocks called with invalid block range") @@ -1103,9 +1103,9 @@ func (w *RawColumnWriter) copyDataBlocks( // blocks in one request. lastBH := blocks[lastBlockIdx].bh blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset - if blocksToReadLen > uint64(cap(buf)) { - buf = make([]byte, 0, blocksToReadLen) - } + // We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes + // a pointer to the buffer to the write queue. + buf := make([]byte, 0, blocksToReadLen) if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil { return err } @@ -1120,8 +1120,8 @@ func (w *RawColumnWriter) copyDataBlocks( } return nil } - // Iterate through blocks until we have enough to fill cap(buf). When we have more than - // one block in blocksToRead and adding the next block would exceed the buffer capacity, + // Iterate through blocks until we have enough to fill readSizeTarget. When we have more than + // one block in blocksToRead and adding the next block would exceed the target buffer capacity, // we read and flush existing blocks in blocksToRead. This allows us to read as many // blocks in one IO request as possible, while still utilizing the write queue in this // writer. @@ -1133,7 +1133,7 @@ func (w *RawColumnWriter) copyDataBlocks( start := i // Note the i++ in the initializing condition; this means we will always flush at least // one block. - for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ { + for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ { } // i points to one index past the last block we want to read. if err := readAndFlushBlocks(start, i-1); err != nil { diff --git a/sstable/copier_test.go b/sstable/copier_test.go index 31759a513d..f5b7ce3724 100644 --- a/sstable/copier_test.go +++ b/sstable/copier_test.go @@ -23,7 +23,10 @@ import ( func TestCopySpan(t *testing.T) { fs := vfs.NewMem() - blockCache := cache.New(1 << 20 /* 1 MB */) + blockCache := cache.New(2 << 20 /* 1 MB */) + cacheID := cache.ID(1) + fileNameToNum := make(map[string]base.FileNum) + nextFileNum := base.FileNum(1) defer blockCache.Unref() keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16) @@ -35,6 +38,8 @@ func TestCopySpan(t *testing.T) { if err != nil { return err.Error() } + fileNameToNum[d.CmdArgs[0].Key] = nextFileNum + nextFileNum++ tableFormat := TableFormatMax blockSize := 1 var indexBlockSize int @@ -92,15 +97,28 @@ func TestCopySpan(t *testing.T) { if err != nil { return err.Error() } - r, err := NewReader(context.TODO(), readable, ReaderOptions{ + var start, end []byte + for _, arg := range d.CmdArgs[1:] { + switch arg.Key { + case "start": + start = []byte(arg.FirstVal(t)) + case "end": + end = []byte(arg.FirstVal(t)) + } + } + rOpts := ReaderOptions{ Comparer: testkeys.Comparer, KeySchemas: KeySchemas{keySchema.Name: &keySchema}, - }) + } + rOpts.internal.CacheOpts.Cache = blockCache + rOpts.internal.CacheOpts.CacheID = cacheID + rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[d.CmdArgs[0].Key]) + r, err := NewReader(context.TODO(), readable, rOpts) defer r.Close() if err != nil { return err.Error() } - iter, err := r.NewIter(block.NoTransforms, nil, nil) + iter, err := r.NewIter(block.NoTransforms, start, end) if err != nil { return err.Error() } @@ -126,6 +144,8 @@ func TestCopySpan(t *testing.T) { return err.Error() } writable := objstorageprovider.NewFileWritable(output) + fileNameToNum[outputFile] = nextFileNum + nextFileNum++ f, err := fs.Open(inputFile) if err != nil { @@ -140,6 +160,8 @@ func TestCopySpan(t *testing.T) { KeySchemas: KeySchemas{keySchema.Name: &keySchema}, } rOpts.internal.CacheOpts.Cache = blockCache + rOpts.internal.CacheOpts.CacheID = cacheID + rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[inputFile]) r, err := NewReader(context.TODO(), readable, rOpts) if err != nil { return err.Error() diff --git a/sstable/testdata/copy_span b/sstable/testdata/copy_span index 099a9b06a9..16da2f59f4 100644 --- a/sstable/testdata/copy_span +++ b/sstable/testdata/copy_span @@ -92,10 +92,8 @@ c.SET.4:baz d.SET.5:foobar ---- -iter test3 +iter test3 start=c ---- -a#0,SET: foo -b#0,SET: bar c#0,SET: baz d#0,SET: foobar @@ -134,18 +132,10 @@ i.SET.5:foo j.SET.5:foo ---- -iter test32 +iter test32 start=c end=e ---- -a#0,SET: foo -b#0,SET: bar c#0,SET: baz d#0,SET: foobar -e#0,SET: foo -f#0,SET: foo -g#0,SET: foo -h#0,SET: foo -i#0,SET: foo -j#0,SET: foo copy-span test32 test33 b.SET.10 cc.SET.0 ----