diff --git a/sstable/colblk_writer.go b/sstable/colblk_writer.go index eae0e92c8c..afcbe9a68d 100644 --- a/sstable/colblk_writer.go +++ b/sstable/colblk_writer.go @@ -1090,7 +1090,7 @@ func shouldFlushWithoutLatestKV( func (w *RawColumnWriter) copyDataBlocks( ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle, ) error { - buf := make([]byte, 0, 256<<10) + const readSizeTarget = 256 << 10 readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error { if firstBlockIdx > lastBlockIdx { panic("pebble: readAndFlushBlocks called with invalid block range") @@ -1103,9 +1103,9 @@ func (w *RawColumnWriter) copyDataBlocks( // blocks in one request. lastBH := blocks[lastBlockIdx].bh blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset - if blocksToReadLen > uint64(cap(buf)) { - buf = make([]byte, 0, blocksToReadLen) - } + // We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes + // a pointer to the buffer to the write queue. + buf := make([]byte, 0, blocksToReadLen) if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil { return err } @@ -1120,8 +1120,8 @@ func (w *RawColumnWriter) copyDataBlocks( } return nil } - // Iterate through blocks until we have enough to fill cap(buf). When we have more than - // one block in blocksToRead and adding the next block would exceed the buffer capacity, + // Iterate through blocks until we have enough to fill readSizeTarget. When we have more than + // one block in blocksToRead and adding the next block would exceed the target buffer capacity, // we read and flush existing blocks in blocksToRead. This allows us to read as many // blocks in one IO request as possible, while still utilizing the write queue in this // writer. @@ -1133,7 +1133,7 @@ func (w *RawColumnWriter) copyDataBlocks( start := i // Note the i++ in the initializing condition; this means we will always flush at least // one block. 
- for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ { + for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ { } // i points to one index past the last block we want to read. if err := readAndFlushBlocks(start, i-1); err != nil { diff --git a/sstable/copier_test.go b/sstable/copier_test.go index 31759a513d..f5b7ce3724 100644 --- a/sstable/copier_test.go +++ b/sstable/copier_test.go @@ -23,7 +23,10 @@ import ( func TestCopySpan(t *testing.T) { fs := vfs.NewMem() - blockCache := cache.New(1 << 20 /* 1 MB */) + blockCache := cache.New(2 << 20 /* 2 MB */) + cacheID := cache.ID(1) + fileNameToNum := make(map[string]base.FileNum) + nextFileNum := base.FileNum(1) defer blockCache.Unref() keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16) @@ -35,6 +38,8 @@ func TestCopySpan(t *testing.T) { if err != nil { return err.Error() } + fileNameToNum[d.CmdArgs[0].Key] = nextFileNum + nextFileNum++ tableFormat := TableFormatMax blockSize := 1 var indexBlockSize int @@ -92,15 +97,28 @@ func TestCopySpan(t *testing.T) { if err != nil { return err.Error() } - r, err := NewReader(context.TODO(), readable, ReaderOptions{ + var start, end []byte + for _, arg := range d.CmdArgs[1:] { + switch arg.Key { + case "start": + start = []byte(arg.FirstVal(t)) + case "end": + end = []byte(arg.FirstVal(t)) + } + } + rOpts := ReaderOptions{ Comparer: testkeys.Comparer, KeySchemas: KeySchemas{keySchema.Name: &keySchema}, - }) + } + rOpts.internal.CacheOpts.Cache = blockCache + rOpts.internal.CacheOpts.CacheID = cacheID + rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[d.CmdArgs[0].Key]) + r, err := NewReader(context.TODO(), readable, rOpts) defer r.Close() if err != nil { return err.Error() } - iter, err := r.NewIter(block.NoTransforms, nil, nil) + iter, err := r.NewIter(block.NoTransforms, start, end) 
if err != nil { return err.Error() } @@ -126,6 +144,8 @@ func TestCopySpan(t *testing.T) { return err.Error() } writable := objstorageprovider.NewFileWritable(output) + fileNameToNum[outputFile] = nextFileNum + nextFileNum++ f, err := fs.Open(inputFile) if err != nil { @@ -140,6 +160,8 @@ func TestCopySpan(t *testing.T) { KeySchemas: KeySchemas{keySchema.Name: &keySchema}, } rOpts.internal.CacheOpts.Cache = blockCache + rOpts.internal.CacheOpts.CacheID = cacheID + rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[inputFile]) r, err := NewReader(context.TODO(), readable, rOpts) if err != nil { return err.Error() diff --git a/sstable/testdata/copy_span b/sstable/testdata/copy_span index 099a9b06a9..16da2f59f4 100644 --- a/sstable/testdata/copy_span +++ b/sstable/testdata/copy_span @@ -92,10 +92,8 @@ c.SET.4:baz d.SET.5:foobar ---- -iter test3 +iter test3 start=c ---- -a#0,SET: foo -b#0,SET: bar c#0,SET: baz d#0,SET: foobar @@ -134,18 +132,10 @@ i.SET.5:foo j.SET.5:foo ---- -iter test32 +iter test32 start=c end=e ---- -a#0,SET: foo -b#0,SET: bar c#0,SET: baz d#0,SET: foobar -e#0,SET: foo -f#0,SET: foo -g#0,SET: foo -h#0,SET: foo -i#0,SET: foo -j#0,SET: foo copy-span test32 test33 b.SET.10 cc.SET.0 ----