diff --git a/compression_verifier.go b/compression_verifier.go deleted file mode 100644 index 54651902..00000000 --- a/compression_verifier.go +++ /dev/null @@ -1,246 +0,0 @@ -package ghostferry - -import ( - "crypto/md5" - "database/sql" - "encoding/hex" - "errors" - "fmt" - "strconv" - "strings" - - sq "github.com/Masterminds/squirrel" - "github.com/golang/snappy" - "github.com/siddontang/go-mysql/schema" - "github.com/sirupsen/logrus" -) - -const ( - // CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data - CompressionSnappy = "SNAPPY" -) - -type ( - // TableColumnCompressionConfig represents compression configuration for a - // column in a table as table -> column -> compression-type - // ex: books -> contents -> snappy - TableColumnCompressionConfig map[string]map[string]string -) - -// UnsupportedCompressionError is used to identify errors resulting -// from attempting to decompress unsupported algorithms -type UnsupportedCompressionError struct { - table string - column string - algorithm string -} - -func (e UnsupportedCompressionError) Error() string { - return "Compression algorithm: " + e.algorithm + - " not supported on table: " + e.table + - " for column: " + e.column -} - -// CompressionVerifier provides support for verifying the payload of compressed columns that -// may have different hashes for the same data by first decompressing the compressed -// data before fingerprinting -type CompressionVerifier struct { - logger *logrus.Entry - - supportedAlgorithms map[string]struct{} - tableColumnCompressions TableColumnCompressionConfig -} - -// GetCompressedHashes compares the source data with the target data to ensure the integrity of the -// data being copied. -// -// The GetCompressedHashes method checks if the existing table contains compressed data -// and will apply the decompression algorithm to the applicable columns if necessary. -// After the columns are decompressed, the hashes of the data are used to verify equality -func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error) { - c.logger.WithFields(logrus.Fields{ - "tag": "compression_verifier", - "table": table, - }).Info("decompressing table data before verification") - - tableCompression := c.tableColumnCompressions[table] - - // Extract the raw rows using SQL to be decompressed - rows, err := getRows(db, schema, table, pkColumn, columns, pks) - if err != nil { - return nil, err - } - defer rows.Close() - - // Decompress applicable columns and hash the resulting column values for comparison - resultSet := make(map[uint64][]byte) - for rows.Next() { - rowData, err := ScanByteRow(rows, len(columns)+1) - if err != nil { - return nil, err - } - - pk, err := strconv.ParseUint(string(rowData[0]), 10, 64) - if err != nil { - return nil, err - } - - // Decompress the applicable columns and then hash them together - // to create a fingerprint. 
decompressedRowData contains a map of all - // the non-compressed columns and associated decompressed values by the - // index of the column - decompressedRowData := [][]byte{} - for idx, column := range columns { - if algorithm, ok := tableCompression[column.Name]; ok { - // rowData contains the result of "SELECT pkColumn, * FROM ...", so idx+1 to get each column - decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx+1]) - if err != nil { - return nil, err - } - decompressedRowData = append(decompressedRowData, decompressedColData) - } else { - decompressedRowData = append(decompressedRowData, rowData[idx+1]) - } - } - - // Hash the data of the row to be added to the result set - decompressedRowHash, err := c.HashRow(decompressedRowData) - if err != nil { - return nil, err - } - - resultSet[pk] = decompressedRowHash - } - - metrics.Gauge( - "compression_verifier_decompress_rows", - float64(len(resultSet)), - []MetricTag{{"table", table}}, - 1.0, - ) - - logrus.WithFields(logrus.Fields{ - "tag": "compression_verifier", - "rows": len(resultSet), - "table": table, - }).Debug("decompressed rows will be compared") - - return resultSet, nil -} - -// Decompress will apply the configured decompression algorithm to the configured columns data -func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error) { - var decompressed []byte - switch strings.ToUpper(algorithm) { - case CompressionSnappy: - return snappy.Decode(decompressed, compressed) - default: - return nil, UnsupportedCompressionError{ - table: table, - column: column, - algorithm: algorithm, - } - } - -} - -// HashRow will fingerprint the non-primary columns of the row to verify data equality -func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error) { - if len(decompressedRowData) == 0 { - return nil, errors.New("Row data to fingerprint must not be empty") - } - - hash := md5.New() - var rowFingerprint []byte - for _, colData := range decompressedRowData { - rowFingerprint = append(rowFingerprint, colData...) - } - - _, err := hash.Write(rowFingerprint) - if err != nil { - return nil, err - } - - return []byte(hex.EncodeToString(hash.Sum(nil))), nil -} - -// IsCompressedTable will identify whether or not a table is compressed -func (c *CompressionVerifier) IsCompressedTable(table string) bool { - if _, ok := c.tableColumnCompressions[table]; ok { - return true - } - return false -} - -func (c *CompressionVerifier) verifyConfiguredCompression(tableColumnCompressions TableColumnCompressionConfig) error { - for table, columns := range tableColumnCompressions { - for column, algorithm := range columns { - if _, ok := c.supportedAlgorithms[strings.ToUpper(algorithm)]; !ok { - return &UnsupportedCompressionError{ - table: table, - column: column, - algorithm: algorithm, - } - } - } - } - - return nil -} - -// NewCompressionVerifier first checks the map for supported compression algorithms before -// initializing and returning the initialized instance. 
-func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error) { - supportedAlgorithms := make(map[string]struct{}) - supportedAlgorithms[CompressionSnappy] = struct{}{} - - compressionVerifier := &CompressionVerifier{ - logger: logrus.WithField("tag", "compression_verifier"), - supportedAlgorithms: supportedAlgorithms, - tableColumnCompressions: tableColumnCompressions, - } - - if err := compressionVerifier.verifyConfiguredCompression(tableColumnCompressions); err != nil { - return nil, err - } - - return compressionVerifier, nil -} - -func getRows(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (*sql.Rows, error) { - quotedPK := quoteField(pkColumn) - sql, args, err := rowSelector(columns, pkColumn). - From(QuotedTableNameFromString(schema, table)). - Where(sq.Eq{quotedPK: pks}). - OrderBy(quotedPK). - ToSql() - - if err != nil { - return nil, err - } - - // This query must be a prepared query. If it is not, querying will use - // MySQL's plain text interface, which will scan all values into []uint8 - // if we give it []interface{}. - stmt, err := db.Prepare(sql) - if err != nil { - return nil, err - } - - defer stmt.Close() - rows, err := stmt.Query(args...) - if err != nil { - return nil, err - } - - return rows, nil -} - -func rowSelector(columns []schema.TableColumn, pkColumn string) sq.SelectBuilder { - columnStrs := make([]string, len(columns)) - for idx, column := range columns { - columnStrs[idx] = column.Name - } - - return sq.Select(fmt.Sprintf("%s, %s", quoteField(pkColumn), strings.Join(columnStrs, ","))) -} diff --git a/config.go b/config.go index 49061cd5..2e4dd8c7 100644 --- a/config.go +++ b/config.go @@ -15,7 +15,6 @@ import ( const ( VerifierTypeChecksumTable = "ChecksumTable" - VerifierTypeIterative = "Iterative" VerifierTypeInline = "Inline" VerifierTypeNoVerification = "NoVerification" ) @@ -179,58 +178,6 @@ func (c *InlineVerifierConfig) Validate() error { return nil } -type IterativeVerifierConfig struct { - // List of tables that should be ignored by the IterativeVerifier. - IgnoredTables []string - - // List of columns that should be ignored by the IterativeVerifier. - // This is in the format of table_name -> [list of column names] - IgnoredColumns map[string][]string - - // The number of concurrent verifiers. Note that a single table can only be - // assigned to one goroutine and currently multiple goroutines per table - // is not supported. - Concurrency int - - // The maximum expected downtime during cutover, in the format of - // time.ParseDuration. - MaxExpectedDowntime string - - // Map of the table and column identifying the compression type - // (if any) of the column. This is used during verification to ensure - // the data was successfully copied as some compression algorithms can - // output different compressed data with the same input data. - // - // The data structure is a map of table names to a map of column names - // to the compression algorithm. - // ex: {books: {contents: snappy}} - // - // Currently supported compression algorithms are: - // 1. Snappy (https://google.github.io/snappy/) as "SNAPPY" - // - // Optional: defaults to empty map/no compression - // - // Note that the IterativeVerifier is in the process of being deprecated. - // If this is specified, ColumnCompressionConfig should also be filled out in - // the main Config. 
-	TableColumnCompression TableColumnCompressionConfig
-}
-
-func (c *IterativeVerifierConfig) Validate() error {
-	if c.MaxExpectedDowntime != "" {
-		_, err := time.ParseDuration(c.MaxExpectedDowntime)
-		if err != nil {
-			return err
-		}
-	}
-
-	if c.Concurrency == 0 {
-		c.Concurrency = 4
-	}
-
-	return nil
-}
-
 // SchemaName => TableName => ColumnName => CompressionAlgorithm
 // Example: blog1 => articles => body => snappy
 //          (SELECT body FROM blog1.articles => returns compressed blob)
@@ -379,19 +326,12 @@ type Config struct {
 
 	// The verifier to use during the run. Valid choices are:
 	// ChecksumTable
-	// Iterative
 	// NoVerification
 	//
 	// If it is left blank, the Verifier member variable on the Ferry will be
 	// used. If that member variable is nil, no verification will be done.
 	VerifierType string
 
-	// Only useful if VerifierType == Iterative.
-	// This specifies the configurations to the IterativeVerifier.
-	//
-	// This option is in the process of being deprecated.
-	IterativeVerifierConfig IterativeVerifierConfig
-
 	// Only useful if VerifierType == Inline.
 	// This specifies the configurations to the InlineVerifierConfig.
 	InlineVerifierConfig InlineVerifierConfig
@@ -415,10 +355,6 @@ type Config struct {
 	// uncompressed data is equal.
 	// - This column signals to the InlineVerifier that it needs to decompress
 	//   the data to compare identity.
-	//
-	// Note: a similar option exists in IterativeVerifier. However, the
-	// IterativeVerifier is being deprecated and this will be the correct place
-	// to specify it if you don't need the IterativeVerifier.
 	CompressedColumnsForVerification ColumnCompressionConfig
 
 	// This config is also for inline verification for the same special case of
@@ -452,11 +388,7 @@ func (c *Config) ValidateConfig() error {
 		return fmt.Errorf("StateToResumeFrom version mismatch: resume = %s, current = %s", c.StateToResumeFrom.GhostferryVersion, VersionString)
 	}
 
-	if c.VerifierType == VerifierTypeIterative {
-		if err := c.IterativeVerifierConfig.Validate(); err != nil {
-			return fmt.Errorf("IterativeVerifierConfig invalid: %v", err)
-		}
-	} else if c.VerifierType == VerifierTypeInline {
+	if c.VerifierType == VerifierTypeInline {
 		if err := c.InlineVerifierConfig.Validate(); err != nil {
 			return fmt.Errorf("InlineVerifierConfig invalid: %v", err)
 		}
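For illustration, the SchemaName => TableName => ColumnName => algorithm layout that config.go documents for ColumnCompressionConfig could be populated along these lines. This is a sketch assuming the nested-map shape implied by the comment above, not a quote of the actual type definition:

	// Hypothetical example matching the "blog1 => articles => body => snappy"
	// layout documented in config.go. "SNAPPY" (CompressionSnappy) is the only
	// compression algorithm supported for verification.
	config.CompressedColumnsForVerification = ghostferry.ColumnCompressionConfig{
		"blog1": {
			"articles": {
				"body": "SNAPPY",
			},
		},
	}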
diff --git a/docs/source/copydbinterruptresume.rst b/docs/source/copydbinterruptresume.rst
index 5a9a983d..7cdf157a 100644
--- a/docs/source/copydbinterruptresume.rst
+++ b/docs/source/copydbinterruptresume.rst
@@ -111,8 +111,8 @@ Some other considerations/notes:
   runs.
 
 * To test resuming errored runs further, see :ref:`prodtesting`.
 
-* Verifiers are not resumable, including the IterativeVerifier. This may change
-  in the future.
+* The ChecksumTableVerifier is not resumable, but the InlineVerifier is, as
+  long as cutover has not yet begun.
 
 * While we are confident that the algorithm is correct, this is still a
   highly experimental feature. USE AT YOUR OWN RISK.
diff --git a/docs/source/verifiers.rst b/docs/source/verifiers.rst
index d925e25c..56f39059 100644
--- a/docs/source/verifiers.rst
+++ b/docs/source/verifiers.rst
@@ -6,28 +6,34 @@ Verifiers
 
 Verifiers in Ghostferry are designed to ensure that Ghostferry did not
 corrupt/miss data. There are two different verifiers: the
-``ChecksumTableVerifier`` and the ``IterativeVerifier``. A comparison of them
+``ChecksumTableVerifier`` and the ``InlineVerifier``. A comparison of them
 is given below:
 
 +-----------------------+-----------------------+-----------------------------+
-|                       | ChecksumTableVerifier | IterativeVerifier           |
+|                       | ChecksumTableVerifier | InlineVerifier              |
 +-----------------------+-----------------------+-----------------------------+
-|Mechanism              | ``CHECKSUM TABLE``    | Verify row before cutover;  |
-|                       |                       | Reverify changed rows during|
-|                       |                       | cutover.                    |
+|Mechanism              | ``CHECKSUM TABLE``    | Each row is validated via an|
+|                       |                       | MD5-type query on the MySQL |
+|                       |                       | database after it is copied.|
+|                       |                       | Any entries copied due to   |
+|                       |                       | binlog activity are verified|
+|                       |                       | periodically during the copy|
+|                       |                       | process and ultimately      |
+|                       |                       | during the cutover.         |
 +-----------------------+-----------------------+-----------------------------+
-|Impacts on Cutover Time| Linear w.r.t data size| Linear w.r.t. change rate   |
-|                       |                       | [1]_                        |
+|Impacts on Cutover Time| Linear w.r.t data     | Linear w.r.t. change rate   |
+|                       | size.                 | [1]_.                       |
 +-----------------------+-----------------------+-----------------------------+
-|Impacts on Copy Time   | None                  | Linear w.r.t data size      |
+|Impacts on Copy Time   | None.                 | Linear w.r.t data size [3]_.|
 |[2]_                   |                       |                             |
 +-----------------------+-----------------------+-----------------------------+
-|Memory Usage           | Minimal               | Linear w.r.t rows changed   |
+|Memory Usage           | Minimal.              | Minimal.                    |
 +-----------------------+-----------------------+-----------------------------+
-|Partial table copy     | Not supported         | Supported                   |
+|Partial table copy     | Not supported.        | Supported.                  |
 +-----------------------+-----------------------+-----------------------------+
-|Worst Case Scenario    | Large databases causes| Verification is slower than |
-|                       | unacceptable downtime | the change rate of the DB;  |
+|Worst Case Scenario    | Large databases cause | Verification during cutover |
+|                       | unacceptable downtime.| is slower than the change   |
+|                       |                       | rate of the DB.             |
 +-----------------------+-----------------------+-----------------------------+
 
 .. [1] Additional improvements could be made to reduce this as long as
 
 .. [2] Increase in copy time does not increase downtime. Downtime occurs only
    in cutover.
 
+.. [3] The increase should be minimal as the verification is done immediately
+   after copy, when the data most likely still lives in RAM.
+
 If you want verification, you should try with the ``ChecksumTableVerifier``
 first if you're copying whole tables at a time. If that takes too long, you can
-try using the ``IterativeVerifier``. Alternatively, you can verify in a staging
+try using the ``InlineVerifier``. Alternatively, you can verify in a staging
 run and not verify during the production run (see :ref:`copydbinprod`).
 
-IterativeVerifier
------------------
-
-IterativeVerifier verifies the source and target in a couple of steps:
-
-1. After the data copy, it first compares the hashes of each applicable rows
-   of the source and the target together to make sure they are the same. This
-   is known as the initial verification.
-
-   a. If they are the same: the verification for that row is complete.
-   b. If they are not the same: add it into a reverify queue.
-
-2. For any rows changed during the initial verification process, add it into
-   the reverify queue.
-
-3. After the initial verification, verify the rows' hashes in the
-   reverification queue again. This is done to reduce the time needed to
-   reverify during the cutover as we assume the reverification queue will
-   become smaller during this process.
-
-4. During the cutover stage, verify all rows' hashes in the reverify queue.
-
-   a. If they are the same: the verification for that row is complete.
-   b. If they are not the same: the verification fails.
-
-5. If no verification failure occurs, the source and the target are identical.
-   If verification failure does occur (4b), then the source and target are not
-   identical.
-
-A proof of concept TLA+ verification of this algorithm is done in
-``_.
+InlineVerifier
+--------------
+
+Ghostferry's core algorithm has run millions of times and is backed by a TLA+
+specification, so there is a high degree of confidence in its correctness.
+That analysis, however, assumes the data is copied perfectly from the source
+to the target MySQL. This may not be the case, as the data is transformed from
+MySQL -> Go -> MySQL. Different encodings could unintentionally change the
+data, resulting in corruption. Some observed (and fixed) cases include
+floating point values and datetime columns.
+
+The InlineVerifier is designed to catch these types of problems and fail the
+run if discrepancies are detected. **It is not designed to detect missing
+records**. Only the ChecksumTableVerifier can do that. The InlineVerifier
+catches encoding-type issues as follows:
+
+* During the DataIterator
+
+  1. While selecting a row to copy, an MD5 checksum is calculated on the
+     source MySQL server.
+  2. After the data is copied onto the target, the same MD5 checksum is
+     calculated on the target.
+  3. The checksums are compared. If they differ, the run is aborted with an
+     error.
+
+* During the BinlogStreamer
+
+  1. Since the MD5 checksum cannot be synchronously generated with the binlog
+     event, any rows seen by the BinlogStreamer are added to a background
+     verification queue.
+  2. Periodically in the background, the checksums for all rows within the
+     queue are computed on both the source and the target databases.
+  3. The checksums are compared. If they match, the row is removed from the
+     verification queue. Otherwise, it remains in the queue.
+  4. During the cutover, when the source and target are read-only, checksums
+     for all rows within the verification queue are checked. If any
+     mismatches are detected, an error is raised.
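To make the mechanism above concrete, the per-row fingerprint statements look roughly like the query below. This sketch mirrors the expected SQL in TestHashesSql further down this diff; the InlineVerifier's actual statements may differ in detail:

	SELECT `id`, MD5(CONCAT(
	    MD5(COALESCE(`id`, 'NULL')),
	    MD5(COALESCE(`data`, 'NULL')),
	    MD5(COALESCE((if (`float_col` = '-0', 0, `float_col`)), 'NULL'))
	)) AS row_fingerprint
	FROM `gftest`.`test_table`
	WHERE `id` IN (?,?,?)
	ORDER BY `id`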
diff --git a/ferry.go b/ferry.go
index 8df62913..6755c7d6 100644
--- a/ferry.go
+++ b/ferry.go
@@ -71,8 +71,7 @@ type Ferry struct {
 	// returned in Initialize.
 	//
 	// If VerifierType is specified and this is nil on Ferry initialization, a
-	// Verifier will be created by Initialize. If an IterativeVerifier is to be
-	// created, IterativeVerifierConfig will be used to create the verifier.
+	// Verifier will be created by Initialize.
 	Verifier Verifier
 
 	inlineVerifier *InlineVerifier
@@ -226,65 +225,6 @@ func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier {
 	return v
 }
 
-func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error) {
-	f.ensureInitialized()
-
-	var err error
-	config := f.Config.IterativeVerifierConfig
-
-	var maxExpectedDowntime time.Duration
-	if config.MaxExpectedDowntime != "" {
-		maxExpectedDowntime, err = time.ParseDuration(config.MaxExpectedDowntime)
-		if err != nil {
-			return nil, fmt.Errorf("invalid MaxExpectedDowntime: %v. this error should have been caught via .Validate()", err)
-		}
-	}
-
-	var compressionVerifier *CompressionVerifier
-	if config.TableColumnCompression != nil {
-		compressionVerifier, err = NewCompressionVerifier(config.TableColumnCompression)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	ignoredColumns := make(map[string]map[string]struct{})
-	for table, columns := range config.IgnoredColumns {
-		ignoredColumns[table] = make(map[string]struct{})
-		for _, column := range columns {
-			ignoredColumns[table][column] = struct{}{}
-		}
-	}
-
-	v := &IterativeVerifier{
-		CursorConfig: &CursorConfig{
-			DB:          f.SourceDB,
-			BatchSize:   f.Config.DataIterationBatchSize,
-			ReadRetries: f.Config.DBReadRetries,
-		},
-
-		BinlogStreamer:      f.BinlogStreamer,
-		SourceDB:            f.SourceDB,
-		TargetDB:            f.TargetDB,
-		CompressionVerifier: compressionVerifier,
-
-		Tables:              f.Tables.AsSlice(),
-		TableSchemaCache:    f.Tables,
-		IgnoredTables:       config.IgnoredTables,
-		IgnoredColumns:      ignoredColumns,
-		DatabaseRewrites:    f.Config.DatabaseRewrites,
-		TableRewrites:       f.Config.TableRewrites,
-		Concurrency:         config.Concurrency,
-		MaxExpectedDowntime: maxExpectedDowntime,
-	}
-
-	if f.CopyFilter != nil {
-		v.CursorConfig.BuildSelect = f.CopyFilter.BuildSelect
-	}
-
-	return v, v.Initialize()
-}
-
 // Initialize all the components of Ghostferry and connect to the Database
 func (f *Ferry) Initialize() (err error) {
 	f.StartTime = time.Now().Truncate(time.Second)
@@ -431,8 +371,6 @@ func (f *Ferry) Initialize() (err error) {
 		f.Tables = f.StateToResumeFrom.LastKnownTableSchemaCache
 	}
 
-	// The iterative verifier needs the binlog streamer so this has to be first.
-	// Eventually this can be moved below the verifier initialization.
 	f.BinlogStreamer = f.NewBinlogStreamer()
 	f.BinlogWriter = f.NewBinlogWriter()
 	f.DataIterator = f.NewDataIterator()
@@ -444,11 +382,6 @@ func (f *Ferry) Initialize() (err error) {
 	}
 
 	switch f.Config.VerifierType {
-	case VerifierTypeIterative:
-		f.Verifier, err = f.NewIterativeVerifier()
-		if err != nil {
-			return err
-		}
 	case VerifierTypeChecksumTable:
 		f.Verifier = f.NewChecksumTableVerifier()
 	case VerifierTypeInline:
diff --git a/inline_verifier.go b/inline_verifier.go
index 49d7bde6..d4545eab 100644
--- a/inline_verifier.go
+++ b/inline_verifier.go
@@ -14,10 +14,25 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-// This struct is very similar to ReverifyStore, but it is more optimized
-// for serialization into JSON.
-//
-// TODO: remove IterativeVerifier and remove this comment.
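As background for why the Snappy constant and UnsupportedCompressionError move into this file: identical data can legally have more than one Snappy representation (for example, across encoder versions), so fingerprints must be computed on decompressed bytes. A minimal standalone sketch, not part of this patch:

	package main

	import (
		"bytes"
		"crypto/md5"
		"fmt"

		"github.com/golang/snappy"
	)

	func main() {
		payload := []byte("same logical row contents")

		// Two compressed forms of the same payload. This particular encoder is
		// deterministic, but a different encoder (or encoder version) may emit
		// different bytes for the same input.
		a := snappy.Encode(nil, payload)
		b := snappy.Encode(nil, payload)

		// Hashing compressed bytes can disagree even when the data matches;
		// hashing after Decode always agrees.
		da, _ := snappy.Decode(nil, a)
		db, _ := snappy.Decode(nil, b)
		fmt.Println("compressed equal:  ", bytes.Equal(a, b))
		fmt.Println("decompressed equal:", md5.Sum(da) == md5.Sum(db))
	}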
+const ( + // CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data + CompressionSnappy = "SNAPPY" +) + +// UnsupportedCompressionError is used to identify errors resulting +// from attempting to decompress unsupported algorithms +type UnsupportedCompressionError struct { + table string + column string + algorithm string +} + +func (e UnsupportedCompressionError) Error() string { + return "Compression algorithm: " + e.algorithm + + " not supported on table: " + e.table + + " for column: " + e.column +} + type BinlogVerifyStore struct { EmitLogPerRowsAdded uint64 diff --git a/iterative_verifier.go b/iterative_verifier.go deleted file mode 100644 index c8c3b07e..00000000 --- a/iterative_verifier.go +++ /dev/null @@ -1,700 +0,0 @@ -package ghostferry - -import ( - "bytes" - "database/sql" - "errors" - "fmt" - "math" - "strconv" - "strings" - "sync" - "time" - - sq "github.com/Masterminds/squirrel" - "github.com/siddontang/go-mysql/schema" - "github.com/sirupsen/logrus" -) - -type ReverifyBatch struct { - Pks []uint64 - Table TableIdentifier -} - -type ReverifyEntry struct { - Pk uint64 - Table *TableSchema -} - -type ReverifyStore struct { - MapStore map[TableIdentifier]map[uint64]struct{} - mapStoreMutex *sync.Mutex - BatchStore []ReverifyBatch - RowCount uint64 - EmitLogPerRowCount uint64 -} - -func NewReverifyStore() *ReverifyStore { - r := &ReverifyStore{ - mapStoreMutex: &sync.Mutex{}, - RowCount: uint64(0), - EmitLogPerRowCount: uint64(10000), - } - - r.flushStore() - return r -} - -func (r *ReverifyStore) Add(entry ReverifyEntry) { - r.mapStoreMutex.Lock() - defer r.mapStoreMutex.Unlock() - - tableId := NewTableIdentifierFromSchemaTable(entry.Table) - if _, exists := r.MapStore[tableId]; !exists { - r.MapStore[tableId] = make(map[uint64]struct{}) - } - - if _, exists := r.MapStore[tableId][entry.Pk]; !exists { - r.MapStore[tableId][entry.Pk] = struct{}{} - r.RowCount++ - if r.RowCount%r.EmitLogPerRowCount == 0 { - metrics.Gauge("iterative_verifier_store_rows", float64(r.RowCount), []MetricTag{}, 1.0) - logrus.WithFields(logrus.Fields{ - "tag": "reverify_store", - "rows": r.RowCount, - }).Debug("added rows will be reverified") - } - } -} - -func (r *ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch { - r.mapStoreMutex.Lock() - defer r.mapStoreMutex.Unlock() - - r.BatchStore = make([]ReverifyBatch, 0) - for tableId, pkSet := range r.MapStore { - pkBatch := make([]uint64, 0, batchsize) - for pk, _ := range pkSet { - pkBatch = append(pkBatch, pk) - delete(pkSet, pk) - if len(pkBatch) >= batchsize { - r.BatchStore = append(r.BatchStore, ReverifyBatch{ - Pks: pkBatch, - Table: tableId, - }) - pkBatch = make([]uint64, 0, batchsize) - } - } - - if len(pkBatch) > 0 { - r.BatchStore = append(r.BatchStore, ReverifyBatch{ - Pks: pkBatch, - Table: tableId, - }) - } - - delete(r.MapStore, tableId) - } - - r.flushStore() - return r.BatchStore -} - -func (r *ReverifyStore) flushStore() { - r.MapStore = make(map[TableIdentifier]map[uint64]struct{}) - r.RowCount = 0 -} - -type verificationResultAndError struct { - Result VerificationResult - Error error -} - -func (r verificationResultAndError) ErroredOrFailed() bool { - return r.Error != nil || !r.Result.DataCorrect -} - -type IterativeVerifier struct { - CompressionVerifier *CompressionVerifier - CursorConfig *CursorConfig - BinlogStreamer *BinlogStreamer - TableSchemaCache TableSchemaCache - SourceDB *sql.DB - TargetDB *sql.DB - - Tables []*TableSchema - IgnoredTables []string - 
IgnoredColumns map[string]map[string]struct{} - DatabaseRewrites map[string]string - TableRewrites map[string]string - Concurrency int - MaxExpectedDowntime time.Duration - - reverifyStore *ReverifyStore - logger *logrus.Entry - - beforeCutoverVerifyDone bool - verifyDuringCutoverStarted AtomicBoolean - - // Variables for verification in the background - verificationResultAndStatus VerificationResultAndStatus - verificationErr error - backgroundVerificationWg *sync.WaitGroup - backgroundStartTime time.Time - backgroundDoneTime time.Time -} - -func (v *IterativeVerifier) SanityCheckParameters() error { - if v.CursorConfig == nil { - return errors.New("CursorConfig must not be nil") - } - - if v.BinlogStreamer == nil { - return errors.New("BinlogStreamer must not be nil") - } - - if v.SourceDB == nil { - return errors.New("SourceDB must not be nil") - } - - if v.TargetDB == nil { - return errors.New("TargetDB must not be nil") - } - - if v.Concurrency <= 0 { - return fmt.Errorf("iterative verifier concurrency must be greater than 0, not %d", v.Concurrency) - } - - return nil -} - -func (v *IterativeVerifier) Initialize() error { - v.logger = logrus.WithField("tag", "iterative_verifier") - - if err := v.SanityCheckParameters(); err != nil { - v.logger.WithError(err).Error("iterative verifier parameter sanity check failed") - return err - } - - v.reverifyStore = NewReverifyStore() - return nil -} - -func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error) { - v.logger.Info("starting one-off verification of all tables") - - err := v.iterateAllTables(func(pk uint64, tableSchema *TableSchema) error { - return VerificationResult{ - DataCorrect: false, - Message: fmt.Sprintf("verification failed on table: %s for pk: %d", tableSchema.String(), pk), - IncorrectTables: []string{tableSchema.String()}, - } - }) - - v.logger.Info("one-off verification complete") - - switch e := err.(type) { - case VerificationResult: - return e, nil - default: - return NewCorrectVerificationResult(), e - } -} - -func (v *IterativeVerifier) VerifyBeforeCutover() error { - if v.TableSchemaCache == nil { - return fmt.Errorf("iterative verifier must be given the table schema cache before starting verify before cutover") - } - - v.logger.Info("starting pre-cutover verification") - - v.logger.Debug("attaching binlog event listener") - v.BinlogStreamer.AddEventListener(v.binlogEventListener) - - v.logger.Debug("verifying all tables") - err := v.iterateAllTables(func(pk uint64, tableSchema *TableSchema) error { - v.reverifyStore.Add(ReverifyEntry{Pk: pk, Table: tableSchema}) - return nil - }) - - if err == nil { - // This reverification phase is to reduce the size of the set of rows - // that need to be reverified during cutover. Failures during - // reverification at this point could have been caused by still - // ongoing writes and we therefore just re-add those rows to the - // store rather than failing the move prematurely. 
- err = v.reverifyUntilStoreIsSmallEnough(30) - } - - v.logger.Info("pre-cutover verification complete") - v.beforeCutoverVerifyDone = true - - return err -} - -func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error) { - v.logger.Info("starting verification during cutover") - v.verifyDuringCutoverStarted.Set(true) - result, err := v.verifyStore("iterative_verifier_during_cutover", []MetricTag{}) - v.logger.Info("cutover verification complete") - - return result, err -} - -func (v *IterativeVerifier) StartInBackground() error { - if v.logger == nil { - return errors.New("Initialize() must be called before this") - } - - if !v.beforeCutoverVerifyDone { - return errors.New("VerifyBeforeCutover() must be called before this") - } - - if v.verifyDuringCutoverStarted.Get() { - return errors.New("verification during cutover has already been started") - } - - v.verificationResultAndStatus = VerificationResultAndStatus{ - StartTime: time.Now(), - DoneTime: time.Time{}, - } - v.verificationErr = nil - v.backgroundVerificationWg = &sync.WaitGroup{} - - v.logger.Info("starting iterative verification in the background") - - v.backgroundVerificationWg.Add(1) - go func() { - defer func() { - v.backgroundDoneTime = time.Now() - v.backgroundVerificationWg.Done() - }() - - v.verificationResultAndStatus.VerificationResult, v.verificationErr = v.VerifyDuringCutover() - v.verificationResultAndStatus.DoneTime = time.Now() - }() - - return nil -} - -func (v *IterativeVerifier) Wait() { - v.backgroundVerificationWg.Wait() -} - -func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error) { - return v.verificationResultAndStatus, v.verificationErr -} - -func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error) { - sql, args, err := GetMd5HashesSql(schema, table, pkColumn, columns, pks) - if err != nil { - return nil, err - } - - // This query must be a prepared query. If it is not, querying will use - // MySQL's plain text interface, which will scan all values into []uint8 - // if we give it []interface{}. - stmt, err := db.Prepare(sql) - if err != nil { - return nil, err - } - - defer stmt.Close() - - rows, err := stmt.Query(args...) 
- if err != nil { - return nil, err - } - - defer rows.Close() - - resultSet := make(map[uint64][]byte) - for rows.Next() { - rowData, err := ScanGenericRow(rows, 2) - if err != nil { - return nil, err - } - - pk, err := rowData.GetUint64(0) - if err != nil { - return nil, err - } - - resultSet[pk] = rowData[1].([]byte) - } - return resultSet, nil -} - -func (v *IterativeVerifier) reverifyUntilStoreIsSmallEnough(maxIterations int) error { - var timeToVerify time.Duration - - for iteration := 0; iteration < maxIterations; iteration++ { - before := v.reverifyStore.RowCount - start := time.Now() - - _, err := v.verifyStore("reverification_before_cutover", []MetricTag{{"iteration", string(iteration)}}) - if err != nil { - return err - } - - after := v.reverifyStore.RowCount - timeToVerify = time.Now().Sub(start) - - v.logger.WithFields(logrus.Fields{ - "store_size_before": before, - "store_size_after": after, - "iteration": iteration, - }).Infof("completed re-verification iteration %d", iteration) - - if after <= 1000 || after >= before { - break - } - } - - if v.MaxExpectedDowntime != 0 && timeToVerify > v.MaxExpectedDowntime { - return fmt.Errorf("cutover stage verification will not complete within max downtime duration (took %s)", timeToVerify) - } - - return nil -} - -func (v *IterativeVerifier) iterateAllTables(mismatchedPkFunc func(uint64, *TableSchema) error) error { - pool := &WorkerPool{ - Concurrency: v.Concurrency, - Process: func(tableIndex int) (interface{}, error) { - table := v.Tables[tableIndex] - - if v.tableIsIgnored(table) { - return nil, nil - } - - err := v.iterateTableFingerprints(table, mismatchedPkFunc) - if err != nil { - v.logger.WithError(err).WithField("table", table.String()).Error("error occured during table verification") - } - return nil, err - }, - } - - _, err := pool.Run(len(v.Tables)) - - return err -} - -func (v *IterativeVerifier) iterateTableFingerprints(table *TableSchema, mismatchedPkFunc func(uint64, *TableSchema) error) error { - // The cursor will stop iterating when it cannot find anymore rows, - // so it will not iterate until MaxUint64. - cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64) - - // It only needs the PKs, not the entire row. 
- cursor.ColumnsToSelect = []string{fmt.Sprintf("`%s`", table.GetPKColumn(0).Name)} - return cursor.Each(func(batch *RowBatch) error { - metrics.Count("RowEvent", int64(batch.Size()), []MetricTag{ - MetricTag{"table", table.Name}, - MetricTag{"source", "iterative_verifier_before_cutover"}, - }, 1.0) - - pks := make([]uint64, 0, batch.Size()) - - for _, rowData := range batch.Values() { - pk, err := rowData.GetUint64(batch.PkIndex()) - if err != nil { - return err - } - - pks = append(pks, pk) - } - - mismatchedPks, err := v.compareFingerprints(pks, batch.TableSchema()) - if err != nil { - v.logger.WithError(err).Errorf("failed to fingerprint table %s", batch.TableSchema().String()) - return err - } - - if len(mismatchedPks) > 0 { - v.logger.WithFields(logrus.Fields{ - "table": batch.TableSchema().String(), - "mismatched_pks": mismatchedPks, - }).Info("found mismatched rows") - - for _, pk := range mismatchedPks { - err := mismatchedPkFunc(pk, batch.TableSchema()) - if err != nil { - return err - } - } - } - - return nil - }) -} - -func (v *IterativeVerifier) verifyStore(sourceTag string, additionalTags []MetricTag) (VerificationResult, error) { - allBatches := v.reverifyStore.FlushAndBatchByTable(int(v.CursorConfig.BatchSize)) - v.logger.WithField("batches", len(allBatches)).Debug("reverifying") - - if len(allBatches) == 0 { - return NewCorrectVerificationResult(), nil - } - - erroredOrFailed := errors.New("verification of store errored or failed") - - pool := &WorkerPool{ - Concurrency: v.Concurrency, - Process: func(reverifyBatchIndex int) (interface{}, error) { - reverifyBatch := allBatches[reverifyBatchIndex] - table := v.TableSchemaCache.Get(reverifyBatch.Table.SchemaName, reverifyBatch.Table.TableName) - - tags := append([]MetricTag{ - MetricTag{"table", table.Name}, - MetricTag{"source", sourceTag}, - }, additionalTags...) - - metrics.Count("RowEvent", int64(len(reverifyBatch.Pks)), tags, 1.0) - - v.logger.WithFields(logrus.Fields{ - "table": table.String(), - "len(pks)": len(reverifyBatch.Pks), - }).Debug("received pk batch to reverify") - - verificationResult, mismatchedPks, err := v.reverifyPks(table, reverifyBatch.Pks) - resultAndErr := verificationResultAndError{verificationResult, err} - - // If we haven't entered the cutover phase yet, then reverification failures - // could have been caused by ongoing writes. We will just re-add the rows for - // the cutover verification and ignore the failure at this point here. - if err == nil && !v.beforeCutoverVerifyDone { - for _, pk := range mismatchedPks { - v.reverifyStore.Add(ReverifyEntry{Pk: pk, Table: table}) - } - - resultAndErr.Result = NewCorrectVerificationResult() - } - - if resultAndErr.ErroredOrFailed() { - if resultAndErr.Error != nil { - v.logger.WithError(resultAndErr.Error).Error("error occured in reverification") - } else { - v.logger.Errorf("failed reverification: %s", resultAndErr.Result.Message) - } - - return resultAndErr, erroredOrFailed - } - - return resultAndErr, nil - }, - } - - results, _ := pool.Run(len(allBatches)) - - var result VerificationResult - var err error - for i := 0; i < v.Concurrency; i++ { - if results[i] == nil { - // This means the worker pool exited early and another goroutine - // must have returned an error. 
- continue - } - - resultAndErr := results[i].(verificationResultAndError) - result = resultAndErr.Result - err = resultAndErr.Error - - if resultAndErr.ErroredOrFailed() { - break - } - } - - return result, err -} - -func (v *IterativeVerifier) reverifyPks(table *TableSchema, pks []uint64) (VerificationResult, []uint64, error) { - mismatchedPks, err := v.compareFingerprints(pks, table) - if err != nil { - return VerificationResult{}, mismatchedPks, err - } - - if len(mismatchedPks) == 0 { - return NewCorrectVerificationResult(), mismatchedPks, nil - } - - pkStrings := make([]string, len(mismatchedPks)) - for idx, pk := range mismatchedPks { - pkStrings[idx] = strconv.FormatUint(pk, 10) - } - - return VerificationResult{ - DataCorrect: false, - Message: fmt.Sprintf("verification failed on table: %s for pks: %s", table.String(), strings.Join(pkStrings, ",")), - IncorrectTables: []string{table.String()}, - }, mismatchedPks, nil -} - -func (v *IterativeVerifier) binlogEventListener(evs []DMLEvent) error { - if v.verifyDuringCutoverStarted.Get() { - return fmt.Errorf("cutover has started but received binlog event!") - } - - for _, ev := range evs { - if v.tableIsIgnored(ev.TableSchema()) { - continue - } - - pk, err := ev.PK() - if err != nil { - return err - } - - v.reverifyStore.Add(ReverifyEntry{Pk: pk, Table: ev.TableSchema()}) - } - - return nil -} - -func (v *IterativeVerifier) tableIsIgnored(table *TableSchema) bool { - for _, ignored := range v.IgnoredTables { - if table.Name == ignored { - return true - } - } - - return false -} - -func (v *IterativeVerifier) columnsToVerify(table *TableSchema) []schema.TableColumn { - ignoredColsSet, containsIgnoredColumns := v.IgnoredColumns[table.Name] - if !containsIgnoredColumns { - return table.Columns - } - - var columns []schema.TableColumn - for _, column := range table.Columns { - if _, isIgnored := ignoredColsSet[column.Name]; !isIgnored { - columns = append(columns, column) - } - } - - return columns -} - -func (v *IterativeVerifier) compareFingerprints(pks []uint64, table *TableSchema) ([]uint64, error) { - targetDb := table.Schema - if targetDbName, exists := v.DatabaseRewrites[targetDb]; exists { - targetDb = targetDbName - } - - targetTable := table.Name - if targetTableName, exists := v.TableRewrites[targetTable]; exists { - targetTable = targetTableName - } - - wg := &sync.WaitGroup{} - wg.Add(2) - - var sourceHashes map[uint64][]byte - var sourceErr error - go func() { - defer wg.Done() - sourceErr = WithRetries(5, 0, v.logger, "get fingerprints from source db", func() (err error) { - sourceHashes, err = v.GetHashes(v.SourceDB, table.Schema, table.Name, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - return - }) - }() - - var targetHashes map[uint64][]byte - var targetErr error - go func() { - defer wg.Done() - targetErr = WithRetries(5, 0, v.logger, "get fingerprints from target db", func() (err error) { - targetHashes, err = v.GetHashes(v.TargetDB, targetDb, targetTable, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - return - }) - }() - - wg.Wait() - if sourceErr != nil { - return nil, sourceErr - } - if targetErr != nil { - return nil, targetErr - } - - mismatches := compareHashes(sourceHashes, targetHashes) - if len(mismatches) > 0 && v.CompressionVerifier != nil && v.CompressionVerifier.IsCompressedTable(table.Name) { - return v.compareCompressedHashes(targetDb, targetTable, table, pks) - } - - return mismatches, nil -} - -func (v *IterativeVerifier) compareCompressedHashes(targetDb, targetTable string, 
table *TableSchema, pks []uint64) ([]uint64, error) { - sourceHashes, err := v.CompressionVerifier.GetCompressedHashes(v.SourceDB, table.Schema, table.Name, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - if err != nil { - return nil, err - } - - targetHashes, err := v.CompressionVerifier.GetCompressedHashes(v.TargetDB, targetDb, targetTable, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - if err != nil { - return nil, err - } - - return compareHashes(sourceHashes, targetHashes), nil -} - -func compareHashes(source, target map[uint64][]byte) []uint64 { - mismatchSet := map[uint64]struct{}{} - - for pk, targetHash := range target { - sourceHash, exists := source[pk] - if !bytes.Equal(sourceHash, targetHash) || !exists { - mismatchSet[pk] = struct{}{} - } - } - - for pk, sourceHash := range source { - targetHash, exists := target[pk] - if !bytes.Equal(sourceHash, targetHash) || !exists { - mismatchSet[pk] = struct{}{} - } - } - - mismatches := make([]uint64, 0, len(mismatchSet)) - for mismatch, _ := range mismatchSet { - mismatches = append(mismatches, mismatch) - } - - return mismatches -} - -func GetMd5HashesSql(schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (string, []interface{}, error) { - quotedPK := quoteField(pkColumn) - return rowMd5Selector(columns, pkColumn). - From(QuotedTableNameFromString(schema, table)). - Where(sq.Eq{quotedPK: pks}). - OrderBy(quotedPK). - ToSql() -} - -func rowMd5Selector(columns []schema.TableColumn, pkColumn string) sq.SelectBuilder { - quotedPK := quoteField(pkColumn) - - hashStrs := make([]string, len(columns)) - for idx, column := range columns { - quotedCol := normalizeAndQuoteColumn(column) - hashStrs[idx] = fmt.Sprintf("MD5(COALESCE(%s, 'NULL'))", quotedCol) - } - - return sq.Select(fmt.Sprintf( - "%s, MD5(CONCAT(%s)) AS row_fingerprint", - quotedPK, - strings.Join(hashStrs, ","), - )) -} - -func normalizeAndQuoteColumn(column schema.TableColumn) (quoted string) { - quoted = quoteField(column.Name) - if column.Type == schema.TYPE_FLOAT { - quoted = fmt.Sprintf("(if (%s = '-0', 0, %s))", quoted, quoted) - } - return -} diff --git a/table_schema_cache.go b/table_schema_cache.go index 0a557cd0..4a3986e6 100644 --- a/table_schema_cache.go +++ b/table_schema_cache.go @@ -306,3 +306,11 @@ func maxPk(db *sql.DB, table *TableSchema) (uint64, bool, error) { return maxPrimaryKey, true, nil } } + +func normalizeAndQuoteColumn(column schema.TableColumn) (quoted string) { + quoted = quoteField(column.Name) + if column.Type == schema.TYPE_FLOAT { + quoted = fmt.Sprintf("(if (%s = '-0', 0, %s))", quoted, quoted) + } + return +} diff --git a/test/go/iterative_verifier_collation_test.go b/test/go/iterative_verifier_collation_test.go deleted file mode 100644 index 4a82b4a1..00000000 --- a/test/go/iterative_verifier_collation_test.go +++ /dev/null @@ -1,118 +0,0 @@ -package test - -import ( - "database/sql" - "fmt" - "testing" - - "github.com/Shopify/ghostferry/testhelpers" - "github.com/stretchr/testify/suite" -) - -type IterativeVerifierCollationTestSuite struct { - *IterativeVerifierTestSuite - - unsafeDb *sql.DB - asciiData string - utf8mb3Data string - utf8mb4Data string -} - -func (t *IterativeVerifierCollationTestSuite) SetupTest() { - t.IterativeVerifierTestSuite.SetupTest() - - unsafeDbConfig := t.Ferry.Source - t.Require().Equal("'STRICT_ALL_TABLES,NO_BACKSLASH_ESCAPES'", unsafeDbConfig.Params["sql_mode"]) - unsafeDbConfig.Params["sql_mode"] = "'NO_BACKSLASH_ESCAPES'" - - unsafeConfig, err := 
t.Ferry.Source.MySQLConfig() - t.Require().Nil(err) - - unsafeDSN := unsafeConfig.FormatDSN() - - t.unsafeDb, err = sql.Open("mysql", unsafeDSN) - t.Require().Nil(err) - - t.asciiData = "foobar" - t.utf8mb3Data = "これは普通なストリングです" - t.utf8mb4Data = "𠜎𠜱𠝹𠱓𠱸𠲖𠳏𠳕𠴕𠵼𠵿𠸎𠸏𠹷" -} - -func (t *IterativeVerifierCollationTestSuite) TearDownTest() { - t.IterativeVerifierTestSuite.TearDownTest() -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfAsciiValueDoesNotChangeFromUtf8Mb3ToUtf8Mb4() { - t.AssertIdentical(t.asciiData, "utf8mb3", "utf8mb4") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfAsciiValueDoesNotChangeFromUtf8Mb4ToUtf8Mb3() { - t.AssertIdentical(t.asciiData, "utf8mb4", "utf8mb3") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfUtf8Mb3ValueDoesNotChangeFromUtf8Mb3ToUtf8Mb4() { - t.AssertIdentical(t.utf8mb3Data, "utf8mb3", "utf8mb4") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfUtf8Mb3ValueDoesNotChangeFromUtf8Mb4ToUtf8Mb3() { - t.AssertIdentical(t.utf8mb3Data, "utf8mb4", "utf8mb3") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfUtf8Mb4ValueDoesChangeFromUtf8Mb4ToUtf8Mb3() { - t.AssertDifferent(t.utf8mb4Data, "utf8mb4", "utf8mb3") -} - -func (t *IterativeVerifierCollationTestSuite) AssertIdentical(data, from, to string) { - fingerprints := t.GetHashesFromDifferentCollations(data, from, to) - t.Require().Equal(fingerprints[0], fingerprints[1]) -} - -func (t *IterativeVerifierCollationTestSuite) AssertDifferent(data, from, to string) { - fingerprints := t.GetHashesFromDifferentCollations(data, from, to) - t.Require().NotEqual(fingerprints[0], fingerprints[1]) -} - -func (t *IterativeVerifierCollationTestSuite) GetHashesFromDifferentCollations(data, from, to string) []string { - var fingerprints []string - - t.SetDataColumnCollation(from) - t.InsertRow(42, data) - fingerprints = append(fingerprints, t.GetHashes([]uint64{42})[0]) - - t.SetDataColumnCollation(to) - fingerprints = append(fingerprints, t.GetHashes([]uint64{42})[0]) - - for _, fingerprint := range fingerprints { - t.Require().True(fingerprint != "") - } - - return fingerprints -} - -func (t *IterativeVerifierCollationTestSuite) SetDataColumnCollation(charset string) { - var collation string - if charset == "utf8mb4" { - collation = "utf8mb4_unicode_ci" - } else if charset == "utf8mb3" { - collation = "utf8_unicode_ci" - } - t.Require().True(collation != "") - - _, err := t.unsafeDb.Exec(fmt.Sprintf( - "ALTER TABLE %s.%s MODIFY data VARCHAR(255) CHARACTER SET %s COLLATE %s", - testhelpers.TestSchemaName, - testhelpers.TestTable1Name, - charset, - collation, - )) - t.Require().Nil(err) -} - -func TestIterativeVerifierCollationTestSuite(t *testing.T) { - testhelpers.SetupTest() - suite.Run(t, &IterativeVerifierCollationTestSuite{ - IterativeVerifierTestSuite: &IterativeVerifierTestSuite{ - GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}, - }, - }) -} diff --git a/test/go/iterative_verifier_integration_test.go b/test/go/iterative_verifier_integration_test.go deleted file mode 100644 index 15850abe..00000000 --- a/test/go/iterative_verifier_integration_test.go +++ /dev/null @@ -1,293 +0,0 @@ -package test - -import ( - "database/sql" - "testing" - - "github.com/Shopify/ghostferry" - "github.com/Shopify/ghostferry/testhelpers" - "github.com/siddontang/go-mysql/schema" - "github.com/stretchr/testify/assert" -) - -func TestHashesSql(t *testing.T) { - columns := []schema.TableColumn{schema.TableColumn{Name: "id"}, 
schema.TableColumn{Name: "data"}, schema.TableColumn{Name: "float_col", Type: schema.TYPE_FLOAT}} - pks := []uint64{1, 5, 42} - - sql, args, err := ghostferry.GetMd5HashesSql("gftest", "test_table", "id", columns, pks) - - assert.Nil(t, err) - assert.Equal(t, "SELECT `id`, MD5(CONCAT(MD5(COALESCE(`id`, 'NULL')),MD5(COALESCE(`data`, 'NULL')),MD5(COALESCE((if (`float_col` = '-0', 0, `float_col`)), 'NULL')))) "+ - "AS row_fingerprint FROM `gftest`.`test_table` WHERE `id` IN (?,?,?) ORDER BY `id`", sql) - for idx, arg := range args { - assert.Equal(t, pks[idx], arg.(uint64)) - } -} - -func TestVerificationFailsDeletedRow(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: setupSingleTableDatabase, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - deleteTestRowsToTriggerFailure(ferry) - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.False(t, result.DataCorrect) - assert.Regexp(t, "verification failed.*gftest.table1.*pks: (43)|(42)|(43,42)|(42,43)", result.Message) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestVerificationFailsUpdatedRow(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: setupSingleTableDatabase, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - modifyDataColumnInSourceDB(ferry) - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.False(t, result.DataCorrect) - assert.Regexp(t, "verification failed.*gftest.table1.*pks: (42)|(43)|(43,42)|(42,43)", result.Message) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestIgnoresColumns(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - 
testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupSingleTableDatabase(ferry, sourceDB, targetDB) - iterativeVerifier.IgnoredColumns = map[string]map[string]struct{}{"table1": {"data": struct{}{}}} - }, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - modifyDataColumnInSourceDB(ferry) - - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.True(t, result.DataCorrect) - assert.Equal(t, "", result.Message) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestIgnoresTables(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupSingleTableDatabase(ferry, sourceDB, targetDB) - iterativeVerifier.IgnoredTables = []string{"table1"} - }, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - modifyAllRows(ferry) - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.True(t, result.DataCorrect) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestVerificationPasses(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: setupSingleTableDatabase, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - 
assert.True(t, result.DataCorrect) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - } - - testcase.Run() - assert.True(t, ran) -} - -func setupIterativeVerifierFromFerry(v *ghostferry.IterativeVerifier, f *ghostferry.Ferry) { - v.CursorConfig = &ghostferry.CursorConfig{ - DB: f.SourceDB, - BatchSize: f.Config.DataIterationBatchSize, - ReadRetries: f.Config.DBReadRetries, - } - - v.BinlogStreamer = f.BinlogStreamer - v.SourceDB = f.SourceDB - v.TargetDB = f.TargetDB - v.Tables = f.Tables.AsSlice() - v.TableSchemaCache = f.Tables - v.Concurrency = 2 -} - -func ensureTestRowsAreReverified(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.SourceDB.Exec("INSERT IGNORE INTO gftest.table1 VALUES (42, \"OK\")") - testhelpers.PanicIfError(err) - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"OK\" WHERE id = \"42\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.SourceDB.Exec("INSERT IGNORE INTO gftest.table1 VALUES (43, \"OK\")") - testhelpers.PanicIfError(err) - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"OK\" WHERE id = \"43\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.SourceDB.Exec("INSERT IGNORE INTO gftest.table1 VALUES (44, \"OK\")") - testhelpers.PanicIfError(err) - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"OK\" WHERE id = \"44\"") - testhelpers.PanicIfError(err) -} - -func modifyDataColumnInSourceDB(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"FAIL\" WHERE id = \"42\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"FAIL\" WHERE id = \"43\"") - testhelpers.PanicIfError(err) -} - -func modifyAllRows(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.TargetDB.Exec("UPDATE gftest.table1 SET data=\"FAIL\"") - testhelpers.PanicIfError(err) -} - -func deleteTestRowsToTriggerFailure(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.TargetDB.Exec("DELETE FROM gftest.table1 WHERE id = \"42\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.TargetDB.Exec("DELETE FROM gftest.table1 WHERE id = \"43\"") - testhelpers.PanicIfError(err) -} diff --git a/test/go/iterative_verifier_test.go b/test/go/iterative_verifier_test.go deleted file mode 100644 index a211896e..00000000 --- a/test/go/iterative_verifier_test.go +++ /dev/null @@ -1,477 +0,0 @@ -package test - -import ( - "database/sql" - "fmt" - "sort" - "testing" - "time" - - "github.com/Shopify/ghostferry" - "github.com/Shopify/ghostferry/testhelpers" - "github.com/siddontang/go-mysql/schema" - "github.com/stretchr/testify/suite" -) - -type IterativeVerifierTestSuite struct { - *testhelpers.GhostferryUnitTestSuite - - verifier *ghostferry.IterativeVerifier - db *sql.DB - table *ghostferry.TableSchema -} - -func (t *IterativeVerifierTestSuite) SetupTest() { - t.GhostferryUnitTestSuite.SetupTest() - t.SeedSourceDB(0) - t.SeedTargetDB(0) - - tableCompressions := make(ghostferry.TableColumnCompressionConfig) - tableCompressions[testhelpers.TestCompressedTable1Name] = make(map[string]string) - tableCompressions[testhelpers.TestCompressedTable1Name][testhelpers.TestCompressedColumn1Name] = ghostferry.CompressionSnappy - - compressionVerifier, err := ghostferry.NewCompressionVerifier(tableCompressions) - if err != nil { - 
-		t.FailNow(err.Error())
-	}
-
-	t.verifier = &ghostferry.IterativeVerifier{
-		CompressionVerifier: compressionVerifier,
-		CursorConfig: &ghostferry.CursorConfig{
-			DB: t.Ferry.SourceDB,
-			BatchSize: t.Ferry.Config.DataIterationBatchSize,
-			ReadRetries: t.Ferry.Config.DBReadRetries,
-		},
-		BinlogStreamer: t.Ferry.BinlogStreamer,
-		SourceDB: t.Ferry.SourceDB,
-		TargetDB: t.Ferry.TargetDB,
-
-		Concurrency: 1,
-	}
-
-	t.db = t.Ferry.SourceDB
-	t.reloadTables()
-
-	err = t.verifier.Initialize()
-	testhelpers.PanicIfError(err)
-}
-
-func (t *IterativeVerifierTestSuite) TearDownTest() {
-	t.GhostferryUnitTestSuite.TearDownTest()
-}
-
-func (t *IterativeVerifierTestSuite) TestNothingToVerify() {
-	err := t.verifier.VerifyBeforeCutover()
-	t.Require().Nil(err)
-
-	result, err := t.verifier.VerifyDuringCutover()
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestVerifyOnceWithIgnoredColumns() {
-	ignoredColumns := map[string]map[string]struct{}{"test_table_1": {"data": struct{}{}}}
-	t.verifier.IgnoredColumns = ignoredColumns
-
-	t.InsertRowInDb(42, "foo", t.Ferry.SourceDB)
-	t.InsertRowInDb(42, "bar", t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyOnce()
-	t.Require().NotNil(result)
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestVerifyOnceFails() {
-	t.InsertRowInDb(42, "foo", t.Ferry.SourceDB)
-	t.InsertRowInDb(42, "bar", t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyOnce()
-	t.Require().NotNil(result)
-	t.Require().Nil(err)
-	t.Require().False(result.DataCorrect)
-	t.Require().Equal("verification failed on table: gftest.test_table_1 for pk: 42", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestVerifyCompressedOnceFails() {
-	t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.SourceDB)
-	t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData2, t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyOnce()
-	t.Require().NotNil(result)
-	t.Require().Nil(err)
-	t.Require().False(result.DataCorrect)
-	t.Require().Equal(
-		fmt.Sprintf("verification failed on table: %s.%s for pk: %s", testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name, "42"),
-		result.Message,
-	)
-}
-
-func (t *IterativeVerifierTestSuite) TestVerifyOncePass() {
-	t.InsertRowInDb(42, "foo", t.Ferry.SourceDB)
-	t.InsertRowInDb(42, "foo", t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyOnce()
-	t.Require().NotNil(result)
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestVerifyCompressedOncePass() {
-	t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.SourceDB)
-	t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyOnce()
-	t.Require().NotNil(result)
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestVerifyDifferentCompressedSameDecompressedDataOncePass() {
-	t.Require().NotEqual(testhelpers.TestCompressedData3, testhelpers.TestCompressedData4)
-
-	t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData3, t.Ferry.SourceDB)
-	t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData4, t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyOnce()
-	t.Require().NotNil(result)
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestBeforeCutoverFailuresFailAgainDuringCutover() {
-	t.InsertRowInDb(42, "foo", t.Ferry.SourceDB)
-	t.InsertRowInDb(42, "bar", t.Ferry.TargetDB)
-
-	err := t.verifier.VerifyBeforeCutover()
-	t.Require().Nil(err)
-
-	result, err := t.verifier.VerifyDuringCutover()
-	t.Require().Nil(err)
-	t.Require().False(result.DataCorrect)
-	t.Require().Equal("verification failed on table: gftest.test_table_1 for pks: 42", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestBeforeCutoverCompressionFailuresFailAgainDuringCutover() {
-	t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.SourceDB)
-	t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData2, t.Ferry.TargetDB)
-
-	err := t.verifier.VerifyBeforeCutover()
-	t.Require().Nil(err)
-
-	result, err := t.verifier.VerifyDuringCutover()
-	t.Require().Nil(err)
-	t.Require().False(result.DataCorrect)
-	t.Require().Equal(fmt.Sprintf("verification failed on table: %s.%s for pks: %s", "gftest", testhelpers.TestCompressedTable1Name, "42"), result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestBeforeCutoverDifferentCompressedSameDecompressedDataPassDuringCutover() {
-	t.Require().NotEqual(testhelpers.TestCompressedData3, testhelpers.TestCompressedData4)
-
-	t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData3, t.Ferry.SourceDB)
-	t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData4, t.Ferry.TargetDB)
-
-	err := t.verifier.VerifyBeforeCutover()
-	t.Require().Nil(err)
-
-	result, err := t.verifier.VerifyDuringCutover()
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestErrorsIfMaxDowntimeIsSurpassed() {
-	t.InsertRowInDb(42, "foo", t.Ferry.SourceDB)
-	t.InsertRowInDb(42, "bar", t.Ferry.TargetDB)
-
-	t.verifier.MaxExpectedDowntime = 1 * time.Nanosecond
-	err := t.verifier.VerifyBeforeCutover()
-	t.Require().Regexp("cutover stage verification will not complete within max downtime duration \\(took .*\\)", err.Error())
-}
-
-func (t *IterativeVerifierTestSuite) TestBeforeCutoverFailuresPassDuringCutover() {
-	t.InsertRowInDb(42, "foo", t.Ferry.SourceDB)
-	t.InsertRowInDb(42, "bar", t.Ferry.TargetDB)
-
-	err := t.verifier.VerifyBeforeCutover()
-	t.Require().Nil(err)
-
-	t.UpdateRowInDb(42, "foo", t.Ferry.TargetDB)
-
-	result, err := t.verifier.VerifyDuringCutover()
-	t.Require().Nil(err)
-	t.Require().True(result.DataCorrect)
-	t.Require().Equal("", result.Message)
-}
-
-func (t *IterativeVerifierTestSuite) TestChangingDataChangesHash() {
-	t.InsertRow(42, "foo")
-	old := t.GetHashes([]uint64{42})[0]
-
-	t.UpdateRow(42, "bar")
-	new := t.GetHashes([]uint64{42})[0]
-
-	t.Require().NotEqual(old, new)
-}
-
-func (t *IterativeVerifierTestSuite) TestDeduplicatesHashes() {
-	t.InsertRow(42, "foo")
-
-	hashes, err := t.verifier.GetHashes(t.db, t.table.Schema, t.table.Name, t.table.GetPKColumn(0).Name, t.table.Columns, []uint64{42, 42})
-	t.Require().Nil(err)
-	t.Require().Equal(1, len(hashes))
-}
-
-func (t *IterativeVerifierTestSuite) TestDoesntReturnHashIfRecordDoesntExist() {
-	hashes, err := t.verifier.GetHashes(t.db, t.table.Schema, t.table.Name, t.table.GetPKColumn(0).Name, t.table.Columns, []uint64{42, 42})
-	t.Require().Nil(err)
-	t.Require().Equal(0, len(hashes))
-}
-
-func (t *IterativeVerifierTestSuite) TestUnrelatedRowsDontAffectHash() {
-	t.InsertRow(42, "foo")
-	expected := t.GetHashes([]uint64{42})[0]
-
-	t.InsertRow(43, "bar")
-	actual := t.GetHashes([]uint64{42})[0]
-
-	t.Require().Equal(expected, actual)
-}
-
-func (t *IterativeVerifierTestSuite) TestRowsWithSameDataButDifferentPKs() {
-	t.InsertRow(42, "foo")
-	t.InsertRow(43, "foo")
-
-	hashes := t.GetHashes([]uint64{42, 43})
-	t.Require().NotEqual(hashes[0], hashes[1])
-}
-
-func (t *IterativeVerifierTestSuite) TestPositiveAndNegativeZeroFloat() {
-	_, err := t.db.Exec("ALTER TABLE gftest.test_table_1 MODIFY data float")
-	t.Require().Nil(err)
-	t.reloadTables()
-
-	_, err = t.db.Exec("INSERT INTO gftest.test_table_1 VALUES (42, \"0.0\")")
-	t.Require().Nil(err)
-
-	expected := t.GetHashes([]uint64{42})[0]
-
-	_, err = t.db.Exec("UPDATE gftest.test_table_1 SET data=\"-0.0\" WHERE id=42")
-	t.Require().Nil(err)
-
-	actual := t.GetHashes([]uint64{42})[0]
-
-	t.Require().Equal(expected, actual)
-}
-
-func (t *IterativeVerifierTestSuite) TestChangingNumberValueChangesHash() {
-	_, err := t.db.Exec("ALTER TABLE gftest.test_table_1 MODIFY data bigint(20)")
-	t.Require().Nil(err)
-	t.reloadTables()
-
-	_, err = t.db.Exec("INSERT INTO gftest.test_table_1 VALUES (42, -100)")
-	t.Require().Nil(err)
-
-	neg := t.GetHashes([]uint64{42})[0]
-
-	_, err = t.db.Exec("UPDATE gftest.test_table_1 SET data=100 WHERE id=42")
-	t.Require().Nil(err)
-
-	pos := t.GetHashes([]uint64{42})[0]
-
-	t.Require().NotEqual(neg, pos)
-}
-
-func (t *IterativeVerifierTestSuite) TestNULLValues() {
-	_, err := t.db.Exec("INSERT INTO gftest.test_table_1 VALUES (42, NULL)")
-	t.Require().Nil(err)
-	null := t.GetHashes([]uint64{42})[0]
-
-	t.UpdateRow(42, "")
-	empty := t.GetHashes([]uint64{42})[0]
-
-	t.UpdateRow(42, "foo")
-	foo := t.GetHashes([]uint64{42})[0]
-
-	t.Require().NotEqual(null, empty)
-	t.Require().NotEqual(foo, empty)
-	t.Require().NotEqual(foo, null)
-}
-
-func (t *IterativeVerifierTestSuite) InsertRow(id int, data string) {
-	t.InsertRowInDb(id, data, t.db)
-}
-
-func (t *IterativeVerifierTestSuite) InsertRowInDb(id int, data string, db *sql.DB) {
-	_, err := db.Exec(fmt.Sprintf("INSERT INTO %s.%s VALUES (%d,\"%s\")", testhelpers.TestSchemaName, testhelpers.TestTable1Name, id, data))
-	t.Require().Nil(err)
-}
-
-func (t *IterativeVerifierTestSuite) InsertCompressedRowInDb(id int, data string, db *sql.DB) {
-	t.SetColumnType(testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name, testhelpers.TestCompressedColumn1Name, "MEDIUMBLOB", db)
-	_, err := db.Exec("INSERT INTO "+testhelpers.TestSchemaName+"."+testhelpers.TestCompressedTable1Name+" VALUES (?,?)", id, data)
-	t.Require().Nil(err)
-}
-
-func (t *IterativeVerifierTestSuite) SetColumnType(schema, table, column, columnType string, db *sql.DB) {
-	t.Require().True(columnType != "")
-
-	_, err := db.Exec(fmt.Sprintf(
-		"ALTER TABLE %s.%s MODIFY %s %s",
-		schema,
-		table,
-		column,
-		columnType,
-	))
-	t.Require().Nil(err)
-}
-
-func (t *IterativeVerifierTestSuite) UpdateRow(id int, data string) {
-	t.UpdateRowInDb(id, data, t.db)
-}
-
-func (t *IterativeVerifierTestSuite) UpdateRowInDb(id int, data string, db *sql.DB) {
-	_, err := db.Exec(fmt.Sprintf("UPDATE %s.%s SET data=\"%s\" WHERE id=%d", testhelpers.TestSchemaName, testhelpers.TestTable1Name, data, id))
-	t.Require().Nil(err)
-}
-
-func (t *IterativeVerifierTestSuite) DeleteRow(id int) {
-	_, err := t.db.Exec(fmt.Sprintf("DELETE FROM %s.%s WHERE id=%d", testhelpers.TestSchemaName, testhelpers.TestTable1Name, id))
-	t.Require().Nil(err)
-}
-
-func (t *IterativeVerifierTestSuite) GetHashes(ids []uint64) []string {
-	hashes, err := t.verifier.GetHashes(t.db, t.table.Schema, t.table.Name, t.table.GetPKColumn(0).Name, t.table.Columns, ids)
-	t.Require().Nil(err)
-	t.Require().Equal(len(hashes), len(ids))
-
-	res := make([]string, len(ids))
-
-	for idx, id := range ids {
-		hash, ok := hashes[id]
-		t.Require().True(ok)
-		t.Require().True(len(hash) > 0)
-
-		res[idx] = string(hash)
-	}
-
-	return res
-}
-
-func (t *IterativeVerifierTestSuite) reloadTables() {
-	tableFilter := &testhelpers.TestTableFilter{
-		DbsFunc: testhelpers.DbApplicabilityFilter([]string{testhelpers.TestSchemaName}),
-		TablesFunc: nil,
-	}
-
-	tables, err := ghostferry.LoadTables(t.db, tableFilter, nil, nil)
-	t.Require().Nil(err)
-
-	t.Ferry.Tables = tables
-	t.verifier.Tables = tables.AsSlice()
-	t.verifier.TableSchemaCache = tables
-
-	t.table = tables.Get(testhelpers.TestSchemaName, testhelpers.TestTable1Name)
-	t.Require().NotNil(t.table)
-}
-
-type ReverifyStoreTestSuite struct {
-	suite.Suite
-
-	store *ghostferry.ReverifyStore
-}
-
-func (t *ReverifyStoreTestSuite) SetupTest() {
-	t.store = ghostferry.NewReverifyStore()
-}
-
-func (t *ReverifyStoreTestSuite) TestAddEntryIntoReverifyStoreWillDeduplicate() {
-	pk1 := uint64(100)
-	pk2 := uint64(101)
-	table1 := &ghostferry.TableSchema{Table: &schema.Table{Schema: "gftest", Name: "table1"}}
-	t.store.Add(ghostferry.ReverifyEntry{Pk: pk1, Table: table1})
-	t.store.Add(ghostferry.ReverifyEntry{Pk: pk1, Table: table1})
-	t.store.Add(ghostferry.ReverifyEntry{Pk: pk1, Table: table1})
-	t.store.Add(ghostferry.ReverifyEntry{Pk: pk2, Table: table1})
-	t.store.Add(ghostferry.ReverifyEntry{Pk: pk2, Table: table1})
-
-	t.Require().Equal(uint64(2), t.store.RowCount)
-	t.Require().Equal(1, len(t.store.MapStore))
-	t.Require().Equal(
-		map[uint64]struct{}{
-			pk1: struct{}{},
-			pk2: struct{}{},
-		},
-		t.store.MapStore[ghostferry.TableIdentifier{"gftest", "table1"}],
-	)
-}
-
-func (t *ReverifyStoreTestSuite) TestFlushAndBatchByTableWillCreateReverifyBatchesAndClearTheMapStore() {
-	expectedTable1Pks := make([]uint64, 0, 55)
-	table1 := &ghostferry.TableSchema{Table: &schema.Table{Schema: "gftest", Name: "table1"}}
-	table2 := &ghostferry.TableSchema{Table: &schema.Table{Schema: "gftest", Name: "table2"}}
-	for i := uint64(100); i < 155; i++ {
-		t.store.Add(ghostferry.ReverifyEntry{Pk: i, Table: table1})
-		expectedTable1Pks = append(expectedTable1Pks, i)
-	}
-
-	expectedTable2Pks := make([]uint64, 0, 45)
-	for i := uint64(200); i < 245; i++ {
-		t.store.Add(ghostferry.ReverifyEntry{Pk: i, Table: table2})
-		expectedTable2Pks = append(expectedTable2Pks, i)
-	}
-
-	batches := t.store.FlushAndBatchByTable(10)
-	t.Require().Equal(11, len(batches))
-	table1Batches := make([]ghostferry.ReverifyBatch, 0)
-	table2Batches := make([]ghostferry.ReverifyBatch, 0)
-
-	for _, batch := range batches {
-		switch batch.Table.TableName {
-		case "table1":
-			table1Batches = append(table1Batches, batch)
-		case "table2":
-			table2Batches = append(table2Batches, batch)
-		}
-	}
-
-	t.Require().Equal(6, len(table1Batches))
-	t.Require().Equal(5, len(table2Batches))
-
-	actualTable1Pks := make([]uint64, 0)
-	for _, batch := range table1Batches {
-		for _, pk := range batch.Pks {
-			actualTable1Pks = append(actualTable1Pks, pk)
-		}
-	}
-
-	sort.Slice(actualTable1Pks, func(i, j int) bool { return actualTable1Pks[i] < actualTable1Pks[j] })
-	t.Require().Equal(expectedTable1Pks, actualTable1Pks)
-
-	actualTable2Pks := make([]uint64, 0)
-	for _, batch := range table2Batches {
-		for _, pk := range batch.Pks {
-			actualTable2Pks = append(actualTable2Pks, pk)
-		}
-	}
-
-	sort.Slice(actualTable2Pks, func(i, j int) bool { return actualTable2Pks[i] < actualTable2Pks[j] })
-	t.Require().Equal(expectedTable2Pks, actualTable2Pks)
-
-	t.Require().Equal(0, len(t.store.MapStore))
-}
-
-func TestIterativeVerifierTestSuite(t *testing.T) {
-	testhelpers.SetupTest()
-	suite.Run(t, &IterativeVerifierTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
-}
-
-func TestReverifyStoreTestSuite(t *testing.T) {
-	testhelpers.SetupTest()
-	suite.Run(t, &ReverifyStoreTestSuite{})
-}
diff --git a/test/integration/iterative_verifier_test.rb b/test/integration/iterative_verifier_test.rb
deleted file mode 100644
index cae3b86c..00000000
--- a/test/integration/iterative_verifier_test.rb
+++ /dev/null
@@ -1,56 +0,0 @@
-require "test_helper"
-
-class IterativeVerifierTest < GhostferryTestCase
-  def setup
-    seed_simple_database_with_single_table
-  end
-
-  def test_iterative_verifier_succeeds_in_normal_run
-    datawriter = new_source_datawriter
-    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Iterative" })
-
-    start_datawriter_with_ghostferry(datawriter, ghostferry)
-    stop_datawriter_during_cutover(datawriter, ghostferry)
-
-    verification_ran = false
-    ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables|
-      verification_ran = true
-      assert_equal 0, incorrect_tables.length
-    end
-
-    ghostferry.run
-    assert verification_ran
-    assert_test_table_is_identical
-  end
-
-  def test_iterative_verifier_fails_if_binlog_streamer_incorrectly_copies_data
-    datawriter = new_source_datawriter
-    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Iterative" })
-
-    table_name = DEFAULT_FULL_TABLE_NAME
-
-    chosen_id = 0
-    verification_ran = false
-    ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
-      result = source_db.query("SELECT id FROM #{table_name} ORDER BY id LIMIT 1")
-      chosen_id = result.first["id"]
-
-      refute chosen_id == 0
-      source_db.query("UPDATE #{table_name} SET data = 'something' WHERE id = #{chosen_id}")
-    end
-
-    ghostferry.on_status(Ghostferry::Status::VERIFY_DURING_CUTOVER) do
-      refute chosen_id == 0
-      source_db.query("DELETE FROM #{table_name} WHERE id = #{chosen_id}")
-    end
-
-    ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables|
-      verification_ran = true
-
-      assert_equal ["gftest.test_table_1"], incorrect_tables
-    end
-
-    ghostferry.run
-    assert verification_ran
-  end
-end
diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go
index 5a30cdaa..868a4260 100644
--- a/test/lib/go/integrationferry.go
+++ b/test/lib/go/integrationferry.go
@@ -224,15 +224,7 @@ func NewStandardConfig() (*ghostferry.Config, error) {
 		}
 	}
 
-	verifierType := os.Getenv("GHOSTFERRY_VERIFIER_TYPE")
-	if verifierType == ghostferry.VerifierTypeIterative {
-		config.VerifierType = ghostferry.VerifierTypeIterative
-		config.IterativeVerifierConfig = ghostferry.IterativeVerifierConfig{
-			Concurrency: 2,
-		}
-	} else if verifierType != "" {
-		config.VerifierType = verifierType
-	}
+	config.VerifierType = os.Getenv("GHOSTFERRY_VERIFIER_TYPE")
 
 	return config, config.ValidateConfig()
 }