diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c01a49c..1bbbefe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,7 +21,6 @@ jobs: ports: - "5672:5672" - runs-on: ubuntu-latest name: Test on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}} strategy: diff --git a/README.md b/README.md index df0d5aa..1261144 100644 --- a/README.md +++ b/README.md @@ -288,6 +288,17 @@ Bug reports and pull requests are welcome on GitHub at https://github.com/coinga 1. Start the RabbitMQ instance via `docker compose up`. 2. Run `mix test`. +## Architecture +```mermaid + graph TD; + A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor]; + A --> C[ConnectionServer - GenServer]; + B -- supervises many --> D[ConsumerServer - GenServer]; + D -- monitors --> E[ConsumerExecutor]; + E -- sends messages to --> C; + D -- opens AMQP conns via --> C; +``` + ## License The library is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/compose.yaml b/compose.yaml index ce57695..121fd5a 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,5 +1,8 @@ services: rabbitmq: - image: "rabbitmq:alpine" + # "management" version is not required, but it makes + # manual testing easier + image: "rabbitmq:3.12-management-alpine" ports: - "5672:5672" + - "15672:15672" diff --git a/config/config.exs b/config/config.exs index e5ba2b1..83f7fa2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,7 +8,22 @@ config :coney, url: "amqp://guest:guest@localhost", timeout: 1000 }, - workers: [] + workers: [ + FakeConsumer + ], + topology: %{ + exchanges: [{:topic, "exchange", durable: false}], + queues: %{ + "queue" => %{ + options: [ + durable: false + ], + bindings: [ + [exchange: "exchange", options: [routing_key: "queue"]] + ] + } + } + } config :logger, level: :info diff --git a/config/test.exs b/config/test.exs index 761ae22..7dacb07 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,6 +1,19 @@ import Config config :coney, + topology: %{ + exchanges: [{:topic, "exchange", durable: false}], + queues: %{ + "queue" => %{ + options: [ + durable: false + ], + bindings: [ + [exchange: "exchange", options: [routing_key: "queue"]] + ] + } + } + }, adapter: Coney.RabbitConnection, pool_size: 1, auto_start: true, @@ -8,4 +21,6 @@ config :coney, url: "amqp://guest:guest@localhost:5672", timeout: 1000 }, - workers: [] + workers: [ + FakeConsumer + ] diff --git a/lib/coney/application_supervisor.ex b/lib/coney/application_supervisor.ex index b40000b..88894d3 100644 --- a/lib/coney/application_supervisor.ex +++ b/lib/coney/application_supervisor.ex @@ -1,4 +1,10 @@ defmodule Coney.ApplicationSupervisor do + @moduledoc """ + Supervisor responsible of `ConnectionServer` and `ConsumerSupervisor`. + + Main entry point of the application. + """ + use Supervisor alias Coney.{ConsumerSupervisor, ConnectionServer} @@ -14,12 +20,13 @@ defmodule Coney.ApplicationSupervisor do } end + @impl Supervisor def init([consumers]) do settings = settings() children = [ - ConsumerSupervisor, - {ConnectionServer, [consumers, settings]} + {ConnectionServer, [settings]}, + {ConsumerSupervisor, [consumers]} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 0a2d6dc..32188c6 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -1,67 +1,119 @@ defmodule Coney.ConnectionServer do + @moduledoc """ + Handles connections between `ConsumerServer` and the RabbitMQ instance(s). + + This module abstracts away the connection status of RabbitMQ. Instead, when + a new `ConsumerServer` is started, it requests `ConnectionServer` to open a channel. + ConnectionServer opens a real amqp channel, keeps a reference to it in its state and + returns an erlang reference to `ConsumerServer`. When `ConsumerServer` replies (ack/reject) + an incoming RabbitMQ message it sends the erlang reference to ConnectionServer and then + ConnectionServer looks up the real channel. + + ConnectionServer can handle RabbitMQ disconnects independently of ConsumerServer. + When connection is lost and then regained, ConnectionServer simply updates its + map of {erlang_ref, AMQP.Connection}, ConsumerServer keeps using the same erlang_ref. + """ use GenServer require Logger - alias Coney.{ - ConsumerSupervisor, - HealthCheck.ConnectionRegistry - } + alias Coney.HealthCheck.ConnectionRegistry defmodule State do - defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology] + defstruct [:adapter, :settings, :amqp_conn, :topology, :channels] end - def start_link([consumers, [adapter: adapter, settings: settings, topology: topology]]) do - GenServer.start_link(__MODULE__, [consumers, adapter, settings, topology], name: __MODULE__) + def start_link([[adapter: adapter, settings: settings, topology: topology]]) do + GenServer.start_link(__MODULE__, [adapter, settings, topology], name: __MODULE__) end - def init([consumers, adapter, settings, topology]) do - send(self(), :after_init) - + @impl GenServer + def init([adapter, settings, topology]) do ConnectionRegistry.associate(self()) - {:ok, %State{consumers: consumers, adapter: adapter, settings: settings, topology: topology}} + {:ok, %State{adapter: adapter, settings: settings, topology: topology, channels: Map.new()}, + {:continue, nil}} + end + + @impl true + def handle_continue(_continue_arg, state) do + {:noreply, rabbitmq_connect(state)} end - def confirm(channel, tag) do - GenServer.call(__MODULE__, {:confirm, channel, tag}) + @spec confirm(reference(), any()) :: :confirmed + def confirm(channel_ref, tag) do + GenServer.call(__MODULE__, {:confirm, channel_ref, tag}) end - def reject(channel, tag, requeue) do - GenServer.call(__MODULE__, {:reject, channel, tag, requeue}) + @spec reject(reference(), any(), boolean()) :: :rejected + def reject(channel_ref, tag, requeue) do + GenServer.call(__MODULE__, {:reject, channel_ref, tag, requeue}) end + @spec publish(String.t(), any()) :: :published def publish(exchange_name, message) do GenServer.call(__MODULE__, {:publish, exchange_name, message}) end + @spec publish(String.t(), String.t(), any()) :: :published def publish(exchange_name, routing_key, message) do GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message}) end - def handle_info(:after_init, state) do - rabbitmq_connect(state) + @spec subscribe(any()) :: reference() + def subscribe(consumer) do + GenServer.call(__MODULE__, {:subscribe, consumer}) end + @impl GenServer def handle_info({:DOWN, _, :process, _pid, reason}, state) do ConnectionRegistry.disconnected(self()) Logger.error("#{__MODULE__} (#{inspect(self())}) connection lost: #{inspect(reason)}") - rabbitmq_connect(state) + {:noreply, state |> rabbitmq_connect() |> update_channels()} end - def terminate(_reason, %State{amqp_conn: conn, adapter: adapter} = _state) do + @impl GenServer + def terminate(_reason, %State{amqp_conn: conn, adapter: adapter, channels: channels} = _state) do + Logger.info("[Coney] - Terminating #{inspect(conn)}") + close_channels(channels, adapter) :ok = adapter.close(conn) ConnectionRegistry.terminated(self()) end - def handle_call({:confirm, channel, tag}, _from, %State{adapter: adapter} = state) do + @impl GenServer + def handle_call( + {:confirm, channel_ref, tag}, + _from, + %State{adapter: adapter, channels: channels} = state + ) do + channel = channel_from_ref(channels, channel_ref) adapter.confirm(channel, tag) {:reply, :confirmed, state} end - def handle_call({:reject, channel, tag, requeue}, _from, %State{adapter: adapter} = state) do + def handle_call( + {:subscribe, consumer}, + {consumer_pid, _tag}, + %State{amqp_conn: conn, adapter: adapter, channels: channels} = state + ) do + channel = adapter.create_channel(conn) + channel_ref = :erlang.make_ref() + + adapter.subscribe(channel, consumer_pid, consumer) + + new_channels = Map.put(channels, channel_ref, {consumer_pid, consumer, channel}) + + Logger.debug("#{inspect(consumer)} (#{inspect(consumer_pid)}) started") + {:reply, channel_ref, %State{state | channels: new_channels}} + end + + def handle_call( + {:reject, channel_ref, tag, requeue}, + _from, + %State{adapter: adapter, channels: channels} = state + ) do + channel = channel_from_ref(channels, channel_ref) adapter.reject(channel, tag, requeue: requeue) {:reply, :rejected, state} @@ -85,29 +137,45 @@ defmodule Coney.ConnectionServer do defp rabbitmq_connect( %State{ - consumers: consumers, adapter: adapter, settings: settings, topology: topology } = state ) do conn = adapter.open(settings) + Process.monitor(conn.pid) adapter.init_topology(conn, topology) - start_consumers(consumers, adapter, conn) ConnectionRegistry.connected(self()) - {:noreply, %State{state | amqp_conn: conn}} + %State{state | amqp_conn: conn} end - defp start_consumers(consumers, adapter, conn) do - Enum.each(consumers, fn consumer -> - subscribe_chan = adapter.create_channel(conn) + defp channel_from_ref(channels, channel_ref) do + {_consumer_pid, _consumer, channel} = Map.fetch!(channels, channel_ref) + + channel + end - {:ok, pid} = ConsumerSupervisor.start_consumer(consumer, subscribe_chan) - adapter.subscribe(subscribe_chan, pid, consumer) + defp update_channels(%State{amqp_conn: conn, adapter: adapter, channels: channels} = state) do + new_channels = + Map.new(channels, fn {channel_ref, {consumer_pid, consumer, _dead_channel}} -> + new_channel = adapter.create_channel(conn) + adapter.subscribe(new_channel, consumer_pid, consumer) - Logger.debug("#{inspect(consumer)} (#{inspect(pid)}) started") + {channel_ref, {consumer_pid, consumer, new_channel}} + end) + + Logger.info("[Coney] - Connection re-restablished for #{inspect(conn)}") + + %State{state | channels: new_channels} + end + + defp close_channels(channels, adapter) do + Enum.each(channels, fn {_channel_ref, {_consumer_pid, _consumer, channel}} -> + adapter.close_channel(channel) end) + + Logger.info("[Coney] - Closed #{map_size(channels)} channels") end end diff --git a/lib/coney/consumer_executor.ex b/lib/coney/consumer_executor.ex index 2daba08..841cd16 100644 --- a/lib/coney/consumer_executor.ex +++ b/lib/coney/consumer_executor.ex @@ -1,4 +1,8 @@ defmodule Coney.ConsumerExecutor do + @moduledoc """ + Module responsible for processing a rabbit message and send the response + back to `ConnectionServer`. Started (and monitored) by `ConsumerServer`. + """ require Logger alias Coney.{ConnectionServer, ExecutionTask} diff --git a/lib/coney/consumer_server.ex b/lib/coney/consumer_server.ex index f64dd9b..4654731 100644 --- a/lib/coney/consumer_server.ex +++ b/lib/coney/consumer_server.ex @@ -1,18 +1,29 @@ defmodule Coney.ConsumerServer do + @moduledoc """ + GenServer for handling RabbitMQ messages. Spawns and monitors one task per message + and forwards the response to `ConnectionServer`. + """ + use GenServer alias Coney.{ConnectionServer, ConsumerExecutor, ExecutionTask} require Logger - def start_link([consumer, chan]) do - GenServer.start_link(__MODULE__, [consumer, chan]) + def start_link([consumer]) do + GenServer.start_link(__MODULE__, [consumer]) end - def init([consumer, chan]) do + @impl GenServer + def init([consumer]) do + chan = ConnectionServer.subscribe(consumer) + + Logger.info("[Coney] - Started consumer #{inspect(consumer)}") + {:ok, %{consumer: consumer, chan: chan, tasks: %{}}} end + @impl GenServer def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do {:noreply, state} end diff --git a/lib/coney/consumer_supervisor.ex b/lib/coney/consumer_supervisor.ex index a3ebeb7..5d99946 100644 --- a/lib/coney/consumer_supervisor.ex +++ b/lib/coney/consumer_supervisor.ex @@ -1,16 +1,19 @@ defmodule Coney.ConsumerSupervisor do - use DynamicSupervisor + @moduledoc """ + Supervisor for all ConsumerServer of the application. + """ + use Supervisor - def start_link(_args) do - DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) - end + alias Coney.ConsumerServer - def init([]) do - DynamicSupervisor.init(strategy: :one_for_one) + def start_link([consumers]) do + Supervisor.start_link(__MODULE__, [consumers], name: __MODULE__) end - def start_consumer(consumer, chan) do - spec = {Coney.ConsumerServer, [consumer, chan]} - DynamicSupervisor.start_child(__MODULE__, spec) + @impl Supervisor + def init([consumers]) do + children = Enum.map(consumers, fn consumer -> {ConsumerServer, [consumer]} end) + + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/lib/coney/fake_connection.ex b/lib/coney/fake_connection.ex deleted file mode 100644 index 10f5d22..0000000 --- a/lib/coney/fake_connection.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Coney.FakeConnection do - def open(_) do - :conn - end - - def create_channel(_) do - :chan - end - - def subscribe(_, _, _) do - {:ok, :subscribed} - end - - def respond_to(_, _) do - nil - end - - def publish(_, _, _, _) do - :published - end - - def confirm(_, _) do - :confirmed - end - - def reject(_, _, _) do - :rejected - end - - def init_topology(_, _) do - :ok - end -end diff --git a/lib/coney/rabbit_connection.ex b/lib/coney/rabbit_connection.ex index 24e7496..6090e26 100644 --- a/lib/coney/rabbit_connection.ex +++ b/lib/coney/rabbit_connection.ex @@ -7,8 +7,6 @@ defmodule Coney.RabbitConnection do case connect(url) do {:ok, conn} -> Logger.debug("#{__MODULE__} (#{inspect(self())}) connected to #{url}") - - Process.monitor(conn.pid) conn {:error, error} -> diff --git a/mix.exs b/mix.exs index 7256122..1e5c331 100644 --- a/mix.exs +++ b/mix.exs @@ -37,7 +37,7 @@ defmodule Coney.Mixfile do """ end - defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(env) when env in [:test, :dev], do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] @@ -47,7 +47,7 @@ defmodule Coney.Mixfile do files: ["lib", "mix.exs", "README.md", "LICENSE.txt"], maintainers: ["Aleksandr Fomin"], licenses: ["MIT"], - links: %{"GitHub" => "https://github.com/llxff/coney"} + links: %{"GitHub" => "https://github.com/coingaming/coney"} ] end end diff --git a/test/lib/coney/coney_test.exs b/test/lib/coney/coney_test.exs new file mode 100644 index 0000000..727078e --- /dev/null +++ b/test/lib/coney/coney_test.exs @@ -0,0 +1,16 @@ +defmodule Coney.ConeyTest do + use ExUnit.Case + use AMQP + + describe "publish/2" do + test "consumes a message" do + {:ok, connection} = AMQP.Connection.open(Application.get_env(:coney, :settings)[:url]) + + {:ok, channel} = AMQP.Channel.open(connection) + + assert :ok == AMQP.Basic.publish(channel, "exchange", "queue", "message", mandatory: true) + + refute 0 == AMQP.Queue.consumer_count(channel, "queue") + end + end +end diff --git a/test/lib/coney/connection_server_test.exs b/test/lib/coney/connection_server_test.exs new file mode 100644 index 0000000..d6e5b91 --- /dev/null +++ b/test/lib/coney/connection_server_test.exs @@ -0,0 +1,106 @@ +defmodule Coney.ConnectionServerTest do + use ExUnit.Case + + alias Coney.ConnectionServer + alias Coney.RabbitConnection + + setup do + settings = %{url: "amqp://guest:guest@localhost:5672", timeout: 1_000} + topology = Map.new() + %{init_args: %{adapter: RabbitConnection, settings: settings, topology: topology}} + end + + describe "init/1" do + test "starts with default settings", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) + + assert state.channels |> Map.equal?(Map.new()) + assert state.adapter == adapter + assert state.settings == settings + assert state.topology == topology + end + + test "registers itself in the connection registry", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + + assert {:ok, _state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) + + status = Coney.HealthCheck.ConnectionRegistry.status() |> Map.new() + + assert Map.get(status, self(), :connected) + end + end + + describe "handle_continue/2" do + test "sets the connection in the state", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) + + assert is_nil(state.amqp_conn) + + assert {:noreply, new_state} = ConnectionServer.handle_continue(nil, state) + + refute is_nil(new_state.amqp_conn) + end + end + + describe "handle_info/2" do + test "reconnects channels when receives a connection lost message", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + # Init + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) + + # Open connection + assert {:noreply, state} = ConnectionServer.handle_continue(nil, state) + + # Subscribe a channel + assert {:reply, channel_ref, connected_state} = + ConnectionServer.handle_call( + {:subscribe, FakeConsumer}, + {self(), :erlang.make_ref()}, + state + ) + + channel_info = Map.get(connected_state.channels, channel_ref) + + # Connection lost + down_msg = {:DOWN, :erlang.make_ref(), :process, self(), :connection_lost} + + assert {:noreply, reconnect_state} = ConnectionServer.handle_info(down_msg, connected_state) + + new_channel_info = Map.get(reconnect_state.channels, channel_ref) + + {_pid, consumer, old_channel} = channel_info + {_other_pid, ^consumer, new_channel} = new_channel_info + + refute old_channel == new_channel + end + end + + describe "handle_call/3" do + test "subscribes a consumer and returns a channel reference", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + # Init + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) + + # Open connection + assert {:noreply, state} = ConnectionServer.handle_continue(nil, state) + + # Subscribe a channel + assert {:reply, channel_ref, new_state} = + ConnectionServer.handle_call( + {:subscribe, FakeConsumer}, + {self(), :erlang.make_ref()}, + state + ) + + assert is_reference(channel_ref) + + pid = self() + + assert {^pid, FakeConsumer, _} = Map.get(new_state.channels, channel_ref) + end + end +end diff --git a/test/lib/coney/consumer/consumer_server_test.exs b/test/lib/coney/consumer/consumer_server_test.exs index 849e456..f7328e7 100644 --- a/test/lib/coney/consumer/consumer_server_test.exs +++ b/test/lib/coney/consumer/consumer_server_test.exs @@ -4,14 +4,19 @@ defmodule ConsumerServerTest do alias Coney.ConsumerServer setup do + ref = Coney.ConnectionServer.subscribe(FakeConsumer) + [ - args: [FakeConsumer, :channel], - state: %{consumer: FakeConsumer, chan: :channel, tasks: %{}} + args: [FakeConsumer], + state: %{consumer: FakeConsumer, tasks: %{}, chan: ref} ] end test "initial state", %{args: args, state: state} do - assert {:ok, ^state} = ConsumerServer.init(args) + assert {:ok, initial_state} = ConsumerServer.init(args) + assert initial_state.consumer == state.consumer + assert initial_state.tasks |> Map.equal?(Map.new()) + assert initial_state.chan |> is_reference() end test ":basic_consume_ok", %{state: state} do @@ -38,4 +43,34 @@ defmodule ConsumerServerTest do assert updated_state.consumer == state.consumer assert updated_state.chan == state.chan end + + describe "handle_info/2" do + setup do + %{state: %{consumer: FakeConsumer, tasks: Map.new(), chan: :erlang.make_ref()}} + end + + test "demonitors a task once it completes successfully", %{state: state} do + task_ref = :erlang.make_ref() + state = put_in(state, [:tasks, task_ref], 1) + + refute state[:tasks] |> Map.equal?(Map.new()) + + down_msg = {:DOWN, task_ref, :dont_care, :dont_care, :normal} + + assert {:noreply, new_state} = ConsumerServer.handle_info(down_msg, state) + assert new_state[:tasks] |> Map.equal?(Map.new()) + end + + test "demonitors a task and rejects message if it terminates abruptly", %{state: state} do + task_ref = :erlang.make_ref() + + state = put_in(state, [:tasks, task_ref], 1) + + down_msg = {:DOWN, task_ref, :dont_care, :dont_care, :error} + + assert {:noreply, new_state} = ConsumerServer.handle_info(down_msg, state) + + assert new_state[:tasks] |> Map.equal?(Map.new()) + end + end end diff --git a/test/support/fake_consumer.ex b/test/support/fake_consumer.ex index c910692..6e477d4 100644 --- a/test/support/fake_consumer.ex +++ b/test/support/fake_consumer.ex @@ -18,6 +18,7 @@ defmodule FakeConsumer do :reject -> :reject :reply -> {:reply, :data} :exception -> raise "Exception happen" + _other -> :ok end end