Skip to content

Commit

Permalink
update to Julia v0.7/1.0, drop 0.6 support
Browse files Browse the repository at this point in the history
  • Loading branch information
iblislin committed Nov 14, 2018
1 parent 35fd094 commit 4dd5b06
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 81 deletions.
10 changes: 8 additions & 2 deletions src/MQTT.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
module MQTT

import Base: connect, ReentrantLock, lock, unlock
using Base.Threads, Base.Dates
import Base: ReentrantLock, lock, unlock
import Sockets: connect

using Base.Threads
using Dates
using Distributed
using Random
using Sockets

include("utils.jl")
include("client.jl")
Expand Down
53 changes: 27 additions & 26 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ struct Message
payload::Array{UInt8}

function Message(qos::QOS, topic::String, payload...)
return Message(false, convert(UInt8, qos), false, topic, payload...)
return Message(false, UInt8(qos), false, topic, payload...)
end

function Message(dup::Bool, qos::QOS, retain::Bool, topic::String, payload...)
return Message(dup, convert(UInt8, qos), retain, topic, payload...)
return Message(dup, UInt8(qos), retain, topic, payload...)
end

function Message(dup::Bool, qos::UInt8, retain::Bool, topic::String, payload...)
Expand Down Expand Up @@ -154,7 +154,7 @@ function handle_publish(client::Client, s::IO, cmd::UInt8, flags::UInt8)
end

payload = take!(s)
@schedule client.on_msg(topic, payload)
@async client.on_msg(topic, payload)
end

function handle_ack(client::Client, s::IO, cmd::UInt8, flags::UInt8)
Expand Down Expand Up @@ -274,7 +274,7 @@ function keep_alive_loop(client::Client)
else
check_interval = client.keep_alive / 2
end
timer = Timer(0, check_interval)
timer = Timer(0, interval = check_interval)

while true
if time() - client.last_sent[] >= client.keep_alive || time() - client.last_received[] >= client.keep_alive
Expand Down Expand Up @@ -339,14 +339,14 @@ end
will::Message=Message(false, 0x00, false, "", Array{UInt8}()),
clean_session::Bool=true)
Connects the `Client` instance to the specified broker.
Connects the `Client` instance to the specified broker.
Returns a `Future` object that contains a session_present bit from the broker on success and an exception on failure.
# Arguments
- `keep_alive::Int64=0`: Time in seconds to wait before sending a ping to the broker if no other packets are being sent or received.
- `client_id::String=randstring(8)`: The id of the client.
- `user::User=User("", "")`: The MQTT authentication.
- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects.
- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects.
- `clean_session::Bool=true`: Flag to resume a session with the broker if present.
"""
function connect_async(client::Client, host::AbstractString, port::Integer=1883;
Expand All @@ -363,11 +363,11 @@ function connect_async(client::Client, host::AbstractString, port::Integer=1883;
error("Could not convert keep_alive to UInt16")
end
client.socket = connect(host, port)
@schedule write_loop(client)
@schedule read_loop(client)
@async write_loop(client)
@async read_loop(client)

if client.keep_alive > 0x0000
@schedule keep_alive_loop(client)
@async keep_alive_loop(client)
end

#TODO reset client on clean_session = true
Expand Down Expand Up @@ -415,22 +415,24 @@ end
will::Message=Message(false, 0x00, false, "", Array{UInt8}()),
clean_session::Bool=true)
Connects the `Client` instance to the specified broker.
Connects the `Client` instance to the specified broker.
Waits until the connect is done. Returns the session_present bit from the broker on success and an exception on failure.
# Arguments
- `keep_alive::Int64=0`: Time in seconds to wait before sending a ping to the broker if no other packets are being sent or received.
- `client_id::String=randstring(8)`: The id of the client.
- `user::User=User("", "")`: The MQTT authentication.
- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects.
- `will::Message=Message(false, 0x00, false, "", Array{UInt8}())`: The MQTT will to send to all other clients when this client disconnects.
- `clean_session::Bool=true`: Flag to resume a session with the broker if present.
"""
connect(client::Client, host::AbstractString, port::Integer=1883;
keep_alive::Int64=0,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", Array{UInt8}()),
clean_session::Bool=true) = get(connect_async(client, host, port, keep_alive=keep_alive, client_id=client_id, user=user, will=will, clean_session=clean_session))
keep_alive::Int64=0,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", Array{UInt8}(undef)),
clean_session::Bool=true) =
get(connect_async(client, host, port, keep_alive=keep_alive, client_id=client_id,
user=user, will=will, clean_session=clean_session))

"""
disconnect(client::Client)
Expand Down Expand Up @@ -466,7 +468,7 @@ end
"""
subscribe(client::Client, topics::Tuple{String, QOS}...)
Waits until the subscribe is fully acknowledged. Returns the actually received QOS levels for each topic on success.
Waits until the subscribe is fully acknowledged. Returns the actually received QOS levels for each topic on success.
Contains an exception on failure.
"""
subscribe(client::Client, topics::Tuple{String, QOS}...) = get(subscribe_async(client, topics...))
Expand All @@ -475,7 +477,7 @@ subscribe(client::Client, topics::Tuple{String, QOS}...) = get(subscribe_async(c
unsubscribe_async(client::Client, topics::String...)
Unsubscribes the `Client` instance from the supplied topic names.
Returns a `Future` object that contains `nothing` on success and an exception on failure.
Returns a `Future` object that contains `nothing` on success and an exception on failure.
"""
function unsubscribe_async(client::Client, topics::String...)
future = Future()
Expand All @@ -487,7 +489,7 @@ function unsubscribe_async(client::Client, topics::String...)
end

"""
unsubscribe(client::Client, topics::String...)
unsubscribe(client::Client, topics::String...)
Unsubscribes the `Client` instance from the supplied topic names.
Waits until the unsubscribe is fully acknowledged. Returns `nothing` on success and an exception on failure.
Expand All @@ -497,7 +499,7 @@ unsubscribe(client::Client, topics::String...) = get(unsubscribe_async(client, t
"""
publish_async(client::Client, message::Message)
Publishes the message. Returns a `Future` object that contains `nothing` on success and an exception on failure.
Publishes the message. Returns a `Future` object that contains `nothing` on success and an exception on failure.
"""
function publish_async(client::Client, message::Message)
future = Future()
Expand All @@ -524,12 +526,11 @@ end
qos::QOS=QOS_0,
retain::Bool=false)
Pulishes a message with the specified parameters. Returns a `Future` object that contains `nothing` on success and an exception on failure.
Pulishes a message with the specified parameters. Returns a `Future` object that contains `nothing` on success and an exception on failure.
"""
publish_async(client::Client, topic::String, payload...;
dup::Bool=false,
qos::QOS=QOS_0,
retain::Bool=false) = publish_async(client, Message(dup, convert(UInt8, qos), retain, topic, payload...))
dup::Bool=false, qos::QOS=QOS_0, retain::Bool=false) =
publish_async(client, Message(dup, UInt8(qos), retain, topic, payload...))

"""
publish(client::Client, topic::String, payload...;
Expand All @@ -547,7 +548,7 @@ publish(client::Client, topic::String, payload...;
# Helper method to check if it is possible to subscribe to a topic
function filter_wildcard_len_check(sub)
#Regex: matches any valid topic, + and # are not in allowed in strings, + is only allowed as a single symbol between two /, # is only allowed at the end
if !(ismatch(r"(^[^#+]+|[+])(/([^#+]+|[+]))*(/#)?$", sub)) || length(sub) > 65535
if !(occursin(r"(^[^#+]+|[+])(/([^#+]+|[+]))*(/#)?$", sub)) || length(sub) > 65535
throw(MQTTException("Invalid topic"))
end
end
Expand All @@ -557,7 +558,7 @@ function topic_wildcard_len_check(topic)
# Search for + or # in a topic. Return MQTT_ERR_INVAL if found.
# Also returns MQTT_ERR_INVAL if the topic string is too long.
# Returns MQTT_ERR_SUCCESS if everything is fine.
if !(ismatch(r"^[^#+]+$", topic)) || length(topic) > 65535
if !(occursin(r"^[^#+]+$", topic)) || length(topic) > 65535
throw(MQTTException("Invalid topic"))
end
end
2 changes: 1 addition & 1 deletion test/mocksocket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ end

function connect(host::AbstractString, port::Integer)
th = TestFileHandler(Channel{UInt8}(256), Channel{UInt8}(256), Condition(), Condition(), false)
put_from_file(th, "data/input/connack.dat")
put_from_file(th, joinpath(datadir, "input", "connack.dat"))
return th
end
87 changes: 45 additions & 42 deletions test/packet.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
info("Running packet tests")
@info "Running packet tests"

function on_msg(topic, payload)
info("Received message topic: [", topic, "] payload: [", String(payload), "]")
@info "Received message topic: [", topic, "] payload: [", String(copy(payload)), "]"
@test topic == "abc"
@test String(payload)== "qwerty"
@test String(copy(payload)) == "qwerty"
end

function is_out_correct(filename_expected::AbstractString, actual::Channel{UInt8}, mid::UInt16)
Expand Down Expand Up @@ -51,79 +51,82 @@ function test()
client = Client(on_msg)
last_id::UInt16 = 0x0001

info("Testing connect")
inputdir = joinpath(datadir, "input")
outputdir = joinpath(datadir, "output")

@info "Testing connect"
connect(client, "test.mosquitto.org", client_id="TestID")
tfh::TestFileHandler = client.socket
@test is_out_correct("data/output/connect.dat", tfh.out_channel)
@test is_out_correct(joinpath(outputdir, "connect.dat"), tfh.out_channel)
# CONNACK is automatically being sent in connect call

info("Testing subscribe")
@info "Testing subscribe"
subscribe_async(client, ("abc", QOS_1), ("cba", QOS_0))
put_from_file(tfh, "data/input/suback.dat", client.last_id)
@test is_out_correct("data/output/subreq.dat", tfh.out_channel, client.last_id)
put_from_file(tfh, joinpath(inputdir, "suback.dat"), client.last_id)
@test is_out_correct(joinpath(outputdir, "subreq.dat"), tfh.out_channel, client.last_id)

info("Testing unsubscribe")
@info "Testing unsubscribe"
unsubscribe_async(client, "abc", "cba")
put_from_file(tfh, "data/input/unsuback.dat", client.last_id)
@test is_out_correct("data/output/unsubreq.dat", tfh.out_channel, client.last_id)
put_from_file(tfh, joinpath(inputdir, "unsuback.dat"), client.last_id)
@test is_out_correct(joinpath(outputdir, "unsubreq.dat"), tfh.out_channel, client.last_id)

info("Testing receive publish QOS 0")
put_from_file(tfh, "data/input/qos0pub.dat")
@info "Testing receive publish QOS 0"
put_from_file(tfh, joinpath(inputdir, "qos0pub.dat"))

info("Testing receive publish QOS 1")
put_from_file(tfh, "data/input/qos1pub.dat", last_id)
@test is_out_correct("data/output/puback.dat", tfh.out_channel, last_id)
@info "Testing receive publish QOS 1"
put_from_file(tfh, joinpath(inputdir, "qos1pub.dat"), last_id)
@test is_out_correct(joinpath(outputdir, "puback.dat"), tfh.out_channel, last_id)
#last_id += 1

info("Testing receive publish QOS 2")
put_from_file(tfh, "data/input/qos2pub.dat", last_id)
@test is_out_correct("data/output/pubrec.dat", tfh.out_channel, last_id)
put_from_file(tfh, "data/input/pubrel.dat", last_id)
@test is_out_correct("data/output/pubcomp.dat", tfh.out_channel, last_id)
@info "Testing receive publish QOS 2"
put_from_file(tfh, joinpath(inputdir, "qos2pub.dat"), last_id)
@test is_out_correct(joinpath(outputdir, "pubrec.dat"), tfh.out_channel, last_id)
put_from_file(tfh, joinpath(inputdir, "pubrel.dat"), last_id)
@test is_out_correct(joinpath(outputdir, "pubcomp.dat"), tfh.out_channel, last_id)
#last_id += 1

info("Testing send publish QOS 0")
@info "Testing send publish QOS 0"
publish_async(client, "test1", "QOS_0", qos=QOS_0)
@test is_out_correct("data/output/qos0pub.dat", tfh.out_channel)
@test is_out_correct(joinpath(outputdir, "qos0pub.dat"), tfh.out_channel)

info("Testing send publish QOS 1")
@info "Testing send publish QOS 1"
publish_async(client, "test2", "QOS_1", qos=QOS_1)
put_from_file(tfh, "data/input/puback.dat", client.last_id)
@test is_out_correct("data/output/qos1pub.dat", tfh.out_channel, client.last_id)
put_from_file(tfh, joinpath(inputdir, "puback.dat"), client.last_id)
@test is_out_correct(joinpath(outputdir, "qos1pub.dat"), tfh.out_channel, client.last_id)


info("Testing send publish QOS 2")
@info "Testing send publish QOS 2"
publish_async(client, "test3", "test", qos=QOS_2)
@test is_out_correct("data/output/qos2pub.dat", tfh.out_channel, client.last_id)
put_from_file(tfh, "data/input/pubrec.dat", client.last_id)
@test is_out_correct("data/output/pubrel.dat", tfh.out_channel, client.last_id)
put_from_file(tfh, "data/input/pubcomp.dat", client.last_id)
@test is_out_correct(joinpath(outputdir, "qos2pub.dat"), tfh.out_channel, client.last_id)
put_from_file(tfh, joinpath(inputdir, "pubrec.dat"), client.last_id)
@test is_out_correct(joinpath(outputdir, "pubrel.dat"), tfh.out_channel, client.last_id)
put_from_file(tfh, joinpath(inputdir, "pubcomp.dat"), client.last_id)

info("Testing disconnect")
@info "Testing disconnect"
disconnect(client)
@test is_out_correct("data/output/disco.dat", tfh.out_channel)
@test is_out_correct(joinpath(outputdir, "disco.dat"), tfh.out_channel)

#This has to be in it's own connect flow to not interfere with other messages
info("Testing keep alive with response")
@info "Testing keep alive with response"
client = Client(on_msg)

client.ping_timeout = 1
connect(client, "test.mosquitto.org", client_id="TestID", keep_alive=1)
tfh = client.socket
@test is_out_correct("data/output/connect_keep_alive1s.dat", tfh.out_channel) # Consume output
@test is_out_correct("data/output/pingreq.dat", tfh.out_channel)
put_from_file(tfh, "data/input/pingresp.dat")
@test is_out_correct(joinpath(outputdir, "connect_keep_alive1s.dat"), tfh.out_channel) # Consume output
@test is_out_correct(joinpath(outputdir, "pingreq.dat"), tfh.out_channel)
put_from_file(tfh, joinpath(inputdir, "pingresp.dat"))

info("Testing keep alive without response")
@info "Testing keep alive without response"
sleep(1.1)
@test is_out_correct("data/output/pingreq.dat", tfh.out_channel)
@test is_out_correct("data/output/disco.dat", tfh.out_channel)
@test is_out_correct(joinpath(outputdir, "pingreq.dat"), tfh.out_channel)
@test is_out_correct(joinpath(outputdir, "disco.dat"), tfh.out_channel)

info("Testing unwanted pingresp")
@info "Testing unwanted pingresp"
client = Client(on_msg)
connect(client, "test.mosquitto.org", client_id="TestID", keep_alive=15)
tfh = client.socket
put_from_file(tfh, "data/input/pingresp.dat")
put_from_file(tfh, joinpath(inputdir, "pingresp.dat"))
sleep(0.1)
@test tfh.closed
end
Expand Down
8 changes: 6 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using Base.Test, MQTT
using Test
using MQTT
using Random

import Base: connect, read, write, close
import Base: read, write, close
import MQTT: read_len, Message
import Sockets: connect

const datadir = joinpath(dirname(@__FILE__), "data")

include("smoke.jl")
include("mocksocket.jl")
Expand Down
16 changes: 8 additions & 8 deletions test/smoke.jl
Original file line number Diff line number Diff line change
@@ -1,41 +1,41 @@
import MQTT.User

info("Running smoke tests")
@info "Running smoke tests"

condition = Condition()
topic = "foo"
payload = randstring(20)

function on_msg(t, p)
info("Received message topic: [", t, "] payload: [", String(p), "]")
@info "Received message topic: [", t, "] payload: [", String(copy(p)), "]"
@test t == topic
@test String(p)== payload
@test String(copy(p))== payload

notify(condition)
end

client = Client(on_msg)

info("Testing reconnect")
@info "Testing reconnect"
connect(client, "test.mosquitto.org")
disconnect(client)
connect(client, "test.mosquitto.org")

subscribe(client, (topic, QOS_0))

info("Testing publish qos 0")
@info "Testing publish qos 0"
publish(client, topic, payload, qos=QOS_0)
wait(condition)

info("Testing publish qos 1")
@info "Testing publish qos 1"
publish(client, topic, payload, qos=QOS_1)
wait(condition)

info("Testing publish qos 2")
@info "Testing publish qos 2"
publish(client, topic, payload, qos=QOS_2)
wait(condition)

info("Testing connect will")
@info "Testing connect will"
disconnect(client)
connect(client, "test.mosquitto.org", will=Message(false, 0x00, false, topic, payload))

Expand Down

0 comments on commit 4dd5b06

Please sign in to comment.