EMQ X lets users develop plugins that customize authentication and service functions through module registration and the hooks mechanism.
The official plugins provided by EMQ X include:
Plugin | Configuration file | Description |
---|---|---|
emqx_dashboard | etc/plugins/emqx_dashboard.conf | Web dashboard Plugin (Default) |
emqx_management | etc/plugins/emqx_management.conf | HTTP API and CLI Management Plugin |
emqx_psk_file | etc/plugins/emqx_psk_file.conf | PSK support |
emqx_web_hook | etc/plugins/emqx_web_hook.conf | Web Hook Plugin |
emqx_lua_hook | etc/plugins/emqx_lua_hook.conf | Lua Hook Plugin |
emqx_retainer | etc/plugins/emqx_retainer.conf | Retained message storage |
emqx_rule_engine | etc/plugins/emqx_rule_engine.conf | Rule engine |
emqx_bridge_mqtt | etc/plugins/emqx_bridge_mqtt.conf | MQTT Message Bridge Plugin |
emqx_delayed_publish | etc/plugins/emqx_delayed_publish.conf | Delayed publish support |
emqx_coap | etc/plugins/emqx_coap.conf | CoAP protocol support |
emqx_lwm2m | etc/plugins/emqx_lwm2m.conf | LwM2M protocol support |
emqx_sn | etc/plugins/emqx_sn.conf | MQTT-SN protocol support |
emqx_stomp | etc/plugins/emqx_stomp.conf | Stomp protocol support |
emqx_recon | etc/plugins/emqx_recon.conf | Recon performance debugging |
emqx_reloader | etc/plugins/emqx_reloader.conf | Hot load plugin |
emqx_plugin_template | etc/plugins/emqx_plugin_template.conf | Plugin development template |
There are four ways to load plugins:
- Default loading
- Start and stop plugin on command line
- Start and stop plugin on Dashboard
- Start and stop plugin by calling management API
Default loading
If a plugin needs to start with the broker, add it to data/loaded_plugins. For example, the plugins that are loaded by default are:
emqx_management.
emqx_rule_engine.
emqx_recon.
emqx_retainer.
emqx_dashboard.
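A minimal Python sketch of how a deployment script might read and extend this file. It assumes data/loaded_plugins holds one plugin name per line, each terminated by a period, as in the list above. The function names are hypothetical and not part of EMQ X.

```python
def parse_loaded_plugins(text):
    """Return plugin names from loaded_plugins-style text (one 'name.' per line)."""
    names = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("%"):  # skip blanks and Erlang-style comments
            continue
        names.append(line.rstrip("."))
    return names


def add_plugin(text, plugin):
    """Return new file content with `plugin` appended unless it is already listed."""
    names = parse_loaded_plugins(text)
    if plugin not in names:
        names.append(plugin)
    return "".join(name + ".\n" for name in names)


current = "emqx_management.\nemqx_recon.\n"
print(add_plugin(current, "emqx_retainer"))
```

The sketch only produces the new file content; writing it back, and restarting the broker so the change takes effect, is left to the caller.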
Start and stop plugin on command line
While EMQ X is running, the CLI can list the available plugins and load or unload them:
## Display a list of all available plugins
./bin/emqx_ctl plugins list
## Load a plugin
./bin/emqx_ctl plugins load emqx_auth_username
## Unload a plugin
./bin/emqx_ctl plugins unload emqx_auth_username
## Reload a plugin
./bin/emqx_ctl plugins reload emqx_auth_username
Start and stop plugin on Dashboard
If the Dashboard plugin is started (it is by default), plugins can also be managed on the dashboard. The management page is at http://localhost:18083/plugins.
emqx_dashboard is the web management console for the EMQ X broker, which is enabled by default. Once EMQ X has started successfully, users can access it at http://localhost:18083 with the default username/password: admin/public.
The basic information, statistics, and load status of the EMQ X broker, as well as the current client list (Connections), Sessions, Routing Table (Topics), and Subscriptions, can be queried through the dashboard.
In addition, the dashboard provides a set of REST APIs for front-end calls. See Dashboard -> HTTP API for details.
etc/plugins/emqx_dashboard.conf:
## Dashboard default username/password
dashboard.default_user.login = admin
dashboard.default_user.password = public
## Dashboard HTTP service Port Configuration
dashboard.listener.http = 18083
dashboard.listener.http.acceptors = 2
dashboard.listener.http.max_clients = 512
## Dashboard HTTPS service Port Configuration
## dashboard.listener.https = 18084
## dashboard.listener.https.acceptors = 2
## dashboard.listener.https.max_clients = 512
## dashboard.listener.https.handshake_timeout = 15s
## dashboard.listener.https.certfile = etc/certs/cert.pem
## dashboard.listener.https.keyfile = etc/certs/key.pem
## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
## dashboard.listener.https.verify = verify_peer
## dashboard.listener.https.fail_if_no_peer_cert = true
emqx_management is the HTTP API and CLI management plugin of the EMQ X broker, which is enabled by default. Once EMQ X has started successfully, users can query the current client list and perform other operations via the HTTP API and CLI provided by this plugin. For details, see the REST API and Commands documentation.
etc/plugins/emqx_management.conf:
## Max Row Limit
management.max_row_limit = 10000
## Default Application Secret
# management.application.default_secret = public
## Management HTTP Service Port Configuration
management.listener.http = 8080
management.listener.http.acceptors = 2
management.listener.http.max_clients = 512
management.listener.http.backlog = 512
management.listener.http.send_timeout = 15s
management.listener.http.send_timeout_close = on
## Management HTTPS Service Port Configuration
## management.listener.https = 8081
## management.listener.https.acceptors = 2
## management.listener.https.max_clients = 512
## management.listener.https.backlog = 512
## management.listener.https.send_timeout = 15s
## management.listener.https.send_timeout_close = on
## management.listener.https.certfile = etc/certs/cert.pem
## management.listener.https.keyfile = etc/certs/key.pem
## management.listener.https.cacertfile = etc/certs/cacert.pem
## management.listener.https.verify = verify_peer
## management.listener.https.fail_if_no_peer_cert = true
emqx_psk_file provides PSK support, which authenticates a connection through a pre-shared key (PSK) when the client establishes a TLS/DTLS connection.
etc/plugins/emqx_psk_file.conf:
psk.file.path = etc/psk.txt
emqx_web_hook can send all EMQ X events and messages to the specified HTTP server.
etc/plugins/emqx_web_hook.conf:
## Callback Web Server Address
web.hook.api.url = http://127.0.0.1:8080
## Encode message payload field
## Value: undefined | base64 | base62
## Default: undefined (Do not encode)
## web.hook.encode_payload = base64
## Message and event configuration
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
web.hook.rule.session.created.1 = {"action": "on_session_created"}
web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
web.hook.rule.message.deliver.1 = {"action": "on_message_deliver"}
web.hook.rule.message.acked.1 = {"action": "on_message_acked"}
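As an illustration of the receiving side, here is a minimal Python sketch of an HTTP server that could sit behind web.hook.api.url. It assumes that each event arrives as an HTTP POST with a JSON body containing an "action" field and, for messages, a "payload" field; these field names are assumptions to check against the plugin's README. The names decode_event, HookHandler, and serve are hypothetical.

```python
import base64
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def decode_event(body, payload_encoding=None):
    """Parse a web hook request body, decoding the payload if it was base64-encoded."""
    event = json.loads(body)
    if payload_encoding == "base64" and "payload" in event:
        event["payload"] = base64.b64decode(event["payload"])
    return event


class HookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read exactly the number of bytes announced by the client.
        length = int(self.headers.get("Content-Length", 0))
        event = decode_event(self.rfile.read(length))
        print("event:", event.get("action"), event)
        self.send_response(200)
        self.end_headers()


def serve(host="127.0.0.1", port=8080):
    """Listen on the address configured in web.hook.api.url (blocks forever)."""
    HTTPServer((host, port), HookHandler).serve_forever()
```

Calling serve() starts the listener. Note that the management plugin's HTTP listener shown earlier also uses port 8080, so on a single host one of the two would need a different port.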
emqx_lua_hook sends all events and messages to the specified Lua function. See its README for specific use.
emqx_retainer starts by default and provides retained message support for EMQ X. It stores the retained messages for all topics in the cluster's database and delivers the retained message when a client subscribes to the topic.
etc/plugins/emqx_retainer.conf:
## retained Message storage method
## - ram: memory only
## - disc: memory and disk
## - disc_only: disk only
retainer.storage_type = ram
## Maximum number of storage (0 means unrestricted)
retainer.max_retained_messages = 0
## Maximum storage size for single message
retainer.max_payload_size = 1MB
## Expiration time, 0 means never expired
## Unit: h hour; m minute; s second. For example, 60m means 60 minutes.
retainer.expiry_interval = 0
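To make the expiry units concrete, the following Python sketch converts a retainer.expiry_interval value into seconds. It is a hypothetical helper for tooling around the config file, not EMQ X's own parser. Accepting combined values such as 2h30m and rejecting a bare number other than 0 are assumptions of this sketch.

```python
import re

UNIT_SECONDS = {"h": 3600, "m": 60, "s": 1}


def expiry_to_seconds(value):
    """Convert a duration such as '60m' into seconds; '0' means never expire."""
    value = value.strip()
    if value == "0":
        return 0
    parts = re.findall(r"(\d+)([hms])", value)
    # Reject input that is not made up entirely of <number><unit> pairs.
    if not parts or "".join(number + unit for number, unit in parts) != value:
        raise ValueError(f"unsupported duration: {value!r}")
    return sum(int(number) * UNIT_SECONDS[unit] for number, unit in parts)


print(expiry_to_seconds("60m"))
```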
A bridge lets EMQ X forward messages on some of its topics to another MQTT broker.
Unlike a cluster, a bridge does not replicate topic trees and routing tables; it only forwards MQTT messages based on bridge rules.
Currently the bridge methods supported by EMQ X are as follows:
- RPC bridge: supports message forwarding only; it cannot subscribe to topics on remote nodes to synchronize data.
- MQTT bridge: supports both forwarding and data synchronization through topic subscriptions.
In EMQ X, a bridge is configured by modifying etc/plugins/emqx_bridge_mqtt.conf. EMQ X distinguishes between bridges by name. For example:
## Bridge address: node name for local bridge, host:port for remote.
bridge.mqtt.aws.address = 127.0.0.1:1883
This configuration declares a bridge named aws and specifies that it is bridged to the MQTT server at 127.0.0.1:1883 in MQTT mode.
To create multiple bridges, copy all configuration items of the first bridge, then change the bridge name and any other items as needed (such as bridge.mqtt.$name.address, where $name is the name of the bridge).
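The copy-and-rename step can be sketched in Python as follows. The helper clone_bridge, the bridge name azure, and the address 192.168.1.3:1883 are hypothetical and used for illustration only. The sketch assumes every item of a bridge starts with bridge.mqtt.$name. and drops comment lines from the copy.

```python
def clone_bridge(conf_text, old_name, new_name, overrides=None):
    """Copy every bridge.mqtt.<old_name>.* item to bridge.mqtt.<new_name>.*."""
    overrides = overrides or {}
    old_prefix = f"bridge.mqtt.{old_name}."
    lines = []
    for line in conf_text.splitlines():
        stripped = line.strip()
        if not stripped.startswith(old_prefix) or "=" not in stripped:
            continue  # comments and unrelated lines are not copied
        key, value = (part.strip() for part in stripped.split("=", 1))
        suffix = key[len(old_prefix):]
        value = overrides.get(suffix, value)  # replace selected items
        lines.append(f"bridge.mqtt.{new_name}.{suffix} = {value}")
    return "\n".join(lines)


aws = """
## Bridge address
bridge.mqtt.aws.address = 127.0.0.1:1883
bridge.mqtt.aws.proto_ver = mqttv4
"""
print(clone_bridge(aws, "aws", "azure", {"address": "192.168.1.3:1883"}))
```

The returned text can be appended to the same configuration file, so that both bridges are declared side by side under different names.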
etc/plugins/emqx_bridge_mqtt.conf:
## Bridge Address: Use node name (nodename@host) for rpc Bridge, and host:port for mqtt connection
bridge.mqtt.aws.address = [email protected]
## Forwarding topics of the message
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
## bridged mountpoint
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
## Bridge Address: Use node name for rpc Bridge, use host:port for mqtt connection
bridge.mqtt.aws.address = 192.168.1.2:1883
## Bridged Protocol Version
## Enumeration value: mqttv3 | mqttv4 | mqttv5
bridge.mqtt.aws.proto_ver = mqttv4
## mqtt client's clientid
bridge.mqtt.aws.clientid = bridge_emq
## mqtt client's clean_start field
## Note: Some MQTT Brokers need to set the clean_start value as `true`
bridge.mqtt.aws.clean_start = true
## mqtt client's username field
bridge.mqtt.aws.username = user
## mqtt client's password field
bridge.mqtt.aws.password = passwd
## Whether the mqtt client uses ssl to connect to a remote server or not
bridge.mqtt.aws.ssl = off
## CA Certificate of Client SSL Connection (PEM format)
bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem
## SSL certificate of Client SSL connection
bridge.mqtt.aws.certfile = etc/certs/client-cert.pem
## Key file of Client SSL connection
bridge.mqtt.aws.keyfile = etc/certs/client-key.pem
## SSL encryption
bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## TLS PSK ciphers
## Note 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot be configured at the same time
##
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Client's heartbeat interval
bridge.mqtt.aws.keepalive = 60s
## Supported TLS version
bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## Forwarding topics of the message
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
## Bridged mountpoint
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
## Subscription topic for Bridge
bridge.mqtt.aws.subscription.1.topic = cmd/topic1
## Subscription qos for Bridge
bridge.mqtt.aws.subscription.1.qos = 1
## Subscription topic for Bridge
bridge.mqtt.aws.subscription.2.topic = cmd/topic2
## Subscription qos for Bridge
bridge.mqtt.aws.subscription.2.qos = 1
## Bridge reconnection interval
## Default: 30s
bridge.mqtt.aws.reconnect_interval = 30s
## QoS1 message retransmission interval
bridge.mqtt.aws.retry_interval = 20s
## Inflight Size.
bridge.mqtt.aws.max_inflight_batches = 32
## emqx_bridge internal number of messages used for batch
bridge.mqtt.aws.queue.batch_count_limit = 32
## emqx_bridge internal number of message bytes used for batch
bridge.mqtt.aws.queue.batch_bytes_limit = 1000MB
## The path for placing replayq queue. If the item is not specified in the configuration, then replayq will run in `mem-only` mode and messages will not be cached on disk.
bridge.mqtt.aws.queue.replayq_dir = data/emqx_emqx2_bridge/
## Replayq data segment size
bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB
emqx_delayed_publish provides delayed message publishing. When a client publishes a message to EMQ X using the special topic prefix $delayed/<seconds>/, EMQ X publishes the message after <seconds> seconds.
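A small Python sketch of how a client application might build such a topic before publishing. The helper name delayed_topic and the example topic are hypothetical; the resulting string would be passed as the topic to whatever MQTT client library is in use.

```python
def delayed_topic(topic, seconds):
    """Prefix `topic` so that EMQ X delays publishing by `seconds` seconds."""
    if not isinstance(seconds, int) or seconds < 0:
        raise ValueError("seconds must be a non-negative integer")
    return f"$delayed/{seconds}/{topic}"


print(delayed_topic("sensor1/temperature", 15))
```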
emqx_coap provides support for the CoAP protocol (RFC 7252).
etc/plugins/emqx_coap.conf:
coap.port = 5683
coap.keepalive = 120s
coap.enable_stats = off
DTLS can be enabled if the following two configuration items are set:
## Listen port for DTLS
coap.dtls.port = 5684
coap.dtls.keyfile = {{ platform_etc_dir }}/certs/key.pem
coap.dtls.certfile = {{ platform_etc_dir }}/certs/cert.pem
## DTLS options
## coap.dtls.verify = verify_peer
## coap.dtls.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## coap.dtls.fail_if_no_peer_cert = false
A CoAP client is needed to test the CoAP plugin. The following example uses libcoap.
yum install libcoap
% coap client publish message
coap-client -m put -e "qos=0&retain=0&message=payload&topic=hello" coap://localhost/mqtt
emqx_lwm2m provides support for the LwM2M protocol.
etc/plugins/emqx_lwm2m.conf:
## LwM2M listening port
lwm2m.port = 5683
## Lifetime Limit
lwm2m.lifetime_min = 1s
lwm2m.lifetime_max = 86400s
## `time window` length under Q Mode, in seconds.
## Messages that exceed the window will be cached
#lwm2m.qmode_time_window = 22
## Whether LwM2M is deployed after coaproxy
#lwm2m.lb = coaproxy
## Actively observe all objects after the device goes online
#lwm2m.auto_observe = off
## the subscribed topic from EMQ X after client register succeeded
## Placeholder:
## '%e': Endpoint Name
## '%a': IP Address
lwm2m.topics.command = lwm2m/%e/dn/#
## client response message to EMQ X topic
lwm2m.topics.response = lwm2m/%e/up/resp
## client notify message to EMQ X topic
lwm2m.topics.notify = lwm2m/%e/up/notify
## client register message to EMQ X topic
lwm2m.topics.register = lwm2m/%e/up/resp
# client update message to EMQ X topic
lwm2m.topics.update = lwm2m/%e/up/resp
# xml file location defined by object
lwm2m.xml_dir = etc/lwm2m_xml
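To show what the topic placeholders resolve to, here is a minimal Python sketch that expands '%e' and '%a' in the templates above. The endpoint name dev001 and the IP address 10.0.0.5 are made-up examples, and expand_topic is a hypothetical helper rather than plugin code.

```python
import re


def expand_topic(template, endpoint, ip_address):
    """Replace '%e' with the endpoint name and '%a' with the client IP address."""
    values = {"%e": endpoint, "%a": ip_address}
    # A single pass avoids re-expanding placeholders that appear in the values.
    return re.sub(r"%[ea]", lambda match: values[match.group(0)], template)


topics = {
    "command": "lwm2m/%e/dn/#",
    "response": "lwm2m/%e/up/resp",
    "notify": "lwm2m/%e/up/notify",
}
for name, template in topics.items():
    print(name, expand_topic(template, "dev001", "10.0.0.5"))
```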
DTLS support can be enabled with the following configuration:
# DTLS Certificate Configuration
lwm2m.certfile = etc/certs/cert.pem
lwm2m.keyfile = etc/certs/key.pem
emqx_sn provides support for the MQTT-SN protocol.
etc/plugins/emqx_sn.conf:
mqtt.sn.port = 1884
emqx_stomp provides support for the Stomp protocol. Clients connect to EMQ X through the Stomp 1.0/1.1/1.2 protocol to publish and subscribe to MQTT messages.
Note
Stomp protocol port: 61613
etc/plugins/emqx_stomp.conf:
stomp.default_user.login = guest
stomp.default_user.passcode = guest
stomp.allow_anonymous = true
stomp.frame.max_headers = 10
stomp.frame.max_header_length = 1024
stomp.frame.max_body_length = 8192
stomp.listener = 61613
stomp.listener.acceptors = 4
stomp.listener.max_clients = 512
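A quick way to check the listener is to send a Stomp CONNECT frame using the default credentials above. The Python sketch below builds the frame following the Stomp 1.2 frame layout (command line, headers, blank line, body, NUL terminator). The helper names are hypothetical, and the reply is returned unparsed; a CONNECTED frame would be the expected success response.

```python
import socket


def build_frame(command, headers, body=b""):
    """Serialize a Stomp frame: command, headers, blank line, body, NUL byte."""
    head = "".join(f"{key}:{value}\n" for key, value in headers.items())
    return f"{command}\n{head}\n".encode("utf-8") + body + b"\x00"


def connect_frame(login="guest", passcode="guest", host="localhost"):
    """CONNECT frame using the default user from emqx_stomp.conf."""
    headers = {
        "accept-version": "1.2",
        "host": host,
        "login": login,
        "passcode": passcode,
    }
    return build_frame("CONNECT", headers)


def try_connect(address="127.0.0.1", port=61613, timeout=5.0):
    """Send CONNECT to the Stomp listener and return the raw reply bytes."""
    with socket.create_connection((address, port), timeout=timeout) as sock:
        sock.sendall(connect_frame())
        return sock.recv(4096)
```

The CONNECT frame carries four headers, well within the stomp.frame.max_headers = 10 limit configured above.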
emqx_recon integrates the recon performance tuning library to view status information about the current system, for example:
./bin/emqx_ctl recon
recon memory #recon_alloc:memory/2
recon allocated #recon_alloc:memory(allocated_types, current|max)
recon bin_leak #recon:bin_leak(100)
recon node_stats #recon:node_stats(10, 1000)
recon remote_load Mod #recon:remote_load(Mod)
etc/plugins/emqx_recon.conf:
%% Garbage Collection: 10 minutes
recon.gc_interval = 600
emqx_reloader is used for hot code upgrades during development and debugging. After this plugin is loaded, EMQ X reloads updated code automatically at the configured interval.
A CLI command is also provided to force a module to reload:
./bin/emqx_ctl reload <Module>
Note
This plugin is not recommended for production environments.
etc/plugins/emqx_reloader.conf:
reloader.interval = 60
reloader.logfile = log/reloader.log
emqx_plugin_template is an EMQ X plugin template and provides no functionality by itself.
When developers need to customize a plugin, they can study this plugin's code and structure to deliver a standard EMQ X plugin faster. The plugin is actually a normal Erlang application with the configuration file etc/${PluginName}.config.
To create a new plugin project, refer to emqx_plugin_template.
Note
The tag -emqx_plugin(?MODULE). must be added to the <plugin name>_app.erl file to indicate that this is a plugin for EMQ X.
A demo of authentication module - emqx_auth_demo.erl
-module(emqx_auth_demo).
-export([ init/1
        , check/2
        , description/0
        ]).
init(Opts) -> {ok, Opts}.
check(_Credentials = #{clientid := ClientId, username := Username, password := Password}, _State) ->
    io:format("Auth Demo: clientId=~p, username=~p, password=~p~n", [ClientId, Username, Password]),
    ok.
description() -> "Auth Demo Module".
A demo of access control module - emqx_acl_demo.erl
-module(emqx_acl_demo).
-include_lib("emqx/include/emqx.hrl").
%% ACL callbacks
-export([ init/1
        , check_acl/5
        , reload_acl/1
        , description/0
        ]).
init(Opts) ->
    {ok, Opts}.
%% Five arguments, matching the check_acl/5 export and the hook registration.
%% The argument order is an assumption; verify it against your EMQ X version.
check_acl(Credentials, PubSub, Topic, _DefaultResult, _State) ->
    io:format("ACL Demo: ~p ~p ~p~n", [Credentials, PubSub, Topic]),
    allow.
reload_acl(_State) ->
    ok.
description() -> "ACL Demo Module".
Registration of authentication, access control module - emqx_plugin_template_app.erl
ok = emqx:hook('client.authenticate', fun emqx_auth_demo:check/2, []),
ok = emqx:hook('client.check_acl', fun emqx_acl_demo:check_acl/5, []).
Client online and offline events, topic subscriptions, and message sending and receiving can be handled through hooks.
emqx_plugin_template.erl:
%% Called when the plugin application starts
load(Env) ->
    emqx:hook('client.authenticate', fun ?MODULE:on_client_authenticate/2, [Env]),
    emqx:hook('client.check_acl', fun ?MODULE:on_client_check_acl/5, [Env]),
    emqx:hook('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
    emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqx:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
    emqx:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
    emqx:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqx:hook('session.resumed', fun ?MODULE:on_session_resumed/3, [Env]),
    emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqx:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqx:hook('session.terminated', fun ?MODULE:on_session_terminated/3, [Env]),
    emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
    emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]),
    emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, [Env]).
Available hooks description:
Hooks | Description |
---|---|
client.authenticate | connection authentication |
client.check_acl | ACL validation |
client.connected | client online |
client.disconnected | client disconnected |
client.subscribe | subscribe topic by client |
client.unsubscribe | unsubscribe topic by client |
session.created | session created |
session.resumed | session resumed |
session.subscribed | session after topic subscribed |
session.unsubscribed | session after topic unsubscribed |
session.terminated | session terminated |
message.publish | MQTT message publish |
message.deliver | MQTT message deliver |
message.acked | MQTT message acknowledged |
message.dropped | MQTT message dropped |
Demo module for extending command line - emqx_cli_demo.erl
-module(emqx_cli_demo).
-export([cmd/1]).
cmd(["arg1", "arg2"]) ->
    emqx_cli:print("ok");
cmd(_) ->
    emqx_cli:usage([{"cmd arg1 arg2", "cmd demo"}]).
Register command line module - emqx_plugin_template_app.erl
ok = emqx_ctl:register_command(cmd, {emqx_cli_demo, cmd}, []),
After the plugin is loaded, a new CLI command is added to ./bin/emqx_ctl:
./bin/emqx_ctl cmd arg1 arg2
The plugin comes with a configuration file placed in etc/${plugin_name}.conf|config. EMQ X supports two plugin configuration formats:
- Erlang native configuration file format - ${plugin_name}.config:
[ {plugin_name, [ {key, value} ]} ].
- sysctl's k = v universal format - ${plugin_name}.conf:
plugin_name.key = value
Note
The k = v format requires the plugin developer to create a priv/plugin_name.schema mapping file.
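For tooling that needs to inspect a k = v file, a minimal Python reader might look like the sketch below. It is a hypothetical helper, not the schema-based mapping EMQ X itself performs: it treats lines starting with # as comments and keeps all values as strings.

```python
def parse_conf(text):
    """Parse 'key = value' lines into a dict, skipping blanks and # comments."""
    settings = {}
    for number, line in enumerate(text.splitlines(), start=1):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            raise ValueError(f"line {number}: expected 'key = value'")
        settings[key.strip()] = value.strip()
    return settings


sample = """
## Dashboard HTTP service Port Configuration
dashboard.listener.http = 18083
## dashboard.listener.https = 18084
"""
print(parse_conf(sample))
```

Commented-out items such as the HTTPS listener are skipped, so the result reflects only the settings that are actually active.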
- Clone the emqx-rel project:
git clone https://github.com/emqx/emqx-rel.git
- Add dependency in rebar.config:
{deps,
[ {plugin_name, {git, "url_of_plugin", {tag, "tag_of_plugin"}}}
, ....
....
]
}
- Add the plugin to the relx section in rebar.config:
{relx,
  [ ...
  , ...
  , {release, {emqx, git_describe},
      [ {plugin_name, load}
      ]
    }
  ]
}