diff --git a/lib/amqp/gen/consumer.ex b/lib/amqp/gen/consumer.ex index 390861e..62e4a7c 100644 --- a/lib/amqp/gen/consumer.ex +++ b/lib/amqp/gen/consumer.ex @@ -14,7 +14,8 @@ defmodule Amqpx.Gen.Consumer do prefetch_count: 50, backoff: 5_000, connection_name: Amqpx.Gen.ConnectionManager, - requeue_on_reject: true + requeue_on_reject: true, + cancelled?: false ] @type state() :: %__MODULE__{} @@ -86,7 +87,7 @@ defmodule Amqpx.Gen.Consumer do # Confirmation sent by the broker to the consumer process after a Basic.cancel def handle_info({:"basic.cancel_ok", _consumer_tag}, state) do - {:stop, :basic_cancel_ok, state} + {:stop, {:shutdown, :basic_cancel_ok}, state} end def handle_info( @@ -188,6 +189,8 @@ defmodule Amqpx.Gen.Consumer do def terminate(_, %__MODULE__{channel: nil}), do: nil def terminate(reason, %__MODULE__{channel: channel}) do + Logger.info("Terminating consumer with reason #{inspect(reason)}") + case reason do :normal -> close_channel(channel) :shutdown -> close_channel(channel) @@ -211,7 +214,7 @@ defmodule Amqpx.Gen.Consumer do defp handle_message( message, - %{delivery_tag: tag, redelivered: redelivered} = meta, + %{delivery_tag: tag, redelivered: redelivered, consumer_tag: consumer_tag} = meta, %__MODULE__{ handler_module: handler_module, handler_state: handler_state, @@ -219,9 +222,15 @@ defmodule Amqpx.Gen.Consumer do requeue_on_reject: requeue_on_reject } = state ) do - {:ok, handler_state} = handler_module.handle_message(message, meta, handler_state) - Basic.ack(state.channel, tag) - %{state | handler_state: handler_state} + case handle_signals(state, consumer_tag) do + {:ok, state} -> + {:ok, handler_state} = handler_module.handle_message(message, meta, handler_state) + Basic.ack(state.channel, tag) + %{state | handler_state: handler_state} + + {:stop, state} -> + state + end rescue e in _ -> Logger.error(Exception.format(:error, e, __STACKTRACE__)) @@ -242,4 +251,40 @@ defmodule Amqpx.Gen.Consumer do state end + + @type signal_status :: :stopping | :draining | :running + + @spec get_signal_status :: signal_status() + defp get_signal_status do + cond do + signal_handler().stopping?() -> :stopping + signal_handler().draining?() -> :draining + true -> :running + end + end + + @spec handle_signals(signal_status(), state(), String.t()) :: {:ok | :stop, state()} + defp handle_signals(signal_status \\ get_signal_status(), state, consumer_tag) + + # Close channel when we we need to stop. + defp handle_signals(:stopping, state, _) do + close_channel(state.channel) + {:stop, state} + end + + # Continue processing prefetched messages while draining + defp handle_signals(:draining, %{cancelled?: true} = state, _), do: {:ok, state} + + # Stop consuming new messages and move to cancelled state + # to continue processing prefetched messages + defp handle_signals(:draining, state, consumer_tag) do + Logger.info("Cancelling consumer #{consumer_tag}") + Basic.cancel(state.channel, consumer_tag) + {:ok, %{state | cancelled?: true}} + end + + # No signals received run as normal + defp handle_signals(:running, state, _), do: {:ok, state} + + defp signal_handler, do: Application.get_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler) end diff --git a/lib/amqp/no_signal_handler.ex b/lib/amqp/no_signal_handler.ex new file mode 100644 index 0000000..dd1df1c --- /dev/null +++ b/lib/amqp/no_signal_handler.ex @@ -0,0 +1,16 @@ +defmodule Amqpx.NoSignalHandler do + @moduledoc """ + Dummy signal handler module that does not handle the graceful termination. + + It always returns `false` for `draining?/0` and `stopping?/0`. + I.e. the comsumer will continue without handling signals. + """ + + @behaviour Amqpx.SignalHandler + + @impl true + def draining?, do: false + + @impl true + def stopping?, do: false +end diff --git a/lib/amqp/signal_handler.ex b/lib/amqp/signal_handler.ex new file mode 100644 index 0000000..2eb4ff2 --- /dev/null +++ b/lib/amqp/signal_handler.ex @@ -0,0 +1,20 @@ +defmodule Amqpx.SignalHandler do + @moduledoc """ + Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application. + In the context of Rabbitmq, it will: + cancel the channel when we are in draining mode to stop prefetch new messages. + close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume. + + Check in Peano how to use it. + + """ + @doc """ + Check if the application is in draining mode. + """ + @callback draining? :: boolean + + @doc """ + Check if the application is in stopping mode. + """ + @callback stopping? :: boolean +end diff --git a/mix.exs b/mix.exs index 59cac84..07ec337 100644 --- a/mix.exs +++ b/mix.exs @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do [ app: :amqpx, name: "amqpx", - version: "6.0.4", + version: "6.1.0", elixir: "~> 1.7", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :production, diff --git a/test/gen_test.exs b/test/gen_test.exs index fe81ee7..113cbef 100644 --- a/test/gen_test.exs +++ b/test/gen_test.exs @@ -522,6 +522,41 @@ defmodule Amqpx.Test.AmqpxTest do end end + test "should handle stop signal by not consuming any more messages" do + start_connection1!() + start_consumer_by_name!(Consumer1) + start_producer!(:producer) + + payload = %{test: 1} + + with_mocks [ + {Amqpx.NoSignalHandler, [], stopping?: fn -> true end}, + {Consumer1, [], []} + ] do + Producer1.send_payload(payload) + :timer.sleep(50) + refute called(Consumer1.handle_message(Jason.encode!(payload), :_, :_)) + end + end + + test "should continue to handle messages while draining but not while stopping" do + start_connection1!() + start_consumer_by_name!(Consumer1) + start_producer!(:producer) + + payload = %{test: 1} + + with_mocks [ + {Amqpx.NoSignalHandler, [], stopping?: [in_series([], [false, true])], draining?: fn -> true end}, + {Consumer1, [], [handle_message: fn _, _, state -> {:ok, state} end]} + ] do + Producer1.send_payload(payload) + Producer1.send_payload(payload) + :timer.sleep(50) + assert_called_exactly(Consumer1.handle_message(Jason.encode!(payload), :_, :_), 1) + end + end + test "the consumer should stop gracefully" do start_connection1!()