Skip to content

Commit

Permalink
Add Graceful termination
Browse files Browse the repository at this point in the history
  • Loading branch information
frnmjn committed Nov 27, 2024
1 parent 20a9b3b commit aa10655
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 5 deletions.
38 changes: 33 additions & 5 deletions lib/amqp/gen/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Amqpx.Gen.Consumer do
prefetch_count: 50,
backoff: 5_000,
connection_name: Amqpx.Gen.ConnectionManager,
requeue_on_reject: true
requeue_on_reject: true,
cancel?: false
]

@type state() :: %__MODULE__{}
Expand All @@ -26,6 +27,8 @@ defmodule Amqpx.Gen.Consumer do

@gen_server_opts [:name, :timeout, :debug, :spawn_opt, :hibernate_after]

@signal_handler Application.compile_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler)

@spec start_link(opts :: map()) :: GenServer.server()
def start_link(opts) do
{gen_server_opts, opts} = Map.split(opts, @gen_server_opts)
Expand Down Expand Up @@ -188,6 +191,7 @@ defmodule Amqpx.Gen.Consumer do
def terminate(_, %__MODULE__{channel: nil}), do: nil

def terminate(reason, %__MODULE__{channel: channel}) do
Logger.warn("Terminating consumer with reason #{inspect(reason)}")
case reason do
:normal -> close_channel(channel)
:shutdown -> close_channel(channel)
Expand All @@ -211,17 +215,24 @@ defmodule Amqpx.Gen.Consumer do

defp handle_message(
message,
%{delivery_tag: tag, redelivered: redelivered} = meta,
%{delivery_tag: tag, redelivered: redelivered, consumer_tag: consumer_tag} = meta,
%__MODULE__{
handler_module: handler_module,
handler_state: handler_state,
backoff: backoff,
requeue_on_reject: requeue_on_reject
} = state
) do
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Basic.ack(state.channel, tag)
%{state | handler_state: handler_state}
state = cancel?(state, consumer_tag)

if stopping?() do
close_channel(state.channel)
state
else
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Basic.ack(state.channel, tag)
%{state | handler_state: handler_state}
end
rescue
e in _ ->
Logger.error(Exception.format(:error, e, __STACKTRACE__))
Expand All @@ -242,4 +253,21 @@ defmodule Amqpx.Gen.Consumer do

state
end

@spec draining?() :: boolean()
def draining?(), do: apply(@signal_handler, :draining?, [])

def stopping?(), do: apply(@signal_handler, :stopping?, [])

defp cancel?(%{cancel?: true} = state, _), do: state

defp cancel?(state, consumer_tag) do
Logger.info("Cancelling consumer #{consumer_tag}")

if draining?() do
Basic.cancel(state.channel, consumer_tag)
end

%{state | cancel?: true}
end
end
14 changes: 14 additions & 0 deletions lib/amqp/no_signal_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Amqpx.NoSignalHandler do
@moduledoc """
Dummy signal handler module that does handle the graceful termination.
"""

@behaviour Amqpx.SignalHandler

@impl true
def draining?, do: false

@impl true
def stopping?, do: false

end
20 changes: 20 additions & 0 deletions lib/amqp/signal_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Amqpx.SignalHandler do
@moduledoc """
Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application.
In the context of Rabbitmq, it will:
cancel the channel when we are in draining mode to stop prefetch new messages.
close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume.
Check in Peano how to use it.
"""
@doc """
Check if the application is in draining mode.
"""
@callback draining? :: boolean

@doc """
Check if the application is in stopping mode.
"""
@callback stopping? :: boolean
end

0 comments on commit aa10655

Please sign in to comment.