Skip to content

Commit

Permalink
Graceful shutdown (#22)
Browse files Browse the repository at this point in the history
* Start ConnectionServer before ConsumerSupervisor
* Keep channels in ConnectionServer state, pass a reference to the consumers
* Close channels before closing connection when terminating ConnectionServer
* Add logs, Mermaid diagram, tests and documentation
  • Loading branch information
ignaciogoldchluk-yolo authored Aug 9, 2024
1 parent c8ccc00 commit 953760e
Show file tree
Hide file tree
Showing 17 changed files with 347 additions and 88 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ jobs:
ports:
- "5672:5672"


runs-on: ubuntu-latest
name: Test on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}}
strategy:
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,17 @@ Bug reports and pull requests are welcome on GitHub at https://github.com/coinga
1. Start the RabbitMQ instance via `docker compose up`.
2. Run `mix test`.

## Architecture
```mermaid
graph TD;
A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor];
A --> C[ConnectionServer - GenServer];
B -- supervises many --> D[ConsumerServer - GenServer];
D -- monitors --> E[ConsumerExecutor];
E -- sends messages to --> C;
D -- opens AMQP conns via --> C;
```

## License

The library is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).
5 changes: 4 additions & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
services:
rabbitmq:
image: "rabbitmq:alpine"
# "management" version is not required, but it makes
# manual testing easier
image: "rabbitmq:3.12-management-alpine"
ports:
- "5672:5672"
- "15672:15672"
17 changes: 16 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,22 @@ config :coney,
url: "amqp://guest:guest@localhost",
timeout: 1000
},
workers: []
workers: [
FakeConsumer
],
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
queues: %{
"queue" => %{
options: [
durable: false
],
bindings: [
[exchange: "exchange", options: [routing_key: "queue"]]
]
}
}
}

config :logger, level: :info

Expand Down
17 changes: 16 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import Config

config :coney,
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
queues: %{
"queue" => %{
options: [
durable: false
],
bindings: [
[exchange: "exchange", options: [routing_key: "queue"]]
]
}
}
},
adapter: Coney.RabbitConnection,
pool_size: 1,
auto_start: true,
settings: %{
url: "amqp://guest:guest@localhost:5672",
timeout: 1000
},
workers: []
workers: [
FakeConsumer
]
11 changes: 9 additions & 2 deletions lib/coney/application_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
defmodule Coney.ApplicationSupervisor do
@moduledoc """
Supervisor responsible of `ConnectionServer` and `ConsumerSupervisor`.
Main entry point of the application.
"""

use Supervisor

alias Coney.{ConsumerSupervisor, ConnectionServer}
Expand All @@ -14,12 +20,13 @@ defmodule Coney.ApplicationSupervisor do
}
end

@impl Supervisor
def init([consumers]) do
settings = settings()

children = [
ConsumerSupervisor,
{ConnectionServer, [consumers, settings]}
{ConnectionServer, [settings]},
{ConsumerSupervisor, [consumers]}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
128 changes: 98 additions & 30 deletions lib/coney/connection_server.ex
Original file line number Diff line number Diff line change
@@ -1,67 +1,119 @@
defmodule Coney.ConnectionServer do
@moduledoc """
Handles connections between `ConsumerServer` and the RabbitMQ instance(s).
This module abstracts away the connection status of RabbitMQ. Instead, when
a new `ConsumerServer` is started, it requests `ConnectionServer` to open a channel.
ConnectionServer opens a real amqp channel, keeps a reference to it in its state and
returns an erlang reference to `ConsumerServer`. When `ConsumerServer` replies (ack/reject)
an incoming RabbitMQ message it sends the erlang reference to ConnectionServer and then
ConnectionServer looks up the real channel.
ConnectionServer can handle RabbitMQ disconnects independently of ConsumerServer.
When connection is lost and then regained, ConnectionServer simply updates its
map of {erlang_ref, AMQP.Connection}, ConsumerServer keeps using the same erlang_ref.
"""
use GenServer

require Logger

alias Coney.{
ConsumerSupervisor,
HealthCheck.ConnectionRegistry
}
alias Coney.HealthCheck.ConnectionRegistry

defmodule State do
defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology]
defstruct [:adapter, :settings, :amqp_conn, :topology, :channels]
end

def start_link([consumers, [adapter: adapter, settings: settings, topology: topology]]) do
GenServer.start_link(__MODULE__, [consumers, adapter, settings, topology], name: __MODULE__)
def start_link([[adapter: adapter, settings: settings, topology: topology]]) do
GenServer.start_link(__MODULE__, [adapter, settings, topology], name: __MODULE__)
end

def init([consumers, adapter, settings, topology]) do
send(self(), :after_init)

@impl GenServer
def init([adapter, settings, topology]) do
ConnectionRegistry.associate(self())

{:ok, %State{consumers: consumers, adapter: adapter, settings: settings, topology: topology}}
{:ok, %State{adapter: adapter, settings: settings, topology: topology, channels: Map.new()},
{:continue, nil}}
end

@impl true
def handle_continue(_continue_arg, state) do
{:noreply, rabbitmq_connect(state)}
end

def confirm(channel, tag) do
GenServer.call(__MODULE__, {:confirm, channel, tag})
@spec confirm(reference(), any()) :: :confirmed
def confirm(channel_ref, tag) do
GenServer.call(__MODULE__, {:confirm, channel_ref, tag})
end

def reject(channel, tag, requeue) do
GenServer.call(__MODULE__, {:reject, channel, tag, requeue})
@spec reject(reference(), any(), boolean()) :: :rejected
def reject(channel_ref, tag, requeue) do
GenServer.call(__MODULE__, {:reject, channel_ref, tag, requeue})
end

@spec publish(String.t(), any()) :: :published
def publish(exchange_name, message) do
GenServer.call(__MODULE__, {:publish, exchange_name, message})
end

@spec publish(String.t(), String.t(), any()) :: :published
def publish(exchange_name, routing_key, message) do
GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message})
end

def handle_info(:after_init, state) do
rabbitmq_connect(state)
@spec subscribe(any()) :: reference()
def subscribe(consumer) do
GenServer.call(__MODULE__, {:subscribe, consumer})
end

@impl GenServer
def handle_info({:DOWN, _, :process, _pid, reason}, state) do
ConnectionRegistry.disconnected(self())
Logger.error("#{__MODULE__} (#{inspect(self())}) connection lost: #{inspect(reason)}")
rabbitmq_connect(state)
{:noreply, state |> rabbitmq_connect() |> update_channels()}
end

def terminate(_reason, %State{amqp_conn: conn, adapter: adapter} = _state) do
@impl GenServer
def terminate(_reason, %State{amqp_conn: conn, adapter: adapter, channels: channels} = _state) do
Logger.info("[Coney] - Terminating #{inspect(conn)}")
close_channels(channels, adapter)
:ok = adapter.close(conn)
ConnectionRegistry.terminated(self())
end

def handle_call({:confirm, channel, tag}, _from, %State{adapter: adapter} = state) do
@impl GenServer
def handle_call(
{:confirm, channel_ref, tag},
_from,
%State{adapter: adapter, channels: channels} = state
) do
channel = channel_from_ref(channels, channel_ref)
adapter.confirm(channel, tag)

{:reply, :confirmed, state}
end

def handle_call({:reject, channel, tag, requeue}, _from, %State{adapter: adapter} = state) do
def handle_call(
{:subscribe, consumer},
{consumer_pid, _tag},
%State{amqp_conn: conn, adapter: adapter, channels: channels} = state
) do
channel = adapter.create_channel(conn)
channel_ref = :erlang.make_ref()

adapter.subscribe(channel, consumer_pid, consumer)

new_channels = Map.put(channels, channel_ref, {consumer_pid, consumer, channel})

Logger.debug("#{inspect(consumer)} (#{inspect(consumer_pid)}) started")
{:reply, channel_ref, %State{state | channels: new_channels}}
end

def handle_call(
{:reject, channel_ref, tag, requeue},
_from,
%State{adapter: adapter, channels: channels} = state
) do
channel = channel_from_ref(channels, channel_ref)
adapter.reject(channel, tag, requeue: requeue)

{:reply, :rejected, state}
Expand All @@ -85,29 +137,45 @@ defmodule Coney.ConnectionServer do

defp rabbitmq_connect(
%State{
consumers: consumers,
adapter: adapter,
settings: settings,
topology: topology
} = state
) do
conn = adapter.open(settings)
Process.monitor(conn.pid)
adapter.init_topology(conn, topology)
start_consumers(consumers, adapter, conn)

ConnectionRegistry.connected(self())

{:noreply, %State{state | amqp_conn: conn}}
%State{state | amqp_conn: conn}
end

defp start_consumers(consumers, adapter, conn) do
Enum.each(consumers, fn consumer ->
subscribe_chan = adapter.create_channel(conn)
defp channel_from_ref(channels, channel_ref) do
{_consumer_pid, _consumer, channel} = Map.fetch!(channels, channel_ref)

channel
end

{:ok, pid} = ConsumerSupervisor.start_consumer(consumer, subscribe_chan)
adapter.subscribe(subscribe_chan, pid, consumer)
defp update_channels(%State{amqp_conn: conn, adapter: adapter, channels: channels} = state) do
new_channels =
Map.new(channels, fn {channel_ref, {consumer_pid, consumer, _dead_channel}} ->
new_channel = adapter.create_channel(conn)
adapter.subscribe(new_channel, consumer_pid, consumer)

Logger.debug("#{inspect(consumer)} (#{inspect(pid)}) started")
{channel_ref, {consumer_pid, consumer, new_channel}}
end)

Logger.info("[Coney] - Connection re-restablished for #{inspect(conn)}")

%State{state | channels: new_channels}
end

defp close_channels(channels, adapter) do
Enum.each(channels, fn {_channel_ref, {_consumer_pid, _consumer, channel}} ->
adapter.close_channel(channel)
end)

Logger.info("[Coney] - Closed #{map_size(channels)} channels")
end
end
4 changes: 4 additions & 0 deletions lib/coney/consumer_executor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule Coney.ConsumerExecutor do
@moduledoc """
Module responsible for processing a rabbit message and send the response
back to `ConnectionServer`. Started (and monitored) by `ConsumerServer`.
"""
require Logger

alias Coney.{ConnectionServer, ExecutionTask}
Expand Down
17 changes: 14 additions & 3 deletions lib/coney/consumer_server.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
defmodule Coney.ConsumerServer do
@moduledoc """
GenServer for handling RabbitMQ messages. Spawns and monitors one task per message
and forwards the response to `ConnectionServer`.
"""

use GenServer

alias Coney.{ConnectionServer, ConsumerExecutor, ExecutionTask}

require Logger

def start_link([consumer, chan]) do
GenServer.start_link(__MODULE__, [consumer, chan])
def start_link([consumer]) do
GenServer.start_link(__MODULE__, [consumer])
end

def init([consumer, chan]) do
@impl GenServer
def init([consumer]) do
chan = ConnectionServer.subscribe(consumer)

Logger.info("[Coney] - Started consumer #{inspect(consumer)}")

{:ok, %{consumer: consumer, chan: chan, tasks: %{}}}
end

@impl GenServer
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do
{:noreply, state}
end
Expand Down
21 changes: 12 additions & 9 deletions lib/coney/consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
defmodule Coney.ConsumerSupervisor do
use DynamicSupervisor
@moduledoc """
Supervisor for all ConsumerServer of the application.
"""
use Supervisor

def start_link(_args) do
DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
alias Coney.ConsumerServer

def init([]) do
DynamicSupervisor.init(strategy: :one_for_one)
def start_link([consumers]) do
Supervisor.start_link(__MODULE__, [consumers], name: __MODULE__)
end

def start_consumer(consumer, chan) do
spec = {Coney.ConsumerServer, [consumer, chan]}
DynamicSupervisor.start_child(__MODULE__, spec)
@impl Supervisor
def init([consumers]) do
children = Enum.map(consumers, fn consumer -> {ConsumerServer, [consumer]} end)

Supervisor.init(children, strategy: :one_for_one)
end
end
Loading

0 comments on commit 953760e

Please sign in to comment.