Skip to content

Commit

Permalink
dm: add TCP connection IO traffic statistics to sync stage status in …
Browse files Browse the repository at this point in the history
…OpenAPI response (#11742)

close #11741
  • Loading branch information
River2000i authored Nov 28, 2024
1 parent eb94c8a commit b909eff
Show file tree
Hide file tree
Showing 25 changed files with 1,027 additions and 648 deletions.
95 changes: 48 additions & 47 deletions cdc/processor/tablepb/table.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 32 additions & 8 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/google/uuid"
extstorage "github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/filter"
Expand Down Expand Up @@ -178,17 +179,19 @@ type SubTaskConfig struct {
ExtStorage extstorage.ExternalStorage `toml:"-" json:"-"`
MetricsFactory promutil.Factory `toml:"-" json:"-"`
FrameworkLogger *zap.Logger `toml:"-" json:"-"`
// members below are injected by dataflow engine, UUID should be unique in
// one go runtime.
// members below are injected by dataflow engine
// UUID should be unique in one go runtime.
// IOTotalBytes is used to build TCPConnWithIOCounter, and UUID is used as a
// key to let the MySQL driver find the right TCPConnWithIOCounter.
UUID string `toml:"-" json:"-"`
IOTotalBytes *atomic.Uint64 `toml:"-" json:"-"`
// It will meter TCP io usage to downstream of the subtask
UUID string `toml:"uuid" json:"-"`
IOTotalBytes *atomic.Uint64 `toml:"io-total-bytes" json:"io-total-bytes"`

// meter network usage from upstream
// DumpUUID plays the same role as UUID.
// DumpIOTotalBytes meters TCP IO usage from upstream of the subtask; otherwise it is the same as IOTotalBytes,
// e.g., pulling binlog
DumpUUID string `toml:"-" json:"-"`
DumpIOTotalBytes *atomic.Uint64 `toml:"-" json:"-"`
DumpUUID string `toml:"dump-uuid" json:"-"`
DumpIOTotalBytes *atomic.Uint64 `toml:"dump-io-total-bytes" json:"dump-io-total-bytes"`
}

// SampleSubtaskConfig is the content of subtask.toml in current folder.
Expand All @@ -212,6 +215,14 @@ func (c *SubTaskConfig) SetFlagSet(flagSet *flag.FlagSet) {
c.flagSet = flagSet
}

// InitIOCounters assigns newly generated UUID strings to UUID and DumpUUID and
// replaces IOTotalBytes and DumpIOTotalBytes with fresh counters starting at 0.
func (c *SubTaskConfig) InitIOCounters() {
	c.UUID, c.DumpUUID = uuid.NewString(), uuid.NewString()
	c.IOTotalBytes, c.DumpIOTotalBytes = atomic.NewUint64(0), atomic.NewUint64(0)
}

// String returns the config's json string.
func (c *SubTaskConfig) String() string {
cfg, err := json.Marshal(c)
Expand All @@ -222,6 +233,10 @@ func (c *SubTaskConfig) String() string {
}

// Toml returns TOML format representation of config.
// Note: The atomic.Uint64 fields (IOTotalBytes and DumpIOTotalBytes) are not
// encoded in the TOML output because they do not implement the necessary
// marshaling interfaces. As a result, these fields will not be included in
// the TOML representation.
func (c *SubTaskConfig) Toml() (string, error) {
var b bytes.Buffer
enc := toml.NewEncoder(&b)
Expand All @@ -242,6 +257,9 @@ func (c *SubTaskConfig) DecodeFile(fpath string, verifyDecryptPassword bool) err
}

// Decode loads config from file data.
// Note: The atomic.Uint64 fields (IOTotalBytes and DumpIOTotalBytes) will not
// be populated from the TOML data since they cannot be decoded by toml.Decode().
// As a result, these fields will remain uninitialized (zero value) after decoding.
func (c *SubTaskConfig) Decode(data string, verifyDecryptPassword bool) error {
if _, err := toml.Decode(data, c); err != nil {
return terror.ErrConfigTomlTransform.Delegate(err, "decode subtask config from data")
Expand Down Expand Up @@ -495,6 +513,12 @@ func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) {
if err != nil {
return nil, terror.ErrConfigTomlTransform.Delegate(err, "decode subtask config from data")
}

// Manually copy the atomic values because atomic.Uint64 doesn't implement the TOML marshaling interfaces
if c.IOTotalBytes != nil {
clone.IOTotalBytes = atomic.NewUint64(c.IOTotalBytes.Load())
}
if c.DumpIOTotalBytes != nil {
clone.DumpIOTotalBytes = atomic.NewUint64(c.DumpIOTotalBytes.Load())
}
return clone, nil
}
74 changes: 74 additions & 0 deletions dm/config/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package config
import (
"context"
"crypto/rand"
"encoding/json"
"reflect"
"sync"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestSubTask(t *testing.T) {
Expand Down Expand Up @@ -345,3 +348,74 @@ func TestFetchTZSetting(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "+01:00", tz)
}

// TestSubTaskConfigMarshalAtomic marshals a SubTaskConfig to JSON and clones
// it while other goroutines increment its atomic IO counters, then checks that
// every increment is reflected in the final counter values.
func TestSubTaskConfigMarshalAtomic(t *testing.T) {
	var (
		uuid     = "test-uuid"
		dumpUUID = "test-dump-uuid"
	)
	cfg := &SubTaskConfig{
		Name:             "test",
		SourceID:         "source-1",
		UUID:             uuid,
		DumpUUID:         dumpUUID,
		IOTotalBytes:     atomic.NewUint64(100),
		DumpIOTotalBytes: atomic.NewUint64(200),
	}
	require.Equal(t, uint64(100), cfg.IOTotalBytes.Load())
	require.Equal(t, uint64(200), cfg.DumpIOTotalBytes.Load())

	// The goroutines below report failures with t.Errorf and return instead of
	// using require: require calls t.FailNow, which must only be invoked from
	// the goroutine running the test function.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			data, err := json.Marshal(cfg)
			if err != nil {
				t.Errorf("marshal config to JSON: %v", err)
				return
			}
			jsonMap := make(map[string]interface{})
			if err := json.Unmarshal(data, &jsonMap); err != nil {
				t.Errorf("unmarshal config JSON: %v", err)
				return
			}

			// Check atomic values exist and are numbers.
			ioBytes, ok := jsonMap["io-total-bytes"].(float64)
			if !ok {
				t.Errorf("io-total-bytes should be a number, got %T", jsonMap["io-total-bytes"])
				return
			}
			if ioBytes < 100 {
				t.Errorf("io-total-bytes = %v, want >= 100", ioBytes)
			}

			dumpBytes, ok := jsonMap["dump-io-total-bytes"].(float64)
			if !ok {
				t.Errorf("dump-io-total-bytes should be a number, got %T", jsonMap["dump-io-total-bytes"])
				return
			}
			if dumpBytes < 200 {
				t.Errorf("dump-io-total-bytes = %v, want >= 200", dumpBytes)
			}

			// UUID fields should not be present in JSON.
			if _, hasUUID := jsonMap["uuid"]; hasUUID {
				t.Errorf("UUID should not be in JSON")
			}
			if _, hasDumpUUID := jsonMap["dump-uuid"]; hasDumpUUID {
				t.Errorf("DumpUUID should not be in JSON")
			}
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()

			newCfg, err := cfg.Clone()
			if err != nil {
				t.Errorf("clone config: %v", err)
				return
			}

			// A nil counter would panic on Load and abort the whole test
			// binary, so report it as a normal failure instead.
			if newCfg.IOTotalBytes == nil || newCfg.DumpIOTotalBytes == nil {
				t.Errorf("cloned config should carry non-nil IO counters")
				return
			}
			if got := newCfg.IOTotalBytes.Load(); got < 100 {
				t.Errorf("cloned IOTotalBytes = %d, want >= 100", got)
			}
			if got := newCfg.DumpIOTotalBytes.Load(); got < 200 {
				t.Errorf("cloned DumpIOTotalBytes = %d, want >= 200", got)
			}
			if newCfg.UUID != uuid {
				t.Errorf("cloned UUID = %q, want %q", newCfg.UUID, uuid)
			}
			if newCfg.DumpUUID != dumpUUID {
				t.Errorf("cloned DumpUUID = %q, want %q", newCfg.DumpUUID, dumpUUID)
			}
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()
			cfg.IOTotalBytes.Add(1)
			cfg.DumpIOTotalBytes.Add(1)
		}()
	}
	wg.Wait()

	// Ten increments were applied to each counter.
	require.Equal(t, uint64(110), cfg.IOTotalBytes.Load())
	require.Equal(t, uint64(210), cfg.DumpIOTotalBytes.Load())
}
4 changes: 4 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]dbconfig.DBCon

cfg.CleanDumpFile = c.CleanDumpFile

cfg.InitIOCounters()

if err := cfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source %s", inst.SourceID)
}
Expand Down Expand Up @@ -308,6 +310,8 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig,
if task.IgnoreCheckingItems != nil && len(*task.IgnoreCheckingItems) != 0 {
subTaskCfg.IgnoreCheckingItems = *task.IgnoreCheckingItems
}
// set syncer IO total bytes counter
subTaskCfg.InitIOCounters()
// adjust sub task config
if err := subTaskCfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source name %s", sourceCfg.SourceName)
Expand Down
8 changes: 8 additions & 0 deletions dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"testing"

"github.com/google/uuid"
"github.com/pingcap/check"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config/dbconfig"
Expand Down Expand Up @@ -117,6 +118,13 @@ func testNoShardTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTaskConfig.BAList, check.DeepEquals, bAListFromOpenAPITask)
// check ignore check items
c.Assert(subTaskConfig.IgnoreCheckingItems, check.IsNil)
// check io total bytes counter and uuid
c.Assert(subTaskConfig.IOTotalBytes, check.NotNil)
c.Assert(subTaskConfig.DumpIOTotalBytes, check.NotNil)
c.Assert(subTaskConfig.IOTotalBytes.Load(), check.Equals, uint64(0))
c.Assert(subTaskConfig.DumpIOTotalBytes.Load(), check.Equals, uint64(0))
c.Assert(subTaskConfig.UUID, check.HasLen, len(uuid.NewString()))
c.Assert(subTaskConfig.DumpUUID, check.HasLen, len(uuid.NewString()))
}

func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
Expand Down
3 changes: 3 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/utils"
bf "github.com/pingcap/tiflow/pkg/binlog-filter"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

var correctTaskConfig = `---
Expand Down Expand Up @@ -688,6 +689,8 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
ValidatorCfg: validatorCfg,
CleanDumpFile: true,
EnableANSIQuotes: true,
IOTotalBytes: atomic.NewUint64(0),
DumpIOTotalBytes: atomic.NewUint64(0),
}
)

Expand Down
5 changes: 5 additions & 0 deletions dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ func (s *Server) getTaskStatus(ctx context.Context, taskName string) ([]openapi.
// add load status
if loadS := subTaskStatus.GetLoad(); loadS != nil {
openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{
Bps: loadS.Bps,
FinishedBytes: loadS.FinishedBytes,
MetaBinlog: loadS.MetaBinlog,
MetaBinlogGtid: loadS.MetaBinlogGTID,
Expand All @@ -584,6 +585,8 @@ func (s *Server) getTaskStatus(ctx context.Context, taskName string) ([]openapi.
SyncerBinlogGtid: syncerS.SyncerBinlogGtid,
TotalEvents: syncerS.TotalEvents,
TotalTps: syncerS.TotalTps,
IoTotalBytes: syncerS.IoTotalBytes,
DumpIoTotalBytes: syncerS.DumpIOTotalBytes,
}
if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups))
Expand All @@ -601,10 +604,12 @@ func (s *Server) getTaskStatus(ctx context.Context, taskName string) ([]openapi.
// add dump status
if dumpS := subTaskStatus.GetDump(); dumpS != nil {
openapiSubTaskStatus.DumpStatus = &openapi.DumpStatus{
Bps: dumpS.Bps,
CompletedTables: dumpS.CompletedTables,
EstimateTotalRows: dumpS.EstimateTotalRows,
FinishedBytes: dumpS.FinishedBytes,
FinishedRows: dumpS.FinishedRows,
Progress: dumpS.Progress,
TotalTables: dumpS.TotalTables,
}
}
Expand Down
Loading

0 comments on commit b909eff

Please sign in to comment.