Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drainer support plugin framework #911

Open
wants to merge 65 commits into
base: plugin
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
19b6895
add two args
tsthght Feb 5, 2020
4e0a4de
support plugin
tsthght Feb 5, 2020
2cc57b8
refine
tsthght Feb 7, 2020
b583108
optimize the plugin management structure
tsthght Feb 7, 2020
76c6be6
modify plugin framework
tsthght Feb 25, 2020
27989e2
modify plugin framework
tsthght Feb 25, 2020
4811764
modify plugin demo
tsthght Feb 25, 2020
a5c5572
gofmt
tsthght Feb 25, 2020
73c7f0b
add testcase
tsthght Feb 25, 2020
c2ac0ec
modify testcase
tsthght Feb 25, 2020
f9a74e3
modify go.mod
tsthght Feb 25, 2020
ccfd79b
modify
tsthght Feb 25, 2020
861e5df
fix a problem
tsthght Feb 25, 2020
2af5039
modify plugin
tsthght Feb 26, 2020
f4b62a2
modify plugin demo
tsthght Feb 26, 2020
3ca4924
modify plugin pos
tsthght Feb 27, 2020
6ea9197
modify FilterTxn ret
tsthght Feb 27, 2020
c085535
merge master
tsthght Feb 27, 2020
e4917f3
modify value: index
tsthght Feb 27, 2020
8f24b8a
add RecordId
tsthght Feb 27, 2020
24eaa9e
modify plugin
tsthght Feb 27, 2020
c97f3f2
support configure the mark database name and mark table name
tsthght Mar 2, 2020
536943a
add some testcase
tsthght Mar 4, 2020
af05581
add testcase
tsthght Mar 4, 2020
1d4e857
add testcase
tsthght Mar 4, 2020
fc3644f
modify some error
tsthght Mar 4, 2020
8525698
add some tests
tsthght Mar 4, 2020
3aa0f2e
modify test
tsthght Mar 4, 2020
9f0ae6a
add some log
tsthght Mar 4, 2020
b04f256
modify log
tsthght Mar 4, 2020
f0cbd85
refine code
tsthght Mar 5, 2020
7782196
add some log
tsthght Mar 5, 2020
ce64ee4
trim string
tsthght Mar 5, 2020
2df8841
should not conflict with LoopbackControl
tsthght Mar 5, 2020
0c764b1
refine comment
tsthght Mar 5, 2020
76061b5
add comment
tsthght Mar 5, 2020
78e9153
replace errors.New(fmt.Sprintf(...)) with fmt.Errorf(...)
tsthght Mar 5, 2020
59ad9ee
add comment
tsthght Mar 5, 2020
8859fd7
change RecordId to RecordID
tsthght Mar 5, 2020
f06e03f
change SetPlugin to setPlugin
tsthght Mar 5, 2020
355b679
add comment
tsthght Mar 5, 2020
9d8f6e3
refine
tsthght Mar 5, 2020
7a12571
modify comment
tsthght Mar 5, 2020
5e2e3ac
refine
tsthght Mar 5, 2020
d385b89
refine code
tsthght Mar 5, 2020
f102f7c
handle ci error
tsthght Mar 5, 2020
90c5126
add init interface
tsthght Mar 11, 2020
fbd2748
modify SyncerPlugin to
tsthght Mar 11, 2020
7df02f6
modify SyncerPlugin to SyncerFilter
tsthght Mar 11, 2020
2893d2f
modify Loopback to SyncerFilter
tsthght Mar 11, 2020
ed5852f
modify
tsthght Mar 11, 2020
518415d
add
tsthght Mar 11, 2020
6ac7d53
refine
tsthght Mar 11, 2020
0929e8d
add LoaderInit
tsthght Mar 11, 2020
124a885
add loaderdestroy
tsthght Mar 11, 2020
ce067e3
refine
tsthght Mar 11, 2020
8b5fbd9
modify
tsthght Mar 11, 2020
8a2d0a7
refine
tsthght Mar 11, 2020
8c71d4a
reine
tsthght Mar 11, 2020
1d33874
refine
tsthght Mar 11, 2020
c99c28c
modify
tsthght Mar 11, 2020
cd06a34
refine
tsthght Mar 12, 2020
613567d
add log
tsthght Mar 12, 2020
edecd47
add logs
tsthght Mar 12, 2020
016b67f
Get rid of useless parameters
tsthght Mar 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type SyncerConfig struct {
EnableDispatch bool `toml:"enable-dispatch" json:"enable-dispatch"`
SafeMode bool `toml:"safe-mode" json:"safe-mode"`
EnableCausality bool `toml:"enable-detect" json:"enable-detect"`
PluginPath string `toml:"plugin-path" json:"plugin-path"`
PluginNames []string `toml:"plugin-names" json:"plugin-names"`
SupportPlugin bool `toml:"support-plugin" json:"support-plugin"`
MarkDBName string `toml:"mark-db-name" json:"mark-db-name"`
MarkTableName string `toml:"mark-table-name" json:"mark-table-name"`
}

// RelayConfig is the Relay log's configuration.
Expand Down Expand Up @@ -158,6 +163,12 @@ func NewConfig() *Config {
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")

fs.StringVar(&cfg.SyncerCfg.PluginPath, "plugin-path", "", "The path of the plugins")
fs.Var(newSliceNames([]string{}, &cfg.SyncerCfg.PluginNames), "plugin-names", "The names of the plugins")
fs.BoolVar(&cfg.SyncerCfg.SupportPlugin, "support-plugin", false, "Whether plugin is supported,default: false")
fs.StringVar(&cfg.SyncerCfg.MarkDBName, "mark-db-name", "rel", "mark database's name")
fs.StringVar(&cfg.SyncerCfg.MarkTableName, "mark-table-name", "_drainer_repl_mark", "mark table's name")

return cfg
}

Expand Down
72 changes: 72 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,75 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
}

func (t *testKafkaSuite) TestConfigPlugin(c *C) {
args := []string{}

cfg := NewConfig()
err := cfg.Parse(args)
c.Assert(err, IsNil)

c.Assert(len(cfg.SyncerCfg.PluginPath), Equals, 0)
c.Assert(len(cfg.SyncerCfg.PluginNames), Equals, 0)
c.Assert(cfg.SyncerCfg.SupportPlugin, Equals, false)
c.Assert(cfg.SyncerCfg.MarkDBName, Equals, "rel")
c.Assert(cfg.SyncerCfg.MarkTableName, Equals, "_drainer_repl_mark")

args = []string{
"-plugin-path", "/tmp/drainer/plugin",
"-plugin-names", "demo1",
"-support-plugin",
"-mark-db-name", "db1",
"-mark-table-name", "tb1",
}

cfg = NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)

c.Assert(cfg.SyncerCfg.PluginPath, Equals, "/tmp/drainer/plugin")
c.Assert(len(cfg.SyncerCfg.PluginNames), Equals, 1)
c.Assert(cfg.SyncerCfg.PluginNames[0], Equals, "demo1")
c.Assert(cfg.SyncerCfg.SupportPlugin, Equals, true)
c.Assert(cfg.SyncerCfg.MarkDBName, Equals, "db1")
c.Assert(cfg.SyncerCfg.MarkTableName, Equals, "tb1")

args = []string{
"-plugin-names", "demo1,demo2",
"-mark-db-name", "",
"-mark-table-name", "",
}

cfg = NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)

c.Assert(len(cfg.SyncerCfg.PluginNames), Equals, 2)
c.Assert(cfg.SyncerCfg.PluginNames[0], Equals, "demo1")
c.Assert(cfg.SyncerCfg.PluginNames[1], Equals, "demo2")
c.Assert(cfg.SyncerCfg.MarkDBName, Equals, "")
c.Assert(cfg.SyncerCfg.MarkTableName, Equals, "")

args = []string{
"-plugin-names", "",
}

cfg = NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)

c.Assert(len(cfg.SyncerCfg.PluginNames), Equals, 1)
c.Assert(cfg.SyncerCfg.PluginNames[0], Equals, "")

args = []string{
"-plugin-names", ",",
}

cfg = NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)

c.Assert(len(cfg.SyncerCfg.PluginNames), Equals, 2)
c.Assert(cfg.SyncerCfg.PluginNames[0], Equals, "")
c.Assert(cfg.SyncerCfg.PluginNames[1], Equals, "")
}
11 changes: 11 additions & 0 deletions drainer/filter_txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package drainer

import (
"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"
)

// LoopBack is the interface that for syncer-plugin
type LoopBack interface {
FilterTxn(txn *loader.Txn, info *loopbacksync.LoopBackSync) (bool, error)
}
58 changes: 43 additions & 15 deletions drainer/loopbacksync/loopbacksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/plugin"
"go.uber.org/zap"
)

Expand All @@ -36,39 +37,66 @@ const (
ChannelInfo = "channel_info"
)

// CreateMarkTableDDL is the DDL to create the mark table.
var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", MarkTableName, ID, ChannelID, Val, ChannelInfo, ID, ChannelID)

// CreateMarkDBDDL is DDL to create the database of mark table.
var CreateMarkDBDDL = "create database IF NOT EXISTS retl;"

//LoopBackSync loopback sync info
type LoopBackSync struct {
ChannelID int64
LoopbackControl bool
MarkDBName string
MarkTableName string
SyncDDL bool
Index int64
PluginPath string
PluginNames []string
Hooks []*plugin.EventHooks
SupportPlugin bool
RecordID int
}

//NewLoopBackSyncInfo return LoopBackSyncInfo objec
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync {
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool, path string, names []string, SupportPlug bool, mdbname, mtablename string) *LoopBackSync {
l := &LoopBackSync{
ChannelID: ChannelID,
LoopbackControl: LoopbackControl,
SyncDDL: SyncDDL,
Index: 0,
PluginPath: path,
PluginNames: names,
SupportPlugin: SupportPlug,
MarkDBName: strings.TrimSpace(mdbname),
MarkTableName: strings.TrimSpace(mtablename),
}
if l.SupportPlugin {
l.Hooks = make([]*plugin.EventHooks, 2)
l.Hooks[plugin.SyncerPlugin] = &plugin.EventHooks{}
l.Hooks[plugin.LoaderPlugin] = &plugin.EventHooks{}
}
return l
}

// CreateMarkTable create the db and table if need.
func CreateMarkTable(db *sql.DB) error {
_, err := db.Exec(CreateMarkDBDDL)
if err != nil {
return errors.Annotate(err, "failed to create mark db")
}
func CreateMarkTable(db *sql.DB, mdbname, mtablename string) error {
// CreateMarkDBDDL is DDL to create the database of mark table.
var err error
if len(mdbname) == 0 {
tsthght marked this conversation as resolved.
Show resolved Hide resolved
// CreateMarkTableDDL is the DDL to create the mark table.
var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", mtablename, ID, ChannelID, Val, ChannelInfo, ID, ChannelID)
_, err = db.Exec(CreateMarkTableDDL)
if err != nil {
return errors.Annotate(err, "failed to create mark table")
}
} else {
var CreateMarkDBDDL = fmt.Sprintf("create database IF NOT EXISTS %s;", mdbname)
_, err = db.Exec(CreateMarkDBDDL)
if err != nil {
return errors.Annotate(err, "failed to create mark db")
}

_, err = db.Exec(CreateMarkTableDDL)
if err != nil {
return errors.Annotate(err, "failed to create mark table")
// CreateMarkTableDDL is the DDL to create the mark table.
var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s.%s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", mdbname, mtablename, ID, ChannelID, Val, ChannelInfo, ID, ChannelID)
_, err = db.Exec(CreateMarkTableDDL)
if err != nil {
return errors.Annotate(err, "failed to create mark table")
}
}

return nil
Expand Down
14 changes: 12 additions & 2 deletions drainer/loopbacksync/loopbacksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package loopbacksync

import (
"database/sql/driver"
"fmt"
"regexp"
"testing"

Expand All @@ -32,25 +33,34 @@ func (s *loopbackSuite) TestNewLoopBackSyncInfo(c *check.C) {
var ChannelID int64 = 1
var LoopbackControl = true
var SyncDDL = false
l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL)

l := NewLoopBackSyncInfo(ChannelID, LoopbackControl, SyncDDL, "", nil, false, "rel", "_drainer_repl_mark")

c.Assert(l, check.DeepEquals, &LoopBackSync{
ChannelID: ChannelID,
LoopbackControl: LoopbackControl,
SyncDDL: SyncDDL,
PluginPath: "",
PluginNames: nil,
SupportPlugin: false,
MarkDBName: "rel",
MarkTableName: "_drainer_repl_mark",
})
}

func (s *loopbackSuite) TestCreateMarkTable(c *check.C) {
db, mk, err := sqlmock.New()
c.Assert(err, check.IsNil)

CreateMarkDBDDL := "create database IF NOT EXISTS rel;"
CreateMarkTableDDL := fmt.Sprintf("CREATE TABLE If Not Exists %s.%s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", "rel", "_drainer_repl_mark", ID, ChannelID, Val, ChannelInfo, ID, ChannelID)

mk.ExpectExec(regexp.QuoteMeta(CreateMarkDBDDL)).
WillReturnResult(sqlmock.NewResult(0, 0))
mk.ExpectExec(regexp.QuoteMeta(CreateMarkTableDDL)).
WillReturnResult(sqlmock.NewResult(0, 0))

err = CreateMarkTable(db)
err = CreateMarkTable(db, "rel", "_drainer_repl_mark")
c.Assert(err, check.IsNil)

err = mk.ExpectationsWereMet()
Expand Down
86 changes: 82 additions & 4 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/plugin"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/drainer/checkpoint"
Expand Down Expand Up @@ -77,8 +77,35 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
ignoreDBs = strings.Split(cfg.IgnoreSchemas, ",")
}
syncer.filter = filter.NewFilter(ignoreDBs, cfg.IgnoreTables, cfg.DoDBs, cfg.DoTables)
syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL)
syncer.loopbackSync = loopbacksync.NewLoopBackSyncInfo(cfg.ChannelID, cfg.LoopbackControl, cfg.SyncDDL, cfg.PluginPath,
cfg.PluginNames, cfg.SupportPlugin, cfg.MarkDBName, cfg.MarkTableName)
if syncer.loopbackSync.SupportPlugin {
log.Info("Begin to Load syncer-plugins.")
for _, name := range syncer.loopbackSync.PluginNames {
n := strings.TrimSpace(name)
sym, err := plugin.LoadPlugin(syncer.loopbackSync.Hooks[plugin.SyncerPlugin],
syncer.loopbackSync.PluginPath, n)
if err != nil {
log.Error("Load plugin failed.", zap.String("plugin name", n),
zap.String("error", err.Error()))
continue
}

newPlugin, ok := sym.(func() interface{})
if !ok {
log.Error("The correct new-function is not provided.", zap.String("plugin name", n), zap.String("type", "syncer plugin"))
continue
}
plg := newPlugin()
_, ok = plg.(LoopBack)
if !ok {
log.Info("syncer plugin's interface is not implemented.", zap.String("plugin name", n), zap.String("type", "syncer plugin"))
}
plugin.RegisterPlugin(syncer.loopbackSync.Hooks[plugin.SyncerPlugin],
n, newPlugin())
log.Info("Load plugin success.", zap.String("plugin name", n), zap.String("type", "syncer plugin"))
}
}
var err error
// create schema
syncer.schema, err = NewSchema(jobs, false)
Expand Down Expand Up @@ -357,8 +384,30 @@ ForLoop:
err = errors.Annotate(err, "handlePreviousDDLJobIfNeed failed")
break ForLoop
}

var isFilterTransaction = false
var err1 error

if s.loopbackSync.SupportPlugin {
hook := s.loopbackSync.Hooks[plugin.SyncerPlugin]
var txn *loader.Txn
txn, err1 = translator.TiBinlogToTxn(s.schema, "", "", binlog, preWrite, false)
hook.Range(func(k, val interface{}) bool {
c, ok := val.(LoopBack)
if !ok {
return true
}
isFilterTransaction, err1 = c.FilterTxn(txn, s.loopbackSync)
if isFilterTransaction || err1 != nil {
tsthght marked this conversation as resolved.
Show resolved Hide resolved
return false
}
return true
})
if err1 != nil {
break ForLoop
tsthght marked this conversation as resolved.
Show resolved Hide resolved
}
}

if s.loopbackSync != nil && s.loopbackSync.LoopbackControl {
isFilterTransaction, err1 = loopBackStatus(binlog, preWrite, s.schema, s.loopbackSync)
if err1 != nil {
Expand Down Expand Up @@ -414,6 +463,35 @@ ForLoop:
break ForLoop
}

if s.loopbackSync.SupportPlugin {
var isFilterTransaction = false
var err1 error
txn := new(loader.Txn)
txn.DDL = &loader.DDL{
Database: schema,
Table: table,
SQL: string(binlog.GetDdlQuery()),
}
hook := s.loopbackSync.Hooks[plugin.SyncerPlugin]
hook.Range(func(k, val interface{}) bool {
c, ok := val.(LoopBack)
if !ok {
return true
}
isFilterTransaction, err1 = c.FilterTxn(txn, s.loopbackSync)
if isFilterTransaction || err1 != nil {
return false
}
return true
})
if err1 != nil {
break ForLoop
}
if isFilterTransaction {
continue
}
}

if s.filter.SkipSchemaAndTable(schema, table) {
log.Info("skip ddl by filter", zap.String("schema", schema), zap.String("table", table),
zap.String("sql", sql), zap.Int64("commit ts", commitTS))
Expand Down
17 changes: 17 additions & 0 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path"
"sort"
"strings"
"sync"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -191,3 +192,19 @@ func genDrainerID(listenAddr string) (string, error) {

return fmt.Sprintf("%s:%s", hostname, port), nil
}

type sliceNames []string

func newSliceNames(vals []string, p *[]string) *sliceNames {
*p = vals
return (*sliceNames)(p)
}

func (s *sliceNames) Set(val string) error {
*s = sliceNames(strings.Split(val, ","))
return nil
}

func (s *sliceNames) Get() interface{} { return []string(*s) }

func (s *sliceNames) String() string { return strings.Join([]string(*s), ",") }
Loading