Consumer server for RabbitMQ with message publishing functionality.
- Coney
Add Coney as a dependency in your mix.exs
file.
def deps do
[{:coney, "~> 3.0"}]
end
After you are done, run mix deps.get
in your shell to fetch and compile Coney.
Default config:
# config/config.exs
config :coney,
auto_start: true,
settings: %{
url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
timeout: 1000
}
If you need to create exchanges or queues before starting the consumer, you can define your RabbitMQ topology as follows:
config :coney,
topology: %{
exchanges: [{:topic, "my_exchange", durable: true}],
queues: %{
"my_queue" => %{
options: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, "dlx_exchange"},
{"x-message-ttl", :signedint, 60000}
]
],
bindings: [
[exchange: "my_exchange", options: [routing_key: "my_queue"]]
]
}
}
}
Also, you can create a confuguration module (if you want to retreive settings from Consul or something else):
# config/config.exs
config :coney,
auto_start: true,
settings: RabbitConfig,
topology: RabbitConfig
defmodule RabbitConfig do
def settings do
%{
url: "amqp://guest:guest@localhost",
timeout: 1000
}
end
def topology do
%{
exchanges: [{:topic, "my_exchange", durable: true}],
queues: %{
"my_queue" => %{
options: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, "exchange"},
{"x-message-ttl", :signedint, 60000}
]
],
bindings: [
[exchange: "my_exchange", options: [routing_key: "my_queue"]]
]
}
}
}
end
end
If you don't want to automatically start Coney and want to control it's start, you can set auto_start
to false
and add Coney supervisor into yours:
# config/config.exs
config :coney, auto_start: false
defmodule YourApplication do
use Application
def start(_type, _args) do
Supervisor.start_link([Coney.ApplicationSupervisor], [strategy: :one_for_one])
end
end
If you want to disable Coney altogether (useful for testing config) set enabled: false
# config/config.exs
config :coney, enabled: false, settings: %{}, topology: %{}
# config/queues.exs
config :coney,
workers: [
MyApplication.MyConsumer
]
# also you can define mapping like this and skip it in consumer module:
workers: [
%{
connection: %{
prefetch_count: 10,
queue: "my_queue"
},
worker: MyApplication.MyConsumer
}
]
# web/consumers/my_consumer.ex
defmodule MyApplication.MyConsumer do
@behaviour Coney.Consumer
def connection do
%{
prefetch_count: 10,
queue: "my_queue",
consumer_tag: "MyApp - MyConsumer" # optional
}
end
def parse(payload, _meta) do
String.to_integer(payload)
end
def process(number, _meta) do
if number <= 10 do
:ok
else
:reject
end
end
# Be careful here, if call of `error_happened` will raise an exception,
# message will be not handled properly and may be left unacked in a queue
def error_happened(exception, payload, _meta) do
IO.puts "Exception raised with #{ payload }"
:redeliver
end
end
If exception was happened during calls of parse
or process
functions, by default Coney will reject this message. If you want to add additional functionality in order to handle exception in a special manner, you can implement one of error_happened/3
or error_happened/4
callbacks. But be careful, if call of error_happened
will raise an exception, message will be not handled properly and may be left unacked in a queue.
This callback receives exception
, original payload
and meta
as parameters. Response format is the same as in process callback.
This callback receives exception
, stacktrace
, original payload
and meta
as parameters. Response format is the same as in process callback.
:ok
- ack message.:reject
- reject message.:redeliver
- return message to the queue.{:reply, binary}
- response will be published to reply exchange.
To use {:reply, binary}
you should add response exchange in connection
:
# web/consumers/my_consumer.ex
def connection do
%{
# ...
respond_to: "response_exchange"
}
end
Response will be published to "response_exchange"
exchange.
To use the default exchange you may declare exchange as :default
:
%{
exchanges: [:default],
}
The following format is also acceptable:
%{
exchanges: [{:direct, ""}]
}
Or you can just skip it in the exchanges
settings and setup the queue in the consumer's settings:
%{
prefetch_count: 10,
queue: "my_queue"
}
Coney.publish("exchange", "message")
# or
Coney.publish("exchange", "routing_key", "message")
Coney.publish_async("exchange", "message")
# or
Coney.publish_async("exchange", "routing_key", "message")
You can useConey.status/0
if you need to get information about RabbitMQ connections:
iex> Coney.status()
[{#PID<0.972.0>, :connected}]
Result is a list of tuples, where first element in tuple is a pid of running connection server and second element describes connection status.
Connection status can be:
:pending
- when coney just started:connected
- when RabbitMQ connection has been established and all consumers have been started:disconnected
- when coney lost connection to RabbitMQ
Bug reports and pull requests are welcome on GitHub at https://github.com/coingaming/coney.
- Start the RabbitMQ instance via
docker compose up
. - Run
mix test
.
graph TD;
A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor];
A --> C[ConnectionServer - GenServer];
B -- supervises many --> D[ConsumerServer - GenServer];
D -- monitors --> E[ConsumerExecutor];
E -- sends messages to --> C;
D -- opens AMQP conns via --> C;
The library is available as open source under the terms of the MIT License.