From a7710cab7ed67967e72f346d5688630b0a96d08d Mon Sep 17 00:00:00 2001 From: yolo-vikram Date: Fri, 3 Jan 2025 14:22:57 +0200 Subject: [PATCH] chore: Publish message asynchronously --- lib/coney/coney.ex | 10 ++++++++++ lib/coney/connection_server.ex | 23 +++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/lib/coney/coney.ex b/lib/coney/coney.ex index 2c632ff..c9e5e85 100644 --- a/lib/coney/coney.ex +++ b/lib/coney/coney.ex @@ -11,6 +11,16 @@ defmodule Coney do ConnectionServer.publish(exchange_name, routing_key, message) end + @spec publish_async(String.t(), binary()) :: :ok + def publish_async(exchange_name, message) do + ConnectionServer.publish_async(exchange_name, message) + end + + @spec publish_async(String.t(), String.t(), binary()) :: :ok + def publish_async(exchange_name, routing_key, message) do + ConnectionServer.publish_async(exchange_name, routing_key, message) + end + @spec status() :: list({pid(), :pending | :connected | :disconnected}) def status do ConnectionRegistry.status() diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 32188c6..e5bb4ca 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -60,6 +60,16 @@ defmodule Coney.ConnectionServer do GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message}) end + @spec publish_async(String.t(), any()) :: :ok + def publish_async(exchange_name, message) do + GenServer.cast(__MODULE__, {:publish, exchange_name, message}) + end + + @spec publish_async(String.t(), String.t(), any()) :: :ok + def publish_async(exchange_name, routing_key, message) do + GenServer.cast(__MODULE__, {:publish, exchange_name, routing_key, message}) + end + @spec subscribe(any()) :: reference() def subscribe(consumer) do GenServer.call(__MODULE__, {:subscribe, consumer}) @@ -135,6 +145,19 @@ defmodule Coney.ConnectionServer do {:reply, :published, state} end + @impl GenServer + def handle_cast({:publish, exchange_name, message}, %State{} = state) do + state.adapter.publish(state.amqp_conn, exchange_name, "", message) + + {:noreply, state} + end + + def handle_cast({:publish, exchange_name, routing_key, message}, %State{} = state) do + state.adapter.publish(state.amqp_conn, exchange_name, routing_key, message) + + {:noreply, state} + end + defp rabbitmq_connect( %State{ adapter: adapter,