diff --git a/lib/amqp/gen/consumer.ex b/lib/amqp/gen/consumer.ex index 390861e..c3b6776 100644 --- a/lib/amqp/gen/consumer.ex +++ b/lib/amqp/gen/consumer.ex @@ -14,7 +14,8 @@ defmodule Amqpx.Gen.Consumer do prefetch_count: 50, backoff: 5_000, connection_name: Amqpx.Gen.ConnectionManager, - requeue_on_reject: true + requeue_on_reject: true, + cancel?: false ] @type state() :: %__MODULE__{} @@ -26,6 +27,8 @@ defmodule Amqpx.Gen.Consumer do @gen_server_opts [:name, :timeout, :debug, :spawn_opt, :hibernate_after] + @signal_handler Application.compile_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler) + @spec start_link(opts :: map()) :: GenServer.server() def start_link(opts) do {gen_server_opts, opts} = Map.split(opts, @gen_server_opts) @@ -188,6 +191,8 @@ defmodule Amqpx.Gen.Consumer do def terminate(_, %__MODULE__{channel: nil}), do: nil def terminate(reason, %__MODULE__{channel: channel}) do + Logger.warn("Terminating consumer with reason #{inspect(reason)}") + case reason do :normal -> close_channel(channel) :shutdown -> close_channel(channel) @@ -211,7 +216,7 @@ defmodule Amqpx.Gen.Consumer do defp handle_message( message, - %{delivery_tag: tag, redelivered: redelivered} = meta, + %{delivery_tag: tag, redelivered: redelivered, consumer_tag: consumer_tag} = meta, %__MODULE__{ handler_module: handler_module, handler_state: handler_state, @@ -219,9 +224,16 @@ defmodule Amqpx.Gen.Consumer do requeue_on_reject: requeue_on_reject } = state ) do - {:ok, handler_state} = handler_module.handle_message(message, meta, handler_state) - Basic.ack(state.channel, tag) - %{state | handler_state: handler_state} + state = cancel?(state, consumer_tag) + + if stopping?() do + close_channel(state.channel) + state + else + {:ok, handler_state} = handler_module.handle_message(message, meta, handler_state) + Basic.ack(state.channel, tag) + %{state | handler_state: handler_state} + end rescue e in _ -> Logger.error(Exception.format(:error, e, __STACKTRACE__)) @@ -242,4 +254,21 @@ defmodule Amqpx.Gen.Consumer do state end + + @spec draining? :: boolean() + def draining?, do: @signal_handler.draining? + + def stopping?, do: @signal_handler.stopping? + + defp cancel?(%{cancel?: true} = state, _), do: state + + defp cancel?(state, consumer_tag) do + Logger.info("Cancelling consumer #{consumer_tag}") + + if draining?() do + Basic.cancel(state.channel, consumer_tag) + end + + %{state | cancel?: true} + end end diff --git a/lib/amqp/no_signal_handler.ex b/lib/amqp/no_signal_handler.ex new file mode 100644 index 0000000..ba27f1f --- /dev/null +++ b/lib/amqp/no_signal_handler.ex @@ -0,0 +1,13 @@ +defmodule Amqpx.NoSignalHandler do + @moduledoc """ + Dummy signal handler module that does handle the graceful termination. + """ + + @behaviour Amqpx.SignalHandler + + @impl true + def draining?, do: false + + @impl true + def stopping?, do: false +end diff --git a/lib/amqp/signal_handler.ex b/lib/amqp/signal_handler.ex new file mode 100644 index 0000000..2eb4ff2 --- /dev/null +++ b/lib/amqp/signal_handler.ex @@ -0,0 +1,20 @@ +defmodule Amqpx.SignalHandler do + @moduledoc """ + Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application. + In the context of Rabbitmq, it will: + cancel the channel when we are in draining mode to stop prefetch new messages. + close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume. + + Check in Peano how to use it. + + """ + @doc """ + Check if the application is in draining mode. + """ + @callback draining? :: boolean + + @doc """ + Check if the application is in stopping mode. + """ + @callback stopping? :: boolean +end