Skip to content

Commit

Permalink
Binlog: Improve ZstdInMemoryDecompressorMaxSize management (vitessio#…
Browse files Browse the repository at this point in the history
…17220)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Nov 16, 2024
1 parent 9fdfdb9 commit 216fd70
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 10 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Flags:
--backup_storage_compress if set, the backup files will be compressed. (default true)
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Flags:
--backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups.
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_grpc_ca string the server ca to use to validate servers when connecting
Expand Down
21 changes: 12 additions & 9 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ const (
// Length of the binlog event header for internal events within
// the transaction payload.
headerLen = binlogEventLenOffset + eventLenBytes

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory.
zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)

var (
Expand All @@ -75,13 +70,18 @@ var (
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory all at once.
ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB

// A concurrent stateless decoder that caches decompressors. This is
// used for smaller payloads that we want to handle entirely using
// in-memory buffers via DecodeAll.
statelessDecoder *zstd.Decoder

// A pool of stateful decoders for larger payloads that we want to
// stream. The number of large (> zstdInMemoryDecompressorMaxSize)
// stream. The number of large (> ZstdInMemoryDecompressorMaxSize)
// payloads should typically be relatively low, but there may be times
// where there are many of them -- and users like vstreamer may have
// N concurrent streams per tablet which could lead to a lot of
Expand Down Expand Up @@ -271,7 +271,7 @@ func (tp *TransactionPayload) decode() error {
}

// decompress decompresses the payload. If the payload is larger than
// zstdInMemoryDecompressorMaxSize then we stream the decompression via
// ZstdInMemoryDecompressorMaxSize then we stream the decompression via
// the package's pool of zstd.Decoders, otherwise we use in-memory
// buffers with the package's concurrent statelessDecoder.
// In either case, we setup the reader that can be used within the
Expand All @@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {

// Switch to slower but less memory intensive stream mode for
// larger payloads.
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder, err := statefulDecoderPool.Get(in)
if err != nil {
Expand Down Expand Up @@ -366,7 +366,10 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
}
} else {
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
// Use the minimum amount of memory we can in processing the transaction by
// setting lowMem to true and limiting the decoder concurrency to 1 so that
// there's no async decoding of multiple windows or blocks.
d, err := zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderConcurrency(1))
if err != nil { // Should only happen e.g. due to ENOMEM
return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
}
Expand Down
8 changes: 7 additions & 1 deletion go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
},
}

// Ensure that we can process events where the *uncompressed* size is
// larger than ZstdInMemoryDecompressorMaxSize. The *compressed* size
// of the payload in large_compressed_trx_payload.bin is 16KiB so we
// set the max to 2KiB to test this.
ZstdInMemoryDecompressorMaxSize = 2048

for _, tc := range testCases {
memDecodingCnt := compressedTrxPayloadsInMem.Get()
streamDecodingCnt := compressedTrxPayloadsUsingStream.Get()
Expand Down Expand Up @@ -191,7 +197,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
totalSize += len(eventStr)
require.True(t, strings.HasPrefix(eventStr, want))
}
require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/servenv"
)

Expand Down Expand Up @@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")

fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")

fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode.")
}

0 comments on commit 216fd70

Please sign in to comment.