Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the iterative verifier #139

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

shuhaowu
Copy link
Contributor

@shuhaowu shuhaowu commented Oct 8, 2019

The InlineVerifier supercedes this code. This should eliminate a lot of confusion.

Also updated the documentation to refer to the InlineVerifier only.

Copy link
Contributor

@fjordan fjordan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. A couple grammatical nitpicks 😄

|Mechanism | ``CHECKSUM TABLE`` | Verify row before cutover; |
| | | Reverify changed rows during|
| | | cutover. |
|Mechanism | ``CHECKSUM TABLE`` | Each row is validated via a |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/a/an/ MD5 type query

InlineVerifier
--------------

Ghostferry's core algorithm has ran for millions of times and is backed by a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/ran/run

includes: floating point values and datetime columns.

The InlineVerifier is designed to catch these type of problems and fail the
run if discrepencies are detected. **It is not designed to verify that certain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call out 👍

Copy link
Contributor

@hkdsun hkdsun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on deleted tests whether there's equivalent tests for them since you're most familiar with the newer tests? That would really expedite this review

GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{},
},
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have replacement for these tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'll comment what happened to each test inline.

@@ -1,118 +0,0 @@
package test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced in test/integration/inline_verifier_test.rb:

###################
# Collation Tests #
###################
def test_ascii_data_from_utfmb3_to_utfmb4
run_collation_test(ASCIIDATA, "utf8mb3", "utf8mb4", identical: true)
end
def test_ascii_data_from_utfmb4_to_utfmb3
run_collation_test(ASCIIDATA, "utf8mb4", "utf8mb3", identical: true)
end
def test_utfmb3_data_from_utfmb3_to_utfmb4
run_collation_test(UTF8MB3DATA, "utf8mb3", "utf8mb4", identical: true)
end
def test_utfmb3_data_from_utfmb4_to_utfmb3
run_collation_test(UTF8MB3DATA, "utf8mb4", "utf8mb3", identical: true)
end
def test_utfmb4_data_from_utfmb4_to_utfmb3
run_collation_test(UTF8MB4DATA, "utf8mb4", "utf8mb3", identical: false)
end

"github.com/stretchr/testify/assert"
)

func TestHashesSql(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (this *TableSchemaCacheTestSuite) TestFingerprintQuery() {
tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil)
this.Require().Nil(err)
tables := tableSchemaCache.AsSlice()
table := tables[0]
query := table.FingerprintQuery("s", "t", 10)
this.Require().Equal("SELECT `id`,MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5 FROM `s`.`t` WHERE `id` IN (?,?,?,?,?,?,?,?,?,?)", query)
table = tables[1]
table.CompressedColumnsForVerification = map[string]string{"data": "SNAPPY"}
query = table.FingerprintQuery("s", "t", 10)
this.Require().Equal("SELECT `id`,MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5,`data` FROM `s`.`t` WHERE `id` IN (?,?,?,?,?,?,?,?,?,?)", query)
}
func (this *TableSchemaCacheTestSuite) TestTableRowMd5Query() {
tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil)
this.Require().Nil(err)
tables := tableSchemaCache.AsSlice()
table := tables[0]
query := table.RowMd5Query()
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query)
table = tables[1]
table.CompressedColumnsForVerification = map[string]string{"data": "SNAPPY"}
query = table.RowMd5Query()
this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query)
}

Copy link
Contributor Author

@shuhaowu shuhaowu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've commented on all the tests. Some thoughts:

  • The IterativeVerifier used the Go integration test framework which is deprecated and less powerful. The tests also abuse the fact that the verification only emit an error after cutover. Therefore, a 1:1 mapping of tests are not feasible. The Ruby integration tests with the corrupting triggers covers all the cases tho.
  • IgnoreColumn and CompressedData are covered multiple times within the IterativeVerifier test (integration AND unit). I think covering them once in the integration test is enough.
  • Due to the Go integration tests being somewhat clunky, there are a lot more unit tests for the IterativeVerifier than the InlineVerifier. The most important one of these are the tests that actually asserts the MD5 fingerprints. I think there are some value in asserting the fingerprints themselves to prevent regressions, so I'll create a follow up PR to add these tests back as it is not very easy to add them back.
  • I'm missing some tests in the ReverifyStore behaviour. The behaviour has changed a bit since we're constantly reverifying, so the tests won't look the same. I'll add these tests in a follow up PR, but there won't be an 1:1 correspondence.
  • I'm missing a successful run InlineVerifier test that I'll add in a follow up PR.
  • MaxExpectedDowntime is no longer tested. Possible unit test addition in a follow up PR.

}
}

func TestVerificationFailsDeletedRow(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test tests if a row within the reverifyStore will be detected during cutover if it was deleted on the target.

The InlineVerifier test that effectively covers this case is when the test corrupts a BinlogStreamer insert and the InlineVerifier detect it during the cutover:

def test_catches_binlog_streamer_corruption
seed_random_data(source_db, number_of_rows: 1)
seed_random_data(target_db, number_of_rows: 0)
result = source_db.query("SELECT id FROM #{DEFAULT_FULL_TABLE_NAME} LIMIT 1")
corrupting_id = result.first["id"] + 1
enable_corrupting_insert_trigger(corrupting_id)
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (#{corrupting_id}, 'data')")
end
verification_ran = false
ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables|
verification_ran = true
assert_equal ["gftest.test_table_1"], incorrect_tables
end
ghostferry.run
assert verification_ran
assert_equal "cutover verification failed for: gftest.test_table_1 [pks: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
end

assert.True(t, ran)
}

func TestVerificationFailsUpdatedRow(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically the same as TestVerificationFailsDeletedRow, which is covered by the same case.

assert.True(t, ran)
}

func TestIgnoresColumns(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered by:

def test_different_data_in_ignored_column_passes_inline_verification
[source_db, target_db].each do |db|
db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data VARCHAR(255), data2 VARCHAR(255), primary key(id))")
end
source_db.prepare("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data, data2) VALUES (?, ?, ?)").execute(1, "data1", "same")
target_db.prepare("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data, data2) VALUES (?, ?, ?)").execute(1, "data2", "same")
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline", ignored_column: "data" })
ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = 'data3' WHERE id = 1")
end
ghostferry.run
assert_nil ghostferry.error
rows = source_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, rows.count
rows.each do |row|
assert_equal 1, row["id"]
assert_equal "data3", row["data"]
assert_equal "same", row["data2"]
end
rows = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, rows.count
rows.each do |row|
assert_equal 1, row["id"]
assert_equal "data2", row["data"]
assert_equal "same", row["data2"]
end
end

assert.True(t, ran)
}

func TestIgnoresTables(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not applicable. The InlineVerifier just follows the table filter as configured globally for Ghostferry. Any tables not loaded into the TableSchemaCache will not be verified.

assert.True(t, ran)
}

func TestVerificationPasses(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add a test for this.

t.Require().NotEqual(hashes[0], hashes[1])
}

func (t *IterativeVerifierTestSuite) TestPositiveAndNegativeZeroFloat() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def test_positive_negative_zero
[source_db, target_db].each do |db|
seed_random_data(db, number_of_rows: 0)
db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} MODIFY data FLOAT")
end
# If the data already exists on the target, Ghostferry's INSERT IGNORE will
# not insert again. However, the verifier should run.
# We first set the values to be different to ensure the InlineVerifier is
# indeed running as the nominal case (comparing 0.0 and -0.0) should not
# emit any error and thus we cannot say for certain if the InlineVerifier
# ran or not.
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 0.0)")
target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 1.0)")
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.run_expecting_interrupt
refute_nil ghostferry.error
err_msg = ghostferry.error["ErrMessage"]
assert err_msg.include?("row fingerprints for pks [1] on #{DEFAULT_DB}.#{DEFAULT_TABLE} do not match"), message: err_msg
# Now we run the real test case.
target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
verification_ran = false
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables|
verification_ran = true
assert_equal [], incorrect_tables
end
ghostferry.run
assert verification_ran
end

t.Require().NotEqual(neg, pos)
}

func (t *IterativeVerifierTestSuite) TestNULLValues() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def test_null_vs_null
seed_random_data(source_db, number_of_rows: 0)
seed_random_data(target_db, number_of_rows: 0)
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)")
target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)")
verification_ran = false
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables|
verification_ran = true
assert_equal [], incorrect_tables
end
ghostferry.run
assert verification_ran
end
def test_null_vs_empty_string
seed_random_data(source_db, number_of_rows: 0)
seed_random_data(target_db, number_of_rows: 0)
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)")
target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, '')")
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.run_expecting_interrupt
refute_nil ghostferry.error
err_msg = ghostferry.error["ErrMessage"]
assert err_msg.include?("row fingerprints for pks [1] on #{DEFAULT_DB}.#{DEFAULT_TABLE} do not match"), message: err_msg
end
def test_null_vs_null_string
seed_random_data(source_db, number_of_rows: 0)
seed_random_data(target_db, number_of_rows: 0)
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)")
target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 'NULL')")
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.run_expecting_interrupt
refute_nil ghostferry.error
err_msg = ghostferry.error["ErrMessage"]
assert err_msg.include?("row fingerprints for pks [1] on #{DEFAULT_DB}.#{DEFAULT_TABLE} do not match"), message: err_msg
end
def test_null_in_different_order
seed_random_data(source_db, number_of_rows: 0)
seed_random_data(target_db, number_of_rows: 0)
source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN data2 VARCHAR(255) AFTER data")
target_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN data2 VARCHAR(255) AFTER data")
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL, 'data')")
target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 'data', NULL)")
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.run_expecting_interrupt
refute_nil ghostferry.error
err_msg = ghostferry.error["ErrMessage"]
assert err_msg.include?("row fingerprints for pks [1] on #{DEFAULT_DB}.#{DEFAULT_TABLE} do not match"), message: err_msg
end

)
}

func (t *ReverifyStoreTestSuite) TestFlushAndBatchByTableWillCreateReverifyBatchesAndClearTheMapStore() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way ReverifyStore works changed. Can add some unit tests in a follow up.

t.store = ghostferry.NewReverifyStore()
}

func (t *ReverifyStoreTestSuite) TestAddEntryIntoReverifyStoreWillDeduplicate() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way ReverifyStore works changed. Can add some unit tests in a follow up.

assert_test_table_is_identical
end

def test_iterative_verifier_fails_if_binlog_streamer_incorrectly_copies_data
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simply a ported test from the Go integration test to the ruby integration test when the ruby code was first created, thus why it is redundant.

def test_catches_binlog_streamer_corruption
seed_random_data(source_db, number_of_rows: 1)
seed_random_data(target_db, number_of_rows: 0)
result = source_db.query("SELECT id FROM #{DEFAULT_FULL_TABLE_NAME} LIMIT 1")
corrupting_id = result.first["id"] + 1
enable_corrupting_insert_trigger(corrupting_id)
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (#{corrupting_id}, 'data')")
end
verification_ran = false
ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables|
verification_ran = true
assert_equal ["gftest.test_table_1"], incorrect_tables
end
ghostferry.run
assert verification_ran
assert_equal "cutover verification failed for: gftest.test_table_1 [pks: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
end

@milanatshopify
Copy link
Contributor

@shuhaowu sorry for the delay, but it will probably be after BFCM before we/Hormoz have a chance to review this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants