Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COART-181]: Check Gracefully termination in all our applications #208

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions lib/amqp/gen/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Amqpx.Gen.Consumer do
prefetch_count: 50,
backoff: 5_000,
connection_name: Amqpx.Gen.ConnectionManager,
requeue_on_reject: true
requeue_on_reject: true,
cancelled?: false
]

@type state() :: %__MODULE__{}
Expand Down Expand Up @@ -188,6 +189,8 @@ defmodule Amqpx.Gen.Consumer do
def terminate(_, %__MODULE__{channel: nil}), do: nil

def terminate(reason, %__MODULE__{channel: channel}) do
Logger.info("Terminating consumer with reason #{inspect(reason)}")

case reason do
:normal -> close_channel(channel)
:shutdown -> close_channel(channel)
Expand All @@ -211,17 +214,23 @@ defmodule Amqpx.Gen.Consumer do

defp handle_message(
message,
%{delivery_tag: tag, redelivered: redelivered} = meta,
%{delivery_tag: tag, redelivered: redelivered, consumer_tag: consumer_tag} = meta,
%__MODULE__{
handler_module: handler_module,
handler_state: handler_state,
backoff: backoff,
requeue_on_reject: requeue_on_reject
} = state
) do
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Basic.ack(state.channel, tag)
%{state | handler_state: handler_state}
case handle_signals(state, consumer_tag) do
{:ok, state} ->
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Basic.ack(state.channel, tag)
%{state | handler_state: handler_state}

{:stop, state} ->
state
end
rescue
e in _ ->
Logger.error(Exception.format(:error, e, __STACKTRACE__))
Expand All @@ -242,4 +251,40 @@ defmodule Amqpx.Gen.Consumer do

state
end

@type signal_status :: :stopping | :draining | :running

@spec get_signal_status :: signal_status()
defp get_signal_status do
cond do
signal_handler().stopping?() -> :stopping
signal_handler().draining?() -> :draining
true -> :running
end
end

@spec handle_signals(signal_status(), state(), String.t()) :: {:ok | :stop, state()}
defp handle_signals(signal_status \\ get_signal_status(), state, consumer_tag)

# Close channel when we we need to stop.
defp handle_signals(:stopping, state, _) do
close_channel(state.channel)
{:stop, state}
end

# Continue processing prefetched messages while draining
defp handle_signals(:draining, %{cancelled?: true} = state, _), do: {:ok, state}

# Stop consuming new messages and move to cancelled state
# to continue processing prefetched messages
defp handle_signals(:draining, state, consumer_tag) do
Logger.info("Cancelling consumer #{consumer_tag}")
Basic.cancel(state.channel, consumer_tag)
{:ok, %{state | cancelled?: true}}
end

# No signals received run as normal
defp handle_signals(:running, state, _), do: {:ok, state}

defp signal_handler, do: Application.get_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler)
end
16 changes: 16 additions & 0 deletions lib/amqp/no_signal_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Amqpx.NoSignalHandler do
@moduledoc """
Dummy signal handler module that does not handle the graceful termination.
It always returns `false` for `draining?/0` and `stopping?/0`.
I.e. the comsumer will continue without handling signals.
"""

@behaviour Amqpx.SignalHandler

@impl true
def draining?, do: false

@impl true
def stopping?, do: false
end
20 changes: 20 additions & 0 deletions lib/amqp/signal_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Amqpx.SignalHandler do
@moduledoc """
Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application.
In the context of Rabbitmq, it will:
cancel the channel when we are in draining mode to stop prefetch new messages.
close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume.
Check in Peano how to use it.
"""
@doc """
Check if the application is in draining mode.
"""
@callback draining? :: boolean

@doc """
Check if the application is in stopping mode.
"""
@callback stopping? :: boolean
end
35 changes: 35 additions & 0 deletions test/gen_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,41 @@ defmodule Amqpx.Test.AmqpxTest do
end
end

test "should handle stop signal by not consuming any more messages" do
start_connection1!()
start_consumer_by_name!(Consumer1)
start_producer!(:producer)

payload = %{test: 1}

with_mocks [
{Amqpx.NoSignalHandler, [], stopping?: fn -> true end},
{Consumer1, [], []}
] do
Producer1.send_payload(payload)
:timer.sleep(50)
refute called(Consumer1.handle_message(Jason.encode!(payload), :_, :_))
end
end

test "should continue to handle messages while draining but not while stopping" do
start_connection1!()
start_consumer_by_name!(Consumer1)
start_producer!(:producer)

payload = %{test: 1}

with_mocks [
{Amqpx.NoSignalHandler, [], stopping?: [in_series([], [false, true])], draining?: fn -> true end},
{Consumer1, [], [handle_message: fn _, _, state -> {:ok, state} end]}
] do
Producer1.send_payload(payload)
Producer1.send_payload(payload)
:timer.sleep(50)
assert_called_exactly(Consumer1.handle_message(Jason.encode!(payload), :_, :_), 1)
end
end

test "the consumer should stop gracefully" do
start_connection1!()

Expand Down