diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index 2fc35e0..5ffb639 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -3957,664 +3957,612 @@ def up end end - describe "#adjust_lock_timeout" do - let(:table_name) { "bogus_table" } - let(:migration) { Class.new(migration_klass).new } - - before(:each) do - ActiveRecord::Base.connection.execute("CREATE TABLE #{table_name}(pk SERIAL, i INTEGER)") - end - - around(:each) do |example| - @original_timeout_raw_value = ActiveRecord::Base.value_from_sql("SHOW lock_timeout") - @original_timeout_in_milliseconds = @original_timeout_raw_value.sub(/s\Z/, '').to_i * 1000 - begin - example.run - ensure - ActiveRecord::Base.connection.execute("SET lock_timeout = #{@original_timeout_in_milliseconds};") + describe "unsafe transformations" do + it "renames create_table to unsafe_create_table" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + end end - end - it "runs the block" do - expect do |block| - migration.adjust_lock_timeout(5, &block) - end.to yield_control - end + migration.suppress_messages { migration.migrate(:up) } - it "changes the lock_timeout to the requested value in seconds" do - seconds = (@original_timeout_in_milliseconds / 1000) + 5 - migration.adjust_lock_timeout(seconds) do - expect(ActiveRecord::Base.value_from_sql("SHOW lock_timeout")).to eq("#{seconds}s") - end + expect(ActiveRecord::Base.connection.tables).to include("foos") end - it "resets the lock_timeout to the original values even after an exception" do - seconds = (@original_timeout_in_milliseconds / 1000) + 5 - expect do - migration.adjust_lock_timeout(seconds) do - raise "bogus error" + it "renames drop_table to unsafe_drop_table" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_drop_table :foos end - end.to raise_error("bogus error") + end + + migration.suppress_messages { migration.migrate(:up) } - expect(ActiveRecord::Base.value_from_sql("SHOW lock_timeout")).to eq(@original_timeout_raw_value) + expect(ActiveRecord::Base.connection.tables).not_to include("foos") end - it "resets the lock_timeout to the original values even after a SQL failure in a transaction" do - seconds = (@original_timeout_in_milliseconds / 1000) + 5 - expect do - migration.connection.transaction do - migration.adjust_lock_timeout(seconds) do - ActiveRecord::Base.connection.execute("select bogus;") - end + it "renames add_column to unsafe_add_column" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_add_column :foos, :bar, :integer end - end.to raise_error(ActiveRecord::StatementInvalid, /PG::UndefinedColumn/) - - expect(ActiveRecord::Base.value_from_sql("SHOW lock_timeout")).to eq(@original_timeout_raw_value) - end - end + end - describe "#adjust_statement_timeout" do - let(:table_name) { "bogus_table" } - let(:migration) { Class.new(migration_klass).new } + migration.suppress_messages { migration.migrate(:up) } - before(:each) do - ActiveRecord::Base.connection.execute("CREATE TABLE #{table_name}(pk SERIAL, i INTEGER)") + expect(ActiveRecord::Base.connection.tables).to include("foos") + expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).to include("bar") end - around(:each) do |example| - @original_timeout_raw_value = ActiveRecord::Base.value_from_sql("SHOW statement_timeout") - @original_timeout_in_milliseconds = @original_timeout_raw_value.sub(/s\Z/, '').to_i * 1000 - begin - example.run - ensure - ActiveRecord::Base.connection.execute("SET statement_timeout = #{@original_timeout_in_milliseconds};") + it "renames change_table to unsafe_change_table" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_change_table :foos do |t| + t.string :bar + end + end end - end - it "runs the block" do - expect do |block| - migration.adjust_statement_timeout(5, &block) - end.to yield_control - end + migration.suppress_messages { migration.migrate(:up) } - it "changes the statement_timeout to the requested value in seconds" do - seconds = (@original_timeout_in_milliseconds / 1000) + 5 - migration.adjust_statement_timeout(seconds) do - expect(ActiveRecord::Base.value_from_sql("SHOW statement_timeout")).to eq("#{seconds}s") - end + expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).to include("bar") end - it "resets the statement_timeout to the original values even after an exception" do - seconds = (@original_timeout_in_milliseconds / 1000) + 5 - expect do - migration.adjust_statement_timeout(seconds) do - raise "bogus error" + it "renames rename_table to unsafe_rename_table" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table :foos + unsafe_rename_table :foos, :bars end - end.to raise_error("bogus error") - - expect(ActiveRecord::Base.value_from_sql("SHOW statement_timeout")).to eq(@original_timeout_raw_value) - end + end - it "resets the statement_timeout to the original values even after a SQL failure in a transaction" do - seconds = (@original_timeout_in_milliseconds / 1000) + 5 - expect do - migration.connection.transaction do - migration.adjust_statement_timeout(seconds) do - migration.connection.execute("select bogus;") - end - end - end.to raise_error(ActiveRecord::StatementInvalid, /PG::UndefinedColumn/) + migration.suppress_messages { migration.migrate(:up) } - expect(ActiveRecord::Base.value_from_sql("SHOW statement_timeout")).to eq(@original_timeout_raw_value) + expect(ActiveRecord::Base.connection.tables).not_to include("foos") + expect(ActiveRecord::Base.connection.tables).to include("bars") end - end - ["bogus_table", :bogus_table, "public.bogus_table"].each do |table_name| - describe "#safely_acquire_lock_for_table #{table_name} of type #{table_name.class.name}" do - let(:alternate_connection_pool) do - ActiveRecord::ConnectionAdapters::ConnectionPool.new(pool_config) - end - let(:alternate_connection) do - # The #connection method was deprecated in Rails 7.2 in favor of #lease_connection - if alternate_connection_pool.respond_to?(:lease_connection) - alternate_connection_pool.lease_connection - else - alternate_connection_pool.connection + it "renames rename_column to unsafe_rename_column" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table(:foos) { |t| t.string :bar } + unsafe_rename_column :foos, :bar, :baz end end - let(:migration) { Class.new(migration_klass).new } - - before(:each) do - ActiveRecord::Base.connection.execute(<<~SQL) - CREATE TABLE #{table_name}(pk SERIAL, i INTEGER); - CREATE SCHEMA partman; - CREATE EXTENSION pg_partman SCHEMA partman; - SQL - end - after(:each) do - alternate_connection_pool.disconnect! - end + migration.suppress_messages { migration.migrate(:up) } - it "executes the block" do - expect do |block| - migration.safely_acquire_lock_for_table(table_name, &block) - end.to yield_control - end + expect(ActiveRecord::Base.connection.tables).to include("foos") + expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).not_to include("bar") + expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).to include("baz") + end - it "acquires an exclusive lock on the table by default" do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) + it "renames change_column to unsafe_change_column" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table(:foos) { |t| t.string :bar } + unsafe_change_column :foos, :bar, :text end end - it "acquires a lock in a different mode when provided" do - migration.safely_acquire_lock_for_table(table_name, mode: :share) do - expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "ShareLock", - granted: true, - pid: kind_of(Integer), - ) - ) - end - end + migration.suppress_messages { migration.migrate(:up) } - it "raises error when invalid lock mode provided" do - expect do - migration.safely_acquire_lock_for_table(table_name, mode: :garbage) {} - end.to raise_error( - ArgumentError, - "Unrecognized lock mode :garbage. Valid modes: [:access_share, :row_share, :row_exclusive, :share_update_exclusive, :share, :share_row_exclusive, :exclusive, :access_exclusive]" - ) - end + expect(ActiveRecord::Base.connection.columns("foos").detect { |c| c.name == "bar" }.type).to eq(:text) + end - it "releases the lock (even after an exception)" do - begin - migration.safely_acquire_lock_for_table(table_name) do - raise "bogus error" - end - rescue - # Throw away error. + it "renames remove_column to unsafe_remove_column" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table(:foos) { |t| t.string :bar } + unsafe_remove_column :foos, :bar end - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end - it "waits to acquire a lock if the table is already blocked" do - block_call_count = 0 - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(3).times do |*args| - # Verify that the method under test hasn't taken out a lock. - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + migration.suppress_messages { migration.migrate(:up) } - block_call_count += 1 - if block_call_count < 3 - [PgHaMigrations::BlockingDatabaseTransactions::LongRunningTransaction.new("", "", 5, "active", [["bogus_table", "public", "AccessExclusiveLock"]])] - else - [] - end - end + expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).not_to include("bar") + end - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - end + it "renames add_index to unsafe_add_index" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table(:foos) { |t| t.string :bar } + unsafe_add_index :foos, :bar end end - it "does not wait to acquire a lock if the table has an existing but non-conflicting lock" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + migration.suppress_messages { migration.migrate(:up) } - begin - thread = Thread.new do - ActiveRecord::Base.connection.execute(<<~SQL) - LOCK bogus_table IN EXCLUSIVE MODE; - SELECT pg_sleep(2); - SQL - end + expect(ActiveRecord::Base.connection.indexes("foos").map(&:columns)).to include(["bar"]) + end - sleep 1.1 - - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) - .once - .and_call_original - - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name, mode: :access_share) do - locks_for_table = locks_for_table(table_name, connection: alternate_connection) - - aggregate_failures do - expect(locks_for_table).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "ExclusiveLock", - granted: true, - pid: kind_of(Integer), - ), - having_attributes( - table: "bogus_table", - lock_type: "AccessShareLock", - granted: true, - pid: kind_of(Integer), - ), - ) - - expect(locks_for_table.first.pid).to_not eq(locks_for_table.last.pid) - end - end - end - ensure - thread.join + it "renames execute to unsafe_execute" do + migration = Class.new(migration_klass) do + def up + unsafe_execute "CREATE TABLE foos ( pk serial )" end end - it "waits to acquire a lock if the table has an existing and conflicting lock" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + migration.suppress_messages { migration.migrate(:up) } - begin - thread = Thread.new do - ActiveRecord::Base.connection.execute(<<~SQL) - LOCK bogus_table IN SHARE UPDATE EXCLUSIVE MODE; - SELECT pg_sleep(3); - SQL - end + expect(ActiveRecord::Base.connection.tables).to include("foos") + end - sleep 1.1 - - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) - .at_least(2) - .times - .and_call_original - - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name, mode: :share_row_exclusive) do - expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "ShareRowExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - end - end - ensure - thread.join + it "renames remove_index to unsafe_remove_index" do + migration = Class.new(migration_klass) do + def up + unsafe_create_table(:foos) { |t| t.string :bar } + unsafe_add_index :foos, :bar end end + migration.suppress_messages { migration.migrate(:up) } + expect(ActiveRecord::Base.connection.indexes("foos").map(&:columns)).to include(["bar"]) - it "does not wait to acquire a lock if a table with the same name but in different schema is blocked" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - - ActiveRecord::Base.connection.execute("CREATE TABLE partman.bogus_table(pk SERIAL, i INTEGER)") - - begin - thread = Thread.new do - ActiveRecord::Base.connection.execute(<<~SQL) - LOCK partman.bogus_table; - SELECT pg_sleep(2); - SQL - end - - sleep 1.1 - - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) - .once - .and_call_original - - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name) do - locks_for_table = locks_for_table(table_name, connection: alternate_connection) - locks_for_other_table = locks_for_table("partman.bogus_table", connection: alternate_connection) - - aggregate_failures do - expect(locks_for_table).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_other_table).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_table.first.pid).to_not eq(locks_for_other_table.first.pid) - end - end - end - ensure - thread.join + migration = Class.new(migration_klass) do + def up + unsafe_remove_index :foos, :bar end end + migration.suppress_messages { migration.migrate(:up) } - it "waits to acquire a lock if the table is partitioned and child table is blocked" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - - ActiveRecord::Base.connection.drop_table(table_name) - create_range_partitioned_table(table_name, migration_klass, with_partman: true) - - begin - thread = Thread.new do - ActiveRecord::Base.connection.execute(<<~SQL) - LOCK bogus_table_default; - SELECT pg_sleep(3); - SQL - end + expect(ActiveRecord::Base.connection.indexes("foos").map(&:columns)).not_to include(["bar"]) + end + end + end + end + end - sleep 1.1 - - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) - .at_least(2) - .times - .and_call_original - - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name) do - locks_for_parent = locks_for_table(table_name, connection: alternate_connection) - locks_for_child = locks_for_table("bogus_table_default", connection: alternate_connection) - - aggregate_failures do - expect(locks_for_parent).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_child).to contain_exactly( - having_attributes( - table: "bogus_table_default", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_parent.first.pid).to eq(locks_for_child.first.pid) - end - end - end - ensure - thread.join - end - end + describe "utility methods" do + let(:migration_klass) { ActiveRecord::Migration::Current } - it "waits to acquire a lock if the table is partitioned and child sub-partition is blocked" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + describe "#adjust_lock_timeout" do + let(:table_name) { "bogus_table" } + let(:migration) { Class.new(migration_klass).new } - ActiveRecord::Base.connection.drop_table(table_name) - create_range_partitioned_table(table_name, migration_klass) - create_range_partitioned_table("#{table_name}_sub", migration_klass, with_partman: true) - ActiveRecord::Base.connection.execute(<<~SQL) - ALTER TABLE bogus_table - ATTACH PARTITION bogus_table_sub - FOR VALUES FROM ('2020-01-01') TO ('2020-02-01') - SQL + before(:each) do + ActiveRecord::Base.connection.execute("CREATE TABLE #{table_name}(pk SERIAL, i INTEGER)") + end - begin - thread = Thread.new do - ActiveRecord::Base.connection.execute(<<~SQL) - LOCK bogus_table_sub_default; - SELECT pg_sleep(3); - SQL - end + around(:each) do |example| + @original_timeout_raw_value = ActiveRecord::Base.value_from_sql("SHOW lock_timeout") + @original_timeout_in_milliseconds = @original_timeout_raw_value.sub(/s\Z/, '').to_i * 1000 + begin + example.run + ensure + ActiveRecord::Base.connection.execute("SET lock_timeout = #{@original_timeout_in_milliseconds};") + end + end - sleep 1.1 - - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) - .at_least(2) - .times - .and_call_original - - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name) do - locks_for_parent = locks_for_table(table_name, connection: alternate_connection) - locks_for_sub = locks_for_table("bogus_table_sub_default", connection: alternate_connection) - - aggregate_failures do - expect(locks_for_parent).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_sub).to contain_exactly( - having_attributes( - table: "bogus_table_sub_default", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_parent.first.pid).to eq(locks_for_sub.first.pid) - end - end - end - ensure - thread.join - end - end + it "runs the block" do + expect do |block| + migration.adjust_lock_timeout(5, &block) + end.to yield_control + end - it "waits to acquire a lock if the table is non-natively partitioned and child table is blocked" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + it "changes the lock_timeout to the requested value in seconds" do + seconds = (@original_timeout_in_milliseconds / 1000) + 5 + migration.adjust_lock_timeout(seconds) do + expect(ActiveRecord::Base.value_from_sql("SHOW lock_timeout")).to eq("#{seconds}s") + end + end - ActiveRecord::Base.connection.execute(<<~SQL) - CREATE TABLE bogus_table_child(pk SERIAL, i INTEGER) INHERITS (#{table_name}) - SQL + it "resets the lock_timeout to the original values even after an exception" do + seconds = (@original_timeout_in_milliseconds / 1000) + 5 + expect do + migration.adjust_lock_timeout(seconds) do + raise "bogus error" + end + end.to raise_error("bogus error") - begin - thread = Thread.new do - ActiveRecord::Base.connection.execute(<<~SQL) - LOCK bogus_table_child; - SELECT pg_sleep(3); - SQL - end + expect(ActiveRecord::Base.value_from_sql("SHOW lock_timeout")).to eq(@original_timeout_raw_value) + end - sleep 1.1 - - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) - .at_least(2) - .times - .and_call_original - - migration.suppress_messages do - migration.safely_acquire_lock_for_table(table_name) do - locks_for_parent = locks_for_table(table_name, connection: alternate_connection) - locks_for_child = locks_for_table("bogus_table_child", connection: alternate_connection) - - aggregate_failures do - expect(locks_for_parent).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_child).to contain_exactly( - having_attributes( - table: "bogus_table_child", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ) - ) - - expect(locks_for_parent.first.pid).to eq(locks_for_child.first.pid) - end - end - end - ensure - thread.join - end + it "resets the lock_timeout to the original values even after a SQL failure in a transaction" do + seconds = (@original_timeout_in_milliseconds / 1000) + 5 + expect do + migration.connection.transaction do + migration.adjust_lock_timeout(seconds) do + ActiveRecord::Base.connection.execute("select bogus;") end + end + end.to raise_error(ActiveRecord::StatementInvalid, /PG::UndefinedColumn/) - it "fails lock acquisition quickly if Postgres doesn't grant an exclusive lock but then retries" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + expect(ActiveRecord::Base.value_from_sql("SHOW lock_timeout")).to eq(@original_timeout_raw_value) + end + end - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(2).times.and_return([]) + describe "#adjust_statement_timeout" do + let(:table_name) { "bogus_table" } + let(:migration) { Class.new(migration_klass).new } - alternate_connection.execute("BEGIN; LOCK #{table_name};") + before(:each) do + ActiveRecord::Base.connection.execute("CREATE TABLE #{table_name}(pk SERIAL, i INTEGER)") + end - lock_call_count = 0 - time_before_lock_calls = Time.now + around(:each) do |example| + @original_timeout_raw_value = ActiveRecord::Base.value_from_sql("SHOW statement_timeout") + @original_timeout_in_milliseconds = @original_timeout_raw_value.sub(/s\Z/, '').to_i * 1000 + begin + example.run + ensure + ActiveRecord::Base.connection.execute("SET statement_timeout = #{@original_timeout_in_milliseconds};") + end + end - allow(ActiveRecord::Base.connection).to receive(:execute).at_least(:once).and_call_original - expect(ActiveRecord::Base.connection).to receive(:execute).with("LOCK \"public\".\"bogus_table\" IN ACCESS EXCLUSIVE MODE;").exactly(2).times.and_wrap_original do |m, *args| - lock_call_count += 1 + it "runs the block" do + expect do |block| + migration.adjust_statement_timeout(5, &block) + end.to yield_control + end - if lock_call_count == 2 - # Get rid of the lock we were holding. - alternate_connection.execute("ROLLBACK;") - end + it "changes the statement_timeout to the requested value in seconds" do + seconds = (@original_timeout_in_milliseconds / 1000) + 5 + migration.adjust_statement_timeout(seconds) do + expect(ActiveRecord::Base.value_from_sql("SHOW statement_timeout")).to eq("#{seconds}s") + end + end - return_value = nil - exception = nil - begin - return_value = m.call(*args) - rescue => e - exception = e - end + it "resets the statement_timeout to the original values even after an exception" do + seconds = (@original_timeout_in_milliseconds / 1000) + 5 + expect do + migration.adjust_statement_timeout(seconds) do + raise "bogus error" + end + end.to raise_error("bogus error") - if lock_call_count == 1 - # First lock attempt should fail fast. - expect(Time.now - time_before_lock_calls).to be >= 1.seconds - expect(Time.now - time_before_lock_calls).to be < 5.seconds - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - - expect(migration).to receive(:sleep).with(1 * PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER) # Stubbed seconds times multiplier - else - # Second lock attempt should succeed. - expect(exception).not_to be_present - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - end + expect(ActiveRecord::Base.value_from_sql("SHOW statement_timeout")).to eq(@original_timeout_raw_value) + end - if exception - raise exception - else - return_value - end - end - - expect do - migration.safely_acquire_lock_for_table(table_name) { } - end.to output(/Timed out trying to acquire ACCESS EXCLUSIVE lock.+"public"\."bogus_table"/m).to_stdout + it "resets the statement_timeout to the original values even after a SQL failure in a transaction" do + seconds = (@original_timeout_in_milliseconds / 1000) + 5 + expect do + migration.connection.transaction do + migration.adjust_statement_timeout(seconds) do + migration.connection.execute("select bogus;") end + end + end.to raise_error(ActiveRecord::StatementInvalid, /PG::UndefinedColumn/) + + expect(ActiveRecord::Base.value_from_sql("SHOW statement_timeout")).to eq(@original_timeout_raw_value) + end + end - it "doesn't kill a long running query inside of the lock" do - stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + describe "ensure_small_table!" do + it "does not raise error when empty: false and table is below threshold and has rows" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos - migration.safely_acquire_lock_for_table(table_name) do - time_before_select_call = Time.now - expect do - ActiveRecord::Base.connection.execute("SELECT pg_sleep(3)") - end.not_to raise_error - time_after_select_call = Time.now + unsafe_execute "INSERT INTO foos DEFAULT VALUES" + end + end - expect(time_after_select_call - time_before_select_call).to be >= 3.seconds - end + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/SELECT EXISTS/) + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to_not raise_error + end + + it "does not raise error when empty: true and table is below threshold and is empty" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, empty: true + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to_not raise_error + end + + it "raises error when empty: true and table has rows" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + + unsafe_execute "INSERT INTO foos DEFAULT VALUES" + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, empty: true + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/pg_total_relation_size/) + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows") + end + + it "raises error when empty: true and table is above threshold and is empty" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, empty: true, threshold: 1.kilobyte + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") + end + + it "raises error when empty: false and table is above threshold and has rows" do + setup_migration = Class.new(migration_klass) do + def up + safe_create_table :foos + + unsafe_execute "INSERT INTO foos DEFAULT VALUES" + end + end + + setup_migration.suppress_messages { setup_migration.migrate(:up) } + + test_migration = Class.new(migration_klass) do + def up + ensure_small_table! :foos, threshold: 1.kilobyte + end + end + + allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original + expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/SELECT EXISTS/) + expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + + expect do + test_migration.suppress_messages { test_migration.migrate(:up) } + end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") + end + end + + ["bogus_table", :bogus_table, "public.bogus_table"].each do |table_name| + describe "#safely_acquire_lock_for_table #{table_name} of type #{table_name.class.name}" do + let(:alternate_connection_pool) do + ActiveRecord::ConnectionAdapters::ConnectionPool.new(pool_config) + end + let(:alternate_connection) do + # The #connection method was deprecated in Rails 7.2 in favor of #lease_connection + if alternate_connection_pool.respond_to?(:lease_connection) + alternate_connection_pool.lease_connection + else + alternate_connection_pool.connection + end + end + let(:migration) { Class.new(migration_klass).new } + + before(:each) do + ActiveRecord::Base.connection.execute(<<~SQL) + CREATE TABLE #{table_name}(pk SERIAL, i INTEGER); + CREATE SCHEMA partman; + CREATE EXTENSION pg_partman SCHEMA partman; + SQL + end + + after(:each) do + alternate_connection_pool.disconnect! + end + + it "executes the block" do + expect do |block| + migration.safely_acquire_lock_for_table(table_name, &block) + end.to yield_control + end + + it "acquires an exclusive lock on the table by default" do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + end + end + + it "acquires a lock in a different mode when provided" do + migration.safely_acquire_lock_for_table(table_name, mode: :share) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "ShareLock", + granted: true, + pid: kind_of(Integer), + ) + ) + end + end + + it "raises error when invalid lock mode provided" do + expect do + migration.safely_acquire_lock_for_table(table_name, mode: :garbage) {} + end.to raise_error( + ArgumentError, + "Unrecognized lock mode :garbage. Valid modes: [:access_share, :row_share, :row_exclusive, :share_update_exclusive, :share, :share_row_exclusive, :exclusive, :access_exclusive]" + ) + end + + it "releases the lock (even after an exception)" do + begin + migration.safely_acquire_lock_for_table(table_name) do + raise "bogus error" end + rescue + # Throw away error. + end + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end - it "prints out helpful information when waiting for a lock" do - blocking_queries_calls = 0 - expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(2).times do |*args| - blocking_queries_calls += 1 - if blocking_queries_calls == 1 - [PgHaMigrations::BlockingDatabaseTransactions::LongRunningTransaction.new("", "some_sql_query", "active", 5, [["bogus_table", "public", "AccessExclusiveLock"]])] - else - [] - end - end + it "waits to acquire a lock if the table is already blocked" do + block_call_count = 0 + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(3).times do |*args| + # Verify that the method under test hasn't taken out a lock. + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - expect do - migration = Class.new(migration_klass) do - class_attribute :table_name, instance_accessor: true + block_call_count += 1 + if block_call_count < 3 + [PgHaMigrations::BlockingDatabaseTransactions::LongRunningTransaction.new("", "", 5, "active", [["bogus_table", "public", "AccessExclusiveLock"]])] + else + [] + end + end - self.table_name = table_name + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty + end + end + end - def up - safely_acquire_lock_for_table(table_name) { } - end - end + it "does not wait to acquire a lock if the table has an existing but non-conflicting lock" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - migration.migrate(:up) - end.to output(/blocking transactions.+tables.+bogus_table.+some_sql_query/m).to_stdout + begin + thread = Thread.new do + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK bogus_table IN EXCLUSIVE MODE; + SELECT pg_sleep(2); + SQL end - it "allows re-entrancy" do - migration.safely_acquire_lock_for_table(table_name) do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + sleep 1.1 + + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .once + .and_call_original + + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name, mode: :access_share) do + locks_for_table = locks_for_table(table_name, connection: alternate_connection) + + aggregate_failures do + expect(locks_for_table).to contain_exactly( having_attributes( table: "bogus_table", - lock_type: "AccessExclusiveLock", + lock_type: "ExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + having_attributes( + table: "bogus_table", + lock_type: "AccessShareLock", granted: true, pid: kind_of(Integer), ), ) + + expect(locks_for_table.first.pid).to_not eq(locks_for_table.last.pid) end + end + end + ensure + thread.join + end + end + + it "waits to acquire a lock if the table has an existing and conflicting lock" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + + begin + thread = Thread.new do + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK bogus_table IN SHARE UPDATE EXCLUSIVE MODE; + SELECT pg_sleep(3); + SQL + end + sleep 1.1 + + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .at_least(2) + .times + .and_call_original + + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name, mode: :share_row_exclusive) do expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( having_attributes( table: "bogus_table", - lock_type: "AccessExclusiveLock", + lock_type: "ShareRowExclusiveLock", granted: true, pid: kind_of(Integer), - ), + ) ) end + end + ensure + thread.join + end + end - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + it "does not wait to acquire a lock if a table with the same name but in different schema is blocked" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + + ActiveRecord::Base.connection.execute("CREATE TABLE partman.bogus_table(pk SERIAL, i INTEGER)") + + begin + thread = Thread.new do + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK partman.bogus_table; + SELECT pg_sleep(2); + SQL end - it "allows re-entrancy when inner lock is a lower level" do - migration.safely_acquire_lock_for_table(table_name) do - migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) do - locks_for_table = locks_for_table(table_name, connection: alternate_connection) - - aggregate_failures do - expect(locks_for_table).to contain_exactly( - having_attributes( - table: "bogus_table", - lock_type: "AccessExclusiveLock", - granted: true, - pid: kind_of(Integer), - ), - having_attributes( - table: "bogus_table", - lock_type: "ExclusiveLock", - granted: true, - pid: kind_of(Integer), - ), - ) + sleep 1.1 - expect(locks_for_table.first.pid).to eq(locks_for_table.last.pid) - end - end + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .once + .and_call_original + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name) do locks_for_table = locks_for_table(table_name, connection: alternate_connection) + locks_for_other_table = locks_for_table("partman.bogus_table", connection: alternate_connection) aggregate_failures do expect(locks_for_table).to contain_exactly( @@ -4623,379 +4571,435 @@ def up lock_type: "AccessExclusiveLock", granted: true, pid: kind_of(Integer), - ), + ) + ) + + expect(locks_for_other_table).to contain_exactly( having_attributes( table: "bogus_table", - lock_type: "ExclusiveLock", # Postgres releases the inner lock once the outer lock is released + lock_type: "AccessExclusiveLock", granted: true, pid: kind_of(Integer), - ), + ) ) - expect(locks_for_table.first.pid).to eq(locks_for_table.last.pid) + expect(locks_for_table.first.pid).to_not eq(locks_for_other_table.first.pid) end end - - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + ensure + thread.join + end + end - it "does not allow re-entrancy when lock escalation detected" do - expect do - migration.safely_acquire_lock_for_table(table_name, mode: :share) do - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - - # attempting a nested lock twice to ensure the - # thread variable doesn't incorrectly get reset - expect do - migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) {} - end.to raise_error( - PgHaMigrations::InvalidMigrationError, - "Lock escalation detected! Cannot change lock level from :share to :exclusive for \"public\".\"bogus_table\"." - ) - - # the exception above was caught and therefore the parent lock shouldn't be released yet - expect(locks_for_table(table_name, connection: alternate_connection)).to_not be_empty + it "waits to acquire a lock if the table is partitioned and child table is blocked" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) {} - end - end.to raise_error( - PgHaMigrations::InvalidMigrationError, - "Lock escalation detected! Cannot change lock level from :share to :exclusive for \"public\".\"bogus_table\"." - ) + ActiveRecord::Base.connection.drop_table(table_name) + create_range_partitioned_table(table_name, migration_klass, with_partman: true) - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + begin + thread = Thread.new do + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK bogus_table_default; + SELECT pg_sleep(3); + SQL end - it "raises error when attempting nested lock on different table" do - ActiveRecord::Base.connection.execute("CREATE TABLE foo(pk SERIAL, i INTEGER)") + sleep 1.1 - expect do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - - # attempting a nested lock twice to ensure the - # thread variable doesn't incorrectly get reset - expect do - migration.safely_acquire_lock_for_table("foo") - end.to raise_error( - PgHaMigrations::InvalidMigrationError, - "Nested lock detected! Cannot acquire lock on \"public\".\"foo\" while \"public\".\"bogus_table\" is locked." - ) + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .at_least(2) + .times + .and_call_original - migration.safely_acquire_lock_for_table("foo") - end - end.to raise_error( - PgHaMigrations::InvalidMigrationError, - "Nested lock detected! Cannot acquire lock on \"public\".\"foo\" while \"public\".\"bogus_table\" is locked." - ) + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name) do + locks_for_parent = locks_for_table(table_name, connection: alternate_connection) + locks_for_child = locks_for_table("bogus_table_default", connection: alternate_connection) - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - end + aggregate_failures do + expect(locks_for_parent).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) - it "uses statement_timeout instead of lock_timeout when on Postgres 9.1" do - allow(ActiveRecord::Base.connection).to receive(:postgresql_version).and_wrap_original do |m, *args| - if caller.detect { |line| line =~ /lib\/pg_ha_migrations\/blocking_database_transactions\.rb/ } - # The long-running transactions check needs to know the actual - # Postgres version to use the proper columns, so we don't want - # to mock any calls from it. - m.call(*args) - else - 9_01_12 - end - end + expect(locks_for_child).to contain_exactly( + having_attributes( + table: "bogus_table_default", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) - expect do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty + expect(locks_for_parent.first.pid).to eq(locks_for_child.first.pid) end - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - end.not_to make_database_queries(matching: /lock_timeout/i) + end end + ensure + thread.join end end - describe "ensure_small_table!" do - it "does not raise error when empty: false and table is below threshold and has rows" do - setup_migration = Class.new(migration_klass) do - def up - safe_create_table :foos - - unsafe_execute "INSERT INTO foos DEFAULT VALUES" - end - end + it "waits to acquire a lock if the table is partitioned and child sub-partition is blocked" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - setup_migration.suppress_messages { setup_migration.migrate(:up) } + ActiveRecord::Base.connection.drop_table(table_name) + create_range_partitioned_table(table_name, migration_klass) + create_range_partitioned_table("#{table_name}_sub", migration_klass, with_partman: true) + ActiveRecord::Base.connection.execute(<<~SQL) + ALTER TABLE bogus_table + ATTACH PARTITION bogus_table_sub + FOR VALUES FROM ('2020-01-01') TO ('2020-02-01') + SQL - test_migration = Class.new(migration_klass) do - def up - ensure_small_table! :foos - end + begin + thread = Thread.new do + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK bogus_table_sub_default; + SELECT pg_sleep(3); + SQL end - allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original - expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/SELECT EXISTS/) - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original - - expect do - test_migration.suppress_messages { test_migration.migrate(:up) } - end.to_not raise_error - end + sleep 1.1 - it "does not raise error when empty: true and table is below threshold and is empty" do - setup_migration = Class.new(migration_klass) do - def up - safe_create_table :foos - end - end + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .at_least(2) + .times + .and_call_original - setup_migration.suppress_messages { setup_migration.migrate(:up) } + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name) do + locks_for_parent = locks_for_table(table_name, connection: alternate_connection) + locks_for_sub = locks_for_table("bogus_table_sub_default", connection: alternate_connection) - test_migration = Class.new(migration_klass) do - def up - ensure_small_table! :foos, empty: true - end - end + aggregate_failures do + expect(locks_for_parent).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) - allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + expect(locks_for_sub).to contain_exactly( + having_attributes( + table: "bogus_table_sub_default", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) - expect do - test_migration.suppress_messages { test_migration.migrate(:up) } - end.to_not raise_error + expect(locks_for_parent.first.pid).to eq(locks_for_sub.first.pid) + end + end + end + ensure + thread.join end + end - it "raises error when empty: true and table has rows" do - setup_migration = Class.new(migration_klass) do - def up - safe_create_table :foos + it "waits to acquire a lock if the table is non-natively partitioned and child table is blocked" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - unsafe_execute "INSERT INTO foos DEFAULT VALUES" - end + ActiveRecord::Base.connection.execute(<<~SQL) + CREATE TABLE bogus_table_child(pk SERIAL, i INTEGER) INHERITS (#{table_name}) + SQL + + begin + thread = Thread.new do + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK bogus_table_child; + SELECT pg_sleep(3); + SQL end - setup_migration.suppress_messages { setup_migration.migrate(:up) } + sleep 1.1 - test_migration = Class.new(migration_klass) do - def up - ensure_small_table! :foos, empty: true - end - end + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .at_least(2) + .times + .and_call_original - allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original - expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/pg_total_relation_size/) + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name) do + locks_for_parent = locks_for_table(table_name, connection: alternate_connection) + locks_for_child = locks_for_table("bogus_table_child", connection: alternate_connection) - expect do - test_migration.suppress_messages { test_migration.migrate(:up) } - end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" has rows") - end + aggregate_failures do + expect(locks_for_parent).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) - it "raises error when empty: true and table is above threshold and is empty" do - setup_migration = Class.new(migration_klass) do - def up - safe_create_table :foos + expect(locks_for_child).to contain_exactly( + having_attributes( + table: "bogus_table_child", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + + expect(locks_for_parent.first.pid).to eq(locks_for_child.first.pid) + end end end + ensure + thread.join + end + end - setup_migration.suppress_messages { setup_migration.migrate(:up) } + it "fails lock acquisition quickly if Postgres doesn't grant an exclusive lock but then retries" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - test_migration = Class.new(migration_klass) do - def up - ensure_small_table! :foos, empty: true, threshold: 1.kilobyte - end - end + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(2).times.and_return([]) - allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/SELECT EXISTS/).once.and_call_original - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + alternate_connection.execute("BEGIN; LOCK #{table_name};") - expect do - test_migration.suppress_messages { test_migration.migrate(:up) } - end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") - end + lock_call_count = 0 + time_before_lock_calls = Time.now - it "raises error when empty: false and table is above threshold and has rows" do - setup_migration = Class.new(migration_klass) do - def up - safe_create_table :foos + allow(ActiveRecord::Base.connection).to receive(:execute).at_least(:once).and_call_original + expect(ActiveRecord::Base.connection).to receive(:execute).with("LOCK \"public\".\"bogus_table\" IN ACCESS EXCLUSIVE MODE;").exactly(2).times.and_wrap_original do |m, *args| + lock_call_count += 1 - unsafe_execute "INSERT INTO foos DEFAULT VALUES" - end + if lock_call_count == 2 + # Get rid of the lock we were holding. + alternate_connection.execute("ROLLBACK;") end - setup_migration.suppress_messages { setup_migration.migrate(:up) } - - test_migration = Class.new(migration_klass) do - def up - ensure_small_table! :foos, threshold: 1.kilobyte - end + return_value = nil + exception = nil + begin + return_value = m.call(*args) + rescue => e + exception = e end - allow(ActiveRecord::Base.connection).to receive(:select_value).and_call_original - expect(ActiveRecord::Base.connection).to_not receive(:select_value).with(/SELECT EXISTS/) - expect(ActiveRecord::Base.connection).to receive(:select_value).with(/pg_total_relation_size/).once.and_call_original + if lock_call_count == 1 + # First lock attempt should fail fast. + expect(Time.now - time_before_lock_calls).to be >= 1.seconds + expect(Time.now - time_before_lock_calls).to be < 5.seconds + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - expect do - test_migration.suppress_messages { test_migration.migrate(:up) } - end.to raise_error(PgHaMigrations::InvalidMigrationError, "Table \"foos\" is larger than 1024 bytes") + expect(migration).to receive(:sleep).with(1 * PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER) # Stubbed seconds times multiplier + else + # Second lock attempt should succeed. + expect(exception).not_to be_present + expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty + end + + if exception + raise exception + else + return_value + end end + + expect do + migration.safely_acquire_lock_for_table(table_name) { } + end.to output(/Timed out trying to acquire ACCESS EXCLUSIVE lock.+"public"\."bogus_table"/m).to_stdout end - describe "unsafe transformations" do - it "renames create_table to unsafe_create_table" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table :foos - end - end + it "doesn't kill a long running query inside of the lock" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) - migration.suppress_messages { migration.migrate(:up) } + migration.safely_acquire_lock_for_table(table_name) do + time_before_select_call = Time.now + expect do + ActiveRecord::Base.connection.execute("SELECT pg_sleep(3)") + end.not_to raise_error + time_after_select_call = Time.now - expect(ActiveRecord::Base.connection.tables).to include("foos") + expect(time_after_select_call - time_before_select_call).to be >= 3.seconds end + end - it "renames drop_table to unsafe_drop_table" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table :foos - unsafe_drop_table :foos - end + it "prints out helpful information when waiting for a lock" do + blocking_queries_calls = 0 + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(2).times do |*args| + blocking_queries_calls += 1 + if blocking_queries_calls == 1 + [PgHaMigrations::BlockingDatabaseTransactions::LongRunningTransaction.new("", "some_sql_query", "active", 5, [["bogus_table", "public", "AccessExclusiveLock"]])] + else + [] end - - migration.suppress_messages { migration.migrate(:up) } - - expect(ActiveRecord::Base.connection.tables).not_to include("foos") end - it "renames add_column to unsafe_add_column" do + expect do migration = Class.new(migration_klass) do - def up - unsafe_create_table :foos - unsafe_add_column :foos, :bar, :integer - end - end + class_attribute :table_name, instance_accessor: true - migration.suppress_messages { migration.migrate(:up) } + self.table_name = table_name - expect(ActiveRecord::Base.connection.tables).to include("foos") - expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).to include("bar") - end - - it "renames change_table to unsafe_change_table" do - migration = Class.new(migration_klass) do def up - unsafe_create_table :foos - unsafe_change_table :foos do |t| - t.string :bar - end + safely_acquire_lock_for_table(table_name) { } end end - migration.suppress_messages { migration.migrate(:up) } - - expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).to include("bar") - end + migration.migrate(:up) + end.to output(/blocking transactions.+tables.+bogus_table.+some_sql_query/m).to_stdout + end - it "renames rename_table to unsafe_rename_table" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table :foos - unsafe_rename_table :foos, :bars - end + it "allows re-entrancy" do + migration.safely_acquire_lock_for_table(table_name) do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) end - migration.suppress_messages { migration.migrate(:up) } - - expect(ActiveRecord::Base.connection.tables).not_to include("foos") - expect(ActiveRecord::Base.connection.tables).to include("bars") + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) end - it "renames rename_column to unsafe_rename_column" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table(:foos) { |t| t.string :bar } - unsafe_rename_column :foos, :bar, :baz - end - end + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end - migration.suppress_messages { migration.migrate(:up) } + it "allows re-entrancy when inner lock is a lower level" do + migration.safely_acquire_lock_for_table(table_name) do + migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) do + locks_for_table = locks_for_table(table_name, connection: alternate_connection) - expect(ActiveRecord::Base.connection.tables).to include("foos") - expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).not_to include("bar") - expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).to include("baz") - end + aggregate_failures do + expect(locks_for_table).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + having_attributes( + table: "bogus_table", + lock_type: "ExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) - it "renames change_column to unsafe_change_column" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table(:foos) { |t| t.string :bar } - unsafe_change_column :foos, :bar, :text + expect(locks_for_table.first.pid).to eq(locks_for_table.last.pid) end end - migration.suppress_messages { migration.migrate(:up) } + locks_for_table = locks_for_table(table_name, connection: alternate_connection) - expect(ActiveRecord::Base.connection.columns("foos").detect { |c| c.name == "bar" }.type).to eq(:text) - end + aggregate_failures do + expect(locks_for_table).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + having_attributes( + table: "bogus_table", + lock_type: "ExclusiveLock", # Postgres releases the inner lock once the outer lock is released + granted: true, + pid: kind_of(Integer), + ), + ) - it "renames remove_column to unsafe_remove_column" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table(:foos) { |t| t.string :bar } - unsafe_remove_column :foos, :bar - end + expect(locks_for_table.first.pid).to eq(locks_for_table.last.pid) end - - migration.suppress_messages { migration.migrate(:up) } - - expect(ActiveRecord::Base.connection.columns("foos").map(&:name)).not_to include("bar") end - it "renames add_index to unsafe_add_index" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table(:foos) { |t| t.string :bar } - unsafe_add_index :foos, :bar - end - end + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end - migration.suppress_messages { migration.migrate(:up) } + it "does not allow re-entrancy when lock escalation detected" do + expect do + migration.safely_acquire_lock_for_table(table_name, mode: :share) do + expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - expect(ActiveRecord::Base.connection.indexes("foos").map(&:columns)).to include(["bar"]) - end + # attempting a nested lock twice to ensure the + # thread variable doesn't incorrectly get reset + expect do + migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) {} + end.to raise_error( + PgHaMigrations::InvalidMigrationError, + "Lock escalation detected! Cannot change lock level from :share to :exclusive for \"public\".\"bogus_table\"." + ) - it "renames execute to unsafe_execute" do - migration = Class.new(migration_klass) do - def up - unsafe_execute "CREATE TABLE foos ( pk serial )" - end + # the exception above was caught and therefore the parent lock shouldn't be released yet + expect(locks_for_table(table_name, connection: alternate_connection)).to_not be_empty + + migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) {} end + end.to raise_error( + PgHaMigrations::InvalidMigrationError, + "Lock escalation detected! Cannot change lock level from :share to :exclusive for \"public\".\"bogus_table\"." + ) - migration.suppress_messages { migration.migrate(:up) } + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end - expect(ActiveRecord::Base.connection.tables).to include("foos") - end + it "raises error when attempting nested lock on different table" do + ActiveRecord::Base.connection.execute("CREATE TABLE foo(pk SERIAL, i INTEGER)") - it "renames remove_index to unsafe_remove_index" do - migration = Class.new(migration_klass) do - def up - unsafe_create_table(:foos) { |t| t.string :bar } - unsafe_add_index :foos, :bar - end - end - migration.suppress_messages { migration.migrate(:up) } - expect(ActiveRecord::Base.connection.indexes("foos").map(&:columns)).to include(["bar"]) + expect do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - migration = Class.new(migration_klass) do - def up - unsafe_remove_index :foos, :bar - end + # attempting a nested lock twice to ensure the + # thread variable doesn't incorrectly get reset + expect do + migration.safely_acquire_lock_for_table("foo") + end.to raise_error( + PgHaMigrations::InvalidMigrationError, + "Nested lock detected! Cannot acquire lock on \"public\".\"foo\" while \"public\".\"bogus_table\" is locked." + ) + + migration.safely_acquire_lock_for_table("foo") end - migration.suppress_messages { migration.migrate(:up) } + end.to raise_error( + PgHaMigrations::InvalidMigrationError, + "Nested lock detected! Cannot acquire lock on \"public\".\"foo\" while \"public\".\"bogus_table\" is locked." + ) - expect(ActiveRecord::Base.connection.indexes("foos").map(&:columns)).not_to include(["bar"]) + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end + + it "uses statement_timeout instead of lock_timeout when on Postgres 9.1" do + allow(ActiveRecord::Base.connection).to receive(:postgresql_version).and_wrap_original do |m, *args| + if caller.detect { |line| line =~ /lib\/pg_ha_migrations\/blocking_database_transactions\.rb/ } + # The long-running transactions check needs to know the actual + # Postgres version to use the proper columns, so we don't want + # to mock any calls from it. + m.call(*args) + else + 9_01_12 + end end + + expect do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty + end + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end.not_to make_database_queries(matching: /lock_timeout/i) end end end