diff --git a/src/MQTT.jl b/src/MQTT.jl index 0c87a8a..5535be6 100644 --- a/src/MQTT.jl +++ b/src/MQTT.jl @@ -1,7 +1,13 @@ module MQTT -import Base: connect, ReentrantLock, lock, unlock -using Base.Threads, Base.Dates +import Base: ReentrantLock, lock, unlock +import Sockets: connect + +using Base.Threads +using Dates +using Distributed +using Random +using Sockets include("utils.jl") include("client.jl") diff --git a/src/client.jl b/src/client.jl index f268073..a367479 100644 --- a/src/client.jl +++ b/src/client.jl @@ -44,11 +44,11 @@ struct Message payload::Array{UInt8} function Message(qos::QOS, topic::String, payload...) - return Message(false, convert(UInt8, qos), false, topic, payload...) + return Message(false, UInt8(qos), false, topic, payload...) end function Message(dup::Bool, qos::QOS, retain::Bool, topic::String, payload...) - return Message(dup, convert(UInt8, qos), retain, topic, payload...) + return Message(dup, UInt8(qos), retain, topic, payload...) end function Message(dup::Bool, qos::UInt8, retain::Bool, topic::String, payload...) @@ -154,7 +154,7 @@ function handle_publish(client::Client, s::IO, cmd::UInt8, flags::UInt8) end payload = take!(s) - @schedule client.on_msg(topic, payload) + @async client.on_msg(topic, payload) end function handle_ack(client::Client, s::IO, cmd::UInt8, flags::UInt8) @@ -274,7 +274,7 @@ function keep_alive_loop(client::Client) else check_interval = client.keep_alive / 2 end - timer = Timer(0, check_interval) + timer = Timer(0, interval = check_interval) while true if time() - client.last_sent[] >= client.keep_alive || time() - client.last_received[] >= client.keep_alive @@ -339,14 +339,14 @@ end will::Message=Message(false, 0x00, false, "", Array{UInt8}()), clean_session::Bool=true) -Connects the `Client` instance to the specified broker. +Connects the `Client` instance to the specified broker. Returns a `Future` object that contains a session_present bit from the broker on success and an exception on failure. # Arguments - `keep_alive::Int64=0`: Time in seconds to wait before sending a ping to the broker if no other packets are being sent or received. - `client_id::String=randstring(8)`: The id of the client. - `user::User=User("", "")`: The MQTT authentication. -- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects. +- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects. - `clean_session::Bool=true`: Flag to resume a session with the broker if present. """ function connect_async(client::Client, host::AbstractString, port::Integer=1883; @@ -363,11 +363,11 @@ function connect_async(client::Client, host::AbstractString, port::Integer=1883; error("Could not convert keep_alive to UInt16") end client.socket = connect(host, port) - @schedule write_loop(client) - @schedule read_loop(client) + @async write_loop(client) + @async read_loop(client) if client.keep_alive > 0x0000 - @schedule keep_alive_loop(client) + @async keep_alive_loop(client) end #TODO reset client on clean_session = true @@ -415,22 +415,24 @@ end will::Message=Message(false, 0x00, false, "", Array{UInt8}()), clean_session::Bool=true) -Connects the `Client` instance to the specified broker. +Connects the `Client` instance to the specified broker. Waits until the connect is done. Returns the session_present bit from the broker on success and an exception on failure. # Arguments - `keep_alive::Int64=0`: Time in seconds to wait before sending a ping to the broker if no other packets are being sent or received. - `client_id::String=randstring(8)`: The id of the client. - `user::User=User("", "")`: The MQTT authentication. -- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects. +- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects. - `clean_session::Bool=true`: Flag to resume a session with the broker if present. """ connect(client::Client, host::AbstractString, port::Integer=1883; -keep_alive::Int64=0, -client_id::String=randstring(8), -user::User=User("", ""), -will::Message=Message(false, 0x00, false, "", Array{UInt8}()), -clean_session::Bool=true) = get(connect_async(client, host, port, keep_alive=keep_alive, client_id=client_id, user=user, will=will, clean_session=clean_session)) + keep_alive::Int64=0, + client_id::String=randstring(8), + user::User=User("", ""), + will::Message=Message(false, 0x00, false, "", Array{UInt8}(undef)), + clean_session::Bool=true) = + get(connect_async(client, host, port, keep_alive=keep_alive, client_id=client_id, + user=user, will=will, clean_session=clean_session)) """ disconnect(client::Client) @@ -466,7 +468,7 @@ end """ subscribe(client::Client, topics::Tuple{String, QOS}...) -Waits until the subscribe is fully acknowledged. Returns the actually received QOS levels for each topic on success. +Waits until the subscribe is fully acknowledged. Returns the actually received QOS levels for each topic on success. Contains an exception on failure. """ subscribe(client::Client, topics::Tuple{String, QOS}...) = get(subscribe_async(client, topics...)) @@ -475,7 +477,7 @@ subscribe(client::Client, topics::Tuple{String, QOS}...) = get(subscribe_async(c unsubscribe_async(client::Client, topics::String...) Unsubscribes the `Client` instance from the supplied topic names. -Returns a `Future` object that contains `nothing` on success and an exception on failure. +Returns a `Future` object that contains `nothing` on success and an exception on failure. """ function unsubscribe_async(client::Client, topics::String...) future = Future() @@ -487,7 +489,7 @@ function unsubscribe_async(client::Client, topics::String...) end """ - unsubscribe(client::Client, topics::String...) + unsubscribe(client::Client, topics::String...) Unsubscribes the `Client` instance from the supplied topic names. Waits until the unsubscribe is fully acknowledged. Returns `nothing` on success and an exception on failure. @@ -497,7 +499,7 @@ unsubscribe(client::Client, topics::String...) = get(unsubscribe_async(client, t """ publish_async(client::Client, message::Message) -Publishes the message. Returns a `Future` object that contains `nothing` on success and an exception on failure. +Publishes the message. Returns a `Future` object that contains `nothing` on success and an exception on failure. """ function publish_async(client::Client, message::Message) future = Future() @@ -524,12 +526,11 @@ end qos::QOS=QOS_0, retain::Bool=false) -Pulishes a message with the specified parameters. Returns a `Future` object that contains `nothing` on success and an exception on failure. +Pulishes a message with the specified parameters. Returns a `Future` object that contains `nothing` on success and an exception on failure. """ publish_async(client::Client, topic::String, payload...; - dup::Bool=false, - qos::QOS=QOS_0, - retain::Bool=false) = publish_async(client, Message(dup, convert(UInt8, qos), retain, topic, payload...)) + dup::Bool=false, qos::QOS=QOS_0, retain::Bool=false) = + publish_async(client, Message(dup, UInt8(qos), retain, topic, payload...)) """ publish(client::Client, topic::String, payload...; @@ -547,7 +548,7 @@ publish(client::Client, topic::String, payload...; # Helper method to check if it is possible to subscribe to a topic function filter_wildcard_len_check(sub) #Regex: matches any valid topic, + and # are not in allowed in strings, + is only allowed as a single symbol between two /, # is only allowed at the end - if !(ismatch(r"(^[^#+]+|[+])(/([^#+]+|[+]))*(/#)?$", sub)) || length(sub) > 65535 + if !(occursin(r"(^[^#+]+|[+])(/([^#+]+|[+]))*(/#)?$", sub)) || length(sub) > 65535 throw(MQTTException("Invalid topic")) end end @@ -557,7 +558,7 @@ function topic_wildcard_len_check(topic) # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. # Also returns MQTT_ERR_INVAL if the topic string is too long. # Returns MQTT_ERR_SUCCESS if everything is fine. - if !(ismatch(r"^[^#+]+$", topic)) || length(topic) > 65535 + if !(occursin(r"^[^#+]+$", topic)) || length(topic) > 65535 throw(MQTTException("Invalid topic")) end end diff --git a/test/mocksocket.jl b/test/mocksocket.jl index 76712b8..0b7fd4b 100644 --- a/test/mocksocket.jl +++ b/test/mocksocket.jl @@ -121,6 +121,6 @@ end function connect(host::AbstractString, port::Integer) th = TestFileHandler(Channel{UInt8}(256), Channel{UInt8}(256), Condition(), Condition(), false) - put_from_file(th, "data/input/connack.dat") + put_from_file(th, joinpath(datadir, "input", "connack.dat")) return th end diff --git a/test/packet.jl b/test/packet.jl index 8ba39a8..613c8b9 100644 --- a/test/packet.jl +++ b/test/packet.jl @@ -1,9 +1,9 @@ -info("Running packet tests") +@info "Running packet tests" function on_msg(topic, payload) - info("Received message topic: [", topic, "] payload: [", String(payload), "]") + @info "Received message topic: [", topic, "] payload: [", String(copy(payload)), "]" @test topic == "abc" - @test String(payload)== "qwerty" + @test String(copy(payload)) == "qwerty" end function is_out_correct(filename_expected::AbstractString, actual::Channel{UInt8}, mid::UInt16) @@ -51,79 +51,82 @@ function test() client = Client(on_msg) last_id::UInt16 = 0x0001 - info("Testing connect") + inputdir = joinpath(datadir, "input") + outputdir = joinpath(datadir, "output") + + @info "Testing connect" connect(client, "test.mosquitto.org", client_id="TestID") tfh::TestFileHandler = client.socket - @test is_out_correct("data/output/connect.dat", tfh.out_channel) + @test is_out_correct(joinpath(outputdir, "connect.dat"), tfh.out_channel) # CONNACK is automatically being sent in connect call - info("Testing subscribe") + @info "Testing subscribe" subscribe_async(client, ("abc", QOS_1), ("cba", QOS_0)) - put_from_file(tfh, "data/input/suback.dat", client.last_id) - @test is_out_correct("data/output/subreq.dat", tfh.out_channel, client.last_id) + put_from_file(tfh, joinpath(inputdir, "suback.dat"), client.last_id) + @test is_out_correct(joinpath(outputdir, "subreq.dat"), tfh.out_channel, client.last_id) - info("Testing unsubscribe") + @info "Testing unsubscribe" unsubscribe_async(client, "abc", "cba") - put_from_file(tfh, "data/input/unsuback.dat", client.last_id) - @test is_out_correct("data/output/unsubreq.dat", tfh.out_channel, client.last_id) + put_from_file(tfh, joinpath(inputdir, "unsuback.dat"), client.last_id) + @test is_out_correct(joinpath(outputdir, "unsubreq.dat"), tfh.out_channel, client.last_id) - info("Testing receive publish QOS 0") - put_from_file(tfh, "data/input/qos0pub.dat") + @info "Testing receive publish QOS 0" + put_from_file(tfh, joinpath(inputdir, "qos0pub.dat")) - info("Testing receive publish QOS 1") - put_from_file(tfh, "data/input/qos1pub.dat", last_id) - @test is_out_correct("data/output/puback.dat", tfh.out_channel, last_id) + @info "Testing receive publish QOS 1" + put_from_file(tfh, joinpath(inputdir, "qos1pub.dat"), last_id) + @test is_out_correct(joinpath(outputdir, "puback.dat"), tfh.out_channel, last_id) #last_id += 1 - info("Testing receive publish QOS 2") - put_from_file(tfh, "data/input/qos2pub.dat", last_id) - @test is_out_correct("data/output/pubrec.dat", tfh.out_channel, last_id) - put_from_file(tfh, "data/input/pubrel.dat", last_id) - @test is_out_correct("data/output/pubcomp.dat", tfh.out_channel, last_id) + @info "Testing receive publish QOS 2" + put_from_file(tfh, joinpath(inputdir, "qos2pub.dat"), last_id) + @test is_out_correct(joinpath(outputdir, "pubrec.dat"), tfh.out_channel, last_id) + put_from_file(tfh, joinpath(inputdir, "pubrel.dat"), last_id) + @test is_out_correct(joinpath(outputdir, "pubcomp.dat"), tfh.out_channel, last_id) #last_id += 1 - info("Testing send publish QOS 0") + @info "Testing send publish QOS 0" publish_async(client, "test1", "QOS_0", qos=QOS_0) - @test is_out_correct("data/output/qos0pub.dat", tfh.out_channel) + @test is_out_correct(joinpath(outputdir, "qos0pub.dat"), tfh.out_channel) - info("Testing send publish QOS 1") + @info "Testing send publish QOS 1" publish_async(client, "test2", "QOS_1", qos=QOS_1) - put_from_file(tfh, "data/input/puback.dat", client.last_id) - @test is_out_correct("data/output/qos1pub.dat", tfh.out_channel, client.last_id) + put_from_file(tfh, joinpath(inputdir, "puback.dat"), client.last_id) + @test is_out_correct(joinpath(outputdir, "qos1pub.dat"), tfh.out_channel, client.last_id) - info("Testing send publish QOS 2") + @info "Testing send publish QOS 2" publish_async(client, "test3", "test", qos=QOS_2) - @test is_out_correct("data/output/qos2pub.dat", tfh.out_channel, client.last_id) - put_from_file(tfh, "data/input/pubrec.dat", client.last_id) - @test is_out_correct("data/output/pubrel.dat", tfh.out_channel, client.last_id) - put_from_file(tfh, "data/input/pubcomp.dat", client.last_id) + @test is_out_correct(joinpath(outputdir, "qos2pub.dat"), tfh.out_channel, client.last_id) + put_from_file(tfh, joinpath(inputdir, "pubrec.dat"), client.last_id) + @test is_out_correct(joinpath(outputdir, "pubrel.dat"), tfh.out_channel, client.last_id) + put_from_file(tfh, joinpath(inputdir, "pubcomp.dat"), client.last_id) - info("Testing disconnect") + @info "Testing disconnect" disconnect(client) - @test is_out_correct("data/output/disco.dat", tfh.out_channel) + @test is_out_correct(joinpath(outputdir, "disco.dat"), tfh.out_channel) #This has to be in it's own connect flow to not interfere with other messages - info("Testing keep alive with response") + @info "Testing keep alive with response" client = Client(on_msg) client.ping_timeout = 1 connect(client, "test.mosquitto.org", client_id="TestID", keep_alive=1) tfh = client.socket - @test is_out_correct("data/output/connect_keep_alive1s.dat", tfh.out_channel) # Consume output - @test is_out_correct("data/output/pingreq.dat", tfh.out_channel) - put_from_file(tfh, "data/input/pingresp.dat") + @test is_out_correct(joinpath(outputdir, "connect_keep_alive1s.dat"), tfh.out_channel) # Consume output + @test is_out_correct(joinpath(outputdir, "pingreq.dat"), tfh.out_channel) + put_from_file(tfh, joinpath(inputdir, "pingresp.dat")) - info("Testing keep alive without response") + @info "Testing keep alive without response" sleep(1.1) - @test is_out_correct("data/output/pingreq.dat", tfh.out_channel) - @test is_out_correct("data/output/disco.dat", tfh.out_channel) + @test is_out_correct(joinpath(outputdir, "pingreq.dat"), tfh.out_channel) + @test is_out_correct(joinpath(outputdir, "disco.dat"), tfh.out_channel) - info("Testing unwanted pingresp") + @info "Testing unwanted pingresp" client = Client(on_msg) connect(client, "test.mosquitto.org", client_id="TestID", keep_alive=15) tfh = client.socket - put_from_file(tfh, "data/input/pingresp.dat") + put_from_file(tfh, joinpath(inputdir, "pingresp.dat")) sleep(0.1) @test tfh.closed end diff --git a/test/runtests.jl b/test/runtests.jl index 7c1d90f..b55cb58 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,8 +1,12 @@ -using Base.Test, MQTT +using Test +using MQTT +using Random -import Base: connect, read, write, close +import Base: read, write, close import MQTT: read_len, Message +import Sockets: connect +const datadir = joinpath(dirname(@__FILE__), "data") include("smoke.jl") include("mocksocket.jl") diff --git a/test/smoke.jl b/test/smoke.jl index cca219d..43b72f0 100644 --- a/test/smoke.jl +++ b/test/smoke.jl @@ -1,41 +1,41 @@ import MQTT.User -info("Running smoke tests") +@info "Running smoke tests" condition = Condition() topic = "foo" payload = randstring(20) function on_msg(t, p) - info("Received message topic: [", t, "] payload: [", String(p), "]") + @info "Received message topic: [", t, "] payload: [", String(copy(p)), "]" @test t == topic - @test String(p)== payload + @test String(copy(p))== payload notify(condition) end client = Client(on_msg) -info("Testing reconnect") +@info "Testing reconnect" connect(client, "test.mosquitto.org") disconnect(client) connect(client, "test.mosquitto.org") subscribe(client, (topic, QOS_0)) -info("Testing publish qos 0") +@info "Testing publish qos 0" publish(client, topic, payload, qos=QOS_0) wait(condition) -info("Testing publish qos 1") +@info "Testing publish qos 1" publish(client, topic, payload, qos=QOS_1) wait(condition) -info("Testing publish qos 2") +@info "Testing publish qos 2" publish(client, topic, payload, qos=QOS_2) wait(condition) -info("Testing connect will") +@info "Testing connect will" disconnect(client) connect(client, "test.mosquitto.org", will=Message(false, 0x00, false, topic, payload))