Remove the need for exact row count
This commit removes the requirement for the MySQL snapshot to know the
exact number of rows it will snapshot in advance. We use the row count
to show snapshot progress and to confirm that we have snapshotted the
expected number of rows. However, querying the table for an exact
count in MySQL requires a full index scan, which can be time consuming
for large tables.
We don't necessarily need to know the exact number of rows: on a
server with up-to-date table statistics, the estimated row count is
accurate enough for both purposes.
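
For illustration only (this helper is not part of the commit): a minimal
mysql_async sketch of reading the server's estimated row count from table
statistics instead of running an exact COUNT(*). The function name, signature,
and error handling are assumptions, not the commit's implementation.

use mysql_async::prelude::Queryable;

// Reads the storage engine's row estimate from information_schema instead of
// scanning an index with COUNT(*). The estimate is approximate, which is
// acceptable for progress reporting.
async fn estimated_row_count(
    conn: &mut mysql_async::Conn,
    schema: &str,
    table: &str,
) -> mysql_async::Result<Option<u64>> {
    // TABLE_ROWS can be NULL for some storage engines, hence the nested Option.
    let estimate: Option<Option<u64>> = conn
        .exec_first(
            "SELECT TABLE_ROWS FROM information_schema.tables \
             WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
            (schema, table),
        )
        .await?;
    Ok(estimate.flatten())
}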

Fixes: #REA-4886
Closes: #1386

Release-Note-Core: Improve MySQL snapshot performance by removing the
  need for an exact row count during snapshot.
Change-Id: Idf887a0b8abb8e20113b3585af49c75d3da8b05f
altmannmarcelo committed Nov 25, 2024
1 parent 080182a commit 1d280b2
Showing 2 changed files with 57 additions and 95 deletions.
92 changes: 40 additions & 52 deletions replicators/src/mysql_connector/snapshot.rs
@@ -12,7 +12,7 @@ use futures::StreamExt;
use itertools::Itertools;
use mysql::prelude::Queryable;
use mysql::{Transaction, TxOpts};
use mysql_async as mysql;
use mysql_async::{self as mysql, Row};
use mysql_common::constants::ColumnType;
use mysql_srv::ColumnFlags;
use nom_sql::{DialectDisplay, NonReplicatedRelation, NotReplicatedReason, Relation};
@@ -30,7 +30,6 @@ use tracing_futures::Instrument;
use super::utils::{get_mysql_version, mysql_pad_collation_column};
use crate::db_util::DatabaseSchemas;
use crate::mysql_connector::snapshot_type::SnapshotType;
use crate::mysql_connector::utils::MYSQL_BATCH_SIZE;
use crate::table_filter::TableFilter;
use crate::TablesSnapshottingGaugeGuard;

@@ -438,63 +437,52 @@ impl<'a> MySqlReplicator<'a> {
let mut last_report_time = start_time;
let snapshot_report_interval_secs = snapshot_report_interval_secs as u64;

// Loop until we have no more batches to process
while cnt != nrows {
// Still have rows in this batch
loop {
let row = row_stream.next().await.map_err(log_err)?;
let df_row = match row.as_ref().map(mysql_row_to_noria_row).transpose() {
Ok(Some(df_row)) => df_row,
Ok(None) => break,
Err(err) if cnt == nrows => {
info!(error = %err, "Error encountered during snapshot, but all rows replicated successfully");
break;
}
Err(err) => {
return Err(log_err(err));
}
};
rows.push(df_row);
cnt += 1;

if rows.len() == RS_BATCH_SIZE {
// We aggregate rows into batches and then send them all to noria
let send_rows = std::mem::replace(&mut rows, Vec::with_capacity(RS_BATCH_SIZE));
table_mutator
.insert_many(send_rows)
.await
.map_err(log_err)?;
}

if cnt % MYSQL_BATCH_SIZE == 0 && cnt != nrows && snapshot_type.is_key_based() {
// Last row from batch. Update lower bound with last row.
// It's safe to unwrap here because we will break out of the loop at
// mysql_row_to_noria_row if row was None.
snapshot_type.set_lower_bound(row.as_ref().unwrap());
}

if snapshot_report_interval_secs != 0
&& last_report_time.elapsed().as_secs() > snapshot_report_interval_secs
{
last_report_time = Instant::now();
crate::log_snapshot_progress(start_time.elapsed(), cnt as i64, nrows as i64);
}
}
if cnt != nrows {
// Next batch
let mut prev_row: Option<Row> = None;
loop {
let mut row = row_stream.next().await.map_err(log_err)?;
// End of the MySQL stream/batch, so we should query the next batch. If the new row is
// still None, we have reached the end of the table and will break out of the loop after
// getting an empty df row at mysql_row_to_noria_row.
if row.is_none() && snapshot_type.is_key_based() && prev_row.is_some() {
// Last row from the batch. Get the next batch's lower bound from the previous row.
// It's safe to unwrap here because prev_row.is_some() is checked in the
// condition above.
row_stream = trx
.exec_iter(
&bound_base_query,
mysql::Params::Positional(snapshot_type.get_lower_bound()?),
mysql::Params::Positional(
snapshot_type.get_lower_bound(prev_row.as_ref().unwrap())?,
),
)
.await
.map_err(log_err)?;
if row_stream.is_empty() {
return Err(internal_err!(
"Snapshotting for table {:?} stopped before all rows were replicated. Next batch query returned no rows.",
table_mutator.table_name()
));
row = row_stream.next().await.map_err(log_err)?;
}
let df_row = match row.as_ref().map(mysql_row_to_noria_row).transpose() {
Ok(Some(df_row)) => df_row,
Ok(None) => break,
Err(err) => {
return Err(log_err(err));
}
};
prev_row = row;
rows.push(df_row);
cnt += 1;

if rows.len() == RS_BATCH_SIZE {
// We aggregate rows into batches and then send them all to noria
let send_rows = std::mem::replace(&mut rows, Vec::with_capacity(RS_BATCH_SIZE));
table_mutator
.insert_many(send_rows)
.await
.map_err(log_err)?;
}

if snapshot_report_interval_secs != 0
&& last_report_time.elapsed().as_secs() > snapshot_report_interval_secs
{
last_report_time = Instant::now();
crate::log_snapshot_progress(start_time.elapsed(), cnt as i64, nrows as i64);
}
}

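As a side note, the loop above implements key-based batching (keyset
pagination): when the current batch's stream is exhausted, the last row seen
becomes the lower bound for the next query, so the snapshot can run to
completion without knowing the total row count up front. Below is a
simplified, self-contained sketch of the same idea, assuming a table t with a
single integer primary key id; the real code supports compound keys and
streams rows instead of collecting each batch into memory.

use mysql_async::prelude::Queryable;

// Key-based batching sketch: resume each batch strictly after the last key
// seen, instead of relying on a precomputed exact row count.
async fn snapshot_in_batches(
    conn: &mut mysql_async::Conn,
    batch_size: u64,
) -> mysql_async::Result<u64> {
    let mut copied = 0u64;
    let mut lower_bound: Option<u64> = None;
    loop {
        let batch: Vec<(u64, String)> = match lower_bound {
            None => {
                conn.exec(
                    "SELECT id, payload FROM t ORDER BY id LIMIT ?",
                    (batch_size,),
                )
                .await?
            }
            Some(bound) => {
                conn.exec(
                    "SELECT id, payload FROM t WHERE id > ? ORDER BY id LIMIT ?",
                    (bound, batch_size),
                )
                .await?
            }
        };
        if batch.is_empty() {
            break; // end of the table reached
        }
        lower_bound = batch.last().map(|(id, _)| *id);
        copied += batch.len() as u64;
        // ...send `batch` to the downstream table here...
    }
    Ok(copied)
}
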
60 changes: 17 additions & 43 deletions replicators/src/mysql_connector/snapshot_type.rs
@@ -1,6 +1,6 @@
use mysql_async::{self as mysql, Value};
use nom_sql::{Column, DialectDisplay, SqlIdentifier};
use readyset_errors::{internal_err, ReadySetResult};
use readyset_errors::ReadySetResult;

use super::utils::MYSQL_BATCH_SIZE;

@@ -11,7 +11,6 @@ pub enum SnapshotType {
KeyBased {
name: Option<SqlIdentifier>,
keys: Vec<Column>,
lower_bound: Option<Vec<mysql::Value>>,
},
FullTableScan,
}
@@ -36,33 +35,9 @@ impl SnapshotType {
Ok(SnapshotType::KeyBased {
name: name.clone(),
keys: keys.to_vec(),
lower_bound: None,
})
}

/// Get the lower bound for the next query
/// Returns:
/// * The lower bound
///
/// Errors if the snapshot type is FullTableScan or the lower bound is not set
pub fn get_lower_bound(&mut self) -> ReadySetResult<Vec<mysql::Value>> {
match self {
SnapshotType::KeyBased {
ref mut lower_bound,
..
} => {
if let Some(lb) = lower_bound.take() {
Ok(lb)
} else {
Err(internal_err!("Lower bound not set"))
}
}
SnapshotType::FullTableScan => Err(internal_err!(
"Full table scan does not require a lower bound"
)),
}
}

/// Generate the queries to be used for snapshotting the table, given the snapshot type
///
/// Arguments:
@@ -84,10 +59,16 @@ impl SnapshotType {
}
SnapshotType::FullTableScan => "".to_string(),
};
let schema = match table.table_name().schema.as_ref() {
Some(schema) => {
format!("AND TABLE_SCHEMA = '{}'", schema)
}
None => "".to_string(),
};
let count_query = format!(
"SELECT COUNT(*) FROM {} {}",
table.table_name().display(nom_sql::Dialect::MySQL),
force_index
"SELECT TABLE_ROWS FROM information_schema.tables WHERE TABLE_NAME = '{}' {}",
table.table_name().name,
schema
);
let (initial_query, bound_based_query) = match self {
SnapshotType::KeyBased { ref keys, .. } => {
@@ -140,24 +121,17 @@ impl SnapshotType {
(count_query, initial_query, bound_based_query)
}

/// Given a row, compute the lower bound for the next query based on the keys and update the
/// lower bound. Note that the lower bound is used twice. One get all the values greater or
/// Given a row, compute the lower bound for the next query based on the keys and return it.
/// Note that the lower bound is used twice: once to get all the values greater than or
/// equal to the lower bound, and once to exclude the lower bound itself.
///
/// Arguments:
/// * `row` - The row to compute the lower bound from
pub fn set_lower_bound(&mut self, row: &mysql::Row) {
pub fn get_lower_bound(&mut self, row: &mysql::Row) -> ReadySetResult<Vec<mysql::Value>> {
match self {
SnapshotType::KeyBased {
ref keys,
ref mut lower_bound,
..
} => {
let capacity = match lower_bound {
Some(lower_bound) => lower_bound.len(),
// Calculate the required capacity using the triangular number formula
None => keys.len() * (keys.len() + 1) / 2,
};
SnapshotType::KeyBased { ref keys, .. } => {
// Calculate the required capacity using the triangular number formula
let capacity = keys.len() * (keys.len() + 1) / 2;

let mut new_lower_bound = Vec::with_capacity(capacity);

Expand All @@ -173,7 +147,7 @@ impl SnapshotType {
}

// Update the lower_bound with the new values
*lower_bound = Some(new_lower_bound);
Ok(new_lower_bound)
}
SnapshotType::FullTableScan => {
unreachable!("Full table scan does not require a lower bound")
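
For context on the capacity computed above: with a compound key, the resume
predicate typically expands to a disjunction of the form
(k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) ..., so n
key columns need 1 + 2 + ... + n = n * (n + 1) / 2 bound values. A standalone
sketch of that expansion follows; it is an assumption about the predicate
shape, not the commit's exact helper or parameter order.

use mysql_async::Value;

// Builds the positional parameters for a compound-key lower bound of the form
//   (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) ...
// The i-th disjunct repeats the first i key values, so n keys need the
// triangular number n * (n + 1) / 2 of parameters in total.
fn lower_bound_params(key_values: &[Value]) -> Vec<Value> {
    let n = key_values.len();
    let mut params = Vec::with_capacity(n * (n + 1) / 2);
    for prefix_len in 1..=n {
        params.extend_from_slice(&key_values[..prefix_len]);
    }
    params
}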
